diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/ServerCnx.java index 9c18e5a7d3fba..eac6dea32905b 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/ServerCnx.java @@ -151,17 +151,17 @@ protected void handleLookup(CommandLookupTopic lookup) { if (log.isDebugEnabled()) { log.debug("Received Lookup from {}", remoteAddress); } - lookupDestinationAsync(getBrokerService().pulsar(), DestinationName.get(lookup.getTopic()), - lookup.getAuthoritative(), getRole(), lookup.getRequestId()).thenAccept(lookupResponse -> { + final long requestId = lookup.getRequestId(); + final String topic = lookup.getTopic(); + lookupDestinationAsync(getBrokerService().pulsar(), DestinationName.get(topic), lookup.getAuthoritative(), + getRole(), lookup.getRequestId()).thenAccept(lookupResponse -> { ctx.writeAndFlush(lookupResponse); }).exceptionally(ex -> { // it should never happen - log.warn("[{}] lookup failed with error {}", remoteAddress, ex.getMessage(), ex); - ctx.writeAndFlush( - newLookupResponse(ServerError.ServiceNotReady, ex.getMessage(), lookup.getRequestId())); + log.warn("[{}] lookup failed with error {}, {}", remoteAddress, topic, ex.getMessage(), ex); + ctx.writeAndFlush(newLookupResponse(ServerError.ServiceNotReady, ex.getMessage(), requestId)); return null; }); - } @Override @@ -169,21 +169,23 @@ protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata pa if (log.isDebugEnabled()) { log.debug("Received PartitionMetadataLookup from {}", remoteAddress); } - getPartitionedTopicMetadata(getBrokerService().pulsar(), getRole(), - DestinationName.get(partitionMetadata.getTopic())).thenAccept(metadata -> { + final long requestId = partitionMetadata.getRequestId(); + final String topic = partitionMetadata.getTopic(); + getPartitionedTopicMetadata(getBrokerService().pulsar(), getRole(), DestinationName.get(topic)) + .thenAccept(metadata -> { int partitions = metadata.partitions; - ctx.writeAndFlush( - Commands.newPartitionMetadataResponse(partitions, partitionMetadata.getRequestId())); + ctx.writeAndFlush(Commands.newPartitionMetadataResponse(partitions, requestId)); }).exceptionally(ex -> { if (ex instanceof PulsarClientException) { - log.warn("Failed to authorize {} at [{}] on topic {} : {}", getRole(), remoteAddress, - partitionMetadata.getTopic(), ex.getMessage()); + log.warn("Failed to authorize {} at [{}] on topic {} : {}", getRole(), remoteAddress, topic, + ex.getMessage()); ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.AuthorizationError, - ex.getMessage(), partitionMetadata.getRequestId())); + ex.getMessage(), requestId)); } else { - log.warn("Failed to get Partitioned Metadata [{}] {}: {}", remoteAddress, ex.getMessage()); + log.warn("Failed to get Partitioned Metadata [{}] {}: {}", remoteAddress, topic, + ex.getMessage(), ex); ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.ServiceNotReady, - ex.getMessage(), partitionMetadata.getRequestId())); + ex.getMessage(), requestId)); } return null; }); @@ -238,17 +240,18 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { } else { authorizationFuture = CompletableFuture.completedFuture(true); } + final String topicName = subscribe.getTopic(); + final String subscriptionName = subscribe.getSubscription(); + final long requestId = subscribe.getRequestId(); + final long consumerId = subscribe.getConsumerId(); + final SubType subType = subscribe.getSubType(); + final String consumerName = subscribe.getConsumerName(); + authorizationFuture.thenApply(isAuthorized -> { if (isAuthorized) { if (log.isDebugEnabled()) { log.debug("[{}] Client is authorized to subscribe with role {}", remoteAddress, authRole); } - final String topicName = subscribe.getTopic(); - final String subscriptionName = subscribe.getSubscription(); - final long requestId = subscribe.getRequestId(); - final long consumerId = subscribe.getConsumerId(); - final SubType subType = subscribe.getSubType(); - final String consumerName = subscribe.getConsumerName(); log.info("[{}] Subscribing on topic {} / {}", remoteAddress, topicName, subscriptionName); @@ -299,8 +302,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { subscriptionName, exception.getCause().getMessage()); // If client timed out, the future would have been completed by subsequent close. Send error - // back to - // client, only if not completed already. + // back to client, only if not completed already. if (consumerFuture.completeExceptionally(exception)) { ctx.writeAndFlush(Commands.newError(requestId, BrokerServiceException.getClientErrorCode(exception.getCause()), @@ -314,7 +316,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { } else { String msg = "Client is not authorized to subscribe"; log.warn("[{}] {} with role {}", remoteAddress, msg, authRole); - ctx.writeAndFlush(Commands.newError(subscribe.getRequestId(), ServerError.AuthorizationError, msg)); + ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg)); } return null; }); @@ -330,26 +332,19 @@ protected void handleProducer(final CommandProducer cmdProducer) { } else { authorizationFuture = CompletableFuture.completedFuture(true); } + + // Use producer name provided by client if present + final String producerName = cmdProducer.hasProducerName() ? cmdProducer.getProducerName() + : service.generateUniqueProducerName(); + final String topicName = cmdProducer.getTopic(); + final long producerId = cmdProducer.getProducerId(); + final long requestId = cmdProducer.getRequestId(); authorizationFuture.thenApply(isAuthorized -> { if (isAuthorized) { if (log.isDebugEnabled()) { log.debug("[{}] Client is authorized to Produce with role {}", remoteAddress, authRole); } - final String producerName; - if (cmdProducer.hasProducerName()) { - // Use producer name provided by client - producerName = cmdProducer.getProducerName(); - } else { - // Need to generate a unique id - producerName = service.generateUniqueProducerName(); - } - - final String topicName = cmdProducer.getTopic(); - final long producerId = cmdProducer.getProducerId(); - final long requestId = cmdProducer.getRequestId(); - CompletableFuture producerFuture = new CompletableFuture<>(); - CompletableFuture existingProducerFuture = producers.putIfAbsent(producerId, producerFuture); if (existingProducerFuture != null) { @@ -359,9 +354,11 @@ protected void handleProducer(final CommandProducer cmdProducer) { ctx.writeAndFlush(Commands.newProducerSuccess(requestId, producer.getProducerName())); return null; } else { - // There was an early request to create a producer with same producerId. This can happen when + // There was an early request to create a producer with + // same producerId. This can happen when // client - // timeout is lower the broker timeouts. We need to wait until the previous producer creation + // timeout is lower the broker timeouts. We need to wait + // until the previous producer creation // request // either complete or fails. log.warn("[{}][{}] Producer is already present on the connection", remoteAddress, topicName); @@ -374,7 +371,8 @@ protected void handleProducer(final CommandProducer cmdProducer) { log.info("[{}][{}] Creating producer. producerId={}", remoteAddress, topicName, producerId); service.getTopic(topicName).thenAccept((Topic topic) -> { - // Before creating producer, check if backlog quota exceeded on topic + // Before creating producer, check if backlog quota exceeded + // on topic if (topic.isBacklogQuotaExceeded(producerName)) { IllegalStateException illegalStateException = new IllegalStateException( "Cannot create producer on topic with backlog quota exceeded"); @@ -405,7 +403,8 @@ protected void handleProducer(final CommandProducer cmdProducer) { ctx.writeAndFlush(Commands.newProducerSuccess(requestId, producerName)); return; } else { - // The producer's future was completed before by a close command + // The producer's future was completed before by + // a close command producer.closeNow(); log.info("[{}] Cleared producer created after timeout on client side {}", remoteAddress, producer); @@ -433,7 +432,8 @@ protected void handleProducer(final CommandProducer cmdProducer) { log.error("[{}] Failed to create topic {}", remoteAddress, topicName, exception); } - // If client timed out, the future would have been completed by subsequent close. Send error back to + // If client timed out, the future would have been completed + // by subsequent close. Send error back to // client, only if not completed already. if (producerFuture.completeExceptionally(exception)) { ctx.writeAndFlush(Commands.newError(requestId, BrokerServiceException.getClientErrorCode(cause), @@ -446,7 +446,7 @@ protected void handleProducer(final CommandProducer cmdProducer) { } else { String msg = "Client is not authorized to Produce"; log.warn("[{}] {} with role {}", remoteAddress, msg, authRole); - ctx.writeAndFlush(Commands.newError(cmdProducer.getRequestId(), ServerError.AuthorizationError, msg)); + ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg)); } return null; }); diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/ServerCnxTest.java index 9644f19389338..fa6d914642bfc 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/ServerCnxTest.java @@ -497,6 +497,7 @@ public void testProducerCommandWithAuthorizationNegative() throws Exception { doReturn(authorizationManager).when(brokerService).getAuthorizationManager(); doReturn(true).when(brokerService).isAuthenticationEnabled(); doReturn(true).when(brokerService).isAuthorizationEnabled(); + doReturn("prod1").when(brokerService).generateUniqueProducerName(); resetChannel(); setChannelConnected(); diff --git a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/BinaryProtoLookupService.java index 9029eeeb61bfd..fadd3aa3977e8 100644 --- a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/BinaryProtoLookupService.java +++ b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/BinaryProtoLookupService.java @@ -15,7 +15,6 @@ */ package com.yahoo.pulsar.client.impl; -import static com.yahoo.pulsar.client.impl.PulsarClientImpl.requestIdGenerator; import static java.lang.String.format; import java.net.InetSocketAddress; @@ -34,13 +33,13 @@ class BinaryProtoLookupService implements LookupService { - private final ConnectionPool cnxPool; + private final PulsarClientImpl client; protected final InetSocketAddress serviceAddress; private final boolean useTls; - public BinaryProtoLookupService(ConnectionPool cnxPool, String serviceUrl, boolean useTls) + public BinaryProtoLookupService(PulsarClientImpl client, String serviceUrl, boolean useTls) throws PulsarClientException { - this.cnxPool = cnxPool; + this.client = client; this.useTls = useTls; URI uri; try { @@ -75,8 +74,8 @@ private CompletableFuture findBroker(InetSocketAddress socket DestinationName destination) { CompletableFuture addressFuture = new CompletableFuture(); - cnxPool.getConnection(socketAddress).thenAccept(clientCnx -> { - long requestId = requestIdGenerator.getAndIncrement(); + client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> { + long requestId = client.newRequestId(); ByteBuf request = Commands.newLookup(destination.toString(), authoritative, requestId); clientCnx.newLookup(request, requestId).thenAccept(lookupDataResult -> { URI uri = null; @@ -133,8 +132,8 @@ private CompletableFuture getPartitionedTopicMetadata( CompletableFuture partitionFuture = new CompletableFuture(); - cnxPool.getConnection(socketAddress).thenAccept(clientCnx -> { - long requestId = requestIdGenerator.getAndIncrement(); + client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> { + long requestId = client.newRequestId(); ByteBuf request = Commands.newPartitionMetadataRequest(destination.toString(), requestId); clientCnx.newLookup(request, requestId).thenAccept(lookupDataResult -> { try { diff --git a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/PulsarClientImpl.java index 50a5d308a52c5..679af2d0e7224 100644 --- a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/PulsarClientImpl.java @@ -71,7 +71,7 @@ enum State { private final AtomicLong producerIdGenerator = new AtomicLong(); private final AtomicLong consumerIdGenerator = new AtomicLong(); - protected static final AtomicLong requestIdGenerator = new AtomicLong(); + private final AtomicLong requestIdGenerator = new AtomicLong(); private final EventLoopGroup eventLoopGroup; @@ -93,7 +93,7 @@ public PulsarClientImpl(String serviceUrl, ClientConfiguration conf, EventLoopGr conf.isTlsAllowInsecureConnection(), conf.getTlsTrustCertsFilePath()); lookup = new HttpLookupService(httpClient, conf.isUseTls()); } else { - lookup = new BinaryProtoLookupService(cnxPool, serviceUrl, conf.isUseTls()); + lookup = new BinaryProtoLookupService(this, serviceUrl, conf.isUseTls()); } timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-timer"), 1, TimeUnit.MILLISECONDS); externalExecutorProvider = new ExecutorProvider(conf.getListenerThreads(), "pulsar-external-listener"); @@ -365,6 +365,10 @@ long newRequestId() { return requestIdGenerator.getAndIncrement(); } + ConnectionPool getCnxPool() { + return cnxPool; + } + EventLoopGroup eventLoopGroup() { return eventLoopGroup; }