From 21128e7af3342b943f406455709a745b5219b7c2 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 3 Sep 2024 23:26:59 +0800 Subject: [PATCH 1/9] [improve][pip] PIP-376: Make topic policies service pluggable --- pip/pip-376.md | 138 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 138 insertions(+) create mode 100644 pip/pip-376.md diff --git a/pip/pip-376.md b/pip/pip-376.md new file mode 100644 index 0000000000000..0eab4f2bb410a --- /dev/null +++ b/pip/pip-376.md @@ -0,0 +1,138 @@ +# PIP-376: Make topic policies service pluggable + +# Background knowledge + +## Topic policies service and system topics + +[PIP-39](https://github.com/apache/pulsar/wiki/PIP-39%3A-Namespace-Change-Events) introduces system topics and the topic level policies. However, the topic policies service (`TopicPoliciesService`) only has one implementation (`SystemTopicBasedTopicPoliciesService`) that depends on the system topics. So the following configs are both required (though they're all enabled by default now): + +```properties +systemTopicEnabled=true +topicLevelPoliciesEnabled=true +``` + +However, if the Pulsar storage is switched to a S3-based solution (by modifying the `managedLedgerStorageClassName` config), using system topics to manage topic policies could have low performance (due to the S3 write and read latency) and higher cost (due to redundant S3 API calls). + +## Badly designed TopicPoliciesService interface + +The `TopicPoliciesService` interface is a terrible abstraction because it's never designed for 3rd party implementations. + +1. Methods that should not be exposed + +`addOwnedNamespaceBundleAsync` and `removeOwnedNamespaceBundleAsync` are only used internally in `SystemTopicBasedTopicPoliciesService`. + +`getTopicPoliciesBypassCacheAsync` is only used in tests. This method just creates a reader to replay the `__change_events` topic and construct the topic policies map. + +2. Confusing and inconsistent `getTopicPolicies` family + +There are two overrides of `getTopicPolicies`: + +```java +TopicPolicies getTopicPolicies(TopicName topicName, boolean isGlobal) throws TopicPoliciesCacheNotInitException; +TopicPolicies getTopicPolicies(TopicName topicName) throws TopicPoliciesCacheNotInitException; +``` + +The 2nd method is equivalent to `getTopicPolicies(topicName, false)`. + +The semantics of these two methods are very intuitive. First, they are not synchronous methods that are blocked by waiting a future. They just start an asynchronous policies initialization (creating a reader to replay the `__change_events` topic), and then try to get the policies from the cache. If the asynchronous policies initialization didn't start, just throw `TopicPoliciesCacheNotInitException`. + +As you can see, these two methods are hard to use. And they are also only used in tests except for the `getTopicPoliciesAsyncWithRetry` method, which uses a user-provided executor and backoff policy to call `getTopicPolicies` until `TopicPoliciesCacheNotInitException` is not thrown: + +```java + default CompletableFuture> getTopicPoliciesAsyncWithRetry(TopicName topicName, + final Backoff backoff, ScheduledExecutorService scheduledExecutorService, boolean isGlobal) { +``` + +The `getTopicPolicies` overrides are only called in tests while `getTopicPoliciesAsyncWithRetry` is used in the core. It would be very confusing to users that want to implement their own topic policies service. They have to look deeply into the Pulsar's source code to know these details. + +https://github.com/apache/pulsar/pull/21231 adds two asynchronous overrides that are much more friendly to users: + +```java +CompletableFuture> getTopicPoliciesAsync(@Nonnull TopicName topicName, boolean isGlobal); +CompletableFuture> getTopicPoliciesAsync(@Nonnull TopicName topicName); +``` + +Now we have **5** asynchronous get methods. What's worse, unlike `getTopicPolicies`, `getTopicPoliciesAsync(topic)` is not equivalent to `getTopicPoliciesAsync(topic, false)`, instead, +- `getTopicPoliciesAsync(topic)` will try getting local policies first, if absent, then try getting global policies +- `getTopicPoliciesAsync(topic, true)` will try getting global policies +- `getTopicPoliciesAsync(topic, false)` will try getting local policies + +It should be noted that the topic policies support global policies across clusters since [#12517](https://github.com/apache/pulsar/pull/12517). So there are local policies and global policies. + +Currently, +- `getTopicPoliciesAsync(TopicName)` is used in `BrokerService#getTopicPoliciesBypassSystemTopic`, which is called when initializing the topic policies of `PersistentTopic` objects. So it uses the "local-first" semantics in case the global policies or local policies is deleted. +- `getTopicPoliciesAsyncWithRetry` is used in `AdminResource#getTopicPoliciesAsyncWithRetry`, which is called for all topic policies admin APIs. Since these admin APIs all have a `isGlobal` field to indicate whether to get the global policies, it uses the "local only" or "global only" semantics. +- Other methods are never called directly other than tests. + +Actually there is the 6th method `getTopicPoliciesIfExists`, which just tries to get the local topic policies from the cache. This method is the most clear and simple in all these stuffs. + +```java + TopicPolicies getTopicPoliciesIfExists(TopicName topicName); +``` + +# Motivation + +Make `TopicPoliciesService` pluggable so that users can customize topic policies service via another backend metadata store. + +# Goals + +## In Scope + +Redesign a clear and simple `TopicPoliciesService` interface for users to customize. + +# High Level Design + +Add a `topicPoliciesServiceClassName` config to specify the topic policies service class name. If the class name is not the default `SystemTopicBasedTopicPoliciesService`, `systemTopicEnabled` will not be required unless the implementation requires it. + +# Detailed Design + +## Design & Implementation Details + +1. Add a unified method to get topic policies. + +```java + enum GetType { + LOCAL_FIRST, // try getting the local topic policies, if not present, then get the global policies + GLOBAL_ONLY, // only get the global policies + LOCAL_ONLY, // only get the local policies + } + CompletableFuture> getTopicPoliciesAsync(TopicName topicName, GetType type); +``` + +`getTopicPoliciesAsyncWithRetry` will be replaced by `getTopicPoliciesAsync(topicName, LOCAL_ONLY)` or `getTopicPoliciesAsync(topicName, GLOBAL_ONLY)`. Other two original `getTopicPoliciesAsync` methods will be removed and replaced by `getTopicPoliciesAsync(topicName, LOCAL_FIRST)`. + +2. Move `addOwnedNamespaceBundleAsync` and `removeOwnedNamespaceBundleAsync` to private methods of `SystemTopicBasedTopicPoliciesService`. + +3. Add a `TestUtils` class in tests to include `getTopicPolicies` and `getTopicPoliciesBypassCacheAsync` methods. + +4. The generic parameter is removed from `TopicPolicyListener` because the value type should always be `TopicPolicies`. Also, mark this listener interface as `Stable`. + +5. Add a `PulserService` parameter to the `start` method so that the implementation can implement a constructor with empty parameter list and get the `PulsarService` instance from the `start` method. + +6. Add a `boolean` return value to `registerListener` since `PersistentTopic#initTopicPolicy` checks if the topic policies is enabled, while we can just use the return value to indicate if the `TopicPoliciesService` instance is `topicPoliciesServiceClassName.DISABLED`. + +Since now the topic policies service is decoupled with system topics, remove all `isSystemTopicAndTopicLevelPoliciesEnabled()` calls. + +### Configuration + +New config `topicLevelPoliciesEnabled` will be added. + +# Backward & Forward Compatibility + +If the downstream application needs to call APIs from `TopicPoliciesService`, it should modify the code to use the new API. + +# Alternatives + +## Keep the `TopicPoliciesService` interface compatible. + +This interface was badly designed because it has only one implementation. Keeping these methods here will be a burden for developers to develop a customized interface. They need to know where these confusing methods are called and need to take them very carefully. + +# General Notes + +# Links + + +* Mailing List discussion thread: +* Mailing List voting thread: From 246c9280b2aaca83b1051a2c993923de9a237c6b Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 4 Sep 2024 10:53:33 +0800 Subject: [PATCH 2/9] Remove unnecessary mention for storage --- pip/pip-376.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pip/pip-376.md b/pip/pip-376.md index 0eab4f2bb410a..3d25edd4d79d5 100644 --- a/pip/pip-376.md +++ b/pip/pip-376.md @@ -11,7 +11,7 @@ systemTopicEnabled=true topicLevelPoliciesEnabled=true ``` -However, if the Pulsar storage is switched to a S3-based solution (by modifying the `managedLedgerStorageClassName` config), using system topics to manage topic policies could have low performance (due to the S3 write and read latency) and higher cost (due to redundant S3 API calls). +However, sometimes using system topics to manage topic policies might not be the best choice. In this case, users might need a way to adopt another approach to manage topic policies. ## Badly designed TopicPoliciesService interface From d18754a67fc0aaeda2c18abab2f72fccb95800ff Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 4 Sep 2024 16:25:34 +0800 Subject: [PATCH 3/9] Address comments and refactor the documents --- pip/pip-376.md | 140 +++++++++++++++++++++++-------------------------- 1 file changed, 65 insertions(+), 75 deletions(-) diff --git a/pip/pip-376.md b/pip/pip-376.md index 3d25edd4d79d5..9405f562cd298 100644 --- a/pip/pip-376.md +++ b/pip/pip-376.md @@ -1,138 +1,128 @@ -# PIP-376: Make topic policies service pluggable +# PIP-376: Make Topic Policies Service Pluggable -# Background knowledge +## Background -## Topic policies service and system topics +### Topic Policies Service and System Topics -[PIP-39](https://github.com/apache/pulsar/wiki/PIP-39%3A-Namespace-Change-Events) introduces system topics and the topic level policies. However, the topic policies service (`TopicPoliciesService`) only has one implementation (`SystemTopicBasedTopicPoliciesService`) that depends on the system topics. So the following configs are both required (though they're all enabled by default now): +[PIP-39](https://github.com/apache/pulsar/wiki/PIP-39%3A-Namespace-Change-Events) introduces system topics and topic-level policies. Currently, the topic policies service (`TopicPoliciesService`) has only one implementation (`SystemTopicBasedTopicPoliciesService`) that depends on system topics. Therefore, the following configurations are required (though they are enabled by default): ```properties systemTopicEnabled=true topicLevelPoliciesEnabled=true ``` -However, sometimes using system topics to manage topic policies might not be the best choice. In this case, users might need a way to adopt another approach to manage topic policies. +However, using system topics to manage topic policies may not always be the best choice. Users might need an alternative approach to manage topic policies. -## Badly designed TopicPoliciesService interface +### Issues with the Current `TopicPoliciesService` Interface -The `TopicPoliciesService` interface is a terrible abstraction because it's never designed for 3rd party implementations. +The `TopicPoliciesService` interface is poorly designed for third-party implementations due to the following reasons: -1. Methods that should not be exposed +1. **Methods that Should Not Be Exposed**: + - `addOwnedNamespaceBundleAsync` and `removeOwnedNamespaceBundleAsync` are used internally in `SystemTopicBasedTopicPoliciesService`. + - `getTopicPoliciesBypassCacheAsync` is used only in tests to replay the `__change_events` topic and construct the topic policies map. -`addOwnedNamespaceBundleAsync` and `removeOwnedNamespaceBundleAsync` are only used internally in `SystemTopicBasedTopicPoliciesService`. +2. **Confusing and Inconsistent `getTopicPolicies` Methods**: + - There are two overrides of `getTopicPolicies`: + ```java + TopicPolicies getTopicPolicies(TopicName topicName, boolean isGlobal) throws TopicPoliciesCacheNotInitException; + TopicPolicies getTopicPolicies(TopicName topicName) throws TopicPoliciesCacheNotInitException; + ``` + - The second method is equivalent to `getTopicPolicies(topicName, false)`. + - These methods are asynchronous and start an asynchronous policies initialization, then try to get the policies from the cache. If the initialization hasn't started, they throw `TopicPoliciesCacheNotInitException`. -`getTopicPoliciesBypassCacheAsync` is only used in tests. This method just creates a reader to replay the `__change_events` topic and construct the topic policies map. - -2. Confusing and inconsistent `getTopicPolicies` family - -There are two overrides of `getTopicPolicies`: +These methods are hard to use and are primarily used in tests. The `getTopicPoliciesAsyncWithRetry` method uses a user-provided executor and backoff policy to call `getTopicPolicies` until `TopicPoliciesCacheNotInitException` is not thrown: ```java -TopicPolicies getTopicPolicies(TopicName topicName, boolean isGlobal) throws TopicPoliciesCacheNotInitException; -TopicPolicies getTopicPolicies(TopicName topicName) throws TopicPoliciesCacheNotInitException; +default CompletableFuture> getTopicPoliciesAsyncWithRetry(TopicName topicName, + final Backoff backoff, ScheduledExecutorService scheduledExecutorService, boolean isGlobal) { ``` -The 2nd method is equivalent to `getTopicPolicies(topicName, false)`. - -The semantics of these two methods are very intuitive. First, they are not synchronous methods that are blocked by waiting a future. They just start an asynchronous policies initialization (creating a reader to replay the `__change_events` topic), and then try to get the policies from the cache. If the asynchronous policies initialization didn't start, just throw `TopicPoliciesCacheNotInitException`. +The `getTopicPolicies` methods are confusing for users who want to implement their own topic policies service. They need to look deeply into Pulsar's source code to understand these details. -As you can see, these two methods are hard to use. And they are also only used in tests except for the `getTopicPoliciesAsyncWithRetry` method, which uses a user-provided executor and backoff policy to call `getTopicPolicies` until `TopicPoliciesCacheNotInitException` is not thrown: - -```java - default CompletableFuture> getTopicPoliciesAsyncWithRetry(TopicName topicName, - final Backoff backoff, ScheduledExecutorService scheduledExecutorService, boolean isGlobal) { -``` - -The `getTopicPolicies` overrides are only called in tests while `getTopicPoliciesAsyncWithRetry` is used in the core. It would be very confusing to users that want to implement their own topic policies service. They have to look deeply into the Pulsar's source code to know these details. - -https://github.com/apache/pulsar/pull/21231 adds two asynchronous overrides that are much more friendly to users: +[PR #21231](https://github.com/apache/pulsar/pull/21231) adds two asynchronous overrides that are more user-friendly: ```java CompletableFuture> getTopicPoliciesAsync(@Nonnull TopicName topicName, boolean isGlobal); CompletableFuture> getTopicPoliciesAsync(@Nonnull TopicName topicName); ``` -Now we have **5** asynchronous get methods. What's worse, unlike `getTopicPolicies`, `getTopicPoliciesAsync(topic)` is not equivalent to `getTopicPoliciesAsync(topic, false)`, instead, -- `getTopicPoliciesAsync(topic)` will try getting local policies first, if absent, then try getting global policies -- `getTopicPoliciesAsync(topic, true)` will try getting global policies -- `getTopicPoliciesAsync(topic, false)` will try getting local policies +Now there are five asynchronous `get` methods. Unlike `getTopicPolicies`, `getTopicPoliciesAsync(topic)` is not equivalent to `getTopicPoliciesAsync(topic, false)`. Instead: +- `getTopicPoliciesAsync(topic)` tries getting local policies first, then global policies if absent. +- `getTopicPoliciesAsync(topic, true)` tries getting global policies. +- `getTopicPoliciesAsync(topic, false)` tries getting local policies. -It should be noted that the topic policies support global policies across clusters since [#12517](https://github.com/apache/pulsar/pull/12517). So there are local policies and global policies. +Since [PR #12517](https://github.com/apache/pulsar/pull/12517), topic policies support global policies across clusters. Therefore, there are local and global policies. -Currently, -- `getTopicPoliciesAsync(TopicName)` is used in `BrokerService#getTopicPoliciesBypassSystemTopic`, which is called when initializing the topic policies of `PersistentTopic` objects. So it uses the "local-first" semantics in case the global policies or local policies is deleted. -- `getTopicPoliciesAsyncWithRetry` is used in `AdminResource#getTopicPoliciesAsyncWithRetry`, which is called for all topic policies admin APIs. Since these admin APIs all have a `isGlobal` field to indicate whether to get the global policies, it uses the "local only" or "global only" semantics. -- Other methods are never called directly other than tests. +Currently: +- `getTopicPoliciesAsync(TopicName)` is used in `BrokerService#getTopicPoliciesBypassSystemTopic` for initializing topic policies of `PersistentTopic` objects. +- `getTopicPoliciesAsyncWithRetry` is used in `AdminResource#getTopicPoliciesAsyncWithRetry` for all topic policies admin APIs. +- Other methods are used only in tests. -Actually there is the 6th method `getTopicPoliciesIfExists`, which just tries to get the local topic policies from the cache. This method is the most clear and simple in all these stuffs. +There is also a sixth method, `getTopicPoliciesIfExists`, which tries to get local topic policies from the cache: ```java - TopicPolicies getTopicPoliciesIfExists(TopicName topicName); +TopicPolicies getTopicPoliciesIfExists(TopicName topicName); ``` -# Motivation +## Motivation -Make `TopicPoliciesService` pluggable so that users can customize topic policies service via another backend metadata store. +Make `TopicPoliciesService` pluggable so users can customize the topic policies service via another backend metadata store. -# Goals +## Goals -## In Scope +### In Scope Redesign a clear and simple `TopicPoliciesService` interface for users to customize. -# High Level Design - -Add a `topicPoliciesServiceClassName` config to specify the topic policies service class name. If the class name is not the default `SystemTopicBasedTopicPoliciesService`, `systemTopicEnabled` will not be required unless the implementation requires it. +## High-Level Design -# Detailed Design +Add a `topicPoliciesServiceClassName` configuration to specify the topic policies service class name. If the class name is not the default `SystemTopicBasedTopicPoliciesService`, `systemTopicEnabled` will not be required unless the implementation requires it. -## Design & Implementation Details +## Detailed Design -1. Add a unified method to get topic policies. +### Design & Implementation Details -```java - enum GetType { - LOCAL_FIRST, // try getting the local topic policies, if not present, then get the global policies - GLOBAL_ONLY, // only get the global policies - LOCAL_ONLY, // only get the local policies - } - CompletableFuture> getTopicPoliciesAsync(TopicName topicName, GetType type); -``` +1. Add a unified method to get topic policies: + ```java + enum GetType { + LOCAL_FIRST, // Try getting the local topic policies, if not present, then get the global policies + GLOBAL_ONLY, // Only get the global policies + LOCAL_ONLY, // Only get the local policies + } + CompletableFuture> getTopicPoliciesAsync(TopicName topicName, GetType type); + ``` -`getTopicPoliciesAsyncWithRetry` will be replaced by `getTopicPoliciesAsync(topicName, LOCAL_ONLY)` or `getTopicPoliciesAsync(topicName, GLOBAL_ONLY)`. Other two original `getTopicPoliciesAsync` methods will be removed and replaced by `getTopicPoliciesAsync(topicName, LOCAL_FIRST)`. + `getTopicPoliciesAsyncWithRetry` will be replaced by `getTopicPoliciesAsync(topicName, LOCAL_ONLY)` or `getTopicPoliciesAsync(topicName, GLOBAL_ONLY)`. The other two original `getTopicPoliciesAsync` methods will be replaced by `getTopicPoliciesAsync(topicName, LOCAL_FIRST)`. 2. Move `addOwnedNamespaceBundleAsync` and `removeOwnedNamespaceBundleAsync` to private methods of `SystemTopicBasedTopicPoliciesService`. 3. Add a `TestUtils` class in tests to include `getTopicPolicies` and `getTopicPoliciesBypassCacheAsync` methods. -4. The generic parameter is removed from `TopicPolicyListener` because the value type should always be `TopicPolicies`. Also, mark this listener interface as `Stable`. +4. Remove the generic parameter from `TopicPolicyListener` as the value type should always be `TopicPolicies`. Mark this listener interface as `Stable`. -5. Add a `PulserService` parameter to the `start` method so that the implementation can implement a constructor with empty parameter list and get the `PulsarService` instance from the `start` method. +5. Add a `PulsarService` parameter to the `start` method so that the implementation can have a constructor with an empty parameter list and get the `PulsarService` instance from the `start` method. -6. Add a `boolean` return value to `registerListener` since `PersistentTopic#initTopicPolicy` checks if the topic policies is enabled, while we can just use the return value to indicate if the `TopicPoliciesService` instance is `topicPoliciesServiceClassName.DISABLED`. +6. Add a `boolean` return value to `registerListener` since `PersistentTopic#initTopicPolicy` checks if the topic policies are enabled. The return value will indicate if the `TopicPoliciesService` instance is `topicPoliciesServiceClassName.DISABLED`. -Since now the topic policies service is decoupled with system topics, remove all `isSystemTopicAndTopicLevelPoliciesEnabled()` calls. +Since the topic policies service is now decoupled from system topics, remove all `isSystemTopicAndTopicLevelPoliciesEnabled()` calls. ### Configuration -New config `topicLevelPoliciesEnabled` will be added. +Add a new configuration `topicLevelPoliciesEnabled`. -# Backward & Forward Compatibility +## Backward & Forward Compatibility -If the downstream application needs to call APIs from `TopicPoliciesService`, it should modify the code to use the new API. +If downstream applications need to call APIs from `TopicPoliciesService`, they should modify the code to use the new API. -# Alternatives +## Alternatives -## Keep the `TopicPoliciesService` interface compatible. +### Keep the `TopicPoliciesService` Interface Compatible -This interface was badly designed because it has only one implementation. Keeping these methods here will be a burden for developers to develop a customized interface. They need to know where these confusing methods are called and need to take them very carefully. +The current interface is poorly designed because it has only one implementation. Keeping these methods will burden developers who want to develop a customized interface. They need to understand where these confusing methods are called and handle them carefully. -# General Notes +## General Notes -# Links +## Links - -* Mailing List discussion thread: +* Mailing List discussion thread: https://lists.apache.org/thread/gf6h4n5n1z4n8v6bxdthct1n07onfdxt * Mailing List voting thread: From 45c93fa57894afce41fbfd85d4076c5eef570f51 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 4 Sep 2024 18:20:28 +0800 Subject: [PATCH 4/9] Remove getTopicPoliciesIfExists --- pip/pip-376.md | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/pip/pip-376.md b/pip/pip-376.md index 9405f562cd298..2e6d9e2cc4b57 100644 --- a/pip/pip-376.md +++ b/pip/pip-376.md @@ -64,6 +64,27 @@ There is also a sixth method, `getTopicPoliciesIfExists`, which tries to get loc TopicPolicies getTopicPoliciesIfExists(TopicName topicName); ``` +However, this method is called just because there was no `getTopicPoliciesAsync` methods before and `getTopicPolicies` is hard to use. For example, here is an example code snippet in `PersistentTopicsBase#internalUpdatePartitionedTopicAsync`: + +```java +TopicPolicies topicPolicies = + pulsarService.getTopicPoliciesService().getTopicPoliciesIfExists(topicName); +if (topicPolicies != null && topicPolicies.getReplicationClusters() != null) { + replicationClusters = topicPolicies.getReplicationClustersSet(); +} +``` + +With the new `getTopicPoliciesAsync` methods, this code can be replaced with: + +```java +pulsarService.getTopicPoliciesService().getTopicPoliciesAsync(topicName, GetType.LOCAL_ONLY) + .thenAccept(topicPolicies -> { + if (topicPolicies.isPresent() && topicPolicies.get().getReplicationClusters() != null) { + replicationClusters = topicPolicies.get().getReplicationClustersSet(); + } + }); +``` + ## Motivation Make `TopicPoliciesService` pluggable so users can customize the topic policies service via another backend metadata store. @@ -92,7 +113,7 @@ Add a `topicPoliciesServiceClassName` configuration to specify the topic policie CompletableFuture> getTopicPoliciesAsync(TopicName topicName, GetType type); ``` - `getTopicPoliciesAsyncWithRetry` will be replaced by `getTopicPoliciesAsync(topicName, LOCAL_ONLY)` or `getTopicPoliciesAsync(topicName, GLOBAL_ONLY)`. The other two original `getTopicPoliciesAsync` methods will be replaced by `getTopicPoliciesAsync(topicName, LOCAL_FIRST)`. + `getTopicPoliciesAsyncWithRetry` will be replaced by `getTopicPoliciesAsync(topicName, LOCAL_ONLY)` or `getTopicPoliciesAsync(topicName, GLOBAL_ONLY)`. The other two original `getTopicPoliciesAsync` methods and `getTopicPoliciesIfExists` will be replaced by `getTopicPoliciesAsync(topicName, LOCAL_FIRST)`. 2. Move `addOwnedNamespaceBundleAsync` and `removeOwnedNamespaceBundleAsync` to private methods of `SystemTopicBasedTopicPoliciesService`. From 8c64caadde1b49cf1f9ee40ace63ea1fdbb0f6a1 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 4 Sep 2024 18:25:27 +0800 Subject: [PATCH 5/9] Add the full interface --- pip/pip-376.md | 67 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) diff --git a/pip/pip-376.md b/pip/pip-376.md index 2e6d9e2cc4b57..876e3c7cf9662 100644 --- a/pip/pip-376.md +++ b/pip/pip-376.md @@ -127,6 +127,73 @@ Add a `topicPoliciesServiceClassName` configuration to specify the topic policie Since the topic policies service is now decoupled from system topics, remove all `isSystemTopicAndTopicLevelPoliciesEnabled()` calls. +Here is the refactored `TopicPoliciesService` interface: + +```java +@InterfaceStability.Evolving +public interface TopicPoliciesService extends AutoCloseable { + + /** + * Delete policies for a topic async. + * + * @param topicName topic name + */ + CompletableFuture deleteTopicPoliciesAsync(TopicName topicName); + + /** + * Update policies for a topic async. + * + * @param topicName topic name + * @param policies policies for the topic name + */ + CompletableFuture updateTopicPoliciesAsync(TopicName topicName, TopicPolicies policies); + + /** + * It controls the behavior of {@link TopicPoliciesService#getTopicPoliciesAsync}. + */ + enum GetType { + LOCAL_FIRST, // try getting the local topic policies, if not present, then get the global policies + GLOBAL_ONLY, // only get the global policies + LOCAL_ONLY, // only get the local policies + } + + /** + * Retrieve the topic policies. + */ + CompletableFuture> getTopicPoliciesAsync(TopicName topicName, GetType type); + + /** + * Start the topic policy service. + */ + default void start(PulsarService pulsar) { + } + + /** + * Close the resources if necessary. + */ + default void close() throws Exception { + } + + /** + * Registers a listener for topic policies updates. + * + *

+ * The listener will receive the latest topic policies when they are updated. If the policies are removed, the listener will receive a null value. + * Note that not every update is guaranteed to trigger the listener. For instance, if the policies change from A -> B -> null -> C in quick succession, + * only the final state (C) is guaranteed to be received by the listener. + * In summary, the listener is guaranteed to receive only the latest value. + *

+ * + * @return true if the listener is registered successfully + */ + boolean registerListener(TopicName topicName, TopicPolicyListener listener); + + /** + * Unregister the topic policies listener. + */ + void unregisterListener(TopicName topicName, TopicPolicyListener listener); +``` + ### Configuration Add a new configuration `topicLevelPoliciesEnabled`. From bca4f5871933c834206d69933cbbf68f4aa5d632 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 4 Sep 2024 18:42:52 +0800 Subject: [PATCH 6/9] Change LOCAL_FIRST to DEFAULT --- pip/pip-376.md | 32 +++++++++++++++++++------------- 1 file changed, 19 insertions(+), 13 deletions(-) diff --git a/pip/pip-376.md b/pip/pip-376.md index 876e3c7cf9662..9fa5f8252b673 100644 --- a/pip/pip-376.md +++ b/pip/pip-376.md @@ -106,14 +106,14 @@ Add a `topicPoliciesServiceClassName` configuration to specify the topic policie 1. Add a unified method to get topic policies: ```java enum GetType { - LOCAL_FIRST, // Try getting the local topic policies, if not present, then get the global policies - GLOBAL_ONLY, // Only get the global policies - LOCAL_ONLY, // Only get the local policies + DEFAULT, // try getting the local topic policies, if not present, then get the global policies + GLOBAL_ONLY, // only get the global policies + LOCAL_ONLY, // only get the local policies } CompletableFuture> getTopicPoliciesAsync(TopicName topicName, GetType type); ``` - `getTopicPoliciesAsyncWithRetry` will be replaced by `getTopicPoliciesAsync(topicName, LOCAL_ONLY)` or `getTopicPoliciesAsync(topicName, GLOBAL_ONLY)`. The other two original `getTopicPoliciesAsync` methods and `getTopicPoliciesIfExists` will be replaced by `getTopicPoliciesAsync(topicName, LOCAL_FIRST)`. + `getTopicPoliciesAsyncWithRetry` will be replaced by `getTopicPoliciesAsync(topicName, LOCAL_ONLY)` or `getTopicPoliciesAsync(topicName, GLOBAL_ONLY)`. The other two original `getTopicPoliciesAsync` methods and `getTopicPoliciesIfExists` will be replaced by `getTopicPoliciesAsync(topicName, DEFAULT)`. 2. Move `addOwnedNamespaceBundleAsync` and `removeOwnedNamespaceBundleAsync` to private methods of `SystemTopicBasedTopicPoliciesService`. @@ -130,18 +130,15 @@ Since the topic policies service is now decoupled from system topics, remove all Here is the refactored `TopicPoliciesService` interface: ```java -@InterfaceStability.Evolving -public interface TopicPoliciesService extends AutoCloseable { - /** - * Delete policies for a topic async. + * Delete policies for a topic asynchronously. * * @param topicName topic name */ CompletableFuture deleteTopicPoliciesAsync(TopicName topicName); /** - * Update policies for a topic async. + * Update policies for a topic asynchronously. * * @param topicName topic name * @param policies policies for the topic name @@ -152,7 +149,7 @@ public interface TopicPoliciesService extends AutoCloseable { * It controls the behavior of {@link TopicPoliciesService#getTopicPoliciesAsync}. */ enum GetType { - LOCAL_FIRST, // try getting the local topic policies, if not present, then get the global policies + DEFAULT, // try getting the local topic policies, if not present, then get the global policies GLOBAL_ONLY, // only get the global policies LOCAL_ONLY, // only get the local policies } @@ -178,9 +175,10 @@ public interface TopicPoliciesService extends AutoCloseable { * Registers a listener for topic policies updates. * *

- * The listener will receive the latest topic policies when they are updated. If the policies are removed, the listener will receive a null value. - * Note that not every update is guaranteed to trigger the listener. For instance, if the policies change from A -> B -> null -> C in quick succession, - * only the final state (C) is guaranteed to be received by the listener. + * The listener will receive the latest topic policies when they are updated. If the policies are removed, the + * listener will receive a null value. Note that not every update is guaranteed to trigger the listener. For + * instance, if the policies change from A -> B -> null -> C in quick succession, only the final state (C) is + * guaranteed to be received by the listener. * In summary, the listener is guaranteed to receive only the latest value. *

* @@ -194,6 +192,14 @@ public interface TopicPoliciesService extends AutoCloseable { void unregisterListener(TopicName topicName, TopicPolicyListener listener); ``` +```java +@InterfaceStability.Stable +public interface TopicPolicyListener { + + void onUpdate(TopicPolicies data); +} +``` + ### Configuration Add a new configuration `topicLevelPoliciesEnabled`. From b3e471a9ab68eb055fd915bcdf8ede1a217833f9 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 6 Sep 2024 17:17:24 +0800 Subject: [PATCH 7/9] Update pip/pip-376.md Co-authored-by: Zike Yang --- pip/pip-376.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pip/pip-376.md b/pip/pip-376.md index 9fa5f8252b673..34ebdd76abccb 100644 --- a/pip/pip-376.md +++ b/pip/pip-376.md @@ -202,7 +202,7 @@ public interface TopicPolicyListener { ### Configuration -Add a new configuration `topicLevelPoliciesEnabled`. +Add a new configuration `topicPoliciesServiceClassName `. ## Backward & Forward Compatibility From 4d7498d75080f7f6ce9dceaae05bda097e3e4931 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 11 Sep 2024 11:08:17 +0800 Subject: [PATCH 8/9] Add voting thread link --- pip/pip-376.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pip/pip-376.md b/pip/pip-376.md index 34ebdd76abccb..0659de812af3d 100644 --- a/pip/pip-376.md +++ b/pip/pip-376.md @@ -202,7 +202,7 @@ public interface TopicPolicyListener { ### Configuration -Add a new configuration `topicPoliciesServiceClassName `. +Add a new configuration `topicPoliciesServiceClassName`. ## Backward & Forward Compatibility @@ -219,4 +219,4 @@ The current interface is poorly designed because it has only one implementation. ## Links * Mailing List discussion thread: https://lists.apache.org/thread/gf6h4n5n1z4n8v6bxdthct1n07onfdxt -* Mailing List voting thread: +* Mailing List voting thread: https://lists.apache.org/thread/potjbkb4w8brcwscgdwzlxnowgdf11gd From 179b7ced0370657a270694f76d887308b10c7eec Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 11 Sep 2024 11:08:29 +0800 Subject: [PATCH 9/9] Rename it --- pip/{pip-376.md => pip-376-Topic-Policies-Service-Pluggable.md} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename pip/{pip-376.md => pip-376-Topic-Policies-Service-Pluggable.md} (100%) diff --git a/pip/pip-376.md b/pip/pip-376-Topic-Policies-Service-Pluggable.md similarity index 100% rename from pip/pip-376.md rename to pip/pip-376-Topic-Policies-Service-Pluggable.md