Skip to content

Commit

Permalink
shm_sub REFACTOR recover, new lock and notify wait
Browse files Browse the repository at this point in the history
The logic in the following function has been revised multiple times, and
some of the checks are no longer relevant.

This is mainly due to the use of `SR_SUB_EV_FINISHED` by subscribers and
`SR_SUB_EV_NONE` by originators.

The following are relevant for synchronization:

1. sub_shm->orig_cid:
This is used to track the originator of an event, and must always be set
unless holding the mutex.
This is cleared only after originator has finished (success or failure)
the event, and does not want to access the SHM anymore.
It's purpose is entirely for sub_shm recovery (clear the event).
  a. if set, and orig_cid is dead
  b. if unset and event exists.

2. sub_shm->event:
Originators set it to one of SR_IS_NOTIFY_EVENT and listeners set it
to one of SR_IS_LISTEN_EVENT.
Only originators clear the event once they are done accessing the SHM or
when they recover an abandoned event.

3. sub_shm->request_id:
Identifies the event uniquely for the subscription. It is checked by
subscribers to prevent processing the same event more than once.
It is incremented by the originator to notify a new event only after
acquiring `sr_shmsub_notify_new_wrlock`.

Special cases:

1. Notification subscription
It is the only one which does not use a orig_cid while an event is
ongoing.

2. Aborted events
In case of aborted events, the originator does not clear the error
status of the SHM, but clears the orig_cid.

3. Originator loses WRITE lock
When originator loses WRITE lock, because some of the subscribers are
holding READ locks, abort events are not sent. The event is cleared
regardless of SUCCESS, FAILURE or FINISHED.
orig_cid is also cleared since it is no longer waiting for the event.
  • Loading branch information
irfanHaslanded authored and michalvasko committed Jun 12, 2024
1 parent e1d99b1 commit d1eade7
Showing 1 changed file with 40 additions and 21 deletions.
61 changes: 40 additions & 21 deletions src/shm_sub.c
Original file line number Diff line number Diff line change
Expand Up @@ -309,18 +309,25 @@ sr_shmsub_recover(sr_sub_shm_t *sub_shm)
{
sr_sub_event_t ev = ATOMIC_LOAD_RELAXED(sub_shm->event);

if (sub_shm->orig_cid && !sr_conn_is_alive(sub_shm->orig_cid)) {
SR_LOG_WRN("EV ORIGIN: SHM event \"%s\" of CID %" PRIu32 " ID %" PRIu32 " recovered.",
sr_ev2str(ev), sub_shm->orig_cid,
(uint32_t)ATOMIC_LOAD_RELAXED(sub_shm->request_id));

/* clear the event */
ATOMIC_STORE_RELAXED(sub_shm->event, SR_SUB_EV_NONE);
sub_shm->orig_cid = 0;
} else if (ev && (ev != SR_SUB_EV_NOTIF)) {
/* if there is an event except SR_SUB_EV_NOTIF, orig_cid must always exist */
assert(sub_shm->orig_cid);
/* if orig_cid is set, the connection must be alive*/
if (sub_shm->orig_cid) {
if (sr_conn_is_alive(sub_shm->orig_cid)) {
return;
}
} else {
/* if there is no orig_cid, there should be no event, except a notif event */
if (!ev || (ev == SR_SUB_EV_NOTIF)) {
return;
}
}

SR_LOG_WRN("EV ORIGIN: SHM event \"%s\" of CID %" PRIu32 " ID %" PRIu32 " recovered.",
sr_ev2str(ev), sub_shm->orig_cid,
(uint32_t)ATOMIC_LOAD_RELAXED(sub_shm->request_id));

/* clear the event */
ATOMIC_STORE_RELAXED(sub_shm->event, SR_SUB_EV_NONE);
sub_shm->orig_cid = 0;
}

/**
Expand All @@ -338,9 +345,12 @@ sr_shmsub_notify_new_wrlock(sr_sub_shm_t *sub_shm, const char *shm_name, sr_sub_
sr_error_info_t *err_info = NULL;
struct timespec timeout_abs;
sr_sub_event_t last_event;
uint32_t request_id, last_request_id;
uint32_t last_request_id;
int ret;

/* it is only possible to lock with none or error */
assert(!lock_event || (SR_SUB_EV_ERROR == lock_event));

/* WRITE LOCK */
if ((err_info = sr_rwlock(&sub_shm->lock, SR_SUBSHM_LOCK_TIMEOUT, SR_LOCK_WRITE, cid, __func__, NULL, NULL))) {
return err_info;
Expand All @@ -351,9 +361,6 @@ sr_shmsub_notify_new_wrlock(sr_sub_shm_t *sub_shm, const char *shm_name, sr_sub_
sr_shmsub_recover(sub_shm);
}

/* remember current request_id */
request_id = ATOMIC_LOAD_RELAXED(sub_shm->request_id);

assert(sub_shm->lock.writer == cid);
/* FAKE WRITE UNLOCK */
sub_shm->lock.writer = 0;
Expand Down Expand Up @@ -387,7 +394,7 @@ sr_shmsub_notify_new_wrlock(sr_sub_shm_t *sub_shm, const char *shm_name, sr_sub_

if (ret) {
if ((ret == ETIMEDOUT) && (!sub_shm->lock.readers[0]) &&
(!last_event || (last_event == lock_event)) && (request_id == last_request_id)) {
(!last_event || (last_event == lock_event))) {
/* even though the timeout has elapsed, the event was handled so continue normally */
/* ensure that there are no readers left, otherwise we don't have the write lock */
goto event_handled;
Expand All @@ -413,7 +420,11 @@ sr_shmsub_notify_new_wrlock(sr_sub_shm_t *sub_shm, const char *shm_name, sr_sub_

event_handled:
/* we have write lock and the expected event, remove any left over orig_cid */
sub_shm->orig_cid = 0;
if (sub_shm->orig_cid) {
SR_LOG_WRN("Recovered \"%s\" previous event \"%s\" ID %" PRIu32 " abandoned by CID %" PRIu32,
shm_name, sr_ev2str(last_event), last_request_id, sub_shm->orig_cid);
sub_shm->orig_cid = 0;
}
return NULL;
}

Expand Down Expand Up @@ -476,15 +487,18 @@ _sr_shmsub_notify_wait_wr(sr_sub_shm_t *sub_shm, sr_sub_event_t event, uint32_t
last_event = ATOMIC_LOAD_RELAXED(sub_shm->event);
last_request_id = ATOMIC_LOAD_RELAXED(sub_shm->request_id);

/* request_id cannot have changed while we were waiting */
assert(request_id == last_request_id);

/* orig_cid is mainly used to recover the shm if the originator has crashed after a fake write unlock.
* We can clear it here, as we will not fake write unlock beyond this point. */
sub_shm->orig_cid = 0;

if (ret) {
if ((ret == ETIMEDOUT) && SR_IS_NOTIFY_EVENT(last_event) && (request_id == last_request_id)) {
if ((ret == ETIMEDOUT) && SR_IS_NOTIFY_EVENT(last_event)) {
/* even though the timeout has elapsed, the event was handled so continue normally */
goto event_handled;
} else if ((ret == ETIMEDOUT) && (last_event && !SR_IS_NOTIFY_EVENT(last_event))) {
} else if ((ret == ETIMEDOUT) && (event == last_event)) { /* our publised event remains untouched in SHM */
/* WRITE LOCK, chances are we will get it if we ignore the event */
timeout_abs2 = sr_time_ts_add(timeout_abs, SR_EVENT_TIMEOUT_LOCK_TIMEOUT);
if (!(err_info = sr_sub_rwlock(&sub_shm->lock, &timeout_abs2, SR_LOCK_WRITE, cid, __func__, NULL, NULL, 1))) {
Expand All @@ -494,7 +508,12 @@ _sr_shmsub_notify_wait_wr(sr_sub_shm_t *sub_shm, sr_sub_event_t event, uint32_t
write_lock = 1;
}
} else {
/* other error */
/* other error - not ETIMEDOUT or shm event has changed incorrectly */
if (event != last_event) {
SR_LOG_WRN("EV ORIGIN: SHM event \"%s\" ID %" PRIu32 " changed to \"%s\" unexpectedly",
sr_ev2str(event), request_id, sr_ev2str(last_event));

}
SR_ERRINFO_COND(&err_info, __func__, ret);
}

Expand All @@ -509,7 +528,7 @@ _sr_shmsub_notify_wait_wr(sr_sub_shm_t *sub_shm, sr_sub_event_t event, uint32_t
sub_shm->lock.writer = cid;
}

if ((event == last_event) && (request_id == last_request_id)) {
if ((event == last_event)) {
/* event failed */
if (clear_ev_on_err) {
ATOMIC_STORE_RELAXED(sub_shm->event, SR_SUB_EV_NONE);
Expand Down

0 comments on commit d1eade7

Please sign in to comment.