Skip to content

Commit

Permalink
Fix local segments stats update in RemoteStoreRefreshListener (opense…
Browse files Browse the repository at this point in the history
…arch-project#8758)

Signed-off-by: Ashish Singh <ssashish@amazon.com>
Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>
  • Loading branch information
ashking94 authored and kaushalmahi12 committed Sep 12, 2023
1 parent 251e000 commit 55a12dc
Show file tree
Hide file tree
Showing 11 changed files with 670 additions and 276 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@

package org.opensearch.index.remote;

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.common.CheckedFunction;
import org.opensearch.common.logging.Loggers;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
Expand All @@ -18,21 +22,26 @@
import org.opensearch.index.store.DirectoryFileTransferTracker;

import java.io.IOException;
import java.util.HashMap;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import static org.opensearch.index.shard.RemoteStoreRefreshListener.EXCLUDE_FILES;

/**
* Keeps track of remote refresh which happens in {@link org.opensearch.index.shard.RemoteStoreRefreshListener}. This consist of multiple critical metrics.
*
* @opensearch.internal
*/
public class RemoteSegmentTransferTracker {

private final Logger logger;

/**
* ShardId for which this instance tracks the remote segment upload metadata.
*/
Expand Down Expand Up @@ -124,14 +133,15 @@ public class RemoteSegmentTransferTracker {
private final Map<String, AtomicLong> rejectionCountMap = ConcurrentCollections.newConcurrentMap();

/**
* Map of name to size of the segment files created as part of the most recent refresh.
* Keeps track of segment files and their size in bytes which are part of the most recent refresh.
*/
private volatile Map<String, Long> latestLocalFileNameLengthMap;
private final Map<String, Long> latestLocalFileNameLengthMap = ConcurrentCollections.newConcurrentMap();

/**
* Set of names of segment files that were uploaded as part of the most recent remote refresh.
* This contains the files from the last successful remote refresh and ongoing uploads. This gets reset to just the
* last successful remote refresh state on successful remote refresh.
*/
private final Set<String> latestUploadedFiles = new HashSet<>();
private final Set<String> latestUploadedFiles = ConcurrentCollections.newConcurrentSet();

/**
* Keeps the bytes lag computed so that we do not compute it for every request.
Expand Down Expand Up @@ -182,6 +192,7 @@ public RemoteSegmentTransferTracker(
int uploadBytesPerSecMovingAverageWindowSize,
int uploadTimeMsMovingAverageWindowSize
) {
logger = Loggers.getLogger(getClass(), shardId);
this.shardId = shardId;
// Both the local refresh time and remote refresh time are set with current time to give consistent view of time lag when it arises.
long currentClockTimeMs = System.currentTimeMillis();
Expand All @@ -193,8 +204,6 @@ public RemoteSegmentTransferTracker(
uploadBytesMovingAverageReference = new AtomicReference<>(new MovingAverage(uploadBytesMovingAverageWindowSize));
uploadBytesPerSecMovingAverageReference = new AtomicReference<>(new MovingAverage(uploadBytesPerSecMovingAverageWindowSize));
uploadTimeMsMovingAverageReference = new AtomicReference<>(new MovingAverage(uploadTimeMsMovingAverageWindowSize));

latestLocalFileNameLengthMap = new HashMap<>();
this.directoryFileTransferTracker = directoryFileTransferTracker;
}

Expand All @@ -206,7 +215,8 @@ public long getLocalRefreshSeqNo() {
return localRefreshSeqNo;
}

public void updateLocalRefreshSeqNo(long localRefreshSeqNo) {
// Visible for testing
void updateLocalRefreshSeqNo(long localRefreshSeqNo) {
assert localRefreshSeqNo >= this.localRefreshSeqNo : "newLocalRefreshSeqNo="
+ localRefreshSeqNo
+ " < "
Expand All @@ -224,7 +234,17 @@ public long getLocalRefreshClockTimeMs() {
return localRefreshClockTimeMs;
}

public void updateLocalRefreshTimeMs(long localRefreshTimeMs) {
/**
* Updates the last refresh time and refresh seq no which is seen by local store.
*/
public void updateLocalRefreshTimeAndSeqNo() {
updateLocalRefreshClockTimeMs(System.currentTimeMillis());
updateLocalRefreshTimeMs(System.nanoTime() / 1_000_000L);
updateLocalRefreshSeqNo(getLocalRefreshSeqNo() + 1);
}

// Visible for testing
void updateLocalRefreshTimeMs(long localRefreshTimeMs) {
assert localRefreshTimeMs >= this.localRefreshTimeMs : "newLocalRefreshTimeMs="
+ localRefreshTimeMs
+ " < "
Expand All @@ -234,7 +254,7 @@ public void updateLocalRefreshTimeMs(long localRefreshTimeMs) {
computeTimeMsLag();
}

public void updateLocalRefreshClockTimeMs(long localRefreshClockTimeMs) {
private void updateLocalRefreshClockTimeMs(long localRefreshClockTimeMs) {
this.localRefreshClockTimeMs = localRefreshClockTimeMs;
}

Expand Down Expand Up @@ -369,12 +389,36 @@ long getRejectionCount(String rejectionReason) {
return rejectionCountMap.get(rejectionReason).get();
}

Map<String, Long> getLatestLocalFileNameLengthMap() {
return latestLocalFileNameLengthMap;
public Map<String, Long> getLatestLocalFileNameLengthMap() {
return Collections.unmodifiableMap(latestLocalFileNameLengthMap);
}

public void setLatestLocalFileNameLengthMap(Map<String, Long> latestLocalFileNameLengthMap) {
this.latestLocalFileNameLengthMap = latestLocalFileNameLengthMap;
/**
* Updates the latestLocalFileNameLengthMap by adding file name and it's size to the map. The method is given a function as an argument which is used for determining the file size (length in bytes). This method is also provided the collection of segment files which are the latest refresh local segment files. This method also removes the stale segment files from the map that are not part of the input segment files.
*
* @param segmentFiles list of local refreshed segment files
* @param fileSizeFunction function is used to determine the file size in bytes
*/
public void updateLatestLocalFileNameLengthMap(
Collection<String> segmentFiles,
CheckedFunction<String, Long, IOException> fileSizeFunction
) {
// Update the map
segmentFiles.stream()
.filter(file -> EXCLUDE_FILES.contains(file) == false)
.filter(file -> latestLocalFileNameLengthMap.containsKey(file) == false || latestLocalFileNameLengthMap.get(file) == 0)
.forEach(file -> {
long fileSize = 0;
try {
fileSize = fileSizeFunction.apply(file);
} catch (IOException e) {
logger.warn(new ParameterizedMessage("Exception while reading the fileLength of file={}", file), e);
}
latestLocalFileNameLengthMap.put(file, fileSize);
});
Set<String> fileSet = new HashSet<>(segmentFiles);
// Remove keys from the fileSizeMap that do not exist in the latest segment files
latestLocalFileNameLengthMap.entrySet().removeIf(entry -> fileSet.contains(entry.getKey()) == false);
computeBytesLag();
}

Expand All @@ -390,7 +434,7 @@ public void setLatestUploadedFiles(Set<String> files) {
}

private void computeBytesLag() {
if (latestLocalFileNameLengthMap == null || latestLocalFileNameLengthMap.isEmpty()) {
if (latestLocalFileNameLengthMap.isEmpty()) {
return;
}
Set<String> filesNotYetUploaded = latestLocalFileNameLengthMap.keySet()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public void beforeRefresh() throws IOException {
}

@Override
protected boolean performAfterRefresh(boolean didRefresh, boolean isRetry) {
protected boolean performAfterRefreshWithPermit(boolean didRefresh) {
if (didRefresh
&& shard.state() == IndexShardState.STARTED
&& shard.getReplicationTracker().isPrimaryMode()
Expand Down
Loading

0 comments on commit 55a12dc

Please sign in to comment.