大纲
- 创建SingleThreadedExecutor
- 新增Node
- add_node
- trigger_entity_recollect
- collect_entities
- 自旋等待
- get_next_executable
- wait_for_work
- get_next_ready_executable
- Timer
- Subscription
- Service
- Client
- Waitable
- AnyExecutable
- execute_any_executable
- 参考资料
在ROS2中,我们设置的各种回调都是在执行器(Executor)中执行的,所以它是整个系统非常核心的组件。
目前,rclcpp 提供了三种 Executor 类型,它们派生自同一个父类Executor。
本文我们将借用《Robot Operating System——单线程中启动多个Node》中的例子,将单线程执行器在最上层的使用方法总结如下
// Create an executor that will be responsible for execution of callbacks for a set of nodes.
// With this version, all callbacks will be called from within this thread (the main one).
rclcpp::executors::SingleThreadedExecutor exec;
rclcpp::NodeOptions options;
// Add some nodes to the executor which provide work for the executor during its "spin" function.
// An example of available work is executing a subscription callback, or a timer callback.
auto server = std::make_shared<composition::Server>(options);
exec.add_node(server);
// spin will block until work comes in, execute work as it becomes available, and keep blocking.
// It will only be interrupted by Ctrl-C.
exec.spin();
即:
- 创建SingleThreadedExecutor ;
- 新增Node;
- 自旋等待。
创建SingleThreadedExecutor
SingleThreadedExecutor的构造函数基本就是交给基类rclcpp::Executor的构造函数来实现。
// https://github.com/ros2/rclcpp/blob/jazzy/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp
SingleThreadedExecutor::SingleThreadedExecutor(const rclcpp::ExecutorOptions & options)
: rclcpp::Executor(options) {}
在Executor的构造函数中,我们着重关注成员变量collector_。
Executor::Executor(const rclcpp::ExecutorOptions & options)
: spinning(false),
interrupt_guard_condition_(std::make_shared<rclcpp::GuardCondition>(options.context)),
shutdown_guard_condition_(std::make_shared<rclcpp::GuardCondition>(options.context)),
context_(options.context),
notify_waitable_(std::make_shared<rclcpp::executors::ExecutorNotifyWaitable>(
[this]() {
this->entities_need_rebuild_.store(true);
})),
entities_need_rebuild_(true),
collector_(notify_waitable_),
wait_set_({}, {}, {}, {}, {}, {}, options.context),
current_notify_waitable_(notify_waitable_),
impl_(std::make_unique<rclcpp::ExecutorImplementation>())
{
shutdown_callback_handle_ = context_->add_on_shutdown_callback(
[weak_gc = std::weak_ptr<rclcpp::GuardCondition>{shutdown_guard_condition_}]() {
auto strong_gc = weak_gc.lock();
if (strong_gc) {
strong_gc->trigger();
}
});
notify_waitable_->add_guard_condition(interrupt_guard_condition_);
notify_waitable_->add_guard_condition(shutdown_guard_condition_);
wait_set_.add_waitable(notify_waitable_);
}
collector_是一个集合,它保存了后续要执行的各个Node指针。
/// Collector used to associate executable entities from nodes and guard conditions
rclcpp::executors::ExecutorEntitiesCollector collector_;
新增Node
add_node
业务逻辑的Node会被添加到上面介绍的成员变量collector_中。
// https://github.com/ros2/rclcpp/blob/jazzy/rclcpp/src/rclcpp/executor.cpp
void
Executor::add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify)
{
this->collector_.add_node(node_ptr);
try {
this->trigger_entity_recollect(notify);
} catch (const rclcpp::exceptions::RCLError & ex) {
throw std::runtime_error(
std::string(
"Failed to trigger guard condition on node add: ") + ex.what());
}
}
然后会调用trigger_entity_recollect方法。
trigger_entity_recollect
这个方法会做两件事:
- 修改std::atomic_bool类型变量entities_need_rebuild_的值为true,进而让collect_entities()被执行。
- 如果notify为true,则会通过interrupt_guard_condition_->trigger()唤醒一个处于等待状态的执行器。
// https://github.com/ros2/rclcpp/blob/jazzy/rclcpp/src/rclcpp/executor.cpp
void
Executor::trigger_entity_recollect(bool notify)
{
this->entities_need_rebuild_.store(true);
if (!spinning.load() && entities_need_rebuild_.exchange(false)) {
std::lock_guard<std::mutex> guard(mutex_);
this->collect_entities();
}
if (notify) {
interrupt_guard_condition_->trigger();
}
}
collect_entities
collect_entities主要做两件事:
- 过滤过期的Node以及相关回调函数。
// https://github.com/ros2/rclcpp/blob/jazzy/rclcpp/src/rclcpp/executor.cpp
void
Executor::collect_entities()
{
// Updating the entity collection and waitset expires any active result
this->wait_result_.reset();
// Get the current list of available waitables from the collector.
rclcpp::executors::ExecutorEntitiesCollection collection;
this->collector_.update_collections();
auto callback_groups = this->collector_.get_all_callback_groups();
rclcpp::executors::build_entities_collection(callback_groups, collection);
// We must remove expired entities here, so that we don't continue to use older entities.
// See https://github.com/ros2/rclcpp/issues/2180 for more information.
current_collection_.remove_expired_entities();
- 更新current_collection_和wait_set_。
// Update each of the groups of entities in the current collection, adding or removing
// from the wait set as necessary.
current_collection_.timers.update(
collection.timers,
[this](auto timer) {wait_set_.add_timer(timer);},
[this](auto timer) {wait_set_.remove_timer(timer);});
current_collection_.subscriptions.update(
collection.subscriptions,
[this](auto subscription) {
wait_set_.add_subscription(subscription, kDefaultSubscriptionMask);
},
[this](auto subscription) {
wait_set_.remove_subscription(subscription, kDefaultSubscriptionMask);
});
current_collection_.clients.update(
collection.clients,
[this](auto client) {wait_set_.add_client(client);},
[this](auto client) {wait_set_.remove_client(client);});
current_collection_.services.update(
collection.services,
[this](auto service) {wait_set_.add_service(service);},
[this](auto service) {wait_set_.remove_service(service);});
current_collection_.guard_conditions.update(
collection.guard_conditions,
[this](auto guard_condition) {wait_set_.add_guard_condition(guard_condition);},
[this](auto guard_condition) {wait_set_.remove_guard_condition(guard_condition);});
current_collection_.waitables.update(
collection.waitables,
[this](auto waitable) {wait_set_.add_waitable(waitable);},
[this](auto waitable) {wait_set_.remove_waitable(waitable);});
// In the case that an entity already has an expired weak pointer
// before being removed from the waitset, additionally prune the waitset.
this->wait_set_.prune_deleted_entities();
}
自旋等待
spin()内部核心是一个while循环。它会不停使用get_next_executable取出可以运行的Node的回调,然后让execute_any_executable将其执行。
// https://github.com/ros2/rclcpp/blob/jazzy/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp
void
SingleThreadedExecutor::spin()
{
if (spinning.exchange(true)) {
throw std::runtime_error("spin() called while already spinning");
}
RCPPUTILS_SCOPE_EXIT(wait_result_.reset();this->spinning.store(false););
// Clear any previous result and rebuild the waitset
this->wait_result_.reset();
this->entities_need_rebuild_ = true;
while (rclcpp::ok(this->context_) && spinning.load()) {
rclcpp::AnyExecutable any_executable;
if (get_next_executable(any_executable)) {
execute_any_executable(any_executable);
}
}
}
那么这个while循环会不会导致CPU一直空转呢?答案是:不是。我们可以看get_next_executable的实现。
get_next_executable
// https://github.com/ros2/rclcpp/blob/jazzy/rclcpp/src/rclcpp/executor.cpp
bool
Executor::get_next_executable(AnyExecutable & any_executable, std::chrono::nanoseconds timeout)
{
bool success = false;
// Check to see if there are any subscriptions or timers needing service
// TODO(wjwwood): improve run to run efficiency of this function
success = get_next_ready_executable(any_executable);
// If there are none
if (!success) {
// Wait for subscriptions or timers to work on
wait_for_work(timeout);
if (!spinning.load()) {
return false;
}
// Try again
success = get_next_ready_executable(any_executable);
}
return success;
}
它会在底层调用wait_for_work。
wait_for_work
这个方法会一直阻塞到时间超时,或者有回调函数可以被调用。
// https://github.com/ros2/rclcpp/blob/jazzy/rclcpp/src/rclcpp/executor.cpp
void
Executor::wait_for_work(std::chrono::nanoseconds timeout)
{
TRACETOOLS_TRACEPOINT(rclcpp_executor_wait_for_work, timeout.count());
// Clear any previous wait result
this->wait_result_.reset();
{
std::lock_guard<std::mutex> guard(mutex_);
if (this->entities_need_rebuild_.exchange(false) || current_collection_.empty()) {
this->collect_entities();
}
}
this->wait_result_.emplace(wait_set_.wait(timeout));
if (!this->wait_result_ || this->wait_result_->kind() == WaitResultKind::Empty) {
RCUTILS_LOG_WARN_NAMED(
"rclcpp",
"empty wait set received in wait(). This should never happen.");
} else {
if (this->wait_result_->kind() == WaitResultKind::Ready && current_notify_waitable_) {
auto & rcl_wait_set = this->wait_result_->get_wait_set().get_rcl_wait_set();
if (current_notify_waitable_->is_ready(rcl_wait_set)) {
current_notify_waitable_->execute(current_notify_waitable_->take_data());
}
}
}
}
其中主要负责等待的是这句
wait_set_.wait(timeout)
在SingleThreadedExecutor中,由于调用get_next_executable没有传递时间,便采用了默认时间。这样get_next_executable会一直等到有回调可以被执行。这样就避免了CPU空转的问题。
/// Wait for executable in ready state and populate union structure.
/**
* If an executable is ready, it will return immediately, otherwise
* block based on the timeout for work to become ready.
*
* \param[out] any_executable populated union structure of ready executable
* \param[in] timeout duration of time to wait for work, a negative value
* (the defualt behavior), will make this function block indefinitely
* \return true if an executable was ready and any_executable was populated,
* otherwise false
*/
RCLCPP_PUBLIC
bool
get_next_executable(
AnyExecutable & any_executable,
std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));
get_next_ready_executable
get_next_ready_executable会按顺序寻找Timer、Subscription、Service 、Client和Waitable中第一个处于可被回调状态的Node。
Timer
bool valid_executable = false;
if (!wait_result_.has_value() || wait_result_->kind() != rclcpp::WaitResultKind::Ready) {
return false;
}
if (!valid_executable) {
size_t current_timer_index = 0;
while (true) {
auto [timer, timer_index] = wait_result_->peek_next_ready_timer(current_timer_index);
if (nullptr == timer) {
break;
}
current_timer_index = timer_index;
auto entity_iter = current_collection_.timers.find(timer->get_timer_handle().get());
if (entity_iter != current_collection_.timers.end()) {
auto callback_group = entity_iter->second.callback_group.lock();
if (!callback_group || !callback_group->can_be_taken_from()) {
current_timer_index++;
continue;
}
// At this point the timer is either ready for execution or was perhaps
// it was canceled, based on the result of call(), but either way it
// should not be checked again from peek_next_ready_timer(), so clear
// it from the wait result.
wait_result_->clear_timer_with_index(current_timer_index);
// Check that the timer should be called still, i.e. it wasn't canceled.
any_executable.data = timer->call();
if (!any_executable.data) {
current_timer_index++;
continue;
}
any_executable.timer = timer;
any_executable.callback_group = callback_group;
valid_executable = true;
break;
}
current_timer_index++;
}
}
Subscription
if (!valid_executable) {
while (auto subscription = wait_result_->next_ready_subscription()) {
auto entity_iter = current_collection_.subscriptions.find(
subscription->get_subscription_handle().get());
if (entity_iter != current_collection_.subscriptions.end()) {
auto callback_group = entity_iter->second.callback_group.lock();
if (!callback_group || !callback_group->can_be_taken_from()) {
continue;
}
any_executable.subscription = subscription;
any_executable.callback_group = callback_group;
valid_executable = true;
break;
}
}
}
Service
if (!valid_executable) {
while (auto service = wait_result_->next_ready_service()) {
auto entity_iter = current_collection_.services.find(service->get_service_handle().get());
if (entity_iter != current_collection_.services.end()) {
auto callback_group = entity_iter->second.callback_group.lock();
if (!callback_group || !callback_group->can_be_taken_from()) {
continue;
}
any_executable.service = service;
any_executable.callback_group = callback_group;
valid_executable = true;
break;
}
}
}
Client
if (!valid_executable) {
while (auto client = wait_result_->next_ready_client()) {
auto entity_iter = current_collection_.clients.find(client->get_client_handle().get());
if (entity_iter != current_collection_.clients.end()) {
auto callback_group = entity_iter->second.callback_group.lock();
if (!callback_group || !callback_group->can_be_taken_from()) {
continue;
}
any_executable.client = client;
any_executable.callback_group = callback_group;
valid_executable = true;
break;
}
}
}
Waitable
if (!valid_executable) {
while (auto waitable = wait_result_->next_ready_waitable()) {
auto entity_iter = current_collection_.waitables.find(waitable.get());
if (entity_iter != current_collection_.waitables.end()) {
auto callback_group = entity_iter->second.callback_group.lock();
if (!callback_group || !callback_group->can_be_taken_from()) {
continue;
}
any_executable.waitable = waitable;
any_executable.callback_group = callback_group;
any_executable.data = waitable->take_data();
valid_executable = true;
break;
}
}
}
AnyExecutable
// https://github.com/ros2/rclcpp/blob/jazzy/rclcpp/include/rclcpp/any_executable.hpp
struct AnyExecutable
{
RCLCPP_PUBLIC
AnyExecutable();
RCLCPP_PUBLIC
virtual ~AnyExecutable();
// Only one of the following pointers will be set.
rclcpp::SubscriptionBase::SharedPtr subscription;
rclcpp::TimerBase::SharedPtr timer;
rclcpp::ServiceBase::SharedPtr service;
rclcpp::ClientBase::SharedPtr client;
rclcpp::Waitable::SharedPtr waitable;
// These are used to keep the scope on the containing items
rclcpp::CallbackGroup::SharedPtr callback_group {nullptr};
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_base {nullptr};
std::shared_ptr<void> data {nullptr};
};
execute_any_executable
找到可以执行的Node后,便可以调用execute_any_executable让其执行。
在execute_any_executable内部,我们看到它也是区分Timer、Subscription、Service 、Client和Waitable类型来执行的。
// https://github.com/ros2/rclcpp/blob/jazzy/rclcpp/src/rclcpp/executor.cpp
void
Executor::execute_any_executable(AnyExecutable & any_exec)
{
if (!spinning.load()) {
return;
}
assert(
(void("cannot execute an AnyExecutable without a valid callback group"),
any_exec.callback_group));
if (any_exec.timer) {
TRACETOOLS_TRACEPOINT(
rclcpp_executor_execute,
static_cast<const void *>(any_exec.timer->get_timer_handle().get()));
execute_timer(any_exec.timer, any_exec.data);
}
if (any_exec.subscription) {
TRACETOOLS_TRACEPOINT(
rclcpp_executor_execute,
static_cast<const void *>(any_exec.subscription->get_subscription_handle().get()));
execute_subscription(any_exec.subscription);
}
if (any_exec.service) {
execute_service(any_exec.service);
}
if (any_exec.client) {
execute_client(any_exec.client);
}
if (any_exec.waitable) {
const std::shared_ptr<void> & const_data = any_exec.data;
any_exec.waitable->execute(const_data);
}
// Reset the callback_group, regardless of type
any_exec.callback_group->can_be_taken_from().store(true);
}
参考资料
- https://docs.ros.org/en/rolling/Concepts/Intermediate/About-Executors.html#executors