EG/plugins/user/pathfinding_algorithms/algorithms/astar.py
2025-10-30 11:46:41 +08:00

185 lines
6.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
A*算法实现
A*是一种启发式搜索算法,广泛用于路径规划
"""
import heapq
import math
from typing import List, Tuple, Optional, Dict, Set
class AStar:
"""
A*算法实现
A*是一种启发式搜索算法,广泛用于路径规划
"""
def __init__(self):
"""初始化A*算法"""
self.heuristic = 'manhattan' # 默认启发式函数
self.stats = {
'nodes_visited': 0,
'max_queue_size': 0
}
def find_path(self, grid: List[List[int]], start: Tuple[int, int], goal: Tuple[int, int]) -> Optional[List[Tuple[int, int]]]:
"""
使用A*算法查找路径
Args:
grid: 网格地图0表示可通行1表示障碍物
start: 起点坐标 (row, col)
goal: 终点坐标 (row, col)
Returns:
路径坐标列表如果找不到路径则返回None
"""
# 重置统计信息
self.stats = {
'nodes_visited': 0,
'max_queue_size': 0
}
# 获取网格尺寸
rows = len(grid)
cols = len(grid[0]) if rows > 0 else 0
# 检查起点和终点是否有效
if not (0 <= start[0] < rows and 0 <= start[1] < cols):
return None
if not (0 <= goal[0] < rows and 0 <= goal[1] < cols):
return None
if grid[start[0]][start[1]] == 1 or grid[goal[0]][goal[1]] == 1:
return None
# 初始化开放列表(优先队列)
open_list = []
heapq.heappush(open_list, (0, start))
# 初始化关闭列表
closed_list: Set[Tuple[int, int]] = set()
# 初始化距离和父节点字典
g_score: Dict[Tuple[int, int], float] = {start: 0}
f_score: Dict[Tuple[int, int], float] = {start: self._calculate_heuristic(start, goal)}
parent: Dict[Tuple[int, int], Tuple[int, int]] = {}
# 8方向移动包括对角线
directions = [
(-1, 0), (1, 0), (0, -1), (0, 1), # 上下左右
(-1, -1), (-1, 1), (1, -1), (1, 1) # 对角线
]
while open_list:
# 更新统计信息
self.stats['nodes_visited'] += 1
self.stats['max_queue_size'] = max(self.stats['max_queue_size'], len(open_list))
# 取出f_score最小的节点
current = heapq.heappop(open_list)[1]
# 如果到达目标节点
if current == goal:
# 重构路径
path = self._reconstruct_path(parent, current)
return path
# 将当前节点加入关闭列表
closed_list.add(current)
# 检查所有邻居节点
for dr, dc in directions:
neighbor = (current[0] + dr, current[1] + dc)
# 检查邻居节点是否在网格范围内
if not (0 <= neighbor[0] < rows and 0 <= neighbor[1] < cols):
continue
# 检查邻居节点是否为障碍物
if grid[neighbor[0]][neighbor[1]] == 1:
continue
# 检查邻居节点是否已在关闭列表中
if neighbor in closed_list:
continue
# 计算移动代价
if abs(dr) + abs(dc) == 2: # 对角线移动
move_cost = 1.414 # √2
else: # 直线移动
move_cost = 1.0
# 计算到邻居节点的 tentative_g_score
tentative_g_score = g_score[current] + move_cost
# 如果找到更优路径或首次访问该节点
if neighbor not in g_score or tentative_g_score < g_score[neighbor]:
# 更新父节点和得分
parent[neighbor] = current
g_score[neighbor] = tentative_g_score
f_score[neighbor] = g_score[neighbor] + self._calculate_heuristic(neighbor, goal)
# 将邻居节点加入开放列表
heapq.heappush(open_list, (f_score[neighbor], neighbor))
# 未找到路径
return None
def _calculate_heuristic(self, point1: Tuple[int, int], point2: Tuple[int, int]) -> float:
"""
计算启发式函数值
Args:
point1: 第一个点
point2: 第二个点
Returns:
启发式函数值
"""
if self.heuristic == 'manhattan':
# 曼哈顿距离
return abs(point1[0] - point2[0]) + abs(point1[1] - point2[1])
elif self.heuristic == 'euclidean':
# 欧几里得距离
return math.sqrt((point1[0] - point2[0])**2 + (point1[1] - point2[1])**2)
elif self.heuristic == 'chebyshev':
# 切比雪夫距离
return max(abs(point1[0] - point2[0]), abs(point1[1] - point2[1]))
else:
# 默认使用曼哈顿距离
return abs(point1[0] - point2[0]) + abs(point1[1] - point2[1])
def _reconstruct_path(self, parent: Dict[Tuple[int, int], Tuple[int, int]], current: Tuple[int, int]) -> List[Tuple[int, int]]:
"""
重构路径
Args:
parent: 父节点字典
current: 当前节点
Returns:
路径坐标列表
"""
path = [current]
while current in parent:
current = parent[current]
path.append(current)
path.reverse()
return path
def get_stats(self) -> Dict[str, int]:
"""
获取算法统计信息
Returns:
统计信息字典
"""
return self.stats.copy()
def set_heuristic(self, heuristic: str):
"""
设置启发式函数
Args:
heuristic: 启发式函数名称 ('manhattan', 'euclidean', 'chebyshev')
"""
self.heuristic = heuristic