Skip to content

Commit

Permalink
Merge branch 'master' into fix/doReconsumerLater
Browse files Browse the repository at this point in the history
  • Loading branch information
hanmz committed Nov 5, 2024
2 parents aa709c0 + 7a47888 commit 227cd3b
Show file tree
Hide file tree
Showing 11 changed files with 52 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk;
import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
import java.util.Map;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
Expand Down Expand Up @@ -207,6 +208,9 @@ private Pair<OpenTelemetrySdk, InMemoryMetricReader> buildOpenTelemetryAndReader
var metricReader = InMemoryMetricReader.create();
var openTelemetry = AutoConfiguredOpenTelemetrySdk.builder()
.disableShutdownHook()
.addPropertiesSupplier(() -> Map.of("otel.metrics.exporter", "none",
"otel.traces.exporter", "none",
"otel.logs.exporter", "none"))
.addMeterProviderCustomizer((builder, __) -> builder.registerMetricReader(metricReader))
.build()
.getOpenTelemetrySdk();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1363,8 +1363,8 @@ public static CompletableFuture<Optional<ClusterUrl>> getMigratedClusterUrlAsync
.getClusterPoliciesAsync(pulsar.getConfig().getClusterName())
.thenCombine(isNamespaceMigrationEnabledAsync(pulsar, topic),
((clusterData, isNamespaceMigrationEnabled) -> {
Optional<ClusterUrl> url = ((clusterData.isPresent() && clusterData.get().isMigrated())
|| isNamespaceMigrationEnabled)
Optional<ClusterUrl> url = (clusterData.isPresent() && (clusterData.get().isMigrated()
|| isNamespaceMigrationEnabled))
? Optional.ofNullable(clusterData.get().getMigratedClusterUrl())
: Optional.empty();
return url;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ public void testClusterMigration() throws Exception {
assertFalse(topic2.getProducers().isEmpty());

ClusterUrl migratedUrl = new ClusterUrl(pulsar2.getWebServiceAddress(), pulsar2.getWebServiceAddressTls(),
pulsar2.getBrokerServiceUrl(), pulsar2.getBrokerServiceUrlTls());
pulsar2.getBrokerServiceUrl(), null);
admin1.clusters().updateClusterMigration("r1", true, migratedUrl);
assertEquals(admin1.clusters().getClusterMigration("r1").getMigratedClusterUrl(), migratedUrl);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,20 @@ public static Consumer<AutoConfiguredOpenTelemetrySdkBuilder> getOpenTelemetrySd
sdkBuilder.addMeterProviderCustomizer(
(meterProviderBuilder, __) -> meterProviderBuilder.registerMetricReader(reader));
sdkBuilder.disableShutdownHook();
disableExporters(sdkBuilder);
sdkBuilder.addPropertiesSupplier(
() -> Map.of(OpenTelemetryService.OTEL_SDK_DISABLED_KEY, "false",
"otel.java.enabled.resource.providers", "none"));
};
}

public static void disableExporters(AutoConfiguredOpenTelemetrySdkBuilder sdkBuilder) {
sdkBuilder.addPropertiesSupplier(() ->
Map.of("otel.metrics.exporter", "none",
"otel.traces.exporter", "none",
"otel.logs.exporter", "none"));
}

public static void assertMetricDoubleSumValue(Collection<MetricData> metrics, String metricName,
Attributes attributes, Consumer<Double> valueConsumer) {
assertThat(metrics)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -761,7 +761,8 @@ protected void initializePulsarServices(SpyConfig spyConfig, Builder builder) {
var reader = InMemoryMetricReader.create();
openTelemetryMetricReader(reader);
registerCloseable(reader);
openTelemetrySdkBuilderCustomizer = BrokerOpenTelemetryTestUtil.getOpenTelemetrySdkBuilderConsumer(reader);
openTelemetrySdkBuilderCustomizer =
BrokerOpenTelemetryTestUtil.getOpenTelemetrySdkBuilderConsumer(reader);
} else {
openTelemetrySdkBuilderCustomizer = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ private class GetClusterMigration extends CliCommand {

@Override
void run() throws PulsarAdminException {
getAdmin().clusters().getClusterMigration(cluster);
print(getAdmin().clusters().getClusterMigration(cluster));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -710,9 +710,12 @@ protected void handleReachedEndOfTopic(CommandReachedEndOfTopic commandReachedEn
@Override
protected void handleTopicMigrated(CommandTopicMigrated commandTopicMigrated) {
final long resourceId = commandTopicMigrated.getResourceId();
final String serviceUrl = commandTopicMigrated.getBrokerServiceUrl();
final String serviceUrlTls = commandTopicMigrated.getBrokerServiceUrlTls();

final String serviceUrl = commandTopicMigrated.hasBrokerServiceUrl()
? commandTopicMigrated.getBrokerServiceUrl()
: null;
final String serviceUrlTls = commandTopicMigrated.hasBrokerServiceUrlTls()
? commandTopicMigrated.getBrokerServiceUrlTls()
: null;
HandlerState resource = commandTopicMigrated.getResourceType() == ResourceType.Producer
? producers.get(resourceId)
: consumers.get(resourceId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.CommandTcClientConnectResponse;
import org.apache.pulsar.common.api.proto.CommandTopicMigrated;
import org.apache.pulsar.common.api.proto.CommandTopicMigrated.ResourceType;
import org.apache.pulsar.common.api.proto.FeatureFlags;
import org.apache.pulsar.common.api.proto.IntRange;
Expand Down Expand Up @@ -768,11 +769,14 @@ public static ByteBuf newReachedEndOfTopic(long consumerId) {

public static ByteBuf newTopicMigrated(ResourceType type, long resourceId, String brokerUrl, String brokerUrlTls) {
BaseCommand cmd = localCmd(Type.TOPIC_MIGRATED);
cmd.setTopicMigrated()
.setResourceType(type)
.setResourceId(resourceId)
.setBrokerServiceUrl(brokerUrl)
.setBrokerServiceUrlTls(brokerUrlTls);
CommandTopicMigrated migratedCmd = cmd.setTopicMigrated();
migratedCmd.setResourceType(type).setResourceId(resourceId);
if (StringUtils.isNotBlank(brokerUrl)) {
migratedCmd.setBrokerServiceUrl(brokerUrl);
}
if (StringUtils.isNotBlank(brokerUrlTls)) {
migratedCmd.setBrokerServiceUrlTls(brokerUrlTls);
}
return serializeWithSize(cmd);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ private static Consumer<AutoConfiguredOpenTelemetrySdkBuilder> getBuilderCustomi
(sdkMeterProviderBuilder, __) -> sdkMeterProviderBuilder.registerMetricReader(extraReader));
}
autoConfigurationCustomizer.disableShutdownHook();
// disable all autoconfigured exporters
autoConfigurationCustomizer.addPropertiesSupplier(() ->
Map.of("otel.metrics.exporter", "none",
"otel.traces.exporter", "none",
"otel.logs.exporter", "none"));
autoConfigurationCustomizer.addPropertiesSupplier(() -> extraProperties);
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,16 @@ class DLOutputStream {

private final DistributedLogManager distributedLogManager;
private final AsyncLogWriter writer;
private final byte[] readBuffer = new byte[8192];
/*
* The LogRecord structure is:
* -------------------
* Bytes 0 - 7 : Metadata (Long)
* Bytes 8 - 15 : TxId (Long)
* Bytes 16 - 19 : Payload length (Integer)
* Bytes 20 - 20+payload.length-1 : Payload (Byte[])
* So the max buffer size should be LogRecord.MAX_LOGRECORD_SIZE - 2 * (Long.SIZE / 8) - Integer.SIZE / 8
*/
private final byte[] readBuffer = new byte[LogRecord.MAX_LOGRECORD_SIZE - 2 * (Long.SIZE / 8) - Integer.SIZE / 8];
private long offset = 0L;

private DLOutputStream(DistributedLogManager distributedLogManager, AsyncLogWriter writer) {
Expand All @@ -51,9 +60,9 @@ static CompletableFuture<DLOutputStream> openWriterAsync(DistributedLogManager d

private void writeAsyncHelper(InputStream is, CompletableFuture<DLOutputStream> result) {
try {
int read = is.read(readBuffer);
if (read != -1) {
log.info("write something into the ledgers offset: {}, length: {}", offset, read);
int read = is.readNBytes(readBuffer, 0, readBuffer.length);
if (read > 0) {
log.debug("write something into the ledgers offset: {}, length: {}", offset, read);
final ByteBuf writeBuf = Unpooled.wrappedBuffer(readBuffer, 0, read);
offset += writeBuf.readableBytes();
final LogRecord record = new LogRecord(offset, writeBuf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public void writeBytesArrayData() throws ExecutionException, InterruptedExceptio

@Test
public void writeLongBytesArrayData() throws ExecutionException, InterruptedException {
byte[] data = new byte[8192 * 3 + 4096];
byte[] data = new byte[1040364 * 3 + 4096];
DLOutputStream.openWriterAsync(dlm)
.thenCompose(w -> w.writeAsync(new ByteArrayInputStream(data))
.thenCompose(DLOutputStream::closeAsync)).get();
Expand Down

0 comments on commit 227cd3b

Please sign in to comment.