Changeset 36:fbdf12e636cb
Updated logging and split consumer handling
author | unexist |
---|---|
date | Thu, 19 Aug 2021 13:26:58 +0200 |
parents | d5a8a2f4e912 |
children | 69db3b1e853d |
files | event-split-extension-parent/runtime/src/main/java/dev/unexist/showcase/eventsplit/EventSplitDispatcher.java |
diffstat | 1 files changed, 32 insertions(+), 19 deletions(-) [+] |
line wrap: on
line diff
--- a/event-split-extension-parent/runtime/src/main/java/dev/unexist/showcase/eventsplit/EventSplitDispatcher.java Thu Aug 19 13:25:00 2021 +0200 +++ b/event-split-extension-parent/runtime/src/main/java/dev/unexist/showcase/eventsplit/EventSplitDispatcher.java Thu Aug 19 13:26:58 2021 +0200 @@ -17,7 +17,10 @@ import io.vertx.core.Vertx; import io.vertx.core.eventbus.EventBus; import io.vertx.kafka.client.consumer.KafkaConsumer; +import io.vertx.kafka.client.consumer.KafkaConsumerRecord; import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.enterprise.context.ApplicationScoped; import javax.enterprise.inject.spi.CDI; @@ -30,11 +33,13 @@ @ApplicationScoped public class EventSplitDispatcher { + private static final Logger LOGGER = LoggerFactory.getLogger(EventSplitDispatcher.class); private final static ObjectMapper MAPPER = new ObjectMapper(); + private KafkaConsumer<String, String> consumer; @Inject - EventSplitRuntimeConfig config; + EventSplitRuntimeConfig runtimeConfig; @Inject EventBus bus; @@ -44,16 +49,16 @@ **/ public void start() { + Objects.requireNonNull(this.runtimeConfig, "Config cannot be null"); + Vertx vertx = CDI.current().select(Vertx.class).get(); - System.out.println("Init event dispatcher: vertx=" + vertx + ", config=" + this.config); - - Objects.requireNonNull(this.config, "Config cannot be null"); + LOGGER.info("Init event dispatcher: vertx={}, config={}", vertx, this.runtimeConfig); /* Create consumer */ Map<String, String> consumerConfig = new HashMap<>(); - consumerConfig.put("bootstrap.servers", this.config.brokerServer); + consumerConfig.put("bootstrap.servers", this.runtimeConfig.brokerServer); consumerConfig.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); consumerConfig.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); consumerConfig.put("group.id", "eventSplitGroup"); @@ -63,27 +68,35 @@ this.consumer = KafkaConsumer.create(vertx, consumerConfig); /* Subscribe to topics */ - if (null != this.config.topics) { - System.out.println("Subscribe to topics: " + this.config.topics); + if (null != this.runtimeConfig.topics) { + this.consumer.subscribe(new HashSet<>(Arrays.asList(this.runtimeConfig.topics.split(",")))); - this.consumer.subscribe(new HashSet<>(Arrays.asList(this.config.topics.split(",")))); + LOGGER.info("Subscribed to topics: " + this.runtimeConfig.topics); } /* Set up handler */ - this.consumer.handler(record -> { - String typeName = StringUtils.EMPTY; + this.consumer.handler(this::handleMessage); + } - System.out.println("Handle message " + record.value()); + /** + * Handle the messages from the consumer + * + * @param record The {@link KafkaConsumerRecord} to handle + **/ - try { - JsonNode json = MAPPER.readTree(record.value()); + private void handleMessage(KafkaConsumerRecord<String, String> record) { + String typeName = StringUtils.EMPTY; - typeName = json.get("type").asText(); - } catch (JsonProcessingException e) { - System.out.println("Error reading JSON " + e); - } + System.out.println("Handle message " + record.value()); + + try { + JsonNode json = MAPPER.readTree(record.value()); - this.bus.send(typeName, record.value()); - }); + typeName = json.get("type").asText(); + } catch (JsonProcessingException e) { + System.out.println("Error reading JSON " + e); + } + + this.bus.send(typeName, record.value()); } }