本文主要介绍动力学约束下的运动规划算法中非常经典的Hybrid A*算法,大致分为三部分,第一部分是在传统A * 算法的基础上,对Hybrid A * 算法的原理、流程进行理论介绍。第二部分是详细分析 MotionPlanning运动规划库中Hybrid A * 算法的源码,进一步深入对Hybrid A * 算法的具体细节 进行理解。 第三部分是结合前面第一部分的理论和第二部分的详细源码,对Hybrid A * 算法的流程进行综合的概括总结。
另外,本文介绍的源码来源于zhm_real/MotionPlanning运动规划库,我进行了简单的修改,并 Hybrid A * 算法涉及到的源码从该运动规划库中独立摘出来,我会将其以绑定资源的形式绑定在博客中,有需要的可自行免费下载。
Hybrid A* 算法结合了传统的A*算法和连续状态空间中的运动规划方法。它旨在在具有非完整约束(如车辆的转弯半径)的车辆等机器人中进行高效的路径规划。
传统的A * 算法是在栅格地图中进行搜索,可行路径中的点,都是栅格地图的中心点,如下面的第一幅图所示,lattice planner算法是先构建一个用于搜索的lattice图,如下面的第二幅图所示,Hybrid A* 算法结合了A * 算法和lattice planner算法的思想,将栅格地图的路径搜索与lattice图结合起来,在搜索过程中选取不同的控制量预演一段轨迹,且保持在每个栅格中仅保留一个可行的状态,如下面的第三幅图所示
与传统的A * 相比,Hybrid A算法的g(n)和h(n)函数更加复杂;传统的A * 算法在拓展的时候会搜索当前节点周围所有的邻居节点,而Hybrid A算法是在不同的控制输入的驱动下演化出一系列状态点,作为其要拓展的邻居,若当前节点n找出的某个邻居m所在的栅格之前是没有被探索过的,即该栅格中没有记录状态点,则把状态点m记录在该栅格中,若当前节点n找出的某个邻居m所在的栅格中已经记录了状态点,此时需要进行判断,若当前节点n拓展出的状态点m的累计代价更小,则将该栅格中的记录的状态点更改为m,否则不更改。
如何合理的设置Hybrid A*算法的启发式函数呢?
选择1是采用二维的欧式距离作为启发式函数,如下面的a图所示;选择2是采用考虑动力学但不考虑障碍物的启发式函数,如下面的b图所示,但在障碍物较多的环境,该启发式函数效率会变得很低,如下面的c图所示,选择3是采用考虑动力学但不考虑障碍物与不考虑动力学但考虑障碍物的一个最短路径相结合的启发式函数,如下面的d图所示
Hybrid A * 算法中作者还提出了一种Analytic Expansions的思想,即在搜索树生长的过程中,在添加了一个新的节点后,会以一定的概率去直接解从该新节点到目标点的理论上的最优可行路径,如果这一段路径恰好满足动力学约束,且不与障碍物相碰,则我们成功找到了一条可行路径(由Hybrid A * 算法搜索生成的从起点到该新的拓展节点的路径+该新的扩展节点到目标点生成的理论最优路径 两段构成),提前结束规划。
但频繁的使用该方法,在障碍物较多的环境下,往往会降低效率,所以,常常按一定的频率去使用,比如每扩展20个节点,尝试一次。随着新的节点距离目标点越来越近,通过该方式获得可行路径的概率也会增加,所以这个尝试频率,可以随着新的拓展节点距离目标点距离的接近而增大。
下图给出了一些Hybrid A*算法的应用示例
Hybrid A* 算法的优点在于它在离散状态空间和连续状态空间之间进行了平衡,兼顾了路径的最短性和机器人的动力学特性。然而,这也使得算法比传统的A算法更为复杂,并且需要更多的计算资源来处理连续状态空间中的运动规划。总的来说,Hybrid A 算法是一种强大的路径规划算法,特别适用于需要考虑机器人动力学约束的情况,如自动驾驶车辆、无人机等。
zhm_real/MotionPlanning运动规划库中Hybrid A * 算法的主要程序在hybrid_astar.py中,经过上面的介绍我们知道在Hybrid A*算法运行过程中会掉传统的A * 算法以及Reeds sheep算法,所以在hybrid_astar.py中分别调用了astar.py、reeds_shepp.py、以及可视化绘图所需的draw.py。
涉及到的astar.py和reeds_shepp.py这两个文件中的源码在之前的文章中已经详细介绍过了,链接如下,请大家自行前往查看,所以这里仅放一下这两个文件的源码,方便大家使用,不再作详细介绍
zhm_real/MotionPlanning运动规划库中A*算法源码详细解读(点击可跳转)
reeds_sheep运动规划算法Python源码分析(点击可跳转)
astar.py源码如下:
import heapq
import math
import numpy as np
import matplotlib.pyplot as plt
class Node:
def __init__(self, x, y, cost, pind):
self.x = x # x position of node
self.y = y # y position of node
self.cost = cost # g cost of node
self.pind = pind # parent index of node
class Para:
def __init__(self, minx, miny, maxx, maxy, xw, yw, reso, motion):
self.minx = minx
self.miny = miny
self.maxx = maxx
self.maxy = maxy
self.xw = xw
self.yw = yw
self.reso = reso # resolution of grid world
self.motion = motion # motion set
def astar_planning(sx, sy, gx, gy, ox, oy, reso, rr):
"""
return path of A*.
:param sx: starting node x [m]
:param sy: starting node y [m]
:param gx: goal node x [m]
:param gy: goal node y [m]
:param ox: obstacles x positions [m]
:param oy: obstacles y positions [m]
:param reso: xy grid resolution
:param rr: robot radius
:return: path
"""
n_start = Node(round(sx / reso), round(sy / reso), 0.0, -1)
n_goal = Node(round(gx / reso), round(gy / reso), 0.0, -1)
ox = [x / reso for x in ox]
oy = [y / reso for y in oy]
P, obsmap = calc_parameters(ox, oy, rr, reso)
open_set, closed_set = dict(), dict()
open_set[calc_index(n_start, P)] = n_start
q_priority = []
heapq.heappush(q_priority,
(fvalue(n_start, n_goal), calc_index(n_start, P)))
while True:
if not open_set:
break
_, ind = heapq.heappop(q_priority)
n_curr = open_set[ind]
closed_set[ind] = n_curr
open_set.pop(ind)
for i in range(len(P.motion)):
node = Node(n_curr.x + P.motion[i][0],
n_curr.y + P.motion[i][1],
n_curr.cost + u_cost(P.motion[i]), ind)
if not check_node(node, P, obsmap):
continue
n_ind = calc_index(node, P)
if n_ind not in closed_set:
if n_ind in open_set:
if open_set[n_ind].cost > node.cost:
open_set[n_ind].cost = node.cost
open_set[n_ind].pind = ind
else:
open_set[n_ind] = node
heapq.heappush(q_priority,
(fvalue(node, n_goal), calc_index(node, P)))
pathx, pathy = extract_path(closed_set, n_start, n_goal, P)
return pathx, pathy
def calc_holonomic_heuristic_with_obstacle(node, ox, oy, reso, rr):
n_goal = Node(round(node.x[-1] / reso), round(node.y[-1] / reso), 0.0, -1)
ox = [x / reso for x in ox]
oy = [y / reso for y in oy]
P, obsmap = calc_parameters(ox, oy, reso, rr)
open_set, closed_set = dict(), dict()
open_set[calc_index(n_goal, P)] = n_goal
q_priority = []
heapq.heappush(q_priority, (n_goal.cost, calc_index(n_goal, P)))
while True:
if not open_set:
break
_, ind = heapq.heappop(q_priority)
n_curr = open_set[ind]
closed_set[ind] = n_curr
open_set.pop(ind)
for i in range(len(P.motion)):
node = Node(n_curr.x + P.motion[i][0],
n_curr.y + P.motion[i][1],
n_curr.cost + u_cost(P.motion[i]), ind)
if not check_node(node, P, obsmap):
continue
n_ind = calc_index(node, P)
if n_ind not in closed_set:
if n_ind in open_set:
if open_set[n_ind].cost > node.cost:
open_set[n_ind].cost = node.cost
open_set[n_ind].pind = ind
else:
open_set[n_ind] = node
heapq.heappush(q_priority, (node.cost, calc_index(node, P)))
hmap = [[np.inf for _ in range(P.yw)] for _ in range(P.xw)]
for n in closed_set.values():
hmap[n.x - P.minx][n.y - P.miny] = n.cost
return hmap
def check_node(node, P, obsmap):
if node.x <= P.minx or node.x >= P.maxx or \
node.y <= P.miny or node.y >= P.maxy:
return False
if obsmap[node.x - P.minx][node.y - P.miny]:
return False
return True
def u_cost(u):
return math.hypot(u[0], u[1])
def fvalue(node, n_goal):
return node.cost + h(node, n_goal)
def h(node, n_goal):
return math.hypot(node.x - n_goal.x, node.y - n_goal.y)
def calc_index(node, P):
return (node.y - P.miny) * P.xw + (node.x - P.minx)
def calc_parameters(ox, oy, rr, reso):
minx, miny = round(min(ox)), round(min(oy))
maxx, maxy = round(max(ox)), round(max(oy))
xw, yw = maxx - minx, maxy - miny
motion = get_motion()
P = Para(minx, miny, maxx, maxy, xw, yw, reso, motion)
obsmap = calc_obsmap(ox, oy, rr, P)
return P, obsmap
def calc_obsmap(ox, oy, rr, P):
obsmap = [[False for _ in range(P.yw)] for _ in range(P.xw)]
for x in range(P.xw):
xx = x + P.minx
for y in range(P.yw):
yy = y + P.miny
for oxx, oyy in zip(ox, oy):
if math.hypot(oxx - xx, oyy - yy) <= rr / P.reso:
obsmap[x][y] = True
break
return obsmap
def extract_path(closed_set, n_start, n_goal, P):
pathx, pathy = [n_goal.x], [n_goal.y]
n_ind = calc_index(n_goal, P)
while True:
node = closed_set[n_ind]
pathx.append(node.x)
pathy.append(node.y)
n_ind = node.pind
if node == n_start:
break
pathx = [x * P.reso for x in reversed(pathx)]
pathy = [y * P.reso for y in reversed(pathy)]
return pathx, pathy
def get_motion():
motion = [[-1, 0], [-1, 1], [0, 1], [1, 1],
[1, 0], [1, -1], [0, -1], [-1, -1]]
return motion
def get_env():
ox, oy = [], []
for i in range(60):
ox.append(i)
oy.append(0.0)
for i in range(60):
ox.append(60.0)
oy.append(i)
for i in range(61):
ox.append(i)
oy.append(60.0)
for i in range(61):
ox.append(0.0)
oy.append(i)
for i in range(40):
ox.append(20.0)
oy.append(i)
for i in range(40):
ox.append(40.0)
oy.append(60.0 - i)
return ox, oy
def main():
sx = 10.0 # [m]
sy = 10.0 # [m]
gx = 50.0 # [m]
gy = 50.0 # [m]
robot_radius = 2.0
grid_resolution = 1.0
ox, oy = get_env()
pathx, pathy = astar_planning(sx, sy, gx, gy, ox, oy, grid_resolution, robot_radius)
plt.plot(ox, oy, 'sk')
plt.plot(pathx, pathy, '-r')
plt.plot(sx, sy, 'sg')
plt.plot(gx, gy, 'sb')
plt.axis("equal")
plt.show()
if __name__ == '__main__':
main()
reeds_shepp.py源码如下:
import time
import math
import numpy as np
# parameters initiation
STEP_SIZE = 0.2
MAX_LENGTH = 1000.0
PI = math.pi
# class for PATH element
class PATH:
def __init__(self, lengths, ctypes, L, x, y, yaw, directions):
self.lengths = lengths # lengths of each part of path (+: forward, -: backward) [float]
self.ctypes = ctypes # type of each part of the path [string]
self.L = L # total path length [float]
self.x = x # final x positions [m]
self.y = y # final y positions [m]
self.yaw = yaw # final yaw angles [rad]
self.directions = directions # forward: 1, backward:-1
def calc_optimal_path(sx, sy, syaw, gx, gy, gyaw, maxc, step_size=STEP_SIZE):
paths = calc_all_paths(sx, sy, syaw, gx, gy, gyaw, maxc, step_size=step_size)
minL = paths[0].L
mini = 0
for i in range(len(paths)):
if paths[i].L <= minL:
minL, mini = paths[i].L, i
return paths[mini]
def calc_all_paths(sx, sy, syaw, gx, gy, gyaw, maxc, step_size=STEP_SIZE):
q0 = [sx, sy, syaw]
q1 = [gx, gy, gyaw]
paths = generate_path(q0, q1, maxc)
for path in paths:
x, y, yaw, directions = \
generate_local_course(path.L, path.lengths,
path.ctypes, maxc, step_size * maxc)
# convert global coordinate
path.x = [math.cos(-q0[2]) * ix + math.sin(-q0[2]) * iy + q0[0] for (ix, iy) in zip(x, y)]
path.y = [-math.sin(-q0[2]) * ix + math.cos(-q0[2]) * iy + q0[1] for (ix, iy) in zip(x, y)]
path.yaw = [pi_2_pi(iyaw + q0[2]) for iyaw in yaw]
path.directions = directions
path.lengths = [l / maxc for l in path.lengths]
path.L = path.L / maxc
return paths
def set_path(paths, lengths, ctypes):
path = PATH([], [], 0.0, [], [], [], [])
path.ctypes = ctypes
path.lengths = lengths
# check same path exist
for path_e in paths:
if path_e.ctypes == path.ctypes:
if sum([x - y for x, y in zip(path_e.lengths, path.lengths)]) <= 0.01:
return paths # not insert path
path.L = sum([abs(i) for i in lengths])
if path.L >= MAX_LENGTH:
return paths
assert path.L >= 0.01
paths.append(path)
return paths
def LSL(x, y, phi):
u, t = R(x - math.sin(phi), y - 1.0 + math.cos(phi))
if t >= 0.0:
v = M(phi - t)
if v >= 0.0:
return True, t, u, v
return False, 0.0, 0.0, 0.0
def LSR(x, y, phi):
u1, t1 = R(x + math.sin(phi), y - 1.0 - math.cos(phi))
u1 = u1 ** 2
if u1 >= 4.0:
u = math.sqrt(u1 - 4.0)
theta = math.atan2(2.0, u)
t = M(t1 + theta)
v = M(t - phi)
if t >= 0.0 and v >= 0.0:
return True, t, u, v
return False, 0.0, 0.0, 0.0
def LRL(x, y, phi):
u1, t1 = R(x - math.sin(phi), y - 1.0 + math.cos(phi))
if u1 <= 4.0:
u = -2.0 * math.asin(0.25 * u1)
t = M(t1 + 0.5 * u + PI)
v = M(phi - t + u)
if t >= 0.0 and u <= 0.0:
return True, t, u, v
return False, 0.0, 0.0, 0.0
def SCS(x, y, phi, paths):
flag, t, u, v = SLS(x, y, phi)
if flag:
paths = set_path(paths, [t, u, v], ["S", "WB", "S"])
flag, t, u, v = SLS(x, -y, -phi)
if flag:
paths = set_path(paths, [t, u, v], ["S", "R", "S"])
return paths
def SLS(x, y, phi):
phi = M(phi)
if y > 0.0 and 0.0 < phi < PI * 0.99:
xd = -y / math.tan(phi) + x
t = xd - math.tan(phi / 2.0)
u = phi
v = math.sqrt((x - xd) ** 2 + y ** 2) - math.tan(phi / 2.0)
return True, t, u, v
elif y < 0.0 and 0.0 < phi < PI * 0.99:
xd = -y / math.tan(phi) + x
t = xd - math.tan(phi / 2.0)
u = phi
v = -math.sqrt((x - xd) ** 2 + y ** 2) - math.tan(phi / 2.0)
return True, t, u, v
return False, 0.0, 0.0, 0.0
def CSC(x, y, phi, paths):
flag, t, u, v = LSL(x, y, phi)
if flag:
paths = set_path(paths, [t, u, v], ["WB", "S", "WB"])
flag, t, u, v = LSL(-x, y, -phi)
if flag:
paths = set_path(paths, [-t, -u, -v], ["WB", "S", "WB"])
flag, t, u, v = LSL(x, -y, -phi)
if flag:
paths = set_path(paths, [t, u, v], ["R", "S", "R"])
flag, t, u, v = LSL(-x, -y, phi)
if flag:
paths = set_path(paths, [-t, -u, -v], ["R", "S", "R"])
flag, t, u, v = LSR(x, y, phi)
if flag:
paths = set_path(paths, [t, u, v], ["WB", "S", "R"])
flag, t, u, v = LSR(-x, y, -phi)
if flag:
paths = set_path(paths, [-t, -u, -v], ["WB", "S", "R"])
flag, t, u, v = LSR(x, -y, -phi)
if flag:
paths = set_path(paths, [t, u, v], ["R", "S", "WB"])
flag, t, u, v = LSR(-x, -y, phi)
if flag:
paths = set_path(paths, [-t, -u, -v], ["R", "S", "WB"])
return paths
def CCC(x, y, phi, paths):
flag, t, u, v = LRL(x, y, phi)
if flag:
paths = set_path(paths, [t, u, v], ["WB", "R", "WB"])
flag, t, u, v = LRL(-x, y, -phi)
if flag:
paths = set_path(paths, [-t, -u, -v], ["WB", "R", "WB"])
flag, t, u, v = LRL(x, -y, -phi)
if flag:
paths = set_path(paths, [t, u, v], ["R", "WB", "R"])
flag, t, u, v = LRL(-x, -y, phi)
if flag:
paths = set_path(paths, [-t, -u, -v], ["R", "WB", "R"])
# backwards
xb = x * math.cos(phi) + y * math.sin(phi)
yb = x * math.sin(phi) - y * math.cos(phi)
flag, t, u, v = LRL(xb, yb, phi)
if flag:
paths = set_path(paths, [v, u, t], ["WB", "R", "WB"])
flag, t, u, v = LRL(-xb, yb, -phi)
if flag:
paths = set_path(paths, [-v, -u, -t], ["WB", "R", "WB"])
flag, t, u, v = LRL(xb, -yb, -phi)
if flag:
paths = set_path(paths, [v, u, t], ["R", "WB", "R"])
flag, t, u, v = LRL(-xb, -yb, phi)
if flag:
paths = set_path(paths, [-v, -u, -t], ["R", "WB", "R"])
return paths
def calc_tauOmega(u, v, xi, eta, phi):
delta = M(u - v)
A = math.sin(u) - math.sin(delta)
B = math.cos(u) - math.cos(delta) - 1.0
t1 = math.atan2(eta * A - xi * B, xi * A + eta * B)
t2 = 2.0 * (math.cos(delta) - math.cos(v) - math.cos(u)) + 3.0
if t2 < 0:
tau = M(t1 + PI)
else:
tau = M(t1)
omega = M(tau - u + v - phi)
return tau, omega
def LRLRn(x, y, phi):
xi = x + math.sin(phi)
eta = y - 1.0 - math.cos(phi)
rho = 0.25 * (2.0 + math.sqrt(xi * xi + eta * eta))
if rho <= 1.0:
u = math.acos(rho)
t, v = calc_tauOmega(u, -u, xi, eta, phi)
if t >= 0.0 and v <= 0.0:
return True, t, u, v
return False, 0.0, 0.0, 0.0
def LRLRp(x, y, phi):
xi = x + math.sin(phi)
eta = y - 1.0 - math.cos(phi)
rho = (20.0 - xi * xi - eta * eta) / 16.0
if 0.0 <= rho <= 1.0:
u = -math.acos(rho)
if u >= -0.5 * PI:
t, v = calc_tauOmega(u, u, xi, eta, phi)
if t >= 0.0 and v >= 0.0:
return True, t, u, v
return False, 0.0, 0.0, 0.0
def CCCC(x, y, phi, paths):
flag, t, u, v = LRLRn(x, y, phi)
if flag:
paths = set_path(paths, [t, u, -u, v], ["WB", "R", "WB", "R"])
flag, t, u, v = LRLRn(-x, y, -phi)
if flag:
paths = set_path(paths, [-t, -u, u, -v], ["WB", "R", "WB", "R"])
flag, t, u, v = LRLRn(x, -y, -phi)
if flag:
paths = set_path(paths, [t, u, -u, v], ["R", "WB", "R", "WB"])
flag, t, u, v = LRLRn(-x, -y, phi)
if flag:
paths = set_path(paths, [-t, -u, u, -v], ["R", "WB", "R", "WB"])
flag, t, u, v = LRLRp(x, y, phi)
if flag:
paths = set_path(paths, [t, u, u, v], ["WB", "R", "WB", "R"])
flag, t, u, v = LRLRp(-x, y, -phi)
if flag:
paths = set_path(paths, [-t, -u, -u, -v], ["WB", "R", "WB", "R"])
flag, t, u, v = LRLRp(x, -y, -phi)
if flag:
paths = set_path(paths, [t, u, u, v], ["R", "WB", "R", "WB"])
flag, t, u, v = LRLRp(-x, -y, phi)
if flag:
paths = set_path(paths, [-t, -u, -u, -v], ["R", "WB", "R", "WB"])
return paths
def LRSR(x, y, phi):
xi = x + math.sin(phi)
eta = y - 1.0 - math.cos(phi)
rho, theta = R(-eta, xi)
if rho >= 2.0:
t = theta
u = 2.0 - rho
v = M(t + 0.5 * PI - phi)
if t >= 0.0 and u <= 0.0 and v <= 0.0:
return True, t, u, v
return False, 0.0, 0.0, 0.0
def LRSL(x, y, phi):
xi = x - math.sin(phi)
eta = y - 1.0 + math.cos(phi)
rho, theta = R(xi, eta)
if rho >= 2.0:
r = math.sqrt(rho * rho - 4.0)
u = 2.0 - r
t = M(theta + math.atan2(r, -2.0))
v = M(phi - 0.5 * PI - t)
if t >= 0.0 and u <= 0.0 and v <= 0.0:
return True, t, u, v
return False, 0.0, 0.0, 0.0
def CCSC(x, y, phi, paths):
flag, t, u, v = LRSL(x, y, phi)
if flag:
paths = set_path(paths, [t, -0.5 * PI, u, v], ["WB", "R", "S", "WB"])
flag, t, u, v = LRSL(-x, y, -phi)
if flag:
paths = set_path(paths, [-t, 0.5 * PI, -u, -v], ["WB", "R", "S", "WB"])
flag, t, u, v = LRSL(x, -y, -phi)
if flag:
paths = set_path(paths, [t, -0.5 * PI, u, v], ["R", "WB", "S", "R"])
flag, t, u, v = LRSL(-x, -y, phi)
if flag:
paths = set_path(paths, [-t, 0.5 * PI, -u, -v], ["R", "WB", "S", "R"])
flag, t, u, v = LRSR(x, y, phi)
if flag:
paths = set_path(paths, [t, -0.5 * PI, u, v], ["WB", "R", "S", "R"])
flag, t, u, v = LRSR(-x, y, -phi)
if flag:
paths = set_path(paths, [-t, 0.5 * PI, -u, -v], ["WB", "R", "S", "R"])
flag, t, u, v = LRSR(x, -y, -phi)
if flag:
paths = set_path(paths, [t, -0.5 * PI, u, v], ["R", "WB", "S", "WB"])
flag, t, u, v = LRSR(-x, -y, phi)
if flag:
paths = set_path(paths, [-t, 0.5 * PI, -u, -v], ["R", "WB", "S", "WB"])
# backwards
xb = x * math.cos(phi) + y * math.sin(phi)
yb = x * math.sin(phi) - y * math.cos(phi)
flag, t, u, v = LRSL(xb, yb, phi)
if flag:
paths = set_path(paths, [v, u, -0.5 * PI, t], ["WB", "S", "R", "WB"])
flag, t, u, v = LRSL(-xb, yb, -phi)
if flag:
paths = set_path(paths, [-v, -u, 0.5 * PI, -t], ["WB", "S", "R", "WB"])
flag, t, u, v = LRSL(xb, -yb, -phi)
if flag:
paths = set_path(paths, [v, u, -0.5 * PI, t], ["R", "S", "WB", "R"])
flag, t, u, v = LRSL(-xb, -yb, phi)
if flag:
paths = set_path(paths, [-v, -u, 0.5 * PI, -t], ["R", "S", "WB", "R"])
flag, t, u, v = LRSR(xb, yb, phi)
if flag:
paths = set_path(paths, [v, u, -0.5 * PI, t], ["R", "S", "R", "WB"])
flag, t, u, v = LRSR(-xb, yb, -phi)
if flag:
paths = set_path(paths, [-v, -u, 0.5 * PI, -t], ["R", "S", "R", "WB"])
flag, t, u, v = LRSR(xb, -yb, -phi)
if flag:
paths = set_path(paths, [v, u, -0.5 * PI, t], ["WB", "S", "WB", "R"])
flag, t, u, v = LRSR(-xb, -yb, phi)
if flag:
paths = set_path(paths, [-v, -u, 0.5 * PI, -t], ["WB", "S", "WB", "R"])
return paths
def LRSLR(x, y, phi):
# formula 8.11 *** TYPO IN PAPER ***
xi = x + math.sin(phi)
eta = y - 1.0 - math.cos(phi)
rho, theta = R(xi, eta)
if rho >= 2.0:
u = 4.0 - math.sqrt(rho * rho - 4.0)
if u <= 0.0:
t = M(math.atan2((4.0 - u) * xi - 2.0 * eta, -2.0 * xi + (u - 4.0) * eta))
v = M(t - phi)
if t >= 0.0 and v >= 0.0:
return True, t, u, v
return False, 0.0, 0.0, 0.0
def CCSCC(x, y, phi, paths):
flag, t, u, v = LRSLR(x, y, phi)
if flag:
paths = set_path(paths, [t, -0.5 * PI, u, -0.5 * PI, v], ["WB", "R", "S", "WB", "R"])
flag, t, u, v = LRSLR(-x, y, -phi)
if flag:
paths = set_path(paths, [-t, 0.5 * PI, -u, 0.5 * PI, -v], ["WB", "R", "S", "WB", "R"])
flag, t, u, v = LRSLR(x, -y, -phi)
if flag:
paths = set_path(paths, [t, -0.5 * PI, u, -0.5 * PI, v], ["R", "WB", "S", "R", "WB"])
flag, t, u, v = LRSLR(-x, -y, phi)
if flag:
paths = set_path(paths, [-t, 0.5 * PI, -u, 0.5 * PI, -v], ["R", "WB", "S", "R", "WB"])
return paths
def generate_local_course(L, lengths, mode, maxc, step_size):
point_num = int(L / step_size) + len(lengths) + 3
px = [0.0 for _ in range(point_num)]
py = [0.0 for _ in range(point_num)]
pyaw = [0.0 for _ in range(point_num)]
directions = [0 for _ in range(point_num)]
ind = 1
if lengths[0] > 0.0:
directions[0] = 1
else:
directions[0] = -1
if lengths[0] > 0.0:
d = step_size
else:
d = -step_size
ll = 0.0
for m, l, i in zip(mode, lengths, range(len(mode))):
if l > 0.0:
d = step_size
else:
d = -step_size
ox, oy, oyaw = px[ind], py[ind], pyaw[ind]
ind -= 1
if i >= 1 and (lengths[i - 1] * lengths[i]) > 0:
pd = -d - ll
else:
pd = d - ll
while abs(pd) <= abs(l):
ind += 1
px, py, pyaw, directions = \
interpolate(ind, pd, m, maxc, ox, oy, oyaw, px, py, pyaw, directions)
pd += d
ll = l - pd - d # calc remain length
ind += 1
px, py, pyaw, directions = \
interpolate(ind, l, m, maxc, ox, oy, oyaw, px, py, pyaw, directions)
if len(px) <= 1:
return [], [], [], []
# remove unused data
while len(px) >= 1 and px[-1] == 0.0:
px.pop()
py.pop()
pyaw.pop()
directions.pop()
return px, py, pyaw, directions
def interpolate(ind, l, m, maxc, ox, oy, oyaw, px, py, pyaw, directions):
if m == "S":
px[ind] = ox + l / maxc * math.cos(oyaw)
py[ind] = oy + l / maxc * math.sin(oyaw)
pyaw[ind] = oyaw
else:
ldx = math.sin(l) / maxc
if m == "WB":
ldy = (1.0 - math.cos(l)) / maxc
elif m == "R":
ldy = (1.0 - math.cos(l)) / (-maxc)
gdx = math.cos(-oyaw) * ldx + math.sin(-oyaw) * ldy
gdy = -math.sin(-oyaw) * ldx + math.cos(-oyaw) * ldy
px[ind] = ox + gdx
py[ind] = oy + gdy
if m == "WB":
pyaw[ind] = oyaw + l
elif m == "R":
pyaw[ind] = oyaw - l
if l > 0.0:
directions[ind] = 1
else:
directions[ind] = -1
return px, py, pyaw, directions
def generate_path(q0, q1, maxc):
dx = q1[0] - q0[0]
dy = q1[1] - q0[1]
dth = q1[2] - q0[2]
c = math.cos(q0[2])
s = math.sin(q0[2])
x = (c * dx + s * dy) * maxc
y = (-s * dx + c * dy) * maxc
paths = []
paths = SCS(x, y, dth, paths)
paths = CSC(x, y, dth, paths)
paths = CCC(x, y, dth, paths)
paths = CCCC(x, y, dth, paths)
paths = CCSC(x, y, dth, paths)
paths = CCSCC(x, y, dth, paths)
return paths
# utils
def pi_2_pi(theta):
while theta > PI:
theta -= 2.0 * PI
while theta < -PI:
theta += 2.0 * PI
return theta
def R(x, y):
"""
Return the polar coordinates (r, theta) of the point (x, y)
"""
r = math.hypot(x, y)
theta = math.atan2(y, x)
return r, theta
def M(theta):
"""
Regulate theta to -pi <= theta < pi
"""
phi = theta % (2.0 * PI)
if phi < -PI:
phi += 2.0 * PI
if phi > PI:
phi -= 2.0 * PI
return phi
def get_label(path):
label = ""
for m, l in zip(path.ctypes, path.lengths):
label = label + m
if l > 0.0:
label = label + "+"
else:
label = label + "-"
return label
def calc_curvature(x, y, yaw, directions):
c, ds = [], []
for i in range(1, len(x) - 1):
dxn = x[i] - x[i - 1]
dxp = x[i + 1] - x[i]
dyn = y[i] - y[i - 1]
dyp = y[i + 1] - y[i]
dn = math.hypot(dxn, dyn)
dp = math.hypot(dxp, dyp)
dx = 1.0 / (dn + dp) * (dp / dn * dxn + dn / dp * dxp)
ddx = 2.0 / (dn + dp) * (dxp / dp - dxn / dn)
dy = 1.0 / (dn + dp) * (dp / dn * dyn + dn / dp * dyp)
ddy = 2.0 / (dn + dp) * (dyp / dp - dyn / dn)
curvature = (ddy * dx - ddx * dy) / (dx ** 2 + dy ** 2)
d = (dn + dp) / 2.0
if np.isnan(curvature):
curvature = 0.0
if directions[i] <= 0.0:
curvature = -curvature
if len(c) == 0:
ds.append(d)
c.append(curvature)
ds.append(d)
c.append(curvature)
ds.append(ds[-1])
c.append(c[-1])
return c, ds
def check_path(sx, sy, syaw, gx, gy, gyaw, maxc):
paths = calc_all_paths(sx, sy, syaw, gx, gy, gyaw, maxc)
assert len(paths) >= 1
for path in paths:
assert abs(path.x[0] - sx) <= 0.01
assert abs(path.y[0] - sy) <= 0.01
assert abs(path.yaw[0] - syaw) <= 0.01
assert abs(path.x[-1] - gx) <= 0.01
assert abs(path.y[-1] - gy) <= 0.01
assert abs(path.yaw[-1] - gyaw) <= 0.01
# course distance check
d = [math.hypot(dx, dy)
for dx, dy in zip(np.diff(path.x[0:len(path.x) - 1]),
np.diff(path.y[0:len(path.y) - 1]))]
for i in range(len(d)):
assert abs(d[i] - STEP_SIZE) <= 0.001
def main():
start_x = 3.0 # [m]
start_y = 10.0 # [m]
start_yaw = np.deg2rad(40.0) # [rad]
end_x = 0.0 # [m]
end_y = 1.0 # [m]
end_yaw = np.deg2rad(0.0) # [rad]
max_curvature = 0.1
t0 = time.time()
for i in range(1000):
_ = calc_optimal_path(start_x, start_y, start_yaw, end_x, end_y, end_yaw, max_curvature)
t1 = time.time()
print(t1 - t0)
if __name__ == '__main__':
main()
涉及到的draw.py是关于可视化,绘图的程序,与算法本身无关,这里也不展开详细的介绍,仅放一下draw.py的源码
draw.py源码:
import matplotlib.pyplot as plt
import numpy as np
import math
PI = np.pi
class Arrow:
def __init__(self, x, y, theta, L, c):
angle = np.deg2rad(30)
d = 0.3 * L
w = 2
x_start = x
y_start = y
x_end = x + L * np.cos(theta)
y_end = y + L * np.sin(theta)
theta_hat_L = theta + PI - angle
theta_hat_R = theta + PI + angle
x_hat_start = x_end
x_hat_end_L = x_hat_start + d * np.cos(theta_hat_L)
x_hat_end_R = x_hat_start + d * np.cos(theta_hat_R)
y_hat_start = y_end
y_hat_end_L = y_hat_start + d * np.sin(theta_hat_L)
y_hat_end_R = y_hat_start + d * np.sin(theta_hat_R)
plt.plot([x_start, x_end], [y_start, y_end], color=c, linewidth=w)
plt.plot([x_hat_start, x_hat_end_L],
[y_hat_start, y_hat_end_L], color=c, linewidth=w)
plt.plot([x_hat_start, x_hat_end_R],
[y_hat_start, y_hat_end_R], color=c, linewidth=w)
class Car:
def __init__(self, x, y, yaw, w, L):
theta_B = PI + yaw
xB = x + L / 4 * np.cos(theta_B)
yB = y + L / 4 * np.sin(theta_B)
theta_BL = theta_B + PI / 2
theta_BR = theta_B - PI / 2
x_BL = xB + w / 2 * np.cos(theta_BL) # Bottom-Left vertex
y_BL = yB + w / 2 * np.sin(theta_BL)
x_BR = xB + w / 2 * np.cos(theta_BR) # Bottom-Right vertex
y_BR = yB + w / 2 * np.sin(theta_BR)
x_FL = x_BL + L * np.cos(yaw) # Front-Left vertex
y_FL = y_BL + L * np.sin(yaw)
x_FR = x_BR + L * np.cos(yaw) # Front-Right vertex
y_FR = y_BR + L * np.sin(yaw)
plt.plot([x_BL, x_BR, x_FR, x_FL, x_BL],
[y_BL, y_BR, y_FR, y_FL, y_BL],
linewidth=1, color='black')
Arrow(x, y, yaw, L / 2, 'black')
# plt.axis("equal")
# plt.show()
def draw_car(x, y, yaw, steer, C, color='black'):
car = np.array([[-C.RB, -C.RB, C.RF, C.RF, -C.RB],
[C.W / 2, -C.W / 2, -C.W / 2, C.W / 2, C.W / 2]])
wheel = np.array([[-C.TR, -C.TR, C.TR, C.TR, -C.TR],
[C.TW / 4, -C.TW / 4, -C.TW / 4, C.TW / 4, C.TW / 4]])
rlWheel = wheel.copy()
rrWheel = wheel.copy()
frWheel = wheel.copy()
flWheel = wheel.copy()
Rot1 = np.array([[math.cos(yaw), -math.sin(yaw)],
[math.sin(yaw), math.cos(yaw)]])
Rot2 = np.array([[math.cos(steer), math.sin(steer)],
[-math.sin(steer), math.cos(steer)]])
frWheel = np.dot(Rot2, frWheel)
flWheel = np.dot(Rot2, flWheel)
frWheel += np.array([[C.WB], [-C.WD / 2]])
flWheel += np.array([[C.WB], [C.WD / 2]])
rrWheel[1, :] -= C.WD / 2
rlWheel[1, :] += C.WD / 2
frWheel = np.dot(Rot1, frWheel)
flWheel = np.dot(Rot1, flWheel)
rrWheel = np.dot(Rot1, rrWheel)
rlWheel = np.dot(Rot1, rlWheel)
car = np.dot(Rot1, car)
frWheel += np.array([[x], [y]])
flWheel += np.array([[x], [y]])
rrWheel += np.array([[x], [y]])
rlWheel += np.array([[x], [y]])
car += np.array([[x], [y]])
plt.plot(car[0, :], car[1, :], color)
plt.plot(frWheel[0, :], frWheel[1, :], color)
plt.plot(rrWheel[0, :], rrWheel[1, :], color)
plt.plot(flWheel[0, :], flWheel[1, :], color)
plt.plot(rlWheel[0, :], rlWheel[1, :], color)
Arrow(x, y, yaw, C.WB * 0.8, color)
if __name__ == '__main__':
# Arrow(-1, 2, 60)
Car(0, 0, 1, 2, 60)
那么接下来就正式开始hybrid_astar.py文件中Hybrid A * 算法程序的详细介绍,同样,我们先放一下hybrid_astar.py文件完整的源码,再展开详细的介绍
hybrid_astar.py源码如下
"""
Hybrid A*
@author: Huiming Zhou
"""
import os
import sys
import math
import heapq
from heapdict import heapdict
import time
import numpy as np
import matplotlib.pyplot as plt
import scipy.spatial.kdtree as kd
sys.path.append(os.path.dirname(os.path.abspath(__file__)) +
"/../../HybridAstarPlanner/")
import astar as astar
import draw as draw
import reeds_shepp as rs
class C: # Parameter config
PI = math.pi
XY_RESO = 2.0 # [m]
YAW_RESO = np.deg2rad(15.0) # [rad]
MOVE_STEP = 0.4 # [m] path interporate resolution
N_STEER = 20.0 # steer command number
COLLISION_CHECK_STEP = 5 # skip number for collision check
EXTEND_BOUND = 1 # collision check range extended
GEAR_COST = 100.0 # switch back penalty cost
BACKWARD_COST = 5.0 # backward penalty cost
STEER_CHANGE_COST = 5.0 # steer angle change penalty cost
STEER_ANGLE_COST = 1.0 # steer angle penalty cost
H_COST = 15.0 # Heuristic cost penalty cost
RF = 4.5 # [m] distance from rear to vehicle front end of vehicle
RB = 1.0 # [m] distance from rear to vehicle back end of vehicle
W = 3.0 # [m] width of vehicle
WD = 0.7 * W # [m] distance between left-right wheels
WB = 3.5 # [m] Wheel base
TR = 0.5 # [m] Tyre radius
TW = 1 # [m] Tyre width
MAX_STEER = 0.6 # [rad] maximum steering angle
class Node:
def __init__(self, xind, yind, yawind, direction, x, y,
yaw, directions, steer, cost, pind):
self.xind = xind
self.yind = yind
self.yawind = yawind
self.direction = direction
self.x = x
self.y = y
self.yaw = yaw
self.directions = directions
self.steer = steer
self.cost = cost
self.pind = pind
class Para:
def __init__(self, minx, miny, minyaw, maxx, maxy, maxyaw,
xw, yw, yaww, xyreso, yawreso, ox, oy, kdtree):
self.minx = minx
self.miny = miny
self.minyaw = minyaw
self.maxx = maxx
self.maxy = maxy
self.maxyaw = maxyaw
self.xw = xw
self.yw = yw
self.yaww = yaww
self.xyreso = xyreso
self.yawreso = yawreso
self.ox = ox
self.oy = oy
self.kdtree = kdtree
class Path:
def __init__(self, x, y, yaw, direction, cost):
self.x = x
self.y = y
self.yaw = yaw
self.direction = direction
self.cost = cost
class QueuePrior:
def __init__(self):
self.queue = heapdict()
def empty(self):
return len(self.queue) == 0 # if Q is empty
def put(self, item, priority):
self.queue[item] = priority # push
def get(self):
return self.queue.popitem()[0] # pop out element with smallest priority
def hybrid_astar_planning(sx, sy, syaw, gx, gy, gyaw, ox, oy, xyreso, yawreso):
sxr, syr = round(sx / xyreso), round(sy / xyreso)
gxr, gyr = round(gx / xyreso), round(gy / xyreso)
syawr = round(rs.pi_2_pi(syaw) / yawreso)
gyawr = round(rs.pi_2_pi(gyaw) / yawreso)
nstart = Node(sxr, syr, syawr, 1, [sx], [sy], [syaw], [1], 0.0, 0.0, -1)
ngoal = Node(gxr, gyr, gyawr, 1, [gx], [gy], [gyaw], [1], 0.0, 0.0, -1)
kdtree = kd.KDTree([[x, y] for x, y in zip(ox, oy)])
P = calc_parameters(ox, oy, xyreso, yawreso, kdtree)
hmap = astar.calc_holonomic_heuristic_with_obstacle(ngoal, P.ox, P.oy, P.xyreso, 1.0)
steer_set, direc_set = calc_motion_set()
open_set, closed_set = {calc_index(nstart, P): nstart}, {}
qp = QueuePrior()
qp.put(calc_index(nstart, P), calc_hybrid_cost(nstart, hmap, P))
while True:
if not open_set:
return None
ind = qp.get()
n_curr = open_set[ind]
closed_set[ind] = n_curr
open_set.pop(ind)
update, fpath = update_node_with_analystic_expantion(n_curr, ngoal, P)
if update:
fnode = fpath
break
for i in range(len(steer_set)):
node = calc_next_node(n_curr, ind, steer_set[i], direc_set[i], P)
if not node:
continue
node_ind = calc_index(node, P)
if node_ind in closed_set:
continue
if node_ind not in open_set:
open_set[node_ind] = node
qp.put(node_ind, calc_hybrid_cost(node, hmap, P))
else:
if open_set[node_ind].cost > node.cost:
open_set[node_ind] = node
qp.put(node_ind, calc_hybrid_cost(node, hmap, P))
return extract_path(closed_set, fnode, nstart)
def extract_path(closed, ngoal, nstart):
rx, ry, ryaw, direc = [], [], [], []
cost = 0.0
node = ngoal
while True:
rx += node.x[::-1]
ry += node.y[::-1]
ryaw += node.yaw[::-1]
direc += node.directions[::-1]
cost += node.cost
if is_same_grid(node, nstart):
break
node = closed[node.pind]
rx = rx[::-1]
ry = ry[::-1]
ryaw = ryaw[::-1]
direc = direc[::-1]
direc[0] = direc[1]
path = Path(rx, ry, ryaw, direc, cost)
return path
def calc_next_node(n_curr, c_id, u, d, P):
step = C.XY_RESO * 2
nlist = math.ceil(step / C.MOVE_STEP)
xlist = [n_curr.x[-1] + d * C.MOVE_STEP * math.cos(n_curr.yaw[-1])]
ylist = [n_curr.y[-1] + d * C.MOVE_STEP * math.sin(n_curr.yaw[-1])]
yawlist = [rs.pi_2_pi(n_curr.yaw[-1] + d * C.MOVE_STEP / C.WB * math.tan(u))]
for i in range(nlist - 1):
xlist.append(xlist[i] + d * C.MOVE_STEP * math.cos(yawlist[i]))
ylist.append(ylist[i] + d * C.MOVE_STEP * math.sin(yawlist[i]))
yawlist.append(rs.pi_2_pi(yawlist[i] + d * C.MOVE_STEP / C.WB * math.tan(u)))
xind = round(xlist[-1] / P.xyreso)
yind = round(ylist[-1] / P.xyreso)
yawind = round(yawlist[-1] / P.yawreso)
if not is_index_ok(xind, yind, xlist, ylist, yawlist, P):
return None
cost = 0.0
if d > 0:
direction = 1
cost += abs(step)
else:
direction = -1
cost += abs(step) * C.BACKWARD_COST
if direction != n_curr.direction: # switch back penalty
cost += C.GEAR_COST
cost += C.STEER_ANGLE_COST * abs(u) # steer angle penalyty
cost += C.STEER_CHANGE_COST * abs(n_curr.steer - u) # steer change penalty
cost = n_curr.cost + cost
directions = [direction for _ in range(len(xlist))]
node = Node(xind, yind, yawind, direction, xlist, ylist,
yawlist, directions, u, cost, c_id)
return node
def is_index_ok(xind, yind, xlist, ylist, yawlist, P):
if xind <= P.minx or \
xind >= P.maxx or \
yind <= P.miny or \
yind >= P.maxy:
return False
ind = range(0, len(xlist), C.COLLISION_CHECK_STEP)
nodex = [xlist[k] for k in ind]
nodey = [ylist[k] for k in ind]
nodeyaw = [yawlist[k] for k in ind]
if is_collision(nodex, nodey, nodeyaw, P):
return False
return True
def update_node_with_analystic_expantion(n_curr, ngoal, P):
path = analystic_expantion(n_curr, ngoal, P) # rs path: n -> ngoal
if not path:
return False, None
fx = path.x[1:-1]
fy = path.y[1:-1]
fyaw = path.yaw[1:-1]
fd = path.directions[1:-1]
fcost = n_curr.cost + calc_rs_path_cost(path)
fpind = calc_index(n_curr, P)
fsteer = 0.0
fpath = Node(n_curr.xind, n_curr.yind, n_curr.yawind, n_curr.direction,
fx, fy, fyaw, fd, fsteer, fcost, fpind)
return True, fpath
def analystic_expantion(node, ngoal, P):
sx, sy, syaw = node.x[-1], node.y[-1], node.yaw[-1]
gx, gy, gyaw = ngoal.x[-1], ngoal.y[-1], ngoal.yaw[-1]
maxc = math.tan(C.MAX_STEER) / C.WB
paths = rs.calc_all_paths(sx, sy, syaw, gx, gy, gyaw, maxc, step_size=C.MOVE_STEP)
if not paths:
return None
pq = QueuePrior()
for path in paths:
pq.put(path, calc_rs_path_cost(path))
while not pq.empty():
path = pq.get()
ind = range(0, len(path.x), C.COLLISION_CHECK_STEP)
pathx = [path.x[k] for k in ind]
pathy = [path.y[k] for k in ind]
pathyaw = [path.yaw[k] for k in ind]
if not is_collision(pathx, pathy, pathyaw, P):
return path
return None
def is_collision(x, y, yaw, P):
for ix, iy, iyaw in zip(x, y, yaw):
d = 1
dl = (C.RF - C.RB) / 2.0
r = (C.RF + C.RB) / 2.0 + d
cx = ix + dl * math.cos(iyaw)
cy = iy + dl * math.sin(iyaw)
ids = P.kdtree.query_ball_point([cx, cy], r)
if not ids:
continue
for i in ids:
xo = P.ox[i] - cx
yo = P.oy[i] - cy
dx = xo * math.cos(iyaw) + yo * math.sin(iyaw)
dy = -xo * math.sin(iyaw) + yo * math.cos(iyaw)
if abs(dx) < r and abs(dy) < C.W / 2 + d:
return True
return False
def calc_rs_path_cost(rspath):
cost = 0.0
for lr in rspath.lengths:
if lr >= 0:
cost += 1
else:
cost += abs(lr) * C.BACKWARD_COST
for i in range(len(rspath.lengths) - 1):
if rspath.lengths[i] * rspath.lengths[i + 1] < 0.0:
cost += C.GEAR_COST
for ctype in rspath.ctypes:
if ctype != "S":
cost += C.STEER_ANGLE_COST * abs(C.MAX_STEER)
nctypes = len(rspath.ctypes)
ulist = [0.0 for _ in range(nctypes)]
for i in range(nctypes):
if rspath.ctypes[i] == "R":
ulist[i] = -C.MAX_STEER
elif rspath.ctypes[i] == "WB":
ulist[i] = C.MAX_STEER
for i in range(nctypes - 1):
cost += C.STEER_CHANGE_COST * abs(ulist[i + 1] - ulist[i])
return cost
def calc_hybrid_cost(node, hmap, P):
cost = node.cost + \
C.H_COST * hmap[node.xind - P.minx][node.yind - P.miny]
return cost
def calc_motion_set():
s = np.arange(C.MAX_STEER / C.N_STEER,
C.MAX_STEER, C.MAX_STEER / C.N_STEER)
steer = list(s) + [0.0] + list(-s)
direc = [1.0 for _ in range(len(steer))] + [-1.0 for _ in range(len(steer))]
steer = steer + steer
return steer, direc
def is_same_grid(node1, node2):
if node1.xind != node2.xind or \
node1.yind != node2.yind or \
node1.yawind != node2.yawind:
return False
return True
def calc_index(node, P):
ind = (node.yawind - P.minyaw) * P.xw * P.yw + \
(node.yind - P.miny) * P.xw + \
(node.xind - P.minx)
return ind
def calc_parameters(ox, oy, xyreso, yawreso, kdtree):
minx = round(min(ox) / xyreso)
miny = round(min(oy) / xyreso)
maxx = round(max(ox) / xyreso)
maxy = round(max(oy) / xyreso)
xw, yw = maxx - minx, maxy - miny
minyaw = round(-C.PI / yawreso) - 1
maxyaw = round(C.PI / yawreso)
yaww = maxyaw - minyaw
return Para(minx, miny, minyaw, maxx, maxy, maxyaw,
xw, yw, yaww, xyreso, yawreso, ox, oy, kdtree)
def draw_car(x, y, yaw, steer, color='black'):
car = np.array([[-C.RB, -C.RB, C.RF, C.RF, -C.RB],
[C.W / 2, -C.W / 2, -C.W / 2, C.W / 2, C.W / 2]])
wheel = np.array([[-C.TR, -C.TR, C.TR, C.TR, -C.TR],
[C.TW / 4, -C.TW / 4, -C.TW / 4, C.TW / 4, C.TW / 4]])
rlWheel = wheel.copy()
rrWheel = wheel.copy()
frWheel = wheel.copy()
flWheel = wheel.copy()
Rot1 = np.array([[math.cos(yaw), -math.sin(yaw)],
[math.sin(yaw), math.cos(yaw)]])
Rot2 = np.array([[math.cos(steer), math.sin(steer)],
[-math.sin(steer), math.cos(steer)]])
frWheel = np.dot(Rot2, frWheel)
flWheel = np.dot(Rot2, flWheel)
frWheel += np.array([[C.WB], [-C.WD / 2]])
flWheel += np.array([[C.WB], [C.WD / 2]])
rrWheel[1, :] -= C.WD / 2
rlWheel[1, :] += C.WD / 2
frWheel = np.dot(Rot1, frWheel)
flWheel = np.dot(Rot1, flWheel)
rrWheel = np.dot(Rot1, rrWheel)
rlWheel = np.dot(Rot1, rlWheel)
car = np.dot(Rot1, car)
frWheel += np.array([[x], [y]])
flWheel += np.array([[x], [y]])
rrWheel += np.array([[x], [y]])
rlWheel += np.array([[x], [y]])
car += np.array([[x], [y]])
plt.plot(car[0, :], car[1, :], color)
plt.plot(frWheel[0, :], frWheel[1, :], color)
plt.plot(rrWheel[0, :], rrWheel[1, :], color)
plt.plot(flWheel[0, :], flWheel[1, :], color)
plt.plot(rlWheel[0, :], rlWheel[1, :], color)
draw.Arrow(x, y, yaw, C.WB * 0.8, color)
def design_obstacles(x, y):
ox, oy = [], []
for i in range(x):
ox.append(i)
oy.append(0)
for i in range(x):
ox.append(i)
oy.append(y - 1)
for i in range(y):
ox.append(0)
oy.append(i)
for i in range(y):
ox.append(x - 1)
oy.append(i)
for i in range(10, 21):
ox.append(i)
oy.append(15)
for i in range(15):
ox.append(20)
oy.append(i)
for i in range(15, 30):
ox.append(30)
oy.append(i)
for i in range(16):
ox.append(40)
oy.append(i)
return ox, oy
def main():
print("start!")
x, y = 51, 31
sx, sy, syaw0 = 10.0, 7.0, np.deg2rad(120.0)
gx, gy, gyaw0 = 45.0, 20.0, np.deg2rad(90.0)
ox, oy = design_obstacles(x, y)
t0 = time.time()
path = hybrid_astar_planning(sx, sy, syaw0, gx, gy, gyaw0,
ox, oy, C.XY_RESO, C.YAW_RESO)
t1 = time.time()
print("running T: ", t1 - t0)
if not path:
print("Searching failed!")
return
x = path.x
y = path.y
yaw = path.yaw
direction = path.direction
for k in range(len(x)):
plt.cla()
plt.plot(ox, oy, "sk")
plt.plot(x, y, linewidth=1.5, color='r')
if k < len(x) - 2:
dy = (yaw[k + 1] - yaw[k]) / C.MOVE_STEP
steer = rs.pi_2_pi(math.atan(-C.WB * dy / direction[k]))
else:
steer = 0.0
draw_car(gx, gy, gyaw0, 0.0, 'dimgray')
draw_car(x[k], y[k], yaw[k], steer)
plt.title("Hybrid A*")
plt.axis("equal")
plt.pause(0.0001)
plt.show()
print("Done!")
if __name__ == '__main__':
main()
1. 导入库和模块:
import os
import sys
import math
import heapq
from heapdict import heapdict
import time
import numpy as np
import matplotlib.pyplot as plt
import scipy.spatial.kdtree as kd
sys.path.append(os.path.dirname(os.path.abspath(__file__)) +
"/../../HybridAstarPlanner/")
import astar as astar
import draw as draw
import reeds_shepp as rs
这部分代码导入了所需的库和模块,其中heapq
和heapdict
用于构建优先级队列,numpy
和matplotlib
用于数值计算和绘图,scipy.spatial.kdtree
用于创建K-D树来进行碰撞检测。
2. 定义参数配置类 C
:
class C: # Parameter config
PI = math.pi
XY_RESO = 2.0 # [m]
YAW_RESO = np.deg2rad(15.0) # [rad]
MOVE_STEP = 0.4 # [m] path interporate resolution
N_STEER = 20.0 # steer command number
COLLISION_CHECK_STEP = 5 # skip number for collision check
EXTEND_BOUND = 1 # collision check range extended
GEAR_COST = 100.0 # switch back penalty cost
BACKWARD_COST = 5.0 # backward penalty cost
STEER_CHANGE_COST = 5.0 # steer angle change penalty cost
STEER_ANGLE_COST = 1.0 # steer angle penalty cost
H_COST = 15.0 # Heuristic cost penalty cost
RF = 4.5 # [m] distance from rear to vehicle front end of vehicle
RB = 1.0 # [m] distance from rear to vehicle back end of vehicle
W = 3.0 # [m] width of vehicle
WD = 0.7 * W # [m] distance between left-right wheels
WB = 3.5 # [m] Wheel base
TR = 0.5 # [m] Tyre radius
TW = 1 # [m] Tyre width
MAX_STEER = 0.6 # [rad] maximum steering angle
这段程序定义了一个名为 C
的参数配置类,用于存储路径规划中使用的各种参数。下面逐个解释每个参数的含义:
①. PI = math.pi
:圆周率π。
②. XY_RESO = 2.0
:路径规划中使用的坐标分辨率,表示每个坐标点在X和Y方向上的间隔。
③. YAW_RESO = np.deg2rad(15.0)
:路径规划中使用的角度分辨率,表示每个角度的间隔。这里将角度转换为弧度。
④. MOVE_STEP = 0.4
:路径插值分辨率,表示在路径中每个节点之间的距离。
⑤. N_STEER = 20.0
:用于计算转向角的离散化值,即将最大转向角范围分成多少个不同的离散值。
⑥. COLLISION_CHECK_STEP = 5
:碰撞检测时,跳过的节点数,即每隔多少个节点进行一次碰撞检测,用于减少碰撞检测的计算量。
⑦. EXTEND_BOUND = 1
:碰撞检测范围的扩展值,用于在实际碰撞检测时,将检测范围扩展一定距离。
⑧. GEAR_COST = 100.0
:切换方向的惩罚成本,当车辆需要切换行驶方向时,会增加这个成本。
⑨. BACKWARD_COST = 5.0
:后退行驶的惩罚成本,当车辆需要后退时,会增加这个成本。
⑩. STEER_CHANGE_COST = 5.0
:转向角变化的惩罚成本,用于惩罚车辆在连续节点之间的转向角变化。
⑪. STEER_ANGLE_COST = 1.0`:转向角度的惩罚成本,用于惩罚车辆的转向角度。
⑫. H_COST = 15.0
:启发式成本的惩罚成本,用于引导启发式搜索算法的探索方向。
⑬. RF = 4.5
:车辆从后部到车辆前部的距离。
⑭. RB = 1.0
:车辆从后部到车辆后部的距离。
⑮. W = 3.0
:车辆的宽度。
⑯. WD = 0.7 * W
:左右车轮之间的距离,用于计算车辆的转向角。
⑰. WB = 3.5
:车辆的轴距,表示前后轮之间的距离。
⑱. TR = 0.5
:车轮的半径。
⑲. TW = 1
:车轮的宽度。
⑳. MAX_STEER = 0.6
:最大转向角,表示车辆的最大转向角度。
这些参数会在路径规划的不同阶段用于计算代价、转向角、碰撞检测等,以实现车辆的路径规划和运动控制。
3. 定义节点类 Node
:
class Node:
def __init__(self, xind, yind, yawind, direction, x, y,
yaw, directions, steer, cost, pind):
self.xind = xind
self.yind = yind
self.yawind = yawind
self.direction = direction
self.x = x
self.y = y
self.yaw = yaw
self.directions = directions
self.steer = steer
self.cost = cost
self.pind = pind
这段代码定义了一个名为 Node
的类,用于表示路径规划中的节点。每个节点存储了与路径规划相关的信息,以下是每个属性的解释:
①. xind
:节点所在的 X 坐标索引。
②. yind
:节点所在的 Y 坐标索引。
③. yawind
:节点的航向角索引。
④. direction
:车辆的行驶方向,值为 1 表示正向,-1 表示反向。
⑤. x
:车辆路径中的 X 坐标列表。
⑥. y
:车辆路径中的 Y 坐标列表。
⑦. yaw
:车辆路径中的航向角列表。
⑧. directions
:车辆在每个路径节点上的行驶方向列表。
⑨. steer
:车辆在节点上的转向角。
⑩. cost
:从起始节点到当前节点的路径代价。
⑪. pind
:父节点在路径规划中的索引,用于追踪路径的连接。
这个 Node
类用于在路径规划过程中存储节点的信息,以便计算路径代价、生成路径、进行碰撞检测等操作。每个节点都包含了一系列与路径规划相关的属性,用于描述车辆在路径中的状态和轨迹。
4. 定义参数类 Para
:
class Para:
def __init__(self, minx, miny, minyaw, maxx, maxy, maxyaw,
xw, yw, yaww, xyreso, yawreso, ox, oy, kdtree):
self.minx = minx
self.miny = miny
self.minyaw = minyaw
self.maxx = maxx
self.maxy = maxy
self.maxyaw = maxyaw
self.xw = xw
self.yw = yw
self.yaww = yaww
self.xyreso = xyreso
self.yawreso = yawreso
self.ox = ox
self.oy = oy
self.kdtree = kdtree
这段代码定义了一个名为 Para
的类,用于存储路径规划所需的各种参数和计算结果。这个类的每个属性都用于存储路径规划的参数和中间计算结果,以下是每个属性的解释:
①. minx
:地图中 X 坐标的最小值。
②. miny
:地图中 Y 坐标的最小值。
③. minyaw
:地图中航向角的最小值。
④. maxx
:地图中 X 坐标的最大值。
⑤. maxy
:地图中 Y 坐标的最大值。
⑥. maxyaw
:地图中航向角的最大值。
⑦. xw
:X 方向上的网格数。
⑧. yw
:Y 方向上的网格数。
⑨. yaww
:航向角方向上的网格数。
⑩. xyreso
:XY 坐标分辨率。
⑪. yawreso
:航向角分辨率。
⑫. ox
:障碍物的 X 坐标列表。
⑬. oy
:障碍物的 Y 坐标列表。
⑭. kdtree
:使用 k-d 树表示的障碍物数据结构,用于快速查找最近的障碍物点。
这个 Para
类用于封装路径规划所需的各种参数和计算结果,以便在算法的不同阶段方便地获取和传递这些数据。它允许将路径规划中的参数和计算结果组织成一个对象,使代码更具可读性和可维护性。
5. 定义路径类 Path
:
class Path:
def __init__(self, x, y, yaw, direction, cost):
self.x = x
self.y = y
self.yaw = yaw
self.direction = direction
self.cost = cost
这段代码定义了一个名为 Path
的类,用于存储路径规划的结果。每个 Path
对象包含了描述车辆路径的各个属性,以下是每个属性的解释:
①. x
:路径中每个节点的 X 坐标列表。
②. y
:路径中每个节点的 Y 坐标列表。
③. yaw
:路径中每个节点的航向角列表。
④. direction
:路径中每个节点的行驶方向列表,值为 1 表示正向,-1 表示反向。
⑤. cost
:路径的总代价,表示从起始节点到终点节点的代价。
这个 Path
类用于封装路径规划的结果,以便在算法完成后方便地获取和传递路径的各个属性,以及计算路径的总代价。它用于存储车辆的路径信息,包括坐标、航向角、行驶方向和代价等。
6. 定义优先级队列类 QueuePrior
:
class QueuePrior:
def __init__(self):
self.queue = heapdict()
def empty(self):
return len(self.queue) == 0
def put(self, item, priority):
self.queue[item] = priority
def get(self):
return self.queue.popitem()[0]
这段代码定义了一个名为 QueuePrior
的类,实现了一个优先级队列(Priority Queue)的数据结构。优先级队列用于存储元素,并确保按照一定的优先级顺序进行插入和删除操作。以下是这个类的每个方法的解释:
①. __init__(self)
:初始化优先级队列的构造函数。它创建一个 heapdict
数据结构作为队列的实现,该数据结构是基于堆和字典的混合数据结构,允许在 O(1) 时间内执行插入和删除操作。
②. empty(self)
:判断优先级队列是否为空。如果队列中没有元素,返回 True
;否则,返回 False
。
③. put(self, item, priority)
:将元素 item
插入队列,并指定它的优先级为 priority
。在 heapdict
中,插入元素相当于在字典中添加一个键值对,键是元素,值是其优先级。这个操作的时间复杂度为 O(1)。
④. get(self)
:从队列中弹出优先级最小的元素,并返回它。在 heapdict
中,通过 popitem()
方法可以获取键值对中优先级最小的键,并将其从字典中删除。这个操作的时间复杂度为 O(1)。
这个 QueuePrior
类实现了优先级队列的基本功能,它用于在路径规划的过程中管理待处理的节点,并根据节点的代价或优先级进行排序和选择。
7 . extract_path函数
extract_path函数用于从封闭集(closed set)中提取并构造最终的路径。在这个函数中,从目标节点开始,它通过回溯封闭集中的节点,从而构造出完整的路径信息。路径包括x、y坐标、航向角、方向以及路径成本。它会检查当前节点是否回溯到起始节点,以确定是否结束回溯。最终,它构造一个Path对象,表示从起始点到目标点的规划路径,该路径是按照正确的顺序排列的。
def extract_path(closed, ngoal, nstart):
rx, ry, ryaw, direc = [], [], [], [] # 创建空列表用于存储路径信息
cost = 0.0 # 初始化路径成本
node = ngoal # 从目标节点开始回溯路径
while True:
rx += node.x[::-1] # 将当前节点的x坐标列表反向加入rx列表
ry += node.y[::-1] # 将当前节点的y坐标列表反向加入ry列表
ryaw += node.yaw[::-1] # 将当前节点的航向角列表反向加入ryaw列表
direc += node.directions[::-1] # 将当前节点的方向列表反向加入direc列表
cost += node.cost # 将当前节点的成本累加到总成本中
if is_same_grid(node, nstart): # 如果当前节点已经回溯到起始节点
break # 停止回溯
node = closed[node.pind] # 获取上一节点,继续回溯
rx = rx[::-1] # 将rx列表反向,得到正确的路径顺序
ry = ry[::-1] # 将ry列表反向,得到正确的路径顺序
ryaw = ryaw[::-1] # 将ryaw列表反向,得到正确的路径顺序
direc = direc[::-1] # 将direc列表反向,得到正确的路径顺序
direc[0] = direc[1] # 将起始节点的方向设置为与第二个节点的方向相同
path = Path(rx, ry, ryaw, direc, cost) # 创建Path对象,表示完整路径
return path # 返回构造的路径对象
下面逐步解释代码的每个部分:
def extract_path(closed, ngoal, nstart):
rx, ry, ryaw, direc = [], [], [], []
cost = 0.0
node = ngoal
- closed
: 闭集,这是在路径搜索中存储已经探索过的节点的集合。
- ngoal
: 目标节点,即路径规划的目标终点。
- nstart
: 起始节点,即路径规划的起始点。
这段代码首先初始化了一些列表和变量,其中 rx
, ry
, ryaw
, 和 direc
分别用来存储路径的 x 坐标、y 坐标、航向角、和方向信息。cost
是路径的累计成本,node
初始化为目标节点。
while True:
rx += node.x[::-1]
ry += node.y[::-1]
ryaw += node.yaw[::-1]
direc += node.directions[::-1]
cost += node.cost
这是主要的循环部分。在每次循环迭代中,代码从当前节点 node
中提取其路径上的 x 坐标、y 坐标、航向角、和方向信息,并将它们分别添加到 rx
, ry
, ryaw
, 和 direc
列表的末尾。这是因为我们是从目标节点往回回溯路径的,所以需要将节点信息倒序添加到列表中。
累计 cost
变量的值,将当前节点的成本累加到总成本中。
if is_same_grid(node, nstart):
break
在每次循环中,检查当前节点是否达到了起始节点。如果当前节点已经是起始节点,则回溯结束,退出循环。
node = closed[node.pind]
如果当前节点不是起始节点,那么将当前节点的 pind
属性指向的节点从闭集 closed
中取出,这个节点即为当前节点的父节点。这就实现了从目标节点往回回溯的过程。
rx = rx[::-1]
ry = ry[::-1]
ryaw = ryaw[::-1]
direc = direc[::-1]
完成循环后,列表中的节点信息是倒序排列的,这里通过切片操作将它们倒序恢复为正序。
direc[0] = direc[1]
path = Path(rx, ry, ryaw, direc, cost)
return path
在提取完整的路径信息后,将 direc
列表中的第一个方向修正为第二个方向,这是因为在往回回溯时,第一个方向可能会有特殊的处理。
最后,基于提取到的路径信息构建一个 Path
对象,并将路径信息封装到这个对象中,然后将该对象返回作为函数的输出。这个 Path
对象包括了路径的 x、y 坐标、航向角、方向以及累计成本等信息,即路径规划的结果。
总结起来,extract_path
函数的作用是从目标节点往回回溯,提取路径上的节点信息,并将这些信息组合成一个 Path
对象,表示一条路径。
8 . calc_next_node函数
calc_next_node函数实现了计算下一个路径规划节点的功能。在Hybrid A* 算法 的路径搜索过程中,算法会从当前节点生成下一个可能的节点,并对这些节点进行评估以确定是否继续探索。
def calc_next_node(n_curr, c_id, u, d, P):
step = C.XY_RESO * 2
nlist = math.ceil(step / C.MOVE_STEP)
xlist = [n_curr.x[-1] + d * C.MOVE_STEP * math.cos(n_curr.yaw[-1])]
ylist = [n_curr.y[-1] + d * C.MOVE_STEP * math.sin(n_curr.yaw[-1])]
yawlist = [rs.pi_2_pi(n_curr.yaw[-1] + d * C.MOVE_STEP / C.WB * math.tan(u))]
for i in range(nlist - 1):
xlist.append(xlist[i] + d * C.MOVE_STEP * math.cos(yawlist[i]))
ylist.append(ylist[i] + d * C.MOVE_STEP * math.sin(yawlist[i]))
yawlist.append(rs.pi_2_pi(yawlist[i] + d * C.MOVE_STEP / C.WB * math.tan(u)))
xind = round(xlist[-1] / P.xyreso)
yind = round(ylist[-1] / P.xyreso)
yawind = round(yawlist[-1] / P.yawreso)
if not is_index_ok(xind, yind, xlist, ylist, yawlist, P):
return None
cost = 0.0
if d > 0:
direction = 1
cost += abs(step)
else:
direction = -1
cost += abs(step) * C.BACKWARD_COST
if direction != n_curr.direction: # switch back penalty
cost += C.GEAR_COST
cost += C.STEER_ANGLE_COST * abs(u) # steer angle penalyty
cost += C.STEER_CHANGE_COST * abs(n_curr.steer - u) # steer change penalty
cost = n_curr.cost + cost
directions = [direction for _ in range(len(xlist))]
node = Node(xind, yind, yawind, direction, xlist, ylist,
yawlist, directions, u, cost, c_id)
return node
下面进行逐步解释:
def calc_next_node(n_curr, c_id, u, d, P):
step = C.XY_RESO * 2
nlist = math.ceil(step / C.MOVE_STEP)
xlist = [n_curr.x[-1] + d * C.MOVE_STEP * math.cos(n_curr.yaw[-1])]
ylist = [n_curr.y[-1] + d * C.MOVE_STEP * math.sin(n_curr.yaw[-1])]
yawlist = [rs.pi_2_pi(n_curr.yaw[-1] + d * C.MOVE_STEP / C.WB * math.tan(u))]
- n_curr
: 当前节点,是路径规划过程中的一个节点。
- c_id
: 当前节点的标识,即索引。
- u
: 转向角,表示车辆的转向角度。
- d
: 行进方向,表示车辆的前进或后退。
- P
: 参数对象,包含了路径规划所需的各种参数。
首先,算法计算一个步长 step
,然后根据步长计算出需要生成的下一个节点数量 nlist
。接下来,算法初始化 xlist
、ylist
和 yawlist
列表,用于存储下一个节点的 x 坐标、y 坐标和航向角信息。
xlist
和 ylist
初始化为当前节点的 x 坐标和 y 坐标,加上车辆在 d
方向上行驶 C.MOVE_STEP
距离后的坐标。yawlist
则计算了车辆转向后的航向角。
for i in range(nlist - 1):
xlist.append(xlist[i] + d * C.MOVE_STEP * math.cos(yawlist[i]))
ylist.append(ylist[i] + d * C.MOVE_STEP * math.sin(yawlist[i]))
yawlist.append(rs.pi_2_pi(yawlist[i] + d * C.MOVE_STEP / C.WB * math.tan(u)))
在这个循环中,根据上述计算得到的第一个节点坐标和航向角,通过不断累加进行插值,计算出剩余的节点的 x、y 坐标和航向角。这里使用的插值方法是在车辆的转向角度变化下,计算车辆每次行驶 C.MOVE_STEP
距离时的坐标和航向角。
xind = round(xlist[-1] / P.xyreso)
yind = round(ylist[-1] / P.xyreso)
yawind = round(yawlist[-1] / P.yawreso)
if not is_index_ok(xind, yind, xlist, ylist, yawlist, P):
return None
接下来,根据最后一个节点的坐标和航向角,将其转换为在地图上的索引坐标。然后,通过调用 is_index_ok
函数检查这些索引坐标是否合法,即是否在地图范围内且不与障碍物冲突。如果不合法,说明生成的节点在地图外或与障碍物冲突,此时会返回 None
。
cost = 0.0
if d > 0:
direction = 1
cost += abs(step)
else:
direction = -1
cost += abs(step) * C.BACKWARD_COST
if direction != n_curr.direction: # switch back penalty
cost += C.GEAR_COST
cost += C.STEER_ANGLE_COST * abs(u) # steer angle penalty
cost += C.STEER_CHANGE_COST * abs(n_curr.steer - u) # steer change penalty
cost = n_curr.cost + cost
在这一部分,计算生成的节点的成本。首先,根据 d
的值判断节点的行进方向是前进还是后退,并为其分别计算成本。如果是前进,直接累加 step
的绝对值;如果是后退,将 step
的绝对值乘以 C.BACKWARD_COST
,即向后行驶的成本。
然后,通过比较生成的节点的行进方向和父节点的行进方向,如果两者不一致,说明需要切换车辆的行进方向,此时会加上一个切换成本 C.GEAR_COST
。
接下来,根据转向角 u
的绝对值,累加转向角度的惩罚成本 C.STEER_ANGLE_COST
。还有一个惩罚项是转向角度的变化成本,通过计算当前节点的转向角度 n_curr.steer
和生成节点的转向角度 u
的差值,并乘以 C.STEER_CHANGE_COST
,加到总成本中。
最后,将所有成本相加,得到生成节点的总成本。
directions = [direction for
_ in range(len(xlist))]
node = Node(xind, yind, yawind, direction, xlist, ylist,
yawlist, directions, u, cost, c_id)
return node
将生成的行进方向信息复制多次,以匹配生成的节点数量,存储在 directions
列表中。然后,使用所有计算出的信息创建一个新的节点对象 Node
,并将节点的索引、坐标、航向角、行进方向、转向角、成本等信息传递给该节点对象。
最后,将构建好的节点对象返回作为函数的输出。这个函数的作用是根据当前节点、转向角、行进方向等信息,计算并返回下一个可能的路径规划节点。
9 . is_index_ok函数
is_index_ok
函数用于判断生成的路径规划节点的索引坐标是否在地图范围内,并且检查这些节点是否与障碍物发生碰撞。
def is_index_ok(xind, yind, xlist, ylist, yawlist, P):
if xind <= P.minx or \
xind >= P.maxx or \
yind <= P.miny or \
yind >= P.maxy:
return False
ind = range(0, len(xlist), C.COLLISION_CHECK_STEP)
nodex = [xlist[k] for k in ind]
nodey = [ylist[k] for k in ind]
nodeyaw = [yawlist[k] for k in ind]
if is_collision(nodex, nodey, nodeyaw, P):
return False
return True
下面给出详细介绍:
def is_index_ok(xind, yind, xlist, ylist, yawlist, P):
if xind <= P.minx or \
xind >= P.maxx or \
yind <= P.miny or \
yind >= P.maxy:
return False
首先,它检查生成的节点的 xind
是否小于地图范围的最小值 P.minx
,或者大于地图范围的最大值 P.maxx
,或者 yind
是否小于 P.miny
或大于 P.maxy
。如果任何一个条件成立,说明节点位于地图边界外,此时返回 False
,表示索引不合法。
ind = range(0, len(xlist), C.COLLISION_CHECK_STEP)
nodex = [xlist[k] for k in ind]
nodey = [ylist[k] for k in ind]
nodeyaw = [yawlist[k] for k in ind]
然后,程序计算一个索引范围 ind
,用于选择在 xlist
、ylist
和 yawlist
中的哪些值需要进行碰撞检查。这里使用的步长是 C.COLLISION_CHECK_STEP
,表示每隔多少个节点进行一次碰撞检查。
接着,程序根据 ind
的范围,将对应索引处的值从 xlist
、ylist
和 yawlist
中提取出来,分别存储在 nodex
、nodey
和 nodeyaw
列表中。
if is_collision(nodex, nodey, nodeyaw, P):
return False
return True
最后,程序调用函数 is_collision
,将 nodex
、nodey
和 nodeyaw
传递给它,以及参数对象 P
,用于检查这些节点是否与障碍物发生碰撞。如果发生碰撞,则返回 False
,否则返回 True
。
总之,这个函数用于检查生成的路径规划节点的索引是否在地图范围内,并且对一部分节点进行了碰撞检查,以确保生成的节点不会与障碍物发生碰撞。
10. update_node_with_analystic_expantion函数
update_node_with_analystic_expantion
函数,用于分析判断是否存在从当前拓展点到目标点的可行reeds sheep曲线路径,若存在则返回TRUE及Reeds sheep路径信息,若不存在则返回FALSE和None,其核心部分是调用analystic_expantion函数实现的。
def update_node_with_analystic_expantion(n_curr, ngoal, P):
path = analystic_expantion(n_curr, ngoal, P) # rs path: n -> ngoal
if not path:
return False, None
fx = path.x[1:-1]
fy = path.y[1:-1]
fyaw = path.yaw[1:-1]
fd = path.directions[1:-1]
fcost = n_curr.cost + calc_rs_path_cost(path)
fpind = calc_index(n_curr, P)
fsteer = 0.0
fpath = Node(n_curr.xind, n_curr.yind, n_curr.yawind, n_curr.direction,
fx, fy, fyaw, fd, fsteer, fcost, fpind)
return True, fpath
下面给出详细的介绍:
def update_node_with_analystic_expantion(n_curr, ngoal, P):
path = analystic_expantion(n_curr, ngoal, P) # rs path: n -> ngoal
if not path:
return False, None
首先,函数调用了 analystic_expantion
函数,传递了当前节点 n_curr
、目标节点 ngoal
以及参数对象 P
。analystic_expantion
函数会尝试分析路径规划,从当前节点到目标节点是否有更优的路径。如果没有找到更优的路径,则返回 False
表示无法更新节点。
如果 analystic_expantion
找到了从当前节点到目标节点的可行Reeds shee曲线路径,它会返回一个路径对象 path
,然后代码继续执行。
fx = path.x[1:-1]
fy = path.y[1:-1]
fyaw = path.yaw[1:-1]
fd = path.directions[1:-1]
从找到的路径对象 path
中提取出除去起始和终点的路径信息,分别存储在 fx
、fy
、fyaw
和 fd
列表中。这些列表中的数据将用于创建一个新的节点。
fcost = n_curr.cost + calc_rs_path_cost(path)
计算新节点的成本,新节点的成本由当前节点的成本加上从当前节点到目标节点的路径 path
的成本计算得到。
fpind = calc_index(n_curr, P)
fsteer = 0.0
fpath = Node(n_curr.xind, n_curr.yind, n_curr.yawind, n_curr.direction,
fx, fy, fyaw, fd, fsteer, fcost, fpind)
return True, fpath
计算新节点的索引 fpind
,然后创建一个新的节点对象 fpath
,该节点对象包括了新的索引、坐标、航向角、行进方向、转向角、成本等信息。
最后,函数返回一个元组 (True, fpath)
,表示成功更新节点,同时返回更新后的节点对象 fpath
(该节点中存储了当前拓展点到目标点Reeds sheep算法找到的可行路径),以及新的成本信息
11.analystic_expantion函数
analystic_expantion函数会尝试调用Reeds_sheep算法查找48种reeds_sheep曲线中从当前扩展点到目标点的所有可行曲线,若不存在可行曲线,则返回None,若存在可行曲线,则对所有可行曲线的代价值进行计算(这里的代价计算考虑了倒车惩罚代价、转向惩罚代价、角度变换惩罚代价等各种惩罚值),然后将其存放在一个优先级队列中,然后依次从该优先级队列中选取代价值最小的路径,采样路径点,进行碰撞检查,直至找到通过碰撞检查的路径,返回该路径,或者所有的路径均没有通过碰撞检测,返回None。
def analystic_expantion(node, ngoal, P):
sx, sy, syaw = node.x[-1], node.y[-1], node.yaw[-1]
gx, gy, gyaw = ngoal.x[-1], ngoal.y[-1], ngoal.yaw[-1]
maxc = math.tan(C.MAX_STEER) / C.WB
paths = rs.calc_all_paths(sx, sy, syaw, gx, gy, gyaw, maxc, step_size=C.MOVE_STEP)
if not paths:
return None
pq = QueuePrior()
for path in paths:
pq.put(path, calc_rs_path_cost(path))
while not pq.empty():
path = pq.get()
ind = range(0, len(path.x), C.COLLISION_CHECK_STEP)
pathx = [path.x[k] for k in ind]
pathy = [path.y[k] for k in ind]
pathyaw = [path.yaw[k] for k in ind]
if not is_collision(pathx, pathy, pathyaw, P):
return path
return None
下面给出详细的介绍:
def analystic_expantion(node, ngoal, P):
sx, sy, syaw = node.x[-1], node.y[-1], node.yaw[-1]
gx, gy, gyaw = ngoal.x[-1], ngoal.y[-1], ngoal.yaw[-1]
函数首先获取当前节点的最新坐标和航向角,以及目标节点的最新坐标和航向角。
maxc = math.tan(C.MAX_STEER) / C.WB
计算车辆最大转向角对应的转弯半径。
paths = rs.calc_all_paths(sx, sy, syaw, gx, gy, gyaw, maxc, step_size=C.MOVE_STEP)
调用 rs.calc_all_paths
函数,该函数是根据 Reeds-Shepp 路径类型计算从起始状态 (sx, sy, syaw)
到目标状态 (gx, gy, gyaw)
的所有可能路径。step_size
参数指定了每步的步长。
if not paths:
return None
如果未找到任何路径,返回 None
,表示无法进行路径扩展。
pq = QueuePrior()
for path in paths:
pq.put(path, calc_rs_path_cost(path))
如果找到了路径,则创建一个优先级队列 pq
,将所有找到的路径放入队列,并按照路径成本进行排序。路径成本通过调用 calc_rs_path_cost
函数计算。
while not pq.empty():
path = pq.get()
ind = range(0, len(path.x), C.COLLISION_CHECK_STEP)
pathx = [path.x[k] for k in ind]
pathy = [path.y[k] for k in ind]
pathyaw = [path.yaw[k] for k in ind]
if not is_collision(pathx, pathy, pathyaw, P):
return path
从优先级队列中依次弹出路径,并使用间隔为 C.COLLISION_CHECK_STEP
的索引,从路径中提取出部分点。然后,通过调用 is_collision
函数检查这些点是否与障碍物发生碰撞。如果没有发生碰撞,返回这条路径作为扩展路径。
如果优先级队列中的所有路径都与障碍物发生碰撞,则返回 None
,表示无法找到可行的扩展路径。
总之,analystic_expantion
函数的目的是通过使用 Reeds-Shepp 路径类型计算从当前状态到目标状态的所有可能路径,并尝试找到一条不与障碍物发生碰撞的路径作为扩展路径。
12 . is_collision函数
is_collision函数用于检测车辆是否与障碍物发生碰撞。
def is_collision(x, y, yaw, P):
for ix, iy, iyaw in zip(x, y, yaw):
d = 1
dl = (C.RF - C.RB) / 2.0
r = (C.RF + C.RB) / 2.0 + d
cx = ix + dl * math.cos(iyaw)
cy = iy + dl * math.sin(iyaw)
ids = P.kdtree.query_ball_point([cx, cy], r)
if not ids:
continue
for i in ids:
xo = P.ox[i] - cx
yo = P.oy[i] - cy
dx = xo * math.cos(iyaw) + yo * math.sin(iyaw)
dy = -xo * math.sin(iyaw) + yo * math.cos(iyaw)
if abs(dx) < r and abs(dy) < C.W / 2 + d:
return True
return False
下面给出详细介绍:
def is_collision(x, y, yaw, P):
for ix, iy, iyaw in zip(x, y, yaw):
函数接受三个参数:车辆的 x 坐标数组 x
、y 坐标数组 y
,以及航向角数组 yaw
。这些数组中的每个元素对应车辆在不同时间步长的状态。
d = 1
dl = (C.RF - C.RB) / 2.0
r = (C.RF + C.RB) / 2.0 + d
定义了一些参数,其中 d
表示扩展碰撞检测范围,dl
表示车辆前后轮之间的距离差的一半,r
表示车辆的半径。
cx = ix + dl * math.cos(iyaw)
cy = iy + dl * math.sin(iyaw)
根据当前的 x、y 坐标以及航向角计算车辆前部中心点的坐标 (cx, cy)
,即车辆的前轴中心点坐标。
ids = P.kdtree.query_ball_point([cx, cy], r)
使用 K-D 树(k-dimensional tree)进行近邻点查询,找到位于以车辆前部中心为中心,以 r
为半径的范围内的所有障碍物点的索引。
if not ids:
continue
如果在查询范围内没有障碍物点,继续循环下一个状态。
for i in ids:
xo = P.ox[i] - cx
yo = P.oy[i] - cy
dx = xo * math.cos(iyaw) + yo * math.sin(iyaw)
dy = -xo * math.sin(iyaw) + yo * math.cos(iyaw)
if abs(dx) < r and abs(dy) < C.W / 2 + d:
return True
对于位于查询范围内的每个障碍物点,计算其相对于车辆前部中心点的相对坐标 (xo, yo)
。然后,将这些相对坐标通过旋转变换(根据车辆航向角)映射到车辆坐标系。接着,检查映射后的坐标是否在车辆的碰撞范围内,即是否发生了碰撞。
如果发现任何一个障碍物与车辆发生碰撞,则返回 True
,表示发生了碰撞。如果没有发生碰撞,最后返回 False
。这个函数用于检查车辆在给定的状态下是否与障碍物发生了碰撞。
13 . calc_rs_path_cost函数
calc_rs_path_cost函数用于计算路径的成本,即评估给定路径的优劣程度。
def calc_rs_path_cost(rspath):
cost = 0.0
for lr in rspath.lengths:
if lr >= 0:
cost += 1
else:
cost += abs(lr) * C.BACKWARD_COST
for i in range(len(rspath.lengths) - 1):
if rspath.lengths[i] * rspath.lengths[i + 1] < 0.0:
cost += C.GEAR_COST
for ctype in rspath.ctypes:
if ctype != "S":
cost += C.STEER_ANGLE_COST * abs(C.MAX_STEER)
nctypes = len(rspath.ctypes)
ulist = [0.0 for _ in range(nctypes)]
for i in range(nctypes):
if rspath.ctypes[i] == "R":
ulist[i] = -C.MAX_STEER
elif rspath.ctypes[i] == "WB":
ulist[i] = C.MAX_STEER
for i in range(nctypes - 1):
cost += C.STEER_CHANGE_COST * abs(ulist[i + 1] - ulist[i])
return cost
详细介绍如下:
def calc_rs_path_cost(rspath):
cost = 0.0
初始化路径成本为0。
for lr in rspath.lengths:
if lr >= 0:
cost += 1
else:
cost += abs(lr) * C.BACKWARD_COST
遍历路径中的每个路径段的长度(可能是正数或负数)。如果长度是正数,将成本增加1。如果长度是负数,将成本增加绝对值乘以 C.BACKWARD_COST
,即后退行驶的代价。
for i in range(len(rspath.lengths) - 1):
if rspath.lengths[i] * rspath.lengths[i + 1] < 0.0:
cost += C.GEAR_COST
遍历路径长度数组,检查相邻的路径段是否具有不同的行驶方向(一个是前进,一个是后退)。如果是,则增加 C.GEAR_COST
,表示换挡的代价。
for ctype in rspath.ctypes:
if ctype != "S":
cost += C.STEER_ANGLE_COST * abs(C.MAX_STEER)
遍历路径的曲线类型数组,如果不是直线(“S”),则增加 C.STEER_ANGLE_COST
乘以最大转角 C.MAX_STEER
的代价,表示曲线行驶的代价。
nctypes = len(rspath.ctypes)
ulist = [0.0 for _ in range(nctypes)]
for i in range(nctypes):
if rspath.ctypes[i] == "R":
ulist[i] = -C.MAX_STEER
elif rspath.ctypes[i] == "WB":
ulist[i] = C.MAX_STEER
创建一个列表 ulist
,用于存储路径每个曲线段对应的转向角度。遍历路径的曲线类型数组,如果是右转(“R”),则将相应的转向角度设置为 -C.MAX_STEER
,如果是左转(“WB”),则将相应的转向角度设置为 C.MAX_STEER
。
for i in range(nctypes - 1):
cost += C.STEER_CHANGE_COST * abs(ulist[i + 1] - ulist[i])
return cost
遍历转向角度列表,计算相邻两个转向角之间的变化的绝对值,并将其乘以 C.STEER_CHANGE_COST
加入到成本中,表示转向角变化的代价。最后返回计算出的总成本值。
该函数基于路径的长度、曲线类型以及转向角变化等因素来评估路径的优劣程度,从而帮助路径规划算法选择更优的路径。
14 . calc_hybrid_cost函数
calc_hybrid_cost函数用于计算Hybrid A * 算法中节点的成本,结合了路径规划成本和启发式地图成本,用于评估节点的优劣程度。
def calc_hybrid_cost(node, hmap, P):
cost = node.cost + \
C.H_COST * hmap[node.xind - P.minx][node.yind - P.miny]
return cost
详细介绍如下:
这个函数接收三个参数:
- node
: 表示一个节点,包含了路径规划信息,如位置、方向、成本等。
- hmap
: 是一个启发式地图,包含了每个地图格子的启发式成本(或估计值)。
- P
: 是一个包含地图和路径规划参数的对象。
函数首先计算了混合成本,它由两部分组成:
①. node.cost
: 这是从起始节点到当前节点的累计路径规划成本。
②. C.H_COST * hmap[node.xind - P.minx][node.yind - P.miny]
: 这部分是启发式地图的成本。C.H_COST
是一个常数,用于调整路径规划成本和地图成本之间的权重。hmap[node.xind - P.minx][node.yind - P.miny]
是从启发式地图中获取当前节点位置对应的启发式成本。
最后,将路径规划成本和启发式地图成本相加,得到一个综合的混合成本,并将其作为函数的返回值。
该函数的目的是为了在路径规划时综合考虑路径的实际规划成本和启发式地图提供的信息,以更好地指导路径搜索算法选择节点,从而获得更优的路径。
15 . calc_motion_set函数
calc_motion_set函数用于计算可用的车辆操控动作集合,包括转向角度和方向。
def calc_motion_set():
s = np.arange(C.MAX_STEER / C.N_STEER,
C.MAX_STEER, C.MAX_STEER / C.N_STEER)
steer = list(s) + [0.0] + list(-s)
direc = [1.0 for _ in range(len(steer))] + [-1.0 for _ in range(len(steer))]
steer = steer + steer
return steer, direc
详细介绍如下:
这个函数没有接收任何参数。它的目的是生成一组可以用于车辆路径规划的操控动作。主要包括两个方面:转向角度和方向。
首先,函数使用了 numpy
库中的 np.arange()
函数生成一系列转向角度值 s
。这些值是从最小的转向角(非零值)开始,以 C.MAX_STEER / C.N_STEER
的增量递增,直到达到最大的转向角 C.MAX_STEER
。这样做是为了生成一组适当的转向角度,覆盖从左转到右转的范围。
然后,这些转向角度被用于构建两个列表:steer
和 direc
。steer
列表包括了正向转向角度、零角度和负向转向角度,以及它们的一个重复。direc
列表表示每个转向角度对应的方向,正向转向的方向是 1.0,负向转向的方向是 -1.0。
最后,将 steer
和 direc
合并成一个元组,并作为函数的返回值。
这个函数的作用是为路径规划算法提供一个操控动作集合,算法可以从中选择适当的动作来生成候选路径,并在路径搜索中应用这些动作。
16 . is_same_grid函数
is_same_grid函数用于判断两个节点是否位于相同的栅格(格子)上
def is_same_grid(node1, node2):
if node1.xind != node2.xind or \
node1.yind != node2.yind or \
node1.yawind != node2.yawind:
return False
return True
这个函数接收两个节点对象 node1
和 node2
作为参数,然后比较它们的 x 坐标索引、y 坐标索引以及偏航角索引是否相同。这些索引表示节点在栅格地图中的位置。如果这些索引中有任何一个不同,函数会返回 False
,表示这两个节点不在相同的栅格上。如果所有的索引都相同,函数会返回 True
,表示这两个节点在相同的栅格上。
这个函数的作用是在路径搜索算法中,判断当前节点和目标节点是否在同一个栅格上,以便在一些情况下进行路径规划的优化,例如在同一个栅格上直接进行局部规划,而不需要进行全局搜索。
17 . calc_index函数
calc_index(node, P)
函数用于计算给定节点在一维数组(通常用于表示二维栅格地图或状态空间)中的索引。
def calc_index(node, P):
ind = (node.yawind - P.minyaw) * P.xw * P.yw + \
(node.yind - P.miny) * P.xw + \
(node.xind - P.minx)
return ind
这个函数接收两个参数:一个节点对象 node
和一个参数对象 P
,该参数对象包含了与索引计算有关的信息,如最小 x 坐标、最小 y 坐标、最小偏航角等。
函数使用了三个维度的索引计算公式来将二维坐标以及偏航角索引映射到一个一维数组中的索引。这个一维数组可以用来表示一个多维状态空间或栅格地图,以便在搜索和规划算法中更高效地存储和访问状态信息。
- (node.yawind - P.minyaw) * P.xw * P.yw
:这部分计算使用了偏航角索引,并将其映射到一维数组中。P.xw
和 P.yw
是与坐标轴尺寸相关的因子,用于在多维索引中进行合适的映射。
- (node.yind - P.miny) * P.xw
:这部分计算使用了 y 坐标索引,并将其映射到一维数组中,乘以 P.xw
是为了在 x 方向上进行映射。
- (node.xind - P.minx)
:这部分计算使用了 x 坐标索引,并将其直接映射到一维数组中。
最终,这些部分相加得到节点在一维数组中的索引 ind
。这个索引可以用于在一维数组中存储和检索与该节点相关的信息,例如代价、状态等。
18 . calc_parameters函数
calc_parameters函数用于计算参数并返回一个包含计算结果的参数对象。
def calc_parameters(ox, oy, xyreso, yawreso, kdtree):
minx = round(min(ox) / xyreso)
miny = round(min(oy) / xyreso)
maxx = round(max(ox) / xyreso)
maxy = round(max(oy) / xyreso)
xw, yw = maxx - minx, maxy - miny
minyaw = round(-C.PI / yawreso) - 1
maxyaw = round(C.PI / yawreso)
yaww = maxyaw - minyaw
return Para(minx, miny, minyaw, maxx, maxy, maxyaw,
xw, yw, yaww, xyreso, yawreso, ox, oy, kdtree)
这个函数接收一系列参数:ox
和 oy
是障碍物的 x 和 y 坐标,xyreso
是 x 和 y 坐标的分辨率,yawreso
是偏航角的分辨率,kdtree
是一个 k-d 树,用于高效地进行最近邻查询。
函数首先根据提供的障碍物坐标以及分辨率计算出一些参数,然后使用这些参数创建并返回一个参数对象 Para
,该对象保存了计算结果。
计算过程如下:
①. minx
和 miny
分别计算出 x 和 y 坐标的最小值对应的栅格索引,这些索引被四舍五入为整数。同理,maxx
和 maxy
分别计算出 x 和 y 坐标的最大值对应的栅格索引。
②. xw
和 yw
分别计算出 x 和 y 方向上的栅格宽度,通过 maxx - minx
和 maxy - miny
计算。
③. minyaw
和 maxyaw
分别计算出偏航角的最小值和最大值对应的栅格索引。yawreso
表示每个栅格的偏航角范围,这里将范围扩展了一些,以确保包含全部的偏航角范围。
④. yaww
计算出偏航角的宽度,即栅格偏航角范围的个数。
⑤. 最后,使用计算得到的参数创建一个参数对象 Para
,并返回。这个对象包含了所有计算过程中涉及的信息,用于后续的路径规划和搜索操作。
19 . draw_car函数
def draw_car(x, y, yaw, steer, color='black'):
# 定义车辆轮廓
car = np.array([[-C.RB, -C.RB, C.RF, C.RF, -C.RB],
[C.W / 2, -C.W / 2, -C.W / 2, C.W / 2, C.W / 2]])
# 定义车轮形状
wheel = np.array([[-C.TR, -C.TR, C.TR, C.TR, -C.TR],
[C.TW / 4, -C.TW / 4, -C.TW / 4, C.TW / 4, C.TW / 4]])
# 复制车轮形状,用于各个轮子
rlWheel = wheel.copy()
rrWheel = wheel.copy()
frWheel = wheel.copy()
flWheel = wheel.copy()
# 构建旋转矩阵
Rot1 = np.array([[math.cos(yaw), -math.sin(yaw)],
[math.sin(yaw), math.cos(yaw)]])
Rot2 = np.array([[math.cos(steer), math.sin(steer)],
[-math.sin(steer), math.cos(steer)]])
# 根据转向角度旋转前轮
frWheel = np.dot(Rot2, frWheel)
flWheel = np.dot(Rot2, flWheel)
# 调整前轮位置
frWheel += np.array([[C.WB], [-C.WD / 2]])
flWheel += np.array([[C.WB], [C.WD / 2]])
rrWheel[1, :] -= C.WD / 2
rlWheel[1, :] += C.WD / 2
# 根据车辆朝向旋转前轮和车轮轮廓
frWheel = np.dot(Rot1, frWheel)
flWheel = np.dot(Rot1, flWheel)
rrWheel = np.dot(Rot1, rrWheel)
rlWheel = np.dot(Rot1, rlWheel)
car = np.dot(Rot1, car)
# 平移车辆和车轮位置
frWheel += np.array([[x], [y]])
flWheel += np.array([[x], [y]])
rrWheel += np.array([[x], [y]])
rlWheel += np.array([[x], [y]])
car += np.array([[x], [y]])
# 绘制车辆和车轮
plt.plot(car[0, :], car[1, :], color)
plt.plot(frWheel[0, :], frWheel[1, :], color)
plt.plot(rrWheel[0, :], rrWheel[1, :], color)
plt.plot(flWheel[0, :], flWheel[1, :], color)
plt.plot(rlWheel[0, :], rlWheel[1, :], color)
draw.Arrow(x, y, yaw, C.WB * 0.8, color)
这段程序定义了一个函数 draw_car(x, y, yaw, steer, color='black')
,用于在一个matplotlib图中绘制一个表示汽车的简化形状,包括车身和车轮。这个函数的参数描述了汽车的位置、朝向和方向盘转角。
分步解释如下:
①. 首先,定义了一个表示汽车车身的多边形 car
,以及一个表示车轮的多边形 wheel
。
②. 对每个车轮进行复制,得到右前轮(frWheel
)、左前轮(flWheel
)、右后轮(rrWheel
)、左后轮(rlWheel
)的形状。接着应用旋转矩阵 Rot2
,这个矩阵根据方向盘的转角 steer
进行旋转。
③. 对前轮进行偏移以模拟前悬挂,后轮则稍微下移以模拟后悬挂。
④. 接下来,使用旋转矩阵 Rot1
将轮子和车身的多边形进行方向和朝向的旋转。
⑤. 最后,将车轮和车身的多边形根据车辆的位置 (x, y)
进行平移,以便正确绘制在图中。
⑥. 使用 plt.plot()
函数绘制车身和车轮的多边形,以及一个表示车头朝向的箭头。
综合来看,这段程序用于在matplotlib图中绘制一个简化的汽车形状,以展示汽车的位置、朝向和方向盘转角。这对于路径规划和控制算法的可视化非常有用,可以帮助理解算法的行为。
20 . design_obstacles函数
design_obstacles函数用于生成障碍物的坐标。障碍物由一系列的点坐标表示,这些点坐标表示了在给定的地图尺寸内障碍物的位置。
def design_obstacles(x, y):
ox, oy = [], []
for i in range(x):
ox.append(i)
oy.append(0)
for i in range(x):
ox.append(i)
oy.append(y - 1)
for i in range(y):
ox.append(0)
oy.append(i)
for i in range(y):
ox.append(x - 1)
oy.append(i)
for i in range(10, 21):
ox.append(i)
oy.append(15)
for i in range(15):
ox.append(20)
oy.append(i)
for i in range(15, 30):
ox.append(30)
oy.append(i)
for i in range(16):
ox.append(40)
oy.append(i)
return ox, oy
详细介绍如下:
这个函数接受两个参数 x
和 y
,它们表示地图的宽度和高度。函数内部使用了两个空列表 ox
和 oy
来存储障碍物的 x 和 y 坐标。
①. 添加四条边界:通过循环,将地图的四条边界添加到障碍物坐标中,以防止车辆越过地图边界。
②. 添加额外的障碍物:使用一些额外的循环,将其他障碍物点的坐标添加到 ox
和 oy
中。这些额外的障碍物会在地图的不同位置生成,例如从 (10, 15) 到 (20, 15) 的一行障碍物,或者从 (20, 0) 到 (20, 14) 的一列障碍物等。
③. 返回 ox
和 oy
列表,其中包含了生成的障碍物的坐标。
这个函数用于为地图生成一组障碍物的坐标,这些坐标在规定的地图范围内,并且代表了地图上的实际障碍物位置。
21 . hybrid_astar_planning函数
hybrid_astar_planning函数实现了混合A*算法,根据起始点、目标点、障碍物、规划分辨率等信息进行路径规划。主循环中,它通过不断扩展节点、更新优先队列来寻找一条最优路径,如果成功找到路径,将返回路径。
def hybrid_astar_planning(sx, sy, syaw, gx, gy, gyaw, ox, oy, xyreso, yawreso):
# 将连续坐标转换为离散坐标
sxr, syr = round(sx / xyreso), round(sy / xyreso)
gxr, gyr = round(gx / xyreso), round(gy / xyreso)
syawr = round(rs.pi_2_pi(syaw) / yawreso)
gyawr = round(rs.pi_2_pi(gyaw) / yawreso)
# 创建起始节点和目标节点
nstart = Node(sxr, syr, syawr, 1, [sx], [sy], [syaw], [1], 0.0, 0.0, -1)
ngoal = Node(gxr, gyr, gyawr, 1, [gx], [gy], [gyaw], [1], 0.0, 0.0, -1)
# 创建KD树以加速障碍物查询
kdtree = kd.KDTree([[x, y] for x, y in zip(ox, oy)])
# 计算规划参数P
P = calc_parameters(ox, oy, xyreso, yawreso, kdtree)
# 使用A*算法计算启发式地图
hmap = astar.calc_holonomic_heuristic_with_obstacle(ngoal, P.ox, P.oy, P.xyreso, 1.0)
# 计算转向和方向的集合
steer_set, direc_set = calc_motion_set()
# 初始化open_set、closed_set和优先队列
open_set, closed_set = {calc_index(nstart, P): nstart}, {}
qp = QueuePrior()
qp.put(calc_index(nstart, P), calc_hybrid_cost(nstart, hmap, P))
# 开始主循环
while True:
if not open_set:
return None # 无解情况
ind = qp.get()
n_curr = open_set[ind]
closed_set[ind] = n_curr
open_set.pop(ind)
# 尝试使用解析扩展更新节点
update, fpath = update_node_with_analystic_expantion(n_curr, ngoal, P)
if update:
fnode = fpath
break
# 遍历转向和方向的集合
for i in range(len(steer_set)):
# 计算下一个节点
node = calc_next_node(n_curr, ind, steer_set[i], direc_set[i], P)
if not node:
continue
node_ind = calc_index(node, P)
if node_ind in closed_set:
continue
if node_ind not in open_set:
open_set[node_ind] = node
qp.put(node_ind, calc_hybrid_cost(node, hmap, P))
else:
if open_set[node_ind].cost > node.cost:
open_set[node_ind] = node
qp.put(node_ind, calc_hybrid_cost(node, hmap, P))
# 提取并返回路径
return extract_path(closed_set, fnode, nstart)
下面是程序的详细解释:
(1). 坐标映射和节点初始化:
首先,将起始点和目标点的连续坐标映射到离散坐标,并创建起始节点(nstart
)和目标节点(ngoal
)。
sxr, syr = round(sx / xyreso), round(sy / xyreso)
gxr, gyr = round(gx / xyreso), round(gy / xyreso)
syawr = round(rs.pi_2_pi(syaw) / yawreso)
gyawr = round(rs.pi_2_pi(gyaw) / yawreso)
nstart = Node(sxr, syr, syawr, 1, [sx], [sy], [syaw], [1], 0.0, 0.0, -1)
ngoal = Node(gxr, gyr, gyawr, 1, [gx], [gy], [gyaw], [1], 0.0, 0.0, -1)
(2). 构建KD树和参数计算:
创建KD树以便快速检索障碍物的位置,然后使用给定的障碍物坐标和其他规划参数来计算一些必要的参数。
kdtree = kd.KDTree([[x, y] for x, y in zip(ox, oy)])
P = calc_parameters(ox, oy, xyreso, yawreso, kdtree)
(3). 计算启发式地图:
使用A*算法计算启发式地图,以便在路径规划中提供启发式估计。
hmap = astar.calc_holonomic_heuristic_with_obstacle(ngoal, P.ox, P.oy, P.xyreso, 1.0)
(4). 转向和方向集合的计算:
计算用于转向和方向的集合,以便在规划中使用。
steer_set, direc_set = calc_motion_set()
(5). 初始化集合和优先队列:
初始化open_set
和closed_set
集合,将起始节点添加到open_set
中,并初始化优先队列qp
。
open_set, closed_set = {calc_index(nstart, P): nstart}, {}
qp = QueuePrior()
qp.put(calc_index(nstart, P), calc_hybrid_cost(nstart, hmap, P))
(6). 主循环:
开始主循环,直到找到路径或无法继续扩展节点为止。在循环中,从open_set
中选择当前成本最低的节点(由qp
队列提供)作为当前节点,然后进行以下操作:
- 尝试使用解析扩展(比如采用Reeds sheep曲线)来更新当前节点(update_node_with_analystic_expantion
函数)。
- 对每种转向和方向,计算下一个节点,并根据情况将其添加到open_set
或更新其成本。
while True:
if not open_set:
return None
ind = qp.get()
n_curr = open_set[ind]
closed_set[ind] = n_curr
open_set.pop(ind)
update, fpath = update_node_with_analystic_expantion(n_curr, ngoal, P)
if update:
fnode = fpath
break
for i in range(len(steer_set)):
node = calc_next_node(n_curr, ind, steer_set[i], direc_set[i], P)
if not node:
continue
node_ind = calc_index(node, P)
if node_ind in closed_set:
continue
if node_ind not in open_set:
open_set[node_ind] = node
qp.put(node_ind, calc_hybrid_cost(node, hmap, P))
else:
if open_set[node_ind].cost > node.cost:
open_set[node_ind] = node
qp.put(node_ind, calc_hybrid_cost(node, hmap, P))
(7). 返回路径:
如果找到路径,则从closed_set
中提取路径并返回。
return extract_path(closed_set, fnode, nstart)
这个程序实现了混合A*算法的核心逻辑,它从起始点出发,通过扩展节点、更新节点和计算代价等步骤,最终找到连接起始点和目标点的路径。
22 . 主函数main
主函数main用于调用Hybrid A*算法来规划车辆的路径,并在matplotlib中绘制车辆的轨迹和障碍物
def main():
print("start!")
x, y = 51, 31 # 地图的宽度和高度
sx, sy, syaw0 = 10.0, 7.0, np.deg2rad(120.0) # 起始点的x,y坐标和偏航角
gx, gy, gyaw0 = 45.0, 20.0, np.deg2rad(90.0) # 目标点的x,y坐标和偏航角
# 生成障碍物的坐标
ox, oy = design_obstacles(x, y)
t0 = time.time()
# 使用Hybrid A*算法进行路径规划
path = hybrid_astar_planning(sx, sy, syaw0, gx, gy, gyaw0,
ox, oy, C.XY_RESO, C.YAW_RESO)
t1 = time.time()
print("running T: ", t1 - t0)
if not path:
print("Searching failed!")
return
# 提取路径的信息
x = path.x
y = path.y
yaw = path.yaw
direction = path.direction
# 在每个时刻绘制车辆的轨迹和障碍物
for k in range(len(x)):
plt.cla()
plt.plot(ox, oy, "sk")
plt.plot(x, y, linewidth=1.5, color='r')
# 计算当前时刻的方向和转向角
if k < len(x) - 2:
dy = (yaw[k + 1] - yaw[k]) / C.MOVE_STEP
steer = rs.pi_2_pi(math.atan(-C.WB * dy / direction[k]))
else:
steer = 0.0
# 绘制车辆
draw_car(gx, gy, gyaw0, 0.0, 'dimgray') # 绘制目标点
draw_car(x[k], y[k], yaw[k], steer) # 绘制当前车辆状态
plt.title("Hybrid A*")
plt.axis("equal")
plt.pause(0.0001)
plt.show()
print("Done!")
if __name__ == '__main__':
main()
这段代码首先定义了一个地图的尺寸 x
和 y
,然后指定了起始点 sx
, sy
以及初始偏航角 syaw0
,还有目标点 gx
, gy
以及目标偏航角 gyaw0
。
接下来,它通过 design_obstacles
函数生成了障碍物的坐标,然后记录了算法的开始时间 t0
,运行 hybrid_astar_planning
函数进行路径规划,记录结束时间 t1
,然后计算并打印了算法运行时间。
如果路径规划成功(path
不为空),程序进入循环,逐步绘制路径规划的过程。在每个循环迭代中,程序会清空当前图形 (plt.cla()
),绘制障碍物和规划路径,计算当前车辆的转向角并绘制当前车辆的位置。然后,程序暂停一小段时间以便观察可视化效果。
最后,程序显示可视化结果,绘制出规划路径以及障碍物,然后输出 “Done!” 表示程序完成运行。
这个主程序通过调用混合A*算法来规划车辆路径,并通过matplotlib库进行可视化展示,以便观察路径规划过程和结果。
有了上面对Hybrid A * 算法源码的阅读及理解,我们再来对Hybrid A * 算法的核心流程进行概括总结,来更好的理解Hybrid A * 算法。
(1)首先,将得到的起始点、目标点的位姿信息,以及环境中障碍物的信息处理成我们需要的格式,并得到一个包含障碍物信息的栅格地图obsmap ,为后续的规划做准备。
(2)然后在栅格地图obsmap上,调用传统的 A * 算法,从目标点开始不断地进行拓展(往open_set中添加节点),同时将扩展过的节点从open_set移到closed_set中。直至open_set为空,此时栅格地图obsmap中所有非障碍物的节点都被拓展过了,存放在closed_set中,且这些节点中都储存了从目标点到该节点的实际距离(代价值),并将代价值存储在一个地图中,至此,我们得到了一个启发式地图hmap,对于障碍物点启发式地图hmap中对应位置为inf,否则则存放从该点到达目标点的实际代价值,作为Hybrid A * 算法中考虑障碍物但不考虑动力学约束的启发式函数值。
(3)将开集合open_set初始化为起始点,闭集合初始化为空集,并初始化优先级队列,以节点的总代价值作为优先级评判依据,总代价值越低,优先级越高,即每次从open_set中弹出总代价值最小的节点作为拓展点。至此,准备工作已完成。
(4)开始主循环,循环执行以下步骤,直至找到路径或open_set集合为空时,结束循环
① 判断open_set集合是否为空,若为空,则结束循环,规划失败。若不为空,则执行下一步
②、选取本轮循环拓展点,从open_set集合弹出总代价值最小的节点作为本轮循环的拓展点,并将该点添加到闭集合closed_set中(表示该节点已经拓展过了,或者即将进行扩展),
③、以一定的概率尝试调用Reeds_sheep算法查找48种reeds_sheep曲线中从当前扩展点到目标点的所有可行曲线,若不存在可行曲线,则继续下一步,若存在可行曲线,则对所有可行曲线的代价值进行计算(前面介绍的程序中这里的代价计算考虑了倒车惩罚代价、转向惩罚代价、角度变换惩罚代价等各种惩罚值),然后将其存放在一个优先级队列中,然后依次从该优先级队列中选取代价值最小的路径,采样路径点,进行碰撞检查,直至找到通过碰撞检查的路径或所有的路径均没有通过碰撞检测。对于前者的情形,我们成功找到了从当前拓展点到目标点的路径,将该路径与从当前拓展点回溯到起点所得到的路径拼接起来,即得到了从起始点到目标点的可行路径,停止循环。对于后者的情况,说明没有找到从当前拓展点到目标点的无碰撞Reeds_sheep曲线,继续执行下一步。
④、对当前拓展点进行拓展,从左转的最大角度到右转的最大角度中选取n个采样点,即选取n个前轮转向角,对于每个转向角,再考虑车辆前进后退两种运动形式,即在当前拓展点处朝着2n个可能的方向进行拓展。
对于每个方向均执行以下操作:
沿着该方向向前模拟一段路径,判断是否超出地图边界或者与障碍物相交,若是则放弃向该方向拓展,若不是,则计算该方向模拟路径的代价值(这里的代价值同样可以考虑倒车惩罚、切换方向惩罚、转向角惩罚等),从而得到从当前拓展点到该模拟路径末端节点的成本。
接下来检查该末端节点所属的栅格是否已经闭集合closed_set中,若是则说明该栅格早就已经拓展完成了,不用再加入到open_set中。 若不是,则进一步判断其是否在open_set中,若不是,则计算该节点的总代价值(从起点到该节点已花费的实际代价+从该节点到目标点预估启发代价),并将其添加到open_set集合中,若其已经在open_set集合中,则判断从起点到该末端节点的代价值是否小于从起点到该栅格中存储的之前的末端节点的代价值,若是,则对该栅格中存储的末端节点及代价值进行更新,否则,不更新。
对所有方向执行完以上操作后,当前拓展点的拓展就完成了,同时本轮循环也完成了,跳转到第①步,继续进行下一轮循环。
(5)进行目标点开始进行路径回溯,提取出从起点到目标点的可行路径。
需要注意的是以上的流程仅仅是Hybrid A * 算法的某一种实现方案,其结合了第一部分的理论介绍和第二部分的python源码,所以其与第二部分源码的流程不完全吻合。对于某些应用场景下,以上流程需要进行更改,比如在高动态的环境下,障碍物信息必然会频繁的变化,上述流程中第(2)步一次性计算出整个地图的考虑障碍物但不考虑动力学约束的启发式函数值的做法效率较低,再比如不同的环境下,适合用不同的启发式函数方案,上面的python源码启发式函数仅使用了考虑障碍物但不考虑动力学约束的启发式函数,没有综合考虑动力学当不考虑障碍物的启发式函数。
参考资料:
1、深蓝学院-移动机器人运动规划
2、zhm_real/MotionPlanning运动规划库