Skip to content

Commit

Permalink
Refactor link capabilities (#280)
Browse files Browse the repository at this point in the history
  • Loading branch information
jean-roland authored Nov 17, 2023
1 parent e420029 commit b5483d7
Show file tree
Hide file tree
Showing 19 changed files with 329 additions and 201 deletions.
48 changes: 34 additions & 14 deletions include/zenoh-pico/link/link.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,24 +43,44 @@
#include "zenoh-pico/utils/result.h"

/**
* Link capabilities values, defined as a bitmask.
* Link transport capability enum.
*
* Enumerators:
* Z_LINK_CAPABILITY_NONE: Bitmask to define that link has no capabilities.
* Z_LINK_CAPABILITY_RELIABLE: Bitmask to define and check if link is reliable.
* Z_LINK_CAPABILITY_STREAMED: Bitmask to define and check if link is streamed.
* Z_LINK_CAPABILITY_MULTICAST: Bitmask to define and check if link is multicast.
* Z_LINK_CAP_TRANSPORT_UNICAST: Link has unicast capabilities.
* Z_LINK_CAP_TRANSPORT_MULTICAST: Link has multicast capabilities.
*/
typedef enum {
Z_LINK_CAPABILITY_NONE = 0x00, // 0
Z_LINK_CAPABILITY_RELIABLE = 0x01, // 1 << 0
Z_LINK_CAPABILITY_STREAMED = 0x02, // 1 << 1
Z_LINK_CAPABILITY_MULTICAST = 0x04 // 1 << 2
} _z_link_capabilities_t;
Z_LINK_CAP_TRANSPORT_UNICAST = 0,
Z_LINK_CAP_TRANSPORT_MULTICAST = 1,
} _z_link_cap_transport_t;

#define _Z_LINK_IS_RELIABLE(X) ((X & Z_LINK_CAPABILITY_RELIABLE) == Z_LINK_CAPABILITY_RELIABLE)
#define _Z_LINK_IS_STREAMED(X) ((X & Z_LINK_CAPABILITY_STREAMED) == Z_LINK_CAPABILITY_STREAMED)
#define _Z_LINK_IS_MULTICAST(X) ((X & Z_LINK_CAPABILITY_MULTICAST) == Z_LINK_CAPABILITY_MULTICAST)
/**
* Link flow capability enum.
*
* Enumerators:
* Z_LINK_CAP_FLOW_STREAM: Link use datagrams.
* Z_LINK_CAP_FLOW_DATAGRAM: Link use byte stream.
*/
typedef enum {
Z_LINK_CAP_FLOW_DATAGRAM = 0,
Z_LINK_CAP_FLOW_STREAM = 1,
} _z_link_cap_flow_t;

/**
* Link capabilities, stored as a register-like object.
*
* Fields:
* transport: 2 bits, see _z_link_cap_transport_t enum.
* flow: 1 bit, see _z_link_cap_flow_t enum.
* reliable: 1 bit, 1 if the link is reliable (network definition)
* reserved: 4 bits, reserved for futur use
*/
typedef struct _z_link_capabilities_t {
uint8_t _transport : 2;
uint8_t _flow : 1;
uint8_t _is_reliable : 1;
uint8_t _reserved : 4;
} _z_link_capabilities_t;

struct _z_link_t; // Forward declaration to be used in _z_f_link_*

Expand Down Expand Up @@ -104,7 +124,7 @@ typedef struct _z_link_t {
_z_f_link_free _free_f;

uint16_t _mtu;
uint8_t _capabilities;
_z_link_capabilities_t _cap;
} _z_link_t;

void _z_link_clear(_z_link_t *zl);
Expand Down
4 changes: 2 additions & 2 deletions include/zenoh-pico/transport/common/tx.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
#include "zenoh-pico/net/session.h"
#include "zenoh-pico/transport/transport.h"

void __unsafe_z_prepare_wbuf(_z_wbuf_t *buf, _Bool is_streamed);
void __unsafe_z_finalize_wbuf(_z_wbuf_t *buf, _Bool is_streamed);
void __unsafe_z_prepare_wbuf(_z_wbuf_t *buf, uint8_t link_flow_capability);
void __unsafe_z_finalize_wbuf(_z_wbuf_t *buf, uint8_t link_flow_capability);
/*This function is unsafe because it operates in potentially concurrent
data.*Make sure that the following mutexes are locked before calling this function : *-ztu->mutex_tx */
int8_t __unsafe_z_serialize_zenoh_fragment(_z_wbuf_t *dst, _z_wbuf_t *src, z_reliability_t reliability, size_t sn);
Expand Down
12 changes: 11 additions & 1 deletion src/link/link.c
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,17 @@ size_t _z_link_recv_exact_zbuf(const _z_link_t *link, _z_zbuf_t *zbf, size_t len

int8_t _z_link_send_wbuf(const _z_link_t *link, const _z_wbuf_t *wbf) {
int8_t ret = _Z_RES_OK;
_Bool link_is_streamed = _Z_LINK_IS_STREAMED(link->_capabilities);
_Bool link_is_streamed = false;

switch (link->_cap._flow) {
case Z_LINK_CAP_FLOW_STREAM:
link_is_streamed = true;
break;
case Z_LINK_CAP_FLOW_DATAGRAM:
default:
link_is_streamed = false;
break;
}
for (size_t i = 0; (i < _z_wbuf_len_iosli(wbf)) && (ret == _Z_RES_OK); i++) {
_z_bytes_t bs = _z_iosli_to_bytes(_z_wbuf_get_iosli(wbf, i));
size_t n = bs.len;
Expand Down
5 changes: 4 additions & 1 deletion src/link/multicast/bt.c
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,10 @@ uint16_t _z_get_link_mtu_bt(void) { return SPP_MAXIMUM_PAYLOAD; }
int8_t _z_new_link_bt(_z_link_t *zl, _z_endpoint_t endpoint) {
int8_t ret = _Z_RES_OK;

zl->_capabilities = Z_LINK_CAPABILITY_STREAMED | Z_LINK_CAPABILITY_MULTICAST;
zl->_cap._transport = Z_LINK_CAP_TRANSPORT_MULTICAST;
zl->_cap._flow = Z_LINK_CAP_FLOW_STREAM;
zl->_cap._is_reliable = false;

zl->_mtu = _z_get_link_mtu_bt();

zl->_endpoint = endpoint;
Expand Down
5 changes: 4 additions & 1 deletion src/link/multicast/udp.c
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,10 @@ uint16_t _z_get_link_mtu_udp_multicast(void) {
int8_t _z_new_link_udp_multicast(_z_link_t *zl, _z_endpoint_t endpoint) {
int8_t ret = _Z_RES_OK;

zl->_capabilities = Z_LINK_CAPABILITY_MULTICAST;
zl->_cap._transport = Z_LINK_CAP_TRANSPORT_MULTICAST;
zl->_cap._flow = Z_LINK_CAP_FLOW_DATAGRAM;
zl->_cap._is_reliable = false;

zl->_mtu = _z_get_link_mtu_udp_multicast();

zl->_endpoint = endpoint;
Expand Down
5 changes: 4 additions & 1 deletion src/link/unicast/serial.c
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,10 @@ uint16_t _z_get_link_mtu_serial(void) { return _Z_SERIAL_MTU_SIZE; }
int8_t _z_new_link_serial(_z_link_t *zl, _z_endpoint_t endpoint) {
int8_t ret = _Z_RES_OK;

zl->_capabilities = Z_LINK_CAPABILITY_NONE;
zl->_cap._transport = Z_LINK_CAP_TRANSPORT_UNICAST;
zl->_cap._flow = Z_LINK_CAP_FLOW_DATAGRAM;
zl->_cap._is_reliable = false;

zl->_mtu = _z_get_link_mtu_serial();

zl->_endpoint = endpoint;
Expand Down
5 changes: 4 additions & 1 deletion src/link/unicast/tcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,10 @@ uint16_t _z_get_link_mtu_tcp(void) {
int8_t _z_new_link_tcp(_z_link_t *zl, _z_endpoint_t *endpoint) {
int8_t ret = _Z_RES_OK;

zl->_capabilities = Z_LINK_CAPABILITY_RELIABLE | Z_LINK_CAPABILITY_STREAMED;
zl->_cap._transport = Z_LINK_CAP_TRANSPORT_UNICAST;
zl->_cap._flow = Z_LINK_CAP_FLOW_STREAM;
zl->_cap._is_reliable = true;

zl->_mtu = _z_get_link_mtu_tcp();

zl->_endpoint = *endpoint;
Expand Down
5 changes: 4 additions & 1 deletion src/link/unicast/udp.c
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,10 @@ uint16_t _z_get_link_mtu_udp_unicast(void) {
int8_t _z_new_link_udp_unicast(_z_link_t *zl, _z_endpoint_t endpoint) {
int8_t ret = _Z_RES_OK;

zl->_capabilities = Z_LINK_CAPABILITY_NONE;
zl->_cap._transport = Z_LINK_CAP_TRANSPORT_UNICAST;
zl->_cap._flow = Z_LINK_CAP_FLOW_DATAGRAM;
zl->_cap._is_reliable = false;

zl->_mtu = _z_get_link_mtu_udp_unicast();

zl->_endpoint = endpoint;
Expand Down
5 changes: 4 additions & 1 deletion src/link/unicast/ws.c
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,10 @@ uint16_t _z_get_link_mtu_ws(void) {
int8_t _z_new_link_ws(_z_link_t *zl, _z_endpoint_t *endpoint) {
int8_t ret = _Z_RES_OK;

zl->_capabilities = Z_LINK_CAPABILITY_RELIABLE;
zl->_cap._transport = Z_LINK_CAP_TRANSPORT_UNICAST;
zl->_cap._flow = Z_LINK_CAP_FLOW_DATAGRAM;
zl->_cap._is_reliable = true;

zl->_mtu = _z_get_link_mtu_ws();

zl->_endpoint = *endpoint;
Expand Down
47 changes: 26 additions & 21 deletions src/transport/common/rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,32 +28,37 @@ int8_t _z_link_recv_t_msg(_z_transport_message_t *t_msg, const _z_link_t *zl) {
_z_zbuf_t zbf = _z_zbuf_make(Z_BATCH_UNICAST_SIZE);
_z_zbuf_reset(&zbf);

if (_Z_LINK_IS_STREAMED(zl->_capabilities) == true) {
// Read the message length
if (_z_link_recv_exact_zbuf(zl, &zbf, _Z_MSG_LEN_ENC_SIZE, NULL) == _Z_MSG_LEN_ENC_SIZE) {
size_t len = 0;
for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) {
len |= (size_t)(_z_zbuf_read(&zbf) << (i * (uint8_t)8));
}
switch (zl->_cap._flow) {
case Z_LINK_CAP_FLOW_STREAM:
// Read the message length
if (_z_link_recv_exact_zbuf(zl, &zbf, _Z_MSG_LEN_ENC_SIZE, NULL) == _Z_MSG_LEN_ENC_SIZE) {
size_t len = 0;
for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) {
len |= (size_t)(_z_zbuf_read(&zbf) << (i * (uint8_t)8));
}

size_t writable = _z_zbuf_capacity(&zbf) - _z_zbuf_len(&zbf);
if (writable >= len) {
// Read enough bytes to decode the message
if (_z_link_recv_exact_zbuf(zl, &zbf, len, NULL) != len) {
ret = _Z_ERR_TRANSPORT_RX_FAILED;
size_t writable = _z_zbuf_capacity(&zbf) - _z_zbuf_len(&zbf);
if (writable >= len) {
// Read enough bytes to decode the message
if (_z_link_recv_exact_zbuf(zl, &zbf, len, NULL) != len) {
ret = _Z_ERR_TRANSPORT_RX_FAILED;
}
} else {
ret = _Z_ERR_TRANSPORT_NO_SPACE;
}
} else {
ret = _Z_ERR_TRANSPORT_NO_SPACE;
ret = _Z_ERR_TRANSPORT_RX_FAILED;
}
} else {
ret = _Z_ERR_TRANSPORT_RX_FAILED;
}
} else {
if (_z_link_recv_zbuf(zl, &zbf, NULL) == SIZE_MAX) {
ret = _Z_ERR_TRANSPORT_RX_FAILED;
}
break;
case Z_LINK_CAP_FLOW_DATAGRAM:
if (_z_link_recv_zbuf(zl, &zbf, NULL) == SIZE_MAX) {
ret = _Z_ERR_TRANSPORT_RX_FAILED;
}
break;
default:
ret = _Z_ERR_GENERIC;
break;
}

if (ret == _Z_RES_OK) {
_z_transport_message_t l_t_msg;
ret = _z_transport_message_decode(&l_t_msg, &zbf);
Expand Down
75 changes: 52 additions & 23 deletions src/transport/common/tx.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,21 @@
* Make sure that the following mutexes are locked before calling this function:
* - ztu->mutex_tx
*/
void __unsafe_z_prepare_wbuf(_z_wbuf_t *buf, _Bool is_streamed) {
void __unsafe_z_prepare_wbuf(_z_wbuf_t *buf, uint8_t link_flow_capability) {
_z_wbuf_reset(buf);

if (is_streamed == true) {
for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) {
_z_wbuf_put(buf, 0, i);
}
_z_wbuf_set_wpos(buf, _Z_MSG_LEN_ENC_SIZE);
switch (link_flow_capability) {
// Stream capable links
case Z_LINK_CAP_FLOW_STREAM:
for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) {
_z_wbuf_put(buf, 0, i);
}
_z_wbuf_set_wpos(buf, _Z_MSG_LEN_ENC_SIZE);
break;
// Datagram capable links
case Z_LINK_CAP_FLOW_DATAGRAM:
default:
break;
}
}

Expand All @@ -43,12 +50,20 @@ void __unsafe_z_prepare_wbuf(_z_wbuf_t *buf, _Bool is_streamed) {
* Make sure that the following mutexes are locked before calling this function:
* - ztu->mutex_tx
*/
void __unsafe_z_finalize_wbuf(_z_wbuf_t *buf, _Bool is_streamed) {
if (is_streamed == true) {
size_t len = _z_wbuf_len(buf) - _Z_MSG_LEN_ENC_SIZE;
for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) {
_z_wbuf_put(buf, (uint8_t)((len >> (uint8_t)8 * i) & (uint8_t)0xFF), i);
void __unsafe_z_finalize_wbuf(_z_wbuf_t *buf, uint8_t link_flow_capability) {
switch (link_flow_capability) {
// Stream capable links
case Z_LINK_CAP_FLOW_STREAM: {
size_t len = _z_wbuf_len(buf) - _Z_MSG_LEN_ENC_SIZE;
for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) {
_z_wbuf_put(buf, (uint8_t)((len >> (uint8_t)8 * i) & (uint8_t)0xFF), i);
}
break;
}
// Datagram capable links
case Z_LINK_CAP_FLOW_DATAGRAM:
default:
break;
}
}

Expand All @@ -74,24 +89,38 @@ int8_t _z_link_send_t_msg(const _z_link_t *zl, const _z_transport_message_t *t_m
// Create and prepare the buffer to serialize the message on
uint16_t mtu = (zl->_mtu < Z_BATCH_UNICAST_SIZE) ? zl->_mtu : Z_BATCH_UNICAST_SIZE;
_z_wbuf_t wbf = _z_wbuf_make(mtu, false);
if (_Z_LINK_IS_STREAMED(zl->_capabilities) == true) {
for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) {
_z_wbuf_put(&wbf, 0, i);
}
_z_wbuf_set_wpos(&wbf, _Z_MSG_LEN_ENC_SIZE);
}

switch (zl->_cap._flow) {
case Z_LINK_CAP_FLOW_STREAM:
for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) {
_z_wbuf_put(&wbf, 0, i);
}
_z_wbuf_set_wpos(&wbf, _Z_MSG_LEN_ENC_SIZE);
break;
case Z_LINK_CAP_FLOW_DATAGRAM:
break;
default:
ret = _Z_ERR_GENERIC;
break;
}
// Encode the session message
ret = _z_transport_message_encode(&wbf, t_msg);
if (ret == _Z_RES_OK) {
// Write the message length in the reserved space if needed
if (_Z_LINK_IS_STREAMED(zl->_capabilities) == true) {
size_t len = _z_wbuf_len(&wbf) - _Z_MSG_LEN_ENC_SIZE;
for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) {
_z_wbuf_put(&wbf, (uint8_t)((len >> (uint8_t)8 * i) & (uint8_t)0xFF), i);
switch (zl->_cap._flow) {
case Z_LINK_CAP_FLOW_STREAM: {
// Write the message length in the reserved space if needed
size_t len = _z_wbuf_len(&wbf) - _Z_MSG_LEN_ENC_SIZE;
for (uint8_t i = 0; i < _Z_MSG_LEN_ENC_SIZE; i++) {
_z_wbuf_put(&wbf, (uint8_t)((len >> (uint8_t)8 * i) & (uint8_t)0xFF), i);
}
break;
}
case Z_LINK_CAP_FLOW_DATAGRAM:
break;
default:
ret = _Z_ERR_GENERIC;
break;
}

// Send the wbuf on the socket
ret = _z_link_send_wbuf(zl, &wbf);
}
Expand Down
Loading

0 comments on commit b5483d7

Please sign in to comment.