Skip to content

Commit

Permalink
Take 2 : Pulsar force schema fetch on pulsar-client-thread for all sc…
Browse files Browse the repository at this point in the history
…hema using AbstractMultiVersionReader

Fixes #2513
  • Loading branch information
ozangunalp committed Mar 6, 2024
1 parent 249d074 commit 94b3fa1
Showing 1 changed file with 14 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.impl.MultiplierRedeliveryBackoff;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericJsonSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericProtobufNativeSchema;
import org.eclipse.microprofile.reactive.messaging.Message;

import io.opentelemetry.api.GlobalOpenTelemetry;
Expand Down Expand Up @@ -112,7 +116,7 @@ public PulsarIncomingChannel(PulsarClient client, Vertx vertx, Schema<T> schema,
.until(m -> closed.get())
.plug(msgMulti -> {
// Calling getValue on the pulsar-client-internal thread to make sure the SchemaInfo is fetched
if (schema.requireFetchingSchemaInfo()) {
if (schemaRequiresBLockingFetch(schema)) {
return msgMulti.onItem().invoke(org.apache.pulsar.client.api.Message::getValue);
} else {
return msgMulti;
Expand All @@ -136,7 +140,7 @@ public PulsarIncomingChannel(PulsarClient client, Vertx vertx, Schema<T> schema,
.filter(m -> m.size() > 0)
.plug(msgMulti -> {
// Calling getValue on the pulsar-client-internal thread to make sure the SchemaInfo is fetched
if (schema.requireFetchingSchemaInfo()) {
if (schemaRequiresBLockingFetch(schema)) {
return msgMulti.onItem().invoke(msg -> msg.forEach(org.apache.pulsar.client.api.Message::getValue));
} else {
return msgMulti;
Expand Down Expand Up @@ -169,6 +173,14 @@ public PulsarIncomingChannel(PulsarClient client, Vertx vertx, Schema<T> schema,
.buildConsumerInstrumenter(PulsarTraceTextMapGetter.INSTANCE);
}

private static <T> boolean schemaRequiresBLockingFetch(Schema<T> schema) {
return schema.requireFetchingSchemaInfo()
|| schema instanceof AvroSchema
|| schema instanceof GenericAvroSchema
|| schema instanceof GenericJsonSchema
|| schema instanceof GenericProtobufNativeSchema;
}

public void incomingTrace(PulsarMessage<T> pulsarMessage) {
PulsarIncomingMessageMetadata metadata = pulsarMessage.getMetadata(PulsarIncomingMessageMetadata.class).get();
TracingUtils.traceIncoming(instrumenter, pulsarMessage, new PulsarTrace.Builder()
Expand Down

0 comments on commit 94b3fa1

Please sign in to comment.