Changeset 31:381fd23551a5
Fixed lifecycle of extension
author | unexist |
---|---|
date | Wed, 18 Aug 2021 16:48:38 +0200 |
parents | e8d95600d0a2 |
children | f24ffacf0547 |
files | event-split-extension-parent/deployment/src/main/java/dev/unexist/showcase/eventsplit/EventSplitProcessor.java event-split-extension-parent/runtime/src/main/java/dev/unexist/showcase/eventsplit/EventSplitDispatcher.java event-split-extension-parent/runtime/src/main/java/dev/unexist/showcase/eventsplit/EventSplitLifecycle.java event-split-extension-parent/runtime/src/main/java/dev/unexist/showcase/eventsplit/EventSplitRuntimeConfig.java |
diffstat | 4 files changed, 134 insertions(+), 17 deletions(-) [+] |
line wrap: on
line diff
--- a/event-split-extension-parent/deployment/src/main/java/dev/unexist/showcase/eventsplit/EventSplitProcessor.java Wed Aug 18 08:36:10 2021 +0200 +++ b/event-split-extension-parent/deployment/src/main/java/dev/unexist/showcase/eventsplit/EventSplitProcessor.java Wed Aug 18 16:48:38 2021 +0200 @@ -12,7 +12,6 @@ package dev.unexist.showcase.eventsplit; import io.quarkus.arc.deployment.AdditionalBeanBuildItem; -import io.quarkus.deployment.annotations.BuildProducer; import io.quarkus.deployment.annotations.BuildStep; import io.quarkus.deployment.builditem.FeatureBuildItem; @@ -25,7 +24,7 @@ } @BuildStep - void build(BuildProducer<AdditionalBeanBuildItem> additionalBean) { - additionalBean.produce(AdditionalBeanBuildItem.unremovableOf(EventSplitDispatcher.class)); + AdditionalBeanBuildItem beans() { + return new AdditionalBeanBuildItem(EventSplitLifecycle.class, EventSplitDispatcher.class); } }
--- a/event-split-extension-parent/runtime/src/main/java/dev/unexist/showcase/eventsplit/EventSplitDispatcher.java Wed Aug 18 08:36:10 2021 +0200 +++ b/event-split-extension-parent/runtime/src/main/java/dev/unexist/showcase/eventsplit/EventSplitDispatcher.java Wed Aug 18 16:48:38 2021 +0200 @@ -14,38 +14,76 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import io.vertx.core.Vertx; import io.vertx.core.eventbus.EventBus; +import io.vertx.kafka.client.consumer.KafkaConsumer; import org.apache.commons.lang3.StringUtils; -import org.eclipse.microprofile.reactive.messaging.Channel; import javax.enterprise.context.ApplicationScoped; +import javax.enterprise.inject.spi.CDI; import javax.inject.Inject; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; @ApplicationScoped public class EventSplitDispatcher { private final static ObjectMapper MAPPER = new ObjectMapper(); + private KafkaConsumer<String, String> consumer; + + @Inject + EventSplitRuntimeConfig config; @Inject EventBus bus; - EventSplitDispatcher() { - System.out.println("Init event dispatcher"); - } + /** + * Start the dispatcher + **/ + + public void start() { + Vertx vertx = CDI.current().select(Vertx.class).get(); - @Channel("todo_in") - private void dispatchEvents(String message) { - String typeName = StringUtils.EMPTY; + System.out.println("Init event dispatcher: vertx=" + vertx + ", config=" + this.config); + + Objects.requireNonNull(this.config, "Config cannot be null"); + + /* Create consumer */ + Map<String, String> consumerConfig = new HashMap<>(); - System.out.println("Handle message " + message); + consumerConfig.put("bootstrap.servers", this.config.brokerServer); + consumerConfig.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + consumerConfig.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + consumerConfig.put("group.id", "eventSplitGroup"); + consumerConfig.put("auto.offset.reset", "earliest"); + consumerConfig.put("enable.auto.commit", "false"); - try { - JsonNode json = MAPPER.readTree(message); + this.consumer = KafkaConsumer.create(vertx, consumerConfig); - typeName = json.get("type").asText(); - } catch (JsonProcessingException e) { - System.out.println("Error reading JSON " + e); + /* Subscribe to topics */ + if (null != this.config.topics) { + System.out.println("Subscribe to topics: " + this.config.topics); + + this.consumer.subscribe(new HashSet<>(Arrays.asList(this.config.topics.split(",")))); } - this.bus.send(typeName, message); + /* Set up handler */ + this.consumer.handler(record -> { + String typeName = StringUtils.EMPTY; + + System.out.println("Handle message " + record.value()); + + try { + JsonNode json = MAPPER.readTree(record.value()); + + typeName = json.get("type").asText(); + } catch (JsonProcessingException e) { + System.out.println("Error reading JSON " + e); + } + + this.bus.send(typeName, record.value()); + }); } }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/event-split-extension-parent/runtime/src/main/java/dev/unexist/showcase/eventsplit/EventSplitLifecycle.java Wed Aug 18 16:48:38 2021 +0200 @@ -0,0 +1,46 @@ +/** + * @package Showcase + * + * @file Extension lifecycle + * @copyright 2021 Christoph Kappel <christoph@unexist.dev> + * @version Id + * + * This program can be distributed under the terms of the Apache License v2.0. + * See the file LICENSE for details. + **/ + +package dev.unexist.showcase.eventsplit; + +import io.quarkus.runtime.StartupEvent; + +import javax.annotation.Priority; +import javax.enterprise.context.Dependent; +import javax.enterprise.event.Observes; +import javax.enterprise.inject.spi.DefinitionException; +import javax.enterprise.inject.spi.DeploymentException; +import javax.inject.Inject; +import javax.interceptor.Interceptor; + +@Dependent +public class EventSplitLifecycle { + + @Inject + EventSplitDispatcher dispatcher; + + /** + * Handle application start + * + * @param event The handled event + **/ + + void onApplicationStart(@Observes @Priority(Interceptor.Priority.LIBRARY_BEFORE) StartupEvent event) { + try { + this.dispatcher.start(); + } catch (Exception e) { + if (e instanceof DeploymentException || e instanceof DefinitionException) { + throw e; + } + throw new DeploymentException(e); + } + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/event-split-extension-parent/runtime/src/main/java/dev/unexist/showcase/eventsplit/EventSplitRuntimeConfig.java Wed Aug 18 16:48:38 2021 +0200 @@ -0,0 +1,34 @@ +/** + * @package Showcase + * + * @file Event split config + * @copyright 2021 Christoph Kappel <christoph@unexist.dev> + * @version $Id$ + * + * This program can be distributed under the terms of the Apache License v2.0. + * See the file LICENSE for details. + **/ + +package dev.unexist.showcase.eventsplit; + +import io.quarkus.runtime.annotations.ConfigItem; +import io.quarkus.runtime.annotations.ConfigPhase; +import io.quarkus.runtime.annotations.ConfigRoot; + +@ConfigRoot(phase = ConfigPhase.RUN_TIME, name = "event-split") +public class EventSplitRuntimeConfig { + + /** + * List of topics to listen to + **/ + + @ConfigItem(name = "topics", defaultValue = "", defaultValueDocumentation = "List of topics") + public String topics; + + /** + * List of topics to listen to + **/ + + @ConfigItem(name = "broker.server", defaultValue = "localhost:9092", defaultValueDocumentation = "Host of Kafka broker with port") + public String brokerServer; +}