Skip to content

Commit

Permalink
Introduce TransformSchedulerStats class
Browse files Browse the repository at this point in the history
  • Loading branch information
przemekwitek committed May 27, 2024
1 parent 8762927 commit b12aa67
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,19 @@
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.transform.transforms.TransformSchedulerStats;

import java.io.IOException;
import java.util.List;

import static org.elasticsearch.xpack.core.transform.transforms.TransformSchedulerStats.REGISTERED_TRANSFORM_COUNT_FIELD_NAME;

public class GetTransformNodeStatsAction extends ActionType<GetTransformNodeStatsAction.NodesStatsResponse> {

public static final GetTransformNodeStatsAction INSTANCE = new GetTransformNodeStatsAction();
public static final String NAME = "cluster:admin/transform/node_stats";

private static final String TOTAL_FIELD_NAME = "total";
private static final String REGISTERED_TRANSFORM_COUNT_FIELD_NAME = "registered_transform_count";
private static final String PEEK_TRANSFORM_FIELD_NAME = "peek_transform";
private static final String SCHEDULER_STATS_FIELD_NAME = "scheduler";

private GetTransformNodeStatsAction() {
super(NAME);
Expand All @@ -54,10 +55,12 @@ public void writeTo(StreamOutput out) throws IOException {

public static class NodesStatsResponse extends BaseNodesResponse<NodeStatsResponse> implements ToXContentObject {

private static final String TOTAL_FIELD_NAME = "total";

public int getTotalRegisteredTransformCount() {
int totalRegisteredTransformCount = 0;
for (var nodeResponse : getNodes()) {
totalRegisteredTransformCount += nodeResponse.getRegisteredTransformCount();
totalRegisteredTransformCount += nodeResponse.schedulerStats().registeredTransformCount();
}
return totalRegisteredTransformCount;
}
Expand All @@ -80,8 +83,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
nodeResponse.toXContent(builder, params);
}
builder.startObject(TOTAL_FIELD_NAME);
builder.startObject(SCHEDULER_STATS_FIELD_NAME);
builder.field(REGISTERED_TRANSFORM_COUNT_FIELD_NAME, getTotalRegisteredTransformCount());
builder.endObject();
builder.endObject();
return builder.endObject();
}

Expand Down Expand Up @@ -112,41 +117,32 @@ public void writeTo(StreamOutput out) throws IOException {

public static class NodeStatsResponse extends BaseNodeResponse implements ToXContentObject {

private final int registeredTransformCount;
private final String peekTransformName;

public int getRegisteredTransformCount() {
return this.registeredTransformCount;
}
private final TransformSchedulerStats schedulerStats;

public NodeStatsResponse(DiscoveryNode node, int registeredTransformCount) {
this(node, registeredTransformCount, null);
}

public NodeStatsResponse(DiscoveryNode node, int registeredTransformCount, String peekTransformName) {
public NodeStatsResponse(DiscoveryNode node, TransformSchedulerStats schedulerStats) {
super(node);
this.registeredTransformCount = registeredTransformCount;
this.peekTransformName = peekTransformName;
this.schedulerStats = schedulerStats;
}

public NodeStatsResponse(StreamInput in) throws IOException {
super(in);
this.registeredTransformCount = in.readVInt();
this.peekTransformName = in.readOptionalString();
this.schedulerStats = in.readOptionalWriteable(TransformSchedulerStats::new);
}

TransformSchedulerStats schedulerStats() {
return schedulerStats;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(this.registeredTransformCount);
out.writeOptionalString(peekTransformName);
out.writeOptionalWriteable(schedulerStats);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject();
builder.field(REGISTERED_TRANSFORM_COUNT_FIELD_NAME, registeredTransformCount);
builder.field(PEEK_TRANSFORM_FIELD_NAME, peekTransformName);
builder.field(SCHEDULER_STATS_FIELD_NAME, schedulerStats);
return builder.endObject();
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.core.transform.transforms;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;

public record TransformSchedulerStats(int registeredTransformCount, String peekTransformName) implements ToXContent, Writeable {

public static final String REGISTERED_TRANSFORM_COUNT_FIELD_NAME = "registered_transform_count";
public static final String PEEK_TRANSFORM_FIELD_NAME = "peek_transform";

public TransformSchedulerStats(StreamInput in) throws IOException {
this(in.readVInt(), in.readOptionalString());
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(this.registeredTransformCount);
out.writeOptionalString(this.peekTransformName);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(REGISTERED_TRANSFORM_COUNT_FIELD_NAME, this.registeredTransformCount);
builder.field(PEEK_TRANSFORM_FIELD_NAME, this.peekTransformName);
return builder.endObject();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.transform.action.GetTransformNodeStatsAction.NodeStatsResponse;
import org.elasticsearch.xpack.core.transform.action.GetTransformNodeStatsAction.NodesStatsResponse;
import org.elasticsearch.xpack.core.transform.transforms.TransformSchedulerStats;

import java.util.List;

Expand All @@ -36,9 +37,9 @@ public void testEmptyResponse() {
}

public void testResponse() {
var nodeA = new NodeStatsResponse(createNode("node-A"), 7);
var nodeB = new NodeStatsResponse(createNode("node-B"), 0);
var nodeC = new NodeStatsResponse(createNode("node-C"), 4);
var nodeA = new NodeStatsResponse(createNode("node-A"), new TransformSchedulerStats(7, null));
var nodeB = new NodeStatsResponse(createNode("node-B"), new TransformSchedulerStats(0, null));
var nodeC = new NodeStatsResponse(createNode("node-C"), new TransformSchedulerStats(4, null));

var nodesStatsResponse = new NodesStatsResponse(CLUSTER_NAME, List.of(nodeA, nodeB, nodeC), List.of());
assertThat(nodesStatsResponse.getNodes(), containsInAnyOrder(nodeA, nodeB, nodeC));
Expand All @@ -47,8 +48,8 @@ public void testResponse() {
}

public void testResponseWithFailure() {
var nodeA = new NodeStatsResponse(createNode("node-A"), 7);
var nodeB = new NodeStatsResponse(createNode("node-B"), 0);
var nodeA = new NodeStatsResponse(createNode("node-A"), new TransformSchedulerStats(7, null));
var nodeB = new NodeStatsResponse(createNode("node-B"), new TransformSchedulerStats(0, null));
var nodeC = new FailedNodeException("node-C", "node C failed", null);

var nodesStatsResponse = new NodesStatsResponse(CLUSTER_NAME, List.of(nodeA, nodeB), List.of(nodeC));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ protected Response getNodeStats() throws IOException {

protected int getTotalRegisteredTransformCount() throws IOException {
Response response = getNodeStats();
return (int) XContentMapValues.extractValue(entityAsMap(response), "total", "registered_transform_count");
return (int) XContentMapValues.extractValue(entityAsMap(response), "total", "scheduler", "registered_transform_count");
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,14 @@ public void testTransformNodeStats() throws Exception {
assertThat(response, hasKey("total"));
assertThat(
"Response was: " + response,
XContentMapValues.extractValue(response, "total", "registered_transform_count"),
XContentMapValues.extractValue(response, "total", "scheduler", "registered_transform_count"),
is(equalTo(0))
);
for (String nodeId : nodesInfo.keySet()) {
assertThat(response, hasKey(nodeId));
assertThat(
"Response was: " + response,
XContentMapValues.extractValue(response, nodeId, "registered_transform_count"),
XContentMapValues.extractValue(response, nodeId, "scheduler", "registered_transform_count"),
is(equalTo(0))
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ protected NodeStatsResponse newNodeResponse(StreamInput in, DiscoveryNode node)

@Override
protected NodeStatsResponse nodeOperation(NodeStatsRequest request, Task task) {
final DiscoveryNode localNode = transportService.getLocalNode();
final TransformScheduler.Stats schedulerStats = scheduler.getStats();
return new NodeStatsResponse(localNode, schedulerStats.registeredTransformCount(), schedulerStats.peekTransformName());
var localNode = transportService.getLocalNode();
var schedulerStats = scheduler.getStats();
return new NodeStatsResponse(localNode, schedulerStats);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.transform.transforms.TransformSchedulerStats;
import org.elasticsearch.xpack.core.transform.transforms.TransformTaskParams;
import org.elasticsearch.xpack.transform.Transform;

Expand Down Expand Up @@ -61,8 +62,6 @@ public interface Listener {
void triggered(Event event);
}

public record Stats(int registeredTransformCount, String peekTransformName) {}

private static final Logger logger = LogManager.getLogger(TransformScheduler.class);

private final Clock clock;
Expand Down Expand Up @@ -273,8 +272,8 @@ public void deregisterTransform(String transformId) {
scheduledTasks.remove(transformId);
}

public Stats getStats() {
return new Stats(
public TransformSchedulerStats getStats() {
return new TransformSchedulerStats(
scheduledTasks.size(),
Optional.ofNullable(scheduledTasks.first()).map(TransformScheduledTask::getTransformId).orElse(null)
);
Expand Down

0 comments on commit b12aa67

Please sign in to comment.