Changeset 40:bdb294219524

Added CloudEvent handling and event registry to the extension
author unexist
date Tue, 24 Aug 2021 11:42:50 +0200
parents 860c4b7ce505
children ed948d466af9
files event-split-extension-parent/deployment/src/main/java/dev/unexist/showcase/eventsplit/EventSplitProcessor.java event-split-extension-parent/runtime/pom.xml event-split-extension-parent/runtime/src/main/java/dev/unexist/showcase/eventsplit/EventSplitDispatcher.java event-split-extension-parent/runtime/src/main/java/dev/unexist/showcase/eventsplit/EventSplitEvent.java event-split-extension-parent/runtime/src/main/java/dev/unexist/showcase/eventsplit/EventSplitRecorder.java event-split-extension-parent/runtime/src/main/java/dev/unexist/showcase/eventsplit/EventSplitRegistry.java event-split-extension-parent/runtime/src/main/java/dev/unexist/showcase/eventsplit/EventSplitRuntimeConfig.java
diffstat 7 files changed, 227 insertions(+), 5 deletions(-) [+]
line wrap: on
line diff
--- a/event-split-extension-parent/deployment/src/main/java/dev/unexist/showcase/eventsplit/EventSplitProcessor.java	Thu Aug 19 14:40:21 2021 +0200
+++ b/event-split-extension-parent/deployment/src/main/java/dev/unexist/showcase/eventsplit/EventSplitProcessor.java	Tue Aug 24 11:42:50 2021 +0200
@@ -12,8 +12,15 @@
 package dev.unexist.showcase.eventsplit;
 
 import io.quarkus.arc.deployment.AdditionalBeanBuildItem;
+import io.quarkus.arc.deployment.BeanArchiveIndexBuildItem;
+import io.quarkus.arc.deployment.BeanContainerBuildItem;
 import io.quarkus.deployment.annotations.BuildStep;
+import io.quarkus.deployment.annotations.Record;
 import io.quarkus.deployment.builditem.FeatureBuildItem;
+import org.jboss.jandex.ClassInfo;
+import org.jboss.jandex.DotName;
+
+import static io.quarkus.deployment.annotations.ExecutionTime.RUNTIME_INIT;
 
 public class EventSplitProcessor {
     private static final String EVENT_SPLIT = "event-split";
@@ -24,7 +31,25 @@
     }
 
     @BuildStep
-    AdditionalBeanBuildItem beans() {
-        return new AdditionalBeanBuildItem(EventSplitLifecycle.class, EventSplitDispatcher.class);
+    AdditionalBeanBuildItem registerBeans() {
+        return AdditionalBeanBuildItem.builder()
+                .addBeanClass(EventSplitLifecycle.class)
+                .addBeanClass(EventSplitDispatcher.class)
+                .addBeanClass(EventSplitRegistry.class)
+                .build();
+    }
+
+    @BuildStep
+    @Record(RUNTIME_INIT)
+    void discoverEvents(EventSplitRecorder recorder,
+                        BeanArchiveIndexBuildItem beanArchiveIndex,
+                        BeanContainerBuildItem beanContainer)
+    {
+        beanArchiveIndex.getIndex()
+                .getAllKnownSubclasses(
+                        DotName.createSimple(EventSplitEvent.class.getName())).stream()
+                            .map(ClassInfo::name)
+                            .map(DotName::toString)
+                            .forEach(eventType -> recorder.addEventType(beanContainer.getValue(), eventType));
     }
 }
--- a/event-split-extension-parent/runtime/pom.xml	Thu Aug 19 14:40:21 2021 +0200
+++ b/event-split-extension-parent/runtime/pom.xml	Tue Aug 24 11:42:50 2021 +0200
@@ -19,6 +19,11 @@
         <!-- Util -->
         <apache-commons-lang.version>3.12.0</apache-commons-lang.version>
         <jackson-databind.version>2.13.0-rc1</jackson-databind.version>
+
+        <!-- Cloudevents -->
+        <cloudevents.version>2.2.0</cloudevents.version>
+        <cloudevents-api.version>2.2.0</cloudevents-api.version>
+        <guava.version>30.1.1-jre</guava.version>
     </properties>
 
     <dependencies>
@@ -47,6 +52,28 @@
             <version>${smallrye-reactive-messaging-vertx-eventbus.version}</version>
         </dependency>
 
+        <!-- Cloudevents -->
+        <dependency>
+            <groupId>io.cloudevents</groupId>
+            <artifactId>cloudevents-core</artifactId>
+            <version>${cloudevents.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.cloudevents</groupId>
+            <artifactId>cloudevents-api</artifactId>
+            <version>${cloudevents-api.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.cloudevents</groupId>
+            <artifactId>cloudevents-kafka</artifactId>
+            <version>${cloudevents.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.cloudevents</groupId>
+            <artifactId>cloudevents-json-jackson</artifactId>
+            <version>${cloudevents.version}</version>
+        </dependency>
+
         <!-- Util -->
         <dependency>
             <groupId>org.apache.commons</groupId>
@@ -58,6 +85,11 @@
             <artifactId>jackson-databind</artifactId>
             <version>${jackson-databind.version}</version>
         </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>${guava.version}</version>
+        </dependency>
     </dependencies>
 
     <build>
--- a/event-split-extension-parent/runtime/src/main/java/dev/unexist/showcase/eventsplit/EventSplitDispatcher.java	Thu Aug 19 14:40:21 2021 +0200
+++ b/event-split-extension-parent/runtime/src/main/java/dev/unexist/showcase/eventsplit/EventSplitDispatcher.java	Tue Aug 24 11:42:50 2021 +0200
@@ -12,12 +12,21 @@
 package dev.unexist.showcase.eventsplit;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import com.google.common.base.CaseFormat;
+import io.cloudevents.CloudEvent;
+import io.cloudevents.core.CloudEventUtils;
+import io.cloudevents.core.data.PojoCloudEventData;
+import io.cloudevents.jackson.PojoCloudEventDataMapper;
 import io.vertx.core.Vertx;
 import io.vertx.core.eventbus.EventBus;
 import io.vertx.kafka.client.consumer.KafkaConsumer;
 import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
+import org.apache.commons.lang3.BooleanUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -30,6 +39,7 @@
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 
 @ApplicationScoped
 public class EventSplitDispatcher {
@@ -40,9 +50,26 @@
     EventSplitRuntimeConfig runtimeConfig;
 
     @Inject
+    EventSplitRegistry registry;
+
+    @Inject
     EventBus bus;
 
-    private KafkaConsumer<String, String> consumer;
+    private final ObjectMapper mapper;
+    private KafkaConsumer<String, ?> consumer;
+
+    /**
+     * Constructor
+     **/
+
+    EventSplitDispatcher() {
+        this.mapper = new ObjectMapper();
+
+        mapper.registerModule(new JavaTimeModule())
+                .registerModule(io.cloudevents.jackson.JsonFormat.getCloudEventJacksonModule())
+                .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false)
+                .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+    }
 
     /**
      * Start the dispatcher
@@ -60,7 +87,13 @@
 
         consumerConfig.put("bootstrap.servers", this.runtimeConfig.brokerServer);
         consumerConfig.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-        consumerConfig.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+
+        if (BooleanUtils.isTrue(this.runtimeConfig.useCloudEvents)) {
+            consumerConfig.put("value.deserializer", "io.cloudevents.kafka.CloudEventDeserializer");
+        } else {
+            consumerConfig.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+        }
+
         consumerConfig.put("group.id", "eventSplitGroup");
         consumerConfig.put("auto.offset.reset", "earliest");
         consumerConfig.put("enable.auto.commit", "false");
@@ -75,7 +108,13 @@
         }
 
         /* Set up handler */
-        this.consumer.handler(this::handleMessage);
+        this.consumer.handler(record -> {
+            if (BooleanUtils.isTrue(this.runtimeConfig.useCloudEvents)) {
+                this.handleMessageAsCloudEvent((KafkaConsumerRecord<String, CloudEvent>) record);
+            } else {
+                this.handleMessage((KafkaConsumerRecord<String, String>) record);
+            }
+        });
     }
 
     /**
@@ -99,4 +138,34 @@
 
         this.bus.send(typeName, record.value());
     }
+
+    /**
+     * Handle the messages from the consumer
+     *
+     * @param  record  The {@link KafkaConsumerRecord} to handle
+     **/
+
+    private void handleMessageAsCloudEvent(KafkaConsumerRecord<String, CloudEvent> record) {
+        CloudEvent cloudEvent = record.value();
+
+        LOGGER.info("Handle message type {}", cloudEvent.getType());
+
+        Optional<Class<EventSplitEvent>> candidate = this.registry.findType(cloudEvent.getType());
+
+        if (candidate.isPresent()) {
+            PojoCloudEventData<?> cloudEventData = CloudEventUtils.mapData(
+                    cloudEvent,
+                    PojoCloudEventDataMapper.from(this.mapper, candidate.get()));
+
+            if (null != cloudEventData && null != cloudEventData.getValue()) {
+                String typeName = CaseFormat.UPPER_CAMEL.to(CaseFormat.LOWER_HYPHEN, cloudEvent.getType());
+
+                LOGGER.info("Sent to {}", typeName);
+
+                this.bus.send(typeName, cloudEventData.getValue());
+            }
+        } else {
+            LOGGER.error("Event type {} not found", cloudEvent.getType());
+        }
+    }
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/event-split-extension-parent/runtime/src/main/java/dev/unexist/showcase/eventsplit/EventSplitEvent.java	Tue Aug 24 11:42:50 2021 +0200
@@ -0,0 +1,15 @@
+/**
+ * @package Showcase
+ *
+ * @file Dispatched event
+ * @copyright 2021 Christoph Kappel <christoph@unexist.dev>
+ * @version $Id$
+ *
+ * This program can be distributed under the terms of the Apache License v2.0.
+ * See the file LICENSE for details.
+ **/
+ 
+package dev.unexist.showcase.eventsplit;
+
+public class EventSplitEvent {
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/event-split-extension-parent/runtime/src/main/java/dev/unexist/showcase/eventsplit/EventSplitRecorder.java	Tue Aug 24 11:42:50 2021 +0200
@@ -0,0 +1,36 @@
+/**
+ * @package Showcase
+ *
+ * @file Dispatched event recorder
+ * @copyright 2021 Christoph Kappel <christoph@unexist.dev>
+ * @version $Id\$
+ *
+ * This program can be distributed under the terms of the Apache License v2.0.
+ * See the file LICENSE for details.
+ **/
+ 
+package dev.unexist.showcase.eventsplit;
+
+import io.quarkus.arc.runtime.BeanContainer;
+import io.quarkus.runtime.annotations.Recorder;
+
+@Recorder
+public class EventSplitRecorder {
+    public void addEventType(BeanContainer beanContainer, String eventType) {
+        EventSplitRegistry registry =
+                beanContainer.instance(EventSplitRegistry.class);
+
+        registry.add(createClass(eventType));
+    }
+
+    @SuppressWarnings("unchecked")
+    private static Class<? extends EventSplitEvent> createClass(String name) {
+        try {
+            ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+
+            return (Class<? extends EventSplitEvent>) classLoader.loadClass(name);
+        } catch (ClassNotFoundException ex) {
+            throw new IllegalStateException(ex);
+        }
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/event-split-extension-parent/runtime/src/main/java/dev/unexist/showcase/eventsplit/EventSplitRegistry.java	Tue Aug 24 11:42:50 2021 +0200
@@ -0,0 +1,37 @@
+/**
+ * @package Showcase
+ *
+ * @file Dispatched event registry
+ * @copyright 2021 Christoph Kappel <christoph@unexist.dev>
+ * @version $Id\$
+ *
+ * This program can be distributed under the terms of the Apache License v2.0.
+ * See the file LICENSE for details.
+ **/
+ 
+package dev.unexist.showcase.eventsplit;
+
+import javax.enterprise.context.ApplicationScoped;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+@ApplicationScoped
+public class EventSplitRegistry {
+    private final ConcurrentMap<String, Class<? extends EventSplitEvent>> eventTypes;
+
+    public EventSplitRegistry() {
+        this.eventTypes = new ConcurrentHashMap<>();
+    }
+
+    void add(Class<? extends EventSplitEvent> eventType) {
+        String name = eventType.getSimpleName();
+
+        eventTypes.put(name, eventType);
+    }
+
+    @SuppressWarnings("unchecked")
+    <T extends EventSplitEvent> Optional<Class<T>> findType(String name) {
+        return Optional.ofNullable((Class<T>) eventTypes.get(name));
+    }
+}
--- a/event-split-extension-parent/runtime/src/main/java/dev/unexist/showcase/eventsplit/EventSplitRuntimeConfig.java	Thu Aug 19 14:40:21 2021 +0200
+++ b/event-split-extension-parent/runtime/src/main/java/dev/unexist/showcase/eventsplit/EventSplitRuntimeConfig.java	Tue Aug 24 11:42:50 2021 +0200
@@ -34,4 +34,12 @@
     @ConfigItem(name = "broker.server", defaultValue = "localhost:9092",
         defaultValueDocumentation = "Address and port of the Kafka broker")
     public String brokerServer;
+
+    /**
+     * Enable cloudevents
+     **/
+
+    @ConfigItem(name = "cloudevents.enabled", defaultValue = "false",
+        defaultValueDocumentation = "Whether to enable cloudevents")
+    public Boolean useCloudEvents;
 }