Skip to content

Commit

Permalink
Fix CountBoundedQueue draining for oversized spans
Browse files Browse the repository at this point in the history
Signed-off-by: Andriy Redko <drreta@gmail.com>
  • Loading branch information
reta committed Apr 11, 2024
1 parent a8ec9f8 commit 6176cab
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,12 @@ int doDrain(SpanWithSizeConsumer<S> consumer) {
int messageSizeOfNextSpan = sender.messageSizeInBytes(nextSizeInBytes);
metrics.incrementSpanBytes(nextSizeInBytes);

if (messageSizeOfNextSpan > messageMaxBytes) break;
if (consumer.offer(next, nextSizeInBytes)) {
if (messageSizeOfNextSpan > messageMaxBytes) {
drainedCount++;
metrics.incrementSpansDropped(1);
elements[readPos] = null; // should have been dropped before
if (++readPos == elements.length) readPos = 0; // circle back to the front of the array
} else if (consumer.offer(next, nextSizeInBytes)) {
drainedCount++;

elements[readPos] = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,26 @@ void report_incrementsSpansDropped(int queuedMaxBytes) {
assertThat(metrics.spans()).isEqualTo(2);
assertThat(metrics.spansDropped()).isEqualTo(1);
}


@ParameterizedTest(name = "queuedMaxBytes={0}")
@ValueSource(ints = { 0, 1000000 })
void report_incrementsSpansDroppedOversizing(int queuedMaxBytes) {
AsyncReporter<Span> reporter = AsyncReporter.newBuilder(FakeSender.create())
.messageMaxBytes(1)
.metrics(metrics)
.queuedMaxBytes(queuedMaxBytes)
.messageTimeout(0, TimeUnit.MILLISECONDS)
.build(SpanBytesEncoder.JSON_V2);

reporter.report(span);
reporter.report(span);
reporter.flush();
reporter.close();

assertThat(metrics.spans()).isEqualTo(2);
assertThat(metrics.spansDropped()).isEqualTo(2);
}

@ParameterizedTest(name = "queuedMaxBytes={0}")
@ValueSource(ints = { 0, 1000000 })
Expand Down

0 comments on commit 6176cab

Please sign in to comment.