Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update encoding format on the wire #399

Merged
merged 8 commits into from
Apr 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ if(CMAKE_BUILD_TYPE MATCHES "DEBUG")
elseif(MSVC)
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} /std:c11 /experimental:c11atomics")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /std:c++latest /experimental:c11atomics")
add_compile_options(/W4 /WX /Od)
add_compile_options(/W4 /WX /Od /wd4127)
elseif(CMAKE_SYSTEM_NAME MATCHES "Generic")
add_compile_options(-Wall -Wextra -Wno-unused-parameter -Wmissing-prototypes -pipe -g -O0)
endif()
Expand Down
2 changes: 1 addition & 1 deletion GNUmakefile
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ Z_FEATURE_SUBSCRIPTION?=1
Z_FEATURE_QUERY?=1
Z_FEATURE_QUERYABLE?=1
Z_FEATURE_ATTACHMENT?=1
Z_FEATURE_INTEREST?=1
Z_FEATURE_INTEREST?=0
Z_FEATURE_RAWETH_TRANSPORT?=0

# zenoh-pico/ directory
Expand Down
6 changes: 4 additions & 2 deletions include/zenoh-pico/protocol/codec/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@
}

/*------------------ Internal Zenoh-net Macros ------------------*/
int8_t _z_encoding_prefix_encode(_z_wbuf_t *wbf, z_encoding_prefix_t en);
int8_t _z_encoding_prefix_decode(z_encoding_prefix_t *en, _z_zbuf_t *zbf);
int8_t _z_consolidation_mode_encode(_z_wbuf_t *wbf, z_consolidation_mode_t en);
int8_t _z_consolidation_mode_decode(z_consolidation_mode_t *en, _z_zbuf_t *zbf);
int8_t _z_query_target_encode(_z_wbuf_t *wbf, z_query_target_t en);
Expand Down Expand Up @@ -76,6 +74,10 @@ int8_t _z_zbuf_read_exact(_z_zbuf_t *zbf, uint8_t *dest, size_t length);
int8_t _z_str_encode(_z_wbuf_t *buf, const char *s);
int8_t _z_str_decode(char **str, _z_zbuf_t *buf);

size_t _z_encoding_len(const _z_encoding_t *en);
int8_t _z_encoding_encode(_z_wbuf_t *wbf, const _z_encoding_t *en);
int8_t _z_encoding_decode(_z_encoding_t *en, _z_zbuf_t *zbf);

int8_t _z_keyexpr_encode(_z_wbuf_t *buf, _Bool has_suffix, const _z_keyexpr_t *ke);
int8_t _z_keyexpr_decode(_z_keyexpr_t *ke, _z_zbuf_t *buf, _Bool has_suffix);

Expand Down
4 changes: 2 additions & 2 deletions include/zenoh-pico/protocol/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ _z_id_t _z_id_empty(void);
* A zenoh encoding.
*/
typedef struct {
_z_bytes_t suffix;
z_encoding_prefix_t prefix;
_z_bytes_t schema;
uint16_t id;
} _z_encoding_t;

/**
Expand Down
5 changes: 2 additions & 3 deletions src/api/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,7 @@ int8_t zp_scouting_config_insert(z_scouting_config_t sc, uint8_t key, z_string_t

z_encoding_t z_encoding(z_encoding_prefix_t prefix, const char *suffix) {
return (_z_encoding_t){
.prefix = prefix,
.suffix = _z_bytes_wrap((const uint8_t *)suffix, (suffix == NULL) ? (size_t)0 : strlen(suffix))};
.id = prefix, .schema = _z_bytes_wrap((const uint8_t *)suffix, (suffix == NULL) ? (size_t)0 : strlen(suffix))};
}

z_encoding_t z_encoding_default(void) { return z_encoding(Z_ENCODING_PREFIX_DEFAULT, NULL); }
Expand Down Expand Up @@ -948,7 +947,7 @@ int8_t z_query_reply(const z_query_t *query, const z_keyexpr_t keyexpr, const ui
._is_alloc = false,
.len = payload_len,
},
.encoding = {.prefix = opts.encoding.prefix, .suffix = opts.encoding.suffix}};
.encoding = {.id = opts.encoding.id, .schema = opts.encoding.schema}};
return _z_send_reply(&query->_val._rc.in->val, keyexpr, value, Z_SAMPLE_KIND_PUT);
return _Z_ERR_GENERIC;
}
Expand Down
12 changes: 6 additions & 6 deletions src/net/memory.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ void _z_sample_move(_z_sample_t *dst, _z_sample_t *src) {

_z_bytes_move(&dst->payload, &src->payload);

dst->encoding.prefix = src->encoding.prefix; // FIXME: call the z_encoding_move
_z_bytes_move(&dst->encoding.suffix, &src->encoding.suffix); // FIXME: call the z_encoding_move
dst->encoding.id = src->encoding.id; // FIXME: call the z_encoding_move
_z_bytes_move(&dst->encoding.schema, &src->encoding.schema); // FIXME: call the z_encoding_move

dst->timestamp.time = src->timestamp.time; // FIXME: call the z_timestamp_move
dst->timestamp.id = src->timestamp.id; // FIXME: call the z_timestamp_move
Expand All @@ -34,7 +34,7 @@ void _z_sample_move(_z_sample_t *dst, _z_sample_t *src) {
void _z_sample_clear(_z_sample_t *sample) {
_z_keyexpr_clear(&sample->keyexpr);
_z_bytes_clear(&sample->payload);
_z_bytes_clear(&sample->encoding.suffix); // FIXME: call the z_encoding_clear
_z_bytes_clear(&sample->encoding.schema); // FIXME: call the z_encoding_clear
_z_timestamp_clear(&sample->timestamp);
}

Expand All @@ -55,8 +55,8 @@ void _z_sample_copy(_z_sample_t *dst, const _z_sample_t *src) {
dst->timestamp = _z_timestamp_duplicate(&src->timestamp);

// TODO(sashacmc): should be changed after encoding rework
dst->encoding.prefix = src->encoding.prefix;
_z_bytes_copy(&dst->encoding.suffix, &src->encoding.suffix);
dst->encoding.id = src->encoding.id;
_z_bytes_copy(&dst->encoding.schema, &src->encoding.schema);

dst->kind = src->kind;
#if Z_FEATURE_ATTACHMENT == 1
Expand Down Expand Up @@ -104,7 +104,7 @@ void _z_reply_data_free(_z_reply_data_t **reply_data) {
}

void _z_value_clear(_z_value_t *value) {
_z_bytes_clear(&value->encoding.suffix);
_z_bytes_clear(&value->encoding.schema);
_z_bytes_clear(&value->payload);
}

Expand Down
50 changes: 38 additions & 12 deletions src/protocol/codec.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,6 @@
#include "zenoh-pico/utils/result.h"

/*------------------ uint8 -------------------*/
int8_t _z_encoding_prefix_encode(_z_wbuf_t *wbf, z_encoding_prefix_t en) { return _z_zsize_encode(wbf, en); }

int8_t _z_encoding_prefix_decode(z_encoding_prefix_t *en, _z_zbuf_t *zbf) {
int8_t ret = _Z_RES_OK;

_z_zint_t tmp;
ret |= _z_zsize_decode(&tmp, zbf);
*en = tmp;

return ret;
}

int8_t _z_consolidation_mode_encode(_z_wbuf_t *wbf, z_consolidation_mode_t en) { return _z_zsize_encode(wbf, en); }

int8_t _z_consolidation_mode_decode(z_consolidation_mode_t *en, _z_zbuf_t *zbf) {
Expand Down Expand Up @@ -298,3 +286,41 @@ int8_t _z_str_decode(char **str, _z_zbuf_t *zbf) {

return ret;
}

/*------------------ encoding ------------------*/
#define _Z_ENCODING_FLAG_S 0x01

size_t _z_encoding_len(const _z_encoding_t *en) {
size_t en_len = _z_zint_len((uint32_t)(en->id) << 1);
if (_z_bytes_check(en->schema)) {
en_len += _z_bytes_encode_len(&en->schema);
}
return en_len;
}

int8_t _z_encoding_encode(_z_wbuf_t *wbf, const _z_encoding_t *en) {
_Bool has_schema = _z_bytes_check(en->schema);
uint32_t id = (uint32_t)(en->id) << 1;
if (has_schema) {
id |= _Z_ENCODING_FLAG_S;
}
_Z_RETURN_IF_ERR(_z_zint32_encode(wbf, id));
if (has_schema) {
_Z_RETURN_IF_ERR(_z_bytes_encode(wbf, &en->schema));
}
return _Z_RES_OK;
}

int8_t _z_encoding_decode(_z_encoding_t *en, _z_zbuf_t *zbf) {
uint32_t id = 0;
_Bool has_schema = false;
_Z_RETURN_IF_ERR(_z_zint32_decode(&id, zbf));
if ((id & _Z_ENCODING_FLAG_S) != 0) {
has_schema = true;
}
en->id = (uint16_t)(id >> 1);
if (has_schema) {
_Z_RETURN_IF_ERR(_z_bytes_decode(&en->schema, zbf));
}
return _Z_RES_OK;
}
30 changes: 11 additions & 19 deletions src/protocol/codec/message.c
Original file line number Diff line number Diff line change
Expand Up @@ -279,8 +279,8 @@ int8_t _z_push_body_encode(_z_wbuf_t *wbf, const _z_push_body_t *pshb) {
if (has_timestamp) {
header |= _Z_FLAG_Z_P_T;
}
has_encoding = pshb->_body._put._encoding.prefix != Z_ENCODING_PREFIX_EMPTY ||
!_z_bytes_is_empty(&pshb->_body._put._encoding.suffix);
has_encoding = pshb->_body._put._encoding.id != Z_ENCODING_PREFIX_EMPTY ||
!_z_bytes_is_empty(&pshb->_body._put._encoding.schema);
if (has_encoding) {
header |= _Z_FLAG_Z_P_E;
}
Expand All @@ -295,8 +295,7 @@ int8_t _z_push_body_encode(_z_wbuf_t *wbf, const _z_push_body_t *pshb) {
}

if (has_encoding) {
_Z_RETURN_IF_ERR(_z_encoding_prefix_encode(wbf, pshb->_body._put._encoding.prefix));
_Z_RETURN_IF_ERR(_z_bytes_encode(wbf, &pshb->_body._put._encoding.suffix));
_Z_RETURN_IF_ERR(_z_encoding_encode(wbf, &pshb->_body._put._encoding));
}

if (has_source_info) {
Expand Down Expand Up @@ -352,8 +351,7 @@ int8_t _z_push_body_decode(_z_push_body_t *pshb, _z_zbuf_t *zbf, uint8_t header)
_Z_RETURN_IF_ERR(_z_timestamp_decode(&pshb->_body._put._commons._timestamp, zbf));
}
if ((ret == _Z_RES_OK) && _Z_HAS_FLAG(header, _Z_FLAG_Z_P_E)) {
_Z_RETURN_IF_ERR(_z_encoding_prefix_decode(&pshb->_body._put._encoding.prefix, zbf));
_Z_RETURN_IF_ERR(_z_bytes_decode(&pshb->_body._put._encoding.suffix, zbf));
_Z_RETURN_IF_ERR(_z_encoding_decode(&pshb->_body._put._encoding, zbf));
}
if ((ret == _Z_RES_OK) && _Z_HAS_FLAG(header, _Z_FLAG_Z_Z)) {
_Z_RETURN_IF_ERR(_z_msg_ext_decode_iter(zbf, _z_push_body_decode_extensions, pshb));
Expand Down Expand Up @@ -437,11 +435,9 @@ int8_t _z_query_encode(_z_wbuf_t *wbf, const _z_msg_query_t *msg) {
extheader |= _Z_FLAG_Z_Z;
}
_Z_RETURN_IF_ERR(_z_uint8_encode(wbf, extheader));
_Z_RETURN_IF_ERR(_z_zsize_encode(wbf, _z_zint_len(msg->_ext_value.encoding.prefix) +
_z_bytes_encode_len(&msg->_ext_value.encoding.suffix) +
msg->_ext_value.payload.len));
_Z_RETURN_IF_ERR(_z_encoding_prefix_encode(wbf, msg->_ext_value.encoding.prefix));
_Z_RETURN_IF_ERR(_z_bytes_encode(wbf, &msg->_ext_value.encoding.suffix));
_Z_RETURN_IF_ERR(
_z_zsize_encode(wbf, _z_encoding_len(&msg->_ext_value.encoding) + msg->_ext_value.payload.len));
_Z_RETURN_IF_ERR(_z_encoding_encode(wbf, &msg->_ext_value.encoding));
_Z_RETURN_IF_ERR(_z_bytes_val_encode(wbf, &msg->_ext_value.payload));
}
if (required_exts.info) {
Expand Down Expand Up @@ -474,8 +470,7 @@ int8_t _z_query_decode_extensions(_z_msg_ext_t *extension, void *ctx) {
}
case _Z_MSG_EXT_ENC_ZBUF | 0x03: { // Payload
_z_zbuf_t zbf = _z_zbytes_as_zbuf(extension->_body._zbuf._val);
ret = _z_encoding_prefix_decode(&msg->_ext_value.encoding.prefix, &zbf);
ret |= _z_bytes_decode(&msg->_ext_value.encoding.suffix, &zbf);
ret = _z_encoding_decode(&msg->_ext_value.encoding, &zbf);
_z_bytes_t bytes = _z_bytes_wrap((uint8_t *)_z_zbuf_start(&zbf), _z_zbuf_len(&zbf));
_z_bytes_copy(&msg->_ext_value.payload, &bytes);
break;
Expand Down Expand Up @@ -533,7 +528,6 @@ int8_t _z_reply_encode(_z_wbuf_t *wbf, const _z_msg_reply_t *reply) {
}
int8_t _z_reply_decode_extension(_z_msg_ext_t *extension, void *ctx) {
int8_t ret = _Z_RES_OK;
_z_msg_reply_t *reply = (_z_msg_reply_t *)ctx;
switch (_Z_EXT_FULL_ID(extension->_header)) {
default:
ret = _z_msg_ext_unknown_error(extension, 0x0a);
Expand Down Expand Up @@ -562,7 +556,7 @@ int8_t _z_err_encode(_z_wbuf_t *wbf, const _z_msg_err_t *err) {
uint8_t header = _Z_MID_Z_ERR;

// Encode header
_Bool has_encoding = err->encoding.prefix != Z_ENCODING_PREFIX_EMPTY;
_Bool has_encoding = err->encoding.id != Z_ENCODING_PREFIX_EMPTY;
if (has_encoding) {
_Z_SET_FLAG(header, _Z_FLAG_Z_E_E);
}
Expand All @@ -574,8 +568,7 @@ int8_t _z_err_encode(_z_wbuf_t *wbf, const _z_msg_err_t *err) {
_Z_RETURN_IF_ERR(_z_uint8_encode(wbf, header));
// Encode encoding
if (has_encoding) {
_Z_RETURN_IF_ERR(_z_encoding_prefix_encode(wbf, err->encoding.prefix));
_Z_RETURN_IF_ERR(_z_bytes_encode(wbf, &err->encoding.suffix));
_Z_RETURN_IF_ERR(_z_encoding_encode(wbf, &err->encoding));
}
// Encode extensions
if (has_sinfo_ext) {
Expand Down Expand Up @@ -607,8 +600,7 @@ int8_t _z_err_decode(_z_msg_err_t *err, _z_zbuf_t *zbf, uint8_t header) {
*err = (_z_msg_err_t){0};

if (_Z_HAS_FLAG(header, _Z_FLAG_Z_E_E)) {
_Z_RETURN_IF_ERR(_z_encoding_prefix_decode(&err->encoding.prefix, zbf));
_Z_RETURN_IF_ERR(_z_bytes_decode(&err->encoding.suffix, zbf));
_Z_RETURN_IF_ERR(_z_encoding_decode(&err->encoding, zbf));
}
if (_Z_HAS_FLAG(header, _Z_FLAG_Z_Z)) {
_Z_RETURN_IF_ERR(_z_msg_ext_decode_iter(zbf, _z_err_decode_extension, err));
Expand Down
4 changes: 2 additions & 2 deletions src/protocol/core.c
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ _z_value_t _z_value_steal(_z_value_t *value) {
return ret;
}
void _z_value_copy(_z_value_t *dst, const _z_value_t *src) {
dst->encoding.prefix = src->encoding.prefix;
_z_bytes_copy(&dst->encoding.suffix, &src->encoding.suffix);
dst->encoding.id = src->encoding.id;
_z_bytes_copy(&dst->encoding.schema, &src->encoding.schema);
_z_bytes_copy(&dst->payload, &src->payload);
}

Expand Down
8 changes: 4 additions & 4 deletions src/protocol/definitions/message.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
void _z_msg_reply_clear(_z_msg_reply_t *msg) { _z_push_body_clear(&msg->_body); }

void _z_msg_put_clear(_z_msg_put_t *msg) {
_z_bytes_clear(&msg->_encoding.suffix);
_z_bytes_clear(&msg->_encoding.schema);
_z_bytes_clear(&msg->_payload);
_z_timestamp_clear(&msg->_commons._timestamp);
}
Expand All @@ -33,8 +33,8 @@ _z_msg_query_reqexts_t _z_msg_query_required_extensions(const _z_msg_query_t *ms
z_attachment_t att = _z_encoded_as_attachment(&msg->_ext_attachment);
#endif
return (_z_msg_query_reqexts_t) {
.body = msg->_ext_value.payload.start != NULL || msg->_ext_value.encoding.prefix != 0 ||
!_z_bytes_is_empty(&msg->_ext_value.encoding.suffix),
.body = msg->_ext_value.payload.start != NULL || msg->_ext_value.encoding.id != 0 ||
!_z_bytes_is_empty(&msg->_ext_value.encoding.schema),
.info = _z_id_check(msg->_ext_info._id) || msg->_ext_info._entity_id != 0 || msg->_ext_info._source_sn != 0,
#if Z_FEATURE_ATTACHMENT == 1
.attachment = z_attachment_check(&att)
Expand All @@ -49,6 +49,6 @@ void _z_msg_query_clear(_z_msg_query_t *msg) {
_z_value_clear(&msg->_ext_value);
}
void _z_msg_err_clear(_z_msg_err_t *err) {
_z_bytes_clear(&err->encoding.suffix);
_z_bytes_clear(&err->encoding.schema);
_z_bytes_clear(&err->_payload);
}
4 changes: 2 additions & 2 deletions src/session/query.c
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,8 @@ int8_t _z_trigger_query_reply_partial(_z_session_t *zn, const _z_zint_t id, cons
reply.data.replier_id = zn->_local_zid;
reply.data.sample.keyexpr = expanded_ke;
_z_bytes_copy(&reply.data.sample.payload, &msg->_payload);
reply.data.sample.encoding.prefix = msg->_encoding.prefix;
_z_bytes_copy(&reply.data.sample.encoding.suffix, &msg->_encoding.suffix);
reply.data.sample.encoding.id = msg->_encoding.id;
_z_bytes_copy(&reply.data.sample.encoding.schema, &msg->_encoding.schema);
reply.data.sample.kind = Z_SAMPLE_KIND_PUT;
reply.data.sample.timestamp = _z_timestamp_duplicate(&msg->_commons._timestamp);

Expand Down
2 changes: 1 addition & 1 deletion src/session/subscription.c
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ void _z_trigger_local_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr
z_attachment_t att
#endif
) {
_z_encoding_t encoding = {.prefix = Z_ENCODING_PREFIX_DEFAULT, .suffix = _z_bytes_wrap(NULL, 0)};
_z_encoding_t encoding = {.id = Z_ENCODING_PREFIX_DEFAULT, .schema = _z_bytes_wrap(NULL, 0)};
int8_t ret = _z_trigger_subscriptions(zn, keyexpr, _z_bytes_wrap(payload, payload_len), encoding, Z_SAMPLE_KIND_PUT,
_z_timestamp_null(), qos
#if Z_FEATURE_ATTACHMENT == 1
Expand Down
Loading
Loading