Skip to content

Commit

Permalink
uORB Subscription callbacks with WorkItem scheduling on publication (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
dagar authored Jun 17, 2019
1 parent beaba44 commit 136962d
Showing 11 changed files with 375 additions and 6 deletions.
6 changes: 6 additions & 0 deletions src/modules/uORB/Subscription.hpp
Original file line number Diff line number Diff line change
@@ -48,6 +48,8 @@
namespace uORB
{

class SubscriptionCallback;

// Base subscription wrapper class
class Subscription
{
@@ -107,6 +109,10 @@ class Subscription

protected:

friend class SubscriptionCallback;

DeviceNode *get_node() { return _node; }

bool subscribe();
void unsubscribe();

136 changes: 136 additions & 0 deletions src/modules/uORB/SubscriptionCallback.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/****************************************************************************
*
* Copyright (c) 2019 PX4 Development Team. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the
* distribution.
* 3. Neither the name PX4 nor the names of its contributors may be
* used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
* FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
* COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
* BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS
* OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
* AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*
****************************************************************************/

/**
* @file SubscriptionCallback.hpp
*
*/

#pragma once

#include <uORB/SubscriptionInterval.hpp>
#include <containers/List.hpp>
#include <px4_work_queue/WorkItem.hpp>

namespace uORB
{

// Subscription wrapper class with callbacks on new publications
class SubscriptionCallback : public SubscriptionInterval, public ListNode<SubscriptionCallback *>
{
public:
/**
* Constructor
*
* @param meta The uORB metadata (usually from the ORB_ID() macro) for the topic.
* @param instance The instance for multi sub.
*/
SubscriptionCallback(const orb_metadata *meta, uint8_t interval_ms = 0, uint8_t instance = 0) :
SubscriptionInterval(meta, interval_ms, instance)
{
}

virtual ~SubscriptionCallback()
{
unregister_callback();
};

bool register_callback()
{
bool ret = false;

if (_subscription.get_node() && _subscription.get_node()->register_callback(this)) {
// registered
ret = true;

} else {
// force topic creation by subscribing with old API
int fd = orb_subscribe_multi(_subscription.get_topic(), _subscription.get_instance());

// try to register callback again
if (_subscription.forceInit()) {
if (_subscription.get_node() && _subscription.get_node()->register_callback(this)) {
ret = true;
}
}

orb_unsubscribe(fd);
}


return ret;
}

void unregister_callback()
{
if (_subscription.get_node()) {
_subscription.get_node()->unregister_callback(this);
}
}

virtual void call() = 0;

};

// Subscription with callback that schedules a WorkItem
class SubscriptionCallbackWorkItem : public SubscriptionCallback
{
public:
/**
* Constructor
*
* @param work_item The WorkItem that will be scheduled immediately on new publications.
* @param meta The uORB metadata (usually from the ORB_ID() macro) for the topic.
* @param instance The instance for multi sub.
*/
SubscriptionCallbackWorkItem(px4::WorkItem *work_item, const orb_metadata *meta, uint8_t instance = 0) :
SubscriptionCallback(meta, 0, instance), // interval 0
_work_item(work_item)
{
}

virtual ~SubscriptionCallbackWorkItem() = default;

void call() override
{
// schedule immediately if no interval, otherwise check time elapsed
if ((_interval == 0) || (hrt_elapsed_time(&_last_update) >= _interval)) {
_work_item->ScheduleNow();
}
}

private:
px4::WorkItem *_work_item;
};

} // namespace uORB
133 changes: 133 additions & 0 deletions src/modules/uORB/SubscriptionInterval.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/****************************************************************************
*
* Copyright (c) 2019 PX4 Development Team. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the
* distribution.
* 3. Neither the name PX4 nor the names of its contributors may be
* used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
* FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
* COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
* BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS
* OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
* AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*
****************************************************************************/

/**
* @file SubscriptionInterval.hpp
*
*/

#pragma once

#include <uORB/uORB.h>
#include <px4_defines.h>

#include "uORBDeviceNode.hpp"
#include "uORBManager.hpp"
#include "uORBUtils.hpp"

#include "Subscription.hpp"

namespace uORB
{

// Base subscription wrapper class
class SubscriptionInterval
{
public:

/**
* Constructor
*
* @param meta The uORB metadata (usually from the ORB_ID() macro) for the topic.
* @param interval The requested maximum update interval in microseconds.
* @param instance The instance for multi sub.
*/
SubscriptionInterval(const orb_metadata *meta, uint32_t interval_us = 0, uint8_t instance = 0) :
_subscription{meta, instance},
_interval(interval_us)
{}

SubscriptionInterval() : _subscription{nullptr} {}

~SubscriptionInterval() = default;

bool published() { return _subscription.published(); }

/**
* Check if there is a new update.
* */
bool updated()
{
if (published() && (hrt_elapsed_time(&_last_update) >= _interval)) {
return _subscription.updated();
}

return false;
}

/**
* Copy the struct if updated.
* @param dst The destination pointer where the struct will be copied.
* @return true only if topic was updated and copied successfully.
*/
bool update(void *dst)
{
if (updated()) {
return copy(dst);
}

return false;
}

/**
* Copy the struct
* @param dst The destination pointer where the struct will be copied.
* @return true only if topic was copied successfully.
*/
bool copy(void *dst)
{
if (_subscription.copy(dst)) {
_last_update = hrt_absolute_time();
return true;
}

return false;
}

bool valid() const { return _subscription.valid(); }

uint8_t get_instance() const { return _subscription.get_instance(); }
orb_id_t get_topic() const { return _subscription.get_topic(); }
uint32_t get_interval() const { return _interval; }

void set_interval(uint32_t interval) { _interval = interval; }

protected:

Subscription _subscription;
uint64_t _last_update{0}; // last update in microseconds
uint32_t _interval{0}; // maximum update interval in microseconds

};

} // namespace uORB
37 changes: 37 additions & 0 deletions src/modules/uORB/uORBDeviceNode.cpp
Original file line number Diff line number Diff line change
@@ -37,6 +37,8 @@
#include "uORBUtils.hpp"
#include "uORBManager.hpp"

#include "SubscriptionCallback.hpp"

#ifdef ORB_COMMUNICATOR
#include "uORBCommunicator.hpp"
#endif /* ORB_COMMUNICATOR */
@@ -317,6 +319,11 @@ uORB::DeviceNode::write(cdev::file_t *filp, const char *buffer, size_t buflen)

_published = true;

// callbacks
for (auto item : _callbacks) {
item->call();
}

ATOMIC_LEAVE;

/* notify any poll waiters */
@@ -673,3 +680,33 @@ int uORB::DeviceNode::update_queue_size(unsigned int queue_size)
_queue_size = queue_size;
return PX4_OK;
}

bool
uORB::DeviceNode::register_callback(uORB::SubscriptionCallback *callback_sub)
{
if (callback_sub != nullptr) {
ATOMIC_ENTER;

// prevent duplicate registrations
for (auto existing_callbacks : _callbacks) {
if (callback_sub == existing_callbacks) {
ATOMIC_LEAVE;
return true;
}
}

_callbacks.add(callback_sub);
ATOMIC_LEAVE;
return true;
}

return false;
}

void
uORB::DeviceNode::unregister_callback(uORB::SubscriptionCallback *callback_sub)
{
ATOMIC_ENTER;
_callbacks.remove(callback_sub);
ATOMIC_LEAVE;
}
8 changes: 8 additions & 0 deletions src/modules/uORB/uORBDeviceNode.hpp
Original file line number Diff line number Diff line change
@@ -45,6 +45,7 @@ namespace uORB
class DeviceNode;
class DeviceMaster;
class Manager;
class SubscriptionCallback;
}

/**
@@ -231,6 +232,12 @@ class uORB::DeviceNode : public cdev::CDev, public ListNode<uORB::DeviceNode *>
*/
uint64_t copy_and_get_timestamp(void *dst, unsigned &generation);

// add item to list of work items to schedule on node update
bool register_callback(SubscriptionCallback *callback_sub);

// remove item from list of work items
void unregister_callback(SubscriptionCallback *callback_sub);

protected:

pollevent_t poll_state(cdev::file_t *filp) override;
@@ -269,6 +276,7 @@ class uORB::DeviceNode : public cdev::CDev, public ListNode<uORB::DeviceNode *>
uint8_t *_data{nullptr}; /**< allocated object buffer */
hrt_abstime _last_update{0}; /**< time the object was last updated */
volatile unsigned _generation{0}; /**< object generation count */
List<uORB::SubscriptionCallback *> _callbacks;
uint8_t _priority; /**< priority of the topic */
bool _published{false}; /**< has ever data been published */
uint8_t _queue_size; /**< maximum number of elements in the queue */
Loading

0 comments on commit 136962d

Please sign in to comment.