From 9d5cfccbce638e2f08ef9ab31f106387607f68a9 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Chris=20Suszy=C5=84ski?=
Date: Thu, 2 Mar 2023 19:55:25 +0100
Subject: [PATCH 01/16] Events endpoint PoC
---
.../knative/showcase/events/Endpoint.java | 98 +++++++++++++++++++
1 file changed, 98 insertions(+)
create mode 100644 quarkus/src/main/java/com/redhat/openshift/knative/showcase/events/Endpoint.java
diff --git a/quarkus/src/main/java/com/redhat/openshift/knative/showcase/events/Endpoint.java b/quarkus/src/main/java/com/redhat/openshift/knative/showcase/events/Endpoint.java
new file mode 100644
index 0000000..777c6fc
--- /dev/null
+++ b/quarkus/src/main/java/com/redhat/openshift/knative/showcase/events/Endpoint.java
@@ -0,0 +1,98 @@
+package com.redhat.openshift.knative.showcase.events;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.cloudevents.CloudEvent;
+import io.cloudevents.core.builder.CloudEventBuilder;
+import io.cloudevents.http.restful.ws.StructuredEncoding;
+import io.cloudevents.jackson.JsonFormat;
+import io.quarkus.runtime.StartupEvent;
+import io.smallrye.mutiny.Multi;
+import org.eclipse.microprofile.openapi.annotations.Operation;
+import org.jboss.resteasy.reactive.RestStreamElementType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.enterprise.context.ApplicationScoped;
+import javax.enterprise.event.Observes;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.random.RandomGenerator;
+
+@Path("events")
+@ApplicationScoped
+class Endpoint {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(Endpoint.class);
+ private final List events = new ArrayList<>();
+
+ void init(@Observes StartupEvent ignored, ObjectMapper om)
+ throws JsonProcessingException {
+ var rg = RandomGenerator.getDefault();
+ for (int i = 0; i < 3; i++) {
+ var s = Score.random(rg);
+ events.add(CloudEventBuilder.v1()
+ .withId(String.valueOf(i))
+ .withSource(URI.create("//localhost/dev"))
+ .withType(Endpoint.class.getName())
+ .withData(MediaType.APPLICATION_JSON, om.writeValueAsBytes(s))
+ .build());
+ }
+ }
+
+ @GET
+ @Operation(summary = "Retrieves all registered events as a JSON stream")
+ @RestStreamElementType(MediaType.APPLICATION_JSON)
+ @StructuredEncoding(JsonFormat.CONTENT_TYPE)
+ public Multi events() {
+ return Multi.createFrom().iterable(events);
+ }
+
+ @GET
+ @Path("last")
+ @Produces(MediaType.APPLICATION_JSON)
+ @StructuredEncoding(JsonFormat.CONTENT_TYPE)
+ public CloudEvent last() {
+ return events.get(events.size() - 1);
+ }
+
+ @POST
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Operation(summary = "Receives a CloudEvent and stores it")
+ public void receive(CloudEvent event) {
+ events.add(event);
+ LOGGER.info("Received event: {}", event);
+ }
+
+ private static final class Score {
+ @JsonProperty
+ Play play;
+ @JsonProperty
+ int score;
+
+ static Score random(RandomGenerator rg) {
+ var s = new Score();
+ s.score = rg.nextInt(1_000);
+ s.play = new Play();
+ s.play.id = new UUID(rg.nextLong(), rg.nextLong()).toString();
+ s.play.game = rg.nextInt(300);
+ return s;
+ }
+ }
+
+ private static class Play {
+ @JsonProperty
+ String id;
+ @JsonProperty
+ Integer game;
+ }
+}
From c56a7fdeeec71f7c6d8453d73e3d840e513a7047 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Chris=20Suszy=C5=84ski?=
Date: Fri, 3 Mar 2023 16:29:50 +0100
Subject: [PATCH 02/16] Workaround for
https://github.com/quarkusio/quarkus/issues/31587 and
https://github.com/cloudevents/sdk-java/issues/533
---
.../knative/showcase/events/Endpoint.java | 75 ++++---------------
.../knative/showcase/events/Event.java | 61 +++++++++++++++
2 files changed, 77 insertions(+), 59 deletions(-)
create mode 100644 quarkus/src/main/java/com/redhat/openshift/knative/showcase/events/Event.java
diff --git a/quarkus/src/main/java/com/redhat/openshift/knative/showcase/events/Endpoint.java b/quarkus/src/main/java/com/redhat/openshift/knative/showcase/events/Endpoint.java
index 777c6fc..5bd6dd4 100644
--- a/quarkus/src/main/java/com/redhat/openshift/knative/showcase/events/Endpoint.java
+++ b/quarkus/src/main/java/com/redhat/openshift/knative/showcase/events/Endpoint.java
@@ -1,13 +1,7 @@
package com.redhat.openshift.knative.showcase.events;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
import io.cloudevents.CloudEvent;
-import io.cloudevents.core.builder.CloudEventBuilder;
-import io.cloudevents.http.restful.ws.StructuredEncoding;
import io.cloudevents.jackson.JsonFormat;
-import io.quarkus.runtime.StartupEvent;
import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.openapi.annotations.Operation;
import org.jboss.resteasy.reactive.RestStreamElementType;
@@ -15,18 +9,13 @@
import org.slf4j.LoggerFactory;
import javax.enterprise.context.ApplicationScoped;
-import javax.enterprise.event.Observes;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
-import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
-import java.net.URI;
import java.util.ArrayList;
import java.util.List;
-import java.util.UUID;
-import java.util.random.RandomGenerator;
@Path("events")
@ApplicationScoped
@@ -35,64 +24,32 @@ class Endpoint {
private static final Logger LOGGER = LoggerFactory.getLogger(Endpoint.class);
private final List events = new ArrayList<>();
- void init(@Observes StartupEvent ignored, ObjectMapper om)
- throws JsonProcessingException {
- var rg = RandomGenerator.getDefault();
- for (int i = 0; i < 3; i++) {
- var s = Score.random(rg);
- events.add(CloudEventBuilder.v1()
- .withId(String.valueOf(i))
- .withSource(URI.create("//localhost/dev"))
- .withType(Endpoint.class.getName())
- .withData(MediaType.APPLICATION_JSON, om.writeValueAsBytes(s))
- .build());
- }
- }
-
@GET
@Operation(summary = "Retrieves all registered events as a JSON stream")
- @RestStreamElementType(MediaType.APPLICATION_JSON)
- @StructuredEncoding(JsonFormat.CONTENT_TYPE)
- public Multi events() {
- return Multi.createFrom().iterable(events);
- }
-
- @GET
- @Path("last")
- @Produces(MediaType.APPLICATION_JSON)
- @StructuredEncoding(JsonFormat.CONTENT_TYPE)
- public CloudEvent last() {
- return events.get(events.size() - 1);
+ @RestStreamElementType(JsonFormat.CONTENT_TYPE)
+ public Multi events() {
+ return Multi.createFrom()
+ .iterable(events)
+ .map(this::workaroundQuarkus31587);
}
@POST
- @Consumes(MediaType.APPLICATION_JSON)
+ @Consumes({MediaType.APPLICATION_JSON, JsonFormat.CONTENT_TYPE})
@Operation(summary = "Receives a CloudEvent and stores it")
public void receive(CloudEvent event) {
events.add(event);
- LOGGER.info("Received event: {}", event);
+ LOGGER.debug("Received event: {}", event);
}
- private static final class Score {
- @JsonProperty
- Play play;
- @JsonProperty
- int score;
-
- static Score random(RandomGenerator rg) {
- var s = new Score();
- s.score = rg.nextInt(1_000);
- s.play = new Play();
- s.play.id = new UUID(rg.nextLong(), rg.nextLong()).toString();
- s.play.game = rg.nextInt(300);
- return s;
- }
+ /**
+ * A workaround for
+ * quarkusio/quarkus#31587
+ * and cloudevents/sdk-java#533.
+ *
+ * TODO: Remove this method once the above issues is fixed.
+ */
+ private Event workaroundQuarkus31587(CloudEvent event) {
+ return Event.from(event, om);
}
-
- private static class Play {
- @JsonProperty
- String id;
- @JsonProperty
- Integer game;
}
}
diff --git a/quarkus/src/main/java/com/redhat/openshift/knative/showcase/events/Event.java b/quarkus/src/main/java/com/redhat/openshift/knative/showcase/events/Event.java
new file mode 100644
index 0000000..6d16996
--- /dev/null
+++ b/quarkus/src/main/java/com/redhat/openshift/knative/showcase/events/Event.java
@@ -0,0 +1,61 @@
+package com.redhat.openshift.knative.showcase.events;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.cloudevents.CloudEvent;
+
+import javax.annotation.Nullable;
+import javax.ws.rs.core.MediaType;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * A workaround for
+ * quarkusio/quarkus#31587
+ * and cloudevents/sdk-java#533.
+ *
+ * TODO: Remove this class once the above issues is fixed.
+ */
+class Event {
+ @JsonProperty
+ String id;
+ @JsonProperty
+ String source;
+ @JsonProperty
+ String type;
+ @JsonProperty("specversion")
+ String specVersion;
+ @JsonProperty("datacontenttype")
+ String dataContentType;
+ @JsonProperty
+ Map data;
+
+ static Event from(CloudEvent event, ObjectMapper om) {
+ var e = new Event();
+ e.id = event.getId();
+ e.source = event.getSource().toString();
+ e.type = event.getType();
+ e.specVersion = event.getSpecVersion().toString();
+ e.dataContentType = event.getDataContentType();
+ e.data = dataToMap(event, om);
+ return e;
+ }
+
+ @Nullable
+ private static Map dataToMap(CloudEvent event, ObjectMapper om) {
+ var data = event.getData();
+ if (data == null) {
+ return null;
+ }
+ var mt = MediaType.valueOf(event.getDataContentType());
+ if (mt.isCompatible(MediaType.APPLICATION_JSON_TYPE)) {
+ try {
+ return om.readValue(data.toBytes(), Map.class);
+ } catch (IOException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+ throw new IllegalArgumentException("Unsupported media type: " + mt);
+ }
+
+}
From ec910eab07068d382a5d7f003be69d8cccdb138e Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Chris=20Suszy=C5=84ski?=
Date: Tue, 7 Mar 2023 14:11:46 +0100
Subject: [PATCH 03/16] Better workaround, thx @pierDipi
for https://github.com/quarkusio/quarkus/issues/31587 and https://github.com/cloudevents/sdk-java/issues/533
---
.../knative/showcase/events/Endpoint.java | 10 +--
.../knative/showcase/events/Event.java | 61 -------------------
2 files changed, 5 insertions(+), 66 deletions(-)
delete mode 100644 quarkus/src/main/java/com/redhat/openshift/knative/showcase/events/Event.java
diff --git a/quarkus/src/main/java/com/redhat/openshift/knative/showcase/events/Endpoint.java b/quarkus/src/main/java/com/redhat/openshift/knative/showcase/events/Endpoint.java
index 5bd6dd4..9a35cae 100644
--- a/quarkus/src/main/java/com/redhat/openshift/knative/showcase/events/Endpoint.java
+++ b/quarkus/src/main/java/com/redhat/openshift/knative/showcase/events/Endpoint.java
@@ -27,10 +27,10 @@ class Endpoint {
@GET
@Operation(summary = "Retrieves all registered events as a JSON stream")
@RestStreamElementType(JsonFormat.CONTENT_TYPE)
- public Multi events() {
+ public Multi events() {
return Multi.createFrom()
.iterable(events)
- .map(this::workaroundQuarkus31587);
+ .map(Endpoint::workaroundQuarkus31587);
}
@POST
@@ -48,8 +48,8 @@ public void receive(CloudEvent event) {
*
* TODO: Remove this method once the above issues is fixed.
*/
- private Event workaroundQuarkus31587(CloudEvent event) {
- return Event.from(event, om);
- }
+ private static byte[] workaroundQuarkus31587(CloudEvent event) {
+ var serializer = new JsonFormat();
+ return serializer.serialize(event);
}
}
diff --git a/quarkus/src/main/java/com/redhat/openshift/knative/showcase/events/Event.java b/quarkus/src/main/java/com/redhat/openshift/knative/showcase/events/Event.java
deleted file mode 100644
index 6d16996..0000000
--- a/quarkus/src/main/java/com/redhat/openshift/knative/showcase/events/Event.java
+++ /dev/null
@@ -1,61 +0,0 @@
-package com.redhat.openshift.knative.showcase.events;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import io.cloudevents.CloudEvent;
-
-import javax.annotation.Nullable;
-import javax.ws.rs.core.MediaType;
-import java.io.IOException;
-import java.util.Map;
-
-/**
- * A workaround for
- * quarkusio/quarkus#31587
- * and cloudevents/sdk-java#533.
- *
- * TODO: Remove this class once the above issues is fixed.
- */
-class Event {
- @JsonProperty
- String id;
- @JsonProperty
- String source;
- @JsonProperty
- String type;
- @JsonProperty("specversion")
- String specVersion;
- @JsonProperty("datacontenttype")
- String dataContentType;
- @JsonProperty
- Map data;
-
- static Event from(CloudEvent event, ObjectMapper om) {
- var e = new Event();
- e.id = event.getId();
- e.source = event.getSource().toString();
- e.type = event.getType();
- e.specVersion = event.getSpecVersion().toString();
- e.dataContentType = event.getDataContentType();
- e.data = dataToMap(event, om);
- return e;
- }
-
- @Nullable
- private static Map dataToMap(CloudEvent event, ObjectMapper om) {
- var data = event.getData();
- if (data == null) {
- return null;
- }
- var mt = MediaType.valueOf(event.getDataContentType());
- if (mt.isCompatible(MediaType.APPLICATION_JSON_TYPE)) {
- try {
- return om.readValue(data.toBytes(), Map.class);
- } catch (IOException e) {
- throw new IllegalArgumentException(e);
- }
- }
- throw new IllegalArgumentException("Unsupported media type: " + mt);
- }
-
-}
From 5efb5afcc2ac6cba69badfeeba2ebc8cb6964b8c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Chris=20Suszy=C5=84ski?=
Date: Tue, 7 Mar 2023 14:13:23 +0100
Subject: [PATCH 04/16] Development data
---
.../knative/showcase/events/DevData.java | 61 +++++++++++++++++++
1 file changed, 61 insertions(+)
create mode 100644 quarkus/src/main/java/com/redhat/openshift/knative/showcase/events/DevData.java
diff --git a/quarkus/src/main/java/com/redhat/openshift/knative/showcase/events/DevData.java b/quarkus/src/main/java/com/redhat/openshift/knative/showcase/events/DevData.java
new file mode 100644
index 0000000..55d71e6
--- /dev/null
+++ b/quarkus/src/main/java/com/redhat/openshift/knative/showcase/events/DevData.java
@@ -0,0 +1,61 @@
+package com.redhat.openshift.knative.showcase.events;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.redhat.openshift.knative.showcase.config.Config;
+import com.redhat.openshift.knative.showcase.hello.Hello;
+import io.cloudevents.core.builder.CloudEventBuilder;
+import io.quarkus.arc.profile.IfBuildProfile;
+import io.quarkus.runtime.StartupEvent;
+
+import javax.enterprise.context.ApplicationScoped;
+import javax.enterprise.event.Observes;
+import javax.inject.Inject;
+import javax.ws.rs.core.MediaType;
+import java.net.URI;
+import java.time.Instant;
+import java.time.OffsetDateTime;
+import java.time.ZoneId;
+import java.util.UUID;
+import java.util.random.RandomGenerator;
+import java.util.random.RandomGeneratorFactory;
+
+@IfBuildProfile("dev")
+@ApplicationScoped
+class DevData {
+ private final Config config;
+
+ @Inject
+ DevData(Config config) {
+ this.config = config;
+ }
+
+ void init(@Observes StartupEvent ignored, ObjectMapper om, Endpoint endpoint)
+ throws JsonProcessingException {
+ var rg = RandomGeneratorFactory.all()
+ .findFirst()
+ .orElseGet(RandomGeneratorFactory::getDefault)
+ .create(DevData.class.hashCode());
+
+ for (int i = 1; i <= 2; i++) {
+ var h = random(rg);
+ endpoint.receive(CloudEventBuilder.v1()
+ .withId(new UUID(rg.nextLong(), rg.nextLong()).toString())
+ .withSource(URI.create("//devdata"))
+ .withType(Hello.class.getName())
+ .withTime(OffsetDateTime.ofInstant(
+ Instant.ofEpochSecond(rg.nextLong(1_672_527_600, 1_677_883_620)),
+ ZoneId.systemDefault()))
+ .withData(MediaType.APPLICATION_JSON, om.writeValueAsBytes(h))
+ .build());
+ }
+ }
+
+ private Hello random(RandomGenerator rg) {
+ return new Hello(
+ config.getGreet(),
+ "Developer" + rg.nextInt(1, 100),
+ rg.nextInt(1, 100)
+ );
+ }
+}
From cee77d6f44d2e5d72b79fc951494c4435183b145 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Chris=20Suszy=C5=84ski?=
Date: Tue, 7 Mar 2023 14:13:50 +0100
Subject: [PATCH 05/16] Proper streaming of events SSE
---
.../knative/showcase/events/Endpoint.java | 7 +--
.../knative/showcase/events/EventStore.java | 54 +++++++++++++++++++
.../resources/META-INF/resources/events.js | 18 +++++++
.../src/main/resources/templates/index.html | 7 ++-
.../src/main/templates/application.properties | 2 +
.../src/test/resources/application.properties | 1 -
6 files changed, 81 insertions(+), 8 deletions(-)
create mode 100644 quarkus/src/main/java/com/redhat/openshift/knative/showcase/events/EventStore.java
create mode 100644 quarkus/src/main/resources/META-INF/resources/events.js
diff --git a/quarkus/src/main/java/com/redhat/openshift/knative/showcase/events/Endpoint.java b/quarkus/src/main/java/com/redhat/openshift/knative/showcase/events/Endpoint.java
index 9a35cae..8402f94 100644
--- a/quarkus/src/main/java/com/redhat/openshift/knative/showcase/events/Endpoint.java
+++ b/quarkus/src/main/java/com/redhat/openshift/knative/showcase/events/Endpoint.java
@@ -14,22 +14,19 @@
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.core.MediaType;
-import java.util.ArrayList;
-import java.util.List;
@Path("events")
@ApplicationScoped
class Endpoint {
private static final Logger LOGGER = LoggerFactory.getLogger(Endpoint.class);
- private final List events = new ArrayList<>();
+ private final EventStore events = new EventStore();
@GET
@Operation(summary = "Retrieves all registered events as a JSON stream")
@RestStreamElementType(JsonFormat.CONTENT_TYPE)
public Multi events() {
- return Multi.createFrom()
- .iterable(events)
+ return events.stream()
.map(Endpoint::workaroundQuarkus31587);
}
diff --git a/quarkus/src/main/java/com/redhat/openshift/knative/showcase/events/EventStore.java b/quarkus/src/main/java/com/redhat/openshift/knative/showcase/events/EventStore.java
new file mode 100644
index 0000000..a54e3ae
--- /dev/null
+++ b/quarkus/src/main/java/com/redhat/openshift/knative/showcase/events/EventStore.java
@@ -0,0 +1,54 @@
+package com.redhat.openshift.knative.showcase.events;
+
+import io.cloudevents.CloudEvent;
+import io.smallrye.mutiny.Multi;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+class EventStore {
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(EventStore.class);
+ private final List events = new ArrayList<>();
+ private final ExecutorService executorService =
+ Executors.newFixedThreadPool(6);
+
+ Multi stream() {
+ var p = new EventsPuller();
+ return Multi.createBy()
+ .repeating()
+ .supplier(p::pull)
+ .indefinitely()
+ .runSubscriptionOn(executorService);
+ }
+
+ void add(CloudEvent event) {
+ events.add(event);
+ }
+
+ private final class EventsPuller {
+ private int index = 0;
+ CloudEvent pull() {
+ while (index >= events.size()) {
+ block();
+ }
+ var e = events.get(index);
+ LOGGER.trace("Pulling({}): {}", index, e.getId());
+ index++;
+ return e;
+ }
+
+ private void block() {
+ try {
+ Thread.sleep(25);
+ } catch (InterruptedException e) {
+ LOGGER.warn("Interrupted!", e);
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+}
diff --git a/quarkus/src/main/resources/META-INF/resources/events.js b/quarkus/src/main/resources/META-INF/resources/events.js
new file mode 100644
index 0000000..b4c96ad
--- /dev/null
+++ b/quarkus/src/main/resources/META-INF/resources/events.js
@@ -0,0 +1,18 @@
+const eventsList = document.getElementById('events-list')
+function init() {
+ const source = new EventSource('/events')
+ source.onmessage = (e) => {
+ const li = document.createElement('li')
+ const d = JSON.parse(e.data)
+ li.innerHTML = `${JSON.stringify(d.data)}
`
+ eventsList.appendChild(li)
+ }
+ source.onopen = (e) => {
+ eventsList.innerHTML = ''
+ }
+ source.onerror = (e) => {
+ source.close()
+ init()
+ }
+}
+init()
diff --git a/quarkus/src/main/resources/templates/index.html b/quarkus/src/main/resources/templates/index.html
index 040540d..df57013 100644
--- a/quarkus/src/main/resources/templates/index.html
+++ b/quarkus/src/main/resources/templates/index.html
@@ -26,7 +26,10 @@ What can I do from here?
💡 It will send CloudEvent to K_SINK = {config.sink}
- Powered by:
+ Collected events:
+
+
+ Powered by:
@@ -49,6 +52,6 @@
Application
-
+