Skip to content

Commit

Permalink
Make remote translog store path consistent to remote segment store (o…
Browse files Browse the repository at this point in the history
…pensearch-project#7947)

Signed-off-by: bansvaru <bansvaru@amazon.com>
  • Loading branch information
linuxpi authored Jun 8, 2023
1 parent c58dc1a commit e6348c5
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public class RemoteFsTranslog extends Translog {
private final SetOnce<Boolean> olderPrimaryCleaned = new SetOnce<>();

private static final int REMOTE_DELETION_PERMITS = 2;
public static final String TRANSLOG = "translog";

// Semaphore used to allow only single remote generation to happen at a time
private final Semaphore remoteGenerationDeletionPermits = new Semaphore(REMOTE_DELETION_PERMITS);
Expand Down Expand Up @@ -167,7 +168,7 @@ public static TranslogTransferManager buildTranslogTransferManager(
return new TranslogTransferManager(
shardId,
new BlobStoreTransferService(blobStoreRepository.blobStore(), threadPool),
blobStoreRepository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())),
blobStoreRepository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add(TRANSLOG),
fileTransferTracker
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class TranslogTransferManager {

private final ShardId shardId;
private final TransferService transferService;
private final BlobPath remoteBaseTransferPath;
private final BlobPath remoteDataTransferPath;
private final BlobPath remoteMetadataTransferPath;
private final FileTransferTracker fileTransferTracker;

Expand All @@ -59,17 +59,18 @@ public class TranslogTransferManager {
private static final Logger logger = LogManager.getLogger(TranslogTransferManager.class);

private final static String METADATA_DIR = "metadata";
private final static String DATA_DIR = "data";

public TranslogTransferManager(
ShardId shardId,
TransferService transferService,
BlobPath remoteBaseTransferPath,
BlobPath remoteDataTransferPath,
FileTransferTracker fileTransferTracker
) {
this.shardId = shardId;
this.transferService = transferService;
this.remoteBaseTransferPath = remoteBaseTransferPath;
this.remoteMetadataTransferPath = remoteBaseTransferPath.add(METADATA_DIR);
this.remoteDataTransferPath = remoteDataTransferPath.add(DATA_DIR);
this.remoteMetadataTransferPath = remoteDataTransferPath.add(METADATA_DIR);
this.fileTransferTracker = fileTransferTracker;
}

Expand Down Expand Up @@ -110,7 +111,7 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans
fileSnapshot -> transferService.uploadBlobAsync(
ThreadPool.Names.TRANSLOG_TRANSFER,
fileSnapshot,
remoteBaseTransferPath.add(String.valueOf(fileSnapshot.getPrimaryTerm())),
remoteDataTransferPath.add(String.valueOf(fileSnapshot.getPrimaryTerm())),
latchedActionListener
)
);
Expand Down Expand Up @@ -164,7 +165,7 @@ private void downloadToFS(String fileName, Path location, String primaryTerm) th
if (Files.exists(filePath)) {
Files.delete(filePath);
}
try (InputStream inputStream = transferService.downloadBlob(remoteBaseTransferPath.add(primaryTerm), fileName)) {
try (InputStream inputStream = transferService.downloadBlob(remoteDataTransferPath.add(primaryTerm), fileName)) {
Files.copy(inputStream, filePath);
}
// Mark in FileTransferTracker so that the same files are not uploaded at the time of translog sync
Expand Down Expand Up @@ -238,7 +239,7 @@ public void deleteGenerationAsync(long primaryTerm, Set<Long> generations, Runna
*/
public void deletePrimaryTermsAsync(long minPrimaryTermToKeep) {
logger.info("Deleting primary terms from remote store lesser than {} for {}", minPrimaryTermToKeep, shardId);
transferService.listFoldersAsync(ThreadPool.Names.REMOTE_PURGE, remoteBaseTransferPath, new ActionListener<>() {
transferService.listFoldersAsync(ThreadPool.Names.REMOTE_PURGE, remoteDataTransferPath, new ActionListener<>() {
@Override
public void onResponse(Set<String> folders) {
Set<Long> primaryTermsInRemote = folders.stream().filter(folderName -> {
Expand Down Expand Up @@ -271,7 +272,7 @@ public void onFailure(Exception e) {
private void deletePrimaryTermAsync(long primaryTerm) {
transferService.deleteAsync(
ThreadPool.Names.REMOTE_PURGE,
remoteBaseTransferPath.add(String.valueOf(primaryTerm)),
remoteDataTransferPath.add(String.valueOf(primaryTerm)),
new ActionListener<>() {
@Override
public void onResponse(Void unused) {
Expand Down Expand Up @@ -317,7 +318,7 @@ private void deleteTranslogFilesAsync(long primaryTerm, List<String> files, Runn
try {
transferService.deleteBlobsAsync(
ThreadPool.Names.REMOTE_PURGE,
remoteBaseTransferPath.add(String.valueOf(primaryTerm)),
remoteDataTransferPath.add(String.valueOf(primaryTerm)),
files,
new ActionListener<>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.opensearch.common.util.BigArrays.NON_RECYCLING_INSTANCE;
import static org.opensearch.index.translog.RemoteFsTranslog.TRANSLOG;
import static org.opensearch.index.translog.SnapshotMatchers.containsOperationsInAnyOrder;
import static org.opensearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy;

Expand All @@ -111,6 +112,7 @@ public class RemoteFSTranslogTests extends OpenSearchTestCase {
private final AtomicReference<LongConsumer> persistedSeqNoConsumer = new AtomicReference<>();
private ThreadPool threadPool;
private final static String METADATA_DIR = "metadata";
private final static String DATA_DIR = "data";
BlobStoreRepository repository;

BlobStoreTransferService blobStoreTransferService;
Expand Down Expand Up @@ -483,22 +485,17 @@ public void testSimpleOperationsUpload() throws Exception {
translog.rollGeneration();
assertEquals(6, translog.allUploaded().size());

Set<String> mdFiles = blobStoreTransferService.listAll(
repository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add("metadata")
);
Set<String> mdFiles = blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR));
assertEquals(2, mdFiles.size());
logger.info("All md files {}", mdFiles);

Set<String> tlogFiles = blobStoreTransferService.listAll(
repository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add(String.valueOf(primaryTerm.get()))
getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))
);
logger.info("All data files {}", tlogFiles);

// assert content of ckp and tlog files
BlobPath path = repository.basePath()
.add(shardId.getIndex().getUUID())
.add(String.valueOf(shardId.id()))
.add(String.valueOf(primaryTerm.get()));
BlobPath path = getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()));
for (TranslogReader reader : translog.readers) {
final long readerGeneration = reader.getGeneration();
logger.error("Asserting content of {}", readerGeneration);
Expand Down Expand Up @@ -533,12 +530,7 @@ public void testSimpleOperationsUpload() throws Exception {
assertEquals(4, translog.allUploaded().size());
assertEquals(
4,
blobStoreTransferService.listAll(
repository.basePath()
.add(shardId.getIndex().getUUID())
.add(String.valueOf(shardId.id()))
.add(String.valueOf(primaryTerm.get()))
).size()
blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size()
);
});

Expand All @@ -551,12 +543,7 @@ public void testSimpleOperationsUpload() throws Exception {
assertEquals(4, translog.allUploaded().size());
assertEquals(
4,
blobStoreTransferService.listAll(
repository.basePath()
.add(shardId.getIndex().getUUID())
.add(String.valueOf(shardId.id()))
.add(String.valueOf(primaryTerm.get()))
).size()
blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size()
);
});

Expand All @@ -583,14 +570,7 @@ public void testMetadataFileDeletion() throws Exception {
assertEquals(1, translog.readers.size());
}
assertBusy(() -> assertEquals(4, translog.allUploaded().size()));
assertBusy(
() -> assertEquals(
2,
blobStoreTransferService.listAll(
repository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add(METADATA_DIR)
).size()
)
);
assertBusy(() -> assertEquals(2, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size()));
int moreDocs = randomIntBetween(3, 10);
logger.info("numDocs={} moreDocs={}", numDocs, moreDocs);
for (int i = numDocs; i < numDocs + moreDocs; i++) {
Expand All @@ -599,14 +579,7 @@ public void testMetadataFileDeletion() throws Exception {
translog.trimUnreferencedReaders();
assertEquals(1 + moreDocs, translog.readers.size());
assertBusy(() -> assertEquals(2 + 2L * moreDocs, translog.allUploaded().size()));
assertBusy(
() -> assertEquals(
1 + moreDocs,
blobStoreTransferService.listAll(
repository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add(METADATA_DIR)
).size()
)
);
assertBusy(() -> assertEquals(1 + moreDocs, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size()));

int totalDocs = numDocs + moreDocs;
translog.setMinSeqNoToKeep(totalDocs - 1);
Expand All @@ -619,14 +592,7 @@ public void testMetadataFileDeletion() throws Exception {
);
translog.setMinSeqNoToKeep(totalDocs);
translog.trimUnreferencedReaders();
assertBusy(
() -> assertEquals(
2,
blobStoreTransferService.listAll(
repository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add(METADATA_DIR)
).size()
)
);
assertBusy(() -> assertEquals(2, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size()));

// Change primary term and test the deletion of older primaries
String translogUUID = translog.translogUUID;
Expand All @@ -642,9 +608,7 @@ public void testMetadataFileDeletion() throws Exception {
long newPrimaryTerm = primaryTerm.incrementAndGet();

// Check all metadata files corresponds to old primary term
Set<String> mdFileNames = blobStoreTransferService.listAll(
repository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add(METADATA_DIR)
);
Set<String> mdFileNames = blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR));
assertTrue(mdFileNames.stream().allMatch(name -> name.startsWith(String.valueOf(oldPrimaryTerm).concat("__"))));

// Creating RemoteFsTranslog with the same location
Expand All @@ -658,9 +622,7 @@ public void testMetadataFileDeletion() throws Exception {
}

// Check that all metadata files are belonging now to the new primary
mdFileNames = blobStoreTransferService.listAll(
repository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add(METADATA_DIR)
);
mdFileNames = blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR));
assertTrue(mdFileNames.stream().allMatch(name -> name.startsWith(String.valueOf(newPrimaryTerm).concat("__"))));

try {
Expand All @@ -671,6 +633,10 @@ public void testMetadataFileDeletion() throws Exception {
}
}

private BlobPath getTranslogDirectory() {
return repository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add(TRANSLOG);
}

private Long populateTranslogOps(boolean withMissingOps) throws IOException {
long minSeqNo = SequenceNumbers.NO_OPS_PERFORMED;
long maxSeqNo = SequenceNumbers.NO_OPS_PERFORMED;
Expand Down

0 comments on commit e6348c5

Please sign in to comment.