Skip to content

Commit

Permalink
Instrument primary/replica write in TransportWriteAction instead of T…
Browse files Browse the repository at this point in the history
…ransportShardBulkAction

Signed-off-by: Shreyansh Ray <rayshrey@amazon.com>
  • Loading branch information
rayshrey committed Oct 19, 2023
1 parent eaf80f7 commit 9feb78c
Show file tree
Hide file tree
Showing 10 changed files with 83 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,7 @@
import org.opensearch.indices.SystemIndices;
import org.opensearch.node.NodeClosedException;
import org.opensearch.tasks.Task;
import org.opensearch.telemetry.tracing.Span;
import org.opensearch.telemetry.tracing.SpanBuilder;
import org.opensearch.telemetry.tracing.SpanScope;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.telemetry.tracing.listener.TraceableActionListener;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.threadpool.ThreadPool.Names;
import org.opensearch.transport.TransportChannel;
Expand Down Expand Up @@ -184,7 +180,8 @@ public TransportShardBulkAction(
EXECUTOR_NAME_FUNCTION,
false,
indexingPressureService,
systemIndices
systemIndices,
tracer
);
this.updateHelper = updateHelper;
this.mappingUpdatedAction = mappingUpdatedAction;
Expand Down Expand Up @@ -415,30 +412,27 @@ protected void dispatchedShardOperationOnPrimary(
IndexShard primary,
ActionListener<PrimaryResult<BulkShardRequest, BulkShardResponse>> listener
) {
Span span = tracer.startSpan(SpanBuilder.from("shardPrimaryAction", clusterService.localNode().getId(), request));
ClusterStateObserver observer = new ClusterStateObserver(clusterService, request.timeout(), logger, threadPool.getThreadContext());
try (SpanScope spanScope = tracer.withSpanInScope(span)) {
performOnPrimary(request, primary, updateHelper, threadPool::absoluteTimeInMillis, (update, shardId, mappingListener) -> {
assert update != null;
assert shardId != null;
mappingUpdatedAction.updateMappingOnClusterManager(shardId.getIndex(), update, mappingListener);
}, mappingUpdateListener -> observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
mappingUpdateListener.onResponse(null);
}
performOnPrimary(request, primary, updateHelper, threadPool::absoluteTimeInMillis, (update, shardId, mappingListener) -> {
assert update != null;
assert shardId != null;
mappingUpdatedAction.updateMappingOnClusterManager(shardId.getIndex(), update, mappingListener);
}, mappingUpdateListener -> observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
mappingUpdateListener.onResponse(null);
}

@Override
public void onClusterServiceClose() {
mappingUpdateListener.onFailure(new NodeClosedException(clusterService.localNode()));
}
@Override
public void onClusterServiceClose() {
mappingUpdateListener.onFailure(new NodeClosedException(clusterService.localNode()));
}

@Override
public void onTimeout(TimeValue timeout) {
mappingUpdateListener.onFailure(new MapperException("timed out while waiting for a dynamic mapping update"));
}
}), TraceableActionListener.create(listener, span, tracer), threadPool, executor(primary));
}
@Override
public void onTimeout(TimeValue timeout) {
mappingUpdateListener.onFailure(new MapperException("timed out while waiting for a dynamic mapping update"));
}
}), listener, threadPool, executor(primary));
}

@Override
Expand Down Expand Up @@ -813,13 +807,10 @@ static BulkItemResponse processUpdateResponse(

@Override
protected void dispatchedShardOperationOnReplica(BulkShardRequest request, IndexShard replica, ActionListener<ReplicaResult> listener) {
Span span = tracer.startSpan(SpanBuilder.from("shardReplicaAction", clusterService.localNode().getId(), request));
try (SpanScope spanScope = tracer.withSpanInScope(span)) {
ActionListener.completeWith(TraceableActionListener.create(listener, span, tracer), () -> {
final Translog.Location location = performOnReplica(request, replica);
return new WriteReplicaResult<>(request, location, null, replica, logger);
});
}
ActionListener.completeWith(listener, () -> {
final Translog.Location location = performOnReplica(request, replica);
return new WriteReplicaResult<>(request, location, null, replica, logger);
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.SystemIndices;
import org.opensearch.tasks.Task;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.threadpool.ThreadPool.Names;
import org.opensearch.transport.TransportException;
Expand Down Expand Up @@ -93,7 +94,8 @@ public TransportResyncReplicationAction(
ShardStateAction shardStateAction,
ActionFilters actionFilters,
IndexingPressureService indexingPressureService,
SystemIndices systemIndices
SystemIndices systemIndices,
Tracer tracer
) {
super(
settings,
Expand All @@ -109,7 +111,8 @@ public TransportResyncReplicationAction(
EXECUTOR_NAME_FUNCTION,
true, /* we should never reject resync because of thread pool capacity on primary */
indexingPressureService,
systemIndices
systemIndices,
tracer
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@
import org.opensearch.index.translog.Translog.Location;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.SystemIndices;
import org.opensearch.telemetry.tracing.Span;
import org.opensearch.telemetry.tracing.SpanBuilder;
import org.opensearch.telemetry.tracing.SpanScope;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.telemetry.tracing.listener.TraceableActionListener;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

Expand All @@ -82,6 +87,7 @@ public abstract class TransportWriteAction<
protected final SystemIndices systemIndices;

private final Function<IndexShard, String> executorFunction;
private final Tracer tracer;

protected TransportWriteAction(
Settings settings,
Expand All @@ -97,7 +103,8 @@ protected TransportWriteAction(
Function<IndexShard, String> executorFunction,
boolean forceExecutionOnPrimary,
IndexingPressureService indexingPressureService,
SystemIndices systemIndices
SystemIndices systemIndices,
Tracer tracer
) {
// We pass ThreadPool.Names.SAME to the super class as we control the dispatching to the
// ThreadPool.Names.WRITE/ThreadPool.Names.SYSTEM_WRITE thread pools in this class.
Expand All @@ -119,6 +126,7 @@ protected TransportWriteAction(
this.executorFunction = executorFunction;
this.indexingPressureService = indexingPressureService;
this.systemIndices = systemIndices;
this.tracer = tracer;
}

protected String executor(IndexShard shard) {
Expand Down Expand Up @@ -220,7 +228,10 @@ protected void shardOperationOnPrimary(
threadPool.executor(executor).execute(new ActionRunnable<PrimaryResult<ReplicaRequest, Response>>(listener) {
@Override
protected void doRun() {
dispatchedShardOperationOnPrimary(request, primary, listener);
Span span = tracer.startSpan(SpanBuilder.from("shardPrimaryAction", clusterService.localNode().getId(), request));
try (SpanScope spanScope = tracer.withSpanInScope(span)) {
dispatchedShardOperationOnPrimary(request, primary, TraceableActionListener.create(listener, span, tracer));
}
}

@Override
Expand Down Expand Up @@ -248,7 +259,10 @@ protected void shardOperationOnReplica(ReplicaRequest request, IndexShard replic
threadPool.executor(executorFunction.apply(replica)).execute(new ActionRunnable<ReplicaResult>(listener) {
@Override
protected void doRun() {
dispatchedShardOperationOnReplica(request, replica, listener);
Span span = tracer.startSpan(SpanBuilder.from("shardReplicaAction", clusterService.localNode().getId(), request));
try (SpanScope spanScope = tracer.withSpanInScope(span)) {
dispatchedShardOperationOnReplica(request, replica, TraceableActionListener.create(listener, span, tracer));
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.SystemIndices;
import org.opensearch.tasks.Task;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportException;
import org.opensearch.transport.TransportResponseHandler;
Expand Down Expand Up @@ -99,7 +100,8 @@ public RetentionLeaseSyncAction(
final ShardStateAction shardStateAction,
final ActionFilters actionFilters,
final IndexingPressureService indexingPressureService,
final SystemIndices systemIndices
final SystemIndices systemIndices,
final Tracer tracer
) {
super(
settings,
Expand All @@ -115,7 +117,8 @@ public RetentionLeaseSyncAction(
ignore -> ThreadPool.Names.MANAGEMENT,
false,
indexingPressureService,
systemIndices
systemIndices,
tracer
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.telemetry.tracing;

import org.opensearch.action.bulk.BulkShardRequest;
import org.opensearch.action.support.replication.ReplicatedWriteRequest;
import org.opensearch.common.annotation.InternalApi;
import org.opensearch.core.common.Strings;
import org.opensearch.http.HttpRequest;
Expand Down Expand Up @@ -69,8 +70,8 @@ public static SpanCreationContext from(String action, Transport.Connection conne
return SpanCreationContext.server().name(createSpanName(action, connection)).attributes(buildSpanAttributes(action, connection));
}

public static SpanCreationContext from(String spanName, String nodeId, BulkShardRequest bulkShardRequest) {
return SpanCreationContext.server().name(spanName).attributes(buildSpanAttributes(nodeId, bulkShardRequest));
public static SpanCreationContext from(String spanName, String nodeId, ReplicatedWriteRequest request) {
return SpanCreationContext.server().name(spanName).attributes(buildSpanAttributes(nodeId, request));
}

private static String createSpanName(HttpRequest httpRequest) {
Expand Down Expand Up @@ -155,14 +156,15 @@ private static Attributes buildSpanAttributes(String action, TcpChannel tcpChann
return attributes;
}

private static Attributes buildSpanAttributes(String nodeId, BulkShardRequest bulkShardRequest) {
Attributes attributes = Attributes.create()
.addAttribute(AttributeNames.NODE_ID, nodeId)
.addAttribute(AttributeNames.BULK_REQUEST_ITEMS, bulkShardRequest.items().length)
.addAttribute(AttributeNames.REFRESH_POLICY, bulkShardRequest.getRefreshPolicy().getValue());
if (bulkShardRequest.shardId() != null) {
attributes.addAttribute(AttributeNames.INDEX, bulkShardRequest.shardId().getIndexName())
.addAttribute(AttributeNames.SHARD_ID, bulkShardRequest.shardId().getId());
private static Attributes buildSpanAttributes(String nodeId, ReplicatedWriteRequest request) {
Attributes attributes = Attributes.create().addAttribute(AttributeNames.NODE_ID, nodeId);
if (request instanceof BulkShardRequest) {
attributes.addAttribute(AttributeNames.BULK_REQUEST_ITEMS, ((BulkShardRequest) request).items().length)
.addAttribute(AttributeNames.REFRESH_POLICY, request.getRefreshPolicy().getValue());
if (request.shardId() != null) {
attributes.addAttribute(AttributeNames.INDEX, request.shardId().getIndexName())
.addAttribute(AttributeNames.SHARD_ID, request.shardId().getId());
}
}
return attributes;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,8 @@ public void testResyncDoesNotBlockOnPrimaryAction() throws Exception {
shardStateAction,
new ActionFilters(new HashSet<>()),
new IndexingPressureService(Settings.EMPTY, clusterService),
new SystemIndices(emptyMap())
new SystemIndices(emptyMap()),
NoopTracer.INSTANCE
);

assertThat(action.globalBlockLevel(), nullValue());
Expand Down Expand Up @@ -256,7 +257,8 @@ private TransportResyncReplicationAction createAction() {
mock(ShardStateAction.class),
new ActionFilters(new HashSet<>()),
mock(IndexingPressureService.class),
new SystemIndices(emptyMap())
new SystemIndices(emptyMap()),
NoopTracer.INSTANCE
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,8 @@ protected TestAction(
ignore -> ThreadPool.Names.SAME,
false,
TransportWriteActionForIndexingPressureTests.this.indexingPressureService,
new SystemIndices(emptyMap())
new SystemIndices(emptyMap()),
NoopTracer.INSTANCE
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,8 @@ protected TestAction(boolean withDocumentFailureOnPrimary, boolean withDocumentF
ignore -> ThreadPool.Names.SAME,
false,
new IndexingPressureService(Settings.EMPTY, TransportWriteActionTests.this.clusterService),
new SystemIndices(emptyMap())
new SystemIndices(emptyMap()),
NoopTracer.INSTANCE
);
this.withDocumentFailureOnPrimary = withDocumentFailureOnPrimary;
this.withDocumentFailureOnReplica = withDocumentFailureOnReplica;
Expand Down Expand Up @@ -505,7 +506,8 @@ protected TestAction(
ignore -> ThreadPool.Names.SAME,
false,
new IndexingPressureService(settings, clusterService),
new SystemIndices(emptyMap())
new SystemIndices(emptyMap()),
NoopTracer.INSTANCE
);
this.withDocumentFailureOnPrimary = false;
this.withDocumentFailureOnReplica = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,8 @@ public void testRetentionLeaseSyncActionOnPrimary() {
shardStateAction,
new ActionFilters(Collections.emptySet()),
new IndexingPressureService(Settings.EMPTY, clusterService),
new SystemIndices(emptyMap())
new SystemIndices(emptyMap()),
NoopTracer.INSTANCE
);
final RetentionLeases retentionLeases = mock(RetentionLeases.class);
final RetentionLeaseSyncAction.Request request = new RetentionLeaseSyncAction.Request(indexShard.shardId(), retentionLeases);
Expand Down Expand Up @@ -162,7 +163,8 @@ public void testRetentionLeaseSyncActionOnReplica() throws Exception {
shardStateAction,
new ActionFilters(Collections.emptySet()),
new IndexingPressureService(Settings.EMPTY, clusterService),
new SystemIndices(emptyMap())
new SystemIndices(emptyMap()),
NoopTracer.INSTANCE
);
final RetentionLeases retentionLeases = mock(RetentionLeases.class);
final RetentionLeaseSyncAction.Request request = new RetentionLeaseSyncAction.Request(indexShard.shardId(), retentionLeases);
Expand Down Expand Up @@ -203,7 +205,8 @@ public void testBlocks() {
shardStateAction,
new ActionFilters(Collections.emptySet()),
new IndexingPressureService(Settings.EMPTY, clusterService),
new SystemIndices(emptyMap())
new SystemIndices(emptyMap()),
NoopTracer.INSTANCE
);

assertNull(action.indexBlockLevel());
Expand Down Expand Up @@ -233,7 +236,8 @@ private RetentionLeaseSyncAction createAction() {
shardStateAction,
new ActionFilters(Collections.emptySet()),
new IndexingPressureService(Settings.EMPTY, clusterService),
new SystemIndices(emptyMap())
new SystemIndices(emptyMap()),
NoopTracer.INSTANCE
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2123,7 +2123,8 @@ public void onFailure(final Exception e) {
shardStateAction,
actionFilters,
new IndexingPressureService(settings, clusterService),
new SystemIndices(emptyMap())
new SystemIndices(emptyMap()),
NoopTracer.INSTANCE
)
),
new GlobalCheckpointSyncAction(
Expand Down

0 comments on commit 9feb78c

Please sign in to comment.