diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiterTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiterTest.java index 281f19e388837..68135598e3339 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiterTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiterTest.java @@ -28,6 +28,7 @@ import io.opentelemetry.sdk.OpenTelemetrySdk; import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk; import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; +import java.util.Map; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.Pair; @@ -207,6 +208,9 @@ private Pair buildOpenTelemetryAndReader var metricReader = InMemoryMetricReader.create(); var openTelemetry = AutoConfiguredOpenTelemetrySdk.builder() .disableShutdownHook() + .addPropertiesSupplier(() -> Map.of("otel.metrics.exporter", "none", + "otel.traces.exporter", "none", + "otel.logs.exporter", "none")) .addMeterProviderCustomizer((builder, __) -> builder.registerMetricReader(metricReader)) .build() .getOpenTelemetrySdk(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 11f00fb28e34b..96ea2004be8d7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -1363,8 +1363,8 @@ public static CompletableFuture> getMigratedClusterUrlAsync .getClusterPoliciesAsync(pulsar.getConfig().getClusterName()) .thenCombine(isNamespaceMigrationEnabledAsync(pulsar, topic), ((clusterData, isNamespaceMigrationEnabled) -> { - Optional url = ((clusterData.isPresent() && clusterData.get().isMigrated()) - || isNamespaceMigrationEnabled) + Optional url = (clusterData.isPresent() && (clusterData.get().isMigrated() + || isNamespaceMigrationEnabled)) ? Optional.ofNullable(clusterData.get().getMigratedClusterUrl()) : Optional.empty(); return url; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java index e56a3495600f0..e6a7d049366e4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java @@ -297,7 +297,7 @@ public void testClusterMigration() throws Exception { assertFalse(topic2.getProducers().isEmpty()); ClusterUrl migratedUrl = new ClusterUrl(pulsar2.getWebServiceAddress(), pulsar2.getWebServiceAddressTls(), - pulsar2.getBrokerServiceUrl(), pulsar2.getBrokerServiceUrlTls()); + pulsar2.getBrokerServiceUrl(), null); admin1.clusters().updateClusterMigration("r1", true, migratedUrl); assertEquals(admin1.clusters().getClusterMigration("r1").getMigratedClusterUrl(), migratedUrl); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerOpenTelemetryTestUtil.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerOpenTelemetryTestUtil.java index 69653ea26e240..0d46e80a70302 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerOpenTelemetryTestUtil.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerOpenTelemetryTestUtil.java @@ -36,12 +36,20 @@ public static Consumer getOpenTelemetrySd sdkBuilder.addMeterProviderCustomizer( (meterProviderBuilder, __) -> meterProviderBuilder.registerMetricReader(reader)); sdkBuilder.disableShutdownHook(); + disableExporters(sdkBuilder); sdkBuilder.addPropertiesSupplier( () -> Map.of(OpenTelemetryService.OTEL_SDK_DISABLED_KEY, "false", "otel.java.enabled.resource.providers", "none")); }; } + public static void disableExporters(AutoConfiguredOpenTelemetrySdkBuilder sdkBuilder) { + sdkBuilder.addPropertiesSupplier(() -> + Map.of("otel.metrics.exporter", "none", + "otel.traces.exporter", "none", + "otel.logs.exporter", "none")); + } + public static void assertMetricDoubleSumValue(Collection metrics, String metricName, Attributes attributes, Consumer valueConsumer) { assertThat(metrics) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java index cdb047079bfcd..6403c3bcec4c3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java @@ -761,7 +761,8 @@ protected void initializePulsarServices(SpyConfig spyConfig, Builder builder) { var reader = InMemoryMetricReader.create(); openTelemetryMetricReader(reader); registerCloseable(reader); - openTelemetrySdkBuilderCustomizer = BrokerOpenTelemetryTestUtil.getOpenTelemetrySdkBuilderConsumer(reader); + openTelemetrySdkBuilderCustomizer = + BrokerOpenTelemetryTestUtil.getOpenTelemetrySdkBuilderConsumer(reader); } else { openTelemetrySdkBuilderCustomizer = null; } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java index 9f11b48513867..7ea66ef36f5ad 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java @@ -163,7 +163,7 @@ private class GetClusterMigration extends CliCommand { @Override void run() throws PulsarAdminException { - getAdmin().clusters().getClusterMigration(cluster); + print(getAdmin().clusters().getClusterMigration(cluster)); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index 24163c631ffe9..35c41455e8987 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -710,9 +710,12 @@ protected void handleReachedEndOfTopic(CommandReachedEndOfTopic commandReachedEn @Override protected void handleTopicMigrated(CommandTopicMigrated commandTopicMigrated) { final long resourceId = commandTopicMigrated.getResourceId(); - final String serviceUrl = commandTopicMigrated.getBrokerServiceUrl(); - final String serviceUrlTls = commandTopicMigrated.getBrokerServiceUrlTls(); - + final String serviceUrl = commandTopicMigrated.hasBrokerServiceUrl() + ? commandTopicMigrated.getBrokerServiceUrl() + : null; + final String serviceUrlTls = commandTopicMigrated.hasBrokerServiceUrlTls() + ? commandTopicMigrated.getBrokerServiceUrlTls() + : null; HandlerState resource = commandTopicMigrated.getResourceType() == ResourceType.Producer ? producers.get(resourceId) : consumers.get(resourceId); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index 8635368f00f0b..19aa9907549d9 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -88,6 +88,7 @@ import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.api.proto.CommandTcClientConnectResponse; +import org.apache.pulsar.common.api.proto.CommandTopicMigrated; import org.apache.pulsar.common.api.proto.CommandTopicMigrated.ResourceType; import org.apache.pulsar.common.api.proto.FeatureFlags; import org.apache.pulsar.common.api.proto.IntRange; @@ -768,11 +769,14 @@ public static ByteBuf newReachedEndOfTopic(long consumerId) { public static ByteBuf newTopicMigrated(ResourceType type, long resourceId, String brokerUrl, String brokerUrlTls) { BaseCommand cmd = localCmd(Type.TOPIC_MIGRATED); - cmd.setTopicMigrated() - .setResourceType(type) - .setResourceId(resourceId) - .setBrokerServiceUrl(brokerUrl) - .setBrokerServiceUrlTls(brokerUrlTls); + CommandTopicMigrated migratedCmd = cmd.setTopicMigrated(); + migratedCmd.setResourceType(type).setResourceId(resourceId); + if (StringUtils.isNotBlank(brokerUrl)) { + migratedCmd.setBrokerServiceUrl(brokerUrl); + } + if (StringUtils.isNotBlank(brokerUrlTls)) { + migratedCmd.setBrokerServiceUrlTls(brokerUrlTls); + } return serializeWithSize(cmd); } diff --git a/pulsar-opentelemetry/src/test/java/org/apache/pulsar/opentelemetry/OpenTelemetryServiceTest.java b/pulsar-opentelemetry/src/test/java/org/apache/pulsar/opentelemetry/OpenTelemetryServiceTest.java index f1a7ffaa12289..99d4189d8f803 100644 --- a/pulsar-opentelemetry/src/test/java/org/apache/pulsar/opentelemetry/OpenTelemetryServiceTest.java +++ b/pulsar-opentelemetry/src/test/java/org/apache/pulsar/opentelemetry/OpenTelemetryServiceTest.java @@ -76,6 +76,11 @@ private static Consumer getBuilderCustomi (sdkMeterProviderBuilder, __) -> sdkMeterProviderBuilder.registerMetricReader(extraReader)); } autoConfigurationCustomizer.disableShutdownHook(); + // disable all autoconfigured exporters + autoConfigurationCustomizer.addPropertiesSupplier(() -> + Map.of("otel.metrics.exporter", "none", + "otel.traces.exporter", "none", + "otel.logs.exporter", "none")); autoConfigurationCustomizer.addPropertiesSupplier(() -> extraProperties); }; } diff --git a/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStream.java b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStream.java index 67345ebd47e31..f446961c1d8fe 100644 --- a/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStream.java +++ b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStream.java @@ -36,7 +36,16 @@ class DLOutputStream { private final DistributedLogManager distributedLogManager; private final AsyncLogWriter writer; - private final byte[] readBuffer = new byte[8192]; + /* + * The LogRecord structure is: + * ------------------- + * Bytes 0 - 7 : Metadata (Long) + * Bytes 8 - 15 : TxId (Long) + * Bytes 16 - 19 : Payload length (Integer) + * Bytes 20 - 20+payload.length-1 : Payload (Byte[]) + * So the max buffer size should be LogRecord.MAX_LOGRECORD_SIZE - 2 * (Long.SIZE / 8) - Integer.SIZE / 8 + */ + private final byte[] readBuffer = new byte[LogRecord.MAX_LOGRECORD_SIZE - 2 * (Long.SIZE / 8) - Integer.SIZE / 8]; private long offset = 0L; private DLOutputStream(DistributedLogManager distributedLogManager, AsyncLogWriter writer) { @@ -51,9 +60,9 @@ static CompletableFuture openWriterAsync(DistributedLogManager d private void writeAsyncHelper(InputStream is, CompletableFuture result) { try { - int read = is.read(readBuffer); - if (read != -1) { - log.info("write something into the ledgers offset: {}, length: {}", offset, read); + int read = is.readNBytes(readBuffer, 0, readBuffer.length); + if (read > 0) { + log.debug("write something into the ledgers offset: {}, length: {}", offset, read); final ByteBuf writeBuf = Unpooled.wrappedBuffer(readBuffer, 0, read); offset += writeBuf.readableBytes(); final LogRecord record = new LogRecord(offset, writeBuf); diff --git a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStreamTest.java b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStreamTest.java index b55e0e0d34a4f..235cb4fefc0c3 100644 --- a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStreamTest.java +++ b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStreamTest.java @@ -99,7 +99,7 @@ public void writeBytesArrayData() throws ExecutionException, InterruptedExceptio @Test public void writeLongBytesArrayData() throws ExecutionException, InterruptedException { - byte[] data = new byte[8192 * 3 + 4096]; + byte[] data = new byte[1040364 * 3 + 4096]; DLOutputStream.openWriterAsync(dlm) .thenCompose(w -> w.writeAsync(new ByteArrayInputStream(data)) .thenCompose(DLOutputStream::closeAsync)).get();