Make modifying operations durable by default.
This commit makes create, update and delete operations on an index durable
by default. The user can opt out and use async translog flushes on a
per-index basis by setting `index.translog.durability=async`.
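
As a hedged illustration (not part of this commit's diff), opting an index back
into async translog flushes might look like this with the 1.x-era Java client;
the class name is hypothetical, and it is an assumption that the setting is
dynamically updatable:

import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.ImmutableSettings;

public final class TranslogDurabilityExample {

    /** Opt one index out of per-request fsyncs, back to periodic async translog flushes. */
    static void useAsyncTranslog(Client client, String index) {
        client.admin().indices().prepareUpdateSettings(index)
                .setSettings(ImmutableSettings.settingsBuilder()
                        .put("index.translog.durability", "async") // opt-out value per the commit message
                        .build())
                .get();
    }
}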

Initial benchmarks running on SSDs have shown that bulk indexing is about
7% - 10% slower compared to async translog flushes. This change is orthogonal
to the transaction log sync interval and will only sync the transaction log
if the operation has not yet been concurrently synced, i.e. if multiple
indexing requests are submitted and one operation's sync call already
persists the operations of the others, only one sync call is executed.
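
As an illustrative sketch of that behavior (mine, not code from this commit):
because the translog is append-only, fsyncing the highest written location
also persists every earlier operation, so concurrent requests can share a
single fsync through a watermark:

final class TranslogSyncWatermark {
    private long syncedUpTo = -1; // highest translog offset known to be on disk

    /** Each request calls this after appending its operation at {@code location}. */
    synchronized void ensureSynced(long location, Runnable fsync) {
        if (location > syncedUpTo) {
            fsync.run();          // one fsync persists every operation up to 'location'
            syncedUpTo = location;
        } // else: a concurrent request's fsync already covered this operation
    }
}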

Relates to elastic#10933
s1monw committed May 7, 2015
1 parent cff3685 commit 796e965
Showing 22 changed files with 469 additions and 305 deletions.
@@ -148,4 +148,4 @@ public final BulkRequestBuilder setTimeout(String timeout) {
public int numberOfActions() {
return request.numberOfActions();
}
}
}
@@ -81,14 +81,8 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(items.length);
for (BulkItemRequest item : items) {
if (item != null) {
// if we are serializing to a node that is pre 1.3.3, make sure to pass null to maintain
// the old behavior of putting null in the request to be ignored on the replicas
if (item.isIgnoreOnReplica() && out.getVersion().before(Version.V_1_3_3)) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
item.writeTo(out);
}
out.writeBoolean(true);
item.writeTo(out);
} else {
out.writeBoolean(false);
}
@@ -22,7 +22,6 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
@@ -41,7 +40,6 @@
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.compress.CompressedString;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;
@@ -51,14 +49,13 @@
import org.elasticsearch.index.engine.DocumentAlreadyExistsException;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.river.RiverIndexName;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
@@ -116,19 +113,21 @@ protected ShardIterator shards(ClusterState clusterState, InternalRequest reques
@Override
protected Tuple<BulkShardResponse, BulkShardRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) {
final BulkShardRequest request = shardRequest.request;
IndexService indexService = indicesService.indexServiceSafe(request.index());
IndexShard indexShard = indexService.shardSafe(shardRequest.shardId.id());
final IndexService indexService = indicesService.indexServiceSafe(request.index());
final IndexShard indexShard = indexService.shardSafe(shardRequest.shardId.id());

long[] preVersions = new long[request.items().length];
VersionType[] preVersionTypes = new VersionType[request.items().length];
Translog.Location location = null;
for (int requestIndex = 0; requestIndex < request.items().length; requestIndex++) {
BulkItemRequest item = request.items()[requestIndex];
if (item.request() instanceof IndexRequest) {
IndexRequest indexRequest = (IndexRequest) item.request();
preVersions[requestIndex] = indexRequest.version();
preVersionTypes[requestIndex] = indexRequest.versionType();
try {
WriteResult result = shardIndexOperation(request, indexRequest, clusterState, indexShard, indexService, true);
WriteResult<IndexResponse> result = shardIndexOperation(request, indexRequest, clusterState, indexShard, true);
location = locationToSync(location, result.location);
// add the response
IndexResponse indexResponse = result.response();
setResponse(item, new BulkItemResponse(item.id(), indexRequest.opType().lowercase(), indexResponse));
@@ -163,7 +162,9 @@ protected Tuple<BulkShardResponse, BulkShardRequest> shardOperationOnPrimary(Clu

try {
// add the response
DeleteResponse deleteResponse = shardDeleteOperation(request, deleteRequest, indexShard).response();
final WriteResult<DeleteResponse> writeResult = shardDeleteOperation(request, deleteRequest, indexShard);
DeleteResponse deleteResponse = writeResult.response();
location = locationToSync(location, writeResult.location);
setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_DELETE, deleteResponse));
} catch (Throwable e) {
// rethrow the failure if we are going to retry on primary and let parent failure to handle it
@@ -197,15 +198,18 @@ protected Tuple<BulkShardResponse, BulkShardRequest> shardOperationOnPrimary(Clu
for (int updateAttemptsCount = 0; updateAttemptsCount <= updateRequest.retryOnConflict(); updateAttemptsCount++) {
UpdateResult updateResult;
try {
updateResult = shardUpdateOperation(clusterState, request, updateRequest, indexShard, indexService);
updateResult = shardUpdateOperation(clusterState, request, updateRequest, indexShard);
} catch (Throwable t) {
updateResult = new UpdateResult(null, null, false, t, null);
}
if (updateResult.success()) {
if (updateResult.writeResult != null) {
location = locationToSync(location, updateResult.writeResult.location);
}
switch (updateResult.result.operation()) {
case UPSERT:
case INDEX:
WriteResult result = updateResult.writeResult;
WriteResult<IndexResponse> result = updateResult.writeResult;
IndexRequest indexRequest = updateResult.request();
BytesReference indexSourceAsBytes = indexRequest.source();
// add the response
@@ -219,7 +223,8 @@ protected Tuple<BulkShardResponse, BulkShardRequest> shardOperationOnPrimary(Clu
setResponse(item, new BulkItemResponse(item.id(), OP_TYPE_UPDATE, updateResponse));
break;
case DELETE:
DeleteResponse response = updateResult.writeResult.response();
WriteResult<DeleteResponse> writeResult = updateResult.writeResult;
DeleteResponse response = writeResult.response();
DeleteRequest deleteRequest = updateResult.request();
updateResponse = new UpdateResponse(response.getShardInfo(), response.getIndex(), response.getType(), response.getId(), response.getVersion(), false);
updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, shardRequest.request.index(), response.getVersion(), updateResult.result.updatedSourceAsMap(), updateResult.result.updateSourceContentType(), null));
@@ -297,13 +302,7 @@ protected Tuple<BulkShardResponse, BulkShardRequest> shardOperationOnPrimary(Clu
assert preVersionTypes[requestIndex] != null;
}

if (request.refresh()) {
try {
indexShard.refresh("refresh_flag_bulk");
} catch (Throwable e) {
// ignore
}
}
processAfter(request, indexShard, location);
BulkItemResponse[] responses = new BulkItemResponse[request.items().length];
BulkItemRequest[] items = request.items();
for (int i = 0; i < items.length; i++) {
@@ -319,26 +318,8 @@ private void setResponse(BulkItemRequest request, BulkItemResponse response) {
}
}

static class WriteResult {

final ActionWriteResponse response;

WriteResult(ActionWriteResponse response) {
this.response = response;
}

@SuppressWarnings("unchecked")
<T extends ActionWriteResponse> T response() {
// this sets total, pending and failed to 0 and this is ok, because we will embed this into the replica
// request and not use it
response.setShardInfo(new ActionWriteResponse.ShardInfo());
return (T) response;
}

}

private WriteResult shardIndexOperation(BulkShardRequest request, IndexRequest indexRequest, ClusterState clusterState,
IndexShard indexShard, IndexService indexService, boolean processed) throws Throwable {
IndexShard indexShard, boolean processed) throws Throwable {

// validate, if routing is required, that we got routing
MappingMetaData mappingMd = clusterState.metaData().index(request.index()).mappingOrDefault(indexRequest.type());
@@ -352,11 +333,10 @@ private WriteResult shardIndexOperation(BulkShardRequest request, IndexRequest i
indexRequest.process(clusterState.metaData(), mappingMd, allowIdGeneration, request.index());
}

final IndexResponse response = executeIndexRequestOnPrimary(request, indexRequest, indexShard);
return new WriteResult(response);
return executeIndexRequestOnPrimary(request, indexRequest, indexShard);
}

private WriteResult shardDeleteOperation(BulkShardRequest request, DeleteRequest deleteRequest, IndexShard indexShard) {
private WriteResult<DeleteResponse> shardDeleteOperation(BulkShardRequest request, DeleteRequest deleteRequest, IndexShard indexShard) {
Engine.Delete delete = indexShard.prepareDelete(deleteRequest.type(), deleteRequest.id(), deleteRequest.version(), deleteRequest.versionType(), Engine.Operation.Origin.PRIMARY);
indexShard.delete(delete);
// update the request with the version so it will go to the replicas
@@ -366,7 +346,7 @@ private WriteResult shardDeleteOperation(BulkShardRequest request, DeleteRequest
assert deleteRequest.versionType().validateVersionForWrites(deleteRequest.version());

DeleteResponse deleteResponse = new DeleteResponse(request.index(), deleteRequest.type(), deleteRequest.id(), delete.version(), delete.found());
return new WriteResult(deleteResponse);
return new WriteResult(deleteResponse, delete.getTranslogLocation());
}

static class UpdateResult {
@@ -422,14 +402,14 @@ <T extends ActionRequest> T request() {

}

private UpdateResult shardUpdateOperation(ClusterState clusterState, BulkShardRequest bulkShardRequest, UpdateRequest updateRequest, IndexShard indexShard, IndexService indexService) {
private UpdateResult shardUpdateOperation(ClusterState clusterState, BulkShardRequest bulkShardRequest, UpdateRequest updateRequest, IndexShard indexShard) {
UpdateHelper.Result translate = updateHelper.prepare(updateRequest, indexShard);
switch (translate.operation()) {
case UPSERT:
case INDEX:
IndexRequest indexRequest = translate.action();
try {
WriteResult result = shardIndexOperation(bulkShardRequest, indexRequest, clusterState, indexShard, indexService, false);
WriteResult result = shardIndexOperation(bulkShardRequest, indexRequest, clusterState, indexShard, false);
return new UpdateResult(translate, indexRequest, result);
} catch (Throwable t) {
t = ExceptionsHelper.unwrapCause(t);
@@ -466,6 +446,7 @@ private UpdateResult shardUpdateOperation(ClusterState clusterState, BulkShardRe
protected void shardOperationOnReplica(ShardId shardId, BulkShardRequest request) {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.shardSafe(shardId.id());
Translog.Location location = null;
for (int i = 0; i < request.items().length; i++) {
BulkItemRequest item = request.items()[i];
if (item == null || item.isIgnoreOnReplica()) {
@@ -491,6 +472,7 @@ protected void shardOperationOnReplica(ShardId shardId, BulkShardRequest request
throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: " + update);
}
operation.execute(indexShard);
location = locationToSync(location, operation.getTranslogLocation());
} catch (Throwable e) {
// if its not an ignore replica failure, we need to make sure to bubble up the failure
// so we will fail the shard
@@ -503,6 +485,7 @@ protected void shardOperationOnReplica(ShardId shardId, BulkShardRequest request
try {
Engine.Delete delete = indexShard.prepareDelete(deleteRequest.type(), deleteRequest.id(), deleteRequest.version(), deleteRequest.versionType(), Engine.Operation.Origin.REPLICA);
indexShard.delete(delete);
location = locationToSync(location, delete.getTranslogLocation());
} catch (Throwable e) {
// if its not an ignore replica failure, we need to make sure to bubble up the failure
// so we will fail the shard
@@ -515,13 +498,21 @@ protected void shardOperationOnReplica(ShardId shardId, BulkShardRequest request
}
}

processAfter(request, indexShard, location);
}

private void processAfter(BulkShardRequest request, IndexShard indexShard, Translog.Location location) {
if (request.refresh()) {
try {
indexShard.refresh("refresh_flag_bulk");
} catch (Throwable e) {
// ignore
}
}

if (indexShard.getTranslogDurability() == Translog.Durabilty.REQUEST && location != null) {
indexShard.sync(location);
}
}

private void applyVersion(BulkItemRequest item, long version, VersionType versionType) {
@@ -535,4 +526,15 @@ private void applyVersion(BulkItemRequest item, long version, VersionType versio
// log?
}
}

private Translog.Location locationToSync(Translog.Location current, Translog.Location next) {
/* here we are moving forward in the translog with each operation. Under the hood
* this might cross translog files, which is ok since, from the user's perspective,
* the translog is like a tape where only the highest location needs to be fsynced
* in order to sync all previous locations, even though they are not in the same file.
* When the translog rolls over files, the previous file is fsynced after closing if needed. */
assert next != null : "next operation can't be null";
assert current == null || current.compareTo(next) < 0 : "translog locations are not increasing";
return next;
}
}
@@ -19,12 +19,14 @@

package org.elasticsearch.action.delete;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.action.support.replication.TransportShardReplicationOperationAction;
@@ -41,11 +43,14 @@
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;

/**
* Performs the delete operation.
*/
@@ -140,13 +145,7 @@ protected Tuple<DeleteResponse, DeleteRequest> shardOperationOnPrimary(ClusterSt

assert request.versionType().validateVersionForWrites(request.version());

if (request.refresh()) {
try {
indexShard.refresh("refresh_flag_delete");
} catch (Exception e) {
// ignore
}
}
processAfter(request, indexShard, delete.getTranslogLocation());

DeleteResponse response = new DeleteResponse(shardRequest.shardId.getIndex(), request.type(), request.id(), delete.version(), delete.found());
return new Tuple<>(response, shardRequest.request);
@@ -158,19 +157,26 @@ protected void shardOperationOnReplica(ShardId shardId, DeleteRequest request) {
Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version(), request.versionType(), Engine.Operation.Origin.REPLICA);

indexShard.delete(delete);
processAfter(request, indexShard, delete.getTranslogLocation());
}

@Override
protected ShardIterator shards(ClusterState clusterState, InternalRequest request) {
return clusterService.operationRouting()
.deleteShards(clusterService.state(), request.concreteIndex(), request.request().type(), request.request().id(), request.request().routing());
}

private void processAfter(DeleteRequest request, IndexShard indexShard, Translog.Location location) {
if (request.refresh()) {
try {
indexShard.refresh("refresh_flag_delete");
} catch (Exception e) {
} catch (Throwable e) {
// ignore
}
}
}

@Override
protected ShardIterator shards(ClusterState clusterState, InternalRequest request) {
return clusterService.operationRouting()
.deleteShards(clusterService.state(), request.concreteIndex(), request.request().type(), request.request().id(), request.request().routing());
if (indexShard.getTranslogDurability() == Translog.Durabilty.REQUEST && location != null) {
indexShard.sync(location);
}
}
}
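
Only a subset of the 22 changed files is shown above. The hunks reference a
new generic WriteResult<T> that now carries the operation's translog location,
replacing the untyped WriteResult deleted from TransportShardBulkAction; its
actual definition lives in a file not shown here. A hedged reconstruction,
inferred from the call sites in this diff:

import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.index.translog.Translog;

/** Hedged reconstruction from call sites; the real definition is in an unshown file. */
class WriteResult<T extends ActionWriteResponse> {

    final T response;
    final Translog.Location location; // where this operation landed in the translog

    WriteResult(T response, Translog.Location location) {
        this.response = response;
        this.location = location;
    }

    T response() {
        // total/pending/failed stay at 0 on purpose: the response is embedded into
        // the replica request and the shard info is filled in later.
        response.setShardInfo(new ActionWriteResponse.ShardInfo());
        return response;
    }
}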