Skip to content

Commit

Permalink
test: add duplicated key IT test
Browse files Browse the repository at this point in the history
Regression test which verifies that no duplicated keys are generated on the log, after fail over
  • Loading branch information
ChrisKujawa committed Nov 5, 2021
1 parent 63412f1 commit 6e27744
Showing 1 changed file with 61 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
import io.camunda.zeebe.it.util.GrpcClientRule;
import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.model.bpmn.BpmnModelInstance;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.snapshots.SnapshotId;
import io.camunda.zeebe.test.util.record.RecordingExporter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
Expand Down Expand Up @@ -179,6 +181,65 @@ public void shouldFormClusterEvenWhenMissingEvents() {
clusteringRule.waitForSnapshotAtBroker(previousLeader);
}

@Test
public void shouldNotProduceDuplicatedKeys() {
// given
final var previousLeaderId = clusteringRule.getLeaderForPartition(1).getNodeId();
final var previousLeader = clusteringRule.getBroker(previousLeaderId);
client.newDeployCommand().addProcessModel(PROCESS, PROCESS_RESOURCE_NAME).send().join();
client
.newCreateInstanceCommand()
.bpmnProcessId("process")
.latestVersion()
.withResult()
.send()
.join();

// disconnect leader - becomes follower
clusteringRule.disconnect(previousLeader);
clusteringRule.stepDown(previousLeader, 1);
// remove old log content
RecordingExporter.reset();
final var newLeaderInfo = clusteringRule.awaitOtherLeader(1, previousLeaderId);
final var newLeaderId = newLeaderInfo.getNodeId();
assertThat(newLeaderId).isNotEqualTo(previousLeaderId);

// old log is replayed on new leader so we can collect keys
final var previousKeys =
RecordingExporter.processInstanceRecords()
.limitToProcessInstanceCompleted()
.collect(Collectors.toList())
.stream()
.map(Record::getKey)
.filter(k -> k != -1L)
.collect(Collectors.toList());

// remove all content again
RecordingExporter.reset();

// produce new stuff on new leader
client
.newCreateInstanceCommand()
.bpmnProcessId("process")
.latestVersion()
.withResult()
.send()
.join();

final var newKeys =
RecordingExporter.processInstanceRecords()
.limitToProcessInstanceCompleted()
.collect(Collectors.toList())
.stream()
.map(Record::getKey)
.filter(k -> k != -1L)
.toArray(Long[]::new);

assertThat(previousKeys)
.describedAs("Keys should always be unique for different entities.")
.doesNotContain(newKeys);
}

private int getNodeId(final Broker broker) {
return broker.getConfig().getCluster().getNodeId();
}
Expand Down

0 comments on commit 6e27744

Please sign in to comment.