大纲
- 同步
- 创建SyncParametersClient
- 设置监控回调
- 回调函数主体
- 测试
- 完整代码
- 异步
- 创建AsyncParametersClient
- 设置监控回调
- 测试
- 完整代码
在《Robot Operating System——Parameter设置的预处理、校验和成功回调》一文中,我们使用Node::add_post_set_parameters_callback设置一个回调函数,用于在Parameters设置成功后,执行相关后续动作,但是这种行为发生在Node内部。
如果我们希望在Node外部监控Parameters变动,则可以使用SyncParametersClient或者AsyncParametersClient类的on_parameter_event方法设置一个回调函数来监控。
我们分别以同步和异步的方式讲解这个功能。
同步
我们参考的例子是demo_nodes_cpp/src/parameters/parameter_events.cpp。
首先我们创建一个待监控的Node
int main(int argc, char ** argv)
{
// Force flush of the stdout buffer.
setvbuf(stdout, NULL, _IONBF, BUFSIZ);
rclcpp::init(argc, argv);
auto node = rclcpp::Node::make_shared("parameter_events");
创建SyncParametersClient
然后创建一个该Node的连接客户端,并和服务端进行连接。
auto parameters_client = std::make_shared<rclcpp::SyncParametersClient>(node);
while (!parameters_client->wait_for_service(1s)) {
if (!rclcpp::ok()) {
RCLCPP_ERROR(node->get_logger(), "Interrupted while waiting for the service. Exiting.");
return 0;
}
RCLCPP_INFO(node->get_logger(), "service not available, waiting again...");
}
设置监控回调
我们通过SyncParametersClient::on_parameter_event设置监控回调函数。它的入参是是一个rcl_interfaces::msg::ParameterEvent指针,用于表达Parameter变动的事件。
events_received_promise 是一个std::promise,我们通过它让回调函数和主程序之间有通信。如下面代码,当回调执行10次后,promise会被设置以通知主程序退出执行。
auto events_received_promise = std::make_shared<std::promise<void>>();
auto events_received_future = events_received_promise->get_future();
// Setup callback for changes to parameters.
auto sub = parameters_client->on_parameter_event(
[node, promise = std::move(events_received_promise)](
rcl_interfaces::msg::ParameterEvent::UniquePtr event) -> void
{
static size_t n_times_called = 0u;
if (on_parameter_event(std::move(event), node->get_logger())) {
++n_times_called;
}
if (10u == n_times_called) {
// This callback will be called 10 times, set the promise when that happens.
promise->set_value();
}
});
回调函数主体
event中包含三种数据:
- new_parameters。记录新增的Parameters。
- changed_parameters。记录修改的Parameters。
- deleted_parameters。记录删除的Parameters。
// /opt/ros/jazzy/include/rcl_interfaces/rcl_interfaces/msg/detail/parameter_event__struct.hpp
using _new_parameters_type =
std::vector<rcl_interfaces::msg::Parameter_<ContainerAllocator>, typename std::allocator_traits<ContainerAllocator>::template rebind_alloc<rcl_interfaces::msg::Parameter_<ContainerAllocator>>>;
_new_parameters_type new_parameters;
using _changed_parameters_type =
std::vector<rcl_interfaces::msg::Parameter_<ContainerAllocator>, typename std::allocator_traits<ContainerAllocator>::template rebind_alloc<rcl_interfaces::msg::Parameter_<ContainerAllocator>>>;
_changed_parameters_type changed_parameters;
using _deleted_parameters_type =
std::vector<rcl_interfaces::msg::Parameter_<ContainerAllocator>, typename std::allocator_traits<ContainerAllocator>::template rebind_alloc<rcl_interfaces::msg::Parameter_<ContainerAllocator>>>;
_deleted_parameters_type deleted_parameters;
在监控函数中,我们会过滤掉新增Parameters中的有关qos_overrides前缀的项目。然后分门别类的把上述三种Parameters打印出来。
bool on_parameter_event(
rcl_interfaces::msg::ParameterEvent::UniquePtr event, rclcpp::Logger logger)
{
// TODO(wjwwood): The message should have an operator<<, which would replace all of this.
std::stringstream ss;
// ignore qos overrides
event->new_parameters.erase(
std::remove_if(
event->new_parameters.begin(),
event->new_parameters.end(),
[](const auto & item) {
const char * param_override_prefix = "qos_overrides.";
return std::strncmp(
item.name.c_str(), param_override_prefix, sizeof(param_override_prefix) - 1) == 0u;
}),
event->new_parameters.end());
if (
!event->new_parameters.size() && !event->changed_parameters.size() &&
!event->deleted_parameters.size())
{
return false;
}
ss << "\nParameter event:\n new parameters:";
for (auto & new_parameter : event->new_parameters) {
ss << "\n " << new_parameter.name;
}
ss << "\n changed parameters:";
for (auto & changed_parameter : event->changed_parameters) {
ss << "\n " << changed_parameter.name;
}
ss << "\n deleted parameters:";
for (auto & deleted_parameter : event->deleted_parameters) {
ss << "\n " << deleted_parameter.name;
}
ss << "\n";
RCLCPP_INFO(logger, "%s", ss.str().c_str());
return true;
}
测试
// Declare parameters that may be set on this node
node->declare_parameter("foo", 0);
node->declare_parameter("bar", "");
node->declare_parameter("baz", 0.);
node->declare_parameter("foobar", false);
// Set several different types of parameters.
auto set_parameters_results = parameters_client->set_parameters(
{
rclcpp::Parameter("foo", 2),
rclcpp::Parameter("bar", "hello"),
rclcpp::Parameter("baz", 1.45),
rclcpp::Parameter("foobar", true),
});
// Change the value of some of them.
set_parameters_results = parameters_client->set_parameters(
{
rclcpp::Parameter("foo", 3),
rclcpp::Parameter("bar", "world"),
});
// TODO(wjwwood): Create and use delete_parameter
rclcpp::spin_until_future_complete(node, events_received_future.share());
rclcpp::shutdown();
return 0;
}
上面的代码的declare_parameter和set_parameters都会导致监控被执行。这些操作一共10次,这样就会导致回调函数中Promise被设置,进而让rclcpp::spin_until_future_complete退出等待,从而关闭整个程序。
完整代码
// Copyright 2015 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <chrono>
#include <cstring>
#include <future>
#include <memory>
#include <sstream>
#include <utility>
#include "rcl_interfaces/msg/parameter_event.hpp"
#include "rclcpp/rclcpp.hpp"
using namespace std::chrono_literals;
bool on_parameter_event(
rcl_interfaces::msg::ParameterEvent::UniquePtr event, rclcpp::Logger logger)
{
// TODO(wjwwood): The message should have an operator<<, which would replace all of this.
std::stringstream ss;
// ignore qos overrides
event->new_parameters.erase(
std::remove_if(
event->new_parameters.begin(),
event->new_parameters.end(),
[](const auto & item) {
const char * param_override_prefix = "qos_overrides.";
return std::strncmp(
item.name.c_str(), param_override_prefix, sizeof(param_override_prefix) - 1) == 0u;
}),
event->new_parameters.end());
if (
!event->new_parameters.size() && !event->changed_parameters.size() &&
!event->deleted_parameters.size())
{
return false;
}
ss << "\nParameter event:\n new parameters:";
for (auto & new_parameter : event->new_parameters) {
ss << "\n " << new_parameter.name;
}
ss << "\n changed parameters:";
for (auto & changed_parameter : event->changed_parameters) {
ss << "\n " << changed_parameter.name;
}
ss << "\n deleted parameters:";
for (auto & deleted_parameter : event->deleted_parameters) {
ss << "\n " << deleted_parameter.name;
}
ss << "\n";
RCLCPP_INFO(logger, "%s", ss.str().c_str());
return true;
}
int main(int argc, char ** argv)
{
// Force flush of the stdout buffer.
setvbuf(stdout, NULL, _IONBF, BUFSIZ);
rclcpp::init(argc, argv);
auto node = rclcpp::Node::make_shared("parameter_events");
auto parameters_client = std::make_shared<rclcpp::SyncParametersClient>(node);
while (!parameters_client->wait_for_service(1s)) {
if (!rclcpp::ok()) {
RCLCPP_ERROR(node->get_logger(), "Interrupted while waiting for the service. Exiting.");
return 0;
}
RCLCPP_INFO(node->get_logger(), "service not available, waiting again...");
}
auto events_received_promise = std::make_shared<std::promise<void>>();
auto events_received_future = events_received_promise->get_future();
// Setup callback for changes to parameters.
auto sub = parameters_client->on_parameter_event(
[node, promise = std::move(events_received_promise)](
rcl_interfaces::msg::ParameterEvent::UniquePtr event) -> void
{
static size_t n_times_called = 0u;
if (on_parameter_event(std::move(event), node->get_logger())) {
++n_times_called;
}
if (10u == n_times_called) {
// This callback will be called 10 times, set the promise when that happens.
promise->set_value();
}
});
// Declare parameters that may be set on this node
node->declare_parameter("foo", 0);
node->declare_parameter("bar", "");
node->declare_parameter("baz", 0.);
node->declare_parameter("foobar", false);
// Set several different types of parameters.
auto set_parameters_results = parameters_client->set_parameters(
{
rclcpp::Parameter("foo", 2),
rclcpp::Parameter("bar", "hello"),
rclcpp::Parameter("baz", 1.45),
rclcpp::Parameter("foobar", true),
});
// Change the value of some of them.
set_parameters_results = parameters_client->set_parameters(
{
rclcpp::Parameter("foo", 3),
rclcpp::Parameter("bar", "world"),
});
// TODO(wjwwood): Create and use delete_parameter
rclcpp::spin_until_future_complete(node, events_received_future.share());
rclcpp::shutdown();
return 0;
}
异步
我们参考demo_nodes_cpp/src/parameters/parameter_events_async.cpp的实现。
这次我们使用Node的方式来运行监控。
class ParameterEventsAsyncNode : public rclcpp::Node
{
public:
DEMO_NODES_CPP_PUBLIC
explicit ParameterEventsAsyncNode(const rclcpp::NodeOptions & options)
: Node("parameter_events", options)
{
setvbuf(stdout, NULL, _IONBF, BUFSIZ);
创建AsyncParametersClient
我们创建一个和本Node连接的AsyncParametersClient对象,并测试其连接。
// Typically a parameter client is created for a remote node by passing the name of the remote
// node in the constructor; in this example we create a parameter client for this node itself.
parameters_client_ = std::make_shared<rclcpp::AsyncParametersClient>(this);
// Even though this is in the same node, we still have to wait for the
// service to be available before declaring parameters (otherwise there is
// a chance we'll miss some events in the callback).
while (!parameters_client_->wait_for_service(1s)) {
if (!rclcpp::ok()) {
RCLCPP_ERROR(this->get_logger(), "interrupted while waiting for the service. exiting.");
rclcpp::shutdown();
return;
}
RCLCPP_INFO(this->get_logger(), "service not available, waiting again...");
}
设置监控回调
回调函数和之前介绍的同步模式中的逻辑是一样的,这儿就不赘述了。
auto on_parameter_event_callback =
[this](rcl_interfaces::msg::ParameterEvent::UniquePtr event) -> void
{
// ignore qos overrides
event->new_parameters.erase(
std::remove_if(
event->new_parameters.begin(),
event->new_parameters.end(),
[](const auto & item) {
const char * param_override_prefix = "qos_overrides.";
return std::strncmp(
item.name.c_str(), param_override_prefix, sizeof(param_override_prefix) - 1) == 0u;
}),
event->new_parameters.end());
if (
!event->new_parameters.size() && !event->changed_parameters.size() &&
!event->deleted_parameters.size())
{
return;
}
// TODO(wjwwood): The message should have an operator<<, which would replace all of this.
std::stringstream ss;
ss << "\nParameter event:\n new parameters:";
for (auto & new_parameter : event->new_parameters) {
ss << "\n " << new_parameter.name;
}
ss << "\n changed parameters:";
for (auto & changed_parameter : event->changed_parameters) {
ss << "\n " << changed_parameter.name;
}
ss << "\n deleted parameters:";
for (auto & deleted_parameter : event->deleted_parameters) {
ss << "\n " << deleted_parameter.name;
}
ss << "\n";
RCLCPP_INFO(this->get_logger(), "%s", ss.str().c_str());
};
// Setup callback for changes to parameters.
parameter_event_sub_ = parameters_client_->on_parameter_event(on_parameter_event_callback);
测试
先声明一些Parameters。这个行为也会触发监控函数执行。
// Declare parameters that may be set on this node
this->declare_parameter("foo", 0);
this->declare_parameter("bar", "");
this->declare_parameter("baz", 0.);
this->declare_parameter("foobar", false);
后面的测试代码是一个套娃模式。它会先启动一个Timer,执行一次值的设置。由于AsyncParametersClient的set_parameters也是异步的,但是我们可以设置一个完成时执行的回调。在这个回调中,我们再设置一些值,并最终关闭整个Node。
// Queue a `set_parameters` request as soon as `spin` is called on this node.
// TODO(dhood): consider adding a "call soon" notion to Node to not require a timer for this.
timer_ = create_wall_timer(
200ms,
[this]() {
this->queue_first_set_parameter_request();
});
}
private:
// Set several different types of parameters.
DEMO_NODES_CPP_LOCAL
void queue_first_set_parameter_request()
{
timer_->cancel(); // Prevent another request from being queued by the timer.
while (!parameters_client_->wait_for_service(1s)) {
if (!rclcpp::ok()) {
RCLCPP_ERROR(this->get_logger(), "interrupted while waiting for the service. exiting.");
rclcpp::shutdown();
return;
}
RCLCPP_INFO(this->get_logger(), "service not available, waiting again...");
}
auto response_received_callback = [this](SetParametersResult future) {
// Check to see if they were set.
for (auto & result : future.get()) {
if (!result.successful) {
RCLCPP_ERROR(this->get_logger(), "Failed to set parameter: %s", result.reason.c_str());
}
}
this->queue_second_set_parameter_request();
};
parameters_client_->set_parameters(
{
rclcpp::Parameter("foo", 2),
rclcpp::Parameter("bar", "hello"),
rclcpp::Parameter("baz", 1.45),
rclcpp::Parameter("foobar", true),
}, response_received_callback);
}
// Change the value of some of them.
DEMO_NODES_CPP_LOCAL
void queue_second_set_parameter_request()
{
auto response_received_callback = [this](SetParametersResult future) {
// Check to see if they were set.
for (auto & result : future.get()) {
if (!result.successful) {
RCLCPP_ERROR(this->get_logger(), "Failed to set parameter: %s", result.reason.c_str());
}
}
// TODO(wjwwood): Create and use delete_parameter
// Give time for all of the ParameterEvent callbacks to be received.
timer_ = create_wall_timer(
100ms,
[]() {
rclcpp::shutdown();
});
};
parameters_client_->set_parameters(
{
rclcpp::Parameter("foo", 3),
rclcpp::Parameter("bar", "world"),
}, response_received_callback);
}
完整代码
// Copyright 2015 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <chrono>
#include <cstring>
#include <memory>
#include <sstream>
#include <vector>
#include "rcl_interfaces/msg/parameter_event.hpp"
#include "rcl_interfaces/msg/set_parameters_result.hpp"
#include "rclcpp/rclcpp.hpp"
#include "rclcpp_components/register_node_macro.hpp"
#include "demo_nodes_cpp/visibility_control.h"
using namespace std::chrono_literals;
using SetParametersResult =
std::shared_future<std::vector<rcl_interfaces::msg::SetParametersResult>>;
namespace demo_nodes_cpp
{
class ParameterEventsAsyncNode : public rclcpp::Node
{
public:
DEMO_NODES_CPP_PUBLIC
explicit ParameterEventsAsyncNode(const rclcpp::NodeOptions & options)
: Node("parameter_events", options)
{
setvbuf(stdout, NULL, _IONBF, BUFSIZ);
// Typically a parameter client is created for a remote node by passing the name of the remote
// node in the constructor; in this example we create a parameter client for this node itself.
parameters_client_ = std::make_shared<rclcpp::AsyncParametersClient>(this);
auto on_parameter_event_callback =
[this](rcl_interfaces::msg::ParameterEvent::UniquePtr event) -> void
{
// ignore qos overrides
event->new_parameters.erase(
std::remove_if(
event->new_parameters.begin(),
event->new_parameters.end(),
[](const auto & item) {
const char * param_override_prefix = "qos_overrides.";
return std::strncmp(
item.name.c_str(), param_override_prefix, sizeof(param_override_prefix) - 1) == 0u;
}),
event->new_parameters.end());
if (
!event->new_parameters.size() && !event->changed_parameters.size() &&
!event->deleted_parameters.size())
{
return;
}
// TODO(wjwwood): The message should have an operator<<, which would replace all of this.
std::stringstream ss;
ss << "\nParameter event:\n new parameters:";
for (auto & new_parameter : event->new_parameters) {
ss << "\n " << new_parameter.name;
}
ss << "\n changed parameters:";
for (auto & changed_parameter : event->changed_parameters) {
ss << "\n " << changed_parameter.name;
}
ss << "\n deleted parameters:";
for (auto & deleted_parameter : event->deleted_parameters) {
ss << "\n " << deleted_parameter.name;
}
ss << "\n";
RCLCPP_INFO(this->get_logger(), "%s", ss.str().c_str());
};
// Setup callback for changes to parameters.
parameter_event_sub_ = parameters_client_->on_parameter_event(on_parameter_event_callback);
// Even though this is in the same node, we still have to wait for the
// service to be available before declaring parameters (otherwise there is
// a chance we'll miss some events in the callback).
while (!parameters_client_->wait_for_service(1s)) {
if (!rclcpp::ok()) {
RCLCPP_ERROR(this->get_logger(), "interrupted while waiting for the service. exiting.");
rclcpp::shutdown();
return;
}
RCLCPP_INFO(this->get_logger(), "service not available, waiting again...");
}
// Declare parameters that may be set on this node
this->declare_parameter("foo", 0);
this->declare_parameter("bar", "");
this->declare_parameter("baz", 0.);
this->declare_parameter("foobar", false);
// Queue a `set_parameters` request as soon as `spin` is called on this node.
// TODO(dhood): consider adding a "call soon" notion to Node to not require a timer for this.
timer_ = create_wall_timer(
200ms,
[this]() {
this->queue_first_set_parameter_request();
});
}
private:
// Set several different types of parameters.
DEMO_NODES_CPP_LOCAL
void queue_first_set_parameter_request()
{
timer_->cancel(); // Prevent another request from being queued by the timer.
while (!parameters_client_->wait_for_service(1s)) {
if (!rclcpp::ok()) {
RCLCPP_ERROR(this->get_logger(), "interrupted while waiting for the service. exiting.");
rclcpp::shutdown();
return;
}
RCLCPP_INFO(this->get_logger(), "service not available, waiting again...");
}
auto response_received_callback = [this](SetParametersResult future) {
// Check to see if they were set.
for (auto & result : future.get()) {
if (!result.successful) {
RCLCPP_ERROR(this->get_logger(), "Failed to set parameter: %s", result.reason.c_str());
}
}
this->queue_second_set_parameter_request();
};
parameters_client_->set_parameters(
{
rclcpp::Parameter("foo", 2),
rclcpp::Parameter("bar", "hello"),
rclcpp::Parameter("baz", 1.45),
rclcpp::Parameter("foobar", true),
}, response_received_callback);
}
// Change the value of some of them.
DEMO_NODES_CPP_LOCAL
void queue_second_set_parameter_request()
{
auto response_received_callback = [this](SetParametersResult future) {
// Check to see if they were set.
for (auto & result : future.get()) {
if (!result.successful) {
RCLCPP_ERROR(this->get_logger(), "Failed to set parameter: %s", result.reason.c_str());
}
}
// TODO(wjwwood): Create and use delete_parameter
// Give time for all of the ParameterEvent callbacks to be received.
timer_ = create_wall_timer(
100ms,
[]() {
rclcpp::shutdown();
});
};
parameters_client_->set_parameters(
{
rclcpp::Parameter("foo", 3),
rclcpp::Parameter("bar", "world"),
}, response_received_callback);
}
rclcpp::AsyncParametersClient::SharedPtr parameters_client_;
rclcpp::Subscription<rcl_interfaces::msg::ParameterEvent>::SharedPtr parameter_event_sub_;
rclcpp::TimerBase::SharedPtr timer_;
};
} // namespace demo_nodes_cpp
RCLCPP_COMPONENTS_REGISTER_NODE(demo_nodes_cpp::ParameterEventsAsyncNode)