Changeset 6:2e6cbf140ff8

Added initial handling of cloudevents via kafka
author unexist
date Wed, 10 Feb 2021 18:29:31 +0100
parents cd96119c88bb
children 6d4d4d6b7cbd
files pom.xml src/main/java/dev/unexist/showcase/todo/adapters/MessageResource.java src/main/java/dev/unexist/showcase/todo/adapters/TodoConsumer.java src/main/java/dev/unexist/showcase/todo/adapters/TodoProducer.java src/main/java/dev/unexist/showcase/todo/adapters/TodoResource.java src/main/java/dev/unexist/showcase/todo/application/TodoResource.java src/main/resources/application.properties
diffstat 7 files changed, 386 insertions(+), 178 deletions(-) [+]
line wrap: on
line diff
--- a/pom.xml	Wed Feb 10 16:12:35 2021 +0100
+++ b/pom.xml	Wed Feb 10 18:29:31 2021 +0100
@@ -13,6 +13,8 @@
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
 
+        <cloudevents.version>2.0.0.RC2</cloudevents.version>
+
         <apache-commons-lang.version>3.11</apache-commons-lang.version>
         <hg-revision-plugin.version>0.10</hg-revision-plugin.version>
         <surefire-plugin.version>3.0.0-M5</surefire-plugin.version>
@@ -60,10 +62,6 @@
         </dependency>
         <dependency>
             <groupId>io.quarkus</groupId>
-            <artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>io.quarkus</groupId>
             <artifactId>quarkus-hibernate-validator</artifactId>
         </dependency>
 
@@ -74,6 +72,18 @@
             <type>pom</type>
         </dependency>
 
+        <!-- Cloudevents -->
+        <dependency>
+            <groupId>io.cloudevents</groupId>
+            <artifactId>cloudevents-kafka</artifactId>
+            <version>${cloudevents.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.cloudevents</groupId>
+            <artifactId>cloudevents-json-jackson</artifactId>
+            <version>${cloudevents.version}</version>
+        </dependency>
+
         <!-- Apache Commons -->
         <dependency>
             <groupId>org.apache.commons</groupId>
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/dev/unexist/showcase/todo/adapters/MessageResource.java	Wed Feb 10 18:29:31 2021 +0100
@@ -0,0 +1,70 @@
+/**
+ * @package Quarkus-Messaging-Showcase
+ *
+ * @file Todo resource
+ * @copyright 2020-2021 Christoph Kappel <christoph@unexist.dev>
+ * @version $Id$
+ *
+ * This program can be distributed under the terms of the GNU GPLv2.
+ * See the file LICENSE for details.
+ **/
+
+package dev.unexist.showcase.todo.adapters;
+
+import org.eclipse.microprofile.openapi.annotations.Operation;
+import org.eclipse.microprofile.openapi.annotations.responses.APIResponse;
+import org.eclipse.microprofile.openapi.annotations.responses.APIResponses;
+import org.eclipse.microprofile.openapi.annotations.tags.Tag;
+
+import javax.inject.Inject;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.util.List;
+import java.util.Map;
+
+@Path("/message")
+public class MessageResource {
+
+    @Inject
+    TodoProducer todoProducer;
+
+    @Inject
+    TodoConsumer todoConsumer;
+
+    @POST
+    @Operation(summary = "Send cloudevent via kafka")
+    @Tag(name = "Message")
+    @APIResponses({
+            @APIResponse(responseCode = "204", description = "Nothing found"),
+            @APIResponse(responseCode = "500", description = "Server error")
+    })
+    public Response sendCloudEvent() {
+        this.todoProducer.send();
+
+        return Response.noContent().build();
+    }
+
+    @GET
+    @Produces(MediaType.APPLICATION_JSON)
+    @Operation(summary = "Receive cloudevent via kafka")
+    @Tag(name = "Message")
+    @APIResponses({
+            @APIResponse(responseCode = "204", description = "Nothing found"),
+            @APIResponse(responseCode = "500", description = "Server error")
+    })
+    public Response receiveCloudEvent() {
+        Response.ResponseBuilder builder = Response.noContent();
+
+        List<Map<String, String>> list = this.todoConsumer.receive();
+
+        if (!list.isEmpty()) {
+            builder = Response.ok(list);
+        }
+
+        return builder.build();
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/dev/unexist/showcase/todo/adapters/TodoConsumer.java	Wed Feb 10 18:29:31 2021 +0100
@@ -0,0 +1,69 @@
+/**
+ * @package Quarkus-Messaging-Showcase
+ *
+ * @file Todo consumer
+ * @copyright 2020-2021 Christoph Kappel <christoph@unexist.dev>
+ * @version $Id$
+ *
+ * This program can be distributed under the terms of the GNU GPLv2.
+ * See the file LICENSE for details.
+ **/
+
+package dev.unexist.showcase.todo.adapters;
+
+import io.cloudevents.CloudEvent;
+import io.cloudevents.kafka.CloudEventDeserializer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+
+import javax.enterprise.context.ApplicationScoped;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+@ApplicationScoped
+public class TodoConsumer {
+    private KafkaConsumer<String, CloudEvent> consumer;
+
+    TodoConsumer() {
+        Properties props = new Properties();
+
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, "todo-cloudevents-consumer");
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CloudEventDeserializer.class);
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
+
+        this.consumer = new KafkaConsumer<>(props);
+
+        consumer.subscribe(Collections.singletonList("todos"));
+
+        System.out.println("Consumer started");
+    }
+
+    public List<Map<String, String>> receive() {
+        java.util.List<Map<String, String>> list = Collections.emptyList();
+
+        ConsumerRecords<String, CloudEvent> consumerRecords = consumer.poll(Duration.ofMillis(100));
+        consumerRecords.forEach(record -> {
+            Map<String, String> recordEntry = new HashMap<>();
+
+            recordEntry.put("Record Key", record.key());
+            recordEntry.put("Record value", record.value().toString());
+            recordEntry.put("Record partition",
+                    String.valueOf(record.partition()));
+            recordEntry.put("Record offset",
+                    String.valueOf(record.offset()));
+
+            list.add(recordEntry);
+        });
+
+        return list;
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/dev/unexist/showcase/todo/adapters/TodoProducer.java	Wed Feb 10 18:29:31 2021 +0100
@@ -0,0 +1,76 @@
+/**
+ * @package Quarkus-Messaging-Showcase
+ *
+ * @file Todo producer
+ * @copyright 2020-2021 Christoph Kappel <christoph@unexist.dev>
+ * @version $Id$
+ *
+ * This program can be distributed under the terms of the GNU GPLv2.
+ * See the file LICENSE for details.
+ **/
+
+package dev.unexist.showcase.todo.adapters;
+
+import io.cloudevents.CloudEvent;
+import io.cloudevents.core.message.Encoding;
+import io.cloudevents.core.v1.CloudEventBuilder;
+import io.cloudevents.jackson.JsonFormat;
+import io.cloudevents.kafka.CloudEventSerializer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import javax.enterprise.context.ApplicationScoped;
+import java.net.URI;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+
+@ApplicationScoped
+public class TodoProducer {
+    private KafkaProducer<String, CloudEvent> producer;
+    private CloudEventBuilder eventBuilder;
+
+    TodoProducer() {
+        Properties props = new Properties();
+
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.put(ProducerConfig.CLIENT_ID_CONFIG, "todo-cloudevents-producer");
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CloudEventSerializer.class);
+        props.put(CloudEventSerializer.ENCODING_CONFIG, Encoding.STRUCTURED);
+        props.put(CloudEventSerializer.EVENT_FORMAT_CONFIG, JsonFormat.CONTENT_TYPE);
+
+        this.producer = new KafkaProducer<>(props);
+
+        this.eventBuilder = io.cloudevents.core.builder.CloudEventBuilder.v1()
+                .withSource(URI.create("https://unexist.dev"))
+                .withType("todo");
+    }
+
+    public void send() {
+        try {
+            String id = UUID.randomUUID().toString();
+            String data = "Todo event";
+
+            CloudEvent event = this.eventBuilder.newBuilder()
+                    .withId(id)
+                    .withData("text/plain", data.getBytes())
+                    .build();
+
+            RecordMetadata metadata = this.producer
+                    .send(new ProducerRecord<>("todos", id, event))
+                    .get();
+
+            System.out.println("Record sent to partition " + metadata.partition() +
+                    " with offset " + metadata.offset());
+        } catch (InterruptedException|ExecutionException e) {
+            System.out.println("Error while trying to send the record");
+            e.printStackTrace();
+        }
+
+        this.producer.flush();
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/java/dev/unexist/showcase/todo/adapters/TodoResource.java	Wed Feb 10 18:29:31 2021 +0100
@@ -0,0 +1,157 @@
+/**
+ * @package Quarkus-Messaging-Showcase
+ *
+ * @file Todo resource
+ * @copyright 2020-2021 Christoph Kappel <christoph@unexist.dev>
+ * @version $Id$
+ *
+ * This program can be distributed under the terms of the GNU GPLv2.
+ * See the file LICENSE for details.
+ **/
+
+package dev.unexist.showcase.todo.adapters;
+
+import dev.unexist.showcase.todo.domain.todo.Todo;
+import dev.unexist.showcase.todo.domain.todo.TodoBase;
+import dev.unexist.showcase.todo.domain.todo.TodoService;
+import org.eclipse.microprofile.openapi.annotations.Operation;
+import org.eclipse.microprofile.openapi.annotations.enums.SchemaType;
+import org.eclipse.microprofile.openapi.annotations.media.Content;
+import org.eclipse.microprofile.openapi.annotations.media.Schema;
+import org.eclipse.microprofile.openapi.annotations.responses.APIResponse;
+import org.eclipse.microprofile.openapi.annotations.responses.APIResponses;
+import org.eclipse.microprofile.openapi.annotations.tags.Tag;
+
+import javax.inject.Inject;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.util.List;
+import java.util.Optional;
+
+@Path("/todo")
+public class TodoResource {
+
+    @Inject
+    TodoService todoService;
+
+    @POST
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Operation(summary = "Create new todo")
+    @Tag(name = "Todo")
+    @APIResponses({
+            @APIResponse(responseCode = "201", description = "Todo created"),
+            @APIResponse(responseCode = "406", description = "Bad data"),
+            @APIResponse(responseCode = "500", description = "Server error")
+    })
+    public Response create(TodoBase base) {
+        Response.ResponseBuilder response;
+
+        if (this.todoService.create(base)) {
+            response = Response.status(Response.Status.CREATED);
+        } else {
+            response = Response.status(Response.Status.NOT_ACCEPTABLE);
+        }
+
+        return response.build();
+    }
+
+    @GET
+    @Produces(MediaType.APPLICATION_JSON)
+    @Operation(summary = "Get all todos")
+    @Tag(name = "Todo")
+    @APIResponses({
+            @APIResponse(responseCode = "200", description = "List of todo", content =
+                @Content(schema = @Schema(type = SchemaType.ARRAY, implementation = Todo.class))),
+            @APIResponse(responseCode = "204", description = "Nothing found"),
+            @APIResponse(responseCode = "500", description = "Server error")
+    })
+    public Response getAll() {
+        List<Todo> todoList = this.todoService.getAll();
+
+        Response.ResponseBuilder response;
+
+        if (todoList.isEmpty()) {
+            response = Response.noContent();
+        } else {
+            response = Response.ok(Entity.json(todoList));
+        }
+
+        return response.build();
+    }
+
+    @GET
+    @Path("{id}")
+    @Produces(MediaType.APPLICATION_JSON)
+    @Operation(summary = "Get todo by id")
+    @Tag(name = "Todo")
+    @APIResponses({
+            @APIResponse(responseCode = "200", description = "Todo found", content =
+                @Content(schema = @Schema(implementation = Todo.class))),
+            @APIResponse(responseCode = "404", description = "Todo not found"),
+            @APIResponse(responseCode = "500", description = "Server error")
+    })
+    public Response findById(@PathParam("id") int id) {
+        Optional<Todo> result = this.todoService.findById(id);
+
+        Response.ResponseBuilder response;
+
+        if (result.isPresent()) {
+            response = Response.ok(Entity.json(result.get()));
+        } else {
+            response = Response.status(Response.Status.NOT_FOUND);
+        }
+
+        return response.build();
+    }
+
+    @PUT
+    @Path("{id}")
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Operation(summary = "Update todo by id")
+    @Tag(name = "Todo")
+    @APIResponses({
+            @APIResponse(responseCode = "204", description = "Todo updated"),
+            @APIResponse(responseCode = "404", description = "Todo not found"),
+            @APIResponse(responseCode = "500", description = "Server error")
+    })
+    public Response update(@PathParam("id") int id, TodoBase base) {
+        Response.ResponseBuilder response;
+
+        if (this.todoService.update(id, base)) {
+            response = Response.noContent();
+        } else {
+            response = Response.status(Response.Status.NOT_FOUND);
+        }
+
+        return response.build();
+    }
+
+    @DELETE
+    @Path("{id}")
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Operation(summary = "Delete todo by id")
+    @Tag(name = "Todo")
+    public Response delete(@PathParam("id") int id, TodoBase base) {
+        Response.ResponseBuilder response;
+
+        if (this.todoService.delete(id)) {
+            response = Response.noContent();
+        } else {
+            response = Response.status(Response.Status.NOT_FOUND);
+        }
+
+        return response.build();
+    }
+}
\ No newline at end of file
--- a/src/main/java/dev/unexist/showcase/todo/application/TodoResource.java	Wed Feb 10 16:12:35 2021 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,157 +0,0 @@
-/**
- * @package Quarkus-Messaging-Showcase
- *
- * @file Todo resource
- * @copyright 2020-2021 Christoph Kappel <christoph@unexist.dev>
- * @version $Id$
- *
- * This program can be distributed under the terms of the GNU GPLv2.
- * See the file LICENSE for details.
- **/
-
-package dev.unexist.showcase.todo.application;
-
-import dev.unexist.showcase.todo.domain.todo.Todo;
-import dev.unexist.showcase.todo.domain.todo.TodoBase;
-import dev.unexist.showcase.todo.domain.todo.TodoService;
-import org.eclipse.microprofile.openapi.annotations.Operation;
-import org.eclipse.microprofile.openapi.annotations.enums.SchemaType;
-import org.eclipse.microprofile.openapi.annotations.media.Content;
-import org.eclipse.microprofile.openapi.annotations.media.Schema;
-import org.eclipse.microprofile.openapi.annotations.responses.APIResponse;
-import org.eclipse.microprofile.openapi.annotations.responses.APIResponses;
-import org.eclipse.microprofile.openapi.annotations.tags.Tag;
-
-import javax.inject.Inject;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.DELETE;
-import javax.ws.rs.GET;
-import javax.ws.rs.POST;
-import javax.ws.rs.PUT;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.client.Entity;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import java.util.List;
-import java.util.Optional;
-
-@Path("/todo")
-public class TodoResource {
-
-    @Inject
-    TodoService todoService;
-
-    @POST
-    @Consumes(MediaType.APPLICATION_JSON)
-    @Produces(MediaType.APPLICATION_JSON)
-    @Operation(summary = "Create new todo")
-    @Tag(name = "Todo")
-    @APIResponses({
-            @APIResponse(responseCode = "201", description = "Todo created"),
-            @APIResponse(responseCode = "406", description = "Bad data"),
-            @APIResponse(responseCode = "500", description = "Server error")
-    })
-    public Response create(TodoBase base) {
-        Response.ResponseBuilder response;
-
-        if (this.todoService.create(base)) {
-            response = Response.status(Response.Status.CREATED);
-        } else {
-            response = Response.status(Response.Status.NOT_ACCEPTABLE);
-        }
-
-        return response.build();
-    }
-
-    @GET
-    @Produces(MediaType.APPLICATION_JSON)
-    @Operation(summary = "Get all todos")
-    @Tag(name = "Todo")
-    @APIResponses({
-            @APIResponse(responseCode = "200", description = "List of todo", content =
-                @Content(schema = @Schema(type = SchemaType.ARRAY, implementation = Todo.class))),
-            @APIResponse(responseCode = "204", description = "Nothing found"),
-            @APIResponse(responseCode = "500", description = "Server error")
-    })
-    public Response getAll() {
-        List<Todo> todoList = this.todoService.getAll();
-
-        Response.ResponseBuilder response;
-
-        if (todoList.isEmpty()) {
-            response = Response.noContent();
-        } else {
-            response = Response.ok(Entity.json(todoList));
-        }
-
-        return response.build();
-    }
-
-    @GET
-    @Path("{id}")
-    @Produces(MediaType.APPLICATION_JSON)
-    @Operation(summary = "Get todo by id")
-    @Tag(name = "Todo")
-    @APIResponses({
-            @APIResponse(responseCode = "200", description = "Todo found", content =
-                @Content(schema = @Schema(implementation = Todo.class))),
-            @APIResponse(responseCode = "404", description = "Todo not found"),
-            @APIResponse(responseCode = "500", description = "Server error")
-    })
-    public Response findById(@PathParam("id") int id) {
-        Optional<Todo> result = this.todoService.findById(id);
-
-        Response.ResponseBuilder response;
-
-        if (result.isPresent()) {
-            response = Response.ok(Entity.json(result.get()));
-        } else {
-            response = Response.status(Response.Status.NOT_FOUND);
-        }
-
-        return response.build();
-    }
-
-    @PUT
-    @Path("{id}")
-    @Consumes(MediaType.APPLICATION_JSON)
-    @Produces(MediaType.APPLICATION_JSON)
-    @Operation(summary = "Update todo by id")
-    @Tag(name = "Todo")
-    @APIResponses({
-            @APIResponse(responseCode = "204", description = "Todo updated"),
-            @APIResponse(responseCode = "404", description = "Todo not found"),
-            @APIResponse(responseCode = "500", description = "Server error")
-    })
-    public Response update(@PathParam("id") int id, TodoBase base) {
-        Response.ResponseBuilder response;
-
-        if (this.todoService.update(id, base)) {
-            response = Response.noContent();
-        } else {
-            response = Response.status(Response.Status.NOT_FOUND);
-        }
-
-        return response.build();
-    }
-
-    @DELETE
-    @Path("{id}")
-    @Consumes(MediaType.APPLICATION_JSON)
-    @Produces(MediaType.APPLICATION_JSON)
-    @Operation(summary = "Delete todo by id")
-    @Tag(name = "Todo")
-    public Response delete(@PathParam("id") int id, TodoBase base) {
-        Response.ResponseBuilder response;
-
-        if (this.todoService.delete(id)) {
-            response = Response.noContent();
-        } else {
-            response = Response.status(Response.Status.NOT_FOUND);
-        }
-
-        return response.build();
-    }
-}
\ No newline at end of file
--- a/src/main/resources/application.properties	Wed Feb 10 16:12:35 2021 +0100
+++ b/src/main/resources/application.properties	Wed Feb 10 18:29:31 2021 +0100
@@ -4,23 +4,6 @@
 #quarkus.servlet.context-path=/todo
 #quarkus.kubernetes.expose=true
 
-quarkus.datasource.health.enabled=true
-
-# Kubernetes labels (https://quarkus.io/guides/kubernetes#labels-and-annotations)
-#quarkus.kubernetes.labels.foo=BAR
-
-# Opencontainers labels (https://github.com/opencontainers/image-spec/blob/master/annotations.md)
-quarkus.jib.labels."org.opencontainers.image.created"=@timestamp@
-quarkus.jib.labels."org.opencontainers.image.authors"=@hg.author@
-quarkus.jib.labels."org.opencontainers.image.url"=https://unexist.dev
-#quarkus.jib.labels."org.opencontainers.image.documentation"=DOCS
-#quarkus.jib.labels."org.opencontainers.image.source"=SRC
-quarkus.jib.labels."org.opencontainers.image.version"=@project.version@
-quarkus.jib.labels."org.opencontainers.image.revision"=@hg.rev@
-quarkus.jib.labels."org.opencontainers.image.licenses"=GPLv2
-quarkus.jib.labels."org.opencontainers.image.title"=@project.artifactId@
-quarkus.jib.labels."org.opencontainers.image.description"=@project.name@
-
 # OpenAPI3 specifications (https://quarkus.io/blog/openapi-for-everyone)
 mp.openapi.extensions.smallrye.info.title=OpenAPI for @project.artifactId@
 %dev.mp.openapi.extensions.smallrye.info.title=OpenAPI for @project.artifactId@ [development]