From 6b2938223cf45a9298f9d40ab6ae108bea9a5a6d Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Thu, 28 Mar 2024 09:58:53 -0700 Subject: [PATCH] [improve] PIP-342: OTel client metrics support (#22179) --- .../shell/src/assemble/LICENSE.bin.txt | 4 + pulsar-broker/pom.xml | 6 + .../broker/service/BrokerServiceTest.java | 4 +- .../service/BrokerServiceThrottlingTest.java | 3 +- .../service/EnableProxyProtocolTest.java | 5 +- .../api/InjectedClientCnxClientBuilder.java | 3 +- ...ListenersWithInternalListenerNameTest.java | 5 +- .../client/impl/ConnectionPoolTest.java | 18 +- .../client/impl/MessageChecksumTest.java | 3 +- .../client/impl/MessageChunkingTest.java | 3 +- .../impl/PatternTopicsConsumerImplTest.java | 3 +- .../pulsar/client/impl/PulsarTestClient.java | 5 +- .../client/metrics/ClientMetricsTest.java | 336 +++++++++++++ pulsar-client-api/pom.xml | 6 + .../pulsar/client/api/ClientBuilder.java | 22 + .../pulsar/client/api/ConsumerStats.java | 6 +- .../pulsar/client/api/ProducerStats.java | 6 +- pulsar-client/pom.xml | 10 + .../impl/BatchMessageContainerImpl.java | 6 +- .../client/impl/BinaryProtoLookupService.java | 34 ++ .../pulsar/client/impl/ClientBuilderImpl.java | 7 + .../apache/pulsar/client/impl/ClientCnx.java | 25 +- .../pulsar/client/impl/ConnectionPool.java | 29 +- .../pulsar/client/impl/ConsumerImpl.java | 58 +++ .../pulsar/client/impl/HttpLookupService.java | 66 ++- .../pulsar/client/impl/ProducerImpl.java | 475 ++++++++++-------- .../pulsar/client/impl/PulsarClientImpl.java | 17 +- .../client/impl/UnAckedMessageTracker.java | 18 +- .../impl/conf/ClientConfigurationData.java | 3 + .../pulsar/client/impl/metrics/Counter.java | 60 +++ .../impl/metrics/InstrumentProvider.java | 58 +++ .../client/impl/metrics/LatencyHistogram.java | 110 ++++ .../client/impl/metrics/MetricsUtil.java | 59 +++ .../pulsar/client/impl/metrics/Unit.java | 59 +++ .../client/impl/metrics/UpDownCounter.java | 68 +++ .../client/impl/metrics/package-info.java | 23 + .../AcknowledgementsGroupingTrackerTest.java | 5 +- .../impl/BinaryProtoLookupServiceTest.java | 2 + .../ClientCnxRequestTimeoutQueueTest.java | 3 +- .../pulsar/client/impl/ClientCnxTest.java | 19 +- .../client/impl/OpSendMsgQueueTest.java | 3 +- .../impl/PartitionedProducerImplTest.java | 2 + .../pulsar/client/impl/ProducerImplTest.java | 2 + .../client/impl/PulsarClientImplTest.java | 5 +- .../impl/UnAckedMessageTrackerTest.java | 3 + .../conf/ClientConfigurationDataTest.java | 32 ++ .../pulsar/proxy/server/ProxyClientCnx.java | 3 +- .../pulsar/proxy/server/ProxyConnection.java | 6 +- .../pulsar/proxy/server/ProxyParserTest.java | 5 +- .../apache/pulsar/proxy/server/ProxyTest.java | 5 +- pulsar-testclient/pom.xml | 17 + .../pulsar/testclient/PerfClientUtils.java | 5 +- 52 files changed, 1476 insertions(+), 264 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/client/metrics/ClientMetricsTest.java create mode 100644 pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/Counter.java create mode 100644 pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/InstrumentProvider.java create mode 100644 pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/LatencyHistogram.java create mode 100644 pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/MetricsUtil.java create mode 100644 pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/Unit.java create mode 100644 pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/UpDownCounter.java create mode 100644 pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/package-info.java diff --git a/distribution/shell/src/assemble/LICENSE.bin.txt b/distribution/shell/src/assemble/LICENSE.bin.txt index 7e3ebbe06358d..9042257f34c67 100644 --- a/distribution/shell/src/assemble/LICENSE.bin.txt +++ b/distribution/shell/src/assemble/LICENSE.bin.txt @@ -387,6 +387,10 @@ The Apache Software License, Version 2.0 - log4j-core-2.18.0.jar - log4j-slf4j-impl-2.18.0.jar - log4j-web-2.18.0.jar + * OpenTelemetry + - opentelemetry-api-1.34.1.jar + - opentelemetry-context-1.34.1.jar + - opentelemetry-extension-incubator-1.34.1-alpha.jar * BookKeeper - bookkeeper-common-allocator-4.16.4.jar diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml index 8264459c6d9ab..e15e024ea8158 100644 --- a/pulsar-broker/pom.xml +++ b/pulsar-broker/pom.xml @@ -149,6 +149,12 @@ ${project.version} + + io.opentelemetry + opentelemetry-sdk-testing + test + + ${project.groupId} pulsar-io-batch-discovery-triggerers diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index b6a73274f440b..8ebba5c9aeabd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -99,6 +99,7 @@ import org.apache.pulsar.client.impl.PulsarServiceNameResolver; import org.apache.pulsar.client.impl.auth.AuthenticationTls; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.impl.metrics.InstrumentProvider; import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse; import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadataResponse; import org.apache.pulsar.common.naming.NamespaceBundle; @@ -997,7 +998,8 @@ public void testLookupThrottlingForClientByClient() throws Exception { // Using an AtomicReference in order to reset a new CountDownLatch AtomicReference latchRef = new AtomicReference<>(); latchRef.set(new CountDownLatch(1)); - try (ConnectionPool pool = new ConnectionPool(conf, eventLoop, () -> new ClientCnx(conf, eventLoop) { + try (ConnectionPool pool = new ConnectionPool(InstrumentProvider.NOOP, conf, eventLoop, + () -> new ClientCnx(InstrumentProvider.NOOP, conf, eventLoop) { @Override protected void handleLookupResponse(CommandLookupTopicResponse lookupResult) { try { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceThrottlingTest.java index 707c350feb59c..0d517c014b315 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceThrottlingTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceThrottlingTest.java @@ -47,6 +47,7 @@ import org.apache.pulsar.client.impl.ConnectionPool; import org.apache.pulsar.client.impl.PulsarServiceNameResolver; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.impl.metrics.InstrumentProvider; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.util.netty.EventLoopUtil; import org.testng.annotations.AfterMethod; @@ -197,7 +198,7 @@ public void testLookupThrottlingForClientByBroker() throws Exception { EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(20, false, new DefaultThreadFactory("test-pool", Thread.currentThread().isDaemon())); ExecutorService executor = Executors.newFixedThreadPool(10); - try (ConnectionPool pool = new ConnectionPool(conf, eventLoop)) { + try (ConnectionPool pool = new ConnectionPool(InstrumentProvider.NOOP, conf, eventLoop)) { final int totalConsumers = 20; List> futures = new ArrayList<>(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/EnableProxyProtocolTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/EnableProxyProtocolTest.java index a596e1ed32d6b..725b895fe6e14 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/EnableProxyProtocolTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/EnableProxyProtocolTest.java @@ -31,6 +31,7 @@ import org.apache.pulsar.client.impl.ClientBuilderImpl; import org.apache.pulsar.client.impl.ClientCnx; import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.client.impl.metrics.InstrumentProvider; import org.apache.pulsar.common.policies.data.SubscriptionStats; import org.apache.pulsar.common.policies.data.TopicStats; import org.awaitility.Awaitility; @@ -99,7 +100,7 @@ public void testProxyProtocol() throws Exception { ClientBuilderImpl clientBuilder = (ClientBuilderImpl) PulsarClient.builder().serviceUrl(lookupUrl.toString()); @Cleanup PulsarClientImpl protocolClient = InjectedClientCnxClientBuilder.create(clientBuilder, - (conf, eventLoopGroup) -> new ClientCnx(conf, eventLoopGroup) { + (conf, eventLoopGroup) -> new ClientCnx(InstrumentProvider.NOOP, conf, eventLoopGroup) { public void channelActive(ChannelHandlerContext ctx) throws Exception { byte[] bs = "PROXY TCP4 198.51.100.22 203.0.113.7 35646 80\r\n".getBytes(); ctx.writeAndFlush(Unpooled.copiedBuffer(bs)); @@ -124,7 +125,7 @@ public void testPubSubWhenSlowNetwork() throws Exception { ClientBuilderImpl clientBuilder = (ClientBuilderImpl) PulsarClient.builder().serviceUrl(lookupUrl.toString()); @Cleanup PulsarClientImpl protocolClient = InjectedClientCnxClientBuilder.create(clientBuilder, - (conf, eventLoopGroup) -> new ClientCnx(conf, eventLoopGroup) { + (conf, eventLoopGroup) -> new ClientCnx(InstrumentProvider.NOOP, conf, eventLoopGroup) { public void channelActive(ChannelHandlerContext ctx) throws Exception { Thread task = new Thread(() -> { try { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InjectedClientCnxClientBuilder.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InjectedClientCnxClientBuilder.java index 2a7908242707b..13447e089eab8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InjectedClientCnxClientBuilder.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InjectedClientCnxClientBuilder.java @@ -28,6 +28,7 @@ import org.apache.pulsar.client.impl.ConnectionPool; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.impl.metrics.InstrumentProvider; import org.apache.pulsar.client.util.ExecutorProvider; import org.apache.pulsar.common.util.netty.EventLoopUtil; @@ -42,7 +43,7 @@ public static PulsarClientImpl create(final ClientBuilderImpl clientBuilder, EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), conf.isEnableBusyWait(), threadFactory); // Inject into ClientCnx. - ConnectionPool pool = new ConnectionPool(conf, eventLoopGroup, + ConnectionPool pool = new ConnectionPool(InstrumentProvider.NOOP, conf, eventLoopGroup, () -> clientCnxFactory.generate(conf, eventLoopGroup)); return new InjectedClientCnxPulsarClientImpl(conf, eventLoopGroup, pool); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PulsarMultiListenersWithInternalListenerNameTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PulsarMultiListenersWithInternalListenerNameTest.java index 956b834e33435..a076e20b33218 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PulsarMultiListenersWithInternalListenerNameTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PulsarMultiListenersWithInternalListenerNameTest.java @@ -44,6 +44,7 @@ import org.apache.pulsar.client.impl.LookupService; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.impl.metrics.InstrumentProvider; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfo; @@ -137,7 +138,7 @@ private void doFindBrokerWithListenerName(boolean useHttp) throws Exception { conf.setMaxLookupRedirects(10); @Cleanup - LookupService lookupService = useHttp ? new HttpLookupService(conf, eventExecutors) : + LookupService lookupService = useHttp ? new HttpLookupService(InstrumentProvider.NOOP, conf, eventExecutors) : new BinaryProtoLookupService((PulsarClientImpl) this.pulsarClient, lookupUrl.toString(), "internal", false, this.executorService); TopicName topicName = TopicName.get("persistent://public/default/test"); @@ -172,7 +173,7 @@ public void testHttpLookupRedirect() throws Exception { conf.setMaxLookupRedirects(10); @Cleanup - HttpLookupService lookupService = new HttpLookupService(conf, eventExecutors); + HttpLookupService lookupService = new HttpLookupService(InstrumentProvider.NOOP, conf, eventExecutors); NamespaceService namespaceService = pulsar.getNamespaceService(); LookupResult lookupResult = new LookupResult(pulsar.getWebServiceAddress(), null, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java index 79ffada4a90c8..1037019d608ab 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java @@ -35,6 +35,7 @@ import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.impl.metrics.InstrumentProvider; import org.apache.pulsar.common.api.proto.CommandCloseProducer; import org.apache.pulsar.common.util.netty.EventLoopUtil; import org.awaitility.Awaitility; @@ -68,7 +69,8 @@ protected void cleanup() throws Exception { public void testSingleIpAddress() throws Exception { ClientConfigurationData conf = new ClientConfigurationData(); EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, new DefaultThreadFactory("test")); - ConnectionPool pool = spyWithClassAndConstructorArgs(ConnectionPool.class, conf, eventLoop); + ConnectionPool pool = + spyWithClassAndConstructorArgs(ConnectionPool.class, InstrumentProvider.NOOP, conf, eventLoop); conf.setServiceUrl(serviceUrl); PulsarClientImpl client = new PulsarClientImpl(conf, eventLoop, pool); @@ -118,7 +120,7 @@ public void testSelectConnectionForSameProducer() throws Exception { public void testDoubleIpAddress() throws Exception { ClientConfigurationData conf = new ClientConfigurationData(); EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, new DefaultThreadFactory("test")); - ConnectionPool pool = spyWithClassAndConstructorArgs(ConnectionPool.class, conf, eventLoop); + ConnectionPool pool = spyWithClassAndConstructorArgs(ConnectionPool.class, InstrumentProvider.NOOP, conf, eventLoop); conf.setServiceUrl(serviceUrl); PulsarClientImpl client = new PulsarClientImpl(conf, eventLoop, pool); @@ -143,7 +145,8 @@ public void testNoConnectionPool() throws Exception { ClientConfigurationData conf = new ClientConfigurationData(); conf.setConnectionsPerBroker(0); EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(8, false, new DefaultThreadFactory("test")); - ConnectionPool pool = spyWithClassAndConstructorArgs(ConnectionPool.class, conf, eventLoop); + ConnectionPool pool = + spyWithClassAndConstructorArgs(ConnectionPool.class, InstrumentProvider.NOOP, conf, eventLoop); InetSocketAddress brokerAddress = InetSocketAddress.createUnresolved("127.0.0.1", brokerPort); @@ -166,7 +169,8 @@ public void testEnableConnectionPool() throws Exception { ClientConfigurationData conf = new ClientConfigurationData(); conf.setConnectionsPerBroker(5); EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(8, false, new DefaultThreadFactory("test")); - ConnectionPool pool = spyWithClassAndConstructorArgs(ConnectionPool.class, conf, eventLoop); + ConnectionPool pool = + spyWithClassAndConstructorArgs(ConnectionPool.class, InstrumentProvider.NOOP, conf, eventLoop); InetSocketAddress brokerAddress = InetSocketAddress.createUnresolved("127.0.0.1", brokerPort); @@ -233,8 +237,10 @@ protected void doResolveAll(SocketAddress socketAddress, Promise promise) throws } }; - ConnectionPool pool = spyWithClassAndConstructorArgs(ConnectionPool.class, conf, eventLoop, - (Supplier) () -> new ClientCnx(conf, eventLoop), Optional.of(resolver)); + ConnectionPool pool = + spyWithClassAndConstructorArgs(ConnectionPool.class, InstrumentProvider.NOOP, conf, eventLoop, + (Supplier) () -> new ClientCnx(InstrumentProvider.NOOP, conf, eventLoop), + Optional.of(resolver)); ClientCnx cnx = pool.getConnection( diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChecksumTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChecksumTest.java index 515b34db8509d..0b25e3409563a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChecksumTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChecksumTest.java @@ -37,6 +37,7 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.TypedMessageBuilder; import org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg; +import org.apache.pulsar.client.impl.metrics.LatencyHistogram; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.api.proto.ProtocolVersion; import org.apache.pulsar.common.protocol.ByteBufPair; @@ -233,7 +234,7 @@ public void testTamperingMessageIsDetected() throws Exception { // WHEN // protocol message is created with checksum ByteBufPair cmd = Commands.newSend(1, 1, 1, ChecksumType.Crc32c, msgMetadata, payload); - OpSendMsg op = OpSendMsg.create((MessageImpl) msgBuilder.getMessage(), cmd, 1, null); + OpSendMsg op = OpSendMsg.create(LatencyHistogram.NOOP, (MessageImpl) msgBuilder.getMessage(), cmd, 1, null); // THEN // the checksum validation passes diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java index 6686edd2b67d2..da359a6aeb9c5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java @@ -56,6 +56,7 @@ import org.apache.pulsar.client.api.SizeUnit; import org.apache.pulsar.client.impl.MessageImpl.SchemaState; import org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg; +import org.apache.pulsar.client.impl.metrics.LatencyHistogram; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.policies.data.PublisherStats; import org.apache.pulsar.common.protocol.ByteBufPair; @@ -499,7 +500,7 @@ public void testExpireIncompleteChunkMessage() throws Exception{ ByteBufPair cmd = Commands.newSend(producerId, 1, 1, ChecksumType.Crc32c, msgMetadata, payload); MessageImpl msgImpl = ((MessageImpl) msg.getMessage()); msgImpl.setSchemaState(SchemaState.Ready); - OpSendMsg op = OpSendMsg.create(msgImpl, cmd, 1, null); + OpSendMsg op = OpSendMsg.create(LatencyHistogram.NOOP, msgImpl, cmd, 1, null); producer.processOpSendMsg(op); retryStrategically((test) -> { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java index 7707abafde8de..94d78e418ab87 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java @@ -49,6 +49,7 @@ import org.apache.pulsar.client.api.RegexSubscriptionMode; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.impl.metrics.InstrumentProvider; import org.apache.pulsar.common.api.proto.BaseCommand; import org.apache.pulsar.common.api.proto.CommandWatchTopicListSuccess; import org.apache.pulsar.common.naming.NamespaceName; @@ -811,7 +812,7 @@ public void testPreciseRegexpSubscribeDisabledTopicWatcher(boolean partitioned) private PulsarClient createDelayWatchTopicsClient() throws Exception { ClientBuilderImpl clientBuilder = (ClientBuilderImpl) PulsarClient.builder().serviceUrl(lookupUrl.toString()); return InjectedClientCnxClientBuilder.create(clientBuilder, - (conf, eventLoopGroup) -> new ClientCnx(conf, eventLoopGroup) { + (conf, eventLoopGroup) -> new ClientCnx(InstrumentProvider.NOOP, conf, eventLoopGroup) { public CompletableFuture newWatchTopicList( BaseCommand command, long requestId) { // Inject 2 seconds delay when sending command New Watch Topics. diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java index ab273913fde29..8a79eb502439f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java @@ -35,6 +35,7 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; +import org.apache.pulsar.client.impl.metrics.InstrumentProvider; import org.apache.pulsar.common.util.netty.EventLoopUtil; import org.awaitility.Awaitility; @@ -79,7 +80,7 @@ public static PulsarTestClient create(ClientBuilder clientBuilder) throws Pulsar new DefaultThreadFactory("pulsar-test-client-io", Thread.currentThread().isDaemon())); AtomicReference> clientCnxSupplierReference = new AtomicReference<>(); - ConnectionPool connectionPool = new ConnectionPool(clientConfigurationData, eventLoopGroup, + ConnectionPool connectionPool = new ConnectionPool(InstrumentProvider.NOOP, clientConfigurationData, eventLoopGroup, () -> clientCnxSupplierReference.get().get()); return new PulsarTestClient(clientConfigurationData, eventLoopGroup, connectionPool, @@ -101,7 +102,7 @@ private PulsarTestClient(ClientConfigurationData conf, EventLoopGroup eventLoopG * @return new ClientCnx instance */ protected ClientCnx createClientCnx() { - return new ClientCnx(conf, eventLoopGroup) { + return new ClientCnx(InstrumentProvider.NOOP, conf, eventLoopGroup) { @Override public int getRemoteEndpointProtocolVersion() { return overrideRemoteEndpointProtocolVersion != 0 diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/metrics/ClientMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/metrics/ClientMetricsTest.java new file mode 100644 index 0000000000000..31305123c4148 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/metrics/ClientMetricsTest.java @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.metrics; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.fail; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.data.MetricDataType; +import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; +import lombok.Cleanup; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionType; +import org.assertj.core.api.Assertions; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +@Test(groups = "broker-api") +public class ClientMetricsTest extends ProducerConsumerBase { + + InMemoryMetricReader reader; + OpenTelemetry otel; + + @BeforeMethod + @Override + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + + this.reader = InMemoryMetricReader.create(); + SdkMeterProvider sdkMeterProvider = SdkMeterProvider.builder() + .registerMetricReader(reader) + .build(); + this.otel = OpenTelemetrySdk.builder().setMeterProvider(sdkMeterProvider).build(); + } + + @AfterMethod(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + private Map collectMetrics() { + Map metrics = new TreeMap<>(); + for (MetricData md : reader.collectAllMetrics()) { + metrics.put(md.getName(), md); + } + return metrics; + } + + private void assertCounterValue(Map metrics, String name, long expectedValue, + Attributes expectedAttributes) { + assertEquals(getCounterValue(metrics, name, expectedAttributes), expectedValue); + } + + private long getCounterValue(Map metrics, String name, + Attributes expectedAttributes) { + MetricData md = metrics.get(name); + assertNotNull(md, "metric not found: " + name); + assertEquals(md.getType(), MetricDataType.LONG_SUM); + + for (var ex : md.getLongSumData().getPoints()) { + if (ex.getAttributes().equals(expectedAttributes)) { + return ex.getValue(); + } + } + + fail("metric attributes not found: " + expectedAttributes); + return -1; + } + + private void assertHistoCountValue(Map metrics, String name, long expectedCount, + Attributes expectedAttributes) { + assertEquals(getHistoCountValue(metrics, name, expectedAttributes), expectedCount); + } + + private long getHistoCountValue(Map metrics, String name, + Attributes expectedAttributes) { + MetricData md = metrics.get(name); + assertNotNull(md, "metric not found: " + name); + assertEquals(md.getType(), MetricDataType.HISTOGRAM); + + for (var ex : md.getHistogramData().getPoints()) { + if (ex.getAttributes().equals(expectedAttributes)) { + return ex.getCount(); + } + } + + fail("metric attributes not found: " + expectedAttributes); + return -1; + } + + @Test + public void testProducerMetrics() throws Exception { + String topic = newTopicName(); + + PulsarClient client = PulsarClient.builder() + .serviceUrl(pulsar.getBrokerServiceUrl()) + .openTelemetry(otel) + .build(); + + Producer producer = client.newProducer(Schema.STRING) + .topic(topic) + .create(); + + for (int i = 0; i < 5; i++) { + producer.send("Hello"); + } + + Attributes nsAttrs = Attributes.builder() + .put("pulsar.tenant", "my-property") + .put("pulsar.namespace", "my-property/my-ns") + .build(); + Attributes nsAttrsSuccess = nsAttrs.toBuilder() + .put("pulsar.response.status", "success") + .build(); + + var metrics = collectMetrics(); + + assertCounterValue(metrics, "pulsar.client.connection.opened", 1, Attributes.empty()); + assertCounterValue(metrics, "pulsar.client.producer.message.pending.count", 0, nsAttrs); + assertCounterValue(metrics, "pulsar.client.producer.message.pending.size", 0, nsAttrs); + + assertHistoCountValue(metrics, "pulsar.client.lookup.duration", 1, + Attributes.builder() + .put("pulsar.lookup.transport-type", "binary") + .put("pulsar.lookup.type", "topic") + .put("pulsar.response.status", "success") + .build()); + assertHistoCountValue(metrics, "pulsar.client.lookup.duration", 1, + Attributes.builder() + .put("pulsar.lookup.transport-type", "binary") + .put("pulsar.lookup.type", "metadata") + .put("pulsar.response.status", "success") + .build()); + + assertHistoCountValue(metrics, "pulsar.client.producer.message.send.duration", 5, nsAttrsSuccess); + assertHistoCountValue(metrics, "pulsar.client.producer.rpc.send.duration", 5, nsAttrsSuccess); + assertCounterValue(metrics, "pulsar.client.producer.message.send.size", "hello".length() * 5, nsAttrs); + + + assertCounterValue(metrics, "pulsar.client.producer.opened", 1, nsAttrs); + + producer.close(); + client.close(); + + metrics = collectMetrics(); + assertCounterValue(metrics, "pulsar.client.producer.closed", 1, nsAttrs); + assertCounterValue(metrics, "pulsar.client.connection.closed", 1, Attributes.empty()); + } + + @Test + public void testConnectionsFailedMetrics() throws Exception { + String topic = newTopicName(); + + @Cleanup + PulsarClient client = PulsarClient.builder() + .serviceUrl("pulsar://invalid-pulsar-address:1234") + .operationTimeout(3, TimeUnit.SECONDS) + .openTelemetry(otel) + .build(); + + Assertions.assertThatThrownBy(() -> { + client.newProducer(Schema.STRING) + .topic(topic) + .create(); + }).isInstanceOf(Exception.class); + + + var metrics = collectMetrics(); + + Assertions.assertThat( + getCounterValue(metrics, "pulsar.client.connection.failed", + Attributes.builder().put("pulsar.failure.type", "tcp-failed").build())) + .isGreaterThanOrEqualTo(1L); + } + + @Test + public void testPublishFailedMetrics() throws Exception { + String topic = newTopicName(); + + @Cleanup + PulsarClient client = PulsarClient.builder() + .serviceUrl(admin.getServiceUrl()) + .operationTimeout(3, TimeUnit.SECONDS) + .openTelemetry(otel) + .build(); + + @Cleanup + Producer producer = client.newProducer(Schema.STRING) + .topic(topic) + .sendTimeout(3, TimeUnit.SECONDS) + .create(); + + // Make the client switch to non-existing broker to make publish fail + client.updateServiceUrl("pulsar://invalid-address:6650"); + + + try { + producer.send("Hello"); + fail("Should have failed to publish"); + } catch (Exception e) { + // expected + } + + var metrics = collectMetrics(); + + Attributes nsAttrs = Attributes.builder() + .put("pulsar.tenant", "my-property") + .put("pulsar.namespace", "my-property/my-ns") + .build(); + Attributes nsAttrsFailure = nsAttrs.toBuilder() + .put("pulsar.response.status", "failed") + .build(); + + assertCounterValue(metrics, "pulsar.client.producer.message.pending.count", 0, nsAttrs); + assertCounterValue(metrics, "pulsar.client.producer.message.pending.size", 0, nsAttrs); + assertHistoCountValue(metrics, "pulsar.client.producer.message.send.duration", 1, nsAttrsFailure); + assertHistoCountValue(metrics, "pulsar.client.producer.rpc.send.duration", 1, nsAttrsFailure); + } + + @Test + public void testConsumerMetrics() throws Exception { + String topic = newTopicName(); + + PulsarClient client = PulsarClient.builder() + .serviceUrl(pulsar.getBrokerServiceUrl()) + .openTelemetry(otel) + .build(); + + @Cleanup + Producer producer = client.newProducer(Schema.STRING) + .topic(topic) + .create(); + + Consumer consumer = client.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName("my-sub") + .ackTimeout(1, TimeUnit.SECONDS) + .subscriptionType(SubscriptionType.Shared) + .subscribe(); + + for (int i = 0; i < 10; i++) { + producer.send("Hello"); + } + + Thread.sleep(1000); + + Attributes nsAttrs = Attributes.builder() + .put("pulsar.tenant", "my-property") + .put("pulsar.namespace", "my-property/my-ns") + .put("pulsar.subscription", "my-sub") + .build(); + var metrics = collectMetrics(); + + assertCounterValue(metrics, "pulsar.client.connection.opened", 1, Attributes.empty()); + + assertHistoCountValue(metrics, "pulsar.client.lookup.duration", 2, + Attributes.builder() + .put("pulsar.lookup.transport-type", "binary") + .put("pulsar.lookup.type", "topic") + .put("pulsar.response.status", "success") + .build()); + assertHistoCountValue(metrics, "pulsar.client.lookup.duration", 2, + Attributes.builder() + .put("pulsar.lookup.transport-type", "binary") + .put("pulsar.lookup.type", "metadata") + .put("pulsar.response.status", "success") + .build()); + + assertCounterValue(metrics, "pulsar.client.consumer.receive_queue.count", 10, nsAttrs); + assertCounterValue(metrics, "pulsar.client.consumer.receive_queue.size", "hello".length() * 10, nsAttrs); + assertCounterValue(metrics, "pulsar.client.consumer.opened", 1, nsAttrs); + + Message msg1 = consumer.receive(); + consumer.acknowledge(msg1); + + Message msg2 = consumer.receive(); + consumer.negativeAcknowledge(msg2); + + /* Message msg3 = */ consumer.receive(); + + metrics = collectMetrics(); + assertCounterValue(metrics, "pulsar.client.consumer.receive_queue.count", 7, nsAttrs); + assertCounterValue(metrics, "pulsar.client.consumer.receive_queue.size", "hello".length() * 7, nsAttrs); + assertCounterValue(metrics, "pulsar.client.consumer.message.received.count", 3, nsAttrs); + assertCounterValue(metrics, "pulsar.client.consumer.message.received.size", "hello".length() * 3, nsAttrs); + + + // Let msg3 to reach ack-timeout + Thread.sleep(3000); + + metrics = collectMetrics(); + assertCounterValue(metrics, "pulsar.client.consumer.receive_queue.count", 8, nsAttrs); + assertCounterValue(metrics, "pulsar.client.consumer.receive_queue.size", "hello".length() * 8, nsAttrs); + + assertCounterValue(metrics, "pulsar.client.consumer.message.ack", 1, nsAttrs); + assertCounterValue(metrics, "pulsar.client.consumer.message.nack", 1, nsAttrs); + assertCounterValue(metrics, "pulsar.client.consumer.message.ack.timeout", 1, nsAttrs); + + client.close(); + + metrics = collectMetrics(); + assertCounterValue(metrics, "pulsar.client.consumer.closed", 1, nsAttrs); + assertCounterValue(metrics, "pulsar.client.connection.closed", 1, Attributes.empty()); + } +} diff --git a/pulsar-client-api/pom.xml b/pulsar-client-api/pom.xml index d8b51713da832..35bdf73374b3e 100644 --- a/pulsar-client-api/pom.xml +++ b/pulsar-client-api/pom.xml @@ -46,6 +46,12 @@ protobuf-java provided + + + io.opentelemetry + opentelemetry-api + provided + diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java index b180f6ba7f906..735aeeed55916 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.client.api; +import io.opentelemetry.api.OpenTelemetry; import java.io.Serializable; import java.net.InetSocketAddress; import java.time.Clock; @@ -459,7 +460,10 @@ ClientBuilder authentication(String authPluginClassName, Map aut * @param unit * time unit for {@code statsInterval} * @return the client builder instance + * + * @deprecated @see {@link #openTelemetry(OpenTelemetry)} */ + @Deprecated ClientBuilder statsInterval(long statsInterval, TimeUnit unit); /** @@ -554,6 +558,24 @@ ClientBuilder authentication(String authPluginClassName, Map aut */ ClientBuilder enableBusyWait(boolean enableBusyWait); + /** + * Configure OpenTelemetry for Pulsar Client + *

+ * When you pass an OpenTelemetry instance, Pulsar client will emit metrics that can be exported in a variety + * of different methods. + *

+ * Refer to OpenTelemetry Java SDK documentation for + * how to configure OpenTelemetry and the metrics exporter. + *

+ * By default, Pulsar client will use the {@link io.opentelemetry.api.GlobalOpenTelemetry} instance. If an + * OpenTelemetry JVM agent is configured, the metrics will be reported, otherwise the metrics will be + * completely disabled. + * + * @param openTelemetry the OpenTelemetry instance + * @return the client builder instance + */ + ClientBuilder openTelemetry(io.opentelemetry.api.OpenTelemetry openTelemetry); + /** * The clock used by the pulsar client. * diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerStats.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerStats.java index 7935e05d55b66..e488aa81151ce 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerStats.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerStats.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.client.api; +import io.opentelemetry.api.OpenTelemetry; import java.io.Serializable; import java.util.Collections; import java.util.Map; @@ -29,9 +30,12 @@ * *

All the stats are relative to the last recording period. The interval of the stats refreshes is configured with * {@link ClientBuilder#statsInterval(long, java.util.concurrent.TimeUnit)} with a default of 1 minute. + * + * @deprecated use {@link ClientBuilder#openTelemetry(OpenTelemetry)} to enable stats */ @InterfaceAudience.Public -@InterfaceStability.Stable +@InterfaceStability.Evolving +@Deprecated public interface ConsumerStats extends Serializable { /** diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerStats.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerStats.java index a26c20e740d37..9a9ade73669dd 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerStats.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerStats.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.client.api; +import io.opentelemetry.api.OpenTelemetry; import java.io.Serializable; import java.util.Collections; import java.util.Map; @@ -29,9 +30,12 @@ * *

All the stats are relative to the last recording period. The interval of the stats refreshes is configured with * {@link ClientBuilder#statsInterval(long, java.util.concurrent.TimeUnit)} with a default of 1 minute. + * + * @deprecated use {@link ClientBuilder#openTelemetry(OpenTelemetry)} to enable stats */ @InterfaceAudience.Public -@InterfaceStability.Stable +@InterfaceStability.Evolving +@Deprecated public interface ProducerStats extends Serializable { /** * @return the number of messages published in the last interval diff --git a/pulsar-client/pom.xml b/pulsar-client/pom.xml index 7424b12db5aa2..3917e2996e180 100644 --- a/pulsar-client/pom.xml +++ b/pulsar-client/pom.xml @@ -52,6 +52,16 @@ pkg + + io.opentelemetry + opentelemetry-api + + + + io.opentelemetry + opentelemetry-extension-incubator + + ${project.groupId} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java index fc5c3a3c6798b..a3c9d1bc9ab48 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java @@ -263,8 +263,8 @@ public OpSendMsg createOpSendMsg() throws IOException { // Because when invoke `ProducerImpl.processOpSendMsg` on flush, // if `op.msg != null && isBatchMessagingEnabled()` checks true, it will call `batchMessageAndSend` to flush // messageContainers before publishing this one-batch message. - op = OpSendMsg.create(messages, cmd, messageMetadata.getSequenceId(), firstCallback, - batchAllocatedSizeBytes); + op = OpSendMsg.create(producer.rpcLatencyHistogram, messages, cmd, messageMetadata.getSequenceId(), + firstCallback, batchAllocatedSizeBytes); // NumMessagesInBatch and BatchSizeByte will not be serialized to the binary cmd. It's just useful for the // ProducerStats @@ -314,7 +314,7 @@ public OpSendMsg createOpSendMsg() throws IOException { messageMetadata.getUncompressedSize(), encryptedPayload.readableBytes()); } - OpSendMsg op = OpSendMsg.create(messages, cmd, messageMetadata.getSequenceId(), + OpSendMsg op = OpSendMsg.create(producer.rpcLatencyHistogram, messages, cmd, messageMetadata.getSequenceId(), messageMetadata.getHighestSequenceId(), firstCallback, batchAllocatedSizeBytes); op.setNumMessagesInBatch(numMessagesInBatch); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java index bdf00844c1cd2..81c196c731f70 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java @@ -20,6 +20,7 @@ import static java.lang.String.format; import io.netty.buffer.ByteBuf; +import io.opentelemetry.api.common.Attributes; import java.net.InetSocketAddress; import java.net.URI; import java.util.ArrayList; @@ -34,6 +35,7 @@ import org.apache.commons.lang3.mutable.MutableObject; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SchemaSerializationException; +import org.apache.pulsar.client.impl.metrics.LatencyHistogram; import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode; import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse; import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse.LookupType; @@ -63,6 +65,11 @@ public class BinaryProtoLookupService implements LookupService { private final ConcurrentHashMap> partitionedMetadataInProgress = new ConcurrentHashMap<>(); + private final LatencyHistogram histoGetBroker; + private final LatencyHistogram histoGetTopicMetadata; + private final LatencyHistogram histoGetSchema; + private final LatencyHistogram histoListTopics; + public BinaryProtoLookupService(PulsarClientImpl client, String serviceUrl, boolean useTls, @@ -84,6 +91,15 @@ public BinaryProtoLookupService(PulsarClientImpl client, this.serviceNameResolver = new PulsarServiceNameResolver(); this.listenerName = listenerName; updateServiceUrl(serviceUrl); + + LatencyHistogram histo = client.instrumentProvider().newLatencyHistogram("pulsar.client.lookup.duration", + "Duration of lookup operations", null, + Attributes.builder().put("pulsar.lookup.transport-type", "binary").build()); + histoGetBroker = histo.withAttributes(Attributes.builder().put("pulsar.lookup.type", "topic").build()); + histoGetTopicMetadata = + histo.withAttributes(Attributes.builder().put("pulsar.lookup.type", "metadata").build()); + histoGetSchema = histo.withAttributes(Attributes.builder().put("pulsar.lookup.type", "schema").build()); + histoListTopics = histo.withAttributes(Attributes.builder().put("pulsar.lookup.type", "list-topics").build()); } @Override @@ -99,12 +115,20 @@ public void updateServiceUrl(String serviceUrl) throws PulsarClientException { * @return broker-socket-address that serves given topic */ public CompletableFuture getBroker(TopicName topicName) { + long startTime = System.nanoTime(); final MutableObject newFutureCreated = new MutableObject<>(); try { return lookupInProgress.computeIfAbsent(topicName, tpName -> { CompletableFuture newFuture = findBroker(serviceNameResolver.resolveHost(), false, topicName, 0); newFutureCreated.setValue(newFuture); + + newFuture.thenRun(() -> { + histoGetBroker.recordSuccess(System.nanoTime() - startTime); + }).exceptionally(x -> { + histoGetBroker.recordFailure(System.nanoTime() - startTime); + return null; + }); return newFuture; }); } finally { @@ -224,6 +248,7 @@ private CompletableFuture findBroker(InetSocketAddress socket private CompletableFuture getPartitionedTopicMetadata(InetSocketAddress socketAddress, TopicName topicName) { + long startTime = System.nanoTime(); CompletableFuture partitionFuture = new CompletableFuture<>(); client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> { @@ -231,11 +256,13 @@ private CompletableFuture getPartitionedTopicMetadata( ByteBuf request = Commands.newPartitionMetadataRequest(topicName.toString(), requestId); clientCnx.newLookup(request, requestId).whenComplete((r, t) -> { if (t != null) { + histoGetTopicMetadata.recordFailure(System.nanoTime() - startTime); log.warn("[{}] failed to get Partitioned metadata : {}", topicName, t.getMessage(), t); partitionFuture.completeExceptionally(t); } else { try { + histoGetTopicMetadata.recordSuccess(System.nanoTime() - startTime); partitionFuture.complete(new PartitionedTopicMetadata(r.partitions)); } catch (Exception e) { partitionFuture.completeExceptionally(new PulsarClientException.LookupException( @@ -263,6 +290,7 @@ public CompletableFuture> getSchema(TopicName topicName) { @Override public CompletableFuture> getSchema(TopicName topicName, byte[] version) { + long startTime = System.nanoTime(); CompletableFuture> schemaFuture = new CompletableFuture<>(); if (version != null && version.length == 0) { schemaFuture.completeExceptionally(new SchemaSerializationException("Empty schema version")); @@ -275,10 +303,12 @@ public CompletableFuture> getSchema(TopicName topicName, by Optional.ofNullable(BytesSchemaVersion.of(version))); clientCnx.sendGetSchema(request, requestId).whenComplete((r, t) -> { if (t != null) { + histoGetSchema.recordFailure(System.nanoTime() - startTime); log.warn("[{}] failed to get schema : {}", topicName, t.getMessage(), t); schemaFuture.completeExceptionally(t); } else { + histoGetSchema.recordSuccess(System.nanoTime() - startTime); schemaFuture.complete(r); } client.getCnxPool().releaseConnection(clientCnx); @@ -326,6 +356,8 @@ private void getTopicsUnderNamespace(InetSocketAddress socketAddress, Mode mode, String topicsPattern, String topicsHash) { + long startTime = System.nanoTime(); + client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> { long requestId = client.newRequestId(); ByteBuf request = Commands.newGetTopicsOfNamespaceRequest( @@ -333,8 +365,10 @@ private void getTopicsUnderNamespace(InetSocketAddress socketAddress, clientCnx.newGetTopicsOfNamespace(request, requestId).whenComplete((r, t) -> { if (t != null) { + histoListTopics.recordFailure(System.nanoTime() - startTime); getTopicsResultFuture.completeExceptionally(t); } else { + histoListTopics.recordSuccess(System.nanoTime() - startTime); if (log.isDebugEnabled()) { log.debug("[namespace: {}] Success get topics list in request: {}", namespace, requestId); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java index 9a86d81c93fab..2548a52aa95a8 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java @@ -19,6 +19,7 @@ package org.apache.pulsar.client.impl; import static com.google.common.base.Preconditions.checkArgument; +import io.opentelemetry.api.OpenTelemetry; import java.net.InetSocketAddress; import java.time.Clock; import java.util.List; @@ -121,6 +122,12 @@ public ClientBuilder authentication(Authentication authentication) { return this; } + @Override + public ClientBuilder openTelemetry(OpenTelemetry openTelemetry) { + conf.setOpenTelemetry(openTelemetry); + return this; + } + @Override public ClientBuilder authentication(String authPluginClassName, String authParamsString) throws UnsupportedAuthenticationException { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index 938a0b4d8f683..03e0f406dd2f2 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -32,6 +32,7 @@ import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.ssl.SslHandshakeCompletionEvent; import io.netty.util.concurrent.Promise; +import io.opentelemetry.api.common.Attributes; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.net.URI; @@ -60,6 +61,9 @@ import org.apache.pulsar.client.api.PulsarClientException.TimeoutException; import org.apache.pulsar.client.impl.BinaryProtoLookupService.LookupDataResult; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.impl.metrics.Counter; +import org.apache.pulsar.client.impl.metrics.InstrumentProvider; +import org.apache.pulsar.client.impl.metrics.Unit; import org.apache.pulsar.client.impl.schema.SchemaInfoUtil; import org.apache.pulsar.client.impl.transaction.TransactionBufferHandler; import org.apache.pulsar.client.util.TimedCompletableFuture; @@ -201,6 +205,9 @@ protected enum State { None, SentConnectFrame, Ready, Failed, Connecting } + private final Counter connectionsOpenedCounter; + private final Counter connectionsClosedCounter; + private static class RequestTime { private final long creationTimeNanos; final long requestId; @@ -236,12 +243,13 @@ String getDescription() { } } - - public ClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) { - this(conf, eventLoopGroup, Commands.getCurrentProtocolVersion()); + public ClientCnx(InstrumentProvider instrumentProvider, + ClientConfigurationData conf, EventLoopGroup eventLoopGroup) { + this(instrumentProvider, conf, eventLoopGroup, Commands.getCurrentProtocolVersion()); } - public ClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, int protocolVersion) { + public ClientCnx(InstrumentProvider instrumentProvider, ClientConfigurationData conf, EventLoopGroup eventLoopGroup, + int protocolVersion) { super(conf.getKeepAliveIntervalSeconds(), TimeUnit.SECONDS); checkArgument(conf.getMaxLookupRequest() > conf.getConcurrentLookupRequest()); this.pendingLookupRequestSemaphore = new Semaphore(conf.getConcurrentLookupRequest(), false); @@ -257,11 +265,19 @@ public ClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, in this.idleState = new ClientCnxIdleState(this); this.clientVersion = "Pulsar-Java-v" + PulsarVersion.getVersion() + (conf.getDescription() == null ? "" : ("-" + conf.getDescription())); + this.connectionsOpenedCounter = + instrumentProvider.newCounter("pulsar.client.connection.opened", Unit.Connections, + "The number of connections opened", null, Attributes.empty()); + this.connectionsClosedCounter = + instrumentProvider.newCounter("pulsar.client.connection.closed", Unit.Connections, + "The number of connections closed", null, Attributes.empty()); + } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); + connectionsOpenedCounter.increment(); this.localAddress = ctx.channel().localAddress(); this.remoteAddress = ctx.channel().remoteAddress(); @@ -304,6 +320,7 @@ protected ByteBuf newConnectCommand() throws Exception { @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { super.channelInactive(ctx); + connectionsClosedCounter.increment(); lastDisconnectedTimestamp = System.currentTimeMillis(); log.info("{} Disconnected", ctx.channel()); if (!connectionFuture.isDone()) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java index 850e805067d12..d5adbdd7098ed 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java @@ -32,6 +32,7 @@ import io.netty.resolver.dns.SequentialDnsServerAddressStreamProvider; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.ScheduledFuture; +import io.opentelemetry.api.common.Attributes; import java.net.InetSocketAddress; import java.net.URI; import java.net.URISyntaxException; @@ -54,6 +55,9 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.PulsarClientException.InvalidServiceURL; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.impl.metrics.Counter; +import org.apache.pulsar.client.impl.metrics.InstrumentProvider; +import org.apache.pulsar.client.impl.metrics.Unit; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.netty.DnsResolverUtil; @@ -88,6 +92,8 @@ public class ConnectionPool implements AutoCloseable { /** Async release useless connections task. **/ private ScheduledFuture asyncReleaseUselessConnectionsTask; + private final Counter connectionsTcpFailureCounter; + private final Counter connectionsHandshakeFailureCounter; @Value private static class Key { @@ -96,16 +102,19 @@ private static class Key { int randomKey; } - public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException { - this(conf, eventLoopGroup, () -> new ClientCnx(conf, eventLoopGroup)); + public ConnectionPool(InstrumentProvider instrumentProvider, + ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException { + this(instrumentProvider, conf, eventLoopGroup, () -> new ClientCnx(instrumentProvider, conf, eventLoopGroup)); } - public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, + public ConnectionPool(InstrumentProvider instrumentProvider, + ClientConfigurationData conf, EventLoopGroup eventLoopGroup, Supplier clientCnxSupplier) throws PulsarClientException { - this(conf, eventLoopGroup, clientCnxSupplier, Optional.empty()); + this(instrumentProvider, conf, eventLoopGroup, clientCnxSupplier, Optional.empty()); } - public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, + public ConnectionPool(InstrumentProvider instrumentProvider, + ClientConfigurationData conf, EventLoopGroup eventLoopGroup, Supplier clientCnxSupplier, Optional> addressResolver) throws PulsarClientException { @@ -155,6 +164,14 @@ public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGrou } }, idleDetectionIntervalSeconds, idleDetectionIntervalSeconds, TimeUnit.SECONDS); } + + connectionsTcpFailureCounter = + instrumentProvider.newCounter("pulsar.client.connection.failed", Unit.Connections, + "The number of failed connection attempts", null, + Attributes.builder().put("pulsar.failure.type", "tcp-failed").build()); + connectionsHandshakeFailureCounter = instrumentProvider.newCounter("pulsar.client.connection.failed", + Unit.Connections, "The number of failed connection attempts", null, + Attributes.builder().put("pulsar.failure.type", "handshake").build()); } private static AddressResolver createAddressResolver(ClientConfigurationData conf, @@ -295,6 +312,7 @@ private CompletableFuture createConnection(Key key) { } cnxFuture.complete(cnx); }).exceptionally(exception -> { + connectionsHandshakeFailureCounter.increment(); log.warn("[{}] Connection handshake failed: {}", cnx.channel(), exception.getMessage()); cnxFuture.completeExceptionally(exception); // this cleanupConnection may happen before that the @@ -306,6 +324,7 @@ private CompletableFuture createConnection(Key key) { return null; }); }).exceptionally(exception -> { + connectionsTcpFailureCounter.increment(); eventLoopGroup.execute(() -> { log.warn("Failed to open connection to {} : {}", key.physicalAddress, exception.getMessage()); pool.remove(key, cnxFuture); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 5a0e5de330d31..f1e259086ec8a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -34,6 +34,7 @@ import io.netty.util.ReferenceCountUtil; import io.netty.util.Timeout; import io.netty.util.concurrent.FastThreadLocal; +import io.opentelemetry.api.common.Attributes; import java.io.IOException; import java.net.URI; import java.nio.ByteBuffer; @@ -91,6 +92,10 @@ import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.client.impl.crypto.MessageCryptoBc; +import org.apache.pulsar.client.impl.metrics.Counter; +import org.apache.pulsar.client.impl.metrics.InstrumentProvider; +import org.apache.pulsar.client.impl.metrics.Unit; +import org.apache.pulsar.client.impl.metrics.UpDownCounter; import org.apache.pulsar.client.impl.schema.AutoConsumeSchema; import org.apache.pulsar.client.impl.transaction.TransactionImpl; import org.apache.pulsar.client.util.ExecutorProvider; @@ -216,6 +221,17 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle private final boolean createTopicIfDoesNotExist; private final boolean poolMessages; + private final Counter messagesReceivedCounter; + private final Counter bytesReceivedCounter; + private final UpDownCounter messagesPrefetchedGauge; + private final UpDownCounter bytesPrefetchedGauge; + private final Counter consumersOpenedCounter; + private final Counter consumersClosedCounter; + private final Counter consumerAcksCounter; + private final Counter consumerNacksCounter; + + private final Counter consumerDlqMessagesCounter; + private final AtomicReference clientCnxUsedForConsumerRegistration = new AtomicReference<>(); private final List previousExceptions = new CopyOnWriteArrayList(); private volatile boolean hasSoughtByTimestamp = false; @@ -389,7 +405,30 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat topicNameWithoutPartition = topicName.getPartitionedTopicName(); + InstrumentProvider ip = client.instrumentProvider(); + Attributes attrs = Attributes.builder().put("pulsar.subscription", subscription).build(); + consumersOpenedCounter = ip.newCounter("pulsar.client.consumer.opened", Unit.Sessions, + "The number of consumer sessions opened", topic, attrs); + consumersClosedCounter = ip.newCounter("pulsar.client.consumer.closed", Unit.Sessions, + "The number of consumer sessions closed", topic, attrs); + messagesReceivedCounter = ip.newCounter("pulsar.client.consumer.message.received.count", Unit.Messages, + "The number of messages explicitly received by the consumer application", topic, attrs); + bytesReceivedCounter = ip.newCounter("pulsar.client.consumer.message.received.size", Unit.Bytes, + "The number of bytes explicitly received by the consumer application", topic, attrs); + messagesPrefetchedGauge = ip.newUpDownCounter("pulsar.client.consumer.receive_queue.count", Unit.Messages, + "The number of messages currently sitting in the consumer receive queue", topic, attrs); + bytesPrefetchedGauge = ip.newUpDownCounter("pulsar.client.consumer.receive_queue.size", Unit.Bytes, + "The total size in bytes of messages currently sitting in the consumer receive queue", topic, attrs); + + consumerAcksCounter = ip.newCounter("pulsar.client.consumer.message.ack", Unit.Messages, + "The number of acknowledged messages", topic, attrs); + consumerNacksCounter = ip.newCounter("pulsar.client.consumer.message.nack", Unit.Messages, + "The number of negatively acknowledged messages", topic, attrs); + consumerDlqMessagesCounter = ip.newCounter("pulsar.client.consumer.message.dlq", Unit.Messages, + "The number of messages sent to DLQ", topic, attrs); grabCnx(); + + consumersOpenedCounter.increment(); } public ConnectionHandler getConnectionHandler() { @@ -552,6 +591,8 @@ protected CompletableFuture> internalBatchReceiveAsync() { protected CompletableFuture doAcknowledge(MessageId messageId, AckType ackType, Map properties, TransactionImpl txn) { + consumerAcksCounter.increment(); + if (getState() != State.Ready && getState() != State.Connecting) { stats.incrementNumAcksFailed(); PulsarClientException exception = new PulsarClientException("Consumer not ready. State: " + getState()); @@ -573,6 +614,8 @@ protected CompletableFuture doAcknowledge(MessageId messageId, AckType ack @Override protected CompletableFuture doAcknowledge(List messageIdList, AckType ackType, Map properties, TransactionImpl txn) { + consumerAcksCounter.increment(); + if (getState() != State.Ready && getState() != State.Connecting) { stats.incrementNumAcksFailed(); PulsarClientException exception = new PulsarClientException("Consumer not ready. State: " + getState()); @@ -668,6 +711,8 @@ protected CompletableFuture doReconsumeLater(Message message, AckType a .value(retryMessage.getData()) .properties(propertiesMap); typedMessageBuilderNew.sendAsync().thenAccept(msgId -> { + consumerDlqMessagesCounter.increment(); + doAcknowledge(finalMessageId, ackType, Collections.emptyMap(), null).thenAccept(v -> { result.complete(null); }).exceptionally(ex -> { @@ -760,6 +805,7 @@ private MessageImpl getMessageImpl(Message message) { @Override public void negativeAcknowledge(MessageId messageId) { + consumerNacksCounter.increment(); negativeAcksTracker.add(messageId); // Ensure the message is not redelivered for ack-timeout, since we did receive an "ack" @@ -768,6 +814,7 @@ public void negativeAcknowledge(MessageId messageId) { @Override public void negativeAcknowledge(Message message) { + consumerNacksCounter.increment(); negativeAcksTracker.add(message); // Ensure the message is not redelivered for ack-timeout, since we did receive an "ack" @@ -1048,6 +1095,8 @@ public CompletableFuture closeAsync() { return closeFuture; } + consumersClosedCounter.increment(); + if (!isConnected()) { log.info("[{}] [{}] Closed Consumer (not connected)", topic, subscription); setState(State.Closed); @@ -1240,6 +1289,9 @@ protected MessageImpl newMessage(final MessageIdImpl messageId, } private void executeNotifyCallback(final MessageImpl message) { + messagesPrefetchedGauge.increment(); + bytesPrefetchedGauge.add(message.size()); + // Enqueue the message so that it can be retrieved when application calls receive() // if the conf.getReceiverQueueSize() is 0 then discard message if no one is waiting for it. // if asyncReceive is waiting then notify callback without adding to incomingMessages queue @@ -1732,6 +1784,12 @@ protected synchronized void messageProcessed(Message msg) { ClientCnx msgCnx = ((MessageImpl) msg).getCnx(); lastDequeuedMessageId = msg.getMessageId(); + messagesPrefetchedGauge.decrement(); + messagesReceivedCounter.increment(); + + bytesPrefetchedGauge.subtract(msg.size()); + bytesReceivedCounter.add(msg.size()); + if (msgCnx != currentCnx) { // The processed message did belong to the old queue that was cleared after reconnection. } else { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java index 02d0d10626fa6..8158b6d979efd 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java @@ -19,6 +19,7 @@ package org.apache.pulsar.client.impl; import io.netty.channel.EventLoopGroup; +import io.opentelemetry.api.common.Attributes; import java.net.InetSocketAddress; import java.net.URI; import java.nio.ByteBuffer; @@ -34,6 +35,8 @@ import org.apache.pulsar.client.api.PulsarClientException.NotFoundException; import org.apache.pulsar.client.api.SchemaSerializationException; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.impl.metrics.InstrumentProvider; +import org.apache.pulsar.client.impl.metrics.LatencyHistogram; import org.apache.pulsar.client.impl.schema.SchemaInfoUtil; import org.apache.pulsar.client.impl.schema.SchemaUtils; import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode; @@ -60,11 +63,26 @@ public class HttpLookupService implements LookupService { private static final String BasePathV1 = "lookup/v2/destination/"; private static final String BasePathV2 = "lookup/v2/topic/"; - public HttpLookupService(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) + private final LatencyHistogram histoGetBroker; + private final LatencyHistogram histoGetTopicMetadata; + private final LatencyHistogram histoGetSchema; + private final LatencyHistogram histoListTopics; + + public HttpLookupService(InstrumentProvider instrumentProvider, ClientConfigurationData conf, + EventLoopGroup eventLoopGroup) throws PulsarClientException { this.httpClient = new HttpClient(conf, eventLoopGroup); this.useTls = conf.isUseTls(); this.listenerName = conf.getListenerName(); + + LatencyHistogram histo = instrumentProvider.newLatencyHistogram("pulsar.client.lookup.duration", + "Duration of lookup operations", null, + Attributes.builder().put("pulsar.lookup.transport-type", "http").build()); + histoGetBroker = histo.withAttributes(Attributes.builder().put("pulsar.lookup.type", "topic").build()); + histoGetTopicMetadata = + histo.withAttributes(Attributes.builder().put("pulsar.lookup.type", "metadata").build()); + histoGetSchema = histo.withAttributes(Attributes.builder().put("pulsar.lookup.type", "schema").build()); + histoListTopics = histo.withAttributes(Attributes.builder().put("pulsar.lookup.type", "list-topics").build()); } @Override @@ -84,8 +102,18 @@ public CompletableFuture getBroker(TopicName topicName) { String basePath = topicName.isV2() ? BasePathV2 : BasePathV1; String path = basePath + topicName.getLookupName(); path = StringUtils.isBlank(listenerName) ? path : path + "?listenerName=" + Codec.encode(listenerName); - return httpClient.get(path, LookupData.class) - .thenCompose(lookupData -> { + + long startTime = System.nanoTime(); + CompletableFuture httpFuture = httpClient.get(path, LookupData.class); + + httpFuture.thenRun(() -> { + histoGetBroker.recordSuccess(System.nanoTime() - startTime); + }).exceptionally(x -> { + histoGetBroker.recordFailure(System.nanoTime() - startTime); + return null; + }); + + return httpFuture.thenCompose(lookupData -> { // Convert LookupData into as SocketAddress, handling exceptions URI uri = null; try { @@ -112,9 +140,21 @@ public CompletableFuture getBroker(TopicName topicName) { @Override public CompletableFuture getPartitionedTopicMetadata(TopicName topicName) { + long startTime = System.nanoTime(); + String format = topicName.isV2() ? "admin/v2/%s/partitions" : "admin/%s/partitions"; - return httpClient.get(String.format(format, topicName.getLookupName()) + "?checkAllowAutoCreation=true", + CompletableFuture httpFuture = httpClient.get( + String.format(format, topicName.getLookupName()) + "?checkAllowAutoCreation=true", PartitionedTopicMetadata.class); + + httpFuture.thenRun(() -> { + histoGetTopicMetadata.recordSuccess(System.nanoTime() - startTime); + }).exceptionally(x -> { + histoGetTopicMetadata.recordFailure(System.nanoTime() - startTime); + return null; + }); + + return httpFuture; } @Override @@ -130,6 +170,8 @@ public InetSocketAddress resolveHost() { @Override public CompletableFuture getTopicsUnderNamespace(NamespaceName namespace, Mode mode, String topicsPattern, String topicsHash) { + long startTime = System.nanoTime(); + CompletableFuture future = new CompletableFuture<>(); String format = namespace.isV2() @@ -152,6 +194,14 @@ public CompletableFuture getTopicsUnderNamespace(NamespaceName future.completeExceptionally(cause); return null; }); + + future.thenRun(() -> { + histoListTopics.recordSuccess(System.nanoTime() - startTime); + }).exceptionally(x -> { + histoListTopics.recordFailure(System.nanoTime() - startTime); + return null; + }); + return future; } @@ -162,6 +212,7 @@ public CompletableFuture> getSchema(TopicName topicName) { @Override public CompletableFuture> getSchema(TopicName topicName, byte[] version) { + long startTime = System.nanoTime(); CompletableFuture> future = new CompletableFuture<>(); String schemaName = topicName.getSchemaName(); @@ -201,6 +252,13 @@ public CompletableFuture> getSchema(TopicName topicName, by } return null; }); + + future.thenRun(() -> { + histoGetSchema.recordSuccess(System.nanoTime() - startTime); + }).exceptionally(x -> { + histoGetSchema.recordFailure(System.nanoTime() - startTime); + return null; + }); return future; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 880185f7a9781..dbd3aae426900 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -40,6 +40,7 @@ import io.netty.util.Timeout; import io.netty.util.TimerTask; import io.netty.util.concurrent.ScheduledFuture; +import io.opentelemetry.api.common.Attributes; import java.io.IOException; import java.net.URI; import java.nio.ByteBuffer; @@ -76,6 +77,11 @@ import org.apache.pulsar.client.api.transaction.Transaction; import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; import org.apache.pulsar.client.impl.crypto.MessageCryptoBc; +import org.apache.pulsar.client.impl.metrics.Counter; +import org.apache.pulsar.client.impl.metrics.InstrumentProvider; +import org.apache.pulsar.client.impl.metrics.LatencyHistogram; +import org.apache.pulsar.client.impl.metrics.Unit; +import org.apache.pulsar.client.impl.metrics.UpDownCounter; import org.apache.pulsar.client.impl.schema.JSONSchema; import org.apache.pulsar.client.impl.transaction.TransactionImpl; import org.apache.pulsar.client.util.MathUtils; @@ -171,6 +177,15 @@ public class ProducerImpl extends ProducerBase implements TimerTask, Conne private boolean errorState; + private final LatencyHistogram latencyHistogram; + final LatencyHistogram rpcLatencyHistogram; + private final Counter publishedBytesCounter; + private final UpDownCounter pendingMessagesUpDownCounter; + private final UpDownCounter pendingBytesUpDownCounter; + + private final Counter producersOpenedCounter; + private final Counter producersClosedCounter; + public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfigurationData conf, CompletableFuture> producerCreatedFuture, int partitionIndex, Schema schema, ProducerInterceptors interceptors, Optional overrideProducerName) { @@ -265,6 +280,26 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration metadata = Collections.unmodifiableMap(new HashMap<>(conf.getProperties())); } + InstrumentProvider ip = client.instrumentProvider(); + latencyHistogram = ip.newLatencyHistogram("pulsar.client.producer.message.send.duration", + "Publish latency experienced by the application, includes client batching time", topic, + Attributes.empty()); + rpcLatencyHistogram = ip.newLatencyHistogram("pulsar.client.producer.rpc.send.duration", + "Publish RPC latency experienced internally by the client when sending data to receiving an ack", topic, + Attributes.empty()); + publishedBytesCounter = ip.newCounter("pulsar.client.producer.message.send.size", + Unit.Bytes, "The number of bytes published", topic, Attributes.empty()); + pendingMessagesUpDownCounter = + ip.newUpDownCounter("pulsar.client.producer.message.pending.count", Unit.Messages, + "The number of messages in the producer internal send queue, waiting to be sent", topic, + Attributes.empty()); + pendingBytesUpDownCounter = ip.newUpDownCounter("pulsar.client.producer.message.pending.size", Unit.Bytes, + "The size of the messages in the producer internal queue, waiting to sent", topic, Attributes.empty()); + producersOpenedCounter = ip.newCounter("pulsar.client.producer.opened", Unit.Sessions, + "The number of producer sessions opened", topic, Attributes.empty()); + producersClosedCounter = ip.newCounter("pulsar.client.producer.closed", Unit.Sessions, + "The number of producer sessions closed", topic, Attributes.empty()); + this.connectionHandler = new ConnectionHandler(this, new BackoffBuilder() .setInitialTime(client.getConfiguration().getInitialBackoffIntervalNanos(), TimeUnit.NANOSECONDS) @@ -274,6 +309,7 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration this); setChunkMaxMessageSize(); grabCnx(); + producersOpenedCounter.increment(); } private void setChunkMaxMessageSize() { @@ -337,6 +373,11 @@ CompletableFuture internalSendAsync(Message message) { if (interceptors != null) { interceptorMessage.getProperties(); } + + int msgSize = interceptorMessage.getDataBuffer().readableBytes(); + pendingMessagesUpDownCounter.increment(); + pendingBytesUpDownCounter.add(msgSize); + sendAsync(interceptorMessage, new SendCallback() { SendCallback nextCallback = null; MessageImpl nextMsg = null; @@ -359,15 +400,22 @@ public MessageImpl getNextMessage() { @Override public void sendComplete(Exception e) { + long latencyNanos = System.nanoTime() - createdAt; + pendingMessagesUpDownCounter.decrement(); + pendingBytesUpDownCounter.subtract(msgSize); + try { if (e != null) { + latencyHistogram.recordFailure(latencyNanos); stats.incrementSendFailed(); onSendAcknowledgement(interceptorMessage, null, e); future.completeExceptionally(e); } else { + latencyHistogram.recordSuccess(latencyNanos); + publishedBytesCounter.add(msgSize); onSendAcknowledgement(interceptorMessage, interceptorMessage.getMessageId(), null); future.complete(interceptorMessage.getMessageId()); - stats.incrementNumAcksReceived(System.nanoTime() - createdAt); + stats.incrementNumAcksReceived(latencyNanos); } } finally { interceptorMessage.getDataBuffer().release(); @@ -413,15 +461,16 @@ CompletableFuture internalSendWithTxnAsync(Message message, Transa } else { CompletableFuture completableFuture = new CompletableFuture<>(); if (!((TransactionImpl) txn).checkIfOpen(completableFuture)) { - return completableFuture; + return completableFuture; } return ((TransactionImpl) txn).registerProducedTopic(topic) - .thenCompose(ignored -> internalSendAsync(message)); + .thenCompose(ignored -> internalSendAsync(message)); } } /** * Compress the payload if compression is configured. + * * @param payload * @return a new payload */ @@ -473,9 +522,10 @@ public void sendAsync(Message message, SendCallback callback) { if (!msg.isReplicated() && msgMetadata.hasProducerName()) { PulsarClientException.InvalidMessageException invalidMessageException = - new PulsarClientException.InvalidMessageException( - format("The producer %s of the topic %s can not reuse the same message", producerName, topic), - msg.getSequenceId()); + new PulsarClientException.InvalidMessageException( + format("The producer %s of the topic %s can not reuse the same message", producerName, + topic), + msg.getSequenceId()); completeCallbackAndReleaseSemaphore(uncompressedSize, callback, invalidMessageException); compressedPayload.release(); return; @@ -645,8 +695,8 @@ private void serializeAndSendMessage(MessageImpl msg, msgMetadata.setUuid(uuid); } msgMetadata.setChunkId(chunkId) - .setNumChunksFromMsg(totalChunks) - .setTotalChunkMsgSize(compressedPayloadSize); + .setNumChunksFromMsg(totalChunks) + .setTotalChunkMsgSize(compressedPayloadSize); } if (canAddToBatch(msg) && totalChunks <= 1) { @@ -697,9 +747,9 @@ private void serializeAndSendMessage(MessageImpl msg, if (msg.getSchemaState() == MessageImpl.SchemaState.Ready) { ByteBufPair cmd = sendMessage(producerId, sequenceId, numMessages, messageId, msgMetadata, encryptedPayload); - op = OpSendMsg.create(msg, cmd, sequenceId, callback); + op = OpSendMsg.create(rpcLatencyHistogram, msg, cmd, sequenceId, callback); } else { - op = OpSendMsg.create(msg, null, sequenceId, callback); + op = OpSendMsg.create(rpcLatencyHistogram, msg, null, sequenceId, callback); final MessageMetadata finalMsgMetadata = msgMetadata; op.rePopulate = () -> { if (msgMetadata.hasChunkId()) { @@ -780,8 +830,8 @@ private void tryRegisterSchema(ClientCnx cnx, MessageImpl msg, SendCallback call } SchemaInfo schemaInfo = msg.hasReplicateFrom() ? msg.getSchemaInfoForReplicator() : msg.getSchemaInfo(); schemaInfo = Optional.ofNullable(schemaInfo) - .filter(si -> si.getType().getValue() > 0) - .orElse(Schema.BYTES.getSchemaInfo()); + .filter(si -> si.getType().getValue() > 0) + .orElse(Schema.BYTES.getSchemaInfo()); getOrCreateSchemaAsync(cnx, schemaInfo).handle((v, ex) -> { if (ex != null) { Throwable t = FutureUtil.unwrapCompletionException(ex); @@ -816,10 +866,10 @@ private void tryRegisterSchema(ClientCnx cnx, MessageImpl msg, SendCallback call private CompletableFuture getOrCreateSchemaAsync(ClientCnx cnx, SchemaInfo schemaInfo) { if (!Commands.peerSupportsGetOrCreateSchema(cnx.getRemoteEndpointProtocolVersion())) { return FutureUtil.failedFuture( - new PulsarClientException.NotSupportedException( - format("The command `GetOrCreateSchema` is not supported for the protocol version %d. " - + "The producer is %s, topic is %s", - cnx.getRemoteEndpointProtocolVersion(), producerName, topic))); + new PulsarClientException.NotSupportedException( + format("The command `GetOrCreateSchema` is not supported for the protocol version %d. " + + "The producer is %s, topic is %s", + cnx.getRemoteEndpointProtocolVersion(), producerName, topic))); } long requestId = client.newRequestId(); ByteBuf request = Commands.newGetOrCreateSchema(requestId, topic, schemaInfo); @@ -891,7 +941,7 @@ private boolean canAddToBatch(MessageImpl msg) { private boolean canAddToCurrentBatch(MessageImpl msg) { return batchMessageContainer.haveEnoughSpace(msg) - && (!isMultiSchemaEnabled(false) || batchMessageContainer.hasSameSchema(msg)) + && (!isMultiSchemaEnabled(false) || batchMessageContainer.hasSameSchema(msg)) && batchMessageContainer.hasSameTxn(msg); } @@ -920,30 +970,31 @@ private void doBatchSendAndAdd(MessageImpl msg, SendCallback callback, ByteBu private boolean isValidProducerState(SendCallback callback, long sequenceId) { switch (getState()) { - case Ready: - // OK - case Connecting: - // We are OK to queue the messages on the client, it will be sent to the broker once we get the connection - case RegisteringSchema: - // registering schema - return true; - case Closing: - case Closed: - callback.sendComplete( - new PulsarClientException.AlreadyClosedException("Producer already closed", sequenceId)); - return false; - case ProducerFenced: - callback.sendComplete(new PulsarClientException.ProducerFencedException("Producer was fenced")); - return false; - case Terminated: - callback.sendComplete( - new PulsarClientException.TopicTerminatedException("Topic was terminated", sequenceId)); - return false; - case Failed: - case Uninitialized: - default: - callback.sendComplete(new PulsarClientException.NotConnectedException(sequenceId)); - return false; + case Ready: + // OK + case Connecting: + // We are OK to queue the messages on the client, it will be sent to the broker once we get the + // connection + case RegisteringSchema: + // registering schema + return true; + case Closing: + case Closed: + callback.sendComplete( + new PulsarClientException.AlreadyClosedException("Producer already closed", sequenceId)); + return false; + case ProducerFenced: + callback.sendComplete(new PulsarClientException.ProducerFencedException("Producer was fenced")); + return false; + case Terminated: + callback.sendComplete( + new PulsarClientException.TopicTerminatedException("Topic was terminated", sequenceId)); + return false; + case Failed: + case Uninitialized: + default: + callback.sendComplete(new PulsarClientException.NotConnectedException(sequenceId)); + return false; } } @@ -1043,9 +1094,11 @@ private static final class LastSendFutureWrapper { private LastSendFutureWrapper(CompletableFuture lastSendFuture) { this.lastSendFuture = lastSendFuture; } + static LastSendFutureWrapper create(CompletableFuture lastSendFuture) { return new LastSendFutureWrapper(lastSendFuture); } + public CompletableFuture handleOnce() { return lastSendFuture.handle((ignore, t) -> { if (t != null && THROW_ONCE_UPDATER.compareAndSet(this, FALSE, TRUE)) { @@ -1070,6 +1123,7 @@ public CompletableFuture closeAsync() { return CompletableFuture.completedFuture(null); } + producersClosedCounter.increment(); closeProducerTasks(); ClientCnx cnx = cnx(); @@ -1276,9 +1330,10 @@ protected synchronized void recoverChecksumError(ClientCnx cnx, long sequenceId) releaseSemaphoreForSendOp(op); try { op.sendComplete( - new PulsarClientException.ChecksumException( - format("The checksum of the message which is produced by producer %s to the topic " - + "%s is corrupted", producerName, topic))); + new PulsarClientException.ChecksumException( + format("The checksum of the message which is produced by producer %s to the " + + "topic " + + "%s is corrupted", producerName, topic))); } catch (Throwable t) { log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", topic, producerName, sequenceId, t); @@ -1326,7 +1381,7 @@ protected synchronized void recoverNotAllowedError(long sequenceId, String error * * @param op * @return returns true only if message is not modified and computed-checksum is same as previous checksum else - * return false that means that message is corrupted. Returns true if checksum is not present. + * return false that means that message is corrupted. Returns true if checksum is not present. */ protected boolean verifyLocalBufferIsNotCorrupted(OpSendMsg op) { ByteBufPair msg = op.cmd; @@ -1402,6 +1457,7 @@ public ReferenceCounted touch(Object hint) { } protected static final class OpSendMsg { + LatencyHistogram rpcLatencyHistogram; MessageImpl msg; List> msgs; ByteBufPair cmd; @@ -1421,6 +1477,7 @@ protected static final class OpSendMsg { int chunkId = -1; void initialize() { + rpcLatencyHistogram = null; msg = null; msgs = null; cmd = null; @@ -1440,9 +1497,11 @@ void initialize() { chunkedMessageCtx = null; } - static OpSendMsg create(MessageImpl msg, ByteBufPair cmd, long sequenceId, SendCallback callback) { + static OpSendMsg create(LatencyHistogram rpcLatencyHistogram, MessageImpl msg, ByteBufPair cmd, + long sequenceId, SendCallback callback) { OpSendMsg op = RECYCLER.get(); op.initialize(); + op.rpcLatencyHistogram = rpcLatencyHistogram; op.msg = msg; op.cmd = cmd; op.callback = callback; @@ -1452,10 +1511,11 @@ static OpSendMsg create(MessageImpl msg, ByteBufPair cmd, long sequenceId, Se return op; } - static OpSendMsg create(List> msgs, ByteBufPair cmd, long sequenceId, SendCallback callback, - int batchAllocatedSize) { + static OpSendMsg create(LatencyHistogram rpcLatencyHistogram, List> msgs, ByteBufPair cmd, + long sequenceId, SendCallback callback, int batchAllocatedSize) { OpSendMsg op = RECYCLER.get(); op.initialize(); + op.rpcLatencyHistogram = rpcLatencyHistogram; op.msgs = msgs; op.cmd = cmd; op.callback = callback; @@ -1469,10 +1529,12 @@ static OpSendMsg create(List> msgs, ByteBufPair cmd, long sequenc return op; } - static OpSendMsg create(List> msgs, ByteBufPair cmd, long lowestSequenceId, - long highestSequenceId, SendCallback callback, int batchAllocatedSize) { + static OpSendMsg create(LatencyHistogram rpcLatencyHistogram, List> msgs, ByteBufPair cmd, + long lowestSequenceId, + long highestSequenceId, SendCallback callback, int batchAllocatedSize) { OpSendMsg op = RECYCLER.get(); op.initialize(); + op.rpcLatencyHistogram = rpcLatencyHistogram; op.msgs = msgs; op.cmd = cmd; op.callback = callback; @@ -1497,30 +1559,38 @@ void updateSentTimestamp() { void sendComplete(final Exception e) { SendCallback callback = this.callback; + + long now = System.nanoTime(); if (null != callback) { Exception finalEx = e; if (finalEx instanceof TimeoutException) { TimeoutException te = (TimeoutException) e; long sequenceId = te.getSequenceId(); - long ns = System.nanoTime(); + //firstSentAt and lastSentAt maybe -1, it means that the message didn't flush to channel. String errMsg = String.format( - "%s : createdAt %s seconds ago, firstSentAt %s seconds ago, lastSentAt %s seconds ago, " - + "retryCount %s", - te.getMessage(), - RelativeTimeUtil.nsToSeconds(ns - this.createdAt), - RelativeTimeUtil.nsToSeconds(this.firstSentAt <= 0 - ? this.firstSentAt - : ns - this.firstSentAt), - RelativeTimeUtil.nsToSeconds(this.lastSentAt <= 0 - ? this.lastSentAt - : ns - this.lastSentAt), - retryCount + "%s : createdAt %s seconds ago, firstSentAt %s seconds ago, lastSentAt %s seconds ago, " + + "retryCount %s", + te.getMessage(), + RelativeTimeUtil.nsToSeconds(now - this.createdAt), + RelativeTimeUtil.nsToSeconds(this.firstSentAt <= 0 + ? this.firstSentAt + : now - this.firstSentAt), + RelativeTimeUtil.nsToSeconds(this.lastSentAt <= 0 + ? this.lastSentAt + : now - this.lastSentAt), + retryCount ); finalEx = new TimeoutException(errMsg, sequenceId); } + if (e == null) { + rpcLatencyHistogram.recordSuccess(now - this.lastSentAt); + } else { + rpcLatencyHistogram.recordFailure(now - this.lastSentAt); + } + callback.sendComplete(finalEx); } } @@ -1687,7 +1757,7 @@ public CompletableFuture connectionOpened(final ClientCnx cnx) { long requestId = client.newRequestId(); PRODUCER_DEADLINE_UPDATER - .compareAndSet(this, 0, System.currentTimeMillis() + client.getConfiguration().getOperationTimeoutMs()); + .compareAndSet(this, 0, System.currentTimeMillis() + client.getConfiguration().getOperationTimeoutMs()); SchemaInfo schemaInfo = null; if (schema != null) { @@ -1698,7 +1768,7 @@ public CompletableFuture connectionOpened(final ClientCnx cnx) { // but now we have standardized on every schema to generate an Avro based schema if (Commands.peerSupportJsonSchemaAvroFormat(cnx.getRemoteEndpointProtocolVersion())) { schemaInfo = schema.getSchemaInfo(); - } else if (schema instanceof JSONSchema){ + } else if (schema instanceof JSONSchema) { JSONSchema jsonSchema = (JSONSchema) schema; schemaInfo = jsonSchema.getBackwardsCompatibleJsonSchemaInfo(); } else { @@ -1721,146 +1791,148 @@ public CompletableFuture connectionOpened(final ClientCnx cnx) { conf.getAccessMode(), topicEpoch, client.conf.isEnableTransaction(), conf.getInitialSubscriptionName()), requestId).thenAccept(response -> { - String producerName = response.getProducerName(); - long lastSequenceId = response.getLastSequenceId(); - schemaVersion = Optional.ofNullable(response.getSchemaVersion()); - schemaVersion.ifPresent(v -> schemaCache.put(SchemaHash.of(schema), v)); - - // We are now reconnected to broker and clear to send messages. Re-send all pending messages and - // set the cnx pointer so that new messages will be sent immediately - synchronized (ProducerImpl.this) { - State state = getState(); - if (state == State.Closing || state == State.Closed) { - // Producer was closed while reconnecting, close the connection to make sure the broker - // drops the producer on its side - cnx.removeProducer(producerId); - cnx.channel().close(); - future.complete(null); - return; - } - resetBackoff(); + String producerName = response.getProducerName(); + long lastSequenceId = response.getLastSequenceId(); + schemaVersion = Optional.ofNullable(response.getSchemaVersion()); + schemaVersion.ifPresent(v -> schemaCache.put(SchemaHash.of(schema), v)); - log.info("[{}] [{}] Created producer on cnx {}", topic, producerName, cnx.ctx().channel()); - connectionId = cnx.ctx().channel().toString(); - connectedSince = DateFormatter.now(); - if (conf.getAccessMode() != ProducerAccessMode.Shared && !topicEpoch.isPresent()) { - log.info("[{}] [{}] Producer epoch is {}", topic, producerName, response.getTopicEpoch()); - } - topicEpoch = response.getTopicEpoch(); + // We are now reconnected to broker and clear to send messages. Re-send all pending messages and + // set the cnx pointer so that new messages will be sent immediately + synchronized (ProducerImpl.this) { + State state = getState(); + if (state == State.Closing || state == State.Closed) { + // Producer was closed while reconnecting, close the connection to make sure the broker + // drops the producer on its side + cnx.removeProducer(producerId); + cnx.channel().close(); + future.complete(null); + return; + } + resetBackoff(); - if (this.producerName == null) { - this.producerName = producerName; - } + log.info("[{}] [{}] Created producer on cnx {}", topic, producerName, cnx.ctx().channel()); + connectionId = cnx.ctx().channel().toString(); + connectedSince = DateFormatter.now(); + if (conf.getAccessMode() != ProducerAccessMode.Shared && !topicEpoch.isPresent()) { + log.info("[{}] [{}] Producer epoch is {}", topic, producerName, response.getTopicEpoch()); + } + topicEpoch = response.getTopicEpoch(); - if (this.msgIdGenerator == 0 && conf.getInitialSequenceId() == null) { - // Only update sequence id generator if it wasn't already modified. That means we only want - // to update the id generator the first time the producer gets established, and ignore the - // sequence id sent by broker in subsequent producer reconnects - this.lastSequenceIdPublished = lastSequenceId; - this.msgIdGenerator = lastSequenceId + 1; - } + if (this.producerName == null) { + this.producerName = producerName; + } - resendMessages(cnx, epoch); - } - future.complete(null); - }).exceptionally((e) -> { - Throwable cause = e.getCause(); - cnx.removeProducer(producerId); - State state = getState(); - if (state == State.Closing || state == State.Closed) { - // Producer was closed while reconnecting, close the connection to make sure the broker - // drops the producer on its side - cnx.channel().close(); - future.complete(null); - return null; - } + if (this.msgIdGenerator == 0 && conf.getInitialSequenceId() == null) { + // Only update sequence id generator if it wasn't already modified. That means we only want + // to update the id generator the first time the producer gets established, and ignore the + // sequence id sent by broker in subsequent producer reconnects + this.lastSequenceIdPublished = lastSequenceId; + this.msgIdGenerator = lastSequenceId + 1; + } - if (cause instanceof TimeoutException) { - // Creating the producer has timed out. We need to ensure the broker closes the producer - // in case it was indeed created, otherwise it might prevent new create producer operation, - // since we are not necessarily closing the connection. - long closeRequestId = client.newRequestId(); - ByteBuf cmd = Commands.newCloseProducer(producerId, closeRequestId); - cnx.sendRequestWithId(cmd, closeRequestId); - } + resendMessages(cnx, epoch); + } + future.complete(null); + }).exceptionally((e) -> { + Throwable cause = e.getCause(); + cnx.removeProducer(producerId); + State state = getState(); + if (state == State.Closing || state == State.Closed) { + // Producer was closed while reconnecting, close the connection to make sure the broker + // drops the producer on its side + cnx.channel().close(); + future.complete(null); + return null; + } - if (cause instanceof PulsarClientException.ProducerFencedException) { - if (log.isDebugEnabled()) { - log.debug("[{}] [{}] Failed to create producer: {}", - topic, producerName, cause.getMessage()); - } - } else { - log.error("[{}] [{}] Failed to create producer: {}", topic, producerName, cause.getMessage()); + if (cause instanceof TimeoutException) { + // Creating the producer has timed out. We need to ensure the broker closes the producer + // in case it was indeed created, otherwise it might prevent new create producer operation, + // since we are not necessarily closing the connection. + long closeRequestId = client.newRequestId(); + ByteBuf cmd = Commands.newCloseProducer(producerId, closeRequestId); + cnx.sendRequestWithId(cmd, closeRequestId); + } + + if (cause instanceof PulsarClientException.ProducerFencedException) { + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] Failed to create producer: {}", + topic, producerName, cause.getMessage()); + } + } else { + log.error("[{}] [{}] Failed to create producer: {}", topic, producerName, cause.getMessage()); + } + // Close the producer since topic does not exist. + if (cause instanceof PulsarClientException.TopicDoesNotExistException) { + closeAsync().whenComplete((v, ex) -> { + if (ex != null) { + log.error("Failed to close producer on TopicDoesNotExistException.", ex); } - // Close the producer since topic does not exist. - if (cause instanceof PulsarClientException.TopicDoesNotExistException) { - closeAsync().whenComplete((v, ex) -> { - if (ex != null) { - log.error("Failed to close producer on TopicDoesNotExistException.", ex); - } - producerCreatedFuture.completeExceptionally(cause); - }); - future.complete(null); - return null; + producerCreatedFuture.completeExceptionally(cause); + }); + future.complete(null); + return null; + } + if (cause instanceof PulsarClientException.ProducerBlockedQuotaExceededException) { + synchronized (this) { + log.warn("[{}] [{}] Topic backlog quota exceeded. Throwing Exception on producer.", topic, + producerName); + + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] Pending messages: {}", topic, producerName, + pendingMessages.messagesCount()); } - if (cause instanceof PulsarClientException.ProducerBlockedQuotaExceededException) { - synchronized (this) { - log.warn("[{}] [{}] Topic backlog quota exceeded. Throwing Exception on producer.", topic, - producerName); - - if (log.isDebugEnabled()) { - log.debug("[{}] [{}] Pending messages: {}", topic, producerName, - pendingMessages.messagesCount()); - } - - PulsarClientException bqe = new PulsarClientException.ProducerBlockedQuotaExceededException( - format("The backlog quota of the topic %s that the producer %s produces to is exceeded", + + PulsarClientException bqe = new PulsarClientException.ProducerBlockedQuotaExceededException( + format("The backlog quota of the topic %s that the producer %s produces to is exceeded", topic, producerName)); - failPendingMessages(cnx(), bqe); - } - } else if (cause instanceof PulsarClientException.ProducerBlockedQuotaExceededError) { - log.warn("[{}] [{}] Producer is blocked on creation because backlog exceeded on topic.", - producerName, topic); - } + failPendingMessages(cnx(), bqe); + } + } else if (cause instanceof PulsarClientException.ProducerBlockedQuotaExceededError) { + log.warn("[{}] [{}] Producer is blocked on creation because backlog exceeded on topic.", + producerName, topic); + } - if (cause instanceof PulsarClientException.TopicTerminatedException) { - setState(State.Terminated); - synchronized (this) { - failPendingMessages(cnx(), (PulsarClientException) cause); - } - producerCreatedFuture.completeExceptionally(cause); - closeProducerTasks(); - client.cleanupProducer(this); - } else if (cause instanceof PulsarClientException.ProducerFencedException) { - setState(State.ProducerFenced); - synchronized (this) { - failPendingMessages(cnx(), (PulsarClientException) cause); - } - producerCreatedFuture.completeExceptionally(cause); - closeProducerTasks(); - client.cleanupProducer(this); - } else if (producerCreatedFuture.isDone() - || (cause instanceof PulsarClientException && PulsarClientException.isRetriableError(cause) - && System.currentTimeMillis() < PRODUCER_DEADLINE_UPDATER.get(ProducerImpl.this))) { - // Either we had already created the producer once (producerCreatedFuture.isDone()) or we are - // still within the initial timeout budget and we are dealing with a retriable error - future.completeExceptionally(cause); - } else { - setState(State.Failed); - producerCreatedFuture.completeExceptionally(cause); - closeProducerTasks(); - client.cleanupProducer(this); - Timeout timeout = sendTimeout; - if (timeout != null) { - timeout.cancel(); - sendTimeout = null; - } - } - if (!future.isDone()) { - future.complete(null); - } - return null; - }); + if (cause instanceof PulsarClientException.TopicTerminatedException) { + setState(State.Terminated); + synchronized (this) { + failPendingMessages(cnx(), (PulsarClientException) cause); + } + producerCreatedFuture.completeExceptionally(cause); + closeProducerTasks(); + client.cleanupProducer(this); + } else if (cause instanceof PulsarClientException.ProducerFencedException) { + setState(State.ProducerFenced); + synchronized (this) { + failPendingMessages(cnx(), (PulsarClientException) cause); + } + producerCreatedFuture.completeExceptionally(cause); + closeProducerTasks(); + client.cleanupProducer(this); + } else if (producerCreatedFuture.isDone() || ( + cause instanceof PulsarClientException + && PulsarClientException.isRetriableError(cause) + && System.currentTimeMillis() < PRODUCER_DEADLINE_UPDATER.get(ProducerImpl.this) + )) { + // Either we had already created the producer once (producerCreatedFuture.isDone()) or we are + // still within the initial timeout budget and we are dealing with a retriable error + future.completeExceptionally(cause); + } else { + setState(State.Failed); + producerCreatedFuture.completeExceptionally(cause); + closeProducerTasks(); + client.cleanupProducer(this); + Timeout timeout = sendTimeout; + if (timeout != null) { + timeout.cancel(); + sendTimeout = null; + } + } + if (!future.isDone()) { + future.complete(null); + } + return null; + }); return future; } @@ -1966,7 +2038,7 @@ private void stripChecksum(OpSendMsg op) { headerFrame.setInt(0, newTotalFrameSizeLength); // rewrite new [total-size] ByteBuf metadata = headerFrame.slice(checksumMark, headerFrameSize - checksumMark); // sliced only - // metadata + // metadata headerFrame.writerIndex(headerSize); // set headerFrame write-index to overwrite metadata over checksum metadata.readBytes(headerFrame, metadata.readableBytes()); headerFrame.capacity(headerFrameSize - checksumSize); // reduce capacity by removed checksum bytes @@ -2078,6 +2150,7 @@ private void failPendingMessages(ClientCnx cnx, PulsarClientException ex) { log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", topic, producerName, op.sequenceId, t); } + client.getMemoryLimitController().releaseMemory(op.uncompressedSize); ReferenceCountUtil.safeRelease(op.cmd); op.recycle(); @@ -2102,7 +2175,6 @@ private void failPendingMessages(ClientCnx cnx, PulsarClientException ex) { /** * fail any pending batch messages that were enqueued, however batch was not closed out. - * */ private void failPendingBatchMessages(PulsarClientException ex) { if (batchMessageContainer.isEmpty()) { @@ -2122,7 +2194,7 @@ public CompletableFuture flushAsync() { if (isBatchMessagingEnabled()) { batchMessageAndSend(false); } - CompletableFuture lastSendFuture = this.lastSendFuture; + CompletableFuture lastSendFuture = this.lastSendFuture; if (!(lastSendFuture == this.lastSendFutureWrapper.lastSendFuture)) { this.lastSendFutureWrapper = LastSendFutureWrapper.create(lastSendFuture); } @@ -2241,7 +2313,7 @@ protected void processOpSendMsg(OpSendMsg op) { } else { if (log.isDebugEnabled()) { log.debug("[{}] [{}] Connection is not ready -- sequenceId {}", topic, producerName, - op.sequenceId); + op.sequenceId); } } } catch (Throwable t) { @@ -2257,7 +2329,8 @@ private void recoverProcessOpSendMsgFrom(ClientCnx cnx, MessageImpl from, long e // In this case, the cnx passed to this method is no longer the active connection. This method will get // called again once the new connection registers the producer with the broker. log.info("[{}][{}] Producer epoch mismatch or the current connection is null. Skip re-sending the " - + " {} pending messages since they will deliver using another connection.", topic, producerName, + + " {} pending messages since they will deliver using another connection.", topic, + producerName, pendingMessages.messagesCount()); return; } @@ -2298,7 +2371,7 @@ private void recoverProcessOpSendMsgFrom(ClientCnx cnx, MessageImpl from, long e op.cmd.retain(); if (log.isDebugEnabled()) { log.debug("[{}] [{}] Re-Sending message in cnx {}, sequenceId {}", topic, producerName, - cnx.channel(), op.sequenceId); + cnx.channel(), op.sequenceId); } cnx.ctx().write(op.cmd, cnx.ctx().voidPromise()); op.updateSentTimestamp(); @@ -2322,7 +2395,7 @@ private void recoverProcessOpSendMsgFrom(ClientCnx cnx, MessageImpl from, long e } /** - * Check if final message size for non-batch and non-chunked messages is larger than max message size. + * Check if final message size for non-batch and non-chunked messages is larger than max message size. */ private boolean isMessageSizeExceeded(OpSendMsg op) { if (op.msg != null && !conf.isChunkingEnabled()) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index 179996f4ea9f1..a919eb19a7ff8 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -70,6 +70,7 @@ import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; import org.apache.pulsar.client.impl.conf.ReaderConfigurationData; +import org.apache.pulsar.client.impl.metrics.InstrumentProvider; import org.apache.pulsar.client.impl.schema.AutoConsumeSchema; import org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema; import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema; @@ -149,6 +150,8 @@ public SchemaInfoProvider load(String topicName) { private final Clock clientClock; + private final InstrumentProvider instrumentProvider; + @Getter private TransactionCoordinatorClientImpl tcClient; @@ -176,6 +179,7 @@ private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopG Timer timer, ExecutorProvider externalExecutorProvider, ExecutorProvider internalExecutorProvider, ScheduledExecutorProvider scheduledExecutorProvider) throws PulsarClientException { + EventLoopGroup eventLoopGroupReference = null; ConnectionPool connectionPoolReference = null; try { @@ -193,10 +197,12 @@ private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopG throw new PulsarClientException.InvalidConfigurationException("Invalid client configuration"); } this.conf = conf; + this.instrumentProvider = new InstrumentProvider(conf.getOpenTelemetry()); clientClock = conf.getClock(); conf.getAuthentication().start(); connectionPoolReference = - connectionPool != null ? connectionPool : new ConnectionPool(conf, this.eventLoopGroup); + connectionPool != null ? connectionPool : + new ConnectionPool(instrumentProvider, conf, this.eventLoopGroup); this.cnxPool = connectionPoolReference; this.externalExecutorProvider = externalExecutorProvider != null ? externalExecutorProvider : new ExecutorProvider(conf.getNumListenerThreads(), "pulsar-external-listener"); @@ -205,7 +211,7 @@ private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopG this.scheduledExecutorProvider = scheduledExecutorProvider != null ? scheduledExecutorProvider : new ScheduledExecutorProvider(conf.getNumIoThreads(), "pulsar-client-scheduled"); if (conf.getServiceUrl().startsWith("http")) { - lookup = new HttpLookupService(conf, this.eventLoopGroup); + lookup = new HttpLookupService(instrumentProvider, conf, this.eventLoopGroup); } else { lookup = new BinaryProtoLookupService(this, conf.getServiceUrl(), conf.getListenerName(), conf.isUseTls(), this.scheduledExecutorProvider.getExecutor()); @@ -1053,7 +1059,7 @@ public void reloadLookUp() throws PulsarClientException { public LookupService createLookup(String url) throws PulsarClientException { if (url.startsWith("http")) { - return new HttpLookupService(conf, eventLoopGroup); + return new HttpLookupService(instrumentProvider, conf, eventLoopGroup); } else { return new BinaryProtoLookupService(this, url, conf.getListenerName(), conf.isUseTls(), externalExecutorProvider.getExecutor()); @@ -1231,6 +1237,11 @@ public ScheduledExecutorProvider getScheduledExecutorProvider() { return scheduledExecutorProvider; } + InstrumentProvider instrumentProvider() { + return instrumentProvider; + } + + // // Transaction related API // diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java index 20ec9c3d99af4..e755b6ba1ee6d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java @@ -22,6 +22,7 @@ import io.netty.util.Timeout; import io.netty.util.TimerTask; import io.netty.util.concurrent.FastThreadLocal; +import io.opentelemetry.api.common.Attributes; import java.io.Closeable; import java.util.ArrayDeque; import java.util.Collections; @@ -35,6 +36,9 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; +import org.apache.pulsar.client.impl.metrics.Counter; +import org.apache.pulsar.client.impl.metrics.InstrumentProvider; +import org.apache.pulsar.client.impl.metrics.Unit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,6 +56,8 @@ public class UnAckedMessageTracker implements Closeable { protected final long ackTimeoutMillis; protected final long tickDurationInMs; + private final Counter consumerAckTimeoutsCounter; + private static class UnAckedMessageTrackerDisabled extends UnAckedMessageTracker { @Override public void clear() { @@ -89,13 +95,14 @@ public void close() { protected Timeout timeout; - public UnAckedMessageTracker() { + private UnAckedMessageTracker() { readLock = null; writeLock = null; timePartitions = null; messageIdPartitionMap = null; this.ackTimeoutMillis = 0; this.tickDurationInMs = 0; + this.consumerAckTimeoutsCounter = null; } protected static final FastThreadLocal> TL_MESSAGE_IDS_SET = @@ -114,6 +121,14 @@ public UnAckedMessageTracker(PulsarClientImpl client, ConsumerBase consumerBa ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); this.readLock = readWriteLock.readLock(); this.writeLock = readWriteLock.writeLock(); + + InstrumentProvider ip = client.instrumentProvider(); + consumerAckTimeoutsCounter = ip.newCounter("pulsar.client.consumer.message.ack.timeout", Unit.Messages, + "The number of messages that were not acknowledged in the configured timeout period, hence, were " + + "requested by the client to be redelivered", + consumerBase.getTopic(), + Attributes.builder().put("pulsar.subscription", consumerBase.getSubscription()).build()); + if (conf.getAckTimeoutRedeliveryBackoff() == null) { this.messageIdPartitionMap = new HashMap<>(); this.timePartitions = new ArrayDeque<>(); @@ -136,6 +151,7 @@ public void run(Timeout t) throws Exception { try { HashSet headPartition = timePartitions.removeFirst(); if (!headPartition.isEmpty()) { + consumerAckTimeoutsCounter.add(headPartition.size()); log.info("[{}] {} messages will be re-delivered", consumerBase, headPartition.size()); headPartition.forEach(messageId -> { if (messageId instanceof ChunkMessageIdImpl) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java index 1dc1c2a8689c6..6dcea7dc46672 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java @@ -20,6 +20,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import io.opentelemetry.api.OpenTelemetry; import io.swagger.annotations.ApiModelProperty; import java.io.Serializable; import java.net.InetSocketAddress; @@ -395,6 +396,8 @@ public class ClientConfigurationData implements Serializable, Cloneable { ) private String description; + private transient OpenTelemetry openTelemetry; + /** * Gets the authentication settings for the client. * diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/Counter.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/Counter.java new file mode 100644 index 0000000000000..fffbab4217a86 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/Counter.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.metrics; + +import static org.apache.pulsar.client.impl.metrics.MetricsUtil.getDefaultAggregationLabels; +import static org.apache.pulsar.client.impl.metrics.MetricsUtil.getTopicAttributes; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.api.metrics.LongCounterBuilder; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.extension.incubator.metrics.ExtendedLongCounterBuilder; + +public class Counter { + + private final LongCounter counter; + private final Attributes attributes; + + Counter(Meter meter, String name, Unit unit, String description, String topic, Attributes attributes) { + LongCounterBuilder builder = meter.counterBuilder(name) + .setDescription(description) + .setUnit(unit.toString()); + + if (topic != null) { + if (builder instanceof ExtendedLongCounterBuilder) { + ExtendedLongCounterBuilder eb = (ExtendedLongCounterBuilder) builder; + eb.setAttributesAdvice(getDefaultAggregationLabels(attributes)); + } + + attributes = getTopicAttributes(topic, attributes); + } + + this.counter = builder.build(); + this.attributes = attributes; + } + + public void increment() { + add(1); + } + + public void add(int delta) { + counter.add(delta, attributes); + } + +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/InstrumentProvider.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/InstrumentProvider.java new file mode 100644 index 0000000000000..1e02af1fd37e1 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/InstrumentProvider.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.client.impl.metrics; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.Meter; +import org.apache.pulsar.PulsarVersion; + +public class InstrumentProvider { + + public static final InstrumentProvider NOOP = new InstrumentProvider(OpenTelemetry.noop()); + + private final Meter meter; + + public InstrumentProvider(OpenTelemetry otel) { + if (otel == null) { + // By default, metrics are disabled, unless the OTel java agent is configured. + // This allows to enable metrics without any code change. + otel = GlobalOpenTelemetry.get(); + } + this.meter = otel.getMeterProvider() + .meterBuilder("org.apache.pulsar.client") + .setInstrumentationVersion(PulsarVersion.getVersion()) + .build(); + } + + public Counter newCounter(String name, Unit unit, String description, String topic, Attributes attributes) { + return new Counter(meter, name, unit, description, topic, attributes); + } + + public UpDownCounter newUpDownCounter(String name, Unit unit, String description, String topic, + Attributes attributes) { + return new UpDownCounter(meter, name, unit, description, topic, attributes); + } + + public LatencyHistogram newLatencyHistogram(String name, String description, String topic, Attributes attributes) { + return new LatencyHistogram(meter, name, description, topic, attributes); + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/LatencyHistogram.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/LatencyHistogram.java new file mode 100644 index 0000000000000..ed04eff03b39d --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/LatencyHistogram.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.client.impl.metrics; + +import static org.apache.pulsar.client.impl.metrics.MetricsUtil.getDefaultAggregationLabels; +import static org.apache.pulsar.client.impl.metrics.MetricsUtil.getTopicAttributes; +import com.google.common.collect.Lists; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.DoubleHistogram; +import io.opentelemetry.api.metrics.DoubleHistogramBuilder; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.extension.incubator.metrics.ExtendedDoubleHistogramBuilder; +import java.util.List; +import java.util.concurrent.TimeUnit; + +public class LatencyHistogram { + + // Used for tests + public static final LatencyHistogram NOOP = new LatencyHistogram() { + public void recordSuccess(long latencyNanos) { + } + + public void recordFailure(long latencyNanos) { + } + }; + + private static final List latencyHistogramBuckets = + Lists.newArrayList(.0005, .001, .0025, .005, .01, .025, .05, .1, .25, .5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0); + + private static final double NANOS = TimeUnit.SECONDS.toNanos(1); + + private final Attributes successAttributes; + + private final Attributes failedAttributes; + private final DoubleHistogram histogram; + + private LatencyHistogram() { + successAttributes = null; + failedAttributes = null; + histogram = null; + } + + LatencyHistogram(Meter meter, String name, String description, String topic, Attributes attributes) { + DoubleHistogramBuilder builder = meter.histogramBuilder(name) + .setDescription(description) + .setUnit(Unit.Seconds.toString()) + .setExplicitBucketBoundariesAdvice(latencyHistogramBuckets); + + if (topic != null) { + if (builder instanceof ExtendedDoubleHistogramBuilder) { + ExtendedDoubleHistogramBuilder eb = (ExtendedDoubleHistogramBuilder) builder; + eb.setAttributesAdvice( + getDefaultAggregationLabels( + attributes.toBuilder().put("pulsar.response.status", "success").build())); + } + attributes = getTopicAttributes(topic, attributes); + } + + successAttributes = attributes.toBuilder() + .put("pulsar.response.status", "success") + .build(); + failedAttributes = attributes.toBuilder() + .put("pulsar.response.status", "failed") + .build(); + this.histogram = builder.build(); + } + + private LatencyHistogram(DoubleHistogram histogram, Attributes successAttributes, Attributes failedAttributes) { + this.histogram = histogram; + this.successAttributes = successAttributes; + this.failedAttributes = failedAttributes; + } + + /** + * Create a new histograms that inherits the old histograms attributes and adds new ones. + */ + public LatencyHistogram withAttributes(Attributes attributes) { + return new LatencyHistogram( + histogram, + successAttributes.toBuilder().putAll(attributes).build(), + failedAttributes.toBuilder().putAll(attributes).build() + ); + } + + + public void recordSuccess(long latencyNanos) { + histogram.record(latencyNanos / NANOS, successAttributes); + } + + public void recordFailure(long latencyNanos) { + histogram.record(latencyNanos / NANOS, failedAttributes); + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/MetricsUtil.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/MetricsUtil.java new file mode 100644 index 0000000000000..b9802f4f32b5f --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/MetricsUtil.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.client.impl.metrics; + +import com.google.common.collect.Lists; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.common.AttributesBuilder; +import java.util.ArrayList; +import java.util.List; +import lombok.experimental.UtilityClass; +import org.apache.pulsar.common.naming.TopicName; + +@UtilityClass +public class MetricsUtil { + + // By default, advice to use namespace level aggregation only + private static final List> DEFAULT_AGGREGATION_LABELS = Lists.newArrayList( + AttributeKey.stringKey("pulsar.tenant"), + AttributeKey.stringKey("pulsar.namespace") + ); + + static List> getDefaultAggregationLabels(Attributes attrs) { + List> res = new ArrayList<>(); + res.addAll(DEFAULT_AGGREGATION_LABELS); + res.addAll(attrs.asMap().keySet()); + return res; + } + + static Attributes getTopicAttributes(String topic, Attributes baseAttributes) { + TopicName tn = TopicName.get(topic); + + AttributesBuilder ab = baseAttributes.toBuilder(); + if (tn.isPartitioned()) { + ab.put("pulsar.partition", tn.getPartitionIndex()); + } + ab.put("pulsar.topic", tn.getPartitionedTopicName()); + ab.put("pulsar.namespace", tn.getNamespace()); + ab.put("pulsar.tenant", tn.getTenant()); + return ab.build(); + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/Unit.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/Unit.java new file mode 100644 index 0000000000000..5204cc2f03eae --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/Unit.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.client.impl.metrics; + +public enum Unit { + Bytes, + + Messages, + + Seconds, + + Connections, + + Sessions, + + None, + + ; + + public String toString() { + switch (this) { + case Bytes: + return "By"; + + case Messages: + return "{message}"; + + case Seconds: + return "s"; + + case Connections: + return "{connection}"; + + case Sessions: + return "{session}"; + + case None: + default: + return "1"; + } + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/UpDownCounter.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/UpDownCounter.java new file mode 100644 index 0000000000000..3df0c2bb42302 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/UpDownCounter.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.client.impl.metrics; + +import static org.apache.pulsar.client.impl.metrics.MetricsUtil.getDefaultAggregationLabels; +import static org.apache.pulsar.client.impl.metrics.MetricsUtil.getTopicAttributes; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.LongUpDownCounter; +import io.opentelemetry.api.metrics.LongUpDownCounterBuilder; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.extension.incubator.metrics.ExtendedLongUpDownCounterBuilder; + +public class UpDownCounter { + + private final LongUpDownCounter counter; + private final Attributes attributes; + + UpDownCounter(Meter meter, String name, Unit unit, String description, String topic, Attributes attributes) { + LongUpDownCounterBuilder builder = meter.upDownCounterBuilder(name) + .setDescription(description) + .setUnit(unit.toString()); + + if (topic != null) { + if (builder instanceof ExtendedLongUpDownCounterBuilder) { + ExtendedLongUpDownCounterBuilder eb = (ExtendedLongUpDownCounterBuilder) builder; + eb.setAttributesAdvice(getDefaultAggregationLabels(attributes)); + } + + attributes = getTopicAttributes(topic, attributes); + } + + this.counter = builder.build(); + this.attributes = attributes; + } + + public void increment() { + add(1); + } + + public void decrement() { + add(-1); + } + + public void add(long delta) { + counter.add(delta, attributes); + } + + public void subtract(long diff) { + add(-diff); + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/package-info.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/package-info.java new file mode 100644 index 0000000000000..ee99bb3332c26 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * Pulsar Client OTel metrics utilities + */ +package org.apache.pulsar.client.impl.metrics; diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java index 1d1a6f85bfd41..514e3dde14070 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AcknowledgementsGroupingTrackerTest.java @@ -41,6 +41,7 @@ import org.apache.pulsar.client.api.MessageIdAdv; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; +import org.apache.pulsar.client.impl.metrics.InstrumentProvider; import org.apache.pulsar.client.util.TimedCompletableFuture; import org.apache.pulsar.common.api.proto.CommandAck.AckType; import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable; @@ -70,7 +71,7 @@ public void setup() throws NoSuchFieldException, IllegalAccessException { doReturn(client).when(consumer).getClient(); doReturn(cnx).when(consumer).getClientCnx(); doReturn(new ConsumerStatsRecorderImpl()).when(consumer).getStats(); - doReturn(new UnAckedMessageTracker().UNACKED_MESSAGE_TRACKER_DISABLED) + doReturn(UnAckedMessageTracker.UNACKED_MESSAGE_TRACKER_DISABLED) .when(consumer).getUnAckedMessageTracker(); ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); when(cnx.ctx()).thenReturn(ctx); @@ -423,7 +424,7 @@ public void testDoIndividualBatchAckAsync() throws Exception{ public class ClientCnxTest extends ClientCnx { public ClientCnxTest(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) { - super(conf, eventLoopGroup); + super(InstrumentProvider.NOOP, conf, eventLoopGroup); } @Override diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java index 87188255b20b8..983cd21a7a9d8 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java @@ -36,6 +36,7 @@ import org.apache.pulsar.client.api.PulsarClientException.LookupException; import org.apache.pulsar.client.impl.BinaryProtoLookupService.LookupDataResult; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.impl.metrics.InstrumentProvider; import org.apache.pulsar.common.naming.TopicName; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -65,6 +66,7 @@ public void setup() throws Exception { doReturn(0).when(clientConfig).getMaxLookupRedirects(); PulsarClientImpl client = mock(PulsarClientImpl.class); + doReturn(InstrumentProvider.NOOP).when(client).instrumentProvider(); doReturn(cnxPool).when(client).getCnxPool(); doReturn(clientConfig).when(client).getConfiguration(); doReturn(1L).when(client).newRequestId(); diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxRequestTimeoutQueueTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxRequestTimeoutQueueTest.java index ca6114d2ed823..d573229fddefa 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxRequestTimeoutQueueTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxRequestTimeoutQueueTest.java @@ -26,6 +26,7 @@ import io.netty.util.concurrent.DefaultThreadFactory; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.impl.metrics.InstrumentProvider; import org.apache.pulsar.client.util.TimedCompletableFuture; import org.apache.pulsar.common.util.netty.EventLoopUtil; @@ -60,7 +61,7 @@ void setupClientCnx() throws Exception { ClientConfigurationData conf = new ClientConfigurationData(); conf.setKeepAliveIntervalSeconds(0); conf.setOperationTimeoutMs(1); - cnx = new ClientCnx(conf, eventLoop); + cnx = new ClientCnx(InstrumentProvider.NOOP, conf, eventLoop); ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); Channel channel = mock(Channel.class); diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java index 4f657da82b289..bc1d940c76bbf 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java @@ -42,6 +42,7 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.PulsarClientException.BrokerMetadataException; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.impl.metrics.InstrumentProvider; import org.apache.pulsar.common.api.proto.CommandCloseConsumer; import org.apache.pulsar.common.api.proto.CommandCloseProducer; import org.apache.pulsar.common.api.proto.CommandConnected; @@ -63,7 +64,7 @@ public void testClientCnxTimeout() throws Exception { ClientConfigurationData conf = new ClientConfigurationData(); conf.setOperationTimeoutMs(10); conf.setKeepAliveIntervalSeconds(0); - ClientCnx cnx = new ClientCnx(conf, eventLoop); + ClientCnx cnx = new ClientCnx(InstrumentProvider.NOOP, conf, eventLoop); ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); Channel channel = mock(Channel.class); @@ -89,7 +90,7 @@ public void testPendingLookupRequestSemaphore() throws Exception { ClientConfigurationData conf = new ClientConfigurationData(); conf.setOperationTimeoutMs(10_000); conf.setKeepAliveIntervalSeconds(0); - ClientCnx cnx = new ClientCnx(conf, eventLoop); + ClientCnx cnx = new ClientCnx(InstrumentProvider.NOOP, conf, eventLoop); ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); Channel channel = mock(Channel.class); @@ -127,7 +128,7 @@ public void testPendingLookupRequestSemaphoreServiceNotReady() throws Exception ClientConfigurationData conf = new ClientConfigurationData(); conf.setOperationTimeoutMs(10_000); conf.setKeepAliveIntervalSeconds(0); - ClientCnx cnx = new ClientCnx(conf, eventLoop); + ClientCnx cnx = new ClientCnx(InstrumentProvider.NOOP, conf, eventLoop); ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); Channel channel = mock(Channel.class); @@ -170,7 +171,7 @@ public void testPendingWaitingLookupRequestSemaphore() throws Exception { ClientConfigurationData conf = new ClientConfigurationData(); conf.setOperationTimeoutMs(10_000); conf.setKeepAliveIntervalSeconds(0); - ClientCnx cnx = new ClientCnx(conf, eventLoop); + ClientCnx cnx = new ClientCnx(InstrumentProvider.NOOP, conf, eventLoop); ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); Channel channel = mock(Channel.class); @@ -196,7 +197,7 @@ public void testReceiveErrorAtSendConnectFrameState() throws Exception { EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, threadFactory); ClientConfigurationData conf = new ClientConfigurationData(); conf.setOperationTimeoutMs(10); - ClientCnx cnx = new ClientCnx(conf, eventLoop); + ClientCnx cnx = new ClientCnx(InstrumentProvider.NOOP, conf, eventLoop); ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); Channel channel = mock(Channel.class); @@ -230,7 +231,7 @@ public void testGetLastMessageIdWithError() throws Exception { ThreadFactory threadFactory = new DefaultThreadFactory("testReceiveErrorAtSendConnectFrameState"); EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, threadFactory); ClientConfigurationData conf = new ClientConfigurationData(); - ClientCnx cnx = new ClientCnx(conf, eventLoop); + ClientCnx cnx = new ClientCnx(InstrumentProvider.NOOP, conf, eventLoop); ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); Channel channel = mock(Channel.class); @@ -276,7 +277,7 @@ public void testHandleCloseConsumer() { ThreadFactory threadFactory = new DefaultThreadFactory("testHandleCloseConsumer"); EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, threadFactory); ClientConfigurationData conf = new ClientConfigurationData(); - ClientCnx cnx = new ClientCnx(conf, eventLoop); + ClientCnx cnx = new ClientCnx(InstrumentProvider.NOOP, conf, eventLoop); long consumerId = 1; PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class); @@ -300,7 +301,7 @@ public void testHandleCloseProducer() { ThreadFactory threadFactory = new DefaultThreadFactory("testHandleCloseProducer"); EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, threadFactory); ClientConfigurationData conf = new ClientConfigurationData(); - ClientCnx cnx = new ClientCnx(conf, eventLoop); + ClientCnx cnx = new ClientCnx(InstrumentProvider.NOOP, conf, eventLoop); long producerId = 1; PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class); @@ -393,7 +394,7 @@ private void withConnection(String testName, Consumer test) { try { ClientConfigurationData conf = new ClientConfigurationData(); - ClientCnx cnx = new ClientCnx(conf, eventLoop); + ClientCnx cnx = new ClientCnx(InstrumentProvider.NOOP, conf, eventLoop); ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); Channel channel = mock(Channel.class); diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/OpSendMsgQueueTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/OpSendMsgQueueTest.java index 2db23782640eb..efcc06bede3e1 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/OpSendMsgQueueTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/OpSendMsgQueueTest.java @@ -23,6 +23,7 @@ import static org.testng.Assert.assertEquals; import com.google.common.collect.Lists; import java.util.Arrays; +import org.apache.pulsar.client.impl.metrics.LatencyHistogram; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -39,7 +40,7 @@ public void createMockMessage() { } private ProducerImpl.OpSendMsg createDummyOpSendMsg() { - return ProducerImpl.OpSendMsg.create(message, null, 0L, null); + return ProducerImpl.OpSendMsg.create(LatencyHistogram.NOOP, message, null, 0L, null); } @Test diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java index b96f6a344a3dc..f96d2e2e0b0e9 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java @@ -47,6 +47,7 @@ import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; import org.apache.pulsar.client.impl.customroute.PartialRoundRobinMessageRouterImpl; +import org.apache.pulsar.client.impl.metrics.InstrumentProvider; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.util.netty.EventLoopUtil; import org.assertj.core.util.Sets; @@ -78,6 +79,7 @@ public void setup() { producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES); + when(client.instrumentProvider()).thenReturn(InstrumentProvider.NOOP); when(client.getConfiguration()).thenReturn(clientConfigurationData); when(client.timer()).thenReturn(timer); when(client.newProducer()).thenReturn(producerBuilderImpl); diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerImplTest.java index 6fcedc3f94de7..f9df63759394a 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerImplTest.java @@ -27,6 +27,7 @@ import static org.testng.Assert.assertTrue; import java.nio.ByteBuffer; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.metrics.LatencyHistogram; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.mockito.Mockito; import org.testng.annotations.Test; @@ -42,6 +43,7 @@ public void testChunkedMessageCtxDeallocate() { for (int i = 0; i < totalChunks; i++) { ProducerImpl.OpSendMsg opSendMsg = ProducerImpl.OpSendMsg.create( + LatencyHistogram.NOOP, MessageImpl.create(new MessageMetadata(), ByteBuffer.allocate(0), Schema.STRING, null), null, 0, null); opSendMsg.chunkedMessageCtx = ctx; diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java index c96443c1e2f9f..274b9b4f2d572 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java @@ -53,6 +53,7 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; +import org.apache.pulsar.client.impl.metrics.InstrumentProvider; import org.apache.pulsar.client.util.ExecutorProvider; import org.apache.pulsar.client.util.ScheduledExecutorProvider; import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace; @@ -180,7 +181,7 @@ public void testInitializeWithTimer() throws PulsarClientException { ClientConfigurationData conf = new ClientConfigurationData(); @Cleanup("shutdownGracefully") EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, new DefaultThreadFactory("test")); - ConnectionPool pool = Mockito.spy(new ConnectionPool(conf, eventLoop)); + ConnectionPool pool = Mockito.spy(new ConnectionPool(InstrumentProvider.NOOP, conf, eventLoop)); conf.setServiceUrl("pulsar://localhost:6650"); HashedWheelTimer timer = new HashedWheelTimer(); @@ -205,7 +206,7 @@ public void testResourceCleanup() throws Exception { ClientConfigurationData conf = new ClientConfigurationData(); conf.setServiceUrl(""); initializeEventLoopGroup(conf); - try (ConnectionPool connectionPool = new ConnectionPool(conf, eventLoopGroup)) { + try (ConnectionPool connectionPool = new ConnectionPool(InstrumentProvider.NOOP, conf, eventLoopGroup)) { assertThrows(() -> new PulsarClientImpl(conf, eventLoopGroup, connectionPool)); } finally { // Externally passed eventLoopGroup should not be shutdown. diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedMessageTrackerTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedMessageTrackerTest.java index 91ad321048226..b01fbcb879f80 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedMessageTrackerTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedMessageTrackerTest.java @@ -36,6 +36,7 @@ import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageIdAdv; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; +import org.apache.pulsar.client.impl.metrics.InstrumentProvider; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.awaitility.Awaitility; import org.testng.annotations.Test; @@ -46,6 +47,7 @@ public class UnAckedMessageTrackerTest { public void testAddAndRemove() { PulsarClientImpl client = mock(PulsarClientImpl.class); ConnectionPool connectionPool = mock(ConnectionPool.class); + when(client.instrumentProvider()).thenReturn(InstrumentProvider.NOOP); when(client.getCnxPool()).thenReturn(connectionPool); Timer timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-timer", Thread.currentThread().isDaemon()), 1, TimeUnit.MILLISECONDS); @@ -86,6 +88,7 @@ public void testAddAndRemove() { public void testTrackChunkedMessageId() { PulsarClientImpl client = mock(PulsarClientImpl.class); ConnectionPool connectionPool = mock(ConnectionPool.class); + when(client.instrumentProvider()).thenReturn(InstrumentProvider.NOOP); when(client.getCnxPool()).thenReturn(connectionPool); Timer timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-timer", Thread.currentThread().isDaemon()), 1, TimeUnit.MILLISECONDS); diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ClientConfigurationDataTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ClientConfigurationDataTest.java index 5856395566a67..27f521ef1ff73 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ClientConfigurationDataTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ClientConfigurationDataTest.java @@ -22,7 +22,14 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; +import io.opentelemetry.api.OpenTelemetry; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import lombok.Cleanup; import org.apache.pulsar.client.impl.auth.AuthenticationToken; +import org.testng.Assert; import org.testng.annotations.Test; /** @@ -36,10 +43,35 @@ public void testDoNotPrintSensitiveInfo() throws JsonProcessingException { clientConfigurationData.setTlsTrustStorePassword("xxxx"); clientConfigurationData.setSocks5ProxyPassword("yyyy"); clientConfigurationData.setAuthentication(new AuthenticationToken("zzzz")); + clientConfigurationData.setOpenTelemetry(OpenTelemetry.noop()); ObjectMapper objectMapper = new ObjectMapper(); objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false); String serializedConf = objectMapper.writeValueAsString(clientConfigurationData); assertThat(serializedConf).doesNotContain("xxxx", "yyyy", "zzzz"); } + @Test + public void testSerializable() throws Exception { + ClientConfigurationData conf = new ClientConfigurationData(); + conf.setConnectionsPerBroker(3); + conf.setTlsTrustStorePassword("xxxx"); + conf.setOpenTelemetry(OpenTelemetry.noop()); + + @Cleanup + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + @Cleanup + ObjectOutputStream oos = new ObjectOutputStream(bos); + oos.writeObject(conf); + byte[] serialized = bos.toByteArray(); + + // Deserialize + @Cleanup + ByteArrayInputStream bis = new ByteArrayInputStream(serialized); + @Cleanup + ObjectInputStream ois = new ObjectInputStream(bis); + Object object = ois.readObject(); + + Assert.assertEquals(object.getClass(), ClientConfigurationData.class); + Assert.assertEquals(object, conf); + } } diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java index 782454022b1ed..d15d48b9209d0 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java @@ -26,6 +26,7 @@ import org.apache.pulsar.PulsarVersion; import org.apache.pulsar.client.impl.ClientCnx; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.impl.metrics.InstrumentProvider; import org.apache.pulsar.common.api.AuthData; import org.apache.pulsar.common.api.proto.CommandAuthChallenge; import org.apache.pulsar.common.protocol.Commands; @@ -47,7 +48,7 @@ public class ProxyClientCnx extends ClientCnx { public ProxyClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, String clientAuthRole, String clientAuthMethod, int protocolVersion, boolean forwardClientAuthData, ProxyConnection proxyConnection) { - super(conf, eventLoopGroup, protocolVersion); + super(InstrumentProvider.NOOP, conf, eventLoopGroup, protocolVersion); this.clientAuthRole = clientAuthRole; this.clientAuthMethod = clientAuthMethod; this.forwardClientAuthData = forwardClientAuthData; diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java index ba9247a085dff..594d6cbc3bb59 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java @@ -59,6 +59,7 @@ import org.apache.pulsar.client.impl.PulsarChannelInitializer; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils; +import org.apache.pulsar.client.impl.metrics.InstrumentProvider; import org.apache.pulsar.client.internal.PropertiesUtils; import org.apache.pulsar.common.api.AuthData; import org.apache.pulsar.common.api.proto.CommandAuthResponse; @@ -383,11 +384,12 @@ private synchronized void completeConnect() throws PulsarClientException { service.getConfiguration().isForwardAuthorizationCredentials(), this); } else { clientCnxSupplier = - () -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersionToAdvertise); + () -> new ClientCnx(InstrumentProvider.NOOP, clientConf, service.getWorkerGroup(), + protocolVersionToAdvertise); } if (this.connectionPool == null) { - this.connectionPool = new ConnectionPool(clientConf, service.getWorkerGroup(), + this.connectionPool = new ConnectionPool(InstrumentProvider.NOOP, clientConf, service.getWorkerGroup(), clientCnxSupplier, Optional.of(dnsAddressResolverGroup.getResolver(service.getWorkerGroup().next()))); } else { diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java index 3f58250e6d68a..1a9459619ebe9 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyParserTest.java @@ -42,6 +42,7 @@ import org.apache.pulsar.client.impl.ConnectionPool; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.impl.metrics.InstrumentProvider; import org.apache.pulsar.common.api.proto.CommandActiveConsumerChange; import org.apache.pulsar.common.api.proto.ProtocolVersion; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; @@ -251,8 +252,8 @@ private static PulsarClient getClientActiveConsumerChangeNotSupported(ClientConf final EventLoopGroup eventLoopGroup) throws Exception { - ConnectionPool cnxPool = new ConnectionPool(conf, eventLoopGroup, () -> { - return new ClientCnx(conf, eventLoopGroup, ProtocolVersion.v11_VALUE) { + ConnectionPool cnxPool = new ConnectionPool(InstrumentProvider.NOOP, conf, eventLoopGroup, () -> { + return new ClientCnx(InstrumentProvider.NOOP, conf, eventLoopGroup, ProtocolVersion.v11_VALUE) { @Override protected void handleActiveConsumerChange(CommandActiveConsumerChange change) { throw new UnsupportedOperationException(); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java index 9bc12dcc6fcb2..e1e49f9e8c5f2 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java @@ -50,6 +50,7 @@ import org.apache.pulsar.client.impl.ConnectionPool; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.impl.metrics.InstrumentProvider; import org.apache.pulsar.common.api.proto.CommandActiveConsumerChange; import org.apache.pulsar.common.api.proto.ProtocolVersion; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; @@ -374,8 +375,8 @@ private PulsarClient getClientActiveConsumerChangeNotSupported(ClientConfigurati EventLoopGroup eventLoopGroup = EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), false, threadFactory); registerCloseable(() -> eventLoopGroup.shutdownNow()); - ConnectionPool cnxPool = new ConnectionPool(conf, eventLoopGroup, () -> { - return new ClientCnx(conf, eventLoopGroup, ProtocolVersion.v11_VALUE) { + ConnectionPool cnxPool = new ConnectionPool(InstrumentProvider.NOOP, conf, eventLoopGroup, () -> { + return new ClientCnx(InstrumentProvider.NOOP, conf, eventLoopGroup, ProtocolVersion.v11_VALUE) { @Override protected void handleActiveConsumerChange(CommandActiveConsumerChange change) { throw new UnsupportedOperationException(); diff --git a/pulsar-testclient/pom.xml b/pulsar-testclient/pom.xml index ecc12b2e563d5..ec32b57be15f4 100644 --- a/pulsar-testclient/pom.xml +++ b/pulsar-testclient/pom.xml @@ -112,6 +112,23 @@ jackson-databind + + io.opentelemetry + opentelemetry-exporter-prometheus + + + io.opentelemetry + opentelemetry-exporter-otlp + + + io.opentelemetry + opentelemetry-sdk + + + io.opentelemetry + opentelemetry-sdk-extension-autoconfigure + + org.awaitility awaitility diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java index 3b44023ef503e..b6b3d805edc75 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java @@ -19,6 +19,7 @@ package org.apache.pulsar.testclient; import static org.apache.commons.lang3.StringUtils.isNotBlank; +import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk; import java.lang.management.ManagementFactory; import java.util.Objects; import java.util.concurrent.TimeUnit; @@ -76,7 +77,9 @@ public static ClientBuilder createClientBuilderFromArguments(PerformanceBaseArgu .listenerThreads(arguments.listenerThreads) .tlsTrustCertsFilePath(arguments.tlsTrustCertsFilePath) .maxLookupRequests(arguments.maxLookupRequest) - .proxyServiceUrl(arguments.proxyServiceURL, arguments.proxyProtocol); + .proxyServiceUrl(arguments.proxyServiceURL, arguments.proxyProtocol) + .openTelemetry(AutoConfiguredOpenTelemetrySdk.builder() + .build().getOpenTelemetrySdk()); if (isNotBlank(arguments.authPluginClassName)) { clientBuilder.authentication(arguments.authPluginClassName, arguments.authParams);