From b03ef130ced7e0a891dfc9cb6aaa623e6f9f5b79 Mon Sep 17 00:00:00 2001 From: Ozan Gunalp Date: Wed, 6 Mar 2024 22:35:22 +0100 Subject: [PATCH] Take 2 : Pulsar force schema fetch on pulsar-client-thread for all schema using AbstractMultiVersionReader Fixes #2513 --- .../messaging/pulsar/PulsarIncomingChannel.java | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarIncomingChannel.java b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarIncomingChannel.java index 4d84016b8f..2da71b1516 100644 --- a/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarIncomingChannel.java +++ b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarIncomingChannel.java @@ -13,6 +13,10 @@ import org.apache.pulsar.client.api.*; import org.apache.pulsar.client.impl.MultiplierRedeliveryBackoff; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; +import org.apache.pulsar.client.impl.schema.AvroSchema; +import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema; +import org.apache.pulsar.client.impl.schema.generic.GenericJsonSchema; +import org.apache.pulsar.client.impl.schema.generic.GenericProtobufNativeSchema; import org.eclipse.microprofile.reactive.messaging.Message; import io.opentelemetry.api.GlobalOpenTelemetry; @@ -112,7 +116,7 @@ public PulsarIncomingChannel(PulsarClient client, Vertx vertx, Schema schema, .until(m -> closed.get()) .plug(msgMulti -> { // Calling getValue on the pulsar-client-internal thread to make sure the SchemaInfo is fetched - if (schema.requireFetchingSchemaInfo()) { + if (schemaRequiresBlockingFetch(schema)) { return msgMulti.onItem().invoke(org.apache.pulsar.client.api.Message::getValue); } else { return msgMulti; @@ -136,7 +140,7 @@ public PulsarIncomingChannel(PulsarClient client, Vertx vertx, Schema schema, .filter(m -> m.size() > 0) .plug(msgMulti -> { // Calling getValue on the pulsar-client-internal thread to make sure the SchemaInfo is fetched - if (schema.requireFetchingSchemaInfo()) { + if (schemaRequiresBlockingFetch(schema)) { return msgMulti.onItem().invoke(msg -> msg.forEach(org.apache.pulsar.client.api.Message::getValue)); } else { return msgMulti; @@ -169,6 +173,14 @@ public PulsarIncomingChannel(PulsarClient client, Vertx vertx, Schema schema, .buildConsumerInstrumenter(PulsarTraceTextMapGetter.INSTANCE); } + private static boolean schemaRequiresBlockingFetch(Schema schema) { + return schema.requireFetchingSchemaInfo() + || schema instanceof AvroSchema + || schema instanceof GenericAvroSchema + || schema instanceof GenericJsonSchema + || schema instanceof GenericProtobufNativeSchema; + } + public void incomingTrace(PulsarMessage pulsarMessage) { PulsarIncomingMessageMetadata metadata = pulsarMessage.getMetadata(PulsarIncomingMessageMetadata.class).get(); TracingUtils.traceIncoming(instrumenter, pulsarMessage, new PulsarTrace.Builder()