双向A*算法

news2024/10/6 14:28:15

前面看最佳路径优先搜索算法的时候顺便研究了一下它的改进算法:双向最佳路径优先搜索算法。那既然有双向最佳路径优先搜索算法自然也可以有双向A* 算法。这篇文章简单看一下双向A*算法的基本原理以及代码实现。

基本原理

双向A* 算法是一种用于解决图搜索问题的启发式搜索算法。它是A* 算法的一种改进,旨在减少搜索的节点数量,从而提高搜索的效率。

双向A*算法同时从起点和终点开始搜索,分别利用启发式函数来评估节点的优先级。在每一步中,从两个方向选择具有最低优先级的节点进行扩展。当两个搜索方向的搜索路径相交时,即找到了一条从起点到终点的路径。

相比于普通的A算法,双向A算法可以显著减少搜索的节点数量,特别是在图搜索问题中,当图的规模较大时,搜索的效率提高得更为明显。它通过同时从起点和终点搜索,利用双向信息来减少不必要的搜索。

其基本思路如下:

1.初始化起点和终点,并分别给它们分配初始的启发式函数值(即估计从起点到终点的距离)。

2.初始化起点和终点的优先级队列,分别用来存放待扩展的节点。

3.从起点和终点同时开始搜索,每次选择优先级最低的节点进行扩展。

4.对于当前被选择的节点,计算并更新它的启发式函数值、代价值(即从起点到该节点的实际路径长度)以及总的估计值(即启发式函数值加上代价值)。

5.在每一次扩展节点时,检查是否有节点在另一个方向上已经被访问过,如果存在交叉节点,则找到一条路径并返回。

6.如果没有交叉节点,继续选择下一个优先级最低的节点进行扩展,直到找到路径或者搜索完所有的节点。

7.如果搜索完所有的节点都没有找到路径,则说明起点和终点之间不存在可达路径。

代码实现

简单实现代码如下:

import os
import sys
import math
import heapq
import matplotlib.pyplot as plt



class BidirectionalAStar:
    def __init__(self, s_start, s_goal, heuristic_type,xI, xG):
        self.s_start = s_start
        self.s_goal = s_goal
        self.heuristic_type = heuristic_type
        self.motions = [(-1, 0), (-1, 1), (0, 1), (1, 1),
                        (1, 0), (1, -1), (0, -1), (-1, -1)]
        self.u_set = self.motions  # feasible input set
        self.obs = self.obs_map()  # position of obstacles

        self.OPEN_fore = []  # OPEN set for forward searching
        self.OPEN_back = []  # OPEN set for backward searching
        self.CLOSED_fore = []  # CLOSED set for forward
        self.CLOSED_back = []  # CLOSED set for backward
        self.PARENT_fore = dict()  # recorded parent for forward
        self.PARENT_back = dict()  # recorded parent for backward
        self.g_fore = dict()  # cost to come for forward
        self.g_back = dict()  # cost to come for backward
        self.x_range = 51  # size of background
        self.y_range = 31
        
        
        self.xI, self.xG = xI, xG
        self.obs = self.obs_map()
    def init(self):
        """
        initialize parameters
        """

        self.g_fore[self.s_start] = 0.0
        self.g_fore[self.s_goal] = math.inf
        self.g_back[self.s_goal] = 0.0
        self.g_back[self.s_start] = math.inf
        self.PARENT_fore[self.s_start] = self.s_start
        self.PARENT_back[self.s_goal] = self.s_goal
        heapq.heappush(self.OPEN_fore,
                       (self.f_value_fore(self.s_start), self.s_start))
        heapq.heappush(self.OPEN_back,
                       (self.f_value_back(self.s_goal), self.s_goal))


    def obs_map(self):
        """
        Initialize obstacles' positions
        :return: map of obstacles
        """

        x = 51
        y = 31
        obs = set()

        for i in range(x):
            obs.add((i, 0))
        for i in range(x):
            obs.add((i, y - 1))

        for i in range(y):
            obs.add((0, i))
        for i in range(y):
            obs.add((x - 1, i))

        for i in range(10, 21):
            obs.add((i, 15))
        for i in range(15):
            obs.add((20, i))

        for i in range(15, 30):
            obs.add((30, i))
        for i in range(16):
            obs.add((40, i))

        return obs


    def animation_bi_astar(self, path, v_fore, v_back, name):
        self.plot_grid(name)
        self.plot_visited_bi(v_fore, v_back)
        self.plot_path(path)
        plt.show()

    def plot_grid(self, name):
        obs_x = [x[0] for x in self.obs]
        obs_y = [x[1] for x in self.obs]

        plt.plot(self.xI[0], self.xI[1], "bs")
        plt.plot(self.xG[0], self.xG[1], "gs")
        plt.plot(obs_x, obs_y, "sk")
        plt.title(name)
        plt.axis("equal")

    def plot_visited(self, visited, cl='gray'):
        if self.xI in visited:
            visited.remove(self.xI)

        if self.xG in visited:
            visited.remove(self.xG)

        count = 0

        for x in visited:
            count += 1
            plt.plot(x[0], x[1], color=cl, marker='o')
            plt.gcf().canvas.mpl_connect('key_release_event',
                                         lambda event: [exit(0) if event.key == 'escape' else None])

            if count < len(visited) / 3:
                length = 20
            elif count < len(visited) * 2 / 3:
                length = 30
            else:
                length = 40
            #
            # length = 15

            if count % length == 0:
                plt.pause(0.001)
        plt.pause(0.01)

    def plot_path(self, path, cl='r', flag=False):
        path_x = [path[i][0] for i in range(len(path))]
        path_y = [path[i][1] for i in range(len(path))]

        if not flag:
            plt.plot(path_x, path_y, linewidth='3', color='r')
        else:
            plt.plot(path_x, path_y, linewidth='3', color=cl)

        plt.plot(self.xI[0], self.xI[1], "bs")
        plt.plot(self.xG[0], self.xG[1], "gs")

        plt.pause(0.01)

    def plot_visited_bi(self, v_fore, v_back):
        if self.xI in v_fore:
            v_fore.remove(self.xI)

        if self.xG in v_back:
            v_back.remove(self.xG)

        len_fore, len_back = len(v_fore), len(v_back)

        for k in range(max(len_fore, len_back)):
            if k < len_fore:
                plt.plot(v_fore[k][0], v_fore[k][1], linewidth='3', color='gray', marker='o')
            if k < len_back:
                plt.plot(v_back[k][0], v_back[k][1], linewidth='3', color='cornflowerblue', marker='o')

            plt.gcf().canvas.mpl_connect('key_release_event',
                                         lambda event: [exit(0) if event.key == 'escape' else None])

            if k % 10 == 0:
                plt.pause(0.001)
        plt.pause(0.01)


    def searching(self):
        """
        Bidirectional A*
        :return: connected path, visited order of forward, visited order of backward
        """

        self.init()
        s_meet = self.s_start

        while self.OPEN_fore and self.OPEN_back:
            # solve foreward-search
            _, s_fore = heapq.heappop(self.OPEN_fore)

            if s_fore in self.PARENT_back:
                s_meet = s_fore
                break
            #前向路径走过的点,用于可视化
            self.CLOSED_fore.append(s_fore)

            for s_n in self.get_neighbor(s_fore):
                new_cost = self.g_fore[s_fore] + self.cost(s_fore, s_n)

                if s_n not in self.g_fore:
                    self.g_fore[s_n] = math.inf

                if new_cost < self.g_fore[s_n]:
                    self.g_fore[s_n] = new_cost
                    self.PARENT_fore[s_n] = s_fore
                    heapq.heappush(self.OPEN_fore,
                                   (self.f_value_fore(s_n), s_n))

            # solve backward-search
            _, s_back = heapq.heappop(self.OPEN_back)

            if s_back in self.PARENT_fore:
                s_meet = s_back
                break

            self.CLOSED_back.append(s_back)

            for s_n in self.get_neighbor(s_back):
                new_cost = self.g_back[s_back] + self.cost(s_back, s_n)

                if s_n not in self.g_back:
                    self.g_back[s_n] = math.inf

                if new_cost < self.g_back[s_n]:
                    self.g_back[s_n] = new_cost
                    self.PARENT_back[s_n] = s_back
                    heapq.heappush(self.OPEN_back,
                                   (self.f_value_back(s_n), s_n))

        return self.extract_path(s_meet), self.CLOSED_fore, self.CLOSED_back

    def get_neighbor(self, s):
        """
        find neighbors of state s that not in obstacles.
        :param s: state
        :return: neighbors
        """

        return [(s[0] + u[0], s[1] + u[1]) for u in self.u_set]

    def extract_path(self, s_meet):
        """
        extract path from start and goal
        :param s_meet: meet point of bi-direction a*
        :return: path
        """

        # extract path for foreward part
        path_fore = [s_meet]
        s = s_meet

        while True:
            s = self.PARENT_fore[s]
            path_fore.append(s)
            if s == self.s_start:
                break

        # extract path for backward part
        path_back = []
        s = s_meet

        while True:
            s = self.PARENT_back[s]
            path_back.append(s)
            if s == self.s_goal:
                break

        return list(reversed(path_fore)) + list(path_back)

    def f_value_fore(self, s):
        """
        forward searching: f = g + h. (g: Cost to come, h: heuristic value)
        :param s: current state
        :return: f
        """

        return self.g_fore[s] + self.h(s, self.s_goal)

    def f_value_back(self, s):
        """
        backward searching: f = g + h. (g: Cost to come, h: heuristic value)
        :param s: current state
        :return: f
        """

        return self.g_back[s] + self.h(s, self.s_start)

    def h(self, s, goal):
        """
        Calculate heuristic value.
        :param s: current node (state)
        :param goal: goal node (state)
        :return: heuristic value
        """

        heuristic_type = self.heuristic_type

        if heuristic_type == "manhattan":
            return abs(goal[0] - s[0]) + abs(goal[1] - s[1])
        else:
            return math.hypot(goal[0] - s[0], goal[1] - s[1])

    def cost(self, s_start, s_goal):
        """
        Calculate Cost for this motion
        :param s_start: starting node
        :param s_goal: end node
        :return:  Cost for this motion
        :note: Cost function could be more complicate!
        """

        if self.is_collision(s_start, s_goal):
            return math.inf

        return math.hypot(s_goal[0] - s_start[0], s_goal[1] - s_start[1])

    def is_collision(self, s_start, s_end):
        """
        check if the line segment (s_start, s_end) is collision.
        :param s_start: start node
        :param s_end: end node
        :return: True: is collision / False: not collision
        """

        if s_start in self.obs or s_end in self.obs:
            return True

        if s_start[0] != s_end[0] and s_start[1] != s_end[1]:
            if s_end[0] - s_start[0] == s_start[1] - s_end[1]:
                s1 = (min(s_start[0], s_end[0]), min(s_start[1], s_end[1]))
                s2 = (max(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
            else:
                s1 = (min(s_start[0], s_end[0]), max(s_start[1], s_end[1]))
                s2 = (max(s_start[0], s_end[0]), min(s_start[1], s_end[1]))

            if s1 in self.obs or s2 in self.obs:
                return True

        return False


def main():
    x_start = (5, 5)
    x_goal = (45, 25)

    bastar = BidirectionalAStar(x_start, x_goal, "euclidean",x_start,x_goal)
    
    path, visited_fore, visited_back = bastar.searching()
    bastar.animation_bi_astar(path, visited_fore, visited_back, "Bidirectional-A*")  # animation


if __name__ == '__main__':
    main()

代码讲解

首先在程序开始时先初始了两个字典

        self.PARENT_fore[self.s_start] = self.s_start
        self.PARENT_back[self.s_goal] = self.s_goal

然后再初始化了两个堆栈:

        heapq.heappush(self.OPEN_fore,
                       (self.f_value_fore(self.s_start), self.s_start))
        heapq.heappush(self.OPEN_back,
                       (self.f_value_back(self.s_goal), self.s_goal))

两个字典是用于存放走过的路径,前者存放从起点往终点搜索的路径,后者存放从终点往起点搜索过的路径。字典的作用是用于回朔路径,即该点是从哪个点索引过来的。比如说从起点搜索经过点1,点1搜索四周时到达点4,点4搜索时又到达点8,则可以根据回朔找到回去的路径8->4->1。

两个堆栈的作用是用于对每个点的代价值进行排序,因为A* 本身搜索的时候是根据每个点的代价值作为条件的,优先搜索代价值最小的点。所以使用两个堆栈进行点的排序。堆栈可以对每个点的值按照从大到小向上排序,所以需要取代价值最小的点只需要栈顶出栈就可以了。

接下来对起点与终点周围的点进行遍历,计算每个点的代价值并存放到两个堆栈中,并将经过的点排入到字典中。

            for s_n in self.get_neighbor(s_fore):
                new_cost = self.g_fore[s_fore] + self.cost(s_fore, s_n)

                if s_n not in self.g_fore:
                    self.g_fore[s_n] = math.inf

                if new_cost < self.g_fore[s_n]:
                    self.g_fore[s_n] = new_cost
                    self.PARENT_fore[s_n] = s_fore
                    heapq.heappush(self.OPEN_fore,
                                   (self.f_value_fore(s_n), s_n))
            for s_n in self.get_neighbor(s_back):
                new_cost = self.g_back[s_back] + self.cost(s_back, s_n)

                if s_n not in self.g_back:
                    self.g_back[s_n] = math.inf

                if new_cost < self.g_back[s_n]:
                    self.g_back[s_n] = new_cost
                    self.PARENT_back[s_n] = s_back
                    heapq.heappush(self.OPEN_back,
                                   (self.f_value_back(s_n), s_n))

这里注意前后两个点的代价值计算方式是不一样的,前者的计算为:

    def f_value_fore(self, s):
        return self.g_fore[s] + self.h(s, self.s_goal)
    def h(self, s, goal):
        heuristic_type = self.heuristic_type
        if heuristic_type == "manhattan":
            return abs(goal[0] - s[0]) + abs(goal[1] - s[1])
        else:
            return math.hypot(goal[0] - s[0], goal[1] - s[1])

后者的计算为:

    def f_value_back(self, s):
        return self.g_back[s] + self.h(s, self.s_start)

此时判断堆栈中是否为空,如果不为空的话就循环出栈对周围进行搜索,直到起点的搜索点位出现再终点所搜索过的字典中或者终点的搜索点位出现在起点所搜索过的字典中,则说明两边的点位相接触了,已经找到了最终的路径:

            if s_fore in self.PARENT_back:
                s_meet = s_fore
                break
            if s_back in self.PARENT_fore:
                s_meet = s_back
                break

然后再根据两个字典向两边搜索得到最终的路径即可:

    def extract_path(self, s_meet):
        # extract path for foreward part
        path_fore = [s_meet]
        s = s_meet
        while True:
            s = self.PARENT_fore[s]
            path_fore.append(s)
            if s == self.s_start:
                break
        # extract path for backward part
        path_back = []
        s = s_meet

        while True:
            s = self.PARENT_back[s]
            path_back.append(s)
            if s == self.s_goal:
                break
        return list(reversed(path_fore)) + list(path_back)

结果

在这里插入图片描述
需要注意的是:双向A* 算法的优势在于它可以从起点和终点同时进行搜索,通过双向信息的利用减少搜索的节点数量,提高搜索效率。然而,它也需要更多的空间来存储两个方向的搜索路径和节点信息。因为单向A* 只需要一个字典一个堆栈就可以完成遍历,但是双向A* 需要维护两个字典与堆栈。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/947564.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

供水营业收费管理系统:智慧水务的得力助手

随着我国经济的快速发展&#xff0c;城市化进程不断加快&#xff0c;供水行业的需求也不断增长。为满足人们日益增长的用水需求&#xff0c;提高供水企业的管理水平和服务质量&#xff0c;供水营业收费管理系统应运而生&#xff0c;成为智慧水务的得力助手。 一、供水营业收费管…

算法通关村-----哈希和队列的基本知识

哈希概念 哈希也称为散列&#xff0c;就是把任意长度的输入&#xff0c;通过散列算法&#xff0c;变成固定长度的输出&#xff0c;这个输出值就是散列值。 哈希存储 现在有1&#xff0c;2&#xff0c;3…15&#xff0c;要将其存储到大小为7的哈希表中&#xff0c;应该如何存…

Android studio实现水平进度条

原文 ProgressBar 用于显示某个耗时操作完成的百分比的组件称为进度条。ProgressBar默认产生圆形进度条。 实现效果图&#xff1a; MainActivity import android.os.Bundle; import android.view.View; import android.app.Activity; import android.widget.Button; import…

算法 稀疏数组 数组优化 数组压缩 二维数组转稀疏数组 算法合集(二)

1. 五子棋游戏&#xff0c;玩家对战一半停战休息&#xff0c;此时需要存储当前对战双方棋子信息 a. 采用二维数组存储&#xff1a; 0为空&#xff0c; 1代表黑棋 2代表蓝色棋子 b. 棋盘为11行&#xff0c;11列 > int [][] chessArray new int [11][11]; c. 出现的问题&am…

RT_Thread内核机制学习(五)邮箱

之所以引入线程间通信&#xff0c;是为了实现互斥&#xff0c;休眠-唤醒。 队列可以指定消息的大小、个数&#xff0c;存放消息&#xff0c;取出消息时都是由rt_memcpy()实现。 邮箱 保存数据的核心在于数组&#xff0c;只能存放unsigned long类型数据&#xff0c;数据存取、…

Acwing798.差分矩阵

前缀和与差分 图文并茂 超详细整理&#xff08;全网最通俗易懂&#xff09;_前缀和差分_林小鹿的博客-CSDN博客 代码展示&#xff1a; #include<iostream> #include<cstdio> using namespace std; const int N 1e3 10; int a[N][N], b[N][N]; void insert(int x…

在iPhone 15发布之前,iPhone在智能手机出货量上占据主导地位,这对安卓来说是个坏消息

可以说这是一记重拳&#xff0c;但似乎没有一个有价值的竞争者能与苹果今年迄今为止的智能手机出货量相媲美。 事实上&#xff0c;根据Omdia智能手机型号市场跟踪机构收集的数据&#xff0c;苹果的iPhone占据了前四名。位居榜首的是iPhone 14 Pro Max&#xff0c;2023年上半年…

Python Qt学习(五)Checkbox

源码 # -*- coding: utf-8 -*-# Form implementation generated from reading ui file qt_checkbox.ui # # Created by: PyQt5 UI code generator 5.15.9 # # WARNING: Any manual changes made to this file will be lost when pyuic5 is # run again. Do not edit this fil…

ATF(TF-A)安全通告 TFV-3 (CVE-2017-7563)

安全之安全(security)博客目录导读 ATF(TF-A)安全通告汇总 目录 一、ATF(TF-A)安全通告 TFV-3 (CVE-2017-7563) 二、CVE-2017-7563 一、ATF(TF-A)安全通告 TFV-3 (CVE-2017-7563) Title RO内存始终在AArch64 Secure EL1下可执行 CVE ID CVE-2017-7563 Date 06 Apr 2017 …

字符设备驱动框架解析

一、字符设备驱动框架解析 设备的操作函数如果比喻是桩的话&#xff08;性质类似于设备操作函数的函数&#xff0c;在一些场合被称为桩函数&#xff09;&#xff0c;则&#xff1a; 驱动实现设备操作函数 ----------- 做桩 insmod调用的init函数主要作用 --------- 钉桩 rm…

LinearAlgebraMIT_11_MatrixSpace/Rank==1‘sMatrix/SmallWorldGraph

x.1 矩阵空间 向量空间定义&#xff1a;满足加法和数乘的封闭性。就类似向量空间一样&#xff0c;也存在着矩阵空间的定义。举个例子&#xff0c;例如所有的3x3的矩阵构成的矩阵空间M&#xff0c;它的纬度就是9&#xff0c;如[1, 0, …], [0, 1, …]。对于M中所有对称矩阵组成…

Ansible学习笔记3

ansible模块&#xff1a; ansible是基于模块来工作的&#xff0c;本身没有批量部署的能力&#xff0c;真正具有批量部署的是ansible所运行的模块&#xff0c;ansible只是提供一个框架。 ansible支持的模块非常多&#xff0c;我们并不需要把每个模块记住&#xff0c;而只需要熟…

Ubuntu20以上高版本如何安装低版本GCC

安装了Ubuntu 20.04之后&#xff0c;通过命令行 sudo apt-get install build-essential安装gcc&#xff0c;再通过命令行 gcc -v可查看gcc版本为gcc13 如果想用低版本的gcc&#xff0c;比如gcc4.8&#xff0c;尝试输入命令 sudo apt-get install gcc-4.8会提示找不到gcc4.8的…

胡歌深夜发文:我对不起好多人

胡歌的微博又上了热搜。 8月29日01:18分&#xff0c;胡歌微博发文称&#xff1a;“我尽量保持冷静&#xff0c;我对不起好多人&#xff0c;我希望对得起这短暂的一生”&#xff0c;并配了一张自己胡子拉碴的图&#xff0c;右眼的伤疤清晰可见。 不少网友留言称“哥你又喝多了吗…

基于Java+SpringBoot+Vue前后端分离教师工作量管理系统设计和实现

博主介绍&#xff1a;✌全网粉丝30W,csdn特邀作者、博客专家、CSDN新星计划导师、Java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精彩专…

希尔排序(JAVA实例代码)

目录 希尔排序 一、概念及其介绍 二、适用说明 三、过程图示 四、Java 实例代码 ShellSort.java 文件代码&#xff1a; 希尔排序 一、概念及其介绍 希尔排序(Shell Sort)是插入排序的一种&#xff0c;它是针对直接插入排序算法的改进。 希尔排序又称缩小增量排序&#…

【2023年11月第四版教材】《第9章-范围管理》(第二部分)

《第9章-范围管理》&#xff08;第二部分&#xff09; 4 规划范围管理4.1 范围管理计划★★★ &#xff08;21下29&#xff09;4.2 需求管理计划★★★ &#xff08;22上27&#xff09; 5 收集需求5.1 数据收集★★★5.2 决策★★★5.3 数据表现★★★5.4 人际关系与团队技能★…

【pyqt5界面化开发-6】抽屉布局界面的开发

目录 0x01 前言&#xff1a; 一、封装的主窗口类 第一步&#xff1a;封装窗口类 第二步&#xff1a;添加抽屉界面 第三步&#xff1a;添加抽屉界面的相关布局 第四步&#xff1a;每一个抽屉界面的点击触发 二、封装的抽屉类 三、程序入口程序 四、完整代码 0x01 前言&…

Windows端口占用处理

端口被占用时&#xff0c;大部分是后台服务持续运行使用了某个端口。这样会导致我们使用了相同端口的新程序无法正常启动&#xff0c;我们需要找到端口被占用的应用程序&#xff0c;方法比较简单&#xff0c;如下以3000端口被占用为例&#xff1a; 1、打开windows的cmd命令行窗…

数据结构与算法基础-学习-30-插入排序之直接插入排序、二分插入排序、希尔排序

一、排序概念 将一组杂乱无章的数据按一定规律顺次排列起来。 将无序序列排成一个有序序列&#xff08;由小到大或由大到小&#xff09;的运算。 二、排序方法分类 1、按数据存储介质 名称描述内部排序数据量不大、数据在内存&#xff0c;无需内外交换存交换存储。外部排序…