Adding node id to segment and translog metadata
Adding validation to identify multiple writers to the same primary term and
generation in the remote store

Signed-off-by: Gaurav Bafna <gbbafna@amazon.com>
gbbafna committed Sep 26, 2023
1 parent a8969cb commit e1ff550
Showing 22 changed files with 331 additions and 74 deletions.
@@ -650,7 +650,15 @@ public void postDelete(ShardId shardId, Engine.Delete delete, Engine.DeleteResult
}
}
};
- final IndexShard newShard = newIndexShard(indexService, shard, wrapper, getInstanceFromNode(CircuitBreakerService.class), listener);
+ NodeEnvironment env = getInstanceFromNode(NodeEnvironment.class);
+ final IndexShard newShard = newIndexShard(
+ indexService,
+ shard,
+ wrapper,
+ getInstanceFromNode(CircuitBreakerService.class),
+ env.nodeId(),
+ listener
+ );
shardRef.set(newShard);
recoverShard(newShard);

@@ -674,6 +682,7 @@ public static final IndexShard newIndexShard(
final IndexShard shard,
CheckedFunction<DirectoryReader, DirectoryReader, IOException> wrapper,
final CircuitBreakerService cbs,
+ final String nodeId,
final IndexingOperationListener... listeners
) throws IOException {
ShardRouting initializingShardRouting = getInitializingShardRouting(shard.routingEntry());
@@ -702,7 +711,8 @@ public static final IndexShard newIndexShard(
SegmentReplicationCheckpointPublisher.EMPTY,
null,
null,
- () -> IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL
+ () -> IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL,
+ nodeId
);
}

3 changes: 2 additions & 1 deletion server/src/main/java/org/opensearch/index/IndexService.java
@@ -519,7 +519,8 @@ public synchronized IndexShard createShard(
this.indexSettings.isSegRepEnabled() ? checkpointPublisher : null,
remoteStore,
remoteStoreStatsTrackerFactory,
- clusterRemoteTranslogBufferIntervalSupplier
+ clusterRemoteTranslogBufferIntervalSupplier,
+ nodeEnv.nodeId()
);
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);
12 changes: 10 additions & 2 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -342,6 +342,8 @@ Runnable getGlobalCheckpointSyncer() {

private final List<ReferenceManager.RefreshListener> internalRefreshListener = new ArrayList<>();

+ private final String nodeId;

public IndexShard(
final ShardRouting shardRouting,
final IndexSettings indexSettings,
@@ -367,7 +369,8 @@ public IndexShard(
@Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher,
@Nullable final Store remoteStore,
final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory,
- final Supplier<TimeValue> clusterRemoteTranslogBufferIntervalSupplier
+ final Supplier<TimeValue> clusterRemoteTranslogBufferIntervalSupplier,
+ final String nodeId
) throws IOException {
super(shardRouting.shardId(), indexSettings);
assert shardRouting.initializing();
@@ -413,7 +416,7 @@ public IndexShard(
logger.debug("state: [CREATED]");

this.checkIndexOnStartup = indexSettings.getValue(IndexSettings.INDEX_CHECK_ON_STARTUP);
- this.translogConfig = new TranslogConfig(shardId, shardPath().resolveTranslog(), indexSettings, bigArrays);
+ this.translogConfig = new TranslogConfig(shardId, shardPath().resolveTranslog(), indexSettings, bigArrays, nodeId);
final String aId = shardRouting.allocationId().getId();
final long primaryTerm = indexSettings.getIndexMetadata().primaryTerm(shardId.id());
this.pendingPrimaryTerm = primaryTerm;
@@ -463,6 +466,7 @@ public boolean shouldCache(Query query) {
? false
: mapperService.documentMapper().mappers().containsTimeStampField();
this.remoteStoreStatsTrackerFactory = remoteStoreStatsTrackerFactory;
+ this.nodeId = nodeId;
}

public ThreadPool getThreadPool() {
@@ -556,6 +560,10 @@ protected RemoteStoreStatsTrackerFactory getRemoteStoreStatsTrackerFactory() {
return remoteStoreStatsTrackerFactory;
}

+ public String getNodeId() {
+ return nodeId;
+ }

@Override
public void updateShardState(
final ShardRouting newRouting,
@@ -344,7 +344,8 @@ void uploadMetadata(Collection<String> localSegmentsPostRefresh, SegmentInfos segmentInfosSnapshot
segmentInfosSnapshot,
storeDirectory,
translogFileGeneration,
- replicationCheckpoint
+ replicationCheckpoint,
+ indexShard.getNodeId()
);
}
}
@@ -25,6 +25,7 @@
import org.apache.lucene.util.Version;
import org.opensearch.common.UUIDs;
import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer;
+ import org.opensearch.common.collect.Tuple;
import org.opensearch.common.io.VersionedCodecStreamWrapper;
import org.opensearch.common.lucene.store.ByteArrayIndexInput;
import org.opensearch.core.action.ActionListener;
@@ -48,6 +49,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+ import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -109,6 +111,8 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory implements RemoteStoreCommitLevelLockManager

private final AtomicLong metadataUploadCounter = new AtomicLong(0);

+ public static final int METADATA_FILES_TO_FETCH = 100;

public RemoteSegmentStoreDirectory(
RemoteDirectory remoteDataDirectory,
RemoteDirectory remoteMetadataDirectory,
@@ -178,9 +182,11 @@ public RemoteSegmentMetadata readLatestMetadataFile() throws IOException {

List<String> metadataFiles = remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder(
MetadataFilenameUtils.METADATA_PREFIX,
- 1
+ METADATA_FILES_TO_FETCH
);

+ verifyMultipleWriters(metadataFiles);

if (metadataFiles.isEmpty() == false) {
String latestMetadataFile = metadataFiles.get(0);
logger.trace("Reading latest Metadata file {}", latestMetadataFile);
@@ -193,12 +199,34 @@ public RemoteSegmentMetadata readLatestMetadataFile() throws IOException {
}

private RemoteSegmentMetadata readMetadataFile(String metadataFilename) throws IOException {

try (InputStream inputStream = remoteMetadataDirectory.getBlobStream(metadataFilename)) {
byte[] metadataBytes = inputStream.readAllBytes();
return metadataStreamWrapper.readStream(new ByteArrayIndexInput(metadataFilename, metadataBytes));
}
}

+ // Visible for testing
+ public static void verifyMultipleWriters(List<String> mdFiles) {
+ Map<Tuple<Long, Long>, String> nodesByPrimaryTermAndGeneration = new HashMap<>();
+ mdFiles.forEach(mdFile -> {
+ Tuple<Tuple<Long, Long>, String> nodeIdByPrimaryTermAndGeneration = MetadataFilenameUtils.getNodeIdByPrimaryTermAndGeneration(
+ mdFile
+ );
+ if (nodeIdByPrimaryTermAndGeneration != null
+ && nodesByPrimaryTermAndGeneration.get(nodeIdByPrimaryTermAndGeneration.v1()) != null
+ && !Objects.equals(
+ nodesByPrimaryTermAndGeneration.get(nodeIdByPrimaryTermAndGeneration.v1()),
+ nodeIdByPrimaryTermAndGeneration.v2()
+ )) {
+ throw new IllegalStateException("Multiple metadata files having same primary term and generations detected");
+ }
+ if (nodeIdByPrimaryTermAndGeneration != null) {
+ nodesByPrimaryTermAndGeneration.put(nodeIdByPrimaryTermAndGeneration.v1(), nodeIdByPrimaryTermAndGeneration.v2());
+ }
+ });
+ }

/**
* Metadata of a segment that is uploaded to remote segment store.
*/
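For illustration, a minimal sketch of how the new validation trips. All values and node ids here are hypothetical, and since getMetadataFilename is package-private the snippet assumes same-package test code:

    // Two writers publish metadata for the same primary term (1) and generation (5).
    String fromNodeA = MetadataFilenameUtils.getMetadataFilename(1, 5, 3, 10, RemoteSegmentMetadata.CURRENT_VERSION, "node-A");
    String fromNodeB = MetadataFilenameUtils.getMetadataFilename(1, 5, 3, 11, RemoteSegmentMetadata.CURRENT_VERSION, "node-B");

    // The same (primary term, generation) key maps to two different node ids,
    // so this throws IllegalStateException.
    RemoteSegmentStoreDirectory.verifyMultipleWriters(List.of(fromNodeA, fromNodeB));
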
@@ -302,7 +330,8 @@ static String getMetadataFilename(
long generation,
long translogGeneration,
long uploadCounter,
- int metadataVersion
+ int metadataVersion,
+ String nodeId
) {
return String.join(
SEPARATOR,
@@ -312,7 +341,9 @@ static String getMetadataFilename(
RemoteStoreUtils.invertLong(translogGeneration),
RemoteStoreUtils.invertLong(uploadCounter),
RemoteStoreUtils.invertLong(System.currentTimeMillis()),
- String.valueOf(metadataVersion)
+ String.valueOf(metadataVersion),
+ UUIDs.base64UUID(),
+ nodeId
);
}

@@ -325,6 +356,15 @@ static long getPrimaryTerm(String[] filenameTokens) {
static long getGeneration(String[] filenameTokens) {
return RemoteStoreUtils.invertLong(filenameTokens[2]);
}

+ public static Tuple<Tuple<Long, Long>, String> getNodeIdByPrimaryTermAndGeneration(String filename) {
+ String[] tokens = filename.split(SEPARATOR);
+ if (tokens.length < 9) {
+ // For versions < 2.11, we don't have node id.
+ return null;
+ }
+ return new Tuple<>(new Tuple<>(RemoteStoreUtils.invertLong(tokens[1]), RemoteStoreUtils.invertLong(tokens[2])), tokens[8]);
+ }
}
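After this change a metadata filename carries nine SEPARATOR-delimited tokens (prefix, inverted primary term, inverted generation, inverted translog generation, inverted upload counter, inverted timestamp, version, UUID, node id), which is why the parser reads tokens[1], tokens[2] and tokens[8]. A quick sketch of the backward-compatibility path, using a hypothetical filename and assuming SEPARATOR is "__":

    // A pre-2.11 name has fewer than nine tokens, so it parses to null and
    // verifyMultipleWriters simply skips it.
    assert MetadataFilenameUtils.getNodeIdByPrimaryTermAndGeneration("metadata__1__2__3__4__5__1") == null;
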

/**
@@ -591,15 +631,17 @@ public void uploadMetadata(
SegmentInfos segmentInfosSnapshot,
Directory storeDirectory,
long translogGeneration,
- ReplicationCheckpoint replicationCheckpoint
+ ReplicationCheckpoint replicationCheckpoint,
+ String nodeId
) throws IOException {
synchronized (this) {
String metadataFilename = MetadataFilenameUtils.getMetadataFilename(
replicationCheckpoint.getPrimaryTerm(),
segmentInfosSnapshot.getGeneration(),
translogGeneration,
metadataUploadCounter.incrementAndGet(),
- RemoteSegmentMetadata.CURRENT_VERSION
+ RemoteSegmentMetadata.CURRENT_VERSION,
+ nodeId
);
try {
try (IndexOutput indexOutput = storeDirectory.createOutput(metadataFilename, IOContext.DEFAULT)) {
@@ -93,6 +93,7 @@ public RemoteFsTranslog(
this.primaryModeSupplier = primaryModeSupplier;
this.remoteTranslogTransferTracker = remoteTranslogTransferTracker;
fileTransferTracker = new FileTransferTracker(shardId, remoteTranslogTransferTracker);

this.translogTransferManager = buildTranslogTransferManager(
blobStoreRepository,
threadPool,
@@ -321,7 +322,8 @@ private boolean upload(Long primaryTerm, Long generation) throws IOException {
generation,
location,
readers,
- Translog::getCommitCheckpointFileName
+ Translog::getCommitCheckpointFileName,
+ config.getNodeId()
).build()
) {
return translogTransferManager.transferSnapshot(
@@ -57,23 +57,33 @@ public final class TranslogConfig {
private final Path translogPath;
private final ByteSizeValue bufferSize;

+ private final String nodeId;

/**
* Creates a new TranslogConfig instance
* @param shardId the shard ID this translog belongs to
* @param translogPath the path to use for the transaction log files
* @param indexSettings the index settings used to set internal variables
* @param bigArrays a bigArrays instance used for temporarily allocating write operations
*/
- public TranslogConfig(ShardId shardId, Path translogPath, IndexSettings indexSettings, BigArrays bigArrays) {
- this(shardId, translogPath, indexSettings, bigArrays, DEFAULT_BUFFER_SIZE);
+ public TranslogConfig(ShardId shardId, Path translogPath, IndexSettings indexSettings, BigArrays bigArrays, String nodeId) {
+ this(shardId, translogPath, indexSettings, bigArrays, DEFAULT_BUFFER_SIZE, nodeId);
}

- TranslogConfig(ShardId shardId, Path translogPath, IndexSettings indexSettings, BigArrays bigArrays, ByteSizeValue bufferSize) {
+ TranslogConfig(
+ ShardId shardId,
+ Path translogPath,
+ IndexSettings indexSettings,
+ BigArrays bigArrays,
+ ByteSizeValue bufferSize,
+ String nodeId
+ ) {
this.bufferSize = bufferSize;
this.indexSettings = indexSettings;
this.shardId = shardId;
this.translogPath = translogPath;
this.bigArrays = bigArrays;
+ this.nodeId = nodeId;
}

/**
@@ -110,4 +120,8 @@ public Path getTranslogPath() {
public ByteSizeValue getBufferSize() {
return bufferSize;
}

+ public String getNodeId() {
+ return nodeId;
+ }
}
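
A brief usage sketch of the widened constructor (variable names hypothetical): live shards thread their node id through, while read-only tooling that never uploads can pass an empty string, as the corrupted-shard-data command below does.

    // On a data node, nodeEnv being the node's NodeEnvironment:
    TranslogConfig config = new TranslogConfig(shardId, translogPath, indexSettings, bigArrays, nodeEnv.nodeId());

    // In offline verification-only code, where no writer identity applies:
    TranslogConfig checkOnly = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE, "");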
@@ -194,7 +194,8 @@ private boolean isTranslogClean(ShardPath shardPath, ClusterState clusterState,
shardPath.getShardId(),
translogPath,
indexSettings,
- BigArrays.NON_RECYCLING_INSTANCE
+ BigArrays.NON_RECYCLING_INSTANCE,
+ ""
);
long primaryTerm = indexSettings.getIndexMetadata().primaryTerm(shardPath.getShardId().id());
// We open translog to check for corruption, do not clean anything.
@@ -40,11 +40,14 @@ public class TranslogCheckpointTransferSnapshot implements TransferSnapshot, Closeable {
private final long primaryTerm;
private long minTranslogGeneration;

- TranslogCheckpointTransferSnapshot(long primaryTerm, long generation, int size) {
+ private String nodeId;

+ TranslogCheckpointTransferSnapshot(long primaryTerm, long generation, int size, String nodeId) {
translogCheckpointFileInfoTupleSet = new HashSet<>(size);
this.size = size;
this.generation = generation;
this.primaryTerm = primaryTerm;
+ this.nodeId = nodeId;
}

private void add(TranslogFileSnapshot translogFileSnapshot, CheckpointFileSnapshot checkPointFileSnapshot) {
@@ -63,7 +66,13 @@ public Set<TransferFileSnapshot> getTranslogFileSnapshots() {

@Override
public TranslogTransferMetadata getTranslogTransferMetadata() {
- return new TranslogTransferMetadata(primaryTerm, generation, minTranslogGeneration, translogCheckpointFileInfoTupleSet.size() * 2);
+ return new TranslogTransferMetadata(
+ primaryTerm,
+ generation,
+ minTranslogGeneration,
+ translogCheckpointFileInfoTupleSet.size() * 2,
+ nodeId
+ );
}

@Override
@@ -110,19 +119,22 @@ public static class Builder {
private final List<TranslogReader> readers;
private final Function<Long, String> checkpointGenFileNameMapper;
private final Path location;
+ private final String nodeId;

public Builder(
long primaryTerm,
long generation,
Path location,
List<TranslogReader> readers,
- Function<Long, String> checkpointGenFileNameMapper
+ Function<Long, String> checkpointGenFileNameMapper,
+ String nodeId
) {
this.primaryTerm = primaryTerm;
this.generation = generation;
this.readers = readers;
this.checkpointGenFileNameMapper = checkpointGenFileNameMapper;
this.location = location;
+ this.nodeId = nodeId;
}

public TranslogCheckpointTransferSnapshot build() throws IOException {
@@ -134,7 +146,8 @@ public TranslogCheckpointTransferSnapshot build() throws IOException {
TranslogCheckpointTransferSnapshot translogTransferSnapshot = new TranslogCheckpointTransferSnapshot(
primaryTerm,
generation,
- readers.size()
+ readers.size(),
+ nodeId
);
for (TranslogReader reader : readers) {
final long readerGeneration = reader.getGeneration();