From 71f1fabe149fd0777edf44502ace4a8f0911feeb Mon Sep 17 00:00:00 2001 From: Gagan Juneja Date: Tue, 9 Jan 2024 22:54:02 +0530 Subject: [PATCH 1/4] Add support for conditional Transient header propagation (#11490) * Clear transient header from system context Signed-off-by: Gagan Juneja * Clear transient header from system context Signed-off-by: Gagan Juneja * Adds changelog Signed-off-by: Gagan Juneja * Update CHANGELOG.md Co-authored-by: Andriy Redko Signed-off-by: Gagan Juneja * Adds unit tests Signed-off-by: Gagan Juneja * Refactor code Signed-off-by: Gagan Juneja * Refactor code Signed-off-by: Gagan Juneja * Refactor code Signed-off-by: Gagan Juneja * Supress warning Signed-off-by: Gagan Juneja * Refactor code Signed-off-by: Gagan Juneja --------- Signed-off-by: Gagan Juneja Signed-off-by: Gagan Juneja Co-authored-by: Gagan Juneja Co-authored-by: Andriy Redko --- CHANGELOG.md | 1 + .../common/util/concurrent/ThreadContext.java | 24 ++++--- .../ThreadContextStatePropagator.java | 30 ++++++++- .../TaskThreadContextStatePropagator.java | 13 ++++ ...hreadContextBasedTracerContextStorage.java | 19 +++++- .../transport/TransportService.java | 17 ++--- .../util/concurrent/ThreadContextTests.java | 67 +++++++++++++++++++ ...TaskThreadContextStatePropagatorTests.java | 34 ++++++++++ ...ContextBasedTracerContextStorageTests.java | 16 +++++ 9 files changed, 193 insertions(+), 28 deletions(-) create mode 100644 server/src/test/java/org/opensearch/tasks/TaskThreadContextStatePropagatorTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 45ab53b0d4246..6da048c65d8d0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -214,6 +214,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Fix template setting override for replication type ([#11417](https://github.com/opensearch-project/OpenSearch/pull/11417)) - Fix Automatic addition of protocol broken in #11512 ([#11609](https://github.com/opensearch-project/OpenSearch/pull/11609)) - Fix issue when calling Delete PIT endpoint and no PITs exist ([#11711](https://github.com/opensearch-project/OpenSearch/pull/11711)) +- Fix tracing context propagation for local transport instrumentation ([#11490](https://github.com/opensearch-project/OpenSearch/pull/11490)) ### Security diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContext.java b/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContext.java index 3da21a6777456..6580b0e0085ef 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContext.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContext.java @@ -161,7 +161,7 @@ public StoredContext stashContext() { ); } - final Map transientHeaders = propagateTransients(context.transientHeaders); + final Map transientHeaders = propagateTransients(context.transientHeaders, context.isSystemContext); if (!transientHeaders.isEmpty()) { threadContextStruct = threadContextStruct.putTransient(transientHeaders); } @@ -182,7 +182,7 @@ public StoredContext stashContext() { public Writeable captureAsWriteable() { final ThreadContextStruct context = threadLocal.get(); return out -> { - final Map propagatedHeaders = propagateHeaders(context.transientHeaders); + final Map propagatedHeaders = propagateHeaders(context.transientHeaders, context.isSystemContext); context.writeTo(out, defaultHeader, propagatedHeaders); }; } @@ -245,7 +245,7 @@ public StoredContext newStoredContext(boolean preserveResponseHeaders, Collectio final Map newTransientHeaders = new HashMap<>(originalContext.transientHeaders); boolean transientHeadersModified = false; - final Map transientHeaders = propagateTransients(originalContext.transientHeaders); + final Map transientHeaders = propagateTransients(originalContext.transientHeaders, originalContext.isSystemContext); if (!transientHeaders.isEmpty()) { newTransientHeaders.putAll(transientHeaders); transientHeadersModified = true; @@ -322,7 +322,7 @@ public Supplier wrapRestorable(StoredContext storedContext) { @Override public void writeTo(StreamOutput out) throws IOException { final ThreadContextStruct context = threadLocal.get(); - final Map propagatedHeaders = propagateHeaders(context.transientHeaders); + final Map propagatedHeaders = propagateHeaders(context.transientHeaders, context.isSystemContext); context.writeTo(out, defaultHeader, propagatedHeaders); } @@ -534,7 +534,7 @@ boolean isDefaultContext() { * by the system itself rather than by a user action. */ public void markAsSystemContext() { - threadLocal.set(threadLocal.get().setSystemContext()); + threadLocal.set(threadLocal.get().setSystemContext(propagators)); } /** @@ -573,15 +573,15 @@ public static Map buildDefaultHeaders(Settings settings) { } } - private Map propagateTransients(Map source) { + private Map propagateTransients(Map source, boolean isSystemContext) { final Map transients = new HashMap<>(); - propagators.forEach(p -> transients.putAll(p.transients(source))); + propagators.forEach(p -> transients.putAll(p.transients(source, isSystemContext))); return transients; } - private Map propagateHeaders(Map source) { + private Map propagateHeaders(Map source, boolean isSystemContext) { final Map headers = new HashMap<>(); - propagators.forEach(p -> headers.putAll(p.headers(source))); + propagators.forEach(p -> headers.putAll(p.headers(source, isSystemContext))); return headers; } @@ -603,11 +603,13 @@ private static final class ThreadContextStruct { // saving current warning headers' size not to recalculate the size with every new warning header private final long warningHeadersSize; - private ThreadContextStruct setSystemContext() { + private ThreadContextStruct setSystemContext(final List propagators) { if (isSystemContext) { return this; } - return new ThreadContextStruct(requestHeaders, responseHeaders, transientHeaders, persistentHeaders, true); + final Map transients = new HashMap<>(); + propagators.forEach(p -> transients.putAll(p.transients(transientHeaders, true))); + return new ThreadContextStruct(requestHeaders, responseHeaders, transients, persistentHeaders, true); } private ThreadContextStruct( diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContextStatePropagator.java b/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContextStatePropagator.java index dac70b0e8124e..e8c12ae13d5eb 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContextStatePropagator.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContextStatePropagator.java @@ -22,15 +22,41 @@ public interface ThreadContextStatePropagator { /** * Returns the list of transient headers that needs to be propagated from current context to new thread context. - * @param source current context transient headers + * + * @param source current context transient headers * @return the list of transient headers that needs to be propagated from current context to new thread context */ + @Deprecated(since = "2.12.0", forRemoval = true) Map transients(Map source); + /** + * Returns the list of transient headers that needs to be propagated from current context to new thread context. + * + * @param source current context transient headers + * @param isSystemContext if the propagation is for system context. + * @return the list of transient headers that needs to be propagated from current context to new thread context + */ + default Map transients(Map source, boolean isSystemContext) { + return transients(source); + }; + /** * Returns the list of request headers that needs to be propagated from current context to request. - * @param source current context headers + * + * @param source current context headers * @return the list of request headers that needs to be propagated from current context to request */ + @Deprecated(since = "2.12.0", forRemoval = true) Map headers(Map source); + + /** + * Returns the list of request headers that needs to be propagated from current context to request. + * + * @param source current context headers + * @param isSystemContext if the propagation is for system context. + * @return the list of request headers that needs to be propagated from current context to request + */ + default Map headers(Map source, boolean isSystemContext) { + return headers(source); + } } diff --git a/server/src/main/java/org/opensearch/tasks/TaskThreadContextStatePropagator.java b/server/src/main/java/org/opensearch/tasks/TaskThreadContextStatePropagator.java index ed111b34f048f..99559e45aaaee 100644 --- a/server/src/main/java/org/opensearch/tasks/TaskThreadContextStatePropagator.java +++ b/server/src/main/java/org/opensearch/tasks/TaskThreadContextStatePropagator.java @@ -20,7 +20,9 @@ * Propagates TASK_ID across thread contexts */ public class TaskThreadContextStatePropagator implements ThreadContextStatePropagator { + @Override + @SuppressWarnings("removal") public Map transients(Map source) { final Map transients = new HashMap<>(); @@ -32,7 +34,18 @@ public Map transients(Map source) { } @Override + public Map transients(Map source, boolean isSystemContext) { + return transients(source); + } + + @Override + @SuppressWarnings("removal") public Map headers(Map source) { return Collections.emptyMap(); } + + @Override + public Map headers(Map source, boolean isSystemContext) { + return headers(source); + } } diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorage.java b/server/src/main/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorage.java index 863f56d9fbe94..908164d1935a7 100644 --- a/server/src/main/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorage.java +++ b/server/src/main/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorage.java @@ -12,6 +12,7 @@ import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.common.util.concurrent.ThreadContextStatePropagator; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Objects; @@ -50,20 +51,29 @@ public void put(String key, Span span) { } @Override + @SuppressWarnings("removal") public Map transients(Map source) { final Map transients = new HashMap<>(); - if (source.containsKey(CURRENT_SPAN)) { final SpanReference current = (SpanReference) source.get(CURRENT_SPAN); if (current != null) { transients.put(CURRENT_SPAN, new SpanReference(current.getSpan())); } } - return transients; } @Override + public Map transients(Map source, boolean isSystemContext) { + if (isSystemContext == true) { + return Collections.emptyMap(); + } else { + return transients(source); + } + } + + @Override + @SuppressWarnings("removal") public Map headers(Map source) { final Map headers = new HashMap<>(); @@ -77,6 +87,11 @@ public Map headers(Map source) { return headers; } + @Override + public Map headers(Map source, boolean isSystemContext) { + return headers(source); + } + Span getCurrentSpan(String key) { SpanReference currentSpanRef = threadContext.getTransient(key); return (currentSpanRef == null) ? null : currentSpanRef.getSpan(); diff --git a/server/src/main/java/org/opensearch/transport/TransportService.java b/server/src/main/java/org/opensearch/transport/TransportService.java index a1697b1898eeb..d50266d8c9e4a 100644 --- a/server/src/main/java/org/opensearch/transport/TransportService.java +++ b/server/src/main/java/org/opensearch/transport/TransportService.java @@ -868,19 +868,10 @@ public final void sendRequest( final TransportRequestOptions options, final TransportResponseHandler handler ) { - if (connection == localNodeConnection) { - // See please https://github.com/opensearch-project/OpenSearch/issues/10291 - sendRequestAsync(connection, action, request, options, handler); - } else { - final Span span = tracer.startSpan(SpanBuilder.from(action, connection)); - try (SpanScope spanScope = tracer.withSpanInScope(span)) { - TransportResponseHandler traceableTransportResponseHandler = TraceableTransportResponseHandler.create( - handler, - span, - tracer - ); - sendRequestAsync(connection, action, request, options, traceableTransportResponseHandler); - } + final Span span = tracer.startSpan(SpanBuilder.from(action, connection)); + try (SpanScope spanScope = tracer.withSpanInScope(span)) { + TransportResponseHandler traceableTransportResponseHandler = TraceableTransportResponseHandler.create(handler, span, tracer); + sendRequestAsync(connection, action, request, options, traceableTransportResponseHandler); } } diff --git a/server/src/test/java/org/opensearch/common/util/concurrent/ThreadContextTests.java b/server/src/test/java/org/opensearch/common/util/concurrent/ThreadContextTests.java index a0531c76bf897..10669ca1a805b 100644 --- a/server/src/test/java/org/opensearch/common/util/concurrent/ThreadContextTests.java +++ b/server/src/test/java/org/opensearch/common/util/concurrent/ThreadContextTests.java @@ -44,6 +44,8 @@ import java.util.Map; import java.util.function.Supplier; +import org.mockito.Mockito; + import static org.opensearch.tasks.TaskResourceTrackingService.TASK_ID; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItem; @@ -740,6 +742,71 @@ public void testMarkAsSystemContext() throws IOException { assertFalse(threadContext.isSystemContext()); } + public void testSystemContextWithPropagator() { + Settings build = Settings.builder().put("request.headers.default", "1").build(); + Map transientHeaderMap = Collections.singletonMap("test_transient_propagation_key", "test"); + Map transientHeaderTransformedMap = Collections.singletonMap("test_transient_propagation_key", "test"); + Map headerMap = Collections.singletonMap("test_transient_propagation_key", "test"); + Map headerTransformedMap = Collections.singletonMap("test_transient_propagation_key", "test"); + ThreadContext threadContext = new ThreadContext(build); + ThreadContextStatePropagator mockPropagator = Mockito.mock(ThreadContextStatePropagator.class); + Mockito.when(mockPropagator.transients(transientHeaderMap, true)).thenReturn(Collections.emptyMap()); + Mockito.when(mockPropagator.transients(transientHeaderMap, false)).thenReturn(transientHeaderTransformedMap); + + Mockito.when(mockPropagator.headers(headerMap, true)).thenReturn(headerTransformedMap); + Mockito.when(mockPropagator.headers(headerMap, false)).thenReturn(headerTransformedMap); + threadContext.registerThreadContextStatePropagator(mockPropagator); + threadContext.putHeader("foo", "bar"); + threadContext.putTransient("test_transient_propagation_key", 1); + assertEquals(Integer.valueOf(1), threadContext.getTransient("test_transient_propagation_key")); + assertEquals("bar", threadContext.getHeader("foo")); + try (ThreadContext.StoredContext ctx = threadContext.stashContext()) { + threadContext.markAsSystemContext(); + assertNull(threadContext.getHeader("foo")); + assertNull(threadContext.getTransient("test_transient_propagation_key")); + assertEquals("1", threadContext.getHeader("default")); + } + + assertEquals("bar", threadContext.getHeader("foo")); + assertEquals(Integer.valueOf(1), threadContext.getTransient("test_transient_propagation_key")); + assertEquals("1", threadContext.getHeader("default")); + } + + public void testSerializeSystemContext() throws IOException { + Settings build = Settings.builder().put("request.headers.default", "1").build(); + Map transientHeaderMap = Collections.singletonMap("test_transient_propagation_key", "test"); + Map transientHeaderTransformedMap = Collections.singletonMap("test_transient_propagation_key", "test"); + Map headerMap = Collections.singletonMap("test_transient_propagation_key", "test"); + Map headerTransformedMap = Collections.singletonMap("test_transient_propagation_key", "test"); + ThreadContext threadContext = new ThreadContext(build); + ThreadContextStatePropagator mockPropagator = Mockito.mock(ThreadContextStatePropagator.class); + Mockito.when(mockPropagator.transients(transientHeaderMap, true)).thenReturn(Collections.emptyMap()); + Mockito.when(mockPropagator.transients(transientHeaderMap, false)).thenReturn(transientHeaderTransformedMap); + + Mockito.when(mockPropagator.headers(headerMap, true)).thenReturn(headerTransformedMap); + Mockito.when(mockPropagator.headers(headerMap, false)).thenReturn(headerTransformedMap); + threadContext.registerThreadContextStatePropagator(mockPropagator); + threadContext.putHeader("foo", "bar"); + threadContext.putTransient("test_transient_propagation_key", "test"); + BytesStreamOutput out = new BytesStreamOutput(); + BytesStreamOutput outFromSystemContext = new BytesStreamOutput(); + threadContext.writeTo(out); + try (ThreadContext.StoredContext ctx = threadContext.stashContext()) { + assertEquals("test", threadContext.getTransient("test_transient_propagation_key")); + threadContext.markAsSystemContext(); + threadContext.writeTo(outFromSystemContext); + assertNull(threadContext.getHeader("foo")); + assertNull(threadContext.getTransient("test_transient_propagation_key")); + threadContext.readHeaders(outFromSystemContext.bytes().streamInput()); + assertNull(threadContext.getHeader("test_transient_propagation_key")); + } + assertEquals("test", threadContext.getTransient("test_transient_propagation_key")); + threadContext.readHeaders(out.bytes().streamInput()); + assertEquals("bar", threadContext.getHeader("foo")); + assertEquals("test", threadContext.getHeader("test_transient_propagation_key")); + assertEquals("1", threadContext.getHeader("default")); + } + public void testPutHeaders() { Settings build = Settings.builder().put("request.headers.default", "1").build(); ThreadContext threadContext = new ThreadContext(build); diff --git a/server/src/test/java/org/opensearch/tasks/TaskThreadContextStatePropagatorTests.java b/server/src/test/java/org/opensearch/tasks/TaskThreadContextStatePropagatorTests.java new file mode 100644 index 0000000000000..bfa0d566aabd7 --- /dev/null +++ b/server/src/test/java/org/opensearch/tasks/TaskThreadContextStatePropagatorTests.java @@ -0,0 +1,34 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.tasks; + +import org.opensearch.test.OpenSearchTestCase; + +import java.util.HashMap; +import java.util.Map; + +import static org.opensearch.tasks.TaskResourceTrackingService.TASK_ID; + +public class TaskThreadContextStatePropagatorTests extends OpenSearchTestCase { + private final TaskThreadContextStatePropagator taskThreadContextStatePropagator = new TaskThreadContextStatePropagator(); + + public void testTransient() { + Map transientHeader = new HashMap<>(); + transientHeader.put(TASK_ID, "t_1"); + Map transientPropagatedHeader = taskThreadContextStatePropagator.transients(transientHeader, false); + assertEquals("t_1", transientPropagatedHeader.get(TASK_ID)); + } + + public void testTransientForSystemContext() { + Map transientHeader = new HashMap<>(); + transientHeader.put(TASK_ID, "t_1"); + Map transientPropagatedHeader = taskThreadContextStatePropagator.transients(transientHeader, true); + assertEquals("t_1", transientPropagatedHeader.get(TASK_ID)); + } +} diff --git a/server/src/test/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorageTests.java b/server/src/test/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorageTests.java index ee816aa5f596d..bf11bcaf39a96 100644 --- a/server/src/test/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorageTests.java +++ b/server/src/test/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorageTests.java @@ -252,4 +252,20 @@ public void run() { assertThat(threadContext.getTransient(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(not(nullValue()))); assertThat(threadContextStorage.get(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(nullValue())); } + + public void testSpanNotPropagatedToChildSystemThreadContext() { + final Span span = tracer.startSpan(SpanCreationContext.internal().name("test")); + + try (SpanScope scope = tracer.withSpanInScope(span)) { + try (StoredContext ignored = threadContext.stashContext()) { + assertThat(threadContext.getTransient(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(not(nullValue()))); + assertThat(threadContextStorage.get(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(span)); + threadContext.markAsSystemContext(); + assertThat(threadContext.getTransient(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(nullValue())); + } + } + + assertThat(threadContext.getTransient(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(not(nullValue()))); + assertThat(threadContextStorage.get(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(nullValue())); + } } From 6e2e72ba310409f7009e3801b1fb49c9823f5af1 Mon Sep 17 00:00:00 2001 From: Vikas Bansal <43470111+vikasvb90@users.noreply.github.com> Date: Tue, 9 Jan 2024 23:22:00 +0530 Subject: [PATCH 2/4] Fixed ingest pipeline script issue (#11725) Signed-off-by: vikasvb90 --- .../ingest/common/ScriptProcessor.java | 7 +- .../ingest/common/ScriptProcessorTests.java | 12 +++ .../test/ingest/190_script_processor.yml | 76 +++++++++++++++++++ .../rest-api-spec/test/ingest/90_simulate.yml | 45 +++++++++++ .../org/opensearch/ingest/IngestDocument.java | 2 + .../ingest/IngestDocumentTests.java | 6 ++ 6 files changed, 146 insertions(+), 2 deletions(-) diff --git a/modules/ingest-common/src/main/java/org/opensearch/ingest/common/ScriptProcessor.java b/modules/ingest-common/src/main/java/org/opensearch/ingest/common/ScriptProcessor.java index cc8889af27621..d1b4a0961b7bd 100644 --- a/modules/ingest-common/src/main/java/org/opensearch/ingest/common/ScriptProcessor.java +++ b/modules/ingest-common/src/main/java/org/opensearch/ingest/common/ScriptProcessor.java @@ -102,8 +102,11 @@ public IngestDocument execute(IngestDocument document) { } else { ingestScript = precompiledIngestScript; } - ingestScript.execute(document.getSourceAndMetadata()); - CollectionUtils.ensureNoSelfReferences(document.getSourceAndMetadata(), "ingest script"); + IngestDocument mutableDocument = new IngestDocument(document); + ingestScript.execute(mutableDocument.getSourceAndMetadata()); + CollectionUtils.ensureNoSelfReferences(mutableDocument.getSourceAndMetadata(), "ingest script"); + document.getSourceAndMetadata().clear(); + document.getSourceAndMetadata().putAll(mutableDocument.getSourceAndMetadata()); return document; } diff --git a/modules/ingest-common/src/test/java/org/opensearch/ingest/common/ScriptProcessorTests.java b/modules/ingest-common/src/test/java/org/opensearch/ingest/common/ScriptProcessorTests.java index 96d9be75c4ab7..e900458e361ce 100644 --- a/modules/ingest-common/src/test/java/org/opensearch/ingest/common/ScriptProcessorTests.java +++ b/modules/ingest-common/src/test/java/org/opensearch/ingest/common/ScriptProcessorTests.java @@ -105,4 +105,16 @@ private void assertIngestDocument(IngestDocument ingestDocument) { int bytesTotal = ingestDocument.getFieldValue("bytes_in", Integer.class) + ingestDocument.getFieldValue("bytes_out", Integer.class); assertThat(ingestDocument.getSourceAndMetadata().get("bytes_total"), is(bytesTotal)); } + + public void testScriptingWithSelfReferencingSourceMetadata() { + ScriptProcessor processor = new ScriptProcessor(randomAlphaOfLength(10), null, script, null, scriptService); + IngestDocument originalIngestDocument = randomDocument(); + String index = originalIngestDocument.getSourceAndMetadata().get(IngestDocument.Metadata.INDEX.getFieldName()).toString(); + String id = originalIngestDocument.getSourceAndMetadata().get(IngestDocument.Metadata.ID.getFieldName()).toString(); + Map sourceMetadata = originalIngestDocument.getSourceAndMetadata(); + originalIngestDocument.getSourceAndMetadata().put("_source", sourceMetadata); + IngestDocument ingestDocument = new IngestDocument(index, id, null, null, null, originalIngestDocument.getSourceAndMetadata()); + expectThrows(IllegalArgumentException.class, () -> processor.execute(ingestDocument)); + } + } diff --git a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/190_script_processor.yml b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/190_script_processor.yml index 3230fb37b43f7..a66f02d6b6a6d 100644 --- a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/190_script_processor.yml +++ b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/190_script_processor.yml @@ -202,3 +202,79 @@ teardown: id: 1 - match: { _source.source_field: "foo%20bar" } - match: { _source.target_field: "foo bar" } + +--- +"Test self referencing source with ignore failure": + - do: + ingest.put_pipeline: + id: "my_pipeline" + body: > + { + "description": "_description", + "processors": [ + { + "script" : { + "lang": "painless", + "source" : "ctx.foo['foo']=ctx.foo;ctx['test-field']='test-value'", + "ignore_failure": true + } + }, + { + "script" : { + "lang": "painless", + "source" : "ctx.target_field = Processors.uppercase(ctx.source_field)" + } + } + ] + } + - match: { acknowledged: true } + + - do: + index: + index: test + id: 1 + pipeline: "my_pipeline" + body: {source_field: "fooBar", foo: {foo: "bar"}} + + - do: + get: + index: test + id: 1 + - match: { _source.source_field: "fooBar" } + - match: { _source.target_field: "FOOBAR"} + - match: { _source.test-field: null} + +--- +"Test self referencing source without ignoring failure": + - do: + ingest.put_pipeline: + id: "my_pipeline" + body: > + { + "description": "_description", + "processors": [ + { + "script" : { + "lang": "painless", + "source" : "ctx.foo['foo']=ctx.foo;ctx['test-field']='test-value'" + } + }, + { + "script" : { + "lang": "painless", + "source" : "ctx.target_field = Processors.uppercase(ctx.source_field)" + } + } + ] + } + - match: { acknowledged: true } + + - do: + catch: bad_request + index: + index: test + id: 1 + pipeline: "my_pipeline" + body: {source_field: "fooBar", foo: {foo: "bar"}} + - match: { error.root_cause.0.type: "illegal_argument_exception" } + - match: { error.root_cause.0.reason: "Iterable object is self-referencing itself (ingest script)" } diff --git a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/90_simulate.yml b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/90_simulate.yml index 7c073739f6a1f..edd649a310d42 100644 --- a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/90_simulate.yml +++ b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/90_simulate.yml @@ -1113,3 +1113,48 @@ teardown: - match: { status: 400 } - match: { error.root_cause.0.type: "illegal_argument_exception" } - match: { error.root_cause.0.reason: "Failed to parse parameter [_if_primary_term], only int or long is accepted" } + +--- +"Test simulate with pipeline with ignore failure and cyclic field assignments in script": + - do: + ingest.simulate: + verbose: true + body: > + { + "pipeline": { + "description": "_description", + "processors": [ + { + "script" : { + "ignore_failure" : true, + "lang": "painless", + "source": "ctx.foo['foo']=ctx.foo;ctx.tag='recursive'" + } + }, + { + "script" : { + "lang": "painless", + "source" : "ctx.target_field = Processors.uppercase(ctx.foo.foo)" + } + } + ] + }, + "docs": [ + { + "_source": { + "foo": { + "foo": "bar" + } + } + } + ] + } + - length: { docs: 1 } + - length: { docs.0.processor_results: 2 } + - match: { docs.0.processor_results.0.status: "error_ignored" } + - match: { docs.0.processor_results.0.ignored_error.error.type: "illegal_argument_exception" } + - match: { docs.0.processor_results.0.doc._source.tag: null } + - match: { docs.0.processor_results.1.doc._source.target_field: "BAR" } + - match: { docs.0.processor_results.1.doc._source.foo.foo: "bar" } + - match: { docs.0.processor_results.1.status: "success" } + - match: { docs.0.processor_results.1.processor_type: "script" } diff --git a/server/src/main/java/org/opensearch/ingest/IngestDocument.java b/server/src/main/java/org/opensearch/ingest/IngestDocument.java index e0de0a9488ad9..10e9e64db561e 100644 --- a/server/src/main/java/org/opensearch/ingest/IngestDocument.java +++ b/server/src/main/java/org/opensearch/ingest/IngestDocument.java @@ -33,6 +33,7 @@ package org.opensearch.ingest; import org.opensearch.core.common.Strings; +import org.opensearch.core.common.util.CollectionUtils; import org.opensearch.index.VersionType; import org.opensearch.index.mapper.IdFieldMapper; import org.opensearch.index.mapper.IndexFieldMapper; @@ -752,6 +753,7 @@ public Map getSourceAndMetadata() { @SuppressWarnings("unchecked") public static Map deepCopyMap(Map source) { + CollectionUtils.ensureNoSelfReferences(source, "IngestDocument: Self reference present in object."); return (Map) deepCopy(source); } diff --git a/server/src/test/java/org/opensearch/ingest/IngestDocumentTests.java b/server/src/test/java/org/opensearch/ingest/IngestDocumentTests.java index 8358dadf9cc3a..be035bc6ef7ea 100644 --- a/server/src/test/java/org/opensearch/ingest/IngestDocumentTests.java +++ b/server/src/test/java/org/opensearch/ingest/IngestDocumentTests.java @@ -95,6 +95,12 @@ public void setTestIngestDocument() { ingestDocument = new IngestDocument("index", "id", null, null, null, document); } + public void testSelfReferencingSource() { + Map value = new HashMap<>(); + value.put("foo", value); + expectThrows(IllegalArgumentException.class, () -> IngestDocument.deepCopyMap(value)); + } + public void testSimpleGetFieldValue() { assertThat(ingestDocument.getFieldValue("foo", String.class), equalTo("bar")); assertThat(ingestDocument.getFieldValue("int", Integer.class), equalTo(123)); From ebda9639f4ca98a2181bdcb2c43f82224b1d2a34 Mon Sep 17 00:00:00 2001 From: Sagar <99425694+sgup432@users.noreply.github.com> Date: Tue, 9 Jan 2024 10:57:52 -0800 Subject: [PATCH 3/4] [Tiered caching] Framework changes (#10753) * [Tiered caching] Framework changes Signed-off-by: Sagar Upadhyaya * Added javadoc for new files/packages Signed-off-by: Sagar Upadhyaya * Added changelog Signed-off-by: Sagar Upadhyaya * Fixing javadoc warnings Signed-off-by: Sagar Upadhyaya * Addressing comments Signed-off-by: Sagar Upadhyaya * Addressing additional minor comments Signed-off-by: Sagar Upadhyaya * Moving non null check to builder for OS onHeapCache Signed-off-by: Sagar Upadhyaya * Adding package-info for new packages Signed-off-by: Sagar Upadhyaya * Removing service and adding different cache interfaces along with event listener support Signed-off-by: Sagar Upadhyaya * Fixing gradle missingDoc issue Signed-off-by: Sagar Upadhyaya * Changing listener logic, removing tiered cache integration with IRC Signed-off-by: Sagar Upadhyaya * Adding opensearch.internal tag for LoadAwareCacheLoader Signed-off-by: Sagar Upadhyaya * Fixing thread safety issue Signed-off-by: Sagar Upadhyaya * Remove compute function and event listener logic change for TieredCache Signed-off-by: Sagar Upadhyaya * Making Cache.compute function private Signed-off-by: Sagar Upadhyaya * Adding javadoc and more test for cache.put Signed-off-by: Sagar Upadhyaya * Adding write locks to refresh API as well Signed-off-by: Sagar Upadhyaya * Removing unwanted EventType class and refactoring one UT Signed-off-by: Sagar Upadhyaya * Removing TieredCache interface Signed-off-by: Sagar Upadhyaya --------- Signed-off-by: Sagar Upadhyaya Signed-off-by: Sagar <99425694+sgup432@users.noreply.github.com> --- CHANGELOG.md | 2 + .../org/opensearch/common/cache/Cache.java | 112 +-- .../org/opensearch/common/cache/ICache.java | 34 + .../common/cache/LoadAwareCacheLoader.java | 20 + .../cache/store/OpenSearchOnHeapCache.java | 128 +++ .../common/cache/store/StoreAwareCache.java | 23 + .../StoreAwareCacheRemovalNotification.java | 33 + .../cache/store/StoreAwareCacheValue.java | 35 + .../builders/StoreAwareCacheBuilder.java | 73 ++ .../cache/store/builders/package-info.java | 12 + .../cache/store/enums/CacheStoreType.java | 20 + .../cache/store/enums/package-info.java | 10 + .../StoreAwareCacheEventListener.java | 30 + .../cache/store/listeners/package-info.java | 10 + .../common/cache/store/package-info.java | 10 + .../cache/tier/TieredSpilloverCache.java | 268 ++++++ .../common/cache/tier/package-info.java | 10 + .../indices/IndicesRequestCache.java | 6 +- .../cache/tier/TieredSpilloverCacheTests.java | 786 ++++++++++++++++++ 19 files changed, 1567 insertions(+), 55 deletions(-) create mode 100644 server/src/main/java/org/opensearch/common/cache/ICache.java create mode 100644 server/src/main/java/org/opensearch/common/cache/LoadAwareCacheLoader.java create mode 100644 server/src/main/java/org/opensearch/common/cache/store/OpenSearchOnHeapCache.java create mode 100644 server/src/main/java/org/opensearch/common/cache/store/StoreAwareCache.java create mode 100644 server/src/main/java/org/opensearch/common/cache/store/StoreAwareCacheRemovalNotification.java create mode 100644 server/src/main/java/org/opensearch/common/cache/store/StoreAwareCacheValue.java create mode 100644 server/src/main/java/org/opensearch/common/cache/store/builders/StoreAwareCacheBuilder.java create mode 100644 server/src/main/java/org/opensearch/common/cache/store/builders/package-info.java create mode 100644 server/src/main/java/org/opensearch/common/cache/store/enums/CacheStoreType.java create mode 100644 server/src/main/java/org/opensearch/common/cache/store/enums/package-info.java create mode 100644 server/src/main/java/org/opensearch/common/cache/store/listeners/StoreAwareCacheEventListener.java create mode 100644 server/src/main/java/org/opensearch/common/cache/store/listeners/package-info.java create mode 100644 server/src/main/java/org/opensearch/common/cache/store/package-info.java create mode 100644 server/src/main/java/org/opensearch/common/cache/tier/TieredSpilloverCache.java create mode 100644 server/src/main/java/org/opensearch/common/cache/tier/package-info.java create mode 100644 server/src/test/java/org/opensearch/common/cache/tier/TieredSpilloverCacheTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 6da048c65d8d0..d23a439c35d08 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -100,6 +100,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add search query categorizer ([#10255](https://github.com/opensearch-project/OpenSearch/pull/10255)) - Per request phase latency ([#10351](https://github.com/opensearch-project/OpenSearch/issues/10351)) - Add cluster state stats ([#10670](https://github.com/opensearch-project/OpenSearch/pull/10670)) +- [Tiered caching] Defining interfaces, listeners and extending IndicesRequestCache with Tiered cache support ([#10753] + (https://github.com/opensearch-project/OpenSearch/pull/10753)) - [Remote cluster state] Restore cluster state version during remote state auto restore ([#10853](https://github.com/opensearch-project/OpenSearch/pull/10853)) - Update the indexRandom function to create more segments for concurrent search tests ([10247](https://github.com/opensearch-project/OpenSearch/pull/10247)) - Add support for query profiler with concurrent aggregation ([#9248](https://github.com/opensearch-project/OpenSearch/pull/9248)) diff --git a/server/src/main/java/org/opensearch/common/cache/Cache.java b/server/src/main/java/org/opensearch/common/cache/Cache.java index 30e7c014a2ec0..d8aa4e93735e6 100644 --- a/server/src/main/java/org/opensearch/common/cache/Cache.java +++ b/server/src/main/java/org/opensearch/common/cache/Cache.java @@ -424,68 +424,74 @@ public V computeIfAbsent(K key, CacheLoader loader) throws ExecutionExcept } }); if (value == null) { - // we need to synchronize loading of a value for a given key; however, holding the segment lock while - // invoking load can lead to deadlock against another thread due to dependent key loading; therefore, we - // need a mechanism to ensure that load is invoked at most once, but we are not invoking load while holding - // the segment lock; to do this, we atomically put a future in the map that can load the value, and then - // get the value from this future on the thread that won the race to place the future into the segment map - CacheSegment segment = getCacheSegment(key); - CompletableFuture> future; - CompletableFuture> completableFuture = new CompletableFuture<>(); + value = compute(key, loader); + } + return value; + } - try (ReleasableLock ignored = segment.writeLock.acquire()) { - future = segment.map.putIfAbsent(key, completableFuture); - } + private V compute(K key, CacheLoader loader) throws ExecutionException { + long now = now(); + // we need to synchronize loading of a value for a given key; however, holding the segment lock while + // invoking load can lead to deadlock against another thread due to dependent key loading; therefore, we + // need a mechanism to ensure that load is invoked at most once, but we are not invoking load while holding + // the segment lock; to do this, we atomically put a future in the map that can load the value, and then + // get the value from this future on the thread that won the race to place the future into the segment map + CacheSegment segment = getCacheSegment(key); + CompletableFuture> future; + CompletableFuture> completableFuture = new CompletableFuture<>(); - BiFunction, Throwable, ? extends V> handler = (ok, ex) -> { - if (ok != null) { - try (ReleasableLock ignored = lruLock.acquire()) { - promote(ok, now); - } - return ok.value; - } else { - try (ReleasableLock ignored = segment.writeLock.acquire()) { - CompletableFuture> sanity = segment.map.get(key); - if (sanity != null && sanity.isCompletedExceptionally()) { - segment.map.remove(key); - } - } - return null; - } - }; + try (ReleasableLock ignored = segment.writeLock.acquire()) { + future = segment.map.putIfAbsent(key, completableFuture); + } - CompletableFuture completableValue; - if (future == null) { - future = completableFuture; - completableValue = future.handle(handler); - V loaded; - try { - loaded = loader.load(key); - } catch (Exception e) { - future.completeExceptionally(e); - throw new ExecutionException(e); - } - if (loaded == null) { - NullPointerException npe = new NullPointerException("loader returned a null value"); - future.completeExceptionally(npe); - throw new ExecutionException(npe); - } else { - future.complete(new Entry<>(key, loaded, now)); + BiFunction, Throwable, ? extends V> handler = (ok, ex) -> { + if (ok != null) { + try (ReleasableLock ignored = lruLock.acquire()) { + promote(ok, now); } + return ok.value; } else { - completableValue = future.handle(handler); + try (ReleasableLock ignored = segment.writeLock.acquire()) { + CompletableFuture> sanity = segment.map.get(key); + if (sanity != null && sanity.isCompletedExceptionally()) { + segment.map.remove(key); + } + } + return null; } + }; + CompletableFuture completableValue; + if (future == null) { + future = completableFuture; + completableValue = future.handle(handler); + V loaded; try { - value = completableValue.get(); - // check to ensure the future hasn't been completed with an exception - if (future.isCompletedExceptionally()) { - future.get(); // call get to force the exception to be thrown for other concurrent callers - throw new IllegalStateException("the future was completed exceptionally but no exception was thrown"); - } - } catch (InterruptedException e) { - throw new IllegalStateException(e); + loaded = loader.load(key); + } catch (Exception e) { + future.completeExceptionally(e); + throw new ExecutionException(e); } + if (loaded == null) { + NullPointerException npe = new NullPointerException("loader returned a null value"); + future.completeExceptionally(npe); + throw new ExecutionException(npe); + } else { + future.complete(new Entry<>(key, loaded, now)); + } + } else { + completableValue = future.handle(handler); + } + V value; + try { + value = completableValue.get(); + // check to ensure the future hasn't been completed with an exception + if (future.isCompletedExceptionally()) { + future.get(); // call get to force the exception to be thrown for other concurrent callers + throw new IllegalStateException("the future was completed exceptionally but no exception was thrown"); + } + } catch (InterruptedException e) { + throw new IllegalStateException(e); } return value; } diff --git a/server/src/main/java/org/opensearch/common/cache/ICache.java b/server/src/main/java/org/opensearch/common/cache/ICache.java new file mode 100644 index 0000000000000..c6ea5fca1a8fe --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/ICache.java @@ -0,0 +1,34 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.cache; + +/** + * Represents a cache interface. + * @param Type of key. + * @param Type of value. + * + * @opensearch.experimental + */ +public interface ICache { + V get(K key); + + void put(K key, V value); + + V computeIfAbsent(K key, LoadAwareCacheLoader loader) throws Exception; + + void invalidate(K key); + + void invalidateAll(); + + Iterable keys(); + + long count(); + + void refresh(); +} diff --git a/server/src/main/java/org/opensearch/common/cache/LoadAwareCacheLoader.java b/server/src/main/java/org/opensearch/common/cache/LoadAwareCacheLoader.java new file mode 100644 index 0000000000000..57aa4aa39c782 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/LoadAwareCacheLoader.java @@ -0,0 +1,20 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.cache; + +/** + * Extends a cache loader with awareness of whether the data is loaded or not. + * @param Type of key. + * @param Type of value. + * + * @opensearch.internal + */ +public interface LoadAwareCacheLoader extends CacheLoader { + boolean isLoaded(); +} diff --git a/server/src/main/java/org/opensearch/common/cache/store/OpenSearchOnHeapCache.java b/server/src/main/java/org/opensearch/common/cache/store/OpenSearchOnHeapCache.java new file mode 100644 index 0000000000000..c497c8dbb7ea9 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/store/OpenSearchOnHeapCache.java @@ -0,0 +1,128 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.cache.store; + +import org.opensearch.common.cache.Cache; +import org.opensearch.common.cache.CacheBuilder; +import org.opensearch.common.cache.LoadAwareCacheLoader; +import org.opensearch.common.cache.RemovalListener; +import org.opensearch.common.cache.RemovalNotification; +import org.opensearch.common.cache.store.builders.StoreAwareCacheBuilder; +import org.opensearch.common.cache.store.enums.CacheStoreType; +import org.opensearch.common.cache.store.listeners.StoreAwareCacheEventListener; + +/** + * This variant of on-heap cache uses OpenSearch custom cache implementation. + * @param Type of key. + * @param Type of value. + * + * @opensearch.experimental + */ +public class OpenSearchOnHeapCache implements StoreAwareCache, RemovalListener { + + private final Cache cache; + + private final StoreAwareCacheEventListener eventListener; + + public OpenSearchOnHeapCache(Builder builder) { + CacheBuilder cacheBuilder = CacheBuilder.builder() + .setMaximumWeight(builder.getMaxWeightInBytes()) + .weigher(builder.getWeigher()) + .removalListener(this); + if (builder.getExpireAfterAcess() != null) { + cacheBuilder.setExpireAfterAccess(builder.getExpireAfterAcess()); + } + cache = cacheBuilder.build(); + this.eventListener = builder.getEventListener(); + } + + @Override + public V get(K key) { + V value = cache.get(key); + if (value != null) { + eventListener.onHit(key, value, CacheStoreType.ON_HEAP); + } else { + eventListener.onMiss(key, CacheStoreType.ON_HEAP); + } + return value; + } + + @Override + public void put(K key, V value) { + cache.put(key, value); + eventListener.onCached(key, value, CacheStoreType.ON_HEAP); + } + + @Override + public V computeIfAbsent(K key, LoadAwareCacheLoader loader) throws Exception { + V value = cache.computeIfAbsent(key, key1 -> loader.load(key)); + if (!loader.isLoaded()) { + eventListener.onHit(key, value, CacheStoreType.ON_HEAP); + } else { + eventListener.onMiss(key, CacheStoreType.ON_HEAP); + eventListener.onCached(key, value, CacheStoreType.ON_HEAP); + } + return value; + } + + @Override + public void invalidate(K key) { + cache.invalidate(key); + } + + @Override + public void invalidateAll() { + cache.invalidateAll(); + } + + @Override + public Iterable keys() { + return cache.keys(); + } + + @Override + public long count() { + return cache.count(); + } + + @Override + public void refresh() { + cache.refresh(); + } + + @Override + public CacheStoreType getTierType() { + return CacheStoreType.ON_HEAP; + } + + @Override + public void onRemoval(RemovalNotification notification) { + eventListener.onRemoval( + new StoreAwareCacheRemovalNotification<>( + notification.getKey(), + notification.getValue(), + notification.getRemovalReason(), + CacheStoreType.ON_HEAP + ) + ); + } + + /** + * Builder object + * @param Type of key + * @param Type of value + */ + public static class Builder extends StoreAwareCacheBuilder { + + @Override + public StoreAwareCache build() { + return new OpenSearchOnHeapCache(this); + } + } +} diff --git a/server/src/main/java/org/opensearch/common/cache/store/StoreAwareCache.java b/server/src/main/java/org/opensearch/common/cache/store/StoreAwareCache.java new file mode 100644 index 0000000000000..45ca48d94c140 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/store/StoreAwareCache.java @@ -0,0 +1,23 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.cache.store; + +import org.opensearch.common.cache.ICache; +import org.opensearch.common.cache.store.enums.CacheStoreType; + +/** + * Represents a cache with a specific type of store like onHeap, disk etc. + * @param Type of key. + * @param Type of value. + * + * @opensearch.experimental + */ +public interface StoreAwareCache extends ICache { + CacheStoreType getTierType(); +} diff --git a/server/src/main/java/org/opensearch/common/cache/store/StoreAwareCacheRemovalNotification.java b/server/src/main/java/org/opensearch/common/cache/store/StoreAwareCacheRemovalNotification.java new file mode 100644 index 0000000000000..492dbff3532a1 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/store/StoreAwareCacheRemovalNotification.java @@ -0,0 +1,33 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.cache.store; + +import org.opensearch.common.cache.RemovalNotification; +import org.opensearch.common.cache.RemovalReason; +import org.opensearch.common.cache.store.enums.CacheStoreType; + +/** + * Removal notification for store aware cache. + * @param Type of key. + * @param Type of value. + * + * @opensearch.internal + */ +public class StoreAwareCacheRemovalNotification extends RemovalNotification { + private final CacheStoreType cacheStoreType; + + public StoreAwareCacheRemovalNotification(K key, V value, RemovalReason removalReason, CacheStoreType cacheStoreType) { + super(key, value, removalReason); + this.cacheStoreType = cacheStoreType; + } + + public CacheStoreType getCacheStoreType() { + return cacheStoreType; + } +} diff --git a/server/src/main/java/org/opensearch/common/cache/store/StoreAwareCacheValue.java b/server/src/main/java/org/opensearch/common/cache/store/StoreAwareCacheValue.java new file mode 100644 index 0000000000000..4fbbbbfebfaa7 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/store/StoreAwareCacheValue.java @@ -0,0 +1,35 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.cache.store; + +import org.opensearch.common.cache.store.enums.CacheStoreType; + +/** + * Represents a store aware cache value. + * @param Type of value. + * + * @opensearch.internal + */ +public class StoreAwareCacheValue { + private final V value; + private final CacheStoreType source; + + public StoreAwareCacheValue(V value, CacheStoreType source) { + this.value = value; + this.source = source; + } + + public V getValue() { + return value; + } + + public CacheStoreType getCacheStoreType() { + return source; + } +} diff --git a/server/src/main/java/org/opensearch/common/cache/store/builders/StoreAwareCacheBuilder.java b/server/src/main/java/org/opensearch/common/cache/store/builders/StoreAwareCacheBuilder.java new file mode 100644 index 0000000000000..fc5aa48aae90f --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/store/builders/StoreAwareCacheBuilder.java @@ -0,0 +1,73 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.cache.store.builders; + +import org.opensearch.common.cache.store.StoreAwareCache; +import org.opensearch.common.cache.store.listeners.StoreAwareCacheEventListener; +import org.opensearch.common.unit.TimeValue; + +import java.util.function.ToLongBiFunction; + +/** + * Builder for store aware cache. + * @param Type of key. + * @param Type of value. + * + * @opensearch.internal + */ +public abstract class StoreAwareCacheBuilder { + + private long maxWeightInBytes; + + private ToLongBiFunction weigher; + + private TimeValue expireAfterAcess; + + private StoreAwareCacheEventListener eventListener; + + public StoreAwareCacheBuilder() {} + + public StoreAwareCacheBuilder setMaximumWeightInBytes(long sizeInBytes) { + this.maxWeightInBytes = sizeInBytes; + return this; + } + + public StoreAwareCacheBuilder setWeigher(ToLongBiFunction weigher) { + this.weigher = weigher; + return this; + } + + public StoreAwareCacheBuilder setExpireAfterAccess(TimeValue expireAfterAcess) { + this.expireAfterAcess = expireAfterAcess; + return this; + } + + public StoreAwareCacheBuilder setEventListener(StoreAwareCacheEventListener eventListener) { + this.eventListener = eventListener; + return this; + } + + public long getMaxWeightInBytes() { + return maxWeightInBytes; + } + + public TimeValue getExpireAfterAcess() { + return expireAfterAcess; + } + + public ToLongBiFunction getWeigher() { + return weigher; + } + + public StoreAwareCacheEventListener getEventListener() { + return eventListener; + } + + public abstract StoreAwareCache build(); +} diff --git a/server/src/main/java/org/opensearch/common/cache/store/builders/package-info.java b/server/src/main/java/org/opensearch/common/cache/store/builders/package-info.java new file mode 100644 index 0000000000000..ac4590ae3bff7 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/store/builders/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Base package for builders. + */ +package org.opensearch.common.cache.store.builders; diff --git a/server/src/main/java/org/opensearch/common/cache/store/enums/CacheStoreType.java b/server/src/main/java/org/opensearch/common/cache/store/enums/CacheStoreType.java new file mode 100644 index 0000000000000..04c0825787b66 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/store/enums/CacheStoreType.java @@ -0,0 +1,20 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.cache.store.enums; + +/** + * Cache store types in tiered cache. + * + * @opensearch.internal + */ +public enum CacheStoreType { + + ON_HEAP, + DISK; +} diff --git a/server/src/main/java/org/opensearch/common/cache/store/enums/package-info.java b/server/src/main/java/org/opensearch/common/cache/store/enums/package-info.java new file mode 100644 index 0000000000000..7a4e0fa7201fd --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/store/enums/package-info.java @@ -0,0 +1,10 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** Package related to tiered cache enums */ +package org.opensearch.common.cache.store.enums; diff --git a/server/src/main/java/org/opensearch/common/cache/store/listeners/StoreAwareCacheEventListener.java b/server/src/main/java/org/opensearch/common/cache/store/listeners/StoreAwareCacheEventListener.java new file mode 100644 index 0000000000000..6d7e4b39aaf9f --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/store/listeners/StoreAwareCacheEventListener.java @@ -0,0 +1,30 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.cache.store.listeners; + +import org.opensearch.common.cache.store.StoreAwareCacheRemovalNotification; +import org.opensearch.common.cache.store.enums.CacheStoreType; + +/** + * This can be used to listen to tiered caching events + * @param Type of key + * @param Type of value + * + * @opensearch.internal + */ +public interface StoreAwareCacheEventListener { + + void onMiss(K key, CacheStoreType cacheStoreType); + + void onRemoval(StoreAwareCacheRemovalNotification notification); + + void onHit(K key, V value, CacheStoreType cacheStoreType); + + void onCached(K key, V value, CacheStoreType cacheStoreType); +} diff --git a/server/src/main/java/org/opensearch/common/cache/store/listeners/package-info.java b/server/src/main/java/org/opensearch/common/cache/store/listeners/package-info.java new file mode 100644 index 0000000000000..c3222ca3ffb62 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/store/listeners/package-info.java @@ -0,0 +1,10 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** Package related to tiered cache listeners */ +package org.opensearch.common.cache.store.listeners; diff --git a/server/src/main/java/org/opensearch/common/cache/store/package-info.java b/server/src/main/java/org/opensearch/common/cache/store/package-info.java new file mode 100644 index 0000000000000..edc1ecd7d5e7a --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/store/package-info.java @@ -0,0 +1,10 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** Base package for store aware caches. */ +package org.opensearch.common.cache.store; diff --git a/server/src/main/java/org/opensearch/common/cache/tier/TieredSpilloverCache.java b/server/src/main/java/org/opensearch/common/cache/tier/TieredSpilloverCache.java new file mode 100644 index 0000000000000..8b432c9484aed --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/tier/TieredSpilloverCache.java @@ -0,0 +1,268 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.cache.tier; + +import org.opensearch.common.cache.ICache; +import org.opensearch.common.cache.LoadAwareCacheLoader; +import org.opensearch.common.cache.RemovalReason; +import org.opensearch.common.cache.store.StoreAwareCache; +import org.opensearch.common.cache.store.StoreAwareCacheRemovalNotification; +import org.opensearch.common.cache.store.StoreAwareCacheValue; +import org.opensearch.common.cache.store.builders.StoreAwareCacheBuilder; +import org.opensearch.common.cache.store.enums.CacheStoreType; +import org.opensearch.common.cache.store.listeners.StoreAwareCacheEventListener; +import org.opensearch.common.util.concurrent.ReleasableLock; +import org.opensearch.common.util.iterable.Iterables; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Function; + +/** + * This cache spillover the evicted items from heap tier to disk tier. All the new items are first cached on heap + * and the items evicted from on heap cache are moved to disk based cache. If disk based cache also gets full, + * then items are eventually evicted from it and removed which will result in cache miss. + * + * @param Type of key + * @param Type of value + * + * @opensearch.experimental + */ +public class TieredSpilloverCache implements ICache, StoreAwareCacheEventListener { + + // TODO: Remove optional when diskCache implementation is integrated. + private final Optional> onDiskCache; + private final StoreAwareCache onHeapCache; + private final StoreAwareCacheEventListener listener; + ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + ReleasableLock readLock = new ReleasableLock(readWriteLock.readLock()); + ReleasableLock writeLock = new ReleasableLock(readWriteLock.writeLock()); + + /** + * Maintains caching tiers in ascending order of cache latency. + */ + private final List> cacheList; + + TieredSpilloverCache(Builder builder) { + Objects.requireNonNull(builder.onHeapCacheBuilder, "onHeap cache builder can't be null"); + this.onHeapCache = builder.onHeapCacheBuilder.setEventListener(this).build(); + if (builder.onDiskCacheBuilder != null) { + this.onDiskCache = Optional.of(builder.onDiskCacheBuilder.setEventListener(this).build()); + } else { + this.onDiskCache = Optional.empty(); + } + this.listener = builder.listener; + this.cacheList = this.onDiskCache.map(diskTier -> Arrays.asList(this.onHeapCache, diskTier)).orElse(List.of(this.onHeapCache)); + } + + // Package private for testing + StoreAwareCache getOnHeapCache() { + return onHeapCache; + } + + // Package private for testing + Optional> getOnDiskCache() { + return onDiskCache; + } + + @Override + public V get(K key) { + StoreAwareCacheValue cacheValue = getValueFromTieredCache(true).apply(key); + if (cacheValue == null) { + return null; + } + return cacheValue.getValue(); + } + + @Override + public void put(K key, V value) { + try (ReleasableLock ignore = writeLock.acquire()) { + onHeapCache.put(key, value); + listener.onCached(key, value, CacheStoreType.ON_HEAP); + } + } + + @Override + public V computeIfAbsent(K key, LoadAwareCacheLoader loader) throws Exception { + // We are skipping calling event listeners at this step as we do another get inside below computeIfAbsent. + // Where we might end up calling onMiss twice for a key not present in onHeap cache. + // Similary we might end up calling both onMiss and onHit for a key, in case we are receiving concurrent + // requests for the same key which requires loading only once. + StoreAwareCacheValue cacheValue = getValueFromTieredCache(false).apply(key); + if (cacheValue == null) { + // Add the value to the onHeap cache. We are calling computeIfAbsent which does another get inside. + // This is needed as there can be many requests for the same key at the same time and we only want to load + // the value once. + V value = null; + try (ReleasableLock ignore = writeLock.acquire()) { + value = onHeapCache.computeIfAbsent(key, loader); + } + if (loader.isLoaded()) { + listener.onMiss(key, CacheStoreType.ON_HEAP); + onDiskCache.ifPresent(diskTier -> listener.onMiss(key, CacheStoreType.DISK)); + listener.onCached(key, value, CacheStoreType.ON_HEAP); + } else { + listener.onHit(key, value, CacheStoreType.ON_HEAP); + } + return value; + } + listener.onHit(key, cacheValue.getValue(), cacheValue.getCacheStoreType()); + if (cacheValue.getCacheStoreType().equals(CacheStoreType.DISK)) { + listener.onMiss(key, CacheStoreType.ON_HEAP); + } + return cacheValue.getValue(); + } + + @Override + public void invalidate(K key) { + // We are trying to invalidate the key from all caches though it would be present in only of them. + // Doing this as we don't know where it is located. We could do a get from both and check that, but what will + // also trigger a hit/miss listener event, so ignoring it for now. + try (ReleasableLock ignore = writeLock.acquire()) { + for (StoreAwareCache storeAwareCache : cacheList) { + storeAwareCache.invalidate(key); + } + } + } + + @Override + public void invalidateAll() { + try (ReleasableLock ignore = writeLock.acquire()) { + for (StoreAwareCache storeAwareCache : cacheList) { + storeAwareCache.invalidateAll(); + } + } + } + + /** + * Provides an iteration over both onHeap and disk keys. This is not protected from any mutations to the cache. + * @return An iterable over (onHeap + disk) keys + */ + @Override + public Iterable keys() { + Iterable onDiskKeysIterable; + if (onDiskCache.isPresent()) { + onDiskKeysIterable = onDiskCache.get().keys(); + } else { + onDiskKeysIterable = Collections::emptyIterator; + } + return Iterables.concat(onHeapCache.keys(), onDiskKeysIterable); + } + + @Override + public long count() { + long totalCount = 0; + for (StoreAwareCache storeAwareCache : cacheList) { + totalCount += storeAwareCache.count(); + } + return totalCount; + } + + @Override + public void refresh() { + try (ReleasableLock ignore = writeLock.acquire()) { + for (StoreAwareCache storeAwareCache : cacheList) { + storeAwareCache.refresh(); + } + } + } + + @Override + public void onMiss(K key, CacheStoreType cacheStoreType) { + // Misses for tiered cache are tracked here itself. + } + + @Override + public void onRemoval(StoreAwareCacheRemovalNotification notification) { + if (RemovalReason.EVICTED.equals(notification.getRemovalReason()) + || RemovalReason.CAPACITY.equals(notification.getRemovalReason())) { + switch (notification.getCacheStoreType()) { + case ON_HEAP: + try (ReleasableLock ignore = writeLock.acquire()) { + onDiskCache.ifPresent(diskTier -> { diskTier.put(notification.getKey(), notification.getValue()); }); + } + onDiskCache.ifPresent( + diskTier -> listener.onCached(notification.getKey(), notification.getValue(), CacheStoreType.DISK) + ); + break; + default: + break; + } + } + listener.onRemoval(notification); + } + + @Override + public void onHit(K key, V value, CacheStoreType cacheStoreType) { + // Hits for tiered cache are tracked here itself. + } + + @Override + public void onCached(K key, V value, CacheStoreType cacheStoreType) { + // onCached events for tiered cache are tracked here itself. + } + + private Function> getValueFromTieredCache(boolean triggerEventListener) { + return key -> { + try (ReleasableLock ignore = readLock.acquire()) { + for (StoreAwareCache storeAwareCache : cacheList) { + V value = storeAwareCache.get(key); + if (value != null) { + if (triggerEventListener) { + listener.onHit(key, value, storeAwareCache.getTierType()); + } + return new StoreAwareCacheValue<>(value, storeAwareCache.getTierType()); + } else { + if (triggerEventListener) { + listener.onMiss(key, storeAwareCache.getTierType()); + } + } + } + } + return null; + }; + } + + /** + * Builder object for tiered spillover cache. + * @param Type of key + * @param Type of value + */ + public static class Builder { + private StoreAwareCacheBuilder onHeapCacheBuilder; + private StoreAwareCacheBuilder onDiskCacheBuilder; + private StoreAwareCacheEventListener listener; + + public Builder() {} + + public Builder setOnHeapCacheBuilder(StoreAwareCacheBuilder onHeapCacheBuilder) { + this.onHeapCacheBuilder = onHeapCacheBuilder; + return this; + } + + public Builder setOnDiskCacheBuilder(StoreAwareCacheBuilder onDiskCacheBuilder) { + this.onDiskCacheBuilder = onDiskCacheBuilder; + return this; + } + + public Builder setListener(StoreAwareCacheEventListener listener) { + this.listener = listener; + return this; + } + + public TieredSpilloverCache build() { + return new TieredSpilloverCache<>(this); + } + } +} diff --git a/server/src/main/java/org/opensearch/common/cache/tier/package-info.java b/server/src/main/java/org/opensearch/common/cache/tier/package-info.java new file mode 100644 index 0000000000000..7ad81dbe3073c --- /dev/null +++ b/server/src/main/java/org/opensearch/common/cache/tier/package-info.java @@ -0,0 +1,10 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** Base package for cache tier support. */ +package org.opensearch.common.cache.tier; diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 629cea102a8b2..4a19f8eb8714d 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -247,7 +247,7 @@ interface CacheEntity extends Accountable { * * @opensearch.internal */ - static class Key implements Accountable { + public static class Key implements Accountable { private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(Key.class); public final CacheEntity entity; // use as identity equality @@ -328,6 +328,9 @@ public int hashCode() { } } + /** + * Logic to clean up in-memory cache. + */ synchronized void cleanCache() { final Set currentKeysToClean = new HashSet<>(); final Set currentFullClean = new HashSet<>(); @@ -355,7 +358,6 @@ synchronized void cleanCache() { } } } - cache.refresh(); } diff --git a/server/src/test/java/org/opensearch/common/cache/tier/TieredSpilloverCacheTests.java b/server/src/test/java/org/opensearch/common/cache/tier/TieredSpilloverCacheTests.java new file mode 100644 index 0000000000000..eb75244c6f8b1 --- /dev/null +++ b/server/src/test/java/org/opensearch/common/cache/tier/TieredSpilloverCacheTests.java @@ -0,0 +1,786 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.cache.tier; + +import org.opensearch.common.cache.LoadAwareCacheLoader; +import org.opensearch.common.cache.RemovalReason; +import org.opensearch.common.cache.store.OpenSearchOnHeapCache; +import org.opensearch.common.cache.store.StoreAwareCache; +import org.opensearch.common.cache.store.StoreAwareCacheRemovalNotification; +import org.opensearch.common.cache.store.builders.StoreAwareCacheBuilder; +import org.opensearch.common.cache.store.enums.CacheStoreType; +import org.opensearch.common.cache.store.listeners.StoreAwareCacheEventListener; +import org.opensearch.common.metrics.CounterMetric; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.ArrayList; +import java.util.EnumMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Phaser; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +public class TieredSpilloverCacheTests extends OpenSearchTestCase { + + public void testComputeIfAbsentWithoutAnyOnHeapCacheEviction() throws Exception { + int onHeapCacheSize = randomIntBetween(10, 30); + MockCacheEventListener eventListener = new MockCacheEventListener(); + TieredSpilloverCache tieredSpilloverCache = intializeTieredSpilloverCache( + onHeapCacheSize, + randomIntBetween(1, 4), + eventListener, + 0 + ); + int numOfItems1 = randomIntBetween(1, onHeapCacheSize / 2 - 1); + List keys = new ArrayList<>(); + // Put values in cache. + for (int iter = 0; iter < numOfItems1; iter++) { + String key = UUID.randomUUID().toString(); + keys.add(key); + LoadAwareCacheLoader tieredCacheLoader = getLoadAwareCacheLoader(); + tieredSpilloverCache.computeIfAbsent(key, tieredCacheLoader); + } + assertEquals(numOfItems1, eventListener.enumMap.get(CacheStoreType.ON_HEAP).missCount.count()); + assertEquals(0, eventListener.enumMap.get(CacheStoreType.ON_HEAP).hitCount.count()); + assertEquals(0, eventListener.enumMap.get(CacheStoreType.ON_HEAP).evictionsMetric.count()); + + // Try to hit cache again with some randomization. + int numOfItems2 = randomIntBetween(1, onHeapCacheSize / 2 - 1); + int cacheHit = 0; + int cacheMiss = 0; + for (int iter = 0; iter < numOfItems2; iter++) { + if (randomBoolean()) { + // Hit cache with stored key + cacheHit++; + int index = randomIntBetween(0, keys.size() - 1); + tieredSpilloverCache.computeIfAbsent(keys.get(index), getLoadAwareCacheLoader()); + } else { + // Hit cache with randomized key which is expected to miss cache always. + tieredSpilloverCache.computeIfAbsent(UUID.randomUUID().toString(), getLoadAwareCacheLoader()); + cacheMiss++; + } + } + assertEquals(cacheHit, eventListener.enumMap.get(CacheStoreType.ON_HEAP).hitCount.count()); + assertEquals(numOfItems1 + cacheMiss, eventListener.enumMap.get(CacheStoreType.ON_HEAP).missCount.count()); + assertEquals(0, eventListener.enumMap.get(CacheStoreType.ON_HEAP).evictionsMetric.count()); + } + + public void testComputeIfAbsentWithEvictionsFromOnHeapCache() throws Exception { + int onHeapCacheSize = randomIntBetween(10, 30); + int diskCacheSize = randomIntBetween(60, 100); + int totalSize = onHeapCacheSize + diskCacheSize; + MockCacheEventListener eventListener = new MockCacheEventListener(); + StoreAwareCacheBuilder cacheBuilder = new OpenSearchOnHeapCache.Builder().setMaximumWeightInBytes( + onHeapCacheSize * 50 + ).setWeigher((k, v) -> 50); // Will support onHeapCacheSize entries. + + StoreAwareCacheBuilder diskCacheBuilder = new MockOnDiskCache.Builder().setMaxSize(diskCacheSize) + .setDeliberateDelay(0); + + TieredSpilloverCache tieredSpilloverCache = new TieredSpilloverCache.Builder() + .setOnHeapCacheBuilder(cacheBuilder) + .setOnDiskCacheBuilder(diskCacheBuilder) + .setListener(eventListener) + .build(); + + // Put values in cache more than it's size and cause evictions from onHeap. + int numOfItems1 = randomIntBetween(onHeapCacheSize + 1, totalSize); + List onHeapKeys = new ArrayList<>(); + List diskTierKeys = new ArrayList<>(); + for (int iter = 0; iter < numOfItems1; iter++) { + String key = UUID.randomUUID().toString(); + LoadAwareCacheLoader tieredCacheLoader = getLoadAwareCacheLoader(); + tieredSpilloverCache.computeIfAbsent(key, tieredCacheLoader); + } + long actualDiskCacheSize = tieredSpilloverCache.getOnDiskCache().get().count(); + assertEquals(numOfItems1, eventListener.enumMap.get(CacheStoreType.ON_HEAP).missCount.count()); + assertEquals(0, eventListener.enumMap.get(CacheStoreType.ON_HEAP).hitCount.count()); + assertEquals(actualDiskCacheSize, eventListener.enumMap.get(CacheStoreType.ON_HEAP).evictionsMetric.count()); + + assertEquals( + eventListener.enumMap.get(CacheStoreType.ON_HEAP).evictionsMetric.count(), + eventListener.enumMap.get(CacheStoreType.DISK).cachedCount.count() + ); + assertEquals(actualDiskCacheSize, eventListener.enumMap.get(CacheStoreType.DISK).cachedCount.count()); + + tieredSpilloverCache.getOnHeapCache().keys().forEach(onHeapKeys::add); + tieredSpilloverCache.getOnDiskCache().get().keys().forEach(diskTierKeys::add); + + assertEquals(tieredSpilloverCache.getOnHeapCache().count(), onHeapKeys.size()); + assertEquals(tieredSpilloverCache.getOnDiskCache().get().count(), diskTierKeys.size()); + + // Try to hit cache again with some randomization. + int numOfItems2 = randomIntBetween(50, 200); + int onHeapCacheHit = 0; + int diskCacheHit = 0; + int cacheMiss = 0; + for (int iter = 0; iter < numOfItems2; iter++) { + if (randomBoolean()) { // Hit cache with key stored in onHeap cache. + onHeapCacheHit++; + int index = randomIntBetween(0, onHeapKeys.size() - 1); + LoadAwareCacheLoader loadAwareCacheLoader = getLoadAwareCacheLoader(); + tieredSpilloverCache.computeIfAbsent(onHeapKeys.get(index), loadAwareCacheLoader); + assertFalse(loadAwareCacheLoader.isLoaded()); + } else { // Hit cache with key stored in disk cache. + diskCacheHit++; + int index = randomIntBetween(0, diskTierKeys.size() - 1); + LoadAwareCacheLoader loadAwareCacheLoader = getLoadAwareCacheLoader(); + tieredSpilloverCache.computeIfAbsent(diskTierKeys.get(index), loadAwareCacheLoader); + assertFalse(loadAwareCacheLoader.isLoaded()); + } + } + for (int iter = 0; iter < randomIntBetween(50, 200); iter++) { + // Hit cache with randomized key which is expected to miss cache always. + LoadAwareCacheLoader tieredCacheLoader = getLoadAwareCacheLoader(); + tieredSpilloverCache.computeIfAbsent(UUID.randomUUID().toString(), tieredCacheLoader); + cacheMiss++; + } + // On heap cache misses would also include diskCacheHits as it means it missed onHeap cache. + assertEquals(numOfItems1 + cacheMiss + diskCacheHit, eventListener.enumMap.get(CacheStoreType.ON_HEAP).missCount.count()); + assertEquals(onHeapCacheHit, eventListener.enumMap.get(CacheStoreType.ON_HEAP).hitCount.count()); + assertEquals(cacheMiss + numOfItems1, eventListener.enumMap.get(CacheStoreType.DISK).missCount.count()); + assertEquals(diskCacheHit, eventListener.enumMap.get(CacheStoreType.DISK).hitCount.count()); + } + + public void testComputeIfAbsentWithEvictionsFromBothTier() throws Exception { + int onHeapCacheSize = randomIntBetween(10, 30); + int diskCacheSize = randomIntBetween(onHeapCacheSize + 1, 100); + int totalSize = onHeapCacheSize + diskCacheSize; + + MockCacheEventListener eventListener = new MockCacheEventListener(); + TieredSpilloverCache tieredSpilloverCache = intializeTieredSpilloverCache( + onHeapCacheSize, + diskCacheSize, + eventListener, + 0 + ); + + int numOfItems = randomIntBetween(totalSize + 1, totalSize * 3); + for (int iter = 0; iter < numOfItems; iter++) { + LoadAwareCacheLoader tieredCacheLoader = getLoadAwareCacheLoader(); + tieredSpilloverCache.computeIfAbsent(UUID.randomUUID().toString(), tieredCacheLoader); + } + assertTrue(eventListener.enumMap.get(CacheStoreType.ON_HEAP).evictionsMetric.count() > 0); + assertTrue(eventListener.enumMap.get(CacheStoreType.DISK).evictionsMetric.count() > 0); + } + + public void testGetAndCount() throws Exception { + int onHeapCacheSize = randomIntBetween(10, 30); + int diskCacheSize = randomIntBetween(onHeapCacheSize + 1, 100); + int totalSize = onHeapCacheSize + diskCacheSize; + + MockCacheEventListener eventListener = new MockCacheEventListener(); + TieredSpilloverCache tieredSpilloverCache = intializeTieredSpilloverCache( + onHeapCacheSize, + diskCacheSize, + eventListener, + 0 + ); + + int numOfItems1 = randomIntBetween(onHeapCacheSize + 1, totalSize); + List onHeapKeys = new ArrayList<>(); + List diskTierKeys = new ArrayList<>(); + for (int iter = 0; iter < numOfItems1; iter++) { + String key = UUID.randomUUID().toString(); + if (iter > (onHeapCacheSize - 1)) { + // All these are bound to go to disk based cache. + diskTierKeys.add(key); + } else { + onHeapKeys.add(key); + } + LoadAwareCacheLoader loadAwareCacheLoader = getLoadAwareCacheLoader(); + tieredSpilloverCache.computeIfAbsent(key, loadAwareCacheLoader); + } + + for (int iter = 0; iter < numOfItems1; iter++) { + if (randomBoolean()) { + if (randomBoolean()) { + int index = randomIntBetween(0, onHeapKeys.size() - 1); + assertNotNull(tieredSpilloverCache.get(onHeapKeys.get(index))); + } else { + int index = randomIntBetween(0, diskTierKeys.size() - 1); + assertNotNull(tieredSpilloverCache.get(diskTierKeys.get(index))); + } + } else { + assertNull(tieredSpilloverCache.get(UUID.randomUUID().toString())); + } + } + assertEquals(numOfItems1, tieredSpilloverCache.count()); + } + + public void testWithDiskTierNull() throws Exception { + int onHeapCacheSize = randomIntBetween(10, 30); + MockCacheEventListener eventListener = new MockCacheEventListener(); + + StoreAwareCacheBuilder onHeapCacheBuilder = new OpenSearchOnHeapCache.Builder() + .setMaximumWeightInBytes(onHeapCacheSize * 20) + .setWeigher((k, v) -> 20); // Will support upto onHeapCacheSize entries + TieredSpilloverCache tieredSpilloverCache = new TieredSpilloverCache.Builder() + .setOnHeapCacheBuilder(onHeapCacheBuilder) + .setListener(eventListener) + .build(); + + int numOfItems = randomIntBetween(onHeapCacheSize + 1, onHeapCacheSize * 3); + for (int iter = 0; iter < numOfItems; iter++) { + LoadAwareCacheLoader loadAwareCacheLoader = getLoadAwareCacheLoader(); + tieredSpilloverCache.computeIfAbsent(UUID.randomUUID().toString(), loadAwareCacheLoader); + } + assertTrue(eventListener.enumMap.get(CacheStoreType.ON_HEAP).evictionsMetric.count() > 0); + assertEquals(0, eventListener.enumMap.get(CacheStoreType.DISK).cachedCount.count()); + assertEquals(0, eventListener.enumMap.get(CacheStoreType.DISK).evictionsMetric.count()); + assertEquals(0, eventListener.enumMap.get(CacheStoreType.DISK).missCount.count()); + } + + public void testPut() { + int onHeapCacheSize = randomIntBetween(10, 30); + int diskCacheSize = randomIntBetween(onHeapCacheSize + 1, 100); + + MockCacheEventListener eventListener = new MockCacheEventListener<>(); + TieredSpilloverCache tieredSpilloverCache = intializeTieredSpilloverCache( + onHeapCacheSize, + diskCacheSize, + eventListener, + 0 + ); + String key = UUID.randomUUID().toString(); + String value = UUID.randomUUID().toString(); + tieredSpilloverCache.put(key, value); + assertEquals(1, eventListener.enumMap.get(CacheStoreType.ON_HEAP).cachedCount.count()); + assertEquals(1, tieredSpilloverCache.count()); + } + + public void testPutAndVerifyNewItemsArePresentOnHeapCache() throws Exception { + int onHeapCacheSize = randomIntBetween(200, 400); + int diskCacheSize = randomIntBetween(450, 800); + + MockCacheEventListener eventListener = new MockCacheEventListener<>(); + + TieredSpilloverCache tieredSpilloverCache = intializeTieredSpilloverCache( + onHeapCacheSize, + diskCacheSize, + eventListener, + 0 + ); + + for (int i = 0; i < onHeapCacheSize; i++) { + tieredSpilloverCache.computeIfAbsent(UUID.randomUUID().toString(), new LoadAwareCacheLoader<>() { + @Override + public boolean isLoaded() { + return false; + } + + @Override + public String load(String key) throws Exception { + return UUID.randomUUID().toString(); + } + }); + } + + assertEquals(onHeapCacheSize, tieredSpilloverCache.getOnHeapCache().count()); + assertEquals(0, tieredSpilloverCache.getOnDiskCache().get().count()); + + // Again try to put OnHeap cache capacity amount of new items. + List newKeyList = new ArrayList<>(); + for (int i = 0; i < onHeapCacheSize; i++) { + newKeyList.add(UUID.randomUUID().toString()); + } + + for (int i = 0; i < newKeyList.size(); i++) { + tieredSpilloverCache.computeIfAbsent(newKeyList.get(i), new LoadAwareCacheLoader<>() { + @Override + public boolean isLoaded() { + return false; + } + + @Override + public String load(String key) { + return UUID.randomUUID().toString(); + } + }); + } + + // Verify that new items are part of onHeap cache. + List actualOnHeapCacheKeys = new ArrayList<>(); + tieredSpilloverCache.getOnHeapCache().keys().forEach(actualOnHeapCacheKeys::add); + + assertEquals(newKeyList.size(), actualOnHeapCacheKeys.size()); + for (int i = 0; i < actualOnHeapCacheKeys.size(); i++) { + assertTrue(newKeyList.contains(actualOnHeapCacheKeys.get(i))); + } + + assertEquals(onHeapCacheSize, tieredSpilloverCache.getOnHeapCache().count()); + assertEquals(onHeapCacheSize, tieredSpilloverCache.getOnDiskCache().get().count()); + } + + public void testInvalidate() { + int onHeapCacheSize = 1; + int diskCacheSize = 10; + + MockCacheEventListener eventListener = new MockCacheEventListener<>(); + TieredSpilloverCache tieredSpilloverCache = intializeTieredSpilloverCache( + onHeapCacheSize, + diskCacheSize, + eventListener, + 0 + ); + String key = UUID.randomUUID().toString(); + String value = UUID.randomUUID().toString(); + // First try to invalidate without the key present in cache. + tieredSpilloverCache.invalidate(key); + assertEquals(0, eventListener.enumMap.get(CacheStoreType.ON_HEAP).invalidationMetric.count()); + + // Now try to invalidate with the key present in onHeap cache. + tieredSpilloverCache.put(key, value); + tieredSpilloverCache.invalidate(key); + assertEquals(1, eventListener.enumMap.get(CacheStoreType.ON_HEAP).invalidationMetric.count()); + assertEquals(0, tieredSpilloverCache.count()); + + tieredSpilloverCache.put(key, value); + // Put another key/value so that one of the item is evicted to disk cache. + String key2 = UUID.randomUUID().toString(); + tieredSpilloverCache.put(key2, UUID.randomUUID().toString()); + assertEquals(2, tieredSpilloverCache.count()); + // Again invalidate older key + tieredSpilloverCache.invalidate(key); + assertEquals(1, eventListener.enumMap.get(CacheStoreType.DISK).invalidationMetric.count()); + assertEquals(1, tieredSpilloverCache.count()); + } + + public void testCacheKeys() throws Exception { + int onHeapCacheSize = randomIntBetween(10, 30); + int diskCacheSize = randomIntBetween(60, 100); + int totalSize = onHeapCacheSize + diskCacheSize; + + MockCacheEventListener eventListener = new MockCacheEventListener<>(); + TieredSpilloverCache tieredSpilloverCache = intializeTieredSpilloverCache( + onHeapCacheSize, + diskCacheSize, + eventListener, + 0 + ); + List onHeapKeys = new ArrayList<>(); + List diskTierKeys = new ArrayList<>(); + // During first round add onHeapCacheSize entries. Will go to onHeap cache initially. + for (int i = 0; i < onHeapCacheSize; i++) { + String key = UUID.randomUUID().toString(); + diskTierKeys.add(key); + tieredSpilloverCache.computeIfAbsent(key, getLoadAwareCacheLoader()); + } + // In another round, add another onHeapCacheSize entries. These will go to onHeap and above ones will be + // evicted to onDisk cache. + for (int i = 0; i < onHeapCacheSize; i++) { + String key = UUID.randomUUID().toString(); + onHeapKeys.add(key); + tieredSpilloverCache.computeIfAbsent(key, getLoadAwareCacheLoader()); + } + + List actualOnHeapKeys = new ArrayList<>(); + List actualOnDiskKeys = new ArrayList<>(); + Iterable onHeapiterable = tieredSpilloverCache.getOnHeapCache().keys(); + Iterable onDiskiterable = tieredSpilloverCache.getOnDiskCache().get().keys(); + onHeapiterable.iterator().forEachRemaining(actualOnHeapKeys::add); + onDiskiterable.iterator().forEachRemaining(actualOnDiskKeys::add); + for (String onHeapKey : onHeapKeys) { + assertTrue(actualOnHeapKeys.contains(onHeapKey)); + } + for (String onDiskKey : actualOnDiskKeys) { + assertTrue(actualOnDiskKeys.contains(onDiskKey)); + } + + // Testing keys() which returns all keys. + List actualMergedKeys = new ArrayList<>(); + List expectedMergedKeys = new ArrayList<>(); + expectedMergedKeys.addAll(onHeapKeys); + expectedMergedKeys.addAll(diskTierKeys); + + Iterable mergedIterable = tieredSpilloverCache.keys(); + mergedIterable.iterator().forEachRemaining(actualMergedKeys::add); + + assertEquals(expectedMergedKeys.size(), actualMergedKeys.size()); + for (String key : expectedMergedKeys) { + assertTrue(actualMergedKeys.contains(key)); + } + } + + public void testRefresh() { + int onHeapCacheSize = randomIntBetween(10, 30); + int diskCacheSize = randomIntBetween(60, 100); + + MockCacheEventListener eventListener = new MockCacheEventListener<>(); + TieredSpilloverCache tieredSpilloverCache = intializeTieredSpilloverCache( + onHeapCacheSize, + diskCacheSize, + eventListener, + 0 + ); + tieredSpilloverCache.refresh(); + } + + public void testInvalidateAll() throws Exception { + int onHeapCacheSize = randomIntBetween(10, 30); + int diskCacheSize = randomIntBetween(60, 100); + int totalSize = onHeapCacheSize + diskCacheSize; + + MockCacheEventListener eventListener = new MockCacheEventListener<>(); + TieredSpilloverCache tieredSpilloverCache = intializeTieredSpilloverCache( + onHeapCacheSize, + diskCacheSize, + eventListener, + 0 + ); + // Put values in cache more than it's size and cause evictions from onHeap. + int numOfItems1 = randomIntBetween(onHeapCacheSize + 1, totalSize); + List onHeapKeys = new ArrayList<>(); + List diskTierKeys = new ArrayList<>(); + for (int iter = 0; iter < numOfItems1; iter++) { + String key = UUID.randomUUID().toString(); + if (iter > (onHeapCacheSize - 1)) { + // All these are bound to go to disk based cache. + diskTierKeys.add(key); + } else { + onHeapKeys.add(key); + } + LoadAwareCacheLoader tieredCacheLoader = getLoadAwareCacheLoader(); + tieredSpilloverCache.computeIfAbsent(key, tieredCacheLoader); + } + assertEquals(numOfItems1, tieredSpilloverCache.count()); + tieredSpilloverCache.invalidateAll(); + assertEquals(0, tieredSpilloverCache.count()); + } + + public void testComputeIfAbsentConcurrently() throws Exception { + int onHeapCacheSize = randomIntBetween(100, 300); + int diskCacheSize = randomIntBetween(200, 400); + + MockCacheEventListener eventListener = new MockCacheEventListener<>(); + + TieredSpilloverCache tieredSpilloverCache = intializeTieredSpilloverCache( + onHeapCacheSize, + diskCacheSize, + eventListener, + 0 + ); + + int numberOfSameKeys = randomIntBetween(10, onHeapCacheSize - 1); + String key = UUID.randomUUID().toString(); + String value = UUID.randomUUID().toString(); + + Thread[] threads = new Thread[numberOfSameKeys]; + Phaser phaser = new Phaser(numberOfSameKeys + 1); + CountDownLatch countDownLatch = new CountDownLatch(numberOfSameKeys); // To wait for all threads to finish. + + List> loadAwareCacheLoaderList = new CopyOnWriteArrayList<>(); + + for (int i = 0; i < numberOfSameKeys; i++) { + threads[i] = new Thread(() -> { + try { + LoadAwareCacheLoader loadAwareCacheLoader = new LoadAwareCacheLoader() { + boolean isLoaded = false; + + @Override + public boolean isLoaded() { + return isLoaded; + } + + @Override + public Object load(Object key) throws Exception { + isLoaded = true; + return value; + } + }; + loadAwareCacheLoaderList.add(loadAwareCacheLoader); + phaser.arriveAndAwaitAdvance(); + tieredSpilloverCache.computeIfAbsent(key, loadAwareCacheLoader); + } catch (Exception e) { + throw new RuntimeException(e); + } + countDownLatch.countDown(); + }); + threads[i].start(); + } + phaser.arriveAndAwaitAdvance(); + countDownLatch.await(); // Wait for rest of tasks to be cancelled. + int numberOfTimesKeyLoaded = 0; + assertEquals(numberOfSameKeys, loadAwareCacheLoaderList.size()); + for (int i = 0; i < loadAwareCacheLoaderList.size(); i++) { + LoadAwareCacheLoader loader = loadAwareCacheLoaderList.get(i); + if (loader.isLoaded()) { + numberOfTimesKeyLoaded++; + } + } + assertEquals(1, numberOfTimesKeyLoaded); // It should be loaded only once. + } + + public void testConcurrencyForEvictionFlow() throws Exception { + int diskCacheSize = randomIntBetween(450, 800); + + MockCacheEventListener eventListener = new MockCacheEventListener<>(); + + StoreAwareCacheBuilder cacheBuilder = new OpenSearchOnHeapCache.Builder().setMaximumWeightInBytes( + 200 + ).setWeigher((k, v) -> 150); + + StoreAwareCacheBuilder diskCacheBuilder = new MockOnDiskCache.Builder().setMaxSize(diskCacheSize) + .setDeliberateDelay(500); + + TieredSpilloverCache tieredSpilloverCache = new TieredSpilloverCache.Builder() + .setOnHeapCacheBuilder(cacheBuilder) + .setOnDiskCacheBuilder(diskCacheBuilder) + .setListener(eventListener) + .build(); + + String keyToBeEvicted = "key1"; + String secondKey = "key2"; + + // Put first key on tiered cache. Will go into onHeap cache. + tieredSpilloverCache.computeIfAbsent(keyToBeEvicted, new LoadAwareCacheLoader<>() { + @Override + public boolean isLoaded() { + return false; + } + + @Override + public String load(String key) { + return UUID.randomUUID().toString(); + } + }); + CountDownLatch countDownLatch = new CountDownLatch(1); + CountDownLatch countDownLatch1 = new CountDownLatch(1); + // Put second key on tiered cache. Will cause eviction of first key from onHeap cache and should go into + // disk cache. + LoadAwareCacheLoader loadAwareCacheLoader = getLoadAwareCacheLoader(); + Thread thread = new Thread(() -> { + try { + tieredSpilloverCache.computeIfAbsent(secondKey, loadAwareCacheLoader); + countDownLatch1.countDown(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + thread.start(); + assertBusy(() -> { assertTrue(loadAwareCacheLoader.isLoaded()); }, 100, TimeUnit.MILLISECONDS); // We wait for new key to be loaded + // after which it eviction flow is + // guaranteed to occur. + StoreAwareCache onDiskCache = tieredSpilloverCache.getOnDiskCache().get(); + + // Now on a different thread, try to get key(above one which got evicted) from tiered cache. We expect this + // should return not null value as it should be present on diskCache. + AtomicReference actualValue = new AtomicReference<>(); + Thread thread1 = new Thread(() -> { + try { + actualValue.set(tieredSpilloverCache.get(keyToBeEvicted)); + } catch (Exception e) { + throw new RuntimeException(e); + } + countDownLatch.countDown(); + }); + thread1.start(); + countDownLatch.await(); + assertNotNull(actualValue.get()); + countDownLatch1.await(); + assertEquals(1, eventListener.enumMap.get(CacheStoreType.ON_HEAP).evictionsMetric.count()); + assertEquals(1, tieredSpilloverCache.getOnHeapCache().count()); + assertEquals(1, onDiskCache.count()); + assertNotNull(onDiskCache.get(keyToBeEvicted)); + } + + class MockCacheEventListener implements StoreAwareCacheEventListener { + + EnumMap enumMap = new EnumMap<>(CacheStoreType.class); + + MockCacheEventListener() { + for (CacheStoreType cacheStoreType : CacheStoreType.values()) { + enumMap.put(cacheStoreType, new TestStatsHolder()); + } + } + + @Override + public void onMiss(K key, CacheStoreType cacheStoreType) { + enumMap.get(cacheStoreType).missCount.inc(); + } + + @Override + public void onRemoval(StoreAwareCacheRemovalNotification notification) { + if (notification.getRemovalReason().equals(RemovalReason.EVICTED)) { + enumMap.get(notification.getCacheStoreType()).evictionsMetric.inc(); + } else if (notification.getRemovalReason().equals(RemovalReason.INVALIDATED)) { + enumMap.get(notification.getCacheStoreType()).invalidationMetric.inc(); + } + } + + @Override + public void onHit(K key, V value, CacheStoreType cacheStoreType) { + enumMap.get(cacheStoreType).hitCount.inc(); + } + + @Override + public void onCached(K key, V value, CacheStoreType cacheStoreType) { + enumMap.get(cacheStoreType).cachedCount.inc(); + } + + class TestStatsHolder { + final CounterMetric evictionsMetric = new CounterMetric(); + final CounterMetric hitCount = new CounterMetric(); + final CounterMetric missCount = new CounterMetric(); + final CounterMetric cachedCount = new CounterMetric(); + final CounterMetric invalidationMetric = new CounterMetric(); + } + } + + private LoadAwareCacheLoader getLoadAwareCacheLoader() { + return new LoadAwareCacheLoader() { + boolean isLoaded = false; + + @Override + public String load(String key) { + isLoaded = true; + return UUID.randomUUID().toString(); + } + + @Override + public boolean isLoaded() { + return isLoaded; + } + }; + } + + private TieredSpilloverCache intializeTieredSpilloverCache( + int onHeapCacheSize, + int diksCacheSize, + StoreAwareCacheEventListener eventListener, + long diskDeliberateDelay + ) { + StoreAwareCacheBuilder diskCacheBuilder = new MockOnDiskCache.Builder().setMaxSize(diksCacheSize) + .setDeliberateDelay(diskDeliberateDelay); + StoreAwareCacheBuilder onHeapCacheBuilder = new OpenSearchOnHeapCache.Builder() + .setMaximumWeightInBytes(onHeapCacheSize * 20) + .setWeigher((k, v) -> 20); // Will support upto onHeapCacheSize entries + return new TieredSpilloverCache.Builder().setOnHeapCacheBuilder(onHeapCacheBuilder) + .setOnDiskCacheBuilder(diskCacheBuilder) + .setListener(eventListener) + .build(); + } +} + +class MockOnDiskCache implements StoreAwareCache { + + Map cache; + int maxSize; + + long delay; + StoreAwareCacheEventListener eventListener; + + MockOnDiskCache(int maxSize, StoreAwareCacheEventListener eventListener, long delay) { + this.maxSize = maxSize; + this.eventListener = eventListener; + this.delay = delay; + this.cache = new ConcurrentHashMap(); + } + + @Override + public V get(K key) { + V value = cache.get(key); + if (value != null) { + eventListener.onHit(key, value, CacheStoreType.DISK); + } else { + eventListener.onMiss(key, CacheStoreType.DISK); + } + return value; + } + + @Override + public void put(K key, V value) { + if (this.cache.size() >= maxSize) { // For simplification + eventListener.onRemoval(new StoreAwareCacheRemovalNotification<>(key, value, RemovalReason.EVICTED, CacheStoreType.DISK)); + return; + } + try { + Thread.sleep(delay); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + this.cache.put(key, value); + eventListener.onCached(key, value, CacheStoreType.DISK); + } + + @Override + public V computeIfAbsent(K key, LoadAwareCacheLoader loader) throws Exception { + V value = cache.computeIfAbsent(key, key1 -> { + try { + return loader.load(key); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + if (!loader.isLoaded()) { + eventListener.onHit(key, value, CacheStoreType.DISK); + } else { + eventListener.onMiss(key, CacheStoreType.DISK); + eventListener.onCached(key, value, CacheStoreType.DISK); + } + return value; + } + + @Override + public void invalidate(K key) { + if (this.cache.containsKey(key)) { + eventListener.onRemoval(new StoreAwareCacheRemovalNotification<>(key, null, RemovalReason.INVALIDATED, CacheStoreType.DISK)); + } + this.cache.remove(key); + } + + @Override + public void invalidateAll() { + this.cache.clear(); + } + + @Override + public Iterable keys() { + return this.cache.keySet(); + } + + @Override + public long count() { + return this.cache.size(); + } + + @Override + public void refresh() {} + + @Override + public CacheStoreType getTierType() { + return CacheStoreType.DISK; + } + + public static class Builder extends StoreAwareCacheBuilder { + + int maxSize; + long delay; + + @Override + public StoreAwareCache build() { + return new MockOnDiskCache(maxSize, this.getEventListener(), delay); + } + + public Builder setMaxSize(int maxSize) { + this.maxSize = maxSize; + return this; + } + + public Builder setDeliberateDelay(long millis) { + this.delay = millis; + return this; + } + } +} From a9ce180b57dc20408e5a36337dbdc85d6e1fd306 Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Tue, 9 Jan 2024 11:27:06 -0800 Subject: [PATCH 4/4] Unmute testNoFailuresOnFileReads flaky test (#11824) Signed-off-by: Suraj Singh --- .../java/org/opensearch/index/shard/RemoteIndexShardTests.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java index eacc504428ef1..57a561bc8f2a3 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java @@ -465,7 +465,6 @@ public void onReplicationFailure( * blocking update of reader. Once this is done, it corrupts one segment file and ensure that file is deleted in next * round of segment replication by ensuring doc count. */ - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/10885") public void testNoFailuresOnFileReads() throws Exception { try (ReplicationGroup shards = createGroup(1, getIndexSettings(), new NRTReplicationEngineFactory())) { shards.startAll();