Skip to content

Commit

Permalink
Review comments fixes and refactoring
Browse files Browse the repository at this point in the history
Signed-off-by: Shreyansh Ray <rayshrey@amazon.com>
  • Loading branch information
rayshrey committed Oct 17, 2023
1 parent f84cf99 commit 301a2cc
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,10 @@ private void finishHim() {
}
}, releasable::close), span, tracer)
);
} catch (Exception e) {
span.setError(e);
span.endSpan();
throw e;
}
}
bulkRequest = null; // allow memory for bulk request items to be reclaimed before all items have been completed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,11 +204,6 @@ public TransportShardBulkAction(
);
}

@Override
protected Tracer getTracer() {
return tracer;
}

protected void handlePrimaryTermValidationRequest(
final PrimaryTermValidationRequest request,
final TransportChannel channel,
Expand Down Expand Up @@ -420,9 +415,9 @@ protected void dispatchedShardOperationOnPrimary(
IndexShard primary,
ActionListener<PrimaryResult<BulkShardRequest, BulkShardResponse>> listener
) {
Span span = getTracer().startSpan(SpanBuilder.from("shardPrimaryAction", clusterService.localNode().getId(), request));
Span span = tracer.startSpan(SpanBuilder.from("shardPrimaryAction", clusterService.localNode().getId(), request));
ClusterStateObserver observer = new ClusterStateObserver(clusterService, request.timeout(), logger, threadPool.getThreadContext());
try (SpanScope spanScope = getTracer().withSpanInScope(span)) {
try (SpanScope spanScope = tracer.withSpanInScope(span)) {
performOnPrimary(request, primary, updateHelper, threadPool::absoluteTimeInMillis, (update, shardId, mappingListener) -> {
assert update != null;
assert shardId != null;
Expand All @@ -442,7 +437,7 @@ public void onClusterServiceClose() {
public void onTimeout(TimeValue timeout) {
mappingUpdateListener.onFailure(new MapperException("timed out while waiting for a dynamic mapping update"));
}
}), TraceableActionListener.create(listener, span, getTracer()), threadPool, executor(primary));
}), TraceableActionListener.create(listener, span, tracer), threadPool, executor(primary));
}
}

Expand Down Expand Up @@ -818,9 +813,9 @@ static BulkItemResponse processUpdateResponse(

@Override
protected void dispatchedShardOperationOnReplica(BulkShardRequest request, IndexShard replica, ActionListener<ReplicaResult> listener) {
Span span = getTracer().startSpan(SpanBuilder.from("shardReplicaAction", clusterService.localNode().getId(), request));
try (SpanScope spanScope = getTracer().withSpanInScope(span)) {
ActionListener.completeWith(TraceableActionListener.create(listener, span, getTracer()), () -> {
Span span = tracer.startSpan(SpanBuilder.from("shardReplicaAction", clusterService.localNode().getId(), request));
try (SpanScope spanScope = tracer.withSpanInScope(span)) {
ActionListener.completeWith(TraceableActionListener.create(listener, span, tracer), () -> {
final Translog.Location location = performOnReplica(request, replica);
return new WriteReplicaResult<>(request, location, null, replica, logger);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskListener;
import org.opensearch.tasks.TaskManager;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.telemetry.tracing.noop.NoopTracer;

import java.util.concurrent.atomic.AtomicInteger;

Expand Down Expand Up @@ -81,10 +79,6 @@ private Releasable registerChildNode(TaskId parentTask) {
}
}

protected Tracer getTracer() {
return NoopTracer.INSTANCE;
}

/**
* Use this method when the transport action call should result in creation of a new task associated with the call.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ private AttributeNames() {
/**
* Number of request items in bulk request
*/
public static final String NUM_BULK_ITEMS = "num_bulk_items";
public static final String BULK_REQUEST_ITEMS = "bulk_request_items";

/**
* Node ID
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,6 @@ public static SpanCreationContext from(String spanName, String nodeId, BulkShard
return SpanCreationContext.server().name(spanName).attributes(buildSpanAttributes(nodeId, bulkShardRequest));
}

public static SpanCreationContext from(String spanName, String nodeId, ShardId shardId) {
return SpanCreationContext.server().name(spanName).attributes(buildSpanAttributes(nodeId, shardId));
}

private static String createSpanName(HttpRequest httpRequest) {
return httpRequest.method().name() + SEPARATOR + httpRequest.uri();
}
Expand Down Expand Up @@ -139,16 +135,13 @@ private static Attributes buildSpanAttributes(String action, Transport.Connectio
}

private static Attributes buildSpanAttributes(String nodeId, BulkShardRequest bulkShardRequest) {
Attributes attributes = buildSpanAttributes(nodeId, bulkShardRequest.shardId());
attributes.addAttribute(AttributeNames.NUM_BULK_ITEMS, bulkShardRequest.items().length);
return attributes;
}

private static Attributes buildSpanAttributes(String nodeId, ShardId shardId) {
Attributes attributes = Attributes.create()
.addAttribute(AttributeNames.NODE_ID, nodeId)
.addAttribute(AttributeNames.INDEX, (shardId != null) ? shardId.getIndexName() : "NULL")
.addAttribute(AttributeNames.SHARD_ID, (shardId != null) ? shardId.getId() : -1);
.addAttribute(AttributeNames.BULK_REQUEST_ITEMS, bulkShardRequest.items().length);
if (bulkShardRequest.shardId() != null) {
attributes.addAttribute(AttributeNames.INDEX, bulkShardRequest.shardId().getIndexName())
.addAttribute(AttributeNames.SHARD_ID, bulkShardRequest.shardId().getId());
}
return attributes;
}

Expand Down

0 comments on commit 301a2cc

Please sign in to comment.