From 6fa3a0dc017d0891d1a263f44202ea88691833e5 Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Thu, 30 Nov 2023 19:23:07 -0800 Subject: [PATCH 1/8] Fix bug where replication lag grows post primary relocation (#11238) * Fix bug where replication lag grows post primary relocation Signed-off-by: Marc Handalian * Fix broken UT Signed-off-by: Marc Handalian * add unit test for cluster state update Signed-off-by: Marc Handalian * PR feedback Signed-off-by: Marc Handalian * add changelog entry Signed-off-by: Marc Handalian --------- Signed-off-by: Marc Handalian --- CHANGELOG.md | 1 + ...plicationUsingRemoteStoreDisruptionIT.java | 77 +++++++++++++++++++ .../index/seqno/ReplicationTracker.java | 6 +- .../opensearch/index/shard/IndexShard.java | 10 +-- .../SegmentReplicationTargetService.java | 55 ++++++++++++- .../main/java/org/opensearch/node/Node.java | 2 + .../SegmentReplicationTargetServiceTests.java | 74 +++++++++++++++++- 7 files changed, 209 insertions(+), 16 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 01c56fe762ec5..b0def508db314 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -175,6 +175,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Fix for stuck update action in a bulk with `retry_on_conflict` property ([#11152](https://github.com/opensearch-project/OpenSearch/issues/11152)) - Remove shadowJar from `lang-painless` module publication ([#11369](https://github.com/opensearch-project/OpenSearch/issues/11369)) - Fix remote shards balancer and remove unused variables ([#11167](https://github.com/opensearch-project/OpenSearch/pull/11167)) +- Fix bug where replication lag grows post primary relocation ([#11238](https://github.com/opensearch-project/OpenSearch/pull/11238)) ### Security diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationUsingRemoteStoreDisruptionIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationUsingRemoteStoreDisruptionIT.java index b7b3f1d14f422..d5cdc22a15478 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationUsingRemoteStoreDisruptionIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationUsingRemoteStoreDisruptionIT.java @@ -8,10 +8,16 @@ package org.opensearch.remotestore; +import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; +import org.opensearch.action.admin.cluster.stats.ClusterStatsResponse; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand; +import org.opensearch.common.Priority; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.index.Index; import org.opensearch.index.IndexService; +import org.opensearch.index.ReplicationStats; import org.opensearch.index.shard.IndexShard; import org.opensearch.indices.IndicesService; import org.opensearch.indices.replication.SegmentReplicationState; @@ -20,10 +26,12 @@ import org.opensearch.indices.replication.common.ReplicationCollection; import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.disruption.SlowClusterStateProcessing; import java.nio.file.Path; import java.util.Optional; import java.util.Set; +import java.util.concurrent.TimeUnit; /** * This class runs tests with remote store + segRep while blocking file downloads @@ -111,6 +119,75 @@ public void 
testCancelReplicationWhileFetchingMetadata() throws Exception { cleanupRepo(); } + public void testUpdateVisibleCheckpointWithLaggingClusterStateUpdates_primaryRelocation() throws Exception { + Path location = randomRepoPath().toAbsolutePath(); + Settings nodeSettings = Settings.builder().put(buildRemoteStoreNodeAttributes(location, 0d, "metadata", Long.MAX_VALUE)).build(); + internalCluster().startClusterManagerOnlyNode(nodeSettings); + internalCluster().startDataOnlyNodes(2, nodeSettings); + final Settings indexSettings = Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build(); + createIndex(INDEX_NAME, indexSettings); + ensureGreen(INDEX_NAME); + final Set dataNodeNames = internalCluster().getDataNodeNames(); + final String replicaNode = getNode(dataNodeNames, false); + final String oldPrimary = getNode(dataNodeNames, true); + + // index a doc. + client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", randomInt()).get(); + refresh(INDEX_NAME); + + logger.info("--> start another node"); + final String newPrimary = internalCluster().startDataOnlyNode(nodeSettings); + ClusterHealthResponse clusterHealthResponse = client().admin() + .cluster() + .prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForNodes("4") + .get(); + assertEquals(clusterHealthResponse.isTimedOut(), false); + + SlowClusterStateProcessing disruption = new SlowClusterStateProcessing(replicaNode, random(), 0, 0, 1000, 2000); + internalCluster().setDisruptionScheme(disruption); + disruption.startDisrupting(); + + // relocate the primary + logger.info("--> relocate the shard"); + client().admin() + .cluster() + .prepareReroute() + .add(new MoveAllocationCommand(INDEX_NAME, 0, oldPrimary, newPrimary)) + .execute() + .actionGet(); + clusterHealthResponse = client().admin() + .cluster() + .prepareHealth() + .setWaitForEvents(Priority.LANGUID) + .setWaitForNoRelocatingShards(true) + .setTimeout(new TimeValue(5, TimeUnit.MINUTES)) + .execute() + .actionGet(); + assertEquals(clusterHealthResponse.isTimedOut(), false); + + IndexShard newPrimary_shard = getIndexShard(newPrimary, INDEX_NAME); + IndexShard replica = getIndexShard(replicaNode, INDEX_NAME); + assertBusy(() -> { + assertEquals( + newPrimary_shard.getLatestReplicationCheckpoint().getSegmentInfosVersion(), + replica.getLatestReplicationCheckpoint().getSegmentInfosVersion() + ); + }); + + assertBusy(() -> { + ClusterStatsResponse clusterStatsResponse = client().admin().cluster().prepareClusterStats().get(); + ReplicationStats replicationStats = clusterStatsResponse.getIndicesStats().getSegments().getReplicationStats(); + assertEquals(0L, replicationStats.maxBytesBehind); + assertEquals(0L, replicationStats.maxReplicationLag); + assertEquals(0L, replicationStats.totalBytesBehind); + }); + disruption.stopDisrupting(); + disableRepoConsistencyCheck("Remote Store Creates System Repository"); + cleanupRepo(); + } + private String getNode(Set dataNodeNames, boolean primary) { assertEquals(2, dataNodeNames.size()); for (String name : dataNodeNames) { diff --git a/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java index 3113428ec60ef..7b9c1d3aa548f 100644 --- a/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java @@ -1322,8 +1322,10 @@ private SegmentReplicationShardStats buildShardStats(final String allocationId, allocationId, 
cps.checkpointTimers.size(), bytesBehind, - cps.checkpointTimers.values().stream().mapToLong(SegmentReplicationLagTimer::time).max().orElse(0), - cps.checkpointTimers.values().stream().mapToLong(SegmentReplicationLagTimer::totalElapsedTime).max().orElse(0), + bytesBehind > 0L ? cps.checkpointTimers.values().stream().mapToLong(SegmentReplicationLagTimer::time).max().orElse(0) : 0, + bytesBehind > 0L + ? cps.checkpointTimers.values().stream().mapToLong(SegmentReplicationLagTimer::totalElapsedTime).max().orElse(0) + : 0, cps.lastCompletedReplicationLag ); } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 7f9e5f31d1976..cbb246219546b 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -1764,8 +1764,8 @@ public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckp if (isSegmentReplicationAllowed() == false) { return false; } - ReplicationCheckpoint localCheckpoint = getLatestReplicationCheckpoint(); - if (localCheckpoint.isAheadOf(requestCheckpoint)) { + final ReplicationCheckpoint localCheckpoint = getLatestReplicationCheckpoint(); + if (requestCheckpoint.isAheadOf(localCheckpoint) == false) { logger.trace( () -> new ParameterizedMessage( "Ignoring new replication checkpoint - Shard is already on checkpoint {} that is ahead of {}", @@ -1775,12 +1775,6 @@ public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckp ); return false; } - if (localCheckpoint.equals(requestCheckpoint)) { - logger.trace( - () -> new ParameterizedMessage("Ignoring new replication checkpoint - Shard is already on checkpoint {}", requestCheckpoint) - ); - return false; - } return true; } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index cb738d74000bc..d6db154a4e0e3 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -15,10 +15,13 @@ import org.opensearch.ExceptionsHelper; import org.opensearch.OpenSearchCorruptionException; import org.opensearch.action.support.ChannelActionListener; +import org.opensearch.cluster.ClusterChangedEvent; +import org.opensearch.cluster.ClusterStateListener; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Nullable; +import org.opensearch.common.lifecycle.AbstractLifecycleComponent; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.CancellableThreads; import org.opensearch.common.util.concurrent.AbstractRunnable; @@ -26,6 +29,7 @@ import org.opensearch.core.action.ActionListener; import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.transport.TransportResponse; +import org.opensearch.index.IndexService; import org.opensearch.index.shard.IndexEventListener; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardState; @@ -61,7 +65,7 @@ * * @opensearch.internal */ -public class SegmentReplicationTargetService implements IndexEventListener { +public class SegmentReplicationTargetService extends AbstractLifecycleComponent implements 
ClusterStateListener, IndexEventListener { private static final Logger logger = LogManager.getLogger(SegmentReplicationTargetService.class); @@ -144,6 +148,53 @@ public SegmentReplicationTargetService( ); } + @Override + protected void doStart() { + if (DiscoveryNode.isDataNode(clusterService.getSettings())) { + clusterService.addListener(this); + } + } + + @Override + protected void doStop() { + if (DiscoveryNode.isDataNode(clusterService.getSettings())) { + clusterService.removeListener(this); + } + } + + @Override + protected void doClose() throws IOException { + + } + + @Override + public void clusterChanged(ClusterChangedEvent event) { + if (event.routingTableChanged()) { + for (IndexService indexService : indicesService) { + if (indexService.getIndexSettings().isSegRepEnabled() && event.indexRoutingTableChanged(indexService.index().getName())) { + for (IndexShard shard : indexService) { + if (shard.routingEntry().primary() == false) { + // for this shard look up its primary routing, if it has completed a relocation trigger replication + final String previousNode = event.previousState() + .routingTable() + .shardRoutingTable(shard.shardId()) + .primaryShard() + .currentNodeId(); + final String currentNode = event.state() + .routingTable() + .shardRoutingTable(shard.shardId()) + .primaryShard() + .currentNodeId(); + if (previousNode.equals(currentNode) == false) { + processLatestReceivedCheckpoint(shard, Thread.currentThread()); + } + } + } + } + } + } + } + /** * Cancel any replications on this node for a replica that is about to be closed. */ @@ -395,7 +446,7 @@ private DiscoveryNode getPrimaryNode(ShardRouting primaryShard) { // visible to tests protected boolean processLatestReceivedCheckpoint(IndexShard replicaShard, Thread thread) { final ReplicationCheckpoint latestPublishedCheckpoint = latestReceivedCheckpoint.get(replicaShard.shardId()); - if (latestPublishedCheckpoint != null && latestPublishedCheckpoint.isAheadOf(replicaShard.getLatestReplicationCheckpoint())) { + if (latestPublishedCheckpoint != null) { logger.trace( () -> new ParameterizedMessage( "Processing latest received checkpoint for shard {} {}", diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 3a4860a9bf5ff..4cbf8dc191a9d 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -1414,6 +1414,7 @@ public Node start() throws NodeValidationException { assert transportService.getLocalNode().equals(localNodeFactory.getNode()) : "transportService has a different local node than the factory provided"; injector.getInstance(PeerRecoverySourceService.class).start(); + injector.getInstance(SegmentReplicationTargetService.class).start(); injector.getInstance(SegmentReplicationSourceService.class).start(); final RemoteClusterStateService remoteClusterStateService = injector.getInstance(RemoteClusterStateService.class); @@ -1602,6 +1603,7 @@ public synchronized void close() throws IOException { toClose.add(injector.getInstance(IndicesStore.class)); toClose.add(injector.getInstance(PeerRecoverySourceService.class)); toClose.add(injector.getInstance(SegmentReplicationSourceService.class)); + toClose.add(injector.getInstance(SegmentReplicationTargetService.class)); toClose.add(() -> stopWatch.stop().start("cluster")); toClose.add(injector.getInstance(ClusterService.class)); toClose.add(() -> stopWatch.stop().start("node_connections_service")); diff --git 
a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java index 252f3975bab25..f284a425a417b 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java @@ -12,11 +12,18 @@ import org.opensearch.ExceptionsHelper; import org.opensearch.OpenSearchException; import org.opensearch.Version; +import org.opensearch.cluster.ClusterChangedEvent; +import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.IndexRoutingTable; +import org.opensearch.cluster.routing.RecoverySource; import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.ShardRoutingState; +import org.opensearch.cluster.routing.UnassignedInfo; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; @@ -24,6 +31,7 @@ import org.opensearch.core.action.ActionListener; import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.transport.TransportResponse; +import org.opensearch.index.IndexService; import org.opensearch.index.engine.NRTReplicationEngineFactory; import org.opensearch.index.replication.TestReplicationSource; import org.opensearch.index.shard.IndexShard; @@ -51,6 +59,7 @@ import java.io.IOException; import java.util.Collections; import java.util.List; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; @@ -91,6 +100,8 @@ public class SegmentReplicationTargetServiceTests extends IndexShardTestCase { private SegmentReplicationState state; private ReplicationCheckpoint initialCheckpoint; + private ClusterState clusterState; + private static final long TRANSPORT_TIMEOUT = 30000;// 30sec @Override @@ -129,7 +140,7 @@ public void setUp() throws Exception { indicesService = mock(IndicesService.class); ClusterService clusterService = mock(ClusterService.class); - ClusterState clusterState = mock(ClusterState.class); + clusterState = mock(ClusterState.class); RoutingTable mockRoutingTable = mock(RoutingTable.class); when(clusterService.state()).thenReturn(clusterState); when(clusterState.routingTable()).thenReturn(mockRoutingTable); @@ -465,9 +476,22 @@ public void testStartReplicationListenerFailure() throws InterruptedException { verify(spy, (never())).updateVisibleCheckpoint(eq(0L), eq(replicaShard)); } - public void testDoNotProcessLatestCheckpointIfItIsbehind() { - sut.updateLatestReceivedCheckpoint(replicaShard.getLatestReplicationCheckpoint(), replicaShard); - assertFalse(sut.processLatestReceivedCheckpoint(replicaShard, null)); + public void testDoNotProcessLatestCheckpointIfCheckpointIsBehind() { + SegmentReplicationTargetService service = spy(sut); + doReturn(mock(SegmentReplicationTarget.class)).when(service).startReplication(any(), any(), any()); + ReplicationCheckpoint checkpoint = replicaShard.getLatestReplicationCheckpoint(); + service.updateLatestReceivedCheckpoint(checkpoint, replicaShard); + 
service.processLatestReceivedCheckpoint(replicaShard, null);
+ verify(service, times(0)).startReplication(eq(replicaShard), eq(checkpoint), any());
+ }
+
+ public void testProcessLatestCheckpointIfCheckpointAhead() {
+ SegmentReplicationTargetService service = spy(sut);
+ doNothing().when(service).startReplication(any());
+ doReturn(mock(SegmentReplicationTarget.class)).when(service).startReplication(any(), any(), any());
+ service.updateLatestReceivedCheckpoint(aheadCheckpoint, replicaShard);
+ service.processLatestReceivedCheckpoint(replicaShard, null);
+ verify(service, times(1)).startReplication(eq(replicaShard), eq(aheadCheckpoint), any());
}

public void testOnNewCheckpointInvokedOnClosedShardDoesNothing() throws IOException {
@@ -617,4 +641,46 @@ public void onReplicationFailure(SegmentReplicationState state, ReplicationFaile
target.cancel("test");
sut.startReplication(target);
}
+
+ public void testProcessCheckpointOnClusterStateUpdate() {
+ // set up mocks on indices & index service to return our replica's index & shard.
+ IndexService indexService = mock(IndexService.class);
+ when(indexService.iterator()).thenReturn(Set.of(replicaShard).iterator());
+ when(indexService.getIndexSettings()).thenReturn(replicaShard.indexSettings());
+ when(indexService.index()).thenReturn(replicaShard.routingEntry().index());
+ when(indicesService.iterator()).thenReturn(Set.of(indexService).iterator());
+
+ // create old & new cluster states
+ final String targetNodeId = "targetNodeId";
+ ShardRouting initialRouting = primaryShard.routingEntry().relocate(targetNodeId, 0L);
+ assertEquals(ShardRoutingState.RELOCATING, initialRouting.state());
+
+ ShardRouting targetRouting = ShardRouting.newUnassigned(
+ primaryShard.shardId(),
+ true,
+ RecoverySource.PeerRecoverySource.INSTANCE,
+ new UnassignedInfo(UnassignedInfo.Reason.REINITIALIZED, "test")
+ ).initialize(targetNodeId, initialRouting.allocationId().getId(), 0L).moveToStarted();
+ assertEquals(targetNodeId, targetRouting.currentNodeId());
+ assertEquals(ShardRoutingState.STARTED, targetRouting.state());
+ ClusterState oldState = ClusterState.builder(ClusterName.DEFAULT)
+ .routingTable(
+ RoutingTable.builder()
+ .add(IndexRoutingTable.builder(primaryShard.shardId().getIndex()).addShard(initialRouting).build())
+ .build()
+ )
+ .build();
+ ClusterState newState = ClusterState.builder(ClusterName.DEFAULT)
+ .routingTable(
+ RoutingTable.builder()
+ .add(IndexRoutingTable.builder(primaryShard.shardId().getIndex()).addShard(targetRouting).build())
+ .build()
+ )
+ .build();
+
+ // spy so we can verify process is invoked
+ SegmentReplicationTargetService spy = spy(sut);
+ spy.clusterChanged(new ClusterChangedEvent("ignored", oldState, newState));
+ verify(spy, times(1)).processLatestReceivedCheckpoint(eq(replicaShard), any());
+ }
}

From 77a4daf3a13866ab1887dc4174f63ff1fb9912fb Mon Sep 17 00:00:00 2001
From: Vikas Bansal <43470111+vikasvb90@users.noreply.github.com>
Date: Fri, 1 Dec 2023 13:56:33 +0530
Subject: [PATCH 2/8] Optimizations in s3 async upload flow (#11327)

Signed-off-by: vikasvb90
---
 .../repositories/s3/S3AsyncService.java | 2 +-
 .../repositories/s3/S3BlobContainer.java | 51 ++++++-
 .../repositories/s3/S3BlobStore.java | 19 +++
 .../repositories/s3/S3Repository.java | 16 +++
 .../repositories/s3/S3RepositoryPlugin.java | 4 +-
 .../opensearch/repositories/s3/S3Service.java | 2 +-
 .../s3/async/AsyncPartsHandler.java | 34 +++--
 .../s3/async/AsyncTransferManager.java | 20 ++-
 .../repositories/s3/async/UploadRequest.java | 10 +-
.../s3/S3BlobContainerMockClientTests.java | 125 +++++++++++++++++- .../s3/async/AsyncTransferManagerTests.java | 8 +- 11 files changed, 263 insertions(+), 28 deletions(-) diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3AsyncService.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3AsyncService.java index 262304029a0d3..d691cad9c9d03 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3AsyncService.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3AsyncService.java @@ -374,7 +374,7 @@ private static IrsaCredentials buildFromEnvironment(IrsaCredentials defaults) { return new IrsaCredentials(webIdentityTokenFile, roleArn, roleSessionName); } - private synchronized void releaseCachedClients() { + public synchronized void releaseCachedClients() { // the clients will shutdown when they will not be used anymore for (final AmazonAsyncS3Reference clientReference : clientsCache.values()) { clientReference.decRef(); diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java index c1180aab0e0c7..3a55fcb0bdbcd 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java @@ -91,6 +91,7 @@ import org.opensearch.repositories.s3.async.UploadRequest; import org.opensearch.repositories.s3.utils.HttpRangeUtils; +import java.io.BufferedInputStream; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; @@ -188,10 +189,38 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener comp writeContext.getWritePriority(), writeContext.getUploadFinalizer(), writeContext.doRemoteDataIntegrityCheck(), - writeContext.getExpectedChecksum() + writeContext.getExpectedChecksum(), + blobStore.isUploadRetryEnabled() ); try { - long partSize = blobStore.getAsyncTransferManager().calculateOptimalPartSize(writeContext.getFileSize()); + if (uploadRequest.getContentLength() > ByteSizeUnit.GB.toBytes(10) && blobStore.isRedirectLargeUploads()) { + StreamContext streamContext = SocketAccess.doPrivileged( + () -> writeContext.getStreamProvider(uploadRequest.getContentLength()) + ); + InputStreamContainer inputStream = streamContext.provideStream(0); + try { + executeMultipartUpload( + blobStore, + uploadRequest.getKey(), + inputStream.getInputStream(), + uploadRequest.getContentLength() + ); + completionListener.onResponse(null); + } catch (Exception ex) { + logger.error( + () -> new ParameterizedMessage( + "Failed to upload large file {} of size {} ", + uploadRequest.getKey(), + uploadRequest.getContentLength() + ), + ex + ); + completionListener.onFailure(ex); + } + return; + } + long partSize = blobStore.getAsyncTransferManager() + .calculateOptimalPartSize(writeContext.getFileSize(), writeContext.getWritePriority(), blobStore.isUploadRetryEnabled()); StreamContext streamContext = SocketAccess.doPrivileged(() -> writeContext.getStreamProvider(partSize)); try (AmazonAsyncS3Reference amazonS3Reference = SocketAccess.doPrivileged(blobStore::asyncClientReference)) { @@ -537,8 +566,14 @@ void executeSingleUpload(final S3BlobStore blobStore, final String blobName, fin PutObjectRequest putObjectRequest = putObjectRequestBuilder.build(); try (AmazonS3Reference clientReference = blobStore.clientReference()) { + 
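// Buffer the stream when upload retries are enabled: with mark/reset support a failed
+ // request can be replayed from the buffer instead of re-reading the source stream. +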
final InputStream requestInputStream; + if (blobStore.isUploadRetryEnabled()) { + requestInputStream = new BufferedInputStream(input, (int) (blobSize + 1)); + } else { + requestInputStream = input; + } SocketAccess.doPrivilegedVoid( - () -> clientReference.get().putObject(putObjectRequest, RequestBody.fromInputStream(input, blobSize)) + () -> clientReference.get().putObject(putObjectRequest, RequestBody.fromInputStream(requestInputStream, blobSize)) ); } catch (final SdkException e) { throw new IOException("Unable to upload object [" + blobName + "] using a single upload", e); @@ -578,6 +613,13 @@ void executeMultipartUpload(final S3BlobStore blobStore, final String blobName, createMultipartUploadRequestBuilder.serverSideEncryption(ServerSideEncryption.AES256); } + final InputStream requestInputStream; + if (blobStore.isUploadRetryEnabled()) { + requestInputStream = new BufferedInputStream(input, (int) (partSize + 1)); + } else { + requestInputStream = input; + } + CreateMultipartUploadRequest createMultipartUploadRequest = createMultipartUploadRequestBuilder.build(); try (AmazonS3Reference clientReference = blobStore.clientReference()) { uploadId.set( @@ -601,10 +643,9 @@ void executeMultipartUpload(final S3BlobStore blobStore, final String blobName, .build(); bytesCount += uploadPartRequest.contentLength(); - final UploadPartResponse uploadResponse = SocketAccess.doPrivileged( () -> clientReference.get() - .uploadPart(uploadPartRequest, RequestBody.fromInputStream(input, uploadPartRequest.contentLength())) + .uploadPart(uploadPartRequest, RequestBody.fromInputStream(requestInputStream, uploadPartRequest.contentLength())) ); parts.add(CompletedPart.builder().partNumber(uploadPartRequest.partNumber()).eTag(uploadResponse.eTag()).build()); } diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java index e8e043357e126..fc70fbb0db00e 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java @@ -56,8 +56,10 @@ import static org.opensearch.repositories.s3.S3Repository.BUFFER_SIZE_SETTING; import static org.opensearch.repositories.s3.S3Repository.BULK_DELETE_SIZE; import static org.opensearch.repositories.s3.S3Repository.CANNED_ACL_SETTING; +import static org.opensearch.repositories.s3.S3Repository.REDIRECT_LARGE_S3_UPLOAD; import static org.opensearch.repositories.s3.S3Repository.SERVER_SIDE_ENCRYPTION_SETTING; import static org.opensearch.repositories.s3.S3Repository.STORAGE_CLASS_SETTING; +import static org.opensearch.repositories.s3.S3Repository.UPLOAD_RETRY_ENABLED; class S3BlobStore implements BlobStore { @@ -71,6 +73,10 @@ class S3BlobStore implements BlobStore { private volatile ByteSizeValue bufferSize; + private volatile boolean redirectLargeUploads; + + private volatile boolean uploadRetryEnabled; + private volatile boolean serverSideEncryption; private volatile ObjectCannedACL cannedACL; @@ -119,6 +125,9 @@ class S3BlobStore implements BlobStore { this.normalExecutorBuilder = normalExecutorBuilder; this.priorityExecutorBuilder = priorityExecutorBuilder; this.urgentExecutorBuilder = urgentExecutorBuilder; + // Settings to initialize blobstore with. 
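+ // redirectLargeUploads reroutes uploads larger than 10GB to the synchronous client path;
+ // uploadRetryEnabled wraps upload streams in a buffer so failed requests can be retried.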
+ this.redirectLargeUploads = REDIRECT_LARGE_S3_UPLOAD.get(repositoryMetadata.settings());
+ this.uploadRetryEnabled = UPLOAD_RETRY_ENABLED.get(repositoryMetadata.settings());
+ }

@Override
@@ -130,6 +139,8 @@ public void reload(RepositoryMetadata repositoryMetadata) {
this.cannedACL = initCannedACL(CANNED_ACL_SETTING.get(repositoryMetadata.settings()));
this.storageClass = initStorageClass(STORAGE_CLASS_SETTING.get(repositoryMetadata.settings()));
this.bulkDeletesSize = BULK_DELETE_SIZE.get(repositoryMetadata.settings());
+ this.redirectLargeUploads = REDIRECT_LARGE_S3_UPLOAD.get(repositoryMetadata.settings());
+ this.uploadRetryEnabled = UPLOAD_RETRY_ENABLED.get(repositoryMetadata.settings());
}

@Override
@@ -149,6 +160,14 @@ int getMaxRetries() {
return service.settings(repositoryMetadata).maxRetries;
}

+ public boolean isRedirectLargeUploads() {
+ return redirectLargeUploads;
+ }
+
+ public boolean isUploadRetryEnabled() {
+ return uploadRetryEnabled;
+ }
+
public String bucket() {
return bucket;
}

diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java
index 728a99b1220a6..f7772a57c9afd 100644
--- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java
+++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java
@@ -147,6 +147,20 @@ class S3Repository extends MeteredBlobStoreRepository {
*/
static final ByteSizeValue MAX_FILE_SIZE_USING_MULTIPART = new ByteSizeValue(5, ByteSizeUnit.TB);

+ /**
+ * Whether large uploads need to be redirected to the slow sync S3 client.
+ */
+ static final Setting<Boolean> REDIRECT_LARGE_S3_UPLOAD = Setting.boolSetting(
+ "redirect_large_s3_upload",
+ true,
+ Setting.Property.NodeScope
+ );
+
+ /**
+ * Whether retries on upload are enabled. This setting wraps the input stream with a buffered stream to enable retries.
+ */
+ static final Setting<Boolean> UPLOAD_RETRY_ENABLED = Setting.boolSetting("s3_upload_retry_enabled", true, Setting.Property.NodeScope);
+
/**
* Minimum threshold below which the chunk is uploaded using a single request.
Beyond this threshold, * the S3 repository will use the AWS Multipart Upload API to split the chunk into several parts, each of buffer_size length, and @@ -391,7 +405,9 @@ public void reload(RepositoryMetadata newRepositoryMetadata) { // Reload configs for S3RepositoryPlugin service.settings(metadata); + service.releaseCachedClients(); s3AsyncService.settings(metadata); + s3AsyncService.releaseCachedClients(); // Reload configs for S3BlobStore BlobStore blobStore = getBlobStore(); diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java index dd420baa970d9..e7d2a4d024e60 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java @@ -261,7 +261,9 @@ public List> getSettings() { S3ClientSettings.IDENTITY_TOKEN_FILE_SETTING, S3ClientSettings.ROLE_SESSION_NAME_SETTING, S3Repository.PARALLEL_MULTIPART_UPLOAD_MINIMUM_PART_SIZE_SETTING, - S3Repository.PARALLEL_MULTIPART_UPLOAD_ENABLED_SETTING + S3Repository.PARALLEL_MULTIPART_UPLOAD_ENABLED_SETTING, + S3Repository.REDIRECT_LARGE_S3_UPLOAD, + S3Repository.UPLOAD_RETRY_ENABLED ); } diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Service.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Service.java index b1b3e19eac275..24387fb98a425 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Service.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Service.java @@ -438,7 +438,7 @@ private static IrsaCredentials buildFromEnviroment(IrsaCredentials defaults) { return new IrsaCredentials(webIdentityTokenFile, roleArn, roleSessionName); } - private synchronized void releaseCachedClients() { + public synchronized void releaseCachedClients() { // the clients will shutdown when they will not be used anymore for (final AmazonS3Reference clientReference : clientsCache.values()) { clientReference.decRef(); diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncPartsHandler.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncPartsHandler.java index 2bead6b588696..b4c4ed0ecaa75 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncPartsHandler.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncPartsHandler.java @@ -23,7 +23,6 @@ import org.opensearch.common.StreamContext; import org.opensearch.common.blobstore.stream.write.WritePriority; import org.opensearch.common.io.InputStreamContainer; -import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.repositories.s3.SocketAccess; import org.opensearch.repositories.s3.StatsMetricPublisher; import org.opensearch.repositories.s3.io.CheckedContainer; @@ -55,8 +54,8 @@ public class AsyncPartsHandler { * @param uploadId Upload Id against which multi-part is being performed * @param completedParts Reference of completed parts * @param inputStreamContainers Checksum containers - * @return list of completable futures * @param statsMetricPublisher sdk metric publisher + * @return list of completable futures * @throws IOException thrown in case of an IO error */ public static List> uploadParts( @@ -69,7 +68,8 @@ public static List> uploadParts( String uploadId, 
AtomicReferenceArray completedParts, AtomicReferenceArray inputStreamContainers, - StatsMetricPublisher statsMetricPublisher + StatsMetricPublisher statsMetricPublisher, + boolean uploadRetryEnabled ) throws IOException { List> futures = new ArrayList<>(); for (int partIdx = 0; partIdx < streamContext.getNumberOfParts(); partIdx++) { @@ -95,7 +95,8 @@ public static List> uploadParts( futures, uploadPartRequestBuilder.build(), inputStreamContainer, - uploadRequest + uploadRequest, + uploadRetryEnabled ); } @@ -132,6 +133,18 @@ public static void cleanUpParts(S3AsyncClient s3AsyncClient, UploadRequest uploa })); } + public static InputStream maybeRetryInputStream( + InputStream inputStream, + WritePriority writePriority, + boolean uploadRetryEnabled, + long contentLength + ) { + if (uploadRetryEnabled == true && (writePriority == WritePriority.HIGH || writePriority == WritePriority.URGENT)) { + return new BufferedInputStream(inputStream, (int) (contentLength + 1)); + } + return inputStream; + } + private static void uploadPart( S3AsyncClient s3AsyncClient, ExecutorService executorService, @@ -142,7 +155,8 @@ private static void uploadPart( List> futures, UploadPartRequest uploadPartRequest, InputStreamContainer inputStreamContainer, - UploadRequest uploadRequest + UploadRequest uploadRequest, + boolean uploadRetryEnabled ) { Integer partNumber = uploadPartRequest.partNumber(); @@ -154,9 +168,13 @@ private static void uploadPart( } else { streamReadExecutor = executorService; } - // Buffered stream is needed to allow mark and reset ops during IO errors so that only buffered - // data can be retried instead of retrying whole file by the application. - InputStream inputStream = new BufferedInputStream(inputStreamContainer.getInputStream(), (int) (ByteSizeUnit.MB.toBytes(1) + 1)); + + InputStream inputStream = maybeRetryInputStream( + inputStreamContainer.getInputStream(), + uploadRequest.getWritePriority(), + uploadRetryEnabled, + uploadPartRequest.contentLength() + ); CompletableFuture uploadPartResponseFuture = SocketAccess.doPrivileged( () -> s3AsyncClient.uploadPart( uploadPartRequest, diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java index 46fbdd3d0487b..2259780c95276 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java @@ -34,11 +34,11 @@ import org.opensearch.common.io.InputStreamContainer; import org.opensearch.common.util.ByteUtils; import org.opensearch.core.common.unit.ByteSizeUnit; +import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.repositories.s3.SocketAccess; import org.opensearch.repositories.s3.StatsMetricPublisher; import org.opensearch.repositories.s3.io.CheckedContainer; -import java.io.BufferedInputStream; import java.io.IOException; import java.io.InputStream; import java.util.Arrays; @@ -183,7 +183,8 @@ private void doUploadInParts( uploadId, completedParts, inputStreamContainers, - statsMetricPublisher + statsMetricPublisher, + uploadRequest.isUploadRetryEnabled() ); } catch (Exception ex) { try { @@ -302,10 +303,13 @@ private static void handleException(CompletableFuture returnFuture, Suppli /** * Calculates the optimal part size of each part request if the upload operation is carried out as multipart upload. 
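* When retries are enabled for HIGH or URGENT priority writes, the part size is capped at 5MB
* so the in-memory buffer needed to replay a failed part stays small.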
*/ - public long calculateOptimalPartSize(long contentLengthOfSource) { + public long calculateOptimalPartSize(long contentLengthOfSource, WritePriority writePriority, boolean uploadRetryEnabled) { if (contentLengthOfSource < ByteSizeUnit.MB.toBytes(5)) { return contentLengthOfSource; } + if (uploadRetryEnabled && (writePriority == WritePriority.HIGH || writePriority == WritePriority.URGENT)) { + return new ByteSizeValue(5, ByteSizeUnit.MB).getBytes(); + } double optimalPartSize = contentLengthOfSource / (double) MAX_UPLOAD_PARTS; optimalPartSize = Math.ceil(optimalPartSize); return (long) Math.max(optimalPartSize, minimumPartSize); @@ -335,9 +339,13 @@ private void uploadInOneChunk( } else { streamReadExecutor = executorService; } - // Buffered stream is needed to allow mark and reset ops during IO errors so that only buffered - // data can be retried instead of retrying whole file by the application. - InputStream inputStream = new BufferedInputStream(inputStreamContainer.getInputStream(), (int) (ByteSizeUnit.MB.toBytes(1) + 1)); + + InputStream inputStream = AsyncPartsHandler.maybeRetryInputStream( + inputStreamContainer.getInputStream(), + uploadRequest.getWritePriority(), + uploadRequest.isUploadRetryEnabled(), + uploadRequest.getContentLength() + ); CompletableFuture putObjectFuture = SocketAccess.doPrivileged( () -> s3AsyncClient.putObject( putObjectRequestBuilder.build(), diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/UploadRequest.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/UploadRequest.java index 3804c8417eb9f..a5304dc4a97d6 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/UploadRequest.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/UploadRequest.java @@ -25,6 +25,8 @@ public class UploadRequest { private final boolean doRemoteDataIntegrityCheck; private final Long expectedChecksum; + private boolean uploadRetryEnabled; + /** * Construct a new UploadRequest object * @@ -43,7 +45,8 @@ public UploadRequest( WritePriority writePriority, CheckedConsumer uploadFinalizer, boolean doRemoteDataIntegrityCheck, - Long expectedChecksum + Long expectedChecksum, + boolean uploadRetryEnabled ) { this.bucket = bucket; this.key = key; @@ -52,6 +55,7 @@ public UploadRequest( this.uploadFinalizer = uploadFinalizer; this.doRemoteDataIntegrityCheck = doRemoteDataIntegrityCheck; this.expectedChecksum = expectedChecksum; + this.uploadRetryEnabled = uploadRetryEnabled; } public String getBucket() { @@ -81,4 +85,8 @@ public boolean doRemoteDataIntegrityCheck() { public Long getExpectedChecksum() { return expectedChecksum; } + + public boolean isUploadRetryEnabled() { + return uploadRetryEnabled; + } } diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java index 7c67519f2f3b0..8c7e196d7c812 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java @@ -9,7 +9,10 @@ package org.opensearch.repositories.s3; import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.core.exception.SdkException; +import software.amazon.awssdk.core.sync.RequestBody; import software.amazon.awssdk.services.s3.S3AsyncClient; 
+import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest; import software.amazon.awssdk.services.s3.model.AbortMultipartUploadResponse; import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest; @@ -18,8 +21,10 @@ import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse; import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; import software.amazon.awssdk.services.s3.model.DeleteObjectResponse; +import software.amazon.awssdk.services.s3.model.ObjectCannedACL; import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.services.s3.model.PutObjectResponse; +import software.amazon.awssdk.services.s3.model.StorageClass; import software.amazon.awssdk.services.s3.model.UploadPartRequest; import software.amazon.awssdk.services.s3.model.UploadPartResponse; @@ -37,6 +42,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.action.ActionListener; +import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.repositories.s3.async.AsyncExecutorContainer; import org.opensearch.repositories.s3.async.AsyncTransferEventLoopGroup; @@ -61,15 +67,21 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.IntStream; +import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import static org.opensearch.repositories.s3.S3Repository.BULK_DELETE_SIZE; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class S3BlobContainerMockClientTests extends OpenSearchTestCase implements ConfigPathSupport { @@ -516,7 +528,7 @@ public InputStreamContainer apply(Integer partNo, Long size, Long position) thro } }, partSize, calculateLastPartSize(blobSize, partSize), calculateNumberOfParts(blobSize, partSize)); } - }, blobSize, false, WritePriority.HIGH, uploadSuccess -> { + }, blobSize, false, WritePriority.NORMAL, uploadSuccess -> { assertTrue(uploadSuccess); if (throwExceptionOnFinalizeUpload) { throw new RuntimeException(); @@ -546,4 +558,115 @@ private long calculateLastPartSize(long totalSize, long partSize) { private int calculateNumberOfParts(long contentLength, long partSize) { return (int) ((contentLength % partSize) == 0 ? 
contentLength / partSize : (contentLength / partSize) + 1); } + + public void testFailureWhenLargeFileRedirected() throws IOException, ExecutionException, InterruptedException { + testLargeFilesRedirectedToSlowSyncClient(true); + } + + public void testLargeFileRedirected() throws IOException, ExecutionException, InterruptedException { + testLargeFilesRedirectedToSlowSyncClient(false); + } + + private void testLargeFilesRedirectedToSlowSyncClient(boolean expectException) throws IOException, InterruptedException { + final ByteSizeValue partSize = new ByteSizeValue(1024, ByteSizeUnit.MB); + + int numberOfParts = 20; + final long lastPartSize = new ByteSizeValue(20, ByteSizeUnit.MB).getBytes(); + final long blobSize = ((numberOfParts - 1) * partSize.getBytes()) + lastPartSize; + CountDownLatch countDownLatch = new CountDownLatch(1); + AtomicReference exceptionRef = new AtomicReference<>(); + ActionListener completionListener = ActionListener.wrap(resp -> { countDownLatch.countDown(); }, ex -> { + exceptionRef.set(ex); + countDownLatch.countDown(); + }); + + final String bucketName = randomAlphaOfLengthBetween(1, 10); + + final BlobPath blobPath = new BlobPath(); + if (randomBoolean()) { + IntStream.of(randomIntBetween(1, 5)).forEach(value -> blobPath.add("path_" + value)); + } + + final long bufferSize = ByteSizeUnit.MB.toBytes(randomIntBetween(5, 1024)); + + final S3BlobStore blobStore = mock(S3BlobStore.class); + when(blobStore.bucket()).thenReturn(bucketName); + when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher()); + when(blobStore.bufferSizeInBytes()).thenReturn(bufferSize); + + final boolean serverSideEncryption = randomBoolean(); + when(blobStore.serverSideEncryption()).thenReturn(serverSideEncryption); + + final StorageClass storageClass = randomFrom(StorageClass.values()); + when(blobStore.getStorageClass()).thenReturn(storageClass); + when(blobStore.isRedirectLargeUploads()).thenReturn(true); + + final ObjectCannedACL cannedAccessControlList = randomBoolean() ? 
randomFrom(ObjectCannedACL.values()) : null; + if (cannedAccessControlList != null) { + when(blobStore.getCannedACL()).thenReturn(cannedAccessControlList); + } + + final S3Client client = mock(S3Client.class); + final AmazonS3Reference clientReference = Mockito.spy(new AmazonS3Reference(client)); + doNothing().when(clientReference).close(); + when(blobStore.clientReference()).thenReturn(clientReference); + final CreateMultipartUploadResponse createMultipartUploadResponse = CreateMultipartUploadResponse.builder() + .uploadId(randomAlphaOfLength(10)) + .build(); + when(client.createMultipartUpload(any(CreateMultipartUploadRequest.class))).thenReturn(createMultipartUploadResponse); + if (expectException) { + when(client.uploadPart(any(UploadPartRequest.class), any(RequestBody.class))).thenThrow( + SdkException.create("Expected upload part request to fail", new RuntimeException()) + ); + } else { + when(client.uploadPart(any(UploadPartRequest.class), any(RequestBody.class))).thenReturn(UploadPartResponse.builder().build()); + } + + // Fail the completion request + when(client.completeMultipartUpload(any(CompleteMultipartUploadRequest.class))).thenReturn( + CompleteMultipartUploadResponse.builder().build() + ); + when(client.abortMultipartUpload(any(AbortMultipartUploadRequest.class))).thenReturn( + AbortMultipartUploadResponse.builder().build() + ); + + List openInputStreams = new ArrayList<>(); + final S3BlobContainer s3BlobContainer = Mockito.spy(new S3BlobContainer(blobPath, blobStore)); + s3BlobContainer.asyncBlobUpload(new WriteContext("write_large_blob", new StreamContextSupplier() { + @Override + public StreamContext supplyStreamContext(long partSize) { + return new StreamContext(new CheckedTriFunction() { + @Override + public InputStreamContainer apply(Integer partNo, Long size, Long position) throws IOException { + InputStream inputStream = new OffsetRangeIndexInputStream(new ZeroIndexInput("desc", blobSize), size, position); + openInputStreams.add(inputStream); + return new InputStreamContainer(inputStream, size, position); + } + }, partSize, calculateLastPartSize(blobSize, partSize), calculateNumberOfParts(blobSize, partSize)); + } + }, blobSize, false, WritePriority.HIGH, uploadSuccess -> { assertTrue(uploadSuccess); }, false, null), completionListener); + + assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS)); + if (expectException) { + assertNotNull(exceptionRef.get()); + } else { + assertNull(exceptionRef.get()); + } + verify(s3BlobContainer, times(1)).executeMultipartUpload(any(S3BlobStore.class), anyString(), any(InputStream.class), anyLong()); + + if (expectException) { + verify(client, times(1)).abortMultipartUpload(any(AbortMultipartUploadRequest.class)); + } else { + verify(client, times(0)).abortMultipartUpload(any(AbortMultipartUploadRequest.class)); + } + + openInputStreams.forEach(inputStream -> { + try { + inputStream.close(); + } catch (IOException ex) { + logger.error("Error closing input stream"); + } + }); + } + } diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/AsyncTransferManagerTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/AsyncTransferManagerTests.java index 2437547a80a6f..b753b847df869 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/AsyncTransferManagerTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/AsyncTransferManagerTests.java @@ -82,7 +82,7 @@ public void testOneChunkUpload() { s3AsyncClient, new 
UploadRequest("bucket", "key", ByteSizeUnit.MB.toBytes(1), WritePriority.HIGH, uploadSuccess -> { // do nothing - }, false, null), + }, false, null, true), new StreamContext((partIdx, partSize, position) -> { streamRef.set(new ZeroInputStream(partSize)); return new InputStreamContainer(streamRef.get(), partSize, position); @@ -127,7 +127,7 @@ public void testOneChunkUploadCorruption() { s3AsyncClient, new UploadRequest("bucket", "key", ByteSizeUnit.MB.toBytes(1), WritePriority.HIGH, uploadSuccess -> { // do nothing - }, false, null), + }, false, null, true), new StreamContext( (partIdx, partSize, position) -> new InputStreamContainer(new ZeroInputStream(partSize), partSize, position), ByteSizeUnit.MB.toBytes(1), @@ -180,7 +180,7 @@ public void testMultipartUpload() { s3AsyncClient, new UploadRequest("bucket", "key", ByteSizeUnit.MB.toBytes(5), WritePriority.HIGH, uploadSuccess -> { // do nothing - }, true, 3376132981L), + }, true, 3376132981L, true), new StreamContext((partIdx, partSize, position) -> { InputStream stream = new ZeroInputStream(partSize); streams.add(stream); @@ -240,7 +240,7 @@ public void testMultipartUploadCorruption() { s3AsyncClient, new UploadRequest("bucket", "key", ByteSizeUnit.MB.toBytes(5), WritePriority.HIGH, uploadSuccess -> { // do nothing - }, true, 0L), + }, true, 0L, true), new StreamContext( (partIdx, partSize, position) -> new InputStreamContainer(new ZeroInputStream(partSize), partSize, position), ByteSizeUnit.MB.toBytes(1), From 69cc2a1b41a3e853675a9a2aa2ab47c9338bea37 Mon Sep 17 00:00:00 2001 From: Ankit Kala Date: Fri, 1 Dec 2023 17:39:41 +0530 Subject: [PATCH 3/8] Fix for flaky test IndexServiceTests.testAsyncTranslogTrimTaskOnClosedIndex (#11337) Signed-off-by: Ankit Kala --- .../org/opensearch/index/IndexServiceTests.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/server/src/test/java/org/opensearch/index/IndexServiceTests.java b/server/src/test/java/org/opensearch/index/IndexServiceTests.java index db9f4bd305c79..14451ef21726e 100644 --- a/server/src/test/java/org/opensearch/index/IndexServiceTests.java +++ b/server/src/test/java/org/opensearch/index/IndexServiceTests.java @@ -452,12 +452,7 @@ public void testAsyncTranslogTrimTaskOnClosedIndex() throws Exception { assertTrue(indexService.getTrimTranslogTask().mustReschedule()); final Engine readOnlyEngine = getEngine(indexService.getShard(0)); - assertBusy( - () -> assertThat( - readOnlyEngine.translogManager().getTranslogStats().getTranslogSizeInBytes(), - equalTo((long) Translog.DEFAULT_HEADER_SIZE_IN_BYTES) - ) - ); + assertBusy(() -> assertTrue(isTranslogEmpty(readOnlyEngine))); assertAcked(client().admin().indices().prepareOpen("test").setWaitForActiveShards(ActiveShardCount.DEFAULT)); @@ -467,6 +462,12 @@ public void testAsyncTranslogTrimTaskOnClosedIndex() throws Exception { assertThat(translog.stats().estimatedNumberOfOperations(), equalTo(0)); } + boolean isTranslogEmpty(Engine engine) { + long tlogSize = engine.translogManager().getTranslogStats().getTranslogSizeInBytes(); + // translog contains 1(or 2 in some corner cases) empty readers. 
+ return tlogSize == Translog.DEFAULT_HEADER_SIZE_IN_BYTES || tlogSize == 2 * Translog.DEFAULT_HEADER_SIZE_IN_BYTES; + } + public void testIllegalFsyncInterval() { Settings settings = Settings.builder() .put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), "0ms") // disable From 0f835e02dd07cb880e4bf8c7331fb061ecfd4fa0 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 4 Dec 2023 09:34:38 -0500 Subject: [PATCH 4/8] Bump actions/setup-java from 3 to 4 (#11447) * Bump actions/setup-java from 3 to 4 Bumps [actions/setup-java](https://github.com/actions/setup-java) from 3 to 4. - [Release notes](https://github.com/actions/setup-java/releases) - [Commits](https://github.com/actions/setup-java/compare/v3...v4) --- updated-dependencies: - dependency-name: actions/setup-java dependency-type: direct:production update-type: version-update:semver-major ... Signed-off-by: dependabot[bot] * Update changelog Signed-off-by: dependabot[bot] --------- Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: dependabot[bot] --- .github/workflows/assemble.yml | 2 +- .github/workflows/lucene-snapshots.yml | 2 +- .github/workflows/precommit.yml | 2 +- .github/workflows/publish-maven-snapshots.yml | 2 +- CHANGELOG.md | 1 + 5 files changed, 5 insertions(+), 4 deletions(-) diff --git a/.github/workflows/assemble.yml b/.github/workflows/assemble.yml index 87cecdf38c072..382105364c048 100644 --- a/.github/workflows/assemble.yml +++ b/.github/workflows/assemble.yml @@ -12,7 +12,7 @@ jobs: steps: - uses: actions/checkout@v4 - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v3 + uses: actions/setup-java@v4 with: java-version: ${{ matrix.java }} distribution: temurin diff --git a/.github/workflows/lucene-snapshots.yml b/.github/workflows/lucene-snapshots.yml index d6b37051c032e..05ca93e7be2aa 100644 --- a/.github/workflows/lucene-snapshots.yml +++ b/.github/workflows/lucene-snapshots.yml @@ -35,7 +35,7 @@ jobs: echo "REVISION=$(git rev-parse --short HEAD)" >> $GITHUB_ENV - name: Setup JDK ${{ env.JAVA_VERSION }} - uses: actions/setup-java@v3 + uses: actions/setup-java@v4 with: java-version: ${{ env.JAVA_VERSION }} distribution: 'temurin' diff --git a/.github/workflows/precommit.yml b/.github/workflows/precommit.yml index cd75eb47946a4..800aacec98516 100644 --- a/.github/workflows/precommit.yml +++ b/.github/workflows/precommit.yml @@ -12,7 +12,7 @@ jobs: steps: - uses: actions/checkout@v4 - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v3 + uses: actions/setup-java@v4 with: java-version: ${{ matrix.java }} distribution: temurin diff --git a/.github/workflows/publish-maven-snapshots.yml b/.github/workflows/publish-maven-snapshots.yml index 93bbfb8bbeab8..1b2db22c7c20b 100644 --- a/.github/workflows/publish-maven-snapshots.yml +++ b/.github/workflows/publish-maven-snapshots.yml @@ -20,7 +20,7 @@ jobs: steps: - uses: actions/checkout@v4 - name: Set up JDK 17 - uses: actions/setup-java@v3 + uses: actions/setup-java@v4 with: distribution: adopt java-version: 17 diff --git a/CHANGELOG.md b/CHANGELOG.md index b0def508db314..4ae634cd67255 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -139,6 +139,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Bump `org.apache.commons:commons-text` from 1.10.0 to 1.11.0 ([#11344](https://github.com/opensearch-project/OpenSearch/pull/11344)) - Bump `reactor-netty-core` from 
1.1.12 to 1.1.13 ([#11350](https://github.com/opensearch-project/OpenSearch/pull/11350)) - Bump `com.gradle.enterprise` from 3.14.1 to 3.15.1 ([#11339](https://github.com/opensearch-project/OpenSearch/pull/11339)) +- Bump `actions/setup-java` from 3 to 4 ([#11447](https://github.com/opensearch-project/OpenSearch/pull/11447)) ### Changed - Mute the query profile IT with concurrent execution ([#9840](https://github.com/opensearch-project/OpenSearch/pull/9840)) From aeac27bc5c0f657a32792758e88e93aef58c4a9a Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 4 Dec 2023 10:40:22 -0500 Subject: [PATCH 5/8] Bump commons-net:commons-net from 3.9.0 to 3.10.0 in /test/fixtures/hdfs-fixture (#11450) * Bump commons-net:commons-net in /test/fixtures/hdfs-fixture Bumps commons-net:commons-net from 3.9.0 to 3.10.0. --- updated-dependencies: - dependency-name: commons-net:commons-net dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] * Update changelog Signed-off-by: dependabot[bot] --------- Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: dependabot[bot] --- CHANGELOG.md | 1 + test/fixtures/hdfs-fixture/build.gradle | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4ae634cd67255..c763c797d7fca 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -140,6 +140,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Bump `reactor-netty-core` from 1.1.12 to 1.1.13 ([#11350](https://github.com/opensearch-project/OpenSearch/pull/11350)) - Bump `com.gradle.enterprise` from 3.14.1 to 3.15.1 ([#11339](https://github.com/opensearch-project/OpenSearch/pull/11339)) - Bump `actions/setup-java` from 3 to 4 ([#11447](https://github.com/opensearch-project/OpenSearch/pull/11447)) +- Bump `commons-net:commons-net` from 3.9.0 to 3.10.0 ([#11450](https://github.com/opensearch-project/OpenSearch/pull/11450)) ### Changed - Mute the query profile IT with concurrent execution ([#9840](https://github.com/opensearch-project/OpenSearch/pull/9840)) diff --git a/test/fixtures/hdfs-fixture/build.gradle b/test/fixtures/hdfs-fixture/build.gradle index ea677de632254..2b56a0fc3f993 100644 --- a/test/fixtures/hdfs-fixture/build.gradle +++ b/test/fixtures/hdfs-fixture/build.gradle @@ -66,7 +66,7 @@ dependencies { api "org.eclipse.jetty.websocket:javax-websocket-server-impl:${versions.jetty}" api 'org.apache.zookeeper:zookeeper:3.9.1' api "org.apache.commons:commons-text:1.11.0" - api "commons-net:commons-net:3.9.0" + api "commons-net:commons-net:3.10.0" runtimeOnly "com.google.guava:guava:${versions.guava}" runtimeOnly("com.squareup.okhttp3:okhttp:4.12.0") { exclude group: "com.squareup.okio" From 3f5432ea9beaef3178ca578744b10965219199b2 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 4 Dec 2023 11:07:12 -0500 Subject: [PATCH 6/8] Bump commons-io:commons-io from 2.13.0 to 2.15.1 (#11446) * Bump commons-io:commons-io from 2.13.0 to 2.15.1 Bumps commons-io:commons-io from 2.13.0 to 2.15.1. --- updated-dependencies: - dependency-name: commons-io:commons-io dependency-type: direct:production update-type: version-update:semver-minor ... 
Signed-off-by: dependabot[bot] * Update changelog Signed-off-by: dependabot[bot] --------- Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: dependabot[bot] --- CHANGELOG.md | 2 +- buildSrc/build.gradle | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c763c797d7fca..91f4920c032b8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -119,7 +119,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Dependencies - Bump Lucene from 9.7.0 to 9.8.0 ([10276](https://github.com/opensearch-project/OpenSearch/pull/10276)) -- Bump `commons-io:commons-io` from 2.13.0 to 2.15.0 ([#10294](https://github.com/opensearch-project/OpenSearch/pull/10294), [#11001](https://github.com/opensearch-project/OpenSearch/pull/11001), [#11002](https://github.com/opensearch-project/OpenSearch/pull/11002)) +- Bump `commons-io:commons-io` from 2.13.0 to 2.15.1 ([#10294](https://github.com/opensearch-project/OpenSearch/pull/10294), [#11001](https://github.com/opensearch-project/OpenSearch/pull/11001), [#11002](https://github.com/opensearch-project/OpenSearch/pull/11002), [#11446](https://github.com/opensearch-project/OpenSearch/pull/11446)) - Bump `com.google.api.grpc:proto-google-common-protos` from 2.10.0 to 2.25.1 ([#10208](https://github.com/opensearch-project/OpenSearch/pull/10208), [#10298](https://github.com/opensearch-project/OpenSearch/pull/10298)) - Bump `com.netflix.nebula.ospackage-base` from 11.4.0 to 11.5.0 ([#10295](https://github.com/opensearch-project/OpenSearch/pull/10295)) - Bump `org.apache.zookeeper:zookeeper` from 3.9.0 to 3.9.1 ([#10506](https://github.com/opensearch-project/OpenSearch/pull/10506)) diff --git a/buildSrc/build.gradle b/buildSrc/build.gradle index 4d2e02646cc33..9cb73034a6e6c 100644 --- a/buildSrc/build.gradle +++ b/buildSrc/build.gradle @@ -109,7 +109,7 @@ dependencies { api 'com.netflix.nebula:nebula-publishing-plugin:20.3.0' api 'com.netflix.nebula:gradle-info-plugin:12.1.6' api 'org.apache.rat:apache-rat:0.15' - api 'commons-io:commons-io:2.13.0' + api 'commons-io:commons-io:2.15.1' api "net.java.dev.jna:jna:5.13.0" api 'com.github.johnrengelman:shadow:8.1.1' api 'org.jdom:jdom2:2.0.6.1' From 35c3532e273852c013cd41cdc1770da027f90f9c Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 4 Dec 2023 11:40:54 -0500 Subject: [PATCH 7/8] Bump org.apache.maven:maven-model from 3.9.4 to 3.9.6 (#11445) * Bump org.apache.maven:maven-model from 3.9.4 to 3.9.6 Bumps [org.apache.maven:maven-model](https://github.com/apache/maven) from 3.9.4 to 3.9.6. - [Release notes](https://github.com/apache/maven/releases) - [Commits](https://github.com/apache/maven/compare/maven-3.9.4...maven-3.9.6) --- updated-dependencies: - dependency-name: org.apache.maven:maven-model dependency-type: direct:production update-type: version-update:semver-patch ... 
Signed-off-by: dependabot[bot] * Update changelog Signed-off-by: dependabot[bot] --------- Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: dependabot[bot] --- CHANGELOG.md | 1 + buildSrc/build.gradle | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 91f4920c032b8..54781eaac0812 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -141,6 +141,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Bump `com.gradle.enterprise` from 3.14.1 to 3.15.1 ([#11339](https://github.com/opensearch-project/OpenSearch/pull/11339)) - Bump `actions/setup-java` from 3 to 4 ([#11447](https://github.com/opensearch-project/OpenSearch/pull/11447)) - Bump `commons-net:commons-net` from 3.9.0 to 3.10.0 ([#11450](https://github.com/opensearch-project/OpenSearch/pull/11450)) +- Bump `org.apache.maven:maven-model` from 3.9.4 to 3.9.6 ([#11445](https://github.com/opensearch-project/OpenSearch/pull/11445)) ### Changed - Mute the query profile IT with concurrent execution ([#9840](https://github.com/opensearch-project/OpenSearch/pull/9840)) diff --git a/buildSrc/build.gradle b/buildSrc/build.gradle index 9cb73034a6e6c..b8db8504d5b85 100644 --- a/buildSrc/build.gradle +++ b/buildSrc/build.gradle @@ -117,7 +117,7 @@ dependencies { api 'de.thetaphi:forbiddenapis:3.6' api 'com.avast.gradle:gradle-docker-compose-plugin:0.17.5' api "org.yaml:snakeyaml:${props.getProperty('snakeyaml')}" - api 'org.apache.maven:maven-model:3.9.4' + api 'org.apache.maven:maven-model:3.9.6' api 'com.networknt:json-schema-validator:1.0.86' api 'org.jruby.jcodings:jcodings:1.0.58' api 'org.jruby.joni:joni:2.2.1' From 50babcffc007fed9c553b9940612c8b347f12cc6 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 4 Dec 2023 12:52:37 -0500 Subject: [PATCH 8/8] Bump org.apache.xmlbeans:xmlbeans from 5.1.1 to 5.2.0 in /plugins/ingest-attachment (#11448) * Bump org.apache.xmlbeans:xmlbeans in /plugins/ingest-attachment Bumps org.apache.xmlbeans:xmlbeans from 5.1.1 to 5.2.0. --- updated-dependencies: - dependency-name: org.apache.xmlbeans:xmlbeans dependency-type: direct:production update-type: version-update:semver-minor ... 
Signed-off-by: dependabot[bot] * Updating SHAs Signed-off-by: dependabot[bot] * Update changelog Signed-off-by: dependabot[bot] --------- Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: dependabot[bot] --- CHANGELOG.md | 1 + plugins/ingest-attachment/build.gradle | 2 +- plugins/ingest-attachment/licenses/xmlbeans-5.1.1.jar.sha1 | 1 - plugins/ingest-attachment/licenses/xmlbeans-5.2.0.jar.sha1 | 1 + 4 files changed, 3 insertions(+), 2 deletions(-) delete mode 100644 plugins/ingest-attachment/licenses/xmlbeans-5.1.1.jar.sha1 create mode 100644 plugins/ingest-attachment/licenses/xmlbeans-5.2.0.jar.sha1 diff --git a/CHANGELOG.md b/CHANGELOG.md index 54781eaac0812..b6cfe6dd92dd8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -142,6 +142,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Bump `actions/setup-java` from 3 to 4 ([#11447](https://github.com/opensearch-project/OpenSearch/pull/11447)) - Bump `commons-net:commons-net` from 3.9.0 to 3.10.0 ([#11450](https://github.com/opensearch-project/OpenSearch/pull/11450)) - Bump `org.apache.maven:maven-model` from 3.9.4 to 3.9.6 ([#11445](https://github.com/opensearch-project/OpenSearch/pull/11445)) +- Bump `org.apache.xmlbeans:xmlbeans` from 5.1.1 to 5.2.0 ([#11448](https://github.com/opensearch-project/OpenSearch/pull/11448)) ### Changed - Mute the query profile IT with concurrent execution ([#9840](https://github.com/opensearch-project/OpenSearch/pull/9840)) diff --git a/plugins/ingest-attachment/build.gradle b/plugins/ingest-attachment/build.gradle index 0cfdd8f24325a..57a2493053956 100644 --- a/plugins/ingest-attachment/build.gradle +++ b/plugins/ingest-attachment/build.gradle @@ -79,7 +79,7 @@ dependencies { api "org.apache.poi:poi:${versions.poi}" api "org.apache.poi:poi-ooxml-lite:${versions.poi}" api "commons-codec:commons-codec:${versions.commonscodec}" - api 'org.apache.xmlbeans:xmlbeans:5.1.1' + api 'org.apache.xmlbeans:xmlbeans:5.2.0' api 'org.apache.commons:commons-collections4:4.4' // MS Office api "org.apache.poi:poi-scratchpad:${versions.poi}" diff --git a/plugins/ingest-attachment/licenses/xmlbeans-5.1.1.jar.sha1 b/plugins/ingest-attachment/licenses/xmlbeans-5.1.1.jar.sha1 deleted file mode 100644 index 4d1d2ad0807e7..0000000000000 --- a/plugins/ingest-attachment/licenses/xmlbeans-5.1.1.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -48a369df0eccb509d46203104e4df9cb00f0f68b \ No newline at end of file diff --git a/plugins/ingest-attachment/licenses/xmlbeans-5.2.0.jar.sha1 b/plugins/ingest-attachment/licenses/xmlbeans-5.2.0.jar.sha1 new file mode 100644 index 0000000000000..f34274d593697 --- /dev/null +++ b/plugins/ingest-attachment/licenses/xmlbeans-5.2.0.jar.sha1 @@ -0,0 +1 @@ +6198ac997b3f234f2b5393fa415f78fac2e06510 \ No newline at end of file
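A note on the `licenses/*.jar.sha1` churn in the last patch: OpenSearch's dependency-license precommit checks pin every bundled jar to a checked-in SHA-1 hex digest, which is why bumping `xmlbeans` both deletes `xmlbeans-5.1.1.jar.sha1` and adds `xmlbeans-5.2.0.jar.sha1` ("Updating SHAs" in the commit message). The build tooling provides a task to regenerate these files (an `updateShas`-style precommit task inherited from the upstream build conventions; the exact task name is an assumption here, not confirmed by this patch). As a minimal sketch of the underlying check, in plain Java 17 with illustrative file paths:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HexFormat;

// Sketch of the check backing the licenses/*.jar.sha1 files: hash the jar
// and compare it to the recorded digest. Paths are illustrative; the real
// check is a Gradle precommit task in buildSrc, not this standalone class.
public final class Sha1LicenseCheck {
    public static void main(String[] args) throws IOException, NoSuchAlgorithmException {
        Path jar = Path.of("plugins/ingest-attachment/licenses/xmlbeans-5.2.0.jar"); // hypothetical local copy
        Path shaFile = Path.of("plugins/ingest-attachment/licenses/xmlbeans-5.2.0.jar.sha1");

        // Hash the whole jar in memory; fine for a sketch, though a real
        // implementation would stream the file through the digest.
        MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
        String actual = HexFormat.of().formatHex(sha1.digest(Files.readAllBytes(jar)));

        // The checked-in file holds the bare hex digest (note the diff's
        // "\ No newline at end of file" marker), so trim defensively.
        String expected = Files.readString(shaFile).trim();

        if (!expected.equalsIgnoreCase(actual)) {
            throw new IllegalStateException(
                "SHA-1 mismatch for " + jar + ": expected " + expected + ", got " + actual
            );
        }
        System.out.println("OK: " + jar.getFileName() + " -> " + actual);
    }
}
```

The digest added in this patch (`6198ac997b3f234f2b5393fa415f78fac2e06510`) is exactly such a value; if the downloaded jar ever drifts from it, the precommit check fails the build.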