[Remote Store] Add support for refresh level durability #5253

Merged
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -102,7 +102,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Bump `opencensus-contrib-http-util` from 0.18.0 to 0.31.1 ([#3633](https://github.com/opensearch-project/OpenSearch/pull/3633))
- Bump `geoip2` from 3.0.1 to 3.0.2 ([#5103](https://github.com/opensearch-project/OpenSearch/pull/5103))
- Bump gradle-extra-configurations-plugin from 7.0.0 to 8.0.0 ([#4808](https://github.com/opensearch-project/OpenSearch/pull/4808))

### Changed
- Add support for refresh level durability ([#5253](https://github.com/opensearch-project/OpenSearch/pull/5253))

### Deprecated
- Refactor fuzziness interface on query builders ([#5433](https://github.com/opensearch-project/OpenSearch/pull/5433))

@@ -2748,7 +2748,7 @@ public long tryDeleteDocument(IndexReader readerIn, int docID) {
/**
* Returns the last local checkpoint value that has been refreshed internally.
*/
final long lastRefreshedCheckpoint() {
public final long lastRefreshedCheckpoint() {
return lastRefreshedCheckpointListener.refreshedCheckpoint.get();
}

@@ -19,12 +19,16 @@
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.index.engine.EngineException;
import org.opensearch.index.engine.InternalEngine;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
@@ -34,6 +38,8 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import static org.opensearch.index.seqno.SequenceNumbers.LOCAL_CHECKPOINT_KEY;

/**
* RefreshListener implementation to upload newly created segment files to the remote store
*
@@ -44,6 +50,7 @@ public final class RemoteStoreRefreshListener implements ReferenceManager.Refres
static final Set<String> EXCLUDE_FILES = Set.of("write.lock");
// Visible for testing
static final int LAST_N_METADATA_FILES_TO_KEEP = 10;
static final String SEGMENT_INFO_SNAPSHOT_FILENAME_PREFIX = "segment_infos_snapshot_filename";

private final IndexShard indexShard;
private final Directory storeDirectory;
@@ -88,46 +95,67 @@ public void afterRefresh(boolean didRefresh) {
this.remoteDirectory.init();
}
try {
String lastCommittedLocalSegmentFileName = SegmentInfos.getLastCommitSegmentsFileName(storeDirectory);
if (!remoteDirectory.containsFile(
Collaborator

 // We don't want to fail refresh if upload of new segments fails. The missed segments will be re-tried
// in the next refresh. This should not affect durability of the indexed data after remote trans-log integration.

Do we need to change this behavior when remote txlog is not enabled? We will need to fail refresh in that case. Since it might be async, it will be tricky for customers to see the failures though.

Alternatively, we need to call out that refresh-level/commit-level durability is best effort and provide some visibility into what is durably stored at any given point in time.

Member Author

Right, as of now, commit/refresh level durability is best effort and we need to combine it with remote translog to provide request level durability.

As I think about it more, failing refresh/commit on segment upload failure would be useful in the following cases:

  1. If we decide to support refresh level durability alone without remote translog
  2. After we integrate Segment Replication with remote segment store

Collaborator

I think we would have to model the refresh listener as a fail-fast blocking listener. We could add a condition to fail based on whether operation level durability is enabled.

Member Author

Makes sense. With remote translog enabled, we can change the way segments are uploaded.
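
For illustration only, a minimal sketch of the fail-fast behavior discussed in this thread — it is not part of this PR, and the remote-translog setting accessor is an assumption:

```java
// Hypothetical fail-fast variant of the upload step (sketch, not part of this PR).
boolean uploadStatus = uploadNewSegments(localSegmentsPostRefresh);
if (uploadStatus == false
    // Assumption: a setting exposes whether a remote translog provides operation-level durability.
    && indexShard.indexSettings().isRemoteTranslogStoreEnabled() == false) {
    // Without a remote translog, a missed upload weakens refresh-level durability,
    // so surface the failure instead of silently retrying on the next refresh.
    throw new EngineException(indexShard.shardId(), "Failed to upload segments to remote store on refresh");
}
```

Whether to fail the refresh or keep uploads best effort is the open question tracked later in this thread as issue #5578.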

Collaborator

Does a failure impact staleness of data on replicas, independent of operation level durability?

Member Author

@Bukhtawar Are you referring to remote store integration with Segment replication?

Collaborator

Can we track this as an open item?

Member Author

Created issue: #5578

lastCommittedLocalSegmentFileName,
getChecksumOfLocalFile(lastCommittedLocalSegmentFileName)
)) {
// if a new segments_N file is present in local that is not uploaded to remote store yet, it
// is considered as a first refresh post commit. A cleanup of stale commit files is triggered.
Collaborator

Why just first?

Member Author

This is to avoid triggering deletes post each refresh.

Collaborator

Got it, maybe add that in the comments.

// This is done to avoid triggering deletes after each refresh.
// Ideally, we want this to be done in async flow. (GitHub issue #4315)
if (isRefreshAfterCommit()) {
deleteStaleCommits();
}

String segmentInfoSnapshotFilename = null;
try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) {
SegmentInfos segmentInfos = segmentInfosGatedCloseable.get();
Collection<String> refreshedLocalFiles = segmentInfos.files(true);

List<String> segmentInfosFiles = refreshedLocalFiles.stream()
Collection<String> localSegmentsPostRefresh = segmentInfos.files(true);
Member Author

Renamed the variable from refreshedLocalFiles to localSegmentsPostRefresh to make it clearer, but it means the diff is shown each time the variable is used.


List<String> segmentInfosFiles = localSegmentsPostRefresh.stream()
.filter(file -> file.startsWith(IndexFileNames.SEGMENTS))
.collect(Collectors.toList());
Optional<String> latestSegmentInfos = segmentInfosFiles.stream()
.max(Comparator.comparingLong(IndexFileNames::parseGeneration));
.max(Comparator.comparingLong(SegmentInfos::generationFromSegmentsFileName));

if (latestSegmentInfos.isPresent()) {
refreshedLocalFiles.addAll(SegmentInfos.readCommit(storeDirectory, latestSegmentInfos.get()).files(true));
// SegmentInfosSnapshot is a snapshot of reader's view of segments and may not contain
// all the segments from last commit if they are merged away but not yet committed.
// Each metadata file in the remote segment store represents a commit and the following
// statement makes sure that each metadata file will always contain all the segments from the last commit + refreshed
// segments.
localSegmentsPostRefresh.addAll(
SegmentInfos.readCommit(storeDirectory, latestSegmentInfos.get()).files(true)
);
segmentInfosFiles.stream()
.filter(file -> !file.equals(latestSegmentInfos.get()))
.forEach(refreshedLocalFiles::remove);
.forEach(localSegmentsPostRefresh::remove);

boolean uploadStatus = uploadNewSegments(refreshedLocalFiles);
boolean uploadStatus = uploadNewSegments(localSegmentsPostRefresh);
if (uploadStatus) {
segmentInfoSnapshotFilename = uploadSegmentInfosSnapshot(latestSegmentInfos.get(), segmentInfos);
localSegmentsPostRefresh.add(segmentInfoSnapshotFilename);

remoteDirectory.uploadMetadata(
refreshedLocalFiles,
localSegmentsPostRefresh,
storeDirectory,
indexShard.getOperationPrimaryTerm(),
segmentInfos.getGeneration()
);
localSegmentChecksumMap.keySet()
.stream()
.filter(file -> !refreshedLocalFiles.contains(file))
.filter(file -> !localSegmentsPostRefresh.contains(file))
.collect(Collectors.toSet())
.forEach(localSegmentChecksumMap::remove);
}
}
} catch (EngineException e) {
logger.warn("Exception while reading SegmentInfosSnapshot", e);
} finally {
try {
if (segmentInfoSnapshotFilename != null) {
storeDirectory.deleteFile(segmentInfoSnapshotFilename);
}
} catch (IOException e) {
logger.warn("Exception while deleting: " + segmentInfoSnapshotFilename, e);
}
}
} catch (IOException e) {
// We don't want to fail refresh if upload of new segments fails. The missed segments will be re-tried
@@ -141,6 +169,39 @@ public void afterRefresh(boolean didRefresh) {
}
}

private boolean isRefreshAfterCommit() throws IOException {
String lastCommittedLocalSegmentFileName = SegmentInfos.getLastCommitSegmentsFileName(storeDirectory);
return (lastCommittedLocalSegmentFileName != null
&& !remoteDirectory.containsFile(lastCommittedLocalSegmentFileName, getChecksumOfLocalFile(lastCommittedLocalSegmentFileName)));
}

String uploadSegmentInfosSnapshot(String latestSegmentsNFilename, SegmentInfos segmentInfosSnapshot) throws IOException {
// We use lastRefreshedCheckpoint as local checkpoint for the SegmentInfosSnapshot. This is better than using
// getProcessedLocalCheckpoint() as processedCheckpoint can advance between reading the value and setting it
// in SegmentInfos.userData. This could lead to data loss as, during recovery, the translog will be replayed based on
// LOCAL_CHECKPOINT_KEY.
// lastRefreshedCheckpoint is updated after refresh listeners are executed. This means InternalEngine.lastRefreshedCheckpoint()
// will return the checkpoint of the last refresh, but that does not impact correctness as duplicate sequence numbers
// will not be replayed.
assert indexShard.getEngine() instanceof InternalEngine : "Expected shard with InternalEngine, got: "
+ indexShard.getEngine().getClass();
final long lastRefreshedCheckpoint = ((InternalEngine) indexShard.getEngine()).lastRefreshedCheckpoint();

Map<String, String> userData = segmentInfosSnapshot.getUserData();
userData.put(LOCAL_CHECKPOINT_KEY, String.valueOf(lastRefreshedCheckpoint));
userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(lastRefreshedCheckpoint));
segmentInfosSnapshot.setUserData(userData, false);

long commitGeneration = SegmentInfos.generationFromSegmentsFileName(latestSegmentsNFilename);
String segmentInfoSnapshotFilename = SEGMENT_INFO_SNAPSHOT_FILENAME_PREFIX + "__" + commitGeneration;
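// For example, with commit generation 5 the local snapshot file is named "segment_infos_snapshot_filename__5".
// The remote copy made below via copyFrom(..., useCommonSuffix = true) additionally carries the directory's
// common filename suffix, so repeated snapshots for the same commit replace each other in the remote store.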
try (IndexOutput indexOutput = storeDirectory.createOutput(segmentInfoSnapshotFilename, IOContext.DEFAULT)) {
segmentInfosSnapshot.write(indexOutput);
}
storeDirectory.sync(Collections.singleton(segmentInfoSnapshotFilename));
remoteDirectory.copyFrom(storeDirectory, segmentInfoSnapshotFilename, segmentInfoSnapshotFilename, IOContext.DEFAULT, true);
return segmentInfoSnapshotFilename;
}

// Visible for testing
boolean uploadNewSegments(Collection<String> localFiles) throws IOException {
AtomicBoolean uploadSuccess = new AtomicBoolean(true);
@@ -38,6 +38,8 @@
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.search.Sort;
import org.apache.lucene.store.BufferedChecksumIndexInput;
import org.apache.lucene.store.ChecksumIndexInput;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
@@ -76,6 +78,8 @@
import java.util.stream.Collectors;

import static org.opensearch.common.unit.TimeValue.timeValueMillis;
import static org.opensearch.index.seqno.SequenceNumbers.LOCAL_CHECKPOINT_KEY;
import static org.opensearch.index.shard.RemoteStoreRefreshListener.SEGMENT_INFO_SNAPSHOT_FILENAME_PREFIX;

/**
* This package private utility class encapsulates the logic to recover an index shard from either an existing index on
@@ -463,9 +467,30 @@ private void recoverFromRemoteStore(IndexShard indexShard) throws IndexShardReco
for (String file : storeDirectory.listAll()) {
storeDirectory.deleteFile(file);
}
String segmentInfosSnapshotFilename = null;
for (String file : remoteDirectory.listAll()) {
storeDirectory.copyFrom(remoteDirectory, file, file, IOContext.DEFAULT);
if (file.startsWith(SEGMENT_INFO_SNAPSHOT_FILENAME_PREFIX)) {
segmentInfosSnapshotFilename = file;
Collaborator

Would we need the latest file here? The listing isn't guaranteed to be sorted in the order we need, right?

Member Author

When we create an instance of RemoteSegmentStoreDirectory, we initialize it by reading the latest metadata file from the remote store. Each metadata file holds a reference to only one SegmentInfosSnapshot file.

}
}

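// Only one snapshot file is expected in the listing: the remote directory is initialized from the latest
// metadata file, and each metadata file references exactly one SegmentInfosSnapshot file.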
if (segmentInfosSnapshotFilename != null) {
try (
ChecksumIndexInput indexInput = new BufferedChecksumIndexInput(
storeDirectory.openInput(segmentInfosSnapshotFilename, IOContext.DEFAULT)
)
) {
SegmentInfos infosSnapshot = SegmentInfos.readCommit(
store.directory(),
indexInput,
Long.parseLong(segmentInfosSnapshotFilename.split("__")[1])
);
long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY));
store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
}
}

// This creates empty trans-log for now
// ToDo: Add code to restore from remote trans-log
bootstrap(indexShard, store);
@@ -66,7 +66,7 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory {
* To prevent explosion of refresh metadata files, we replace refresh files for the given primary term and generation.
* This is achieved by uploading the refresh metadata file with the same UUID suffix.
*/
private String metadataFileUniqueSuffix;
private String commonFilenameSuffix;
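// Illustration: two metadata uploads for the same primary term and generation reuse this suffix, so they
// resolve to the same metadata filename and the newer upload replaces the older one instead of adding a file.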

/**
* Keeps track of local segment filename to uploaded filename along with other attributes like checksum.
@@ -92,7 +92,7 @@ public RemoteSegmentStoreDirectory(RemoteDirectory remoteDataDirectory, RemoteDi
* @throws IOException if there were any failures in reading the metadata file
*/
public void init() throws IOException {
this.metadataFileUniqueSuffix = UUIDs.base64UUID();
this.commonFilenameSuffix = UUIDs.base64UUID();
this.segmentsUploadedToRemoteStore = new ConcurrentHashMap<>(readLatestMetadataFile());
}

@@ -293,17 +293,26 @@ public IndexInput openInput(String name, IOContext context) throws IOException {
}
}

public void copyFrom(Directory from, String src, String dest, IOContext context, boolean useCommonSuffix) throws IOException {
String remoteFilename;
if (useCommonSuffix) {
remoteFilename = dest + SEGMENT_NAME_UUID_SEPARATOR + this.commonFilenameSuffix;
} else {
remoteFilename = getNewRemoteSegmentFilename(dest);
}
remoteDataDirectory.copyFrom(from, src, remoteFilename, context);
String checksum = getChecksumOfLocalFile(from, src);
UploadedSegmentMetadata segmentMetadata = new UploadedSegmentMetadata(src, remoteFilename, checksum);
segmentsUploadedToRemoteStore.put(src, segmentMetadata);
}

/**
* Copies an existing src file from directory from to a non-existent file dest in this directory.
* Once the segment is uploaded to remote segment store, update the cache accordingly.
*/
@Override
public void copyFrom(Directory from, String src, String dest, IOContext context) throws IOException {
String remoteFilename = getNewRemoteSegmentFilename(dest);
remoteDataDirectory.copyFrom(from, src, remoteFilename, context);
String checksum = getChecksumOfLocalFile(from, src);
UploadedSegmentMetadata segmentMetadata = new UploadedSegmentMetadata(src, remoteFilename, checksum);
segmentsUploadedToRemoteStore.put(src, segmentMetadata);
copyFrom(from, src, dest, context, false);
}

/**
@@ -330,7 +339,7 @@ public boolean containsFile(String localFilename, String checksum) {
public void uploadMetadata(Collection<String> segmentFiles, Directory storeDirectory, long primaryTerm, long generation)
throws IOException {
synchronized (this) {
String metadataFilename = MetadataFilenameUtils.getMetadataFilename(primaryTerm, generation, this.metadataFileUniqueSuffix);
String metadataFilename = MetadataFilenameUtils.getMetadataFilename(primaryTerm, generation, this.commonFilenameSuffix);
IndexOutput indexOutput = storeDirectory.createOutput(metadataFilename, IOContext.DEFAULT);
Map<String, String> uploadedSegments = new HashMap<>();
for (String file : segmentFiles) {
@@ -2736,18 +2736,31 @@ public void restoreShard(
closeShards(target);
}

public void testRestoreShardFromRemoteStore() throws IOException {
public void testRefreshLevelRestoreShardFromRemoteStore() throws IOException {
testRestoreShardFromRemoteStore(false);
}

public void testCommitLevelRestoreShardFromRemoteStore() throws IOException {
testRestoreShardFromRemoteStore(true);
}

public void testRestoreShardFromRemoteStore(boolean performFlush) throws IOException {
IndexShard target = newStartedShard(
true,
Settings.builder().put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true).build(),
Settings.builder()
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
.build(),
new InternalEngineFactory()
);

indexDoc(target, "_doc", "1");
indexDoc(target, "_doc", "2");
target.refresh("test");
assertDocs(target, "1", "2");
flushShard(target);
if (performFlush) {
flushShard(target);
}

ShardRouting routing = ShardRoutingHelper.initWithSameId(
target.routingEntry(),
@@ -8,6 +8,7 @@

package org.opensearch.index.shard;

import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
@@ -30,6 +31,8 @@
import java.util.Map;
import java.util.concurrent.CountDownLatch;

import static org.opensearch.index.shard.RemoteStoreRefreshListener.SEGMENT_INFO_SNAPSHOT_FILENAME_PREFIX;

public class RemoteStoreRefreshListenerTests extends IndexShardTestCase {
private IndexShard indexShard;
private RemoteStoreRefreshListener remoteStoreRefreshListener;
@@ -204,13 +207,29 @@ public void onFailure(Exception e) {
private void verifyUploadedSegments(RemoteSegmentStoreDirectory remoteSegmentStoreDirectory) throws IOException {
Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> uploadedSegments = remoteSegmentStoreDirectory
.getSegmentsUploadedToRemoteStore();
String segmentsNFilename = null;
try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) {
SegmentInfos segmentInfos = segmentInfosGatedCloseable.get();
for (String file : segmentInfos.files(true)) {
if (!RemoteStoreRefreshListener.EXCLUDE_FILES.contains(file)) {
assertTrue(uploadedSegments.containsKey(file));
}
if (file.startsWith(IndexFileNames.SEGMENTS)) {
segmentsNFilename = file;
}
}
}
if (segmentsNFilename != null) {
String commitGeneration = segmentsNFilename.substring((IndexFileNames.SEGMENTS + "_").length());
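// segments_N filenames encode the generation in radix 36 (e.g. "segments_c" corresponds to generation 12),
// while the snapshot filename uses the decimal value, hence the Long.parseLong(..., Character.MAX_RADIX)
// conversion in the assertion below.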
assertTrue(
uploadedSegments.keySet()
.stream()
.anyMatch(
s -> s.startsWith(
SEGMENT_INFO_SNAPSHOT_FILENAME_PREFIX + "__" + Long.parseLong(commitGeneration, Character.MAX_RADIX)
)
)
);
}
}
}