From 287e3d6b308eaebdf35c18164893e343e6538dd8 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Wed, 21 Dec 2022 19:54:54 +0200 Subject: [PATCH 1/2] [fix][broker] Fix thread safety issue in info-internal admin api for partitioned topics - the resulting map isn't thread safe in PartitionedManagedLedgerInfo - it's better to refactor the logic to compose the results after the futures have completed --- .../admin/impl/PersistentTopicsBase.java | 35 +++++++++---------- 1 file changed, 16 insertions(+), 19 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index a3cfecc075066..7caf03d40e4bf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -25,6 +25,7 @@ import com.google.common.collect.Sets; import io.netty.buffer.ByteBuf; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Base64; import java.util.Collections; @@ -63,6 +64,7 @@ import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.admin.AdminResource; @@ -1288,38 +1290,28 @@ protected void internalGetManagedLedgerInfo(AsyncResponse asyncResponse, boolean getPartitionedTopicMetadataAsync(topicName, authoritative, false) .thenAccept(partitionMetadata -> { if (partitionMetadata.partitions > 0) { - final List> futures = + final List>> futures = new ArrayList<>(partitionMetadata.partitions); - PartitionedManagedLedgerInfo partitionedManagedLedgerInfo = new PartitionedManagedLedgerInfo(); for (int i = 0; i < partitionMetadata.partitions; i++) { TopicName topicNamePartition = topicName.getPartition(i); try { futures.add(pulsar().getAdminClient().topics() .getInternalInfoAsync(topicNamePartition.toString()) - .whenComplete((response, throwable) -> { - if (throwable != null) { - log.error("[{}] Failed to get managed info for {}", - clientAppId(), topicNamePartition, throwable); - asyncResponse.resume(new RestException(throwable)); - } + .thenApply((response) -> { try { - partitionedManagedLedgerInfo.partitions - .put(topicNamePartition.toString(), jsonMapper() - .readValue(response, ManagedLedgerInfo.class)); - } catch (JsonProcessingException ex) { - log.error("[{}] Failed to parse ManagedLedgerInfo for {} from [{}]", - clientAppId(), topicNamePartition, response, ex); + return Pair.of(topicNamePartition.toString(), jsonMapper() + .readValue(response, ManagedLedgerInfo.class)); + } catch (JsonProcessingException e) { + throw new UncheckedIOException(e); } - }) - ); + })); } catch (PulsarServerException e) { log.error("[{}] Failed to get admin client while get managed info for {}" , clientAppId(), topicNamePartition, e); throw new RestException(e); } } - - FutureUtil.waitForAll(futures).handle((result, exception) -> { + FutureUtil.waitForAll(futures).whenComplete((result, exception) -> { if (exception != null) { Throwable t = exception.getCause(); if (t instanceof NotFoundException) { @@ -1330,10 +1322,15 @@ protected void internalGetManagedLedgerInfo(AsyncResponse asyncResponse, boolean asyncResponse.resume(new RestException(t)); } } + PartitionedManagedLedgerInfo partitionedManagedLedgerInfo = + new PartitionedManagedLedgerInfo(); + for (CompletableFuture> infoFuture : futures) { + Pair info = infoFuture.getNow(null); + partitionedManagedLedgerInfo.partitions.put(info.getKey(), info.getValue()); + } asyncResponse.resume((StreamingOutput) output -> { jsonMapper().writer().writeValue(output, partitionedManagedLedgerInfo); }); - return null; }); } else { internalGetManagedLedgerInfoForNonPartitionedTopic(asyncResponse); From d574cdbed6b81b6b7ed7ba1c6be8f7e3de54b9f7 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 22 Dec 2022 18:30:26 +0200 Subject: [PATCH 2/2] Fix review comment --- .../admin/impl/PersistentTopicsBase.java | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 7caf03d40e4bf..78c137f94e3fc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -1321,16 +1321,17 @@ protected void internalGetManagedLedgerInfo(AsyncResponse asyncResponse, boolean log.error("[{}] Failed to get managed info for {}", clientAppId(), topicName, t); asyncResponse.resume(new RestException(t)); } + } else { + PartitionedManagedLedgerInfo partitionedManagedLedgerInfo = + new PartitionedManagedLedgerInfo(); + for (CompletableFuture> infoFuture : futures) { + Pair info = infoFuture.getNow(null); + partitionedManagedLedgerInfo.partitions.put(info.getKey(), info.getValue()); + } + asyncResponse.resume((StreamingOutput) output -> { + jsonMapper().writer().writeValue(output, partitionedManagedLedgerInfo); + }); } - PartitionedManagedLedgerInfo partitionedManagedLedgerInfo = - new PartitionedManagedLedgerInfo(); - for (CompletableFuture> infoFuture : futures) { - Pair info = infoFuture.getNow(null); - partitionedManagedLedgerInfo.partitions.put(info.getKey(), info.getValue()); - } - asyncResponse.resume((StreamingOutput) output -> { - jsonMapper().writer().writeValue(output, partitionedManagedLedgerInfo); - }); }); } else { internalGetManagedLedgerInfoForNonPartitionedTopic(asyncResponse);