Skip to content

Commit

Permalink
feat: add useful context to actor
Browse files Browse the repository at this point in the history
Improve debuggability and tracebility for actor logs. Add partition ID to log context, and sets specific actor name.

(cherry picked from commit 3b5b11a)
  • Loading branch information
ChrisKujawa authored and lenaschoenburg committed Mar 7, 2023
1 parent 9c16218 commit d758268
Showing 1 changed file with 24 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ public class StreamProcessor extends Actor implements HealthMonitorable, LogReco
private ProcessingScheduleServiceImpl processorActorService;
private ProcessingScheduleServiceImpl asyncScheduleService;
private AsyncProcessingScheduleServiceActor asyncActor;
private final int nodeId;

protected StreamProcessor(final StreamProcessorBuilder processorBuilder) {
actorSchedulingService = processorBuilder.getActorSchedulingService();
Expand All @@ -131,7 +132,8 @@ protected StreamProcessor(final StreamProcessorBuilder processorBuilder) {
.abortCondition(this::isClosed);
logStream = streamProcessorContext.getLogStream();
partitionId = logStream.getPartitionId();
actorName = buildActorName(processorBuilder.getNodeId(), "StreamProcessor", partitionId);
nodeId = processorBuilder.getNodeId();
actorName = buildActorName(nodeId, "StreamProcessor", partitionId);
metrics = new StreamProcessorMetrics(partitionId);
recordProcessors.addAll(processorBuilder.getRecordProcessors());
}
Expand Down Expand Up @@ -182,7 +184,8 @@ protected void onActorStarted() {
streamProcessorContext::getStreamProcessorPhase, // this is volatile
() -> false, // we will just stop the actor in this case, no need to provide this
logStream::newLogStreamBatchWriter);
asyncActor = new AsyncProcessingScheduleServiceActor(asyncScheduleService);
asyncActor =
new AsyncProcessingScheduleServiceActor(asyncScheduleService, nodeId, partitionId);
final var extendedProcessingScheduleService =
new ExtendedProcessingScheduleServiceImpl(
processorActorService, asyncScheduleService, asyncActor.getActorControl());
Expand Down Expand Up @@ -580,10 +583,28 @@ private static final class AsyncProcessingScheduleServiceActor extends Actor {

private final ProcessingScheduleServiceImpl scheduleService;
private CompletableActorFuture<Void> closeFuture = CompletableActorFuture.completed(null);
private final String asyncScheduleActorName;
private final int partitionId;

public AsyncProcessingScheduleServiceActor(
final ProcessingScheduleServiceImpl scheduleService) {
final ProcessingScheduleServiceImpl scheduleService,
final int nodeId,
final int partitionId) {
this.scheduleService = scheduleService;
asyncScheduleActorName = buildActorName(nodeId, "AsyncProcessingScheduleActor", partitionId);
this.partitionId = partitionId;
}

@Override
protected Map<String, String> createContext() {
final var context = super.createContext();
context.put(ACTOR_PROP_PARTITION_ID, Integer.toString(partitionId));
return context;
}

@Override
public String getName() {
return asyncScheduleActorName;
}

@Override
Expand Down

0 comments on commit d758268

Please sign in to comment.