一、说明
二、问题:不同步重播
任何曾经认真开发过 ROS2 的人都会知道这个问题:我们想调试我们的管道或改进算法的微小部分,但当我们反复运行管道时,我们得到了不同的结果。如果我们处于开发的早期阶段,并且我们的管道运行速度不够快,无法实时处理所有数据,我们最终会丢失消息。如果我们在代码中放置断点,我们最终会丢失消息。如果我们在后台的同一台机器上发生了一些完全不相关但半繁重的处理,我们最终会丢失消息。最糟糕的是,症状在每次运行之间可能会有所不同,从完全系统故障到对结果的影响最小,因此我们通常不会直接意识到消息丢失可能是问题所在。
当然,我们可以非常缓慢地重播所有数据。但是,我们最终会永远等待,直到重播达到我们感兴趣的点。同样,如果我们想在CI服务器上运行管道,我们通常不知道有多少计算资源可用以及我们可以多快地运行我们的系统。
总结:重播器以固定速率重播其消息,对于算法管道来说,这可能是快或慢。因此,我们要么失去时间,要么得到不确定的结果。
三、方法:受控回放
解决这个问题的基本思想非常简单:我们需要一个考虑到管道当前状态的受控重放。然而,由于ROS2s的异步和松散耦合设计,实现这一点并非易事。特别是因为我们不想用“仅”开发所需的机制来混淆整个系统。
不幸的是,ROS2 没有针对这个(在我看来非常明显)问题的构建解决方案。当然,它是开源的,我们可以着手实现我们自己的 ros bag 重播器。但是,如上所述,我们希望在框架内开发一种算法。我们不想先实现框架。幸运的是,到目前为止,ROS2工具已经发展了很多,并且只需相对较少的努力,我们就可以应用一种在实践中可以很好地工作的解决方法。
这个概念是有一个额外的节点,就像重播者的遥控器一样。每当所有节点通知远程节点它们已准备就绪时,它都会触发新消息的重播。
四、实施
使用服务客户端体系结构可以相对直接地完成实现,如下所述。包括一个小示例在内的所有代码都可以在这里找到。它也可以很容易地作为 ROS 包包含在内。
远程是一个附加节点,充当rosbag2_replayer的代理。它使用“突发”服务每 T 秒触发接下来的 N 条消息,并等待每个节点的确认。实现自定义服务确实在调用方的身份旁边传输“就绪”信号。不幸的是,当我们使用突发调用“跳过”袋子时,重播者不会自动关闭。因此,额外的计时器会定期检查模拟时间是否仍在增加,以确定重播是否已结束。这要求重播者发布时钟(-clock),并使用 use-sim-time 参数集运行遥控器。然后,节点可以在重放结束时关闭自身/其整个组合。
我们可以将其作为可组合节点包含,也可以通过以下方式启动它:
ros2 run controllable_replay remote --ros-args -p use_sim_time:=True -p batch_size:=10 -p period:=0.01 -p automatic_shutdown:=5
这将每 10 毫秒播放 10 条消息,并在重播者处于非活动状态 5 秒后关闭节点。
五、算法节点
在算法方面,我们需要一些逻辑来通知遥控器我们已经准备好了。在简单节点中,我们可以在每个回调的末尾添加它。在更复杂的节点中,消息和回调之间可能没有一对一的映射。在这种情况下,需要一些额外的逻辑来监视节点的工作负载,例如通过观察输入队列大小。
对于一个简单的示例,我们将创建一个字符串侦听器节点,该节点计算收到的消息数并模拟具有特定时间长度的繁重任务。任务完成后,它将发布自己的消息并通知远程设备已准备就绪。
#include "Listener.h"
using namespace std::chrono_literals;
namespace controlled_replay_example {
Listener::Listener(const rclcpp::NodeOptions& options) :
rclcpp::Node("Listener", options),
_cliReady{create_client<controlled_replay_interfaces::srv::Ready>("/ready")},
_pub{create_publisher<std_msgs::msg::String>("/hearsay", 10)},
_sub{create_subscription<std_msgs::msg::String>( "/chatter", 10,
[&](std_msgs::msg::String::ConstSharedPtr msg) {
_ctr++;
std::this_thread::sleep_for(get_parameter("task_time").as_double() * 1000ms); // heavy task
auto rq = std::make_shared<controlled_replay_interfaces::srv::Ready::Request>();
rq->isready = true; _cliReady->async_send_request(rq); // inform replayer
})}, _timer{create_wall_timer(1s, [&]() { RCLCPP_INFO(get_logger(), "Received messages: %ld", _ctr); })}
{ declare_parameter("task_time", 1.0); } }
// namespace controlled_replay_example
#include "rclcpp_components/register_node_macro.hpp"
RCLCPP_COMPONENTS_REGISTER_NODE(controlled_replay_example::Listener)
我们使用一个包含 464 个字符串消息的包以 100hz 的速率测试上面的示例。我们将模拟一个算法管道,其中包含两个具有不同时间长度的链节点。第一个节点将以 100hz 运行,但第二个节点仅以 2hz 运行。因此,从理论上讲,第一个节点应该能够处理所有消息,但第二个节点会错过一些消息。
在第一次运行中,我们将在没有远程的情况下简单地运行管道。
#!/bin/bash
ros2 run controlled_replay_example listener --ros-args -p task_time:=0.01 -r __node:=listener1 &
ros2 run controlled_replay_example listener --ros-args -p task_time:=0.5 -r __node:=listener2 -r chatter:=hearsay -r hearsay:=hearsay1&
ros2 bag play bag
重放期间的节点图
[INFO] [1695973373.033875499] [listener2]: Received messages: 8
[INFO] [1695973373.732552274] [listener1]: Received messages: 445
我们可以看到,即使是第一个节点也丢失了一些消息。但是,第一个只处理了8个!
在第二次运行中,我们将使用远程运行管道。
#!/bin/bash
ros2 run controlled_replay_example listener --ros-args -p task_time:=0.01 -r __node:=listener1 &
ros2 run controlled_replay_example listener --ros-args -p task_time:=0.5 -r __node:=listener2 -r chatter:=hearsay -r hearsay:=hearsay1 &
ros2 run controlled_replay remote --ros-args -p batch_size:=1 -p use_sim_time:=True &
ros2 bag play --clock --start-paused bag
具有受控回放的节点图
[INFO] [1695973373.033875499] [listener2]: Received messages: 464
[INFO] [1695973373.732552274] [listener1]: Received messages: 464
我们可以看到两个节点都收到了所有消息。
六、结论
我们已经看到了一种在 ROS2 中获得可控重放的简单方法。通过添加一个额外的远程节点,我们可以将大部分任务封装在远离实际管道的地方。遥控器将重播速度减慢到节点可以处理的任何速度,从而避免丢失消息。
此处提供的远程,可以通过 ROS2 包管理包含在内。
七、未解决的问题和未来工作
- 很快,重播者应该可以作为可组合节点使用。我希望这将进一步提高重播速度。它应该与上述实现顺利配合。
- 使用这些服务时,ros bag 重播器会向终端发送垃圾邮件,其中包含每个服务的消息,我没有找到一种方法来阻止日志记录部分将其重定向到 /dev/null。但是,这超过了重播者的所有输出
- 最好找到更简单的方法来监视节点的工作负载,甚至可能是外部的工作负载。例如,如果我们能以某种方式访问待处理的回调,这已经有所帮助,但我没有找到做到这一点的方法。