本篇摘要
上一篇我们针对障碍物的“死胡同”问题进行了探索,本篇则针对障碍物生成在网格对角顶点处是否有意义以及路径周围存在障碍物时按照对角线移动是否合理两个问题进行探讨,这两个问题主要在于移动规则的限定,主要分为以下两种情况:
1.如果我们允许这种类似“穿墙”的行为。
2.障碍物之间并非完全连接而是留有一定的距离,并且这个距离允许我们通过。
当我们的移动规则包括上述其中一种情况时,我们在网格对角顶点处生成障碍物则没有意义,因为这么做并不能阻碍我们的移动,并且路径周围存在障碍物时按照对角线移动是被允许的,反之如果我们的移动规则中并不包括上述两种情况,那么我们就不允许周围存在障碍物时去沿着对角线移动,因为这么做就好比我们的半边身体从障碍物中穿过,如果我们允许这么做,那么就是对应着上述的第一种情况,此时在网格对角顶点处生成障碍物则有意义。
方案探讨
既然我们明确了这两个问题是由移动规则所定,且两个问题可以作为一个问题进行讨论,那么我们就有两个不同的答案,显然我们默认是第一个答案,而接下来我们就针对第二个答案进行讨论,也就是我们的移动规则中不包括“本篇摘要”中所述的两种情况。此时我们只需要在上一篇的预寻路中加入一个移动规则的判断,分为以下几种情况:
(1)从当前Node移动至下一个Node时,如果是向左上方移动,则需要判断当前Node的上方和左侧是否存在障碍物Node,若不存在则允许斜向移动,否则不允许;
(2)从当前Node移动至下一个Node时,如果是向右上方移动,则需要判断当前Node的上方和右侧是否存在障碍物Node,若不存在则允许斜向移动,否则不允许;
(3)从当前Node移动至下一个Node时,如果是向左下方移动,则需要判断当前Node的下方和左侧是否存在障碍物Node,若不存在则允许斜向移动,否则不允许;
(4)从当前Node移动至下一个Node时,如果是向右下方移动,则需要判断当前Node的下方和右侧是否存在障碍物Node,若不存在则允许斜向移动,否则不允许;
(5)从当前Node移动至下一个Node时,如果是向左或向上或向右或向下,并且无特殊的移动规则限定,那么无需经过检测,默认为允许移动;
当然,这种处理方式,应该与我们进行实际移动的规则保持一致,因为预寻路是对实际寻路的一种预测,我们拿它来判断BlockNode集的布置是否合理,那么相对的实际寻路时会有可能采用预寻路所合成的路线,为什么不肯定说二者路线一致呢?因为预寻路只要求从起始点到终点至少存在一条可行的路线即可,但是实际寻路时可能会有多条可行的路线,而我们目前的查找最佳路径的方式还需要改进,所以实际寻路也可能不是采用预寻路所合成的路线。
示例代码UML图
效果截图
代码示例(Python)
Grid.py
import matplotlib.pyplot as pp
import numpy as np
from myCodes import SharedData as sd
from myCodes import Node as node
# 视图的构建和绘制
class Grid:
# 横纵坐标轴的刻度标记
_xLabels = []
_yLabels = []
# 网格单元格的信息
_data = None
# 是否完成初始化
_isInit = False
def __init__(self, p_rowsCount=6, p_colsCount=6):
if self._isInit is False:
# 网格的行列数
self._rowsCount = p_rowsCount
self._colsCount = p_colsCount
# 将网格的行列数设置为共享信息
sd.SharedData.rowsCount = self._rowsCount
sd.SharedData.colsCount = self._colsCount
self._isInit = True
# 绘制图形
def Draw(self):
"""绘制图形"""
pp.rcParams['font.sans-serif'] = ['SimHei']
pp.title(sd.SharedData.blockInfo)
# 获取坐标轴实例
v_ax = pp.gca()
self._AxisSet(v_ax)
self._GridValueSet(v_ax)
# 将数据以二维图片的形式进行显示
v_ax.imshow(self._data, cmap='Accent', aspect='equal', vmin=0, vmax=255)
# 标记起始点和终点
self.Mark(sd.SharedNodes().GetStartNode(), 30, 'go')
self._PathNodeSet()
self.Mark(sd.SharedNodes().GetEndNode(), 30, 'ro')
# 布置网格线
pp.grid(visible=True, color='w')
pp.tight_layout()
pp.show()
# 标记所查找到的路径
def _PathNodeSet(self):
v_pathNodes = sd.SharedNodes().GetPathNodes()
v_length = len(v_pathNodes)
for i in range(v_length):
if 0 < i < v_length - 1:
self.Mark(v_pathNodes[i], 10, 'bo')
self._Arrow(v_pathNodes[i - 1], v_pathNodes[i])
elif i == v_length - 1:
self._Arrow(v_pathNodes[i - 1], v_pathNodes[i])
# 标记方法
@classmethod
def Mark(cls, p_node: node.Node, p_marksize: int, p_fmt: str):
"""
标记Node
**p_node**:表示待标记的Node
**p_marksize**:表示标记的尺寸大小
**p_fmt**:表示颜色和图形的样式描述
"""
v_x = p_node.nodePos.x
v_y = p_node.nodePos.y
pp.plot(v_x, v_y, p_fmt, markersize=p_marksize, zorder=1)
# 箭头指向方法
def _Arrow(self, p_firstNode, p_secondNode):
v_dx = p_secondNode.nodePos.x - p_firstNode.nodePos.x
v_dy = p_secondNode.nodePos.y - p_firstNode.nodePos.y
pp.arrow(p_firstNode.nodePos.x, p_firstNode.nodePos.y, v_dx, v_dy, color='orange', width=0.01, head_width=0.08,
zorder=3)
# 坐标轴设置
def _AxisSet(self, p_ax):
v_ax = p_ax
for i in range(1, self._colsCount + 1):
self._xLabels.append(str(i))
for i in range(1, self._rowsCount + 1):
self._yLabels.append(str(i))
# 隐藏刻度线
v_ax.tick_params(left=False, bottom=False, top=False, right=False)
# 生成Image Data
v_low = 1
if self._rowsCount > self._colsCount:
v_high = self._rowsCount
else:
v_high = self._colsCount
self._data = np.random.randint(v_low, v_high, size=(self._rowsCount + 1, self._colsCount + 1))
# 设置横纵坐标轴的范围
pp.xlim(1, self._colsCount)
pp.ylim(1, self._rowsCount)
# 设置坐标轴的刻度标记
v_ax.set_xticks(np.arange(self._colsCount), labels=self._xLabels, visible=False)
v_ax.set_yticks(np.arange(self._rowsCount), labels=self._yLabels, visible=False)
# 设置坐标轴的横纵轴比例相等
v_ax.set_aspect('equal')
# 网格内容设置
def _GridValueSet(self, p_ax):
v_ax = p_ax
for i in range(self._rowsCount + 1):
for j in range(self._colsCount + 1):
v_str = '(' + str(i + 0.5) + ',' + str(j + 0.5) + ')'
v_ax.text(i + 0.5, j + 0.5, v_str, ha='center', va='center', color='w')
Node.py
import random
from enum import Enum, unique
from myCodes import SharedData as sd
import threading as th
# Node坐标类
class NodePosition:
def __init__(self, p_x: float = 1, p_y: float = 1):
"""
Node坐标信息初始化
:param p_x: 横坐标值
:param p_y: 纵坐标值
"""
self.x = p_x
self.y = p_y
def __str__(self):
return '[' + str(self.x) + ',' + str(self.y) + ']'
# Node标签
@unique
class NodeTag(Enum):
PATHNODE = 1
BLOCKNODE = 2
# Node类
class Node:
def __init__(self, p_nodeName: str, p_nodePos: NodePosition):
"""
Node初始化
:param p_nodeName: Node名称
:param p_nodePos: Node的坐标信息
"""
self.nodeName = p_nodeName
self.nodePos = p_nodePos
# f=g+h,g代表从上一个点到该点的代价和,h代表从该点到终点的代价和,f代表总权值weight
self.f: int = 0
self.g: int = 0
self.h: int = 0
# Node的标签,用于判断是否为不可到达的Node,
self.nodeTag = NodeTag.PATHNODE
def SetWeight(self, p_f: int, p_g: int, p_h: int):
"""
设置Node的权值
:param p_f: 代表总权值weight, f = g + h
:param p_g: 代表从上一个点到该点的代价和
:param p_h: 代表从该点到终点的代价和
:return: 无返回值
"""
self.f = p_f
self.g = p_g
self.h = p_h
def __str__(self):
return '{nodeName:' + self.nodeName + ',nodeTag:' + self.nodeTag.name + ',nodePos:' + str(
self.nodePos) + ',f=' + str(
self.f) + ',g=' + str(
self.g) + ',h=' + str(self.h) + '}'
def __eq__(self, other):
if other is not None and self.nodePos.x == other.nodePos.x and self.nodePos.y == other.nodePos.y:
return True
return False
# Node工厂,用来创建和获取起始点Node和终点Node
class NodeFactory:
_lock = th.Lock()
_isCreateEndNode = False
_isCreateStartNode = False
_startNode: Node
_endNode: Node
# Node名称索引
_nameIndex = 1
# GenerateOneNode的Node索引
_rowIndex = 1
_colIndex = 1
_generateCount = 0
def __init__(self):
if self._isCreateStartNode is False:
self._CreateStartNode()
sd.SharedNodes().SetStartNode(self._startNode, 'SNSET')
if self._isCreateEndNode is False:
self._CreateEndNode()
sd.SharedNodes().SetEndNode(self._endNode, 'ENSET')
def __new__(cls, *args, **kwargs):
if hasattr(NodeFactory, '_instance') is False:
with cls._lock:
if hasattr(NodeFactory, '_instance') is False:
NodeFactory._instance = object.__new__(cls)
return NodeFactory._instance
# 创建起始点Node
def _CreateStartNode(self):
v_nodePos = NodePosition(0.5, 0.5)
self._startNode = Node('StartNode', v_nodePos)
self._isCreateStartNode = True
# 创建终点Node
def _CreateEndNode(self):
v_startNode = self._startNode
v_node = v_startNode
while v_node == v_startNode:
v_node = self.GenerateOneNode('EndNode', p_isRandom=True)
self._endNode = v_node
self._isCreateEndNode = True
def GenerateXYNodes(self, p_x=0, p_y=0, p_isRefX=True, p_isDefCheck=True):
"""
生成横坐标或纵坐标为指定参考值的Node集
:param p_x: 横坐标参考值
:param p_y: 纵坐标参考值
:param p_isRefX: 是否以横坐标为参考坐标,默认为True,若为False则以纵坐标为参考坐标
:param p_isDefCheck: 是否开启默认检测,默认为True,会自动排除起始点Node和终点Node
:return: 返回一个列表
"""
v_list = []
v_index = 1
v_startNode = self._startNode
v_endNode = self._endNode
if p_isRefX:
while True:
v_node = self.GenerateOneNode('Node' + str(v_index))
if v_node is None:
break
else:
v_j1 = True
if p_isDefCheck:
v_j1 = v_node != v_startNode and v_node != v_endNode
if v_j1 and v_node.nodePos.x == p_x:
v_list.append(v_node)
v_index += 1
else:
while True:
v_node = self.GenerateOneNode('Node' + str(v_index))
if v_node is None:
break
else:
v_j1 = True
if p_isDefCheck:
v_j1 = v_node != v_startNode and v_node != v_endNode
if v_j1 and v_node.nodePos.y == p_y:
v_list.append(v_node)
v_index += 1
return v_list
def GenerateNodes(self, p_count, p_isRandom=False):
"""
生成指定数量的非重复Node集
:param p_count: 生成的Node的数量
:param p_isRandom: 是否随机生成,默认为False
:return: 返回一个列表
"""
v_list = []
v_index = 1
v_startNode = self._startNode
v_endNode = self._endNode
while len(v_list) < p_count:
v_node = self.GenerateOneNode('Node' + str(v_index), p_isRandom=p_isRandom)
if v_node is None:
break
else:
v_isRepeat = NodeCheck.RepeatCheck(v_list, v_node)
if v_isRepeat is False and v_node != v_startNode and v_node != v_endNode:
v_list.append(v_node)
v_index += 1
return v_list
def GenerateOneNode(self, p_name: str, p_isRandom=False):
"""
生成一个指定名称的Node
:param p_name: 生成的Node的名称
:param p_isRandom: 是否随机生成,默认为False,若为True则为非随机模式生成,将按照起始点从左至右&从下至上的顺序生成
:return: 默认会返回Node,在非随机生成模式下,当该方法生成完当前网格的所有Node后会进行重置并返回None,请保持对该方法的返回值是否为None的判断,避免陷入死循环
"""
v_rowsCount = sd.SharedData.rowsCount
v_colsCount = sd.SharedData.colsCount
v_x = 0.5
v_y = 0.5
if p_isRandom:
v_i = random.randint(0, v_rowsCount - 1)
v_x = v_i + 0.5
v_i = random.randint(0, v_colsCount - 1)
v_y = v_i + 0.5
else:
if self._colIndex < v_colsCount:
if self._rowIndex > v_rowsCount:
self._rowIndex -= v_rowsCount
self._colIndex += 1
v_x = (self._rowIndex - 1) + 0.5
v_y = (self._colIndex - 1) + 0.5
self._rowIndex += 1
self._generateCount += 1
else:
if self._rowIndex <= v_rowsCount:
v_x = (self._rowIndex - 1) + 0.5
v_y = (self._colIndex - 1) + 0.5
self._rowIndex += 1
self._generateCount += 1
else:
self._rowIndex = 1
self._colIndex = 1
if self._generateCount > v_rowsCount * v_colsCount:
self._generateCount = 0
return None
v_nodePos = NodePosition(v_x, v_y)
v_node = Node(p_name, v_nodePos)
return v_node
def GenerateNextNodes(self, p_node: Node):
"""
获取某个Node下一步可以前往的Node集(NextNode集)
:param p_node: 待生成NextNode集的Node
:return: 若p_node为None则返回None,否则返回一个列表
"""
v_list = []
if p_node is not None:
v_p = p_node.nodePos
v_posList = [(v_p.x - 1, v_p.y), (v_p.x - 1, v_p.y + 1), (v_p.x, v_p.y + 1), (v_p.x + 1, v_p.y + 1),
(v_p.x + 1, v_p.y), (v_p.x + 1, v_p.y - 1), (v_p.x, v_p.y - 1), (v_p.x - 1, v_p.y - 1)]
v_nameList = ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H']
for i in range(len(v_posList)):
v_nodeName = v_nameList[i] + str(self._nameIndex)
v_nodePos = NodePosition(v_posList[i][0], v_posList[i][1])
v_node = Node(v_nodeName, v_nodePos)
v_list.append(v_node)
self._nameIndex += 1
return v_list
return v_list
# Node或Node集的检测
class NodeCheck:
@classmethod
def RepeatCheck(cls, p_list: list[Node], p_node: Node):
"""
判断Node集中是否存在某个Node
:param p_list: Node集
:param p_node: 待检测的Node
:return: 存在返回True,否则返回False
"""
if p_list is not None and len(p_list) != 0:
for n in p_list:
if n == p_node:
return True
return False
@classmethod
def OutOfRange(cls, p_node: Node):
"""
判断Node是否超出了网格限定范围
:param p_node: 待检测的Node
:return:若超出了范围则返回True,否则返回False
"""
v_minX = 0.5
v_minY = 0.5
v_maxX = (sd.SharedData.rowsCount - 1) + 0.5
v_maxY = (sd.SharedData.colsCount - 1) + 0.5
v_x = p_node.nodePos.x
v_y = p_node.nodePos.y
if v_minX <= v_x <= v_maxX and v_minY <= v_y <= v_maxY:
return False
return True
@classmethod
def IsContainNodes(cls, p_listA: list[Node], p_listB: list[Node]):
"""
判断某个Node集(p_listA)是否包含另一个Node集(p_listB)
:param p_listA: 待检测的Node集
:param p_listB: 被包含的Node集
:return: 若p_listA包含p_listB则返回True,否则返回False,若p_listA为None也会返回False
"""
if p_listA is not None and len(p_listA) != 0:
if p_listB is None or len(p_listB) == 0:
return True
else:
for n in p_listB:
if cls.RepeatCheck(p_listA, n) is False:
return False
return True
return False
# Node或Node集的处理
class NodeDeal:
# 对角线移动一格的代价
_diagonalCost = 14
# 上下或左右移动一格的代价
_nonDiagonalCost = 10
@classmethod
def RemoveRepeateNode(cls, p_list: list[Node]):
"""
移除Node集中的重复Node
:param p_list: Node集
:return: 若p_list为None则返回None,否则返回一个列表
"""
if p_list is not None and len(p_list) != 0:
v_list = []
v_length = len(p_list)
for i in range(v_length):
v_isRepeate = False
for j in range(i + 1, v_length):
if p_list[i] == p_list[j]:
v_isRepeate = True
break
if v_isRepeate is False:
v_list.append(p_list[i])
return v_list
return p_list
@classmethod
def RemoveNode(cls, p_sourceList: list[Node], p_node: Node):
"""
移除Node集(p_sourceList)中的某个Node(p_node)
:param p_sourceList: 待处理的Node集
:param p_node: 参考Node
:return: 返回一个列表
"""
v_j1 = p_sourceList is not None and p_node is not None
if v_j1 and len(p_sourceList) != 0:
v_list = []
for n in p_sourceList:
if n != p_node:
v_list.append(n)
return v_list
return p_sourceList
@classmethod
def RemoveNodes(cls, p_sourceList: list[Node], p_refList: list[Node]):
"""
将一个Node集(p_sourceList)中另一个Node集(p_refList)的元素去除
:param p_sourceList: 待处理的原Node集
:param p_refList: 参考Node集
:return: 默认返回一个列表,若p_sourceList或p_refList为None则返回原Node集
"""
v_j1 = p_sourceList is not None and p_refList is not None
if v_j1 and len(p_sourceList) != 0 and len(p_refList) != 0:
v_list = []
for n in p_sourceList:
if NodeCheck.RepeatCheck(p_refList, n) is False:
v_list.append(n)
return v_list
return p_sourceList
@classmethod
def RemoveOutOfRangeNode(cls, p_list: list[Node]):
"""
移除Node集中超出网格范围的Node
:param p_list: 待处理的Node集
:return: 默认返回一个列表,如果p_list为None则返回None
"""
if p_list is not None and len(p_list) != 0:
v_list = []
for n in p_list:
if NodeCheck.OutOfRange(n) is False:
v_list.append(n)
return v_list
return p_list
@classmethod
def ReplaceNode(cls, p_list: list[Node], p_node: Node):
"""
替换Node集中与某个Node相同的Node
:param p_list: Node集
:param p_node: 用于替换的Node
:return: 若p_list和p_node为None则返回None,否则返回一个列表
"""
v_j1 = p_list is not None and p_node is not None
if v_j1 and len(p_list) != 0:
v_list = p_list
if NodeCheck.RepeatCheck(v_list, p_node):
for i in range(len(v_list)):
if v_list[i] == p_node:
v_list[i] = p_node
return v_list
return p_list
@classmethod
def SetWeightValue(cls, p_diagonalCost: int = 14, p_nonDiagonalCost: int = 10):
"""
设置diagonalCost(对角线移动权值)和nonDiagonalCost(非对角线移动权值)
:param p_diagonalCost: 对角线移动权值,默认为14
:param p_nonDiagonalCost: 非对角线移动权值,默认为10
"""
if p_diagonalCost >= 0:
cls.__diagonalCost = p_diagonalCost
if p_nonDiagonalCost >= 0:
cls.__nonDiagonalCost = p_nonDiagonalCost
@classmethod
def CalculateWeight(cls, p_list: list[Node], p_startNode: Node, p_endNode: Node):
"""
计算某个Node集相对起始点和终点的权值
:param p_list: 待计算权值的Node集
:param p_startNode: 起始点
:param p_endNode: 终点
:return: 默认返回一个列表,若p_list或p_startNode或p_endNode有一个为None则返回None
"""
v_j1 = p_list is not None and p_startNode is not None and p_endNode is not None
if v_j1 and len(p_list) != 0:
v_list = p_list
v_startNodeX = p_startNode.nodePos.x
v_startNodeY = p_startNode.nodePos.y
v_endNodeX = p_endNode.nodePos.x
v_endNodeY = p_endNode.nodePos.y
for n in v_list:
v_x = n.nodePos.x
v_y = n.nodePos.y
if v_x == v_y:
v_g = abs((v_x - v_startNodeX)) * cls._diagonalCost
if v_endNodeX == v_endNodeY:
v_h = abs((v_endNodeX - v_x)) * cls._diagonalCost
else:
v_h = abs((v_endNodeX - v_x)) * cls._nonDiagonalCost + abs(
(v_endNodeY - v_y)) * cls._nonDiagonalCost
else:
v_g = abs((v_x - v_startNodeX)) * cls._nonDiagonalCost + abs(
(v_y - v_startNodeY)) * cls._nonDiagonalCost
v_h = abs((v_endNodeX - v_x)) * cls._nonDiagonalCost + abs(
(v_endNodeY - v_y)) * cls._nonDiagonalCost
v_f = v_g + v_h
n.SetWeight(int(v_f), int(v_g), int(v_h))
return v_list
return p_list
@classmethod
def SortNodes(cls, p_list: list[Node], p_endMax=True):
"""
对Node集按照总权值(f)进行排序
:param p_list: 待排序的Node集
:param p_endMax: 是否按照总权值从小到大排序,默认为True
:return: 默认返回一个列表,若p_list为None则返回None
"""
if p_list is not None and len(p_list) != 0:
v_list = p_list
if p_endMax:
for i in range(0, len(v_list)):
for j in range(i + 1, len(v_list)):
if v_list[i].f > v_list[j].f:
v_node = v_list[i]
v_list[i] = v_list[j]
v_list[j] = v_node
else:
for i in range(0, len(v_list)):
for j in range(i + 1, len(v_list)):
if v_list[i].f < v_list[j].f:
v_node = v_list[i]
v_list[i] = v_list[j]
v_list[j] = v_node
return v_list
return p_list
@classmethod
def PrintNodes(cls, p_list: list[Node], p_sep='--'):
"""
控制台打印Node集
:param p_list: 待打印的Node集
:param p_sep: 每个Node之间的间隔符号,默认为"--"
"""
v_len = len(p_list)
if p_list is not None and v_len != 0:
v_str = ''
for i in range(v_len):
if i < v_len - 1:
v_str += str(p_list[i]) + p_sep
else:
v_str += str(p_list[i])
print(v_str)
else:
print("List is None or it's length equals zero.")
Block.py
from myCodes import Grid as grid
from myCodes import Node as node
from myCodes import SharedData as sd
import threading as th
from myCodes import SearchPath as sp
# 用于生成阻碍物
class Block:
_lock = th.Lock()
_blockCount = 0
_unCheckedNodes = []
_checkedNodes = []
_isImpassable = False
_generateNum = 0
def __init__(self, p_blcokCount: int = 5):
"""
:param p_blcokCount: 生成的障碍物的数量
"""
self._blockCount = p_blcokCount
self._endNode = sd.SharedNodes().GetEndNode()
def __new__(cls, *args, **kwargs):
if hasattr(Block, '_instance') is False:
with cls._lock:
if hasattr(Block, '_instance') is False:
Block._instance = object.__new__(cls)
return Block._instance
# 用于生成Block
def Create(self):
v_list = []
v_curNode = sd.SharedNodes().GetStartNode()
while self._StartImpasseCheck(v_list, v_curNode) is False:
v_list = self._SelectNode()
self._Reset()
sd.SharedData.blockInfo = '生成BlockNode集次数:' + str(self._generateNum)
v_list = self._GenerateBlock(v_list)
sd.SharedNodes().Submit(v_list)
self._MarkBlockNode(v_list)
# 生成BlockNode集
def _SelectNode(self):
v_list = node.NodeFactory().GenerateNodes(self._blockCount, p_isRandom=True)
return v_list
# 对所生成的BlockNode集进行检测,若出现死胡同则重新生成BlockNode集
def _StartImpasseCheck(self, p_list, p_curNode):
# 若返回值为False:p_list为None或死胡同
# 若返回值为True:无死胡同
# p_list在这里代表的是生成的BlockNode集
v_isImpassable = self._isImpassable
v_j1 = p_list is not None and p_curNode is not None and v_isImpassable is False
v_nextNodes = self._GetNextNodes(p_curNode, p_list)
if v_j1 and len(p_list) != 0 and len(v_nextNodes) != 0:
if sp.NodeMove().MoveCheck(p_list, v_nextNodes, p_curNode) is False:
return False
v_nextNodes = sp.NodeMove().GetNextNodes(p_list, v_nextNodes, p_curNode, p_isDefCheck=False)
v_XNodes = self._GetXNodes(p_curNode.nodePos.x + 1, v_nextNodes)
v_YNodes = self._GetYNodes(p_curNode.nodePos.y + 1, v_nextNodes)
v_isContainX = node.NodeCheck.IsContainNodes(p_list, v_XNodes)
v_isContainY = node.NodeCheck.IsContainNodes(p_list, v_YNodes)
if v_isContainX and v_isContainY:
self._isImpassable = True
return False
elif v_isContainX is False and v_isContainY is False:
if v_nextNodes[0] == self._endNode:
return True
else:
return self._StartImpasseCheck(p_list, v_nextNodes[0])
else:
if v_isContainX:
v_nextNodes = node.NodeDeal.RemoveNodes(v_nextNodes, v_XNodes)
else:
v_nextNodes = node.NodeDeal.RemoveNodes(v_nextNodes, v_YNodes)
if v_nextNodes[0] == self._endNode:
return True
else:
return self._StartImpasseCheck(p_list, v_nextNodes[0])
return False
# BlockNode集检测重置
def _Reset(self):
self._isImpassable = False
self._unCheckedNodes = []
self._checkedNodes = []
self._generateNum += 1
# 获取处理后的NextNode集
def _GetNextNodes(self, p_curNode, p_list):
v_uclist = self._unCheckedNodes
self._unCheckedNodes = node.NodeDeal.RemoveNode(v_uclist, p_curNode)
self._checkedNodes.append(p_curNode)
v_nextNodes = node.NodeFactory().GenerateNextNodes(p_curNode)
v_nextNodes = node.NodeDeal.RemoveOutOfRangeNode(v_nextNodes)
v_nextNodes = node.NodeDeal.RemoveNodes(v_nextNodes, self._unCheckedNodes)
v_nextNodes = node.NodeDeal.RemoveNodes(v_nextNodes, self._checkedNodes)
v_nextNodes = node.NodeDeal.RemoveNodes(v_nextNodes, p_list)
v_nextNodes = node.NodeDeal.CalculateWeight(v_nextNodes, p_curNode, sd.SharedNodes().GetEndNode())
v_nextNodes = node.NodeDeal.SortNodes(v_nextNodes)
return v_nextNodes
# 获取xNodes
def _GetXNodes(self, p_x, p_list):
v_list = []
for n in p_list:
v_x = n.nodePos.x
if v_x == p_x:
v_list.append(n)
return v_list
# 获取yNodes
def _GetYNodes(self, p_y, p_list):
v_list = []
for n in p_list:
v_y = n.nodePos.y
if v_y == p_y:
v_list.append(n)
return v_list
# 生成Block
def _GenerateBlock(self, p_list):
if p_list is not None and len(p_list) != 0:
v_list = p_list
for n in v_list:
n.nodeTag = node.NodeTag.BLOCKNODE
return v_list
return p_list
# 标记生成Block的Node
def _MarkBlockNode(self, p_list):
if p_list is not None and len(p_list) != 0:
for n in p_list:
grid.Grid.Mark(n, 49, 'ks')
SearchPath.py
from myCodes import SharedData as sd
from myCodes import Node as node
import threading as th
from enum import Enum, unique
@unique
class Direction(Enum):
LeftUp = 1
RightUp = 2
LeftDown = 3
RightDown = 4
Left = 5
Right = 6
Up = 7
Down = 8
# Node移动规则类
class NodeMove:
_lock = th.Lock()
_init = False
def __init__(self):
if self._init is False:
self._nextNodes = []
self._init = True
def __new__(cls, *args, **kwargs):
if hasattr(NodeMove, '_instance') is False:
with cls._lock:
if hasattr(NodeMove, '_instance') is False:
NodeMove._instance = object.__new__(cls)
return NodeMove._instance
def GetNextNodes(self, p_blockNodes: list[node.Node], p_nextNodes: list[node.Node], p_curNode: node.Node,
p_isDefCheck=True):
"""
获取经过移动检测的NextNode集
:param p_blockNodes: 障碍物Node集
:param p_nextNodes: 当前Node的下一步Node集
:param p_curNode: 当前Node
:param p_isDefCheck: 是否开启移动检测,默认为True,若开启移动检测则自动调用MoveCheck方法
:return: 返回一个列表
"""
v_list = []
if p_isDefCheck:
if self.MoveCheck(p_blockNodes, p_nextNodes, p_curNode):
for n in self._nextNodes:
v_list.append(n)
else:
for n in self._nextNodes:
v_list.append(n)
if self._nextNodes is None or len(self._nextNodes) == 0:
return p_nextNodes
return v_list
def GetMoveDirection(self, p_curNode: node.Node, p_nextNode: node.Node):
"""
获取当前Node至下一个Node的移动方向
:param p_curNode: 当前Node
:param p_nextNode: 下一个Node
:return: 返回一个Direction枚举值
"""
v_curX = p_curNode.nodePos.x
v_curY = p_curNode.nodePos.y
v_neX = p_nextNode.nodePos.x
v_neY = p_nextNode.nodePos.y
v_X = v_neX - v_curX
v_Y = v_neY - v_curY
if v_X == -1 and v_Y == 0:
return Direction.Left
elif v_X == -1 and v_Y == 1:
return Direction.LeftUp
elif v_X == 0 and v_Y == 1:
return Direction.Up
elif v_X == 1 and v_Y == 1:
return Direction.RightUp
elif v_X == 1 and v_Y == 0:
return Direction.Right
elif v_X == 1 and v_Y == -1:
return Direction.RightDown
elif v_X == 0 and v_Y == -1:
return Direction.Down
else:
return Direction.LeftDown
def MoveCheck(self, p_blockNodes: list[node.Node], p_nextNodes: list[node.Node], p_curNode: node.Node):
"""
移动规则检测
:param p_blockNodes: 障碍物Node集
:param p_nextNodes: 当前Node的下一步Node集
:param p_curNode: 当前Node
:return: 若下一步可前行,则返回True,否则返回False
"""
v_j1 = p_blockNodes is not None and p_nextNodes is not None and p_curNode is not None
v_j2 = len(p_blockNodes) != 0 and len(p_nextNodes) != 0
if v_j1 is False or v_j2 is False:
self._nextNodes = []
return False
v_nextNode = p_nextNodes[0]
v_curX = p_curNode.nodePos.x
v_curY = p_curNode.nodePos.y
v_direction = self.GetMoveDirection(p_curNode, v_nextNode)
if v_direction == Direction.LeftUp:
v_x = v_curX - 1
v_y = v_curY + 1
elif v_direction == Direction.RightUp:
v_x = v_curX + 1
v_y = v_curY + 1
elif v_direction == Direction.RightDown:
v_x = v_curX + 1
v_y = v_curY - 1
elif v_direction == Direction.LeftDown:
v_x = v_curX - 1
v_y = v_curY - 1
else:
self._nextNodes = p_nextNodes
return True
v_xNode = node.Node('xNode', node.NodePosition(v_x, v_curY))
v_yNode = node.Node('yNode', node.NodePosition(v_curX, v_y))
v_isContainX = node.NodeCheck.RepeatCheck(p_blockNodes, v_xNode)
v_isContainY = node.NodeCheck.RepeatCheck(p_blockNodes, v_yNode)
if v_isContainX is False and v_isContainY is False:
self._nextNodes = p_nextNodes
return True
else:
v_nextNodes = node.NodeDeal.RemoveNode(p_nextNodes, v_nextNode)
if len(v_nextNodes) != 0:
return self.MoveCheck(p_blockNodes, v_nextNodes, p_curNode)
self._nextNodes = []
return False
# 查找最佳路径
class SearchPath:
_lock = th.Lock()
# 对角线移动一格的代价
_diagonalCost = 14
# 上下或左右移动一格的代价
_nonDiagonalCost = 10
_nodeFactory: node.NodeFactory
_currentNode: node.Node
# 是否完成了初始化
_isInit = False
_openList = []
_closeList = []
# 网格行列数
_rowsCount = 0
_colsCount = 0
# x和y的最小值
_minX = 0.5
_minY = 0.5
# x和y的最大值
_maxX = 0
_maxY = 0
# 终点Node
_endNode: node.Node
def __init__(self):
if self._isInit is False:
self._nodeFactory = node.NodeFactory()
self._currentNode = sd.SharedNodes().GetStartNode()
self._openList.append(self._currentNode)
self._rowsCount = sd.SharedData.rowsCount
self._colsCount = sd.SharedData.colsCount
self._maxX = self._rowsCount - 0.5
self._maxY = self._colsCount - 0.5
self._endNode = sd.SharedNodes().GetEndNode()
self._isInit = True
def __new__(cls, *args, **kwargs):
if hasattr(SearchPath, '_instance') is False:
with cls._lock:
if hasattr(SearchPath, '_instance') is False:
SearchPath._instance = object.__new__(cls)
return SearchPath._instance
def Search(self, p_isPrint=False):
"""
查找最佳路径
:param p_isPrint: 是否在控制台打印最佳路径的Node集信息,默认为False
"""
while self._UpdateCurrentNode():
# 获取currentNode下一步可以前往的Node,并将它们保存在一个临时列表v_list中
v_list = self._nodeFactory.GenerateNextNodes(self._currentNode)
v_list = self._NodeCheck(v_list)
v_list = self._CalculateWeight(v_list)
v_list = self._SortNodes(v_list)
v_list = NodeMove().GetNextNodes(sd.SharedNodes().GetBlockNodes(), v_list, self._currentNode)
self._AddNode(v_list)
sd.SharedNodes().Submit(self._closeList)
if p_isPrint:
node.NodeDeal.PrintNodes(self._closeList, '-->')
# 检查临时列表p_list中哪些Node不符合要求,保留符合要求的节点
def _NodeCheck(self, p_list):
if p_list is not None and len(p_list) != 0:
v_list1 = []
v_list2 = []
for n in p_list:
if node.NodeCheck.OutOfRange(n) is False:
v_list1.append(n)
for n in v_list1:
v_isInOpenList = node.NodeCheck.RepeatCheck(self._openList, n)
v_isInCloseList = node.NodeCheck.RepeatCheck(self._closeList, n)
v_isInBlockList = node.NodeCheck.RepeatCheck(sd.SharedNodes().GetBlockNodes(), n)
if v_isInOpenList is False and v_isInCloseList is False and v_isInBlockList is False:
v_list2.append(n)
return v_list2
return p_list
# 计算临时列表p_list中每个Node的权值
def _CalculateWeight(self, p_list):
if p_list is not None and len(p_list) != 0:
v_list = node.NodeDeal.CalculateWeight(p_list, self._currentNode, self._endNode)
return v_list
return p_list
# 根据临时列表p_list中每个Node的权值进行排序,权值越大越接近列表尾
def _SortNodes(self, p_list):
if p_list is not None and len(p_list) != 0:
v_list = node.NodeDeal.SortNodes(p_list)
return v_list
return p_list
# 将临时列表p_list拼接在openList的列表尾
def _AddNode(self, p_list):
if p_list is not None and len(p_list) != 0:
v_list = []
for i in range(len(p_list)):
v_n = p_list[i]
v_isInOpenList = node.NodeCheck.RepeatCheck(self._openList, v_n)
v_isInCloseList = node.NodeCheck.RepeatCheck(self._closeList, v_n)
if v_isInOpenList is False and v_isInCloseList is False:
v_list.append(v_n)
self._openList.append(v_list[0])
return True
return False
# 从openList中获取列表尾的元素,将之作为currentNode并加入closeList中,然后将其从openList中移除
def _UpdateCurrentNode(self):
if len(self._openList) > 0:
self._currentNode = self._openList[0]
v_isInCloseList = node.NodeCheck.RepeatCheck(self._closeList, self._currentNode)
if v_isInCloseList is False:
self._closeList.append(self._currentNode)
del self._openList[0]
if self._currentNode == self._endNode:
return False
return True
return False
SharedData.py
from myCodes import Node as node
import threading as th
# 共享信息类
class SharedData:
# 网格行列数
rowsCount = 0
colsCount = 0
blockInfo = ''
# 共享Node和Node集
class SharedNodes:
_instanceLock = th.Lock()
_lock = th.Lock()
_sharedNodes = []
_pathNodes = []
_blockNodes = []
_isUpdatePathNodes = False
_isUpdateBlockNodes = False
_startNode: node.Node
_endNode: node.Node
def __init__(self):
node.NodeFactory()
def __new__(cls, *args, **kwargs):
if hasattr(SharedNodes, '_instance') is False:
with cls._instanceLock:
if hasattr(SharedNodes, '_instance') is False:
SharedNodes._instance = object.__new__(cls)
return SharedNodes._instance
# 起始点Node的设置
def SetStartNode(self, p_node: node.Node, p_password: str = ''):
if p_node is not None and p_password == 'SNSET':
self._startNode = p_node
# 终点Node的设置
def SetEndNode(self, p_node: node.Node, p_password: str = ''):
if p_node is not None and p_password == 'ENSET':
self._endNode = p_node
def GetStartNode(self):
"""
获取起始点Node
:return: 返回Node
"""
v_node = node.Node(self._startNode.nodeName, self._startNode.nodePos)
v_node.f = self._startNode.f
v_node.g = self._startNode.g
v_node.h = self._startNode.h
v_node.nodeTag = self._startNode.nodeTag
return v_node
def GetEndNode(self):
"""
获取终点Node
:return: 返回Node
"""
v_node = node.Node(self._endNode.nodeName, self._endNode.nodePos)
v_node.f = self._endNode.f
v_node.g = self._endNode.g
v_node.h = self._endNode.h
v_node.nodeTag = self._endNode.nodeTag
return v_node
def GetPathNodes(self):
# 起初该方法是直接返回cls.__pathNodes,这就导致了数据安全性的问题,使得在外部可以直接修改这里pathNodes的数据,所以正确的做法应该像现在这样
# 声明一个新的列表,然后将cls.__pathNodes中的值添加进去,再将新的列表作为返回值传递出去,这说明在Python中直接传递内部的静态变量会传递其引用
"""
获取PathNode集
:return: 返回一个列表
"""
self._UpdateNodes(node.NodeTag.PATHNODE)
v_pathNodes = []
for n in self._pathNodes:
v_pathNodes.append(n)
return v_pathNodes
def GetBlockNodes(self):
"""
获取BlockNode集
:return: 返回一个列表
"""
self._UpdateNodes(node.NodeTag.BLOCKNODE)
v_blockNodes = []
for n in self._blockNodes:
v_blockNodes.append(n)
return v_blockNodes
def Submit(self, p_list: list[node.Node]):
"""
通过该方法可以实现Node集的添加、删除、修改,建议与GetPathNodes或GetBlockNodes方法配套使用
:param p_list: 修改后的Node集
:return: 无返回值
"""
v_list = node.NodeDeal.RemoveRepeateNode(p_list)
v_list = self._PollutionNodeCheck(v_list)
self._ReplaceSharedNodes(v_list)
self._AddToSharedNodes(v_list)
v_tag = self._GetNodesType(v_list)
self._DeleteSharedNodes(v_list, v_tag)
if v_tag == node.NodeTag.PATHNODE:
self._isUpdatePathNodes = False
else:
self._isUpdateBlockNodes = False
self._UpdateNodes(v_tag)
# 更新PathNode集和BlockNode集
def _UpdateNodes(self, p_tag):
if p_tag == node.NodeTag.PATHNODE:
if self._isUpdatePathNodes is False:
with self._lock:
v_list = []
for n in self._sharedNodes:
if n.nodeTag == node.NodeTag.PATHNODE:
v_list.append(n)
self._pathNodes = v_list
self._isUpdatePathNodes = True
else:
if self._isUpdateBlockNodes is False:
with self._lock:
v_list = []
for n in self._sharedNodes:
if n.nodeTag == node.NodeTag.BLOCKNODE:
v_list.append(n)
self._blockNodes = v_list
self._isUpdateBlockNodes = True
# 脏数据清理
def _PollutionNodeCheck(self, p_list):
if p_list is not None and len(p_list) != 0:
v_tag = self._GetNodesType(p_list)
if v_tag is None:
return p_list
v_list = []
for n in p_list:
if n.nodeTag == v_tag:
v_list.append(n)
return v_list
return p_list
# 获取当前Node集的种类:PathNode集或BlockNode集
def _GetNodesType(self, p_list):
if p_list is not None and len(p_list) != 0:
v_pathNodeCount = 0
v_blockNodeCount = 0
for n in p_list:
if n.nodeTag == node.NodeTag.PATHNODE:
v_pathNodeCount += 1
else:
v_blockNodeCount += 1
if v_pathNodeCount > v_blockNodeCount:
return node.NodeTag.PATHNODE
elif v_pathNodeCount < v_blockNodeCount:
return node.NodeTag.BLOCKNODE
else:
return None
return None
# Node集的修改功能
def _ReplaceSharedNodes(self, p_list):
if p_list is not None and len(p_list) != 0:
with self._lock:
v_list = self._sharedNodes
for n in p_list:
v_list = node.NodeDeal.ReplaceNode(v_list, n)
self._sharedNodes = v_list
# Node集的添加功能
def _AddToSharedNodes(self, p_list):
if p_list is not None and len(p_list) != 0:
with self._lock:
v_list = []
for n in p_list:
if node.NodeCheck.RepeatCheck(self._sharedNodes, n) is False:
v_list.append(n)
self._sharedNodes.extend(v_list)
# Node集的删除功能
def _DeleteSharedNodes(self, p_list: list, p_tag: node.NodeTag):
if p_list is not None and len(p_list) != 0:
with self._lock:
if p_tag == node.NodeTag.PATHNODE:
v_nodes = self._pathNodes
else:
v_nodes = self._blockNodes
for n in v_nodes:
if node.NodeCheck.RepeatCheck(p_list, n) is False:
v_index = self._sharedNodes.index(n)
del self._sharedNodes[v_index]
Main.py
from myCodes import SearchPath as sp
from myCodes import Grid as grid
from myCodes import Block as block
g = grid.Grid()
block.Block(15).Create()
sp.SearchPath().Search()
g.Draw()
代码解说
在本篇中我们实现了对移动规则的改变,当我们下一步是斜向移动时,会进行移动检测,如果下一步斜向移动符合移动规则,则允许移动,否则不允许,而负责移动规则检测的类是数据层家族的新成员——NodeMove类,通过这个类,我们可以在生成BlockNode集以及查找最佳路径时进行检测,以此来达到我们在“方案探讨”中所说的“预寻路和实际寻路的移动规则保持一致”的目的。
下一篇将针对最佳路径的查找方式优化进行进一步探索和实现
如果这篇文章对你有帮助,请给作者点个赞吧!