Skip to content

Commit

Permalink
Update encoding format on the wire (#399)
Browse files Browse the repository at this point in the history
* feat: update encoding wire change

* refactor: rename codec test gen_uint to gen_uint32

* test: update codec test for encoding

* fix: add encoding_len codec function

* fix: deactivating interests temporarily

* build: suppress msvc warning c4127

* fix: remove unused variable warning

* fix: rename encoding fields
  • Loading branch information
jean-roland authored Apr 11, 2024
1 parent 72c7a50 commit aa883f4
Show file tree
Hide file tree
Showing 13 changed files with 92 additions and 69 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ if(CMAKE_BUILD_TYPE MATCHES "DEBUG")
elseif(MSVC)
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} /std:c11 /experimental:c11atomics")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /std:c++latest /experimental:c11atomics")
add_compile_options(/W4 /WX /Od)
add_compile_options(/W4 /WX /Od /wd4127)
elseif(CMAKE_SYSTEM_NAME MATCHES "Generic")
add_compile_options(-Wall -Wextra -Wno-unused-parameter -Wmissing-prototypes -pipe -g -O0)
endif()
Expand Down
2 changes: 1 addition & 1 deletion GNUmakefile
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ Z_FEATURE_SUBSCRIPTION?=1
Z_FEATURE_QUERY?=1
Z_FEATURE_QUERYABLE?=1
Z_FEATURE_ATTACHMENT?=1
Z_FEATURE_INTEREST?=1
Z_FEATURE_INTEREST?=0
Z_FEATURE_RAWETH_TRANSPORT?=0

# zenoh-pico/ directory
Expand Down
6 changes: 4 additions & 2 deletions include/zenoh-pico/protocol/codec/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@
}

/*------------------ Internal Zenoh-net Macros ------------------*/
int8_t _z_encoding_prefix_encode(_z_wbuf_t *wbf, z_encoding_prefix_t en);
int8_t _z_encoding_prefix_decode(z_encoding_prefix_t *en, _z_zbuf_t *zbf);
int8_t _z_consolidation_mode_encode(_z_wbuf_t *wbf, z_consolidation_mode_t en);
int8_t _z_consolidation_mode_decode(z_consolidation_mode_t *en, _z_zbuf_t *zbf);
int8_t _z_query_target_encode(_z_wbuf_t *wbf, z_query_target_t en);
Expand Down Expand Up @@ -76,6 +74,10 @@ int8_t _z_zbuf_read_exact(_z_zbuf_t *zbf, uint8_t *dest, size_t length);
int8_t _z_str_encode(_z_wbuf_t *buf, const char *s);
int8_t _z_str_decode(char **str, _z_zbuf_t *buf);

size_t _z_encoding_len(const _z_encoding_t *en);
int8_t _z_encoding_encode(_z_wbuf_t *wbf, const _z_encoding_t *en);
int8_t _z_encoding_decode(_z_encoding_t *en, _z_zbuf_t *zbf);

int8_t _z_keyexpr_encode(_z_wbuf_t *buf, _Bool has_suffix, const _z_keyexpr_t *ke);
int8_t _z_keyexpr_decode(_z_keyexpr_t *ke, _z_zbuf_t *buf, _Bool has_suffix);

Expand Down
4 changes: 2 additions & 2 deletions include/zenoh-pico/protocol/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ _z_id_t _z_id_empty(void);
* A zenoh encoding.
*/
typedef struct {
_z_bytes_t suffix;
z_encoding_prefix_t prefix;
_z_bytes_t schema;
uint16_t id;
} _z_encoding_t;

/**
Expand Down
5 changes: 2 additions & 3 deletions src/api/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,7 @@ int8_t zp_scouting_config_insert(z_scouting_config_t sc, uint8_t key, z_string_t

z_encoding_t z_encoding(z_encoding_prefix_t prefix, const char *suffix) {
return (_z_encoding_t){
.prefix = prefix,
.suffix = _z_bytes_wrap((const uint8_t *)suffix, (suffix == NULL) ? (size_t)0 : strlen(suffix))};
.id = prefix, .schema = _z_bytes_wrap((const uint8_t *)suffix, (suffix == NULL) ? (size_t)0 : strlen(suffix))};
}

z_encoding_t z_encoding_default(void) { return z_encoding(Z_ENCODING_PREFIX_DEFAULT, NULL); }
Expand Down Expand Up @@ -948,7 +947,7 @@ int8_t z_query_reply(const z_query_t *query, const z_keyexpr_t keyexpr, const ui
._is_alloc = false,
.len = payload_len,
},
.encoding = {.prefix = opts.encoding.prefix, .suffix = opts.encoding.suffix}};
.encoding = {.id = opts.encoding.id, .schema = opts.encoding.schema}};
return _z_send_reply(&query->_val._rc.in->val, keyexpr, value, Z_SAMPLE_KIND_PUT);
return _Z_ERR_GENERIC;
}
Expand Down
12 changes: 6 additions & 6 deletions src/net/memory.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ void _z_sample_move(_z_sample_t *dst, _z_sample_t *src) {

_z_bytes_move(&dst->payload, &src->payload);

dst->encoding.prefix = src->encoding.prefix; // FIXME: call the z_encoding_move
_z_bytes_move(&dst->encoding.suffix, &src->encoding.suffix); // FIXME: call the z_encoding_move
dst->encoding.id = src->encoding.id; // FIXME: call the z_encoding_move
_z_bytes_move(&dst->encoding.schema, &src->encoding.schema); // FIXME: call the z_encoding_move

dst->timestamp.time = src->timestamp.time; // FIXME: call the z_timestamp_move
dst->timestamp.id = src->timestamp.id; // FIXME: call the z_timestamp_move
Expand All @@ -34,7 +34,7 @@ void _z_sample_move(_z_sample_t *dst, _z_sample_t *src) {
void _z_sample_clear(_z_sample_t *sample) {
_z_keyexpr_clear(&sample->keyexpr);
_z_bytes_clear(&sample->payload);
_z_bytes_clear(&sample->encoding.suffix); // FIXME: call the z_encoding_clear
_z_bytes_clear(&sample->encoding.schema); // FIXME: call the z_encoding_clear
_z_timestamp_clear(&sample->timestamp);
}

Expand All @@ -55,8 +55,8 @@ void _z_sample_copy(_z_sample_t *dst, const _z_sample_t *src) {
dst->timestamp = _z_timestamp_duplicate(&src->timestamp);

// TODO(sashacmc): should be changed after encoding rework
dst->encoding.prefix = src->encoding.prefix;
_z_bytes_copy(&dst->encoding.suffix, &src->encoding.suffix);
dst->encoding.id = src->encoding.id;
_z_bytes_copy(&dst->encoding.schema, &src->encoding.schema);

dst->kind = src->kind;
#if Z_FEATURE_ATTACHMENT == 1
Expand Down Expand Up @@ -104,7 +104,7 @@ void _z_reply_data_free(_z_reply_data_t **reply_data) {
}

void _z_value_clear(_z_value_t *value) {
_z_bytes_clear(&value->encoding.suffix);
_z_bytes_clear(&value->encoding.schema);
_z_bytes_clear(&value->payload);
}

Expand Down
50 changes: 38 additions & 12 deletions src/protocol/codec.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,6 @@
#include "zenoh-pico/utils/result.h"

/*------------------ uint8 -------------------*/
int8_t _z_encoding_prefix_encode(_z_wbuf_t *wbf, z_encoding_prefix_t en) { return _z_zsize_encode(wbf, en); }

int8_t _z_encoding_prefix_decode(z_encoding_prefix_t *en, _z_zbuf_t *zbf) {
int8_t ret = _Z_RES_OK;

_z_zint_t tmp;
ret |= _z_zsize_decode(&tmp, zbf);
*en = tmp;

return ret;
}

int8_t _z_consolidation_mode_encode(_z_wbuf_t *wbf, z_consolidation_mode_t en) { return _z_zsize_encode(wbf, en); }

int8_t _z_consolidation_mode_decode(z_consolidation_mode_t *en, _z_zbuf_t *zbf) {
Expand Down Expand Up @@ -298,3 +286,41 @@ int8_t _z_str_decode(char **str, _z_zbuf_t *zbf) {

return ret;
}

/*------------------ encoding ------------------*/
#define _Z_ENCODING_FLAG_S 0x01

size_t _z_encoding_len(const _z_encoding_t *en) {
size_t en_len = _z_zint_len((uint32_t)(en->id) << 1);
if (_z_bytes_check(en->schema)) {
en_len += _z_bytes_encode_len(&en->schema);
}
return en_len;
}

int8_t _z_encoding_encode(_z_wbuf_t *wbf, const _z_encoding_t *en) {
_Bool has_schema = _z_bytes_check(en->schema);
uint32_t id = (uint32_t)(en->id) << 1;
if (has_schema) {
id |= _Z_ENCODING_FLAG_S;
}
_Z_RETURN_IF_ERR(_z_zint32_encode(wbf, id));
if (has_schema) {
_Z_RETURN_IF_ERR(_z_bytes_encode(wbf, &en->schema));
}
return _Z_RES_OK;
}

int8_t _z_encoding_decode(_z_encoding_t *en, _z_zbuf_t *zbf) {
uint32_t id = 0;
_Bool has_schema = false;
_Z_RETURN_IF_ERR(_z_zint32_decode(&id, zbf));
if ((id & _Z_ENCODING_FLAG_S) != 0) {
has_schema = true;
}
en->id = (uint16_t)(id >> 1);
if (has_schema) {
_Z_RETURN_IF_ERR(_z_bytes_decode(&en->schema, zbf));
}
return _Z_RES_OK;
}
30 changes: 11 additions & 19 deletions src/protocol/codec/message.c
Original file line number Diff line number Diff line change
Expand Up @@ -279,8 +279,8 @@ int8_t _z_push_body_encode(_z_wbuf_t *wbf, const _z_push_body_t *pshb) {
if (has_timestamp) {
header |= _Z_FLAG_Z_P_T;
}
has_encoding = pshb->_body._put._encoding.prefix != Z_ENCODING_PREFIX_EMPTY ||
!_z_bytes_is_empty(&pshb->_body._put._encoding.suffix);
has_encoding = pshb->_body._put._encoding.id != Z_ENCODING_PREFIX_EMPTY ||
!_z_bytes_is_empty(&pshb->_body._put._encoding.schema);
if (has_encoding) {
header |= _Z_FLAG_Z_P_E;
}
Expand All @@ -295,8 +295,7 @@ int8_t _z_push_body_encode(_z_wbuf_t *wbf, const _z_push_body_t *pshb) {
}

if (has_encoding) {
_Z_RETURN_IF_ERR(_z_encoding_prefix_encode(wbf, pshb->_body._put._encoding.prefix));
_Z_RETURN_IF_ERR(_z_bytes_encode(wbf, &pshb->_body._put._encoding.suffix));
_Z_RETURN_IF_ERR(_z_encoding_encode(wbf, &pshb->_body._put._encoding));
}

if (has_source_info) {
Expand Down Expand Up @@ -352,8 +351,7 @@ int8_t _z_push_body_decode(_z_push_body_t *pshb, _z_zbuf_t *zbf, uint8_t header)
_Z_RETURN_IF_ERR(_z_timestamp_decode(&pshb->_body._put._commons._timestamp, zbf));
}
if ((ret == _Z_RES_OK) && _Z_HAS_FLAG(header, _Z_FLAG_Z_P_E)) {
_Z_RETURN_IF_ERR(_z_encoding_prefix_decode(&pshb->_body._put._encoding.prefix, zbf));
_Z_RETURN_IF_ERR(_z_bytes_decode(&pshb->_body._put._encoding.suffix, zbf));
_Z_RETURN_IF_ERR(_z_encoding_decode(&pshb->_body._put._encoding, zbf));
}
if ((ret == _Z_RES_OK) && _Z_HAS_FLAG(header, _Z_FLAG_Z_Z)) {
_Z_RETURN_IF_ERR(_z_msg_ext_decode_iter(zbf, _z_push_body_decode_extensions, pshb));
Expand Down Expand Up @@ -437,11 +435,9 @@ int8_t _z_query_encode(_z_wbuf_t *wbf, const _z_msg_query_t *msg) {
extheader |= _Z_FLAG_Z_Z;
}
_Z_RETURN_IF_ERR(_z_uint8_encode(wbf, extheader));
_Z_RETURN_IF_ERR(_z_zsize_encode(wbf, _z_zint_len(msg->_ext_value.encoding.prefix) +
_z_bytes_encode_len(&msg->_ext_value.encoding.suffix) +
msg->_ext_value.payload.len));
_Z_RETURN_IF_ERR(_z_encoding_prefix_encode(wbf, msg->_ext_value.encoding.prefix));
_Z_RETURN_IF_ERR(_z_bytes_encode(wbf, &msg->_ext_value.encoding.suffix));
_Z_RETURN_IF_ERR(
_z_zsize_encode(wbf, _z_encoding_len(&msg->_ext_value.encoding) + msg->_ext_value.payload.len));
_Z_RETURN_IF_ERR(_z_encoding_encode(wbf, &msg->_ext_value.encoding));
_Z_RETURN_IF_ERR(_z_bytes_val_encode(wbf, &msg->_ext_value.payload));
}
if (required_exts.info) {
Expand Down Expand Up @@ -474,8 +470,7 @@ int8_t _z_query_decode_extensions(_z_msg_ext_t *extension, void *ctx) {
}
case _Z_MSG_EXT_ENC_ZBUF | 0x03: { // Payload
_z_zbuf_t zbf = _z_zbytes_as_zbuf(extension->_body._zbuf._val);
ret = _z_encoding_prefix_decode(&msg->_ext_value.encoding.prefix, &zbf);
ret |= _z_bytes_decode(&msg->_ext_value.encoding.suffix, &zbf);
ret = _z_encoding_decode(&msg->_ext_value.encoding, &zbf);
_z_bytes_t bytes = _z_bytes_wrap((uint8_t *)_z_zbuf_start(&zbf), _z_zbuf_len(&zbf));
_z_bytes_copy(&msg->_ext_value.payload, &bytes);
break;
Expand Down Expand Up @@ -533,7 +528,6 @@ int8_t _z_reply_encode(_z_wbuf_t *wbf, const _z_msg_reply_t *reply) {
}
int8_t _z_reply_decode_extension(_z_msg_ext_t *extension, void *ctx) {
int8_t ret = _Z_RES_OK;
_z_msg_reply_t *reply = (_z_msg_reply_t *)ctx;
switch (_Z_EXT_FULL_ID(extension->_header)) {
default:
ret = _z_msg_ext_unknown_error(extension, 0x0a);
Expand Down Expand Up @@ -562,7 +556,7 @@ int8_t _z_err_encode(_z_wbuf_t *wbf, const _z_msg_err_t *err) {
uint8_t header = _Z_MID_Z_ERR;

// Encode header
_Bool has_encoding = err->encoding.prefix != Z_ENCODING_PREFIX_EMPTY;
_Bool has_encoding = err->encoding.id != Z_ENCODING_PREFIX_EMPTY;
if (has_encoding) {
_Z_SET_FLAG(header, _Z_FLAG_Z_E_E);
}
Expand All @@ -574,8 +568,7 @@ int8_t _z_err_encode(_z_wbuf_t *wbf, const _z_msg_err_t *err) {
_Z_RETURN_IF_ERR(_z_uint8_encode(wbf, header));
// Encode encoding
if (has_encoding) {
_Z_RETURN_IF_ERR(_z_encoding_prefix_encode(wbf, err->encoding.prefix));
_Z_RETURN_IF_ERR(_z_bytes_encode(wbf, &err->encoding.suffix));
_Z_RETURN_IF_ERR(_z_encoding_encode(wbf, &err->encoding));
}
// Encode extensions
if (has_sinfo_ext) {
Expand Down Expand Up @@ -607,8 +600,7 @@ int8_t _z_err_decode(_z_msg_err_t *err, _z_zbuf_t *zbf, uint8_t header) {
*err = (_z_msg_err_t){0};

if (_Z_HAS_FLAG(header, _Z_FLAG_Z_E_E)) {
_Z_RETURN_IF_ERR(_z_encoding_prefix_decode(&err->encoding.prefix, zbf));
_Z_RETURN_IF_ERR(_z_bytes_decode(&err->encoding.suffix, zbf));
_Z_RETURN_IF_ERR(_z_encoding_decode(&err->encoding, zbf));
}
if (_Z_HAS_FLAG(header, _Z_FLAG_Z_Z)) {
_Z_RETURN_IF_ERR(_z_msg_ext_decode_iter(zbf, _z_err_decode_extension, err));
Expand Down
4 changes: 2 additions & 2 deletions src/protocol/core.c
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ _z_value_t _z_value_steal(_z_value_t *value) {
return ret;
}
void _z_value_copy(_z_value_t *dst, const _z_value_t *src) {
dst->encoding.prefix = src->encoding.prefix;
_z_bytes_copy(&dst->encoding.suffix, &src->encoding.suffix);
dst->encoding.id = src->encoding.id;
_z_bytes_copy(&dst->encoding.schema, &src->encoding.schema);
_z_bytes_copy(&dst->payload, &src->payload);
}

Expand Down
8 changes: 4 additions & 4 deletions src/protocol/definitions/message.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
void _z_msg_reply_clear(_z_msg_reply_t *msg) { _z_push_body_clear(&msg->_body); }

void _z_msg_put_clear(_z_msg_put_t *msg) {
_z_bytes_clear(&msg->_encoding.suffix);
_z_bytes_clear(&msg->_encoding.schema);
_z_bytes_clear(&msg->_payload);
_z_timestamp_clear(&msg->_commons._timestamp);
}
Expand All @@ -33,8 +33,8 @@ _z_msg_query_reqexts_t _z_msg_query_required_extensions(const _z_msg_query_t *ms
z_attachment_t att = _z_encoded_as_attachment(&msg->_ext_attachment);
#endif
return (_z_msg_query_reqexts_t) {
.body = msg->_ext_value.payload.start != NULL || msg->_ext_value.encoding.prefix != 0 ||
!_z_bytes_is_empty(&msg->_ext_value.encoding.suffix),
.body = msg->_ext_value.payload.start != NULL || msg->_ext_value.encoding.id != 0 ||
!_z_bytes_is_empty(&msg->_ext_value.encoding.schema),
.info = _z_id_check(msg->_ext_info._id) || msg->_ext_info._entity_id != 0 || msg->_ext_info._source_sn != 0,
#if Z_FEATURE_ATTACHMENT == 1
.attachment = z_attachment_check(&att)
Expand All @@ -49,6 +49,6 @@ void _z_msg_query_clear(_z_msg_query_t *msg) {
_z_value_clear(&msg->_ext_value);
}
void _z_msg_err_clear(_z_msg_err_t *err) {
_z_bytes_clear(&err->encoding.suffix);
_z_bytes_clear(&err->encoding.schema);
_z_bytes_clear(&err->_payload);
}
4 changes: 2 additions & 2 deletions src/session/query.c
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,8 @@ int8_t _z_trigger_query_reply_partial(_z_session_t *zn, const _z_zint_t id, cons
reply.data.replier_id = zn->_local_zid;
reply.data.sample.keyexpr = expanded_ke;
_z_bytes_copy(&reply.data.sample.payload, &msg->_payload);
reply.data.sample.encoding.prefix = msg->_encoding.prefix;
_z_bytes_copy(&reply.data.sample.encoding.suffix, &msg->_encoding.suffix);
reply.data.sample.encoding.id = msg->_encoding.id;
_z_bytes_copy(&reply.data.sample.encoding.schema, &msg->_encoding.schema);
reply.data.sample.kind = Z_SAMPLE_KIND_PUT;
reply.data.sample.timestamp = _z_timestamp_duplicate(&msg->_commons._timestamp);

Expand Down
2 changes: 1 addition & 1 deletion src/session/subscription.c
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ void _z_trigger_local_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr
z_attachment_t att
#endif
) {
_z_encoding_t encoding = {.prefix = Z_ENCODING_PREFIX_DEFAULT, .suffix = _z_bytes_wrap(NULL, 0)};
_z_encoding_t encoding = {.id = Z_ENCODING_PREFIX_DEFAULT, .schema = _z_bytes_wrap(NULL, 0)};
int8_t ret = _z_trigger_subscriptions(zn, keyexpr, _z_bytes_wrap(payload, payload_len), encoding, Z_SAMPLE_KIND_PUT,
_z_timestamp_null(), qos
#if Z_FEATURE_ATTACHMENT == 1
Expand Down
Loading

0 comments on commit aa883f4

Please sign in to comment.