Skip to content

Commit

Permalink
Cleanup of refresh level durability code
Browse files Browse the repository at this point in the history
Signed-off-by: Sachin Kale <kalsac@amazon.com>
  • Loading branch information
Sachin Kale committed Nov 15, 2022
1 parent 51b29df commit 3bdc3d5
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -120,21 +120,7 @@ public void afterRefresh(boolean didRefresh) {

boolean uploadStatus = uploadNewSegments(refreshedLocalFiles);
if (uploadStatus) {
long localCheckpoint_key = indexShard.getEngine().getProcessedLocalCheckpoint();
//long max_sequence_number = indexShard.getEngine().getMaxSeqNoOfUpdatesOrDeletes();
long max_sequence_number = indexShard.getEngine().getProcessedLocalCheckpoint();
segment_info_snapshot_filename = SEGMENT_INFO_SNAPSHOT_FILENAME +
"_" +
latestSegmentInfos.get().substring("segments_".length()) +
"__" +
localCheckpoint_key +
"__" +
max_sequence_number;
IndexOutput indexOutput = storeDirectory.createOutput(segment_info_snapshot_filename, IOContext.DEFAULT);
segmentInfos.write(indexOutput);
indexOutput.close();
storeDirectory.sync(Collections.singleton(segment_info_snapshot_filename));
remoteDirectory.copyFrom(storeDirectory, segment_info_snapshot_filename, segment_info_snapshot_filename, IOContext.DEFAULT, true);
segment_info_snapshot_filename = uploadSegmentInfosSnapshot(latestSegmentInfos.get(), segmentInfos);
refreshedLocalFiles.add(segment_info_snapshot_filename);

remoteDirectory.uploadMetadata(
Expand Down Expand Up @@ -173,6 +159,22 @@ public void afterRefresh(boolean didRefresh) {
}
}

String uploadSegmentInfosSnapshot(String latestSegmentsNFilename, SegmentInfos segmentInfosSnapshot) throws IOException {
long localCheckpoint = indexShard.getEngine().getProcessedLocalCheckpoint();
String commitGeneration = latestSegmentsNFilename.substring("segments_".length());
String segment_info_snapshot_filename = SEGMENT_INFO_SNAPSHOT_FILENAME +
"__" +
commitGeneration +
"__" +
localCheckpoint;
IndexOutput indexOutput = storeDirectory.createOutput(segment_info_snapshot_filename, IOContext.DEFAULT);
segmentInfosSnapshot.write(indexOutput);
indexOutput.close();
storeDirectory.sync(Collections.singleton(segment_info_snapshot_filename));
remoteDirectory.copyFrom(storeDirectory, segment_info_snapshot_filename, segment_info_snapshot_filename, IOContext.DEFAULT, true);
return segment_info_snapshot_filename;
}

// Visible for testing
boolean uploadNewSegments(Collection<String> localFiles) throws IOException {
AtomicBoolean uploadSuccess = new AtomicBoolean(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -466,28 +466,23 @@ private void recoverFromRemoteStore(IndexShard indexShard) throws IndexShardReco
for (String file : storeDirectory.listAll()) {
storeDirectory.deleteFile(file);
}
String[] remoteFiles = remoteDirectory.listAll();
String segmentInfosSnapshotFilename = null;
String segmentsNFileName = null;
for (String file : remoteFiles) {
for (String file : remoteDirectory.listAll()) {
storeDirectory.copyFrom(remoteDirectory, file, file, IOContext.DEFAULT);
if(file.startsWith("segment_infos_snapshot_filename")) {
segmentInfosSnapshotFilename = file;
}
if(file.startsWith("segments_")) {
segmentsNFileName = file;
}
}

SegmentInfos infos_snapshot = SegmentInfos.readCommit(
store.directory(),
new BufferedChecksumIndexInput(storeDirectory.openInput(segmentInfosSnapshotFilename, IOContext.DEFAULT)),
Integer.parseInt(segmentsNFileName.substring("segments_".length()))
Integer.parseInt(segmentInfosSnapshotFilename.split("__")[1])
);

store.commitSegmentInfos(infos_snapshot,
Long.parseLong(segmentInfosSnapshotFilename.split("__")[2]),
Long.parseLong(segmentInfosSnapshotFilename.split("__")[1]));
Long.parseLong(segmentInfosSnapshotFilename.split("__")[2]));

// This creates empty trans-log for now
// ToDo: Add code to restore from remote trans-log
Expand Down

0 comments on commit 3bdc3d5

Please sign in to comment.