Skip to content

Commit

Permalink
Replace Streamable w/ Writeable in BaseTasksRequest and subclasses
Browse files Browse the repository at this point in the history
This commit replaces usages of Streamable with Writeable for the
BaseTasksRequest / TransportTasksAction classes and subclasses of
these classes.

Relates to elastic#34389
  • Loading branch information
martijnvg committed Nov 23, 2018
1 parent d01436d commit 5fe26b7
Show file tree
Hide file tree
Showing 28 changed files with 189 additions and 196 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@ public class RethrottleRequest extends BaseTasksRequest<RethrottleRequest> {
*/
private Float requestsPerSecond;

public RethrottleRequest() {
}

public RethrottleRequest(StreamInput in) throws IOException {
super(in);
this.requestsPerSecond = in.readFloat();
}

/**
* The throttle to apply to all matching requests in sub-requests per second. 0 means set no throttle and that is the default.
*/
Expand Down Expand Up @@ -80,12 +88,6 @@ public ActionRequestValidationException validate() {
return validationException;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
requestsPerSecond = in.readFloat();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,11 @@ public class CancelTasksRequest extends BaseTasksRequest<CancelTasksRequest> {

private String reason = DEFAULT_REASON;

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
reason = in.readString();
public CancelTasksRequest() {}

public CancelTasksRequest(StreamInput in) throws IOException {
super(in);
this.reason = in.readString();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ public class TransportCancelTasksAction extends TransportTasksAction<Cancellable
public TransportCancelTasksAction(ClusterService clusterService, TransportService transportService, ActionFilters actionFilters) {
super(CancelTasksAction.NAME, clusterService, transportService, actionFilters,
CancelTasksRequest::new, CancelTasksResponse::new, ThreadPool.Names.MANAGEMENT);
transportService.registerRequestHandler(BAN_PARENT_ACTION_NAME, BanParentTaskRequest::new, ThreadPool.Names.SAME, new
BanParentRequestHandler());
transportService.registerRequestHandler(BAN_PARENT_ACTION_NAME, ThreadPool.Names.SAME, BanParentTaskRequest::new,
new BanParentRequestHandler());
}

@Override
Expand Down Expand Up @@ -233,11 +233,9 @@ public void finish() {

private static class BanParentTaskRequest extends TransportRequest {

private TaskId parentTaskId;

private boolean ban;

private String reason;
private final TaskId parentTaskId;
private final boolean ban;
private final String reason;

static BanParentTaskRequest createSetBanParentTaskRequest(TaskId parentTaskId, String reason) {
return new BanParentTaskRequest(parentTaskId, reason);
Expand All @@ -256,19 +254,14 @@ private BanParentTaskRequest(TaskId parentTaskId, String reason) {
private BanParentTaskRequest(TaskId parentTaskId) {
this.parentTaskId = parentTaskId;
this.ban = false;
this.reason = null;
}

BanParentTaskRequest() {
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
private BanParentTaskRequest(StreamInput in) throws IOException {
super(in);
parentTaskId = TaskId.readFromStream(in);
ban = in.readBoolean();
if (ban) {
reason = in.readString();
}
reason = ban ? in.readString() : null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,15 @@ public class ListTasksRequest extends BaseTasksRequest<ListTasksRequest> {
private boolean detailed = false;
private boolean waitForCompletion = false;

public ListTasksRequest() {
}

public ListTasksRequest(StreamInput in) throws IOException {
super(in);
detailed = in.readBoolean();
waitForCompletion = in.readBoolean();
}

/**
* Should the detailed task information be returned.
*/
Expand Down Expand Up @@ -63,13 +72,6 @@ public ListTasksRequest setWaitForCompletion(boolean waitForCompletion) {
return this;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
detailed = in.readBoolean();
waitForCompletion = in.readBoolean();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,20 @@ public class BaseTasksRequest<Request extends BaseTasksRequest<Request>> extends

private TaskId taskId = TaskId.EMPTY_TASK_ID;

// NOTE: This constructor is only needed, because the setters in this class,
// otherwise it can be removed and above fields can be made final.
public BaseTasksRequest() {
}

protected BaseTasksRequest(StreamInput in) throws IOException {
super(in);
taskId = TaskId.readFromStream(in);
parentTaskId = TaskId.readFromStream(in);
nodes = in.readStringArray();
actions = in.readStringArray();
timeout = in.readOptionalTimeValue();
}

@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
Expand Down Expand Up @@ -137,16 +148,6 @@ public final Request setTimeout(String timeout) {
return (Request) this;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
taskId = TaskId.readFromStream(in);
parentTaskId = TaskId.readFromStream(in);
nodes = in.readStringArray();
actions = in.readStringArray();
timeout = in.readOptionalTimeValue();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,13 @@ public abstract class TransportTasksAction<

protected final ClusterService clusterService;
protected final TransportService transportService;
protected final Supplier<TasksRequest> requestSupplier;
protected final Writeable.Reader<TasksRequest> requestSupplier;
protected final Supplier<TasksResponse> responseSupplier;

protected final String transportNodeAction;

protected TransportTasksAction(String actionName, ClusterService clusterService, TransportService transportService,
ActionFilters actionFilters, Supplier<TasksRequest> requestSupplier,
ActionFilters actionFilters, Writeable.Reader<TasksRequest> requestSupplier,
Supplier<TasksResponse> responseSupplier, String nodeExecutor) {
super(actionName, transportService, actionFilters, requestSupplier);
this.clusterService = clusterService;
Expand All @@ -86,7 +86,7 @@ protected TransportTasksAction(String actionName, ClusterService clusterService,
this.requestSupplier = requestSupplier;
this.responseSupplier = responseSupplier;

transportService.registerRequestHandler(transportNodeAction, NodeTaskRequest::new, nodeExecutor, new NodeTransportHandler());
transportService.registerRequestHandler(transportNodeAction, nodeExecutor, NodeTaskRequest::new, new NodeTransportHandler());
}

@Override
Expand Down Expand Up @@ -362,22 +362,16 @@ public void onFailure(Exception e) {
private class NodeTaskRequest extends TransportRequest {
private TasksRequest tasksRequest;

protected NodeTaskRequest() {
super();
protected NodeTaskRequest(StreamInput in) throws IOException {
super(in);
this.tasksRequest = requestSupplier.read(in);
}

protected NodeTaskRequest(TasksRequest tasksRequest) {
super();
this.tasksRequest = tasksRequest;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
tasksRequest = requestSupplier.get();
tasksRequest.readFrom(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,13 @@ public void writeTo(StreamOutput out) throws IOException {


public static class UnblockTestTasksRequest extends BaseTasksRequest<UnblockTestTasksRequest> {

UnblockTestTasksRequest() {}

UnblockTestTasksRequest(StreamInput in) throws IOException {
super(in);
}

@Override
public boolean match(Task task) {
return task instanceof TestTask && super.match(task);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,11 @@ public String getStatus() {

static class TestTasksRequest extends BaseTasksRequest<TestTasksRequest> {

TestTasksRequest(StreamInput in) throws IOException {
super(in);
}

TestTasksRequest() {}
}

static class TestTasksResponse extends BaseTasksResponse {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -447,9 +447,8 @@ public static class TestTasksRequest extends BaseTasksRequest<TestTasksRequest>
public TestTasksRequest() {
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
public TestTasksRequest(StreamInput in) throws IOException {
super(in);
operation = in.readOptionalString();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ protected FollowStatsAction.StatsResponse readTaskResponse(final StreamInput in)
protected void processTasks(final FollowStatsAction.StatsRequest request, final Consumer<ShardFollowNodeTask> operation) {
final ClusterState state = clusterService.state();
final PersistentTasksCustomMetaData persistentTasksMetaData = state.metaData().custom(PersistentTasksCustomMetaData.TYPE);

if (persistentTasksMetaData == null) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,13 @@ public static class StatsRequest extends BaseTasksRequest<StatsRequest> implemen

private String[] indices;

public StatsRequest() {}

public StatsRequest(StreamInput in) throws IOException {
super(in);
indices = in.readOptionalStringArray();
}

@Override
public String[] indices() {
return indices;
Expand Down Expand Up @@ -161,12 +168,6 @@ public ActionRequestValidationException validate() {
return null;
}

@Override
public void readFrom(final StreamInput in) throws IOException {
super.readFrom(in);
indices = in.readOptionalStringArray();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,18 @@ public Request() {
openJobIds = new String[] {};
}

public Request(StreamInput in) throws IOException {
super(in);
jobId = in.readString();
timeout = in.readTimeValue();
force = in.readBoolean();
openJobIds = in.readStringArray();
local = in.readBoolean();
if (in.getVersion().onOrAfter(Version.V_6_1_0)) {
allowNoJobs = in.readBoolean();
}
}

public Request(String jobId) {
this();
this.jobId = jobId;
Expand Down Expand Up @@ -128,19 +140,6 @@ public void setOpenJobIds(String [] openJobIds) {
this.openJobIds = openJobIds;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
jobId = in.readString();
timeout = in.readTimeValue();
force = in.readBoolean();
openJobIds = in.readStringArray();
local = in.readBoolean();
if (in.getVersion().onOrAfter(Version.V_6_1_0)) {
allowNoJobs = in.readBoolean();
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,15 @@ public static Request parseRequest(String jobId, XContentParser parser) {
public Request() {
}

public Request(StreamInput in) throws IOException {
super(in);
calcInterim = in.readBoolean();
start = in.readOptionalString();
end = in.readOptionalString();
advanceTime = in.readOptionalString();
skipTime = in.readOptionalString();
}

public Request(String jobId) {
super(jobId);
}
Expand Down Expand Up @@ -119,16 +128,6 @@ public void setSkipTime(String skipTime) {
this.skipTime = skipTime;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
calcInterim = in.readBoolean();
start = in.readOptionalString();
end = in.readOptionalString();
advanceTime = in.readOptionalString();
skipTime = in.readOptionalString();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ public static Request parseRequest(String jobId, XContentParser parser) {
public Request() {
}

public Request(StreamInput in) throws IOException {
super(in);
this.duration = in.readOptionalTimeValue();
this.expiresIn = in.readOptionalTimeValue();
}

public Request(String jobId) {
super(jobId);
}
Expand Down Expand Up @@ -108,13 +114,6 @@ public void setExpiresIn(TimeValue expiresIn) {
}
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
this.duration = in.readOptionalTimeValue();
this.expiresIn = in.readOptionalTimeValue();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
Expand Down
Loading

0 comments on commit 5fe26b7

Please sign in to comment.