From 22ecbc3698eabd7fc754f37443df23a12312b463 Mon Sep 17 00:00:00 2001 From: "Michael X. Grey" Date: Thu, 21 Mar 2024 06:37:02 +0000 Subject: [PATCH 01/13] Introducing a stable API for changing the commission of a robot Signed-off-by: Michael X. Grey --- .../agv/RobotUpdateHandle.hpp | 51 +++++++++++ .../agv/FleetUpdateHandle.cpp | 2 +- .../rmf_fleet_adapter/agv/RobotContext.cpp | 15 ++-- .../rmf_fleet_adapter/agv/RobotContext.hpp | 16 ++-- .../agv/RobotUpdateHandle.cpp | 89 +++++++++++++++---- .../agv/internal_RobotUpdateHandle.hpp | 24 +++++ 6 files changed, 165 insertions(+), 32 deletions(-) diff --git a/rmf_fleet_adapter/include/rmf_fleet_adapter/agv/RobotUpdateHandle.hpp b/rmf_fleet_adapter/include/rmf_fleet_adapter/agv/RobotUpdateHandle.hpp index 22da7b444..328af0b61 100644 --- a/rmf_fleet_adapter/include/rmf_fleet_adapter/agv/RobotUpdateHandle.hpp +++ b/rmf_fleet_adapter/include/rmf_fleet_adapter/agv/RobotUpdateHandle.hpp @@ -411,6 +411,54 @@ class RobotUpdateHandle /// If the robot is holding onto a session with a lift, release that session. void release_lift(); + /// A description of whether the robot should accept dispatched and/or direct + /// tasks. + class Commission + { + public: + /// Construct a Commission description with all default values. + /// - accept_dispatched_tasks: true + /// - accept_direct_tasks: true + Commission(); + + /// Construct a Commission description that accepts no tasks at all. + /// - accept_dispatch_tasks: false + /// - accept_direct_tasks: false + static Commission decommission(); + + /// Set whether this commission should accept dispatched tasks. + Commission& accept_dispatched_tasks(bool decision=true); + + /// Check whether this commission is accepting dispatched tasks. + bool is_accepting_dispatched_tasks() const; + + /// Set whether this commission should accept direct tasks + Commission& accept_direct_tasks(bool decision=true); + + /// Check whether this commission is accepting direct tasks. + bool is_accepting_direct_tasks() const; + + class Implementation; + private: + rmf_utils::impl_ptr _pimpl; + }; + + /// Set the current commission for the robot. + void set_commission(Commission commission); + + /// Get the current commission for the robot. If the robot has been dropped + /// from the fleet, this will return Commission::decommission(). + Commission commission() const; + + /// Tell the fleet adapter to reassign all the tasks that have been dispatched + /// to this robot. To prevent the tasks from being reassigned back to this + /// robot use .set_commission(Commission::decommission()) + /// + /// In the current implementation, tasks will only be reassigned to robots + /// in the same fleet that the task was originally assigned to. This behavior + /// could change in the future. + void reassign_dispatched_tasks(); + class Implementation; /// This API is experimental and will not be supported in the future. Users @@ -420,15 +468,18 @@ class RobotUpdateHandle public: /// True if this robot is allowed to accept new tasks. False if the robot /// will not accept any new tasks. + [[deprecated("Use commission instead")]] bool is_commissioned() const; /// Stop this robot from accepting any new tasks. It will continue to /// perform tasks that are already in its queue. To reassign those tasks, /// you will need to use the task request API to cancel the tasks and /// re-request them. + [[deprecated("Use set_commission instead")]] void decommission(); /// Allow this robot to resume accepting new tasks. + [[deprecated("Use set_commission instead")]] void recommission(); /// Get the schedule participant of this robot diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/FleetUpdateHandle.cpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/FleetUpdateHandle.cpp index b45c2dd7a..ba5e666ee 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/FleetUpdateHandle.cpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/FleetUpdateHandle.cpp @@ -1492,7 +1492,7 @@ auto FleetUpdateHandle::Implementation::aggregate_expectations() const for (const auto& t : task_managers) { // Ignore any robots that are not currently commissioned. - if (!t.first->is_commissioned()) + if (!t.first->commission().is_accepting_dispatched_tasks()) continue; expect.states.push_back(t.second->expected_finish_state()); diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/RobotContext.cpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/RobotContext.cpp index 0abfd7165..91d94eae8 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/RobotContext.cpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/RobotContext.cpp @@ -961,21 +961,24 @@ std::shared_ptr RobotContext::task_manager() } //============================================================================== -bool RobotContext::is_commissioned() const +void RobotContext::set_commission(RobotUpdateHandle::Commission value) { - return _commissioned; + std::lock_guard lock(*_commission_mutex); + _commission = std::move(value); } //============================================================================== -void RobotContext::decommission() +const RobotUpdateHandle::Commission& RobotContext::commission() const { - _commissioned = false; + return _commission; } //============================================================================== -void RobotContext::recommission() +RobotUpdateHandle::Commission RobotContext::copy_commission() const { - _commissioned = true; + std::lock_guard lock(*_commission_mutex); + RobotUpdateHandle::Commission copy = _commission; + return copy; } //============================================================================== diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/RobotContext.hpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/RobotContext.hpp index 9c82be580..c91c4c4fd 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/RobotContext.hpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/RobotContext.hpp @@ -632,13 +632,15 @@ class RobotContext /// Get the task manager for this robot, if it exists. std::shared_ptr task_manager(); - /// Return true if this robot is currently commissioned (available to accept - /// new tasks). - bool is_commissioned() const; + /// Set the commission for this robot + void set_commission(RobotUpdateHandle::Commission value); - void decommission(); + /// Get a reference to the robot's commission. + const RobotUpdateHandle::Commission& commission() const; - void recommission(); + /// Lock the commission_mutex and return a copy of the robot's current + /// commission. + RobotUpdateHandle::Commission copy_commission() const; Reporting& reporting(); @@ -831,7 +833,9 @@ class RobotContext RobotUpdateHandle::Unstable::Watchdog _lift_watchdog; rmf_traffic::Duration _lift_rewait_duration = std::chrono::seconds(0); - bool _commissioned = true; + std::unique_ptr _commission_mutex = + std::make_unique(); + RobotUpdateHandle::Commission _commission; bool _emergency = false; EasyFullControl::LocalizationRequest _localize; diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/RobotUpdateHandle.cpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/RobotUpdateHandle.cpp index 46fbb44f1..a27b5d696 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/RobotUpdateHandle.cpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/RobotUpdateHandle.cpp @@ -720,6 +720,69 @@ void RobotUpdateHandle::release_lift() }); } +//============================================================================== +class RobotUpdateHandle::Commission::Implementation +{ +public: + bool is_accepting_dispatched_tasks = true; + bool is_accepting_direct_tasks = true; +}; + +//============================================================================== +RobotUpdateHandle::Commission::Commission() + : _pimpl(rmf_utils::make_impl()) +{ + // Do nothing +} + +//============================================================================== +auto RobotUpdateHandle::Commission::decommission() -> Commission +{ + return Commission() + .accept_dispatched_tasks(false) + .accept_direct_tasks(false); +} + +//============================================================================== +auto RobotUpdateHandle::Commission::accept_dispatched_tasks(bool decision) +-> Commission& +{ + _pimpl->is_accepting_dispatched_tasks = decision; + return *this; +} + +//============================================================================== +bool RobotUpdateHandle::Commission::is_accepting_dispatched_tasks() const +{ + return _pimpl->is_accepting_dispatched_tasks; +} + +//============================================================================== +auto RobotUpdateHandle::Commission::accept_direct_tasks(bool decision) +-> Commission& +{ + _pimpl->is_accepting_direct_tasks = decision; + return *this; +} + +//============================================================================== +bool RobotUpdateHandle::Commission::is_accepting_direct_tasks() const +{ + return _pimpl->is_accepting_direct_tasks; +} + +//============================================================================== +void RobotUpdateHandle::set_commission(Commission commission) +{ + _pimpl->set_commission(std::move(commission)); +} + +//============================================================================== +auto RobotUpdateHandle::commission() const -> Commission +{ + return _pimpl->commission(); +} + //============================================================================== RobotUpdateHandle::RobotUpdateHandle() { @@ -742,7 +805,7 @@ const RobotUpdateHandle::Unstable& RobotUpdateHandle::unstable() const bool RobotUpdateHandle::Unstable::is_commissioned() const { if (const auto context = _pimpl->get_context()) - return context->is_commissioned(); + return context->copy_commission().is_accepting_dispatched_tasks(); return false; } @@ -750,29 +813,17 @@ bool RobotUpdateHandle::Unstable::is_commissioned() const //============================================================================== void RobotUpdateHandle::Unstable::decommission() { - if (const auto context = _pimpl->get_context()) - { - context->worker().schedule( - [w = context->weak_from_this()](const auto&) - { - if (const auto context = w.lock()) - context->decommission(); - }); - } + _pimpl->set_commission( + _pimpl->commission() + .accept_dispatched_tasks(false)); } //============================================================================== void RobotUpdateHandle::Unstable::recommission() { - if (const auto context = _pimpl->get_context()) - { - context->worker().schedule( - [w = context->weak_from_this()](const auto&) - { - if (const auto context = w.lock()) - context->recommission(); - }); - } + _pimpl->set_commission( + _pimpl->commission() + .accept_dispatched_tasks(true)); } //============================================================================== diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/internal_RobotUpdateHandle.hpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/internal_RobotUpdateHandle.hpp index b95dc85dc..19c0be3fc 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/internal_RobotUpdateHandle.hpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/internal_RobotUpdateHandle.hpp @@ -299,6 +299,30 @@ class RobotUpdateHandle::Implementation return *handle._pimpl; } + void set_commission(Commission commission) + { + if (const auto context = get_context()) + { + context->worker().schedule( + [w = context->weak_from_this(), commission = std::move(commission)]( + const auto&) + { + if (const auto context = w.lock()) + context->set_commission(commission); + }); + } + } + + Commission commission() const + { + if (const auto context = get_context()) + { + return context->copy_commission(); + } + + return Commission::decommission(); + } + std::shared_ptr get_context(); std::shared_ptr get_context() const; From d4f11909d943a2d4d08f2381cbc7ee21f8e81849 Mon Sep 17 00:00:00 2001 From: "Michael X. Grey" Date: Sun, 24 Mar 2024 12:13:13 +0000 Subject: [PATCH 02/13] Respect commission settings for direct tasks and idle behavior Signed-off-by: Michael X. Grey --- .../rmf_fleet_adapter/agv/RobotUpdateHandle.hpp | 10 ++++++++++ .../src/rmf_fleet_adapter/TaskManager.cpp | 14 ++++++++++++++ .../rmf_fleet_adapter/agv/RobotUpdateHandle.cpp | 17 ++++++++++++++++- 3 files changed, 40 insertions(+), 1 deletion(-) diff --git a/rmf_fleet_adapter/include/rmf_fleet_adapter/agv/RobotUpdateHandle.hpp b/rmf_fleet_adapter/include/rmf_fleet_adapter/agv/RobotUpdateHandle.hpp index 328af0b61..e4e604362 100644 --- a/rmf_fleet_adapter/include/rmf_fleet_adapter/agv/RobotUpdateHandle.hpp +++ b/rmf_fleet_adapter/include/rmf_fleet_adapter/agv/RobotUpdateHandle.hpp @@ -419,11 +419,13 @@ class RobotUpdateHandle /// Construct a Commission description with all default values. /// - accept_dispatched_tasks: true /// - accept_direct_tasks: true + /// - is_performing_idle_behavior: true Commission(); /// Construct a Commission description that accepts no tasks at all. /// - accept_dispatch_tasks: false /// - accept_direct_tasks: false + /// - is_performing_idle_behavior: false static Commission decommission(); /// Set whether this commission should accept dispatched tasks. @@ -438,6 +440,14 @@ class RobotUpdateHandle /// Check whether this commission is accepting direct tasks. bool is_accepting_direct_tasks() const; + /// Set whether this commission should perform idle behaviors (formerly + /// referred to as "finishing tasks"). + Commission& perform_idle_behavior(bool decision=true); + + /// Check whether this commission is performing idle behaviors (formerly + /// referred to as "finishing tasks"). + bool is_performing_idle_behavior() const; + class Implementation; private: rmf_utils::impl_ptr _pimpl; diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp index c15c7f382..f730ed048 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp @@ -1018,6 +1018,13 @@ nlohmann::json TaskManager::submit_direct_request( const auto& fleet = _context->group(); const auto& robot = _context->name(); + if (!_context->commission().is_accepting_direct_tasks()) + { + return _make_error_response( + 20, "Uncommissioned", "The robot [" + robot + "] in fleet [" + + fleet + "] is not commissioned to perform direct tasks."); + } + const auto& impl = agv::FleetUpdateHandle::Implementation::get(*fleet_handle); std::vector errors; @@ -1490,6 +1497,13 @@ std::function TaskManager::_robot_interruption_callback() //============================================================================== void TaskManager::_begin_waiting() { + if (!_context->commission().is_performing_idle_behavior()) + { + // This robot is not supposed to perform its idle behavior, so we + // immediately from here. + return; + } + if (_idle_task) { const auto request = _idle_task->make_request(_context->make_get_state()()); diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/RobotUpdateHandle.cpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/RobotUpdateHandle.cpp index a27b5d696..8ad0fa2bc 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/RobotUpdateHandle.cpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/RobotUpdateHandle.cpp @@ -726,6 +726,7 @@ class RobotUpdateHandle::Commission::Implementation public: bool is_accepting_dispatched_tasks = true; bool is_accepting_direct_tasks = true; + bool is_performing_idle_behavior = true; }; //============================================================================== @@ -740,7 +741,8 @@ auto RobotUpdateHandle::Commission::decommission() -> Commission { return Commission() .accept_dispatched_tasks(false) - .accept_direct_tasks(false); + .accept_direct_tasks(false) + .perform_idle_behavior(false); } //============================================================================== @@ -771,6 +773,19 @@ bool RobotUpdateHandle::Commission::is_accepting_direct_tasks() const return _pimpl->is_accepting_direct_tasks; } +//============================================================================== +auto RobotUpdateHandle::Commission::perform_idle_behavior(bool decision) +{ + _pimpl->is_performing_idle_behavior = decision; + return *this; +} + +//============================================================================== +bool RobotUpdateHandle::Commission::is_performing_idle_behavior() const +{ + return _pimpl->is_performing_idle_behavior; +} + //============================================================================== void RobotUpdateHandle::set_commission(Commission commission) { From b5b99d73ec9cc02520f4cb649060b970d07af0e6 Mon Sep 17 00:00:00 2001 From: "Michael X. Grey" Date: Mon, 25 Mar 2024 15:05:33 +0000 Subject: [PATCH 03/13] Implement task reassignment within a fleet Signed-off-by: Michael X. Grey --- .../src/rmf_fleet_adapter/TaskManager.cpp | 16 +- .../agv/FleetUpdateHandle.cpp | 411 +++++++++++++----- .../agv/RobotUpdateHandle.cpp | 1 + .../agv/internal_FleetUpdateHandle.hpp | 35 +- 4 files changed, 336 insertions(+), 127 deletions(-) diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp index f730ed048..073d70686 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp @@ -802,17 +802,7 @@ bool TaskManager::cancel_task_if_present(const std::string& task_id) return true; } - std::lock_guard lock(_mutex); - for (auto it = _queue.begin(); it != _queue.end(); ++it) - { - if (it->request()->booking()->id() == task_id) - { - _queue.erase(it); - return true; - } - } - - return false; + return _cancel_task_from_dispatch_queue(task_id, {"DispatchRequest"}); } //============================================================================== @@ -1997,12 +1987,16 @@ bool TaskManager::_cancel_task_from_dispatch_queue( const std::string& task_id, const std::vector& labels) { + std::lock_guard lock(_mutex); for (auto it = _queue.begin(); it != _queue.end(); ++it) { if (it->request()->booking()->id() == task_id) { _publish_canceled_pending_task(*it, labels); _queue.erase(it); + + // Count this as an executed task so we don't lose track of its existence + _register_executed_task(task_id); return true; } } diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/FleetUpdateHandle.cpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/FleetUpdateHandle.cpp index ba5e666ee..4d457fc52 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/FleetUpdateHandle.cpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/FleetUpdateHandle.cpp @@ -333,10 +333,18 @@ class AllocateTasks : public std::enable_shared_from_this expect.states.size(), expect.pending_requests.size()); + std::unordered_map robot_indexes; + std::vector states; + for (const auto& [context, state] : expect.states) + { + robot_indexes[states.size()] = context; + states.push_back(state); + } + // Generate new task assignments const auto result = task_planner.plan( rmf_traffic_ros2::convert(node->now()), - expect.states, + states, expect.pending_requests); auto assignments_ptr = std::get_if< @@ -387,7 +395,22 @@ class AllocateTasks : public std::enable_shared_from_this return std::nullopt; } - const auto assignments = *assignments_ptr; + TaskAssignments assignments; + for (std::size_t i=0; i < assignments_ptr->size(); ++i) + { + const auto r_it = robot_indexes.find(i); + if (r_it == robot_indexes.end()) + { + RCLCPP_ERROR( + node->get_logger(), + "[AllocateTasks] Unable to find a robot associated with index [%d]", + i); + continue; + } + + const auto context = r_it->second; + assignments[context] = (*assignments_ptr)[i]; + } if (assignments.empty()) { @@ -534,15 +557,15 @@ void FleetUpdateHandle::Implementation::bid_notice_cb( const auto& assignments = allocation_result.value(); - const double cost = self->_pimpl->task_planner->compute_cost(assignments); + const double cost = self->_pimpl->compute_cost(assignments); // Display computed assignments for debugging std::stringstream debug_stream; debug_stream << "Cost: " << cost << std::endl; - for (std::size_t i = 0; i < assignments.size(); ++i) + for (const auto& [context, queue] : assignments) { - debug_stream << "--Agent: " << i << std::endl; - for (const auto& a : assignments[i]) + debug_stream << "--Agent: " << context->requester_id() << std::endl; + for (const auto& a : queue) { const auto& s = a.finish_state(); const double request_seconds = @@ -569,31 +592,19 @@ void FleetUpdateHandle::Implementation::bid_notice_cb( "%s", debug_stream.str().c_str()); - // Map robot index to name to populate robot_name in BidProposal - std::unordered_map robot_name_map; - std::size_t index = 0; - for (const auto& t : self->_pimpl->task_managers) - { - robot_name_map.insert({index, t.first->name()}); - ++index; - } - std::optional robot_name; std::optional finish_time; - index = 0; - for (const auto& agent : assignments) + for (const auto& [context, queue] : assignments) { - for (const auto& assignment : agent) + for (const auto& assignment : queue) { if (assignment.request()->booking()->id() == task_id) { finish_time = assignment.finish_state().time().value(); - if (robot_name_map.find(index) != robot_name_map.end()) - robot_name = robot_name_map[index]; + robot_name = context->name(); break; } } - ++index; } if (!robot_name.has_value() || !finish_time.has_value()) @@ -653,6 +664,7 @@ void FleetUpdateHandle::Implementation::dispatch_command_cb( dispatch_ack.dispatch_id = msg->dispatch_id; if (msg->type == DispatchCmdMsg::TYPE_AWARD) { + last_bid_assignment = task_id; const auto task_it = bid_notice_assignments.find(task_id); if (task_it == bid_notice_assignments.end()) { @@ -708,23 +720,6 @@ void FleetUpdateHandle::Implementation::dispatch_command_cb( auto assignments = std::move(task_it->second); bid_notice_assignments.erase(task_it); - if (assignments.size() != task_managers.size()) - { - // FIXME(MXG): This error mode seems like a problem with our - // implementation. If a robot is added during a bid process, we could - // trigger this error even though it shouldn't actually be a problem. - - std::string error_str = - "The number of available robots does not match that in the assignments " - "for task_id [" + task_id + "]. This request will be ignored."; - RCLCPP_ERROR(node->get_logger(), "%s", error_str.c_str()); - dispatch_ack.errors.push_back( - make_error_str(13, "Internal bug", std::move(error_str))); - - dispatch_ack_pub->publish(dispatch_ack); - return; - } - // Here we make sure none of the tasks in the assignments has already begun // execution. If so, we replan assignments until a valid set is obtained // and only then update the task manager queues @@ -732,9 +727,9 @@ void FleetUpdateHandle::Implementation::dispatch_command_cb( if (!valid_assignments) { rmf_task::ConstRequestPtr request; - for (const auto& a : assignments) + for (const auto& [_, queue] : assignments) { - for (const auto& r : a) + for (const auto& r : queue) { if (r.request()->booking()->id() == task_id) { @@ -761,42 +756,71 @@ void FleetUpdateHandle::Implementation::dispatch_command_cb( return; } - // TODO: This replanning is blocking the main thread. Instead, the - // replanning should run on a separate worker and then deliver the - // result back to the main worker. - const auto replan_results = AllocateTasks( - nullptr, aggregate_expectations(), *task_planner, node) - .run(dispatch_ack.errors); + unassigned_requests.push_back(request); + reassign_dispatched_tasks( + // On success + [dispatch_ack, dispatch_ack_pub = dispatch_ack_pub]() + { + auto msg = dispatch_ack; + msg.success = true; + dispatch_ack_pub->publish(dispatch_ack); + }, + // On failure + [dispatch_ack, w = weak_self, request]( + std::vector errors) + { + const auto self = w.lock(); + if (!self) + return; - if (!replan_results) - { - std::string error_str = - "Unable to replan assignments when accommodating task_id [" + task_id - + "]. This request will be ignored."; + auto r_it = std::remove( + self->_pimpl->unassigned_requests.begin(), + self->_pimpl->unassigned_requests.end(), + request); - RCLCPP_ERROR(node->get_logger(), "%s", error_str.c_str()); - dispatch_ack.errors.push_back( - make_error_str(9, "Not feasible", std::move(error_str))); - dispatch_ack_pub->publish(dispatch_ack); - return; - } + self->_pimpl->unassigned_requests.erase( + r_it, self->_pimpl->unassigned_requests.end()); - assignments = replan_results.value(); - // We do not need to re-check if assignments are valid as this function - // is being called by the ROS2 executor and is running on the main - // rxcpp worker. Hence, no new tasks would have started during this - // replanning. + std::string error_str = + "Unable to replan assignments when accommodating task_id [" + + request->booking()->id() + "]. Reasons:"; + if (errors.empty()) + { + error_str += " No reason given by planner."; + } + else + { + for (const auto& e : errors) + { + error_str += "\n -- " + e; + } + } + + RCLCPP_ERROR( + self->_pimpl->node->get_logger(), + "%s", + error_str.c_str()); + auto msg = dispatch_ack; + msg.success = false; + msg.errors.push_back( + make_error_str(9, "Not feasible", std::move(error_str))); + self->_pimpl->dispatch_ack_pub->publish(msg); + }); } else { - std::size_t index = 0; - for (auto& t : task_managers) + for (const auto& [context, queue] : assignments) { - t.second->set_queue(assignments[index]); - ++index; + context->task_manager()->set_queue(queue); } - current_assignment_cost = task_planner->compute_cost(assignments); + // Any unassigned requests would have been collected by the aggregator and + // given an assignment while calculating this bid. As long as + // is_valid_assignments was able to pass, then all tasks have been + // assigned to properly commissioned robots. + unassigned_requests.clear(); + + current_assignment_cost = compute_cost(assignments); dispatch_ack.success = true; dispatch_ack_pub->publish(dispatch_ack); @@ -829,52 +853,39 @@ void FleetUpdateHandle::Implementation::dispatch_command_cb( if (task_was_found) { - // Re-plan assignments while ignoring request for task to be cancelled - std::vector errors; - const auto replan_results = AllocateTasks( - nullptr, aggregate_expectations(), *task_planner, node) - .run(errors); - - if (!replan_results.has_value()) - { - std::stringstream ss; - ss << "Unabled to replan assignments when cancelling task [" - << task_id << "]. "; - if (errors.empty()) + reassign_dispatched_tasks( + // On success + [task_id, name = name, node = node]() { - ss << "No planner error messages were provided."; - } - else + RCLCPP_INFO( + node->get_logger(), + "Task with task_id [%s] has successfully been cancelled. " + "TaskAssignments updated for robots in fleet [%s].", + task_id.c_str(), name.c_str()); + }, + // On failure + [task_id, name = name, node = node](std::vector errors) { - ss << "The following planner errors occurred:"; - for (const auto& e : errors) + std::stringstream ss; + ss << "Unabled to replan assignments when cancelling task [" + << task_id << "] for fleet [" << name << "]. "; + if (errors.empty()) { - const auto err = nlohmann::json::parse(e); - ss << "\n -- " << err["detail"].get(); + ss << "No planner error messages were provided."; } - } - ss << "\n"; - - RCLCPP_WARN(node->get_logger(), "%s", ss.str().c_str()); - } - else - { - const auto& assignments = replan_results.value(); - std::size_t index = 0; - for (auto& t : task_managers) - { - t.second->set_queue(assignments[index]); - ++index; - } - - current_assignment_cost = task_planner->compute_cost(assignments); + else + { + ss << "The following planner errors occurred:"; + for (const auto& e : errors) + { + const auto err = nlohmann::json::parse(e); + ss << "\n -- " << err["detail"].get(); + } + } + ss << "\n"; - RCLCPP_INFO( - node->get_logger(), - "Task with task_id [%s] has successfully been cancelled. TaskAssignments " - "updated for robots in fleet [%s].", - task_id.c_str(), name.c_str()); - } + RCLCPP_WARN(node->get_logger(), "%s", ss.str().c_str()); + }); } } @@ -892,9 +903,24 @@ void FleetUpdateHandle::Implementation::dispatch_command_cb( } } +//============================================================================== +double FleetUpdateHandle::Implementation::compute_cost( + const TaskAssignments& assignments) const +{ + rmf_task::TaskPlanner::Assignments raw_assignments; + raw_assignments.reserve(assignments.size()); + for (const auto [_, queue] : assignments) + { + raw_assignments.push_back(queue); + } + + return task_planner->compute_cost(raw_assignments); +} + //============================================================================== auto FleetUpdateHandle::Implementation::is_valid_assignments( - TaskAssignments& assignments) const -> bool + TaskAssignments& assignments, + std::string* report_error) const -> bool { std::unordered_set executed_tasks; for (const auto& [context, mgr] : task_managers) @@ -903,19 +929,171 @@ auto FleetUpdateHandle::Implementation::is_valid_assignments( executed_tasks.insert(tasks.begin(), tasks.end()); } - for (const auto& agent : assignments) + for (const auto& [context, queue] : assignments) { - for (const auto& a : agent) + for (const auto& a : queue) { if (executed_tasks.find(a.request()->booking()->id()) != executed_tasks.end()) + { + if (report_error) + { + *report_error = "task [" + a.request()->booking()->id() + + "] already began executing"; + } return false; + } + } + + if (!queue.empty()) + { + if (!context->commission().is_accepting_dispatched_tasks()) + { + if (report_error) + { + *report_error = "robot [" + context->name() + + "] has been decommissioned"; + } + // If any tasks were assigned to this robot after it has been + // decommissioned, then those were invalid assignments. + return false; + } + + if (task_managers.find(context) == task_managers.end()) + { + if (report_error) + { + *report_error = "robot [" + context->name() + + "] has been removed from the fleet"; + } + // This robot is no longer part of the fleet, so we cannot assign + // tasks to it. + return false; + } } } return true; } +//============================================================================== +void FleetUpdateHandle::Implementation::reassign_dispatched_tasks( + std::function on_success, + std::function)> on_failure) +{ + auto expectations = aggregate_expectations(); + if (expectations.states.empty() && !expectations.pending_requests.empty()) + { + // If there are no robots that can perform any tasks but there are pending + // requests, then we should immediately assume failure. + worker.schedule( + [on_failure](const auto&) + { + on_failure({"no more robots available"}); + }); + return; + } + + auto on_plan_received = [ + on_success, + on_failure, + w = weak_self, + initial_last_bid_assignment = last_bid_assignment + ](TaskAssignments assignments) + { + const auto self = w.lock(); + if (!self) + return; + + const bool new_task_came_in = + initial_last_bid_assignment != self->_pimpl->last_bid_assignment; + + std::string error; + const bool valid_assignments = + !new_task_came_in + && self->_pimpl->is_valid_assignments(assignments, &error); + + if (!valid_assignments) + { + // We need to replan because some important aspect of the planning + // problem has changed since we originally tried to reassign. + if (new_task_came_in) + { + RCLCPP_WARN( + self->_pimpl->node->get_logger(), + "Redoing task reassignment for fleet [%s] because a new task was " + "dispatched while the reassignment was being calculated.", + self->_pimpl->name.c_str()); + } + else + { + RCLCPP_WARN( + self->_pimpl->node->get_logger(), + "Redoing task reassignment for fleet [%s] because %s.", + self->_pimpl->name.c_str(), + error.c_str()); + } + + self->_pimpl->reassign_dispatched_tasks(on_success, on_failure); + return; + } + + for (const auto& [context, queue] : assignments) + { + context->task_manager()->set_queue(queue); + } + + // All requests should be assigned now, no matter which robot they were + // originally assigned to, so we can clear this buffer. + // + // As long as no new tasks were dispatched to this fleet (verified by the + // new_task_came_in check), any tasks that might have been previously + // unassigned will be accounted for in the current set of assignments. + self->_pimpl->unassigned_requests.clear(); + + self->_pimpl->current_assignment_cost = + self->_pimpl->compute_cost(assignments); + + on_success(); + }; + + reassignment_worker.schedule( + [ + on_plan_received, + on_failure, + expectations = std::move(expectations), + task_planner = *task_planner, + node = node, + worker = worker + ](const auto&) + { + std::vector errors; + const auto replan_results = AllocateTasks( + nullptr, expectations, task_planner, node) + .run(errors); + + if (replan_results.has_value()) + { + worker.schedule( + [ + assignments = std::move(*replan_results), + on_plan_received + ](const auto&) + { + on_plan_received(assignments); + }); + } + else + { + worker.schedule( + [errors = std::move(errors), on_failure](const auto&) + { + on_failure(errors); + }); + } + }); +} + //============================================================================== std::optional FleetUpdateHandle::Implementation:: get_nearest_charger( @@ -1495,12 +1673,21 @@ auto FleetUpdateHandle::Implementation::aggregate_expectations() const if (!t.first->commission().is_accepting_dispatched_tasks()) continue; - expect.states.push_back(t.second->expected_finish_state()); + expect.states.push_back( + ExpectedState { + t.first, + t.second->expected_finish_state() + }); const auto requests = t.second->requests(); expect.pending_requests.insert( expect.pending_requests.end(), requests.begin(), requests.end()); } + expect.pending_requests.insert( + expect.pending_requests.end(), + unassigned_requests.begin(), + unassigned_requests.end()); + return expect; } diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/RobotUpdateHandle.cpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/RobotUpdateHandle.cpp index 8ad0fa2bc..bee953468 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/RobotUpdateHandle.cpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/RobotUpdateHandle.cpp @@ -775,6 +775,7 @@ bool RobotUpdateHandle::Commission::is_accepting_direct_tasks() const //============================================================================== auto RobotUpdateHandle::Commission::perform_idle_behavior(bool decision) +-> Commission& { _pimpl->is_performing_idle_behavior = decision; return *this; diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/internal_FleetUpdateHandle.hpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/internal_FleetUpdateHandle.hpp index 22369f3ff..c1bca2830 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/internal_FleetUpdateHandle.hpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/internal_FleetUpdateHandle.hpp @@ -233,16 +233,27 @@ class ParticipantFactoryRos2 : public ParticipantFactory rmf_traffic_ros2::schedule::WriterPtr _writer; }; +//============================================================================== +struct ExpectedState +{ + RobotContextPtr context; + rmf_task::State state; +}; + //============================================================================== struct Expectations { - std::vector states; + std::vector states; std::vector pending_requests; }; //============================================================================== -// Map task id to pair of -using TaskAssignments = rmf_task::TaskPlanner::Assignments; +// Map from the robot instance to its proposed assignment of tasks +using TaskAssignments = std::unordered_map< + RobotContextPtr, + std::vector>; + +// Forward declaration of a task allocation job class AllocateTasks; //============================================================================== @@ -330,6 +341,13 @@ class FleetUpdateHandle::Implementation // Map to store task id with assignments for BidNotice std::unordered_map bid_notice_assignments = {}; + // This is checked before and after the task reassignment procedure to ensure + // that no new task came in from the dispatcher while the reassignment was + // being calculated. + std::string last_bid_assignment; + std::vector unassigned_requests; + rxcpp::schedulers::worker reassignment_worker; + using BidNoticeMsg = rmf_task_msgs::msg::BidNotice; using DispatchCmdMsg = rmf_task_msgs::msg::DispatchCommand; @@ -617,6 +635,8 @@ class FleetUpdateHandle::Implementation void dispatch_command_cb(const DispatchCmdMsg::SharedPtr msg); + double compute_cost(const TaskAssignments& assignments) const; + std::optional get_nearest_charger( const rmf_traffic::agv::Planner::Start& start); @@ -624,7 +644,14 @@ class FleetUpdateHandle::Implementation /// Helper function to check if assignments are valid. An assignment set is /// invalid if one of the assignments has already begun execution. - bool is_valid_assignments(TaskAssignments& assignments) const; + bool is_valid_assignments( + TaskAssignments& assignments, + std::string* report_error = nullptr) const; + + void reassign_dispatched_tasks( + std::function on_success, + // argument is a vector of error messages from the planner + std::function)> on_failure); void publish_fleet_state_topic() const; From 541546c95e42767b73ddb7b334338782d09f630c Mon Sep 17 00:00:00 2001 From: "Michael X. Grey" Date: Tue, 26 Mar 2024 12:12:00 +0000 Subject: [PATCH 04/13] Implement robot task reassignment API Signed-off-by: Michael X. Grey --- .../src/rmf_fleet_adapter/TaskManager.cpp | 113 +++++++++++++++++- .../src/rmf_fleet_adapter/TaskManager.hpp | 6 +- .../agv/FleetUpdateHandle.cpp | 2 +- .../rmf_fleet_adapter/agv/RobotContext.hpp | 4 + .../agv/RobotUpdateHandle.cpp | 15 +++ .../agv/internal_RobotUpdateHandle.hpp | 2 + 6 files changed, 138 insertions(+), 4 deletions(-) diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp index 073d70686..0d5e1afdf 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp @@ -939,12 +939,12 @@ void TaskManager::set_queue( } //============================================================================== -const std::vector TaskManager::requests() const +std::vector TaskManager::dispatched_requests() const { using namespace rmf_task::requests; std::vector requests; - requests.reserve(_queue.size()); std::lock_guard lock(_mutex); + requests.reserve(_queue.size()); for (const auto& task : _queue) { if (task.request()->booking()->automatic()) @@ -957,6 +957,115 @@ const std::vector TaskManager::requests() const return requests; } +//============================================================================== +void TaskManager::reassign_dispatched_requests() +{ + std::vector assignments; + { + std::lock_guard lock(_mutex); + for (const auto& a : _queue) + { + if (a.request()->booking()->automatic()) + { + continue; + } + + assignments.push_back(a); + } + _queue.clear(); + } + + const auto fleet = _fleet_handle.lock(); + if (!fleet) + { + RCLCPP_ERROR( + _context->node()->get_logger(), + "Attempting to reassign tasks for [%s] but its fleet is shutting down", + _context->requester_id().c_str()); + return; + } + + auto& fleet_impl = agv::FleetUpdateHandle::Implementation::get(*fleet); + auto& unassigned = fleet_impl.unassigned_requests; + for (const auto& a : assignments) + { + unassigned.push_back(a.request()); + } + + fleet_impl.reassign_dispatched_tasks( + [name = _context->requester_id(), node = _context->node()]() + { + RCLCPP_INFO( + node->get_logger(), + "Successfully reassigned tasks for [%s]", + name.c_str()); + }, + [ + name = _context->requester_id(), + node = _context->node(), + assignments, + fleet, + self = shared_from_this() + ](std::vector errors) + { + std::stringstream ss_errors; + if (errors.empty()) + { + ss_errors << " no reason given"; + } + else + { + for (auto&& e : errors) + { + ss_errors << "\n -- " << e; + } + } + + std::stringstream ss_requests; + if (assignments.empty()) + { + ss_requests << "No tasks were assigned to the robot."; + } + else + { + ss_requests << "The following tasks will be canceled:"; + for (const auto& a : assignments) + { + ss_requests << "\n -- " << a.request()->booking()->id(); + } + } + + RCLCPP_ERROR( + node->get_logger(), + "Failed to reassign tasks for [%s]. %s\nReasons for failure:%s", + ss_requests.str().c_str(), + ss_errors.str().c_str()); + + auto& fleet_impl = agv::FleetUpdateHandle::Implementation::get(*fleet); + auto& unassigned = fleet_impl.unassigned_requests; + const auto r_it = std::remove_if( + unassigned.begin(), + unassigned.end(), + [&assignments](const auto& r) + { + return std::find_if( + assignments.begin(), + assignments.end(), + [r](const auto& a) + { + return a.request() == r; + }) != assignments.end(); + }); + unassigned.erase(r_it, unassigned.end()); + + for (const auto& a : assignments) + { + self->_publish_canceled_pending_task(a, {"Failure to reassign"}); + } + } + ); +} + //============================================================================== TaskManager::RobotModeMsg TaskManager::robot_mode() const { diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.hpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.hpp index 575da4697..22d6a78fb 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.hpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.hpp @@ -118,7 +118,11 @@ class TaskManager : public std::enable_shared_from_this void set_queue(const std::vector& assignments); /// Get the non-charging requests among pending tasks - const std::vector requests() const; + std::vector dispatched_requests() const; + + /// Send all dispatched requests that are still in the queue back to the fleet + /// handle for reassignment. + void reassign_dispatched_requests(); std::optional current_task_id() const; diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/FleetUpdateHandle.cpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/FleetUpdateHandle.cpp index 4d457fc52..9ce9ad69a 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/FleetUpdateHandle.cpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/FleetUpdateHandle.cpp @@ -1678,7 +1678,7 @@ auto FleetUpdateHandle::Implementation::aggregate_expectations() const t.first, t.second->expected_finish_state() }); - const auto requests = t.second->requests(); + const auto requests = t.second->dispatched_requests(); expect.pending_requests.insert( expect.pending_requests.end(), requests.begin(), requests.end()); } diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/RobotContext.hpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/RobotContext.hpp index c91c4c4fd..65d84e03f 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/RobotContext.hpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/RobotContext.hpp @@ -53,6 +53,7 @@ class TaskManager; namespace agv { +class FleetUpdateHandle; class RobotContext; using TransformDictionary = std::unordered_map; using SharedPlanner = std::shared_ptr< @@ -642,6 +643,9 @@ class RobotContext /// commission. RobotUpdateHandle::Commission copy_commission() const; + /// Reassign the tasks that have been dispatched for this robot + void reassign_dispatched_tasks(); + Reporting& reporting(); const Reporting& reporting() const; diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/RobotUpdateHandle.cpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/RobotUpdateHandle.cpp index bee953468..ce23ef6db 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/RobotUpdateHandle.cpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/RobotUpdateHandle.cpp @@ -799,6 +799,21 @@ auto RobotUpdateHandle::commission() const -> Commission return _pimpl->commission(); } +//============================================================================== +void RobotUpdateHandle::reassign_dispatched_tasks() +{ + if (const auto context = _pimpl->get_context()) + { + context->worker().schedule( + [context](const auto&) + { + const auto mgr = context->task_manager(); + if (mgr) + mgr->reassign_dispatched_requests(); + }); + } +} + //============================================================================== RobotUpdateHandle::RobotUpdateHandle() { diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/internal_RobotUpdateHandle.hpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/internal_RobotUpdateHandle.hpp index 19c0be3fc..7e6bc133d 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/internal_RobotUpdateHandle.hpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/internal_RobotUpdateHandle.hpp @@ -308,7 +308,9 @@ class RobotUpdateHandle::Implementation const auto&) { if (const auto context = w.lock()) + { context->set_commission(commission); + } }); } } From 41275922fcfb8a51201ed7bbbe93ef1441ee6498 Mon Sep 17 00:00:00 2001 From: "Michael X. Grey" Date: Tue, 26 Mar 2024 15:37:23 +0000 Subject: [PATCH 05/13] Accept commission requests through ApiRequest pipeline Signed-off-by: Michael X. Grey --- .../src/rmf_fleet_adapter/TaskManager.cpp | 104 +++++++++++++++++- .../src/rmf_fleet_adapter/TaskManager.hpp | 8 +- .../agv/RobotUpdateHandle.cpp | 2 +- 3 files changed, 107 insertions(+), 7 deletions(-) diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp index 0d5e1afdf..fb1b2e79c 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp @@ -64,6 +64,8 @@ #include #include #include +#include +#include namespace rmf_fleet_adapter { @@ -958,7 +960,9 @@ std::vector TaskManager::dispatched_requests() const } //============================================================================== -void TaskManager::reassign_dispatched_requests() +void TaskManager::reassign_dispatched_requests( + std::function on_success, + std::function)> on_failure) { std::vector assignments; { @@ -993,18 +997,20 @@ void TaskManager::reassign_dispatched_requests() } fleet_impl.reassign_dispatched_tasks( - [name = _context->requester_id(), node = _context->node()]() + [name = _context->requester_id(), node = _context->node(), on_success]() { RCLCPP_INFO( node->get_logger(), "Successfully reassigned tasks for [%s]", name.c_str()); + on_success(); }, [ name = _context->requester_id(), node = _context->node(), assignments, fleet, + on_failure, self = shared_from_this() ](std::vector errors) { @@ -1062,8 +1068,9 @@ void TaskManager::reassign_dispatched_requests() { self->_publish_canceled_pending_task(a, {"Failure to reassign"}); } - } - ); + + on_failure(errors); + }); } //============================================================================== @@ -1149,7 +1156,11 @@ nlohmann::json TaskManager::submit_direct_request( } catch (const std::exception&) { - json_errors.push_back(e); + nlohmann::json error; + error["code"] = 42; + error["category"] = "unknown"; + error["detail"] = e; + json_errors.push_back(error); } } response_json["errors"] = std::move(json_errors); @@ -2387,6 +2398,8 @@ void TaskManager::_handle_request( _handle_undo_skip_phase_request(request_json, request_id); else if (type_str == "robot_task_request") _handle_direct_request(request_json, request_id); + else if (type_str == "robot_commission_request") + _handle_commission_request(request_json, request_id); else return; } @@ -2425,6 +2438,87 @@ void TaskManager::_handle_direct_request( _validate_and_publish_api_response(response, response_validator, request_id); } +//============================================================================== +void TaskManager::_handle_commission_request( + const nlohmann::json& request_json, + const std::string& request_id) +{ + static const auto request_validator = + std::make_shared( + _make_validator(rmf_api_msgs::schemas::robot_commission_request)); + + static const auto response_validator = + std::make_shared( + _make_validator(rmf_api_msgs::schemas::robot_commission_response)); + + agv::RobotUpdateHandle::Commission commission = _context->commission(); + const nlohmann::json& commission_json = request_json["commission"]; + const auto dispatch_it = commission_json.find("dispatch_tasks"); + if (dispatch_it != commission_json.end()) + { + commission.accept_dispatched_tasks(dispatch_it->get()); + } + + const auto direct_it = commission_json.find("direct_tasks"); + if (direct_it != commission_json.end()) + { + commission.accept_direct_tasks(direct_it->get()); + } + + const auto idle_it = commission_json.find("idle_behavior"); + if (idle_it != commission_json.end()) + { + commission.perform_idle_behavior(idle_it->get()); + } + + const auto reassign_it = request_json.find("reassign_tasks"); + if (reassign_it == request_json.end() || reassign_it->get()) + { + reassign_dispatched_requests( + [ + request_id, + response_validator = response_validator, + self = shared_from_this() + ]() + { + nlohmann::json response_json; + response_json["success"] = true; + self->_validate_and_publish_api_response( + response_json, *response_validator, request_id); + }, + [ + request_id, + response_validator = response_validator, + self = shared_from_this() + ](std::vector errors) + { + nlohmann::json response_json; + response_json["success"] = false; + std::vector errors_json; + for (const auto& e : errors) + { + nlohmann::json error; + error["code"] = 21; + error["category"] = "planner"; + error["detail"] = e; + errors_json.push_back(error); + } + response_json["errors"] = errors_json; + self->_validate_and_publish_api_response( + response_json, *response_validator, request_id); + }); + } + else + { + // No task reassignment is needed, so we can simply return success right + // away. + nlohmann::json response_json; + response_json["success"] = true; + _validate_and_publish_api_response( + response_json, *response_validator, request_id); + } +} + //============================================================================== void TaskManager::_handle_cancel_request( const nlohmann::json& request_json, diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.hpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.hpp index 22d6a78fb..1779d3c5a 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.hpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.hpp @@ -122,7 +122,9 @@ class TaskManager : public std::enable_shared_from_this /// Send all dispatched requests that are still in the queue back to the fleet /// handle for reassignment. - void reassign_dispatched_requests(); + void reassign_dispatched_requests( + std::function on_success, + std::function)> on_failure); std::optional current_task_id() const; @@ -543,6 +545,10 @@ class TaskManager : public std::enable_shared_from_this const nlohmann::json& request_json, const std::string& request_id); + void _handle_commission_request( + const nlohmann::json& request_json, + const std::string& request_id); + void _handle_cancel_request( const nlohmann::json& request_json, const std::string& request_id); diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/RobotUpdateHandle.cpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/RobotUpdateHandle.cpp index ce23ef6db..9fa0e6380 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/RobotUpdateHandle.cpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/RobotUpdateHandle.cpp @@ -809,7 +809,7 @@ void RobotUpdateHandle::reassign_dispatched_tasks() { const auto mgr = context->task_manager(); if (mgr) - mgr->reassign_dispatched_requests(); + mgr->reassign_dispatched_requests([](){}, [](auto){}); }); } } From 12b53c81897d35c9251589837973e7a443c526e3 Mon Sep 17 00:00:00 2001 From: "Michael X. Grey" Date: Wed, 27 Mar 2024 06:54:37 +0000 Subject: [PATCH 06/13] Include commission information in robot state Signed-off-by: Michael X. Grey --- .../src/rmf_fleet_adapter/TaskManager.cpp | 6 +++++- .../src/rmf_fleet_adapter/agv/FleetUpdateHandle.cpp | 10 ++++++++++ .../agv/internal_FleetUpdateHandle.hpp | 4 +++- 3 files changed, 18 insertions(+), 2 deletions(-) diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp index fb1b2e79c..3f4dd146b 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp @@ -64,6 +64,7 @@ #include #include #include +#include #include #include @@ -227,7 +228,10 @@ TaskManagerPtr TaskManager::make( rmf_api_msgs::schemas::skip_phase_response, rmf_api_msgs::schemas::task_request, rmf_api_msgs::schemas::undo_skip_phase_request, - rmf_api_msgs::schemas::undo_skip_phase_response + rmf_api_msgs::schemas::undo_skip_phase_response, + rmf_api_msgs::schemas::commission, + rmf_api_msgs::schemas::robot_commission_request, + rmf_api_msgs::schemas::robot_commission_response, }; for (const auto& schema : schemas) diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/FleetUpdateHandle.cpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/FleetUpdateHandle.cpp index 9ce9ad69a..1f279b1f9 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/FleetUpdateHandle.cpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/FleetUpdateHandle.cpp @@ -1268,6 +1268,16 @@ void FleetUpdateHandle::Implementation::update_fleet_state() const issue_msg["detail"] = issue->detail; issues_msg.push_back(std::move(issue_msg)); } + + const auto& commission = context->commission(); + nlohmann::json commission_json; + commission_json["dispatch_tasks"] = + commission.is_accepting_dispatched_tasks(); + commission_json["direct_tasks"] = commission.is_accepting_direct_tasks(); + commission_json["idle_behavior"] = + commission.is_performing_idle_behavior(); + + json["commission"] = commission_json; } try diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/internal_FleetUpdateHandle.hpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/internal_FleetUpdateHandle.hpp index c1bca2830..af7af03c4 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/internal_FleetUpdateHandle.hpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/internal_FleetUpdateHandle.hpp @@ -68,6 +68,7 @@ #include #include #include +#include #include @@ -497,7 +498,8 @@ class FleetUpdateHandle::Implementation rmf_api_msgs::schemas::robot_state, rmf_api_msgs::schemas::location_2D, rmf_api_msgs::schemas::fleet_log, - rmf_api_msgs::schemas::log_entry + rmf_api_msgs::schemas::log_entry, + rmf_api_msgs::schemas::commission, }; for (const auto& schema : schemas) From 68e6fc8b4e68245f4a1a84570659e033ed97bd49 Mon Sep 17 00:00:00 2001 From: "Michael X. Grey" Date: Wed, 27 Mar 2024 17:47:28 +0000 Subject: [PATCH 07/13] Fix commision ApiRequest handling Signed-off-by: Michael X. Grey --- .../src/rmf_fleet_adapter/TaskManager.cpp | 140 ++++++++++++++---- .../src/rmf_fleet_adapter/TaskManager.hpp | 6 + 2 files changed, 114 insertions(+), 32 deletions(-) diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp index 3f4dd146b..50777166e 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp @@ -968,21 +968,7 @@ void TaskManager::reassign_dispatched_requests( std::function on_success, std::function)> on_failure) { - std::vector assignments; - { - std::lock_guard lock(_mutex); - for (const auto& a : _queue) - { - if (a.request()->booking()->automatic()) - { - continue; - } - - assignments.push_back(a); - } - _queue.clear(); - } - + std::vector assignments = _drain_dispatched_assignments(); const auto fleet = _fleet_handle.lock(); if (!fleet) { @@ -1071,6 +1057,7 @@ void TaskManager::reassign_dispatched_requests( for (const auto& a : assignments) { self->_publish_canceled_pending_task(a, {"Failure to reassign"}); + self->_register_executed_task(a.request()->booking()->id()); } on_failure(errors); @@ -2106,6 +2093,42 @@ void TaskManager::_publish_canceled_pending_task( _validate_and_publish_websocket(task_state_update, validator); } +//============================================================================== +auto TaskManager::_drain_dispatched_assignments() -> std::vector +{ + std::vector assignments; + std::lock_guard lock(_mutex); + for (const auto& a : _queue) + { + if (a.request()->booking()->automatic()) + { + continue; + } + + assignments.push_back(a); + } + _queue.clear(); + + return assignments; +} + +//============================================================================== +auto TaskManager::_drain_direct_assignments() -> std::vector +{ + std::vector assignments; + std::lock_guard lock(_mutex); + for (const auto& a : _direct_queue) + { + if (a.assignment.request()->booking()->automatic()) + { + continue; + } + + assignments.push_back(a.assignment); + } + _direct_queue.clear(); +} + //============================================================================== bool TaskManager::_cancel_task_from_dispatch_queue( const std::string& task_id, @@ -2429,12 +2452,12 @@ void TaskManager::_handle_direct_request( if (!_validate_request_message(request_json, request_validator, request_id)) return; - const auto& robot = request_json["robot"].get(); - if (robot.empty() || robot != _context->name()) + const auto& fleet = request_json["fleet"].get(); + if (fleet != _context->group()) return; - const auto& fleet = request_json["fleet"].get(); - if (fleet.empty() || fleet != _context->group()) + const auto& robot = request_json["robot"].get(); + if (robot != _context->name()) return; const nlohmann::json& request = request_json["request"]; @@ -2442,19 +2465,39 @@ void TaskManager::_handle_direct_request( _validate_and_publish_api_response(response, response_validator, request_id); } +//============================================================================== +namespace { +nlohmann::json simple_success_json() +{ + nlohmann::json successful_result; + successful_result["success"] = true; + return successful_result; +} +} // anonymous namespace + //============================================================================== void TaskManager::_handle_commission_request( const nlohmann::json& request_json, const std::string& request_id) { static const auto request_validator = - std::make_shared( - _make_validator(rmf_api_msgs::schemas::robot_commission_request)); + _make_validator(rmf_api_msgs::schemas::robot_commission_request); static const auto response_validator = std::make_shared( _make_validator(rmf_api_msgs::schemas::robot_commission_response)); + if (!_validate_request_message(request_json, request_validator, request_id)) + return; + + const auto& fleet = request_json["fleet"].get(); + if (fleet != _context->group()) + return; + + const auto& robot = request_json["robot"].get(); + if (robot != _context->name()) + return; + agv::RobotUpdateHandle::Commission commission = _context->commission(); const nlohmann::json& commission_json = request_json["commission"]; const auto dispatch_it = commission_json.find("dispatch_tasks"); @@ -2475,29 +2518,53 @@ void TaskManager::_handle_commission_request( commission.perform_idle_behavior(idle_it->get()); } - const auto reassign_it = request_json.find("reassign_tasks"); - if (reassign_it == request_json.end() || reassign_it->get()) + nlohmann::json response_json; + response_json["commission"] = simple_success_json(); + + const auto direct_policy_it = + request_json.find("pending_direct_tasks_policy"); + if (direct_policy_it != request_json.end() + && direct_policy_it->get() == "cancel") + { + const auto assignments = _drain_direct_assignments(); + for (const auto& a : assignments) + { + _publish_canceled_pending_task( + a, {"Canceled by robot commission request [" + request_id + "]"}); + _register_executed_task(a.request()->booking()->id()); + } + } + + response_json["pending_direct_tasks_policy"] = simple_success_json(); + + const auto dispatch_policy_it = + request_json.find("pending_dispatch_tasks_policy"); + if (dispatch_policy_it == request_json.end() + || dispatch_policy_it->get() == "reassign") { reassign_dispatched_requests( [ request_id, response_validator = response_validator, + base_response_json = response_json, self = shared_from_this() ]() { - nlohmann::json response_json; - response_json["success"] = true; + nlohmann::json response_json = base_response_json; + response_json["pending_dispatch_tasks_policy"] = simple_success_json(); self->_validate_and_publish_api_response( response_json, *response_validator, request_id); }, [ request_id, response_validator = response_validator, + base_response_json = response_json, self = shared_from_this() ](std::vector errors) { - nlohmann::json response_json; - response_json["success"] = false; + nlohmann::json response_json = base_response_json; + nlohmann::json dispatch_json; + dispatch_json["success"] = false; std::vector errors_json; for (const auto& e : errors) { @@ -2507,17 +2574,26 @@ void TaskManager::_handle_commission_request( error["detail"] = e; errors_json.push_back(error); } - response_json["errors"] = errors_json; + dispatch_json["errors"] = errors_json; + response_json["pending_dispatch_tasks_policy"] = dispatch_json; self->_validate_and_publish_api_response( response_json, *response_validator, request_id); }); } else { - // No task reassignment is needed, so we can simply return success right - // away. - nlohmann::json response_json; - response_json["success"] = true; + if (dispatch_policy_it->get() == "cancel") + { + std::vector assignments = _drain_dispatched_assignments(); + for (const auto& a : assignments) + { + _publish_canceled_pending_task( + a, {"Canceled by robot commission request [" + request_id + "]"}); + _register_executed_task(a.request()->booking()->id()); + } + } + + response_json["pending_dispatch_tasks_policy"] = simple_success_json(); _validate_and_publish_api_response( response_json, *response_validator, request_id); } diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.hpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.hpp index 1779d3c5a..b9b30950a 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.hpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.hpp @@ -443,6 +443,12 @@ class TaskManager : public std::enable_shared_from_this const Assignment& assignment, std::vector labels); + /// Take all dispatched assignments out of the queue, leaving the queue empty. + std::vector _drain_dispatched_assignments(); + + /// Take all direct assignments out of the queue, leaving the queue empty. + std::vector _drain_direct_assignments(); + /// Cancel a task that is in the dispatch queue. Returns false if the task /// was not present. bool _cancel_task_from_dispatch_queue( From e973234a42ac0f608bba675169f52daeee8cbcf2 Mon Sep 17 00:00:00 2001 From: "Michael X. Grey" Date: Wed, 27 Mar 2024 17:48:07 +0000 Subject: [PATCH 08/13] Fix initialization of reassignment_worker Signed-off-by: Michael X. Grey --- .../src/rmf_fleet_adapter/agv/internal_FleetUpdateHandle.hpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/internal_FleetUpdateHandle.hpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/internal_FleetUpdateHandle.hpp index af7af03c4..2e218e882 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/internal_FleetUpdateHandle.hpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/internal_FleetUpdateHandle.hpp @@ -383,6 +383,9 @@ class FleetUpdateHandle::Implementation handle->_pimpl = rmf_utils::make_unique_impl( Implementation{handle, std::forward(args)...}); + handle->_pimpl->reassignment_worker = + rxcpp::schedulers::make_event_loop().create_worker(); + handle->_pimpl->add_standard_tasks(); handle->_pimpl->emergency_obs = From 561b9cea0b018d2a1e071ef62a7e8098b08286e9 Mon Sep 17 00:00:00 2001 From: "Michael X. Grey" Date: Wed, 27 Mar 2024 18:18:07 +0000 Subject: [PATCH 09/13] Remember to actually set the commission Signed-off-by: Michael X. Grey --- rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp index 50777166e..e736031c9 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp @@ -2518,6 +2518,8 @@ void TaskManager::_handle_commission_request( commission.perform_idle_behavior(idle_it->get()); } + _context->set_commission(commission); + nlohmann::json response_json; response_json["commission"] = simple_success_json(); From cf3eb4fb9c27eb8c899a30316c34a7dd8d04c162 Mon Sep 17 00:00:00 2001 From: "Michael X. Grey" Date: Thu, 28 Mar 2024 13:48:05 +0000 Subject: [PATCH 10/13] Cache pending task info so pending tasks can be canceled with full information Signed-off-by: Michael X. Grey --- .../src/rmf_fleet_adapter/TaskManager.cpp | 31 +++++++++++++++++-- .../src/rmf_fleet_adapter/TaskManager.hpp | 17 ++++++++++ 2 files changed, 45 insertions(+), 3 deletions(-) diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp index e736031c9..b4e67d16c 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp @@ -2009,6 +2009,8 @@ rmf_task::State TaskManager::_publish_pending_task( { const auto info = pending.request()->description()->generate_info( std::move(expected_state), parameters); + PendingInfo cache; + cache.info = info; nlohmann::json pending_json; const auto& booking = *pending.request()->booking(); @@ -2022,13 +2024,18 @@ rmf_task::State TaskManager::_publish_pending_task( if (pending.finish_state().time()) { - pending_json["unix_millis_finish_time"] = + PendingTimeInfo t; + t.unix_millis_finish_time = to_millis(pending.finish_state().time()->time_since_epoch()).count(); const auto estimate = pending.finish_state().time().value() - pending.deployment_time(); - pending_json["original_estimate_millis"] = - std::max(0l, to_millis(estimate).count()); + t.original_estimate_millis = std::max(0l, to_millis(estimate).count()); + + pending_json["unix_millis_finish_time"] = t.unix_millis_finish_time; + pending_json["original_estimate_millis"] = t.original_estimate_millis; + + cache.time = t; } copy_assignment(pending_json["assigned_to"], *_context); pending_json["status"] = "queued"; @@ -2041,6 +2048,7 @@ rmf_task::State TaskManager::_publish_pending_task( _validate_and_publish_websocket(task_state_update, validator); + _pending_task_info[pending.request()] = cache; return pending.finish_state(); } @@ -2049,6 +2057,7 @@ void TaskManager::_publish_task_queue() { rmf_task::State expected_state = _context->current_task_end_state(); const auto& parameters = *_context->task_parameters(); + _pending_task_info.clear(); for (const auto& pending : _direct_queue) { @@ -2072,6 +2081,20 @@ void TaskManager::_publish_canceled_pending_task( const auto& booking = *pending.request()->booking(); copy_booking_data(pending_json["booking"], booking); + const auto info_it = _pending_task_info.find(pending.request()); + if (info_it != _pending_task_info.end()) + { + const auto& cache = info_it->second; + pending_json["category"] = cache.info.category; + pending_json["detail"] = cache.info.detail; + if (cache.time.has_value()) + { + const auto& t = *cache.time; + pending_json["unix_millis_finish_time"] = t.unix_millis_finish_time; + pending_json["original_estimate_millis"] = t.original_estimate_millis; + } + } + pending_json["unix_millis_start_time"] = to_millis(pending.deployment_time().time_since_epoch()).count(); @@ -2127,6 +2150,8 @@ auto TaskManager::_drain_direct_assignments() -> std::vector assignments.push_back(a.assignment); } _direct_queue.clear(); + + return assignments; } //============================================================================== diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.hpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.hpp index b9b30950a..c8d0d4ea1 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.hpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.hpp @@ -40,6 +40,18 @@ namespace rmf_fleet_adapter { +struct PendingTimeInfo +{ + int64_t unix_millis_finish_time; + int64_t original_estimate_millis; +}; + +struct PendingInfo +{ + rmf_task::Task::Description::Info info; + std::optional time; +}; + //============================================================================== /// This task manager is a first attempt at managing multiple tasks per fleet. /// This is a simple implementation that only makes a modest attempt at being @@ -326,6 +338,11 @@ class TaskManager : public std::enable_shared_from_this uint16_t _count_emergency_pullover = 0; // Queue for dispatched tasks std::vector _queue; + std::unordered_map< + rmf_task::ConstRequestPtr, + PendingInfo + > _pending_task_info; + // An ID to keep track of the FIFO order of direct tasks std::size_t _next_sequence_number; // Queue for directly assigned tasks From 4549852a2a2225ce62b9c5bece5eeb8412c43bee Mon Sep 17 00:00:00 2001 From: "Michael X. Grey" Date: Fri, 29 Mar 2024 05:33:26 +0000 Subject: [PATCH 11/13] Use recursive mutex to avoid deadlocks Signed-off-by: Michael X. Grey --- .../src/rmf_fleet_adapter/TaskManager.cpp | 30 +++++++++---------- .../src/rmf_fleet_adapter/TaskManager.hpp | 2 +- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp index b4e67d16c..fe4f5b88a 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp @@ -827,7 +827,7 @@ std::string TaskManager::robot_status() const //============================================================================== auto TaskManager::expected_finish_state() const -> State { - std::lock_guard lock(_mutex); + std::lock_guard lock(_mutex); if (!_direct_queue.empty()) { return _direct_queue.rbegin()->assignment.finish_state(); @@ -877,7 +877,7 @@ void TaskManager::enable_responsive_wait(bool value) if (_responsive_wait_enabled) { - std::lock_guard guard(_mutex); + std::lock_guard guard(_mutex); if (!_active_task && _queue.empty() && _direct_queue.empty() && !_waiting) { _begin_waiting(); @@ -892,7 +892,7 @@ void TaskManager::set_idle_task(rmf_task::ConstRequestFactoryPtr task) return; _idle_task = std::move(task); - std::lock_guard guard(_mutex); + std::lock_guard guard(_mutex); if (!_active_task && _queue.empty() && _direct_queue.empty()) { _begin_waiting(); @@ -906,7 +906,7 @@ void TaskManager::set_queue( // We indent this block as _mutex is also locked in the _begin_next_task() // function that is called at the end of this function. { - std::lock_guard guard(_mutex); + std::lock_guard guard(_mutex); // Do not remove automatic task if assignments is empty. See Issue #138 if (assignments.empty() && _queue.size() == 1 && @@ -949,7 +949,7 @@ std::vector TaskManager::dispatched_requests() const { using namespace rmf_task::requests; std::vector requests; - std::lock_guard lock(_mutex); + std::lock_guard lock(_mutex); requests.reserve(_queue.size()); for (const auto& task : _queue) { @@ -1213,7 +1213,7 @@ nlohmann::json TaskManager::submit_direct_request( }; ++_next_sequence_number; { - std::lock_guard lock(_mutex); + std::lock_guard lock(_mutex); _direct_queue.insert(assignment); } @@ -1329,7 +1329,7 @@ bool TaskManager::cancel_task( // TODO(YV): We could cache the task_ids of direct and dispatched tasks in // unordered_sets and perform a lookup to see which function to call. - std::lock_guard lock(_mutex); + std::lock_guard lock(_mutex); if (_cancel_task_from_dispatch_queue(task_id, labels)) return true; @@ -1351,7 +1351,7 @@ bool TaskManager::kill_task( return true; } - std::lock_guard lock(_mutex); + std::lock_guard lock(_mutex); if (_cancel_task_from_dispatch_queue(task_id, labels)) return true; @@ -1374,7 +1374,7 @@ void TaskManager::_begin_next_task() return; } - std::lock_guard guard(_mutex); + std::lock_guard guard(_mutex); if (_queue.empty() && _direct_queue.empty()) { @@ -1754,7 +1754,7 @@ void TaskManager::retreat_to_charger() return; { - std::lock_guard guard(_mutex); + std::lock_guard guard(_mutex); if (_active_task || !_queue.empty()) return; } @@ -1827,7 +1827,7 @@ void TaskManager::retreat_to_charger() charging_assignment}; ++_next_sequence_number; { - std::lock_guard lock(_mutex); + std::lock_guard lock(_mutex); _direct_queue.insert(assignment); } @@ -2120,7 +2120,7 @@ void TaskManager::_publish_canceled_pending_task( auto TaskManager::_drain_dispatched_assignments() -> std::vector { std::vector assignments; - std::lock_guard lock(_mutex); + std::lock_guard lock(_mutex); for (const auto& a : _queue) { if (a.request()->booking()->automatic()) @@ -2139,7 +2139,7 @@ auto TaskManager::_drain_dispatched_assignments() -> std::vector auto TaskManager::_drain_direct_assignments() -> std::vector { std::vector assignments; - std::lock_guard lock(_mutex); + std::lock_guard lock(_mutex); for (const auto& a : _direct_queue) { if (a.assignment.request()->booking()->automatic()) @@ -2159,7 +2159,7 @@ bool TaskManager::_cancel_task_from_dispatch_queue( const std::string& task_id, const std::vector& labels) { - std::lock_guard lock(_mutex); + std::lock_guard lock(_mutex); for (auto it = _queue.begin(); it != _queue.end(); ++it) { if (it->request()->booking()->id() == task_id) @@ -2294,7 +2294,7 @@ void TaskManager::_send_simple_error_if_queued( { // TODO(YV): We could cache the task_ids of direct and dispatched tasks in // unordered_sets and perform a lookup to see which queue to iterate. - std::lock_guard lock(_mutex); + std::lock_guard lock(_mutex); for (const auto& a : _queue) { if (a.request()->booking()->id() == task_id) diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.hpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.hpp index c8d0d4ea1..7a409a7e7 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.hpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.hpp @@ -361,7 +361,7 @@ class TaskManager : public std::enable_shared_from_this // TODO: Eliminate the need for a mutex by redesigning the use of the task // manager so that modifications of shared data only happen on designated // rxcpp worker - mutable std::mutex _mutex; + mutable std::recursive_mutex _mutex; rclcpp::TimerBase::SharedPtr _task_timer; rclcpp::TimerBase::SharedPtr _retreat_timer; rclcpp::TimerBase::SharedPtr _update_timer; From 021e692b6964bd5ad7b1a33221fbbfb208e48ecd Mon Sep 17 00:00:00 2001 From: "Michael X. Grey" Date: Tue, 2 Apr 2024 08:21:12 +0000 Subject: [PATCH 12/13] Add python bindings for commission API Signed-off-by: Michael X. Grey --- rmf_fleet_adapter_python/src/adapter.cpp | 26 +++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/rmf_fleet_adapter_python/src/adapter.cpp b/rmf_fleet_adapter_python/src/adapter.cpp index 0381dd164..fc1bace85 100644 --- a/rmf_fleet_adapter_python/src/adapter.cpp +++ b/rmf_fleet_adapter_python/src/adapter.cpp @@ -61,6 +61,7 @@ using ActivityIdentifier = agv::RobotUpdateHandle::ActivityIdentifier; using ActionExecution = agv::RobotUpdateHandle::ActionExecution; using RobotInterruption = agv::RobotUpdateHandle::Interruption; using IssueTicket = agv::RobotUpdateHandle::IssueTicket; +using Commission = agv::RobotUpdateHandle::Commission; using Stubbornness = agv::RobotUpdateHandle::Unstable::Stubbornness; void bind_types(py::module&); @@ -291,7 +292,14 @@ PYBIND11_MODULE(rmf_adapter, m) { py::arg("text")) .def("enable_responsive_wait", &agv::RobotUpdateHandle::enable_responsive_wait, - py::arg("value")); + py::arg("value")) + .def("set_commission", + &agv::RobotUpdateHandle::set_commission, + py::arg("commission")) + .def("commission", + &agv::RobotUpdateHandle::commission) + .def("reassign_dispatched_tasks", + &agv::RobotUpdateHandle::reassign_dispatched_tasks); // ACTION EXECUTOR ======================================================= auto m_robot_update_handle = m.def_submodule("robot_update_handle"); @@ -354,6 +362,22 @@ PYBIND11_MODULE(rmf_adapter, m) { .value("Warning", agv::RobotUpdateHandle::Tier::Warning) .value("Error", agv::RobotUpdateHandle::Tier::Error); + // Commission ====================================================== + py::class_( + m_robot_update_handle, "Commission") + .def_property( + "accept_dispatched_tasks", + &Commission::is_accepting_dispatched_tasks, + &Commission::accept_dispatched_tasks) + .def_property( + "accept_direct_tasks", + &Commission::is_accepting_direct_tasks, + &Commission::accept_direct_tasks) + .def_property( + "perform_idle_behavior", + &Commission::is_performing_idle_behavior, + &Commission::perform_idle_behavior); + // Stubbornness ============================================================ py::class_( m_robot_update_handle, "Stubbornness") From 8ef2f551ae4634df3a8beb76803be4259a427676 Mon Sep 17 00:00:00 2001 From: "Michael X. Grey" Date: Mon, 8 Apr 2024 06:45:36 +0000 Subject: [PATCH 13/13] Fix style Signed-off-by: Michael X. Grey --- .../agv/RobotUpdateHandle.hpp | 6 +++--- .../src/rmf_fleet_adapter/TaskManager.cpp | 2 +- .../agv/FleetUpdateHandle.cpp | 18 +++++++++--------- .../agv/RobotUpdateHandle.cpp | 4 ++-- 4 files changed, 15 insertions(+), 15 deletions(-) diff --git a/rmf_fleet_adapter/include/rmf_fleet_adapter/agv/RobotUpdateHandle.hpp b/rmf_fleet_adapter/include/rmf_fleet_adapter/agv/RobotUpdateHandle.hpp index e4e604362..1a7514c34 100644 --- a/rmf_fleet_adapter/include/rmf_fleet_adapter/agv/RobotUpdateHandle.hpp +++ b/rmf_fleet_adapter/include/rmf_fleet_adapter/agv/RobotUpdateHandle.hpp @@ -429,20 +429,20 @@ class RobotUpdateHandle static Commission decommission(); /// Set whether this commission should accept dispatched tasks. - Commission& accept_dispatched_tasks(bool decision=true); + Commission& accept_dispatched_tasks(bool decision = true); /// Check whether this commission is accepting dispatched tasks. bool is_accepting_dispatched_tasks() const; /// Set whether this commission should accept direct tasks - Commission& accept_direct_tasks(bool decision=true); + Commission& accept_direct_tasks(bool decision = true); /// Check whether this commission is accepting direct tasks. bool is_accepting_direct_tasks() const; /// Set whether this commission should perform idle behaviors (formerly /// referred to as "finishing tasks"). - Commission& perform_idle_behavior(bool decision=true); + Commission& perform_idle_behavior(bool decision = true); /// Check whether this commission is performing idle behaviors (formerly /// referred to as "finishing tasks"). diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp index f5263f7c0..78a50f5c3 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp @@ -2515,7 +2515,7 @@ void TaskManager::_handle_commission_request( static const auto response_validator = std::make_shared( - _make_validator(rmf_api_msgs::schemas::robot_commission_response)); + _make_validator(rmf_api_msgs::schemas::robot_commission_response)); if (!_validate_request_message(request_json, request_validator, request_id)) return; diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/FleetUpdateHandle.cpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/FleetUpdateHandle.cpp index c5f2e6515..ebad11c93 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/FleetUpdateHandle.cpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/FleetUpdateHandle.cpp @@ -403,7 +403,7 @@ class AllocateTasks : public std::enable_shared_from_this } TaskAssignments assignments; - for (std::size_t i=0; i < assignments_ptr->size(); ++i) + for (std::size_t i = 0; i < assignments_ptr->size(); ++i) { const auto r_it = robot_indexes.find(i); if (r_it == robot_indexes.end()) @@ -789,8 +789,8 @@ void FleetUpdateHandle::Implementation::dispatch_command_cb( r_it, self->_pimpl->unassigned_requests.end()); std::string error_str = - "Unable to replan assignments when accommodating task_id [" - + request->booking()->id() + "]. Reasons:"; + "Unable to replan assignments when accommodating task_id [" + + request->booking()->id() + "]. Reasons:"; if (errors.empty()) { error_str += " No reason given by planner."; @@ -875,7 +875,7 @@ void FleetUpdateHandle::Implementation::dispatch_command_cb( { std::stringstream ss; ss << "Unabled to replan assignments when cancelling task [" - << task_id << "] for fleet [" << name << "]. "; + << task_id << "] for fleet [" << name << "]. "; if (errors.empty()) { ss << "No planner error messages were provided."; @@ -1002,10 +1002,10 @@ void FleetUpdateHandle::Implementation::reassign_dispatched_tasks( } auto on_plan_received = [ - on_success, - on_failure, - w = weak_self, - initial_last_bid_assignment = last_bid_assignment + on_success, + on_failure, + w = weak_self, + initial_last_bid_assignment = last_bid_assignment ](TaskAssignments assignments) { const auto self = w.lock(); @@ -1077,7 +1077,7 @@ void FleetUpdateHandle::Implementation::reassign_dispatched_tasks( std::vector errors; const auto replan_results = AllocateTasks( nullptr, expectations, task_planner, node) - .run(errors); + .run(errors); if (replan_results.has_value()) { diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/RobotUpdateHandle.cpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/RobotUpdateHandle.cpp index 9fa0e6380..68a46eca0 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/RobotUpdateHandle.cpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/RobotUpdateHandle.cpp @@ -731,7 +731,7 @@ class RobotUpdateHandle::Commission::Implementation //============================================================================== RobotUpdateHandle::Commission::Commission() - : _pimpl(rmf_utils::make_impl()) +: _pimpl(rmf_utils::make_impl()) { // Do nothing } @@ -809,7 +809,7 @@ void RobotUpdateHandle::reassign_dispatched_tasks() { const auto mgr = context->task_manager(); if (mgr) - mgr->reassign_dispatched_requests([](){}, [](auto){}); + mgr->reassign_dispatched_requests([]() {}, [](auto) {}); }); } }