Skip to content

Commit

Permalink
[PIP-236][fix][broker]Fix using schema to create consumer fails after…
Browse files Browse the repository at this point in the history
… using AUTO_CONSUME consumer to subscribe topic (#17449)

 Fixes #17354 
 PIP #19113

### Motivation

*Fixed the failure to use schema to create consumer after using AUTO-CONSUME consumer to subscribe an empty topic, and Broker returned the error message as  `IncompatibleSchemaException("Topic does not have schema to check")`.*

### Modifications

*In PersistentTopic::addSchemaIfIdleOrCheckCompatible, when there is an active consumer, but the consumer is using the AUTO_CONSUME schema to subscribe to the topic. Continuing to create a schema consumer to subscribe to the topic will fail.*

-  When `numActiveConsumers != 0`, and check the schema of the currently existing consumers is AUTO_CONSUME schema.
  • Loading branch information
Denovo1998 authored Mar 7, 2023
1 parent b2658af commit af1360f
Show file tree
Hide file tree
Showing 10 changed files with 153 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl;
import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.stats.Rate;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.FutureUtil;
Expand Down Expand Up @@ -142,12 +143,24 @@ public class Consumer {

private long negtiveUnackedMsgsTimestamp;

@Getter
private final SchemaType schemaType;

public Consumer(Subscription subscription, SubType subType, String topicName, long consumerId,
int priorityLevel, String consumerName,
boolean isDurable, TransportCnx cnx, String appId,
Map<String, String> metadata, boolean readCompacted,
KeySharedMeta keySharedMeta, MessageId startMessageId, long consumerEpoch) {
this(subscription, subType, topicName, consumerId, priorityLevel, consumerName, isDurable, cnx, appId,
metadata, readCompacted, keySharedMeta, startMessageId, consumerEpoch, null);
}

public Consumer(Subscription subscription, SubType subType, String topicName, long consumerId,
int priorityLevel, String consumerName,
boolean isDurable, TransportCnx cnx, String appId,
Map<String, String> metadata, boolean readCompacted,
KeySharedMeta keySharedMeta, MessageId startMessageId,
long consumerEpoch, SchemaType schemaType) {
this.subscription = subscription;
this.subType = subType;
this.topicName = topicName;
Expand Down Expand Up @@ -204,6 +217,8 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo
this.consumerEpoch = consumerEpoch;
this.isAcknowledgmentAtBatchIndexLevelEnabled = subscription.getTopic().getBrokerService()
.getPulsar().getConfiguration().isAcknowledgmentAtBatchIndexLevelEnabled();

this.schemaType = schemaType;
}

@VisibleForTesting
Expand Down Expand Up @@ -231,6 +246,7 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo
this.clientAddress = null;
this.startMessageId = null;
this.isAcknowledgmentAtBatchIndexLevelEnabled = false;
this.schemaType = null;
MESSAGE_PERMITS_UPDATER.set(this, availablePermits);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1195,8 +1195,9 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
.replicatedSubscriptionStateArg(isReplicated).keySharedMeta(keySharedMeta)
.subscriptionProperties(subscriptionProperties)
.consumerEpoch(consumerEpoch)
.schemaType(schema == null ? null : schema.getType())
.build();
if (schema != null) {
if (schema != null && schema.getType() != SchemaType.AUTO_CONSUME) {
return topic.addSchemaIfIdleOrCheckCompatible(schema)
.thenCompose(v -> topic.subscribe(option));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.KeyValue;
import org.apache.pulsar.common.schema.SchemaType;

@Getter
@Builder
Expand All @@ -49,6 +50,7 @@ public class SubscriptionOption {
private KeySharedMeta keySharedMeta;
private Optional<Map<String, String>> subscriptionProperties;
private long consumerEpoch;
private SchemaType schemaType;

public static Optional<Map<String, String>> getPropertiesMap(List<KeyValue> list) {
if (list == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
import org.apache.pulsar.common.policies.data.stats.PublisherStatsImpl;
import org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.metadata.api.MetadataStoreException;
Expand Down Expand Up @@ -255,7 +256,8 @@ public CompletableFuture<Consumer> subscribe(SubscriptionOption option) {
option.isDurable(), option.getStartMessageId(), option.getMetadata(),
option.isReadCompacted(),
option.getStartMessageRollbackDurationSec(), option.isReplicatedSubscriptionStateArg(),
option.getKeySharedMeta(), option.getSubscriptionProperties().orElse(null));
option.getKeySharedMeta(), option.getSubscriptionProperties().orElse(null),
option.getSchemaType());
}

@Override
Expand All @@ -268,7 +270,7 @@ public CompletableFuture<Consumer> subscribe(final TransportCnx cnx, String subs
KeySharedMeta keySharedMeta) {
return internalSubscribe(cnx, subscriptionName, consumerId, subType, priorityLevel, consumerName,
isDurable, startMessageId, metadata, readCompacted, resetStartMessageBackInSec,
replicateSubscriptionState, keySharedMeta, null);
replicateSubscriptionState, keySharedMeta, null, null);
}

private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, String subscriptionName,
Expand All @@ -279,7 +281,8 @@ private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, St
long resetStartMessageBackInSec,
boolean replicateSubscriptionState,
KeySharedMeta keySharedMeta,
Map<String, String> subscriptionProperties) {
Map<String, String> subscriptionProperties,
SchemaType schemaType) {

return brokerService.checkTopicNsOwnership(getName()).thenCompose(__ -> {
final CompletableFuture<Consumer> future = new CompletableFuture<>();
Expand Down Expand Up @@ -321,8 +324,8 @@ private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, St
name -> new NonPersistentSubscription(this, subscriptionName, isDurable, subscriptionProperties));

Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName,
false, cnx, cnx.getAuthRole(), metadata, readCompacted, keySharedMeta,
MessageId.latest, DEFAULT_CONSUMER_EPOCH);
false, cnx, cnx.getAuthRole(), metadata, readCompacted, keySharedMeta, MessageId.latest,
DEFAULT_CONSUMER_EPOCH, schemaType);
if (isMigrated()) {
consumer.topicMigrated(getClusterMigrationUrl());
}
Expand Down Expand Up @@ -1162,12 +1165,14 @@ public CompletableFuture<MessageId> getLastMessageId() {
@Override
public CompletableFuture<Void> addSchemaIfIdleOrCheckCompatible(SchemaData schema) {
return hasSchema().thenCompose((hasSchema) -> {
int numActiveConsumers = subscriptions.values().stream()
.mapToInt(subscription -> subscription.getConsumers().size())
int numActiveConsumersWithoutAutoSchema = subscriptions.values().stream()
.mapToInt(subscription -> subscription.getConsumers().stream()
.filter(consumer -> consumer.getSchemaType() != SchemaType.AUTO_CONSUME)
.toList().size())
.sum();
if (hasSchema
|| (!producers.isEmpty())
|| (numActiveConsumers != 0)
|| (numActiveConsumersWithoutAutoSchema != 0)
|| ENTRIES_ADDED_COUNTER_UPDATER.get(this) != 0) {
return checkSchemaCompatibleForConsumer(schema);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.topics.TopicCompactionStrategy;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.DateFormatter;
Expand Down Expand Up @@ -727,7 +728,8 @@ public CompletableFuture<Consumer> subscribe(SubscriptionOption option) {
option.getStartMessageId(), option.getMetadata(), option.isReadCompacted(),
option.getInitialPosition(), option.getStartMessageRollbackDurationSec(),
option.isReplicatedSubscriptionStateArg(), option.getKeySharedMeta(),
option.getSubscriptionProperties().orElse(Collections.emptyMap()), option.getConsumerEpoch());
option.getSubscriptionProperties().orElse(Collections.emptyMap()),
option.getConsumerEpoch(), option.getSchemaType());
}

private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, String subscriptionName,
Expand All @@ -740,7 +742,8 @@ private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, St
boolean replicatedSubscriptionStateArg,
KeySharedMeta keySharedMeta,
Map<String, String> subscriptionProperties,
long consumerEpoch) {
long consumerEpoch,
SchemaType schemaType) {
if (readCompacted && !(subType == SubType.Failover || subType == SubType.Exclusive)) {
return FutureUtil.failedFuture(new NotAllowedException(
"readCompacted only allowed on failover or exclusive subscriptions"));
Expand Down Expand Up @@ -828,7 +831,7 @@ private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, St
CompletableFuture<Consumer> future = subscriptionFuture.thenCompose(subscription -> {
Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel,
consumerName, isDurable, cnx, cnx.getAuthRole(), metadata,
readCompacted, keySharedMeta, startMessageId, consumerEpoch);
readCompacted, keySharedMeta, startMessageId, consumerEpoch, schemaType);

return addConsumerToSubscription(subscription, consumer).thenCompose(v -> {
if (subscription instanceof PersistentSubscription persistentSubscription) {
Expand Down Expand Up @@ -907,7 +910,7 @@ public CompletableFuture<Consumer> subscribe(final TransportCnx cnx, String subs
KeySharedMeta keySharedMeta) {
return internalSubscribe(cnx, subscriptionName, consumerId, subType, priorityLevel, consumerName,
isDurable, startMessageId, metadata, readCompacted, initialPosition, startMessageRollbackDurationSec,
replicatedSubscriptionStateArg, keySharedMeta, null, DEFAULT_CONSUMER_EPOCH);
replicatedSubscriptionStateArg, keySharedMeta, null, DEFAULT_CONSUMER_EPOCH, null);
}

private CompletableFuture<Subscription> getDurableSubscription(String subscriptionName,
Expand Down Expand Up @@ -3107,21 +3110,22 @@ public synchronized OffloadProcessStatus offloadStatus() {

@Override
public CompletableFuture<Void> addSchemaIfIdleOrCheckCompatible(SchemaData schema) {
return hasSchema()
.thenCompose((hasSchema) -> {
int numActiveConsumers = subscriptions.values().stream()
.mapToInt(subscription -> subscription.getConsumers().size())
.sum();
if (hasSchema
|| (!producers.isEmpty())
|| (numActiveConsumers != 0)
|| (ledger.getTotalSize() != 0)) {
return checkSchemaCompatibleForConsumer(schema);
} else {
return addSchema(schema).thenCompose(schemaVersion ->
CompletableFuture.completedFuture(null));
}
});
return hasSchema().thenCompose((hasSchema) -> {
int numActiveConsumersWithoutAutoSchema = subscriptions.values().stream()
.mapToInt(subscription -> subscription.getConsumers().stream()
.filter(consumer -> consumer.getSchemaType() != SchemaType.AUTO_CONSUME)
.toList().size())
.sum();
if (hasSchema
|| (!producers.isEmpty())
|| (numActiveConsumersWithoutAutoSchema != 0)
|| (ledger.getTotalSize() != 0)) {
return checkSchemaCompatibleForConsumer(schema);
} else {
return addSchema(schema).thenCompose(schemaVersion ->
CompletableFuture.completedFuture(null));
}
});
}

public synchronized void checkReplicatedSubscriptionControllerState() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1239,6 +1239,79 @@ public void testAutoCreatedSchema(String domain) throws Exception {
Assert.assertEquals(admin.schemas().getSchemaInfo(topic2).getType(), SchemaType.STRING);
}

@Test(dataProvider = "topicDomain")
public void testSubscribeWithSchemaAfterAutoConsumeNewTopic(String domain) throws Exception {
final String topic = domain + "my-property/my-ns/testSubscribeWithSchemaAfterAutoConsume-1";

@Cleanup
Consumer<GenericRecord> autoConsumer1 = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
.topic(topic)
.subscriptionType(SubscriptionType.Shared)
.subscriptionName("sub0")
.consumerName("autoConsumer1")
.subscribe();
@Cleanup
Consumer<GenericRecord> autoConsumer2 = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
.topic(topic)
.subscriptionType(SubscriptionType.Shared)
.subscriptionName("sub0")
.consumerName("autoConsumer2")
.subscribe();
@Cleanup
Consumer<GenericRecord> autoConsumer3 = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
.topic(topic)
.subscriptionType(SubscriptionType.Shared)
.subscriptionName("sub1")
.consumerName("autoConsumer3")
.subscribe();
@Cleanup
Consumer<GenericRecord> autoConsumer4 = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
.topic(topic)
.subscriptionType(SubscriptionType.Shared)
.subscriptionName("sub1")
.consumerName("autoConsumer4")
.subscribe();
try {
log.info("The autoConsumer1 isConnected: " + autoConsumer1.isConnected());
log.info("The autoConsumer2 isConnected: " + autoConsumer2.isConnected());
log.info("The autoConsumer3 isConnected: " + autoConsumer3.isConnected());
log.info("The autoConsumer4 isConnected: " + autoConsumer4.isConnected());
admin.schemas().getSchemaInfo(topic);
fail("The schema of topic should not exist");
} catch (PulsarAdminException e) {
assertEquals(e.getStatusCode(), 404);
}

@Cleanup
Consumer<V1Data> consumerWithSchema1 = pulsarClient.newConsumer(Schema.AVRO(V1Data.class))
.topic(topic)
.subscriptionType(SubscriptionType.Shared)
.subscriptionName("sub0")
.consumerName("consumerWithSchema-1")
.subscribe();
@Cleanup
Consumer<V1Data> consumerWithSchema2 = pulsarClient.newConsumer(Schema.AVRO(V1Data.class))
.topic(topic)
.subscriptionType(SubscriptionType.Shared)
.subscriptionName("sub0")
.consumerName("consumerWithSchema-2")
.subscribe();
@Cleanup
Consumer<V1Data> consumerWithSchema3 = pulsarClient.newConsumer(Schema.AVRO(V1Data.class))
.topic(topic)
.subscriptionType(SubscriptionType.Shared)
.subscriptionName("sub1")
.consumerName("consumerWithSchema-3")
.subscribe();
@Cleanup
Consumer<V1Data> consumerWithSchema4 = pulsarClient.newConsumer(Schema.AVRO(V1Data.class))
.topic(topic)
.subscriptionType(SubscriptionType.Shared)
.subscriptionName("sub1")
.consumerName("consumerWithSchema-4")
.subscribe();
}

@DataProvider(name = "keyEncodingType")
public static Object[] keyEncodingType() {
return new Object[] { KeyValueEncodingType.SEPARATED, KeyValueEncodingType.INLINE };
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.client.util.RetryMessageUtil;
Expand Down Expand Up @@ -811,6 +812,11 @@ public void connectionOpened(final ClientCnx cnx) {
if (si != null && (SchemaType.BYTES == si.getType() || SchemaType.NONE == si.getType())) {
// don't set schema for Schema.BYTES
si = null;
} else {
if (schema instanceof AutoConsumeSchema
&& Commands.peerSupportsCarryAutoConsumeSchemaToBroker(cnx.getRemoteEndpointProtocolVersion())) {
si = AutoConsumeSchema.SCHEMA_INFO;
}
}
// startMessageRollbackDurationInSec should be consider only once when consumer connects to first time
long startMessageRollbackDuration = (startMessageRollbackDurationInSec > 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ public class AutoConsumeSchema implements Schema<GenericRecord> {

private SchemaInfoProvider schemaInfoProvider;

public static final SchemaInfo SCHEMA_INFO = SchemaInfoImpl.builder()
.name("AutoConsume")
.type(SchemaType.AUTO_CONSUME)
.schema(new byte[0])
.build();

private ConcurrentMap<SchemaVersion, Schema<?>> initSchemaMap() {
ConcurrentMap<SchemaVersion, Schema<?>> schemaMap = new ConcurrentHashMap<>();
// The Schema.BYTES will not be uploaded to the broker and store in the schema storage,
Expand Down
Loading

0 comments on commit af1360f

Please sign in to comment.