From f30ee664716116cc28b245b302c05cf906fed304 Mon Sep 17 00:00:00 2001 From: Andriy Redko Date: Fri, 12 Apr 2024 13:18:20 -0400 Subject: [PATCH] AsyncReporter/SpanHandler: make queuedMaxBytes=0 disable pre-flight size checks (#260) Signed-off-by: Andriy Redko Signed-off-by: Adrian Cole Co-authored-by: Adrian Cole --- .../internal/AsyncReporterBenchmarks.java | 15 +- .../internal/BoundedQueueBenchmarks.java | 146 +++++++++++++++ .../internal/ByteBoundedQueueBenchmarks.java | 2 +- .../reporter/internal/AsyncReporter.java | 29 ++- .../reporter/internal/BoundedQueue.java | 49 +++++ .../reporter/internal/ByteBoundedQueue.java | 41 ++++- .../reporter/internal/CountBoundedQueue.java | 136 ++++++++++++++ .../reporter/internal/AsyncReporterTest.java | 172 +++++++++++++----- .../internal/ByteBoundedQueueTest.java | 6 +- .../internal/CountBoundedQueueTest.java | 64 +++++++ 10 files changed, 588 insertions(+), 72 deletions(-) create mode 100644 benchmarks/src/test/java/zipkin2/reporter/internal/BoundedQueueBenchmarks.java create mode 100644 core/src/main/java/zipkin2/reporter/internal/BoundedQueue.java create mode 100644 core/src/main/java/zipkin2/reporter/internal/CountBoundedQueue.java create mode 100644 core/src/test/java/zipkin2/reporter/internal/CountBoundedQueueTest.java diff --git a/benchmarks/src/test/java/zipkin2/reporter/internal/AsyncReporterBenchmarks.java b/benchmarks/src/test/java/zipkin2/reporter/internal/AsyncReporterBenchmarks.java index 7b0ff0d2..34b73d7e 100644 --- a/benchmarks/src/test/java/zipkin2/reporter/internal/AsyncReporterBenchmarks.java +++ b/benchmarks/src/test/java/zipkin2/reporter/internal/AsyncReporterBenchmarks.java @@ -6,6 +6,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Stream; + import org.openjdk.jmh.annotations.AuxCounters; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; @@ -24,6 +26,7 @@ import org.openjdk.jmh.annotations.Warmup; import zipkin2.Span; import zipkin2.TestObjects; +import zipkin2.reporter.BytesEncoder; import zipkin2.reporter.Encoding; import zipkin2.reporter.InMemoryReporterMetrics; import zipkin2.reporter.SpanBytesEncoder; @@ -42,6 +45,9 @@ public class AsyncReporterBenchmarks { @Param public Encoding encoding; + @Param({"0", "20000000"}) + public int maxBytes; + @AuxCounters @State(Scope.Thread) public static class InMemoryReporterMetricsAsCounters { @@ -77,10 +83,17 @@ public void clean() { @Setup(Level.Trial) public void setup() { + final BytesEncoder encoder = Stream + .of(SpanBytesEncoder.JSON_V2, SpanBytesEncoder.PROTO3, SpanBytesEncoder.THRIFT) + .filter(e -> e.encoding().equals(encoding)) + .findAny() + .orElseThrow(() -> new IllegalStateException("Unable to find BytesEncoder for " + encoding)); + reporter = AsyncReporter.newBuilder(new NoopSender(encoding)) .messageMaxBytes(1000000) // example default from Kafka message.max.bytes + .queuedMaxBytes(maxBytes) .metrics(metrics) - .build(SpanBytesEncoder.JSON_V2); + .build(encoder); } @Benchmark @Group("no_contention") @GroupThreads(1) diff --git a/benchmarks/src/test/java/zipkin2/reporter/internal/BoundedQueueBenchmarks.java b/benchmarks/src/test/java/zipkin2/reporter/internal/BoundedQueueBenchmarks.java new file mode 100644 index 00000000..4162d2fe --- /dev/null +++ b/benchmarks/src/test/java/zipkin2/reporter/internal/BoundedQueueBenchmarks.java @@ -0,0 +1,146 @@ +/* + * Copyright The OpenZipkin Authors + * SPDX-License-Identifier: Apache-2.0 + */ +package zipkin2.reporter.internal; + +import java.util.concurrent.TimeUnit; +import org.openjdk.jmh.annotations.AuxCounters; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Group; +import org.openjdk.jmh.annotations.GroupThreads; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; + +@Measurement(iterations = 5, time = 1) +@Warmup(iterations = 10, time = 1) +@Fork(3) +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.MICROSECONDS) +@State(Scope.Group) +public class BoundedQueueBenchmarks { + static final byte ONE = 1; + + @Param( {"0", "10000"}) + public int maxBytes; + + @AuxCounters + @State(Scope.Thread) + public static class OfferCounters { + public int offersFailed; + public int offersMade; + + @Setup(Level.Iteration) + public void clean() { + offersFailed = offersMade = 0; + } + } + + @AuxCounters + @State(Scope.Thread) + public static class DrainCounters { + public int drained; + + @Setup(Level.Iteration) + public void clean() { + drained = 0; + } + } + + private static ThreadLocal marker = new ThreadLocal<>(); + + @State(Scope.Thread) + public static class ConsumerMarker { + public ConsumerMarker() { + marker.set(this); + } + } + + BoundedQueue q; + + @Setup + public void setup() { + q = BoundedQueue.create(null, null, null, 10000, 10000, maxBytes); + } + + @Benchmark @Group("no_contention") @GroupThreads(1) + public void no_contention_offer(OfferCounters counters) { + if (q.offer(ONE, 1)) { + counters.offersMade++; + } else { + counters.offersFailed++; + } + } + + @Benchmark @Group("no_contention") @GroupThreads(1) + public void no_contention_drain(DrainCounters counters, ConsumerMarker cm) { + q.drainTo((s, b) -> { + counters.drained++; + return true; + }, 1000); + } + + @Benchmark @Group("mild_contention") @GroupThreads(2) + public void mild_contention_offer(OfferCounters counters) { + if (q.offer(ONE, 1)) { + counters.offersMade++; + } else { + counters.offersFailed++; + } + } + + @Benchmark @Group("mild_contention") @GroupThreads(1) + public void mild_contention_drain(DrainCounters counters, ConsumerMarker cm) { + q.drainTo((s, b) -> { + counters.drained++; + return true; + }, 1000); + } + + @Benchmark @Group("high_contention") @GroupThreads(8) + public void high_contention_offer(OfferCounters counters) { + if (q.offer(ONE, 1)) { + counters.offersMade++; + } else { + counters.offersFailed++; + } + } + + @Benchmark @Group("high_contention") @GroupThreads(1) + public void high_contention_drain(DrainCounters counters, ConsumerMarker cm) { + q.drainTo((s, b) -> { + counters.drained++; + return true; + }, 1000); + } + + @TearDown(Level.Iteration) + public void emptyQ() { + // If this thread didn't drain, return + if (marker.get() == null) return; + q.clear(); + } + + // Convenience main entry-point + public static void main(String[] args) throws RunnerException { + Options opt = new OptionsBuilder() + .include(".*" + BoundedQueueBenchmarks.class.getSimpleName() + ".*") + .build(); + + new Runner(opt).run(); + } +} diff --git a/benchmarks/src/test/java/zipkin2/reporter/internal/ByteBoundedQueueBenchmarks.java b/benchmarks/src/test/java/zipkin2/reporter/internal/ByteBoundedQueueBenchmarks.java index 5a4ada72..8744a26c 100644 --- a/benchmarks/src/test/java/zipkin2/reporter/internal/ByteBoundedQueueBenchmarks.java +++ b/benchmarks/src/test/java/zipkin2/reporter/internal/ByteBoundedQueueBenchmarks.java @@ -70,7 +70,7 @@ public ConsumerMarker() { @Setup public void setup() { - q = new ByteBoundedQueue<>(10000, 10000); + q = new ByteBoundedQueue<>(null, null, null, 10000, 10000, 10000); } @Benchmark @Group("no_contention") @GroupThreads(1) diff --git a/core/src/main/java/zipkin2/reporter/internal/AsyncReporter.java b/core/src/main/java/zipkin2/reporter/internal/AsyncReporter.java index 093ce106..4ebbc17d 100644 --- a/core/src/main/java/zipkin2/reporter/internal/AsyncReporter.java +++ b/core/src/main/java/zipkin2/reporter/internal/AsyncReporter.java @@ -45,7 +45,8 @@ * @param type of the span, usually {@code zipkin2.Span} * @since 3.0 */ -public abstract class AsyncReporter extends Component implements Reporter, Closeable, Flushable { +public abstract class AsyncReporter extends Component + implements Reporter, Closeable, Flushable { public static Builder newBuilder(BytesMessageSender sender) { return new Builder(sender); } @@ -82,8 +83,8 @@ public static final class Builder { this.messageMaxBytes = asyncReporter.messageMaxBytes; this.messageTimeoutNanos = asyncReporter.messageTimeoutNanos; this.closeTimeoutNanos = asyncReporter.closeTimeoutNanos; - this.queuedMaxSpans = asyncReporter.pending.maxSize; - this.queuedMaxBytes = asyncReporter.pending.maxBytes; + this.queuedMaxSpans = asyncReporter.pending.maxSize(); + this.queuedMaxBytes = asyncReporter.queuedMaxBytes; } static int onePercentOfMemory() { @@ -181,8 +182,9 @@ static final class BoundedAsyncReporter extends AsyncReporter { static final Logger logger = Logger.getLogger(BoundedAsyncReporter.class.getName()); final AtomicBoolean started, closed; final BytesEncoder encoder; - final ByteBoundedQueue pending; + final BoundedQueue pending; final BytesMessageSender sender; + final int queuedMaxBytes; final int messageMaxBytes; final long messageTimeoutNanos, closeTimeoutNanos; final CountDownLatch close; @@ -193,8 +195,10 @@ static final class BoundedAsyncReporter extends AsyncReporter { private boolean shouldWarnException = true; BoundedAsyncReporter(Builder builder, BytesEncoder encoder) { - this.pending = new ByteBoundedQueue(builder.queuedMaxSpans, builder.queuedMaxBytes); + this.pending = BoundedQueue.create(encoder, builder.sender, builder.metrics, + builder.messageMaxBytes, builder.queuedMaxSpans, builder.queuedMaxBytes); this.sender = builder.sender; + this.queuedMaxBytes = builder.queuedMaxBytes; this.messageMaxBytes = builder.messageMaxBytes; this.messageTimeoutNanos = builder.messageTimeoutNanos; this.closeTimeoutNanos = builder.closeTimeoutNanos; @@ -216,18 +220,15 @@ void startFlusherThread() { flushThread.start(); } + @SuppressWarnings("unchecked") @Override public void report(S next) { if (next == null) throw new NullPointerException("span == null"); // Lazy start so that reporters never used don't spawn threads if (started.compareAndSet(false, true)) startFlusherThread(); metrics.incrementSpans(1); - int nextSizeInBytes = encoder.sizeInBytes(next); - int messageSizeOfNextSpan = sender.messageSizeInBytes(nextSizeInBytes); - metrics.incrementSpanBytes(nextSizeInBytes); - if (closed.get() || - // don't enqueue something larger than we can drain - messageSizeOfNextSpan > messageMaxBytes || - !pending.offer(next, nextSizeInBytes)) { + + // enqueue now and filter our when we drain + if (closed.get() || !pending.offer(next)) { metrics.incrementSpansDropped(1); } } @@ -240,10 +241,6 @@ void startFlusherThread() { void flush(BufferNextMessage bundler) { pending.drainTo(bundler, bundler.remainingNanos()); - // record after flushing reduces the amount of gauge events vs on doing this on report - metrics.updateQueuedSpans(pending.count); - metrics.updateQueuedBytes(pending.sizeInBytes); - // loop around if we are running, and the bundle isn't full // if we are closed, try to send what's pending if (!bundler.isReady() && !closed.get()) return; diff --git a/core/src/main/java/zipkin2/reporter/internal/BoundedQueue.java b/core/src/main/java/zipkin2/reporter/internal/BoundedQueue.java new file mode 100644 index 00000000..2ca40683 --- /dev/null +++ b/core/src/main/java/zipkin2/reporter/internal/BoundedQueue.java @@ -0,0 +1,49 @@ +/* + * Copyright The OpenZipkin Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package zipkin2.reporter.internal; + +import zipkin2.reporter.BytesEncoder; +import zipkin2.reporter.BytesMessageSender; +import zipkin2.reporter.ReporterMetrics; + +/** + * Multi-producer, multi-consumer queue that could be bounded by count or/and size. + */ +abstract class BoundedQueue implements SpanWithSizeConsumer { + static BoundedQueue create(BytesEncoder encoder, BytesMessageSender sender, + ReporterMetrics metrics, int messageMaxBytes, int maxSize, int maxBytes) { + if (maxBytes > 0) { + return new ByteBoundedQueue(encoder, sender, metrics, messageMaxBytes, maxSize, maxBytes); + } else { + return new CountBoundedQueue(encoder, sender, metrics, messageMaxBytes, maxSize); + } + } + + /** + * Max element's count of this bounded queue + */ + abstract int maxSize(); + + /** + * Clear this bounded queue + */ + abstract int clear(); + + /** + * Drains this bounded queue. Blocks for up to nanosTimeout for spans to appear. + * Then, consume as many as possible. + */ + abstract int drainTo(SpanWithSizeConsumer bundler, long remainingNanos); + + /** Returns true if the element could be added or false if it could not. */ + abstract boolean offer(S next); +} + +interface SpanWithSizeConsumer { + /** Returns true if the element could be added or false if it could not due to its size. */ + boolean offer(S next, int nextSizeInBytes); +} + diff --git a/core/src/main/java/zipkin2/reporter/internal/ByteBoundedQueue.java b/core/src/main/java/zipkin2/reporter/internal/ByteBoundedQueue.java index cedbbe60..9e875750 100644 --- a/core/src/main/java/zipkin2/reporter/internal/ByteBoundedQueue.java +++ b/core/src/main/java/zipkin2/reporter/internal/ByteBoundedQueue.java @@ -7,13 +7,20 @@ import java.util.Arrays; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; +import zipkin2.reporter.BytesEncoder; +import zipkin2.reporter.BytesMessageSender; +import zipkin2.reporter.ReporterMetrics; /** * Multi-producer, multi-consumer queue that is bounded by both count and size. * *

This is similar to {@link java.util.concurrent.ArrayBlockingQueue} in implementation. */ -final class ByteBoundedQueue implements SpanWithSizeConsumer { +final class ByteBoundedQueue extends BoundedQueue { + final BytesEncoder encoder; + final BytesMessageSender sender; + final ReporterMetrics metrics; + final int messageMaxBytes; final ReentrantLock lock = new ReentrantLock(false); final Condition available = lock.newCondition(); @@ -28,7 +35,13 @@ final class ByteBoundedQueue implements SpanWithSizeConsumer { int writePos; int readPos; - @SuppressWarnings("unchecked") ByteBoundedQueue(int maxSize, int maxBytes) { + @SuppressWarnings("unchecked") ByteBoundedQueue(BytesEncoder encoder, + BytesMessageSender sender, ReporterMetrics metrics, int messageMaxBytes, int maxSize, + int maxBytes) { + this.encoder = encoder; + this.sender = sender; + this.metrics = metrics; + this.messageMaxBytes = messageMaxBytes; this.elements = (S[]) new Object[maxSize]; this.sizesInBytes = new int[maxSize]; this.maxSize = maxSize; @@ -60,7 +73,7 @@ final class ByteBoundedQueue implements SpanWithSizeConsumer { } /** Blocks for up to nanosTimeout for spans to appear. Then, consume as many as possible. */ - int drainTo(SpanWithSizeConsumer consumer, long nanosTimeout) { + @Override int drainTo(SpanWithSizeConsumer consumer, long nanosTimeout) { try { // This may be called by multiple threads. If one is holding a lock, another is waiting. We // use lockInterruptibly to ensure the one waiting can be interrupted. @@ -77,11 +90,24 @@ int drainTo(SpanWithSizeConsumer consumer, long nanosTimeout) { } } catch (InterruptedException e) { return 0; + } finally { + // record after draining reduces the amount of gauge events vs on doing this on report + metrics.updateQueuedSpans(count); + metrics.updateQueuedBytes(sizeInBytes); } } + @Override boolean offer(S next) { + int nextSizeInBytes = encoder.sizeInBytes(next); + int messageSizeOfNextSpan = sender.messageSizeInBytes(nextSizeInBytes); + metrics.incrementSpanBytes(nextSizeInBytes); + // don't enqueue something larger than we can drain + if (messageSizeOfNextSpan > messageMaxBytes) return false; + return offer(next, nextSizeInBytes); + } + /** Clears the queue unconditionally and returns count of spans cleared. */ - int clear() { + @Override int clear() { lock.lock(); try { int result = count; @@ -115,9 +141,8 @@ int doDrain(SpanWithSizeConsumer consumer) { sizeInBytes -= drainedSizeInBytes; return drainedCount; } -} -interface SpanWithSizeConsumer { - /** Returns true if the element could be added or false if it could not due to its size. */ - boolean offer(S next, int nextSizeInBytes); + @Override int maxSize() { + return maxSize; + } } diff --git a/core/src/main/java/zipkin2/reporter/internal/CountBoundedQueue.java b/core/src/main/java/zipkin2/reporter/internal/CountBoundedQueue.java new file mode 100644 index 00000000..2127d5fe --- /dev/null +++ b/core/src/main/java/zipkin2/reporter/internal/CountBoundedQueue.java @@ -0,0 +1,136 @@ +/* + * Copyright The OpenZipkin Authors + * SPDX-License-Identifier: Apache-2.0 + */ +package zipkin2.reporter.internal; + +import java.util.Arrays; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +import zipkin2.reporter.BytesEncoder; +import zipkin2.reporter.BytesMessageSender; +import zipkin2.reporter.ReporterMetrics; + +/** + * Multi-producer, multi-consumer queue that is bounded by count. + * + *

This is similar to {@link java.util.concurrent.ArrayBlockingQueue} in implementation. + */ +final class CountBoundedQueue extends BoundedQueue { + + final ReentrantLock lock = new ReentrantLock(false); + final Condition available = lock.newCondition(); + + final BytesEncoder encoder; + final BytesMessageSender sender; + final ReporterMetrics metrics; + final int messageMaxBytes; + final int maxSize; + + final S[] elements; + int count; + int writePos; + int readPos; + + @SuppressWarnings("unchecked") CountBoundedQueue(BytesEncoder encoder, + BytesMessageSender sender, ReporterMetrics metrics, int messageMaxBytes, int maxSize) { + this.encoder = encoder; + this.sender = sender; + this.metrics = metrics; + this.messageMaxBytes = messageMaxBytes; + this.elements = (S[]) new Object[maxSize]; + this.maxSize = maxSize; + } + + @Override public boolean offer(S next, int nextSizeInBytes) { + return offer(next); + } + + /** + * Returns true if the element could be added or false if it could not due to its size. + */ + @Override public boolean offer(S next) { + lock.lock(); + try { + if (count == maxSize) return false; + + elements[writePos++] = next; + + if (writePos == maxSize) writePos = 0; // circle back to the front of the array + + count++; + + available.signal(); // alert any drainers + return true; + } finally { + lock.unlock(); + } + } + + /** Blocks for up to nanosTimeout for spans to appear. Then, consume as many as possible. */ + @Override int drainTo(SpanWithSizeConsumer consumer, long nanosTimeout) { + try { + // This may be called by multiple threads. If one is holding a lock, another is waiting. We + // use lockInterruptibly to ensure the one waiting can be interrupted. + lock.lockInterruptibly(); + try { + long nanosLeft = nanosTimeout; + while (count == 0) { + if (nanosLeft <= 0) return 0; + nanosLeft = available.awaitNanos(nanosLeft); + } + return doDrain(consumer); + } finally { + lock.unlock(); + } + } catch (InterruptedException e) { + return 0; + } finally { + // record after draining reduces the amount of gauge events vs on doing this on report + metrics.updateQueuedSpans(count); + } + } + + /** Clears the queue unconditionally and returns count of spans cleared. */ + @Override public int clear() { + lock.lock(); + try { + int result = count; + count = readPos = writePos = 0; + Arrays.fill(elements, null); + return result; + } finally { + lock.unlock(); + } + } + + int doDrain(SpanWithSizeConsumer consumer) { + int drainedCount = 0; + while (drainedCount < count) { + S next = elements[readPos]; + + if (next == null) break; + + int nextSizeInBytes = encoder.sizeInBytes(next); + int messageSizeOfNextSpan = sender.messageSizeInBytes(nextSizeInBytes); + metrics.incrementSpanBytes(nextSizeInBytes); + + if (messageSizeOfNextSpan > messageMaxBytes) { + metrics.incrementSpansDropped(1); + } else if (!consumer.offer(next, nextSizeInBytes)) { + break; + } + + drainedCount++; + elements[readPos] = null; + if (++readPos == elements.length) readPos = 0; // circle back to the front of the array + } + count -= drainedCount; + return drainedCount; + } + + @Override int maxSize() { + return maxSize; + } +} diff --git a/core/src/test/java/zipkin2/reporter/internal/AsyncReporterTest.java b/core/src/test/java/zipkin2/reporter/internal/AsyncReporterTest.java index b132cbf1..ee6a16d7 100644 --- a/core/src/test/java/zipkin2/reporter/internal/AsyncReporterTest.java +++ b/core/src/test/java/zipkin2/reporter/internal/AsyncReporterTest.java @@ -16,8 +16,10 @@ import java.util.logging.Level; import java.util.logging.LogRecord; import java.util.logging.Logger; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + import zipkin2.Span; import zipkin2.TestObjects; import zipkin2.reporter.BytesEncoder; @@ -39,102 +41,147 @@ class AsyncReporterTest { Encoding.JSON.listSizeInBytes( Collections.singletonList(SpanBytesEncoder.JSON_V2.encode(span))); - AsyncReporter reporter; InMemoryReporterMetrics metrics = new InMemoryReporterMetrics(); - @AfterEach void close() { - if (reporter != null) reporter.close(); - } - - @Test void messageMaxBytes_defaultsToSender() { + @ParameterizedTest(name = "queuedMaxBytes={0}") + @ValueSource(ints = { 0, 1000000 }) + void messageMaxBytes_defaultsToSender(int queuedMaxBytes) { AtomicInteger sentSpans = new AtomicInteger(); - reporter = AsyncReporter.newBuilder(FakeSender.create() + AsyncReporter reporter = AsyncReporter.newBuilder(FakeSender.create() .onSpans(spans -> sentSpans.addAndGet(spans.size())) - .messageMaxBytes(sizeInBytesOfSingleSpanMessage)) + .messageMaxBytes(sizeInBytesOfSingleSpanMessage)) + .queuedMaxBytes(queuedMaxBytes) .messageTimeout(0, TimeUnit.MILLISECONDS) .build(SpanBytesEncoder.JSON_V2); reporter.report(span); reporter.report(span); // drops reporter.flush(); + reporter.close(); assertThat(sentSpans.get()).isEqualTo(1); } - @Test void messageMaxBytes_dropsWhenOverqueuing() { + @ParameterizedTest(name = "queuedMaxBytes={0}") + @ValueSource(ints = { 0, 1000000 }) + void messageMaxBytes_dropsWhenOverqueuing(int queuedMaxBytes) { AtomicInteger sentSpans = new AtomicInteger(); - reporter = AsyncReporter.newBuilder(FakeSender.create() + AsyncReporter reporter = AsyncReporter.newBuilder(FakeSender.create() .onSpans(spans -> sentSpans.addAndGet(spans.size()))) .messageMaxBytes(sizeInBytesOfSingleSpanMessage) + .queuedMaxBytes(queuedMaxBytes) .messageTimeout(0, TimeUnit.MILLISECONDS) .build(SpanBytesEncoder.JSON_V2); reporter.report(span); reporter.report(span); // dropped the one that queued more than allowed bytes reporter.flush(); + reporter.close(); assertThat(sentSpans.get()).isEqualTo(1); } - @Test void messageMaxBytes_dropsWhenTooLarge() { + @ParameterizedTest(name = "queuedMaxBytes={0}") + @ValueSource(ints = { 0, 1000000 }) + void messageMaxBytes_dropsWhenTooLarge(int queuedMaxBytes) { AtomicInteger sentSpans = new AtomicInteger(); - reporter = AsyncReporter.newBuilder(FakeSender.create() + AsyncReporter reporter = AsyncReporter.newBuilder(FakeSender.create() .onSpans(spans -> sentSpans.addAndGet(spans.size()))) .messageMaxBytes(sizeInBytesOfSingleSpanMessage) + .queuedMaxBytes(queuedMaxBytes) .messageTimeout(0, TimeUnit.MILLISECONDS) .build(SpanBytesEncoder.JSON_V2); reporter.report(span.toBuilder().addAnnotation(1L, "fooooo").build()); reporter.flush(); + reporter.close(); assertThat(sentSpans.get()).isEqualTo(0); } - @Test void queuedMaxSpans_dropsWhenOverqueuing() { + @ParameterizedTest(name = "queuedMaxBytes={0}") + @ValueSource(ints = { 0, 1000000 }) + void queuedMaxSpans_dropsWhenOverqueuing(int queuedMaxBytes) { AtomicInteger sentSpans = new AtomicInteger(); - reporter = AsyncReporter.newBuilder(FakeSender.create() + AsyncReporter reporter = AsyncReporter.newBuilder(FakeSender.create() .onSpans(spans -> sentSpans.addAndGet(spans.size()))) .queuedMaxSpans(1) + .queuedMaxBytes(queuedMaxBytes) .messageTimeout(0, TimeUnit.MILLISECONDS) .build(SpanBytesEncoder.JSON_V2); reporter.report(span); reporter.report(span); // dropped the one that queued more than allowed count reporter.flush(); - + reporter.close(); + assertThat(sentSpans.get()).isEqualTo(1); } - @Test void report_incrementsMetrics() { - reporter = AsyncReporter.newBuilder(FakeSender.create()) + @ParameterizedTest(name = "queuedMaxBytes={0}") + @ValueSource(ints = { 0, 1000000 }) + void report_incrementsMetrics(int queuedMaxBytes) { + AsyncReporter reporter = AsyncReporter.newBuilder(FakeSender.create()) .metrics(metrics) + .queuedMaxBytes(queuedMaxBytes) .messageTimeout(0, TimeUnit.MILLISECONDS) .build(SpanBytesEncoder.JSON_V2); reporter.report(span); reporter.report(span); + reporter.flush(); + reporter.close(); + assertThat(metrics.spans()).isEqualTo(2); assertThat(metrics.spanBytes()).isEqualTo(SpanBytesEncoder.JSON_V2.encode(span).length * 2); } - @Test void report_incrementsSpansDropped() { - reporter = AsyncReporter.newBuilder(FakeSender.create()) + @ParameterizedTest(name = "queuedMaxBytes={0}") + @ValueSource(ints = { 0, 1000000 }) + void report_incrementsSpansDropped(int queuedMaxBytes) { + AsyncReporter reporter = AsyncReporter.newBuilder(FakeSender.create()) .queuedMaxSpans(1) .metrics(metrics) + .queuedMaxBytes(queuedMaxBytes) .messageTimeout(0, TimeUnit.MILLISECONDS) .build(SpanBytesEncoder.JSON_V2); reporter.report(span); reporter.report(span); + reporter.flush(); + reporter.close(); assertThat(metrics.spans()).isEqualTo(2); assertThat(metrics.spansDropped()).isEqualTo(1); } + + + @ParameterizedTest(name = "queuedMaxBytes={0}") + @ValueSource(ints = { 0, 1000000 }) + void report_incrementsSpansDroppedOversizing(int queuedMaxBytes) { + AsyncReporter reporter = AsyncReporter.newBuilder(FakeSender.create()) + .messageMaxBytes(1) + .metrics(metrics) + .queuedMaxBytes(queuedMaxBytes) + .messageTimeout(0, TimeUnit.MILLISECONDS) + .build(SpanBytesEncoder.JSON_V2); - @Test void flush_incrementsMetrics() { - reporter = AsyncReporter.newBuilder(FakeSender.create()) + reporter.report(span); + reporter.report(span); + reporter.flush(); + reporter.close(); + + assertThat(metrics.spans()).isEqualTo(2); + assertThat(metrics.spansDropped()).isEqualTo(2); + } + + @ParameterizedTest(name = "queuedMaxBytes={0}") + @ValueSource(ints = { 0, 1000000 }) + void flush_incrementsMetrics(int queuedMaxBytes) { + AsyncReporter reporter = AsyncReporter.newBuilder(FakeSender.create()) .metrics(metrics) .messageMaxBytes(sizeInBytesOfSingleSpanMessage) + .queuedMaxBytes(queuedMaxBytes) .messageTimeout(0, TimeUnit.MILLISECONDS) .build(SpanBytesEncoder.JSON_V2); @@ -144,33 +191,40 @@ class AsyncReporterTest { reporter.flush(); assertThat(metrics.queuedSpans()).isEqualTo(1); // still one span in the backlog - assertThat(metrics.queuedBytes()).isEqualTo(SpanBytesEncoder.JSON_V2.encode(span).length); + assertThat(metrics.queuedBytes()).isEqualTo(queuedMaxBytes > 0 ? SpanBytesEncoder.JSON_V2.encode(span).length : 0); assertThat(metrics.messages()).isEqualTo(1); assertThat(metrics.messageBytes()).isEqualTo(sizeInBytesOfSingleSpanMessage); reporter.flush(); + reporter.close(); assertThat(metrics.queuedSpans()).isZero(); assertThat(metrics.queuedBytes()).isZero(); assertThat(metrics.messages()).isEqualTo(2); assertThat(metrics.messageBytes()).isEqualTo(sizeInBytesOfSingleSpanMessage * 2); } - @Test void flush_incrementsMessagesDropped() { - reporter = AsyncReporter.newBuilder(FakeSender.create() + @ParameterizedTest(name = "queuedMaxBytes={0}") + @ValueSource(ints = { 0, 1000000 }) + void flush_incrementsMessagesDropped(int queuedMaxBytes) { + AsyncReporter reporter = AsyncReporter.newBuilder(FakeSender.create() .onSpans(spans -> { throw new RuntimeException(); })) .metrics(metrics) + .queuedMaxBytes(queuedMaxBytes) .messageTimeout(0, TimeUnit.MILLISECONDS) .build(SpanBytesEncoder.JSON_V2); reporter.report(span); reporter.flush(); + reporter.close(); assertThat(metrics.messagesDropped()).isEqualTo(1); } - @Test void flush_logsFirstErrorAsWarn() { + @ParameterizedTest(name = "queuedMaxBytes={0}") + @ValueSource(ints = { 0, 1000000 }) + void flush_logsFirstErrorAsWarn(int queuedMaxBytes) { List logRecords = new ArrayList<>(); Handler testHandler = new Handler() { @Override @@ -191,10 +245,11 @@ public void close() throws SecurityException { logger.addHandler(testHandler); logger.setLevel(Level.FINE); - reporter = AsyncReporter.newBuilder(FakeSender.create() + AsyncReporter reporter = AsyncReporter.newBuilder(FakeSender.create() .onSpans(spans -> { throw new RuntimeException(); })) + .queuedMaxBytes(queuedMaxBytes) .build(SpanBytesEncoder.JSON_V2); reporter.report(span); @@ -211,13 +266,18 @@ public void close() throws SecurityException { assertThat(logRecords.get(2).getLevel()).isEqualTo(Level.FINE); assertThat(logRecords.get(2).getMessage()).contains("RuntimeException"); + + reporter.close(); } /** It can take up to the messageTimeout past the first span to send */ - @Test void messageTimeout_flushesWhenTimeoutExceeded() throws InterruptedException { + @ParameterizedTest(name = "queuedMaxBytes={0}") + @ValueSource(ints = { 0, 1000000 }) + void messageTimeout_flushesWhenTimeoutExceeded(int queuedMaxBytes) throws InterruptedException { CountDownLatch sentSpans = new CountDownLatch(1); - reporter = AsyncReporter.newBuilder(FakeSender.create() + AsyncReporter reporter = AsyncReporter.newBuilder(FakeSender.create() .onSpans(spans -> sentSpans.countDown())) + .queuedMaxBytes(queuedMaxBytes) .messageTimeout(10, TimeUnit.MILLISECONDS) .build(SpanBytesEncoder.JSON_V2); @@ -226,12 +286,17 @@ public void close() throws SecurityException { .isFalse(); assertThat(sentSpans.await(10, TimeUnit.MILLISECONDS)) .isTrue(); + + reporter.close(); } - @Test void messageTimeout_disabled() throws InterruptedException { + @ParameterizedTest(name = "queuedMaxBytes={0}") + @ValueSource(ints = { 0, 1000000 }) + void messageTimeout_disabled(int queuedMaxBytes) throws InterruptedException { CountDownLatch sentSpans = new CountDownLatch(1); - reporter = AsyncReporter.newBuilder(FakeSender.create() + AsyncReporter reporter = AsyncReporter.newBuilder(FakeSender.create() .onSpans(spans -> sentSpans.countDown())) + .queuedMaxBytes(queuedMaxBytes) .messageTimeout(0, TimeUnit.NANOSECONDS) .build(SpanBytesEncoder.JSON_V2); @@ -241,11 +306,13 @@ public void close() throws SecurityException { // Since no threads started, the above lingers assertThat(sentSpans.await(10, TimeUnit.MILLISECONDS)) .isFalse(); + + reporter.close(); } @Test void senderThread_threadHasAPrettyName() throws InterruptedException { BlockingQueue threadName = new LinkedBlockingQueue<>(); - reporter = AsyncReporter.newBuilder(FakeSender.create() + AsyncReporter reporter = AsyncReporter.newBuilder(FakeSender.create() .onSpans(spans -> threadName.offer(Thread.currentThread().getName()))) .build(SpanBytesEncoder.JSON_V2); @@ -254,10 +321,12 @@ public void close() throws SecurityException { // check name is pretty assertThat(threadName.take()) .isEqualTo("AsyncReporter{FakeSender}"); + + reporter.close(); } @Test void close_close_stopsFlushThread() throws InterruptedException { - reporter = AsyncReporter.newBuilder(FakeSender.create()) + AsyncReporter reporter = AsyncReporter.newBuilder(FakeSender.create()) .metrics(metrics) .messageTimeout(2, TimeUnit.MILLISECONDS) .build(SpanBytesEncoder.JSON_V2); @@ -273,11 +342,13 @@ public void close() throws SecurityException { BoundedAsyncReporter impl = (BoundedAsyncReporter) reporter; assertThat(impl.close.await(3, TimeUnit.MILLISECONDS)) .isTrue(); + + reporter.close(); } @Test void flush_throwsOnClose() { assertThrows(IllegalStateException.class, () -> { - reporter = AsyncReporter.newBuilder(FakeSender.create()) + AsyncReporter reporter = AsyncReporter.newBuilder(FakeSender.create()) .metrics(metrics) .messageTimeout(0, TimeUnit.MILLISECONDS) .build(SpanBytesEncoder.JSON_V2); @@ -289,7 +360,7 @@ public void close() throws SecurityException { } @Test void report_doesntThrowWhenClosed() { - reporter = AsyncReporter.newBuilder(FakeSender.create()) + AsyncReporter reporter = AsyncReporter.newBuilder(FakeSender.create()) .metrics(metrics) .messageTimeout(0, TimeUnit.MILLISECONDS) .build(SpanBytesEncoder.JSON_V2); @@ -310,7 +381,7 @@ public void close() throws SecurityException { }); @Test void senderThread_dropsOnSenderClose_flushThread() throws InterruptedException { - reporter = AsyncReporter.newBuilder(sleepingSender) + AsyncReporter reporter = AsyncReporter.newBuilder(sleepingSender) .metrics(metrics) .messageMaxBytes(sizeInBytesOfSingleSpanMessage) .build(SpanBytesEncoder.JSON_V2); @@ -325,12 +396,14 @@ public void close() throws SecurityException { assertThat(metrics.messagesDropped()).isEqualTo(1); assertThat(metrics.messagesDroppedByCause().keySet().iterator().next()) .isEqualTo(ClosedSenderException.class); + + reporter.close(); } @Test void senderThread_dropsOnReporterClose_flushThread() throws InterruptedException { CountDownLatch received = new CountDownLatch(1); CountDownLatch sent = new CountDownLatch(1); - reporter = AsyncReporter.newBuilder(FakeSender.create() + AsyncReporter reporter = AsyncReporter.newBuilder(FakeSender.create() .onSpans(spans -> { received.countDown(); try { @@ -352,9 +425,12 @@ public void close() throws SecurityException { assertThat(metrics.spansDropped()).isEqualTo(1); } - @Test void blocksToClearPendingSpans() throws InterruptedException { - reporter = AsyncReporter.newBuilder(FakeSender.create()) + @ParameterizedTest(name = "queuedMaxBytes={0}") + @ValueSource(ints = { 0, 1000000 }) + void blocksToClearPendingSpans(int queuedMaxBytes) throws InterruptedException { + AsyncReporter reporter = AsyncReporter.newBuilder(FakeSender.create()) .metrics(metrics) + .queuedMaxBytes(queuedMaxBytes) .messageTimeout(30, TimeUnit.SECONDS) .build(SpanBytesEncoder.JSON_V2); @@ -367,8 +443,10 @@ public void close() throws SecurityException { assertThat(metrics.spansDropped()).isEqualTo(0); } - @Test void quitsBlockingWhenOverTimeout() throws InterruptedException { - reporter = AsyncReporter.newBuilder(FakeSender.create() + @ParameterizedTest(name = "queuedMaxBytes={0}") + @ValueSource(ints = { 0, 1000000 }) + void quitsBlockingWhenOverTimeout(int queuedMaxBytes) throws InterruptedException { + AsyncReporter reporter = AsyncReporter.newBuilder(FakeSender.create() .onSpans(spans -> { // note: we don't yet have a hook to cancel a sender, so this will remain in-flight // eventhough we are unblocking close. A later close on sender usually will kill in-flight @@ -379,6 +457,7 @@ public void close() throws SecurityException { } })) .metrics(metrics) + .queuedMaxBytes(queuedMaxBytes) .closeTimeout(1, TimeUnit.NANOSECONDS) .messageTimeout(30, TimeUnit.SECONDS) .build(SpanBytesEncoder.JSON_V2); @@ -391,10 +470,12 @@ public void close() throws SecurityException { reporter.close(); // close while there's a pending span assertThat(System.nanoTime() - start) .isLessThan(TimeUnit.MILLISECONDS.toNanos(10)); // give wiggle room + + reporter.close(); } @Test void flush_incrementsMetricsAndThrowsWhenClosed() { - reporter = AsyncReporter.newBuilder(sleepingSender) + AsyncReporter reporter = AsyncReporter.newBuilder(sleepingSender) .metrics(metrics) .messageTimeout(0, TimeUnit.MILLISECONDS) .build(SpanBytesEncoder.JSON_V2); @@ -411,7 +492,7 @@ public void close() throws SecurityException { } @Test void flush_incrementsMetricsAndThrowsWhenSenderClosed() { - reporter = AsyncReporter.newBuilder(sleepingSender) + AsyncReporter reporter = AsyncReporter.newBuilder(sleepingSender) .metrics(metrics) .messageTimeout(0, TimeUnit.MILLISECONDS) .build(SpanBytesEncoder.JSON_V2); @@ -425,12 +506,14 @@ public void close() throws SecurityException { } catch (IllegalStateException e) { assertThat(metrics.spansDropped()).isEqualTo(1); assertThat(metrics.messagesDropped()).isEqualTo(1); + } finally { + reporter.close(); } } @Test void build_threadFactory() { Thread thread = new Thread(); - reporter = AsyncReporter.newBuilder(FakeSender.create()) + AsyncReporter reporter = AsyncReporter.newBuilder(FakeSender.create()) .threadFactory(r -> thread) .build(SpanBytesEncoder.JSON_V2); @@ -442,6 +525,7 @@ public void close() throws SecurityException { assertThat(thread.toString()).contains("AsyncReporter{FakeSender}"); assertThat(thread.isDaemon()).isTrue(); + reporter.close(); thread.interrupt(); } diff --git a/core/src/test/java/zipkin2/reporter/internal/ByteBoundedQueueTest.java b/core/src/test/java/zipkin2/reporter/internal/ByteBoundedQueueTest.java index 984a790c..8cefef06 100644 --- a/core/src/test/java/zipkin2/reporter/internal/ByteBoundedQueueTest.java +++ b/core/src/test/java/zipkin2/reporter/internal/ByteBoundedQueueTest.java @@ -8,10 +8,12 @@ import java.util.List; import org.junit.jupiter.api.Test; +import zipkin2.reporter.ReporterMetrics; + import static org.assertj.core.api.Assertions.assertThat; class ByteBoundedQueueTest { - ByteBoundedQueue queue = new ByteBoundedQueue<>(10, 10); + ByteBoundedQueue queue = new ByteBoundedQueue<>(null, null, ReporterMetrics.NOOP_METRICS, 10, 10, 10); @Test void offer_failsWhenFull_size() { for (int i = 0; i < queue.maxSize; i++) { @@ -40,7 +42,7 @@ class ByteBoundedQueueTest { } @Test void circular() { - ByteBoundedQueue queue = new ByteBoundedQueue<>(10, 10); + ByteBoundedQueue queue = new ByteBoundedQueue<>(null, null, ReporterMetrics.NOOP_METRICS, 10, 10, 10); List polled = new ArrayList<>(); SpanWithSizeConsumer consumer = (next, ignored) -> polled.add(next); diff --git a/core/src/test/java/zipkin2/reporter/internal/CountBoundedQueueTest.java b/core/src/test/java/zipkin2/reporter/internal/CountBoundedQueueTest.java new file mode 100644 index 00000000..36e15d2d --- /dev/null +++ b/core/src/test/java/zipkin2/reporter/internal/CountBoundedQueueTest.java @@ -0,0 +1,64 @@ +/* + * Copyright The OpenZipkin Authors + * SPDX-License-Identifier: Apache-2.0 + */ +package zipkin2.reporter.internal; + +import java.util.ArrayList; +import java.util.List; +import org.junit.jupiter.api.Test; + +import zipkin2.reporter.BytesEncoder; +import zipkin2.reporter.Encoding; +import zipkin2.reporter.FakeSender; +import zipkin2.reporter.ReporterMetrics; +import zipkin2.reporter.SpanBytesEncoder; + +import static org.assertj.core.api.Assertions.assertThat; + +class CountBoundedQueueTest { + CountBoundedQueue queue = new CountBoundedQueue<>(null, null, ReporterMetrics.NOOP_METRICS, 10, 10); + + @Test void offer_failsWhenFull_size() { + for (int i = 0; i < queue.maxSize; i++) { + assertThat(queue.offer(new byte[1], 1)).isTrue(); + } + assertThat(queue.offer(new byte[1], 1)).isFalse(); + } + + @Test void offer_updatesCount() { + for (int i = 0; i < queue.maxSize; i++) { + queue.offer(new byte[1], 1); + } + assertThat(queue.count).isEqualTo(10); + } + + @Test void circular() { + CountBoundedQueue queue = new CountBoundedQueue<>(new BytesEncoder() { + @Override public Encoding encoding() { + throw new UnsupportedOperationException(); + } + + @Override public int sizeInBytes(Integer input) { + return 4; + } + + @Override public byte[] encode(Integer input) { + throw new UnsupportedOperationException(); + } + }, FakeSender.create(), ReporterMetrics.NOOP_METRICS, 10, 10); + + List polled = new ArrayList<>(); + SpanWithSizeConsumer consumer = (next, ignored) -> polled.add(next); + + // Offer more than the capacity, flushing via poll on interval + for (int i = 0; i < 15; i++) { + queue.offer(i, 1); + queue.drainTo(consumer, 1); + } + + // ensure we have all of the spans + assertThat(polled) + .containsExactly(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14); + } +}