-
Notifications
You must be signed in to change notification settings - Fork 24.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Move GlobalCheckpointTracker and remove SequenceNumbersService #27837
Conversation
I like this very much. I think we should do it but also take it a step further and total isolate the GlobalCheckpointTracker from the engine. Something like this, wdyt? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Awesome stats.
BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier) throws IOException { | ||
final long maxSeqNo; | ||
final long localCheckpoint; | ||
if (openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
paranoia - can you use a switch statement with a fallback logic and an exception on default?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done in ea23a0d
final EngineConfig config = newEngineConfig(openMode); | ||
|
||
if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you add a comment why it's important to do it before we open the engine?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure, added in ea23a0d
* @throws IOException if an I/O exception occurred reading the latest Lucene commit point from disk | ||
*/ | ||
public SeqNoStats loadSeqNoStats(final long globalCheckpoint) throws IOException { | ||
public Tuple<Long, Long> loadSeqNoStats() throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe call this load seqNoInfo? This way people won't be confused about SeqNoStats?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
@@ -461,7 +462,7 @@ TranslogWriter createWriter(long fileGeneration) throws IOException { | |||
* needed to solve and initialization problem while constructing an empty translog. | |||
* With no readers and no current, a call to {@link #getMinFileGeneration()} would not work. | |||
*/ | |||
private TranslogWriter createWriter(long fileGeneration, long initialMinTranslogGen) throws IOException { | |||
private TranslogWriter createWriter(long fileGeneration, long initialMinTranslogGen, long initialGlobalCheckpoint) throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
java doc update?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
final LongSupplier writerGlobalCheckpointSupplier; | ||
if (Assertions.ENABLED) { | ||
writerGlobalCheckpointSupplier = () -> { | ||
long gcp = globalCheckpointSupplier.getAsLong(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
engine = new InternalEngine(config(defaultSettings, store, translogPath, newMergePolicy(), null, null), seqNoServiceSupplier); | ||
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO); | ||
final LongSupplier globalCheckpointSupplier = () -> globalCheckpoint.get(); | ||
engine = new InternalEngine(config(defaultSettings, store, translogPath, newMergePolicy(), null, null, globalCheckpointSupplier)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice
|
||
assertThat( | ||
Long.parseLong(initialEngine.commitStats().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)), | ||
equalTo(localCheckpoint)); | ||
initialEngine.getTranslog().sync(); // to guarantee the global checkpoint is written to the translog checkpoint | ||
assertThat( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why is this gone?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I found it tricky to keep these checks in the test with the other changes around Engine not exposing GlobalCheckpointTracker. I also did not like that this test was checking a mixture between local checkpoint and global checkpoint properties (where the global checkpoint properties are mostly trivial, and covered by GlobalCheckpointTrackerTests). I've found a way to reintroduce these checks though (see ea23a0d)
@@ -164,7 +164,9 @@ public EngineConfig copy(EngineConfig config, EngineConfig.OpenMode openMode, An | |||
new CodecService(null, logger), config.getEventListener(), config.getQueryCache(), config.getQueryCachingPolicy(), | |||
config.getForceNewHistoryUUID(), config.getTranslogConfig(), config.getFlushMergesAfter(), | |||
config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(), config.getTranslogRecoveryRunner(), | |||
config.getCircuitBreakerService()); | |||
config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier()); | |||
//new GlobalCheckpointTracker(config.getShardId(), config.getAllocationId(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
left overs?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
right -> removed
@@ -633,4 +634,8 @@ protected void snapshotShard(final IndexShard shard, | |||
public static Engine getEngine(IndexShard indexShard) { | |||
return indexShard.getEngine(); | |||
} | |||
|
|||
public static GlobalCheckpointTracker getGlobalCheckpointTracker(IndexShard indexShard) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
backport pending waiting on backport of #27826 |
The PR #27837 unintentionally changed to an in memory global checkpoint.
This commit moves GlobalCheckpointTracker from the engine to IndexShard, where it better fits logically: Tracking the global checkpoint based on the local checkpoints of all shards in the replication group is not a property of the engine, but rather a property fulfilled by the current primary shard. The LocalCheckpointTracker on the other hand is driven by the contents of the local translog. By moving GlobalCheckpointTracker to IndexShard, it makes little sense to keep the SequenceNumbersService class around - it would only wrap the LocalCheckpointTracker. This commit therefore removes the class and replaces occurrences of SequenceNumbersService in the engine directly by LocalCheckpointTracker.
Backported to 6.x in 1576a2b |
* es/6.x: (43 commits) ingest: upgraded ingest geoip's geoip2's dependencies. [TEST] logging for update by query test #27820 Use full profile on JDK 10 builds Require Gradle 4.3 Add unreleased v6.1.2 version TEST: reduce blob size #testExecuteMultipartUpload Check index under the store metadata lock (#27768) Upgrade to Lucene 7.2.0. (#27910) Fixed test to be up to date with the new database files. Use `_refresh` to shrink the version map on inactivity (#27918) Make KeyedLock reentrant (#27920) Fixes DocStats to not report index size < -1 (#27863) Disable TestZenDiscovery in cloud providers integrations test ingest: Upgraded the geolite2 databases. [Issue-27716]: CONTRIBUTING.md IntelliJ configurations settings are confusing. (#27717) [Test] Fix IndicesClientDocumentationIT (#27899) Move uid lock into LiveVersionMap (#27905) Mute testRetentionPolicyChangeDuringRecovery Increase Gradle heap space to 1536m Move GlobalCheckpointTracker and remove SequenceNumbersService (#27837) ...
Today we did not set the global checkpoint when opening an engine from an existing store. If we are forced to close an engine before advancing the global checkpoint, we also have to close translog which in turn sync a new checkpoint with an unassigned global checkpoint. This is not caught until the global checkpoint assertion was introduced in PR elastic#27837. This commit tightens the syncNeeded conditions. Relates elastic#27970
This PR moves GlobalCheckpointTracker from the engine to IndexShard, where it better fits logically: Tracking the global checkpoint based on the local checkpoints of all shards in the replication group is not a property of the engine, but rather a property fulfilled by the current primary shard. The LocalCheckpointTracker on the other hand is driven by the contents of the local translog. By moving GlobalCheckpointTracker to IndexShard, it makes little sense to keep the SequenceNumbersService class around - it would only wrap the LocalCheckpointTracker. This PR therefore removes the class and replaces occurrences of SequenceNumbersService in the engine directly by LocalCheckpointTracker.