From 7dc412fb7f314fc738eb405a5b75706d8eaebeb9 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Wed, 21 Sep 2016 16:29:16 -0700 Subject: [PATCH] In unit tests, do ZK cache reloads in same thread, to avoid race conditions --- .../yahoo/pulsar/broker/PulsarService.java | 6 +-- .../auth/MockedPulsarServiceBaseTest.java | 7 +++ .../auth/SameThreadOrderedSafeExecutor.java | 46 +++++++++++++++++++ 3 files changed, 56 insertions(+), 3 deletions(-) create mode 100644 pulsar-broker/src/test/java/com/yahoo/pulsar/broker/auth/SameThreadOrderedSafeExecutor.java diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/PulsarService.java index acd915d05109f..040faae248267 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/PulsarService.java @@ -180,7 +180,7 @@ public void close() throws PulsarServerException { adminClient.close(); adminClient = null; } - + nsservice = null; // executor is not initialized in mocks even when real close method is called @@ -357,10 +357,10 @@ private void startZkCacheService() throws PulsarServerException { LOG.info("starting configuration cache service"); - this.localZkCache = new LocalZooKeeperCache(getZkClient(), this.orderedExecutor); + this.localZkCache = new LocalZooKeeperCache(getZkClient(), getOrderedExecutor()); this.globalZkCache = new GlobalZooKeeperCache(getZooKeeperClientFactory(), (int) config.getZooKeeperSessionTimeoutMillis(), config.getGlobalZookeeperServers(), - this.orderedExecutor, this.executor); + getOrderedExecutor(), this.executor); try { this.globalZkCache.start(); } catch (IOException e) { diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/auth/MockedPulsarServiceBaseTest.java index 13d318b5006a4..7795b74c41553 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/auth/MockedPulsarServiceBaseTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/auth/MockedPulsarServiceBaseTest.java @@ -68,6 +68,8 @@ public abstract class MockedPulsarServiceBaseTest { protected MockZooKeeper mockZookKeeper; protected NonClosableMockBookKeeper mockBookKeeper; + private SameThreadOrderedSafeExecutor sameThreadOrderedSafeExecutor; + public MockedPulsarServiceBaseTest() { this.conf = new ServiceConfiguration(); this.conf.setBrokerServicePort(BROKER_PORT); @@ -96,6 +98,8 @@ private final void init() throws Exception { mockZookKeeper = createMockZooKeeper(); mockBookKeeper = new NonClosableMockBookKeeper(new ClientConfiguration(), mockZookKeeper); + sameThreadOrderedSafeExecutor = new SameThreadOrderedSafeExecutor(); + startBroker(); brokerUrl = new URL("http://localhost:" + BROKER_WEBSERVICE_PORT); @@ -110,6 +114,7 @@ protected final void internalCleanup() throws Exception { pulsar.close(); mockBookKeeper.reallyShutdow(); mockZookKeeper.shutdown(); + sameThreadOrderedSafeExecutor.shutdown(); } protected abstract void setup() throws Exception; @@ -146,6 +151,8 @@ protected void setupBrokerMocks(PulsarService pulsar) throws Exception { Supplier namespaceServiceSupplier = () -> spy(new NamespaceService(pulsar)); doReturn(namespaceServiceSupplier).when(pulsar).getNamespaceServiceProvider(); + + doReturn(sameThreadOrderedSafeExecutor).when(pulsar).getOrderedExecutor(); } private MockZooKeeper createMockZooKeeper() throws Exception { diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/auth/SameThreadOrderedSafeExecutor.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/auth/SameThreadOrderedSafeExecutor.java new file mode 100644 index 0000000000000..5dce811bb2e68 --- /dev/null +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/auth/SameThreadOrderedSafeExecutor.java @@ -0,0 +1,46 @@ +/** + * Copyright 2016 Yahoo Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.yahoo.pulsar.broker.auth; + +import org.apache.bookkeeper.util.OrderedSafeExecutor; +import org.apache.bookkeeper.util.SafeRunnable; + +public class SameThreadOrderedSafeExecutor extends OrderedSafeExecutor { + + public SameThreadOrderedSafeExecutor() { + super(1, "ordered-executor"); + } + + @Override + public void submit(SafeRunnable r) { + r.run(); + } + + @Override + public void submitOrdered(int orderingKey, SafeRunnable r) { + r.run(); + } + + @Override + public void submitOrdered(long orderingKey, SafeRunnable r) { + r.run(); + } + + @Override + public void submitOrdered(Object orderingKey, SafeRunnable r) { + r.run(); + } +}