Skip to content

Commit

Permalink
allow specifying default encoding for publisher;
Browse files Browse the repository at this point in the history
clenaup of put flow;
  • Loading branch information
DenisBiryukov91 committed Aug 1, 2024
1 parent 2c7929f commit 41c3986
Show file tree
Hide file tree
Showing 10 changed files with 73 additions and 50 deletions.
7 changes: 3 additions & 4 deletions include/zenoh-pico/api/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,12 +216,14 @@ typedef struct {
* Represents the configuration used to configure a publisher upon declaration with :c:func:`z_declare_publisher`.
*
* Members:
* z_owned_encoding_t *encoding: Default encoding for messages put by this publisher.
* z_congestion_control_t congestion_control: The congestion control to apply when routing messages from this
* publisher.
* z_priority_t priority: The priority of messages issued by this publisher.
* _Bool is_express: If true, Zenoh will not wait to batch this operation with others to reduce the bandwidth.
*/
typedef struct {
z_owned_encoding_t *encoding;
z_congestion_control_t congestion_control;
z_priority_t priority;
_Bool is_express;
Expand Down Expand Up @@ -303,6 +305,7 @@ typedef struct {
z_timestamp_t *timestamp;
_Bool is_express;
z_owned_bytes_t *attachment;

} z_put_options_t;

/**
Expand All @@ -327,13 +330,11 @@ typedef struct {
*
* Members:
* z_owned_encoding_t *encoding: The encoding of the payload.
* _Bool is_express: If true, Zenoh will not wait to batch this operation with others to reduce the bandwidth.
* z_timestamp_t *timestamp: The API level timestamp (e.g. of the data when it was created).
* z_owned_bytes_t *attachment: An optional attachment to the publication.
*/
typedef struct {
z_owned_encoding_t *encoding;
_Bool is_express;
z_timestamp_t *timestamp;
z_owned_bytes_t *attachment;
} z_publisher_put_options_t;
Expand All @@ -343,11 +344,9 @@ typedef struct {
* sent via :c:func:`z_publisher_delete`.
*
* Members:
* _Bool is_express: If true, Zenoh will not wait to batch this operation with others to reduce the bandwidth.
* z_timestamp_t *timestamp: The API level timestamp (e.g. of the data when it was created).
*/
typedef struct {
_Bool is_express;
z_timestamp_t *timestamp;
} z_publisher_delete_options_t;

Expand Down
5 changes: 3 additions & 2 deletions include/zenoh-pico/net/primitives.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,12 @@ int8_t _z_undeclare_resource(_z_session_t *zn, uint16_t rid);
* zn: The zenoh-net session. The caller keeps its ownership.
* keyexpr: The resource key to publish. The callee gets the ownership
* of any allocated value.
* encoding: The optional default encoding to use during put. The callee gets the ownership.
*
* Returns:
* The created :c:type:`_z_publisher_t` (in null state if the declaration failed)..
*/
_z_publisher_t _z_declare_publisher(const _z_session_rc_t *zn, _z_keyexpr_t keyexpr,
_z_publisher_t _z_declare_publisher(const _z_session_rc_t *zn, _z_keyexpr_t keyexpr, _z_encoding_t *encoding,
z_congestion_control_t congestion_control, z_priority_t priority, _Bool is_express);

/**
Expand Down Expand Up @@ -120,7 +121,7 @@ int8_t _z_undeclare_publisher(_z_publisher_t *pub);
* Returns:
* ``0`` in case of success, ``-1`` in case of failure.
*/
int8_t _z_write(_z_session_t *zn, const _z_keyexpr_t keyexpr, _z_bytes_t payload, const _z_encoding_t encoding,
int8_t _z_write(_z_session_t *zn, const _z_keyexpr_t keyexpr, _z_bytes_t payload, const _z_encoding_t *encoding,
const z_sample_kind_t kind, const z_congestion_control_t cong_ctrl, z_priority_t priority,
_Bool is_express, const _z_timestamp_t *timestamp, const _z_bytes_t attachment);
#endif
Expand Down
1 change: 1 addition & 0 deletions include/zenoh-pico/net/publish.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ typedef struct _z_publisher_t {
_z_keyexpr_t _key;
_z_zint_t _id;
_z_session_rc_t _zn;
_z_encoding_t _encoding;
z_congestion_control_t _congestion_control;
z_priority_t _priority;
_Bool _is_express;
Expand Down
3 changes: 2 additions & 1 deletion include/zenoh-pico/session/subscription.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@

/*------------------ Subscription ------------------*/
void _z_trigger_local_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload,
const _z_n_qos_t qos, const _z_bytes_t attachment);
_z_encoding_t *encoding, const _z_n_qos_t qos, const _z_timestamp_t *timestamp,
const _z_bytes_t attachment);

#if Z_FEATURE_SUBSCRIPTION == 1
_z_subscription_rc_t *_z_get_subscription_by_id(_z_session_t *zn, uint8_t is_local, const _z_zint_t id);
Expand Down
61 changes: 30 additions & 31 deletions src/api/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -1073,15 +1073,15 @@ int8_t z_put(const z_loaned_session_t *zs, const z_loaned_keyexpr_t *keyexpr, z_
}

_z_keyexpr_t keyexpr_aliased = _z_keyexpr_alias_from_user_defined(*keyexpr, true);

ret = _z_write(_Z_RC_IN_VAL(zs), keyexpr_aliased, _z_bytes_from_owned_bytes(payload),
_z_encoding_from_owned(opt.encoding), Z_SAMPLE_KIND_PUT, opt.congestion_control, opt.priority,
opt.is_express, opt.timestamp, _z_bytes_from_owned_bytes(opt.attachment));
ret = _z_write(_Z_RC_IN_VAL(zs), keyexpr_aliased, _z_bytes_from_owned_bytes(payload), &opt.encoding->_val,
Z_SAMPLE_KIND_PUT, opt.congestion_control, opt.priority, opt.is_express, opt.timestamp,
_z_bytes_from_owned_bytes(opt.attachment));

// Trigger local subscriptions
_z_trigger_local_subscriptions(_Z_RC_IN_VAL(zs), keyexpr_aliased, _z_bytes_from_owned_bytes(payload),
_z_n_qos_make(0, opt.congestion_control == Z_CONGESTION_CONTROL_BLOCK, opt.priority),
_z_bytes_from_owned_bytes(opt.attachment));
_z_trigger_local_subscriptions(
_Z_RC_IN_VAL(zs), keyexpr_aliased, _z_bytes_from_owned_bytes(payload), &opt.encoding->_val,
_z_n_qos_make(opt.is_express, opt.congestion_control == Z_CONGESTION_CONTROL_BLOCK, opt.priority),
opt.timestamp, _z_bytes_from_owned_bytes(opt.attachment));
// Clean-up
z_encoding_drop(opt.encoding);
z_bytes_drop(opt.attachment);
Expand All @@ -1100,13 +1100,14 @@ int8_t z_delete(const z_loaned_session_t *zs, const z_loaned_keyexpr_t *keyexpr,
opt.is_express = options->is_express;
opt.timestamp = options->timestamp;
}
ret = _z_write(_Z_RC_IN_VAL(zs), *keyexpr, _z_bytes_null(), _z_encoding_null(), Z_SAMPLE_KIND_DELETE,
opt.congestion_control, opt.priority, opt.is_express, opt.timestamp, _z_bytes_null());
ret = _z_write(_Z_RC_IN_VAL(zs), *keyexpr, _z_bytes_null(), NULL, Z_SAMPLE_KIND_DELETE, opt.congestion_control,
opt.priority, opt.is_express, opt.timestamp, _z_bytes_null());

return ret;
}

void z_publisher_options_default(z_publisher_options_t *options) {
options->encoding = NULL;
options->congestion_control = Z_CONGESTION_CONTROL_DEFAULT;
options->priority = Z_PRIORITY_DEFAULT;
options->is_express = false;
Expand All @@ -1132,12 +1133,11 @@ int8_t z_declare_publisher(z_owned_publisher_t *pub, const z_loaned_session_t *z
z_publisher_options_t opt;
z_publisher_options_default(&opt);
if (options != NULL) {
opt.congestion_control = options->congestion_control;
opt.priority = options->priority;
opt.is_express = options->is_express;
opt = *options;
}
// Set publisher
_z_publisher_t int_pub = _z_declare_publisher(zs, key, opt.congestion_control, opt.priority, opt.is_express);
_z_publisher_t int_pub =
_z_declare_publisher(zs, key, &opt.encoding->_val, opt.congestion_control, opt.priority, opt.is_express);
// Create write filter
int8_t res = _z_write_filter_create(&int_pub);
if (res != _Z_RES_OK) {
Expand All @@ -1155,14 +1155,10 @@ int8_t z_undeclare_publisher(z_owned_publisher_t *pub) { return _z_undeclare_and
void z_publisher_put_options_default(z_publisher_put_options_t *options) {
options->encoding = NULL;
options->attachment = NULL;
options->is_express = false;
options->timestamp = NULL;
}

void z_publisher_delete_options_default(z_publisher_delete_options_t *options) {
options->is_express = false;
options->timestamp = NULL;
}
void z_publisher_delete_options_default(z_publisher_delete_options_t *options) { options->timestamp = NULL; }

int8_t z_publisher_put(const z_loaned_publisher_t *pub, z_owned_bytes_t *payload,
const z_publisher_put_options_t *options) {
Expand All @@ -1172,24 +1168,30 @@ int8_t z_publisher_put(const z_loaned_publisher_t *pub, z_owned_bytes_t *payload
z_publisher_put_options_default(&opt);
if (options != NULL) {
opt.encoding = options->encoding;
opt.is_express = options->is_express;
opt.timestamp = options->timestamp;
opt.attachment = options->attachment;
}
_z_encoding_t encoding;
if (opt.encoding == NULL) {
_Z_RETURN_IF_ERR(_z_encoding_copy(&encoding, &pub->_encoding));
} else {
opt.is_express = pub->_is_express;
encoding = _z_encoding_steal(&opt.encoding->_val);
}

// Check if write filter is active before writing
if (!_z_write_filter_active(pub)) {
// Write value
ret = _z_write(_Z_RC_IN_VAL(&pub->_zn), pub->_key, _z_bytes_from_owned_bytes(payload),
_z_encoding_from_owned(opt.encoding), Z_SAMPLE_KIND_PUT, pub->_congestion_control,
pub->_priority, opt.is_express, opt.timestamp, _z_bytes_from_owned_bytes(opt.attachment));
ret = _z_write(_Z_RC_IN_VAL(&pub->_zn), pub->_key, _z_bytes_from_owned_bytes(payload), &encoding,
Z_SAMPLE_KIND_PUT, pub->_congestion_control, pub->_priority, pub->_is_express, opt.timestamp,
_z_bytes_from_owned_bytes(opt.attachment));
}
// Trigger local subscriptions
_z_trigger_local_subscriptions(_Z_RC_IN_VAL(&pub->_zn), pub->_key, _z_bytes_from_owned_bytes(payload),
_Z_N_QOS_DEFAULT, _z_bytes_from_owned_bytes(opt.attachment));
_z_trigger_local_subscriptions(
_Z_RC_IN_VAL(&pub->_zn), pub->_key, _z_bytes_from_owned_bytes(payload), &encoding,
_z_n_qos_make(pub->_is_express, pub->_congestion_control == Z_CONGESTION_CONTROL_BLOCK, pub->_priority),
opt.timestamp, _z_bytes_from_owned_bytes(opt.attachment));
// Clean-up
z_encoding_drop(opt.encoding);
_z_encoding_clear(&encoding);
z_bytes_drop(opt.attachment);
z_bytes_drop(payload);
return ret;
Expand All @@ -1200,13 +1202,10 @@ int8_t z_publisher_delete(const z_loaned_publisher_t *pub, const z_publisher_del
z_publisher_delete_options_t opt;
z_publisher_delete_options_default(&opt);
if (options != NULL) {
opt.is_express = options->is_express;
opt.timestamp = options->timestamp;
} else {
opt.is_express = pub->_is_express;
}
return _z_write(_Z_RC_IN_VAL(&pub->_zn), pub->_key, _z_bytes_null(), _z_encoding_null(), Z_SAMPLE_KIND_DELETE,
pub->_congestion_control, pub->_priority, opt.is_express, opt.timestamp, _z_bytes_null());
return _z_write(_Z_RC_IN_VAL(&pub->_zn), pub->_key, _z_bytes_null(), NULL, Z_SAMPLE_KIND_DELETE,
pub->_congestion_control, pub->_priority, pub->_is_express, opt.timestamp, _z_bytes_null());
}

z_owned_keyexpr_t z_publisher_keyexpr(z_loaned_publisher_t *publisher) {
Expand Down
7 changes: 4 additions & 3 deletions src/net/primitives.c
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ int8_t _z_undeclare_resource(_z_session_t *zn, uint16_t rid) {

#if Z_FEATURE_PUBLICATION == 1
/*------------------ Publisher Declaration ------------------*/
_z_publisher_t _z_declare_publisher(const _z_session_rc_t *zn, _z_keyexpr_t keyexpr,
_z_publisher_t _z_declare_publisher(const _z_session_rc_t *zn, _z_keyexpr_t keyexpr, _z_encoding_t *encoding,
z_congestion_control_t congestion_control, z_priority_t priority,
_Bool is_express) {
// Allocate publisher
Expand All @@ -114,6 +114,7 @@ _z_publisher_t _z_declare_publisher(const _z_session_rc_t *zn, _z_keyexpr_t keye
ret._priority = priority;
ret._is_express = is_express;
ret._zn = _z_session_rc_clone(zn);
ret._encoding = encoding == NULL ? _z_encoding_null() : _z_encoding_steal(encoding);
return ret;
}

Expand All @@ -129,7 +130,7 @@ int8_t _z_undeclare_publisher(_z_publisher_t *pub) {
}

/*------------------ Write ------------------*/
int8_t _z_write(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload, const _z_encoding_t encoding,
int8_t _z_write(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload, const _z_encoding_t *encoding,
const z_sample_kind_t kind, const z_congestion_control_t cong_ctrl, z_priority_t priority,
_Bool is_express, const _z_timestamp_t *timestamp, const _z_bytes_t attachment) {
int8_t ret = _Z_RES_OK;
Expand All @@ -149,7 +150,7 @@ int8_t _z_write(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t p
._commons = {._timestamp = ((timestamp != NULL) ? *timestamp : _z_timestamp_null()),
._source_info = _z_source_info_null()},
._payload = payload,
._encoding = encoding,
._encoding = encoding == NULL ? _z_encoding_null() : *encoding,
._attachment = attachment,
},
},
Expand Down
8 changes: 6 additions & 2 deletions src/net/sample.c
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,15 @@ _z_sample_t _z_sample_create(_z_keyexpr_t *key, const _z_bytes_t payload, const
_z_sample_t s = _z_sample_null();
s.keyexpr = _z_keyexpr_steal(key);
s.kind = kind;
s.timestamp = _z_timestamp_duplicate(timestamp);
if (timestamp != NULL) {
s.timestamp = _z_timestamp_duplicate(timestamp);
}
s.qos = qos;
_z_bytes_copy(&s.payload, &payload);
_z_bytes_copy(&s.attachment, &attachment);
_z_encoding_move(&s.encoding, encoding);
if (encoding != NULL) {
_z_encoding_move(&s.encoding, encoding);
}
return s;
}
#else
Expand Down
10 changes: 5 additions & 5 deletions src/session/subscription.c
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,10 @@ _z_subscription_rc_t *_z_register_subscription(_z_session_t *zn, uint8_t is_loca
}

void _z_trigger_local_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload,
const _z_n_qos_t qos, const _z_bytes_t attachment) {
_z_encoding_t encoding = _z_encoding_null();
_z_timestamp_t timestamp = _z_timestamp_null();
_z_encoding_t *encoding, const _z_n_qos_t qos, const _z_timestamp_t *timestamp,
const _z_bytes_t attachment) {
int8_t ret =
_z_trigger_subscriptions(zn, keyexpr, payload, &encoding, Z_SAMPLE_KIND_PUT, &timestamp, qos, attachment);
_z_trigger_subscriptions(zn, keyexpr, payload, encoding, Z_SAMPLE_KIND_PUT, timestamp, qos, attachment);
(void)ret;
}

Expand Down Expand Up @@ -208,11 +207,12 @@ void _z_flush_subscriptions(_z_session_t *zn) {
#else // Z_FEATURE_SUBSCRIPTION == 0

void _z_trigger_local_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload,
_z_n_qos_t qos, const _z_bytes_t attachment) {
_z_n_qos_t qos, const _z_timestampt_t *timestamp, const _z_bytes_t attachment) {
_ZP_UNUSED(zn);
_ZP_UNUSED(keyexpr);
_ZP_UNUSED(payload);
_ZP_UNUSED(qos);
_ZP_UNUSED(timestamp);
_ZP_UNUSED(attachment);
}

Expand Down
19 changes: 18 additions & 1 deletion tests/z_api_alignment_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,12 @@ void data_handler(const z_loaned_sample_t *sample, void *arg) {
z_keyexpr_as_view_string(z_sample_keyexpr(sample), &k_str);
(void)arg;
assert(z_check(k_str));
const char *encoding_expected =
z_sample_kind(sample) == Z_SAMPLE_KIND_PUT ? "zenoh/bytes;test_encoding" : "zenoh/bytes";
z_owned_string_t encoding;
z_encoding_to_string(z_sample_encoding(sample), &encoding);
printf("%s\n", z_string_data(z_loan(encoding)));
assert(strncmp(encoding_expected, z_string_data(z_loan(encoding)), strlen(encoding_expected)) == 0);
}

int main(int argc, char **argv) {
Expand All @@ -122,9 +128,13 @@ int main(int argc, char **argv) {

#ifdef ZENOH_C
zc_init_logger();
const char *encoding_expected =
z_sample_kind(sample) == Z_SAMPLE_KIND_PUT ? "zenoh/bytes" : "zenoh/bytes;test_encoding";
z_pu
#endif

z_view_keyexpr_t key_demo_example, key_demo_example_a, key_demo_example_starstar;
z_view_keyexpr_t key_demo_example,
key_demo_example_a, key_demo_example_starstar;
z_view_keyexpr_from_str(&key_demo_example, "demo/example");
z_view_keyexpr_from_str(&key_demo_example_a, "demo/example/a");
z_view_keyexpr_from_str(&key_demo_example_starstar, "demo/example/**");
Expand Down Expand Up @@ -296,6 +306,9 @@ int main(int argc, char **argv) {
printf("Session Put...");
z_put_options_t _ret_put_opt;
z_put_options_default(&_ret_put_opt);
z_owned_encoding_t encoding;
z_encoding_from_str(&encoding, "test_encoding");
_ret_put_opt.encoding = z_move(encoding);
_ret_put_opt.congestion_control = Z_CONGESTION_CONTROL_BLOCK;

// Create payload
Expand All @@ -304,6 +317,7 @@ int main(int argc, char **argv) {

_ret_int8 = z_put(z_loan(s1), z_loan(_ret_expr), z_move(payload), &_ret_put_opt);
assert_eq(_ret_int8, 0);
assert(!z_check(encoding));
printf("Ok\n");

z_sleep_s(SLEEP);
Expand All @@ -330,10 +344,13 @@ int main(int argc, char **argv) {
printf("Declaring Publisher...");
z_publisher_options_t _ret_pub_opt;
z_publisher_options_default(&_ret_pub_opt);
z_encoding_from_str(&encoding, "test_encoding");
_ret_pub_opt.encoding = z_move(encoding);
_ret_pub_opt.congestion_control = Z_CONGESTION_CONTROL_BLOCK;
z_owned_publisher_t _ret_pub;
_ret_int8 = z_declare_publisher(&_ret_pub, z_loan(s1), z_loan(s1_key), &_ret_pub_opt);
assert(_ret_int8 == _Z_RES_OK);
assert(!z_check(encoding));
printf("Ok\n");

z_sleep_s(SLEEP);
Expand Down
2 changes: 1 addition & 1 deletion zenohpico.pc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ prefix=/usr/local
Name: zenohpico
Description:
URL:
Version: 1.0.20240730dev
Version: 1.0.20240801dev
Cflags: -I${prefix}/include
Libs: -L${prefix}/lib -lzenohpico

0 comments on commit 41c3986

Please sign in to comment.