Skip to content

Commit

Permalink
gcoap: send Observe notifications from request address
Browse files Browse the repository at this point in the history
  • Loading branch information
fabian18 committed May 30, 2024
1 parent d96695c commit 977ce56
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 31 deletions.
9 changes: 9 additions & 0 deletions sys/include/net/gcoap.h
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,14 @@ extern "C" {
#define CONFIG_GCOAP_OBS_CLIENTS_MAX (2)
#endif

/**
* @ingroup net_gcoap_conf
* @brief Maximum number of local notifying endpoint addresses
*/
#ifndef CONFIG_GCOAP_OBS_NOTIFIERS_MAX
#define CONFIG_GCOAP_OBS_NOTIFIERS_MAX (2)
#endif

/**
* @ingroup net_gcoap_conf
* @brief Maximum number of registrations for Observable resources
Expand Down Expand Up @@ -837,6 +845,7 @@ struct gcoap_request_memo {
*/
typedef struct {
sock_udp_ep_t *observer; /**< Client endpoint; unused if null */
sock_udp_ep_t *notifier; /**< Local endpoint to send notifications */
const coap_resource_t *resource; /**< Entity being observed */
uint8_t token[GCOAP_TOKENLEN_MAX]; /**< Client token for notifications */
uint16_t last_msgid; /**< Message ID of last notification */
Expand Down
120 changes: 89 additions & 31 deletions sys/net/application_layer/gcoap/gcoap.c
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,10 @@ static int _find_resource(gcoap_socket_type_t tl_type,
const coap_resource_t **resource_ptr,
gcoap_listener_t **listener_ptr);
static int _find_observer(sock_udp_ep_t **observer, sock_udp_ep_t *remote);
static int _find_obs_memo(gcoap_observe_memo_t **memo, sock_udp_ep_t *remote,
coap_pkt_t *pdu);
static int _find_notifier(sock_udp_ep_t **notifier, sock_udp_ep_t *local);
static int _find_obs_memo(gcoap_observe_memo_t **memo,
sock_udp_ep_t *remote, sock_udp_ep_t *local,
coap_pkt_t *pdu);
static void _find_obs_memo_resource(gcoap_observe_memo_t **memo,
const coap_resource_t *resource);

Expand Down Expand Up @@ -137,6 +139,10 @@ typedef struct {
sock_udp_ep_t observers[CONFIG_GCOAP_OBS_CLIENTS_MAX];
/* Observe clients; allows reuse for
observe memos */
/**
* @brief Local endpoint aliases to send notifications from
*/
sock_udp_ep_t notifiers[CONFIG_GCOAP_OBS_NOTIFIERS_MAX];
gcoap_observe_memo_t observe_memos[CONFIG_GCOAP_OBS_REGISTRATIONS_MAX];
/* Observed resource registrations */
uint8_t resend_bufs[CONFIG_GCOAP_RESEND_BUFS_MAX][CONFIG_GCOAP_PDU_BUF_SIZE];
Expand Down Expand Up @@ -679,6 +685,7 @@ static size_t _handle_req(gcoap_socket_t *sock, coap_pkt_t *pdu, uint8_t *buf,
const coap_resource_t *resource = NULL;
gcoap_listener_t *listener = NULL;
sock_udp_ep_t *observer = NULL;
sock_udp_ep_t *notifier = NULL;
gcoap_observe_memo_t *memo = NULL;
gcoap_observe_memo_t *resource_memo = NULL;

Expand All @@ -699,7 +706,7 @@ static size_t _handle_req(gcoap_socket_t *sock, coap_pkt_t *pdu, uint8_t *buf,

if (coap_get_observe(pdu) == COAP_OBS_REGISTER) {
/* lookup remote+token */
int empty_slot = _find_obs_memo(&memo, remote, pdu);
int empty_slot = _find_obs_memo(&memo, remote, NULL, pdu);
/* validate re-registration request */
if (resource_memo != NULL) {
if (memo != NULL) {
Expand All @@ -721,18 +728,28 @@ static size_t _handle_req(gcoap_socket_t *sock, coap_pkt_t *pdu, uint8_t *buf,
if ((memo == NULL) && coap_has_observe(pdu)) {
/* verify resource not already registered (for another endpoint) */
if ((empty_slot >= 0) && (resource_memo == NULL)) {
int obs_slot = _find_observer(&observer, remote);
int slot = _find_observer(&observer, remote);
/* cache new observer */
if (observer == NULL) {
if (obs_slot >= 0) {
observer = &_coap_state.observers[obs_slot];
memcpy(observer, remote, sizeof(sock_udp_ep_t));
if (slot >= 0) {
observer = &_coap_state.observers[slot];
} else {
DEBUG("gcoap: can't register observer\n");
}
}
if (observer != NULL) {
slot = _find_notifier(&notifier, &aux->local);
if (notifier == NULL) {
if (slot >= 0) {
notifier = &_coap_state.notifiers[slot];
} else {
DEBUG("gcoap: can't allocate notifier\n");
}
}
if (observer && notifier) {
memcpy(observer, remote, sizeof(*remote));
memcpy(notifier, &aux->local, sizeof(aux->local));
memo = &_coap_state.observe_memos[empty_slot];
memo->notifier = notifier;
memo->observer = observer;
}
}
Expand All @@ -754,19 +771,25 @@ static size_t _handle_req(gcoap_socket_t *sock, coap_pkt_t *pdu, uint8_t *buf,
}

} else if (coap_get_observe(pdu) == COAP_OBS_DEREGISTER) {
_find_obs_memo(&memo, remote, pdu);
_find_obs_memo(&memo, remote, NULL, pdu);
/* clear memo, and clear observer if no other memos */
if (memo != NULL) {
DEBUG("gcoap: Deregistering observer for: %s\n", memo->resource->path);
memo->observer = NULL;
memo = NULL;
_find_obs_memo(&memo, remote, NULL);
if (memo == NULL) {
gcoap_observe_memo_t *other_memo = NULL;
_find_obs_memo(&other_memo, remote, NULL, NULL);
if (other_memo == NULL) {
_find_observer(&observer, remote);
if (observer != NULL) {
observer->family = AF_UNSPEC;
}
}
other_memo = NULL;
_find_obs_memo(&other_memo, NULL, memo->notifier, NULL);
if (!other_memo) {
memo->notifier->family = AF_UNSPEC;
}
memo->notifier = NULL;
}
coap_clear_observe(pdu);

Expand Down Expand Up @@ -908,7 +931,7 @@ static int _find_resource(gcoap_socket_type_t tl_type,
* return Registered request memo, or NULL if not found
*/
static gcoap_request_memo_t* _find_req_memo_by_token(const sock_udp_ep_t *remote,
const uint8_t *token, size_t tkl)
const uint8_t *token, size_t tkl)
{
/* no need to initialize struct; we only care about buffer contents below */
coap_pkt_t memo_pdu_data;
Expand Down Expand Up @@ -1042,57 +1065,78 @@ static ssize_t _well_known_core_handler(coap_pkt_t* pdu, uint8_t *buf, size_t le
}

/*
* Find registered observer for a remote address and port.
* Find registered observer or notification endpoint for a remote aor local address and port.
*
* observer[out] -- Registered observer, or NULL if not found
* remote[in] -- Endpoint to match
* out[in,out] -- in: endpoint array to scan, out: found endpoint or NULL if not found
* in[in] -- Endpoint to match
*
* return Index of empty slot, suitable for registering new observer; or -1
* return Index of empty slot, suitable for registering new endpoint; or -1
* if no empty slots. Undefined if observer found.
*/
static int _find_observer(sock_udp_ep_t **observer, sock_udp_ep_t *remote)
static int _find_endpoint(sock_udp_ep_t **out, sock_udp_ep_t *in, unsigned max)
{
int empty_slot = -1;
*observer = NULL;
for (unsigned i = 0; i < CONFIG_GCOAP_OBS_CLIENTS_MAX; i++) {
sock_udp_ep_t *ep_array = *out;
*out = NULL;
for (unsigned i = 0; i < max; i++) {

if (_coap_state.observers[i].family == AF_UNSPEC) {
if (ep_array[i].family == AF_UNSPEC) {
empty_slot = i;
}
else if (sock_udp_ep_equal(&_coap_state.observers[i], remote)) {
*observer = &_coap_state.observers[i];
else if (sock_udp_ep_equal(&ep_array[i], in)) {
*out = &ep_array[i];
break;
}
}
return empty_slot;
}

static int _find_observer(sock_udp_ep_t **observer, sock_udp_ep_t *remote)
{
*observer = _coap_state.observers;
return _find_endpoint(observer, remote, CONFIG_GCOAP_OBS_CLIENTS_MAX);
}

static int _find_notifier(sock_udp_ep_t **notifier, sock_udp_ep_t *local)
{
*notifier = _coap_state.notifiers;
return _find_endpoint(notifier, local, CONFIG_GCOAP_OBS_NOTIFIERS_MAX);
}

/*
* Find registered observe memo for a remote address and token.
*
* memo[out] -- Registered observe memo, or NULL if not found
* remote[in] -- Endpoint for address to match
* remote[in] -- Remote endpoint for address to match if not NULL
* local[in] -- Local endpoint for address to match if not NULL
* pdu[in] -- PDU for token to match, or NULL to match only on remote address
*
* return Index of empty slot, suitable for registering new memo; or -1 if no
* empty slots. Undefined if memo found.
*/
static int _find_obs_memo(gcoap_observe_memo_t **memo, sock_udp_ep_t *remote,
coap_pkt_t *pdu)
static int _find_obs_memo(gcoap_observe_memo_t **memo,
sock_udp_ep_t *remote, sock_udp_ep_t *local,
coap_pkt_t *pdu)
{
int empty_slot = -1;
*memo = NULL;

sock_udp_ep_t *remote_observer = NULL;
_find_observer(&remote_observer, remote);

sock_udp_ep_t *local_notifier = NULL;
if (remote) {
_find_observer(&remote_observer, remote);
}
if (local) {
_find_notifier(&local_notifier, local);
}
for (unsigned i = 0; i < CONFIG_GCOAP_OBS_REGISTRATIONS_MAX; i++) {
if (_coap_state.observe_memos[i].observer == NULL) {
empty_slot = i;
continue;
}

if (_coap_state.observe_memos[i].observer == remote_observer) {
if ((_coap_state.observe_memos[i].observer == remote_observer || !remote_observer) &&
(_coap_state.observe_memos[i].notifier == local_notifier || !local_notifier)) {
if (pdu == NULL) {
*memo = &_coap_state.observe_memos[i];
break;
Expand Down Expand Up @@ -1144,10 +1188,19 @@ static void _check_and_expire_obs_memo_last_mid(sock_udp_ep_t *remote,

if (stale_obs_memo) {
stale_obs_memo->observer = NULL; /* clear memo */
/* check if no other memo is referencing the same local endpoint ... */
gcoap_observe_memo_t *other_memo = NULL;
_find_obs_memo(&other_memo, NULL, stale_obs_memo->notifier, NULL);
if (!other_memo) {
/* ... if not -> also free the notifier entry */
stale_obs_memo->notifier->family = AF_UNSPEC;
}
/* then unreference notifier */
stale_obs_memo->notifier = NULL;

/* check if the observer has more observe memos registered... */
stale_obs_memo = NULL;
_find_obs_memo(&stale_obs_memo, observer, NULL);
_find_obs_memo(&stale_obs_memo, observer, NULL, NULL);
if (stale_obs_memo == NULL) {
/* ... if not -> also free the observer entry */
observer->family = AF_UNSPEC;
Expand Down Expand Up @@ -1864,7 +1917,12 @@ size_t gcoap_obs_send(const uint8_t *buf, size_t len,
_find_obs_memo_resource(&memo, resource);

if (memo) {
ssize_t bytes = _tl_send(&memo->socket, buf, len, memo->observer, NULL);
sock_udp_aux_tx_t aux = { 0 };
if (memo->notifier) {
memcpy(&aux.local, memo->notifier, sizeof(*memo->notifier));
aux.flags = SOCK_AUX_SET_LOCAL;
}
ssize_t bytes = _tl_send(&memo->socket, buf, len, memo->observer, &aux);
return (size_t)((bytes > 0) ? bytes : 0);
}
else {
Expand Down

0 comments on commit 977ce56

Please sign in to comment.