Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

allow specifying default encoding for publisher; #568

Merged
merged 1 commit into from
Aug 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions include/zenoh-pico/api/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,12 +216,14 @@ typedef struct {
* Represents the configuration used to configure a publisher upon declaration with :c:func:`z_declare_publisher`.
*
* Members:
* z_owned_encoding_t *encoding: Default encoding for messages put by this publisher.
* z_congestion_control_t congestion_control: The congestion control to apply when routing messages from this
* publisher.
* z_priority_t priority: The priority of messages issued by this publisher.
* _Bool is_express: If true, Zenoh will not wait to batch this operation with others to reduce the bandwidth.
*/
typedef struct {
z_owned_encoding_t *encoding;
z_congestion_control_t congestion_control;
z_priority_t priority;
_Bool is_express;
Expand Down Expand Up @@ -303,6 +305,7 @@ typedef struct {
z_timestamp_t *timestamp;
_Bool is_express;
z_owned_bytes_t *attachment;

} z_put_options_t;

/**
Expand All @@ -327,13 +330,11 @@ typedef struct {
*
* Members:
* z_owned_encoding_t *encoding: The encoding of the payload.
* _Bool is_express: If true, Zenoh will not wait to batch this operation with others to reduce the bandwidth.
* z_timestamp_t *timestamp: The API level timestamp (e.g. of the data when it was created).
* z_owned_bytes_t *attachment: An optional attachment to the publication.
*/
typedef struct {
z_owned_encoding_t *encoding;
_Bool is_express;
z_timestamp_t *timestamp;
z_owned_bytes_t *attachment;
} z_publisher_put_options_t;
Expand All @@ -343,11 +344,9 @@ typedef struct {
* sent via :c:func:`z_publisher_delete`.
*
* Members:
* _Bool is_express: If true, Zenoh will not wait to batch this operation with others to reduce the bandwidth.
* z_timestamp_t *timestamp: The API level timestamp (e.g. of the data when it was created).
*/
typedef struct {
_Bool is_express;
z_timestamp_t *timestamp;
} z_publisher_delete_options_t;

Expand Down
5 changes: 3 additions & 2 deletions include/zenoh-pico/net/primitives.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,12 @@ int8_t _z_undeclare_resource(_z_session_t *zn, uint16_t rid);
* zn: The zenoh-net session. The caller keeps its ownership.
* keyexpr: The resource key to publish. The callee gets the ownership
* of any allocated value.
* encoding: The optional default encoding to use during put. The callee gets the ownership.
*
* Returns:
* The created :c:type:`_z_publisher_t` (in null state if the declaration failed)..
*/
_z_publisher_t _z_declare_publisher(const _z_session_rc_t *zn, _z_keyexpr_t keyexpr,
_z_publisher_t _z_declare_publisher(const _z_session_rc_t *zn, _z_keyexpr_t keyexpr, _z_encoding_t *encoding,
z_congestion_control_t congestion_control, z_priority_t priority, _Bool is_express);

/**
Expand Down Expand Up @@ -120,7 +121,7 @@ int8_t _z_undeclare_publisher(_z_publisher_t *pub);
* Returns:
* ``0`` in case of success, ``-1`` in case of failure.
*/
int8_t _z_write(_z_session_t *zn, const _z_keyexpr_t keyexpr, _z_bytes_t payload, const _z_encoding_t encoding,
int8_t _z_write(_z_session_t *zn, const _z_keyexpr_t keyexpr, _z_bytes_t payload, const _z_encoding_t *encoding,
const z_sample_kind_t kind, const z_congestion_control_t cong_ctrl, z_priority_t priority,
_Bool is_express, const _z_timestamp_t *timestamp, const _z_bytes_t attachment);
#endif
Expand Down
1 change: 1 addition & 0 deletions include/zenoh-pico/net/publish.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ typedef struct _z_publisher_t {
_z_keyexpr_t _key;
_z_zint_t _id;
_z_session_rc_t _zn;
_z_encoding_t _encoding;
z_congestion_control_t _congestion_control;
z_priority_t _priority;
_Bool _is_express;
Expand Down
3 changes: 2 additions & 1 deletion include/zenoh-pico/session/subscription.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@

/*------------------ Subscription ------------------*/
void _z_trigger_local_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload,
const _z_n_qos_t qos, const _z_bytes_t attachment);
_z_encoding_t *encoding, const _z_n_qos_t qos, const _z_timestamp_t *timestamp,
const _z_bytes_t attachment);

#if Z_FEATURE_SUBSCRIPTION == 1
_z_subscription_rc_t *_z_get_subscription_by_id(_z_session_t *zn, uint8_t is_local, const _z_zint_t id);
Expand Down
61 changes: 30 additions & 31 deletions src/api/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -1073,15 +1073,15 @@ int8_t z_put(const z_loaned_session_t *zs, const z_loaned_keyexpr_t *keyexpr, z_
}

_z_keyexpr_t keyexpr_aliased = _z_keyexpr_alias_from_user_defined(*keyexpr, true);

ret = _z_write(_Z_RC_IN_VAL(zs), keyexpr_aliased, _z_bytes_from_owned_bytes(payload),
_z_encoding_from_owned(opt.encoding), Z_SAMPLE_KIND_PUT, opt.congestion_control, opt.priority,
opt.is_express, opt.timestamp, _z_bytes_from_owned_bytes(opt.attachment));
ret = _z_write(_Z_RC_IN_VAL(zs), keyexpr_aliased, _z_bytes_from_owned_bytes(payload), &opt.encoding->_val,
Z_SAMPLE_KIND_PUT, opt.congestion_control, opt.priority, opt.is_express, opt.timestamp,
_z_bytes_from_owned_bytes(opt.attachment));

// Trigger local subscriptions
_z_trigger_local_subscriptions(_Z_RC_IN_VAL(zs), keyexpr_aliased, _z_bytes_from_owned_bytes(payload),
_z_n_qos_make(0, opt.congestion_control == Z_CONGESTION_CONTROL_BLOCK, opt.priority),
_z_bytes_from_owned_bytes(opt.attachment));
_z_trigger_local_subscriptions(
_Z_RC_IN_VAL(zs), keyexpr_aliased, _z_bytes_from_owned_bytes(payload), &opt.encoding->_val,
_z_n_qos_make(opt.is_express, opt.congestion_control == Z_CONGESTION_CONTROL_BLOCK, opt.priority),
opt.timestamp, _z_bytes_from_owned_bytes(opt.attachment));
// Clean-up
z_encoding_drop(opt.encoding);
z_bytes_drop(opt.attachment);
Expand All @@ -1100,13 +1100,14 @@ int8_t z_delete(const z_loaned_session_t *zs, const z_loaned_keyexpr_t *keyexpr,
opt.is_express = options->is_express;
opt.timestamp = options->timestamp;
}
ret = _z_write(_Z_RC_IN_VAL(zs), *keyexpr, _z_bytes_null(), _z_encoding_null(), Z_SAMPLE_KIND_DELETE,
opt.congestion_control, opt.priority, opt.is_express, opt.timestamp, _z_bytes_null());
ret = _z_write(_Z_RC_IN_VAL(zs), *keyexpr, _z_bytes_null(), NULL, Z_SAMPLE_KIND_DELETE, opt.congestion_control,
opt.priority, opt.is_express, opt.timestamp, _z_bytes_null());

return ret;
}

void z_publisher_options_default(z_publisher_options_t *options) {
options->encoding = NULL;
options->congestion_control = Z_CONGESTION_CONTROL_DEFAULT;
options->priority = Z_PRIORITY_DEFAULT;
options->is_express = false;
Expand All @@ -1132,12 +1133,11 @@ int8_t z_declare_publisher(z_owned_publisher_t *pub, const z_loaned_session_t *z
z_publisher_options_t opt;
z_publisher_options_default(&opt);
if (options != NULL) {
opt.congestion_control = options->congestion_control;
opt.priority = options->priority;
opt.is_express = options->is_express;
opt = *options;
}
// Set publisher
_z_publisher_t int_pub = _z_declare_publisher(zs, key, opt.congestion_control, opt.priority, opt.is_express);
_z_publisher_t int_pub =
_z_declare_publisher(zs, key, &opt.encoding->_val, opt.congestion_control, opt.priority, opt.is_express);
// Create write filter
int8_t res = _z_write_filter_create(&int_pub);
if (res != _Z_RES_OK) {
Expand All @@ -1155,14 +1155,10 @@ int8_t z_undeclare_publisher(z_owned_publisher_t *pub) { return _z_undeclare_and
void z_publisher_put_options_default(z_publisher_put_options_t *options) {
options->encoding = NULL;
options->attachment = NULL;
options->is_express = false;
options->timestamp = NULL;
}

void z_publisher_delete_options_default(z_publisher_delete_options_t *options) {
options->is_express = false;
options->timestamp = NULL;
}
void z_publisher_delete_options_default(z_publisher_delete_options_t *options) { options->timestamp = NULL; }

int8_t z_publisher_put(const z_loaned_publisher_t *pub, z_owned_bytes_t *payload,
const z_publisher_put_options_t *options) {
Expand All @@ -1172,24 +1168,30 @@ int8_t z_publisher_put(const z_loaned_publisher_t *pub, z_owned_bytes_t *payload
z_publisher_put_options_default(&opt);
if (options != NULL) {
opt.encoding = options->encoding;
opt.is_express = options->is_express;
opt.timestamp = options->timestamp;
opt.attachment = options->attachment;
}
_z_encoding_t encoding;
if (opt.encoding == NULL) {
_Z_RETURN_IF_ERR(_z_encoding_copy(&encoding, &pub->_encoding));
} else {
opt.is_express = pub->_is_express;
encoding = _z_encoding_steal(&opt.encoding->_val);
}

// Check if write filter is active before writing
if (!_z_write_filter_active(pub)) {
// Write value
ret = _z_write(_Z_RC_IN_VAL(&pub->_zn), pub->_key, _z_bytes_from_owned_bytes(payload),
_z_encoding_from_owned(opt.encoding), Z_SAMPLE_KIND_PUT, pub->_congestion_control,
pub->_priority, opt.is_express, opt.timestamp, _z_bytes_from_owned_bytes(opt.attachment));
ret = _z_write(_Z_RC_IN_VAL(&pub->_zn), pub->_key, _z_bytes_from_owned_bytes(payload), &encoding,
Z_SAMPLE_KIND_PUT, pub->_congestion_control, pub->_priority, pub->_is_express, opt.timestamp,
_z_bytes_from_owned_bytes(opt.attachment));
}
// Trigger local subscriptions
_z_trigger_local_subscriptions(_Z_RC_IN_VAL(&pub->_zn), pub->_key, _z_bytes_from_owned_bytes(payload),
_Z_N_QOS_DEFAULT, _z_bytes_from_owned_bytes(opt.attachment));
_z_trigger_local_subscriptions(
_Z_RC_IN_VAL(&pub->_zn), pub->_key, _z_bytes_from_owned_bytes(payload), &encoding,
_z_n_qos_make(pub->_is_express, pub->_congestion_control == Z_CONGESTION_CONTROL_BLOCK, pub->_priority),
opt.timestamp, _z_bytes_from_owned_bytes(opt.attachment));
// Clean-up
z_encoding_drop(opt.encoding);
_z_encoding_clear(&encoding);
z_bytes_drop(opt.attachment);
z_bytes_drop(payload);
return ret;
Expand All @@ -1200,13 +1202,10 @@ int8_t z_publisher_delete(const z_loaned_publisher_t *pub, const z_publisher_del
z_publisher_delete_options_t opt;
z_publisher_delete_options_default(&opt);
if (options != NULL) {
opt.is_express = options->is_express;
opt.timestamp = options->timestamp;
} else {
opt.is_express = pub->_is_express;
}
return _z_write(_Z_RC_IN_VAL(&pub->_zn), pub->_key, _z_bytes_null(), _z_encoding_null(), Z_SAMPLE_KIND_DELETE,
pub->_congestion_control, pub->_priority, opt.is_express, opt.timestamp, _z_bytes_null());
return _z_write(_Z_RC_IN_VAL(&pub->_zn), pub->_key, _z_bytes_null(), NULL, Z_SAMPLE_KIND_DELETE,
pub->_congestion_control, pub->_priority, pub->_is_express, opt.timestamp, _z_bytes_null());
}

z_owned_keyexpr_t z_publisher_keyexpr(z_loaned_publisher_t *publisher) {
Expand Down
7 changes: 4 additions & 3 deletions src/net/primitives.c
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ int8_t _z_undeclare_resource(_z_session_t *zn, uint16_t rid) {

#if Z_FEATURE_PUBLICATION == 1
/*------------------ Publisher Declaration ------------------*/
_z_publisher_t _z_declare_publisher(const _z_session_rc_t *zn, _z_keyexpr_t keyexpr,
_z_publisher_t _z_declare_publisher(const _z_session_rc_t *zn, _z_keyexpr_t keyexpr, _z_encoding_t *encoding,
z_congestion_control_t congestion_control, z_priority_t priority,
_Bool is_express) {
// Allocate publisher
Expand All @@ -114,6 +114,7 @@ _z_publisher_t _z_declare_publisher(const _z_session_rc_t *zn, _z_keyexpr_t keye
ret._priority = priority;
ret._is_express = is_express;
ret._zn = _z_session_rc_clone(zn);
ret._encoding = encoding == NULL ? _z_encoding_null() : _z_encoding_steal(encoding);
return ret;
}

Expand All @@ -129,7 +130,7 @@ int8_t _z_undeclare_publisher(_z_publisher_t *pub) {
}

/*------------------ Write ------------------*/
int8_t _z_write(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload, const _z_encoding_t encoding,
int8_t _z_write(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload, const _z_encoding_t *encoding,
const z_sample_kind_t kind, const z_congestion_control_t cong_ctrl, z_priority_t priority,
_Bool is_express, const _z_timestamp_t *timestamp, const _z_bytes_t attachment) {
int8_t ret = _Z_RES_OK;
Expand All @@ -149,7 +150,7 @@ int8_t _z_write(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t p
._commons = {._timestamp = ((timestamp != NULL) ? *timestamp : _z_timestamp_null()),
._source_info = _z_source_info_null()},
._payload = payload,
._encoding = encoding,
._encoding = encoding == NULL ? _z_encoding_null() : *encoding,
._attachment = attachment,
},
},
Expand Down
8 changes: 6 additions & 2 deletions src/net/sample.c
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,15 @@ _z_sample_t _z_sample_create(_z_keyexpr_t *key, const _z_bytes_t payload, const
_z_sample_t s = _z_sample_null();
s.keyexpr = _z_keyexpr_steal(key);
s.kind = kind;
s.timestamp = _z_timestamp_duplicate(timestamp);
if (timestamp != NULL) {
s.timestamp = _z_timestamp_duplicate(timestamp);
}
s.qos = qos;
_z_bytes_copy(&s.payload, &payload);
_z_bytes_copy(&s.attachment, &attachment);
_z_encoding_move(&s.encoding, encoding);
if (encoding != NULL) {
_z_encoding_move(&s.encoding, encoding);
}
return s;
}
#else
Expand Down
10 changes: 5 additions & 5 deletions src/session/subscription.c
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,10 @@ _z_subscription_rc_t *_z_register_subscription(_z_session_t *zn, uint8_t is_loca
}

void _z_trigger_local_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload,
const _z_n_qos_t qos, const _z_bytes_t attachment) {
_z_encoding_t encoding = _z_encoding_null();
_z_timestamp_t timestamp = _z_timestamp_null();
_z_encoding_t *encoding, const _z_n_qos_t qos, const _z_timestamp_t *timestamp,
const _z_bytes_t attachment) {
int8_t ret =
_z_trigger_subscriptions(zn, keyexpr, payload, &encoding, Z_SAMPLE_KIND_PUT, &timestamp, qos, attachment);
_z_trigger_subscriptions(zn, keyexpr, payload, encoding, Z_SAMPLE_KIND_PUT, timestamp, qos, attachment);
(void)ret;
}

Expand Down Expand Up @@ -208,11 +207,12 @@ void _z_flush_subscriptions(_z_session_t *zn) {
#else // Z_FEATURE_SUBSCRIPTION == 0

void _z_trigger_local_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload,
_z_n_qos_t qos, const _z_bytes_t attachment) {
_z_n_qos_t qos, const _z_timestampt_t *timestamp, const _z_bytes_t attachment) {
_ZP_UNUSED(zn);
_ZP_UNUSED(keyexpr);
_ZP_UNUSED(payload);
_ZP_UNUSED(qos);
_ZP_UNUSED(timestamp);
_ZP_UNUSED(attachment);
}

Expand Down
19 changes: 18 additions & 1 deletion tests/z_api_alignment_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,12 @@ void data_handler(const z_loaned_sample_t *sample, void *arg) {
z_keyexpr_as_view_string(z_sample_keyexpr(sample), &k_str);
(void)arg;
assert(z_check(k_str));
const char *encoding_expected =
z_sample_kind(sample) == Z_SAMPLE_KIND_PUT ? "zenoh/bytes;test_encoding" : "zenoh/bytes";
z_owned_string_t encoding;
z_encoding_to_string(z_sample_encoding(sample), &encoding);
printf("%s\n", z_string_data(z_loan(encoding)));
assert(strncmp(encoding_expected, z_string_data(z_loan(encoding)), strlen(encoding_expected)) == 0);
}

int main(int argc, char **argv) {
Expand All @@ -122,9 +128,13 @@ int main(int argc, char **argv) {

#ifdef ZENOH_C
zc_init_logger();
const char *encoding_expected =
z_sample_kind(sample) == Z_SAMPLE_KIND_PUT ? "zenoh/bytes" : "zenoh/bytes;test_encoding";
z_pu
#endif

z_view_keyexpr_t key_demo_example, key_demo_example_a, key_demo_example_starstar;
z_view_keyexpr_t key_demo_example,
key_demo_example_a, key_demo_example_starstar;
z_view_keyexpr_from_str(&key_demo_example, "demo/example");
z_view_keyexpr_from_str(&key_demo_example_a, "demo/example/a");
z_view_keyexpr_from_str(&key_demo_example_starstar, "demo/example/**");
Expand Down Expand Up @@ -296,6 +306,9 @@ int main(int argc, char **argv) {
printf("Session Put...");
z_put_options_t _ret_put_opt;
z_put_options_default(&_ret_put_opt);
z_owned_encoding_t encoding;
z_encoding_from_str(&encoding, "test_encoding");
_ret_put_opt.encoding = z_move(encoding);
_ret_put_opt.congestion_control = Z_CONGESTION_CONTROL_BLOCK;

// Create payload
Expand All @@ -304,6 +317,7 @@ int main(int argc, char **argv) {

_ret_int8 = z_put(z_loan(s1), z_loan(_ret_expr), z_move(payload), &_ret_put_opt);
assert_eq(_ret_int8, 0);
assert(!z_check(encoding));
printf("Ok\n");

z_sleep_s(SLEEP);
Expand All @@ -330,10 +344,13 @@ int main(int argc, char **argv) {
printf("Declaring Publisher...");
z_publisher_options_t _ret_pub_opt;
z_publisher_options_default(&_ret_pub_opt);
z_encoding_from_str(&encoding, "test_encoding");
_ret_pub_opt.encoding = z_move(encoding);
_ret_pub_opt.congestion_control = Z_CONGESTION_CONTROL_BLOCK;
z_owned_publisher_t _ret_pub;
_ret_int8 = z_declare_publisher(&_ret_pub, z_loan(s1), z_loan(s1_key), &_ret_pub_opt);
assert(_ret_int8 == _Z_RES_OK);
assert(!z_check(encoding));
printf("Ok\n");

z_sleep_s(SLEEP);
Expand Down
2 changes: 1 addition & 1 deletion zenohpico.pc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ prefix=/usr/local
Name: zenohpico
Description:
URL:
Version: 1.0.20240730dev
Version: 1.0.20240801dev
Cflags: -I${prefix}/include
Libs: -L${prefix}/lib -lzenohpico
Loading