Skip to content

Commit

Permalink
Pulsar force schema fetch on pulsar-client-thread for all schema requ…
Browse files Browse the repository at this point in the history
…iring schema info fetching

Fixes #2493
  • Loading branch information
ozangunalp committed Feb 21, 2024
1 parent f810ab8 commit bfdbf19
Showing 1 changed file with 2 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@ public PulsarIncomingChannel(PulsarClient client, Vertx vertx, Schema<T> schema,
.until(m -> closed.get())
.plug(msgMulti -> {
// Calling getValue on the pulsar-client-internal thread to make sure the SchemaInfo is fetched
if (schema instanceof AutoConsumeSchema || schema instanceof KeyValueSchema) {
return msgMulti.onItem().call(msg -> Uni.createFrom().item(msg::getValue));
if (schema.requireFetchingSchemaInfo()) {
return msgMulti.onItem().invoke(org.apache.pulsar.client.api.Message::getValue);
} else {
return msgMulti;
}
Expand Down

0 comments on commit bfdbf19

Please sign in to comment.