本文是ROS2基础知识的综合小应用,练习如何创建工作包,创建Node,定义Topic和Service,以及通过LaunchFile启动多个节点。基础知识可以参考:ROS2基础编程,ROS2 Topics和Services,ROS2 LaunchFile和Parameter
更多内容,访问专栏目录获取实时更新。
创建工作包
ros2 pkg create turtlesim_greedy_turtle --build-type ament_python
创建节点
touch turtle_controller.py
touch turtle_spawner.py
chmod +x turtle_controller.py
chmod +x turtle_spawner.py
添加依赖
<depend>rclpy</depend>
<depend>turtlesim</depend>
<depend>geometry_msgs</depend>
<depend>my_robot_interfaces</depend>
创建Msg
Turtle.msg
string name
float64 x
float64 y
float64 theta
TurtleArray.msg
Turtle[] turtles
创建Srv
CatchTurtle.srv
string name
---
bool success
添加节点控制Turtle
turtle_controller.py
#!/usr/bin/env python3
import rclpy
import math
from functools import partial
from rclpy.node import Node
from turtlesim.msg import Pose
from geometry_msgs.msg import Twist
from my_robot_interfaces.msg import Turtle
from my_robot_interfaces.msg import TurtleArray
from my_robot_interfaces.srv import CatchTurtle
class TurtleControllerNode(Node):
def __init__(self):
super().__init__("turtle_controller")
self.pose = None
self.turtle_to_catch = None
self.pose_subscriber = self.create_subscription(Pose, "turtle1/pose", self.callback_turtle_pose, 10)
self.cmd_vel_publisher = self.create_publisher(Twist, "turtle1/cmd_vel", 10)
self.alive_turtles_subscriber = self.create_subscription(TurtleArray, "alive_turtles", self.callback_alive_turtles, 10)
self.control_loop_timer = self.create_timer(0.01, self.control_loop)
def callback_turtle_pose(self, msg):
self.pose = msg
def control_loop(self):
if self.pose == None or self.turtle_to_catch == None:
return
# calcu the distance between current position and target position
dist_x = self.turtle_to_catch.x - self.pose.x
dist_y = self.turtle_to_catch.y - self.pose.y
distance = math.sqrt(dist_x * dist_x + dist_y * dist_y)
# construct cmd_vel msg to control the turtle
msg = Twist()
if distance <= 0.5: # target reached
msg.linear.x = 0.0 # stop the turtle
msg.angular.z = 0.0
self.call_catch_turtle_server(self.turtle_to_catch.name)
self.turtle_to_catch = None
else: # move to the target position
msg.linear.x = 2 * distance # optimize
goal_theta = math.atan2(dist_y, dist_x) # orientation
diff = goal_theta - self.pose.theta
if diff > math.pi:
diff -= 2 * math.pi
elif diff < -math.pi:
diff += 2 * math.pi
msg.angular.z = 6 * diff # optimize
self.cmd_vel_publisher.publish(msg)
def callback_alive_turtles(self, msg):
if len(msg.turtles) > 0:
self.turtle_to_catch = msg.turtles[0]
def call_catch_turtle_server(self, turtle_name):
client = self.create_client(CatchTurtle, "catch_turtle")
while not client.wait_for_service(1.0):
self.get_logger().warn("Waiting for server...")
request = CatchTurtle.Request()
request.name = turtle_name
future = client.call_async(request)
future.add_done_callback(partial(self.callback_call_catch_turtle, turtle_name=turtle_name))
def callback_call_catch_turtle(self, future, turtle_name):
try:
response = future.result()
if not response.success:
self.get_logger().error("Turtle " + str(turtle_name) + " could not be caught")
except Exception as e:
self.get_logger().error("Service call failed %r" & (e,))
def main(args=None):
rclpy.init(args=args)
node = TurtleControllerNode()
rclpy.spin(node)
rclpy.shutdown()
if __name__ == "__main__":
main()
添加节点产生或移除Turtle
#!/usr/bin/env python3
import rclpy
import random
import math
from functools import partial
from rclpy.node import Node
from turtlesim.srv import Spawn
from turtlesim.srv import Kill
from my_robot_interfaces.msg import Turtle
from my_robot_interfaces.msg import TurtleArray
from my_robot_interfaces.srv import CatchTurtle
class TurtleSpawnerNode(Node):
def __init__(self):
super().__init__("turtle_spawner")
self.declare_parameter("spawn_frequency", 1.0)
self.declare_parameter("turtle_name_prefix", "turtle")
self.spawn_frequency = self.get_parameter("spawn_frequency").value
self.turtle_name_prefix = self.get_parameter("turtle_name_prefix").value
self.turtle_counter = 0
self.alive_turtles = []
self.spawn_turtle_timer = self.create_timer(1.0 / self.spawn_frequency, self.spawn_new_turtle)
self.alive_turtles_publisher = self.create_publisher(TurtleArray, "alive_turtles", 10)
self.catch_turtle_service = self.create_service(CatchTurtle, "catch_turtle", self.callback_catch_turtle)
def spawn_new_turtle(self):
self.turtle_counter += 1
name = self.turtle_name_prefix + "_" + str(self.turtle_counter)
x = random.uniform(0.0, 10.0)
y = random.uniform(0.0, 10.0)
theta = random.uniform(0.0, 2 * math.pi)
self.call_spawn_server(name, x, y, theta)
def call_spawn_server(self, turtle_name, x, y, theta):
client = self.create_client(Spawn, "spawn")
while not client.wait_for_service(1.0):
self.get_logger().warn("Waiting for server...")
request = Spawn.Request()
request.name = turtle_name
request.x = x
request.y = y
request.theta = theta
future = client.call_async(request)
future.add_done_callback(partial(self.callback_call_spawn, turtle_name=turtle_name, x=x, y=y, theta=theta))
def callback_call_spawn(self, future, turtle_name, x, y, theta):
try:
response = future.result()
if response.name != "":
self.get_logger().info("Turtle " + response.name + " is now alive")
new_turtle = Turtle()
new_turtle.name = turtle_name
new_turtle.x = x
new_turtle.y = y
new_turtle.theta = theta
self.alive_turtles.append(new_turtle)
self.publish_alive_turtles()
except Exception as e:
self.get_logger().error("Service call failed %r" % (e,))
def publish_alive_turtles(self):
msg = TurtleArray()
msg.turtles = self.alive_turtles
self.alive_turtles_publisher.publish(msg)
def callback_catch_turtle(self, request, response):
self.call_kill_server(request.name)
response.success = True
return response
def call_kill_server(self, turtle_name):
client = self.create_client(Kill, "kill")
while not client.wait_for_service(1.0):
self.get_logger().warn("Waiting for server...")
request = Kill.Request()
request.name = turtle_name
future = client.call_async(request)
future.add_done_callback(partial(self.callback_call_kill, turtle_name=turtle_name))
def callback_call_kill(self, future, turtle_name):
try:
future.result()
for (i, turtle) in enumerate(self.alive_turtles):
if turtle.name == turtle_name:
del self.alive_turtles[i]
self.publish_alive_turtles()
break
except Exception as e:
self.get_logger().error("Service call failed %r" & (e,))
def main(args=None):
rclpy.init(args=args)
node = TurtleSpawnerNode()
rclpy.spin(node)
rclpy.shutdown()
if __name__ == "__main__":
main()
创建LaunchFile
不要忘记更新my_robot_bringup工作包的package.xml添加依赖
通过LaunchFile启动
ros2 launch my_robot_bringup turtlesim_greedy_turtle.launch.py
最终的效果:
如有错误,欢迎留言或来信指正:hbin6358@163.com