Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Writeable for TransportReplAction derivatives #40894

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -222,13 +222,9 @@ static DocWriteRequest<?> readDocumentRequest(StreamInput in) throws IOException
byte type = in.readByte();
DocWriteRequest<?> docWriteRequest;
if (type == 0) {
IndexRequest indexRequest = new IndexRequest();
indexRequest.readFrom(in);
docWriteRequest = indexRequest;
docWriteRequest = new IndexRequest(in);
} else if (type == 1) {
DeleteRequest deleteRequest = new DeleteRequest();
deleteRequest.readFrom(in);
docWriteRequest = deleteRequest;
docWriteRequest = new DeleteRequest(in);
} else if (type == 2) {
UpdateRequest updateRequest = new UpdateRequest();
updateRequest.readFrom(in);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,11 @@ public void markShardCopyAsStaleIfNeeded(final ShardId shardId, final String all

public static class ShardRequest extends ReplicationRequest<ShardRequest> {

private ClusterBlock clusterBlock;
private final ClusterBlock clusterBlock;

ShardRequest(){
ShardRequest(StreamInput in) throws IOException {
super(in);
clusterBlock = new ClusterBlock(in);
}

public ShardRequest(final ShardId shardId, final ClusterBlock clusterBlock, final TaskId parentTaskId) {
Expand All @@ -153,9 +155,8 @@ public String toString() {
}

@Override
public void readFrom(final StreamInput in) throws IOException {
super.readFrom(in);
clusterBlock = new ClusterBlock(in);
public void readFrom(final StreamInput in) {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ public FlushRequest(String... indices) {
super(indices);
}

public FlushRequest(StreamInput in) throws IOException {
super(in);
force = in.readBoolean();
waitIfOngoing = in.readBoolean();
}

/**
* Returns {@code true} iff a flush should block
* if a another flush operation is already running. Otherwise {@code false}
Expand Down Expand Up @@ -103,9 +109,7 @@ public void writeTo(StreamOutput out) throws IOException {

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
force = in.readBoolean();
waitIfOngoing = in.readBoolean();
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,17 @@

public class ShardFlushRequest extends ReplicationRequest<ShardFlushRequest> {

private FlushRequest request = new FlushRequest();
private final FlushRequest request;

public ShardFlushRequest(FlushRequest request, ShardId shardId) {
super(shardId);
this.request = request;
this.waitForActiveShards = ActiveShardCount.NONE; // don't wait for any active shards before proceeding, by default
}

public ShardFlushRequest() {
public ShardFlushRequest(StreamInput in) throws IOException {
super(in);
request = new FlushRequest(in);
}

FlushRequest getRequest() {
Expand All @@ -46,8 +48,7 @@ FlushRequest getRequest() {

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
request.readFrom(in);
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ protected PrimaryResult<ShardFlushRequest, ReplicationResponse> shardOperationOn
IndexShard primary) {
primary.flush(shardRequest.getRequest());
logger.trace("{} flush request executed on primary", primary.shardId());
return new PrimaryResult<ShardFlushRequest, ReplicationResponse>(shardRequest, new ReplicationResponse());
return new PrimaryResult<>(shardRequest, new ReplicationResponse());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
package org.elasticsearch.action.admin.indices.refresh;

import org.elasticsearch.action.support.broadcast.BroadcastRequest;
import org.elasticsearch.common.io.stream.StreamInput;

import java.io.IOException;

/**
* A refresh request making all operations performed since the last refresh available for search. The (near) real-time
Expand All @@ -35,4 +38,8 @@ public class RefreshRequest extends BroadcastRequest<RefreshRequest> {
public RefreshRequest(String... indices) {
super(indices);
}

public RefreshRequest(StreamInput in) throws IOException {
super(in);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,14 @@ public class BulkShardRequest extends ReplicatedWriteRequest<BulkShardRequest> {

private BulkItemRequest[] items;

public BulkShardRequest() {
public BulkShardRequest(StreamInput in) throws IOException {
super(in);
items = new BulkItemRequest[in.readVInt()];
for (int i = 0; i < items.length; i++) {
if (in.readBoolean()) {
items[i] = BulkItemRequest.readBulkItem(in);
}
}
}

public BulkShardRequest(ShardId shardId, RefreshPolicy refreshPolicy, BulkItemRequest[] items) {
Expand All @@ -60,7 +67,7 @@ public String[] indices() {
indices.add(item.index());
}
}
return indices.toArray(new String[indices.size()]);
return indices.toArray(new String[0]);
}

@Override
Expand All @@ -78,14 +85,8 @@ public void writeTo(StreamOutput out) throws IOException {
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
items = new BulkItemRequest[in.readVInt()];
for (int i = 0; i < items.length; i++) {
if (in.readBoolean()) {
items[i] = BulkItemRequest.readBulkItem(in);
}
}
public void readFrom(StreamInput in) {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,10 @@
import org.elasticsearch.action.support.WriteResponse;
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;

import java.util.function.Supplier;

/** use transport bulk action directly */
@Deprecated
public abstract class TransportSingleItemBulkWriteAction<
Expand All @@ -43,8 +42,8 @@ public abstract class TransportSingleItemBulkWriteAction<
private final TransportBulkAction bulkAction;

protected TransportSingleItemBulkWriteAction(String actionName, TransportService transportService, ActionFilters actionFilters,
Supplier<Request> request, TransportBulkAction bulkAction) {
super(actionName, transportService, actionFilters, request);
Writeable.Reader<Request> requestReader, TransportBulkAction bulkAction) {
super(actionName, transportService, actionFilters, requestReader);
this.bulkAction = bulkAction;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
public class DeleteRequest extends ReplicatedWriteRequest<DeleteRequest>
implements DocWriteRequest<DeleteRequest>, CompositeIndicesRequest {

private static final ShardId NO_SHARD_ID = null;

// Set to null initially so we can know to override in bulk requests that have a default type.
private String type;
private String id;
Expand All @@ -62,14 +64,27 @@ public class DeleteRequest extends ReplicatedWriteRequest<DeleteRequest>
private long ifSeqNo = UNASSIGNED_SEQ_NO;
private long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM;

public DeleteRequest(StreamInput in) throws IOException {
super(in);
type = in.readString();
id = in.readString();
routing = in.readOptionalString();
version = in.readLong();
versionType = VersionType.fromValue(in.readByte());
ifSeqNo = in.readZLong();
ifPrimaryTerm = in.readVLong();
}

public DeleteRequest() {
super(NO_SHARD_ID);
}

/**
* Constructs a new delete request against the specified index. The {@link #type(String)} and {@link #id(String)}
* must be set.
*/
public DeleteRequest(String index) {
super(NO_SHARD_ID);
this.index = index;
}

Expand All @@ -84,6 +99,7 @@ public DeleteRequest(String index) {
*/
@Deprecated
public DeleteRequest(String index, String type, String id) {
super(NO_SHARD_ID);
this.index = index;
this.type = type;
this.id = id;
Expand All @@ -96,6 +112,7 @@ public DeleteRequest(String index, String type, String id) {
* @param id The id of the document
*/
public DeleteRequest(String index, String id) {
super(NO_SHARD_ID);
this.index = index;
this.id = id;
}
Expand Down Expand Up @@ -273,15 +290,8 @@ public OpType opType() {
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
type = in.readString();
id = in.readString();
routing = in.readOptionalString();
version = in.readLong();
versionType = VersionType.fromValue(in.readByte());
ifSeqNo = in.readZLong();
ifPrimaryTerm = in.readVLong();
public void readFrom(StreamInput in) {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}

@Override
Expand All @@ -302,14 +312,4 @@ public void writeTo(StreamOutput out) throws IOException {
public String toString() {
return "delete {[" + index + "][" + type() + "][" + id + "]}";
}

/**
* Override this method from ReplicationAction, this is where we are storing our state in the request object (which we really shouldn't
* do). Once the transport client goes away we can move away from making this available, but in the meantime this is dangerous to set or
* use because the DeleteRequest object will always be wrapped in a bulk request envelope, which is where this *should* be set.
*/
@Override
public DeleteRequest setShardId(ShardId shardId) {
throw new UnsupportedOperationException("shard id should never be set on DeleteRequest");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
*/
static final int MAX_SOURCE_LENGTH_IN_TOSTRING = 2048;

private static final ShardId NO_SHARD_ID = null;

// Set to null initially so we can know to override in bulk requests that have a default type.
private String type;
private String id;
Expand Down Expand Up @@ -112,15 +114,37 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
private long ifSeqNo = UNASSIGNED_SEQ_NO;
private long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM;

public IndexRequest(StreamInput in) throws IOException {
super(in);
type = in.readOptionalString();
id = in.readOptionalString();
routing = in.readOptionalString();
source = in.readBytesReference();
opType = OpType.fromId(in.readByte());
version = in.readLong();
versionType = VersionType.fromValue(in.readByte());
pipeline = in.readOptionalString();
isRetry = in.readBoolean();
autoGeneratedTimestamp = in.readLong();
if (in.readBoolean()) {
contentType = in.readEnum(XContentType.class);
} else {
contentType = null;
}
ifSeqNo = in.readZLong();
ifPrimaryTerm = in.readVLong();
}

public IndexRequest() {
super(NO_SHARD_ID);
}

/**
* Constructs a new index request against the specific index. The {@link #type(String)}
* {@link #source(byte[], XContentType)} must be set.
*/
public IndexRequest(String index) {
super(NO_SHARD_ID);
this.index = index;
}

Expand All @@ -131,6 +155,7 @@ public IndexRequest(String index) {
*/
@Deprecated
public IndexRequest(String index, String type) {
super(NO_SHARD_ID);
this.index = index;
this.type = type;
}
Expand All @@ -146,6 +171,7 @@ public IndexRequest(String index, String type) {
*/
@Deprecated
public IndexRequest(String index, String type, String id) {
super(NO_SHARD_ID);
this.index = index;
this.type = type;
this.id = id;
Expand Down Expand Up @@ -593,25 +619,8 @@ public void resolveRouting(MetaData metaData) {
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
type = in.readOptionalString();
id = in.readOptionalString();
routing = in.readOptionalString();
source = in.readBytesReference();
opType = OpType.fromId(in.readByte());
version = in.readLong();
versionType = VersionType.fromValue(in.readByte());
pipeline = in.readOptionalString();
isRetry = in.readBoolean();
autoGeneratedTimestamp = in.readLong();
if (in.readBoolean()) {
contentType = in.readEnum(XContentType.class);
} else {
contentType = null;
}
ifSeqNo = in.readZLong();
ifPrimaryTerm = in.readVLong();
public void readFrom(StreamInput in) {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}

@Override
Expand Down Expand Up @@ -675,15 +684,4 @@ public void onRetry() {
public long getAutoGeneratedTimestamp() {
return autoGeneratedTimestamp;
}

/**
* Override this method from ReplicationAction, this is where we are storing our state in the request object (which we really shouldn't
* do). Once the transport client goes away we can move away from making this available, but in the meantime this is dangerous to set or
* use because the IndexRequest object will always be wrapped in a bulk request envelope, which is where this *should* be set.
*/
@Override
public IndexRequest setShardId(ShardId shardId) {
throw new UnsupportedOperationException("shard id should never be set on IndexRequest");
}

}
Loading