From 2d79fd641e4ce6a8bfae72cba9cdddb5f4d38ef2 Mon Sep 17 00:00:00 2001 From: Boyang Jerry Peng Date: Fri, 28 Jun 2019 12:02:00 -0700 Subject: [PATCH] adding Intercept Interface (#10) * adding Intercept Interface * adding intercept classes * addressing comments * updating impl * adding * improving * adding implementation * fixes * improving impl * removing impl Refactor intercept interface (#12) * refactoring intercept interface * cleaning up * fix bug --- .../mledger/ManagedLedgerConfig.java | 8 + .../mledger/impl/ManagedLedgerImpl.java | 2 +- .../bookkeeper/mledger/impl/MetaStore.java | 8 +- .../mledger/impl/MetaStoreImplZookeeper.java | 14 +- pulsar-broker-common/pom.xml | 6 + .../pulsar/broker/ServiceConfiguration.java | 3 + .../intercept/FunctionsInterceptProvider.java | 23 + .../intercept/FunctionsInterceptService.java | 34 + .../broker/intercept/InterceptException.java | 36 + .../broker/intercept/InterceptProvider.java | 59 ++ .../broker/intercept/InterceptService.java | 95 +++ .../NamespacesInterceptProvider.java | 142 ++++ .../intercept/NamespacesInterceptService.java | 265 ++++++++ .../intercept/SinksInterceptProvider.java | 21 + .../intercept/SinksInterceptService.java | 32 + .../intercept/SourcesInterceptProvider.java | 21 + .../intercept/SourcesInterceptService.java | 32 + .../intercept/TenantsInterceptProvider.java | 14 + .../intercept/TenantsInterceptService.java | 23 + .../intercept/TopicInterceptService.java | 43 ++ .../intercept/TopicsInterceptProvider.java | 30 + .../apache/pulsar/broker/PulsarService.java | 11 +- .../broker/admin/impl/NamespacesBase.java | 613 +++++++++++++++++- .../admin/impl/PersistentTopicsBase.java | 42 +- .../pulsar/broker/admin/impl/TenantsBase.java | 11 + .../broker/admin/v2/NonPersistentTopics.java | 13 + .../pulsar/broker/service/BrokerService.java | 36 + .../worker/PulsarWorkerAssignmentTest.java | 2 +- .../pulsar/common/functions/Resources.java | 8 +- .../pulsar/functions/worker/Worker.java | 23 +- .../functions/worker/WorkerService.java | 7 +- .../worker/rest/api/ComponentImpl.java | 2 +- .../worker/rest/api/FunctionsImpl.java | 45 ++ .../functions/worker/rest/api/SinksImpl.java | 65 +- .../worker/rest/api/SourcesImpl.java | 65 +- .../api/v2/FunctionApiV2ResourceTest.java | 3 + .../api/v3/FunctionApiV3ResourceTest.java | 3 + .../rest/api/v3/SinkApiV3ResourceTest.java | 3 + .../rest/api/v3/SourceApiV3ResourceTest.java | 3 + 39 files changed, 1804 insertions(+), 62 deletions(-) create mode 100644 pulsar-broker-common/src/main/java/org/apache/pulsar/broker/intercept/FunctionsInterceptProvider.java create mode 100644 pulsar-broker-common/src/main/java/org/apache/pulsar/broker/intercept/FunctionsInterceptService.java create mode 100644 pulsar-broker-common/src/main/java/org/apache/pulsar/broker/intercept/InterceptException.java create mode 100644 pulsar-broker-common/src/main/java/org/apache/pulsar/broker/intercept/InterceptProvider.java create mode 100644 pulsar-broker-common/src/main/java/org/apache/pulsar/broker/intercept/InterceptService.java create mode 100644 pulsar-broker-common/src/main/java/org/apache/pulsar/broker/intercept/NamespacesInterceptProvider.java create mode 100644 pulsar-broker-common/src/main/java/org/apache/pulsar/broker/intercept/NamespacesInterceptService.java create mode 100644 pulsar-broker-common/src/main/java/org/apache/pulsar/broker/intercept/SinksInterceptProvider.java create mode 100644 pulsar-broker-common/src/main/java/org/apache/pulsar/broker/intercept/SinksInterceptService.java create mode 100644 pulsar-broker-common/src/main/java/org/apache/pulsar/broker/intercept/SourcesInterceptProvider.java create mode 100644 pulsar-broker-common/src/main/java/org/apache/pulsar/broker/intercept/SourcesInterceptService.java create mode 100644 pulsar-broker-common/src/main/java/org/apache/pulsar/broker/intercept/TenantsInterceptProvider.java create mode 100644 pulsar-broker-common/src/main/java/org/apache/pulsar/broker/intercept/TenantsInterceptService.java create mode 100644 pulsar-broker-common/src/main/java/org/apache/pulsar/broker/intercept/TopicInterceptService.java create mode 100644 pulsar-broker-common/src/main/java/org/apache/pulsar/broker/intercept/TopicsInterceptProvider.java diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java index 4af66ebb88385..a5cdfaf53d2e3 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java @@ -26,11 +26,16 @@ import java.util.Arrays; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import lombok.Getter; +import lombok.Setter; import org.apache.bookkeeper.client.EnsemblePlacementPolicy; import org.apache.bookkeeper.client.api.DigestType; import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader; +import org.apache.pulsar.common.naming.TopicName; + /** * Configuration class for a ManagedLedger. */ @@ -68,6 +73,9 @@ public class ManagedLedgerConfig { private Map bookKeeperEnsemblePlacementPolicyProperties; private LedgerOffloader ledgerOffloader = NullLedgerOffloader.INSTANCE; private Clock clock = Clock.systemUTC(); + @Getter + @Setter + private Runnable createFunctionInterceptFunc; public boolean isCreateIfMissing() { return createIfMissing; diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 3a803864f5ecd..eeba0f1d7b88e 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -338,7 +338,7 @@ public void operationFailed(MetaStoreException e) { callback.initializeFailed(new ManagedLedgerException(e)); } } - }); + }, config.getCreateFunctionInterceptFunc()); scheduleTimeoutTask(); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java index 72ef566a9cc7d..a824aa3aa09be 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java @@ -19,9 +19,12 @@ package org.apache.bookkeeper.mledger.impl; import java.util.List; +import java.util.function.Function; + import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo; +import org.apache.pulsar.common.naming.TopicName; /** * Interface that describes the operations that the ManagedLedger need to do on the metadata store. @@ -56,8 +59,11 @@ interface MetaStoreCallback { * whether the managed ledger metadata should be created if it doesn't exist already * @throws MetaStoreException */ - void getManagedLedgerInfo(String ledgerName, boolean createIfMissing, MetaStoreCallback callback); + void getManagedLedgerInfo(String ledgerName, boolean createIfMissing, MetaStoreCallback callback, Runnable createTopicIntercept); + default void getManagedLedgerInfo(String ledgerName, boolean createIfMissing, MetaStoreCallback callback) { + getManagedLedgerInfo(ledgerName, createIfMissing, callback, null); + } /** * * @param ledgerName diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java index 2e69614f65000..1927ded862239 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java @@ -31,6 +31,7 @@ import java.util.List; import java.util.concurrent.ForkJoinPool; import java.util.function.Consumer; +import java.util.function.Function; import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.bookkeeper.mledger.ManagedLedgerException; @@ -38,6 +39,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo; +import org.apache.pulsar.common.naming.TopicName; import org.apache.zookeeper.AsyncCallback.StringCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; @@ -128,7 +130,7 @@ private ManagedLedgerInfo updateMLInfoTimestamp(ManagedLedgerInfo info) { @Override public void getManagedLedgerInfo(final String ledgerName, boolean createIfMissing, - final MetaStoreCallback callback) { + final MetaStoreCallback callback, Runnable createTopicIntercept) { // Try to get the content or create an empty node zk.getData(prefix + ledgerName, false, (rc, path, ctx, readData, stat) -> executor.executeOrdered(ledgerName, safeRun(() -> { @@ -143,6 +145,16 @@ public void getManagedLedgerInfo(final String ledgerName, boolean createIfMissin } else if (rc == Code.NONODE.intValue()) { // Z-node doesn't exist if (createIfMissing) { + // intercept + if (createTopicIntercept != null) { + try { + createTopicIntercept.run(); + } catch (Exception e) { + callback.operationFailed(new MetaStoreException(e)); + return; + } + } + log.info("Creating '{}{}'", prefix, ledgerName); StringCallback createcb = (rc1, path1, ctx1, name) -> { diff --git a/pulsar-broker-common/pom.xml b/pulsar-broker-common/pom.xml index 3809c815ac746..c4910f998c166 100644 --- a/pulsar-broker-common/pom.xml +++ b/pulsar-broker-common/pom.xml @@ -45,6 +45,12 @@ ${project.version} + + org.apache.pulsar + pulsar-client-admin-original + ${project.version} + + com.google.guava guava diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 79d4da53bd7d8..616f75c9ccff5 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -170,6 +170,9 @@ public class ServiceConfiguration implements PulsarConfiguration { + " affecting the accuracy of the delivery time compared to the scheduled time. Default is 1 second.") private long delayedDeliveryTickTimeMillis = 1000; + @FieldContext(category = CATEGORY_SERVER, doc = "The class name of Intercept provider") + private String interceptProvider; + @FieldContext( category = CATEGORY_WEBSOCKET, doc = "Enable the WebSocket API service in broker" diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/intercept/FunctionsInterceptProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/intercept/FunctionsInterceptProvider.java new file mode 100644 index 0000000000000..6483e86c8a391 --- /dev/null +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/intercept/FunctionsInterceptProvider.java @@ -0,0 +1,23 @@ +package org.apache.pulsar.broker.intercept; + +import org.apache.pulsar.common.functions.FunctionConfig; +import org.apache.pulsar.common.io.SinkConfig; +import org.apache.pulsar.common.io.SourceConfig; + +public interface FunctionsInterceptProvider { + /** + * Intercept call for create function + * + * @param functionConfig function config of the function to be created + * @param clientRole the role used to create function + */ + default void createFunction(FunctionConfig functionConfig, String clientRole) throws InterceptException {} + + /** + * Intercept call for update function + * @param functionConfig function config of the function to be updated + * @param existingFunctionConfig + * @param clientRole the role used to update function + */ + default void updateFunction(FunctionConfig functionConfig, FunctionConfig existingFunctionConfig, String clientRole) throws InterceptException {} +} diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/intercept/FunctionsInterceptService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/intercept/FunctionsInterceptService.java new file mode 100644 index 0000000000000..84e96d82ee04c --- /dev/null +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/intercept/FunctionsInterceptService.java @@ -0,0 +1,34 @@ +package org.apache.pulsar.broker.intercept; + +import org.apache.pulsar.common.functions.FunctionConfig; + +public class FunctionsInterceptService { + + private final FunctionsInterceptProvider provider; + + public FunctionsInterceptService(FunctionsInterceptProvider functionsInterceptProvider) { + this.provider = functionsInterceptProvider; + } + + /** + * Intercept call for create function + * + * @param functionConfig function config of the function to be created + * @param clientRole the role used to create function + */ + public void createFunction(FunctionConfig functionConfig, String clientRole) throws InterceptException { + provider.createFunction(functionConfig, clientRole); + } + + /** + * Intercept call for update source + * + * @param updates updates to this function's function config + * @param existingFunctionConfig the existing function config + * @param clientRole the role used to update function + */ + public void updateFunction(FunctionConfig updates, FunctionConfig existingFunctionConfig, String clientRole) throws InterceptException { + provider.updateFunction(updates, existingFunctionConfig, clientRole); + } + +} diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/intercept/InterceptException.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/intercept/InterceptException.java new file mode 100644 index 0000000000000..c9e6d15a9053d --- /dev/null +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/intercept/InterceptException.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.intercept; + +import lombok.Getter; +import lombok.NoArgsConstructor; + +import java.util.Optional; + +@Getter +@NoArgsConstructor +public class InterceptException extends Exception { + + private Optional errorCode = Optional.empty(); + + public InterceptException(Integer errorCode, String errorMessage) { + super(errorMessage); + this.errorCode = Optional.of(errorCode); + } +} diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/intercept/InterceptProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/intercept/InterceptProvider.java new file mode 100644 index 0000000000000..c3d4649967904 --- /dev/null +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/intercept/InterceptProvider.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.intercept; + +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.client.admin.PulsarAdmin; + +/** + * This class provides a mechanism to intercept various API calls + */ +public interface InterceptProvider { + + default TenantsInterceptProvider getTenantInterceptProvider() { + return new TenantsInterceptProvider() {}; + } + + default NamespacesInterceptProvider getNamespaceInterceptProvider() { + return new NamespacesInterceptProvider() {}; + } + + default TopicsInterceptProvider getTopicInterceptProvider() { + return new TopicsInterceptProvider() {}; + } + + default FunctionsInterceptProvider getFunctionsInterceptProvider() { + return new FunctionsInterceptProvider() {}; + } + + default SourcesInterceptProvider getSourcesInterceptProvider() { + return new SourcesInterceptProvider() {}; + } + + default SinksInterceptProvider getSinksInterceptProvider() { + return new SinksInterceptProvider() {}; + } + + /** + * Perform initialization for the intercept provider + * + * @param conf broker config object + */ + default void initialize(ServiceConfiguration conf, PulsarAdmin pulsarAdmin) throws InterceptException {} +} diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/intercept/InterceptService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/intercept/InterceptService.java new file mode 100644 index 0000000000000..82c659f35f55f --- /dev/null +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/intercept/InterceptService.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.intercept; + +import com.google.common.annotations.Beta; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.broker.PulsarServerException; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Service that manages intercepting API calls to a pulsar cluster + */ +@Beta +public class InterceptService { + private static final Logger log = LoggerFactory.getLogger(InterceptService.class); + + private InterceptProvider provider; + private final ServiceConfiguration conf; + private final TenantsInterceptService tenantInterceptService; + private final NamespacesInterceptService namespaceInterceptService; + private final TopicInterceptService topicInterceptService; + private final FunctionsInterceptService functionInterceptService; + private final SinksInterceptService sinkInterceptService; + private final SourcesInterceptService sourceInterceptService; + + public InterceptService(ServiceConfiguration conf, PulsarAdmin pulsarAdmin) + throws PulsarServerException { + this.conf = conf; + + try { + final String providerClassname = conf.getInterceptProvider(); + if (StringUtils.isNotBlank(providerClassname)) { + provider = (InterceptProvider) Class.forName(providerClassname).newInstance(); + provider.initialize(conf, pulsarAdmin); + log.info("Interceptor {} has been loaded.", providerClassname); + } else { + provider = new InterceptProvider() {}; + } + + tenantInterceptService = new TenantsInterceptService(provider.getTenantInterceptProvider()); + namespaceInterceptService = new NamespacesInterceptService(provider.getNamespaceInterceptProvider()); + topicInterceptService = new TopicInterceptService(provider.getTopicInterceptProvider()); + functionInterceptService = new FunctionsInterceptService(provider.getFunctionsInterceptProvider()); + sourceInterceptService = new SourcesInterceptService(provider.getSourcesInterceptProvider()); + sinkInterceptService = new SinksInterceptService(provider.getSinksInterceptProvider()); + + } catch (Throwable e) { + throw new PulsarServerException("Failed to load an intercept provider.", e); + } + } + + public TenantsInterceptService tenants() { + return tenantInterceptService; + } + + public NamespacesInterceptService namespaces() { + return namespaceInterceptService; + } + + public TopicInterceptService topics() { + return topicInterceptService; + } + + public FunctionsInterceptService functions() { + return functionInterceptService; + } + + public SourcesInterceptService sources() { + return sourceInterceptService; + } + + public SinksInterceptService sinks() { + return sinkInterceptService; + } +} diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/intercept/NamespacesInterceptProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/intercept/NamespacesInterceptProvider.java new file mode 100644 index 0000000000000..b8f15caff2acb --- /dev/null +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/intercept/NamespacesInterceptProvider.java @@ -0,0 +1,142 @@ +package org.apache.pulsar.broker.intercept; + +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.policies.data.AuthAction; +import org.apache.pulsar.common.policies.data.BacklogQuota; +import org.apache.pulsar.common.policies.data.BookieAffinityGroupData; +import org.apache.pulsar.common.policies.data.DispatchRate; +import org.apache.pulsar.common.policies.data.PersistencePolicies; +import org.apache.pulsar.common.policies.data.Policies; +import org.apache.pulsar.common.policies.data.RetentionPolicies; +import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy; +import org.apache.pulsar.common.policies.data.SubscribeRate; +import org.apache.pulsar.common.policies.data.SubscriptionAuthMode; + +import java.util.List; +import java.util.Set; + +public interface NamespacesInterceptProvider { + /** + * Intercept call for creating namespace + * + * @param namespaceName the namespace name + * @param policies polices for this namespace + * @param clientRole the role used to create namespace + */ + default void createNamespace(NamespaceName namespaceName, Policies policies, String clientRole) throws InterceptException {} + + default void deleteNamespace(NamespaceName namespaceName, String clientRole) throws InterceptException {} + + default void deleteNamespaceBundle(NamespaceName namespaceName, String bundleRange, String clientRole) throws InterceptException {} + + default void setNamespaceMessageTTL(NamespaceName namespaceName, int messageTTL, String clientRole) throws InterceptException {} + + default void grantPermissionOnNamespace(NamespaceName namespaceName, String role, Set actions, String clientRole) throws InterceptException {} + + default void grantPermissionOnSubscription(NamespaceName namespaceName, String subscription, Set roles, String clientRole) throws InterceptException {} + + default void revokePermissionsOnNamespace(NamespaceName namespaceName, String role, String clientRole) throws InterceptException {} + + default void revokePermissionsOnSubscription(NamespaceName namespaceName, String subscriptionName, String role, String clientRole) throws InterceptException {} + + default void getNamespaceReplicationClusters(NamespaceName namespaceName, String clientRole) throws InterceptException {} + + default void setNamespaceReplicationClusters(NamespaceName namespaceName, List clusterIds, String clientRole) throws InterceptException {} + + default void modifyDeduplication(NamespaceName namespaceName, boolean enableDeduplication, String clientRole) {} + + default void getTenantNamespaces(NamespaceName namespaceName, String tenant, String clientRole) throws InterceptException {} + + default void unloadNamespace(NamespaceName namespaceName, String clientRole) throws InterceptException {} + + default void setBookieAffinityGroup(NamespaceName namespaceName, BookieAffinityGroupData bookieAffinityGroup, String clientRole) throws InterceptException {} + + default void getBookieAffinityGroup(NamespaceName namespaceName, String clientRole) throws InterceptException {} + + default void unloadNamespaceBundle(NamespaceName namespaceName, String bundleRange, boolean authoritative, String clientRole) throws InterceptException {} + + default void splitNamespaceBundle(NamespaceName namespaceName, String bundleRange, boolean authoritative, boolean unload, String clientRole) throws InterceptException {} + + default void setTopicDispatchRate(NamespaceName namespaceName, DispatchRate dispatchRate, String clientRole) throws InterceptException {} + + default void getTopicDispatchRate(NamespaceName namespaceName, String clientRole) throws InterceptException {} + + default void setSubscriptionDispatchRate(NamespaceName namespaceName, DispatchRate dispatchRate, String clientRole) throws InterceptException {} + + default void getSubscriptionDispatchRate(NamespaceName namespaceName, String clientRole) throws InterceptException {} + + default void setSubscribeRate(NamespaceName namespaceName, SubscribeRate subscribeRate, String clientRole) throws InterceptException {} + + default void getSubscribeRate(NamespaceName namespaceName, String clientRole) throws InterceptException {} + + default void setReplicatorDispatchRate(NamespaceName namespaceName, DispatchRate dispatchRate, String clientRole) throws InterceptException {} + + default void getReplicatorDispatchRate(NamespaceName namespaceName, String clientRole) throws InterceptException {} + + default void setBacklogQuota(NamespaceName namespaceName, BacklogQuota.BacklogQuotaType backlogQuotaType, BacklogQuota backlogQuota, String clientRole) throws InterceptException {} + + default void removeBacklogQuota(NamespaceName namespaceName, BacklogQuota.BacklogQuotaType backlogQuotaType, String clientRole) throws InterceptException {} + + default void setRetention(NamespaceName namespaceName, RetentionPolicies retention, String clientRole) throws InterceptException {} + + default void setPersistence(NamespaceName namespaceName, PersistencePolicies persistence, String clientRole) throws InterceptException {} + + default void getPersistence(NamespaceName namespaceName, String clientRole) throws InterceptException {} + + default void clearNamespaceBacklog(NamespaceName namespaceName, boolean authoritative, String clientRole) throws InterceptException {} + + default void clearNamespaceBundleBacklog(NamespaceName namespaceName, String bundleRange, boolean authoritative, String clientRole) throws InterceptException {} + + default void clearNamespaceBacklogForSubscription(NamespaceName namespaceName, String subscription, boolean authoritative, String clientRole) throws InterceptException {} + + default void clearNamespaceBundleBacklogForSubscription(NamespaceName namespaceName, String subscription, String bundleRange, boolean authoritative, String clientRole) throws InterceptException {} + + default void unsubscribeNamespace(NamespaceName namespaceName, String subscription, boolean authoritative, String clientRole) throws InterceptException {} + + default void unsubscribeNamespaceBundle(NamespaceName namespaceName, String subscription, String bundleRange, boolean authoritative, String clientRole) throws InterceptException {} + + default void setSubscriptionAuthMode(NamespaceName namespaceName, SubscriptionAuthMode subscriptionAuthMode, String clientRole) throws InterceptException {} + + default void modifyEncryptionRequired(NamespaceName namespaceName, boolean encryptionRequired, String clientRole) throws InterceptException {} + + default void setNamespaceAntiAffinityGroup(NamespaceName namespaceName, String antiAffinityGroup, String clientRole) throws InterceptException {} + + default void getNamespaceAntiAffinityGroup(NamespaceName namespaceName, String clientRole) throws InterceptException {} + + default void removeNamespaceAntiAffinityGroup(NamespaceName namespaceName, String clientRole) throws InterceptException {} + + default void getAntiAffinityNamespaces(NamespaceName namespaceName, String cluster, String antiAffinityGroup, String clientRole) throws InterceptException {} + + default void getMaxProducersPerTopic(NamespaceName namespaceName, String clientRole) throws InterceptException {} + + default void setMaxProducersPerTopic(NamespaceName namespaceName, int maxProducersPerTopic, String clientRole) throws InterceptException {} + + default void getMaxConsumersPerTopic(NamespaceName namespaceName, String clientRole) throws InterceptException {} + + default void setMaxConsumersPerTopic(NamespaceName namespaceName, int maxConsumersPerTopic, String clientRole) throws InterceptException {} + + default void getMaxConsumersPerSubscription(NamespaceName namespaceName, String clientRole) throws InterceptException {} + + default void setMaxConsumersPerSubscription(NamespaceName namespaceName, int maxConsumersPerSubscription, String clientRole) throws InterceptException {} + + default void getCompactionThreshold(NamespaceName namespaceName, String clientRole) throws InterceptException {} + + default void setCompactionThreshold(NamespaceName namespaceName, long newThreshold, String clientRole) throws InterceptException {} + + default void getOffloadThreshold(NamespaceName namespaceName, String clientRole) throws InterceptException {} + + default void setOffloadThreshold(NamespaceName namespaceName, long newThreshold, String clientRole) throws InterceptException {} + + default void getOffloadDeletionLag(NamespaceName namespaceName, String clientRole) throws InterceptException {} + + default void setOffloadDeletionLag(NamespaceName namespaceName, Long newDeletionLagMs, String clientRole) throws InterceptException {} + + default void getSchemaAutoUpdateCompatibilityStrategy(NamespaceName namespaceName, String clientRole) throws InterceptException {} + + default void setSchemaAutoUpdateCompatibilityStrategy(NamespaceName namespaceName, + SchemaAutoUpdateCompatibilityStrategy strategy, String clientRole) throws InterceptException {} + + default void getSchemaValidationEnforced(NamespaceName namespaceName, String clientRole) throws InterceptException {} + + default void setSchemaValidationEnforced(NamespaceName namespaceName, boolean schemaValidationEnforced, String clientRole) throws InterceptException {} +} diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/intercept/NamespacesInterceptService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/intercept/NamespacesInterceptService.java new file mode 100644 index 0000000000000..a29edaadeccba --- /dev/null +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/intercept/NamespacesInterceptService.java @@ -0,0 +1,265 @@ +package org.apache.pulsar.broker.intercept; + +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.policies.data.AuthAction; +import org.apache.pulsar.common.policies.data.BacklogQuota; +import org.apache.pulsar.common.policies.data.BookieAffinityGroupData; +import org.apache.pulsar.common.policies.data.DispatchRate; +import org.apache.pulsar.common.policies.data.PersistencePolicies; +import org.apache.pulsar.common.policies.data.Policies; +import org.apache.pulsar.common.policies.data.RetentionPolicies; +import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy; +import org.apache.pulsar.common.policies.data.SubscribeRate; +import org.apache.pulsar.common.policies.data.SubscriptionAuthMode; + +import java.util.List; +import java.util.Set; + +public class NamespacesInterceptService { + + private final NamespacesInterceptProvider provider; + + public NamespacesInterceptService(NamespacesInterceptProvider namespaceInterceptProvider) { + this.provider = namespaceInterceptProvider; + } + + /** + * Intercept call for creating namespace + * + * @param namespaceName the namespace name + * @param policies polices for this namespace + * @param clientRole the role used to create namespace + */ + public void createNamespace(NamespaceName namespaceName, Policies policies, String clientRole) throws InterceptException { + provider.createNamespace(namespaceName, policies, clientRole); + } + + public void deleteNamespace(NamespaceName namespaceName, String clientRole) throws InterceptException { + provider.deleteNamespace(namespaceName, clientRole); + } + + public void deleteNamespaceBundle(NamespaceName namespaceName, String bundleRange, String clientRole) throws InterceptException { + provider.deleteNamespaceBundle(namespaceName, bundleRange, clientRole); + } + + public void setNamespaceMessageTTL(NamespaceName namespaceName, int messageTTL, String clientRole) throws InterceptException { + provider.setNamespaceMessageTTL(namespaceName, messageTTL, clientRole); + } + + public void grantPermissionOnNamespace(NamespaceName namespaceName, String role, Set actions, String clientRole) throws InterceptException { + provider.grantPermissionOnNamespace(namespaceName, role, actions, clientRole); + } + + public void grantPermissionOnSubscription(NamespaceName namespaceName, String subscription, Set roles, String clientRole) throws InterceptException { + provider.grantPermissionOnSubscription(namespaceName, subscription, roles, clientRole); + } + + public void revokePermissionsOnNamespace(NamespaceName namespaceName, String role, String clientRole) throws InterceptException { + provider.revokePermissionsOnNamespace(namespaceName, role, clientRole); + } + + public void revokePermissionsOnSubscription(NamespaceName namespaceName, String subscriptionName, String role, String clientRole) throws InterceptException { + provider.revokePermissionsOnSubscription(namespaceName, subscriptionName, role, clientRole); + } + + public void getNamespaceReplicationClusters(NamespaceName namespaceName, String clientRole) throws InterceptException { + provider.getNamespaceReplicationClusters(namespaceName, clientRole); + } + + public void setNamespaceReplicationClusters(NamespaceName namespaceName, List clusterIds, String clientRole) throws InterceptException { + provider.setNamespaceReplicationClusters(namespaceName, clusterIds, clientRole); + } + + public void modifyDeduplication(NamespaceName namespaceName, boolean enableDeduplication, String clientRole) throws InterceptException { + provider.modifyDeduplication(namespaceName, enableDeduplication, clientRole); + } + + public void getTenantNamespaces(NamespaceName namespaceName, String tenant, String clientRole) throws InterceptException { + provider.getTenantNamespaces(namespaceName, tenant, clientRole); + } + + public void unloadNamespace(NamespaceName namespaceName, String clientRole) throws InterceptException { + provider.unloadNamespace(namespaceName, clientRole); + } + + public void setBookieAffinityGroup(NamespaceName namespaceName, BookieAffinityGroupData bookieAffinityGroup, String clientRole) throws InterceptException { + provider.setBookieAffinityGroup(namespaceName, bookieAffinityGroup, clientRole); + } + + public void getBookieAffinityGroup(NamespaceName namespaceName, String clientRole) throws InterceptException { + provider.getBookieAffinityGroup(namespaceName, clientRole); + } + + public void unloadNamespaceBundle(NamespaceName namespaceName, String bundleRange, boolean authoritative, String clientRole) throws InterceptException { + provider.unloadNamespaceBundle(namespaceName, bundleRange, authoritative, clientRole); + } + + public void splitNamespaceBundle(NamespaceName namespaceName, String bundleRange, boolean authoritative, boolean unload, String clientRole) throws InterceptException { + provider.splitNamespaceBundle(namespaceName, bundleRange, authoritative, unload, clientRole); + } + + public void setTopicDispatchRate(NamespaceName namespaceName, DispatchRate dispatchRate, String clientRole) throws InterceptException { + provider.setTopicDispatchRate(namespaceName, dispatchRate, clientRole); + } + + public void getTopicDispatchRate(NamespaceName namespaceName, String clientRole) throws InterceptException { + provider.getTopicDispatchRate(namespaceName, clientRole); + } + + public void setSubscriptionDispatchRate(NamespaceName namespaceName, DispatchRate dispatchRate, String clientRole) throws InterceptException { + provider.setSubscriptionDispatchRate(namespaceName, dispatchRate, clientRole); + } + + public void getSubscriptionDispatchRate(NamespaceName namespaceName, String clientRole) throws InterceptException { + provider.getSubscriptionDispatchRate(namespaceName, clientRole); + } + + public void setSubscribeRate(NamespaceName namespaceName, SubscribeRate subscribeRate, String clientRole) throws InterceptException { + provider.setSubscribeRate(namespaceName, subscribeRate, clientRole); + } + + public void getSubscribeRate(NamespaceName namespaceName, String clientRole) throws InterceptException { + provider.getSubscribeRate(namespaceName, clientRole); + } + + public void setReplicatorDispatchRate(NamespaceName namespaceName, DispatchRate dispatchRate, String clientRole) throws InterceptException { + provider.setReplicatorDispatchRate(namespaceName, dispatchRate, clientRole); + } + + public void getReplicatorDispatchRate(NamespaceName namespaceName, String clientRole) throws InterceptException { + provider.getReplicatorDispatchRate(namespaceName, clientRole); + } + + public void setBacklogQuota(NamespaceName namespaceName, BacklogQuota.BacklogQuotaType backlogQuotaType, BacklogQuota backlogQuota, String clientRole) throws InterceptException { + provider.setBacklogQuota(namespaceName, backlogQuotaType, backlogQuota, clientRole); + } + + public void removeBacklogQuota(NamespaceName namespaceName, BacklogQuota.BacklogQuotaType backlogQuotaType, String clientRole) throws InterceptException { + provider.removeBacklogQuota(namespaceName, backlogQuotaType, clientRole); + } + + public void setRetention(NamespaceName namespaceName, RetentionPolicies retention, String clientRole) throws InterceptException { + provider.setRetention(namespaceName, retention, clientRole); + } + + public void setPersistence(NamespaceName namespaceName, PersistencePolicies persistence, String clientRole) throws InterceptException { + provider.setPersistence(namespaceName, persistence, clientRole); + } + + public void getPersistence(NamespaceName namespaceName, String clientRole) throws InterceptException { + provider.getPersistence(namespaceName, clientRole); + } + + public void clearNamespaceBacklog(NamespaceName namespaceName, boolean authoritative, String clientRole) throws InterceptException { + provider.clearNamespaceBacklog(namespaceName, authoritative, clientRole); + } + + public void clearNamespaceBundleBacklog(NamespaceName namespaceName, String bundleRange, boolean authoritative, String clientRole) throws InterceptException { + provider.clearNamespaceBundleBacklog(namespaceName, bundleRange, authoritative, clientRole); + } + + public void clearNamespaceBacklogForSubscription(NamespaceName namespaceName, String subscription, boolean authoritative, String clientRole) throws InterceptException { + provider.clearNamespaceBacklogForSubscription(namespaceName, subscription, authoritative, clientRole); + } + + public void clearNamespaceBundleBacklogForSubscription(NamespaceName namespaceName, String subscription, String bundleRange, boolean authoritative, String clientRole) throws InterceptException { + provider.clearNamespaceBundleBacklogForSubscription(namespaceName, subscription, bundleRange, authoritative, clientRole); + } + + public void unsubscribeNamespace(NamespaceName namespaceName, String subscription, boolean authoritative, String clientRole) throws InterceptException { + provider.unsubscribeNamespace(namespaceName, subscription, authoritative, clientRole); + } + + public void unsubscribeNamespaceBundle(NamespaceName namespaceName, String subscription, String bundleRange, boolean authoritative, String clientRole) throws InterceptException { + provider.unsubscribeNamespaceBundle(namespaceName, subscription, bundleRange, authoritative, clientRole); + } + + public void setSubscriptionAuthMode(NamespaceName namespaceName, SubscriptionAuthMode subscriptionAuthMode, String clientRole) throws InterceptException { + provider.setSubscriptionAuthMode(namespaceName, subscriptionAuthMode, clientRole); + } + + public void modifyEncryptionRequired(NamespaceName namespaceName, boolean encryptionRequired, String clientRole) throws InterceptException { + provider.modifyEncryptionRequired(namespaceName, encryptionRequired, clientRole); + } + + public void setNamespaceAntiAffinityGroup(NamespaceName namespaceName, String antiAffinityGroup, String clientRole) throws InterceptException { + provider.setNamespaceAntiAffinityGroup(namespaceName, antiAffinityGroup, clientRole); + } + + public void getNamespaceAntiAffinityGroup(NamespaceName namespaceName, String clientRole) throws InterceptException { + provider.getNamespaceAntiAffinityGroup(namespaceName, clientRole); + } + + public void removeNamespaceAntiAffinityGroup(NamespaceName namespaceName, String clientRole) throws InterceptException { + provider.removeNamespaceAntiAffinityGroup(namespaceName, clientRole); + } + + public void getAntiAffinityNamespaces(NamespaceName namespaceName, String cluster, String antiAffinityGroup, String clientRole) throws InterceptException { + provider.getAntiAffinityNamespaces(namespaceName, cluster, antiAffinityGroup, clientRole); + } + + public void getMaxProducersPerTopic(NamespaceName namespaceName, String clientRole) throws InterceptException { + provider.getMaxProducersPerTopic(namespaceName, clientRole); + } + + public void setMaxProducersPerTopic(NamespaceName namespaceName, int maxProducersPerTopic, String clientRole) throws InterceptException { + provider.setMaxProducersPerTopic(namespaceName, maxProducersPerTopic, clientRole); + } + + public void getMaxConsumersPerTopic(NamespaceName namespaceName, String clientRole) throws InterceptException { + provider.getMaxConsumersPerTopic(namespaceName, clientRole); + } + + public void setMaxConsumersPerTopic(NamespaceName namespaceName, int maxConsumersPerTopic, String clientRole) throws InterceptException { + provider.setMaxConsumersPerTopic(namespaceName, maxConsumersPerTopic, clientRole); + } + + public void getMaxConsumersPerSubscription(NamespaceName namespaceName, String clientRole) throws InterceptException { + provider.getMaxConsumersPerSubscription(namespaceName, clientRole); + } + + public void setMaxConsumersPerSubscription(NamespaceName namespaceName, int maxConsumersPerSubscription, String clientRole) throws InterceptException { + provider.setMaxConsumersPerSubscription(namespaceName, maxConsumersPerSubscription, clientRole); + } + + public void getCompactionThreshold(NamespaceName namespaceName, String clientRole) throws InterceptException { + provider.getCompactionThreshold(namespaceName, clientRole); + } + + public void setCompactionThreshold(NamespaceName namespaceName, long newThreshold, String clientRole) throws InterceptException { + provider.setCompactionThreshold(namespaceName, newThreshold, clientRole); + } + + public void getOffloadThreshold(NamespaceName namespaceName, String clientRole) throws InterceptException { + provider.getOffloadThreshold(namespaceName, clientRole); + } + + public void setOffloadThreshold(NamespaceName namespaceName, long newThreshold, String clientRole) throws InterceptException { + provider.setOffloadThreshold(namespaceName, newThreshold, clientRole); + } + + public void getOffloadDeletionLag(NamespaceName namespaceName, String clientRole) throws InterceptException { + provider.getOffloadDeletionLag(namespaceName, clientRole); + } + + public void setOffloadDeletionLag(NamespaceName namespaceName, Long newDeletionLagMs, String clientRole) throws InterceptException { + provider.setOffloadDeletionLag(namespaceName, newDeletionLagMs, clientRole); + } + + public void getSchemaAutoUpdateCompatibilityStrategy(NamespaceName namespaceName, String clientRole) throws InterceptException { + provider.getSchemaAutoUpdateCompatibilityStrategy(namespaceName, clientRole); + } + + public void setSchemaAutoUpdateCompatibilityStrategy(NamespaceName namespaceName, + SchemaAutoUpdateCompatibilityStrategy strategy, String clientRole) throws InterceptException { + provider.setSchemaAutoUpdateCompatibilityStrategy(namespaceName, strategy, clientRole); + } + + public void getSchemaValidationEnforced(NamespaceName namespaceName, String clientRole) throws InterceptException { + provider.getSchemaValidationEnforced(namespaceName, clientRole); + } + + public void setSchemaValidationEnforced(NamespaceName namespaceName, boolean schemaValidationEnforced, String clientRole) throws InterceptException { + provider.setSchemaValidationEnforced(namespaceName, schemaValidationEnforced, clientRole); + } +} diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/intercept/SinksInterceptProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/intercept/SinksInterceptProvider.java new file mode 100644 index 0000000000000..29b6920382a89 --- /dev/null +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/intercept/SinksInterceptProvider.java @@ -0,0 +1,21 @@ +package org.apache.pulsar.broker.intercept; + +import org.apache.pulsar.common.io.SinkConfig; + +public interface SinksInterceptProvider { + /** + * Intercept call for create sink + * + * @param sinkConfig the sink config of the sink to be created + * @param clientRole the role used to create sink + */ + default void createSink(SinkConfig sinkConfig, String clientRole) throws InterceptException {} ; + + /** + * Intercept call for update sink + * @param sinkConfig the sink config of the sink to be updated + * @param existingSinkConfig + * @param clientRole the role used to update sink + */ + default void updateSink(SinkConfig sinkConfig, SinkConfig existingSinkConfig, String clientRole) throws InterceptException {} +} diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/intercept/SinksInterceptService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/intercept/SinksInterceptService.java new file mode 100644 index 0000000000000..25007300e27f2 --- /dev/null +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/intercept/SinksInterceptService.java @@ -0,0 +1,32 @@ +package org.apache.pulsar.broker.intercept; + +import org.apache.pulsar.common.io.SinkConfig; + +public class SinksInterceptService { + + private final SinksInterceptProvider provider; + + public SinksInterceptService(SinksInterceptProvider sinksInterceptProvider) { + this.provider = sinksInterceptProvider; + } + + /** + * Intercept call for create sink + * + * @param sinkConfig the sink config of the sink to be created + * @param clientRole the role used to create sink + */ + public void createSink(SinkConfig sinkConfig, String clientRole) throws InterceptException { + provider.createSink(sinkConfig, clientRole); + } + + /** + * Intercept call for update sink + * @param updates updates to this sink's source config + * @param existingSinkConfig the existing source config + * @param clientRole the role used to update sink + */ + public void updateSink(SinkConfig updates, SinkConfig existingSinkConfig, String clientRole) throws InterceptException { + provider.updateSink(updates, existingSinkConfig, clientRole); + } +} diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/intercept/SourcesInterceptProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/intercept/SourcesInterceptProvider.java new file mode 100644 index 0000000000000..4e68f13df32b2 --- /dev/null +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/intercept/SourcesInterceptProvider.java @@ -0,0 +1,21 @@ +package org.apache.pulsar.broker.intercept; + +import org.apache.pulsar.common.io.SourceConfig; + +public interface SourcesInterceptProvider { + /** + * Intercept call for create source + * + * @param sourceConfig the source config of the source to be created + * @param clientRole the role used to create source + */ + default void createSource(SourceConfig sourceConfig, String clientRole) throws InterceptException {} + + /** + * Intercept call for update source + * @param sourceConfig the source config of the source to be updated + * @param existingSourceConfig + * @param clientRole the role used to update source + */ + default void updateSource(SourceConfig sourceConfig, SourceConfig existingSourceConfig, String clientRole) throws InterceptException {} +} diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/intercept/SourcesInterceptService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/intercept/SourcesInterceptService.java new file mode 100644 index 0000000000000..c1193afd64f7f --- /dev/null +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/intercept/SourcesInterceptService.java @@ -0,0 +1,32 @@ +package org.apache.pulsar.broker.intercept; + +import org.apache.pulsar.common.io.SourceConfig; + +public class SourcesInterceptService { + + private SourcesInterceptProvider provider; + + public SourcesInterceptService(SourcesInterceptProvider sourcesInterceptProvider) { + this.provider = sourcesInterceptProvider; + } + + /** + * Intercept call for create source + * + * @param sourceConfig the source config of the source to be created + * @param clientRole the role used to create source + */ + public void createSource(SourceConfig sourceConfig, String clientRole) throws InterceptException { + provider.createSource(sourceConfig, clientRole); + } + + /** + * Intercept call for update source + * @param updates updates to this source's source config + * @param existingSourceConfig the existing source config + * @param clientRole the role used to update source + */ + public void updateSource(SourceConfig updates, SourceConfig existingSourceConfig, String clientRole) throws InterceptException { + provider.updateSource(updates, existingSourceConfig, clientRole); + } +} diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/intercept/TenantsInterceptProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/intercept/TenantsInterceptProvider.java new file mode 100644 index 0000000000000..44e8225f8a8a0 --- /dev/null +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/intercept/TenantsInterceptProvider.java @@ -0,0 +1,14 @@ +package org.apache.pulsar.broker.intercept; + +import org.apache.pulsar.common.policies.data.TenantInfo; + +public interface TenantsInterceptProvider { + /** + * Intercept call for create tenant + * + * @param tenant tenant name + * @param tenantInfo tenant info + * @param clientRole the role used to create tenant + */ + default void createTenant(String tenant, TenantInfo tenantInfo, String clientRole) throws InterceptException {} +} diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/intercept/TenantsInterceptService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/intercept/TenantsInterceptService.java new file mode 100644 index 0000000000000..5ffabb502ae0d --- /dev/null +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/intercept/TenantsInterceptService.java @@ -0,0 +1,23 @@ +package org.apache.pulsar.broker.intercept; + +import org.apache.pulsar.common.policies.data.TenantInfo; + +public class TenantsInterceptService { + + private final TenantsInterceptProvider provider; + + public TenantsInterceptService(TenantsInterceptProvider tenantInterceptProvider) { + this.provider = tenantInterceptProvider; + } + + /** + * Intercept call for create tenant + * + * @param tenant tenant name + * @param tenantInfo tenant info + * @param clientRole the role used to create tenant + */ + public void createTenant(String tenant, TenantInfo tenantInfo, String clientRole) throws InterceptException { + provider.createTenant(tenant, tenantInfo, clientRole); + } +} diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/intercept/TopicInterceptService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/intercept/TopicInterceptService.java new file mode 100644 index 0000000000000..361fd488ede46 --- /dev/null +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/intercept/TopicInterceptService.java @@ -0,0 +1,43 @@ +package org.apache.pulsar.broker.intercept; + +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.partition.PartitionedTopicMetadata; + +public class TopicInterceptService { + + private final TopicsInterceptProvider provider; + + public TopicInterceptService(TopicsInterceptProvider topicInterceptProvider) { + this.provider = topicInterceptProvider; + } + + /* + * Intercept create partitioned topic + * @param topicName the topic name + * @param partitionedTopicMetadata metadata related to the partioned topic + * @param clientRole the role used to create partitioned topic + */ + public void createPartitionedTopic(TopicName topicName, PartitionedTopicMetadata partitionedTopicMetadata, String clientRole) throws InterceptException { + provider.createPartitionedTopic(topicName, partitionedTopicMetadata, clientRole); + } + + /** + * Intercept call for create topic + * + * @param topicName the topic name + * @param clientRole the role used to create topic + */ + public void createTopic(TopicName topicName, String clientRole) throws InterceptException { + provider.createTopic(topicName, clientRole); + } + + /** + * Intercept update partitioned topic + * @param topicName the topic name + * @param partitionedTopicMetadata metadata related to the partioned topic + * @param clientRole the role used to update partitioned topic + */ + public void updatePartitionedTopic(TopicName topicName, PartitionedTopicMetadata partitionedTopicMetadata, String clientRole) throws InterceptException { + provider.updatePartitionedTopic(topicName, partitionedTopicMetadata, clientRole); + } +} diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/intercept/TopicsInterceptProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/intercept/TopicsInterceptProvider.java new file mode 100644 index 0000000000000..b30b1540dff65 --- /dev/null +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/intercept/TopicsInterceptProvider.java @@ -0,0 +1,30 @@ +package org.apache.pulsar.broker.intercept; + +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.partition.PartitionedTopicMetadata; + +public interface TopicsInterceptProvider { + /** + * Intercept call for create topic + * + * @param topicName the topic name + * @param clientRole the role used to create topic + */ + default void createTopic(TopicName topicName, String clientRole) throws InterceptException {} + + /** + * Intercept create partitioned topic + * @param topicName the topic name + * @param numPartitions number of partitions to create for this partitioned topic + * @param clientRole the role used to create partitioned topic + */ + default void createPartitionedTopic(TopicName topicName, PartitionedTopicMetadata numPartitions, String clientRole) throws InterceptException {} + + /** + * Intercept update partitioned topic + * @param topicName the topic name + * @param numPartitions number of partitions to update to + * @param clientRole the role used to update partitioned topic + */ + default void updatePartitionedTopic(TopicName topicName, PartitionedTopicMetadata numPartitions, String clientRole) throws InterceptException {} +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index fd169fe1d1d59..f8e7e4027d7d2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -71,6 +71,7 @@ import org.apache.pulsar.broker.authorization.AuthorizationService; import org.apache.pulsar.broker.cache.ConfigurationCacheService; import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService; +import org.apache.pulsar.broker.intercept.InterceptService; import org.apache.pulsar.broker.loadbalance.LeaderElectionService; import org.apache.pulsar.broker.loadbalance.LeaderElectionService.LeaderListener; import org.apache.pulsar.broker.loadbalance.LoadManager; @@ -502,7 +503,9 @@ public Boolean get() { acquireSLANamespace(); // start function worker service if necessary - this.startWorkerService(brokerService.getAuthenticationService(), brokerService.getAuthorizationService()); + this.startWorkerService(brokerService.getAuthenticationService(), + brokerService.getAuthorizationService(), + brokerService.getInterceptService()); final String bootstrapMessage = "bootstrap service " + (config.getWebServicePort().isPresent() ? "port = " + config.getWebServicePort().get() : "") @@ -999,7 +1002,8 @@ public String getSafeBrokerServiceUrl() { } private void startWorkerService(AuthenticationService authenticationService, - AuthorizationService authorizationService) + AuthorizationService authorizationService, + InterceptService interceptService) throws InterruptedException, IOException, KeeperException { if (functionWorkerService.isPresent()) { LOG.info("Starting function worker service"); @@ -1105,7 +1109,8 @@ private void startWorkerService(AuthenticationService authenticationService, throw ioe; } LOG.info("Function worker service setup completed"); - functionWorkerService.get().start(dlogURI, authenticationService, authorizationService); + functionWorkerService.get().start(dlogURI, authenticationService, authorizationService, + interceptService); LOG.info("Function worker service started"); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index adc7bd4d502b6..3ea6f0ba71b41 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -55,6 +55,7 @@ import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.admin.AdminResource; import org.apache.pulsar.broker.authorization.AuthorizationService; +import org.apache.pulsar.broker.intercept.InterceptException; import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException; import org.apache.pulsar.broker.service.Subscription; import org.apache.pulsar.broker.service.Topic; @@ -99,6 +100,17 @@ public abstract class NamespacesBase extends AdminResource { protected List internalGetTenantNamespaces(String tenant) { validateAdminAccessForTenant(tenant); + try { + pulsar().getBrokerService() + .getInterceptService() + .namespaces() + .getTenantNamespaces(namespaceName, tenant, clientAppId()); + } catch (InterceptException e) { + throw new RestException( + e.getErrorCode().orElse(Status.INTERNAL_SERVER_ERROR.getStatusCode()), + e.getMessage()); + } + try { return getListOfNamespaces(tenant); } catch (KeeperException.NoNodeException e) { @@ -115,6 +127,16 @@ protected void internalCreateNamespace(Policies policies) { validateAdminAccessForTenant(namespaceName.getTenant()); validatePolicies(namespaceName, policies); + try { + pulsar().getBrokerService() + .getInterceptService() + .namespaces() + .createNamespace(namespaceName, policies, clientAppId()); + } catch (InterceptException e) { + throw new RestException( + e.getErrorCode().orElse(Status.INTERNAL_SERVER_ERROR.getStatusCode()), + e.getMessage()); + } try { policiesCache().invalidate(path(POLICIES, namespaceName.toString())); @@ -206,6 +228,17 @@ && getPartitionedTopicList(TopicDomain.persistent).isEmpty() return; } + try { + pulsar().getBrokerService() + .getInterceptService() + .namespaces() + .deleteNamespace(namespaceName, clientAppId()); + } catch (InterceptException e) { + throw new RestException( + e.getErrorCode().orElse(Status.INTERNAL_SERVER_ERROR.getStatusCode()), + e.getMessage()); + } + // set the policies to deleted so that somebody else cannot acquire this namespace try { policies.deleted = true; @@ -319,6 +352,7 @@ protected void internalDeleteNamespaceBundle(String bundleRange, boolean authori NamespaceBundle bundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange, authoritative, true); + try { List topics = pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName).join(); for (String topic : topics) { @@ -329,10 +363,19 @@ protected void internalDeleteNamespaceBundle(String bundleRange, boolean authori } } + pulsar().getBrokerService() + .getInterceptService() + .namespaces() + .deleteNamespaceBundle(namespaceName, bundleRange, clientAppId()); + // remove from owned namespace map and ephemeral node from ZK pulsar().getNamespaceService().removeOwnedServiceUnit(bundle); } catch (WebApplicationException wae) { throw wae; + } catch (InterceptException e) { + throw new RestException( + e.getErrorCode().orElse(Status.INTERNAL_SERVER_ERROR.getStatusCode()), + e.getMessage()); } catch (Exception e) { log.error("[{}] Failed to remove namespace bundle {}/{}", clientAppId(), namespaceName.toString(), bundleRange, e); @@ -346,6 +389,11 @@ protected void internalGrantPermissionOnNamespace(String role, Set a try { AuthorizationService authService = pulsar().getBrokerService().getAuthorizationService(); if (null != authService) { + pulsar().getBrokerService() + .getInterceptService() + .namespaces() + .grantPermissionOnNamespace(namespaceName, role, actions, clientAppId()); + authService.grantPermissionAsync(namespaceName, actions, role, null/*additional auth-data json*/) .get(); } else { @@ -354,6 +402,10 @@ protected void internalGrantPermissionOnNamespace(String role, Set a } catch (InterruptedException e) { log.error("[{}] Failed to get permissions for namespace {}", clientAppId(), namespaceName, e); throw new RestException(e); + } catch (InterceptException e) { + throw new RestException( + e.getErrorCode().orElse(Status.INTERNAL_SERVER_ERROR.getStatusCode()), + e.getMessage()); } catch (ExecutionException e) { if (e.getCause() instanceof IllegalArgumentException) { log.warn("[{}] Failed to set permissions for namespace {}: does not exist", clientAppId(), @@ -378,6 +430,11 @@ protected void internalGrantPermissionOnSubscription(String subscription, Set internalGetNamespaceReplicationClusters() { "Cannot get the replication clusters for a non-global namespace"); } + try { + pulsar().getBrokerService() + .getInterceptService() + .namespaces() + .getNamespaceReplicationClusters(namespaceName, clientAppId()); + } catch (InterceptException e) { + throw new RestException( + e.getErrorCode().orElse(Status.INTERNAL_SERVER_ERROR.getStatusCode()), + e.getMessage()); + } + Policies policies = getNamespacePolicies(namespaceName); return policies.replication_clusters; } @@ -489,6 +581,11 @@ protected void internalSetNamespaceReplicationClusters(List clusterIds) () -> new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist")); policiesNode.getKey().replication_clusters = replicationClusterSet; + pulsar().getBrokerService() + .getInterceptService() + .namespaces() + .setNamespaceReplicationClusters(namespaceName, clusterIds, clientAppId()); + // Write back the new policies into zookeeper globalZk().setData(path(POLICIES, namespaceName.toString()), jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion()); @@ -506,6 +603,10 @@ protected void internalSetNamespaceReplicationClusters(List clusterIds) clientAppId(), namespaceName, policiesNode.getValue().getVersion()); throw new RestException(Status.CONFLICT, "Concurrent modification"); + } catch (InterceptException e) { + throw new RestException( + e.getErrorCode().orElse(Status.INTERNAL_SERVER_ERROR.getStatusCode()), + e.getMessage()); } catch (Exception e) { log.error("[{}] Failed to update the replication clusters on namespace {}", clientAppId(), namespaceName, e); @@ -527,6 +628,12 @@ protected void internalSetNamespaceMessageTTL(int messageTTL) { // Force to read the data s.t. the watch to the cache content is setup. policiesNode = policiesCache().getWithStat(path(POLICIES, namespaceName.toString())).orElseThrow( () -> new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist")); + + pulsar().getBrokerService() + .getInterceptService() + .namespaces() + .setNamespaceMessageTTL(namespaceName, messageTTL, clientAppId()); + policiesNode.getKey().message_ttl_in_seconds = messageTTL; // Write back the new policies into zookeeper @@ -545,6 +652,10 @@ protected void internalSetNamespaceMessageTTL(int messageTTL) { clientAppId(), namespaceName, policiesNode.getValue().getVersion()); throw new RestException(Status.CONFLICT, "Concurrent modification"); + } catch (InterceptException e) { + throw new RestException( + e.getErrorCode().orElse(Status.INTERNAL_SERVER_ERROR.getStatusCode()), + e.getMessage()); } catch (Exception e) { log.error("[{}] Failed to update the message TTL on namespace {}", clientAppId(), namespaceName, e); throw new RestException(e); @@ -563,6 +674,11 @@ protected void internalModifyDeduplication(boolean enableDeduplication) { () -> new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist")); policiesNode.getKey().deduplicationEnabled = enableDeduplication; + pulsar().getBrokerService() + .getInterceptService() + .namespaces() + .modifyDeduplication(namespaceName, enableDeduplication, clientAppId()); + // Write back the new policies into zookeeper globalZk().setData(path(POLICIES, namespaceName.toString()), jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion()); @@ -580,6 +696,10 @@ protected void internalModifyDeduplication(boolean enableDeduplication) { clientAppId(), namespaceName, policiesNode.getValue().getVersion()); throw new RestException(Status.CONFLICT, "Concurrent modification"); + } catch (InterceptException e) { + throw new RestException( + e.getErrorCode().orElse(Status.INTERNAL_SERVER_ERROR.getStatusCode()), + e.getMessage()); } catch (Exception e) { log.error("[{}] Failed to modify deplication status on namespace {}", clientAppId(), namespaceName, e); throw new RestException(e); @@ -604,6 +724,18 @@ protected void internalUnloadNamespace(AsyncResponse asyncResponse) { final List> futures = Lists.newArrayList(); List boundaries = policies.bundles.getBoundaries(); + + try { + pulsar().getBrokerService() + .getInterceptService() + .namespaces() + .unloadNamespace(namespaceName, clientAppId()); + } catch (InterceptException e) { + throw new RestException( + e.getErrorCode().orElse(Status.INTERNAL_SERVER_ERROR.getStatusCode()), + e.getMessage()); + } + for (int i = 0; i < boundaries.size() - 1; i++) { String bundle = String.format("%s_%s", boundaries.get(i), boundaries.get(i + 1)); try { @@ -648,6 +780,17 @@ protected void internalSetBookieAffinityGroup(BookieAffinityGroupData bookieAffi validateClusterForTenant(namespaceName.getTenant(), namespaceName.getCluster()); } + try { + pulsar().getBrokerService() + .getInterceptService() + .namespaces() + .setBookieAffinityGroup(namespaceName, bookieAffinityGroup, clientAppId()); + } catch (InterceptException e) { + throw new RestException( + e.getErrorCode().orElse(Status.INTERNAL_SERVER_ERROR.getStatusCode()), + e.getMessage()); + } + try { String path = joinPath(LOCAL_POLICIES_ROOT, this.namespaceName.toString()); Stat nodeStat = new Stat(); @@ -702,6 +845,17 @@ protected BookieAffinityGroupData internalGetBookieAffinityGroup() { validateClusterForTenant(namespaceName.getTenant(), namespaceName.getCluster()); } + try { + pulsar().getBrokerService() + .getInterceptService() + .namespaces() + .getBookieAffinityGroup(namespaceName, clientAppId()); + } catch (InterceptException e) { + throw new RestException( + e.getErrorCode().orElse(Status.INTERNAL_SERVER_ERROR.getStatusCode()), + e.getMessage()); + } + String path = joinPath(LOCAL_POLICIES_ROOT, this.namespaceName.toString()); try { Optional policies = pulsar().getLocalZkCacheService().policiesCache().get(path); @@ -763,6 +917,18 @@ public void internalUnloadNamespaceBundle(String bundleRange, boolean authoritat NamespaceBundle nsBundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange, authoritative, true); + + try { + pulsar().getBrokerService() + .getInterceptService() + .namespaces() + .unloadNamespaceBundle(namespaceName, bundleRange, authoritative, clientAppId()); + } catch (InterceptException e) { + throw new RestException( + e.getErrorCode().orElse(Status.INTERNAL_SERVER_ERROR.getStatusCode()), + e.getMessage()); + } + try { pulsar().getNamespaceService().unloadNamespaceBundle(nsBundle); log.info("[{}] Successfully unloaded namespace bundle {}", clientAppId(), nsBundle.toString()); @@ -791,6 +957,17 @@ protected void internalSplitNamespaceBundle(String bundleRange, boolean authorit NamespaceBundle nsBundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange, authoritative, true); + try { + pulsar().getBrokerService() + .getInterceptService() + .namespaces() + .splitNamespaceBundle(namespaceName, bundleRange, authoritative, unload, clientAppId()); + } catch (InterceptException e) { + throw new RestException( + e.getErrorCode().orElse(Status.INTERNAL_SERVER_ERROR.getStatusCode()), + e.getMessage()); + } + try { pulsar().getNamespaceService().splitAndOwnBundle(nsBundle, unload).get(); log.info("[{}] Successfully split namespace bundle {}", clientAppId(), nsBundle.toString()); @@ -865,6 +1042,12 @@ protected void internalSetTopicDispatchRate(DispatchRate dispatchRate) { // Force to read the data s.t. the watch to the cache content is setup. policiesNode = policiesCache().getWithStat(path).orElseThrow( () -> new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist")); + + pulsar().getBrokerService() + .getInterceptService() + .namespaces() + .setTopicDispatchRate(namespaceName, dispatchRate, clientAppId()); + policiesNode.getKey().topicDispatchRate.put(pulsar().getConfiguration().getClusterName(), dispatchRate); // Write back the new policies into zookeeper @@ -884,6 +1067,10 @@ protected void internalSetTopicDispatchRate(DispatchRate dispatchRate) { clientAppId(), namespaceName, policiesNode.getValue().getVersion()); throw new RestException(Status.CONFLICT, "Concurrent modification"); + } catch (InterceptException e) { + throw new RestException( + e.getErrorCode().orElse(Status.INTERNAL_SERVER_ERROR.getStatusCode()), + e.getMessage()); } catch (Exception e) { log.error("[{}] Failed to update the dispatchRate for cluster on namespace {}", clientAppId(), namespaceName, e); @@ -895,6 +1082,18 @@ protected DispatchRate internalGetTopicDispatchRate() { validateAdminAccessForTenant(namespaceName.getTenant()); Policies policies = getNamespacePolicies(namespaceName); + + try { + pulsar().getBrokerService() + .getInterceptService() + .namespaces() + .getTopicDispatchRate(namespaceName, clientAppId()); + } catch (InterceptException e) { + throw new RestException( + e.getErrorCode().orElse(Status.INTERNAL_SERVER_ERROR.getStatusCode()), + e.getMessage()); + } + DispatchRate dispatchRate = policies.topicDispatchRate.get(pulsar().getConfiguration().getClusterName()); if (dispatchRate != null) { return dispatchRate; @@ -915,6 +1114,12 @@ protected void internalSetSubscriptionDispatchRate(DispatchRate dispatchRate) { // Force to read the data s.t. the watch to the cache content is setup. policiesNode = policiesCache().getWithStat(path).orElseThrow( () -> new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist")); + + pulsar().getBrokerService() + .getInterceptService() + .namespaces() + .setSubscriptionDispatchRate(namespaceName, dispatchRate, clientAppId()); + policiesNode.getKey().subscriptionDispatchRate.put(pulsar().getConfiguration().getClusterName(), dispatchRate); // Write back the new policies into zookeeper @@ -934,6 +1139,10 @@ protected void internalSetSubscriptionDispatchRate(DispatchRate dispatchRate) { clientAppId(), namespaceName, policiesNode.getValue().getVersion()); throw new RestException(Status.CONFLICT, "Concurrent modification"); + } catch (InterceptException e) { + throw new RestException( + e.getErrorCode().orElse(Status.INTERNAL_SERVER_ERROR.getStatusCode()), + e.getMessage()); } catch (Exception e) { log.error("[{}] Failed to update the subscriptionDispatchRate for cluster on namespace {}", clientAppId(), namespaceName, e); @@ -945,6 +1154,18 @@ protected DispatchRate internalGetSubscriptionDispatchRate() { validateAdminAccessForTenant(namespaceName.getTenant()); Policies policies = getNamespacePolicies(namespaceName); + + try { + pulsar().getBrokerService() + .getInterceptService() + .namespaces() + .getSubscriptionDispatchRate(namespaceName, clientAppId()); + } catch (InterceptException e) { + throw new RestException( + e.getErrorCode().orElse(Status.INTERNAL_SERVER_ERROR.getStatusCode()), + e.getMessage()); + } + DispatchRate dispatchRate = policies.subscriptionDispatchRate.get(pulsar().getConfiguration().getClusterName()); if (dispatchRate != null) { return dispatchRate; @@ -965,6 +1186,12 @@ protected void internalSetSubscribeRate(SubscribeRate subscribeRate) { // Force to read the data s.t. the watch to the cache content is setup. policiesNode = policiesCache().getWithStat(path).orElseThrow( () -> new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist")); + + pulsar().getBrokerService() + .getInterceptService() + .namespaces() + .setSubscribeRate(namespaceName, subscribeRate, clientAppId()); + policiesNode.getKey().clusterSubscribeRate.put(pulsar().getConfiguration().getClusterName(), subscribeRate); // Write back the new policies into zookeeper @@ -984,6 +1211,10 @@ protected void internalSetSubscribeRate(SubscribeRate subscribeRate) { clientAppId(), namespaceName, policiesNode.getValue().getVersion()); throw new RestException(Status.CONFLICT, "Concurrent modification"); + } catch (InterceptException e) { + throw new RestException( + e.getErrorCode().orElse(Status.INTERNAL_SERVER_ERROR.getStatusCode()), + e.getMessage()); } catch (Exception e) { log.error("[{}] Failed to update the subscribeRate for cluster on namespace {}", clientAppId(), namespaceName, e); @@ -994,6 +1225,18 @@ protected void internalSetSubscribeRate(SubscribeRate subscribeRate) { protected SubscribeRate internalGetSubscribeRate() { validateAdminAccessForTenant(namespaceName.getTenant()); Policies policies = getNamespacePolicies(namespaceName); + + try { + pulsar().getBrokerService() + .getInterceptService() + .namespaces() + .getSubscribeRate(namespaceName, clientAppId()); + } catch (InterceptException e) { + throw new RestException( + e.getErrorCode().orElse(Status.INTERNAL_SERVER_ERROR.getStatusCode()), + e.getMessage()); + } + SubscribeRate subscribeRate = policies.clusterSubscribeRate.get(pulsar().getConfiguration().getClusterName()); if (subscribeRate != null) { return subscribeRate; @@ -1014,6 +1257,12 @@ protected void internalSetReplicatorDispatchRate(DispatchRate dispatchRate) { // Force to read the data s.t. the watch to the cache content is setup. policiesNode = policiesCache().getWithStat(path).orElseThrow( () -> new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist")); + + pulsar().getBrokerService() + .getInterceptService() + .namespaces() + .setReplicatorDispatchRate(namespaceName, dispatchRate, clientAppId()); + policiesNode.getKey().replicatorDispatchRate.put(pulsar().getConfiguration().getClusterName(), dispatchRate); // Write back the new policies into zookeeper @@ -1033,6 +1282,10 @@ protected void internalSetReplicatorDispatchRate(DispatchRate dispatchRate) { clientAppId(), namespaceName, policiesNode.getValue().getVersion()); throw new RestException(Status.CONFLICT, "Concurrent modification"); + } catch (InterceptException e) { + throw new RestException( + e.getErrorCode().orElse(Status.INTERNAL_SERVER_ERROR.getStatusCode()), + e.getMessage()); } catch (Exception e) { log.error("[{}] Failed to update the replicatorDispatchRate for cluster on namespace {}", clientAppId(), namespaceName, e); @@ -1044,6 +1297,18 @@ protected DispatchRate internalGetReplicatorDispatchRate() { validateAdminAccessForTenant(namespaceName.getTenant()); Policies policies = getNamespacePolicies(namespaceName); + + try { + pulsar().getBrokerService() + .getInterceptService() + .namespaces() + .getReplicatorDispatchRate(namespaceName, clientAppId()); + } catch (InterceptException e) { + throw new RestException( + e.getErrorCode().orElse(Status.INTERNAL_SERVER_ERROR.getStatusCode()), + e.getMessage()); + } + DispatchRate dispatchRate = policies.replicatorDispatchRate.get(pulsar().getConfiguration().getClusterName()); if (dispatchRate != null) { return dispatchRate; @@ -1079,6 +1344,12 @@ protected void internalSetBacklogQuota(BacklogQuotaType backlogQuotaType, Backlo } } policies.backlog_quota_map.put(backlogQuotaType, backlogQuota); + + pulsar().getBrokerService() + .getInterceptService() + .namespaces() + .setBacklogQuota(namespaceName, backlogQuotaType, backlogQuota, clientAppId()); + globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion()); policiesCache().invalidate(path(POLICIES, namespaceName.toString())); log.info("[{}] Successfully updated backlog quota map: namespace={}, map={}", clientAppId(), namespaceName, @@ -1094,6 +1365,10 @@ protected void internalSetBacklogQuota(BacklogQuotaType backlogQuotaType, Backlo throw new RestException(Status.CONFLICT, "Concurrent modification"); } catch (RestException pfe) { throw pfe; + } catch (InterceptException e) { + throw new RestException( + e.getErrorCode().orElse(Status.INTERNAL_SERVER_ERROR.getStatusCode()), + e.getMessage()); } catch (Exception e) { log.error("[{}] Failed to update backlog quota map for namespace {}", clientAppId(), namespaceName, e); throw new RestException(e); @@ -1114,6 +1389,12 @@ protected void internalRemoveBacklogQuota(BacklogQuotaType backlogQuotaType) { byte[] content = globalZk().getData(path, null, nodeStat); Policies policies = jsonMapper().readValue(content, Policies.class); policies.backlog_quota_map.remove(backlogQuotaType); + + pulsar().getBrokerService() + .getInterceptService() + .namespaces() + .removeBacklogQuota(namespaceName, backlogQuotaType, clientAppId()); + globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion()); policiesCache().invalidate(path(POLICIES, namespaceName.toString())); log.info("[{}] Successfully removed backlog namespace={}, quota={}", clientAppId(), namespaceName, @@ -1127,6 +1408,10 @@ protected void internalRemoveBacklogQuota(BacklogQuotaType backlogQuotaType) { log.warn("[{}] Failed to update backlog quota map for namespace {}: concurrent modification", clientAppId(), namespaceName); throw new RestException(Status.CONFLICT, "Concurrent modification"); + } catch (InterceptException e) { + throw new RestException( + e.getErrorCode().orElse(Status.INTERNAL_SERVER_ERROR.getStatusCode()), + e.getMessage()); } catch (Exception e) { log.error("[{}] Failed to update backlog quota map for namespace {}", clientAppId(), namespaceName, e); throw new RestException(e); @@ -1149,6 +1434,12 @@ protected void internalSetRetention(RetentionPolicies retention) { "Retention Quota must exceed configured backlog quota for namespace."); } policies.retention_policies = retention; + + pulsar().getBrokerService() + .getInterceptService() + .namespaces() + .setRetention(namespaceName, retention, clientAppId()); + globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion()); policiesCache().invalidate(path(POLICIES, namespaceName.toString())); log.info("[{}] Successfully updated retention configuration: namespace={}, map={}", clientAppId(), @@ -1164,6 +1455,10 @@ protected void internalSetRetention(RetentionPolicies retention) { throw new RestException(Status.CONFLICT, "Concurrent modification"); } catch (RestException pfe) { throw pfe; + } catch (InterceptException e) { + throw new RestException( + e.getErrorCode().orElse(Status.INTERNAL_SERVER_ERROR.getStatusCode()), + e.getMessage()); } catch (Exception e) { log.error("[{}] Failed to update retention configuration for namespace {}", clientAppId(), namespaceName, e); @@ -1182,6 +1477,12 @@ protected void internalSetPersistence(PersistencePolicies persistence) { byte[] content = globalZk().getData(path, null, nodeStat); Policies policies = jsonMapper().readValue(content, Policies.class); policies.persistence = persistence; + + pulsar().getBrokerService() + .getInterceptService() + .namespaces() + .setPersistence(namespaceName, persistence, clientAppId()); + globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion()); policiesCache().invalidate(path(POLICIES, namespaceName.toString())); log.info("[{}] Successfully updated persistence configuration: namespace={}, map={}", clientAppId(), @@ -1195,6 +1496,10 @@ protected void internalSetPersistence(PersistencePolicies persistence) { log.warn("[{}] Failed to update persistence configuration for namespace {}: concurrent modification", clientAppId(), namespaceName); throw new RestException(Status.CONFLICT, "Concurrent modification"); + } catch (InterceptException e) { + throw new RestException( + e.getErrorCode().orElse(Status.INTERNAL_SERVER_ERROR.getStatusCode()), + e.getMessage()); } catch (Exception e) { log.error("[{}] Failed to update persistence configuration for namespace {}", clientAppId(), namespaceName, e); @@ -1206,6 +1511,18 @@ protected PersistencePolicies internalGetPersistence() { validateAdminAccessForTenant(namespaceName.getTenant()); Policies policies = getNamespacePolicies(namespaceName); + + try { + pulsar().getBrokerService() + .getInterceptService() + .namespaces() + .getPersistence(namespaceName, clientAppId()); + } catch (InterceptException e) { + throw new RestException( + e.getErrorCode().orElse(Status.INTERNAL_SERVER_ERROR.getStatusCode()), + e.getMessage()); + } + if (policies.persistence == null) { return new PersistencePolicies(config().getManagedLedgerDefaultEnsembleSize(), config().getManagedLedgerDefaultWriteQuorum(), config().getManagedLedgerDefaultAckQuorum(), 0.0d); @@ -1218,6 +1535,17 @@ protected void internalClearNamespaceBacklog(AsyncResponse asyncResponse, boolea validateAdminAccessForTenant(namespaceName.getTenant()); final List> futures = Lists.newArrayList(); + try { + pulsar().getBrokerService() + .getInterceptService() + .namespaces() + .clearNamespaceBacklog(namespaceName, authoritative, clientAppId()); + } catch (InterceptException e) { + throw new RestException( + e.getErrorCode().orElse(Status.INTERNAL_SERVER_ERROR.getStatusCode()), + e.getMessage()); + } + try { NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory() .getBundles(namespaceName); @@ -1271,6 +1599,17 @@ protected void internalClearNamespaceBundleBacklog(String bundleRange, boolean a validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange, authoritative, true); + try { + pulsar().getBrokerService() + .getInterceptService() + .namespaces() + .clearNamespaceBundleBacklog(namespaceName, bundleRange, authoritative, clientAppId()); + } catch (InterceptException e) { + throw new RestException( + e.getErrorCode().orElse(Status.INTERNAL_SERVER_ERROR.getStatusCode()), + e.getMessage()); + } + clearBacklog(namespaceName, bundleRange, null); log.info("[{}] Successfully cleared backlog on namespace bundle {}/{}", clientAppId(), namespaceName, bundleRange); @@ -1281,6 +1620,17 @@ protected void internalClearNamespaceBacklogForSubscription(AsyncResponse asyncR validateAdminAccessForTenant(namespaceName.getTenant()); final List> futures = Lists.newArrayList(); + try { + pulsar().getBrokerService() + .getInterceptService() + .namespaces() + .clearNamespaceBacklogForSubscription(namespaceName, subscription, authoritative, clientAppId()); + } catch (InterceptException e) { + throw new RestException( + e.getErrorCode().orElse(Status.INTERNAL_SERVER_ERROR.getStatusCode()), + e.getMessage()); + } + try { NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory() .getBundles(namespaceName); @@ -1335,6 +1685,17 @@ protected void internalClearNamespaceBundleBacklogForSubscription(String subscri validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange, authoritative, true); + try { + pulsar().getBrokerService() + .getInterceptService() + .namespaces() + .clearNamespaceBundleBacklogForSubscription(namespaceName, subscription, bundleRange, authoritative, clientAppId()); + } catch (InterceptException e) { + throw new RestException( + e.getErrorCode().orElse(Status.INTERNAL_SERVER_ERROR.getStatusCode()), + e.getMessage()); + } + clearBacklog(namespaceName, bundleRange, subscription); log.info("[{}] Successfully cleared backlog for subscription {} on namespace bundle {}/{}", clientAppId(), subscription, namespaceName, bundleRange); @@ -1345,6 +1706,17 @@ protected void internalUnsubscribeNamespace(AsyncResponse asyncResponse, String validateAdminAccessForTenant(namespaceName.getTenant()); final List> futures = Lists.newArrayList(); + try { + pulsar().getBrokerService() + .getInterceptService() + .namespaces() + .unsubscribeNamespace(namespaceName, subscription, authoritative, clientAppId()); + } catch (InterceptException e) { + throw new RestException( + e.getErrorCode().orElse(Status.INTERNAL_SERVER_ERROR.getStatusCode()), + e.getMessage()); + } + try { NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory() .getBundles(namespaceName); @@ -1398,6 +1770,18 @@ protected void internalUnsubscribeNamespaceBundle(String subscription, String bu validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange, authoritative, true); + + try { + pulsar().getBrokerService() + .getInterceptService() + .namespaces() + .unsubscribeNamespaceBundle(namespaceName, subscription, bundleRange, authoritative, clientAppId()); + } catch (InterceptException e) { + throw new RestException( + e.getErrorCode().orElse(Status.INTERNAL_SERVER_ERROR.getStatusCode()), + e.getMessage()); + } + unsubscribe(namespaceName, bundleRange, subscription); log.info("[{}] Successfully unsubscribed {} on namespace bundle {}/{}", clientAppId(), subscription, namespaceName, bundleRange); @@ -1417,6 +1801,12 @@ protected void internalSetSubscriptionAuthMode(SubscriptionAuthMode subscription byte[] content = globalZk().getData(path, null, nodeStat); Policies policies = jsonMapper().readValue(content, Policies.class); policies.subscription_auth_mode = subscriptionAuthMode; + + pulsar().getBrokerService() + .getInterceptService() + .namespaces() + .setSubscriptionAuthMode(namespaceName, subscriptionAuthMode, clientAppId()); + globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion()); policiesCache().invalidate(path(POLICIES, namespaceName.toString())); log.info("[{}] Successfully updated subscription auth mode: namespace={}, map={}", clientAppId(), @@ -1432,6 +1822,10 @@ protected void internalSetSubscriptionAuthMode(SubscriptionAuthMode subscription throw new RestException(Status.CONFLICT, "Concurrent modification"); } catch (RestException pfe) { throw pfe; + } catch (InterceptException e) { + throw new RestException( + e.getErrorCode().orElse(Status.INTERNAL_SERVER_ERROR.getStatusCode()), + e.getMessage()); } catch (Exception e) { log.error("[{}] Failed to update subscription auth mode for namespace {}", clientAppId(), namespaceName, e); @@ -1451,6 +1845,11 @@ protected void internalModifyEncryptionRequired(boolean encryptionRequired) { () -> new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist")); policiesNode.getKey().encryption_required = encryptionRequired; + pulsar().getBrokerService() + .getInterceptService() + .namespaces() + .modifyEncryptionRequired(namespaceName, encryptionRequired, clientAppId()); + // Write back the new policies into zookeeper globalZk().setData(path(POLICIES, namespaceName.toString()), jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion()); @@ -1468,6 +1867,10 @@ protected void internalModifyEncryptionRequired(boolean encryptionRequired) { clientAppId(), namespaceName, policiesNode.getValue().getVersion()); throw new RestException(Status.CONFLICT, "Concurrent modification"); + } catch (InterceptException e) { + throw new RestException( + e.getErrorCode().orElse(Status.INTERNAL_SERVER_ERROR.getStatusCode()), + e.getMessage()); } catch (Exception e) { log.error("[{}] Failed to modify encryption required status on namespace {}", clientAppId(), namespaceName, e); @@ -1494,6 +1897,11 @@ protected void internalSetNamespaceAntiAffinityGroup(String antiAffinityGroup) { () -> new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist")); policiesNode.getKey().antiAffinityGroup = antiAffinityGroup; + pulsar().getBrokerService() + .getInterceptService() + .namespaces() + .setNamespaceAntiAffinityGroup(namespaceName, antiAffinityGroup, clientAppId()); + // Write back the new policies into zookeeper globalZk().setData(path(POLICIES, namespaceName.toString()), jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion()); @@ -1511,6 +1919,10 @@ protected void internalSetNamespaceAntiAffinityGroup(String antiAffinityGroup) { clientAppId(), namespaceName, policiesNode.getValue().getVersion()); throw new RestException(Status.CONFLICT, "Concurrent modification"); + } catch (InterceptException e) { + throw new RestException( + e.getErrorCode().orElse(Status.INTERNAL_SERVER_ERROR.getStatusCode()), + e.getMessage()); } catch (Exception e) { log.error("[{}] Failed to update the antiAffinityGroup on namespace {}", clientAppId(), namespaceName, e); throw new RestException(e); @@ -1519,6 +1931,18 @@ protected void internalSetNamespaceAntiAffinityGroup(String antiAffinityGroup) { protected String internalGetNamespaceAntiAffinityGroup() { validateAdminAccessForTenant(namespaceName.getTenant()); + + try { + pulsar().getBrokerService() + .getInterceptService() + .namespaces() + .getNamespaceAntiAffinityGroup(namespaceName, clientAppId()); + } catch (InterceptException e) { + throw new RestException( + e.getErrorCode().orElse(Status.INTERNAL_SERVER_ERROR.getStatusCode()), + e.getMessage()); + } + return getNamespacePolicies(namespaceName).antiAffinityGroup; } @@ -1534,6 +1958,12 @@ protected void internalRemoveNamespaceAntiAffinityGroup() { byte[] content = globalZk().getData(path, null, nodeStat); Policies policies = jsonMapper().readValue(content, Policies.class); policies.antiAffinityGroup = null; + + pulsar().getBrokerService() + .getInterceptService() + .namespaces() + .removeNamespaceAntiAffinityGroup(namespaceName, clientAppId()); + globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion()); policiesCache().invalidate(path(POLICIES, namespaceName.toString())); log.info("[{}] Successfully removed anti-affinity group for a namespace={}", clientAppId(), namespaceName); @@ -1546,6 +1976,10 @@ protected void internalRemoveNamespaceAntiAffinityGroup() { log.warn("[{}] Failed to remove anti-affinity group for namespace {}: concurrent modification", clientAppId(), namespaceName); throw new RestException(Status.CONFLICT, "Concurrent modification"); + } catch (InterceptException e) { + throw new RestException( + e.getErrorCode().orElse(Status.INTERNAL_SERVER_ERROR.getStatusCode()), + e.getMessage()); } catch (Exception e) { log.error("[{}] Failed to remove anti-affinity group for namespace {}", clientAppId(), namespaceName, e); throw new RestException(e); @@ -1563,6 +1997,17 @@ protected List internalGetAntiAffinityNamespaces(String cluster, String } validateClusterExists(cluster); + try { + pulsar().getBrokerService() + .getInterceptService() + .namespaces() + .getAntiAffinityNamespaces(namespaceName, cluster, antiAffinityGroup, clientAppId()); + } catch (InterceptException e) { + throw new RestException( + e.getErrorCode().orElse(Status.INTERNAL_SERVER_ERROR.getStatusCode()), + e.getMessage()); + } + try { List namespaces = getListOfNamespaces(tenant); @@ -1700,7 +2145,7 @@ private void unsubscribe(NamespaceName nsName, String bundleRange, String subscr * * @param clusterName: * given cluster whose peer-clusters can't be present into replication-cluster list - * @param clusters: + * @param replicationClusters: * replication-cluster list */ private void validatePeerClusterConflict(String clusterName, Set replicationClusters) { @@ -1800,6 +2245,18 @@ private void validatePolicies(NamespaceName ns, Policies policies) { protected int internalGetMaxProducersPerTopic() { validateAdminAccessForTenant(namespaceName.getTenant()); + + try { + pulsar().getBrokerService() + .getInterceptService() + .namespaces() + .getMaxProducersPerTopic(namespaceName, clientAppId()); + } catch (InterceptException e) { + throw new RestException( + e.getErrorCode().orElse(Status.INTERNAL_SERVER_ERROR.getStatusCode()), + e.getMessage()); + } + return getNamespacePolicies(namespaceName).max_producers_per_topic; } @@ -1817,6 +2274,12 @@ protected void internalSetMaxProducersPerTopic(int maxProducersPerTopic) { "maxProducersPerTopic must be 0 or more"); } policies.max_producers_per_topic = maxProducersPerTopic; + + pulsar().getBrokerService() + .getInterceptService() + .namespaces() + .setMaxProducersPerTopic(namespaceName, maxProducersPerTopic, clientAppId()); + globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion()); policiesCache().invalidate(path(POLICIES, namespaceName.toString())); log.info("[{}] Successfully updated maxProducersPerTopic configuration: namespace={}, value={}", clientAppId(), @@ -1832,6 +2295,10 @@ protected void internalSetMaxProducersPerTopic(int maxProducersPerTopic) { throw new RestException(Status.CONFLICT, "Concurrent modification"); } catch (RestException pfe) { throw pfe; + } catch (InterceptException e) { + throw new RestException( + e.getErrorCode().orElse(Status.INTERNAL_SERVER_ERROR.getStatusCode()), + e.getMessage()); } catch (Exception e) { log.error("[{}] Failed to update maxProducersPerTopic configuration for namespace {}", clientAppId(), namespaceName, e); @@ -1841,6 +2308,18 @@ protected void internalSetMaxProducersPerTopic(int maxProducersPerTopic) { protected int internalGetMaxConsumersPerTopic() { validateAdminAccessForTenant(namespaceName.getTenant()); + + try { + pulsar().getBrokerService() + .getInterceptService() + .namespaces() + .getMaxConsumersPerTopic(namespaceName, clientAppId()); + } catch (InterceptException e) { + throw new RestException( + e.getErrorCode().orElse(Status.INTERNAL_SERVER_ERROR.getStatusCode()), + e.getMessage()); + } + return getNamespacePolicies(namespaceName).max_consumers_per_topic; } @@ -1858,6 +2337,12 @@ protected void internalSetMaxConsumersPerTopic(int maxConsumersPerTopic) { "maxConsumersPerTopic must be 0 or more"); } policies.max_consumers_per_topic = maxConsumersPerTopic; + + pulsar().getBrokerService() + .getInterceptService() + .namespaces() + .setMaxConsumersPerTopic(namespaceName, maxConsumersPerTopic, clientAppId()); + globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion()); policiesCache().invalidate(path(POLICIES, namespaceName.toString())); log.info("[{}] Successfully updated maxConsumersPerTopic configuration: namespace={}, value={}", clientAppId(), @@ -1882,6 +2367,18 @@ protected void internalSetMaxConsumersPerTopic(int maxConsumersPerTopic) { protected int internalGetMaxConsumersPerSubscription() { validateAdminAccessForTenant(namespaceName.getTenant()); + + try { + pulsar().getBrokerService() + .getInterceptService() + .namespaces() + .getMaxConsumersPerSubscription(namespaceName, clientAppId()); + } catch (InterceptException e) { + throw new RestException( + e.getErrorCode().orElse(Status.INTERNAL_SERVER_ERROR.getStatusCode()), + e.getMessage()); + } + return getNamespacePolicies(namespaceName).max_consumers_per_subscription; } @@ -1899,6 +2396,12 @@ protected void internalSetMaxConsumersPerSubscription(int maxConsumersPerSubscri "maxConsumersPerSubscription must be 0 or more"); } policies.max_consumers_per_subscription = maxConsumersPerSubscription; + + pulsar().getBrokerService() + .getInterceptService() + .namespaces() + .setMaxConsumersPerSubscription(namespaceName, maxConsumersPerSubscription, clientAppId()); + globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion()); policiesCache().invalidate(path(POLICIES, namespaceName.toString())); log.info("[{}] Successfully updated maxConsumersPerSubscription configuration: namespace={}, value={}", clientAppId(), @@ -1914,6 +2417,10 @@ protected void internalSetMaxConsumersPerSubscription(int maxConsumersPerSubscri throw new RestException(Status.CONFLICT, "Concurrent modification"); } catch (RestException pfe) { throw pfe; + } catch (InterceptException e) { + throw new RestException( + e.getErrorCode().orElse(Status.INTERNAL_SERVER_ERROR.getStatusCode()), + e.getMessage()); } catch (Exception e) { log.error("[{}] Failed to update maxConsumersPerSubscription configuration for namespace {}", clientAppId(), namespaceName, e); @@ -1923,6 +2430,18 @@ protected void internalSetMaxConsumersPerSubscription(int maxConsumersPerSubscri protected long internalGetCompactionThreshold() { validateAdminAccessForTenant(namespaceName.getTenant()); + + try { + pulsar().getBrokerService() + .getInterceptService() + .namespaces() + .getCompactionThreshold(namespaceName, clientAppId()); + } catch (InterceptException e) { + throw new RestException( + e.getErrorCode().orElse(Status.INTERNAL_SERVER_ERROR.getStatusCode()), + e.getMessage()); + } + return getNamespacePolicies(namespaceName).compaction_threshold; } @@ -1940,6 +2459,12 @@ protected void internalSetCompactionThreshold(long newThreshold) { "compactionThreshold must be 0 or more"); } policies.compaction_threshold = newThreshold; + + pulsar().getBrokerService() + .getInterceptService() + .namespaces() + .setCompactionThreshold(namespaceName, newThreshold, clientAppId()); + globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion()); policiesCache().invalidate(path(POLICIES, namespaceName.toString())); log.info("[{}] Successfully updated compactionThreshold configuration: namespace={}, value={}", @@ -1964,6 +2489,18 @@ protected void internalSetCompactionThreshold(long newThreshold) { protected long internalGetOffloadThreshold() { validateAdminAccessForTenant(namespaceName.getTenant()); + + try { + pulsar().getBrokerService() + .getInterceptService() + .namespaces() + .getOffloadThreshold(namespaceName, clientAppId()); + } catch (InterceptException e) { + throw new RestException( + e.getErrorCode().orElse(Status.INTERNAL_SERVER_ERROR.getStatusCode()), + e.getMessage()); + } + return getNamespacePolicies(namespaceName).offload_threshold; } @@ -1977,6 +2514,12 @@ protected void internalSetOffloadThreshold(long newThreshold) { byte[] content = globalZk().getData(path, null, nodeStat); Policies policies = jsonMapper().readValue(content, Policies.class); policies.offload_threshold = newThreshold; + + pulsar().getBrokerService() + .getInterceptService() + .namespaces() + .setOffloadThreshold(namespaceName, newThreshold, clientAppId()); + globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion()); policiesCache().invalidate(path(POLICIES, namespaceName.toString())); log.info("[{}] Successfully updated offloadThreshold configuration: namespace={}, value={}", @@ -2001,6 +2544,18 @@ protected void internalSetOffloadThreshold(long newThreshold) { protected Long internalGetOffloadDeletionLag() { validateAdminAccessForTenant(namespaceName.getTenant()); + + try { + pulsar().getBrokerService() + .getInterceptService() + .namespaces() + .getOffloadDeletionLag(namespaceName, clientAppId()); + } catch (InterceptException e) { + throw new RestException( + e.getErrorCode().orElse(Status.INTERNAL_SERVER_ERROR.getStatusCode()), + e.getMessage()); + } + return getNamespacePolicies(namespaceName).offload_deletion_lag_ms; } @@ -2014,6 +2569,12 @@ protected void internalSetOffloadDeletionLag(Long newDeletionLagMs) { byte[] content = globalZk().getData(path, null, nodeStat); Policies policies = jsonMapper().readValue(content, Policies.class); policies.offload_deletion_lag_ms = newDeletionLagMs; + + pulsar().getBrokerService() + .getInterceptService() + .namespaces() + .setOffloadDeletionLag(namespaceName, newDeletionLagMs, clientAppId()); + globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion()); policiesCache().invalidate(path(POLICIES, namespaceName.toString())); log.info("[{}] Successfully updated offloadDeletionLagMs configuration: namespace={}, value={}", @@ -2029,6 +2590,10 @@ protected void internalSetOffloadDeletionLag(Long newDeletionLagMs) { throw new RestException(Status.CONFLICT, "Concurrent modification"); } catch (RestException pfe) { throw pfe; + } catch (InterceptException e) { + throw new RestException( + e.getErrorCode().orElse(Status.INTERNAL_SERVER_ERROR.getStatusCode()), + e.getMessage()); } catch (Exception e) { log.error("[{}] Failed to update offloadDeletionLag configuration for namespace {}", clientAppId(), namespaceName, e); @@ -2039,6 +2604,18 @@ protected void internalSetOffloadDeletionLag(Long newDeletionLagMs) { @Deprecated protected SchemaAutoUpdateCompatibilityStrategy internalGetSchemaAutoUpdateCompatibilityStrategy() { validateAdminAccessForTenant(namespaceName.getTenant()); + + try { + pulsar().getBrokerService() + .getInterceptService() + .namespaces() + .getSchemaAutoUpdateCompatibilityStrategy(namespaceName, clientAppId()); + } catch (InterceptException e) { + throw new RestException( + e.getErrorCode().orElse(Status.INTERNAL_SERVER_ERROR.getStatusCode()), + e.getMessage()); + } + return getNamespacePolicies(namespaceName).schema_auto_update_compatibility_strategy; } @@ -2058,6 +2635,17 @@ protected void internalSetSchemaAutoUpdateCompatibilityStrategy(SchemaAutoUpdate validateSuperUserAccess(); validatePoliciesReadOnlyAccess(); + try { + pulsar().getBrokerService() + .getInterceptService() + .namespaces() + .setSchemaAutoUpdateCompatibilityStrategy(namespaceName, strategy, clientAppId()); + } catch (InterceptException e) { + throw new RestException( + e.getErrorCode().orElse(Status.INTERNAL_SERVER_ERROR.getStatusCode()), + e.getMessage()); + } + mutatePolicy((policies) -> { policies.schema_auto_update_compatibility_strategy = strategy; return policies; @@ -2079,6 +2667,18 @@ protected void internalSetSchemaCompatibilityStrategy(SchemaCompatibilityStrateg protected boolean internalGetSchemaValidationEnforced() { validateSuperUserAccess(); validateAdminAccessForTenant(namespaceName.getTenant()); + + try { + pulsar().getBrokerService() + .getInterceptService() + .namespaces() + .getSchemaValidationEnforced(namespaceName, clientAppId()); + } catch (InterceptException e) { + throw new RestException( + e.getErrorCode().orElse(Status.INTERNAL_SERVER_ERROR.getStatusCode()), + e.getMessage()); + } + return getNamespacePolicies(namespaceName).schema_validation_enforced; } @@ -2086,6 +2686,17 @@ protected void internalSetSchemaValidationEnforced(boolean schemaValidationEnfor validateSuperUserAccess(); validatePoliciesReadOnlyAccess(); + try { + pulsar().getBrokerService() + .getInterceptService() + .namespaces() + .setSchemaValidationEnforced(namespaceName, schemaValidationEnforced, clientAppId()); + } catch (InterceptException e) { + throw new RestException( + e.getErrorCode().orElse(Status.INTERNAL_SERVER_ERROR.getStatusCode()), + e.getMessage()); + } + mutatePolicy((policies) -> { policies. schema_validation_enforced = schemaValidationEnforced; return policies; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 972e1c3207bd9..e3373afcead42 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -66,6 +66,7 @@ import org.apache.pulsar.broker.admin.AdminResource; import org.apache.pulsar.broker.admin.ZkAdminPaths; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; +import org.apache.pulsar.broker.intercept.InterceptException; import org.apache.pulsar.broker.service.BrokerServiceException.AlreadyRunningException; import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException; import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException; @@ -396,7 +397,20 @@ protected void internalCreatePartitionedTopic(int numPartitions) { if (numPartitions <= 0) { throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0"); } + validatePartitionTopicName(topicName.getLocalName()); + + try { + pulsar().getBrokerService() + .getInterceptService() + .topics() + .createPartitionedTopic(topicName, new PartitionedTopicMetadata(numPartitions), clientAppId()); + } catch (InterceptException e) { + throw new RestException( + e.getErrorCode().orElse(Status.INTERNAL_SERVER_ERROR.getStatusCode()), + e.getMessage()); + } + try { boolean topicExist = pulsar().getNamespaceService() .getListOfTopics(topicName.getNamespaceObject(), PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL) @@ -438,7 +452,19 @@ protected void internalCreateNonPartitionedTopic(boolean authoritative) { } validateTopicOwnership(topicName, authoritative); - try { + + try { + pulsar().getBrokerService() + .getInterceptService() + .topics() + .createTopic(topicName, clientAppId()); + } catch (InterceptException e) { + throw new RestException( + e.getErrorCode().orElse(Status.INTERNAL_SERVER_ERROR.getStatusCode()), + e.getMessage()); + } + + try { Topic createdTopic = getOrCreateTopic(topicName); log.info("[{}] Successfully created non-partitioned topic {}", clientAppId(), createdTopic); } catch (Exception e) { @@ -518,10 +544,22 @@ protected void internalUpdatePartitionedTopic(int numPartitions, boolean updateL } return; } - + if (numPartitions <= 0) { throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0"); } + + try { + pulsar().getBrokerService() + .getInterceptService() + .topics() + .updatePartitionedTopic(topicName, new PartitionedTopicMetadata(numPartitions), clientAppId()); + } catch (InterceptException e) { + throw new RestException( + e.getErrorCode().orElse(Status.INTERNAL_SERVER_ERROR.getStatusCode()), + e.getMessage()); + } + try { updatePartitionedTopic(topicName, numPartitions).get(); } catch (Exception e) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java index 223b06aab8fa8..74932d648f4a7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java @@ -32,6 +32,7 @@ import javax.ws.rs.core.Response.Status; import org.apache.pulsar.broker.admin.AdminResource; +import org.apache.pulsar.broker.intercept.InterceptException; import org.apache.pulsar.broker.web.RestException; import org.apache.pulsar.common.naming.NamedEntity; import org.apache.pulsar.common.policies.data.TenantInfo; @@ -98,6 +99,16 @@ public void createTenant( validateSuperUserAccess(); validatePoliciesReadOnlyAccess(); validateClusters(config); + try { + pulsar().getBrokerService() + .getInterceptService() + .tenants() + .createTenant(tenant, config, clientAppId()); + } catch (InterceptException e) { + throw new RestException( + e.getErrorCode().orElse(Status.INTERNAL_SERVER_ERROR.getStatusCode()), + e.getMessage()); + } try { NamedEntity.checkName(tenant); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java index eeaeb96a649ee..82bddd0326439 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java @@ -46,6 +46,7 @@ import javax.ws.rs.core.Response.Status; import org.apache.pulsar.broker.PulsarServerException; +import org.apache.pulsar.broker.intercept.InterceptException; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic; import org.apache.pulsar.broker.web.RestException; @@ -172,6 +173,18 @@ public void createPartitionedTopic( if (numPartitions <= 0) { throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0"); } + + try { + pulsar().getBrokerService() + .getInterceptService() + .topics() + .createPartitionedTopic(topicName, new PartitionedTopicMetadata(numPartitions), clientAppId()); + } catch (InterceptException e) { + throw new RestException( + e.getErrorCode().orElse(Status.INTERNAL_SERVER_ERROR.getStatusCode()), + e.getMessage()); + } + try { boolean topicExist = pulsar().getNamespaceService() .getListOfTopics(topicName.getNamespaceObject(), PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index aa0e1982577ca..936b666cc4cea 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -63,6 +63,7 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Consumer; +import java.util.function.Function; import java.util.function.Predicate; import lombok.AccessLevel; import lombok.Getter; @@ -89,6 +90,8 @@ import org.apache.pulsar.broker.cache.ConfigurationCacheService; import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerFactory; import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerLoader; +import org.apache.pulsar.broker.intercept.InterceptException; +import org.apache.pulsar.broker.intercept.InterceptService; import org.apache.pulsar.broker.loadbalance.LoadManager; import org.apache.pulsar.broker.service.BrokerServiceException.NamingException; import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException; @@ -102,6 +105,7 @@ import org.apache.pulsar.broker.stats.ClusterReplicationMetrics; import org.apache.pulsar.broker.stats.prometheus.metrics.Summary; import org.apache.pulsar.broker.web.PulsarWebResource; +import org.apache.pulsar.broker.web.RestException; import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect; import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect.EventListner; import org.apache.pulsar.client.admin.PulsarAdmin; @@ -147,6 +151,7 @@ import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.ws.rs.core.Response; @Getter(AccessLevel.PUBLIC) @Setter(AccessLevel.PROTECTED) @@ -217,6 +222,8 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener deserialize(String key, byte[] content) throws Except .loadDelayedDeliveryTrackerFactory(pulsar.getConfiguration()); this.defaultServerBootstrap = defaultServerBootstrap(); + this.interceptService = new InterceptService(pulsar.getConfiguration(), pulsar().getAdminClient()); } // This call is used for starting additional protocol handlers @@ -682,6 +690,16 @@ private CompletableFuture> createNonPersistentTopic(String topic new NotAllowedException("Broker is not unable to load non-persistent topic")); return topicFuture; } + + try { + interceptService + .topics() + .createTopic(TopicName.get(topic), null); + } catch (InterceptException e) { + topicFuture.completeExceptionally(new BrokerServiceException(e)); + return topicFuture; + } + final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); NonPersistentTopic nonPersistentTopic = new NonPersistentTopic(topic, this); CompletableFuture replicationFuture = nonPersistentTopic.checkReplication(); @@ -984,6 +1002,20 @@ public CompletableFuture getManagedLedgerConfig(TopicName t managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB()); managedLedgerConfig.setLedgerOffloader(pulsar.getManagedLedgerOffloader()); + managedLedgerConfig.setCreateFunctionInterceptFunc(new Runnable() { + @Override + public void run() { + try { + interceptService + .topics() + .createTopic(topicName, null); + } catch (InterceptException e) { + throw new RestException( + e.getErrorCode().orElse(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()), + e.getMessage()); + } + } + }); policies.ifPresent(p -> { long lag = serviceConfig.getManagedLedgerOffloadDeletionLagMs(); if (p.offload_deletion_lag_ms != null) { @@ -2003,4 +2035,8 @@ public Optional getListenPortTls() { return Optional.empty(); } } + + public InterceptService getInterceptService() { + return interceptService; + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java index d5a6f08bcebb6..6ba24e4f02317 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java @@ -289,7 +289,7 @@ public void testFunctionAssignmentsWithRestart() throws Exception { final URI dlUri = functionsWorkerService.getDlogUri(); functionsWorkerService.stop(); functionsWorkerService = new WorkerService(workerConfig); - functionsWorkerService.start(dlUri, new AuthenticationService(PulsarConfigurationLoader.convertFrom(workerConfig)), null); + functionsWorkerService.start(dlUri, new AuthenticationService(PulsarConfigurationLoader.convertFrom(workerConfig)), null, null); final FunctionRuntimeManager runtimeManager2 = functionsWorkerService.getFunctionRuntimeManager(); retryStrategically((test) -> { try { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Resources.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Resources.java index 436bb4ddee68b..c90408409b046 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Resources.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/Resources.java @@ -42,10 +42,10 @@ public class Resources { private static final Resources DEFAULT = new Resources(); - // Default cpu is 1 core - private Double cpu = 1d; - // Default memory is 1GB - private Long ram = 1073741824L; + // Default cpu is 0.5 core + private Double cpu = 0.5d; + // Default memory is 256MB + private Long ram = 256 * (long) Math.pow(2, 20); // Default disk is 10GB private Long disk = 10737418240L; diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java index 0807d0f9a264b..abbbb449996d5 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java @@ -25,6 +25,7 @@ import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.broker.authorization.AuthorizationService; import org.apache.pulsar.broker.cache.ConfigurationCacheService; +import org.apache.pulsar.broker.intercept.InterceptService; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.common.conf.InternalConfigurationData; @@ -64,21 +65,22 @@ public Worker(WorkerConfig workerConfig) { } protected void start() throws Exception { - URI dlogUri = initialize(this.workerConfig); + PulsarAdmin admin = WorkerUtils.getPulsarAdminClient(workerConfig.getPulsarWebServiceUrl(), + workerConfig.getClientAuthenticationPlugin(), workerConfig.getClientAuthenticationParameters(), + workerConfig.getTlsTrustCertsFilePath(), workerConfig.isTlsAllowInsecureConnection(), + workerConfig.isTlsHostnameVerificationEnable()); - workerService.start(dlogUri, getAuthenticationService(), getAuthorizationService()); + URI dlogUri = initialize(this.workerConfig, admin); + + workerService.start(dlogUri, getAuthenticationService(), getAuthorizationService(), getInterceptService(admin)); this.server = new WorkerServer(workerService); this.server.start(); log.info("Start worker server on port {}...", this.workerConfig.getWorkerPort()); } - private static URI initialize(WorkerConfig workerConfig) + private static URI initialize(WorkerConfig workerConfig, PulsarAdmin admin) throws InterruptedException, PulsarAdminException, IOException { - // initializing pulsar functions namespace - PulsarAdmin admin = WorkerUtils.getPulsarAdminClient(workerConfig.getPulsarWebServiceUrl(), - workerConfig.getClientAuthenticationPlugin(), workerConfig.getClientAuthenticationParameters(), - workerConfig.getTlsTrustCertsFilePath(), workerConfig.isTlsAllowInsecureConnection(), - workerConfig.isTlsHostnameVerificationEnable()); + InternalConfigurationData internalConf; // make sure pulsar broker is up log.info("Checking if pulsar service at {} is up...", workerConfig.getPulsarWebServiceUrl()); @@ -180,6 +182,11 @@ private AuthenticationService getAuthenticationService() throws PulsarServerExce return new AuthenticationService(PulsarConfigurationLoader.convertFrom(workerConfig)); } + private InterceptService getInterceptService(PulsarAdmin admin) throws PulsarServerException { + return new InterceptService(PulsarConfigurationLoader.convertFrom(workerConfig), admin); + + } + public ZooKeeperClientFactory getZooKeeperClientFactory() { if (zkClientFactory == null) { zkClientFactory = new ZookeeperBkClientFactoryImpl(orderedExecutor); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java index fdfb7dec9b626..55fca349bd2b8 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java @@ -33,6 +33,7 @@ import org.apache.distributedlog.api.namespace.NamespaceBuilder; import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.broker.authorization.AuthorizationService; +import org.apache.pulsar.broker.intercept.InterceptService; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; @@ -64,6 +65,7 @@ public class WorkerService { private final ScheduledExecutorService statsUpdater; private AuthenticationService authenticationService; private AuthorizationService authorizationService; + private InterceptService interceptService; private ConnectorsManager connectorsManager; private PulsarAdmin brokerAdmin; private PulsarAdmin functionAdmin; @@ -83,7 +85,8 @@ public WorkerService(WorkerConfig workerConfig) { public void start(URI dlogUri, AuthenticationService authenticationService, - AuthorizationService authorizationService) throws InterruptedException { + AuthorizationService authorizationService, + InterceptService interceptService) throws InterruptedException { log.info("Starting worker {}...", workerConfig.getWorkerId()); try { @@ -183,6 +186,8 @@ public void start(URI dlogUri, this.authorizationService = authorizationService; + this.interceptService = interceptService; + // Starting cluster services log.info("Start cluster services..."); this.clusterServiceCoordinator = new ClusterServiceCoordinator( diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java index 98cc3a5eb83bc..6e7689a992c7e 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java @@ -1138,7 +1138,7 @@ public void putFunctionState(final String tenant, log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, functionName, e); throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage()); } - + if (!key.equals(state.getKey())) { log.error("{}/{}/{} Bad putFunction Request, path key doesn't match key in json", tenant, namespace, functionName); throw new RestException(Status.BAD_REQUEST, "Path key doesn't match key in json"); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java index 026fb77bf0918..61248d4ea74ae 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java @@ -18,10 +18,32 @@ */ package org.apache.pulsar.functions.worker.rest.api; +import static org.apache.commons.lang3.StringUtils.isBlank; +import static org.apache.commons.lang3.StringUtils.isNotBlank; +import static org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData; +import static org.apache.pulsar.functions.worker.WorkerUtils.isFunctionCodeBuiltin; +import static org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException; + import com.google.protobuf.ByteString; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.Optional; +import java.util.function.Supplier; + +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.Response; + import lombok.extern.slf4j.Slf4j; + import org.apache.pulsar.broker.authentication.AuthenticationDataHttps; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; +import org.apache.pulsar.broker.intercept.InterceptException; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.common.functions.FunctionConfig; import org.apache.pulsar.common.functions.UpdateOptions; @@ -226,6 +248,19 @@ public void registerFunction(final String tenant, } functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder); + + try { + worker().getInterceptService() + .functions() + .createFunction(FunctionConfigUtils.convertFromDetails( + functionMetaDataBuilder.build().getFunctionDetails()), + clientRole); + } catch (InterceptException e) { + throw new RestException( + e.getErrorCode().orElse(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()), + e.getMessage()); + } + updateRequest(functionMetaDataBuilder.build()); } finally { @@ -417,6 +452,16 @@ public void updateFunction(final String tenant, functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder); + try { + worker().getInterceptService() + .functions() + .updateFunction(functionConfig, existingFunctionConfig, clientRole); + } catch (InterceptException e) { + throw new RestException( + e.getErrorCode().orElse(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()), + e.getMessage()); + } + updateRequest(functionMetaDataBuilder.build()); } finally { if (!(functionPkgUrl != null && functionPkgUrl.startsWith(Utils.FILE)) diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java index 70e7b85149574..8348b68cf15c0 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java @@ -18,11 +18,34 @@ */ package org.apache.pulsar.functions.worker.rest.api; +import static org.apache.commons.lang3.StringUtils.isBlank; +import static org.apache.commons.lang3.StringUtils.isNotBlank; +import static org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData; +import static org.apache.pulsar.functions.worker.WorkerUtils.isFunctionCodeBuiltin; +import static org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException; + import com.google.protobuf.ByteString; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.nio.file.Path; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.Optional; +import java.util.function.Supplier; + +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.Response; + import lombok.extern.slf4j.Slf4j; + import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.authentication.AuthenticationDataHttps; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; +import org.apache.pulsar.broker.intercept.InterceptException; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.common.functions.UpdateOptions; import org.apache.pulsar.common.functions.Utils; @@ -42,25 +65,6 @@ import org.apache.pulsar.functions.worker.rest.RestException; import org.glassfish.jersey.media.multipart.FormDataContentDisposition; -import javax.ws.rs.WebApplicationException; -import javax.ws.rs.core.Response; -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.net.URI; -import java.nio.file.Path; -import java.util.Collection; -import java.util.LinkedList; -import java.util.List; -import java.util.Optional; -import java.util.function.Supplier; - -import static org.apache.commons.lang3.StringUtils.isBlank; -import static org.apache.commons.lang3.StringUtils.isNotBlank; -import static org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData; -import static org.apache.pulsar.functions.worker.WorkerUtils.isFunctionCodeBuiltin; -import static org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException; - @Slf4j public class SinksImpl extends ComponentImpl { @@ -226,6 +230,19 @@ public void registerSink(final String tenant, } functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder); + + try { + worker().getInterceptService() + .sinks() + .createSink(SinkConfigUtils.convertFromDetails( + functionMetaDataBuilder.build().getFunctionDetails()), + clientRole); + } catch (InterceptException e) { + throw new RestException( + e.getErrorCode().orElse(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()), + e.getMessage()); + } + updateRequest(functionMetaDataBuilder.build()); } finally { @@ -420,6 +437,16 @@ public void updateSink(final String tenant, functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder); + try { + worker().getInterceptService() + .sinks() + .updateSink(sinkConfig, existingSinkConfig, clientRole); + } catch (InterceptException e) { + throw new RestException( + e.getErrorCode().orElse(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()), + e.getMessage()); + } + updateRequest(functionMetaDataBuilder.build()); } finally { if (!(sinkPkgUrl != null && sinkPkgUrl.startsWith(Utils.FILE)) diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java index 8cec959e990bb..57920b3c5367b 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java @@ -18,11 +18,34 @@ */ package org.apache.pulsar.functions.worker.rest.api; +import static org.apache.commons.lang3.StringUtils.isBlank; +import static org.apache.commons.lang3.StringUtils.isNotBlank; +import static org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData; +import static org.apache.pulsar.functions.worker.WorkerUtils.isFunctionCodeBuiltin; +import static org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException; + import com.google.protobuf.ByteString; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.nio.file.Path; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.Optional; +import java.util.function.Supplier; + +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.Response; + import lombok.extern.slf4j.Slf4j; + import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.authentication.AuthenticationDataHttps; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; +import org.apache.pulsar.broker.intercept.InterceptException; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.common.functions.UpdateOptions; import org.apache.pulsar.common.functions.Utils; @@ -42,25 +65,6 @@ import org.apache.pulsar.functions.worker.rest.RestException; import org.glassfish.jersey.media.multipart.FormDataContentDisposition; -import javax.ws.rs.WebApplicationException; -import javax.ws.rs.core.Response; -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.net.URI; -import java.nio.file.Path; -import java.util.Collection; -import java.util.LinkedList; -import java.util.List; -import java.util.Optional; -import java.util.function.Supplier; - -import static org.apache.commons.lang3.StringUtils.isBlank; -import static org.apache.commons.lang3.StringUtils.isNotBlank; -import static org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData; -import static org.apache.pulsar.functions.worker.WorkerUtils.isFunctionCodeBuiltin; -import static org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException; - @Slf4j public class SourcesImpl extends ComponentImpl { @@ -226,6 +230,19 @@ public void registerSource(final String tenant, } functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder); + + try { + worker().getInterceptService() + .sources() + .createSource(SourceConfigUtils.convertFromDetails( + functionMetaDataBuilder.build().getFunctionDetails()), + clientRole); + } catch (InterceptException e) { + throw new RestException( + e.getErrorCode().orElse(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()), + e.getMessage()); + } + updateRequest(functionMetaDataBuilder.build()); } finally { @@ -417,6 +434,16 @@ public void updateSource(final String tenant, functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder); + try { + worker().getInterceptService() + .sources() + .updateSource(sourceConfig, existingSourceConfig, clientRole); + } catch (InterceptException e) { + throw new RestException( + e.getErrorCode().orElse(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()), + e.getMessage()); + } + updateRequest(functionMetaDataBuilder.build()); } finally { if (!(sourcePkgUrl != null && sourcePkgUrl.startsWith(Utils.FILE)) diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java index 20988e0cb7646..7f4738e180bcf 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java @@ -54,6 +54,8 @@ import org.apache.distributedlog.api.namespace.Namespace; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.core.config.Configurator; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.intercept.InterceptService; import org.apache.pulsar.client.admin.Namespaces; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; @@ -164,6 +166,7 @@ public void setup() throws Exception { when(mockedWorkerService.getDlogNamespace()).thenReturn(mockedNamespace); when(mockedWorkerService.isInitialized()).thenReturn(true); when(mockedWorkerService.getBrokerAdmin()).thenReturn(mockedPulsarAdmin); + when(mockedWorkerService.getInterceptService()).thenReturn(new InterceptService(new ServiceConfiguration(), null)); when(mockedPulsarAdmin.tenants()).thenReturn(mockedTenants); when(mockedPulsarAdmin.namespaces()).thenReturn(mockedNamespaces); when(mockedTenants.getTenantInfo(any())).thenReturn(mockedTenantInfo); diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java index 5f90945983e7f..a2b9588230645 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java @@ -51,6 +51,8 @@ import org.apache.distributedlog.api.namespace.Namespace; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.core.config.Configurator; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.intercept.InterceptService; import org.apache.pulsar.client.admin.Namespaces; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; @@ -168,6 +170,7 @@ public void setup() throws Exception { when(mockedWorkerService.getDlogNamespace()).thenReturn(mockedNamespace); when(mockedWorkerService.isInitialized()).thenReturn(true); when(mockedWorkerService.getBrokerAdmin()).thenReturn(mockedPulsarAdmin); + when(mockedWorkerService.getInterceptService()).thenReturn(new InterceptService(new ServiceConfiguration(), null)); when(mockedPulsarAdmin.tenants()).thenReturn(mockedTenants); when(mockedPulsarAdmin.namespaces()).thenReturn(mockedNamespaces); when(mockedTenants.getTenantInfo(any())).thenReturn(mockedTenantInfo); diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java index c47f9e52b7b4d..649bfc1d8c068 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java @@ -22,6 +22,8 @@ import org.apache.distributedlog.api.namespace.Namespace; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.core.config.Configurator; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.intercept.InterceptService; import org.apache.pulsar.client.admin.Namespaces; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; @@ -149,6 +151,7 @@ public void setup() throws Exception { when(mockedWorkerService.getDlogNamespace()).thenReturn(mockedNamespace); when(mockedWorkerService.isInitialized()).thenReturn(true); when(mockedWorkerService.getBrokerAdmin()).thenReturn(mockedPulsarAdmin); + when(mockedWorkerService.getInterceptService()).thenReturn(new InterceptService(new ServiceConfiguration(), null)); when(mockedPulsarAdmin.tenants()).thenReturn(mockedTenants); when(mockedPulsarAdmin.namespaces()).thenReturn(mockedNamespaces); when(mockedTenants.getTenantInfo(any())).thenReturn(mockedTenantInfo); diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java index faf3f8812ff95..7537d464d6022 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java @@ -46,6 +46,8 @@ import org.apache.distributedlog.api.namespace.Namespace; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.core.config.Configurator; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.intercept.InterceptService; import org.apache.pulsar.client.admin.Namespaces; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; @@ -146,6 +148,7 @@ public void setup() throws Exception { when(mockedWorkerService.getDlogNamespace()).thenReturn(mockedNamespace); when(mockedWorkerService.isInitialized()).thenReturn(true); when(mockedWorkerService.getBrokerAdmin()).thenReturn(mockedPulsarAdmin); + when(mockedWorkerService.getInterceptService()).thenReturn(new InterceptService(new ServiceConfiguration(), null)); when(mockedPulsarAdmin.tenants()).thenReturn(mockedTenants); when(mockedPulsarAdmin.namespaces()).thenReturn(mockedNamespaces); when(mockedTenants.getTenantInfo(any())).thenReturn(mockedTenantInfo);