Skip to content

Commit

Permalink
implement fragmentation
Browse files Browse the repository at this point in the history
  • Loading branch information
p-avital committed Aug 18, 2023
1 parent 5238f25 commit 0be6ad0
Show file tree
Hide file tree
Showing 13 changed files with 28 additions and 12 deletions.
1 change: 1 addition & 0 deletions include/zenoh-pico/collections/bytes.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ typedef struct {
} _z_bytes_t;

_z_bytes_t _z_bytes_empty(void);
inline static _Bool _z_bytes_check(_z_bytes_t value) { return value.start != NULL; }
int8_t _z_bytes_init(_z_bytes_t *bs, size_t capacity);
_z_bytes_t _z_bytes_make(size_t capacity);
_z_bytes_t _z_bytes_wrap(const uint8_t *bs, size_t len);
Expand Down
1 change: 1 addition & 0 deletions include/zenoh-pico/protocol/definitions/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,7 @@ _z_transport_message_t _z_t_msg_make_close(uint8_t reason, _Bool link_only);
_z_transport_message_t _z_t_msg_make_keep_alive(void);
_z_transport_message_t _z_t_msg_make_frame(_z_zint_t sn, _z_network_message_vec_t messages, _Bool is_reliable);
_z_transport_message_t _z_t_msg_make_frame_header(_z_zint_t sn, _Bool is_reliable);
_z_transport_message_t _z_t_msg_make_fragment_header(_z_zint_t sn, _Bool is_reliable, _Bool is_last);
_z_transport_message_t _z_t_msg_make_fragment(_z_zint_t sn, _z_bytes_t messages, _Bool is_reliable, _Bool is_last);

/*------------------ Copy ------------------*/
Expand Down
1 change: 1 addition & 0 deletions include/zenoh-pico/protocol/iobuf.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ _z_zbuf_t _z_zbuf_view(_z_zbuf_t *zbf, size_t length);
_z_zbuf_t _z_zbytes_as_zbuf(_z_bytes_t slice);

size_t _z_zbuf_capacity(const _z_zbuf_t *zbf);
uint8_t const *_z_zbuf_start(const _z_zbuf_t *zbf);
size_t _z_zbuf_len(const _z_zbuf_t *zbf);
_Bool _z_zbuf_can_read(const _z_zbuf_t *zbf);
size_t _z_zbuf_space_left(const _z_zbuf_t *zbf);
Expand Down
3 changes: 1 addition & 2 deletions src/link/link.c
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,7 @@ size_t _z_link_recv_exact_zbuf(const _z_link_t *link, _z_zbuf_t *zbf, size_t len

int8_t _z_link_send_wbuf(const _z_link_t *link, const _z_wbuf_t *wbf) {
int8_t ret = _Z_RES_OK;

for (size_t i = 0; (i < _z_wbuf_len_iosli(wbf)) || (ret == -1); i++) {
for (size_t i = 0; (i < _z_wbuf_len_iosli(wbf)); i++) {
_z_bytes_t bs = _z_iosli_to_bytes(_z_wbuf_get_iosli(wbf, i));
size_t n = bs.len;
do {
Expand Down
2 changes: 1 addition & 1 deletion src/protocol/codec.c
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ int8_t _z_zint64_decode(uint64_t *zint, _z_zbuf_t *zbf) {
int8_t _z_bytes_val_encode(_z_wbuf_t *wbf, const _z_bytes_t *bs) {
int8_t ret = _Z_RES_OK;

if ((wbf->_expansion_step = true) && (bs->len > Z_TSID_LENGTH)) {
if ((wbf->_expansion_step != 0) && (bs->len > Z_TSID_LENGTH)) {
// ret |= _z_wbuf_wrap_bytes(wbf, bs->start, 0, bs->len);
ret |= _z_wbuf_write_bytes(wbf, bs->start, 0, bs->len);
} else {
Expand Down
6 changes: 4 additions & 2 deletions src/protocol/codec/transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <stddef.h>
#include <stdint.h>

#include "zenoh-pico/collections/bytes.h"
#include "zenoh-pico/protocol/codec/core.h"
#include "zenoh-pico/protocol/codec/ext.h"
#include "zenoh-pico/protocol/codec/network.h"
Expand Down Expand Up @@ -380,7 +381,7 @@ int8_t _z_fragment_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_fragmen
if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z)) {
ret = _Z_ERR_MESSAGE_SERIALIZATION_FAILED;
}
if (ret == _Z_RES_OK) {
if (ret == _Z_RES_OK && _z_bytes_check(msg->_payload)) {
_Z_RETURN_IF_ERR(_z_bytes_encode(wbf, &msg->_payload))
}

Expand All @@ -398,7 +399,8 @@ int8_t _z_fragment_decode(_z_t_msg_fragment_t *msg, _z_zbuf_t *zbf, uint8_t head
ret |= _z_msg_ext_skip_non_mandatories(zbf, 0x05);
}

ret |= _z_bytes_decode(&msg->_payload, zbf);
__auto_type bytes = _z_bytes_wrap((uint8_t *)_z_zbuf_start(zbf), _z_zbuf_len(zbf));
_z_bytes_copy(&msg->_payload, &bytes);

return ret;
}
Expand Down
6 changes: 5 additions & 1 deletion src/protocol/definitions/transport.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "zenoh-pico/protocol/definitions/transport.h"

#include "zenoh-pico/collections/bytes.h"
#include "zenoh-pico/utils/logging.h"

void _z_s_msg_scout_clear(_z_s_msg_scout_t *msg) {}
Expand Down Expand Up @@ -218,10 +219,13 @@ _z_transport_message_t _z_t_msg_make_frame_header(_z_zint_t sn, _Bool is_reliabl
}

/*------------------ Fragment Message ------------------*/
_z_transport_message_t _z_t_msg_make_fragment_header(_z_zint_t sn, _Bool is_reliable, _Bool is_last) {
return _z_t_msg_make_fragment(sn, _z_bytes_empty(), is_reliable, is_last);
}
_z_transport_message_t _z_t_msg_make_fragment(_z_zint_t sn, _z_bytes_t payload, _Bool is_reliable, _Bool is_last) {
_z_transport_message_t msg;
msg._header = _Z_MID_T_FRAGMENT;
if (is_last == true) {
if (is_last == false) {
_Z_SET_FLAG(msg._header, _Z_FLAG_T_FRAGMENT_M);
}
if (is_reliable == true) {
Expand Down
1 change: 1 addition & 0 deletions src/protocol/iobuf.c
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ size_t _z_zbuf_capacity(const _z_zbuf_t *zbf) { return zbf->_ios._capacity; }

size_t _z_zbuf_space_left(const _z_zbuf_t *zbf) { return _z_iosli_writable(&zbf->_ios); }

uint8_t const *_z_zbuf_start(const _z_zbuf_t *zbf) { return _z_ptr_u8_offset(zbf->_ios._buf, zbf->_ios._r_pos); }
size_t _z_zbuf_len(const _z_zbuf_t *zbf) { return _z_iosli_readable(&zbf->_ios); }

_Bool _z_zbuf_can_read(const _z_zbuf_t *zbf) { return _z_zbuf_len(zbf) > (size_t)0; }
Expand Down
4 changes: 3 additions & 1 deletion src/transport/common/tx.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "zenoh-pico/transport/link/tx.h"

#include "zenoh-pico/api/constants.h"
#include "zenoh-pico/protocol/codec/core.h"
#include "zenoh-pico/protocol/codec/transport.h"
#include "zenoh-pico/protocol/definitions/transport.h"
#include "zenoh-pico/utils/logging.h"
Expand Down Expand Up @@ -111,7 +112,8 @@ int8_t __unsafe_z_serialize_zenoh_fragment(_z_wbuf_t *dst, _z_wbuf_t *src, z_rel
do {
size_t w_pos = _z_wbuf_get_wpos(dst); // Mark the buffer for the writing operation

_z_transport_message_t f_hdr = _z_t_msg_make_frame_header(sn, reliability == Z_RELIABILITY_RELIABLE);
_z_transport_message_t f_hdr =
_z_t_msg_make_fragment_header(sn, reliability == Z_RELIABILITY_RELIABLE, is_final);
ret = _z_transport_message_encode(dst, &f_hdr); // Encode the frame header
if (ret == _Z_RES_OK) {
size_t space_left = _z_wbuf_space_left(dst);
Expand Down
4 changes: 2 additions & 2 deletions src/transport/transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ int8_t _z_transport_unicast(_z_transport_t *zt, _z_link_t *zl, _z_transport_unic
expandable = false;
dbuf_size = Z_FRAG_MAX_SIZE;
#endif
zt->_transport._unicast._wbuf = _z_wbuf_make(mtu, expandable);
zt->_transport._unicast._wbuf = _z_wbuf_make(mtu, false);
zt->_transport._unicast._zbuf = _z_zbuf_make(Z_BATCH_UNICAST_SIZE);

// Initialize the defragmentation buffers
Expand Down Expand Up @@ -197,7 +197,7 @@ int8_t _z_transport_multicast(_z_transport_t *zt, _z_link_t *zl, _z_transport_mu
// Initialize the read and write buffers
if (ret == _Z_RES_OK) {
uint16_t mtu = (zl->_mtu < Z_BATCH_MULTICAST_SIZE) ? zl->_mtu : Z_BATCH_MULTICAST_SIZE;
zt->_transport._multicast._wbuf = _z_wbuf_make(mtu, _Z_LINK_IS_STREAMED(zl->_capabilities));
zt->_transport._multicast._wbuf = _z_wbuf_make(mtu, false);
zt->_transport._multicast._zbuf = _z_zbuf_make(Z_BATCH_MULTICAST_SIZE);

// Clean up the buffers if one of them failed to be allocated
Expand Down
5 changes: 4 additions & 1 deletion src/transport/unicast/link/rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "zenoh-pico/protocol/codec/network.h"
#include "zenoh-pico/protocol/codec/transport.h"
#include "zenoh-pico/protocol/core.h"
#include "zenoh-pico/protocol/iobuf.h"
#include "zenoh-pico/session/utils.h"
#include "zenoh-pico/transport/utils.h"
#include "zenoh-pico/utils/logging.h"
Expand Down Expand Up @@ -127,7 +128,7 @@ int8_t _z_unicast_handle_transport_message(_z_transport_unicast_t *ztu, _z_trans
}

case _Z_MID_T_FRAGMENT: {
_Z_INFO("Received Z_FRAGMENT message\n");
_Z_INFO("Received Z_FRAGMENT message, len: %ld\n", t_msg->_body._fragment._payload.len);

_z_wbuf_t *dbuf = _Z_HAS_FLAG(t_msg->_header, _Z_FLAG_T_FRAGMENT_R)
? &ztu->_dbuf_reliable
Expand Down Expand Up @@ -158,6 +159,8 @@ int8_t _z_unicast_handle_transport_message(_z_transport_unicast_t *ztu, _z_trans
_z_handle_zenoh_message(ztu->_session, &zm, _Z_KEYEXPR_MAPPING_UNKNOWN_REMOTE);
_z_msg_clear(&zm); // Clear must be explicitly called for fragmented zenoh messages. Non-fragmented
// zenoh messages are released when their transport message is released.
} else {
_Z_DEBUG("Failed to decode defragmented message\n");
}

// Free the decoding buffer
Expand Down
4 changes: 3 additions & 1 deletion src/transport/unicast/link/tx.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

#include "zenoh-pico/transport/link/tx.h"

#include <assert.h>

#include "zenoh-pico/config.h"
#include "zenoh-pico/protocol/codec/network.h"
#include "zenoh-pico/protocol/codec/transport.h"
Expand Down Expand Up @@ -122,7 +124,7 @@ int8_t _z_unicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg
} else {
// The message does not fit in the current batch, let's fragment it
// Create an expandable wbuf for fragmentation
_z_wbuf_t fbf = _z_wbuf_make(ztu->_wbuf._expansion_step - 8, true);
_z_wbuf_t fbf = _z_wbuf_make(ztu->_wbuf._capacity - 12, true);

ret = _z_network_message_encode(&fbf, n_msg); // Encode the message on the expandable wbuf
if (ret == _Z_RES_OK) {
Expand Down
2 changes: 1 addition & 1 deletion zenohpico.pc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ prefix=/usr/local
Name: zenohpico
Description:
URL:
Version: 0.10.20230811dev
Version: 0.10.20230818dev
Cflags: -I${prefix}/
Libs: -L${prefix}/ -lzenohpico

0 comments on commit 0be6ad0

Please sign in to comment.