diff --git a/.github/dependabot.yml b/.github/dependabot.yml index 1321ce6bc8..08d475bc87 100644 --- a/.github/dependabot.yml +++ b/.github/dependabot.yml @@ -64,7 +64,7 @@ updates: - "3.0" - dependency-name: org.slf4j:slf4j-log4j12 versions: - - "<1.8.0" + - "> 1.8.0" - dependency-name: org.slf4j:simple versions: - - "<1.8.0" + - "> 1.8.0" diff --git a/pom.xml b/pom.xml index dbf4cb1eb3..c7c88dcea1 100644 --- a/pom.xml +++ b/pom.xml @@ -110,7 +110,7 @@ 1.2.3 2.1.0 - 2.0.5 + 1.7.36 5.7.1 0.102.0 diff --git a/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/AmqpConnector.java b/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/AmqpConnector.java index d2fd44ee60..0e234bc9da 100644 --- a/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/AmqpConnector.java +++ b/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/AmqpConnector.java @@ -59,7 +59,6 @@ import io.smallrye.reactive.messaging.health.HealthReport; import io.smallrye.reactive.messaging.health.HealthReporter; import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder; -import io.smallrye.reactive.messaging.providers.locals.ContextOperator; import io.vertx.amqp.AmqpClientOptions; import io.vertx.amqp.AmqpReceiverOptions; import io.vertx.amqp.AmqpSenderOptions; @@ -254,7 +253,7 @@ public PublisherBuilder> getPublisherBuilder(Config config) multi = multi.broadcast().toAllSubscribers(); } - return ReactiveStreams.fromPublisher(multi.plug(ContextOperator::apply)); + return ReactiveStreams.fromPublisher(multi); } @Override diff --git a/smallrye-reactive-messaging-kafka/revapi.json b/smallrye-reactive-messaging-kafka/revapi.json index d1a26bd769..29f5898f83 100644 --- a/smallrye-reactive-messaging-kafka/revapi.json +++ b/smallrye-reactive-messaging-kafka/revapi.json @@ -24,7 +24,18 @@ "criticality" : "highlight", "minSeverity" : "POTENTIALLY_BREAKING", "minCriticality" : "documented", - "differences" : [ ] + "differences" : [ + { + "code": "java.method.removed", + "old": "method org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder> io.smallrye.reactive.messaging.kafka.KafkaConnector::getPublisherBuilder(org.eclipse.microprofile.config.Config)", + "justification": "KafkaConnector moved to InboundConnector API" + }, + { + "code": "java.method.removed", + "old": "method org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder, java.lang.Void> io.smallrye.reactive.messaging.kafka.KafkaConnector::getSubscriberBuilder(org.eclipse.microprofile.config.Config)", + "justification": "KafkaConnector moved to OutboundConnector API" + } + ] } }, { "extension" : "revapi.reporter.json", @@ -43,4 +54,4 @@ "minCriticality" : "documented", "output" : "out" } -} ] \ No newline at end of file +} ] diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/KafkaConnector.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/KafkaConnector.java index 0155351f9a..262fa64edb 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/KafkaConnector.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/KafkaConnector.java @@ -20,18 +20,16 @@ import org.eclipse.microprofile.config.Config; import org.eclipse.microprofile.reactive.messaging.Message; import org.eclipse.microprofile.reactive.messaging.spi.Connector; -import org.eclipse.microprofile.reactive.messaging.spi.IncomingConnectorFactory; -import org.eclipse.microprofile.reactive.messaging.spi.OutgoingConnectorFactory; -import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder; -import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams; -import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder; import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.trace.Tracer; import io.smallrye.mutiny.Multi; import io.smallrye.reactive.messaging.annotations.ConnectorAttribute; import io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction; +import io.smallrye.reactive.messaging.connector.InboundConnector; +import io.smallrye.reactive.messaging.connector.OutboundConnector; import io.smallrye.reactive.messaging.health.HealthReport; import io.smallrye.reactive.messaging.health.HealthReporter; import io.smallrye.reactive.messaging.kafka.commit.KafkaCommitHandler; @@ -116,7 +114,7 @@ @ConnectorAttribute(name = "propagate-headers", direction = Direction.OUTGOING, description = "A comma-separating list of incoming record headers to be propagated to the outgoing record", type = "string", defaultValue = "") @ConnectorAttribute(name = "key-serialization-failure-handler", type = "string", direction = Direction.OUTGOING, description = "The name set in `@Identifier` of a bean that implements `io.smallrye.reactive.messaging.kafka.SerializationFailureHandler`. If set, serialization failure happening when serializing keys are delegated to this handler which may provide a fallback value.") @ConnectorAttribute(name = "value-serialization-failure-handler", type = "string", direction = Direction.OUTGOING, description = "The name set in `@Identifier` of a bean that implements `io.smallrye.reactive.messaging.kafka.SerializationFailureHandler`. If set, serialization failure happening when serializing values are delegated to this handler which may provide a fallback value.") -public class KafkaConnector implements IncomingConnectorFactory, OutgoingConnectorFactory, HealthReporter { +public class KafkaConnector implements InboundConnector, OutboundConnector, HealthReporter { public static final String CONNECTOR_NAME = "smallrye-kafka"; @@ -171,7 +169,7 @@ void init() { } @Override - public PublisherBuilder> getPublisherBuilder(Config config) { + public Publisher> getPublisher(Config config) { Config channelConfiguration = ConfigHelper.retrieveChannelConfiguration(configurations, config); KafkaConnectorIncomingConfiguration ic = new KafkaConnectorIncomingConfiguration(channelConfiguration); @@ -207,9 +205,9 @@ public PublisherBuilder> getPublisherBuilder(Config config) stream = source.getBatchStream(); } if (broadcast) { - return ReactiveStreams.fromPublisher(stream.broadcast().toAllSubscribers()); + return stream.broadcast().toAllSubscribers(); } else { - return ReactiveStreams.fromPublisher(stream); + return stream; } } @@ -235,14 +233,14 @@ public PublisherBuilder> getPublisherBuilder(Config config) .streams(streams.toArray(new Publisher[0])); boolean broadcast = ic.getBroadcast(); if (broadcast) { - return ReactiveStreams.fromPublisher(multi.broadcast().toAllSubscribers()); + return multi.broadcast().toAllSubscribers(); } else { - return ReactiveStreams.fromPublisher(multi); + return multi; } } @Override - public SubscriberBuilder, Void> getSubscriberBuilder(Config config) { + public Subscriber> getSubscriber(Config config) { Config channelConfiguration = ConfigHelper.retrieveChannelConfiguration(configurations, config); KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(channelConfiguration); diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaSink.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaSink.java index d87b0218d8..79a63efedf 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaSink.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaSink.java @@ -28,8 +28,7 @@ import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.serialization.StringSerializer; import org.eclipse.microprofile.reactive.messaging.Message; -import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams; -import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder; +import org.reactivestreams.Subscriber; import io.smallrye.mutiny.Uni; import io.smallrye.reactive.messaging.OutgoingMessageMetadata; @@ -44,6 +43,7 @@ import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata; import io.smallrye.reactive.messaging.kafka.health.KafkaSinkHealth; import io.smallrye.reactive.messaging.kafka.impl.ce.KafkaCloudEventHelper; +import io.smallrye.reactive.messaging.providers.helpers.MultiUtils; @SuppressWarnings("jol") public class KafkaSink { @@ -52,7 +52,7 @@ public class KafkaSink { private final int partition; private final String topic; private final String key; - private final SubscriberBuilder, Void> subscriber; + private final Subscriber> subscriber; private final long retries; private final int deliveryTimeoutMs; @@ -118,13 +118,10 @@ public KafkaSink(KafkaConnectorOutgoingConfiguration config, KafkaCDIEvents kafk } this.processor = new KafkaSenderProcessor(requests, waitForWriteCompletion, writeMessageToKafka()); - this.subscriber = ReactiveStreams.> builder() - .via(processor) - .onError(f -> { - log.unableToDispatch(f); - reportFailure(f); - }) - .ignore(); + this.subscriber = MultiUtils.via(processor, m -> m.onFailure().invoke(f -> { + log.unableToDispatch(f); + reportFailure(f); + })); } @@ -310,7 +307,7 @@ private Object getKey(Message message, return key; } - public SubscriberBuilder, Void> getSink() { + public Subscriber> getSink() { return subscriber; } diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaSource.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaSource.java index a5c21c0ca0..98f01a4ddb 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaSource.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaSource.java @@ -31,7 +31,6 @@ import io.smallrye.reactive.messaging.kafka.commit.*; import io.smallrye.reactive.messaging.kafka.fault.KafkaFailureHandler; import io.smallrye.reactive.messaging.kafka.health.KafkaSourceHealth; -import io.smallrye.reactive.messaging.providers.locals.ContextOperator; import io.vertx.core.impl.EventLoopContext; import io.vertx.core.impl.VertxInternal; import io.vertx.mutiny.core.Vertx; @@ -157,8 +156,7 @@ public KafkaSource(Vertx vertx, incomingMulti = incomingMulti.onItem().invoke(record -> incomingTrace(record, false)); } this.stream = incomingMulti - .onFailure().invoke(t -> reportFailure(t, false)) - .plug(ContextOperator::apply); + .onFailure().invoke(t -> reportFailure(t, false)); this.batchStream = null; } else { Multi> multi; @@ -188,8 +186,7 @@ public KafkaSource(Vertx vertx, incomingMulti = incomingMulti.onItem().invoke(this::incomingTrace); } this.batchStream = incomingMulti - .onFailure().invoke(t -> reportFailure(t, false)) - .plug(ContextOperator::apply); + .onFailure().invoke(t -> reportFailure(t, false)); this.stream = null; } diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSinkTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSinkTest.java index c2d4917e3c..10f1b1b2d6 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSinkTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSinkTest.java @@ -58,7 +58,7 @@ public void testSinkUsingInteger() { KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance()); - Subscriber> subscriber = sink.getSink().build(); + Subscriber> subscriber = sink.getSink(); Multi.createFrom().range(0, 10) .map(Message::of) .subscribe((Subscriber>) subscriber); @@ -78,7 +78,7 @@ public void testSinkUsingIntegerAndChannelName() { KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance()); - Subscriber> subscriber = sink.getSink().build(); + Subscriber> subscriber = sink.getSink(); Multi.createFrom().range(0, 10) .map(Message::of) .subscribe((Subscriber>) subscriber); @@ -99,7 +99,7 @@ public void testSinkUsingString() { KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance()); - Subscriber> subscriber = sink.getSink().build(); + Subscriber> subscriber = sink.getSink(); Multi.createFrom().range(0, 10) .map(i -> Integer.toString(i)) .map(Message::of) @@ -238,7 +238,7 @@ public void testInvalidPayloadType() { List acked = new CopyOnWriteArrayList<>(); List nacked = new CopyOnWriteArrayList<>(); - Subscriber subscriber = sink.getSink().build(); + Subscriber subscriber = sink.getSink(); Multi.createFrom().range(0, 6) .map(i -> { if (i == 3 || i == 5) { @@ -279,7 +279,7 @@ public void testInvalidTypeWithDefaultInflightMessages() { KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance()); - Subscriber subscriber = sink.getSink().build(); + Subscriber subscriber = sink.getSink(); Multi.createFrom().range(0, 5) .map(i -> { if (i == 3 || i == 5) { diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSinkWithLegacyMetadataTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSinkWithLegacyMetadataTest.java index cdfcde3c8b..d1652018c6 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSinkWithLegacyMetadataTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSinkWithLegacyMetadataTest.java @@ -63,7 +63,7 @@ public void testSinkUsingInteger() { KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance()); - Subscriber> subscriber = sink.getSink().build(); + Subscriber> subscriber = sink.getSink(); Multi.createFrom().range(0, 10) .map(Message::of) .subscribe((Subscriber>) subscriber); @@ -83,7 +83,7 @@ public void testSinkUsingIntegerAndChannelName() { KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance()); - Subscriber> subscriber = sink.getSink().build(); + Subscriber> subscriber = sink.getSink(); Multi.createFrom().range(0, 10) .map(Message::of) .subscribe((Subscriber>) subscriber); @@ -104,7 +104,7 @@ public void testSinkUsingString() { KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance()); - Subscriber> subscriber = sink.getSink().build(); + Subscriber> subscriber = sink.getSink(); Multi.createFrom().range(0, 10) .map(i -> Integer.toString(i)) .map(Message::of) @@ -244,7 +244,7 @@ public void testInvalidPayloadType() { List acked = new CopyOnWriteArrayList<>(); List nacked = new CopyOnWriteArrayList<>(); - Subscriber subscriber = sink.getSink().build(); + Subscriber subscriber = sink.getSink(); Multi.createFrom().range(0, 6) .map(i -> { if (i == 3 || i == 5) { @@ -285,7 +285,7 @@ public void testInvalidTypeWithDefaultInflightMessages() { KafkaConnectorOutgoingConfiguration oc = new KafkaConnectorOutgoingConfiguration(config); sink = new KafkaSink(oc, CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance()); - Subscriber subscriber = sink.getSink().build(); + Subscriber subscriber = sink.getSink(); Multi.createFrom().range(0, 5) .map(i -> { if (i == 3 || i == 5) { diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSourceTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSourceTest.java index 818e8ed48b..79e39751cf 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSourceTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSourceTest.java @@ -27,7 +27,6 @@ import org.apache.kafka.common.serialization.IntegerDeserializer; import org.eclipse.microprofile.reactive.messaging.Channel; import org.eclipse.microprofile.reactive.messaging.Message; -import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder; import org.jboss.weld.exceptions.DeploymentException; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Tag; @@ -149,13 +148,12 @@ public void testBroadcast() { connector.failureHandlerFactories = new SingletonInstance<>("fail", new KafkaFailStop.Factory()); connector.init(); - PublisherBuilder builder = (PublisherBuilder) connector - .getPublisherBuilder(config); + Multi multi = (Multi) connector.getPublisher(config); List messages1 = new ArrayList<>(); List messages2 = new ArrayList<>(); - builder.forEach(messages1::add).run(); - builder.forEach(messages2::add).run(); + multi.subscribe().with(messages1::add); + multi.subscribe().with(messages2::add); companion.produceIntegers().usingGenerator(i -> new ProducerRecord<>(topic, i), 10); @@ -190,13 +188,12 @@ public void testBroadcastWithPartitions() { connector.failureHandlerFactories = new SingletonInstance<>("fail", new KafkaFailStop.Factory()); connector.init(); - PublisherBuilder builder = (PublisherBuilder) connector - .getPublisherBuilder(config); + Multi multi = (Multi) connector.getPublisher(config); List messages1 = new ArrayList<>(); List messages2 = new ArrayList<>(); - builder.forEach(messages1::add).run(); - builder.forEach(messages2::add).run(); + multi.subscribe().with(messages1::add); + multi.subscribe().with(messages2::add); companion.produceIntegers().usingGenerator(i -> new ProducerRecord<>(topic, i), 10); diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSourceWithLegacyMetadataTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSourceWithLegacyMetadataTest.java index 7ff82a8601..b5040d87da 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSourceWithLegacyMetadataTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSourceWithLegacyMetadataTest.java @@ -27,7 +27,6 @@ import org.apache.kafka.common.serialization.IntegerDeserializer; import org.eclipse.microprofile.reactive.messaging.Channel; import org.eclipse.microprofile.reactive.messaging.Message; -import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder; import org.jboss.weld.exceptions.DeploymentException; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Tag; @@ -154,13 +153,12 @@ public void testBroadcast() { connector.kafkaCDIEvents = testEvents; connector.init(); - PublisherBuilder builder = (PublisherBuilder) connector - .getPublisherBuilder(config); + Multi multi = (Multi) connector.getPublisher(config); List messages1 = new ArrayList<>(); List messages2 = new ArrayList<>(); - builder.forEach(messages1::add).run(); - builder.forEach(messages2::add).run(); + multi.subscribe().with(messages1::add); + multi.subscribe().with(messages2::add); companion.produceIntegers().usingGenerator(i -> new ProducerRecord<>(topic, i), 10); @@ -195,13 +193,12 @@ public void testBroadcastWithPartitions() { connector.failureHandlerFactories = new SingletonInstance<>("fail", new KafkaFailStop.Factory()); connector.init(); - PublisherBuilder builder = (PublisherBuilder) connector - .getPublisherBuilder(config); + Multi multi = (Multi) connector.getPublisher(config); List messages1 = new ArrayList<>(); List messages2 = new ArrayList<>(); - builder.forEach(messages1::add).run(); - builder.forEach(messages2::add).run(); + multi.subscribe().with(messages1::add); + multi.subscribe().with(messages2::add); companion.produceIntegers().usingGenerator(i -> new ProducerRecord<>(topic, i), 10); diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/base/WeldTestBase.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/base/WeldTestBase.java index e3325d0d34..00b324e5bd 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/base/WeldTestBase.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/base/WeldTestBase.java @@ -40,6 +40,7 @@ import io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory; import io.smallrye.reactive.messaging.providers.impl.ConnectorFactories; import io.smallrye.reactive.messaging.providers.impl.InternalChannelRegistry; +import io.smallrye.reactive.messaging.providers.locals.ContextDecorator; import io.smallrye.reactive.messaging.providers.metrics.MetricDecorator; import io.smallrye.reactive.messaging.providers.metrics.MicrometerDecorator; import io.smallrye.reactive.messaging.providers.wiring.Wiring; @@ -100,6 +101,7 @@ public void initWeld() { weld.addBeanClass(KafkaClientServiceImpl.class); weld.addBeanClass(MetricDecorator.class); weld.addBeanClass(MicrometerDecorator.class); + weld.addBeanClass(ContextDecorator.class); weld.disableDiscovery(); } diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/ce/KafkaSinkWithCloudEventsTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/ce/KafkaSinkWithCloudEventsTest.java index 7431c5dcdf..80c60aeff1 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/ce/KafkaSinkWithCloudEventsTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/ce/KafkaSinkWithCloudEventsTest.java @@ -68,8 +68,7 @@ public void testSendingStructuredCloudEvents() { .withId("some id") .build()); - Multi.createFrom().> item(message) - .subscribe().withSubscriber((Subscriber) sink.getSink().build()); + Multi.createFrom().> item(message).subscribe().withSubscriber((Subscriber) sink.getSink()); await().until(() -> records.count() == 1); @@ -115,7 +114,7 @@ public void testSendingStructuredCloudEventsWithComplexPayload() { .build()); Multi.createFrom().> item(message) - .subscribe().withSubscriber((Subscriber) sink.getSink().build()); + .subscribe().withSubscriber((Subscriber) sink.getSink()); await().until(() -> records.count() == 1); @@ -158,7 +157,7 @@ public void testSendingStructuredCloudEventsWithTimestampAndSubject() { .build()); Multi.createFrom().> item(message) - .subscribe().withSubscriber((Subscriber) sink.getSink().build()); + .subscribe().withSubscriber((Subscriber) sink.getSink()); await().until(() -> records.count() == 1); @@ -202,7 +201,7 @@ public void testSendingStructuredCloudEventsMissingMandatoryAttribute() { }); Multi.createFrom().> item(message) - .subscribe().withSubscriber((Subscriber) sink.getSink().build()); + .subscribe().withSubscriber((Subscriber) sink.getSink()); await().until(() -> { HealthReport.HealthReportBuilder builder = HealthReport.builder(); @@ -245,7 +244,7 @@ public void testSendingStructuredCloudEventsWithKey() { .build()); Multi.createFrom().> item(message) - .subscribe().withSubscriber((Subscriber) sink.getSink().build()); + .subscribe().withSubscriber((Subscriber) sink.getSink()); await().until(() -> records.count() == 1); @@ -284,7 +283,7 @@ public void testSendingStructuredCloudEventsWithConfiguredTypeAndSource() { .build()); Multi.createFrom().> item(message) - .subscribe().withSubscriber((Subscriber) sink.getSink().build()); + .subscribe().withSubscriber((Subscriber) sink.getSink()); await().until(() -> records.count() == 1); @@ -320,7 +319,7 @@ public void testSendingStructuredCloudEventsWithConfiguredTypeAndSourceAndNoClou Message message = Message.of("hello!"); Multi.createFrom().> item(message) - .subscribe().withSubscriber((Subscriber) sink.getSink().build()); + .subscribe().withSubscriber((Subscriber) sink.getSink()); await().until(() -> records.getRecords().size() == 1); @@ -360,7 +359,7 @@ public void testSendingStructuredCloudEventsWithExtensions() { .build()); Multi.createFrom().> item(message) - .subscribe().withSubscriber((Subscriber) sink.getSink().build()); + .subscribe().withSubscriber((Subscriber) sink.getSink()); await().until(() -> records.getRecords().size() == 1); @@ -399,7 +398,7 @@ public void testSendingBinaryCloudEvents() { .build()); Multi.createFrom().> item(message) - .subscribe().withSubscriber((Subscriber) sink.getSink().build()); + .subscribe().withSubscriber((Subscriber) sink.getSink()); await().until(() -> records.getRecords().size() == 1); @@ -436,7 +435,7 @@ public void testSendingBinaryCloudEventsWithContentType() { .build()); Multi.createFrom().> item(message) - .subscribe().withSubscriber((Subscriber) sink.getSink().build()); + .subscribe().withSubscriber((Subscriber) sink.getSink()); await().until(() -> records.getRecords().size() == 1); @@ -476,7 +475,7 @@ public void testSendingBinaryCloudEventsWithKey() { .build()); Multi.createFrom().> item(message) - .subscribe().withSubscriber((Subscriber) sink.getSink().build()); + .subscribe().withSubscriber((Subscriber) sink.getSink()); await().until(() -> records.getRecords().size() == 1); @@ -514,7 +513,7 @@ public void testSendingBinaryCloudEventsWithConfiguredTypeAndSource() { .build()); Multi.createFrom().> item(message) - .subscribe().withSubscriber((Subscriber) sink.getSink().build()); + .subscribe().withSubscriber((Subscriber) sink.getSink()); await().until(() -> records.getRecords().size() == 1); @@ -549,7 +548,7 @@ public void testSendingBinaryCloudEventsWithConfiguredTypeAndSourceButNoMetadata Message message = Message.of("hello!"); Multi.createFrom().> item(message) - .subscribe().withSubscriber((Subscriber) sink.getSink().build()); + .subscribe().withSubscriber((Subscriber) sink.getSink()); await().until(() -> records.getRecords().size() == 1); @@ -589,7 +588,7 @@ public void testSendingBinaryCloudEventsMissingMandatoryAttribute() { }); Multi.createFrom().> item(message) - .subscribe().withSubscriber((Subscriber) sink.getSink().build()); + .subscribe().withSubscriber((Subscriber) sink.getSink()); await().until(() -> { HealthReport.HealthReportBuilder builder = HealthReport.builder(); @@ -617,7 +616,7 @@ public void testWithCloudEventDisabled() { .build()); Multi.createFrom().> item(message) - .subscribe().withSubscriber((Subscriber) sink.getSink().build()); + .subscribe().withSubscriber((Subscriber) sink.getSink()); await().until(() -> records.getRecords().size() == 1); @@ -651,7 +650,7 @@ public void testSendingBinaryCloudEventsWithExtensions() { .build()); Multi.createFrom().> item(message) - .subscribe().withSubscriber((Subscriber) sink.getSink().build()); + .subscribe().withSubscriber((Subscriber) sink.getSink()); await().until(() -> records.getRecords().size() == 1); diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/LazyInitializedTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/LazyInitializedTest.java index 615389011e..9b7cb4008b 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/LazyInitializedTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/LazyInitializedTest.java @@ -57,7 +57,7 @@ void testLazyInitializedProducer() { assertThat(producer).isNotNull(); assertThat(producer.unwrap()).isNull(); - Subscriber> subscriber = (Subscriber>) sink.getSink().build(); + Subscriber> subscriber = (Subscriber>) sink.getSink(); await().untilAsserted(() -> { assertThat(getHealthReport(sink::isStarted).isOk()).isTrue(); assertThat(getHealthReport(sink::isReady).isOk()).isTrue(); diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/ReactiveKafkaProducerTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/ReactiveKafkaProducerTest.java index c1532010aa..2d7c0db8c2 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/ReactiveKafkaProducerTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/client/ReactiveKafkaProducerTest.java @@ -132,7 +132,7 @@ private void sharedProducerTest(int numberOfThreads, int numberOfMessagesPerThre Thread actualProducer = new Thread(() -> { Multi> merge = Multi.createBy().merging().streams(multis); - Subscriber> subscriber = (Subscriber>) createSink().getSink().build(); + Subscriber> subscriber = (Subscriber>) createSink().getSink(); merge.subscribe().withSubscriber(subscriber); }); threads.add(actualProducer); @@ -225,7 +225,7 @@ public void run() { emitter.complete(); }); - Subscriber> subscriber = (Subscriber>) createSink().getSink().build(); + Subscriber> subscriber = (Subscriber>) createSink().getSink(); stream.subscribe().withSubscriber(subscriber); } } diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/MultiplePartitionsThrottledStrategyTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/MultiplePartitionsThrottledStrategyTest.java new file mode 100644 index 0000000000..9e09a043b2 --- /dev/null +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/MultiplePartitionsThrottledStrategyTest.java @@ -0,0 +1,325 @@ +package io.smallrye.reactive.messaging.kafka.commit; + +import static org.awaitility.Awaitility.await; + +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import javax.enterprise.context.ApplicationScoped; + +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Message; +import org.eclipse.microprofile.reactive.messaging.Outgoing; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; + +import io.smallrye.mutiny.Multi; +import io.smallrye.reactive.messaging.annotations.Blocking; +import io.smallrye.reactive.messaging.kafka.KafkaRecord; +import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata; +import io.smallrye.reactive.messaging.kafka.base.KafkaCompanionTestBase; +import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; +import io.vertx.core.Vertx; +import reactor.core.publisher.Flux; + +public class MultiplePartitionsThrottledStrategyTest extends KafkaCompanionTestBase { + + @Test + public void testWithPartitions() { + companion.topics().createAndWait(topic, 3); + String sinkTopic = topic + "-sink"; + companion.topics().createAndWait(sinkTopic, 3); + String groupId = UUID.randomUUID().toString(); + + MapBasedConfig config = kafkaConfig("mp.messaging.incoming.kafka") + .with("group.id", groupId) + .with("topic", topic) + .with("partitions", 3) + .with("auto.offset.reset", "earliest") + .with("commit-strategy", "throttled") + .with("value.deserializer", IntegerDeserializer.class.getName()) + .withPrefix("mp.messaging.outgoing.sink") + .with("connector", "smallrye-kafka") + .with("topic", sinkTopic) + .with("value.serializer", IntegerSerializer.class.getName()); + + ProcessorBean application = runApplication(config, ProcessorBean.class); + + int expected = 3000; + Random random = new Random(); + companion.produceIntegers().usingGenerator(i -> { + int p = random.nextInt(3); + return new ProducerRecord<>(topic, p, Integer.toString(p), i); + }, expected).awaitCompletion(Duration.ofMinutes(1)); + + await().atMost(1, TimeUnit.MINUTES) + .until(() -> application.count() >= expected); + + companion.consumeIntegers().fromTopics(sinkTopic, expected) + .awaitCompletion(Duration.ofMinutes(1)); + } + + @Test + public void testWithPartitionsBlockingUnordered() { + companion.topics().createAndWait(topic, 3); + String groupId = UUID.randomUUID().toString(); + + MapBasedConfig config = kafkaConfig("mp.messaging.incoming.kafka") + .with("group.id", groupId) + .with("topic", topic) + .with("partitions", 3) + .with("auto.offset.reset", "earliest") + .with("commit-strategy", "throttled") + .with("value.deserializer", IntegerDeserializer.class.getName()); + + BlockingUnorderedBean application = runApplication(config, BlockingUnorderedBean.class); + + int expected = 3000; + Random random = new Random(); + companion.produceIntegers().usingGenerator(i -> { + int p = random.nextInt(3); + return new ProducerRecord<>(topic, p, Integer.toString(p), i); + }, expected).awaitCompletion(Duration.ofMinutes(1)); + + await().atMost(1, TimeUnit.MINUTES) + .until(() -> application.count() >= expected); + } + + @Test + public void testWithPartitionsBlocking() { + companion.topics().createAndWait(topic, 3); + String groupId = UUID.randomUUID().toString(); + + MapBasedConfig config = kafkaConfig("mp.messaging.incoming.kafka") + .with("group.id", groupId) + .with("topic", topic) + .with("partitions", 3) + .with("auto.offset.reset", "earliest") + .with("commit-strategy", "throttled") + .with("value.deserializer", IntegerDeserializer.class.getName()); + + BlockingBean application = runApplication(config, BlockingBean.class); + + int expected = 3000; + Random random = new Random(); + companion.produceIntegers().usingGenerator(i -> { + int p = random.nextInt(3); + return new ProducerRecord<>(topic, p, Integer.toString(p), i); + }, expected).awaitCompletion(Duration.ofMinutes(1)); + + await().atMost(1, TimeUnit.MINUTES) + .until(() -> application.count() >= expected); + } + + @Test + public void testWithPartitionsStreamProcessor() { + companion.topics().createAndWait(topic, 3); + String sinkTopic = topic + "-sink"; + companion.topics().createAndWait(sinkTopic, 3); + String groupId = UUID.randomUUID().toString(); + + MapBasedConfig config = kafkaConfig("mp.messaging.incoming.kafka") + .with("group.id", groupId) + .with("topic", topic) + .with("partitions", 3) + .with("auto.offset.reset", "earliest") + .with("commit-strategy", "throttled") + .with("value.deserializer", IntegerDeserializer.class.getName()) + .withPrefix("mp.messaging.outgoing.sink") + .with("connector", "smallrye-kafka") + .with("topic", sinkTopic) + .with("value.serializer", IntegerSerializer.class.getName()); + + StreamProcessingBean application = runApplication(config, StreamProcessingBean.class); + + int expected = 3000; + Random random = new Random(); + companion.produceIntegers().usingGenerator(i -> { + int p = random.nextInt(3); + return new ProducerRecord<>(topic, p, Integer.toString(p), i); + }, expected).awaitCompletion(Duration.ofMinutes(1)); + + await().atMost(1, TimeUnit.MINUTES) + .until(() -> application.count() >= expected); + + companion.consumeIntegers().fromTopics(sinkTopic, expected) + .awaitCompletion(Duration.ofMinutes(1)); + } + + @Test + @Disabled + public void testWithPartitionsStreamProcessorFlux() { + companion.topics().createAndWait(topic, 3); + String sinkTopic = topic + "-sink"; + companion.topics().createAndWait(sinkTopic, 3); + String groupId = UUID.randomUUID().toString(); + + MapBasedConfig config = kafkaConfig("mp.messaging.incoming.kafka") + .with("group.id", groupId) + .with("topic", topic) + .with("partitions", 3) + .with("auto.offset.reset", "earliest") + .with("commit-strategy", "throttled") + .with("value.deserializer", IntegerDeserializer.class.getName()) + .withPrefix("mp.messaging.outgoing.sink") + .with("connector", "smallrye-kafka") + .with("topic", sinkTopic) + .with("value.serializer", IntegerSerializer.class.getName()); + + PublisherStreamProcessingBean application = runApplication(config, PublisherStreamProcessingBean.class); + + int expected = 2000; + Random random = new Random(); + companion.produceIntegers().usingGenerator(i -> { + int p = random.nextInt(3); + return new ProducerRecord<>(topic, p, Integer.toString(p), i); + }, expected).awaitCompletion(Duration.ofMinutes(1)); + + await().atMost(1, TimeUnit.MINUTES) + .until(() -> application.count() >= expected); + + companion.consumeIntegers().fromTopics(sinkTopic, expected) + .awaitCompletion(Duration.ofMinutes(1)); + } + + @ApplicationScoped + public static class ProcessorBean { + private final AtomicLong count = new AtomicLong(); + private final Map> received = new ConcurrentHashMap<>(); + + @Incoming("kafka") + @Outgoing("sink") + public Message consume(KafkaRecord msg) { + String k = Thread.currentThread().getName(); + List list = received.computeIfAbsent(k, s -> new CopyOnWriteArrayList<>()); + list.add(msg.getPayload()); + count.incrementAndGet(); + return msg.withPayload(msg.getPayload() + 1) + .addMetadata(OutgoingKafkaRecordMetadata.builder().withPartition(msg.getPartition()).build()); + } + + public Map> getReceived() { + return received; + } + + public long count() { + return count.get(); + } + } + + @ApplicationScoped + public static class StreamProcessingBean { + private final AtomicLong count = new AtomicLong(); + private final Map> received = new ConcurrentHashMap<>(); + + @Incoming("kafka") + @Outgoing("sink") + public Multi> process(Multi> multi) { + return multi.map(msg -> { + String k = Thread.currentThread().getName(); + List list = received.computeIfAbsent(k, s -> new CopyOnWriteArrayList<>()); + list.add(msg.getPayload()); + count.incrementAndGet(); + assert msg.getContextMetadata().get().context() == Vertx.currentContext(); + return msg.withPayload(msg.getPayload() + 1) + .addMetadata(OutgoingKafkaRecordMetadata.builder().withPartition(msg.getPartition()).build()); + }); + } + + public Map> getReceived() { + return received; + } + + public long count() { + return count.get(); + } + } + + @ApplicationScoped + public static class PublisherStreamProcessingBean { + private final AtomicLong count = new AtomicLong(); + private final Map> received = new ConcurrentHashMap<>(); + + @Incoming("kafka") + @Outgoing("sink") + public Flux> process(Flux> pub) throws InterruptedException { + return pub.map(msg -> { + String k = Thread.currentThread().getName(); + List list = received.computeIfAbsent(k, s -> new CopyOnWriteArrayList<>()); + list.add(msg.getPayload()); + count.incrementAndGet(); + assert msg.getContextMetadata().get().context() == Vertx.currentContext(); + return msg.withPayload(msg.getPayload() + 1) + .addMetadata(OutgoingKafkaRecordMetadata.builder().withPartition(msg.getPartition()).build()); + }); + } + + public Map> getReceived() { + return received; + } + + public long count() { + return count.get(); + } + } + + @ApplicationScoped + public static class BlockingBean { + private final AtomicLong count = new AtomicLong(); + private final Map> received = new ConcurrentHashMap<>(); + + @Incoming("kafka") + @Blocking + public CompletionStage consume(Message msg) throws InterruptedException { + String k = Thread.currentThread().getName(); + List list = received.computeIfAbsent(k, s -> new CopyOnWriteArrayList<>()); + list.add(msg.getPayload()); + count.incrementAndGet(); + return msg.ack(); + } + + public Map> getReceived() { + return received; + } + + public long count() { + return count.get(); + } + } + + @ApplicationScoped + public static class BlockingUnorderedBean { + private final AtomicLong count = new AtomicLong(); + private final Map> received = new ConcurrentHashMap<>(); + + @Incoming("kafka") + @Blocking(ordered = false) + public CompletionStage consume(Message msg) throws InterruptedException { + String k = Thread.currentThread().getName(); + List list = received.computeIfAbsent(k, s -> new CopyOnWriteArrayList<>()); + list.add(msg.getPayload()); + count.incrementAndGet(); + return msg.ack(); + } + + public Map> getReceived() { + return received; + } + + public long count() { + return count.get(); + } + } + +} diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/ThrottledCommitStrategyTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/ThrottledCommitStrategyTest.java deleted file mode 100644 index 78a63e462c..0000000000 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/commit/ThrottledCommitStrategyTest.java +++ /dev/null @@ -1,144 +0,0 @@ -package io.smallrye.reactive.messaging.kafka.commit; - -import static org.awaitility.Awaitility.await; - -import java.time.Duration; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.UUID; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - -import javax.enterprise.context.ApplicationScoped; - -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.serialization.IntegerDeserializer; -import org.apache.kafka.common.serialization.IntegerSerializer; -import org.eclipse.microprofile.reactive.messaging.Incoming; -import org.eclipse.microprofile.reactive.messaging.Message; -import org.eclipse.microprofile.reactive.messaging.Outgoing; -import org.junit.jupiter.api.Test; - -import io.smallrye.reactive.messaging.annotations.Blocking; -import io.smallrye.reactive.messaging.kafka.KafkaRecord; -import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata; -import io.smallrye.reactive.messaging.kafka.base.KafkaCompanionTestBase; -import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; - -public class ThrottledCommitStrategyTest extends KafkaCompanionTestBase { - - @Test - public void testWithPartitions() { - companion.topics().createAndWait(topic, 3); - String sinkTopic = topic + "-sink"; - companion.topics().createAndWait(sinkTopic, 3); - String groupId = UUID.randomUUID().toString(); - - MapBasedConfig config = kafkaConfig("mp.messaging.incoming.kafka") - .with("group.id", groupId) - .with("topic", topic) - .with("partitions", 3) - .with("auto.offset.reset", "earliest") - .with("commit-strategy", "throttled") - .with("value.deserializer", IntegerDeserializer.class.getName()) - .withPrefix("mp.messaging.outgoing.sink") - .with("connector", "smallrye-kafka") - .with("topic", sinkTopic) - .with("value.serializer", IntegerSerializer.class.getName()); - - ProcessorBean application = runApplication(config, ProcessorBean.class); - - int expected = 3000; - Random random = new Random(); - companion.produceIntegers().usingGenerator(i -> { - int p = random.nextInt(3); - return new ProducerRecord<>(topic, p, Integer.toString(p), i); - }, expected).awaitCompletion(Duration.ofMinutes(1)); - - await().atMost(1, TimeUnit.MINUTES) - .until(() -> application.count() >= expected); - - companion.consumeIntegers().fromTopics(sinkTopic, expected) - .awaitCompletion(Duration.ofMinutes(1)); - } - - @Test - public void testWithPartitionsBlockingUnordered() { - companion.topics().createAndWait(topic, 3); - String groupId = UUID.randomUUID().toString(); - - MapBasedConfig config = kafkaConfig("mp.messaging.incoming.kafka") - .with("group.id", groupId) - .with("topic", topic) - .with("partitions", 3) - .with("auto.offset.reset", "earliest") - .with("commit-strategy", "throttled") - .with("value.deserializer", IntegerDeserializer.class.getName()); - - BlockingBean application = runApplication(config, BlockingBean.class); - - int expected = 3000; - Random random = new Random(); - companion.produceIntegers().usingGenerator(i -> { - int p = random.nextInt(3); - return new ProducerRecord<>(topic, p, Integer.toString(p), i); - }, expected).awaitCompletion(Duration.ofMinutes(1)); - - await().atMost(1, TimeUnit.MINUTES) - .until(() -> application.count() >= expected); - } - - @ApplicationScoped - public static class ProcessorBean { - private final AtomicLong count = new AtomicLong(); - private final Map> received = new ConcurrentHashMap<>(); - - @Incoming("kafka") - @Outgoing("sink") - public Message consume(KafkaRecord msg) { - String k = Thread.currentThread().getName(); - List list = received.computeIfAbsent(k, s -> new CopyOnWriteArrayList<>()); - list.add(msg.getPayload()); - count.incrementAndGet(); - return msg.withPayload(msg.getPayload() + 1) - .addMetadata(OutgoingKafkaRecordMetadata.builder().withPartition(msg.getPartition()).build()); - } - - public Map> getReceived() { - return received; - } - - public long count() { - return count.get(); - } - } - - @ApplicationScoped - public static class BlockingBean { - private final AtomicLong count = new AtomicLong(); - private final Map> received = new ConcurrentHashMap<>(); - - @Incoming("kafka") - @Blocking(ordered = false) - public CompletionStage consume(Message msg) throws InterruptedException { - String k = Thread.currentThread().getName(); - List list = received.computeIfAbsent(k, s -> new CopyOnWriteArrayList<>()); - list.add(msg.getPayload()); - count.incrementAndGet(); - return msg.ack(); - } - - public Map> getReceived() { - return received; - } - - public long count() { - return count.get(); - } - } - -} diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/serde/SerializerConfigurationTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/serde/SerializerConfigurationTest.java index 9321c7cf44..fb3cd4a1c5 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/serde/SerializerConfigurationTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/serde/SerializerConfigurationTest.java @@ -48,7 +48,7 @@ public void testThatWhenNotSetKeySerializerIsString() { ConsumerTask consumed = companion.consumeStrings().fromTopics(topic, 4, Duration.ofSeconds(10)); - Subscriber> subscriber = sink.getSink().build(); + Subscriber> subscriber = sink.getSink(); Multi.createFrom().items( Message.of(of("key", "value")), Message.of(of(null, "value")), Message.of(of("key", null)), Message.of(of(null, null))) @@ -77,7 +77,7 @@ public void testKeySerializationFailure() { .with("retries", 0L); sink = new KafkaSink(new KafkaConnectorOutgoingConfiguration(config), CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance()); - Subscriber> subscriber = sink.getSink().build(); + Subscriber> subscriber = sink.getSink(); AtomicBoolean nacked = new AtomicBoolean(); Multi.createFrom().items( Message.of(of(125.25, new JsonObject().put("k", "v"))).withNack(t -> { @@ -96,7 +96,7 @@ public void testValueSerializationFailure() { .with("retries", 0L); sink = new KafkaSink(new KafkaConnectorOutgoingConfiguration(config), CountKafkaCdiEvents.noCdiEvents, UnsatisfiedInstance.instance()); - Subscriber> subscriber = sink.getSink().build(); + Subscriber> subscriber = sink.getSink(); AtomicBoolean nacked = new AtomicBoolean(); Multi.createFrom().items( Message.of(of(new JsonObject().put("k", "v"), 125.25)).withNack(t -> { diff --git a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSource.java b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSource.java index 07b8be5b64..bced78bd87 100644 --- a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSource.java +++ b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSource.java @@ -16,7 +16,6 @@ import io.smallrye.reactive.messaging.mqtt.internal.MqttTopicHelper; import io.smallrye.reactive.messaging.mqtt.session.MqttClientSessionOptions; import io.smallrye.reactive.messaging.mqtt.session.RequestedQoS; -import io.smallrye.reactive.messaging.providers.locals.ContextOperator; import io.vertx.mutiny.core.Vertx; public class MqttSource { @@ -71,7 +70,7 @@ public MqttSource(Vertx vertx, MqttConnectorIncomingConfiguration config, .unsubscribe(topic).toCompletionStage()); else return Uni.createFrom().voidItem(); - }).plug(ContextOperator::apply) + }) .onFailure().invoke(log::unableToConnectToBroker)); } diff --git a/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/MqttTestBase.java b/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/MqttTestBase.java index a2e2df60a0..6b852bf613 100644 --- a/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/MqttTestBase.java +++ b/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/MqttTestBase.java @@ -27,6 +27,7 @@ import io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory; import io.smallrye.reactive.messaging.providers.impl.ConnectorFactories; import io.smallrye.reactive.messaging.providers.impl.InternalChannelRegistry; +import io.smallrye.reactive.messaging.providers.locals.ContextDecorator; import io.smallrye.reactive.messaging.providers.wiring.Wiring; import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; import io.vertx.mqtt.MqttClientOptions; @@ -127,6 +128,7 @@ static Weld baseWeld(MapBasedConfig config) { weld.addPackages(EmitterImpl.class.getPackage()); weld.addExtension(new ReactiveMessagingExtension()); weld.addBeanClass(MqttConnector.class); + weld.addBeanClass(ContextDecorator.class); weld.addBeanClass(EmitterFactoryImpl.class); weld.addBeanClass(MutinyEmitterFactoryImpl.class); weld.addBeanClass(LegacyEmitterFactoryImpl.class); diff --git a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/ProcessorMediator.java b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/ProcessorMediator.java index e13b446674..88a8e0842f 100644 --- a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/ProcessorMediator.java +++ b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/ProcessorMediator.java @@ -230,7 +230,7 @@ private void processMethodReturningAPublisherBuilderOfPayloadsAndConsumingPayloa Multi> multi = MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration); return multi.onItem().transformToMultiAndConcatenate(message -> { PublisherBuilder pb = invoke(message.getPayload()); - return Multi.createFrom().publisher(pb.buildRs()) + return MultiUtils.publisher(pb.buildRs()) .onItem().transform(payload -> Message.of(payload, message.getMetadata())); // TODO We can handle post-acknowledgement here. -> onCompletion }); @@ -242,7 +242,7 @@ private void processMethodReturningAPublisherOfPayloadsAndConsumingPayloads() { Multi> multi = MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration); return multi.onItem().transformToMultiAndConcatenate(message -> { Publisher pub = invoke(message.getPayload()); - return Multi.createFrom().publisher(pub) + return MultiUtils.publisher(pub) .onItem().transform(payload -> Message.of(payload, message.getMetadata())); // TODO We can handle post-acknowledgement here. -> onCompletion }); diff --git a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/PublisherMediator.java b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/PublisherMediator.java index 1417bf68ce..59da4dcd9e 100644 --- a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/PublisherMediator.java +++ b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/PublisherMediator.java @@ -103,21 +103,21 @@ public void initialize(Object bean) { private void produceAPublisherBuilderOfMessages() { PublisherBuilder> builder = invoke(); - this.publisher = decorate(Multi.createFrom().publisher(builder.buildRs())); + this.publisher = decorate(MultiUtils.publisher(builder.buildRs())); } private

void produceAPublisherBuilderOfPayloads() { PublisherBuilder

builder = invoke(); - this.publisher = decorate(Multi.createFrom().publisher(builder.map(Message::of).buildRs())); + this.publisher = decorate(MultiUtils.publisher(builder.map(Message::of).buildRs())); } private void produceAPublisherOfMessages() { - this.publisher = Multi.createFrom().publisher(invoke()); + this.publisher = MultiUtils.publisher(invoke()); } private

void produceAPublisherOfPayloads() { Publisher

pub = invoke(); - this.publisher = decorate(Multi.createFrom().publisher(pub).map(Message::of)); + this.publisher = decorate(MultiUtils.publisher(pub).map(Message::of)); } private void produceIndividualMessages() { diff --git a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/StreamTransformerMediator.java b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/StreamTransformerMediator.java index 711af2238e..ae68cace95 100644 --- a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/StreamTransformerMediator.java +++ b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/StreamTransformerMediator.java @@ -97,7 +97,7 @@ private void processMethodConsumingAPublisherBuilderOfMessages() { PublisherBuilder> argument = ReactiveStreams.fromPublisher(multi); PublisherBuilder> result = invoke(argument); Objects.requireNonNull(result, msg.methodReturnedNull(configuration.methodAsString())); - return Multi.createFrom().publisher(result.buildRs()); + return MultiUtils.publisher(result.buildRs()); }; } @@ -107,13 +107,16 @@ private void processMethodConsumingAPublisherOfMessages() { Publisher> argument = convertToDesiredPublisherType(multi); Publisher> result = invoke(argument); Objects.requireNonNull(result, msg.methodReturnedNull(configuration.methodAsString())); - return Multi.createFrom().publisher(result); + return MultiUtils.publisher(result); }; } @SuppressWarnings("unchecked") private Publisher convertToDesiredPublisherType(Multi multi) { Class parameterType = configuration.getParameterTypes()[0]; + if (parameterType.equals(Multi.class)) { + return multi; + } Optional> converter = Registry.lookup(parameterType); Publisher argument = multi; if (converter.isPresent()) { @@ -129,7 +132,7 @@ private void processMethodConsumingAPublisherBuilderOfPayload() { PublisherBuilder argument = ReactiveStreams.fromPublisher(multi); PublisherBuilder result = invoke(argument); Objects.requireNonNull(result, msg.methodReturnedNull(configuration.methodAsString())); - return Multi.createFrom().publisher(result.buildRs()) + return MultiUtils.publisher(result.buildRs()) .onItem().transform(Message::of); }; } @@ -141,7 +144,7 @@ private void processMethodConsumingAPublisherOfPayload() { Publisher argument = convertToDesiredPublisherType(multi); Publisher result = invoke(argument); Objects.requireNonNull(result, msg.methodReturnedNull(configuration.methodAsString())); - return Multi.createFrom().publisher(result) + return MultiUtils.publisher(result) .onItem().transform(Message::of); }; } diff --git a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/SubscriberMediator.java b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/SubscriberMediator.java index aff5eaf3d8..73472ed071 100644 --- a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/SubscriberMediator.java +++ b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/SubscriberMediator.java @@ -18,6 +18,7 @@ import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.helpers.Subscriptions; +import io.smallrye.mutiny.subscription.MultiSubscriber; import io.smallrye.reactive.messaging.MediatorConfiguration; import io.smallrye.reactive.messaging.Shape; import io.smallrye.reactive.messaging.providers.helpers.ClassUtils; @@ -103,7 +104,7 @@ public void run() { AtomicReference syncErrorCatcher = new AtomicReference<>(); Subscriber> delegate = this.subscriber; - Subscriber> delegating = new Subscriber>() { + Subscriber> delegating = new MultiSubscriber>() { @Override public void onSubscribe(Subscription s) { subscription.set(s); @@ -121,9 +122,9 @@ public void cancel() { } @Override - public void onNext(Message o) { + public void onItem(Message item) { try { - delegate.onNext(o); + delegate.onNext(item); } catch (Exception e) { log.messageProcessingException(configuration.methodAsString(), e); syncErrorCatcher.set(e); @@ -131,14 +132,14 @@ public void onNext(Message o) { } @Override - public void onError(Throwable t) { + public void onFailure(Throwable t) { log.messageProcessingException(configuration.methodAsString(), t); syncErrorCatcher.set(t); delegate.onError(t); } @Override - public void onComplete() { + public void onCompletion() { delegate.onComplete(); } }; diff --git a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/extension/ChannelProducer.java b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/extension/ChannelProducer.java index 6e8ef960f2..eb95dcd31d 100644 --- a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/extension/ChannelProducer.java +++ b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/extension/ChannelProducer.java @@ -28,6 +28,7 @@ import io.smallrye.reactive.messaging.ChannelRegistry; import io.smallrye.reactive.messaging.MessageConverter; import io.smallrye.reactive.messaging.MutinyEmitter; +import io.smallrye.reactive.messaging.providers.helpers.MultiUtils; import io.smallrye.reactive.messaging.providers.helpers.TypeUtils; import io.smallrye.reactive.messaging.providers.i18n.ProviderExceptions; @@ -172,7 +173,7 @@ private Multi> getPublisher(InjectionPoint injectionPoint) if (list.isEmpty()) { throw ex.illegalStateForStream(name, channelRegistry.getIncomingNames()); } else if (list.size() == 1) { - return Multi.createFrom().publisher(list.get(0)); + return MultiUtils.publisher(list.get(0)); } else { return Multi.createBy().merging() .streams(list.stream().map(p -> p).collect(Collectors.toList())); diff --git a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/helpers/MultiUtils.java b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/helpers/MultiUtils.java index b8ab8bb2ef..56f7928610 100644 --- a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/helpers/MultiUtils.java +++ b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/helpers/MultiUtils.java @@ -1,13 +1,21 @@ package io.smallrye.reactive.messaging.providers.helpers; +import static io.smallrye.mutiny.helpers.ParameterValidation.nonNull; + import java.util.concurrent.CompletionStage; +import java.util.function.Function; import java.util.function.Supplier; import org.eclipse.microprofile.reactive.messaging.Acknowledgment; import org.eclipse.microprofile.reactive.messaging.Message; +import org.reactivestreams.Processor; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.subscription.MultiSubscriber; import io.smallrye.reactive.messaging.MediatorConfiguration; public class MultiUtils { @@ -19,6 +27,14 @@ public static Multi createFromGenerator(Supplier supplier) { }); } + public static Multi publisher(Publisher publisher) { + Publisher actual = nonNull(publisher, "publisher"); + if (actual instanceof Multi) { + return (Multi) actual; + } + return Multi.createFrom().safePublisher(publisher); + } + @SuppressWarnings({ "unchecked", "rawtypes" }) public static Multi> handlePreProcessingAcknowledgement(Multi> multi, MediatorConfiguration configuration) { @@ -32,4 +48,40 @@ public static Multi> handlePreProcessingAcknowledgement(Mul })); } + @SuppressWarnings({ "unchecked" }) + public static Multi via(Multi multi, Processor processor) { + return multi.plug(stream -> Multi.createFrom().deferred(() -> { + Multi m = (Multi) MultiUtils.publisher(processor); + stream.subscribe(processor); + return m; + })); + } + + public static Subscriber via(Processor processor, Function, Multi

> function) { + return new MultiSubscriber<>() { + @Override + public void onSubscribe(Subscription subscription) { + processor.onSubscribe(subscription); + MultiUtils.publisher(processor).plug(function).subscribe().with(r -> { + // ignore + }); + } + + @Override + public void onItem(T item) { + processor.onNext(item); + } + + @Override + public void onFailure(Throwable throwable) { + processor.onError(throwable); + } + + @Override + public void onCompletion() { + processor.onComplete(); + } + }; + } + } diff --git a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/impl/ConfiguredChannelFactory.java b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/impl/ConfiguredChannelFactory.java index 2f554b8dea..56dfdd007f 100644 --- a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/impl/ConfiguredChannelFactory.java +++ b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/impl/ConfiguredChannelFactory.java @@ -23,6 +23,7 @@ import io.smallrye.reactive.messaging.PublisherDecorator; import io.smallrye.reactive.messaging.connector.InboundConnector; import io.smallrye.reactive.messaging.connector.OutboundConnector; +import io.smallrye.reactive.messaging.providers.helpers.MultiUtils; /** * Look for stream factories and get instances. @@ -169,7 +170,7 @@ private Publisher> createPublisher(String name, Config conf throw ex.illegalArgumentUnknownConnector(name); } - Multi> publisher = Multi.createFrom().publisher(inboundConnector.getPublisher(config)); + Multi> publisher = MultiUtils.publisher(inboundConnector.getPublisher(config)); for (PublisherDecorator decorator : getSortedInstances(publisherDecoratorInstance)) { publisher = decorator.decorate(publisher, name, true); diff --git a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/locals/ContextDecorator.java b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/locals/ContextDecorator.java new file mode 100644 index 0000000000..a830061369 --- /dev/null +++ b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/locals/ContextDecorator.java @@ -0,0 +1,28 @@ +package io.smallrye.reactive.messaging.providers.locals; + +import javax.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.reactive.messaging.Message; + +import io.smallrye.mutiny.Multi; +import io.smallrye.reactive.messaging.PublisherDecorator; + +/** + * Decorator to dispatch messages on the Vert.x context attached to the message via {@link LocalContextMetadata}. + * Low priority to be called before other decorators. + */ +@ApplicationScoped +public class ContextDecorator implements PublisherDecorator { + + @Override + public int getPriority() { + return 0; + } + + @Override + public Multi> decorate(Multi> publisher, String channelName, + boolean isConnector) { + return ContextOperator.apply(publisher); + } + +} diff --git a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/locals/ContextOperator.java b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/locals/ContextOperator.java index a05c4337ef..7e3a65f5a7 100644 --- a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/locals/ContextOperator.java +++ b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/locals/ContextOperator.java @@ -11,7 +11,6 @@ import io.smallrye.mutiny.subscription.MultiSubscriber; import io.smallrye.reactive.messaging.providers.helpers.VertxContext; import io.vertx.core.Context; -import io.vertx.core.Vertx; /** * Decorator to dispatch messages on the Vert.x context attached to the message via {@link LocalContextMetadata}. @@ -78,16 +77,11 @@ public void onItem(T item) { @Override public void request(long numberOfItems) { - Context context = Vertx.currentContext(); - if (context != null) { - super.request(numberOfItems); + Context root = ROOT_CONTEXT_UPDATER.get(this); + if (root != null) { + root.runOnContext(x -> super.request(numberOfItems)); } else { - Context root = ROOT_CONTEXT_UPDATER.get(this); - if (root != null) { - root.runOnContext(x -> super.request(numberOfItems)); - } else { - super.request(numberOfItems); - } + super.request(numberOfItems); } } diff --git a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/wiring/Wiring.java b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/wiring/Wiring.java index d4ff7ef5aa..01638c6178 100644 --- a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/wiring/Wiring.java +++ b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/wiring/Wiring.java @@ -27,6 +27,7 @@ import io.smallrye.reactive.messaging.annotations.Merge; import io.smallrye.reactive.messaging.providers.AbstractMediator; import io.smallrye.reactive.messaging.providers.extension.*; +import io.smallrye.reactive.messaging.providers.helpers.MultiUtils; import io.smallrye.reactive.messaging.providers.i18n.ProviderLogging; @ApplicationScoped @@ -374,7 +375,7 @@ public void materialize(ChannelRegistry registry) { List>> publishers = registry.getPublishers(name); Multi> merged; if (publishers.size() == 1) { - merged = Multi.createFrom().publisher(publishers.get(0)); + merged = MultiUtils.publisher(publishers.get(0)); } else { merged = Multi.createBy().merging().streams(publishers.stream().map(p -> p).collect(Collectors.toList())); } @@ -635,12 +636,12 @@ public void materialize(ChannelRegistry registry) { } if (publishers.size() == 1) { - aggregates = Multi.createFrom().publisher(publishers.get(0)); + aggregates = MultiUtils.publisher(publishers.get(0)); } else if (concat) { aggregates = Multi.createBy().concatenating() .streams(publishers.stream().map(p -> p).collect(Collectors.toList())); } else if (one) { - aggregates = Multi.createFrom().publisher(publishers.get(0)); + aggregates = MultiUtils.publisher(publishers.get(0)); } else { aggregates = Multi.createBy().merging() .streams(publishers.stream().map(p -> p).collect(Collectors.toList())); @@ -789,12 +790,12 @@ public void materialize(ChannelRegistry registry) { publishers.addAll(registry.getPublishers(channel)); } if (publishers.size() == 1) { - aggregates = Multi.createFrom().publisher(publishers.get(0)); + aggregates = MultiUtils.publisher(publishers.get(0)); } else if (concat) { aggregates = Multi.createBy().concatenating() .streams(publishers.stream().map(p -> p).collect(Collectors.toList())); } else if (one) { - aggregates = Multi.createFrom().publisher(publishers.get(0)); + aggregates = MultiUtils.publisher(publishers.get(0)); } else { aggregates = Multi.createBy().merging() .streams(publishers.stream().map(p -> p).collect(Collectors.toList())); diff --git a/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/WeldTestBaseWithoutTails.java b/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/WeldTestBaseWithoutTails.java index cb4ca7020c..9acf5aae34 100644 --- a/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/WeldTestBaseWithoutTails.java +++ b/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/WeldTestBaseWithoutTails.java @@ -35,6 +35,7 @@ import io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory; import io.smallrye.reactive.messaging.providers.impl.ConnectorFactories; import io.smallrye.reactive.messaging.providers.impl.InternalChannelRegistry; +import io.smallrye.reactive.messaging.providers.locals.ContextDecorator; import io.smallrye.reactive.messaging.providers.metrics.MetricDecorator; import io.smallrye.reactive.messaging.providers.metrics.MicrometerDecorator; import io.smallrye.reactive.messaging.providers.wiring.Wiring; @@ -110,6 +111,7 @@ public void setUp() { MicrometerDecorator.class, MetricDecorator.class, HealthCenter.class, + ContextDecorator.class, // Messaging provider MyDummyConnector.class, // Emitter factories diff --git a/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/locals/LocalPropagationTest.java b/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/locals/LocalPropagationTest.java index 535680cbe5..54119d2fe5 100644 --- a/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/locals/LocalPropagationTest.java +++ b/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/locals/LocalPropagationTest.java @@ -32,7 +32,6 @@ import io.smallrye.reactive.messaging.connector.InboundConnector; import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder; import io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage; -import io.smallrye.reactive.messaging.providers.locals.ContextOperator; import io.smallrye.reactive.messaging.providers.locals.LocalContextMetadata; import io.vertx.core.impl.ConcurrentHashSet; import io.vertx.mutiny.core.Context; @@ -175,8 +174,7 @@ public Publisher> getPublisher(Config config) { .onItem() .transformToUniAndConcatenate(i -> Uni.createFrom().emitter(e -> context.runOnContext(() -> e.complete(i)))) .map(Message::of) - .map(ContextAwareMessage::withContextMetadata) - .plug(ContextOperator::apply); + .map(ContextAwareMessage::withContextMetadata); } } diff --git a/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/providers/extension/AlternativeAnalysisTest.java b/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/providers/extension/AlternativeAnalysisTest.java index 4aa1aaf905..d19ab0147a 100644 --- a/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/providers/extension/AlternativeAnalysisTest.java +++ b/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/providers/extension/AlternativeAnalysisTest.java @@ -28,6 +28,7 @@ import io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory; import io.smallrye.reactive.messaging.providers.impl.ConnectorFactories; import io.smallrye.reactive.messaging.providers.impl.InternalChannelRegistry; +import io.smallrye.reactive.messaging.providers.locals.ContextDecorator; import io.smallrye.reactive.messaging.providers.wiring.Wiring; /** @@ -59,6 +60,7 @@ void testAlternativeAnalysis() { ConfiguredChannelFactory.class, ConnectorFactories.class, HealthCenter.class, + ContextDecorator.class, // Messaging provider MyDummyConnector.class, diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQConnector.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQConnector.java index 9c88e2a711..ddfa044986 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQConnector.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/RabbitMQConnector.java @@ -42,7 +42,6 @@ import io.smallrye.reactive.messaging.health.HealthReport; import io.smallrye.reactive.messaging.health.HealthReporter; import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder; -import io.smallrye.reactive.messaging.providers.locals.ContextOperator; import io.smallrye.reactive.messaging.rabbitmq.ack.RabbitMQAck; import io.smallrye.reactive.messaging.rabbitmq.ack.RabbitMQAckHandler; import io.smallrye.reactive.messaging.rabbitmq.ack.RabbitMQAutoAck; @@ -251,7 +250,7 @@ public PublisherBuilder> getPublisherBuilder(final Config c multi = multi.broadcast().toAllSubscribers(); } - return ReactiveStreams.fromPublisher(multi.plug(ContextOperator::apply)); + return ReactiveStreams.fromPublisher(multi); } private Uni createConsumer(RabbitMQConnectorIncomingConfiguration ic, RabbitMQClient client) { diff --git a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/WeldTestBase.java b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/WeldTestBase.java index 91a2a40699..8fd733c7e6 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/WeldTestBase.java +++ b/smallrye-reactive-messaging-rabbitmq/src/test/java/io/smallrye/reactive/messaging/rabbitmq/WeldTestBase.java @@ -26,6 +26,7 @@ import io.smallrye.reactive.messaging.providers.impl.ConfiguredChannelFactory; import io.smallrye.reactive.messaging.providers.impl.ConnectorFactories; import io.smallrye.reactive.messaging.providers.impl.InternalChannelRegistry; +import io.smallrye.reactive.messaging.providers.locals.ContextDecorator; import io.smallrye.reactive.messaging.providers.metrics.MetricDecorator; import io.smallrye.reactive.messaging.providers.metrics.MicrometerDecorator; import io.smallrye.reactive.messaging.providers.wiring.Wiring; @@ -67,6 +68,7 @@ public void initWeld() { weld.addBeanClass(RabbitMQConnector.class); weld.addBeanClass(MetricDecorator.class); weld.addBeanClass(MicrometerDecorator.class); + weld.addBeanClass(ContextDecorator.class); weld.disableDiscovery(); }