Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Amend the transfer reassembly state machine and prepare v3.1 release #213

Merged
merged 5 commits into from
Apr 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,13 @@ If you find the examples to be unclear or incorrect, please, open a ticket.

## Revisions

### v3.1

- Remove the Dockerfile; use [toolshed](https://github.com/OpenCyphal/docker_toolchains/pkgs/container/toolshed)
instead if necessary.
- Simplify the transfer reassembly state machine to address [#212](https://github.com/OpenCyphal/libcanard/issues/212).
See also <https://forum.opencyphal.org/t/amendment-to-the-transfer-reception-state-machine-implementations/1870>.

### v3.0

- Update branding as [UAVCAN v1 is renamed to Cyphal](https://forum.opencyphal.org/t/uavcan-v1-is-now-cyphal/1622).
Expand Down
61 changes: 34 additions & 27 deletions libcanard/canard.c
Original file line number Diff line number Diff line change
Expand Up @@ -824,16 +824,28 @@ CANARD_PRIVATE int8_t rxSessionUpdate(CanardInstance* const ins,
CANARD_ASSERT(rxs->transfer_id <= CANARD_TRANSFER_ID_MAX);
CANARD_ASSERT(frame->transfer_id <= CANARD_TRANSFER_ID_MAX);

// The transfer ID timeout is measured relative to the timestamp of the last start-of-transfer frame.
// Triggering a TID timeout when the TID is the same is undesirable because it may cause the reassembler to
// switch to another interface if the start-of-transfer frame of the current transfer is duplicated
// on the other interface more than (transfer-ID timeout) units of time after the start of
// the transfer while the reassembly of this transfer is still in progress.
// While this behavior is not visible to the application because the transfer will still be reassembled,
// it may delay the delivery of the transfer.
const bool tid_timed_out = (frame->timestamp_usec > rxs->transfer_timestamp_usec) &&
(frame->transfer_id != rxs->transfer_id) &&
((frame->timestamp_usec - rxs->transfer_timestamp_usec) > transfer_id_timeout_usec);

// Examples: rxComputeTransferIDDifference(2, 3)==31
// rxComputeTransferIDDifference(2, 2)==0
// rxComputeTransferIDDifference(2, 1)==1
const bool not_previous_tid = rxComputeTransferIDDifference(rxs->transfer_id, frame->transfer_id) > 1;

const bool need_restart = tid_timed_out || ((rxs->redundant_transport_index == redundant_transport_index) &&
frame->start_of_transfer && not_previous_tid);

// Restarting the transfer reassembly only makes sense if the new frame is a start of transfer.
// Otherwise, the new transfer would be impossible to reassemble anyway since the first frame is lost.
const bool need_restart =
frame->start_of_transfer &&
(tid_timed_out || ((rxs->redundant_transport_index == redundant_transport_index) && not_previous_tid));
if (need_restart)
{
CANARD_ASSERT(frame->start_of_transfer);
rxs->total_payload_size = 0U;
rxs->payload_size = 0U;
rxs->calculated_crc = CRC_INITIAL;
Expand All @@ -843,29 +855,24 @@ CANARD_PRIVATE int8_t rxSessionUpdate(CanardInstance* const ins,
}

int8_t out = 0;
if (need_restart && (!frame->start_of_transfer))
{
rxSessionRestart(ins, rxs); // SOT-miss, no point going further.
}
else
// The purpose of the correct_start check is to reduce the possibility of accepting a malformed multi-frame
// transfer in the event of a CRC collision. The scenario where this failure mode would manifest is as follows:
// 1. A valid transfer (whether single- or multi-frame) is accepted with TID=X.
// 2. All frames of the subsequent multi-frame transfer with TID=X+1 are lost except for the last one.
// 3. The CRC of said multi-frame transfer happens to yield the correct residue when applied to the fragment
// of the payload contained in the last frame of the transfer (a CRC collision is in effect).
// 4. The last frame of the multi-frame transfer is erroneously accepted even though it is malformed.
// The correct_start check eliminates this failure mode by ensuring that the first frame is observed.
// See https://github.com/OpenCyphal/libcanard/issues/189.
const bool correct_transport = (rxs->redundant_transport_index == redundant_transport_index);
const bool correct_toggle = (frame->toggle == rxs->toggle);
const bool correct_tid = (frame->transfer_id == rxs->transfer_id);
const bool correct_start = frame->start_of_transfer //
? (0 == rxs->total_payload_size)
: (rxs->total_payload_size > 0);
if (correct_transport && correct_toggle && correct_tid && correct_start)
{
// The purpose of the correct_start check is to reduce the possibility of accepting a malformed multi-frame
// transfer in the event of a CRC collision. The scenario where this failure mode would manifest is as follows:
// 1. A valid transfer (whether single- or multi-frame) is accepted with TID=X.
// 2. All frames of the subsequent multi-frame transfer with TID=X+1 are lost except for the last one.
// 3. The CRC of said multi-frame transfer happens to yield the correct residue when applied to the fragment
// of the payload contained in the last frame of the transfer (a CRC collision is in effect).
// 4. The last frame of the multi-frame transfer is erroneously accepted even though it is malformed.
// The correct_start check eliminates this failure mode by ensuring that the first frame is observed.
// See https://github.com/OpenCyphal/libcanard/issues/189.
const bool correct_transport = (rxs->redundant_transport_index == redundant_transport_index);
const bool correct_toggle = (frame->toggle == rxs->toggle);
const bool correct_tid = (frame->transfer_id == rxs->transfer_id);
const bool correct_start = frame->start_of_transfer || (rxs->total_payload_size > 0);
if (correct_transport && correct_toggle && correct_tid && correct_start)
{
out = rxSessionAcceptFrame(ins, rxs, frame, extent, out_transfer);
}
out = rxSessionAcceptFrame(ins, rxs, frame, extent, out_transfer);
}
return out;
}
Expand Down
2 changes: 1 addition & 1 deletion libcanard/canard.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ extern "C" {
/// Semantic version of this library (not the Cyphal specification).
/// API will be backward compatible within the same major version.
#define CANARD_VERSION_MAJOR 3
#define CANARD_VERSION_MINOR 0
#define CANARD_VERSION_MINOR 1

/// The version number of the Cyphal specification implemented by this library.
#define CANARD_CYPHAL_SPECIFICATION_VERSION_MAJOR 1
Expand Down
44 changes: 30 additions & 14 deletions tests/test_private_rx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,7 @@ TEST_CASE("rxSessionUpdate")
REQUIRE(ins.getAllocator().getTotalAllocatedAmount() == 16);
ins.getAllocator().deallocate(transfer.payload);

// Restart by TID timeout, not the first frame.
// TID timeout does not occur until SOT; see https://github.com/OpenCyphal/libcanard/issues/212.
frame.timestamp_usec = 30'000'000;
frame.transfer_id = 12; // Goes back.
frame.start_of_transfer = false;
Expand All @@ -507,18 +507,34 @@ TEST_CASE("rxSessionUpdate")
frame.payload = reinterpret_cast<const uint8_t*>("\x0A\x0A\x0A\x0A\x0A\x0A\x0A");
REQUIRE(0 == update(2, 1'000'000, 16));
REQUIRE(rxs.transfer_timestamp_usec == 20'000'100); // No change.
REQUIRE(rxs.payload_size == 0);
REQUIRE(rxs.payload == nullptr);
REQUIRE(rxs.calculated_crc == 0xFFFF);
REQUIRE(rxs.transfer_id == 13U);
REQUIRE(rxs.toggle);
REQUIRE(rxs.redundant_transport_index == 2);
REQUIRE(rxs.transfer_id == 14U); // No change.
REQUIRE(rxs.toggle); // No change.
REQUIRE(rxs.redundant_transport_index == 0); // No change.
REQUIRE(ins.getAllocator().getNumAllocatedFragments() == 0);
REQUIRE(ins.getAllocator().getTotalAllocatedAmount() == 0);

// Restart by TID timeout. This may only occur when SOT is set.
frame.timestamp_usec = 30'000'000;
frame.transfer_id = 12; // Goes back.
frame.start_of_transfer = true;
frame.end_of_transfer = false;
frame.toggle = true;
frame.payload_size = 7;
frame.payload = reinterpret_cast<const uint8_t*>("\x0A\x0A\x0A\x0A\x0A\x0A\x0A");
REQUIRE(0 == update(2, 1'000'000, 16));
REQUIRE(rxs.transfer_timestamp_usec == 30'000'000); // Updated from the frame.
REQUIRE(rxs.payload_size == 7); // From the frame.
REQUIRE(rxs.payload != nullptr);
REQUIRE(rxs.calculated_crc == 0x23C7);
REQUIRE(rxs.transfer_id == 12U); // Updated from the frame.
REQUIRE(!rxs.toggle); // In anticipation of the next frame.
REQUIRE(rxs.redundant_transport_index == 2); // Updated from the update.
REQUIRE(ins.getAllocator().getNumAllocatedFragments() == 1);
REQUIRE(ins.getAllocator().getTotalAllocatedAmount() == 16);

// Restart by TID mismatch.
frame.timestamp_usec = 20'000'200; // Goes back.
frame.transfer_id = 11; // Goes back.
frame.transfer_id = 10; // Goes back.
frame.start_of_transfer = true;
frame.end_of_transfer = false;
frame.toggle = true;
Expand All @@ -529,15 +545,15 @@ TEST_CASE("rxSessionUpdate")
REQUIRE(rxs.payload_size == 7);
REQUIRE(0 == std::memcmp(rxs.payload, "\x0B\x0B\x0B\x0B\x0B\x0B\x0B", 7));
REQUIRE(rxs.calculated_crc == crc("\x0B\x0B\x0B\x0B\x0B\x0B\x0B"));
REQUIRE(rxs.transfer_id == 11U);
REQUIRE(rxs.transfer_id == 10U);
REQUIRE(!rxs.toggle);
REQUIRE(rxs.redundant_transport_index == 2);
REQUIRE(ins.getAllocator().getNumAllocatedFragments() == 1);
REQUIRE(ins.getAllocator().getTotalAllocatedAmount() == 16);

// Duplicate start rejected (toggle mismatch).
frame.timestamp_usec = 20'000'300;
frame.transfer_id = 11;
frame.transfer_id = 10;
frame.start_of_transfer = true;
frame.end_of_transfer = true;
frame.toggle = true;
Expand All @@ -548,15 +564,15 @@ TEST_CASE("rxSessionUpdate")
REQUIRE(rxs.payload_size == 7);
REQUIRE(0 == std::memcmp(rxs.payload, "\x0B\x0B\x0B\x0B\x0B\x0B\x0B", 7));
REQUIRE(rxs.calculated_crc == crc("\x0B\x0B\x0B\x0B\x0B\x0B\x0B"));
REQUIRE(rxs.transfer_id == 11U);
REQUIRE(rxs.transfer_id == 10U);
REQUIRE(!rxs.toggle);
REQUIRE(rxs.redundant_transport_index == 2);
REQUIRE(ins.getAllocator().getNumAllocatedFragments() == 1);
REQUIRE(ins.getAllocator().getTotalAllocatedAmount() == 16);

// Continue & finalize.
frame.timestamp_usec = 20'000'400;
frame.transfer_id = 11;
frame.transfer_id = 10;
frame.start_of_transfer = false;
frame.end_of_transfer = true;
frame.toggle = false;
Expand All @@ -567,15 +583,15 @@ TEST_CASE("rxSessionUpdate")
REQUIRE(rxs.payload_size == 0);
REQUIRE(rxs.payload == nullptr);
REQUIRE(rxs.calculated_crc == 0xFFFF);
REQUIRE(rxs.transfer_id == 12U);
REQUIRE(rxs.transfer_id == 11U);
REQUIRE(rxs.toggle);
REQUIRE(rxs.redundant_transport_index == 2);
REQUIRE(transfer.timestamp_usec == 20'000'200);
REQUIRE(transfer.metadata.priority == CanardPrioritySlow);
REQUIRE(transfer.metadata.transfer_kind == CanardTransferKindMessage);
REQUIRE(transfer.metadata.port_id == 2'222);
REQUIRE(transfer.metadata.remote_node_id == 55);
REQUIRE(transfer.metadata.transfer_id == 11);
REQUIRE(transfer.metadata.transfer_id == 10);
REQUIRE(transfer.payload_size == 10);
REQUIRE(0 == std::memcmp(transfer.payload, "\x0B\x0B\x0B\x0B\x0B\x0B\x0B\x0D\x0D\x0D", 10));
REQUIRE(ins.getAllocator().getNumAllocatedFragments() == 1);
Expand Down
105 changes: 105 additions & 0 deletions tests/test_public_rx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -438,3 +438,108 @@ TEST_CASE("Issue189") // https://github.com/OpenCyphal/libcanard/issues/189
REQUIRE(ins.getAllocator().getNumAllocatedFragments() == 1); // The payload buffer is gone.
REQUIRE(ins.getAllocator().getTotalAllocatedAmount() == sizeof(RxSession));
}

TEST_CASE("Issue212")
{
using helpers::Instance;
using exposed::RxSession;

Instance ins;
CanardRxTransfer transfer{};
CanardRxSubscription* subscription = nullptr;

const auto accept = [&](const CanardMicrosecond timestamp_usec,
const std::uint8_t redundant_transport_index,
const std::uint32_t extended_can_id,
const std::vector<std::uint8_t>& payload) {
static std::vector<std::uint8_t> payload_storage;
payload_storage = payload;
CanardFrame frame{};
frame.extended_can_id = extended_can_id;
frame.payload_size = std::size(payload);
frame.payload = payload_storage.data();
return ins.rxAccept(timestamp_usec, frame, redundant_transport_index, transfer, &subscription);
};

ins.getAllocator().setAllocationCeiling(sizeof(RxSession) + 50); // A session and the payload buffer.

// Create a message subscription with the transfer-ID timeout of just one microsecond.
CanardRxSubscription sub_msg{};
REQUIRE(1 == ins.rxSubscribe(CanardTransferKindMessage, 0b0110011001100, 50, 1, sub_msg));
REQUIRE(ins.getMessageSubs().at(0) == &sub_msg);
REQUIRE(ins.getMessageSubs().at(0)->port_id == 0b0110011001100);
REQUIRE(ins.getMessageSubs().at(0)->extent == 50);
REQUIRE(ins.getMessageSubs().at(0)->transfer_id_timeout_usec == 1);
REQUIRE(ensureAllNullptr(ins.getMessageSubs().at(0)->sessions));
REQUIRE(ins.getResponseSubs().empty());
REQUIRE(ins.getRequestSubs().empty());

// Feed a multi-frame transfer with a time delta between its frames larger than the transfer-ID timeout.
// Here's how we compute the reference value of the transfer CRC:
// >>> from pycyphal.transport.commons.crc import CRC16CCITT
// >>> CRC16CCITT.new(bytes([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14])).value_as_bytes
// b'2\xf8'
subscription = nullptr;
REQUIRE(0 == accept(100'000'001, // first frame
1,
0b001'00'0'11'0110011001100'0'0100111,
{1, 2, 3, 4, 5, 6, 7, 0b101'00010}));
REQUIRE(0 == accept(101'000'001, // second frame
1,
0b001'00'0'11'0110011001100'0'0100111,
{8, 9, 10, 11, 12, 13, 14, 0b000'00010}));
REQUIRE(1 == accept(102'000'002, // third and last frame
1,
0b001'00'0'11'0110011001100'0'0100111,
{0x32, 0xF8, 0b011'00010}));
REQUIRE(subscription != nullptr); // Subscription exists.
REQUIRE(subscription->port_id == 0b0110011001100);
REQUIRE(transfer.timestamp_usec == 100'000'001);
REQUIRE(transfer.metadata.priority == CanardPriorityImmediate);
REQUIRE(transfer.metadata.transfer_kind == CanardTransferKindMessage);
REQUIRE(transfer.metadata.port_id == 0b0110011001100);
REQUIRE(transfer.metadata.remote_node_id == 0b0100111);
REQUIRE(transfer.metadata.transfer_id == 2);
REQUIRE(transfer.payload_size == 14);
REQUIRE(0 == std::memcmp(transfer.payload, "\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0A\x0B\x0C\x0D\x0E", 14));
REQUIRE(ins.getAllocator().getNumAllocatedFragments() == 2); // The SESSION and the PAYLOAD BUFFER.
REQUIRE(ins.getAllocator().getTotalAllocatedAmount() == (sizeof(RxSession) + 50));
REQUIRE(ins.getMessageSubs().at(0)->sessions[0b0100111] != nullptr);
ins.getAllocator().deallocate(transfer.payload);
REQUIRE(ins.getAllocator().getNumAllocatedFragments() == 1); // The payload buffer is gone.
REQUIRE(ins.getAllocator().getTotalAllocatedAmount() == sizeof(RxSession));

// Similar but the reassembler should switch to the other transport.
REQUIRE(0 == accept(110'000'001, // first frame, transport #1
1,
0b001'00'0'11'0110011001100'0'0100111,
{1, 2, 3, 4, 5, 6, 7, 0b101'00011}));
REQUIRE(0 == accept(111'000'001, // first frame, transport #0
0,
0b001'00'0'11'0110011001100'0'0100111,
{1, 2, 3, 4, 5, 6, 7, 0b101'00011}));
REQUIRE(0 == accept(112'000'001, // second frame, transport #1
1,
0b001'00'0'11'0110011001100'0'0100111,
{8, 9, 10, 11, 12, 13, 14, 0b000'00011}));
REQUIRE(1 == accept(113'000'002, // third and last frame, transport #1
1,
0b001'00'0'11'0110011001100'0'0100111,
{0x32, 0xF8, 0b011'00011}));
REQUIRE(subscription != nullptr); // Subscription exists.
REQUIRE(subscription->port_id == 0b0110011001100);
REQUIRE(transfer.timestamp_usec == 110'000'001);
REQUIRE(transfer.metadata.priority == CanardPriorityImmediate);
REQUIRE(transfer.metadata.transfer_kind == CanardTransferKindMessage);
REQUIRE(transfer.metadata.port_id == 0b0110011001100);
REQUIRE(transfer.metadata.remote_node_id == 0b0100111);
REQUIRE(transfer.metadata.transfer_id == 3);
REQUIRE(transfer.payload_size == 14);
REQUIRE(0 == std::memcmp(transfer.payload, "\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0A\x0B\x0C\x0D\x0E", 14));
REQUIRE(ins.getAllocator().getNumAllocatedFragments() == 2); // The SESSION and the PAYLOAD BUFFER.
REQUIRE(ins.getAllocator().getTotalAllocatedAmount() == (sizeof(RxSession) + 50));
REQUIRE(ins.getMessageSubs().at(0)->sessions[0b0100111] != nullptr);
ins.getAllocator().deallocate(transfer.payload);
REQUIRE(ins.getAllocator().getNumAllocatedFragments() == 1); // The payload buffer is gone.
REQUIRE(ins.getAllocator().getTotalAllocatedAmount() == sizeof(RxSession));
}