From 5fe26b76abc21b74440e63526d7fa87ea86259ea Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Fri, 23 Nov 2018 13:17:14 +0100 Subject: [PATCH] Replace Streamable w/ Writeable in BaseTasksRequest and subclasses This commit replaces usages of Streamable with Writeable for the BaseTasksRequest / TransportTasksAction classes and subclasses of these classes. Relates to #34389 --- .../index/reindex/RethrottleRequest.java | 14 +++++----- .../node/tasks/cancel/CancelTasksRequest.java | 9 ++++--- .../cancel/TransportCancelTasksAction.java | 25 +++++++----------- .../node/tasks/list/ListTasksRequest.java | 16 +++++++----- .../support/tasks/BaseTasksRequest.java | 21 ++++++++------- .../support/tasks/TransportTasksAction.java | 18 +++++-------- .../cluster/node/tasks/TestTaskPlugin.java | 7 +++++ .../node/tasks/TransportTasksActionTests.java | 5 ++++ .../persistent/TestPersistentTasksPlugin.java | 5 ++-- .../action/TransportFollowStatsAction.java | 1 - .../core/ccr/action/FollowStatsAction.java | 13 +++++----- .../xpack/core/ml/action/CloseJobAction.java | 25 +++++++++--------- .../xpack/core/ml/action/FlushJobAction.java | 19 +++++++------- .../core/ml/action/ForecastJobAction.java | 13 +++++----- .../core/ml/action/GetJobsStatsAction.java | 19 +++++++------- .../core/ml/action/IsolateDatafeedAction.java | 11 ++++---- .../xpack/core/ml/action/JobTaskRequest.java | 11 ++++---- .../core/ml/action/KillProcessAction.java | 4 +++ .../core/ml/action/PersistJobAction.java | 13 +++++----- .../xpack/core/ml/action/PostDataAction.java | 23 ++++++++-------- .../core/ml/action/StopDatafeedAction.java | 23 ++++++++-------- .../core/ml/action/UpdateProcessAction.java | 26 +++++++++---------- .../rollup/action/DeleteRollupJobAction.java | 11 ++++---- .../rollup/action/GetRollupJobsAction.java | 17 ++++++------ .../rollup/action/StartRollupJobAction.java | 11 ++++---- .../rollup/action/StopRollupJobAction.java | 19 +++++++------- .../ml/action/PostDataFlushRequestTests.java | 4 +-- .../ml/action/TransportJobTaskAction.java | 2 +- 28 files changed, 189 insertions(+), 196 deletions(-) diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RethrottleRequest.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RethrottleRequest.java index 968d998974b47..2c9166246c2b3 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RethrottleRequest.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RethrottleRequest.java @@ -39,6 +39,14 @@ public class RethrottleRequest extends BaseTasksRequest { */ private Float requestsPerSecond; + public RethrottleRequest() { + } + + public RethrottleRequest(StreamInput in) throws IOException { + super(in); + this.requestsPerSecond = in.readFloat(); + } + /** * The throttle to apply to all matching requests in sub-requests per second. 0 means set no throttle and that is the default. */ @@ -80,12 +88,6 @@ public ActionRequestValidationException validate() { return validationException; } - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - requestsPerSecond = in.readFloat(); - } - @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/CancelTasksRequest.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/CancelTasksRequest.java index e92695d61e242..5c87b1da45d12 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/CancelTasksRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/CancelTasksRequest.java @@ -36,10 +36,11 @@ public class CancelTasksRequest extends BaseTasksRequest { private String reason = DEFAULT_REASON; - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - reason = in.readString(); + public CancelTasksRequest() {} + + public CancelTasksRequest(StreamInput in) throws IOException { + super(in); + this.reason = in.readString(); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/TransportCancelTasksAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/TransportCancelTasksAction.java index 1e4138f7581c5..8673000523aa8 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/TransportCancelTasksAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/TransportCancelTasksAction.java @@ -65,8 +65,8 @@ public class TransportCancelTasksAction extends TransportTasksAction { private boolean detailed = false; private boolean waitForCompletion = false; + public ListTasksRequest() { + } + + public ListTasksRequest(StreamInput in) throws IOException { + super(in); + detailed = in.readBoolean(); + waitForCompletion = in.readBoolean(); + } + /** * Should the detailed task information be returned. */ @@ -63,13 +72,6 @@ public ListTasksRequest setWaitForCompletion(boolean waitForCompletion) { return this; } - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - detailed = in.readBoolean(); - waitForCompletion = in.readBoolean(); - } - @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); diff --git a/server/src/main/java/org/elasticsearch/action/support/tasks/BaseTasksRequest.java b/server/src/main/java/org/elasticsearch/action/support/tasks/BaseTasksRequest.java index cbfdfc294c581..7b4cab1da6554 100644 --- a/server/src/main/java/org/elasticsearch/action/support/tasks/BaseTasksRequest.java +++ b/server/src/main/java/org/elasticsearch/action/support/tasks/BaseTasksRequest.java @@ -52,9 +52,20 @@ public class BaseTasksRequest> extends private TaskId taskId = TaskId.EMPTY_TASK_ID; + // NOTE: This constructor is only needed, because the setters in this class, + // otherwise it can be removed and above fields can be made final. public BaseTasksRequest() { } + protected BaseTasksRequest(StreamInput in) throws IOException { + super(in); + taskId = TaskId.readFromStream(in); + parentTaskId = TaskId.readFromStream(in); + nodes = in.readStringArray(); + actions = in.readStringArray(); + timeout = in.readOptionalTimeValue(); + } + @Override public ActionRequestValidationException validate() { ActionRequestValidationException validationException = null; @@ -137,16 +148,6 @@ public final Request setTimeout(String timeout) { return (Request) this; } - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - taskId = TaskId.readFromStream(in); - parentTaskId = TaskId.readFromStream(in); - nodes = in.readStringArray(); - actions = in.readStringArray(); - timeout = in.readOptionalTimeValue(); - } - @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); diff --git a/server/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java b/server/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java index ad35da0943b36..aff12d22e4f3c 100644 --- a/server/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java @@ -71,13 +71,13 @@ public abstract class TransportTasksAction< protected final ClusterService clusterService; protected final TransportService transportService; - protected final Supplier requestSupplier; + protected final Writeable.Reader requestSupplier; protected final Supplier responseSupplier; protected final String transportNodeAction; protected TransportTasksAction(String actionName, ClusterService clusterService, TransportService transportService, - ActionFilters actionFilters, Supplier requestSupplier, + ActionFilters actionFilters, Writeable.Reader requestSupplier, Supplier responseSupplier, String nodeExecutor) { super(actionName, transportService, actionFilters, requestSupplier); this.clusterService = clusterService; @@ -86,7 +86,7 @@ protected TransportTasksAction(String actionName, ClusterService clusterService, this.requestSupplier = requestSupplier; this.responseSupplier = responseSupplier; - transportService.registerRequestHandler(transportNodeAction, NodeTaskRequest::new, nodeExecutor, new NodeTransportHandler()); + transportService.registerRequestHandler(transportNodeAction, nodeExecutor, NodeTaskRequest::new, new NodeTransportHandler()); } @Override @@ -362,8 +362,9 @@ public void onFailure(Exception e) { private class NodeTaskRequest extends TransportRequest { private TasksRequest tasksRequest; - protected NodeTaskRequest() { - super(); + protected NodeTaskRequest(StreamInput in) throws IOException { + super(in); + this.tasksRequest = requestSupplier.read(in); } protected NodeTaskRequest(TasksRequest tasksRequest) { @@ -371,13 +372,6 @@ protected NodeTaskRequest(TasksRequest tasksRequest) { this.tasksRequest = tasksRequest; } - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - tasksRequest = requestSupplier.get(); - tasksRequest.readFrom(in); - } - @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TestTaskPlugin.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TestTaskPlugin.java index cac1ff61b2073..ab2eadf69c1bc 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TestTaskPlugin.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TestTaskPlugin.java @@ -373,6 +373,13 @@ public void writeTo(StreamOutput out) throws IOException { public static class UnblockTestTasksRequest extends BaseTasksRequest { + + UnblockTestTasksRequest() {} + + UnblockTestTasksRequest(StreamInput in) throws IOException { + super(in); + } + @Override public boolean match(Task task) { return task instanceof TestTask && super.match(task); diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TransportTasksActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TransportTasksActionTests.java index f1768f6bf3e3c..16e0caa526dde 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TransportTasksActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TransportTasksActionTests.java @@ -191,6 +191,11 @@ public String getStatus() { static class TestTasksRequest extends BaseTasksRequest { + TestTasksRequest(StreamInput in) throws IOException { + super(in); + } + + TestTasksRequest() {} } static class TestTasksResponse extends BaseTasksResponse { diff --git a/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java b/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java index 0faf226ff0500..bfe5cfa3a234d 100644 --- a/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java +++ b/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java @@ -447,9 +447,8 @@ public static class TestTasksRequest extends BaseTasksRequest public TestTasksRequest() { } - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); + public TestTasksRequest(StreamInput in) throws IOException { + super(in); operation = in.readOptionalString(); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportFollowStatsAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportFollowStatsAction.java index 95ac7fc8be630..7965e9e0e2a28 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportFollowStatsAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportFollowStatsAction.java @@ -84,7 +84,6 @@ protected FollowStatsAction.StatsResponse readTaskResponse(final StreamInput in) protected void processTasks(final FollowStatsAction.StatsRequest request, final Consumer operation) { final ClusterState state = clusterService.state(); final PersistentTasksCustomMetaData persistentTasksMetaData = state.metaData().custom(PersistentTasksCustomMetaData.TYPE); - if (persistentTasksMetaData == null) { return; } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/FollowStatsAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/FollowStatsAction.java index ff47f6e105b92..3248843541c5e 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/FollowStatsAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/FollowStatsAction.java @@ -129,6 +129,13 @@ public static class StatsRequest extends BaseTasksRequest implemen private String[] indices; + public StatsRequest() {} + + public StatsRequest(StreamInput in) throws IOException { + super(in); + indices = in.readOptionalStringArray(); + } + @Override public String[] indices() { return indices; @@ -161,12 +168,6 @@ public ActionRequestValidationException validate() { return null; } - @Override - public void readFrom(final StreamInput in) throws IOException { - super.readFrom(in); - indices = in.readOptionalStringArray(); - } - @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/CloseJobAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/CloseJobAction.java index 019bad54a5d24..7b098b6b6b8c8 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/CloseJobAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/CloseJobAction.java @@ -79,6 +79,18 @@ public Request() { openJobIds = new String[] {}; } + public Request(StreamInput in) throws IOException { + super(in); + jobId = in.readString(); + timeout = in.readTimeValue(); + force = in.readBoolean(); + openJobIds = in.readStringArray(); + local = in.readBoolean(); + if (in.getVersion().onOrAfter(Version.V_6_1_0)) { + allowNoJobs = in.readBoolean(); + } + } + public Request(String jobId) { this(); this.jobId = jobId; @@ -128,19 +140,6 @@ public void setOpenJobIds(String [] openJobIds) { this.openJobIds = openJobIds; } - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - jobId = in.readString(); - timeout = in.readTimeValue(); - force = in.readBoolean(); - openJobIds = in.readStringArray(); - local = in.readBoolean(); - if (in.getVersion().onOrAfter(Version.V_6_1_0)) { - allowNoJobs = in.readBoolean(); - } - } - @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/FlushJobAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/FlushJobAction.java index 4b96a4d6b2746..ec7a659c0706d 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/FlushJobAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/FlushJobAction.java @@ -75,6 +75,15 @@ public static Request parseRequest(String jobId, XContentParser parser) { public Request() { } + public Request(StreamInput in) throws IOException { + super(in); + calcInterim = in.readBoolean(); + start = in.readOptionalString(); + end = in.readOptionalString(); + advanceTime = in.readOptionalString(); + skipTime = in.readOptionalString(); + } + public Request(String jobId) { super(jobId); } @@ -119,16 +128,6 @@ public void setSkipTime(String skipTime) { this.skipTime = skipTime; } - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - calcInterim = in.readBoolean(); - start = in.readOptionalString(); - end = in.readOptionalString(); - advanceTime = in.readOptionalString(); - skipTime = in.readOptionalString(); - } - @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/ForecastJobAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/ForecastJobAction.java index 327941a2c055c..56a60669cd3fe 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/ForecastJobAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/ForecastJobAction.java @@ -68,6 +68,12 @@ public static Request parseRequest(String jobId, XContentParser parser) { public Request() { } + public Request(StreamInput in) throws IOException { + super(in); + this.duration = in.readOptionalTimeValue(); + this.expiresIn = in.readOptionalTimeValue(); + } + public Request(String jobId) { super(jobId); } @@ -108,13 +114,6 @@ public void setExpiresIn(TimeValue expiresIn) { } } - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - this.duration = in.readOptionalTimeValue(); - this.expiresIn = in.readOptionalTimeValue(); - } - @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetJobsStatsAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetJobsStatsAction.java index d2d5d09090e76..30cd76b346858 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetJobsStatsAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetJobsStatsAction.java @@ -76,6 +76,15 @@ public Request(String jobId) { public Request() {} + public Request(StreamInput in) throws IOException { + super(in); + jobId = in.readString(); + expandedJobsIds = in.readList(StreamInput::readString); + if (in.getVersion().onOrAfter(Version.V_6_1_0)) { + allowNoJobs = in.readBoolean(); + } + } + public List getExpandedJobsIds() { return expandedJobsIds; } public void setExpandedJobsIds(List expandedJobsIds) { this.expandedJobsIds = expandedJobsIds; } @@ -102,16 +111,6 @@ public ActionRequestValidationException validate() { return null; } - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - jobId = in.readString(); - expandedJobsIds = in.readList(StreamInput::readString); - if (in.getVersion().onOrAfter(Version.V_6_1_0)) { - allowNoJobs = in.readBoolean(); - } - } - @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/IsolateDatafeedAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/IsolateDatafeedAction.java index 48cef12f01c43..cffc759c3ee32 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/IsolateDatafeedAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/IsolateDatafeedAction.java @@ -78,6 +78,11 @@ public Request(String datafeedId) { public Request() { } + public Request(StreamInput in) throws IOException { + super(in); + datafeedId = in.readString(); + } + public String getDatafeedId() { return datafeedId; } @@ -93,12 +98,6 @@ public ActionRequestValidationException validate() { return null; } - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - datafeedId = in.readString(); - } - @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/JobTaskRequest.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/JobTaskRequest.java index adc84b2cf46d8..666417e9a9eac 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/JobTaskRequest.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/JobTaskRequest.java @@ -21,6 +21,11 @@ public class JobTaskRequest> extends BaseTasksReques JobTaskRequest() { } + JobTaskRequest(StreamInput in) throws IOException { + super(in); + this.jobId = in.readString(); + } + JobTaskRequest(String jobId) { this.jobId = ExceptionsHelper.requireNonNull(jobId, Job.ID.getPreferredName()); } @@ -29,12 +34,6 @@ public String getJobId() { return jobId; } - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - jobId = in.readString(); - } - @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/KillProcessAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/KillProcessAction.java index 96440ebe50306..1a91d0c32cabe 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/KillProcessAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/KillProcessAction.java @@ -46,6 +46,10 @@ public Request(String jobId) { public Request() { super(); } + + public Request(StreamInput in) throws IOException { + super(in); + } } public static class Response extends BaseTasksResponse implements Writeable { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/PersistJobAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/PersistJobAction.java index 12ebed924dbcc..71bdae40ae593 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/PersistJobAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/PersistJobAction.java @@ -35,6 +35,12 @@ public static class Request extends JobTaskRequest { public Request() { } + public Request(StreamInput in) throws IOException { + super(in); + // isBackground for fwc + in.readBoolean(); + } + public Request(String jobId) { super(jobId); } @@ -47,13 +53,6 @@ public boolean isForeground() { return !isBackGround(); } - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - // isBackground for fwc - in.readBoolean(); - } - @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/PostDataAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/PostDataAction.java index ccc745d9742ad..1dca5d3abf90f 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/PostDataAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/PostDataAction.java @@ -126,6 +126,17 @@ public static class Request extends JobTaskRequest { public Request() { } + public Request(StreamInput in) throws IOException { + super(in); + resetStart = in.readOptionalString(); + resetEnd = in.readOptionalString(); + dataDescription = in.readOptionalWriteable(DataDescription::new); + content = in.readBytesReference(); + if (in.readBoolean()) { + xContentType = in.readEnum(XContentType.class); + } + } + public Request(String jobId) { super(jobId); } @@ -165,18 +176,6 @@ public void setContent(BytesReference content, XContentType xContentType) { this.xContentType = xContentType; } - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - resetStart = in.readOptionalString(); - resetEnd = in.readOptionalString(); - dataDescription = in.readOptionalWriteable(DataDescription::new); - content = in.readBytesReference(); - if (in.readBoolean()) { - xContentType = in.readEnum(XContentType.class); - } - } - @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StopDatafeedAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StopDatafeedAction.java index 55b9312f70b5d..d49fe8ed0fb95 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StopDatafeedAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StopDatafeedAction.java @@ -85,6 +85,17 @@ public Request(String datafeedId) { public Request() { } + public Request(StreamInput in) throws IOException { + super(in); + datafeedId = in.readString(); + resolvedStartedDatafeedIds = in.readStringArray(); + stopTimeout = in.readTimeValue(); + force = in.readBoolean(); + if (in.getVersion().onOrAfter(Version.V_6_1_0)) { + allowNoDatafeeds = in.readBoolean(); + } + } + public String getDatafeedId() { return datafeedId; } @@ -137,18 +148,6 @@ public ActionRequestValidationException validate() { return null; } - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - datafeedId = in.readString(); - resolvedStartedDatafeedIds = in.readStringArray(); - stopTimeout = in.readTimeValue(); - force = in.readBoolean(); - if (in.getVersion().onOrAfter(Version.V_6_1_0)) { - allowNoDatafeeds = in.readBoolean(); - } - } - @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateProcessAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateProcessAction.java index 00b1d67bfff69..e1674228a93a3 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateProcessAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateProcessAction.java @@ -109,7 +109,18 @@ public static class Request extends JobTaskRequest { private MlFilter filter; private boolean updateScheduledEvents = false; - public Request() { + public Request() {} + + public Request(StreamInput in) throws IOException { + super(in); + modelPlotConfig = in.readOptionalWriteable(ModelPlotConfig::new); + if (in.readBoolean()) { + detectorUpdates = in.readList(JobUpdate.DetectorUpdate::new); + } + if (in.getVersion().onOrAfter(Version.V_6_2_0)) { + filter = in.readOptionalWriteable(MlFilter::new); + updateScheduledEvents = in.readBoolean(); + } } public Request(String jobId, ModelPlotConfig modelPlotConfig, List detectorUpdates, MlFilter filter, @@ -137,19 +148,6 @@ public boolean isUpdateScheduledEvents() { return updateScheduledEvents; } - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - modelPlotConfig = in.readOptionalWriteable(ModelPlotConfig::new); - if (in.readBoolean()) { - detectorUpdates = in.readList(JobUpdate.DetectorUpdate::new); - } - if (in.getVersion().onOrAfter(Version.V_6_2_0)) { - filter = in.readOptionalWriteable(MlFilter::new); - updateScheduledEvents = in.readBoolean(); - } - } - @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/DeleteRollupJobAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/DeleteRollupJobAction.java index f1c6213cd70e3..a429f1e5b681f 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/DeleteRollupJobAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/DeleteRollupJobAction.java @@ -52,6 +52,11 @@ public Request(String id) { public Request() {} + public Request(StreamInput in) throws IOException { + super(in); + id = in.readString(); + } + @Override public boolean match(Task task) { return task.getDescription().equals(RollupField.NAME + "_" + id); @@ -61,12 +66,6 @@ public String getId() { return id; } - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - id = in.readString(); - } - @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/GetRollupJobsAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/GetRollupJobsAction.java index 7bbbf07e6dcbe..8cd8a4bdf30d2 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/GetRollupJobsAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/GetRollupJobsAction.java @@ -66,6 +66,14 @@ public Request(String id) { public Request() {} + public Request(StreamInput in) throws IOException { + super(in); + id = in.readString(); + if (Strings.isNullOrEmpty(id) || id.equals("*")) { + this.id = MetaData.ALL; + } + } + @Override public boolean match(Task task) { // If we are retrieving all the jobs, the task description just needs to start @@ -81,15 +89,6 @@ public String getId() { return id; } - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - id = in.readString(); - if (Strings.isNullOrEmpty(id) || id.equals("*")) { - this.id = MetaData.ALL; - } - } - @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/StartRollupJobAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/StartRollupJobAction.java index e3dcb1a882f9f..b09ed4b44b23c 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/StartRollupJobAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/StartRollupJobAction.java @@ -48,14 +48,13 @@ public Request(String id) { public Request() {} - public String getId() { - return id; + public Request(StreamInput in) throws IOException { + super(in); + id = in.readString(); } - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - id = in.readString(); + public String getId() { + return id; } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/StopRollupJobAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/StopRollupJobAction.java index 361f81ab130e8..3720b2f016366 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/StopRollupJobAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/StopRollupJobAction.java @@ -63,6 +63,15 @@ public Request(String id, boolean waitForCompletion, @Nullable TimeValue timeout public Request() {} + public Request(StreamInput in) throws IOException { + super(in); + id = in.readString(); + if (in.getVersion().onOrAfter(Version.V_6_6_0)) { + waitForCompletion = in.readBoolean(); + timeout = in.readTimeValue(); + } + } + public String getId() { return id; } @@ -75,16 +84,6 @@ public boolean waitForCompletion() { return waitForCompletion; } - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - id = in.readString(); - if (in.getVersion().onOrAfter(Version.V_6_6_0)) { - waitForCompletion = in.readBoolean(); - timeout = in.readTimeValue(); - } - } - @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/PostDataFlushRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/PostDataFlushRequestTests.java index a4fd8c3c47069..0ac0f04183210 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/PostDataFlushRequestTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/PostDataFlushRequestTests.java @@ -32,6 +32,6 @@ protected Request createBlankInstance() { } public void testNullJobIdThrows() { - expectThrows(IllegalArgumentException.class, () -> new Request(null)); + expectThrows(IllegalArgumentException.class, () -> new Request((String) null)); } -} \ No newline at end of file +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportJobTaskAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportJobTaskAction.java index f610e88adcefc..688a0d24560de 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportJobTaskAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportJobTaskAction.java @@ -39,7 +39,7 @@ public abstract class TransportJobTaskAction requestSupplier, + Writeable.Reader requestSupplier, Supplier responseSupplier, String nodeExecutor, AutodetectProcessManager processManager) { super(actionName, clusterService, transportService, actionFilters, requestSupplier, responseSupplier, nodeExecutor);