Changeset 36:fbdf12e636cb

Updated logging and split consumer handling
author unexist
date Thu, 19 Aug 2021 13:26:58 +0200
parents d5a8a2f4e912
children 69db3b1e853d
files event-split-extension-parent/runtime/src/main/java/dev/unexist/showcase/eventsplit/EventSplitDispatcher.java
diffstat 1 files changed, 32 insertions(+), 19 deletions(-) [+]
line wrap: on
line diff
--- a/event-split-extension-parent/runtime/src/main/java/dev/unexist/showcase/eventsplit/EventSplitDispatcher.java	Thu Aug 19 13:25:00 2021 +0200
+++ b/event-split-extension-parent/runtime/src/main/java/dev/unexist/showcase/eventsplit/EventSplitDispatcher.java	Thu Aug 19 13:26:58 2021 +0200
@@ -17,7 +17,10 @@
 import io.vertx.core.Vertx;
 import io.vertx.core.eventbus.EventBus;
 import io.vertx.kafka.client.consumer.KafkaConsumer;
+import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
 import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.enterprise.context.ApplicationScoped;
 import javax.enterprise.inject.spi.CDI;
@@ -30,11 +33,13 @@
 
 @ApplicationScoped
 public class EventSplitDispatcher {
+    private static final Logger LOGGER = LoggerFactory.getLogger(EventSplitDispatcher.class);
     private final static ObjectMapper MAPPER = new ObjectMapper();
+
     private KafkaConsumer<String, String> consumer;
 
     @Inject
-    EventSplitRuntimeConfig config;
+    EventSplitRuntimeConfig runtimeConfig;
 
     @Inject
     EventBus bus;
@@ -44,16 +49,16 @@
      **/
 
     public void start() {
+        Objects.requireNonNull(this.runtimeConfig, "Config cannot be null");
+
         Vertx vertx = CDI.current().select(Vertx.class).get();
 
-        System.out.println("Init event dispatcher: vertx=" + vertx + ", config=" + this.config);
-
-        Objects.requireNonNull(this.config, "Config cannot be null");
+        LOGGER.info("Init event dispatcher: vertx={}, config={}", vertx, this.runtimeConfig);
 
         /* Create consumer */
         Map<String, String> consumerConfig = new HashMap<>();
 
-        consumerConfig.put("bootstrap.servers", this.config.brokerServer);
+        consumerConfig.put("bootstrap.servers", this.runtimeConfig.brokerServer);
         consumerConfig.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
         consumerConfig.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
         consumerConfig.put("group.id", "eventSplitGroup");
@@ -63,27 +68,35 @@
         this.consumer = KafkaConsumer.create(vertx, consumerConfig);
 
         /* Subscribe to topics */
-        if (null != this.config.topics) {
-            System.out.println("Subscribe to topics: " + this.config.topics);
+        if (null != this.runtimeConfig.topics) {
+            this.consumer.subscribe(new HashSet<>(Arrays.asList(this.runtimeConfig.topics.split(","))));
 
-            this.consumer.subscribe(new HashSet<>(Arrays.asList(this.config.topics.split(","))));
+            LOGGER.info("Subscribed to topics: " + this.runtimeConfig.topics);
         }
 
         /* Set up handler */
-        this.consumer.handler(record -> {
-            String typeName = StringUtils.EMPTY;
+        this.consumer.handler(this::handleMessage);
+    }
 
-            System.out.println("Handle message " + record.value());
+    /**
+     * Handle the messages from the consumer
+     *
+     * @param  record  The {@link KafkaConsumerRecord} to handle
+     **/
 
-            try {
-                JsonNode json = MAPPER.readTree(record.value());
+    private void handleMessage(KafkaConsumerRecord<String, String> record) {
+        String typeName = StringUtils.EMPTY;
 
-                typeName = json.get("type").asText();
-            } catch (JsonProcessingException e) {
-                System.out.println("Error reading JSON " + e);
-            }
+        System.out.println("Handle message " + record.value());
+
+        try {
+            JsonNode json = MAPPER.readTree(record.value());
 
-            this.bus.send(typeName, record.value());
-        });
+            typeName = json.get("type").asText();
+        } catch (JsonProcessingException e) {
+            System.out.println("Error reading JSON " + e);
+        }
+
+        this.bus.send(typeName, record.value());
     }
 }