test: make IT test 1.2 compatible
In order to work with the old transition logic, the regression test needs to take snapshots so that we restart from the snapshot instead of replaying the complete log on the new leader.
ChrisKujawa committed Nov 5, 2021
1 parent 9dcf828 commit 6b81b1d
Showing 2 changed files with 51 additions and 21 deletions.
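
To make the commit message concrete: the test now forces and awaits a shared snapshot before the leader change, so the incoming leader can restore from that snapshot instead of replaying the complete log. A condensed sketch of the new ordering, using only helpers that appear in the diff below (everything else is illustrative, not the exact test code):

// produce records while the original leader is still active
// take a snapshot on every broker and wait until all brokers report the same one
clusteringRule.awaitSameSnapshotOnAllNodes();
// only then force the leader change; the new leader restarts from the snapshot
clusteringRule.disconnect(previousLeader);
clusteringRule.stepDown(previousLeader, 1);
clusteringRule.awaitOtherLeader(1, previousLeaderId);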
@@ -67,6 +67,7 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
@@ -774,26 +775,48 @@ public void takeSnapshot(final Broker broker) {
broker.getBrokerAdminService().takeSnapshot();
}

public void triggerAndWaitForSnapshots() {
/**
* Awaits and verifies that the same snapshot has been taken on all nodes. No sophisticated
* comparison is done to determine sameness: the snapshot metadata contains the exporter and
* processing positions, and if these are equal on all nodes the snapshots are expected to be
* the same.
*/
public void awaitSameSnapshotOnAllNodes() {
Awaitility.await("Await the same snapshot on all nodes.")
.timeout(Duration.ofMinutes(1))
.until(
this::triggerAndWaitForSnapshots,
(snapshotList) -> {
final var distinctCount = snapshotList.stream().distinct().count();
return snapshotList.size() == brokers.size() && distinctCount == 1;
});
}

public ArrayList<SnapshotId> triggerAndWaitForSnapshots() {
// Ensure that the exporter positions are distributed to the followers
getClock().addTime(ExporterDirectorContext.DEFAULT_DISTRIBUTION_INTERVAL);
getBrokers().stream()
.map(Broker::getBrokerAdminService)
.forEach(BrokerAdminService::takeSnapshot);

final var snapshots = new ArrayList<SnapshotId>();
getBrokers()
.forEach(
broker ->
Awaitility.await()
.pollInterval(2, TimeUnit.SECONDS)
.timeout(60, TimeUnit.SECONDS)
.until(
() -> {
// Trigger snapshot again in case snapshot is not already taken
broker.getBrokerAdminService().takeSnapshot();
return getSnapshot(broker);
},
Optional::isPresent));
broker -> {
final var brokerSnapshot =
Awaitility.await()
.pollInterval(2, TimeUnit.SECONDS)
.timeout(60, TimeUnit.SECONDS)
.until(
() -> {
// Trigger snapshot again in case snapshot is not already taken
broker.getBrokerAdminService().takeSnapshot();
return getSnapshot(broker);
},
Optional::isPresent);
brokerSnapshot.ifPresent(snapshots::add);
});
return snapshots;
}

/**
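
The Javadoc of awaitSameSnapshotOnAllNodes above describes the sameness check as plain value equality over the snapshot metadata. A minimal, runnable sketch of that idea — SnapshotIdStub and its fields are hypothetical stand-ins for illustration, not Zeebe's actual SnapshotId API:

import java.util.List;

public class SameSnapshotCheckSketch {

  // Hypothetical stand-in for a snapshot identifier; a record gives value equality,
  // which is what the distinct() check in awaitSameSnapshotOnAllNodes relies on.
  record SnapshotIdStub(long processedPosition, long exportedPosition) {}

  public static void main(String[] args) {
    // Three brokers, each reporting the same processed and exported positions.
    final var snapshots =
        List.of(
            new SnapshotIdStub(100, 90),
            new SnapshotIdStub(100, 90),
            new SnapshotIdStub(100, 90));

    // Mirrors the predicate used above: one snapshot per broker, all of them equal.
    final var sameOnAllNodes =
        snapshots.size() == 3 && snapshots.stream().distinct().count() == 1;

    System.out.println(sameOnAllNodes); // prints: true
  }
}

Because records compare by value, distinct().count() == 1 holds exactly when every broker reports the same processed and exported positions, which is the condition the Awaitility poll in awaitSameSnapshotOnAllNodes waits for.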
@@ -182,8 +182,10 @@ public void shouldFormClusterEvenWhenMissingEvents() {
}

@Test
// regression test https://github.com/camunda-cloud/zeebe/issues/8129
public void shouldNotProduceDuplicatedKeys() {
// given
// we produce some records on the old leader
final var previousLeaderId = clusteringRule.getLeaderForPartition(1).getNodeId();
final var previousLeader = clusteringRule.getBroker(previousLeaderId);
client.newDeployCommand().addProcessModel(PROCESS, PROCESS_RESOURCE_NAME).send().join();
@@ -195,16 +197,14 @@ public void shouldNotProduceDuplicatedKeys() {
.send()
.join();

// disconnect leader - becomes follower
// since the snapshot is the same on all nodes we know that all have the same state
// and processed or replayed everything
clusteringRule.awaitSameSnapshotOnAllNodes();

// we disconnect the leader and step down
clusteringRule.disconnect(previousLeader);
clusteringRule.stepDown(previousLeader, 1);
// remove old log content
RecordingExporter.reset();
final var newLeaderInfo = clusteringRule.awaitOtherLeader(1, previousLeaderId);
final var newLeaderId = newLeaderInfo.getNodeId();
assertThat(newLeaderId).isNotEqualTo(previousLeaderId);

// old log is replayed on new leader so we can collect keys
final var previousKeys =
RecordingExporter.processInstanceRecords()
.limitToProcessInstanceCompleted()
@@ -213,10 +213,15 @@
.map(Record::getKey)
.filter(k -> k != -1L)
.collect(Collectors.toList());

// remove all content again
// remove old exported records content - for easier verification later
RecordingExporter.reset();

// when
// we await a new leader
final var newLeaderInfo = clusteringRule.awaitOtherLeader(1, previousLeaderId);
final var newLeaderId = newLeaderInfo.getNodeId();
assertThat(newLeaderId).isNotEqualTo(previousLeaderId);

// produce new stuff on new leader
client
.newCreateInstanceCommand()
@@ -235,8 +240,10 @@
.filter(k -> k != -1L)
.toArray(Long[]::new);

// then
assertThat(newKeys).isNotEmpty();
assertThat(previousKeys)
.isNotEmpty()
.describedAs("Keys should always be unique for different entities.")
.doesNotContain(newKeys);
}
