Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions rclcpp_components/include/rclcpp_components/component_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,21 @@ class ComponentManagerException : public std::runtime_error
/// ComponentManager handles the services to load, unload, and get the list of loaded components.
class ComponentManager : public rclcpp::Node
{
struct DedicatedExecutorWrapper
{
std::shared_ptr<rclcpp::Executor> executor;
std::thread thread;
std::atomic_bool thread_initialized;

/// Constructor for the wrapper.
/// This is necessary as atomic variables don't have copy/move operators
/// implemented so this structure is not copyable/movable by default
explicit DedicatedExecutorWrapper(std::shared_ptr<rclcpp::Executor> exec)
: executor(exec), thread_initialized(false)
{
}
};

public:
using LoadNode = composition_interfaces::srv::LoadNode;
using UnloadNode = composition_interfaces::srv::UnloadNode;
Expand Down Expand Up @@ -225,6 +240,16 @@ class ComponentManager : public rclcpp::Node
rclcpp::Service<LoadNode>::SharedPtr loadNode_srv_;
rclcpp::Service<UnloadNode>::SharedPtr unloadNode_srv_;
rclcpp::Service<ListNodes>::SharedPtr listNodes_srv_;

private:
/// Stops a spinning executor avoiding race conditions.
/**
* @param executor_wrapper executor to stop and its associated thread
*/
void
cancel_executor(DedicatedExecutorWrapper & executor_wrapper);

std::unordered_map<uint64_t, DedicatedExecutorWrapper> dedicated_executor_wrappers_;
};

} // namespace rclcpp_components
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,12 @@ namespace rclcpp_components
{
/// ComponentManagerIsolated uses dedicated single-threaded executors for each components.
template<typename ExecutorT = rclcpp::executors::SingleThreadedExecutor>
class ComponentManagerIsolated : public rclcpp_components::ComponentManager
class
[[deprecated(
"ComponentManagerIsolated is deprecated. Use ComponentManager, and specify the type of executor "
"you want to use for each node with the 'executor_type' parameter of "
"load_node.")]]
ComponentManagerIsolated : public rclcpp_components::ComponentManager
{
using rclcpp_components::ComponentManager::ComponentManager;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,14 @@

namespace rclcpp_components
{

enum class ExecutorType
{
Shared,
SingleThreaded,
MultiThreaded,
};

/// The NodeInstanceWrapper encapsulates the node instance.
class NodeInstanceWrapper
{
Expand All @@ -35,9 +43,15 @@ class NodeInstanceWrapper

NodeInstanceWrapper(
std::shared_ptr<void> node_instance,
NodeBaseInterfaceGetter node_base_interface_getter)
: node_instance_(node_instance), node_base_interface_getter_(node_base_interface_getter)
{}
NodeBaseInterfaceGetter node_base_interface_getter,
ExecutorType executor_type = ExecutorType::Shared,
int thread_num = 0)
: executor_type_(executor_type),
thread_num_(thread_num),
node_instance_(node_instance),
node_base_interface_getter_(node_base_interface_getter)
{
}

/// Get a type-erased pointer to the original Node instance
/**
Expand All @@ -62,7 +76,33 @@ class NodeInstanceWrapper
return node_base_interface_getter_(node_instance_);
}

ExecutorType
get_executor_type()
{
return executor_type_;
}

int
get_thread_num()
{
return thread_num_;
}

void
set_executor_type(ExecutorType executor_type)
{
executor_type_ = executor_type;
}

void
set_thread_num(int thread_num)
{
thread_num_ = thread_num;
}

private:
ExecutorType executor_type_;
int thread_num_;
std::shared_ptr<void> node_instance_;
NodeBaseInterfaceGetter node_base_interface_getter_;
};
Expand Down
12 changes: 10 additions & 2 deletions rclcpp_components/src/component_container_isolated.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@
#include "rclcpp/utilities.hpp"

#include "rclcpp_components/component_manager_isolated.hpp"

int main(int argc, char * argv[])
[[deprecated(
"component_container_isolated is deprecated. Use component_container, and specify the type of "
"executor you want to use for each node with the 'executor_type' parameter of load_node.")]]
int
main(int argc, char * argv[])
{
/// Component container with dedicated single-threaded executors for each components.
rclcpp::init(argc, argv);
Expand All @@ -46,6 +49,11 @@ int main(int argc, char * argv[])
rclcpp_components::ComponentManagerIsolated<rclcpp::executors::SingleThreadedExecutor>;
node = std::make_shared<ComponentManagerIsolated>(exec);
}

RCLCPP_WARN(node->get_logger(),
"component_container_isolated is deprecated. Use component_container, and specify the type of "
"executor you want to use for each node with the 'executor_type' parameter of load_node.");

exec->add_node(node);
exec->spin();

Expand Down
107 changes: 102 additions & 5 deletions rclcpp_components/src/component_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "rcl_interfaces/msg/integer_range.hpp"

#include "rcpputils/split.hpp"
#include "rcpputils/thread_name.hpp"

#include "rmw/types.h"

Expand Down Expand Up @@ -83,9 +84,19 @@ ComponentManager::~ComponentManager()
RCLCPP_DEBUG(get_logger(), "Removing components from executor");
if (auto exec = executor_.lock()) {
for (auto & wrapper : node_wrappers_) {
exec->remove_node(wrapper.second.get_node_base_interface());
if (wrapper.second.get_executor_type() == ExecutorType::Shared) {
RCLCPP_DEBUG(get_logger(), "Removing node %lu from shared executor.", wrapper.first);
exec->remove_node(wrapper.second.get_node_base_interface());
}
}
}

for (auto & executor_wrapper : dedicated_executor_wrappers_) {
RCLCPP_DEBUG(get_logger(), "Removing node %lu's executor.", executor_wrapper.first);
cancel_executor(executor_wrapper.second);
}

node_wrappers_.clear();
}
}

Expand Down Expand Up @@ -248,16 +259,63 @@ ComponentManager::set_executor(const std::weak_ptr<rclcpp::Executor> executor)
void
ComponentManager::add_node_to_executor(uint64_t node_id)
{
if (auto exec = executor_.lock()) {
exec->add_node(node_wrappers_[node_id].get_node_base_interface(), true);
if (node_wrappers_[node_id].get_executor_type() == ExecutorType::Shared) {
if (auto exec = executor_.lock()) {
exec->add_node(node_wrappers_[node_id].get_node_base_interface(), true);
}
return;
}

std::shared_ptr<rclcpp::Executor> exec;
switch (node_wrappers_[node_id].get_executor_type()) {
case ExecutorType::Shared:
break; // Unreachable
case ExecutorType::SingleThreaded:
exec = std::make_shared<rclcpp::executors::SingleThreadedExecutor>();
break;
case ExecutorType::MultiThreaded:
exec = std::make_shared<rclcpp::executors::MultiThreadedExecutor>(
rclcpp::ExecutorOptions(), node_wrappers_[node_id].get_thread_num());
break;
}

exec->add_node(node_wrappers_[node_id].get_node_base_interface());

// Emplace rather than std::move since move operations are deleted for atomics
auto result = dedicated_executor_wrappers_.emplace(std::make_pair(node_id, exec));
DedicatedExecutorWrapper & wrapper = result.first->second;
wrapper.executor = exec;
auto & thread_initialized = wrapper.thread_initialized;
auto name = node_wrappers_[node_id].get_node_base_interface()->get_name();
// Copy name so that it doesn't deallocate before the thread is started
wrapper.thread = std::thread([exec, &thread_initialized, name]() {
try {
rcpputils::set_thread_name(name);
} catch (const std::system_error & e) {
RCLCPP_WARN(rclcpp::get_logger("rclcpp"),
"Failed to set thread name: %s (%s)",
e.what(),
e.code().message().c_str());
}
thread_initialized = true;
exec->spin();
});
}

void
ComponentManager::remove_node_from_executor(uint64_t node_id)
{
if (auto exec = executor_.lock()) {
exec->remove_node(node_wrappers_[node_id].get_node_base_interface());
if (node_wrappers_[node_id].get_executor_type() == ExecutorType::Shared) {
if (auto exec = executor_.lock()) {
exec->remove_node(node_wrappers_[node_id].get_node_base_interface());
}
return;
}

auto executor_wrapper = dedicated_executor_wrappers_.find(node_id);
if (executor_wrapper != dedicated_executor_wrappers_.end()) {
cancel_executor(executor_wrapper->second);
dedicated_executor_wrappers_.erase(executor_wrapper);
}
}

Expand Down Expand Up @@ -307,6 +365,18 @@ ComponentManager::on_load_node(
throw ComponentManagerException("Component constructor threw an exception");
}

if (request->executor_type == "") {
node_wrappers_[node_id].set_executor_type(ExecutorType::Shared);
} else if (request->executor_type == "SingleThreadedExecutor") {
node_wrappers_[node_id].set_executor_type(ExecutorType::SingleThreaded);
} else if (request->executor_type == "MultiThreadedExecutor") {
node_wrappers_[node_id].set_executor_type(ExecutorType::MultiThreaded);
node_wrappers_[node_id].set_thread_num(request->thread_num);
} else {
node_wrappers_.erase(node_id);
throw ComponentManagerException("Invalid executor_type: " + request->executor_type);
}

add_node_to_executor(node_id);

auto node = node_wrappers_[node_id].get_node_base_interface();
Expand Down Expand Up @@ -362,4 +432,31 @@ ComponentManager::on_list_nodes(
}
}

void
ComponentManager::cancel_executor(DedicatedExecutorWrapper & executor_wrapper)
{
// Verify that the executor thread has begun spinning.
// If it has not, then wait until the thread starts to ensure
// that cancel() will fully stop the execution
//
// This prevents a previous race condition that occurs between the
// creation of the executor spin thread and cancelling an executor

if (!executor_wrapper.thread_initialized) {
auto context = this->get_node_base_interface()->get_context();

// Guarantee that either the executor is spinning or we are shutting down.
while (!executor_wrapper.executor->is_spinning() && rclcpp::ok(context)) {
// This is an arbitrarily small delay to avoid busy looping
rclcpp::sleep_for(std::chrono::milliseconds(1));
}
}

// After the while loop we are sure that the executor is now spinning, so
// the call to cancel() will work.
executor_wrapper.executor->cancel();
// Wait for the thread task to return
executor_wrapper.thread.join();
}

} // namespace rclcpp_components