本文主要介绍ROS的Topics概念,如何创建Publisher和Subscriber,通过Topic在ROS程序间通信;介绍ROS的Services概念,如何创建Client和Server并建立通信。
更多内容,访问专栏目录获取实时更新。
ROS Topics
Topics可以被视为一种具名的总线,用于节点间交换数据,通过Topics可以发布和订阅消息,实现单向的流式通信。需要注意的重点包括:
- 单向流式通信(发布/订阅模式)
- 匿名通信
- 每个Topic都有特定的消息格式
- 可以在ROS节点中通过Python, C++等多种语言实现
- 一个节点可以拥有多个Topic
Publisher in Python
在ROS2基础编程一文中,我们已经创建了工作空间和工作包,在此基础上,来通过Python实现一个Topic的发布者。
在my_py_pkg工作包下创建一个新的文件robot_news_station.py
robot_news_station.py
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from example_interfaces.msg import String # do not forget to add package dependency
class RobotNewsPublisherNode(Node):
def __init__(self):
super().__init__("robot_news_publisher")
self.robot_name = "HiRobot"
self.publisher = self.create_publisher(String, "robot_news", 10)
self.timer = self.create_timer(1, self.publish_news)
self.get_logger().info("Robot News Publisher has been started")
def publish_news(self):
msg = String()
msg.data = "Hi, this is " + str(self.robot_name) + " from the robot news publisher"
self.publisher.publish(msg)
def main(args=None):
rclpy.init(args=args)
node = RobotNewsPublisherNode()
rclpy.spin(node)
rclpy.shutdown()
if __name__ == "__main__":
main()
还需要添加依赖包引用,修改package.xml文件:
然后装载我们的新节点,修改setup.py:
之后执行下面的指令就可以启动我们的publisher了:
colcon build --packages-select my_py_pkg --symlink-install
source ~/.bashrc
ros2 run my_py_pkg robot_news_station
ros2 topic echo /robot_news
Subscriber in Python
创建订阅者的方式与创建发布者类似,这里我们添加了一个名为robot_news_subscriber.py的文件:
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from example_interfaces.msg import String # do not forget to add package dependency
class RobotNewsSubscriberNode(Node):
def __init__(self):
super().__init__("robot_news_subscriber")
self.subscriber = self.create_subscription(String, "robot_news", self.subscribe_news_callback, 10)
self.get_logger().info("Robot News Subscriber has been started")
def subscribe_news_callback(self, msg):
self.get_logger().info(msg.data)
def main(args=None):
rclpy.init(args=args)
node = RobotNewsSubscriberNode()
rclpy.spin(node)
rclpy.shutdown()
if __name__ == "__main__":
main()
不要忘记修改setup.py和colcon build
编译。之后同时运行publisher和subscriber就可以看到右侧命令行里的subscriber每1s收到一条来自publisher的信息,并打印:
使用命令行工具调试Topics
ros2 topic list
ros2 topic info <topic_name> # 查看所有激活的topic
ros2 topic echo <topic_name> # 创建一个订阅者监听发布的消息
ros2 interface show <msg_type> # 显示话题消息的类型
ros2 topic hz <topic_name> # 获取发布频率
ros2 topic bw <topic_name> # 获取消息的长度,byte width
ros2 topic pub <topic_name> <msg_type> <msg_data> # 发布一个topic
ros2 topic pub -r 10 /robot_news example_interfaces/msg/String "{data: 'hello from robot'}"
ros2 node list
ros2 info <node_name>
ros2 run <pkg_name> <node_name> --ros-args -r __node:=<new_node_name>
ros2 run <pkg_name> <node_name> --ros-args -r __node:=<new_node_name> -r <topic_name>:=<new_topic_name>
Ros Services
前文提到Topics实现的是单向的传输,通过发布/订阅模式建立连接,但用在一些需要请求/回复的分布式系统中就不太合适了。
Services可以帮助我们实现请求/回复的通信模式,一条消息用于请求,一条用于回复,ROS节点以字符串名称提供服务,客户端通过发送请求消息并等待回复来调用服务,从而建立持久性的连接。
Service Server in Python
ros2 interface show example_interfaces/srv
执行上面的指令,你能看到example_interface包里提供了哪些服务,这里我们使用AddTwoInts来演示如何创建一个service server
add_two_ints_server.py
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from example_interfaces.srv import AddTwoInts
class AddTwoIntsServerNode(Node):
def __init__(self):
super().__init__("add_two_ints_server")
self.server = self.create_service(AddTwoInts, "add_two_ints", self.callback_add_two_ints)
self.get_logger().info("Add Two Ints Server has started")
def callback_add_two_ints(self, request, response):
response.sum = request.a + request.b
self.get_logger().info(str(request.a) + " + " + str(request.b) + " = " + str(response.sum))
return response
def main(args=None):
rclpy.init(args=args)
node = AddTwoIntsServerNode()
rclpy.spin(node)
rclpy.shutdown()
if __name__ == "__main__":
main()
编译并运行,通过ros2 service list
就能在服务列表里看到我们启动的服务了。
ros2 service call /add_two_ints example_interfaces/srv/AddTwoInts "{a: 3, b: 4}"
执行上面的指令就可以在命令行里调用我们创建的service。
Service Client in Python
add_two_ints_client.py
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from example_interfaces.srv import AddTwoInts
def main(args=None):
rclpy.init(args=args)
node = Node("add_two_ints_no_oop")
client = node.create_client(AddTwoInts, "add_two_ints")
while not client.wait_for_service(1):
node.get_logger().warn("Waiting for server Add Two Ints...")
request = AddTwoInts.Request()
request.a = 3
request.b = 8
future = client.call_async(request)
rclpy.spin_until_future_complete(node, future)
try:
response = future.result()
node.get_logger().info(str(request.a) + " + " + str(request.b) + " = " + str(response.sum))
except Exception as e:
node.get_logger().error("Service call failed %r" % (e,))
rclpy.shutdown()
if __name__ == "__main__":
main()
使用命令行工具调试Services
ros2 service list
ros2 service type <service_name>
ros2 interface show <service type>
ros2 service call <service_name> <service_type> <value>
ros2 run <pkg_name> <node_name> --ros-args -r <service_name>:=<new_service_name>
ROS Interface
ROS应用之间有三种通信接口:messages, services和actions,ROS2使用IDL(interface definition language)来描述这些接口,使得在不同应用,不同编程语言间进行交互更加简单。例如前文提到的Topic和Service:
Topic:
- Name:话题名
- 消息定义:Msg,如example_interfaces/msg/Int64
Service:
- Name: 服务名
- 服务定义:Srv,如example_interfaces/srv/AddTwoInts (包含request和response)
创建自定义的Msg
创建一个新的工作包my_robot_interfaces,移除目录下的include和src文件夹,并新建msg文件夹:
在msg文件夹下新建msg定义文件: HardwareStatus.msg
int64 temperature
string debug_message
更新package.xml,添加:
<build_depend>rosidl_default_generators</build_depend>
<exec_depend>rosidl_default_runtime</exec_depend>
<member_of_group>rosidl_interface_packages</member_of_group>
更新CMakeLists.txt,添加:
find_package(rosidl_default_generators REQUIRED)
rosidl_generate_interfaces(${PROJECT_NAME}
"msg/HardwareStatus.msg"
)
ament_export_dependencies(rosidl_default_runtime)
ament_package()
然后编译工作包,就可以在其他工程中使用该Msg定义了。
使用自定义的Msg
在my_py_pkg工作包下创建一个新的publisher:hw_status_publisher.py
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from my_robot_interfaces.msg import HardwareStatus
class HardwareStatusPublisherNode(Node):
def __init__(self):
super().__init__("hardware_status_publisher")
self.hw_status_publisher = self.create_publisher(HardwareStatus, "hardware_status", 10)
self.timer = self.create_timer(1, self.publish_hw_status)
self.get_logger().info("Hardware Status Publisher has been started")
def publish_hw_status(self):
msg = HardwareStatus()
msg.temperature = 45
msg.debug_message = "No error"
self.hw_status_publisher.publish(msg)
def main(args=None):
rclpy.init(args=args)
node = HardwareStatusPublisherNode()
rclpy.spin(node)
rclpy.shutdown()
if __name__ == "__main__":
main()
不要忘记在package.xml中添加对my_robot_interface的依赖,并把新的节点加载到setup.py并编译。运行haredware_status_publisher并监听,可以看到如下的效果:
创建自定义的Srv
在my_robot_interfaces工作包目录下创建srv文件夹,并新建文件ComputerRectangleArea.srv
float64 length
float64 width
---
float64 area
更新CMakeLists.txt
成功编译了工作包后我们就能够获得自定义的Srv了
如有错误,欢迎留言或来信指正:hbin6358@163.com