Skip to content

Commit

Permalink
Refactor the transfer reassembly state machine to enhance its maintai…
Browse files Browse the repository at this point in the history
…nability and robustness (#215)

This is a step towards improving the transfer reassembler.

This changeset also renames `redundant_transport_index` as
`redundant_iface_index` for consistency with the other implementations;
this change is not visible at the API level.
  • Loading branch information
pavel-kirienko committed Apr 29, 2023
1 parent 69ed329 commit 73d0a9c
Show file tree
Hide file tree
Showing 7 changed files with 273 additions and 123 deletions.
8 changes: 6 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ If you want to contribute, please read [`CONTRIBUTING.md`](/CONTRIBUTING.md).
- Detailed time complexity and memory requirement models for the benefit of real-time high-integrity applications.
- Purely reactive API without the need for background servicing.
- Support for the Classic CAN and CAN FD.
- Support for redundant transports.
- Support for redundant network interfaces.
- Compatibility with 8/16/32/64-bit platforms.
- Compatibility with extremely resource-constrained baremetal environments starting from 32K ROM and 32K RAM.
- Implemented in ≈1000 lines of code.
Expand Down Expand Up @@ -124,7 +124,7 @@ Use [Nunavut](https://github.com/OpenCyphal/nunavut) to automatically generate
The CAN frames generated from the message transfer are now stored in the `queue`.
We need to pick them out one by one and have them transmitted.
Normally, the following fragment should be invoked periodically to unload the CAN frames from the
prioritized transmission queue (or several, if redundant interfaces are used) into the CAN driver:
prioritized transmission queue (or several, if redundant network interfaces are used) into the CAN driver:

```c
for (const CanardTxQueueItem* ti = NULL; (ti = canardTxPeek(&queue)) != NULL;) // Peek at the top of the queue.
Expand Down Expand Up @@ -237,6 +237,10 @@ If you find the examples to be unclear or incorrect, please, open a ticket.
- Simplify the transfer reassembly state machine to address [#212](https://github.com/OpenCyphal/libcanard/issues/212).
See also <https://forum.opencyphal.org/t/amendment-to-the-transfer-reception-state-machine-implementations/1870>.
#### v3.1.1
- Refactor the transfer reassembly state machine to enhance its maintainability and robustness.
### v3.0
- Update branding as [UAVCAN v1 is renamed to Cyphal](https://forum.opencyphal.org/t/uavcan-v1-is-now-cyphal/1622).
Expand Down
121 changes: 69 additions & 52 deletions libcanard/canard.c
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,7 @@ typedef struct CanardInternalRxSession
uint8_t* payload; ///< Dynamically allocated and handed off to the application when done.
TransferCRC calculated_crc; ///< Updated with the received payload in real time.
CanardTransferID transfer_id;
uint8_t redundant_transport_index; ///< Arbitrary value in [0, 255].
uint8_t redundant_iface_index; ///< Arbitrary value in [0, 255].
bool toggle;
} CanardInternalRxSession;

Expand Down Expand Up @@ -802,6 +802,53 @@ CANARD_PRIVATE int8_t rxSessionAcceptFrame(CanardInstance* const ins,
return out;
}

/// Evaluates the state of the RX session with respect to time and the new frame, and restarts it if necessary.
/// The next step is to accept the frame if the transfer-ID, toggle but, and transport index match; reject otherwise.

This comment has been minimized.

Copy link
@lydia-at-amazon

lydia-at-amazon Jul 30, 2024

Should this say toggle bit instead of toggle but?

This comment has been minimized.

Copy link
@pavel-kirienko

pavel-kirienko Jul 30, 2024

Author Member

yes

/// The logic of this function is simple: it restarts the reassembler if the start-of-transfer flag is set and
/// any two of the three conditions are met:
///
/// - The frame has arrived over the same interface as the previous transfer.
/// - New transfer-ID is detected.
/// - The transfer-ID timeout has expired.
///
/// Notice that mere TID-timeout is not enough to restart to prevent the interface index oscillation;
/// while this is not visible at the application layer, it may delay the transfer arrival.
CANARD_PRIVATE void rxSessionSynchronize(CanardInternalRxSession* const rxs,
const RxFrameModel* const frame,
const uint8_t redundant_iface_index,
const CanardMicrosecond transfer_id_timeout_usec)
{
CANARD_ASSERT(rxs != NULL);
CANARD_ASSERT(frame != NULL);
CANARD_ASSERT(rxs->transfer_id <= CANARD_TRANSFER_ID_MAX);
CANARD_ASSERT(frame->transfer_id <= CANARD_TRANSFER_ID_MAX);

const bool same_transport = rxs->redundant_iface_index == redundant_iface_index;
// Examples: rxComputeTransferIDDifference(2, 3)==31
// rxComputeTransferIDDifference(2, 2)==0
// rxComputeTransferIDDifference(2, 1)==1
const bool tid_new = rxComputeTransferIDDifference(rxs->transfer_id, frame->transfer_id) > 1;
// The transfer ID timeout is measured relative to the timestamp of the last start-of-transfer frame.
const bool tid_timeout = (frame->timestamp_usec > rxs->transfer_timestamp_usec) &&
((frame->timestamp_usec - rxs->transfer_timestamp_usec) > transfer_id_timeout_usec);

const bool restartable = (same_transport && tid_new) || //
(same_transport && tid_timeout) || //
(tid_new && tid_timeout);
// Restarting the transfer reassembly only makes sense if the new frame is a start of transfer.
// Otherwise, the new transfer would be impossible to reassemble anyway since the first frame is lost.
if (frame->start_of_transfer && restartable)
{
CANARD_ASSERT(frame->start_of_transfer);
rxs->total_payload_size = 0U;
rxs->payload_size = 0U; // The buffer is not released because we still need it.
rxs->calculated_crc = CRC_INITIAL;
rxs->transfer_id = frame->transfer_id;
rxs->toggle = INITIAL_TOGGLE_STATE;
rxs->redundant_iface_index = redundant_iface_index;
}
}

/// RX session state machine update is the most intricate part of any Cyphal transport implementation.
/// The state model used here is derived from the reference pseudocode given in the original UAVCAN v0 specification.
/// The Cyphal/CAN v1 specification, which this library is an implementation of, does not provide any reference
Expand All @@ -812,7 +859,7 @@ CANARD_PRIVATE int8_t rxSessionAcceptFrame(CanardInstance* const ins,
CANARD_PRIVATE int8_t rxSessionUpdate(CanardInstance* const ins,
CanardInternalRxSession* const rxs,
const RxFrameModel* const frame,
const uint8_t redundant_transport_index,
const uint8_t redundant_iface_index,
const CanardMicrosecond transfer_id_timeout_usec,
const size_t extent,
CanardRxTransfer* const out_transfer)
Expand All @@ -823,37 +870,7 @@ CANARD_PRIVATE int8_t rxSessionUpdate(CanardInstance* const ins,
CANARD_ASSERT(out_transfer != NULL);
CANARD_ASSERT(rxs->transfer_id <= CANARD_TRANSFER_ID_MAX);
CANARD_ASSERT(frame->transfer_id <= CANARD_TRANSFER_ID_MAX);

// The transfer ID timeout is measured relative to the timestamp of the last start-of-transfer frame.
// Triggering a TID timeout when the TID is the same is undesirable because it may cause the reassembler to
// switch to another interface if the start-of-transfer frame of the current transfer is duplicated
// on the other interface more than (transfer-ID timeout) units of time after the start of
// the transfer while the reassembly of this transfer is still in progress.
// While this behavior is not visible to the application because the transfer will still be reassembled,
// it may delay the delivery of the transfer.
const bool tid_timed_out = (frame->timestamp_usec > rxs->transfer_timestamp_usec) &&
(frame->transfer_id != rxs->transfer_id) &&
((frame->timestamp_usec - rxs->transfer_timestamp_usec) > transfer_id_timeout_usec);
// Examples: rxComputeTransferIDDifference(2, 3)==31
// rxComputeTransferIDDifference(2, 2)==0
// rxComputeTransferIDDifference(2, 1)==1
const bool not_previous_tid = rxComputeTransferIDDifference(rxs->transfer_id, frame->transfer_id) > 1;
// Restarting the transfer reassembly only makes sense if the new frame is a start of transfer.
// Otherwise, the new transfer would be impossible to reassemble anyway since the first frame is lost.
const bool need_restart =
frame->start_of_transfer &&
(tid_timed_out || ((rxs->redundant_transport_index == redundant_transport_index) && not_previous_tid));
if (need_restart)
{
CANARD_ASSERT(frame->start_of_transfer);
rxs->total_payload_size = 0U;
rxs->payload_size = 0U;
rxs->calculated_crc = CRC_INITIAL;
rxs->transfer_id = frame->transfer_id;
rxs->toggle = INITIAL_TOGGLE_STATE;
rxs->redundant_transport_index = redundant_transport_index;
}

rxSessionSynchronize(rxs, frame, redundant_iface_index, transfer_id_timeout_usec);
int8_t out = 0;
// The purpose of the correct_start check is to reduce the possibility of accepting a malformed multi-frame
// transfer in the event of a CRC collision. The scenario where this failure mode would manifest is as follows:
Expand All @@ -864,13 +881,13 @@ CANARD_PRIVATE int8_t rxSessionUpdate(CanardInstance* const ins,
// 4. The last frame of the multi-frame transfer is erroneously accepted even though it is malformed.
// The correct_start check eliminates this failure mode by ensuring that the first frame is observed.
// See https://github.com/OpenCyphal/libcanard/issues/189.
const bool correct_transport = (rxs->redundant_transport_index == redundant_transport_index);
const bool correct_toggle = (frame->toggle == rxs->toggle);
const bool correct_tid = (frame->transfer_id == rxs->transfer_id);
const bool correct_start = frame->start_of_transfer //
? (0 == rxs->total_payload_size)
: (rxs->total_payload_size > 0);
if (correct_transport && correct_toggle && correct_tid && correct_start)
const bool correct_iface = (rxs->redundant_iface_index == redundant_iface_index);
const bool correct_toggle = (frame->toggle == rxs->toggle);
const bool correct_tid = (frame->transfer_id == rxs->transfer_id);
const bool correct_start = frame->start_of_transfer //
? (0 == rxs->total_payload_size)
: (rxs->total_payload_size > 0);
if (correct_iface && correct_toggle && correct_tid && correct_start)
{
out = rxSessionAcceptFrame(ins, rxs, frame, extent, out_transfer);
}
Expand All @@ -880,7 +897,7 @@ CANARD_PRIVATE int8_t rxSessionUpdate(CanardInstance* const ins,
CANARD_PRIVATE int8_t rxAcceptFrame(CanardInstance* const ins,
CanardRxSubscription* const subscription,
const RxFrameModel* const frame,
const uint8_t redundant_transport_index,
const uint8_t redundant_iface_index,
CanardRxTransfer* const out_transfer)
{
CANARD_ASSERT(ins != NULL);
Expand All @@ -904,14 +921,14 @@ CANARD_PRIVATE int8_t rxAcceptFrame(CanardInstance* const ins,
subscription->sessions[frame->source_node_id] = rxs;
if (rxs != NULL)
{
rxs->transfer_timestamp_usec = frame->timestamp_usec;
rxs->total_payload_size = 0U;
rxs->payload_size = 0U;
rxs->payload = NULL;
rxs->calculated_crc = CRC_INITIAL;
rxs->transfer_id = frame->transfer_id;
rxs->redundant_transport_index = redundant_transport_index;
rxs->toggle = INITIAL_TOGGLE_STATE;
rxs->transfer_timestamp_usec = frame->timestamp_usec;
rxs->total_payload_size = 0U;
rxs->payload_size = 0U;
rxs->payload = NULL;
rxs->calculated_crc = CRC_INITIAL;
rxs->transfer_id = frame->transfer_id;
rxs->redundant_iface_index = redundant_iface_index;
rxs->toggle = INITIAL_TOGGLE_STATE;
}
else
{
Expand All @@ -925,7 +942,7 @@ CANARD_PRIVATE int8_t rxAcceptFrame(CanardInstance* const ins,
out = rxSessionUpdate(ins,
subscription->sessions[frame->source_node_id],
frame,
redundant_transport_index,
redundant_iface_index,
subscription->transfer_id_timeout_usec,
subscription->extent,
out_transfer);
Expand Down Expand Up @@ -1098,7 +1115,7 @@ CanardTxQueueItem* canardTxPop(CanardTxQueue* const que, const CanardTxQueueItem
int8_t canardRxAccept(CanardInstance* const ins,
const CanardMicrosecond timestamp_usec,
const CanardFrame* const frame,
const uint8_t redundant_transport_index,
const uint8_t redundant_iface_index,
CanardRxTransfer* const out_transfer,
CanardRxSubscription** const out_subscription)
{
Expand Down Expand Up @@ -1126,7 +1143,7 @@ int8_t canardRxAccept(CanardInstance* const ins,
if (sub != NULL)
{
CANARD_ASSERT(sub->port_id == model.port_id);
out = rxAcceptFrame(ins, sub, &model, redundant_transport_index, out_transfer);
out = rxAcceptFrame(ins, sub, &model, redundant_iface_index, out_transfer);
}
else
{
Expand Down
12 changes: 6 additions & 6 deletions libcanard/canard.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
/// subscription are truncated following the Implicit Truncation Rule (ITR) defined by the Cyphal Specification --
/// the rule is implemented to facilitate backward-compatible DSDL data type extensibility.
///
/// The library supports a practically unlimited number of redundant transports.
/// The library supports a practically unlimited number of redundant interfaces.
///
/// The library is not thread-safe: if used in a concurrent environment, it is the responsibility of the application
/// to provide adequate synchronization.
Expand Down Expand Up @@ -511,9 +511,9 @@ CanardTxQueueItem* canardTxPop(CanardTxQueue* const que, const CanardTxQueueItem
///
/// The MTU of the accepted frame can be arbitrary; that is, any MTU is accepted. The DLC validity is irrelevant.
///
/// Any value of redundant_transport_index is accepted; that is, up to 256 redundant transports are supported.
/// The index of the transport from which the transfer is accepted is always the same as redundant_transport_index
/// of the current invocation, so the application can always determine which transport has delivered the transfer.
/// Any value of redundant_iface_index is accepted; that is, up to 256 redundant interfaces are supported.
/// The index of the interface from which the transfer is accepted is always the same as redundant_iface_index
/// of the current invocation, so the application can always determine which interface has delivered the transfer.
///
/// Upon return, the out_subscription pointer will point to the instance of CanardRxSubscription that accepted this
/// frame; if no matching subscription exists (i.e., frame discarded), the pointer will be NULL.
Expand Down Expand Up @@ -593,7 +593,7 @@ CanardTxQueueItem* canardTxPop(CanardTxQueue* const que, const CanardTxQueueItem
int8_t canardRxAccept(CanardInstance* const ins,
const CanardMicrosecond timestamp_usec,
const CanardFrame* const frame,
const uint8_t redundant_transport_index,
const uint8_t redundant_iface_index,
CanardRxTransfer* const out_transfer,
CanardRxSubscription** const out_subscription);

Expand All @@ -613,7 +613,7 @@ int8_t canardRxAccept(CanardInstance* const ins,
/// whether its payload is truncated.
///
/// The default transfer-ID timeout value is defined as CANARD_DEFAULT_TRANSFER_ID_TIMEOUT_USEC; use it if not sure.
/// The redundant transport fail-over timeout (if redundant transports are used) is the same as the transfer-ID timeout.
/// The redundant interface fail-over timeout (if redundant interfaces are used) is the same as the transfer-ID timeout.
/// It may be reduced in a future release of the library, but it will not affect the backward compatibility.
///
/// The return value is 1 if a new subscription has been created as requested.
Expand Down
18 changes: 9 additions & 9 deletions tests/exposed.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@ struct TxItem final : CanardTxQueueItem

struct RxSession
{
CanardMicrosecond transfer_timestamp_usec = std::numeric_limits<std::uint64_t>::max();
std::size_t total_payload_size = 0U;
std::size_t payload_size = 0U;
std::uint8_t* payload = nullptr;
TransferCRC calculated_crc = 0U;
CanardTransferID transfer_id = std::numeric_limits<std::uint8_t>::max();
std::uint8_t redundant_transport_index = std::numeric_limits<std::uint8_t>::max();
bool toggle = false;
CanardMicrosecond transfer_timestamp_usec = std::numeric_limits<std::uint64_t>::max();
std::size_t total_payload_size = 0U;
std::size_t payload_size = 0U;
std::uint8_t* payload = nullptr;
TransferCRC calculated_crc = 0U;
CanardTransferID transfer_id = std::numeric_limits<std::uint8_t>::max();
std::uint8_t redundant_iface_index = std::numeric_limits<std::uint8_t>::max();
bool toggle = false;
};

struct RxFrameModel
Expand Down Expand Up @@ -112,7 +112,7 @@ void rxSessionRestart(CanardInstance* const ins, RxSession* const rxs);
auto rxSessionUpdate(CanardInstance* const ins,
RxSession* const rxs,
const RxFrameModel* const frame,
const std::uint8_t redundant_transport_index,
const std::uint8_t redundant_iface_index,
const CanardMicrosecond transfer_id_timeout_usec,
const std::size_t extent,
CanardRxTransfer* const out_transfer) -> std::int8_t;
Expand Down
9 changes: 2 additions & 7 deletions tests/helpers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,16 +179,11 @@ class Instance

[[nodiscard]] auto rxAccept(const CanardMicrosecond timestamp_usec,
const CanardFrame& frame,
const uint8_t redundant_transport_index,
const uint8_t redundant_iface_index,
CanardRxTransfer& out_transfer,
CanardRxSubscription** const out_subscription)
{
return canardRxAccept(&canard_,
timestamp_usec,
&frame,
redundant_transport_index,
&out_transfer,
out_subscription);
return canardRxAccept(&canard_, timestamp_usec, &frame, redundant_iface_index, &out_transfer, out_subscription);
}

[[nodiscard]] auto rxSubscribe(const CanardTransferKind transfer_kind,
Expand Down
Loading

0 comments on commit 73d0a9c

Please sign in to comment.