-
Notifications
You must be signed in to change notification settings - Fork 1.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Remote Store] Add support for refresh level durability #5253
[Remote Store] Add support for refresh level durability #5253
Conversation
Gradle Check (Jenkins) Run Completed with:
|
Gradle Check (Jenkins) Run Completed with:
|
try { | ||
if (segment_info_snapshot_filename != null) { | ||
storeDirectory.deleteFile(segment_info_snapshot_filename); | ||
} | ||
} catch (IOException e) { | ||
logger.warn("Exception while deleting: " + segment_info_snapshot_filename, e); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
\why are we deleting segment_info_snapshot_filename
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently, RemoteDirectory
does not support writing to file in remote store using buffer (we have a plan to support it). So, for any file to upload to remote store, today, it is required to be in the local directory (Referred as storeDirectory
in this code).
There are 2 reasons to delete:
- local directory does not support overriding a file with the same name. This is mostly done to keep the immutable nature of segment files.
- As this file is of no use locally, we are deleting it once if it is uploaded. Even if the upload fails, next refresh can create a new snapshot file.
Gradle Check (Jenkins) Run Completed with:
|
Gradle Check (Jenkins) Run Completed with:
|
Gradle Check (Jenkins) Run Completed with:
|
Codecov Report
@@ Coverage Diff @@
## main #5253 +/- ##
============================================
+ Coverage 70.95% 71.00% +0.04%
- Complexity 58304 58326 +22
============================================
Files 4733 4733
Lines 278256 278294 +38
Branches 40249 40254 +5
============================================
+ Hits 197441 197594 +153
+ Misses 64628 64553 -75
+ Partials 16187 16147 -40
Help us with your feedback. Take ten seconds to tell us how you rate us. Have a feature suggestion? Share it here. |
Gradle Check (Jenkins) Run Completed with:
|
Gradle Check (Jenkins) Run Completed with:
|
Rebase pls, the above should be resolved. |
37be057
to
70a03a7
Compare
Gradle Check (Jenkins) Run Completed with:
|
@@ -44,6 +47,7 @@ public final class RemoteStoreRefreshListener implements ReferenceManager.Refres | |||
static final Set<String> EXCLUDE_FILES = Set.of("write.lock"); | |||
// Visible for testing | |||
static final int LAST_N_METADATA_FILES_TO_KEEP = 10; | |||
public static final String SEGMENT_INFO_SNAPSHOT_FILENAME = "segment_infos_snapshot_filename"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit : should we rename to SEGMENT_INFO_SNAPSHOT_FILENAME_PREFIX
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and please make remove public
for (String file : remoteDirectory.listAll()) { | ||
storeDirectory.copyFrom(remoteDirectory, file, file, IOContext.DEFAULT); | ||
if (file.startsWith(SEGMENT_INFO_SNAPSHOT_FILENAME)) { | ||
segmentInfosSnapshotFilename = file; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would we need the latest file here. The list wouldn't guarantee sorted in the order we need, right ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When we create instance of RemoteSegmentStoreDirectory
, we initialize it by reading latest metadata file from the remote store. Each metadata file holds reference to only one SegmentInfosSnapshot
file.
@@ -317,6 +317,33 @@ public void testCopyFromException() throws IOException { | |||
storeDirectory.close(); | |||
} | |||
|
|||
public void testCpoyFromOverride() throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit : spelling of copy.
IndexOutput indexOutput = storeDirectory.createOutput(segment_info_snapshot_filename, IOContext.DEFAULT); | ||
segmentInfosSnapshot.write(indexOutput); | ||
indexOutput.close(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we do try with resource here and ensure we close all streams
store.commitSegmentInfos(infos_snapshot, Long.parseLong(filenameTokens[2]), Long.parseLong(filenameTokens[2])); | ||
} catch (IOException e) { | ||
logger.info("Exception while reading {}, falling back to commit level restore", segmentInfosSnapshotFilename); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems that the handling isn't correct, f.e if we support refresh level durability but are unable to guarantee it should be not be a silent(info logging) failure. We should instead throw this exception and let the shard decide on the recovery completion
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also lets add assertions on checkpoints and max seq no
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Bukhtawar I will add the required assertions but wanted to check on the first comment.
In the first comment, are you suggesting to fail the restore if refresh level durability is not possible? In that case, we need to provide a way to restore from the last commit if user wants.
Would having a query parameter to the restore API work?
deleteStaleCommits(); | ||
} | ||
String segment_info_snapshot_filename = null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please follow the naming conventions
@@ -106,13 +113,19 @@ public void afterRefresh(boolean didRefresh) { | |||
.max(Comparator.comparingLong(IndexFileNames::parseGeneration)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we use:
final String latestSegmentInfos = segmentInfos.getSegmentsFileName();
Instead of filtering the collection?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
segmentInfosFiles
can contain multiple segments_N
files. We are making sure to get list of all such files and pick only the latest one by generation.
refreshedLocalFiles.addAll(SegmentInfos.readCommit(storeDirectory, latestSegmentInfos.get()).files(true)); | ||
segmentInfosFiles.stream() | ||
.filter(file -> !file.equals(latestSegmentInfos.get())) | ||
.forEach(refreshedLocalFiles::remove); | ||
|
||
boolean uploadStatus = uploadNewSegments(refreshedLocalFiles); | ||
if (uploadStatus) { | ||
if (segmentFilesFromSnapshot.equals(new HashSet<>(refreshedLocalFiles))) { | ||
segment_info_snapshot_filename = uploadSegmentInfosSnapshot(latestSegmentInfos.get(), segmentInfos); | ||
refreshedLocalFiles.add(segment_info_snapshot_filename); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The uploadSegmentInfosSnapshot
seems to be uploading the file to remoteDirectory
, why do we need to add it to refreshedLocalFiles
(and apparently re-upload again with remoteDirectory.uploadMetadata
)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remoteDirectory.uploadMetadata()
uploads only the metadata file which keeps the mapping of files uploaded to remote store.
Filename is added to refreshedLocalFiles
so that metadata file knows about snapshot file uploaded to remote store.
String uploadSegmentInfosSnapshot(String latestSegmentsNFilename, SegmentInfos segmentInfosSnapshot) throws IOException { | ||
long localCheckpoint = indexShard.getEngine().getProcessedLocalCheckpoint(); | ||
String commitGeneration = latestSegmentsNFilename.substring((IndexFileNames.SEGMENTS + "_").length()); | ||
String segment_info_snapshot_filename = SEGMENT_INFO_SNAPSHOT_FILENAME + "__" + commitGeneration + "__" + localCheckpoint; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please follow the naming conventions
@@ -141,6 +162,18 @@ public void afterRefresh(boolean didRefresh) { | |||
} | |||
} | |||
|
|||
String uploadSegmentInfosSnapshot(String latestSegmentsNFilename, SegmentInfos segmentInfosSnapshot) throws IOException { | |||
long localCheckpoint = indexShard.getEngine().getProcessedLocalCheckpoint(); | |||
String commitGeneration = latestSegmentsNFilename.substring((IndexFileNames.SEGMENTS + "_").length()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IndexFileNames.parseGeneration(latestSegmentsNFilename)
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Used SegmentInfos.generationFromSegmentsFileName
instead. IndexFileNames.parseGeneration
only works for segment files where name starts with _
70a03a7
to
58a125f
Compare
Gradle Check (Jenkins) Run Completed with:
|
Gradle Check (Jenkins) Run Completed with:
|
Gradle Check (Jenkins) Run Completed with:
|
Signed-off-by: Sachin Kale <kalsac@amazon.com>
Signed-off-by: Sachin Kale <kalsac@amazon.com>
4547ac4
to
9562461
Compare
Gradle Check (Jenkins) Run Completed with:
|
Yes @gbbafna . Currently, we don't have any integ tests written for remote store feature. Will add. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks LGTM
Description
_remotestore/_restore
API only supports commit level durability.Proposed Solution
SegmentInfosSnapshot
during each refresh and keep the information of the uploaded file in the metadata file.SegmentInfosSnapshot
is present, we use it as aSegmentInfos
file and trigger a commit.SegmentInfos
file), we pass processed checkpoint as part of filename.Issues Resolved
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.