目录
- 一、初始化和关闭节点
- 二、发布者
- 三、订阅者
- 四、服务端
- 五、客户端
- 六、参数管理
- 七、日志记录
- 八、生命周期管理
ROS2 在 Python 编程中引入了一些新的概念和 API,这些变化使得代码更加模块化和易于维护。特别是 rclpy
库提供了更丰富的功能和更好的错误处理机制,同时支持异步编程模型。如果你已经熟悉 ROS1 的 Python 编程,这些变化应该不会太难适应。
一、初始化和关闭节点
ROS1:
import rospy
def main():
rospy.init_node('my_node', anonymous=True)
# 节点逻辑
rospy.spin()
if __name__ == '__main__':
main()
ROS2:
import rclpy
from rclpy.node import Node
class MyNode(Node):
def __init__(self):
super().__init__('my_node')
# 节点逻辑
def main(args=None):
rclpy.init(args=args)
node = MyNode()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
二、发布者
ROS1:
import rospy
from std_msgs.msg import String
def talker():
pub = rospy.Publisher('chatter', String, queue_size=10)
rospy.init_node('talker', anonymous=True)
rate = rospy.Rate(10) # 10 Hz
while not rospy.is_shutdown():
hello_str = "hello world %s" % rospy.get_time()
rospy.loginfo(hello_str)
pub.publish(hello_str)
rate.sleep()
if __name__ == '__main__':
try:
talker()
except rospy.ROSInterruptException:
pass
ROS2:
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class Talker(Node):
def __init__(self):
super().__init__('talker')
self.publisher_ = self.create_publisher(String, 'chatter', 10)
timer_period = 1 # seconds
self.timer = self.create_timer(timer_period, self.timer_callback)
def timer_callback(self):
msg = String()
msg.data = f'Hello World {self.get_clock().now().nanoseconds // 1000000}'
self.get_logger().info(f'Publishing: "{msg.data}"')
self.publisher_.publish(msg)
def main(args=None):
rclpy.init(args=args)
node = Talker()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
三、订阅者
ROS1:
import rospy
from std_msgs.msg import String
def callback(data):
rospy.loginfo(rospy.get_caller_id() + " I heard %s", data.data)
def listener():
rospy.init_node('listener', anonymous=True)
rospy.Subscriber('chatter', String, callback)
rospy.spin()
if __name__ == '__main__':
listener()
ROS2:
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class Listener(Node):
def __init__(self):
super().__init__('listener')
self.subscription = self.create_subscription(String, 'chatter', self.listener_callback, 10)
self.subscription # prevent unused variable warning
def listener_callback(self, msg):
self.get_logger().info(f'I heard: "{msg.data}"')
def main(args=None):
rclpy.init(args=args)
node = Listener()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
四、服务端
ROS1:
import rospy
from std_srvs.srv import AddTwoInts
def handle_add_two_ints(req):
rospy.loginfo(f"Returning [{req.a} + {req.b} = {req.a + req.b}]")
return AddTwoIntsResponse(req.a + req.b)
def add_two_ints_server():
rospy.init_node('add_two_ints_server')
s = rospy.Service('add_two_ints', AddTwoInts, handle_add_two_ints)
rospy.spin()
if __name__ == "__main__":
add_two_ints_server()
ROS2:
import rclpy
from rclpy.node import Node
from example_interfaces.srv import AddTwoInts
class AddTwoIntsService(Node):
def __init__(self):
super().__init__('add_two_ints_server')
self.srv = self.create_service(AddTwoInts, 'add_two_ints', self.add_two_ints_callback)
def add_two_ints_callback(self, request, response):
response.sum = request.a + request.b
self.get_logger().info(f'Returning [{request.a} + {request.b} = {response.sum}]')
return response
def main(args=None):
rclpy.init(args=args)
node = AddTwoIntsService()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
五、客户端
ROS1:
import rospy
from std_srvs.srv import AddTwoInts
def add_two_ints_client(x, y):
rospy.wait_for_service('add_two_ints')
try:
add_two_ints = rospy.ServiceProxy('add_two_ints', AddTwoInts)
resp1 = add_two_ints(x, y)
return resp1.sum
except rospy.ServiceException as e:
print(f"Service call failed: {e}")
if __name__ == "__main__":
rospy.init_node('add_two_ints_client')
x = 1
y = 2
print(f"Requesting {x}+{y}")
print(f"{x} + {y} = {add_two_ints_client(x, y)}")
ROS2:
import rclpy
from rclpy.node import Node
from example_interfaces.srv import AddTwoInts
class AddTwoIntsClient(Node):
def __init__(self):
super().__init__('add_two_ints_client')
self.cli = self.create_client(AddTwoInts, 'add_two_ints')
while not self.cli.wait_for_service(timeout_sec=1.0):
self.get_logger().info('service not available, waiting again...')
self.req = AddTwoInts.Request()
def send_request(self, a, b):
self.req.a = a
self.req.b = b
self.future = self.cli.call_async(self.req)
def main(args=None):
rclpy.init(args=args)
node = AddTwoIntsClient()
node.send_request(16, 2)
while rclpy.ok():
rclpy.spin_once(node)
if node.future.done():
try:
response = node.future.result()
except Exception as e:
node.get_logger().info(f'Service call failed {e}')
else:
node.get_logger().info(f'Result of add_two_ints: {response.sum}')
break
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
六、参数管理
ROS1:
在ROS1中,参数管理是通过全局参数服务器来实现的。
import rospy
def main():
rospy.init_node('my_node', anonymous=True)
# 获取参数
param_value = rospy.get_param('param_name', 'default_value')
rospy.loginfo(f"Parameter value: {param_value}")
# 设置参数
rospy.set_param('param_name', 'new_value')
if __name__ == '__main__':
main()
ROS2:
在ROS2中,参数管理更加灵活,支持类型安全的参数接口和参数描述符。
import rclpy
from rclpy.node import Node
class MyNode(Node):
def __init__(self):
super().__init__('my_node')
# 获取参数
param_value = self.get_parameter('param_name').get_parameter_value().string_value
self.get_logger().info(f"Parameter value: {param_value}")
# 设置参数
self.set_parameters([rclpy.parameter.Parameter('param_name', rclpy.Parameter.Type.STRING, 'new_value')])
def main(args=None):
rclpy.init(args=args)
node = MyNode()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
七、日志记录
ROS1:
在ROS1中,日志记录使用 rospy
提供的函数。
import rospy
def main():
rospy.init_node('my_node', anonymous=True)
rospy.loginfo("This is an info message.")
rospy.logwarn("This is a warning message.")
rospy.logerr("This is an error message.")
rospy.logfatal("This is a fatal message.")
if __name__ == '__main__':
main()
ROS2:
在ROS2中,日志记录使用 rclpy
提供的函数。
import rclpy
from rclpy.node import Node
class MyNode(Node):
def __init__(self):
super().__init__('my_node')
self.get_logger().info("This is an info message.")
self.get_logger().warning("This is a warning message.")
self.get_logger().error("This is an error message.")
self.get_logger().fatal("This is a fatal message.")
def main(args=None):
rclpy.init(args=args)
node = MyNode()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
八、生命周期管理
ROS1:
ROS1没有内置的生命周期管理功能,通常需要开发者自己实现节点的生命周期管理。
ROS2:
ROS2引入了生命周期管理,允许更精细地控制节点的启动和停止过程。
import rclpy
from rclpy.lifecycle import Node, State, TransitionCallbackReturn
from rclpy.node import Node as BaseNode
class LifecycleNode(Node):
def __init__(self):
super().__init__('lifecycle_node')
def on_configure(self, state: State) -> TransitionCallbackReturn:
self.get_logger().info('on_configure() is called.')
return TransitionCallbackReturn.SUCCESS
def on_activate(self, state: State) -> TransitionCallbackReturn:
self.get_logger().info('on_activate() is called.')
return TransitionCallbackReturn.SUCCESS
def on_deactivate(self, state: State) -> TransitionCallbackReturn:
self.get_logger().info('on_deactivate() is called.')
return TransitionCallbackReturn.SUCCESS
def on_cleanup(self, state: State) -> TransitionCallbackReturn:
self.get_logger().info('on_cleanup() is called.')
return TransitionCallbackReturn.SUCCESS
def on_shutdown(self, state: State) -> TransitionCallbackReturn:
self.get_logger().info('on_shutdown() is called.')
return TransitionCallbackReturn.SUCCESS
def main(args=None):
rclpy.init(args=args)
node = LifecycleNode()
executor = rclpy.executors.SingleThreadedExecutor()
executor.add_node(node)
try:
executor.spin()
except KeyboardInterrupt:
pass
node.on_shutdown(State(id=9, label='unconfigured'))
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
欢迎大家加QQ群,一起讨论学习:894013891