Skip to content

Commit

Permalink
Partial document updates are now supported in the Bulk API
Browse files Browse the repository at this point in the history
  • Loading branch information
msimons committed Feb 22, 2013
1 parent 3bc9e25 commit d5a11be
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 33 deletions.
11 changes: 8 additions & 3 deletions src/main/java/org/elasticsearch/action/bulk/BulkItemRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.PartialDocumentUpdateRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
Expand Down Expand Up @@ -63,11 +64,13 @@ public static BulkItemRequest readBulkItem(StreamInput in) throws IOException {
@Override
public void readFrom(StreamInput in) throws IOException {
id = in.readVInt();
byte type = in.readByte();
int type = in.readInt();
if (type == 0) {
request = new IndexRequest();
} else if (type == 1) {
request = new DeleteRequest();
} else if (type == 2) {
request = new PartialDocumentUpdateRequest();
}
request.readFrom(in);
}
Expand All @@ -76,9 +79,11 @@ public void readFrom(StreamInput in) throws IOException {
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(id);
if (request instanceof IndexRequest) {
out.writeByte((byte) 0);
out.writeInt(0);
} else if (request instanceof DeleteRequest) {
out.writeByte((byte) 1);
out.writeInt(1);
} else if (request instanceof PartialDocumentUpdateRequest){
out.writeInt(2);
}
request.writeTo(out);
}
Expand Down
16 changes: 10 additions & 6 deletions src/main/java/org/elasticsearch/action/bulk/BulkRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ public BulkRequest add(ActionRequest request, @Nullable Object payload) {
add((IndexRequest) request, payload);
} else if (request instanceof DeleteRequest) {
add((DeleteRequest) request, payload);
} else if (request instanceof PartialDocumentUpdateRequest) {
add((PartialDocumentUpdateRequest) request,payload);
} else {
throw new ElasticSearchIllegalArgumentException("No support for request [" + request + "]");
}
Expand All @@ -98,6 +100,8 @@ public BulkRequest add(Iterable<ActionRequest> requests) {
add((IndexRequest) request);
} else if (request instanceof DeleteRequest) {
add((DeleteRequest) request);
} else if (request instanceof PartialDocumentUpdateRequest) {
add((PartialDocumentUpdateRequest) request);
} else {
throw new ElasticSearchIllegalArgumentException("No support for request [" + request + "]");
}
Expand Down Expand Up @@ -417,15 +421,15 @@ public void readFrom(StreamInput in) throws IOException {
int size = in.readVInt();
for (int i = 0; i < size; i++) {
int type = in.readInt();
if (type == 1) {
if (type == 0) {
IndexRequest request = new IndexRequest();
request.readFrom(in);
requests.add(request);
} else if (type == 2) {
} else if (type == 1) {
DeleteRequest request = new DeleteRequest();
request.readFrom(in);
requests.add(request);
} else if (type == 3) {
} else if (type == 2) {
PartialDocumentUpdateRequest request = new PartialDocumentUpdateRequest();
request.readFrom(in);
requests.add(request);
Expand All @@ -442,11 +446,11 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(requests.size());
for (ActionRequest request : requests) {
if (request instanceof IndexRequest) {
out.writeInt(1);
out.writeInt(0);
} else if (request instanceof DeleteRequest) {
out.writeInt(2);
out.writeInt(1);
} else if (request instanceof PartialDocumentUpdateRequest) {
out.writeInt(3);
out.writeInt(2);
}
request.writeTo(out);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.support.replication.ReplicationType;
import org.elasticsearch.action.update.PartialDocumentUpdateRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.internal.InternalClient;
import org.elasticsearch.common.Nullable;
Expand All @@ -49,6 +50,15 @@ public BulkRequestBuilder add(IndexRequest request) {
super.request.add(request);
return this;
}

/**
* Adds an {@link PartialDocumentUpdateRequest} to the list of actions to execute. Follows the same behavior of {@link PartialDocumentUpdateRequest}
* (for example, if no id is provided, one will be generated, or usage of the create flag).
*/
public BulkRequestBuilder add(PartialDocumentUpdateRequest request) {
super.request.add(request);
return this;
}

/**
* Adds an {@link IndexRequest} to the list of actions to execute. Follows the same behavior of {@link IndexRequest}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.action.update.PartialDocumentUpdateRequest;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
Expand Down Expand Up @@ -99,6 +100,11 @@ protected void doExecute(final BulkRequest bulkRequest, final ActionListener<Bul
if (!indices.contains(deleteRequest.getIndex())) {
indices.add(deleteRequest.getIndex());
}
} else if (request instanceof PartialDocumentUpdateRequest) {
PartialDocumentUpdateRequest updateRequest = (PartialDocumentUpdateRequest) request;
if (!indices.contains(updateRequest.getIndex())) {
indices.add(updateRequest.getIndex());
}
}
}

Expand Down Expand Up @@ -160,6 +166,10 @@ private void executeBulk(final BulkRequest bulkRequest, final long startTime, fi
DeleteRequest deleteRequest = (DeleteRequest) request;
deleteRequest.setRouting(clusterState.metaData().resolveIndexRouting(deleteRequest.getRouting(), deleteRequest.getIndex()));
deleteRequest.setIndex(clusterState.metaData().concreteIndex(deleteRequest.getIndex()));
} else if(request instanceof PartialDocumentUpdateRequest) {
PartialDocumentUpdateRequest updateRequest = (PartialDocumentUpdateRequest) request;
updateRequest.setIndex(clusterState.metaData().concreteIndex(updateRequest.getIndex()));

}
}
final BulkItemResponse[] responses = new BulkItemResponse[bulkRequest.requests.size()];
Expand All @@ -178,6 +188,15 @@ private void executeBulk(final BulkRequest bulkRequest, final long startTime, fi
requestsByShard.put(shardId, list);
}
list.add(new BulkItemRequest(i, request));
} else if (request instanceof PartialDocumentUpdateRequest) {
PartialDocumentUpdateRequest updateRequest = (PartialDocumentUpdateRequest) request;
ShardId shardId = clusterService.operationRouting().indexShards(clusterState, updateRequest.getIndex(), updateRequest.getType(), updateRequest.getId(), updateRequest.getRouting()).shardId();
List<BulkItemRequest> list = requestsByShard.get(shardId);
if (list == null) {
list = Lists.newArrayList();
requestsByShard.put(shardId, list);
}
list.add(new BulkItemRequest(i, request));
} else if (request instanceof DeleteRequest) {
DeleteRequest deleteRequest = (DeleteRequest) request;
MappingMetaData mappingMd = clusterState.metaData().index(deleteRequest.getIndex()).mappingOrDefault(deleteRequest.getType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ protected PrimaryResponse<BulkShardResponse, BulkShardRequest> shardOperationOnP
for (int i = 0; i < request.getItems().length; i++) {
BulkItemRequest item = request.getItems()[i];
if (item.getRequest() instanceof IndexRequest) {

indexRequest(clusterState, shardRequest, ops, versions, responses,
mappingsToUpdate, item, indexShard, request, (IndexRequest) item.getRequest(), i);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.replication.ReplicationType;
import org.elasticsearch.action.support.single.instance.InstanceShardOperationRequest;
import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
Expand All @@ -41,7 +41,7 @@

/**
*/
public class PartialDocumentUpdateRequest extends InstanceShardOperationRequest<PartialDocumentUpdateRequest> {
public class PartialDocumentUpdateRequest extends ShardReplicationOperationRequest<PartialDocumentUpdateRequest> {

private String type;
private String id;
Expand Down Expand Up @@ -151,11 +151,6 @@ public String getRouting() {
return this.routing;
}

public int getShardId() {
return this.shardId;
}


/**
* Explicitly specify the fields that will be returned. By default, nothing is returned.
*/
Expand Down Expand Up @@ -219,26 +214,10 @@ public ReplicationType setReplicationType() {
return this.replicationType;
}

/**
* Sets the replication type.
*/
public PartialDocumentUpdateRequest setReplicationType(ReplicationType replicationType) {
this.replicationType = replicationType;
return this;
}

public WriteConsistencyLevel getConsistencyLevel() {
return this.consistencyLevel;
}

/**
* Sets the consistency level of write. Defaults to {@link org.elasticsearch.action.WriteConsistencyLevel#DEFAULT}
*/
public PartialDocumentUpdateRequest setConsistencyLevel(WriteConsistencyLevel consistencyLevel) {
this.consistencyLevel = consistencyLevel;
return this;
}

/**
* Sets the doc to use for updates when a script is not specified.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,9 @@ public long timestamp() {
}

public SourceToParse timestamp(String timestamp) {
this.timestamp = Long.parseLong(timestamp);
if(timestamp != null){
this.timestamp = Long.parseLong(timestamp);
}
return this;
}

Expand Down

0 comments on commit d5a11be

Please sign in to comment.