merge: #8158
8158: [Backport 1.2]  Update the key generator on replay  r=npepinpe a=Zelldon

## Description

Backports the fix from #8156 and adjusts the IT test to make it work with 1.2. A snapshot needs to be taken so that the new leader does not replay the complete log but starts from the latest state it had.
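
In essence, the fix moves the key generator update from the end of replay (`onRecordsReplayed`) into the per-record replay path (`onRecordReplayed`), so state built purely from replay also carries the highest key seen on the log. Below is a minimal, self-contained sketch of that idea; the `KeyGenerator` class is a simplified, hypothetical stand-in for illustration, not the actual `KeyGeneratorControls`:

```java
import java.util.List;

public final class ReplayKeySketch {

  /** Hypothetical stand-in for the partition's key generator, for illustration only. */
  static final class KeyGenerator {
    private long currentKey = -1L;

    /** Moves the generator forward only; lower keys are ignored. */
    void setKeyIfHigher(final long key) {
      currentKey = Math.max(currentKey, key);
    }

    long nextKey() {
      return ++currentKey;
    }
  }

  public static void main(final String[] args) {
    final KeyGenerator keyGenerator = new KeyGenerator();

    // Keys of replayed records, all assumed to belong to this partition.
    final List<Long> replayedRecordKeys = List.of(1L, 2L, 5L, 3L);

    // The gist of the fix: bump the generator for every replayed record instead of
    // collecting the highest key and restoring the generator once replay has finished.
    replayedRecordKeys.forEach(keyGenerator::setKeyIfHigher);

    // Keys generated after replay continue above the highest replayed key,
    // so no duplicates are produced: prints 6 and 7.
    System.out.println(keyGenerator.nextKey());
    System.out.println(keyGenerator.nextKey());
  }
}
```

With the previous approach the generator was only restored once replay finished, so state built from continuous replay plus a snapshot could lag behind the log and hand out keys that already existed, which is what the new `shouldNotProduceDuplicatedKeys` test below guards against.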



## Related issues


related to #8129



Co-authored-by: Nicolas Pepin-Perreault <nicolas.pepin-perreault@camunda.com>
Co-authored-by: Christopher Zell <zelldon91@googlemail.com>
3 people authored Nov 5, 2021
2 parents c24deba + 6b81b1d commit aa69521
Showing 4 changed files with 105 additions and 20 deletions.
@@ -73,7 +73,6 @@ public final class ReplayStateMachine implements LogRecordAwaiter {
private long batchSourceEventPosition = StreamProcessor.UNSET_POSITION;

private long snapshotPosition;
private long highestRecordKey = -1L;
private long lastReadRecordPosition = StreamProcessor.UNSET_POSITION;
private long lastReplayedEventPosition = StreamProcessor.UNSET_POSITION;

@@ -234,9 +233,6 @@ private void replayEvent(final LoggedEvent currentEvent) {
* the last source event, which has caused the last applied event.
*/
private void onRecordsReplayed() {
// restore the key generate with the highest key from the log
keyGeneratorControls.setKeyIfHigher(highestRecordKey);

LOG.info(LOG_STMT_REPLAY_FINISHED, lastReadRecordPosition);
recoveryFuture.complete(lastSourceEventPosition);
}
@@ -267,8 +263,7 @@ private void onRecordReplayed(final LoggedEvent currentEvent) {

// records from other partitions should not influence the key generator of this partition
if (Protocol.decodePartitionId(currentRecordKey) == zeebeState.getPartitionId()) {
// remember the highest key on the stream to restore the key generator after replay
highestRecordKey = Math.max(currentRecordKey, highestRecordKey);
keyGeneratorControls.setKeyIfHigher(currentRecordKey);
}
}

@@ -62,8 +62,6 @@ private void assertStates() {
processingState.entrySet().stream()
.filter(entry -> entry.getKey() != ZbColumnFamilies.DEFAULT)
// ignores transient states
.filter(entry -> entry.getKey() != ZbColumnFamilies.KEY)
// on followers we don't need to reset the key
// this will happen anyway then on leader replay
.forEach(
entry -> {
@@ -67,6 +67,7 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
@@ -774,26 +775,48 @@ public void takeSnapshot(final Broker broker) {
broker.getBrokerAdminService().takeSnapshot();
}

public void triggerAndWaitForSnapshots() {
/**
* Verifies and awaits that the same snapshot is taken on all nodes. It does not do any
* sophisticated computation to determine equality: the snapshot metadata contains the exporter
* and processing positions, and if these are equal the snapshots are expected to be the same.
*/
public void awaitSameSnapshotOnAllNodes() {
Awaitility.await("Await the same snapshot on all nodes.")
.timeout(Duration.ofMinutes(1))
.until(
this::triggerAndWaitForSnapshots,
(snapshotList) -> {
final var distinctCount = snapshotList.stream().distinct().count();
return snapshotList.size() == brokers.size() && distinctCount == 1;
});
}

public ArrayList<SnapshotId> triggerAndWaitForSnapshots() {
// Ensure that the exporter positions are distributed to the followers
getClock().addTime(ExporterDirectorContext.DEFAULT_DISTRIBUTION_INTERVAL);
getBrokers().stream()
.map(Broker::getBrokerAdminService)
.forEach(BrokerAdminService::takeSnapshot);

final var snapshots = new ArrayList<SnapshotId>();
getBrokers()
.forEach(
broker ->
Awaitility.await()
.pollInterval(2, TimeUnit.SECONDS)
.timeout(60, TimeUnit.SECONDS)
.until(
() -> {
// Trigger snapshot again in case snapshot is not already taken
broker.getBrokerAdminService().takeSnapshot();
return getSnapshot(broker);
},
Optional::isPresent));
broker -> {
final var brokerSnapshot =
Awaitility.await()
.pollInterval(2, TimeUnit.SECONDS)
.timeout(60, TimeUnit.SECONDS)
.until(
() -> {
// Trigger snapshot again in case snapshot is not already taken
broker.getBrokerAdminService().takeSnapshot();
return getSnapshot(broker);
},
Optional::isPresent);
brokerSnapshot.ifPresent(snapshots::add);
});
return snapshots;
}

/**
@@ -15,7 +15,9 @@
import io.camunda.zeebe.it.util.GrpcClientRule;
import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.model.bpmn.BpmnModelInstance;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.snapshots.SnapshotId;
import io.camunda.zeebe.test.util.record.RecordingExporter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
@@ -179,6 +181,73 @@ public void shouldFormClusterEvenWhenMissingEvents() {
clusteringRule.waitForSnapshotAtBroker(previousLeader);
}

@Test
// regression test https://github.com/camunda-cloud/zeebe/issues/8129
public void shouldNotProduceDuplicatedKeys() {
// given
// we produce some records on the old leader
final var previousLeaderId = clusteringRule.getLeaderForPartition(1).getNodeId();
final var previousLeader = clusteringRule.getBroker(previousLeaderId);
client.newDeployCommand().addProcessModel(PROCESS, PROCESS_RESOURCE_NAME).send().join();
client
.newCreateInstanceCommand()
.bpmnProcessId("process")
.latestVersion()
.withResult()
.send()
.join();

// since the snapshot is the same on all nodes we know that all have the same state
// and processed or replayed everything
clusteringRule.awaitSameSnapshotOnAllNodes();

// we disconnect the leader and step down
clusteringRule.disconnect(previousLeader);
clusteringRule.stepDown(previousLeader, 1);

final var previousKeys =
RecordingExporter.processInstanceRecords()
.limitToProcessInstanceCompleted()
.collect(Collectors.toList())
.stream()
.map(Record::getKey)
.filter(k -> k != -1L)
.collect(Collectors.toList());
// remove old exported records content - for easier verification later
RecordingExporter.reset();

// when
// we await a new leader
final var newLeaderInfo = clusteringRule.awaitOtherLeader(1, previousLeaderId);
final var newLeaderId = newLeaderInfo.getNodeId();
assertThat(newLeaderId).isNotEqualTo(previousLeaderId);

// produce new stuff on new leader
client
.newCreateInstanceCommand()
.bpmnProcessId("process")
.latestVersion()
.withResult()
.send()
.join();

final var newKeys =
RecordingExporter.processInstanceRecords()
.limitToProcessInstanceCompleted()
.collect(Collectors.toList())
.stream()
.map(Record::getKey)
.filter(k -> k != -1L)
.toArray(Long[]::new);

// then
assertThat(newKeys).isNotEmpty();
assertThat(previousKeys)
.isNotEmpty()
.describedAs("Keys should always be unique for different entities.")
.doesNotContain(newKeys);
}

private int getNodeId(final Broker broker) {
return broker.getConfig().getCluster().getNodeId();
}
