Skip to content

Commit

Permalink
posix: Add implementation of mq_notify() function
Browse files Browse the repository at this point in the history
The function was the last missing piece of the `_POSIX_MESSAGE_PASSING`
option group. Due to lack of signal subsystem in the Zephyr RTOS
the `sigev_notify` member of the `sigevent` structure that describes
the notification cannot be set to `SIGEV_SIGNAL` - this notification
type is not implemented, the function will return -1 and set `errno`
to `ENOSYS`.

`mq_notify` documentation:
https://pubs.opengroup.org/onlinepubs/9699919799/functions/mq_notify.html

Fixes zephyrproject-rtos#66958

Signed-off-by: Adam Wojasinski <awojasinski@baylibre.com>
  • Loading branch information
awojasinski committed Jan 23, 2024
1 parent 0a2876d commit 306b783
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 19 deletions.
2 changes: 1 addition & 1 deletion doc/services/portability/posix/option_groups/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ _POSIX_MESSAGE_PASSING

mq_close(),yes
mq_getattr(),yes
mq_notify(),
mq_notify(),yes
mq_open(),yes
mq_receive(),yes
mq_send(),yes
Expand Down
2 changes: 2 additions & 0 deletions include/zephyr/posix/mqueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <zephyr/kernel.h>
#include <zephyr/posix/time.h>
#include <zephyr/posix/fcntl.h>
#include <zephyr/posix/signal.h>
#include <zephyr/posix/sys/stat.h>
#include "posix_types.h"

Expand Down Expand Up @@ -40,6 +41,7 @@ int mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
unsigned int *msg_prio, const struct timespec *abstime);
int mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
unsigned int msg_prio, const struct timespec *abstime);
int mq_notify(mqd_t mqdes, const struct sigevent *notification);

#ifdef __cplusplus
}
Expand Down
115 changes: 97 additions & 18 deletions lib/posix/mqueue.c
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
/*
* Copyright (c) 2018 Intel Corporation
* Copyright (c) 2024 BayLibre, SAS
*
* SPDX-License-Identifier: Apache-2.0
*/
#include <zephyr/kernel.h>
#include <errno.h>
#include <string.h>
#include <zephyr/sys/atomic.h>
#include <zephyr/posix/time.h>
#include <zephyr/posix/mqueue.h>
#include <zephyr/posix/pthread.h>

#define SIGEV_MASK (SIGEV_NONE | SIGEV_SIGNAL | SIGEV_THREAD)

typedef struct mqueue_object {
sys_snode_t snode;
Expand All @@ -17,12 +20,13 @@ typedef struct mqueue_object {
struct k_msgq queue;
atomic_t ref_count;
char *name;
struct sigevent not;
} mqueue_object;

typedef struct mqueue_desc {
char *mem_desc;
mqueue_object *mqueue;
uint32_t flags;
uint32_t flags;
} mqueue_desc;

K_SEM_DEFINE(mq_sem, 1, 1);
Expand All @@ -33,10 +37,12 @@ sys_slist_t mq_list = SYS_SLIST_STATIC_INIT(&mq_list);
int64_t timespec_to_timeoutms(const struct timespec *abstime);
static mqueue_object *find_in_list(const char *name);
static int32_t send_message(mqueue_desc *mqd, const char *msg_ptr, size_t msg_len,
k_timeout_t timeout);
static int receive_message(mqueue_desc *mqd, char *msg_ptr, size_t msg_len,
k_timeout_t timeout);
k_timeout_t timeout);
static int32_t receive_message(mqueue_desc *mqd, char *msg_ptr, size_t msg_len,
k_timeout_t timeout);
static void remove_notification(mqueue_object *msg_queue);
static void remove_mq(mqueue_object *msg_queue);
static void *mq_notify_thread(void *arg);

/**
* @brief Open a message queue.
Expand Down Expand Up @@ -69,13 +75,12 @@ mqd_t mq_open(const char *name, int oflags, ...)
max_msgs = attrs->mq_maxmsg;
}

if ((name == NULL) || ((oflags & O_CREAT) != 0 && (msg_size <= 0 ||
max_msgs <= 0))) {
if ((name == NULL) || ((oflags & O_CREAT) != 0 && (msg_size <= 0 || max_msgs <= 0))) {
errno = EINVAL;
return (mqd_t)mqd;
}

if ((strlen(name) + 1) > CONFIG_MQUEUE_NAMELEN_MAX) {
if ((strlen(name) + 1) > CONFIG_MQUEUE_NAMELEN_MAX) {
errno = ENAMETOOLONG;
return (mqd_t)mqd;
}
Expand All @@ -85,8 +90,7 @@ mqd_t mq_open(const char *name, int oflags, ...)
msg_queue = find_in_list(name);
k_sem_give(&mq_sem);

if ((msg_queue != NULL) && (oflags & O_CREAT) != 0 &&
(oflags & O_EXCL) != 0) {
if ((msg_queue != NULL) && (oflags & O_CREAT) != 0 && (oflags & O_EXCL) != 0) {
/* Message queue has already been opened and O_EXCL is set */
errno = EEXIST;
return (mqd_t)mqd;
Expand Down Expand Up @@ -139,17 +143,15 @@ mqd_t mq_open(const char *name, int oflags, ...)

mq_buf_ptr = k_malloc(msg_size * max_msgs * sizeof(uint8_t));
if (mq_buf_ptr != NULL) {
(void)memset(mq_buf_ptr, 0,
msg_size * max_msgs * sizeof(uint8_t));
(void)memset(mq_buf_ptr, 0, msg_size * max_msgs * sizeof(uint8_t));
msg_queue->mem_buffer = mq_buf_ptr;
} else {
goto free_mq_buffer;
}

(void)atomic_set(&msg_queue->ref_count, 1);
/* initialize zephyr message queue */
k_msgq_init(&msg_queue->queue, msg_queue->mem_buffer, msg_size,
max_msgs);
k_msgq_init(&msg_queue->queue, msg_queue->mem_buffer, msg_size, max_msgs);
k_sem_take(&mq_sem, K_FOREVER);
sys_slist_append(&mq_list, (sys_snode_t *)&(msg_queue->snode));
k_sem_give(&mq_sem);
Expand Down Expand Up @@ -261,8 +263,7 @@ int mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
*
* See IEEE 1003.1
*/
int mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
unsigned int *msg_prio)
int mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned int *msg_prio)
{
mqueue_desc *mqd = (mqueue_desc *)mqdes;

Expand Down Expand Up @@ -341,6 +342,61 @@ int mq_setattr(mqd_t mqdes, const struct mq_attr *mqstat,
return 0;
}

/**
* @brief Notify process that a message is available.
*
* See IEEE 1003.1
*/
int mq_notify(mqd_t mqdes, const struct sigevent *notification)
{
mqueue_desc *mqd = (mqueue_desc *)mqdes;

if (mqd == NULL) {
errno = EBADF;
return -1;
}

mqueue_object * msg_queue = mqd->mqueue;

if (notification == NULL) {
if ((msg_queue->not.sigev_notify & SIGEV_MASK) == 0) {
errno = EINVAL;
return -1;
} else {
remove_notification(msg_queue);
}
} else {
if ((msg_queue->not.sigev_notify & SIGEV_MASK) != 0) {
errno = EBUSY;
return -1;
}
if (notification->sigev_notify == SIGEV_SIGNAL) {
errno = ENOSYS;
return -1;
}
k_sem_take(&mq_sem, K_FOREVER);
memcpy(&msg_queue->not, notification, sizeof(struct sigevent));
k_sem_give(&mq_sem);
}

return 0;
}

static void *mq_notify_thread(void *arg)
{
mqueue_object *mqueue = (mqueue_object *)arg;
struct sigevent *sevp = &mqueue->not;

pthread_detach(pthread_self());

sevp->sigev_notify_function(sevp->sigev_value);

remove_notification(mqueue);

pthread_exit(NULL);
return NULL;
}

/* Internal functions */
static mqueue_object *find_in_list(const char *name)
{
Expand All @@ -362,7 +418,7 @@ static mqueue_object *find_in_list(const char *name)
}

static int32_t send_message(mqueue_desc *mqd, const char *msg_ptr, size_t msg_len,
k_timeout_t timeout)
k_timeout_t timeout)
{
int32_t ret = -1;

Expand All @@ -380,16 +436,32 @@ static int32_t send_message(mqueue_desc *mqd, const char *msg_ptr, size_t msg_le
return ret;
}

uint32_t msgq_num = k_msgq_num_used_get(&mqd->mqueue->queue);

if (k_msgq_put(&mqd->mqueue->queue, (void *)msg_ptr, timeout) != 0) {
errno = K_TIMEOUT_EQ(timeout, K_NO_WAIT) ? EAGAIN : ETIMEDOUT;
return ret;
}

if (k_msgq_num_used_get(&mqd->mqueue->queue) - msgq_num > 0) {
struct sigevent * sevp = &mqd->mqueue->not;

if (sevp->sigev_notify == SIGEV_NONE) {
sevp->sigev_notify_function(sevp->sigev_value);
} else if (sevp->sigev_notify == SIGEV_THREAD) {
pthread_t th;
ret = pthread_create(&th,
sevp->sigev_notify_attributes,
mq_notify_thread,
mqd->mqueue);
}
}

return 0;
}

static int32_t receive_message(mqueue_desc *mqd, char *msg_ptr, size_t msg_len,
k_timeout_t timeout)
k_timeout_t timeout)
{
int ret = -1;

Expand Down Expand Up @@ -428,3 +500,10 @@ static void remove_mq(mqueue_object *msg_queue)
k_free(msg_queue->mem_obj);
}
}

static void remove_notification(mqueue_object *msg_queue)
{
k_sem_take(&mq_sem, K_FOREVER);
memset(&msg_queue->not, 0, sizeof(struct sigevent));
k_sem_give(&mq_sem);
}

0 comments on commit 306b783

Please sign in to comment.