Changeset 31:381fd23551a5

Fixed lifecycle of extension
author unexist
date Wed, 18 Aug 2021 16:48:38 +0200
parents e8d95600d0a2
children f24ffacf0547
files event-split-extension-parent/deployment/src/main/java/dev/unexist/showcase/eventsplit/EventSplitProcessor.java event-split-extension-parent/runtime/src/main/java/dev/unexist/showcase/eventsplit/EventSplitDispatcher.java event-split-extension-parent/runtime/src/main/java/dev/unexist/showcase/eventsplit/EventSplitLifecycle.java event-split-extension-parent/runtime/src/main/java/dev/unexist/showcase/eventsplit/EventSplitRuntimeConfig.java
diffstat 4 files changed, 134 insertions(+), 17 deletions(-) [+]
line wrap: on
line diff
--- a/event-split-extension-parent/deployment/src/main/java/dev/unexist/showcase/eventsplit/EventSplitProcessor.java	Wed Aug 18 08:36:10 2021 +0200
+++ b/event-split-extension-parent/deployment/src/main/java/dev/unexist/showcase/eventsplit/EventSplitProcessor.java	Wed Aug 18 16:48:38 2021 +0200
@@ -12,7 +12,6 @@
 package dev.unexist.showcase.eventsplit;
 
 import io.quarkus.arc.deployment.AdditionalBeanBuildItem;
-import io.quarkus.deployment.annotations.BuildProducer;
 import io.quarkus.deployment.annotations.BuildStep;
 import io.quarkus.deployment.builditem.FeatureBuildItem;
 
@@ -25,7 +24,7 @@
     }
 
     @BuildStep
-    void build(BuildProducer<AdditionalBeanBuildItem> additionalBean) {
-        additionalBean.produce(AdditionalBeanBuildItem.unremovableOf(EventSplitDispatcher.class));
+    AdditionalBeanBuildItem beans() {
+        return new AdditionalBeanBuildItem(EventSplitLifecycle.class, EventSplitDispatcher.class);
     }
 }
--- a/event-split-extension-parent/runtime/src/main/java/dev/unexist/showcase/eventsplit/EventSplitDispatcher.java	Wed Aug 18 08:36:10 2021 +0200
+++ b/event-split-extension-parent/runtime/src/main/java/dev/unexist/showcase/eventsplit/EventSplitDispatcher.java	Wed Aug 18 16:48:38 2021 +0200
@@ -14,38 +14,76 @@
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import io.vertx.core.Vertx;
 import io.vertx.core.eventbus.EventBus;
+import io.vertx.kafka.client.consumer.KafkaConsumer;
 import org.apache.commons.lang3.StringUtils;
-import org.eclipse.microprofile.reactive.messaging.Channel;
 
 import javax.enterprise.context.ApplicationScoped;
+import javax.enterprise.inject.spi.CDI;
 import javax.inject.Inject;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
 
 @ApplicationScoped
 public class EventSplitDispatcher {
     private final static ObjectMapper MAPPER = new ObjectMapper();
+    private KafkaConsumer<String, String> consumer;
+
+    @Inject
+    EventSplitRuntimeConfig config;
 
     @Inject
     EventBus bus;
 
-    EventSplitDispatcher() {
-        System.out.println("Init event dispatcher");
-    }
+    /**
+     * Start the dispatcher
+     **/
+
+    public void start() {
+        Vertx vertx = CDI.current().select(Vertx.class).get();
 
-    @Channel("todo_in")
-    private void dispatchEvents(String message) {
-        String typeName = StringUtils.EMPTY;
+        System.out.println("Init event dispatcher: vertx=" + vertx + ", config=" + this.config);
+
+        Objects.requireNonNull(this.config, "Config cannot be null");
+
+        /* Create consumer */
+        Map<String, String> consumerConfig = new HashMap<>();
 
-        System.out.println("Handle message " + message);
+        consumerConfig.put("bootstrap.servers", this.config.brokerServer);
+        consumerConfig.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+        consumerConfig.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+        consumerConfig.put("group.id", "eventSplitGroup");
+        consumerConfig.put("auto.offset.reset", "earliest");
+        consumerConfig.put("enable.auto.commit", "false");
 
-        try {
-            JsonNode json = MAPPER.readTree(message);
+        this.consumer = KafkaConsumer.create(vertx, consumerConfig);
 
-            typeName = json.get("type").asText();
-        } catch (JsonProcessingException e) {
-            System.out.println("Error reading JSON " + e);
+        /* Subscribe to topics */
+        if (null != this.config.topics) {
+            System.out.println("Subscribe to topics: " + this.config.topics);
+
+            this.consumer.subscribe(new HashSet<>(Arrays.asList(this.config.topics.split(","))));
         }
 
-        this.bus.send(typeName, message);
+        /* Set up handler */
+        this.consumer.handler(record -> {
+            String typeName = StringUtils.EMPTY;
+
+            System.out.println("Handle message " + record.value());
+
+            try {
+                JsonNode json = MAPPER.readTree(record.value());
+
+                typeName = json.get("type").asText();
+            } catch (JsonProcessingException e) {
+                System.out.println("Error reading JSON " + e);
+            }
+
+            this.bus.send(typeName, record.value());
+        });
     }
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/event-split-extension-parent/runtime/src/main/java/dev/unexist/showcase/eventsplit/EventSplitLifecycle.java	Wed Aug 18 16:48:38 2021 +0200
@@ -0,0 +1,46 @@
+/**
+ * @package Showcase
+ *
+ * @file Extension lifecycle
+ * @copyright 2021 Christoph Kappel <christoph@unexist.dev>
+ * @version Id
+ *
+ * This program can be distributed under the terms of the Apache License v2.0.
+ * See the file LICENSE for details.
+ **/
+ 
+package dev.unexist.showcase.eventsplit;
+
+import io.quarkus.runtime.StartupEvent;
+
+import javax.annotation.Priority;
+import javax.enterprise.context.Dependent;
+import javax.enterprise.event.Observes;
+import javax.enterprise.inject.spi.DefinitionException;
+import javax.enterprise.inject.spi.DeploymentException;
+import javax.inject.Inject;
+import javax.interceptor.Interceptor;
+
+@Dependent
+public class EventSplitLifecycle {
+
+    @Inject
+    EventSplitDispatcher dispatcher;
+
+    /**
+     * Handle application start
+     *
+     * @param  event  The handled event
+     **/
+
+    void onApplicationStart(@Observes @Priority(Interceptor.Priority.LIBRARY_BEFORE) StartupEvent event) {
+        try {
+            this.dispatcher.start();
+        } catch (Exception e) {
+            if (e instanceof DeploymentException || e instanceof DefinitionException) {
+                throw e;
+            }
+            throw new DeploymentException(e);
+        }
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/event-split-extension-parent/runtime/src/main/java/dev/unexist/showcase/eventsplit/EventSplitRuntimeConfig.java	Wed Aug 18 16:48:38 2021 +0200
@@ -0,0 +1,34 @@
+/**
+ * @package Showcase
+ *
+ * @file Event split config
+ * @copyright 2021 Christoph Kappel <christoph@unexist.dev>
+ * @version $Id$
+ *
+ * This program can be distributed under the terms of the Apache License v2.0.
+ * See the file LICENSE for details.
+ **/
+ 
+package dev.unexist.showcase.eventsplit;
+
+import io.quarkus.runtime.annotations.ConfigItem;
+import io.quarkus.runtime.annotations.ConfigPhase;
+import io.quarkus.runtime.annotations.ConfigRoot;
+
+@ConfigRoot(phase = ConfigPhase.RUN_TIME, name = "event-split")
+public class EventSplitRuntimeConfig {
+
+    /**
+     * List of topics to listen to
+     **/
+
+    @ConfigItem(name = "topics", defaultValue = "", defaultValueDocumentation = "List of topics")
+    public String topics;
+
+    /**
+     * List of topics to listen to
+     **/
+
+    @ConfigItem(name = "broker.server", defaultValue = "localhost:9092", defaultValueDocumentation = "Host of Kafka broker with port")
+    public String brokerServer;
+}