From 8a4b9e8a46ec947af0fc54c0853bd26fcd235878 Mon Sep 17 00:00:00 2001 From: Grey Date: Mon, 8 Apr 2024 15:13:23 +0800 Subject: [PATCH] Stabilize commissioning feature (#338) Signed-off-by: Michael X. Grey Signed-off-by: Arjo Chakravarty --- .../agv/RobotUpdateHandle.hpp | 61 +++ .../src/rmf_fleet_adapter/TaskManager.cpp | 384 ++++++++++++++-- .../src/rmf_fleet_adapter/TaskManager.hpp | 37 +- .../agv/FleetUpdateHandle.cpp | 425 +++++++++++++----- .../rmf_fleet_adapter/agv/RobotContext.cpp | 15 +- .../rmf_fleet_adapter/agv/RobotContext.hpp | 20 +- .../agv/RobotUpdateHandle.cpp | 120 ++++- .../agv/internal_FleetUpdateHandle.hpp | 42 +- .../agv/internal_RobotUpdateHandle.hpp | 26 ++ rmf_fleet_adapter_python/src/adapter.cpp | 26 +- 10 files changed, 970 insertions(+), 186 deletions(-) diff --git a/rmf_fleet_adapter/include/rmf_fleet_adapter/agv/RobotUpdateHandle.hpp b/rmf_fleet_adapter/include/rmf_fleet_adapter/agv/RobotUpdateHandle.hpp index 22da7b444..1a7514c34 100644 --- a/rmf_fleet_adapter/include/rmf_fleet_adapter/agv/RobotUpdateHandle.hpp +++ b/rmf_fleet_adapter/include/rmf_fleet_adapter/agv/RobotUpdateHandle.hpp @@ -411,6 +411,64 @@ class RobotUpdateHandle /// If the robot is holding onto a session with a lift, release that session. void release_lift(); + /// A description of whether the robot should accept dispatched and/or direct + /// tasks. + class Commission + { + public: + /// Construct a Commission description with all default values. + /// - accept_dispatched_tasks: true + /// - accept_direct_tasks: true + /// - is_performing_idle_behavior: true + Commission(); + + /// Construct a Commission description that accepts no tasks at all. + /// - accept_dispatch_tasks: false + /// - accept_direct_tasks: false + /// - is_performing_idle_behavior: false + static Commission decommission(); + + /// Set whether this commission should accept dispatched tasks. + Commission& accept_dispatched_tasks(bool decision = true); + + /// Check whether this commission is accepting dispatched tasks. + bool is_accepting_dispatched_tasks() const; + + /// Set whether this commission should accept direct tasks + Commission& accept_direct_tasks(bool decision = true); + + /// Check whether this commission is accepting direct tasks. + bool is_accepting_direct_tasks() const; + + /// Set whether this commission should perform idle behaviors (formerly + /// referred to as "finishing tasks"). + Commission& perform_idle_behavior(bool decision = true); + + /// Check whether this commission is performing idle behaviors (formerly + /// referred to as "finishing tasks"). + bool is_performing_idle_behavior() const; + + class Implementation; + private: + rmf_utils::impl_ptr _pimpl; + }; + + /// Set the current commission for the robot. + void set_commission(Commission commission); + + /// Get the current commission for the robot. If the robot has been dropped + /// from the fleet, this will return Commission::decommission(). + Commission commission() const; + + /// Tell the fleet adapter to reassign all the tasks that have been dispatched + /// to this robot. To prevent the tasks from being reassigned back to this + /// robot use .set_commission(Commission::decommission()) + /// + /// In the current implementation, tasks will only be reassigned to robots + /// in the same fleet that the task was originally assigned to. This behavior + /// could change in the future. + void reassign_dispatched_tasks(); + class Implementation; /// This API is experimental and will not be supported in the future. Users @@ -420,15 +478,18 @@ class RobotUpdateHandle public: /// True if this robot is allowed to accept new tasks. False if the robot /// will not accept any new tasks. + [[deprecated("Use commission instead")]] bool is_commissioned() const; /// Stop this robot from accepting any new tasks. It will continue to /// perform tasks that are already in its queue. To reassign those tasks, /// you will need to use the task request API to cancel the tasks and /// re-request them. + [[deprecated("Use set_commission instead")]] void decommission(); /// Allow this robot to resume accepting new tasks. + [[deprecated("Use set_commission instead")]] void recommission(); /// Get the schedule participant of this robot diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp index 8771e98c7..78a50f5c3 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.cpp @@ -64,6 +64,9 @@ #include #include #include +#include +#include +#include namespace rmf_fleet_adapter { @@ -225,7 +228,10 @@ TaskManagerPtr TaskManager::make( rmf_api_msgs::schemas::skip_phase_response, rmf_api_msgs::schemas::task_request, rmf_api_msgs::schemas::undo_skip_phase_request, - rmf_api_msgs::schemas::undo_skip_phase_response + rmf_api_msgs::schemas::undo_skip_phase_response, + rmf_api_msgs::schemas::commission, + rmf_api_msgs::schemas::robot_commission_request, + rmf_api_msgs::schemas::robot_commission_response, }; for (const auto& schema : schemas) @@ -807,17 +813,7 @@ bool TaskManager::cancel_task_if_present(const std::string& task_id) return true; } - std::lock_guard lock(_mutex); - for (auto it = _queue.begin(); it != _queue.end(); ++it) - { - if (it->request()->booking()->id() == task_id) - { - _queue.erase(it); - return true; - } - } - - return false; + return _cancel_task_from_dispatch_queue(task_id, {"DispatchRequest"}); } //============================================================================== @@ -836,7 +832,7 @@ std::string TaskManager::robot_status() const //============================================================================== auto TaskManager::expected_finish_state() const -> State { - std::lock_guard lock(_mutex); + std::lock_guard lock(_mutex); if (!_direct_queue.empty()) { return _direct_queue.rbegin()->assignment.finish_state(); @@ -886,7 +882,7 @@ void TaskManager::enable_responsive_wait(bool value) if (_responsive_wait_enabled) { - std::lock_guard guard(_mutex); + std::lock_guard guard(_mutex); if (!_active_task && _queue.empty() && _direct_queue.empty() && !_waiting) { _begin_waiting(); @@ -901,7 +897,7 @@ void TaskManager::set_idle_task(rmf_task::ConstRequestFactoryPtr task) return; _idle_task = std::move(task); - std::lock_guard guard(_mutex); + std::lock_guard guard(_mutex); if (!_active_task && _queue.empty() && _direct_queue.empty()) { _begin_waiting(); @@ -915,7 +911,7 @@ void TaskManager::set_queue( // We indent this block as _mutex is also locked in the _begin_next_task() // function that is called at the end of this function. { - std::lock_guard guard(_mutex); + std::lock_guard guard(_mutex); // Do not remove automatic task if assignments is empty. See Issue #138 if (assignments.empty() && _queue.size() == 1 && @@ -954,12 +950,12 @@ void TaskManager::set_queue( } //============================================================================== -const std::vector TaskManager::requests() const +std::vector TaskManager::dispatched_requests() const { using namespace rmf_task::requests; std::vector requests; + std::lock_guard lock(_mutex); requests.reserve(_queue.size()); - std::lock_guard lock(_mutex); for (const auto& task : _queue) { if (task.request()->booking()->automatic()) @@ -972,6 +968,107 @@ const std::vector TaskManager::requests() const return requests; } +//============================================================================== +void TaskManager::reassign_dispatched_requests( + std::function on_success, + std::function)> on_failure) +{ + std::vector assignments = _drain_dispatched_assignments(); + const auto fleet = _fleet_handle.lock(); + if (!fleet) + { + RCLCPP_ERROR( + _context->node()->get_logger(), + "Attempting to reassign tasks for [%s] but its fleet is shutting down", + _context->requester_id().c_str()); + return; + } + + auto& fleet_impl = agv::FleetUpdateHandle::Implementation::get(*fleet); + auto& unassigned = fleet_impl.unassigned_requests; + for (const auto& a : assignments) + { + unassigned.push_back(a.request()); + } + + fleet_impl.reassign_dispatched_tasks( + [name = _context->requester_id(), node = _context->node(), on_success]() + { + RCLCPP_INFO( + node->get_logger(), + "Successfully reassigned tasks for [%s]", + name.c_str()); + on_success(); + }, + [ + name = _context->requester_id(), + node = _context->node(), + assignments, + fleet, + on_failure, + self = shared_from_this() + ](std::vector errors) + { + std::stringstream ss_errors; + if (errors.empty()) + { + ss_errors << " no reason given"; + } + else + { + for (auto&& e : errors) + { + ss_errors << "\n -- " << e; + } + } + + std::stringstream ss_requests; + if (assignments.empty()) + { + ss_requests << "No tasks were assigned to the robot."; + } + else + { + ss_requests << "The following tasks will be canceled:"; + for (const auto& a : assignments) + { + ss_requests << "\n -- " << a.request()->booking()->id(); + } + } + + RCLCPP_ERROR( + node->get_logger(), + "Failed to reassign tasks for [%s]. %s\nReasons for failure:%s", + ss_requests.str().c_str(), + ss_errors.str().c_str()); + + auto& fleet_impl = agv::FleetUpdateHandle::Implementation::get(*fleet); + auto& unassigned = fleet_impl.unassigned_requests; + const auto r_it = std::remove_if( + unassigned.begin(), + unassigned.end(), + [&assignments](const auto& r) + { + return std::find_if( + assignments.begin(), + assignments.end(), + [r](const auto& a) + { + return a.request() == r; + }) != assignments.end(); + }); + unassigned.erase(r_it, unassigned.end()); + + for (const auto& a : assignments) + { + self->_publish_canceled_pending_task(a, {"Failure to reassign"}); + self->_register_executed_task(a.request()->booking()->id()); + } + + on_failure(errors); + }); +} + //============================================================================== TaskManager::RobotModeMsg TaskManager::robot_mode() const { @@ -1023,6 +1120,13 @@ nlohmann::json TaskManager::submit_direct_request( const auto& fleet = _context->group(); const auto& robot = _context->name(); + if (!_context->commission().is_accepting_direct_tasks()) + { + return _make_error_response( + 20, "Uncommissioned", "The robot [" + robot + "] in fleet [" + + fleet + "] is not commissioned to perform direct tasks."); + } + const auto& impl = agv::FleetUpdateHandle::Implementation::get(*fleet_handle); std::vector errors; @@ -1048,7 +1152,11 @@ nlohmann::json TaskManager::submit_direct_request( } catch (const std::exception&) { - json_errors.push_back(e); + nlohmann::json error; + error["code"] = 42; + error["category"] = "unknown"; + error["detail"] = e; + json_errors.push_back(error); } } response_json["errors"] = std::move(json_errors); @@ -1110,7 +1218,7 @@ nlohmann::json TaskManager::submit_direct_request( }; ++_next_sequence_number; { - std::lock_guard lock(_mutex); + std::lock_guard lock(_mutex); _direct_queue.insert(assignment); } @@ -1226,7 +1334,7 @@ bool TaskManager::cancel_task( // TODO(YV): We could cache the task_ids of direct and dispatched tasks in // unordered_sets and perform a lookup to see which function to call. - std::lock_guard lock(_mutex); + std::lock_guard lock(_mutex); if (_cancel_task_from_dispatch_queue(task_id, labels)) return true; @@ -1248,7 +1356,7 @@ bool TaskManager::kill_task( return true; } - std::lock_guard lock(_mutex); + std::lock_guard lock(_mutex); if (_cancel_task_from_dispatch_queue(task_id, labels)) return true; @@ -1271,7 +1379,7 @@ void TaskManager::_begin_next_task() return; } - std::lock_guard guard(_mutex); + std::lock_guard guard(_mutex); if (_queue.empty() && _direct_queue.empty()) { @@ -1495,6 +1603,13 @@ std::function TaskManager::_robot_interruption_callback() //============================================================================== void TaskManager::_begin_waiting() { + if (!_context->commission().is_performing_idle_behavior()) + { + // This robot is not supposed to perform its idle behavior, so we + // immediately from here. + return; + } + if (_idle_task) { const auto request = _idle_task->make_request(_context->make_get_state()()); @@ -1644,7 +1759,7 @@ void TaskManager::retreat_to_charger() return; { - std::lock_guard guard(_mutex); + std::lock_guard guard(_mutex); if (_active_task || !_queue.empty()) return; } @@ -1717,7 +1832,7 @@ void TaskManager::retreat_to_charger() charging_assignment}; ++_next_sequence_number; { - std::lock_guard lock(_mutex); + std::lock_guard lock(_mutex); _direct_queue.insert(assignment); } @@ -1899,6 +2014,8 @@ rmf_task::State TaskManager::_publish_pending_task( { const auto info = pending.request()->description()->generate_info( std::move(expected_state), parameters); + PendingInfo cache; + cache.info = info; nlohmann::json pending_json; const auto& booking = *pending.request()->booking(); @@ -1912,13 +2029,18 @@ rmf_task::State TaskManager::_publish_pending_task( if (pending.finish_state().time()) { - pending_json["unix_millis_finish_time"] = + PendingTimeInfo t; + t.unix_millis_finish_time = to_millis(pending.finish_state().time()->time_since_epoch()).count(); const auto estimate = pending.finish_state().time().value() - pending.deployment_time(); - pending_json["original_estimate_millis"] = - std::max(0l, to_millis(estimate).count()); + t.original_estimate_millis = std::max(0l, to_millis(estimate).count()); + + pending_json["unix_millis_finish_time"] = t.unix_millis_finish_time; + pending_json["original_estimate_millis"] = t.original_estimate_millis; + + cache.time = t; } copy_assignment(pending_json["assigned_to"], *_context); pending_json["status"] = "queued"; @@ -1931,6 +2053,7 @@ rmf_task::State TaskManager::_publish_pending_task( _validate_and_publish_websocket(task_state_update, validator); + _pending_task_info[pending.request()] = cache; return pending.finish_state(); } @@ -1939,6 +2062,7 @@ void TaskManager::_publish_task_queue() { rmf_task::State expected_state = _context->current_task_end_state(); const auto& parameters = *_context->task_parameters(); + _pending_task_info.clear(); for (const auto& pending : _direct_queue) { @@ -1962,6 +2086,20 @@ void TaskManager::_publish_canceled_pending_task( const auto& booking = *pending.request()->booking(); copy_booking_data(pending_json["booking"], booking); + const auto info_it = _pending_task_info.find(pending.request()); + if (info_it != _pending_task_info.end()) + { + const auto& cache = info_it->second; + pending_json["category"] = cache.info.category; + pending_json["detail"] = cache.info.detail; + if (cache.time.has_value()) + { + const auto& t = *cache.time; + pending_json["unix_millis_finish_time"] = t.unix_millis_finish_time; + pending_json["original_estimate_millis"] = t.original_estimate_millis; + } + } + pending_json["unix_millis_start_time"] = to_millis(pending.deployment_time().time_since_epoch()).count(); @@ -1983,17 +2121,59 @@ void TaskManager::_publish_canceled_pending_task( _validate_and_publish_websocket(task_state_update, validator); } +//============================================================================== +auto TaskManager::_drain_dispatched_assignments() -> std::vector +{ + std::vector assignments; + std::lock_guard lock(_mutex); + for (const auto& a : _queue) + { + if (a.request()->booking()->automatic()) + { + continue; + } + + assignments.push_back(a); + } + _queue.clear(); + + return assignments; +} + +//============================================================================== +auto TaskManager::_drain_direct_assignments() -> std::vector +{ + std::vector assignments; + std::lock_guard lock(_mutex); + for (const auto& a : _direct_queue) + { + if (a.assignment.request()->booking()->automatic()) + { + continue; + } + + assignments.push_back(a.assignment); + } + _direct_queue.clear(); + + return assignments; +} + //============================================================================== bool TaskManager::_cancel_task_from_dispatch_queue( const std::string& task_id, const std::vector& labels) { + std::lock_guard lock(_mutex); for (auto it = _queue.begin(); it != _queue.end(); ++it) { if (it->request()->booking()->id() == task_id) { _publish_canceled_pending_task(*it, labels); _queue.erase(it); + + // Count this as an executed task so we don't lose track of its existence + _register_executed_task(task_id); return true; } } @@ -2119,7 +2299,7 @@ void TaskManager::_send_simple_error_if_queued( { // TODO(YV): We could cache the task_ids of direct and dispatched tasks in // unordered_sets and perform a lookup to see which queue to iterate. - std::lock_guard lock(_mutex); + std::lock_guard lock(_mutex); for (const auto& a : _queue) { if (a.request()->booking()->id() == task_id) @@ -2275,6 +2455,8 @@ void TaskManager::_handle_request( _handle_undo_skip_phase_request(request_json, request_id); else if (type_str == "robot_task_request") _handle_direct_request(request_json, request_id); + else if (type_str == "robot_commission_request") + _handle_commission_request(request_json, request_id); else return; } @@ -2300,12 +2482,12 @@ void TaskManager::_handle_direct_request( if (!_validate_request_message(request_json, request_validator, request_id)) return; - const auto& robot = request_json["robot"].get(); - if (robot.empty() || robot != _context->name()) + const auto& fleet = request_json["fleet"].get(); + if (fleet != _context->group()) return; - const auto& fleet = request_json["fleet"].get(); - if (fleet.empty() || fleet != _context->group()) + const auto& robot = request_json["robot"].get(); + if (robot != _context->name()) return; const nlohmann::json& request = request_json["request"]; @@ -2313,6 +2495,142 @@ void TaskManager::_handle_direct_request( _validate_and_publish_api_response(response, response_validator, request_id); } +//============================================================================== +namespace { +nlohmann::json simple_success_json() +{ + nlohmann::json successful_result; + successful_result["success"] = true; + return successful_result; +} +} // anonymous namespace + +//============================================================================== +void TaskManager::_handle_commission_request( + const nlohmann::json& request_json, + const std::string& request_id) +{ + static const auto request_validator = + _make_validator(rmf_api_msgs::schemas::robot_commission_request); + + static const auto response_validator = + std::make_shared( + _make_validator(rmf_api_msgs::schemas::robot_commission_response)); + + if (!_validate_request_message(request_json, request_validator, request_id)) + return; + + const auto& fleet = request_json["fleet"].get(); + if (fleet != _context->group()) + return; + + const auto& robot = request_json["robot"].get(); + if (robot != _context->name()) + return; + + agv::RobotUpdateHandle::Commission commission = _context->commission(); + const nlohmann::json& commission_json = request_json["commission"]; + const auto dispatch_it = commission_json.find("dispatch_tasks"); + if (dispatch_it != commission_json.end()) + { + commission.accept_dispatched_tasks(dispatch_it->get()); + } + + const auto direct_it = commission_json.find("direct_tasks"); + if (direct_it != commission_json.end()) + { + commission.accept_direct_tasks(direct_it->get()); + } + + const auto idle_it = commission_json.find("idle_behavior"); + if (idle_it != commission_json.end()) + { + commission.perform_idle_behavior(idle_it->get()); + } + + _context->set_commission(commission); + + nlohmann::json response_json; + response_json["commission"] = simple_success_json(); + + const auto direct_policy_it = + request_json.find("pending_direct_tasks_policy"); + if (direct_policy_it != request_json.end() + && direct_policy_it->get() == "cancel") + { + const auto assignments = _drain_direct_assignments(); + for (const auto& a : assignments) + { + _publish_canceled_pending_task( + a, {"Canceled by robot commission request [" + request_id + "]"}); + _register_executed_task(a.request()->booking()->id()); + } + } + + response_json["pending_direct_tasks_policy"] = simple_success_json(); + + const auto dispatch_policy_it = + request_json.find("pending_dispatch_tasks_policy"); + if (dispatch_policy_it == request_json.end() + || dispatch_policy_it->get() == "reassign") + { + reassign_dispatched_requests( + [ + request_id, + response_validator = response_validator, + base_response_json = response_json, + self = shared_from_this() + ]() + { + nlohmann::json response_json = base_response_json; + response_json["pending_dispatch_tasks_policy"] = simple_success_json(); + self->_validate_and_publish_api_response( + response_json, *response_validator, request_id); + }, + [ + request_id, + response_validator = response_validator, + base_response_json = response_json, + self = shared_from_this() + ](std::vector errors) + { + nlohmann::json response_json = base_response_json; + nlohmann::json dispatch_json; + dispatch_json["success"] = false; + std::vector errors_json; + for (const auto& e : errors) + { + nlohmann::json error; + error["code"] = 21; + error["category"] = "planner"; + error["detail"] = e; + errors_json.push_back(error); + } + dispatch_json["errors"] = errors_json; + response_json["pending_dispatch_tasks_policy"] = dispatch_json; + self->_validate_and_publish_api_response( + response_json, *response_validator, request_id); + }); + } + else + { + if (dispatch_policy_it->get() == "cancel") + { + std::vector assignments = _drain_dispatched_assignments(); + for (const auto& a : assignments) + { + _publish_canceled_pending_task( + a, {"Canceled by robot commission request [" + request_id + "]"}); + _register_executed_task(a.request()->booking()->id()); + } + } + + response_json["pending_dispatch_tasks_policy"] = simple_success_json(); + _validate_and_publish_api_response( + response_json, *response_validator, request_id); + } +} + //============================================================================== void TaskManager::_handle_cancel_request( const nlohmann::json& request_json, diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.hpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.hpp index 575da4697..7a409a7e7 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.hpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/TaskManager.hpp @@ -40,6 +40,18 @@ namespace rmf_fleet_adapter { +struct PendingTimeInfo +{ + int64_t unix_millis_finish_time; + int64_t original_estimate_millis; +}; + +struct PendingInfo +{ + rmf_task::Task::Description::Info info; + std::optional time; +}; + //============================================================================== /// This task manager is a first attempt at managing multiple tasks per fleet. /// This is a simple implementation that only makes a modest attempt at being @@ -118,7 +130,13 @@ class TaskManager : public std::enable_shared_from_this void set_queue(const std::vector& assignments); /// Get the non-charging requests among pending tasks - const std::vector requests() const; + std::vector dispatched_requests() const; + + /// Send all dispatched requests that are still in the queue back to the fleet + /// handle for reassignment. + void reassign_dispatched_requests( + std::function on_success, + std::function)> on_failure); std::optional current_task_id() const; @@ -320,6 +338,11 @@ class TaskManager : public std::enable_shared_from_this uint16_t _count_emergency_pullover = 0; // Queue for dispatched tasks std::vector _queue; + std::unordered_map< + rmf_task::ConstRequestPtr, + PendingInfo + > _pending_task_info; + // An ID to keep track of the FIFO order of direct tasks std::size_t _next_sequence_number; // Queue for directly assigned tasks @@ -338,7 +361,7 @@ class TaskManager : public std::enable_shared_from_this // TODO: Eliminate the need for a mutex by redesigning the use of the task // manager so that modifications of shared data only happen on designated // rxcpp worker - mutable std::mutex _mutex; + mutable std::recursive_mutex _mutex; rclcpp::TimerBase::SharedPtr _task_timer; rclcpp::TimerBase::SharedPtr _retreat_timer; rclcpp::TimerBase::SharedPtr _update_timer; @@ -437,6 +460,12 @@ class TaskManager : public std::enable_shared_from_this const Assignment& assignment, std::vector labels); + /// Take all dispatched assignments out of the queue, leaving the queue empty. + std::vector _drain_dispatched_assignments(); + + /// Take all direct assignments out of the queue, leaving the queue empty. + std::vector _drain_direct_assignments(); + /// Cancel a task that is in the dispatch queue. Returns false if the task /// was not present. bool _cancel_task_from_dispatch_queue( @@ -539,6 +568,10 @@ class TaskManager : public std::enable_shared_from_this const nlohmann::json& request_json, const std::string& request_id); + void _handle_commission_request( + const nlohmann::json& request_json, + const std::string& request_id); + void _handle_cancel_request( const nlohmann::json& request_json, const std::string& request_id); diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/FleetUpdateHandle.cpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/FleetUpdateHandle.cpp index 652f35024..2afd86eee 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/FleetUpdateHandle.cpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/FleetUpdateHandle.cpp @@ -340,10 +340,18 @@ class AllocateTasks : public std::enable_shared_from_this expect.states.size(), expect.pending_requests.size()); + std::unordered_map robot_indexes; + std::vector states; + for (const auto& [context, state] : expect.states) + { + robot_indexes[states.size()] = context; + states.push_back(state); + } + // Generate new task assignments const auto result = task_planner.plan( rmf_traffic_ros2::convert(node->now()), - expect.states, + states, expect.pending_requests); auto assignments_ptr = std::get_if< @@ -394,7 +402,22 @@ class AllocateTasks : public std::enable_shared_from_this return std::nullopt; } - const auto assignments = *assignments_ptr; + TaskAssignments assignments; + for (std::size_t i = 0; i < assignments_ptr->size(); ++i) + { + const auto r_it = robot_indexes.find(i); + if (r_it == robot_indexes.end()) + { + RCLCPP_ERROR( + node->get_logger(), + "[AllocateTasks] Unable to find a robot associated with index [%d]", + i); + continue; + } + + const auto context = r_it->second; + assignments[context] = (*assignments_ptr)[i]; + } if (assignments.empty()) { @@ -541,15 +564,15 @@ void FleetUpdateHandle::Implementation::bid_notice_cb( const auto& assignments = allocation_result.value(); - const double cost = self->_pimpl->task_planner->compute_cost(assignments); + const double cost = self->_pimpl->compute_cost(assignments); // Display computed assignments for debugging std::stringstream debug_stream; debug_stream << "Cost: " << cost << std::endl; - for (std::size_t i = 0; i < assignments.size(); ++i) + for (const auto& [context, queue] : assignments) { - debug_stream << "--Agent: " << i << std::endl; - for (const auto& a : assignments[i]) + debug_stream << "--Agent: " << context->requester_id() << std::endl; + for (const auto& a : queue) { const auto& s = a.finish_state(); const double request_seconds = @@ -576,31 +599,19 @@ void FleetUpdateHandle::Implementation::bid_notice_cb( "%s", debug_stream.str().c_str()); - // Map robot index to name to populate robot_name in BidProposal - std::unordered_map robot_name_map; - std::size_t index = 0; - for (const auto& t : self->_pimpl->task_managers) - { - robot_name_map.insert({index, t.first->name()}); - ++index; - } - std::optional robot_name; std::optional finish_time; - index = 0; - for (const auto& agent : assignments) + for (const auto& [context, queue] : assignments) { - for (const auto& assignment : agent) + for (const auto& assignment : queue) { if (assignment.request()->booking()->id() == task_id) { finish_time = assignment.finish_state().time().value(); - if (robot_name_map.find(index) != robot_name_map.end()) - robot_name = robot_name_map[index]; + robot_name = context->name(); break; } } - ++index; } if (!robot_name.has_value() || !finish_time.has_value()) @@ -660,6 +671,7 @@ void FleetUpdateHandle::Implementation::dispatch_command_cb( dispatch_ack.dispatch_id = msg->dispatch_id; if (msg->type == DispatchCmdMsg::TYPE_AWARD) { + last_bid_assignment = task_id; const auto task_it = bid_notice_assignments.find(task_id); if (task_it == bid_notice_assignments.end()) { @@ -715,23 +727,6 @@ void FleetUpdateHandle::Implementation::dispatch_command_cb( auto assignments = std::move(task_it->second); bid_notice_assignments.erase(task_it); - if (assignments.size() != task_managers.size()) - { - // FIXME(MXG): This error mode seems like a problem with our - // implementation. If a robot is added during a bid process, we could - // trigger this error even though it shouldn't actually be a problem. - - std::string error_str = - "The number of available robots does not match that in the assignments " - "for task_id [" + task_id + "]. This request will be ignored."; - RCLCPP_ERROR(node->get_logger(), "%s", error_str.c_str()); - dispatch_ack.errors.push_back( - make_error_str(13, "Internal bug", std::move(error_str))); - - dispatch_ack_pub->publish(dispatch_ack); - return; - } - // Here we make sure none of the tasks in the assignments has already begun // execution. If so, we replan assignments until a valid set is obtained // and only then update the task manager queues @@ -739,9 +734,9 @@ void FleetUpdateHandle::Implementation::dispatch_command_cb( if (!valid_assignments) { rmf_task::ConstRequestPtr request; - for (const auto& a : assignments) + for (const auto& [_, queue] : assignments) { - for (const auto& r : a) + for (const auto& r : queue) { if (r.request()->booking()->id() == task_id) { @@ -768,42 +763,71 @@ void FleetUpdateHandle::Implementation::dispatch_command_cb( return; } - // TODO: This replanning is blocking the main thread. Instead, the - // replanning should run on a separate worker and then deliver the - // result back to the main worker. - const auto replan_results = AllocateTasks( - nullptr, aggregate_expectations(), *task_planner, node) - .run(dispatch_ack.errors); + unassigned_requests.push_back(request); + reassign_dispatched_tasks( + // On success + [dispatch_ack, dispatch_ack_pub = dispatch_ack_pub]() + { + auto msg = dispatch_ack; + msg.success = true; + dispatch_ack_pub->publish(dispatch_ack); + }, + // On failure + [dispatch_ack, w = weak_self, request]( + std::vector errors) + { + const auto self = w.lock(); + if (!self) + return; - if (!replan_results) - { - std::string error_str = - "Unable to replan assignments when accommodating task_id [" + task_id - + "]. This request will be ignored."; + auto r_it = std::remove( + self->_pimpl->unassigned_requests.begin(), + self->_pimpl->unassigned_requests.end(), + request); - RCLCPP_ERROR(node->get_logger(), "%s", error_str.c_str()); - dispatch_ack.errors.push_back( - make_error_str(9, "Not feasible", std::move(error_str))); - dispatch_ack_pub->publish(dispatch_ack); - return; - } + self->_pimpl->unassigned_requests.erase( + r_it, self->_pimpl->unassigned_requests.end()); + + std::string error_str = + "Unable to replan assignments when accommodating task_id [" + + request->booking()->id() + "]. Reasons:"; + if (errors.empty()) + { + error_str += " No reason given by planner."; + } + else + { + for (const auto& e : errors) + { + error_str += "\n -- " + e; + } + } - assignments = replan_results.value(); - // We do not need to re-check if assignments are valid as this function - // is being called by the ROS2 executor and is running on the main - // rxcpp worker. Hence, no new tasks would have started during this - // replanning. + RCLCPP_ERROR( + self->_pimpl->node->get_logger(), + "%s", + error_str.c_str()); + auto msg = dispatch_ack; + msg.success = false; + msg.errors.push_back( + make_error_str(9, "Not feasible", std::move(error_str))); + self->_pimpl->dispatch_ack_pub->publish(msg); + }); } else { - std::size_t index = 0; - for (auto& t : task_managers) + for (const auto& [context, queue] : assignments) { - t.second->set_queue(assignments[index]); - ++index; + context->task_manager()->set_queue(queue); } - current_assignment_cost = task_planner->compute_cost(assignments); + // Any unassigned requests would have been collected by the aggregator and + // given an assignment while calculating this bid. As long as + // is_valid_assignments was able to pass, then all tasks have been + // assigned to properly commissioned robots. + unassigned_requests.clear(); + + current_assignment_cost = compute_cost(assignments); dispatch_ack.success = true; dispatch_ack_pub->publish(dispatch_ack); @@ -836,52 +860,39 @@ void FleetUpdateHandle::Implementation::dispatch_command_cb( if (task_was_found) { - // Re-plan assignments while ignoring request for task to be cancelled - std::vector errors; - const auto replan_results = AllocateTasks( - nullptr, aggregate_expectations(), *task_planner, node) - .run(errors); - - if (!replan_results.has_value()) - { - std::stringstream ss; - ss << "Unabled to replan assignments when cancelling task [" - << task_id << "]. "; - if (errors.empty()) + reassign_dispatched_tasks( + // On success + [task_id, name = name, node = node]() { - ss << "No planner error messages were provided."; - } - else + RCLCPP_INFO( + node->get_logger(), + "Task with task_id [%s] has successfully been cancelled. " + "TaskAssignments updated for robots in fleet [%s].", + task_id.c_str(), name.c_str()); + }, + // On failure + [task_id, name = name, node = node](std::vector errors) { - ss << "The following planner errors occurred:"; - for (const auto& e : errors) + std::stringstream ss; + ss << "Unabled to replan assignments when cancelling task [" + << task_id << "] for fleet [" << name << "]. "; + if (errors.empty()) { - const auto err = nlohmann::json::parse(e); - ss << "\n -- " << err["detail"].get(); + ss << "No planner error messages were provided."; } - } - ss << "\n"; - - RCLCPP_WARN(node->get_logger(), "%s", ss.str().c_str()); - } - else - { - const auto& assignments = replan_results.value(); - std::size_t index = 0; - for (auto& t : task_managers) - { - t.second->set_queue(assignments[index]); - ++index; - } - - current_assignment_cost = task_planner->compute_cost(assignments); + else + { + ss << "The following planner errors occurred:"; + for (const auto& e : errors) + { + const auto err = nlohmann::json::parse(e); + ss << "\n -- " << err["detail"].get(); + } + } + ss << "\n"; - RCLCPP_INFO( - node->get_logger(), - "Task with task_id [%s] has successfully been cancelled. TaskAssignments " - "updated for robots in fleet [%s].", - task_id.c_str(), name.c_str()); - } + RCLCPP_WARN(node->get_logger(), "%s", ss.str().c_str()); + }); } } @@ -899,9 +910,24 @@ void FleetUpdateHandle::Implementation::dispatch_command_cb( } } +//============================================================================== +double FleetUpdateHandle::Implementation::compute_cost( + const TaskAssignments& assignments) const +{ + rmf_task::TaskPlanner::Assignments raw_assignments; + raw_assignments.reserve(assignments.size()); + for (const auto [_, queue] : assignments) + { + raw_assignments.push_back(queue); + } + + return task_planner->compute_cost(raw_assignments); +} + //============================================================================== auto FleetUpdateHandle::Implementation::is_valid_assignments( - TaskAssignments& assignments) const -> bool + TaskAssignments& assignments, + std::string* report_error) const -> bool { std::unordered_set executed_tasks; for (const auto& [context, mgr] : task_managers) @@ -910,19 +936,171 @@ auto FleetUpdateHandle::Implementation::is_valid_assignments( executed_tasks.insert(tasks.begin(), tasks.end()); } - for (const auto& agent : assignments) + for (const auto& [context, queue] : assignments) { - for (const auto& a : agent) + for (const auto& a : queue) { if (executed_tasks.find(a.request()->booking()->id()) != executed_tasks.end()) + { + if (report_error) + { + *report_error = "task [" + a.request()->booking()->id() + + "] already began executing"; + } + return false; + } + } + + if (!queue.empty()) + { + if (!context->commission().is_accepting_dispatched_tasks()) + { + if (report_error) + { + *report_error = "robot [" + context->name() + + "] has been decommissioned"; + } + // If any tasks were assigned to this robot after it has been + // decommissioned, then those were invalid assignments. + return false; + } + + if (task_managers.find(context) == task_managers.end()) + { + if (report_error) + { + *report_error = "robot [" + context->name() + + "] has been removed from the fleet"; + } + // This robot is no longer part of the fleet, so we cannot assign + // tasks to it. return false; + } } } return true; } +//============================================================================== +void FleetUpdateHandle::Implementation::reassign_dispatched_tasks( + std::function on_success, + std::function)> on_failure) +{ + auto expectations = aggregate_expectations(); + if (expectations.states.empty() && !expectations.pending_requests.empty()) + { + // If there are no robots that can perform any tasks but there are pending + // requests, then we should immediately assume failure. + worker.schedule( + [on_failure](const auto&) + { + on_failure({"no more robots available"}); + }); + return; + } + + auto on_plan_received = [ + on_success, + on_failure, + w = weak_self, + initial_last_bid_assignment = last_bid_assignment + ](TaskAssignments assignments) + { + const auto self = w.lock(); + if (!self) + return; + + const bool new_task_came_in = + initial_last_bid_assignment != self->_pimpl->last_bid_assignment; + + std::string error; + const bool valid_assignments = + !new_task_came_in + && self->_pimpl->is_valid_assignments(assignments, &error); + + if (!valid_assignments) + { + // We need to replan because some important aspect of the planning + // problem has changed since we originally tried to reassign. + if (new_task_came_in) + { + RCLCPP_WARN( + self->_pimpl->node->get_logger(), + "Redoing task reassignment for fleet [%s] because a new task was " + "dispatched while the reassignment was being calculated.", + self->_pimpl->name.c_str()); + } + else + { + RCLCPP_WARN( + self->_pimpl->node->get_logger(), + "Redoing task reassignment for fleet [%s] because %s.", + self->_pimpl->name.c_str(), + error.c_str()); + } + + self->_pimpl->reassign_dispatched_tasks(on_success, on_failure); + return; + } + + for (const auto& [context, queue] : assignments) + { + context->task_manager()->set_queue(queue); + } + + // All requests should be assigned now, no matter which robot they were + // originally assigned to, so we can clear this buffer. + // + // As long as no new tasks were dispatched to this fleet (verified by the + // new_task_came_in check), any tasks that might have been previously + // unassigned will be accounted for in the current set of assignments. + self->_pimpl->unassigned_requests.clear(); + + self->_pimpl->current_assignment_cost = + self->_pimpl->compute_cost(assignments); + + on_success(); + }; + + reassignment_worker.schedule( + [ + on_plan_received, + on_failure, + expectations = std::move(expectations), + task_planner = *task_planner, + node = node, + worker = worker + ](const auto&) + { + std::vector errors; + const auto replan_results = AllocateTasks( + nullptr, expectations, task_planner, node) + .run(errors); + + if (replan_results.has_value()) + { + worker.schedule( + [ + assignments = std::move(*replan_results), + on_plan_received + ](const auto&) + { + on_plan_received(assignments); + }); + } + else + { + worker.schedule( + [errors = std::move(errors), on_failure](const auto&) + { + on_failure(errors); + }); + } + }); +} + //============================================================================== std::optional FleetUpdateHandle::Implementation:: get_nearest_charger( @@ -1097,6 +1275,16 @@ void FleetUpdateHandle::Implementation::update_fleet_state() const issue_msg["detail"] = issue->detail; issues_msg.push_back(std::move(issue_msg)); } + + const auto& commission = context->commission(); + nlohmann::json commission_json; + commission_json["dispatch_tasks"] = + commission.is_accepting_dispatched_tasks(); + commission_json["direct_tasks"] = commission.is_accepting_direct_tasks(); + commission_json["idle_behavior"] = + commission.is_performing_idle_behavior(); + + json["commission"] = commission_json; } try @@ -1499,15 +1687,24 @@ auto FleetUpdateHandle::Implementation::aggregate_expectations() const for (const auto& t : task_managers) { // Ignore any robots that are not currently commissioned. - if (!t.first->is_commissioned()) + if (!t.first->commission().is_accepting_dispatched_tasks()) continue; - expect.states.push_back(t.second->expected_finish_state()); - const auto requests = t.second->requests(); + expect.states.push_back( + ExpectedState { + t.first, + t.second->expected_finish_state() + }); + const auto requests = t.second->dispatched_requests(); expect.pending_requests.insert( expect.pending_requests.end(), requests.begin(), requests.end()); } + expect.pending_requests.insert( + expect.pending_requests.end(), + unassigned_requests.begin(), + unassigned_requests.end()); + return expect; } diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/RobotContext.cpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/RobotContext.cpp index 3f0da0358..5fc732efb 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/RobotContext.cpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/RobotContext.cpp @@ -971,21 +971,24 @@ std::shared_ptr RobotContext::task_manager() } //============================================================================== -bool RobotContext::is_commissioned() const +void RobotContext::set_commission(RobotUpdateHandle::Commission value) { - return _commissioned; + std::lock_guard lock(*_commission_mutex); + _commission = std::move(value); } //============================================================================== -void RobotContext::decommission() +const RobotUpdateHandle::Commission& RobotContext::commission() const { - _commissioned = false; + return _commission; } //============================================================================== -void RobotContext::recommission() +RobotUpdateHandle::Commission RobotContext::copy_commission() const { - _commissioned = true; + std::lock_guard lock(*_commission_mutex); + RobotUpdateHandle::Commission copy = _commission; + return copy; } //============================================================================== diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/RobotContext.hpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/RobotContext.hpp index da39cb27c..b20559015 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/RobotContext.hpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/RobotContext.hpp @@ -54,6 +54,7 @@ class TaskManager; namespace agv { +class FleetUpdateHandle; class RobotContext; using TransformDictionary = std::unordered_map; using SharedPlanner = std::shared_ptr< @@ -636,13 +637,18 @@ class RobotContext /// Get the task manager for this robot, if it exists. std::shared_ptr task_manager(); - /// Return true if this robot is currently commissioned (available to accept - /// new tasks). - bool is_commissioned() const; + /// Set the commission for this robot + void set_commission(RobotUpdateHandle::Commission value); - void decommission(); + /// Get a reference to the robot's commission. + const RobotUpdateHandle::Commission& commission() const; - void recommission(); + /// Lock the commission_mutex and return a copy of the robot's current + /// commission. + RobotUpdateHandle::Commission copy_commission() const; + + /// Reassign the tasks that have been dispatched for this robot + void reassign_dispatched_tasks(); Reporting& reporting(); @@ -858,7 +864,9 @@ class RobotContext RobotUpdateHandle::Unstable::Watchdog _lift_watchdog; rmf_traffic::Duration _lift_rewait_duration = std::chrono::seconds(0); - bool _commissioned = true; + std::unique_ptr _commission_mutex = + std::make_unique(); + RobotUpdateHandle::Commission _commission; bool _emergency = false; EasyFullControl::LocalizationRequest _localize; diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/RobotUpdateHandle.cpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/RobotUpdateHandle.cpp index 46fbb44f1..68a46eca0 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/RobotUpdateHandle.cpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/RobotUpdateHandle.cpp @@ -720,6 +720,100 @@ void RobotUpdateHandle::release_lift() }); } +//============================================================================== +class RobotUpdateHandle::Commission::Implementation +{ +public: + bool is_accepting_dispatched_tasks = true; + bool is_accepting_direct_tasks = true; + bool is_performing_idle_behavior = true; +}; + +//============================================================================== +RobotUpdateHandle::Commission::Commission() +: _pimpl(rmf_utils::make_impl()) +{ + // Do nothing +} + +//============================================================================== +auto RobotUpdateHandle::Commission::decommission() -> Commission +{ + return Commission() + .accept_dispatched_tasks(false) + .accept_direct_tasks(false) + .perform_idle_behavior(false); +} + +//============================================================================== +auto RobotUpdateHandle::Commission::accept_dispatched_tasks(bool decision) +-> Commission& +{ + _pimpl->is_accepting_dispatched_tasks = decision; + return *this; +} + +//============================================================================== +bool RobotUpdateHandle::Commission::is_accepting_dispatched_tasks() const +{ + return _pimpl->is_accepting_dispatched_tasks; +} + +//============================================================================== +auto RobotUpdateHandle::Commission::accept_direct_tasks(bool decision) +-> Commission& +{ + _pimpl->is_accepting_direct_tasks = decision; + return *this; +} + +//============================================================================== +bool RobotUpdateHandle::Commission::is_accepting_direct_tasks() const +{ + return _pimpl->is_accepting_direct_tasks; +} + +//============================================================================== +auto RobotUpdateHandle::Commission::perform_idle_behavior(bool decision) +-> Commission& +{ + _pimpl->is_performing_idle_behavior = decision; + return *this; +} + +//============================================================================== +bool RobotUpdateHandle::Commission::is_performing_idle_behavior() const +{ + return _pimpl->is_performing_idle_behavior; +} + +//============================================================================== +void RobotUpdateHandle::set_commission(Commission commission) +{ + _pimpl->set_commission(std::move(commission)); +} + +//============================================================================== +auto RobotUpdateHandle::commission() const -> Commission +{ + return _pimpl->commission(); +} + +//============================================================================== +void RobotUpdateHandle::reassign_dispatched_tasks() +{ + if (const auto context = _pimpl->get_context()) + { + context->worker().schedule( + [context](const auto&) + { + const auto mgr = context->task_manager(); + if (mgr) + mgr->reassign_dispatched_requests([]() {}, [](auto) {}); + }); + } +} + //============================================================================== RobotUpdateHandle::RobotUpdateHandle() { @@ -742,7 +836,7 @@ const RobotUpdateHandle::Unstable& RobotUpdateHandle::unstable() const bool RobotUpdateHandle::Unstable::is_commissioned() const { if (const auto context = _pimpl->get_context()) - return context->is_commissioned(); + return context->copy_commission().is_accepting_dispatched_tasks(); return false; } @@ -750,29 +844,17 @@ bool RobotUpdateHandle::Unstable::is_commissioned() const //============================================================================== void RobotUpdateHandle::Unstable::decommission() { - if (const auto context = _pimpl->get_context()) - { - context->worker().schedule( - [w = context->weak_from_this()](const auto&) - { - if (const auto context = w.lock()) - context->decommission(); - }); - } + _pimpl->set_commission( + _pimpl->commission() + .accept_dispatched_tasks(false)); } //============================================================================== void RobotUpdateHandle::Unstable::recommission() { - if (const auto context = _pimpl->get_context()) - { - context->worker().schedule( - [w = context->weak_from_this()](const auto&) - { - if (const auto context = w.lock()) - context->recommission(); - }); - } + _pimpl->set_commission( + _pimpl->commission() + .accept_dispatched_tasks(true)); } //============================================================================== diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/internal_FleetUpdateHandle.hpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/internal_FleetUpdateHandle.hpp index 22369f3ff..2e218e882 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/internal_FleetUpdateHandle.hpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/internal_FleetUpdateHandle.hpp @@ -68,6 +68,7 @@ #include #include #include +#include #include @@ -233,16 +234,27 @@ class ParticipantFactoryRos2 : public ParticipantFactory rmf_traffic_ros2::schedule::WriterPtr _writer; }; +//============================================================================== +struct ExpectedState +{ + RobotContextPtr context; + rmf_task::State state; +}; + //============================================================================== struct Expectations { - std::vector states; + std::vector states; std::vector pending_requests; }; //============================================================================== -// Map task id to pair of -using TaskAssignments = rmf_task::TaskPlanner::Assignments; +// Map from the robot instance to its proposed assignment of tasks +using TaskAssignments = std::unordered_map< + RobotContextPtr, + std::vector>; + +// Forward declaration of a task allocation job class AllocateTasks; //============================================================================== @@ -330,6 +342,13 @@ class FleetUpdateHandle::Implementation // Map to store task id with assignments for BidNotice std::unordered_map bid_notice_assignments = {}; + // This is checked before and after the task reassignment procedure to ensure + // that no new task came in from the dispatcher while the reassignment was + // being calculated. + std::string last_bid_assignment; + std::vector unassigned_requests; + rxcpp::schedulers::worker reassignment_worker; + using BidNoticeMsg = rmf_task_msgs::msg::BidNotice; using DispatchCmdMsg = rmf_task_msgs::msg::DispatchCommand; @@ -364,6 +383,9 @@ class FleetUpdateHandle::Implementation handle->_pimpl = rmf_utils::make_unique_impl( Implementation{handle, std::forward(args)...}); + handle->_pimpl->reassignment_worker = + rxcpp::schedulers::make_event_loop().create_worker(); + handle->_pimpl->add_standard_tasks(); handle->_pimpl->emergency_obs = @@ -479,7 +501,8 @@ class FleetUpdateHandle::Implementation rmf_api_msgs::schemas::robot_state, rmf_api_msgs::schemas::location_2D, rmf_api_msgs::schemas::fleet_log, - rmf_api_msgs::schemas::log_entry + rmf_api_msgs::schemas::log_entry, + rmf_api_msgs::schemas::commission, }; for (const auto& schema : schemas) @@ -617,6 +640,8 @@ class FleetUpdateHandle::Implementation void dispatch_command_cb(const DispatchCmdMsg::SharedPtr msg); + double compute_cost(const TaskAssignments& assignments) const; + std::optional get_nearest_charger( const rmf_traffic::agv::Planner::Start& start); @@ -624,7 +649,14 @@ class FleetUpdateHandle::Implementation /// Helper function to check if assignments are valid. An assignment set is /// invalid if one of the assignments has already begun execution. - bool is_valid_assignments(TaskAssignments& assignments) const; + bool is_valid_assignments( + TaskAssignments& assignments, + std::string* report_error = nullptr) const; + + void reassign_dispatched_tasks( + std::function on_success, + // argument is a vector of error messages from the planner + std::function)> on_failure); void publish_fleet_state_topic() const; diff --git a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/internal_RobotUpdateHandle.hpp b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/internal_RobotUpdateHandle.hpp index b95dc85dc..7e6bc133d 100644 --- a/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/internal_RobotUpdateHandle.hpp +++ b/rmf_fleet_adapter/src/rmf_fleet_adapter/agv/internal_RobotUpdateHandle.hpp @@ -299,6 +299,32 @@ class RobotUpdateHandle::Implementation return *handle._pimpl; } + void set_commission(Commission commission) + { + if (const auto context = get_context()) + { + context->worker().schedule( + [w = context->weak_from_this(), commission = std::move(commission)]( + const auto&) + { + if (const auto context = w.lock()) + { + context->set_commission(commission); + } + }); + } + } + + Commission commission() const + { + if (const auto context = get_context()) + { + return context->copy_commission(); + } + + return Commission::decommission(); + } + std::shared_ptr get_context(); std::shared_ptr get_context() const; diff --git a/rmf_fleet_adapter_python/src/adapter.cpp b/rmf_fleet_adapter_python/src/adapter.cpp index 0381dd164..fc1bace85 100644 --- a/rmf_fleet_adapter_python/src/adapter.cpp +++ b/rmf_fleet_adapter_python/src/adapter.cpp @@ -61,6 +61,7 @@ using ActivityIdentifier = agv::RobotUpdateHandle::ActivityIdentifier; using ActionExecution = agv::RobotUpdateHandle::ActionExecution; using RobotInterruption = agv::RobotUpdateHandle::Interruption; using IssueTicket = agv::RobotUpdateHandle::IssueTicket; +using Commission = agv::RobotUpdateHandle::Commission; using Stubbornness = agv::RobotUpdateHandle::Unstable::Stubbornness; void bind_types(py::module&); @@ -291,7 +292,14 @@ PYBIND11_MODULE(rmf_adapter, m) { py::arg("text")) .def("enable_responsive_wait", &agv::RobotUpdateHandle::enable_responsive_wait, - py::arg("value")); + py::arg("value")) + .def("set_commission", + &agv::RobotUpdateHandle::set_commission, + py::arg("commission")) + .def("commission", + &agv::RobotUpdateHandle::commission) + .def("reassign_dispatched_tasks", + &agv::RobotUpdateHandle::reassign_dispatched_tasks); // ACTION EXECUTOR ======================================================= auto m_robot_update_handle = m.def_submodule("robot_update_handle"); @@ -354,6 +362,22 @@ PYBIND11_MODULE(rmf_adapter, m) { .value("Warning", agv::RobotUpdateHandle::Tier::Warning) .value("Error", agv::RobotUpdateHandle::Tier::Error); + // Commission ====================================================== + py::class_( + m_robot_update_handle, "Commission") + .def_property( + "accept_dispatched_tasks", + &Commission::is_accepting_dispatched_tasks, + &Commission::accept_dispatched_tasks) + .def_property( + "accept_direct_tasks", + &Commission::is_accepting_direct_tasks, + &Commission::accept_direct_tasks) + .def_property( + "perform_idle_behavior", + &Commission::is_performing_idle_behavior, + &Commission::perform_idle_behavior); + // Stubbornness ============================================================ py::class_( m_robot_update_handle, "Stubbornness")