Skip to content

Commit

Permalink
[fix][schema] Fix cherry-pick issue from #18283 (#18555)
Browse files Browse the repository at this point in the history
  • Loading branch information
mattisonchao committed Nov 21, 2022
1 parent 643787e commit 0896eda
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1179,7 +1179,8 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
closeClientFuture.thenAccept(delete -> {
CompletableFuture<Void> deleteTopicAuthenticationFuture = new CompletableFuture<>();
brokerService.deleteTopicAuthenticationWithRetry(topic, deleteTopicAuthenticationFuture, 5);
deleteTopicAuthenticationFuture.thenCompose(__ -> deleteSchema())
deleteTopicAuthenticationFuture.thenCompose(__ -> deleteSchema ? deleteSchema() :
CompletableFuture.completedFuture(null))
.thenCompose(__ -> deleteTopicPolicies())
.thenCompose(__ -> transactionBufferCleanupAndClose())
.whenComplete((v, ex) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,28 +29,29 @@
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;

import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import com.google.common.collect.Sets;
import lombok.Data;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -271,4 +272,69 @@ public void testPersistentPartitionedTopicUnload() throws Exception {
producer.close();
}
}

@Test
public void testCreateSchemaAfterDeletion() throws Exception {
//init namespace
final String myNamespace = "prop/ns";
admin.namespaces().createNamespace(myNamespace, Sets.newHashSet("test"));
final String topicName = "persistent://prop/ns/test-create-schema-after-deletion" + UUID.randomUUID();

// create namespace
// Create a topic with `Person`
try (Producer<Person> producer = pulsarClient.newProducer(Schema.AVRO(Person.class))
.topic(topicName)
.create()
) {
Person person = new Person();
person.setName("Tom Hanks");
person.setAge(60);

producer.send(person);

}

// delete the topic
admin.topics().delete(topicName);

try (Producer<Student> ignored = pulsarClient.newProducer(Schema.AVRO(Student.class))
.topic(topicName)
.create()) {
Assert.fail("Should fail to create a the producer with a new schema since the schema is not deleted.");
} catch (PulsarClientException pce) {
Assert.assertTrue(pce instanceof PulsarClientException.IncompatibleSchemaException);
}

// delete the schema
admin.schemas().deleteSchema(topicName);

// after deleting the schema, try to create a topic with a different schema
try (Producer<Student> producer = pulsarClient.newProducer(Schema.AVRO(Student.class))
.topic(topicName)
.create()
) {
Student student = new Student();
student.setName("Tom Jerry");
student.setAge(30);
student.setGpa(10);

producer.send(student);

}
}

@Data
public static class Student {
private String name;
private int age;
private int gpa;
private int grade;

}

@Data
public static class Person {
private String name;
private int age;
}
}

0 comments on commit 0896eda

Please sign in to comment.