Commit
kafka-streams: consolidate code (#1408)
Signed-off-by: Adrian Cole <adrian@tetrate.io>
codefromthecrypt authored Jan 17, 2024
1 parent b62f266 commit ad678d2
Showing 4 changed files with 102 additions and 102 deletions.
brave/kafka/streams/BaseTracingProcessor.java (new file)
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2013-2024 The OpenZipkin Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package brave.kafka.streams;
+
+import brave.Span;
+import brave.Tracer;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.streams.processor.api.ProcessingContext;
+
+import static brave.internal.Throwables.propagateIfFatal;
+
+abstract class BaseTracingProcessor<C extends ProcessingContext, R, P> {
+  final KafkaStreamsTracing kafkaStreamsTracing;
+  final Tracer tracer;
+  final String spanName;
+  final P delegate;
+  C context;
+
+  BaseTracingProcessor(KafkaStreamsTracing kafkaStreamsTracing, String spanName, P delegate) {
+    this.kafkaStreamsTracing = kafkaStreamsTracing;
+    this.tracer = kafkaStreamsTracing.tracer;
+    this.spanName = spanName;
+    this.delegate = delegate;
+  }
+
+  abstract Headers headers(R record);
+
+  abstract void process(P delegate, R record);
+
+  public void process(R record) {
+    Span span = kafkaStreamsTracing.nextSpan(context, headers(record));
+    if (!span.isNoop()) {
+      span.name(spanName);
+      span.start();
+    }
+
+    Tracer.SpanInScope scope = tracer.withSpanInScope(span);
+    Throwable error = null;
+    try {
+      process(delegate, record);
+    } catch (Throwable e) {
+      error = e;
+      propagateIfFatal(e);
+      throw e;
+    } finally {
+      // Inject this span so that the next stage uses it as a parent
+      kafkaStreamsTracing.injector.inject(span.context(), headers(record));
+      if (error != null) span.error(error);
+      span.finish();
+      scope.close();
+    }
+  }
+}
brave/kafka/streams/KafkaStreamsTracing.java
@@ -32,7 +32,6 @@
 import org.apache.kafka.streams.KafkaClientSupplier;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.Topology;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;
 import org.apache.kafka.streams.processor.api.ProcessingContext;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
@@ -148,18 +147,12 @@ public <KIn, VIn, VOut> FixedKeyProcessorSupplier<KIn, VIn, VOut> processValues(
     return new TracingFixedKeyProcessorSupplier<>(this, spanName, processorSupplier);
   }

-  static void addTags(ProcessorContext processorContext, SpanCustomizer result) {
-    result.tag(KafkaStreamsTags.KAFKA_STREAMS_APPLICATION_ID_TAG, processorContext.applicationId());
-    result.tag(KafkaStreamsTags.KAFKA_STREAMS_TASK_ID_TAG, processorContext.taskId().toString());
+  static <C extends ProcessingContext> void addTags(C context, SpanCustomizer result) {
+    result.tag(KafkaStreamsTags.KAFKA_STREAMS_APPLICATION_ID_TAG, context.applicationId());
+    result.tag(KafkaStreamsTags.KAFKA_STREAMS_TASK_ID_TAG, context.taskId().toString());
   }

-  static void addTags(ProcessingContext processingContext, SpanCustomizer result) {
-    result.tag(KafkaStreamsTags.KAFKA_STREAMS_APPLICATION_ID_TAG,
-      processingContext.applicationId());
-    result.tag(KafkaStreamsTags.KAFKA_STREAMS_TASK_ID_TAG, processingContext.taskId().toString());
-  }
-
-  Span nextSpan(ProcessingContext context, Headers headers) {
+  <C extends ProcessingContext> Span nextSpan(C context, Headers headers) {
     TraceContextOrSamplingFlags extracted = extractor.extract(headers);
     // Clear any propagation keys present in the headers
     if (!extracted.equals(emptyExtraction)) {
brave/kafka/streams/TracingFixedKeyProcessor.java
@@ -13,63 +13,35 @@
  */
 package brave.kafka.streams;

-import brave.Span;
-import brave.Tracer;
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
 import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
 import org.apache.kafka.streams.processor.api.FixedKeyRecord;

-import static brave.internal.Throwables.propagateIfFatal;
+class TracingFixedKeyProcessor<KIn, VIn, VOut> extends
+  BaseTracingProcessor<FixedKeyProcessorContext<KIn, VOut>, FixedKeyRecord<KIn, VIn>, FixedKeyProcessor<KIn, VIn, VOut>>
+  implements FixedKeyProcessor<KIn, VIn, VOut> {

-class TracingFixedKeyProcessor<KIn, VIn, VOut> implements FixedKeyProcessor<KIn, VIn, VOut> {
-  final KafkaStreamsTracing kafkaStreamsTracing;
-  final Tracer tracer;
-  final String spanName;
-  final FixedKeyProcessor<KIn, VIn, VOut> delegateProcessor;
-
-  FixedKeyProcessorContext processorContext;
-
-  TracingFixedKeyProcessor(KafkaStreamsTracing kafkaStreamsTracing,
-    String spanName, FixedKeyProcessor<KIn, VIn, VOut> delegateProcessor) {
-    this.kafkaStreamsTracing = kafkaStreamsTracing;
-    this.tracer = kafkaStreamsTracing.tracer;
-    this.spanName = spanName;
-    this.delegateProcessor = delegateProcessor;
+  TracingFixedKeyProcessor(KafkaStreamsTracing kafkaStreamsTracing, String spanName,
+    FixedKeyProcessor<KIn, VIn, VOut> delegate) {
+    super(kafkaStreamsTracing, spanName, delegate);
   }

-  @Override
-  public void init(FixedKeyProcessorContext<KIn, VOut> context) {
-    this.processorContext = context;
-    delegateProcessor.init(processorContext);
+  @Override Headers headers(FixedKeyRecord<KIn, VIn> record) {
+    return record.headers();
   }

   @Override
-  public void process(FixedKeyRecord<KIn, VIn> record) {
-    Span span = kafkaStreamsTracing.nextSpan(processorContext, record.headers());
-    if (!span.isNoop()) {
-      span.name(spanName);
-      span.start();
-    }
+  void process(FixedKeyProcessor<KIn, VIn, VOut> delegate, FixedKeyRecord<KIn, VIn> record) {
+    delegate.process(record);
+  }

-    Tracer.SpanInScope scope = tracer.withSpanInScope(span);
-    Throwable error = null;
-    try {
-      delegateProcessor.process(record);
-    } catch (Throwable e) {
-      error = e;
-      propagateIfFatal(e);
-      throw e;
-    } finally {
-      // Inject this span so that the next stage uses it as a parent
-      kafkaStreamsTracing.injector.inject(span.context(), record.headers());
-      if (error != null) span.error(error);
-      span.finish();
-      scope.close();
-    }
+  @Override public void init(FixedKeyProcessorContext<KIn, VOut> context) {
+    this.context = context;
+    delegate.init(context);
   }

-  @Override
-  public void close() {
-    delegateProcessor.close();
+  @Override public void close() {
+    delegate.close();
   }
 }
brave/kafka/streams/TracingProcessor.java
@@ -13,63 +13,34 @@
  */
 package brave.kafka.streams;

-import brave.Span;
-import brave.Tracer;
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.Record;

-import static brave.internal.Throwables.propagateIfFatal;
+final class TracingProcessor<KIn, VIn, KOut, VOut> extends
+  BaseTracingProcessor<ProcessorContext<KOut, VOut>, Record<KIn, VIn>, Processor<KIn, VIn, KOut, VOut>>
+  implements Processor<KIn, VIn, KOut, VOut> {

-class TracingProcessor<KIn, VIn, KOut, VOut> implements Processor<KIn, VIn, KOut, VOut> {
-  final KafkaStreamsTracing kafkaStreamsTracing;
-  final Tracer tracer;
-  final String spanName;
-  final Processor<KIn, VIn, KOut, VOut> delegateProcessor;
-
-  ProcessorContext processorContext;
-
-  TracingProcessor(KafkaStreamsTracing kafkaStreamsTracing,
-    String spanName, Processor<KIn, VIn, KOut, VOut> delegateProcessor) {
-    this.kafkaStreamsTracing = kafkaStreamsTracing;
-    this.tracer = kafkaStreamsTracing.tracer;
-    this.spanName = spanName;
-    this.delegateProcessor = delegateProcessor;
+  TracingProcessor(KafkaStreamsTracing kafkaStreamsTracing, String spanName,
+    Processor<KIn, VIn, KOut, VOut> delegate) {
+    super(kafkaStreamsTracing, spanName, delegate);
   }

-  @Override
-  public void init(ProcessorContext<KOut, VOut> context) {
-    this.processorContext = context;
-    delegateProcessor.init(processorContext);
+  @Override Headers headers(Record<KIn, VIn> record) {
+    return record.headers();
   }

-  @Override
-  public void process(Record<KIn, VIn> record) {
-    Span span = kafkaStreamsTracing.nextSpan(processorContext, record.headers());
-    if (!span.isNoop()) {
-      span.name(spanName);
-      span.start();
-    }
+  @Override void process(Processor<KIn, VIn, KOut, VOut> delegate, Record<KIn, VIn> record) {
+    delegate.process(record);
+  }

-    Tracer.SpanInScope scope = tracer.withSpanInScope(span);
-    Throwable error = null;
-    try {
-      delegateProcessor.process(record);
-    } catch (Throwable e) {
-      error = e;
-      propagateIfFatal(e);
-      throw e;
-    } finally {
-      // Inject this span so that the next stage uses it as a parent
-      kafkaStreamsTracing.injector.inject(span.context(), record.headers());
-      if (error != null) span.error(error);
-      span.finish();
-      scope.close();
-    }
+  @Override public void init(ProcessorContext<KOut, VOut> context) {
+    this.context = context;
+    delegate.init(context);
   }

-  @Override
-  public void close() {
-    delegateProcessor.close();
+  @Override public void close() {
+    delegate.close();
   }
 }
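For context, the classes touched here sit behind KafkaStreamsTracing's public factory methods: processValues(...) (visible in the hunk above) returns a TracingFixedKeyProcessorSupplier whose processor is the TracingFixedKeyProcessor refactored in this commit, and a corresponding process(...) method wraps a regular Processor with TracingProcessor. The sketch below shows typical wiring; it is not part of this commit, and the topic names, span name, UppercaseValue processor, and config values are illustrative assumptions.

import brave.Tracing;
import brave.kafka.streams.KafkaStreamsTracing;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;
import org.apache.kafka.streams.processor.api.FixedKeyRecord;

import java.util.Properties;

public final class TracedTopologyExample {
  // Hypothetical user processor; any FixedKeyProcessor is wrapped the same way.
  static final class UppercaseValue implements FixedKeyProcessor<String, String, String> {
    FixedKeyProcessorContext<String, String> context;

    @Override public void init(FixedKeyProcessorContext<String, String> context) {
      this.context = context;
    }

    @Override public void process(FixedKeyRecord<String, String> record) {
      context.forward(record.withValue(record.value().toUpperCase()));
    }
  }

  public static void main(String[] args) {
    Tracing tracing = Tracing.newBuilder().localServiceName("streams-app").build();
    KafkaStreamsTracing kafkaStreamsTracing = KafkaStreamsTracing.create(tracing);

    // Wraps UppercaseValue in a TracingFixedKeyProcessor: each record is processed inside a
    // span named "uppercase-value", and the context is injected into headers for later stages.
    FixedKeyProcessorSupplier<String, String, String> traced =
      kafkaStreamsTracing.processValues("uppercase-value", UppercaseValue::new);

    StreamsBuilder builder = new StreamsBuilder();
    builder.<String, String>stream("input-topic")
      .processValues(traced)
      .to("output-topic");

    Topology topology = builder.build();
    Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "traced-streams-app"); // assumed values
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    // Builds KafkaStreams with a tracing KafkaClientSupplier, so consume/produce are traced too.
    KafkaStreams streams = kafkaStreamsTracing.kafkaStreams(topology, config);
    streams.start();
  }
}

The same shape applies to process(...) with a regular Processor; only the record and context types differ, which is what the new BaseTracingProcessor type parameters capture.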
