在《Robot Operating System——Service的同步/异步通信》一文中,我们介绍了如何实现Service的客户端和服务端。在例子中,它们分别被编译成libadd_two_ints_client_async_library.so和libadd_two_ints_server_library.so,然后分别被可执行程序add_two_ints_client_async和add_two_ints_server加载。这样它们就以两个独立的进程运行起来。
本文我们将介绍,如果在一个进程中,使用单线程模式运行两个Node代码。本文选用的例子是composition/src/manual_composition.cpp、composition/src/client_component.cpp和composition/src/server_component.cpp。
服务端
和之前我们介绍的例子一样,Server需要继承于Node类,然后通过Node::create_service创建服务端,并设置一个处理请求的回调。
// composition/include/composition/server_component.hpp
class Server : public rclcpp::Node
{
public:
COMPOSITION_PUBLIC
explicit Server(const rclcpp::NodeOptions & options);
private:
rclcpp::Service<example_interfaces::srv::AddTwoInts>::SharedPtr srv_;
};
// composition/src/server_component.cpp
Server::Server(const rclcpp::NodeOptions & options)
: Node("Server", options)
{
auto handle_add_two_ints =
[this](
const std::shared_ptr<example_interfaces::srv::AddTwoInts::Request> request,
std::shared_ptr<example_interfaces::srv::AddTwoInts::Response> response
) -> void
{
RCLCPP_INFO(
this->get_logger(), "Incoming request: [a: %" PRId64 ", b: %" PRId64 "]",
request->a, request->b);
std::flush(std::cout);
response->sum = request->a + request->b;
};
srv_ = create_service<example_interfaces::srv::AddTwoInts>("add_two_ints", handle_add_two_ints);
}
客户端
客户端同样继承于Node。它会在定时器timer_中,使用rclcpp::Client与上述Service进行通信。
// composition/include/composition/client_component.hpp
class Client : public rclcpp::Node
{
public:
COMPOSITION_PUBLIC
explicit Client(const rclcpp::NodeOptions & options);
protected:
void on_timer();
private:
rclcpp::Client<example_interfaces::srv::AddTwoInts>::SharedPtr client_;
rclcpp::TimerBase::SharedPtr timer_;
};
由于使用的是异步通信async_send_request方法,所以我们要使用Future在服务端响应后,将结果打印出来。
// composition/src/client_component.cpp
Client::Client(const rclcpp::NodeOptions & options)
: Node("Client", options)
{
client_ = create_client<example_interfaces::srv::AddTwoInts>("add_two_ints");
// Note(dhood): The timer period must be greater than the duration of the timer callback.
// Otherwise, the timer can starve a single-threaded executor.
// See https://github.com/ros2/rclcpp/issues/392 for updates.
timer_ = create_wall_timer(2s, [this]() {return this->on_timer();});
}
void Client::on_timer()
{
if (!client_->wait_for_service(1s)) {
if (!rclcpp::ok()) {
RCLCPP_ERROR(
this->get_logger(),
"Interrupted while waiting for the service. Exiting.");
return;
}
RCLCPP_INFO(this->get_logger(), "Service not available after waiting");
return;
}
auto request = std::make_shared<example_interfaces::srv::AddTwoInts::Request>();
request->a = 2;
request->b = 3;
// In order to wait for a response to arrive, we need to spin().
// However, this function is already being called from within another spin().
// Unfortunately, the current version of spin() is not recursive and so we
// cannot call spin() from within another spin().
// Therefore, we cannot wait for a response to the request we just made here
// within this callback, because it was executed by some other spin function.
// The workaround for this is to give the async_send_request() method another
// argument which is a callback that gets executed once the future is ready.
// We then return from this callback so that the existing spin() function can
// continue and our callback will get called once the response is received.
using ServiceResponseFuture =
rclcpp::Client<example_interfaces::srv::AddTwoInts>::SharedFuture;
auto response_received_callback = [this](ServiceResponseFuture future) {
RCLCPP_INFO(this->get_logger(), "Got result: [%" PRId64 "]", future.get()->sum);
};
auto future_result = client_->async_send_request(request, response_received_callback);
}
主程序
在主程序中,我们首先需要创建一个SingleThreadedExecutor对象exec。然后将上述两个Node的对象添加到exec。
int main(int argc, char * argv[])
{
// Force flush of the stdout buffer.
setvbuf(stdout, NULL, _IONBF, BUFSIZ);
// Initialize any global resources needed by the middleware and the client library.
// This will also parse command line arguments one day (as of Beta 1 they are not used).
// You must call this before using any other part of the ROS system.
// This should be called once per process.
rclcpp::init(argc, argv);
// Create an executor that will be responsible for execution of callbacks for a set of nodes.
// With this version, all callbacks will be called from within this thread (the main one).
rclcpp::executors::SingleThreadedExecutor exec;
rclcpp::NodeOptions options;
// Add some nodes to the executor which provide work for the executor during its "spin" function.
// An example of available work is executing a subscription callback, or a timer callback.
auto server = std::make_shared<composition::Server>(options);
exec.add_node(server);
auto client = std::make_shared<composition::Client>(options);
exec.add_node(client);
// spin will block until work comes in, execute work as it becomes available, and keep blocking.
// It will only be interrupted by Ctrl-C.
exec.spin();
rclcpp::shutdown();
return 0;
}
SingleThreadedExecutor::spin()方法在底层会不停检测两个Node的回调是否需要被调用(get_next_executable)。如果有,则调用execute_any_executable方法让这个回调执行。
void
SingleThreadedExecutor::spin()
{
if (spinning.exchange(true)) {
throw std::runtime_error("spin() called while already spinning");
}
RCPPUTILS_SCOPE_EXIT(wait_result_.reset();this->spinning.store(false););
// Clear any previous result and rebuild the waitset
this->wait_result_.reset();
this->entities_need_rebuild_ = true;
while (rclcpp::ok(this->context_) && spinning.load()) {
rclcpp::AnyExecutable any_executable;
if (get_next_executable(any_executable)) {
execute_any_executable(any_executable);
}
}
}
通过这段代码,我们就能理解为什么ROS2的设计中有大量大回调函数。以及上例中客户端的代码在构造函数中,创建了一个Timer来定时执行任务,而不是通过死循环来不停发送任务——因为构造函数需要快速返回,不能堵塞住。