Skip to content

Commit

Permalink
Changing file upload stats to sync level from file level
Browse files Browse the repository at this point in the history
Signed-off-by: Shourya Dutta Biswas <114977491+shourya035@users.noreply.github.com>
  • Loading branch information
shourya035 committed Jul 11, 2023
1 parent 331cb81 commit 8de6d00
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (!shardRouting.primary()) {
builder.startObject(SubFields.DOWNLOAD);
builder.field(DownloadStatsFields.LAST_DOWNLOAD_TIMESTAMP, remoteSegmentShardStats.lastDownloadTimestampMs);
builder.startObject(DownloadStatsFields.TOTAL_FILES_DOWNLOADED)
builder.startObject(DownloadStatsFields.TOTAL_SYNCS_FROM_REMOTE)
.field(SubFields.STARTED, remoteSegmentShardStats.totalDownloadsStarted)
.field(SubFields.SUCCEEDED, remoteSegmentShardStats.totalDownloadsSucceeded)
.field(SubFields.FAILED, remoteSegmentShardStats.totalDownloadsFailed);
Expand Down Expand Up @@ -224,7 +224,7 @@ static final class DownloadStatsFields {
/**
* Total number of segment files downloaded from the remote store for a specific shard
*/
static final String TOTAL_FILES_DOWNLOADED = "total_file_downloads";
static final String TOTAL_SYNCS_FROM_REMOTE = "total_syncs_from_remote";

/**
* Total bytes of segment files downloaded from the remote store for a specific shard
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,6 @@ public class RemoteRefreshSegmentTracker {
*/
private final AtomicReference<MovingAverage> downloadBytesMovingAverageReference;

private final Object downloadBytesMutex = new Object();

/**
* Provides moving average over the last N upload speed (in bytes/s) of segment files uploaded as part of remote refresh.
* N is window size. Wrapped with {@code AtomicReference} for dynamic changes in window size.
Expand All @@ -212,8 +210,6 @@ public class RemoteRefreshSegmentTracker {
*/
private final AtomicReference<MovingAverage> downloadBytesPerSecMovingAverageReference;

private final Object downloadBytesPerSecMutex = new Object();

/**
* Provides moving average over the last N overall upload time (in millis) as part of remote refresh.N is window size.
* Wrapped with {@code AtomicReference} for dynamic changes in window size.
Expand All @@ -230,8 +226,6 @@ public class RemoteRefreshSegmentTracker {

private final int SEGMENT_DOWNLOADS_DEFAULT_WINDOW_SIZE = 20;

private final Object downloadTimeMutex = new Object();

public RemoteRefreshSegmentTracker(
ShardId shardId,
int uploadBytesMovingAverageWindowSize,
Expand Down Expand Up @@ -449,8 +443,8 @@ public long getTotalDownloadsStarted() {
return totalDownloadsStarted;
}

public void addTotalDownloadsStarted(long totalFiles) {
totalDownloadsStarted += totalFiles;
public void incrementTotalDownloadsStarted() {
totalDownloadsStarted += 1;
}

public long getTotalDownloadsFailed() {
Expand Down Expand Up @@ -561,9 +555,7 @@ boolean isDownloadBytesAverageReady() {

public void addDownloadBytes(long size) {
lastSuccessfulSegmentDownloadBytes = size;
synchronized (downloadBytesMutex) {
this.downloadBytesMovingAverageReference.get().record(size);
}
this.downloadBytesMovingAverageReference.get().record(size);
}

boolean isUploadBytesPerSecAverageReady() {
Expand All @@ -589,9 +581,7 @@ boolean isDownloadBytesPerSecAverageReady() {
}

public void addDownloadBytesPerSec(long bytesPerSec) {
synchronized (downloadBytesPerSecMutex) {
this.downloadBytesPerSecMovingAverageReference.get().record(bytesPerSec);
}
this.downloadBytesPerSecMovingAverageReference.get().record(bytesPerSec);
}

/**
Expand Down Expand Up @@ -639,9 +629,7 @@ boolean isDownloadTimeAverageReady() {
}

public void addDownloadTime(long timeMs) {
synchronized (downloadTimeMutex) {
this.downloadTimeMovingAverageReference.get().record(timeMs);
}
this.downloadTimeMovingAverageReference.get().record(timeMs);
}

public RemoteRefreshSegmentTracker.Stats stats() {
Expand Down
39 changes: 24 additions & 15 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -4777,23 +4777,32 @@ private String copySegmentFiles(
} else {
skippedSegments.add(file);
}
if (targetRemoteDirectory != null) {

}

}
beforeSegmentDownload(downloadStatsTracker, segmentsToDownload.size(), totalSizeOfSegmentsToDownload);
// Copying segments files to local store directory from remote store directory (Shard Recovery)
for (String file : segmentsToDownload) {
// Upload stats before starting Segment downloads
// Adding a check to prevent stats being published for NoOp runs
if (!segmentsToDownload.isEmpty()) {
beforeSegmentDownloads(downloadStatsTracker, totalSizeOfSegmentsToDownload);
}
long sizeOfDownloadedSegments = 0;
try {
long startTimeInMs = System.currentTimeMillis();
try {
// Copying segments files to local store directory from remote store directory (Shard Recovery)
for (String file : segmentsToDownload) {
// Throws IOException if download fails
storeDirectory.copyFrom(sourceRemoteDirectory, file, file, IOContext.DEFAULT);
storeDirectory.sync(Collections.singleton(file));
downloadedSegments.add(file);
afterSegmentDownloadCompleted(downloadStatsTracker, uploadedSegments.get(file).getLength(), startTimeInMs);
} catch (IOException e) {
afterSegmentDownloadFailed(downloadStatsTracker, uploadedSegments.get(file).getLength());
sizeOfDownloadedSegments += uploadedSegments.get(file).getLength();
}
// Upload stats after download batch is completed
// Adding a check to prevent stats being published for NoOp runs
if (!downloadedSegments.isEmpty()) {
afterSegmentDownloadsCompleted(downloadStatsTracker, sizeOfDownloadedSegments, startTimeInMs);
}
} catch (IOException e) {
// Upload stats if file download batch fails
afterSegmentDownloadsFailed(downloadStatsTracker, totalSizeOfSegmentsToDownload - sizeOfDownloadedSegments);
throw e;
}
// Copying segment files over to target remote directory from local store directory (Snapshot)
for (String file : uploadedSegments.keySet()) {
Expand Down Expand Up @@ -4828,7 +4837,7 @@ private boolean localDirectoryContains(Directory localDirectory, String file, lo
}

// TODO: Move the following three methods also to the generic class with `copySegmentFiles`
private void beforeSegmentDownload(RemoteRefreshSegmentTracker downloadStatsTracker, long totalFiles, long incomingFilesSize) {
private void beforeSegmentDownloads(RemoteRefreshSegmentTracker downloadStatsTracker, long incomingFilesSize) {
// The `copySegmentFiles` method is being used for both
// - `syncSegmentsFromRemoteSegmentStore` (Remote store based shard recovery)
// - `syncSegmentsFromGivenRemoteSegmentStore` (Snapshot inter-op with remote store)
Expand All @@ -4837,11 +4846,11 @@ private void beforeSegmentDownload(RemoteRefreshSegmentTracker downloadStatsTrac
if (!indexSettings.isRemoteStoreEnabled()) {
return;
}
downloadStatsTracker.addTotalDownloadsStarted(totalFiles);
downloadStatsTracker.incrementTotalDownloadsStarted();
downloadStatsTracker.addDownloadBytesStarted(incomingFilesSize);
}

private void afterSegmentDownloadCompleted(
private void afterSegmentDownloadsCompleted(
RemoteRefreshSegmentTracker downloadStatsTracker,
long downloadedFileSize,
long startTimeInMs
Expand All @@ -4859,7 +4868,7 @@ private void afterSegmentDownloadCompleted(
downloadStatsTracker.addDownloadBytesPerSec((downloadedFileSize * 1_000L) / timeTakenInMS);
}

private void afterSegmentDownloadFailed(RemoteRefreshSegmentTracker downloadStatsTracker, long failedFileSize) {
private void afterSegmentDownloadsFailed(RemoteRefreshSegmentTracker downloadStatsTracker, long failedFileSize) {
if (!indexSettings.isRemoteStoreEnabled()) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,19 +91,19 @@ static void compareStatsResponse(
(int) pressureTrackerStats.lastDownloadTimestampMs
);
assertEquals(
((Map) segmentDownloads.get(RemoteStoreStats.DownloadStatsFields.TOTAL_FILES_DOWNLOADED)).get(
((Map) segmentDownloads.get(RemoteStoreStats.DownloadStatsFields.TOTAL_SYNCS_FROM_REMOTE)).get(
RemoteStoreStats.SubFields.STARTED
),
(int) pressureTrackerStats.totalDownloadsStarted
);
assertEquals(
((Map) segmentDownloads.get(RemoteStoreStats.DownloadStatsFields.TOTAL_FILES_DOWNLOADED)).get(
((Map) segmentDownloads.get(RemoteStoreStats.DownloadStatsFields.TOTAL_SYNCS_FROM_REMOTE)).get(
RemoteStoreStats.SubFields.SUCCEEDED
),
(int) pressureTrackerStats.totalDownloadsSucceeded
);
assertEquals(
((Map) segmentDownloads.get(RemoteStoreStats.DownloadStatsFields.TOTAL_FILES_DOWNLOADED)).get(
((Map) segmentDownloads.get(RemoteStoreStats.DownloadStatsFields.TOTAL_SYNCS_FROM_REMOTE)).get(
RemoteStoreStats.SubFields.FAILED
),
(int) pressureTrackerStats.totalDownloadsFailed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,12 +290,10 @@ public void testIncrementTotalDownloadsStarted() {
pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(),
pressureSettings.getUploadTimeMovingAverageWindowSize()
);
long firstDownloadBatch = 20;
pressureTracker.addTotalDownloadsStarted(firstDownloadBatch);
assertEquals(firstDownloadBatch, pressureTracker.getTotalDownloadsStarted());
long secondDownloadBatch = 20;
pressureTracker.addTotalDownloadsStarted(secondDownloadBatch);
assertEquals(firstDownloadBatch + secondDownloadBatch, pressureTracker.getTotalDownloadsStarted());
pressureTracker.incrementTotalDownloadsStarted();
assertEquals(1, pressureTracker.getTotalDownloadsStarted());
pressureTracker.incrementTotalDownloadsStarted();
assertEquals(2, pressureTracker.getTotalDownloadsStarted());
}

public void testIncrementTotalDownloadsFailed() {
Expand Down Expand Up @@ -387,9 +385,9 @@ public void testGetInflightDownloads() {
pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(),
pressureSettings.getUploadTimeMovingAverageWindowSize()
);
pressureTracker.addTotalDownloadsStarted(1);
pressureTracker.incrementTotalDownloadsStarted();
assertEquals(1, pressureTracker.getInflightDownloads());
pressureTracker.addTotalDownloadsStarted(1);
pressureTracker.incrementTotalDownloadsStarted();
assertEquals(2, pressureTracker.getInflightDownloads());
pressureTracker.incrementTotalDownloadsSucceeded();
assertEquals(1, pressureTracker.getInflightDownloads());
Expand Down Expand Up @@ -700,7 +698,7 @@ private RemoteRefreshSegmentTracker constructTracker() {
segmentPressureTracker.updateRemoteRefreshTimeMs(System.nanoTime() / 1_000_000L + randomIntBetween(10, 100));
segmentPressureTracker.incrementRejectionCount();
segmentPressureTracker.updateLastDownloadTimestampMs(System.currentTimeMillis());
segmentPressureTracker.addTotalDownloadsStarted(10);
segmentPressureTracker.incrementTotalDownloadsStarted();
segmentPressureTracker.incrementTotalDownloadsSucceeded();
segmentPressureTracker.addDownloadBytesStarted(50);
segmentPressureTracker.addDownloadBytesSucceeded(50);
Expand Down

0 comments on commit 8de6d00

Please sign in to comment.