目标:使用 Python 创建并运行服务节点和客户端节点。
教程级别:初学者
时间:20 分钟
目录
背景
先决条件
任务
1. 创建一个包
2. 编写服务节点
3. 编写客户端节点
4. 构建并运行
摘要
下一步
相关内容
背景
当节点通过服务进行通信时,发送数据请求的节点称为客户端节点,而响应请求的节点称为服务节点。请求和响应的结构由一个 .srv
文件决定。
这里使用的例子是一个简单的整数加法系统;一个节点请求两个整数的和,另一个节点则响应结果。
先决条件
在之前的教程中,您学习了如何创建工作区和创建包。
任务
1. 创建一个包
打开一个新的终端并且初始化您的 ROS 2 安装,这样 ros2
命令就会生效。
导航到在之前教程中创建的 ros2_ws
目录。
请记住,包应该在 src
目录中创建,而不是在工作区的根目录中。导航到 ros2_ws/src
并创建一个新包:
cxy@ubuntu2404-cxy:~/ros2_ws/src$ ros2 pkg create --build-type ament_python --license Apache-2.0 py_srvcli --dependencies rclpy example_interfaces
going to create a new package
package name: py_srvcli
destination directory: /home/cxy/ros2_ws/src
package format: 3
version: 0.0.0
description: TODO: Package description
maintainer: ['cxy <cxy@todo.todo>']
licenses: ['Apache-2.0']
build type: ament_python
dependencies: ['rclpy', 'example_interfaces']
creating folder ./py_srvcli
creating ./py_srvcli/package.xml
creating source folder
creating folder ./py_srvcli/py_srvcli
creating ./py_srvcli/setup.py
creating ./py_srvcli/setup.cfg
creating folder ./py_srvcli/resource
creating ./py_srvcli/resource/py_srvcli
creating ./py_srvcli/py_srvcli/__init__.py
creating folder ./py_srvcli/test
creating ./py_srvcli/test/test_copyright.py
creating ./py_srvcli/test/test_flake8.py
creating ./py_srvcli/test/test_pep257.py
您的终端将返回一条消息,确认您的包 py_srvcli
及其所有必要的文件和文件夹已创建。
`--dependencies` 参数会自动将必要的依赖行添加到 `package.xml`。`example_interfaces` 是包含 `.srv` 文件的包,您将需要它来构建您的请求和响应结构:
int64 a
int64 b
---
int64 sum
请求的前两行是参数,破折号以下的是响应。
1.1 更新 package.xml
因为在创建包时您使用了 --dependencies
选项,所以您无需手动将依赖项添加到 package.xml
中。
一如既往,不过,请确保将描述、维护者电子邮件和姓名以及许可信息添加到 package.xml
。
<description>Python client server tutorial</description>
<maintainer email="you@email.com">Your Name</maintainer>
<license>Apache-2.0</license>
<?xml version="1.0"?>
# 声明XML版本为1.0
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
# 指定XML模式文件的位置和命名空间
<package format="3">
# 定义包的格式版本为3
<name>py_srvcli</name>
# 包的名称为'py_srvcli'
<version>0.0.0</version>
# 包的版本号为0.0.0
<description>Python client server tutorial</description>
# 包的描述信息为'Python客户端服务器教程'
<maintainer email="cxy@126.com">cxy</maintainer>
# 维护者的名字是cxy,电子邮件是cxy@126.com
<license>Apache-2.0</license>
# 包的许可证为Apache-2.0
<depend>rclpy</depend>
# 依赖项为rclpy
<depend>example_interfaces</depend>
# 依赖项为example_interfaces
<test_depend>ament_copyright</test_depend>
# 测试依赖项为ament_copyright
<test_depend>ament_flake8</test_depend>
# 测试依赖项为ament_flake8
<test_depend>ament_pep257</test_depend>
# 测试依赖项为ament_pep257
<test_depend>python3-pytest</test_depend>
# 测试依赖项为python3-pytest
<export>
# 导出部分开始
<build_type>ament_python</build_type>
# 构建类型为ament_python
</export>
# 导出部分结束
</package>
# 包定义结束
1.2 更新 setup.py
将相同的信息添加到 setup.py
文件中的 maintainer
、 maintainer_email
、 description
和 license
字段:
maintainer='Your Name',
maintainer_email='you@email.com',
description='Python client server tutorial',
license='Apache-2.0'
2. 编写服务节点
在 ros2_ws/src/py_srvcli/py_srvcli
目录中,创建一个名为 service_member_function.py
的新文件,并将以下代码粘贴其中:
from example_interfaces.srv import AddTwoInts # 从example_interfaces.srv模块导入AddTwoInts服务
import rclpy # 导入rclpy库
from rclpy.node import Node # 从rclpy.node模块导入Node类
class MinimalService(Node): # 定义一个继承自Node类的MinimalService类
def __init__(self): # 初始化方法
super().__init__('minimal_service') # 调用父类的初始化方法,并命名节点为'minimal_service'
self.srv = self.create_service(AddTwoInts, 'add_two_ints', self.add_two_ints_callback)
# 创建一个服务,服务类型为AddTwoInts,服务名为'add_two_ints',回调函数为add_two_ints_callback
def add_two_ints_callback(self, request, response): # 定义服务的回调函数
response.sum = request.a + request.b # 将请求中的两个整数相加,并将结果赋值给响应的sum字段
self.get_logger().info('Incoming request\na: %d b: %d' % (request.a, request.b))
# 记录日志,显示收到的请求中的两个整数a和b
return response # 返回响应
def main(): # 主函数
rclpy.init() # 初始化rclpy
minimal_service = MinimalService() # 创建MinimalService类的实例
rclpy.spin(minimal_service) # 保持节点运行,等待并处理请求
rclpy.shutdown() # 关闭rclpy
if __name__ == '__main__': # 如果该文件是作为主程序运行
main() # 调用主函数
检查代码 2.1
第一个 import
语句从 example_interfaces
包中导入 AddTwoInts
服务类型。接下来的 import
语句导入了 ROS 2 Python 客户端库,特别是 Node
类。
from example_interfaces.srv import AddTwoInts
import rclpy
from rclpy.node import Node
MinimalService
类构造函数用名称 minimal_service
初始化节点。然后,它创建一个服务并定义类型、名称和回调。
def __init__(self):
super().__init__('minimal_service')
self.srv = self.create_service(AddTwoInts, 'add_two_ints', self.add_two_ints_callback)
服务回调的定义接收请求数据,对其求和,然后将总和作为响应返回。
def add_two_ints_callback(self, request, response):
response.sum = request.a + request.b
self.get_logger().info('Incoming request\na: %d b: %d' % (request.a, request.b))
return response
最后,主类初始化 ROS 2 Python 客户端库,实例化 MinimalService
类以创建服务节点,并旋转节点以处理回调。
2.2 添加一个入口点
要允许 ros2 run
命令运行您的节点,您必须将入口点添加到 setup.py
(位于 ros2_ws/src/py_srvcli
目录中)。
在 'console_scripts':
括号之间添加以下行:
'service = py_srvcli.service_member_function:main',
3. 编写客户端节点
在 ros2_ws/src/py_srvcli/py_srvcli
目录中,创建一个名为 client_member_function.py
的新文件,并将以下代码粘贴其中:
import sys
# 导入sys模块
from example_interfaces.srv import AddTwoInts
# 从example_interfaces.srv模块中导入AddTwoInts服务
import rclpy
# 导入rclpy库
from rclpy.node import Node
# 从rclpy.node模块中导入Node类
class MinimalClientAsync(Node):
# 定义一个继承自Node类的MinimalClientAsync类
def __init__(self):
# 初始化函数
super().__init__('minimal_client_async')
# 调用父类的初始化函数,并命名节点为'minimal_client_async'
self.cli = self.create_client(AddTwoInts, 'add_two_ints')
# 创建一个客户端,使用AddTwoInts服务,服务名为'add_two_ints'
while not self.cli.wait_for_service(timeout_sec=1.0):
# 循环等待服务可用,每次等待1秒
self.get_logger().info('service not available, waiting again...')
# 如果服务不可用,打印日志信息
self.req = AddTwoInts.Request()
# 创建一个AddTwoInts请求对象
def send_request(self, a, b):
# 定义发送请求的函数
self.req.a = a
# 设置请求的第一个参数
self.req.b = b
# 设置请求的第二个参数
return self.cli.call_async(self.req)
# 异步调用服务并返回结果
def main():
# 主函数
rclpy.init()
# 初始化rclpy库
minimal_client = MinimalClientAsync()
# 创建MinimalClientAsync对象
future = minimal_client.send_request(int(sys.argv[1]), int(sys.argv[2]))
# 发送请求,参数为命令行输入的两个整数
rclpy.spin_until_future_complete(minimal_client, future)
# 等待请求完成
response = future.result()
# 获取请求结果
minimal_client.get_logger().info(
'Result of add_two_ints: for %d + %d = %d' %
(int(sys.argv[1]), int(sys.argv[2]), response.sum))
# 打印结果日志信息
minimal_client.destroy_node()
# 销毁节点
rclpy.shutdown()
# 关闭rclpy
if __name__ == '__main__':
main()
# 如果脚本是直接执行的,则调用main函数
3.1 检查代码
在服务代码中,我们首先 import
必要的库。
import sys
from example_interfaces.srv import AddTwoInts
import rclpy
from rclpy.node import Node
MinimalClientAsync
类构造函数用名称 minimal_client_async
初始化节点。构造函数定义创建了一个与服务节点类型和名称相同的客户端。类型和名称必须匹配,客户端和服务才能够通信。构造函数中的 while
循环每秒检查一次是否有与客户端类型和名称相匹配的服务。最后,它创建了一个新的 AddTwoInts
请求对象。
def __init__(self):
super().__init__('minimal_client_async')
self.cli = self.create_client(AddTwoInts, 'add_two_ints')
while not self.cli.wait_for_service(timeout_sec=1.0):
self.get_logger().info('service not available, waiting again...')
self.req = AddTwoInts.Request()
构造函数下面是 send_request
方法,它将发送请求并返回一个可以传递给 spin_until_future_complete
的未来:
def send_request(self, a, b):
self.req.a = a
self.req.b = b
return self.cli.call_async(self.req)
最后我们有 main
方法,它构建一个 MinimalClientAsync
对象,使用传入的命令行参数发送请求,调用 spin_until_future_complete
,并记录结果:
def main():
rclpy.init()
minimal_client = MinimalClientAsync()
future = minimal_client.send_request(int(sys.argv[1]), int(sys.argv[2]))
rclpy.spin_until_future_complete(minimal_client, future)
response = future.result()
minimal_client.get_logger().info(
'Result of add_two_ints: for %d + %d = %d' %
(int(sys.argv[1]), int(sys.argv[2]), response.sum))
minimal_client.destroy_node()
rclpy.shutdown()
3.2 添加一个入口点
与服务节点一样,您还需要添加一个入口点才能运行客户端节点。
您的 setup.py 文件中的 entry_points 字段应该如下所示:
entry_points={
'console_scripts': [
'service = py_srvcli.service_member_function:main',
'client = py_srvcli.client_member_function:main',
],
},
from setuptools import find_packages, setup
# 从setuptools模块中导入find_packages和setup函数
package_name = 'py_srvcli'
# 定义包名为'py_srvcli'
setup(
name=package_name,
# 设置包的名称
version='0.0.0',
# 设置包的版本号
packages=find_packages(exclude=['test']),
# 查找包,排除'test'目录
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
# 指定数据文件的位置和名称
('share/' + package_name, ['package.xml']),
# 指定package.xml文件的位置和名称
],
install_requires=['setuptools'],
# 指定安装依赖项
zip_safe=True,
# 指定包是否可以安全地作为zip文件分发
maintainer='cxy',
# 指定维护者的名字
maintainer_email='cxy@126.com',
# 指定维护者的电子邮件
description='Python client server tutorial',
# 包的描述信息
license='Apache-2.0',
# 指定包的许可证
tests_require=['pytest'],
# 指定测试依赖项
entry_points={
'console_scripts': [
'service = py_srvcli.service_member_function:main',
# 定义控制台脚本'服务',指向py_srvcli.service_member_function模块的main函数
'client = py_srvcli.client_member_function:main',
# 定义控制台脚本'客户端',指向py_srvcli.client_member_function模块的main函数
],
},
)
# 调用setup函数,传入包的配置信息
4. 构建并运行
在工作区的根目录运行 rosdep
( ros2_ws
)以检查构建前缺失的依赖项是一个好习惯:
rosdep install -i --from-path src --rosdistro jazzy -y
返回到您的工作区根目录, ros2_ws
,然后构建您的新包:
cxy@ubuntu2404-cxy:~/ros2_ws$ colcon build --packages-select py_srvcli
Starting >>> py_srvcli
Finished <<< py_srvcli [7.05s]
Summary: 1 package finished [17.1s]
打开一个新的终端,导航到 ros2_ws
,并且导入设置文件:
source install/setup.bash
现在运行服务节点:
ros2 run py_srvcli service
节点将等待客户端的请求。
打开另一个终端,并再次从 ros2_ws
内部加载设置文件。启动客户端节点,后面跟着任意两个用空格分隔的整数:
ros2 run py_srvcli client 2 3
如果您选择了 2
和 3
,例如,客户将收到这样的回应:
[INFO] [minimal_client_async]: Result of add_two_ints: for 2 + 3 = 5
返回到运行服务节点的终端。当它收到请求时,你会看到它发布了日志消息:
[INFO] [minimal_service]: Incoming request
a: 2 b: 3
在服务器终端输入 Ctrl+C
以停止节点旋转。
摘要
您创建了两个节点,通过服务请求和响应数据。您将它们的依赖项和可执行文件添加到包配置文件中,以便您可以构建并运行它们,从而让您看到服务/客户端系统的工作情况。
下一步
在过去的几个教程中,您一直在使用接口来跨主题和服务传递数据。接下来,您将学习如何创建自定义接口。https://docs.ros.org/en/jazzy/Tutorials/Beginner-Client-Libraries/Custom-ROS2-Interfaces.html
相关内容
有几种方法可以用 Python 编写服务和客户端;请查看 ros2/examples 仓库https://github.com/ros2/examples/tree/jazzy/rclpy/services 中的
minimal_client
和minimal_service
包。在本教程中,您在客户端节点中使用了
call_async()
API 来调用服务。Python 还有另一种服务调用 API,称为同步调用。我们不推荐使用同步调用,但如果您想了解更多关于它们的信息,请阅读《同步与异步客户端指南》。https://docs.ros.org/en/jazzy/How-To-Guides/Sync-Vs-Async.html