diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStats.java b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStats.java index c16db06e517de..ce109602cb687 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStats.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStats.java @@ -61,7 +61,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (!shardRouting.primary()) { builder.startObject(SubFields.DOWNLOAD); builder.field(DownloadStatsFields.LAST_DOWNLOAD_TIMESTAMP, remoteSegmentShardStats.lastDownloadTimestampMs); - builder.startObject(DownloadStatsFields.TOTAL_FILES_DOWNLOADED) + builder.startObject(DownloadStatsFields.TOTAL_SYNCS_FROM_REMOTE) .field(SubFields.STARTED, remoteSegmentShardStats.totalDownloadsStarted) .field(SubFields.SUCCEEDED, remoteSegmentShardStats.totalDownloadsSucceeded) .field(SubFields.FAILED, remoteSegmentShardStats.totalDownloadsFailed); @@ -224,7 +224,7 @@ static final class DownloadStatsFields { /** * Total number of segment files downloaded from the remote store for a specific shard */ - static final String TOTAL_FILES_DOWNLOADED = "total_file_downloads"; + static final String TOTAL_SYNCS_FROM_REMOTE = "total_syncs_from_remote"; /** * Total bytes of segment files downloaded from the remote store for a specific shard diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentTracker.java b/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentTracker.java index 9b69a228d055e..d769ebf51ba63 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentTracker.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentTracker.java @@ -196,8 +196,6 @@ public class RemoteRefreshSegmentTracker { */ private final AtomicReference downloadBytesMovingAverageReference; - private final Object downloadBytesMutex = new Object(); - /** * Provides moving average over the last N upload speed (in bytes/s) of segment files uploaded as part of remote refresh. * N is window size. Wrapped with {@code AtomicReference} for dynamic changes in window size. @@ -212,8 +210,6 @@ public class RemoteRefreshSegmentTracker { */ private final AtomicReference downloadBytesPerSecMovingAverageReference; - private final Object downloadBytesPerSecMutex = new Object(); - /** * Provides moving average over the last N overall upload time (in millis) as part of remote refresh.N is window size. * Wrapped with {@code AtomicReference} for dynamic changes in window size. @@ -230,8 +226,6 @@ public class RemoteRefreshSegmentTracker { private final int SEGMENT_DOWNLOADS_DEFAULT_WINDOW_SIZE = 20; - private final Object downloadTimeMutex = new Object(); - public RemoteRefreshSegmentTracker( ShardId shardId, int uploadBytesMovingAverageWindowSize, @@ -449,8 +443,8 @@ public long getTotalDownloadsStarted() { return totalDownloadsStarted; } - public void addTotalDownloadsStarted(long totalFiles) { - totalDownloadsStarted += totalFiles; + public void incrementTotalDownloadsStarted() { + totalDownloadsStarted += 1; } public long getTotalDownloadsFailed() { @@ -561,9 +555,7 @@ boolean isDownloadBytesAverageReady() { public void addDownloadBytes(long size) { lastSuccessfulSegmentDownloadBytes = size; - synchronized (downloadBytesMutex) { - this.downloadBytesMovingAverageReference.get().record(size); - } + this.downloadBytesMovingAverageReference.get().record(size); } boolean isUploadBytesPerSecAverageReady() { @@ -589,9 +581,7 @@ boolean isDownloadBytesPerSecAverageReady() { } public void addDownloadBytesPerSec(long bytesPerSec) { - synchronized (downloadBytesPerSecMutex) { - this.downloadBytesPerSecMovingAverageReference.get().record(bytesPerSec); - } + this.downloadBytesPerSecMovingAverageReference.get().record(bytesPerSec); } /** @@ -639,9 +629,7 @@ boolean isDownloadTimeAverageReady() { } public void addDownloadTime(long timeMs) { - synchronized (downloadTimeMutex) { - this.downloadTimeMovingAverageReference.get().record(timeMs); - } + this.downloadTimeMovingAverageReference.get().record(timeMs); } public RemoteRefreshSegmentTracker.Stats stats() { diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 9b9ce1cef22af..a4d610766752d 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -4777,23 +4777,32 @@ private String copySegmentFiles( } else { skippedSegments.add(file); } - if (targetRemoteDirectory != null) { - - } - } - beforeSegmentDownload(downloadStatsTracker, segmentsToDownload.size(), totalSizeOfSegmentsToDownload); - // Copying segments files to local store directory from remote store directory (Shard Recovery) - for (String file : segmentsToDownload) { + // Upload stats before starting Segment downloads + // Adding a check to prevent stats being published for NoOp runs + if (!segmentsToDownload.isEmpty()) { + beforeSegmentDownloads(downloadStatsTracker, totalSizeOfSegmentsToDownload); + } + long sizeOfDownloadedSegments = 0; + try { long startTimeInMs = System.currentTimeMillis(); - try { + // Copying segments files to local store directory from remote store directory (Shard Recovery) + for (String file : segmentsToDownload) { + // Throws IOException if download fails storeDirectory.copyFrom(sourceRemoteDirectory, file, file, IOContext.DEFAULT); storeDirectory.sync(Collections.singleton(file)); downloadedSegments.add(file); - afterSegmentDownloadCompleted(downloadStatsTracker, uploadedSegments.get(file).getLength(), startTimeInMs); - } catch (IOException e) { - afterSegmentDownloadFailed(downloadStatsTracker, uploadedSegments.get(file).getLength()); + sizeOfDownloadedSegments += uploadedSegments.get(file).getLength(); + } + // Upload stats after download batch is completed + // Adding a check to prevent stats being published for NoOp runs + if (!downloadedSegments.isEmpty()) { + afterSegmentDownloadsCompleted(downloadStatsTracker, sizeOfDownloadedSegments, startTimeInMs); } + } catch (IOException e) { + // Upload stats if file download batch fails + afterSegmentDownloadsFailed(downloadStatsTracker, totalSizeOfSegmentsToDownload - sizeOfDownloadedSegments); + throw e; } // Copying segment files over to target remote directory from local store directory (Snapshot) for (String file : uploadedSegments.keySet()) { @@ -4828,7 +4837,7 @@ private boolean localDirectoryContains(Directory localDirectory, String file, lo } // TODO: Move the following three methods also to the generic class with `copySegmentFiles` - private void beforeSegmentDownload(RemoteRefreshSegmentTracker downloadStatsTracker, long totalFiles, long incomingFilesSize) { + private void beforeSegmentDownloads(RemoteRefreshSegmentTracker downloadStatsTracker, long incomingFilesSize) { // The `copySegmentFiles` method is being used for both // - `syncSegmentsFromRemoteSegmentStore` (Remote store based shard recovery) // - `syncSegmentsFromGivenRemoteSegmentStore` (Snapshot inter-op with remote store) @@ -4837,11 +4846,11 @@ private void beforeSegmentDownload(RemoteRefreshSegmentTracker downloadStatsTrac if (!indexSettings.isRemoteStoreEnabled()) { return; } - downloadStatsTracker.addTotalDownloadsStarted(totalFiles); + downloadStatsTracker.incrementTotalDownloadsStarted(); downloadStatsTracker.addDownloadBytesStarted(incomingFilesSize); } - private void afterSegmentDownloadCompleted( + private void afterSegmentDownloadsCompleted( RemoteRefreshSegmentTracker downloadStatsTracker, long downloadedFileSize, long startTimeInMs @@ -4859,7 +4868,7 @@ private void afterSegmentDownloadCompleted( downloadStatsTracker.addDownloadBytesPerSec((downloadedFileSize * 1_000L) / timeTakenInMS); } - private void afterSegmentDownloadFailed(RemoteRefreshSegmentTracker downloadStatsTracker, long failedFileSize) { + private void afterSegmentDownloadsFailed(RemoteRefreshSegmentTracker downloadStatsTracker, long failedFileSize) { if (!indexSettings.isRemoteStoreEnabled()) { return; } diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStatsTestHelper.java b/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStatsTestHelper.java index 75696dd3a5cd2..5d5703a1ba642 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStatsTestHelper.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStatsTestHelper.java @@ -91,19 +91,19 @@ static void compareStatsResponse( (int) pressureTrackerStats.lastDownloadTimestampMs ); assertEquals( - ((Map) segmentDownloads.get(RemoteStoreStats.DownloadStatsFields.TOTAL_FILES_DOWNLOADED)).get( + ((Map) segmentDownloads.get(RemoteStoreStats.DownloadStatsFields.TOTAL_SYNCS_FROM_REMOTE)).get( RemoteStoreStats.SubFields.STARTED ), (int) pressureTrackerStats.totalDownloadsStarted ); assertEquals( - ((Map) segmentDownloads.get(RemoteStoreStats.DownloadStatsFields.TOTAL_FILES_DOWNLOADED)).get( + ((Map) segmentDownloads.get(RemoteStoreStats.DownloadStatsFields.TOTAL_SYNCS_FROM_REMOTE)).get( RemoteStoreStats.SubFields.SUCCEEDED ), (int) pressureTrackerStats.totalDownloadsSucceeded ); assertEquals( - ((Map) segmentDownloads.get(RemoteStoreStats.DownloadStatsFields.TOTAL_FILES_DOWNLOADED)).get( + ((Map) segmentDownloads.get(RemoteStoreStats.DownloadStatsFields.TOTAL_SYNCS_FROM_REMOTE)).get( RemoteStoreStats.SubFields.FAILED ), (int) pressureTrackerStats.totalDownloadsFailed diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentTrackerTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentTrackerTests.java index bd33e69679e26..c94e8b8648528 100644 --- a/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentTrackerTests.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentTrackerTests.java @@ -290,12 +290,10 @@ public void testIncrementTotalDownloadsStarted() { pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), pressureSettings.getUploadTimeMovingAverageWindowSize() ); - long firstDownloadBatch = 20; - pressureTracker.addTotalDownloadsStarted(firstDownloadBatch); - assertEquals(firstDownloadBatch, pressureTracker.getTotalDownloadsStarted()); - long secondDownloadBatch = 20; - pressureTracker.addTotalDownloadsStarted(secondDownloadBatch); - assertEquals(firstDownloadBatch + secondDownloadBatch, pressureTracker.getTotalDownloadsStarted()); + pressureTracker.incrementTotalDownloadsStarted(); + assertEquals(1, pressureTracker.getTotalDownloadsStarted()); + pressureTracker.incrementTotalDownloadsStarted(); + assertEquals(2, pressureTracker.getTotalDownloadsStarted()); } public void testIncrementTotalDownloadsFailed() { @@ -387,9 +385,9 @@ public void testGetInflightDownloads() { pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), pressureSettings.getUploadTimeMovingAverageWindowSize() ); - pressureTracker.addTotalDownloadsStarted(1); + pressureTracker.incrementTotalDownloadsStarted(); assertEquals(1, pressureTracker.getInflightDownloads()); - pressureTracker.addTotalDownloadsStarted(1); + pressureTracker.incrementTotalDownloadsStarted(); assertEquals(2, pressureTracker.getInflightDownloads()); pressureTracker.incrementTotalDownloadsSucceeded(); assertEquals(1, pressureTracker.getInflightDownloads()); @@ -700,7 +698,7 @@ private RemoteRefreshSegmentTracker constructTracker() { segmentPressureTracker.updateRemoteRefreshTimeMs(System.nanoTime() / 1_000_000L + randomIntBetween(10, 100)); segmentPressureTracker.incrementRejectionCount(); segmentPressureTracker.updateLastDownloadTimestampMs(System.currentTimeMillis()); - segmentPressureTracker.addTotalDownloadsStarted(10); + segmentPressureTracker.incrementTotalDownloadsStarted(); segmentPressureTracker.incrementTotalDownloadsSucceeded(); segmentPressureTracker.addDownloadBytesStarted(50); segmentPressureTracker.addDownloadBytesSucceeded(50);