Keep track of time of backup latest ACK on primary #3769

Merged: 18 commits, Apr 22, 2022
2 changes: 1 addition & 1 deletion .daily_canary
@@ -1 +1 @@
Cui cui cui.
Cui cui cui
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## Unreleased

### Added

- Primary node now also reports the time at which the ack from each backup node was last received (`GET /node/consensus` endpoint). This can be used by operators to detect one-way partitions between the primary and backup nodes (#3769).

## [2.0.0-rc7]

### Fixed
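
The changelog entry describes an operator-facing use of the new data: poll `GET /node/consensus` on the primary and compare each backup's `last_received_ms` against the service's election timeout. The sketch below shows one way to do that; the node address, CA file name, and timeout value are placeholders, not values taken from this PR.

```python
# Minimal operator-side sketch (assumed values, not part of this PR):
# flag backups whose last ack is older than the election timeout.
import requests

NODE_URL = "https://127.0.0.1:8000"   # placeholder primary RPC address
CA_CERT = "service_cert.pem"          # placeholder service certificate
ELECTION_TIMEOUT_MS = 5000            # must match the service configuration

def stale_backups():
    r = requests.get(f"{NODE_URL}/node/consensus", verify=CA_CERT, timeout=3)
    r.raise_for_status()
    acks = r.json()["details"]["acks"]
    # Backups report 0 for every entry; only the primary ages these timers.
    return {
        node_id: ack["last_received_ms"]
        for node_id, ack in acks.items()
        if ack["last_received_ms"] > ELECTION_TIMEOUT_MS
    }

if __name__ == "__main__":
    stale = stale_backups()
    if stale:
        print(f"Possible one-way partition; stale acks: {stale}")
    else:
        print("All backup acks are fresh")
```
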
23 changes: 22 additions & 1 deletion doc/schemas/node_openapi.json
@@ -67,7 +67,7 @@
"ConsensusDetails": {
"properties": {
"acks": {
"$ref": "#/components/schemas/NodeId_to_uint64"
"$ref": "#/components/schemas/NodeId_to_ConsensusDetails__Ack"
},
"configs": {
"$ref": "#/components/schemas/Configuration_array"
@@ -107,6 +107,21 @@
],
"type": "object"
},
"ConsensusDetails__Ack": {
"properties": {
"last_received_ms": {
"$ref": "#/components/schemas/uint64"
},
"seqno": {
"$ref": "#/components/schemas/uint64"
}
},
"required": [
"seqno",
"last_received_ms"
],
"type": "object"
},
"ConsensusNodeConfig": {
"properties": {
"address": {
@@ -445,6 +460,12 @@
},
"type": "object"
},
"NodeId_to_ConsensusDetails__Ack": {
"additionalProperties": {
"$ref": "#/components/schemas/ConsensusDetails__Ack"
},
"type": "object"
},
"NodeId_to_uint64": {
"additionalProperties": {
"$ref": "#/components/schemas/uint64"
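
With the schema change above, the `acks` field of `GET /node/consensus` switches from a `NodeId_to_uint64` map to a `NodeId_to_ConsensusDetails__Ack` map. Purely for illustration (node IDs, seqnos, and timings are invented), a fragment of the response now has this shape, written here as a Python literal:

```python
# Illustrative shape only: all values below are made up.
example_response_fragment = {
    "details": {
        "primary_id": "node_a",
        "current_view": 4,
        "acks": {
            "node_b": {"seqno": 120, "last_received_ms": 35},
            "node_c": {"seqno": 118, "last_received_ms": 7200},  # stale ack
        },
    }
}
```
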
47 changes: 37 additions & 10 deletions src/consensus/aft/raft.h
@@ -44,6 +44,9 @@ namespace aft
// the highest matching index with the node that was confirmed
Index match_idx;

// timeout tracking the last time an ack was received from the node
std::chrono::milliseconds last_ack_timeout;

NodeState() = default;

NodeState(
@@ -52,7 +55,8 @@ namespace aft
Index match_idx_ = 0) :
node_info(node_info_),
sent_idx(sent_idx_),
match_idx(match_idx_)
match_idx(match_idx_),
last_ack_timeout(0)
{}
};

@@ -577,6 +581,15 @@
}
}

void reset_last_ack_timeouts()
{
for (auto& node : nodes)
{
using namespace std::chrono_literals;
node.second.last_ack_timeout = 0ms;
}
}

// For more info about Observed Reconfiguration Commits see
// https://microsoft.github.io/CCF/main/architecture/consensus/2tx-reconfig.html
//
@@ -679,7 +692,8 @@
}
for (auto& [k, v] : nodes)
{
details.acks[k] = v.match_idx;
details.acks[k] = {
v.match_idx, static_cast<size_t>(v.last_ack_timeout.count())};
}
details.reconfiguration_type = reconfiguration_type;
if (reconfiguration_type == ReconfigurationType::TWO_TRANSACTION)
@@ -874,6 +888,11 @@
send_append_entries(it.first, it.second.sent_idx + 1);
}
}

for (auto& node : nodes)
{
node.second.last_ack_timeout += elapsed;
}
}
else if (consensus_type != ConsensusType::BFT)
{
@@ -1461,7 +1480,11 @@
from);
return;
}
else if (state->current_view < r.term)

using namespace std::chrono_literals;
node->second.last_ack_timeout = 0ms;

if (state->current_view < r.term)
{
// We are behind, update our state.
LOG_DEBUG_FMT(
@@ -1744,16 +1767,17 @@
state->current_view++;

restart_election_timeout();
reset_last_ack_timeouts();
add_vote_for_me(state->my_node_id);

LOG_INFO_FMT(
"Becoming candidate {}: {}", state->my_node_id, state->current_view);

if (consensus_type != ConsensusType::BFT)
{
for (auto it = nodes.begin(); it != nodes.end(); ++it)
for (auto const& node : nodes)
{
send_request_vote(it->first);
send_request_vote(node.first);
}
}
}
@@ -1791,6 +1815,8 @@
using namespace std::chrono_literals;
timeout_elapsed = 0ms;

reset_last_ack_timeouts();

LOG_INFO_FMT(
"Becoming leader {}: {}", state->my_node_id, state->current_view);

@@ -1804,13 +1830,13 @@
// Reset next, match, and sent indices for all nodes.
auto next = state->last_idx + 1;

for (auto it = nodes.begin(); it != nodes.end(); ++it)
for (auto& node : nodes)
{
it->second.match_idx = 0;
it->second.sent_idx = next - 1;
node.second.match_idx = 0;
node.second.sent_idx = next - 1;

// Send an empty append_entries to all nodes.
send_append_entries(it->first, next);
send_append_entries(node.first, next);
}
}

@@ -1840,6 +1866,7 @@
voted_for.reset();
votes_for_me.clear();
clear_orc_sets();
reset_last_ack_timeouts();

rollback(last_committable_index());

@@ -2428,7 +2455,7 @@
// configuration.
std::vector<ccf::NodeId> to_remove;

for (auto& node : nodes)
for (const auto& node : nodes)
{
if (active_nodes.find(node.first) == active_nodes.end())
{
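
The raft.h changes follow a simple lifecycle: on the leader's periodic tick, every node's `last_ack_timeout` is aged by the elapsed time; receiving an ack from a node zeroes that node's timer; and every role or term change zeroes them all via `reset_last_ack_timeouts()`. The toy Python model below mirrors that bookkeeping outside of CCF; names and the tick interval are illustrative, not the real implementation.

```python
# Toy model of the per-node ack timers added in raft.h; not CCF code.
class AckTracker:
    def __init__(self, node_ids):
        self.last_ack_ms = {n: 0 for n in node_ids}

    def on_periodic(self, elapsed_ms):
        # Leader only: every tick ages all timers by the elapsed time.
        for n in self.last_ack_ms:
            self.last_ack_ms[n] += elapsed_ms

    def on_ack_received(self, node_id):
        # An ack from this node resets its timer to zero.
        self.last_ack_ms[node_id] = 0

    def reset_all(self):
        # Called on role/term changes so stale values never leak across terms.
        self.last_ack_ms = dict.fromkeys(self.last_ack_ms, 0)


# Example: node "b" stops acking while "c" keeps responding.
tracker = AckTracker(["b", "c"])
for _ in range(10):
    tracker.on_periodic(100)      # 100 ms tick
    tracker.on_ack_received("c")
print(tracker.last_ack_ms)        # {'b': 1000, 'c': 0}
```
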
11 changes: 10 additions & 1 deletion src/kv/kv_types.h
@@ -193,8 +193,14 @@ namespace kv

struct ConsensusDetails
{
struct Ack
{
ccf::SeqNo seqno;
size_t last_received_ms;
};

std::vector<Configuration> configs = {};
std::unordered_map<ccf::NodeId, ccf::SeqNo> acks = {};
std::unordered_map<ccf::NodeId, Ack> acks = {};
MembershipState membership_state;
std::optional<LeadershipState> leadership_state = std::nullopt;
std::optional<RetirementPhase> retirement_phase = std::nullopt;
@@ -206,6 +212,9 @@
bool ticking = false;
};

DECLARE_JSON_TYPE(ConsensusDetails::Ack);
DECLARE_JSON_REQUIRED_FIELDS(ConsensusDetails::Ack, seqno, last_received_ms);

DECLARE_JSON_TYPE_WITH_OPTIONAL_FIELDS(ConsensusDetails);
DECLARE_JSON_REQUIRED_FIELDS(
ConsensusDetails,
21 changes: 17 additions & 4 deletions tests/election.py
@@ -17,14 +17,14 @@
# This test starts from a given number of nodes (hosts), commits
# a transaction, stops the current primary, waits for an election and repeats
# this process until no progress can be made (i.e. no primary can be elected
# as F > N/2).+
# as F > N/2).


@reqs.description("Stop current primary and wait for a new one to be elected")
def test_kill_primary_no_reqs(network, args):
primary, _ = network.find_primary_and_any_backup()
primary.stop()
network.wait_for_new_primary(primary)
old_primary, _ = network.find_primary_and_any_backup()
old_primary.stop()
new_primary, _ = network.wait_for_new_primary(old_primary)

# Verify that the TxID reported just after an election is valid
# Note that the first TxID read after an election may be of a signature
@@ -35,6 +35,19 @@ def test_kill_primary_no_reqs(network, args):
r = c.get("/node/network")
c.wait_for_commit(r)

# Also verify that the reported last ack times are as expected
r = c.get("/node/consensus")
acks = r.body.json()["details"]["acks"]
for ack in acks.values():
if node is new_primary:
assert (
ack["last_received_ms"] < network.args.election_timeout_ms
), acks
else:
assert (
ack["last_received_ms"] == 0
), f"Backup {node.local_node_id} should report a last ack time of 0: {acks}"

return network


37 changes: 26 additions & 11 deletions tests/infra/health_watcher.py
@@ -25,6 +25,7 @@ class HealthState(Enum):
stable = auto() # Network can commit new transactions
unavailable = auto() # The primary or majority of nodes are unreachable
election = auto() # An election is in progress
partition = auto() # Primary node is partitioned from f backups


def get_primary(
@@ -39,11 +40,11 @@ def get_primary(
logs = None if verbose else []
r = c.get(
"/node/consensus", timeout=client_node_timeout_s, log_capture=logs
).body.json()
return (r["details"]["primary_id"], r["details"]["current_view"])
).body.json()["details"]
return (r["primary_id"], r["current_view"]), r["acks"]
except Exception as e:
LOG.warning(f"Could not connect to node {node.local_node_id}: {e}")
return None
return None, None


def get_network_health(network, get_primary_fn, client_node_timeout_s=3, verbose=True):
@@ -54,8 +55,9 @@ def get_network_health(network, get_primary_fn, client_node_timeout_s=3, verbose
majority = (len(nodes) + 1) // 2

primaries = {}
acks = {}
for node in nodes:
primaries[node.node_id] = get_primary_fn(
primaries[node.node_id], acks[node.node_id] = get_primary_fn(
node, client_node_timeout_s, verbose=verbose
)
assert len(primaries) == len(nodes)
@@ -83,6 +85,19 @@ def get_network_health(network, get_primary_fn, client_node_timeout_s=3, verbose
# about to lose its primaryship
return HealthState.election

# The current primary is one-way partitioned from f backups, i.e. cannot
# receive acks from a majority of backups
# Note: This can be removed once the CheckQuorum extension is implemented
# as the partitioned primary node will automatically step down.
delayed_acks = [
ack
for ack in acks[most_common_primary[0]].values()
if ack["last_received_ms"] > network.args.election_timeout_ms
]

if len(delayed_acks) >= majority:
return HealthState.partition

return HealthState.stable if most_common_count >= majority else HealthState.election


@@ -106,7 +121,7 @@ def __init__(
self.verbose = verbose

def wait_for_recovery(self, timeout=None):
timeout = timeout or self.unstable_threshold_s
timeout = timeout or 2 * self.unstable_threshold_s
LOG.info(f"Waiting {timeout}s for recovery to be detected")
self.join(timeout=timeout)
# Stop thread manually if it does not terminate in time
@@ -123,7 +138,7 @@ def run(self):
a specific period (election_timeout * election_timeout_factor), the health
watcher automatically stops and a disaster recovery procedure should be staged.
"""
election_start_time = None
unstable_start_time = None

# Note: this currently does not detect one-way partitions backups -> primary
# See https://github.com/microsoft/CCF/issues/3688 for fix.
@@ -138,12 +153,12 @@
verbose=self.verbose,
)
if health_state == HealthState.stable:
election_start_time = None
unstable_start_time = None
else:
LOG.info("Network is unstable")
if election_start_time is None:
election_start_time = time.time()
if time.time() - election_start_time > self.unstable_threshold_s:
LOG.info(f"Network is unstable: {health_state.name}")
if unstable_start_time is None:
unstable_start_time = time.time()
if time.time() - unstable_start_time > self.unstable_threshold_s:
LOG.error(
f"Network has been unstable for more than {self.unstable_threshold_s}s. Exiting"
)
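
The health watcher now reports a dedicated `partition` state when the reported primary lacks fresh acks from a majority of nodes. A standalone version of that heuristic, applied to a single consensus response of the shape added by this PR, could look like the sketch below; the threshold and node count are supplied by the caller, and this mirrors rather than reuses the watcher code above.

```python
# Standalone sketch of the health watcher's partition heuristic.
def looks_partitioned(details, election_timeout_ms, node_count):
    """True if the primary is missing fresh acks from a majority of nodes,
    suggesting a one-way partition (a stopgap until CheckQuorum lands)."""
    majority = (node_count + 1) // 2
    delayed = [
        ack
        for ack in details["acks"].values()
        if ack["last_received_ms"] > election_timeout_ms
    ]
    return len(delayed) >= majority

# e.g. looks_partitioned(r.json()["details"], 5000, node_count=3)
```
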
6 changes: 3 additions & 3 deletions tests/infra/network.py
@@ -1072,7 +1072,7 @@ def wait_for_new_primary(
flush_info(logs, None)
delay = time.time() - start_time
LOG.info(
f"New primary after {delay}s is {new_primary.local_node_id} ({new_primary.node_id}) in term {new_term}"
f"New primary after {delay:.2f}s is {new_primary.local_node_id} ({new_primary.node_id}) in term {new_term}"
)
return (new_primary, new_term)
except PrimaryNotFound:
@@ -1107,7 +1107,7 @@ def wait_for_new_primary_in(
flush_info(logs, None)
delay = time.time() - start_time
LOG.info(
f"New primary after {delay}s is {new_primary.local_node_id} ({new_primary.node_id}) in term {new_term}"
f"New primary after {delay:.2f}s is {new_primary.local_node_id} ({new_primary.node_id}) in term {new_term}"
)
return (new_primary, new_term)
except PrimaryNotFound:
@@ -1156,7 +1156,7 @@ def wait_for_primary_unanimity(
assert all_good, f"Multiple primaries: {primaries}"
delay = time.time() - start_time
LOG.info(
f"Primary unanimity after {delay}s: {primaries[0].local_node_id} ({primaries[0].node_id})"
f"Primary unanimity after {delay:.2f}s: {primaries[0].local_node_id} ({primaries[0].node_id})"
)
return primaries[0]
