Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove Persistent Topics v3 API - use custom media type instead #14117

Merged
merged 4 commits into from
Feb 4, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.Encoded;
Expand Down Expand Up @@ -243,6 +244,49 @@ public void createPartitionedTopic(
}
}

@PUT
@Consumes(PartitionedTopicMetadata.MEDIA_TYPE)
@Path("/{tenant}/{namespace}/{topic}/partitions")
@ApiOperation(value = "Create a partitioned topic.",
notes = "It needs to be called before creating a producer on a partitioned topic.")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant does not exist"),
@ApiResponse(code = 406, message = "The number of partitions should be more than 0 and"
+ " less than or equal to maxNumPartitionsPerPartitionedTopic"),
@ApiResponse(code = 409, message = "Partitioned topic already exist"),
@ApiResponse(code = 412,
message = "Failed Reason : Name is invalid or Namespace does not have any clusters configured"),
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration")
})
public void createPartitionedTopic(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "The metadata for the topic",
required = true, type = "PartitionedTopicMetadata") PartitionedTopicMetadata metadata,
@QueryParam("createLocalTopicOnly") @DefaultValue("false") boolean createLocalTopicOnly) {
try {
validateNamespaceName(tenant, namespace);
validateGlobalNamespaceOwnership();
validatePartitionedTopicName(tenant, namespace, encodedTopic);
validateTopicPolicyOperation(topicName, PolicyName.PARTITION, PolicyOperation.WRITE);
validateCreateTopic(topicName);
internalCreatePartitionedTopic(asyncResponse, metadata.partitions, createLocalTopicOnly,
metadata.properties);
} catch (Exception e) {
log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
}
}

@PUT
@Path("/{tenant}/{namespace}/{topic}")
@ApiOperation(value = "Create a non-partitioned topic.",
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@
public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {

private PersistentTopics persistentTopics;
private org.apache.pulsar.broker.admin.v3.PersistentTopics persistentTopicsV3;
private final String testTenant = "my-tenant";
private final String testLocalCluster = "use";
private final String testNamespace = "my-namespace";
Expand All @@ -124,23 +123,13 @@ protected void setup() throws Exception {
persistentTopics = spy(PersistentTopics.class);
persistentTopics.setServletContext(new MockServletContext());
persistentTopics.setPulsar(pulsar);
persistentTopicsV3 = spy(org.apache.pulsar.broker.admin.v3.PersistentTopics.class);
persistentTopicsV3.setServletContext(new MockServletContext());
persistentTopicsV3.setPulsar(pulsar);
doReturn(false).when(persistentTopics).isRequestHttps();
doReturn(null).when(persistentTopics).originalPrincipal();
doReturn("test").when(persistentTopics).clientAppId();
doReturn(TopicDomain.persistent.value()).when(persistentTopics).domain();
doNothing().when(persistentTopics).validateAdminAccessForTenant(this.testTenant);
doReturn(mock(AuthenticationDataHttps.class)).when(persistentTopics).clientAuthData();

doReturn(false).when(persistentTopicsV3).isRequestHttps();
doReturn(null).when(persistentTopicsV3).originalPrincipal();
doReturn("test").when(persistentTopicsV3).clientAppId();
doReturn(TopicDomain.persistent.value()).when(persistentTopicsV3).domain();
doNothing().when(persistentTopicsV3).validateAdminAccessForTenant(this.testTenant);
doReturn(mock(AuthenticationDataHttps.class)).when(persistentTopicsV3).clientAuthData();

nonPersistentTopic = spy(NonPersistentTopics.class);
nonPersistentTopic.setServletContext(new MockServletContext());
nonPersistentTopic.setPulsar(pulsar);
Expand Down Expand Up @@ -448,7 +437,7 @@ public void testCreatePartitionedTopic() {
Map<String, String> topicMetadata = Maps.newHashMap();
topicMetadata.put("key1", "value1");
PartitionedTopicMetadata metadata = new PartitionedTopicMetadata(2, topicMetadata);
persistentTopicsV3.createPartitionedTopic(response, testTenant, testNamespace, topicName2, metadata, true);
persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, topicName2, metadata, true);
Awaitility.await().untilAsserted(() -> {
PartitionedTopicMetadata pMetadata2 = persistentTopics.getPartitionedMetadata(
testTenant, testNamespace, topicName2, true, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
* Metadata of a partitioned topic.
*/
public class PartitionedTopicMetadata {
public static final String MEDIA_TYPE = "application/vnd.partitioned-topic-metadata+json";

/* Number of partitions for the topic */
public int partitions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@
public class TopicsImpl extends BaseResource implements Topics {
private final WebTarget adminTopics;
private final WebTarget adminV2Topics;
private final WebTarget adminV3Topics;
// CHECKSTYLE.OFF: MemberName
private static final String BATCH_HEADER = "X-Pulsar-num-batch-message";
private static final String BATCH_SIZE_HEADER = "X-Pulsar-batch-size";
Expand Down Expand Up @@ -133,7 +132,6 @@ public TopicsImpl(WebTarget web, Authentication auth, long readTimeoutMs) {
super(auth, readTimeoutMs);
adminTopics = web.path("/admin");
adminV2Topics = web.path("/admin/v2");
adminV3Topics = web.path("/admin/v3");
}

@Override
Expand Down Expand Up @@ -347,12 +345,12 @@ public CompletableFuture<Void> createPartitionedTopicAsync(
String topic, int numPartitions, boolean createLocalTopicOnly, Map<String, String> properties) {
checkArgument(numPartitions > 0, "Number of partitions should be more than 0");
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, properties, "partitions")
WebTarget path = topicPath(tn, "partitions")
.queryParam("createLocalTopicOnly", Boolean.toString(createLocalTopicOnly));
Entity entity;
if (properties != null) {
if (properties != null && !properties.isEmpty()) {
PartitionedTopicMetadata metadata = new PartitionedTopicMetadata(numPartitions, properties);
entity = Entity.entity(metadata, MediaType.APPLICATION_JSON);
entity = Entity.entity(metadata, MediaType.valueOf(PartitionedTopicMetadata.MEDIA_TYPE));
} else {
entity = Entity.entity(numPartitions, MediaType.APPLICATION_JSON);
}
Expand Down Expand Up @@ -1251,22 +1249,6 @@ private WebTarget namespacePath(String domain, NamespaceName namespace, String..
return namespacePath;
}

/**
* As we support topic metadata, user can add some properties when create topic.
* For compatibility, we have to define a new method, so when metadata is not null, v3 will be called.
* Details could be found here : https://github.com/apache/pulsar/pull/12818#discussion_r789340203
* @param topic
* @param metadata
* @param parts
* @return
*/
private WebTarget topicPath(TopicName topic, Map<String, String> metadata, String... parts) {
final WebTarget base = metadata != null ? adminV3Topics : (topic.isV2() ? adminV2Topics : adminTopics);
WebTarget topicPath = base.path(topic.getRestPath());
topicPath = WebTargets.addParts(topicPath, parts);
return topicPath;
}

private WebTarget topicPath(TopicName topic, String... parts) {
final WebTarget base = topic.isV2() ? adminV2Topics : adminTopics;
WebTarget topicPath = base.path(topic.getRestPath());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -514,8 +514,9 @@ private class CreatePartitionedCmd extends CliCommand {
@Override
void run() throws Exception {
String topic = validateTopicName(params);
Map<String, String> map = new HashMap<>();
if (metadata != null) {
Map<String, String> map = null;
if (metadata != null && !metadata.isEmpty()) {
map = new HashMap<>();
lhotari marked this conversation as resolved.
Show resolved Hide resolved
for (String property : metadata) {
if (!property.contains("=")) {
throw new ParameterException(String.format("Invalid key value pair '%s', "
Expand Down