Changeset 6:49fe7419634d

Added event consumer
author unexist
date Fri, 30 Jul 2021 16:50:32 +0200
parents eb955b6b261d
children 3b8e8d76dd23
files src/main/java/dev/unexist/showcase/todo/adapter/TodoSink.java
diffstat 1 files changed, 34 insertions(+), 9 deletions(-) [+]
line wrap: on
line diff
--- a/src/main/java/dev/unexist/showcase/todo/adapter/TodoSink.java	Fri Jul 30 16:50:19 2021 +0200
+++ b/src/main/java/dev/unexist/showcase/todo/adapter/TodoSink.java	Fri Jul 30 16:50:32 2021 +0200
@@ -11,9 +11,13 @@
 package dev.unexist.showcase.todo.adapter;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import dev.unexist.showcase.todo.domain.todo.TodoBase;
-import dev.unexist.showcase.todo.domain.todo.TodoService;
+import io.quarkus.vertx.ConsumeEvent;
+import io.vertx.mutiny.core.eventbus.EventBus;
+import org.apache.commons.lang3.StringUtils;
+import org.eclipse.microprofile.reactive.messaging.Channel;
+import org.eclipse.microprofile.reactive.messaging.Emitter;
 import org.eclipse.microprofile.reactive.messaging.Incoming;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -25,22 +29,43 @@
 public class TodoSink {
     private static final Logger LOGGER = LoggerFactory.getLogger(TodoSink.class);
 
+    static final ObjectMapper MAPPER = new ObjectMapper();
+
     @Inject
-    TodoService todoService;
+    EventBus bus;
 
-    private final ObjectMapper mapper = new ObjectMapper();
+    @Channel("todo_out")
+    Emitter<String> emitter;
 
-    @Incoming("todo-sink")
-    public void consumeTodos(String json) {
-        TodoBase todo = null;
+    @Incoming("todo_in")
+    public void consumeAll(String message) {
+        String typeName = StringUtils.EMPTY;
+
+        LOGGER.info("consumeAll: {}", message);
 
         try {
-            todo = this.mapper.readValue(json, TodoBase.class);
+            JsonNode json = MAPPER.readTree(message);
+
+            typeName = json.get("type").asText();
         } catch (JsonProcessingException e) {
             LOGGER.error("Error reading JSON", e);
         }
 
-        todoService.create(todo);
+        this.bus.send(typeName, message);
+    }
+
+    @ConsumeEvent("todo_in1")
+    public void consumeTodoIn1(String message) {
+        LOGGER.info("consumeTest1: {}", message);
+
+        this.emitter.send(message);
+    }
+
+    @ConsumeEvent("todo_in2")
+    public void consumeTodoIn2(String message) {
+        LOGGER.info("consumeTest2: {}", message);
+
+        this.emitter.send(message);
     }
 }