Skip to content

Commit

Permalink
Revert changes
Browse files Browse the repository at this point in the history
  • Loading branch information
mattisonchao committed Oct 6, 2023
1 parent 45a8492 commit 7523090
Showing 1 changed file with 11 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -146,18 +146,7 @@
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.policies.data.*;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.stats.Metrics;
Expand Down Expand Up @@ -1626,51 +1615,24 @@ public CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(@Nonnull To
managedLedgerConfig.setWriteQuorumSize(persistencePolicies.getBookkeeperWriteQuorum());
managedLedgerConfig.setAckQuorumSize(persistencePolicies.getBookkeeperAckQuorum());

if (serviceConfig.isStrictBookieAffinityEnabled()) {
if (localPolicies.isPresent() && localPolicies.get().bookieAffinityGroup != null) {
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyClassName(
IsolatedBookieEnsemblePlacementPolicy.class);
if (localPolicies.isPresent() && localPolicies.get().bookieAffinityGroup != null) {
Map<String, Object> properties = new HashMap<>();
properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupPrimary());
properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupSecondary());
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
} else if (isSystemTopic(topicName)) {
Map<String, Object> properties = new HashMap<>();
properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, "*");
properties.put(IsolatedBookieEnsemblePlacementPolicy
.SECONDARY_ISOLATION_BOOKIE_GROUPS, "*");
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
} else {
Map<String, Object> properties = new HashMap<>();
properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, "");
properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS, "");
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
}
} else {
if (localPolicies.isPresent() && localPolicies.get().bookieAffinityGroup != null) {
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyClassName(
IsolatedBookieEnsemblePlacementPolicy.class);
Map<String, Object> properties = new HashMap<>();
properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupPrimary());
properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupSecondary());
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
}
Map<String, Object> properties = new HashMap<>();
properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupPrimary());
properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupSecondary());
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
}

managedLedgerConfig.setThrottleMarkDelete(persistencePolicies.getManagedLedgerMaxMarkDeleteRate());
managedLedgerConfig.setDigestType(serviceConfig.getManagedLedgerDigestType());
managedLedgerConfig.setPassword(serviceConfig.getManagedLedgerPassword());

managedLedgerConfig
.setMaxUnackedRangesToPersist(serviceConfig.getManagedLedgerMaxUnackedRangesToPersist());
managedLedgerConfig.setPersistentUnackedRangesWithMultipleEntriesEnabled(
serviceConfig.isPersistentUnackedRangesWithMultipleEntriesEnabled());
managedLedgerConfig.setMaxUnackedRangesToPersistInMetadataStore(
serviceConfig.getManagedLedgerMaxUnackedRangesToPersistInMetadataStore());
managedLedgerConfig.setMaxUnackedRangesToPersistInZk(
serviceConfig.getManagedLedgerMaxUnackedRangesToPersistInZooKeeper());
managedLedgerConfig.setMaxEntriesPerLedger(serviceConfig.getManagedLedgerMaxEntriesPerLedger());
managedLedgerConfig
.setMinimumRolloverTime(serviceConfig.getManagedLedgerMinLedgerRolloverTimeMinutes(),
Expand Down Expand Up @@ -1705,12 +1667,6 @@ public CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(@Nonnull To
serviceConfig.getManagedLedgerInactiveLedgerRolloverTimeSeconds(), TimeUnit.SECONDS);
managedLedgerConfig.setCacheEvictionByMarkDeletedPosition(
serviceConfig.isCacheEvictionByMarkDeletedPosition());
managedLedgerConfig.setMinimumBacklogCursorsForCaching(
serviceConfig.getManagedLedgerMinimumBacklogCursorsForCaching());
managedLedgerConfig.setMinimumBacklogEntriesForCaching(
serviceConfig.getManagedLedgerMinimumBacklogEntriesForCaching());
managedLedgerConfig.setMaxBacklogBetweenCursorsForCaching(
serviceConfig.getManagedLedgerMaxBacklogBetweenCursorsForCaching());

OffloadPoliciesImpl nsLevelOffloadPolicies =
(OffloadPoliciesImpl) policies.map(p -> p.offload_policies).orElse(null);
Expand Down Expand Up @@ -1740,6 +1696,7 @@ public CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(@Nonnull To
serviceConfig.isAcknowledgmentAtBatchIndexLevelEnabled());
managedLedgerConfig.setNewEntriesCheckDelayInMillis(
serviceConfig.getManagedLedgerNewEntriesCheckDelayInMillis());

return managedLedgerConfig;
});
});
Expand Down

0 comments on commit 7523090

Please sign in to comment.