From 73675d067128412b54877a9c26de71d467599b7b Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Mon, 28 Oct 2024 09:05:15 -0700 Subject: [PATCH] clean up some thread pools in tests (#17421) --- .../druid/common/guava/GuavaUtilsTest.java | 77 ++--- .../util/emitter/core/HttpEmitterTest.java | 36 ++- .../core/HttpPostEmitterLoggerStressTest.java | 96 +++--- .../core/HttpPostEmitterStressTest.java | 273 +++++++++--------- .../emitter/core/HttpPostEmitterTest.java | 32 +- .../metrics/BasicMonitorSchedulerTest.java | 7 + .../epinephelinae/ConcurrentGrouperTest.java | 210 +++++++------- .../nested/NestedDataColumnSupplierTest.java | 45 +-- .../NestedDataColumnSupplierV4Test.java | 45 +-- .../ScalarDoubleColumnSupplierTest.java | 45 +-- .../nested/ScalarLongColumnSupplierTest.java | 45 +-- .../ScalarStringColumnSupplierTest.java | 45 +-- .../nested/VariantColumnSupplierTest.java | 43 +-- 13 files changed, 535 insertions(+), 464 deletions(-) diff --git a/processing/src/test/java/org/apache/druid/common/guava/GuavaUtilsTest.java b/processing/src/test/java/org/apache/druid/common/guava/GuavaUtilsTest.java index 317cb350f970..8ec9584dcac9 100644 --- a/processing/src/test/java/org/apache/druid/common/guava/GuavaUtilsTest.java +++ b/processing/src/test/java/org/apache/druid/common/guava/GuavaUtilsTest.java @@ -75,46 +75,53 @@ public void testCancelAll() int tasks = 3; ExecutorService service = Execs.multiThreaded(tasks, "GuavaUtilsTest-%d"); ListeningExecutorService exc = MoreExecutors.listeningDecorator(service); - //a flag what time to throw exception. - AtomicBoolean someoneFailed = new AtomicBoolean(false); - List latches = new ArrayList<>(tasks); - Function>> function = (taskCount) -> { - List> futures = new ArrayList<>(); - for (int i = 0; i < taskCount; i++) { - final CountDownLatch latch = new CountDownLatch(1); - latches.add(latch); - ListenableFuture future = exc.submit(new Callable() { - @Override - public Object call() throws RuntimeException, InterruptedException + try { + //a flag what time to throw exception. + AtomicBoolean someoneFailed = new AtomicBoolean(false); + List latches = new ArrayList<>(tasks); + Function>> function = (taskCount) -> { + List> futures = new ArrayList<>(); + for (int i = 0; i < taskCount; i++) { + final CountDownLatch latch = new CountDownLatch(1); + latches.add(latch); + ListenableFuture future = exc.submit(new Callable() { - latch.await(60, TimeUnit.SECONDS); - if (someoneFailed.compareAndSet(false, true)) { - throw new RuntimeException("This exception simulates an error"); + @Override + public Object call() throws RuntimeException, InterruptedException + { + latch.await(60, TimeUnit.SECONDS); + if (someoneFailed.compareAndSet(false, true)) { + throw new RuntimeException("This exception simulates an error"); + } + return null; } - return null; - } - }); - futures.add(future); - } - return futures; - }; + }); + futures.add(future); + } + return futures; + }; - List> futures = function.apply(tasks); - Assert.assertEquals(tasks, futures.stream().filter(f -> !f.isDone()).count()); - // "release" the last tasks, which will cause it to fail as someoneFailed will still be false - latches.get(tasks - 1).countDown(); + List> futures = function.apply(tasks); + Assert.assertEquals(tasks, futures.stream().filter(f -> !f.isDone()).count()); + // "release" the last tasks, which will cause it to fail as someoneFailed will still be false + latches.get(tasks - 1).countDown(); - ListenableFuture> future = Futures.allAsList(futures); + ListenableFuture> future = Futures.allAsList(futures); - ExecutionException thrown = Assert.assertThrows( - ExecutionException.class, - future::get - ); - Assert.assertEquals("This exception simulates an error", thrown.getCause().getMessage()); - GuavaUtils.cancelAll(true, future, futures); - Assert.assertEquals(0, futures.stream().filter(f -> !f.isDone()).count()); - for (CountDownLatch latch : latches) { - latch.countDown(); + ExecutionException thrown = Assert.assertThrows( + ExecutionException.class, + future::get + ); + Assert.assertEquals("This exception simulates an error", thrown.getCause().getMessage()); + GuavaUtils.cancelAll(true, future, futures); + Assert.assertEquals(0, futures.stream().filter(f -> !f.isDone()).count()); + for (CountDownLatch latch : latches) { + latch.countDown(); + } + } + finally { + exc.shutdownNow(); + service.shutdownNow(); } } } diff --git a/processing/src/test/java/org/apache/druid/java/util/emitter/core/HttpEmitterTest.java b/processing/src/test/java/org/apache/druid/java/util/emitter/core/HttpEmitterTest.java index b589455b4d83..384b3a0a8eaa 100644 --- a/processing/src/test/java/org/apache/druid/java/util/emitter/core/HttpEmitterTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/emitter/core/HttpEmitterTest.java @@ -72,21 +72,27 @@ public void timeoutEmptyQueue() throws IOException, InterruptedException .setHttpTimeoutAllowanceFactor(timeoutAllowanceFactor) .setFlushTimeout(BaseHttpEmittingConfig.TEST_FLUSH_TIMEOUT_MILLIS) .build(); - final HttpPostEmitter emitter = new HttpPostEmitter(config, httpClient, OBJECT_MAPPER); + try (final HttpPostEmitter emitter = new HttpPostEmitter(config, httpClient, OBJECT_MAPPER)) { + long startMs = System.currentTimeMillis(); + emitter.start(); + emitter.emitAndReturnBatch(new IntEvent()); + emitter.flush(); + long fillTimeMs = System.currentTimeMillis() - startMs; + MatcherAssert.assertThat( + (double) timeoutUsed.get(), + Matchers.lessThan(fillTimeMs * (timeoutAllowanceFactor + 0.5)) + ); - long startMs = System.currentTimeMillis(); - emitter.start(); - emitter.emitAndReturnBatch(new IntEvent()); - emitter.flush(); - long fillTimeMs = System.currentTimeMillis() - startMs; - MatcherAssert.assertThat((double) timeoutUsed.get(), Matchers.lessThan(fillTimeMs * (timeoutAllowanceFactor + 0.5))); - - startMs = System.currentTimeMillis(); - final Batch batch = emitter.emitAndReturnBatch(new IntEvent()); - Thread.sleep(1000); - batch.seal(); - emitter.flush(); - fillTimeMs = System.currentTimeMillis() - startMs; - MatcherAssert.assertThat((double) timeoutUsed.get(), Matchers.lessThan(fillTimeMs * (timeoutAllowanceFactor + 0.5))); + startMs = System.currentTimeMillis(); + final Batch batch = emitter.emitAndReturnBatch(new IntEvent()); + Thread.sleep(1000); + batch.seal(); + emitter.flush(); + fillTimeMs = System.currentTimeMillis() - startMs; + MatcherAssert.assertThat( + (double) timeoutUsed.get(), + Matchers.lessThan(fillTimeMs * (timeoutAllowanceFactor + 0.5)) + ); + } } } diff --git a/processing/src/test/java/org/apache/druid/java/util/emitter/core/HttpPostEmitterLoggerStressTest.java b/processing/src/test/java/org/apache/druid/java/util/emitter/core/HttpPostEmitterLoggerStressTest.java index ad4094679292..b4a6f5a25e36 100644 --- a/processing/src/test/java/org/apache/druid/java/util/emitter/core/HttpPostEmitterLoggerStressTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/emitter/core/HttpPostEmitterLoggerStressTest.java @@ -52,61 +52,63 @@ public void testBurstFollowedByQuietPeriod() throws InterruptedException, IOExce .setBatchQueueSizeLimit(10) .setMinHttpTimeoutMillis(100) .build(); - final HttpPostEmitter emitter = new HttpPostEmitter(config, httpClient, new ObjectMapper()); + try (HttpPostEmitter emitter = new HttpPostEmitter(config, httpClient, new ObjectMapper())) { - emitter.start(); + emitter.start(); - httpClient.setGoHandler(new GoHandler() { - @Override - protected ListenableFuture go(Request request) + httpClient.setGoHandler(new GoHandler() { - return GoHandlers.immediateFuture(EmitterTest.okResponse()); + @Override + protected ListenableFuture go(Request request) + { + return GoHandlers.immediateFuture(EmitterTest.okResponse()); + } + }); + + Event smallEvent = ServiceMetricEvent.builder() + .setFeed("smallEvents") + .setDimension("test", "hi") + .setMetric("metric", 10) + .build("qwerty", "asdfgh"); + + for (int i = 0; i < 1000; i++) { + emitter.emit(smallEvent); + + Assert.assertTrue(emitter.getTotalFailedBuffers() <= 10); + Assert.assertTrue(emitter.getBuffersToEmit() <= 12); } - }); - Event smallEvent = ServiceMetricEvent.builder() - .setFeed("smallEvents") - .setDimension("test", "hi") - .setMetric("metric", 10) - .build("qwerty", "asdfgh"); + // by the end of this test, there should be no outstanding failed buffers - for (int i = 0; i < 1000; i++) { - emitter.emit(smallEvent); + // with a flush time of 5s, min timeout of 100ms, 20s should be + // easily enough to get through all of the events - Assert.assertTrue(emitter.getTotalFailedBuffers() <= 10); - Assert.assertTrue(emitter.getBuffersToEmit() <= 12); - } - - // by the end of this test, there should be no outstanding failed buffers - - // with a flush time of 5s, min timeout of 100ms, 20s should be - // easily enough to get through all of the events + while (emitter.getTotalFailedBuffers() > 0) { + Thread.sleep(500); + } - while (emitter.getTotalFailedBuffers() > 0) { - Thread.sleep(500); + // there is also no reason to have too many log events + // refer to: https://github.com/apache/druid/issues/11279; + + long countOfTimeouts = logCapture.getLogEvents().stream() + .filter(ev -> ev.getLevel() == Level.DEBUG) + .filter(ev -> ev.getThrown() instanceof TimeoutException) + .count(); + + // 1000 events limit, implies we should have no more than + // 1000 rejected send events within the expected 20sec + // duration of the test + long limitTimeoutEvents = 1000; + + Assert.assertTrue( + String.format( + Locale.getDefault(), + "too many timeouts (%d), expect less than (%d)", + countOfTimeouts, + limitTimeoutEvents + ), + countOfTimeouts < limitTimeoutEvents + ); } - - // there is also no reason to have too many log events - // refer to: https://github.com/apache/druid/issues/11279; - - long countOfTimeouts = logCapture.getLogEvents().stream() - .filter(ev -> ev.getLevel() == Level.DEBUG) - .filter(ev -> ev.getThrown() instanceof TimeoutException) - .count(); - - // 1000 events limit, implies we should have no more than - // 1000 rejected send events within the expected 20sec - // duration of the test - long limitTimeoutEvents = 1000; - - Assert.assertTrue( - String.format( - Locale.getDefault(), - "too many timeouts (%d), expect less than (%d)", - countOfTimeouts, - limitTimeoutEvents), - countOfTimeouts < limitTimeoutEvents); - - emitter.close(); } } diff --git a/processing/src/test/java/org/apache/druid/java/util/emitter/core/HttpPostEmitterStressTest.java b/processing/src/test/java/org/apache/druid/java/util/emitter/core/HttpPostEmitterStressTest.java index 5935175d7de4..2509db7a4b0a 100644 --- a/processing/src/test/java/org/apache/druid/java/util/emitter/core/HttpPostEmitterStressTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/emitter/core/HttpPostEmitterStressTest.java @@ -65,78 +65,80 @@ public void eventCountBased() throws InterruptedException, IOException // For this test, we don't need any batches to be dropped, i. e. "gaps" in data .setBatchQueueSizeLimit(1000) .build(); - final HttpPostEmitter emitter = new HttpPostEmitter(config, httpClient, OBJECT_MAPPER); - int nThreads = Runtime.getRuntime().availableProcessors() * 2; - final List eventsPerThread = new ArrayList<>(nThreads); - final List> eventBatchesPerThread = new ArrayList<>(nThreads); - for (int i = 0; i < nThreads; i++) { - eventsPerThread.add(new IntArrayList()); - eventBatchesPerThread.add(new ArrayList()); - } - for (int i = 0; i < N; i++) { - eventsPerThread.get(ThreadLocalRandom.current().nextInt(nThreads)).add(i); - } - final BitSet emittedEvents = new BitSet(N); - httpClient.setGoHandler(new GoHandler() - { - @Override - protected ListenableFuture go(Request request) - { - ByteBuffer batch = request.getByteBufferData().slice(); - while (batch.remaining() > 0) { - emittedEvents.set(batch.getInt()); - } - return GoHandlers.immediateFuture(EmitterTest.okResponse()); + try (final HttpPostEmitter emitter = new HttpPostEmitter(config, httpClient, OBJECT_MAPPER)) { + int nThreads = Runtime.getRuntime().availableProcessors() * 2; + final List eventsPerThread = new ArrayList<>(nThreads); + final List> eventBatchesPerThread = new ArrayList<>(nThreads); + for (int i = 0; i < nThreads; i++) { + eventsPerThread.add(new IntArrayList()); + eventBatchesPerThread.add(new ArrayList()); + } + for (int i = 0; i < N; i++) { + eventsPerThread.get(ThreadLocalRandom.current().nextInt(nThreads)).add(i); } - }); - emitter.start(); - final CountDownLatch threadsCompleted = new CountDownLatch(nThreads); - for (int i = 0; i < nThreads; i++) { - final int threadIndex = i; - new Thread() { + final BitSet emittedEvents = new BitSet(N); + httpClient.setGoHandler(new GoHandler() + { @Override - public void run() + protected ListenableFuture go(Request request) { - IntList events = eventsPerThread.get(threadIndex); - List eventBatches = eventBatchesPerThread.get(threadIndex); - IntEvent event = new IntEvent(); - for (int i = 0, eventsSize = events.size(); i < eventsSize; i++) { - event.index = events.getInt(i); - eventBatches.add(emitter.emitAndReturnBatch(event)); - if (i % 16 == 0) { - try { - Thread.sleep(10); - } - catch (InterruptedException e) { - throw new RuntimeException(e); + ByteBuffer batch = request.getByteBufferData().slice(); + while (batch.remaining() > 0) { + emittedEvents.set(batch.getInt()); + } + return GoHandlers.immediateFuture(EmitterTest.okResponse()); + } + }); + emitter.start(); + final CountDownLatch threadsCompleted = new CountDownLatch(nThreads); + for (int i = 0; i < nThreads; i++) { + final int threadIndex = i; + new Thread() + { + @Override + public void run() + { + IntList events = eventsPerThread.get(threadIndex); + List eventBatches = eventBatchesPerThread.get(threadIndex); + IntEvent event = new IntEvent(); + for (int i = 0, eventsSize = events.size(); i < eventsSize; i++) { + event.index = events.getInt(i); + eventBatches.add(emitter.emitAndReturnBatch(event)); + if (i % 16 == 0) { + try { + Thread.sleep(10); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } } } + threadsCompleted.countDown(); } - threadsCompleted.countDown(); - } - }.start(); - } - threadsCompleted.await(); - emitter.flush(); - System.out.println("Allocated buffers: " + emitter.getTotalAllocatedBuffers()); - for (int eventIndex = 0; eventIndex < N; eventIndex++) { - if (!emittedEvents.get(eventIndex)) { - for (int threadIndex = 0; threadIndex < eventsPerThread.size(); threadIndex++) { - IntList threadEvents = eventsPerThread.get(threadIndex); - int indexOfEvent = threadEvents.indexOf(eventIndex); - if (indexOfEvent >= 0) { - Batch batch = eventBatchesPerThread.get(threadIndex).get(indexOfEvent); - System.err.println(batch); - int bufferWatermark = batch.getSealedBufferWatermark(); - ByteBuffer batchBuffer = ByteBuffer.wrap(batch.buffer); - batchBuffer.limit(bufferWatermark); - while (batchBuffer.remaining() > 0) { - System.err.println(batchBuffer.getInt()); + }.start(); + } + threadsCompleted.await(); + emitter.flush(); + System.out.println("Allocated buffers: " + emitter.getTotalAllocatedBuffers()); + for (int eventIndex = 0; eventIndex < N; eventIndex++) { + if (!emittedEvents.get(eventIndex)) { + for (int threadIndex = 0; threadIndex < eventsPerThread.size(); threadIndex++) { + IntList threadEvents = eventsPerThread.get(threadIndex); + int indexOfEvent = threadEvents.indexOf(eventIndex); + if (indexOfEvent >= 0) { + Batch batch = eventBatchesPerThread.get(threadIndex).get(indexOfEvent); + System.err.println(batch); + int bufferWatermark = batch.getSealedBufferWatermark(); + ByteBuffer batchBuffer = ByteBuffer.wrap(batch.buffer); + batchBuffer.limit(bufferWatermark); + while (batchBuffer.remaining() > 0) { + System.err.println(batchBuffer.getInt()); + } + break; } - break; } + throw new AssertionError("event " + eventIndex); } - throw new AssertionError("event " + eventIndex); } } } @@ -151,34 +153,36 @@ public void testLargeEventsQueueLimit() throws IOException .setMaxBatchSize(1024 * 1024) .setBatchQueueSizeLimit(10) .build(); - final HttpPostEmitter emitter = new HttpPostEmitter(config, httpClient, new ObjectMapper()); + try (final HttpPostEmitter emitter = new HttpPostEmitter(config, httpClient, new ObjectMapper())) { - emitter.start(); + emitter.start(); - httpClient.setGoHandler(new GoHandler() { - @Override - protected ListenableFuture go(Request request) + httpClient.setGoHandler(new GoHandler() { - return GoHandlers.immediateFuture(EmitterTest.BAD_RESPONSE); - } - }); + @Override + protected ListenableFuture go(Request request) + { + return GoHandlers.immediateFuture(EmitterTest.BAD_RESPONSE); + } + }); - char[] chars = new char[600000]; - Arrays.fill(chars, '*'); - String bigString = new String(chars); + char[] chars = new char[600000]; + Arrays.fill(chars, '*'); + String bigString = new String(chars); - Event bigEvent = ServiceMetricEvent.builder() - .setFeed("bigEvents") - .setDimension("test", bigString) - .setMetric("metric", 10) - .build("qwerty", "asdfgh"); + Event bigEvent = ServiceMetricEvent.builder() + .setFeed("bigEvents") + .setDimension("test", bigString) + .setMetric("metric", 10) + .build("qwerty", "asdfgh"); - for (int i = 0; i < 1000; i++) { - emitter.emit(bigEvent); - Assert.assertTrue(emitter.getLargeEventsToEmit() <= 11); - } + for (int i = 0; i < 1000; i++) { + emitter.emit(bigEvent); + Assert.assertTrue(emitter.getLargeEventsToEmit() <= 11); + } - emitter.flush(); + emitter.flush(); + } } @Test @@ -191,64 +195,67 @@ public void testLargeAndSmallEventsQueueLimit() throws InterruptedException, IOE .setMaxBatchSize(1024 * 1024) .setBatchQueueSizeLimit(10) .build(); - final HttpPostEmitter emitter = new HttpPostEmitter(config, httpClient, new ObjectMapper()); + try (final HttpPostEmitter emitter = new HttpPostEmitter(config, httpClient, new ObjectMapper())) { + emitter.start(); - emitter.start(); - - httpClient.setGoHandler(new GoHandler() { - @Override - protected ListenableFuture go(Request request) + httpClient.setGoHandler(new GoHandler() { - return GoHandlers.immediateFuture(EmitterTest.BAD_RESPONSE); - } - }); - - char[] chars = new char[600000]; - Arrays.fill(chars, '*'); - String bigString = new String(chars); - - Event smallEvent = ServiceMetricEvent.builder() - .setFeed("smallEvents") - .setDimension("test", "hi") - .setMetric("metric", 10) - .build("qwerty", "asdfgh"); - - Event bigEvent = ServiceMetricEvent.builder() - .setFeed("bigEvents") - .setDimension("test", bigString) - .setMetric("metric", 10) - .build("qwerty", "asdfgh"); - - final CountDownLatch threadsCompleted = new CountDownLatch(2); - new Thread() { - @Override - public void run() + @Override + protected ListenableFuture go(Request request) + { + return GoHandlers.immediateFuture(EmitterTest.BAD_RESPONSE); + } + }); + + char[] chars = new char[600000]; + Arrays.fill(chars, '*'); + String bigString = new String(chars); + + Event smallEvent = ServiceMetricEvent.builder() + .setFeed("smallEvents") + .setDimension("test", "hi") + .setMetric("metric", 10) + .build("qwerty", "asdfgh"); + + Event bigEvent = ServiceMetricEvent.builder() + .setFeed("bigEvents") + .setDimension("test", bigString) + .setMetric("metric", 10) + .build("qwerty", "asdfgh"); + + final CountDownLatch threadsCompleted = new CountDownLatch(2); + new Thread() { - for (int i = 0; i < 1000; i++) { + @Override + public void run() + { + for (int i = 0; i < 1000; i++) { - emitter.emit(smallEvent); + emitter.emit(smallEvent); - Assert.assertTrue(emitter.getTotalFailedBuffers() <= 10); - Assert.assertTrue(emitter.getBuffersToEmit() <= 12); + Assert.assertTrue(emitter.getTotalFailedBuffers() <= 10); + Assert.assertTrue(emitter.getBuffersToEmit() <= 12); + } + threadsCompleted.countDown(); } - threadsCompleted.countDown(); - } - }.start(); - new Thread() { - @Override - public void run() + }.start(); + new Thread() { - for (int i = 0; i < 1000; i++) { + @Override + public void run() + { + for (int i = 0; i < 1000; i++) { - emitter.emit(bigEvent); + emitter.emit(bigEvent); - Assert.assertTrue(emitter.getTotalFailedBuffers() <= 10); - Assert.assertTrue(emitter.getBuffersToEmit() <= 12); + Assert.assertTrue(emitter.getTotalFailedBuffers() <= 10); + Assert.assertTrue(emitter.getBuffersToEmit() <= 12); + } + threadsCompleted.countDown(); } - threadsCompleted.countDown(); - } - }.start(); - threadsCompleted.await(); - emitter.flush(); + }.start(); + threadsCompleted.await(); + emitter.flush(); + } } } diff --git a/processing/src/test/java/org/apache/druid/java/util/emitter/core/HttpPostEmitterTest.java b/processing/src/test/java/org/apache/druid/java/util/emitter/core/HttpPostEmitterTest.java index fd3d312e91f7..987de0b3c5e7 100644 --- a/processing/src/test/java/org/apache/druid/java/util/emitter/core/HttpPostEmitterTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/emitter/core/HttpPostEmitterTest.java @@ -73,26 +73,26 @@ public void testRecoveryEmitAndReturnBatch() .setMaxBatchSize(1024 * 1024) .setBatchQueueSizeLimit(1000) .build(); - final HttpPostEmitter emitter = new HttpPostEmitter(config, httpClient, OBJECT_MAPPER); - emitter.start(); + try (final HttpPostEmitter emitter = new HttpPostEmitter(config, httpClient, OBJECT_MAPPER)) { + emitter.start(); - // emit first event - emitter.emitAndReturnBatch(new IntEvent()); - Thread.sleep(1000L); + // emit first event + emitter.emitAndReturnBatch(new IntEvent()); + Thread.sleep(1000L); - // get concurrentBatch reference and set value to lon as if it would fail while - // HttpPostEmitter#onSealExclusive method invocation. - Field concurrentBatch = emitter.getClass().getDeclaredField("concurrentBatch"); - concurrentBatch.setAccessible(true); - ((AtomicReference) concurrentBatch.get(emitter)).getAndSet(1L); - // something terrible happened previously so that batch has to recover + // get concurrentBatch reference and set value to lon as if it would fail while + // HttpPostEmitter#onSealExclusive method invocation. + Field concurrentBatch = emitter.getClass().getDeclaredField("concurrentBatch"); + concurrentBatch.setAccessible(true); + ((AtomicReference) concurrentBatch.get(emitter)).getAndSet(1L); + // something terrible happened previously so that batch has to recover - // emit second event - emitter.emitAndReturnBatch(new IntEvent()); + // emit second event + emitter.emitAndReturnBatch(new IntEvent()); - emitter.flush(); - emitter.close(); + emitter.flush(); - Assert.assertEquals(2, emitter.getTotalEmittedEvents()); + Assert.assertEquals(2, emitter.getTotalEmittedEvents()); + } } } diff --git a/processing/src/test/java/org/apache/druid/java/util/metrics/BasicMonitorSchedulerTest.java b/processing/src/test/java/org/apache/druid/java/util/metrics/BasicMonitorSchedulerTest.java index ceac1e55644d..6e6ca49cdd9a 100644 --- a/processing/src/test/java/org/apache/druid/java/util/metrics/BasicMonitorSchedulerTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/metrics/BasicMonitorSchedulerTest.java @@ -23,6 +23,7 @@ import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.joda.time.Duration; +import org.junit.After; import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentMatchers; @@ -50,6 +51,12 @@ public void setup() exec = Execs.scheduledSingleThreaded("BasicMonitorSchedulerTest"); } + @After + public void teardown() + { + exec.shutdownNow(); + } + @Test public void testStart_RepeatScheduling() throws InterruptedException { diff --git a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouperTest.java b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouperTest.java index 6ab2f1a7bf0b..4da2fbbb9a2c 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouperTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouperTest.java @@ -24,6 +24,7 @@ import com.google.common.base.Supplier; import com.google.common.collect.ImmutableList; import com.google.common.primitives.Longs; +import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import org.apache.druid.collections.ReferenceCountingResourceHolder; import org.apache.druid.jackson.DefaultObjectMapper; @@ -150,67 +151,72 @@ public void testAggregate() throws InterruptedException, ExecutionException, IOE temporaryFolder.newFolder(), 1024 * 1024 ); + final ListeningExecutorService service = MoreExecutors.listeningDecorator(exec); + try { + final ConcurrentGrouper grouper = new ConcurrentGrouper<>( + bufferSupplier, + TEST_RESOURCE_HOLDER, + KEY_SERDE_FACTORY, + KEY_SERDE_FACTORY, + NULL_FACTORY, + new AggregatorFactory[]{new CountAggregatorFactory("cnt")}, + 1024, + 0.7f, + 1, + temporaryStorage, + new DefaultObjectMapper(), + concurrencyHint, + null, + false, + service, + 0, + false, + 0, + 4, + parallelCombineThreads, + mergeThreadLocal + ); + closer.register(grouper); + grouper.init(); + + final int numRows = 1000; + + Future[] futures = new Future[concurrencyHint]; + + for (int i = 0; i < concurrencyHint; i++) { + futures[i] = exec.submit(() -> { + for (long j = 0; j < numRows; j++) { + if (!grouper.aggregate(new LongKey(j)).isOk()) { + throw new ISE("Grouper is full"); + } + } + }); + } - final ConcurrentGrouper grouper = new ConcurrentGrouper<>( - bufferSupplier, - TEST_RESOURCE_HOLDER, - KEY_SERDE_FACTORY, - KEY_SERDE_FACTORY, - NULL_FACTORY, - new AggregatorFactory[]{new CountAggregatorFactory("cnt")}, - 1024, - 0.7f, - 1, - temporaryStorage, - new DefaultObjectMapper(), - concurrencyHint, - null, - false, - MoreExecutors.listeningDecorator(exec), - 0, - false, - 0, - 4, - parallelCombineThreads, - mergeThreadLocal - ); - closer.register(grouper); - grouper.init(); + for (Future eachFuture : futures) { + eachFuture.get(); + } - final int numRows = 1000; + final List> expected = new ArrayList<>(); + for (long i = 0; i < numRows; i++) { + expected.add(new ReusableEntry<>(new LongKey(i), new Object[]{(long) concurrencyHint})); + } - Future[] futures = new Future[concurrencyHint]; + final CloseableIterator> iterator = closer.register(grouper.iterator(true)); - for (int i = 0; i < concurrencyHint; i++) { - futures[i] = exec.submit(() -> { - for (long j = 0; j < numRows; j++) { - if (!grouper.aggregate(new LongKey(j)).isOk()) { - throw new ISE("Grouper is full"); - } - } - }); - } - - for (Future eachFuture : futures) { - eachFuture.get(); - } + if (parallelCombineThreads > 1 && (mergeThreadLocal || temporaryStorage.currentSize() > 0)) { + // Parallel combiner configured, and expected to actually be used due to thread-local merge (either explicitly + // configured, or due to spilling). + Assert.assertTrue(TEST_RESOURCE_HOLDER.taken); + } else { + Assert.assertFalse(TEST_RESOURCE_HOLDER.taken); + } - final List> expected = new ArrayList<>(); - for (long i = 0; i < numRows; i++) { - expected.add(new ReusableEntry<>(new LongKey(i), new Object[]{(long) concurrencyHint})); + GrouperTestUtil.assertEntriesEquals(expected.iterator(), iterator); } - - final CloseableIterator> iterator = closer.register(grouper.iterator(true)); - - if (parallelCombineThreads > 1 && (mergeThreadLocal || temporaryStorage.currentSize() > 0)) { - // Parallel combiner configured, and expected to actually be used due to thread-local merge (either explicitly - // configured, or due to spilling). - Assert.assertTrue(TEST_RESOURCE_HOLDER.taken); - } else { - Assert.assertFalse(TEST_RESOURCE_HOLDER.taken); + finally { + service.shutdownNow(); } - - GrouperTestUtil.assertEntriesEquals(expected.iterator(), iterator); } @Test @@ -221,56 +227,62 @@ public void testGrouperTimeout() throws Exception return; } - final ConcurrentGrouper grouper = new ConcurrentGrouper<>( - bufferSupplier, - TEST_RESOURCE_HOLDER, - KEY_SERDE_FACTORY, - KEY_SERDE_FACTORY, - NULL_FACTORY, - new AggregatorFactory[]{new CountAggregatorFactory("cnt")}, - 1024, - 0.7f, - 1, - new LimitedTemporaryStorage(temporaryFolder.newFolder(), 1024 * 1024), - new DefaultObjectMapper(), - concurrencyHint, - null, - false, - MoreExecutors.listeningDecorator(exec), - 0, - true, - 1, - 4, - parallelCombineThreads, - mergeThreadLocal - ); - closer.register(grouper); - grouper.init(); + ListeningExecutorService service = MoreExecutors.listeningDecorator(exec); + try { + final ConcurrentGrouper grouper = new ConcurrentGrouper<>( + bufferSupplier, + TEST_RESOURCE_HOLDER, + KEY_SERDE_FACTORY, + KEY_SERDE_FACTORY, + NULL_FACTORY, + new AggregatorFactory[]{new CountAggregatorFactory("cnt")}, + 1024, + 0.7f, + 1, + new LimitedTemporaryStorage(temporaryFolder.newFolder(), 1024 * 1024), + new DefaultObjectMapper(), + concurrencyHint, + null, + false, + service, + 0, + true, + 1, + 4, + parallelCombineThreads, + mergeThreadLocal + ); + closer.register(grouper); + grouper.init(); + + final int numRows = 1000; + + Future[] futures = new Future[concurrencyHint]; + + for (int i = 0; i < concurrencyHint; i++) { + futures[i] = exec.submit(() -> { + for (long j = 0; j < numRows; j++) { + if (!grouper.aggregate(new LongKey(j)).isOk()) { + throw new ISE("Grouper is full"); + } + } + }); + } - final int numRows = 1000; + for (Future eachFuture : futures) { + eachFuture.get(); + } - Future[] futures = new Future[concurrencyHint]; + final QueryTimeoutException e = Assert.assertThrows( + QueryTimeoutException.class, + () -> closer.register(grouper.iterator(true)) + ); - for (int i = 0; i < concurrencyHint; i++) { - futures[i] = exec.submit(() -> { - for (long j = 0; j < numRows; j++) { - if (!grouper.aggregate(new LongKey(j)).isOk()) { - throw new ISE("Grouper is full"); - } - } - }); + Assert.assertEquals("Query timeout", e.getErrorCode()); } - - for (Future eachFuture : futures) { - eachFuture.get(); + finally { + service.shutdownNow(); } - - final QueryTimeoutException e = Assert.assertThrows( - QueryTimeoutException.class, - () -> closer.register(grouper.iterator(true)) - ); - - Assert.assertEquals("Query timeout", e.getErrorCode()); } static class TestResourceHolder extends ReferenceCountingResourceHolder diff --git a/processing/src/test/java/org/apache/druid/segment/nested/NestedDataColumnSupplierTest.java b/processing/src/test/java/org/apache/druid/segment/nested/NestedDataColumnSupplierTest.java index 64ba2679f04c..7cf942201006 100644 --- a/processing/src/test/java/org/apache/druid/segment/nested/NestedDataColumnSupplierTest.java +++ b/processing/src/test/java/org/apache/druid/segment/nested/NestedDataColumnSupplierTest.java @@ -299,30 +299,35 @@ public void testConcurrency() throws ExecutionException, InterruptedException final int threads = 10; ListeningExecutorService executorService = MoreExecutors.listeningDecorator( - Execs.multiThreaded(threads, "StandardNestedColumnSupplierTest-%d") + Execs.multiThreaded(threads, "NestedColumnSupplierTest-%d") ); - Collection> futures = new ArrayList<>(threads); - final CountDownLatch threadsStartLatch = new CountDownLatch(1); - for (int i = 0; i < threads; ++i) { - futures.add( - executorService.submit(() -> { - try { - threadsStartLatch.await(); - for (int iter = 0; iter < 5000; iter++) { - try (NestedDataComplexColumn column = (NestedDataComplexColumn) supplier.get()) { - smokeTest(column); + try { + Collection> futures = new ArrayList<>(threads); + final CountDownLatch threadsStartLatch = new CountDownLatch(1); + for (int i = 0; i < threads; ++i) { + futures.add( + executorService.submit(() -> { + try { + threadsStartLatch.await(); + for (int iter = 0; iter < 5000; iter++) { + try (NestedDataComplexColumn column = (NestedDataComplexColumn) supplier.get()) { + smokeTest(column); + } } } - } - catch (Throwable ex) { - failureReason.set(ex.getMessage()); - } - }) - ); + catch (Throwable ex) { + failureReason.set(ex.getMessage()); + } + }) + ); + } + threadsStartLatch.countDown(); + Futures.allAsList(futures).get(); + Assert.assertEquals(expectedReason, failureReason.get()); + } + finally { + executorService.shutdownNow(); } - threadsStartLatch.countDown(); - Futures.allAsList(futures).get(); - Assert.assertEquals(expectedReason, failureReason.get()); } private void smokeTest(NestedDataComplexColumn column) throws IOException diff --git a/processing/src/test/java/org/apache/druid/segment/nested/NestedDataColumnSupplierV4Test.java b/processing/src/test/java/org/apache/druid/segment/nested/NestedDataColumnSupplierV4Test.java index aa42d58710da..315077b4160e 100644 --- a/processing/src/test/java/org/apache/druid/segment/nested/NestedDataColumnSupplierV4Test.java +++ b/processing/src/test/java/org/apache/druid/segment/nested/NestedDataColumnSupplierV4Test.java @@ -255,30 +255,35 @@ public void testConcurrency() throws ExecutionException, InterruptedException final int threads = 10; ListeningExecutorService executorService = MoreExecutors.listeningDecorator( - Execs.multiThreaded(threads, "NestedDataColumnSupplierTest-%d") + Execs.multiThreaded(threads, "NestedDataColumnSupplierV4Test-%d") ); - Collection> futures = new ArrayList<>(threads); - final CountDownLatch threadsStartLatch = new CountDownLatch(1); - for (int i = 0; i < threads; ++i) { - futures.add( - executorService.submit(() -> { - try { - threadsStartLatch.await(); - for (int iter = 0; iter < 5000; iter++) { - try (NestedDataComplexColumn column = (NestedDataComplexColumn) supplier.get()) { - smokeTest(column); + try { + Collection> futures = new ArrayList<>(threads); + final CountDownLatch threadsStartLatch = new CountDownLatch(1); + for (int i = 0; i < threads; ++i) { + futures.add( + executorService.submit(() -> { + try { + threadsStartLatch.await(); + for (int iter = 0; iter < 5000; iter++) { + try (NestedDataComplexColumn column = (NestedDataComplexColumn) supplier.get()) { + smokeTest(column); + } } } - } - catch (Throwable ex) { - failureReason.set(ex.getMessage()); - } - }) - ); + catch (Throwable ex) { + failureReason.set(ex.getMessage()); + } + }) + ); + } + threadsStartLatch.countDown(); + Futures.allAsList(futures).get(); + Assert.assertEquals(expectedReason, failureReason.get()); + } + finally { + executorService.shutdownNow(); } - threadsStartLatch.countDown(); - Futures.allAsList(futures).get(); - Assert.assertEquals(expectedReason, failureReason.get()); } @Test diff --git a/processing/src/test/java/org/apache/druid/segment/nested/ScalarDoubleColumnSupplierTest.java b/processing/src/test/java/org/apache/druid/segment/nested/ScalarDoubleColumnSupplierTest.java index f483e297be01..b095ad73aa6d 100644 --- a/processing/src/test/java/org/apache/druid/segment/nested/ScalarDoubleColumnSupplierTest.java +++ b/processing/src/test/java/org/apache/druid/segment/nested/ScalarDoubleColumnSupplierTest.java @@ -219,30 +219,35 @@ public void testConcurrency() throws ExecutionException, InterruptedException final int threads = 10; ListeningExecutorService executorService = MoreExecutors.listeningDecorator( - Execs.multiThreaded(threads, "StandardNestedColumnSupplierTest-%d") + Execs.multiThreaded(threads, "ScalarDoubleColumnSupplierTest-%d") ); - Collection> futures = new ArrayList<>(threads); - final CountDownLatch threadsStartLatch = new CountDownLatch(1); - for (int i = 0; i < threads; ++i) { - futures.add( - executorService.submit(() -> { - try { - threadsStartLatch.await(); - for (int iter = 0; iter < 5000; iter++) { - try (ScalarDoubleColumn column = (ScalarDoubleColumn) supplier.get()) { - smokeTest(supplier, column); + try { + Collection> futures = new ArrayList<>(threads); + final CountDownLatch threadsStartLatch = new CountDownLatch(1); + for (int i = 0; i < threads; ++i) { + futures.add( + executorService.submit(() -> { + try { + threadsStartLatch.await(); + for (int iter = 0; iter < 5000; iter++) { + try (ScalarDoubleColumn column = (ScalarDoubleColumn) supplier.get()) { + smokeTest(supplier, column); + } } } - } - catch (Throwable ex) { - failureReason.set(ex.getMessage()); - } - }) - ); + catch (Throwable ex) { + failureReason.set(ex.getMessage()); + } + }) + ); + } + threadsStartLatch.countDown(); + Futures.allAsList(futures).get(); + Assert.assertEquals(expectedReason, failureReason.get()); + } + finally { + executorService.shutdownNow(); } - threadsStartLatch.countDown(); - Futures.allAsList(futures).get(); - Assert.assertEquals(expectedReason, failureReason.get()); } private void smokeTest(ScalarDoubleColumnAndIndexSupplier supplier, ScalarDoubleColumn column) diff --git a/processing/src/test/java/org/apache/druid/segment/nested/ScalarLongColumnSupplierTest.java b/processing/src/test/java/org/apache/druid/segment/nested/ScalarLongColumnSupplierTest.java index c8830f3aefd0..68b73a27bc2f 100644 --- a/processing/src/test/java/org/apache/druid/segment/nested/ScalarLongColumnSupplierTest.java +++ b/processing/src/test/java/org/apache/druid/segment/nested/ScalarLongColumnSupplierTest.java @@ -219,30 +219,35 @@ public void testConcurrency() throws ExecutionException, InterruptedException final int threads = 10; ListeningExecutorService executorService = MoreExecutors.listeningDecorator( - Execs.multiThreaded(threads, "StandardNestedColumnSupplierTest-%d") + Execs.multiThreaded(threads, "ScalarLongColumnSupplierTest-%d") ); - Collection> futures = new ArrayList<>(threads); - final CountDownLatch threadsStartLatch = new CountDownLatch(1); - for (int i = 0; i < threads; ++i) { - futures.add( - executorService.submit(() -> { - try { - threadsStartLatch.await(); - for (int iter = 0; iter < 5000; iter++) { - try (ScalarLongColumn column = (ScalarLongColumn) supplier.get()) { - smokeTest(supplier, column); + try { + Collection> futures = new ArrayList<>(threads); + final CountDownLatch threadsStartLatch = new CountDownLatch(1); + for (int i = 0; i < threads; ++i) { + futures.add( + executorService.submit(() -> { + try { + threadsStartLatch.await(); + for (int iter = 0; iter < 5000; iter++) { + try (ScalarLongColumn column = (ScalarLongColumn) supplier.get()) { + smokeTest(supplier, column); + } } } - } - catch (Throwable ex) { - failureReason.set(ex.getMessage()); - } - }) - ); + catch (Throwable ex) { + failureReason.set(ex.getMessage()); + } + }) + ); + } + threadsStartLatch.countDown(); + Futures.allAsList(futures).get(); + Assert.assertEquals(expectedReason, failureReason.get()); + } + finally { + executorService.shutdownNow(); } - threadsStartLatch.countDown(); - Futures.allAsList(futures).get(); - Assert.assertEquals(expectedReason, failureReason.get()); } private void smokeTest(ScalarLongColumnAndIndexSupplier supplier, ScalarLongColumn column) diff --git a/processing/src/test/java/org/apache/druid/segment/nested/ScalarStringColumnSupplierTest.java b/processing/src/test/java/org/apache/druid/segment/nested/ScalarStringColumnSupplierTest.java index d72970b3b120..b8684c9d3709 100644 --- a/processing/src/test/java/org/apache/druid/segment/nested/ScalarStringColumnSupplierTest.java +++ b/processing/src/test/java/org/apache/druid/segment/nested/ScalarStringColumnSupplierTest.java @@ -217,30 +217,35 @@ public void testConcurrency() throws ExecutionException, InterruptedException final int threads = 10; ListeningExecutorService executorService = MoreExecutors.listeningDecorator( - Execs.multiThreaded(threads, "StandardNestedColumnSupplierTest-%d") + Execs.multiThreaded(threads, "ScalarStringColumnSupplierTest-%d") ); - Collection> futures = new ArrayList<>(threads); - final CountDownLatch threadsStartLatch = new CountDownLatch(1); - for (int i = 0; i < threads; ++i) { - futures.add( - executorService.submit(() -> { - try { - threadsStartLatch.await(); - for (int iter = 0; iter < 5000; iter++) { - try (StringUtf8DictionaryEncodedColumn column = (StringUtf8DictionaryEncodedColumn) supplier.get()) { - smokeTest(supplier, column); + try { + Collection> futures = new ArrayList<>(threads); + final CountDownLatch threadsStartLatch = new CountDownLatch(1); + for (int i = 0; i < threads; ++i) { + futures.add( + executorService.submit(() -> { + try { + threadsStartLatch.await(); + for (int iter = 0; iter < 5000; iter++) { + try (StringUtf8DictionaryEncodedColumn column = (StringUtf8DictionaryEncodedColumn) supplier.get()) { + smokeTest(supplier, column); + } } } - } - catch (Throwable ex) { - failureReason.set(ex.getMessage()); - } - }) - ); + catch (Throwable ex) { + failureReason.set(ex.getMessage()); + } + }) + ); + } + threadsStartLatch.countDown(); + Futures.allAsList(futures).get(); + Assert.assertEquals(expectedReason, failureReason.get()); + } + finally { + executorService.shutdownNow(); } - threadsStartLatch.countDown(); - Futures.allAsList(futures).get(); - Assert.assertEquals(expectedReason, failureReason.get()); } private void smokeTest(ScalarStringColumnAndIndexSupplier supplier, StringUtf8DictionaryEncodedColumn column) diff --git a/processing/src/test/java/org/apache/druid/segment/nested/VariantColumnSupplierTest.java b/processing/src/test/java/org/apache/druid/segment/nested/VariantColumnSupplierTest.java index 0598552d5193..14ce46521043 100644 --- a/processing/src/test/java/org/apache/druid/segment/nested/VariantColumnSupplierTest.java +++ b/processing/src/test/java/org/apache/druid/segment/nested/VariantColumnSupplierTest.java @@ -346,28 +346,33 @@ public void testConcurrency() throws ExecutionException, InterruptedException ListeningExecutorService executorService = MoreExecutors.listeningDecorator( Execs.multiThreaded(threads, "StandardNestedColumnSupplierTest-%d") ); - Collection> futures = new ArrayList<>(threads); - final CountDownLatch threadsStartLatch = new CountDownLatch(1); - for (int i = 0; i < threads; ++i) { - futures.add( - executorService.submit(() -> { - try { - threadsStartLatch.await(); - for (int iter = 0; iter < 5000; iter++) { - try (VariantColumn column = (VariantColumn) supplier.get()) { - smokeTest(supplier, column, data, expectedTypes); + try { + Collection> futures = new ArrayList<>(threads); + final CountDownLatch threadsStartLatch = new CountDownLatch(1); + for (int i = 0; i < threads; ++i) { + futures.add( + executorService.submit(() -> { + try { + threadsStartLatch.await(); + for (int iter = 0; iter < 5000; iter++) { + try (VariantColumn column = (VariantColumn) supplier.get()) { + smokeTest(supplier, column, data, expectedTypes); + } } } - } - catch (Throwable ex) { - failureReason.set(ex.getMessage()); - } - }) - ); + catch (Throwable ex) { + failureReason.set(ex.getMessage()); + } + }) + ); + } + threadsStartLatch.countDown(); + Futures.allAsList(futures).get(); + Assert.assertEquals(expectedReason, failureReason.get()); + } + finally { + executorService.shutdownNow(); } - threadsStartLatch.countDown(); - Futures.allAsList(futures).get(); - Assert.assertEquals(expectedReason, failureReason.get()); } private void smokeTest(