Skip to content

Commit

Permalink
Add peek_transform_name for debugging
Browse files Browse the repository at this point in the history
  • Loading branch information
przemekwitek committed May 24, 2024
1 parent 1b7cb23 commit 99dbb23
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class GetTransformNodeStatsAction extends ActionType<GetTransformNodeStat

private static final String TOTAL_FIELD_NAME = "total";
private static final String REGISTERED_TRANSFORM_COUNT_FIELD_NAME = "registered_transform_count";
private static final String PEEK_TRANSFORM_FIELD_NAME = "peek_transform";

private GetTransformNodeStatsAction() {
super(NAME);
Expand Down Expand Up @@ -112,31 +113,40 @@ public void writeTo(StreamOutput out) throws IOException {
public static class NodeStatsResponse extends BaseNodeResponse implements ToXContentObject {

private final int registeredTransformCount;
private final String peekTransformName;

public int getRegisteredTransformCount() {
return this.registeredTransformCount;
}

public NodeStatsResponse(DiscoveryNode node, int registeredTransformCount) {
this(node, registeredTransformCount, null);
}

public NodeStatsResponse(DiscoveryNode node, int registeredTransformCount, String peekTransformName) {
super(node);
this.registeredTransformCount = registeredTransformCount;
this.peekTransformName = peekTransformName;
}

public NodeStatsResponse(StreamInput in) throws IOException {
super(in);
this.registeredTransformCount = in.readVInt();
this.peekTransformName = in.readOptionalString();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(this.registeredTransformCount);
out.writeOptionalString(peekTransformName);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject();
builder.field(REGISTERED_TRANSFORM_COUNT_FIELD_NAME, registeredTransformCount);
builder.field(PEEK_TRANSFORM_FIELD_NAME, peekTransformName);
return builder.endObject();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ protected NodeStatsResponse newNodeResponse(StreamInput in, DiscoveryNode node)
@Override
protected NodeStatsResponse nodeOperation(NodeStatsRequest request, Task task) {
final DiscoveryNode localNode = transportService.getLocalNode();
return new NodeStatsResponse(localNode, scheduler.getRegisteredTransformCount());
final TransformScheduler.Stats schedulerStats = scheduler.getStats();
return new NodeStatsResponse(localNode, schedulerStats.registeredTransformCount(), schedulerStats.peekTransformName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.time.Instant;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.elasticsearch.core.Strings.format;
Expand Down Expand Up @@ -60,6 +61,9 @@ public interface Listener {
void triggered(Event event);
}

public record Stats(int registeredTransformCount, String peekTransformName) {
}

private static final Logger logger = LogManager.getLogger(TransformScheduler.class);

private final Clock clock;
Expand Down Expand Up @@ -270,12 +274,20 @@ public void deregisterTransform(String transformId) {
scheduledTasks.remove(transformId);
}

public Stats getStats() {
return new Stats(
scheduledTasks.size(),
Optional.ofNullable(scheduledTasks.first()).map(TransformScheduledTask::getTransformId).orElse(null)
);
}

// Visible for testing
/**
* Returns the number of transforms currently in the queue.
*
* @return number of transforms currently in the queue
*/
public int getRegisteredTransformCount() {
int getRegisteredTransformCount() {
return scheduledTasks.size();
}

Expand Down

0 comments on commit 99dbb23

Please sign in to comment.