Skip to content

Commit

Permalink
Replace Streamable w/ Writeable in BaseTasksRequest and subclasses (#…
Browse files Browse the repository at this point in the history
…35854)

* Replace Streamable w/ Writeable in BaseTasksRequest and subclasses

This commit replaces usages of Streamable with Writeable for the
BaseTasksRequest / TransportTasksAction classes and subclasses of
these classes.

Relates to #34389
  • Loading branch information
martijnvg committed Dec 3, 2018
1 parent 790238b commit 30221d2
Show file tree
Hide file tree
Showing 41 changed files with 400 additions and 399 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,20 @@ public class RethrottleRequest extends BaseTasksRequest<RethrottleRequest> {
*/
private Float requestsPerSecond;

public RethrottleRequest() {
}

public RethrottleRequest(StreamInput in) throws IOException {
super(in);
this.requestsPerSecond = in.readFloat();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeFloat(requestsPerSecond);
}

/**
* The throttle to apply to all matching requests in sub-requests per second. 0 means set no throttle and that is the default.
*/
Expand Down Expand Up @@ -80,15 +94,4 @@ public ActionRequestValidationException validate() {
return validationException;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
requestsPerSecond = in.readFloat();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeFloat(requestsPerSecond);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -189,9 +189,7 @@ public void testRethrottleRequest() throws IOException {
} else {
request.setTaskId(new TaskId(randomAlphaOfLength(5), randomLong()));
}
RethrottleRequest tripped = new RethrottleRequest();
// We use readFrom here because Rethrottle does not support the Writeable.Reader interface
tripped.readFrom(toInputByteStream(request));
RethrottleRequest tripped = new RethrottleRequest(toInputByteStream(request));
assertEquals(request.getRequestsPerSecond(), tripped.getRequestsPerSecond(), 0.00001);
assertArrayEquals(request.getActions(), tripped.getActions());
assertEquals(request.getTaskId(), tripped.getTaskId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,11 @@ public class CancelTasksRequest extends BaseTasksRequest<CancelTasksRequest> {

private String reason = DEFAULT_REASON;

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
reason = in.readString();
public CancelTasksRequest() {}

public CancelTasksRequest(StreamInput in) throws IOException {
super(in);
this.reason = in.readString();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,12 @@ public class TransportCancelTasksAction extends TransportTasksAction<Cancellable

@Inject
public TransportCancelTasksAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver
indexNameExpressionResolver) {
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, CancelTasksAction.NAME, threadPool, clusterService, transportService, actionFilters,
indexNameExpressionResolver, CancelTasksRequest::new, CancelTasksResponse::new,
ThreadPool.Names.MANAGEMENT);
transportService.registerRequestHandler(BAN_PARENT_ACTION_NAME, BanParentTaskRequest::new, ThreadPool.Names.SAME, new
BanParentRequestHandler());
indexNameExpressionResolver, CancelTasksRequest::new, CancelTasksResponse::new, ThreadPool.Names.MANAGEMENT);
transportService.registerRequestHandler(BAN_PARENT_ACTION_NAME, ThreadPool.Names.SAME, BanParentTaskRequest::new,
new BanParentRequestHandler());
}

@Override
Expand Down Expand Up @@ -237,11 +236,9 @@ public void finish() {

private static class BanParentTaskRequest extends TransportRequest {

private TaskId parentTaskId;

private boolean ban;

private String reason;
private final TaskId parentTaskId;
private final boolean ban;
private final String reason;

static BanParentTaskRequest createSetBanParentTaskRequest(TaskId parentTaskId, String reason) {
return new BanParentTaskRequest(parentTaskId, reason);
Expand All @@ -260,19 +257,14 @@ private BanParentTaskRequest(TaskId parentTaskId, String reason) {
private BanParentTaskRequest(TaskId parentTaskId) {
this.parentTaskId = parentTaskId;
this.ban = false;
this.reason = null;
}

BanParentTaskRequest() {
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
private BanParentTaskRequest(StreamInput in) throws IOException {
super(in);
parentTaskId = TaskId.readFromStream(in);
ban = in.readBoolean();
if (ban) {
reason = in.readString();
}
reason = ban ? in.readString() : null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,22 @@ public class ListTasksRequest extends BaseTasksRequest<ListTasksRequest> {
private boolean detailed = false;
private boolean waitForCompletion = false;

public ListTasksRequest() {
}

public ListTasksRequest(StreamInput in) throws IOException {
super(in);
detailed = in.readBoolean();
waitForCompletion = in.readBoolean();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(detailed);
out.writeBoolean(waitForCompletion);
}

/**
* Should the detailed task information be returned.
*/
Expand Down Expand Up @@ -63,17 +79,4 @@ public ListTasksRequest setWaitForCompletion(boolean waitForCompletion) {
return this;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
detailed = in.readBoolean();
waitForCompletion = in.readBoolean();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(detailed);
out.writeBoolean(waitForCompletion);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,30 @@ public class BaseTasksRequest<Request extends BaseTasksRequest<Request>> extends

private TaskId taskId = TaskId.EMPTY_TASK_ID;

// NOTE: This constructor is only needed, because the setters in this class,
// otherwise it can be removed and above fields can be made final.
public BaseTasksRequest() {
}

protected BaseTasksRequest(StreamInput in) throws IOException {
super(in);
taskId = TaskId.readFromStream(in);
parentTaskId = TaskId.readFromStream(in);
nodes = in.readStringArray();
actions = in.readStringArray();
timeout = in.readOptionalTimeValue();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
taskId.writeTo(out);
parentTaskId.writeTo(out);
out.writeStringArrayNullable(nodes);
out.writeStringArrayNullable(actions);
out.writeOptionalTimeValue(timeout);
}

@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
Expand Down Expand Up @@ -137,26 +158,6 @@ public final Request setTimeout(String timeout) {
return (Request) this;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
taskId = TaskId.readFromStream(in);
parentTaskId = TaskId.readFromStream(in);
nodes = in.readStringArray();
actions = in.readStringArray();
timeout = in.readOptionalTimeValue();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
taskId.writeTo(out);
parentTaskId.writeTo(out);
out.writeStringArrayNullable(nodes);
out.writeStringArrayNullable(actions);
out.writeOptionalTimeValue(timeout);
}

public boolean match(Task task) {
if (getActions() != null && getActions().length > 0 && Regex.simpleMatch(getActions(), task.getAction()) == false) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,24 +73,24 @@ public abstract class TransportTasksAction<

protected final ClusterService clusterService;
protected final TransportService transportService;
protected final Supplier<TasksRequest> requestSupplier;
protected final Writeable.Reader<TasksRequest> requestSupplier;
protected final Supplier<TasksResponse> responseSupplier;

protected final String transportNodeAction;

protected TransportTasksAction(Settings settings, String actionName, ThreadPool threadPool,
ClusterService clusterService, TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<TasksRequest> requestSupplier,
IndexNameExpressionResolver indexNameExpressionResolver, Writeable.Reader<TasksRequest> requestSupplier,
Supplier<TasksResponse> responseSupplier,
String nodeExecutor) {
super(settings, actionName, threadPool, transportService, actionFilters, indexNameExpressionResolver, requestSupplier);
super(settings, actionName, threadPool, transportService, actionFilters, requestSupplier, indexNameExpressionResolver);
this.clusterService = clusterService;
this.transportService = transportService;
this.transportNodeAction = actionName + "[n]";
this.requestSupplier = requestSupplier;
this.responseSupplier = responseSupplier;

transportService.registerRequestHandler(transportNodeAction, NodeTaskRequest::new, nodeExecutor, new NodeTransportHandler());
transportService.registerRequestHandler(transportNodeAction, nodeExecutor, NodeTaskRequest::new, new NodeTransportHandler());
}

@Override
Expand Down Expand Up @@ -372,27 +372,22 @@ public void onFailure(Exception e) {
private class NodeTaskRequest extends TransportRequest {
private TasksRequest tasksRequest;

protected NodeTaskRequest() {
super();
}

protected NodeTaskRequest(TasksRequest tasksRequest) {
super();
this.tasksRequest = tasksRequest;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
tasksRequest = requestSupplier.get();
tasksRequest.readFrom(in);
protected NodeTaskRequest(StreamInput in) throws IOException {
super(in);
this.tasksRequest = requestSupplier.read(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
tasksRequest.writeTo(out);
}

protected NodeTaskRequest(TasksRequest tasksRequest) {
super();
this.tasksRequest = tasksRequest;
}

}

private class NodeTasksResponse extends TransportResponse {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,13 @@ public void writeTo(StreamOutput out) throws IOException {


public static class UnblockTestTasksRequest extends BaseTasksRequest<UnblockTestTasksRequest> {

UnblockTestTasksRequest() {}

UnblockTestTasksRequest(StreamInput in) throws IOException {
super(in);
}

@Override
public boolean match(Task task) {
return task instanceof TestTask && super.match(task);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,11 @@ public String getStatus() {

static class TestTasksRequest extends BaseTasksRequest<TestTasksRequest> {

TestTasksRequest(StreamInput in) throws IOException {
super(in);
}

TestTasksRequest() {}
}

static class TestTasksResponse extends BaseTasksResponse {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -454,9 +454,8 @@ public static class TestTasksRequest extends BaseTasksRequest<TestTasksRequest>
public TestTasksRequest() {
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
public TestTasksRequest(StreamInput in) throws IOException {
super(in);
operation = in.readOptionalString();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ protected FollowStatsAction.StatsResponse readTaskResponse(final StreamInput in)
protected void processTasks(final FollowStatsAction.StatsRequest request, final Consumer<ShardFollowNodeTask> operation) {
final ClusterState state = clusterService.state();
final PersistentTasksCustomMetaData persistentTasksMetaData = state.metaData().custom(PersistentTasksCustomMetaData.TYPE);

if (persistentTasksMetaData == null) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@
*/
package org.elasticsearch.xpack.ccr.action;

import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;

public class StatsRequestTests extends AbstractStreamableTestCase<FollowStatsAction.StatsRequest> {
public class StatsRequestTests extends AbstractWireSerializingTestCase<FollowStatsAction.StatsRequest> {

@Override
protected FollowStatsAction.StatsRequest createBlankInstance() {
return new FollowStatsAction.StatsRequest();
protected Writeable.Reader<FollowStatsAction.StatsRequest> instanceReader() {
return FollowStatsAction.StatsRequest::new;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,19 @@ public static class StatsRequest extends BaseTasksRequest<StatsRequest> implemen

private String[] indices;

public StatsRequest() {}

public StatsRequest(StreamInput in) throws IOException {
super(in);
indices = in.readOptionalStringArray();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeOptionalStringArray(indices);
}

@Override
public String[] indices() {
return indices;
Expand Down Expand Up @@ -164,18 +177,6 @@ public ActionRequestValidationException validate() {
return null;
}

@Override
public void readFrom(final StreamInput in) throws IOException {
super.readFrom(in);
indices = in.readOptionalStringArray();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeOptionalStringArray(indices);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down
Loading

0 comments on commit 30221d2

Please sign in to comment.