Skip to content

Commit

Permalink
Support batch ingestion in bulk API (#12457) (#13306)
Browse files Browse the repository at this point in the history
* [PoC][issues-12457] Support Batch Ingestion

Signed-off-by: Liyun Xiu <xiliyun@amazon.com>

* Rewrite batch interface and handle error and metrics

Signed-off-by: Liyun Xiu <xiliyun@amazon.com>

* Remove unnecessary change

Signed-off-by: Liyun Xiu <xiliyun@amazon.com>

* Revert some unnecessary test change

Signed-off-by: Liyun Xiu <xiliyun@amazon.com>

* Keep executeBulkRequest main logic untouched

Signed-off-by: Liyun Xiu <xiliyun@amazon.com>

* Add UT

Signed-off-by: Liyun Xiu <xiliyun@amazon.com>

* Add UT & yamlRest test, fix BulkRequest se/deserialization

Signed-off-by: Liyun Xiu <xiliyun@amazon.com>

* Add missing java docs

Signed-off-by: Liyun Xiu <xiliyun@amazon.com>

* Remove Writable from BatchIngestionOption

Signed-off-by: Liyun Xiu <xiliyun@amazon.com>

* Add more UTs

Signed-off-by: Liyun Xiu <xiliyun@amazon.com>

* Fix spotlesscheck

Signed-off-by: Liyun Xiu <xiliyun@amazon.com>

* Rename parameter name to batch_size

Signed-off-by: Liyun Xiu <xiliyun@amazon.com>

* Add more rest yaml tests & update rest spec

Signed-off-by: Liyun Xiu <xiliyun@amazon.com>

* Remove batch_ingestion_option and only use batch_size

Signed-off-by: Liyun Xiu <xiliyun@amazon.com>

* Throw invalid request exception for invalid batch_size

Signed-off-by: Liyun Xiu <xiliyun@amazon.com>

* Update server/src/main/java/org/opensearch/action/bulk/BulkRequest.java

Co-authored-by: Andriy Redko <drreta@gmail.com>
Signed-off-by: Liyun Xiu <chishui2@gmail.com>

* Remove version constant

Signed-off-by: Liyun Xiu <xiliyun@amazon.com>

---------

Signed-off-by: Liyun Xiu <xiliyun@amazon.com>
Signed-off-by: Liyun Xiu <chishui2@gmail.com>
Co-authored-by: Andriy Redko <drreta@gmail.com>
  • Loading branch information
chishui and reta authored Apr 30, 2024
1 parent d7e6b9c commit 1219c56
Show file tree
Hide file tree
Showing 18 changed files with 1,486 additions and 50 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Tiered Caching] Gate new stats logic behind FeatureFlags.PLUGGABLE_CACHE ([#13238](https://github.com/opensearch-project/OpenSearch/pull/13238))
- [Tiered Caching] Add a dynamic setting to disable/enable disk cache. ([#13373](https://github.com/opensearch-project/OpenSearch/pull/13373))
- [Remote Store] Add capability of doing refresh as determined by the translog ([#12992](https://github.com/opensearch-project/OpenSearch/pull/12992))
- [Batch Ingestion] Add `batch_size` to `_bulk` API. ([#12457](https://github.com/opensearch-project/OpenSearch/issues/12457))
- [Tiered caching] Make Indices Request Cache Stale Key Mgmt Threshold setting dynamic ([#12941](https://github.com/opensearch-project/OpenSearch/pull/12941))
- Batch mode for async fetching shard information in GatewayAllocator for unassigned shards ([#8746](https://github.com/opensearch-project/OpenSearch/pull/8746))
- [Remote Store] Add settings for remote path type and hash algorithm ([#13225](https://github.com/opensearch-project/OpenSearch/pull/13225))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,3 +167,90 @@ teardown:
index: test_index
id: test_id3
- match: { _source: {"f1": "v2", "f2": 47, "field1": "value1"}}

---
"Test bulk API with batch enabled happy case":
- skip:
version: " - 2.13.99"
reason: "Added in 2.14.0"

- do:
bulk:
refresh: true
batch_size: 2
pipeline: "pipeline1"
body:
- '{"index": {"_index": "test_index", "_id": "test_id1"}}'
- '{"text": "text1"}'
- '{"index": {"_index": "test_index", "_id": "test_id2"}}'
- '{"text": "text2"}'
- '{"index": {"_index": "test_index", "_id": "test_id3"}}'
- '{"text": "text3"}'
- '{"index": {"_index": "test_index", "_id": "test_id4"}}'
- '{"text": "text4"}'
- '{"index": {"_index": "test_index", "_id": "test_id5", "pipeline": "pipeline2"}}'
- '{"text": "text5"}'
- '{"index": {"_index": "test_index", "_id": "test_id6", "pipeline": "pipeline2"}}'
- '{"text": "text6"}'

- match: { errors: false }

- do:
get:
index: test_index
id: test_id5
- match: { _source: {"text": "text5", "field2": "value2"}}

- do:
get:
index: test_index
id: test_id3
- match: { _source: { "text": "text3", "field1": "value1" } }

---
"Test bulk API with batch_size missing":
- skip:
version: " - 2.13.99"
reason: "Added in 2.14.0"

- do:
bulk:
refresh: true
pipeline: "pipeline1"
body:
- '{"index": {"_index": "test_index", "_id": "test_id1"}}'
- '{"text": "text1"}'
- '{"index": {"_index": "test_index", "_id": "test_id2"}}'
- '{"text": "text2"}'

- match: { errors: false }

- do:
get:
index: test_index
id: test_id1
- match: { _source: { "text": "text1", "field1": "value1" } }

- do:
get:
index: test_index
id: test_id2
- match: { _source: { "text": "text2", "field1": "value1" } }

---
"Test bulk API with invalid batch_size":
- skip:
version: " - 2.13.99"
reason: "Added in 2.14.0"

- do:
catch: bad_request
bulk:
refresh: true
batch_size: -1
pipeline: "pipeline1"
body:
- '{"index": {"_index": "test_index", "_id": "test_id1"}}'
- '{"text": "text1"}'
- '{"index": {"_index": "test_index", "_id": "test_id2"}}'
- '{"text": "text2"}'
4 changes: 4 additions & 0 deletions rest-api-spec/src/main/resources/rest-api-spec/api/bulk.json
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@
"require_alias": {
"type": "boolean",
"description": "Sets require_alias for all incoming documents. Defaults to unset (false)"
},
"batch_size": {
"type": "int",
"description": "Sets the batch size"
}
},
"body":{
Expand Down
30 changes: 29 additions & 1 deletion server/src/main/java/org/opensearch/action/bulk/BulkRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.RamUsageEstimator;
import org.opensearch.Version;
import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.action.CompositeIndicesRequest;
Expand Down Expand Up @@ -80,7 +81,6 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(BulkRequest.class);

private static final int REQUEST_OVERHEAD = 50;

/**
* Requests that are part of this request. It is only possible to add things that are both {@link ActionRequest}s and
* {@link WriteRequest}s to this but java doesn't support syntax to declare that everything in the array has both types so we declare
Expand All @@ -96,6 +96,7 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
private String globalRouting;
private String globalIndex;
private Boolean globalRequireAlias;
private int batchSize = 1;

private long sizeInBytes = 0;

Expand All @@ -107,6 +108,9 @@ public BulkRequest(StreamInput in) throws IOException {
requests.addAll(in.readList(i -> DocWriteRequest.readDocumentRequest(null, i)));
refreshPolicy = RefreshPolicy.readFrom(in);
timeout = in.readTimeValue();
if (in.getVersion().onOrAfter(Version.V_2_14_0)) {
batchSize = in.readInt();
}
}

public BulkRequest(@Nullable String globalIndex) {
Expand Down Expand Up @@ -346,6 +350,27 @@ public final BulkRequest timeout(TimeValue timeout) {
return this;
}

/**
* Set batch size
* @param size batch size from input
* @return {@link BulkRequest}
*/
public BulkRequest batchSize(int size) {
if (size < 1) {
throw new IllegalArgumentException("batch_size must be greater than 0");
}
this.batchSize = size;
return this;
}

/**
* Get batch size
* @return batch size
*/
public int batchSize() {
return this.batchSize;
}

/**
* Note for internal callers (NOT high level rest client),
* the global parameter setting is ignored when used with:
Expand Down Expand Up @@ -453,6 +478,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeCollection(requests, DocWriteRequest::writeDocumentRequest);
refreshPolicy.writeTo(out);
out.writeTimeValue(timeout);
if (out.getVersion().onOrAfter(Version.V_2_14_0)) {
out.writeInt(batchSize);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -923,7 +923,8 @@ public boolean isForceExecution() {
}
},
bulkRequestModifier::markItemAsDropped,
executorName
executorName,
original
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ public void before() {
current.incrementAndGet();
}

/**
* Invoke before the given operation begins in multiple items at the same time.
* @param n number of items
*/
public void beforeN(int n) {
current.addAndGet(n);
}

/**
* Invoked upon completion (success or failure) of the given operation
* @param currentTime elapsed time of the operation
Expand All @@ -46,13 +54,35 @@ public void after(long currentTime) {
time.inc(currentTime);
}

/**
* Invoked upon completion (success or failure) of the given operation for multiple items.
* @param n number of items completed
* @param currentTime elapsed time of the operation
*/
public void afterN(int n, long currentTime) {
current.addAndGet(-n);
for (int i = 0; i < n; ++i) {
time.inc(currentTime);
}
}

/**
* Invoked upon failure of the operation.
*/
public void failed() {
failed.inc();
}

/**
* Invoked upon failure of the operation on multiple items.
* @param n number of items on operation.
*/
public void failedN(int n) {
for (int i = 0; i < n; ++i) {
failed.inc();
}
}

public void add(OperationMetrics other) {
// Don't try copying over current, since in-flight requests will be linked to the existing metrics instance.
failed.inc(other.failed.count());
Expand Down
113 changes: 113 additions & 0 deletions server/src/main/java/org/opensearch/ingest/CompoundProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,13 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -150,6 +153,108 @@ public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Ex
innerExecute(0, ingestDocument, handler);
}

@Override
public void batchExecute(List<IngestDocumentWrapper> ingestDocumentWrappers, Consumer<List<IngestDocumentWrapper>> handler) {
innerBatchExecute(0, ingestDocumentWrappers, handler);
}

/**
* Internal logic to process documents with current processor.
*
* @param currentProcessor index of processor to process batched documents
* @param ingestDocumentWrappers batched documents to be processed
* @param handler callback function
*/
void innerBatchExecute(
int currentProcessor,
List<IngestDocumentWrapper> ingestDocumentWrappers,
Consumer<List<IngestDocumentWrapper>> handler
) {
if (currentProcessor == processorsWithMetrics.size()) {
handler.accept(ingestDocumentWrappers);
return;
}
Tuple<Processor, OperationMetrics> processorWithMetric = processorsWithMetrics.get(currentProcessor);
final Processor processor = processorWithMetric.v1();
final OperationMetrics metric = processorWithMetric.v2();
final long startTimeInNanos = relativeTimeProvider.getAsLong();
int size = ingestDocumentWrappers.size();
metric.beforeN(size);
// Use synchronization to ensure batches are processed by processors in sequential order
AtomicInteger counter = new AtomicInteger(size);
List<IngestDocumentWrapper> allResults = Collections.synchronizedList(new ArrayList<>());
Map<Integer, IngestDocumentWrapper> slotToWrapperMap = createSlotIngestDocumentWrapperMap(ingestDocumentWrappers);
processor.batchExecute(ingestDocumentWrappers, results -> {
if (results.isEmpty()) return;
allResults.addAll(results);
// counter equals to 0 means all documents are processed and called back.
if (counter.addAndGet(-results.size()) == 0) {
long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos);
metric.afterN(allResults.size(), ingestTimeInMillis);

List<IngestDocumentWrapper> documentsDropped = new ArrayList<>();
List<IngestDocumentWrapper> documentsWithException = new ArrayList<>();
List<IngestDocumentWrapper> documentsToContinue = new ArrayList<>();
int totalFailed = 0;
// iterate all results to categorize them to: to continue, to drop, with exception
for (IngestDocumentWrapper resultDocumentWrapper : allResults) {
IngestDocumentWrapper originalDocumentWrapper = slotToWrapperMap.get(resultDocumentWrapper.getSlot());
if (resultDocumentWrapper.getException() != null) {
++totalFailed;
if (ignoreFailure) {
documentsToContinue.add(originalDocumentWrapper);
} else {
IngestProcessorException compoundProcessorException = newCompoundProcessorException(
resultDocumentWrapper.getException(),
processor,
originalDocumentWrapper.getIngestDocument()
);
documentsWithException.add(
new IngestDocumentWrapper(
resultDocumentWrapper.getSlot(),
originalDocumentWrapper.getIngestDocument(),
compoundProcessorException
)
);
}
} else {
if (resultDocumentWrapper.getIngestDocument() == null) {
documentsDropped.add(resultDocumentWrapper);
} else {
documentsToContinue.add(resultDocumentWrapper);
}
}
}
if (totalFailed > 0) {
metric.failedN(totalFailed);
}
if (!documentsDropped.isEmpty()) {
handler.accept(documentsDropped);
}
if (!documentsToContinue.isEmpty()) {
innerBatchExecute(currentProcessor + 1, documentsToContinue, handler);
}
if (!documentsWithException.isEmpty()) {
if (onFailureProcessors.isEmpty()) {
handler.accept(documentsWithException);
} else {
documentsWithException.forEach(
doc -> executeOnFailureAsync(
0,
doc.getIngestDocument(),
(IngestProcessorException) doc.getException(),
(result, ex) -> {
handler.accept(Collections.singletonList(new IngestDocumentWrapper(doc.getSlot(), result, ex)));
}
)
);
}
}
}
assert counter.get() >= 0;
});
}

void innerExecute(int currentProcessor, IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
if (currentProcessor == processorsWithMetrics.size()) {
handler.accept(ingestDocument, null);
Expand Down Expand Up @@ -266,4 +371,12 @@ static IngestProcessorException newCompoundProcessorException(Exception e, Proce
return exception;
}

private Map<Integer, IngestDocumentWrapper> createSlotIngestDocumentWrapperMap(List<IngestDocumentWrapper> ingestDocumentWrappers) {
Map<Integer, IngestDocumentWrapper> slotIngestDocumentWrapperMap = new HashMap<>();
for (IngestDocumentWrapper ingestDocumentWrapper : ingestDocumentWrappers) {
slotIngestDocumentWrapperMap.put(ingestDocumentWrapper.getSlot(), ingestDocumentWrapper);
}
return slotIngestDocumentWrapperMap;
}

}
Loading

0 comments on commit 1219c56

Please sign in to comment.