Skip to content

Commit

Permalink
Merge pull request #30449 from computerlove/issue-30417
Browse files Browse the repository at this point in the history
Register eventbus message codec when headers are present in consumer
  • Loading branch information
cescoffier authored Jan 20, 2023
2 parents bad8aa2 + d5e05ee commit 051f0c5
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,12 @@ private static Type extractPayloadTypeFromParameter(MethodInfo method) {
if (parameters.isEmpty()) {
return null;
}
Type param = method.parameterType(0);
/*
* VertxProcessor.collectEventConsumers makes sure that only methods with either just the message object,
* or headers as first argument then message object are allowed.
*/
int messageIndex = parameters.size() == 1 ? 0 : 1;
Type param = method.parameterType(messageIndex);
if (param.kind() == Type.Kind.CLASS) {
return param;
} else if (param.kind() == Type.Kind.PARAMETERIZED_TYPE) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import javax.inject.Inject;
Expand All @@ -18,6 +19,7 @@
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.test.QuarkusUnitTest;
import io.vertx.core.MultiMap;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.core.eventbus.Message;

Expand Down Expand Up @@ -83,6 +85,8 @@ public void testWithPrimitiveTypesAndCompletionStage() {

@Test
public void testCodecRegistrationBasedOnParameterType() {
assertThat(bean.getSink().size()).isEqualTo(0);

String address = "address-5";
vertx.eventBus().send(address, new CustomType1("foo"));
vertx.eventBus().send(address, new CustomType1("bar"));
Expand All @@ -104,6 +108,20 @@ public void testCodecRegistrationBasedOnParameterType() {
set = bean.getSink().stream().map(x -> (CustomType1) x).map(CustomType1::getName)
.collect(Collectors.toSet());
assertThat(set).contains("foo-x", "bar-x", "baz-x");
bean.getSink().clear();
}

@Test
public void testCodecRegistrationBasedOnHeadersAndParameterType() {
assertThat(bean.getSink().size()).isEqualTo(0);

vertx.eventBus().send("address-9", new CustomType5("foo-x"));

await().timeout(5, TimeUnit.SECONDS).until(() -> bean.getSink().size() == 1);
Set<String> set = bean.getSink().stream().map(x -> (CustomType5) x).map(CustomType5::getName)
.collect(Collectors.toSet());
assertThat(set).contains("foo-x");
bean.getSink().clear();
}

@Test
Expand Down Expand Up @@ -199,6 +217,11 @@ CompletionStage<CustomType4> codecRegistrationBasedReturnTypeAndCS(String n) {
return CompletableFuture.completedFuture(new CustomType4(n));
}

@ConsumeEvent("address-9")
void codecRegistrationBasedOnHeadersParam(MultiMap headers, CustomType5 ct) {
sink.add(ct);
}

public List<Object> getSink() {
return sink;
}
Expand Down Expand Up @@ -259,4 +282,16 @@ public String getName() {
return name;
}
}

static class CustomType5 {
private final String name;

CustomType5(String name) {
this.name = name;
}

public String getName() {
return name;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import javax.enterprise.context.ApplicationScoped;

import io.quarkus.vertx.ConsumeEvent;
import io.vertx.core.MultiMap;

@ApplicationScoped
public class EventBusConsumer {
Expand All @@ -17,4 +18,10 @@ public String name(String name) {
return "Hello " + name;
}

@ConsumeEvent("person-headers")
public String personWithHeader(MultiMap headers, Person person) {
String s = "Hello " + person.getFirstName() + " " + person.getLastName() + ", " + headers;
return s;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
import javax.inject.Inject;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;

import io.smallrye.mutiny.Uni;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.json.JsonObject;
import io.vertx.mutiny.core.eventbus.EventBus;
import io.vertx.mutiny.core.eventbus.Message;
Expand All @@ -22,6 +24,17 @@ public Uni<String> helloToPerson(JsonObject json) {
.onItem().transform(Message::body);
}

@POST
@Path("/person2")
@Produces("text/plain")
public Uni<String> helloToPersonWithHeaders(JsonObject json) {
return bus.<String> request(
"person-headers",
new Person(json.getString("firstName"), json.getString("lastName")),
new DeliveryOptions().addHeader("header", "headerValue"))
.onItem().transform(Message::body);
}

@POST
@Path("/pet")
public Uni<String> helloToPet(JsonObject json) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,19 @@ public void testEventBusWithString() {
.then().statusCode(200).body(equalTo("Hello Bob Morane"));
}

@Test
public void testEventBusWithObjectAndHeader() {
String body = new JsonObject()
.put("firstName", "Bob")
.put("lastName", "Morane")
.toString();
given().contentType(ContentType.JSON).body(body)
.post("/vertx-test/event-bus/person2")
.then().statusCode(200)
// For some reason Multimap.toString() has \n at the end.
.body(equalTo("Hello Bob Morane, header=headerValue\n"));
}

@Test
public void testEventBusWithPet() {
String body = new JsonObject().put("name", "Neo").put("kind", "rabbit").toString();
Expand Down

0 comments on commit 051f0c5

Please sign in to comment.