EG/plugins/user/pathfinding_algorithms/algorithms/jps.py
2025-12-12 16:16:15 +08:00

290 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
JPS算法实现
Jump Point Search (JPS) 是一种优化的A*算法,专门用于网格地图
"""
import heapq
from typing import List, Tuple, Optional, Dict, Set
class JPS:
"""
JPS算法实现
Jump Point Search (JPS) 是一种优化的A*算法,专门用于网格地图
"""
def __init__(self):
"""初始化JPS算法"""
self.stats = {
'nodes_visited': 0,
'max_queue_size': 0
}
def find_path(self, grid: List[List[int]], start: Tuple[int, int], goal: Tuple[int, int]) -> Optional[List[Tuple[int, int]]]:
"""
使用JPS算法查找路径
Args:
grid: 网格地图0表示可通行1表示障碍物
start: 起点坐标 (row, col)
goal: 终点坐标 (row, col)
Returns:
路径坐标列表如果找不到路径则返回None
"""
# 重置统计信息
self.stats = {
'nodes_visited': 0,
'max_queue_size': 0
}
# 获取网格尺寸
self.rows = len(grid)
self.cols = len(grid[0]) if self.rows > 0 else 0
self.grid = grid
# 检查起点和终点是否有效
if not (0 <= start[0] < self.rows and 0 <= start[1] < self.cols):
return None
if not (0 <= goal[0] < self.rows and 0 <= goal[1] < self.cols):
return None
if grid[start[0]][start[1]] == 1 or grid[goal[0]][goal[1]] == 1:
return None
# 初始化开放列表(优先队列)
open_list = []
heapq.heappush(open_list, (0, start))
# 初始化关闭列表
closed_list: Set[Tuple[int, int]] = set()
# 初始化距离和父节点字典
g_score: Dict[Tuple[int, int], float] = {start: 0}
f_score: Dict[Tuple[int, int], float] = {start: self._manhattan_distance(start, goal)}
parent: Dict[Tuple[int, int], Tuple[int, int]] = {}
while open_list:
# 更新统计信息
self.stats['nodes_visited'] += 1
self.stats['max_queue_size'] = max(self.stats['max_queue_size'], len(open_list))
# 取出f_score最小的节点
current = heapq.heappop(open_list)[1]
# 如果到达目标节点
if current == goal:
# 重构路径
path = self._reconstruct_path(parent, current)
return path
# 将当前节点加入关闭列表
closed_list.add(current)
# 获取当前节点的跳点邻居
successors = self._get_jump_point_successors(current, goal, g_score[current], parent)
# 处理所有跳点邻居
for successor, move_cost in successors:
if successor in closed_list:
continue
tentative_g_score = g_score[current] + move_cost
if successor not in g_score or tentative_g_score < g_score[successor]:
parent[successor] = current
g_score[successor] = tentative_g_score
f_score[successor] = g_score[successor] + self._manhattan_distance(successor, goal)
heapq.heappush(open_list, (f_score[successor], successor))
# 未找到路径
return None
def _manhattan_distance(self, point1: Tuple[int, int], point2: Tuple[int, int]) -> float:
"""计算曼哈顿距离"""
return abs(point1[0] - point2[0]) + abs(point1[1] - point2[1])
def _get_jump_point_successors(self, current: Tuple[int, int], goal: Tuple[int, int],
current_g: float, parent: Dict[Tuple[int, int], Tuple[int, int]]) -> List[Tuple[Tuple[int, int], float]]:
"""
获取跳点邻居
Args:
current: 当前节点
goal: 目标节点
current_g: 当前节点的g值
parent: 父节点字典
Returns:
跳点邻居列表 [(节点, 移动代价), ...]
"""
successors = []
# 如果当前节点有父节点,则沿着父节点方向寻找跳点
if current in parent:
parent_node = parent[current]
dx = current[0] - parent_node[0]
dy = current[1] - parent_node[1]
# 标准化方向
dx = 1 if dx > 0 else (-1 if dx < 0 else 0)
dy = 1 if dy > 0 else (-1 if dy < 0 else 0)
# 沿着该方向寻找跳点
jump_point = self._jump(current[0], current[1], dx, dy, goal)
if jump_point:
move_cost = self._calculate_move_cost(current, jump_point)
successors.append((jump_point, move_cost))
else:
# 当前节点是起点,检查所有方向
directions = [
(-1, 0), (1, 0), (0, -1), (0, 1), # 上下左右
(-1, -1), (-1, 1), (1, -1), (1, 1) # 对角线
]
for dx, dy in directions:
jump_point = self._jump(current[0], current[1], dx, dy, goal)
if jump_point:
move_cost = self._calculate_move_cost(current, jump_point)
successors.append((jump_point, move_cost))
return successors
def _jump(self, x: int, y: int, dx: int, dy: int, goal: Tuple[int, int]) -> Optional[Tuple[int, int]]:
"""
沿着给定方向跳跃寻找跳点
Args:
x, y: 当前位置
dx, dy: 跳跃方向
goal: 目标位置
Returns:
跳点坐标如果没有找到则返回None
"""
# 计算下一个位置
next_x = x + dx
next_y = y + dy
# 检查边界
if not (0 <= next_x < self.rows and 0 <= next_y < self.cols):
return None
# 检查障碍物
if self.grid[next_x][next_y] == 1:
return None
# 如果到达目标点
if (next_x, next_y) == goal:
return (next_x, next_y)
# 检查是否为跳点(对于对角线移动)
if dx != 0 and dy != 0:
# 检查水平和垂直方向是否有强制邻居
if (self._has_forced_neighbor(next_x, next_y, -dx, dy) or
self._has_forced_neighbor(next_x, next_y, dx, -dy)):
return (next_x, next_y)
# 检查是否为跳点(对于直线移动)
else:
# 检查是否有强制邻居
if self._has_forced_neighbor(next_x, next_y, dx, dy):
return (next_x, next_y)
# 如果是直线移动,继续递归跳跃
if dx == 0 or dy == 0:
return self._jump(next_x, next_y, dx, dy, goal)
# 如果是对角线移动,检查水平和垂直分量
else:
# 检查对角线方向的跳点
if (self._jump(next_x, next_y, dx, 0, goal) is not None or
self._jump(next_x, next_y, 0, dy, goal) is not None):
return (next_x, next_y)
# 继续对角线跳跃
return self._jump(next_x, next_y, dx, dy, goal)
def _has_forced_neighbor(self, x: int, y: int, dx: int, dy: int) -> bool:
"""
检查是否存在强制邻居
Args:
x, y: 当前位置
dx, dy: 检查方向
Returns:
是否存在强制邻居
"""
# 检查对角线方向
if dx != 0 and dy != 0:
# 检查对角线方向的强制邻居
if (self._is_blocked(x - dx, y) and not self._is_blocked(x - dx, y + dy)) or \
(self._is_blocked(x, y - dy) and not self._is_blocked(x + dx, y - dy)):
return True
# 检查直线方向
else:
# 水平移动
if dx == 0:
if (self._is_blocked(x - 1, y) and not self._is_blocked(x - 1, y + dy)) or \
(self._is_blocked(x + 1, y) and not self._is_blocked(x + 1, y + dy)):
return True
# 垂直移动
else:
if (self._is_blocked(x, y - 1) and not self._is_blocked(x + dx, y - 1)) or \
(self._is_blocked(x, y + 1) and not self._is_blocked(x + dx, y + 1)):
return True
return False
def _is_blocked(self, x: int, y: int) -> bool:
"""
检查位置是否被阻挡
Args:
x, y: 位置坐标
Returns:
是否被阻挡
"""
return not (0 <= x < self.rows and 0 <= y < self.cols) or self.grid[x][y] == 1
def _calculate_move_cost(self, point1: Tuple[int, int], point2: Tuple[int, int]) -> float:
"""
计算移动代价
Args:
point1: 起点
point2: 终点
Returns:
移动代价
"""
dx = abs(point1[0] - point2[0])
dy = abs(point1[1] - point2[1])
if dx != 0 and dy != 0: # 对角线移动
return max(dx, dy) * 1.414
else: # 直线移动
return max(dx, dy) * 1.0
def _reconstruct_path(self, parent: Dict[Tuple[int, int], Tuple[int, int]], current: Tuple[int, int]) -> List[Tuple[int, int]]:
"""
重构路径
Args:
parent: 父节点字典
current: 当前节点
Returns:
路径坐标列表
"""
path = [current]
while current in parent:
current = parent[current]
path.append(current)
path.reverse()
return path
def get_stats(self) -> Dict[str, int]:
"""
获取算法统计信息
Returns:
统计信息字典
"""
return self.stats.copy()