diff --git a/rclcpp_components/include/rclcpp_components/component_manager.hpp b/rclcpp_components/include/rclcpp_components/component_manager.hpp index 34b2ea4d39..62834efa81 100644 --- a/rclcpp_components/include/rclcpp_components/component_manager.hpp +++ b/rclcpp_components/include/rclcpp_components/component_manager.hpp @@ -78,6 +78,21 @@ class ComponentManagerException : public std::runtime_error /// ComponentManager handles the services to load, unload, and get the list of loaded components. class ComponentManager : public rclcpp::Node { + struct DedicatedExecutorWrapper + { + std::shared_ptr executor; + std::thread thread; + std::atomic_bool thread_initialized; + + /// Constructor for the wrapper. + /// This is necessary as atomic variables don't have copy/move operators + /// implemented so this structure is not copyable/movable by default + explicit DedicatedExecutorWrapper(std::shared_ptr exec) + : executor(exec), thread_initialized(false) + { + } + }; + public: using LoadNode = composition_interfaces::srv::LoadNode; using UnloadNode = composition_interfaces::srv::UnloadNode; @@ -225,6 +240,16 @@ class ComponentManager : public rclcpp::Node rclcpp::Service::SharedPtr loadNode_srv_; rclcpp::Service::SharedPtr unloadNode_srv_; rclcpp::Service::SharedPtr listNodes_srv_; + +private: + /// Stops a spinning executor avoiding race conditions. + /** + * @param executor_wrapper executor to stop and its associated thread + */ + void + cancel_executor(DedicatedExecutorWrapper & executor_wrapper); + + std::unordered_map dedicated_executor_wrappers_; }; } // namespace rclcpp_components diff --git a/rclcpp_components/include/rclcpp_components/component_manager_isolated.hpp b/rclcpp_components/include/rclcpp_components/component_manager_isolated.hpp index dfe0c79a32..c5c4af383e 100644 --- a/rclcpp_components/include/rclcpp_components/component_manager_isolated.hpp +++ b/rclcpp_components/include/rclcpp_components/component_manager_isolated.hpp @@ -37,7 +37,12 @@ namespace rclcpp_components { /// ComponentManagerIsolated uses dedicated single-threaded executors for each components. template -class ComponentManagerIsolated : public rclcpp_components::ComponentManager +class +[[deprecated( + "ComponentManagerIsolated is deprecated. Use ComponentManager, and specify the type of executor " + "you want to use for each node with the 'executor_type' parameter of " + "load_node.")]] +ComponentManagerIsolated : public rclcpp_components::ComponentManager { using rclcpp_components::ComponentManager::ComponentManager; diff --git a/rclcpp_components/include/rclcpp_components/node_instance_wrapper.hpp b/rclcpp_components/include/rclcpp_components/node_instance_wrapper.hpp index 2705e4e8b3..22d6bf5387 100644 --- a/rclcpp_components/include/rclcpp_components/node_instance_wrapper.hpp +++ b/rclcpp_components/include/rclcpp_components/node_instance_wrapper.hpp @@ -22,6 +22,14 @@ namespace rclcpp_components { + +enum class ExecutorType +{ + Shared, + SingleThreaded, + MultiThreaded, +}; + /// The NodeInstanceWrapper encapsulates the node instance. class NodeInstanceWrapper { @@ -35,9 +43,15 @@ class NodeInstanceWrapper NodeInstanceWrapper( std::shared_ptr node_instance, - NodeBaseInterfaceGetter node_base_interface_getter) - : node_instance_(node_instance), node_base_interface_getter_(node_base_interface_getter) - {} + NodeBaseInterfaceGetter node_base_interface_getter, + ExecutorType executor_type = ExecutorType::Shared, + int thread_num = 0) + : executor_type_(executor_type), + thread_num_(thread_num), + node_instance_(node_instance), + node_base_interface_getter_(node_base_interface_getter) + { + } /// Get a type-erased pointer to the original Node instance /** @@ -62,7 +76,33 @@ class NodeInstanceWrapper return node_base_interface_getter_(node_instance_); } + ExecutorType + get_executor_type() + { + return executor_type_; + } + + int + get_thread_num() + { + return thread_num_; + } + + void + set_executor_type(ExecutorType executor_type) + { + executor_type_ = executor_type; + } + + void + set_thread_num(int thread_num) + { + thread_num_ = thread_num; + } + private: + ExecutorType executor_type_; + int thread_num_; std::shared_ptr node_instance_; NodeBaseInterfaceGetter node_base_interface_getter_; }; diff --git a/rclcpp_components/src/component_container_isolated.cpp b/rclcpp_components/src/component_container_isolated.cpp index dbb4c134eb..6436026d03 100644 --- a/rclcpp_components/src/component_container_isolated.cpp +++ b/rclcpp_components/src/component_container_isolated.cpp @@ -21,8 +21,11 @@ #include "rclcpp/utilities.hpp" #include "rclcpp_components/component_manager_isolated.hpp" - -int main(int argc, char * argv[]) +[[deprecated( + "component_container_isolated is deprecated. Use component_container, and specify the type of " + "executor you want to use for each node with the 'executor_type' parameter of load_node.")]] +int +main(int argc, char * argv[]) { /// Component container with dedicated single-threaded executors for each components. rclcpp::init(argc, argv); @@ -46,6 +49,11 @@ int main(int argc, char * argv[]) rclcpp_components::ComponentManagerIsolated; node = std::make_shared(exec); } + + RCLCPP_WARN(node->get_logger(), + "component_container_isolated is deprecated. Use component_container, and specify the type of " + "executor you want to use for each node with the 'executor_type' parameter of load_node."); + exec->add_node(node); exec->spin(); diff --git a/rclcpp_components/src/component_manager.cpp b/rclcpp_components/src/component_manager.cpp index ac2df6d812..8ef956f95f 100644 --- a/rclcpp_components/src/component_manager.cpp +++ b/rclcpp_components/src/component_manager.cpp @@ -37,6 +37,7 @@ #include "rcl_interfaces/msg/integer_range.hpp" #include "rcpputils/split.hpp" +#include "rcpputils/thread_name.hpp" #include "rmw/types.h" @@ -83,9 +84,19 @@ ComponentManager::~ComponentManager() RCLCPP_DEBUG(get_logger(), "Removing components from executor"); if (auto exec = executor_.lock()) { for (auto & wrapper : node_wrappers_) { - exec->remove_node(wrapper.second.get_node_base_interface()); + if (wrapper.second.get_executor_type() == ExecutorType::Shared) { + RCLCPP_DEBUG(get_logger(), "Removing node %lu from shared executor.", wrapper.first); + exec->remove_node(wrapper.second.get_node_base_interface()); + } } } + + for (auto & executor_wrapper : dedicated_executor_wrappers_) { + RCLCPP_DEBUG(get_logger(), "Removing node %lu's executor.", executor_wrapper.first); + cancel_executor(executor_wrapper.second); + } + + node_wrappers_.clear(); } } @@ -248,16 +259,63 @@ ComponentManager::set_executor(const std::weak_ptr executor) void ComponentManager::add_node_to_executor(uint64_t node_id) { - if (auto exec = executor_.lock()) { - exec->add_node(node_wrappers_[node_id].get_node_base_interface(), true); + if (node_wrappers_[node_id].get_executor_type() == ExecutorType::Shared) { + if (auto exec = executor_.lock()) { + exec->add_node(node_wrappers_[node_id].get_node_base_interface(), true); + } + return; } + + std::shared_ptr exec; + switch (node_wrappers_[node_id].get_executor_type()) { + case ExecutorType::Shared: + break; // Unreachable + case ExecutorType::SingleThreaded: + exec = std::make_shared(); + break; + case ExecutorType::MultiThreaded: + exec = std::make_shared( + rclcpp::ExecutorOptions(), node_wrappers_[node_id].get_thread_num()); + break; + } + + exec->add_node(node_wrappers_[node_id].get_node_base_interface()); + + // Emplace rather than std::move since move operations are deleted for atomics + auto result = dedicated_executor_wrappers_.emplace(std::make_pair(node_id, exec)); + DedicatedExecutorWrapper & wrapper = result.first->second; + wrapper.executor = exec; + auto & thread_initialized = wrapper.thread_initialized; + auto name = node_wrappers_[node_id].get_node_base_interface()->get_name(); + // Copy name so that it doesn't deallocate before the thread is started + wrapper.thread = std::thread([exec, &thread_initialized, name]() { + try { + rcpputils::set_thread_name(name); + } catch (const std::system_error & e) { + RCLCPP_WARN(rclcpp::get_logger("rclcpp"), + "Failed to set thread name: %s (%s)", + e.what(), + e.code().message().c_str()); + } + thread_initialized = true; + exec->spin(); + }); } void ComponentManager::remove_node_from_executor(uint64_t node_id) { - if (auto exec = executor_.lock()) { - exec->remove_node(node_wrappers_[node_id].get_node_base_interface()); + if (node_wrappers_[node_id].get_executor_type() == ExecutorType::Shared) { + if (auto exec = executor_.lock()) { + exec->remove_node(node_wrappers_[node_id].get_node_base_interface()); + } + return; + } + + auto executor_wrapper = dedicated_executor_wrappers_.find(node_id); + if (executor_wrapper != dedicated_executor_wrappers_.end()) { + cancel_executor(executor_wrapper->second); + dedicated_executor_wrappers_.erase(executor_wrapper); } } @@ -307,6 +365,18 @@ ComponentManager::on_load_node( throw ComponentManagerException("Component constructor threw an exception"); } + if (request->executor_type == "") { + node_wrappers_[node_id].set_executor_type(ExecutorType::Shared); + } else if (request->executor_type == "SingleThreadedExecutor") { + node_wrappers_[node_id].set_executor_type(ExecutorType::SingleThreaded); + } else if (request->executor_type == "MultiThreadedExecutor") { + node_wrappers_[node_id].set_executor_type(ExecutorType::MultiThreaded); + node_wrappers_[node_id].set_thread_num(request->thread_num); + } else { + node_wrappers_.erase(node_id); + throw ComponentManagerException("Invalid executor_type: " + request->executor_type); + } + add_node_to_executor(node_id); auto node = node_wrappers_[node_id].get_node_base_interface(); @@ -362,4 +432,31 @@ ComponentManager::on_list_nodes( } } +void +ComponentManager::cancel_executor(DedicatedExecutorWrapper & executor_wrapper) +{ + // Verify that the executor thread has begun spinning. + // If it has not, then wait until the thread starts to ensure + // that cancel() will fully stop the execution + // + // This prevents a previous race condition that occurs between the + // creation of the executor spin thread and cancelling an executor + + if (!executor_wrapper.thread_initialized) { + auto context = this->get_node_base_interface()->get_context(); + + // Guarantee that either the executor is spinning or we are shutting down. + while (!executor_wrapper.executor->is_spinning() && rclcpp::ok(context)) { + // This is an arbitrarily small delay to avoid busy looping + rclcpp::sleep_for(std::chrono::milliseconds(1)); + } + } + + // After the while loop we are sure that the executor is now spinning, so + // the call to cancel() will work. + executor_wrapper.executor->cancel(); + // Wait for the thread task to return + executor_wrapper.thread.join(); +} + } // namespace rclcpp_components