Skip to content

Commit

Permalink
[Tracing Framework] Redefine telemetry context restoration and propag…
Browse files Browse the repository at this point in the history
…ation (#9617) (#9701)

* Add SpanBuilder support



* Refactor code



* Redefine telemetry context restoration



* Update changelog



* Stores the SpanScope in ThreadLocal



* Revert the context name changes



* Change the span::endSpan and SpanScope::close behaviour



* Supressed warnings



* Add more test cases



* Address review comment



* Address review comment



* Fix java doc



* Address review comment



* Fix failing test



* Empty-Commit



* Empty-Commit



---------

Signed-off-by: Gagan Juneja <gjjuneja@amazon.com>
Signed-off-by: Gagan Juneja <gagandeepjuneja@gmail.com>
Co-authored-by: Gagan Juneja <gjjuneja@amazon.com>
  • Loading branch information
Gaganjuneja and Gagan Juneja authored Sep 2, 2023
1 parent fada3f0 commit 776d77c
Show file tree
Hide file tree
Showing 27 changed files with 1,238 additions and 232 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Remote Store] Implicitly use replication type SEGMENT for remote store clusters ([#9264](https://github.com/opensearch-project/OpenSearch/pull/9264))
- Add support to use trace propagated from client ([#9506](https://github.com/opensearch-project/OpenSearch/pull/9506))
- Separate request-based and settings-based concurrent segment search controls and introduce AggregatorFactory method to determine concurrent search support ([#9469](https://github.com/opensearch-project/OpenSearch/pull/9469))
- [Remote Store] Rate limiter integration for remote store uploads and downloads([#9448](https://github.com/opensearch-project/OpenSearch/pull/9448/))
- [Remote Store] Implicitly use replication type SEGMENT for remote store clusters ([#9264](https://github.com/opensearch-project/OpenSearch/pull/9264))
- Redefine telemetry context restoration and propagation ([#9617](https://github.com/opensearch-project/OpenSearch/pull/9617))
- Use non-concurrent path for sort request on timeseries index and field([#9562](https://github.com/opensearch-project/OpenSearch/pull/9562))
- Added sampler based on `Blanket Probabilistic Sampling rate` and `Override for on demand` ([#9621](https://github.com/opensearch-project/OpenSearch/issues/9621))

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.telemetry.tracing;

import java.util.Objects;

/**
* Default implementation of Scope
*
* @opensearch.internal
*/
final class DefaultScopedSpan implements ScopedSpan {

private final Span span;

private final SpanScope spanScope;

/**
* Creates Scope instance for the given span
*
* @param span underlying span
* @param spanScope span scope.
*/
public DefaultScopedSpan(Span span, SpanScope spanScope) {
this.span = Objects.requireNonNull(span);
this.spanScope = Objects.requireNonNull(spanScope);
}

@Override
public void addAttribute(String key, String value) {
span.addAttribute(key, value);
}

@Override
public void addAttribute(String key, long value) {
span.addAttribute(key, value);
}

@Override
public void addAttribute(String key, double value) {
span.addAttribute(key, value);
}

@Override
public void addAttribute(String key, boolean value) {
span.addAttribute(key, value);
}

@Override
public void addEvent(String event) {
span.addEvent(event);
}

@Override
public void setError(Exception exception) {
span.setError(exception);
}

/**
* Executes the runnable to end the scope
*/
@Override
public void close() {
span.endSpan();
spanScope.close();
}

/**
* Returns span.
* @return
*/
Span getSpan() {
return span;
}

/**
* Returns {@link SpanScope}
* @return spanScope
*/
SpanScope getSpanScope() {
return spanScope;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,65 +8,68 @@

package org.opensearch.telemetry.tracing;

import java.util.function.Consumer;
import java.util.Objects;

/**
* Default implementation of Scope
*
* @opensearch.internal
* Default implementation for {@link SpanScope}
*/
final class DefaultSpanScope implements SpanScope {

public class DefaultSpanScope implements SpanScope {
private final Span span;

private final Consumer<Span> onCloseConsumer;
private final SpanScope previousSpanScope;
private static final ThreadLocal<SpanScope> spanScopeThreadLocal = new ThreadLocal<>();
private final TracerContextStorage<String, Span> tracerContextStorage;

/**
* Creates Scope instance for the given span
*
* @param span underlying span
* @param onCloseConsumer consumer to execute on scope close
* Constructor
* @param span span
* @param previousSpanScope before attached span scope.
*/
public DefaultSpanScope(Span span, Consumer<Span> onCloseConsumer) {
this.span = span;
this.onCloseConsumer = onCloseConsumer;
private DefaultSpanScope(Span span, SpanScope previousSpanScope, TracerContextStorage<String, Span> tracerContextStorage) {
this.span = Objects.requireNonNull(span);
this.previousSpanScope = previousSpanScope;
this.tracerContextStorage = tracerContextStorage;
}

@Override
public void addSpanAttribute(String key, String value) {
span.addAttribute(key, value);
/**
* Creates the SpanScope object.
* @param span span.
* @param tracerContextStorage tracer context storage.
* @return SpanScope spanScope
*/
public static SpanScope create(Span span, TracerContextStorage<String, Span> tracerContextStorage) {
final SpanScope beforeSpanScope = spanScopeThreadLocal.get();
SpanScope newSpanScope = new DefaultSpanScope(span, beforeSpanScope, tracerContextStorage);
spanScopeThreadLocal.set(newSpanScope);
return newSpanScope;
}

@Override
public void addSpanAttribute(String key, long value) {
span.addAttribute(key, value);
public void close() {
detach();
spanScopeThreadLocal.set(previousSpanScope);
}

@Override
public void addSpanAttribute(String key, double value) {
span.addAttribute(key, value);
public SpanScope attach() {
tracerContextStorage.put(TracerContextStorage.CURRENT_SPAN, this.span);
return this;
}

@Override
public void addSpanAttribute(String key, boolean value) {
span.addAttribute(key, value);
private void detach() {
if (previousSpanScope != null) {
tracerContextStorage.put(TracerContextStorage.CURRENT_SPAN, previousSpanScope.getSpan());
} else {
tracerContextStorage.put(TracerContextStorage.CURRENT_SPAN, null);
}
}

@Override
public void addSpanEvent(String event) {
span.addEvent(event);
public Span getSpan() {
return span;
}

@Override
public void setError(Exception exception) {
span.setError(exception);
static SpanScope getCurrentSpanScope() {
return spanScopeThreadLocal.get();
}

/**
* Executes the runnable to end the scope
*/
@Override
public void close() {
onCloseConsumer.accept(span);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,22 @@ public DefaultTracer(TracingTelemetry tracingTelemetry, TracerContextStorage<Str
}

@Override
public SpanScope startSpan(String spanName) {
public Span startSpan(SpanCreationContext context) {
return startSpan(context.getSpanName(), context.getAttributes());
}

@Override
public Span startSpan(String spanName) {
return startSpan(spanName, Attributes.EMPTY);
}

@Override
public SpanScope startSpan(String spanName, Attributes attributes) {
public Span startSpan(String spanName, Attributes attributes) {
return startSpan(spanName, (SpanContext) null, attributes);
}

@Override
public SpanScope startSpan(String spanName, SpanContext parentSpan, Attributes attributes) {
public Span startSpan(String spanName, SpanContext parentSpan, Attributes attributes) {
Span span = null;
if (parentSpan != null) {
span = createSpan(spanName, parentSpan.getSpan(), attributes);
Expand All @@ -60,7 +65,7 @@ public SpanScope startSpan(String spanName, SpanContext parentSpan, Attributes a
}
setCurrentSpanInContext(span);
addDefaultAttributes(span);
return new DefaultSpanScope(span, (scopeSpan) -> endSpan(scopeSpan));
return span;
}

@Override
Expand All @@ -77,11 +82,21 @@ public SpanContext getCurrentSpan() {
return (currentSpan == null) ? null : new SpanContext(currentSpan);
}

private void endSpan(Span span) {
if (span != null) {
span.endSpan();
setCurrentSpanInContext(span.getParentSpan());
}
@Override
public ScopedSpan startScopedSpan(SpanCreationContext spanCreationContext) {
return startScopedSpan(spanCreationContext, null);
}

@Override
public ScopedSpan startScopedSpan(SpanCreationContext spanCreationContext, SpanContext parentSpan) {
Span span = startSpan(spanCreationContext.getSpanName(), parentSpan, spanCreationContext.getAttributes());
SpanScope spanScope = withSpanInScope(span);
return new DefaultScopedSpan(span, spanScope);
}

@Override
public SpanScope withSpanInScope(Span span) {
return DefaultSpanScope.create(span, tracerContextStorage).attach();
}

private Span createSpan(String spanName, Span parentSpan, Attributes attributes) {
Expand All @@ -101,7 +116,7 @@ protected void addDefaultAttributes(Span span) {
}

@Override
public SpanScope startSpan(String spanName, Map<String, List<String>> headers, Attributes attributes) {
public Span startSpan(String spanName, Map<String, List<String>> headers, Attributes attributes) {
Optional<Span> propagatedSpan = tracingTelemetry.getContextPropagator().extractFromHeaders(headers);
return startSpan(spanName, propagatedSpan.map(SpanContext::new).orElse(null), attributes);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.telemetry.tracing;

import org.opensearch.telemetry.tracing.noop.NoopScopedSpan;

/**
* An auto-closeable that represents scoped span.
* It provides interface for all the span operations.
*/
public interface ScopedSpan extends AutoCloseable {
/**
* No-op Scope implementation
*/
ScopedSpan NO_OP = new NoopScopedSpan();

/**
* Adds string attribute to the {@link Span}.
*
* @param key attribute key
* @param value attribute value
*/
void addAttribute(String key, String value);

/**
* Adds long attribute to the {@link Span}.
*
* @param key attribute key
* @param value attribute value
*/
void addAttribute(String key, long value);

/**
* Adds double attribute to the {@link Span}.
*
* @param key attribute key
* @param value attribute value
*/
void addAttribute(String key, double value);

/**
* Adds boolean attribute to the {@link Span}.
*
* @param key attribute key
* @param value attribute value
*/
void addAttribute(String key, boolean value);

/**
* Adds an event to the {@link Span}.
*
* @param event event name
*/
void addEvent(String event);

/**
* Records error in the span
*
* @param exception exception to be recorded
*/
void setError(Exception exception);

/**
* closes the scope
*/
@Override
void close();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.telemetry.tracing;

import org.opensearch.telemetry.tracing.attributes.Attributes;

/**
* Context for span details.
*/
public final class SpanCreationContext {
private final String spanName;
private final Attributes attributes;

/**
* Constructor.
* @param spanName span name.
* @param attributes attributes.
*/
public SpanCreationContext(String spanName, Attributes attributes) {
this.spanName = spanName;
this.attributes = attributes;
}

/**
* Returns the span name.
* @return span name
*/
public String getSpanName() {
return spanName;
}

/**
* Returns the span attributes.
* @return attributes.
*/
public Attributes getAttributes() {
return attributes;
}
}
Loading

0 comments on commit 776d77c

Please sign in to comment.