Skip to content

Commit

Permalink
[#38] Support structured and binary content modes
Browse files Browse the repository at this point in the history
* Use of quarkus-reactive libraries
* Add CORS configuration
* Bump cloudevents and quarkus versions
* Allow the loopback endpoint to be configured based on quarkus.http.port

Signed-off-by: ruromero <rromerom@redhat.com>
  • Loading branch information
ruromero committed Jan 25, 2023
1 parent 4871773 commit b12abf9
Show file tree
Hide file tree
Showing 8 changed files with 5,057 additions and 877 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ $ java -Dplayer.mode=LOCAL -jar target/quarkus-app/quarkus-run.jar
...
2022-06-24 18:39:07,794 INFO [io.und.websockets] (main) UT026003: Adding annotated server endpoint class com.redhat.syseng.tools.cloudevents.resources.MessagesSocket for path /socket
2022-06-24 18:39:08,130 INFO [io.qua.sma.ope.run.OpenApiRecorder] (main) Default CORS properties will be used, please use 'quarkus.http.cors' properties instead
2022-06-24 18:39:08,216 INFO [io.quarkus] (main) cloudevent-player 1.2-SNAPSHOT on JVM (powered by Quarkus 2.15.0.Final) started in 0.879s. Listening on: http://0.0.0.0:8080
2022-06-24 18:39:08,216 INFO [io.quarkus] (main) cloudevent-player 1.2-SNAPSHOT on JVM (powered by Quarkus 2.15.3.Final) started in 0.879s. Listening on: http://0.0.0.0:8080
2022-06-24 18:39:08,217 INFO [io.quarkus] (main) Profile prod activated.
2022-06-24 18:39:08,217 INFO [io.quarkus] (main) Installed features: [cdi, hibernate-validator, kubernetes-client, rest-client, resteasy-reactive, resteasy-reactive-jackson, smallrye-context-propagation, smallrye-openapi, vertx, websockets, websockets-client]
```
Expand All @@ -94,7 +94,7 @@ Listening for transport dt_socket at address: 5005
2022-06-24 18:51:43,172 INFO [io.und.websockets] (Quarkus Main Thread) UT026003: Adding annotated server endpoint class com.redhat.syseng.tools.cloudevents.resources.MessagesSocket for path /socket

2022-06-24 18:51:43,229 WARN [org.jbo.res.res.i18n] (Quarkus Main Thread) RESTEASY002155: Provider class io.cloudevents.http.restful.ws.CloudEventsProvider is already registered. 2nd registration is being ignored.
2022-06-24 18:51:43,513 INFO [io.quarkus] (Quarkus Main Thread) cloudevent-player 1.2-SNAPSHOT on JVM (powered by Quarkus 2.15.0.Final) started in 2.543s. Listening on: http://localhost:8080
2022-06-24 18:51:43,513 INFO [io.quarkus] (Quarkus Main Thread) cloudevent-player 1.2-SNAPSHOT on JVM (powered by Quarkus 2.15.3.Final) started in 2.543s. Listening on: http://localhost:8080
2022-06-24 18:51:43,514 INFO [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.
2022-06-24 18:51:43,515 INFO [io.quarkus] (Quarkus Main Thread) Installed features: [cdi, hibernate-validator, kubernetes-client, rest-client, resteasy-reactive, resteasy-reactive-jackson, smallrye-context-propagation, smallrye-openapi, swagger-ui, vertx, websockets, websockets-client]

Expand All @@ -114,7 +114,7 @@ Run
```shell script
$ ./target/cloudevent-player-1.2-SNAPSHOT-runner -Dplayer.mode=LOCAL
...
2022-06-24 18:48:11,565 INFO [io.quarkus] (main) cloudevent-player 1.2-SNAPSHOT native (powered by Quarkus 2.15.0.Final) started in 0.022s. Listening on: http://0.0.0.0:8080
2022-06-24 18:48:11,565 INFO [io.quarkus] (main) cloudevent-player 1.2-SNAPSHOT native (powered by Quarkus 2.15.3.Final) started in 0.022s. Listening on: http://0.0.0.0:8080
2022-06-24 18:48:11,565 INFO [io.quarkus] (main) Profile prod activated.
2022-06-24 18:48:11,565 INFO [io.quarkus] (main) Installed features: [cdi, hibernate-validator, kubernetes-client, rest-client, resteasy-reactive, resteasy-reactive-jackson, smallrye-context-propagation, smallrye-openapi, vertx, websockets, websockets-client]
2022-06-24 18:48:17,028 INFO [com.red.sys.too.clo.ser.MessageService] (ForkJoinPool.commonPool-worker-3) Player mode LOCAL - broker: http://localhost:8080/
Expand Down
5,788 changes: 4,977 additions & 811 deletions frontend/package-lock.json

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<url>scm:git:git@github.com:ruromero/cloudevents-player.git</url>
</scm>
<properties>
<cloudevents.version>2.4.0</cloudevents.version>
<cloudevents.version>2.4.1</cloudevents.version>
<compiler-plugin.version>3.10.1</compiler-plugin.version>
<frontend-maven-plugin.version>1.12.1</frontend-maven-plugin.version>
<maven-resources-plugin.version>3.2.0</maven-resources-plugin.version>
Expand All @@ -28,7 +28,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

<quarkus.platform.version>2.15.0.Final</quarkus.platform.version>
<quarkus.platform.version>2.15.3.Final</quarkus.platform.version>
<quarkus-plugin.version>${quarkus.platform.version}</quarkus-plugin.version>

<quarkus.container-image.registry>quay.io</quarkus.container-image.registry>
Expand Down Expand Up @@ -65,15 +65,15 @@
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-rest-client</artifactId>
<artifactId>quarkus-rest-client-reactive-jackson</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-kubernetes-client</artifactId>
</dependency>
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-api</artifactId>
<artifactId>cloudevents-json-jackson</artifactId>
<version>${cloudevents.version}</version>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package com.redhat.syseng.tools.cloudevents.resources;

import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import javax.inject.Inject;
import javax.validation.ConstraintViolation;
Expand All @@ -24,10 +22,13 @@
import com.redhat.syseng.tools.cloudevents.service.MessageService;

import io.cloudevents.CloudEvent;
import io.cloudevents.jackson.JsonFormat;
import io.quarkus.runtime.annotations.RegisterForReflection;

@Path("/")
@Consumes(MediaType.APPLICATION_JSON)
@Consumes({MediaType.APPLICATION_JSON, JsonFormat.CONTENT_TYPE})
@Produces(MediaType.APPLICATION_JSON)
@RegisterForReflection
public class MessageReceiverResource {

private static final Logger LOGGER = LoggerFactory.getLogger(MessageReceiverResource.class);
Expand All @@ -43,20 +44,17 @@ public class MessageReceiverResource {

@POST
@ResponseStatus(202)
public CompletionStage<Void> receive(CloudEvent object) {
return CompletableFuture.supplyAsync(() -> {
LOGGER.debug("Received event: {}", object.getId());
Set<ConstraintViolation<CloudEvent>> violations = validator.validate(object);
if (!violations.isEmpty()) {
try {
throw new BadRequestException(mapper.writeValueAsString(violations));
} catch (JsonProcessingException e) {
throw new InternalServerErrorException(e);
}
public void receive(CloudEvent object) {
LOGGER.debug("Received event: {}", object.getId());
Set<ConstraintViolation<CloudEvent>> violations = validator.validate(object);
if (!violations.isEmpty()) {
try {
throw new BadRequestException(mapper.writeValueAsString(violations));
} catch (JsonProcessingException e) {
throw new InternalServerErrorException(e);
}
msgService.receive(object);
return null;
});
}
msgService.receive(object);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import javax.inject.Inject;
import javax.validation.ConstraintViolation;
Expand All @@ -17,11 +15,11 @@
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

import org.jboss.resteasy.reactive.ResponseStatus;
import org.jboss.resteasy.reactive.RestHeader;
import org.jboss.resteasy.reactive.RestQuery;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -31,9 +29,10 @@
import com.redhat.syseng.tools.cloudevents.service.MessageService;

import io.cloudevents.CloudEvent;
import io.cloudevents.jackson.JsonFormat;

@Path("/messages")
@Consumes(MediaType.APPLICATION_JSON)
@Consumes({MediaType.APPLICATION_JSON, JsonFormat.CONTENT_TYPE})
@Produces(MediaType.APPLICATION_JSON)
public class MessageResource {

Expand All @@ -49,35 +48,31 @@ public class MessageResource {
ObjectMapper mapper;

@GET
public CompletionStage<List<Message>> list(@QueryParam("page") @DefaultValue("0") Integer page,
@QueryParam("size") @DefaultValue("10") Integer size) {
return CompletableFuture.supplyAsync(() -> msgService.list(page, size));
public List<Message> list(@RestQuery @DefaultValue("0") Integer page,
@RestQuery @DefaultValue("10") Integer size) {
return msgService.list(page, size);
}

@POST
@ResponseStatus(202)
public CompletableFuture<Void> sendEvent(CloudEvent object) {
return CompletableFuture.supplyAsync(() -> {
Set<ConstraintViolation<CloudEvent>> violations = validator.validate(object);
if (!violations.isEmpty()) {
LOGGER.debug("Validation error {}", violations);
try {
throw new BadRequestException(mapper.writeValueAsString(violations));
} catch (JsonProcessingException e) {
throw new InternalServerErrorException(e);
}
public void sendEvent(CloudEvent object, @RestHeader String contentType) {
Set<ConstraintViolation<CloudEvent>> violations = validator.validate(object);
if (!violations.isEmpty()) {
LOGGER.debug("Validation error {}", violations);
try {
throw new BadRequestException(mapper.writeValueAsString(violations));
} catch (JsonProcessingException e) {
throw new InternalServerErrorException(e);
}
LOGGER.debug("New event to send: {}", object);
msgService.send(object);
return null;
});
}
LOGGER.debug("New event to send: {}", object);
msgService.send(object, JsonFormat.CONTENT_TYPE.equalsIgnoreCase(contentType));
}


@DELETE
public CompletionStage<Response> clear() {
return CompletableFuture.supplyAsync(() -> {
msgService.clear();
return Response.accepted().build();
});
@ResponseStatus(202)
public void clear() {
msgService.clear();
}
}
Original file line number Diff line number Diff line change
@@ -1,19 +1,30 @@
package com.redhat.syseng.tools.cloudevents.service;

import java.util.concurrent.CompletionStage;

import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

import org.eclipse.microprofile.rest.client.annotation.RegisterClientHeaders;
import org.eclipse.microprofile.rest.client.inject.RegisterRestClient;
import org.jboss.resteasy.reactive.RestResponse;

import io.cloudevents.CloudEvent;
import io.cloudevents.jackson.JsonFormat;

@Path("/")
@RegisterClientHeaders
@RegisterRestClient
public interface BrokerService {

@POST
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
RestResponse<Void> sendBinary(CloudEvent payload);

@POST
@Consumes(JsonFormat.CONTENT_TYPE)
@Produces(MediaType.APPLICATION_JSON)
CompletionStage<Response> send(CloudEvent payload);
RestResponse<Void> sendStructured(CloudEvent payload);
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.eclipse.microprofile.rest.client.RestClientBuilder;
import org.jboss.resteasy.reactive.RestResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -30,7 +31,7 @@
@ApplicationScoped
public class MessageService {

private static final URI LOOPBACK_BASE_URI = URI.create("http://localhost:8080/");
private static final String LOOPBACK_BASE_URI = "http://localhost:%s/";
private static final String DEFAULT_BROKER = "default";
private static final int MAX_SIZE = 200;
private static final Logger LOGGER = LoggerFactory.getLogger(MessageService.class);
Expand All @@ -47,6 +48,9 @@ public class MessageService {
@ConfigProperty(name = "broker.uri")
Optional<String> brokerUri;

@ConfigProperty(name = "quarkus.http.port")
String port;

private final List<Message> messages = new ArrayList<>();

@Inject
Expand All @@ -59,7 +63,7 @@ public class MessageService {

@PostConstruct
public void init() {
URI baseUri = LOOPBACK_BASE_URI;
URI baseUri = URI.create(String.format(LOOPBACK_BASE_URI, port));
PlayerMode playerMode = mode.orElse(PlayerMode.KNATIVE);
LOGGER.info("Player mode {}", playerMode);
if (PlayerMode.KNATIVE.equals(playerMode)) {
Expand All @@ -77,19 +81,25 @@ public void init() {
LOGGER.info("Broker endpoint: {}", baseUri);
}

public void send(CloudEvent event) {
brokerService.send(event).whenComplete((response, throwable) -> {
if (throwable != null) {
LOGGER.error("Unable to send cloudEvent", throwable);
newEvent(event, MessageType.FAILED);
} else if (Response.Status.BAD_REQUEST.getStatusCode() <= response.getStatus()) {
public void send(CloudEvent event, boolean isStructured) {
try {
RestResponse<Void> response;
if (isStructured) {
response = brokerService.sendStructured(event);
} else {
response = brokerService.sendBinary(event);
}
if (Response.Status.BAD_REQUEST.getStatusCode() <= response.getStatus()) {
LOGGER.error("Unable to send cloudEvent. StatusCode: {}", response.getStatus());
newEvent(event, MessageType.FAILED);
} else {
LOGGER.debug("Successfully sent cloudevent {}", event);
newEvent(event, MessageType.SENT);
}
});
}
} catch(Throwable t) {
LOGGER.error("Unable to send cloudEvent", t);
newEvent(event, MessageType.FAILED);
}
}

public void receive(CloudEvent event) {
Expand Down
4 changes: 2 additions & 2 deletions src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@
# player.mode=KNATIVE
## Player broker name, 'default' if not provided
# player.broker=mybroker
quarkus.index-dependency.cloudevents.group-id=io.cloudevents
quarkus.index-dependency.cloudevents.artifact-id=cloudevents-http-restful-ws
quarkus.http.cors=true
quarkus.http.cors.origins=http://localhost

0 comments on commit b12abf9

Please sign in to comment.