290 lines
10 KiB
Python
290 lines
10 KiB
Python
"""
|
||
JPS算法实现
|
||
Jump Point Search (JPS) 是一种优化的A*算法,专门用于网格地图
|
||
"""
|
||
|
||
import heapq
|
||
from typing import List, Tuple, Optional, Dict, Set
|
||
|
||
class JPS:
|
||
"""
|
||
JPS算法实现
|
||
Jump Point Search (JPS) 是一种优化的A*算法,专门用于网格地图
|
||
"""
|
||
|
||
def __init__(self):
|
||
"""初始化JPS算法"""
|
||
self.stats = {
|
||
'nodes_visited': 0,
|
||
'max_queue_size': 0
|
||
}
|
||
|
||
def find_path(self, grid: List[List[int]], start: Tuple[int, int], goal: Tuple[int, int]) -> Optional[List[Tuple[int, int]]]:
|
||
"""
|
||
使用JPS算法查找路径
|
||
|
||
Args:
|
||
grid: 网格地图,0表示可通行,1表示障碍物
|
||
start: 起点坐标 (row, col)
|
||
goal: 终点坐标 (row, col)
|
||
|
||
Returns:
|
||
路径坐标列表,如果找不到路径则返回None
|
||
"""
|
||
# 重置统计信息
|
||
self.stats = {
|
||
'nodes_visited': 0,
|
||
'max_queue_size': 0
|
||
}
|
||
|
||
# 获取网格尺寸
|
||
self.rows = len(grid)
|
||
self.cols = len(grid[0]) if self.rows > 0 else 0
|
||
self.grid = grid
|
||
|
||
# 检查起点和终点是否有效
|
||
if not (0 <= start[0] < self.rows and 0 <= start[1] < self.cols):
|
||
return None
|
||
if not (0 <= goal[0] < self.rows and 0 <= goal[1] < self.cols):
|
||
return None
|
||
if grid[start[0]][start[1]] == 1 or grid[goal[0]][goal[1]] == 1:
|
||
return None
|
||
|
||
# 初始化开放列表(优先队列)
|
||
open_list = []
|
||
heapq.heappush(open_list, (0, start))
|
||
|
||
# 初始化关闭列表
|
||
closed_list: Set[Tuple[int, int]] = set()
|
||
|
||
# 初始化距离和父节点字典
|
||
g_score: Dict[Tuple[int, int], float] = {start: 0}
|
||
f_score: Dict[Tuple[int, int], float] = {start: self._manhattan_distance(start, goal)}
|
||
parent: Dict[Tuple[int, int], Tuple[int, int]] = {}
|
||
|
||
while open_list:
|
||
# 更新统计信息
|
||
self.stats['nodes_visited'] += 1
|
||
self.stats['max_queue_size'] = max(self.stats['max_queue_size'], len(open_list))
|
||
|
||
# 取出f_score最小的节点
|
||
current = heapq.heappop(open_list)[1]
|
||
|
||
# 如果到达目标节点
|
||
if current == goal:
|
||
# 重构路径
|
||
path = self._reconstruct_path(parent, current)
|
||
return path
|
||
|
||
# 将当前节点加入关闭列表
|
||
closed_list.add(current)
|
||
|
||
# 获取当前节点的跳点邻居
|
||
successors = self._get_jump_point_successors(current, goal, g_score[current], parent)
|
||
|
||
# 处理所有跳点邻居
|
||
for successor, move_cost in successors:
|
||
if successor in closed_list:
|
||
continue
|
||
|
||
tentative_g_score = g_score[current] + move_cost
|
||
|
||
if successor not in g_score or tentative_g_score < g_score[successor]:
|
||
parent[successor] = current
|
||
g_score[successor] = tentative_g_score
|
||
f_score[successor] = g_score[successor] + self._manhattan_distance(successor, goal)
|
||
heapq.heappush(open_list, (f_score[successor], successor))
|
||
|
||
# 未找到路径
|
||
return None
|
||
|
||
def _manhattan_distance(self, point1: Tuple[int, int], point2: Tuple[int, int]) -> float:
|
||
"""计算曼哈顿距离"""
|
||
return abs(point1[0] - point2[0]) + abs(point1[1] - point2[1])
|
||
|
||
def _get_jump_point_successors(self, current: Tuple[int, int], goal: Tuple[int, int],
|
||
current_g: float, parent: Dict[Tuple[int, int], Tuple[int, int]]) -> List[Tuple[Tuple[int, int], float]]:
|
||
"""
|
||
获取跳点邻居
|
||
|
||
Args:
|
||
current: 当前节点
|
||
goal: 目标节点
|
||
current_g: 当前节点的g值
|
||
parent: 父节点字典
|
||
|
||
Returns:
|
||
跳点邻居列表 [(节点, 移动代价), ...]
|
||
"""
|
||
successors = []
|
||
|
||
# 如果当前节点有父节点,则沿着父节点方向寻找跳点
|
||
if current in parent:
|
||
parent_node = parent[current]
|
||
dx = current[0] - parent_node[0]
|
||
dy = current[1] - parent_node[1]
|
||
|
||
# 标准化方向
|
||
dx = 1 if dx > 0 else (-1 if dx < 0 else 0)
|
||
dy = 1 if dy > 0 else (-1 if dy < 0 else 0)
|
||
|
||
# 沿着该方向寻找跳点
|
||
jump_point = self._jump(current[0], current[1], dx, dy, goal)
|
||
if jump_point:
|
||
move_cost = self._calculate_move_cost(current, jump_point)
|
||
successors.append((jump_point, move_cost))
|
||
else:
|
||
# 当前节点是起点,检查所有方向
|
||
directions = [
|
||
(-1, 0), (1, 0), (0, -1), (0, 1), # 上下左右
|
||
(-1, -1), (-1, 1), (1, -1), (1, 1) # 对角线
|
||
]
|
||
|
||
for dx, dy in directions:
|
||
jump_point = self._jump(current[0], current[1], dx, dy, goal)
|
||
if jump_point:
|
||
move_cost = self._calculate_move_cost(current, jump_point)
|
||
successors.append((jump_point, move_cost))
|
||
|
||
return successors
|
||
|
||
def _jump(self, x: int, y: int, dx: int, dy: int, goal: Tuple[int, int]) -> Optional[Tuple[int, int]]:
|
||
"""
|
||
沿着给定方向跳跃寻找跳点
|
||
|
||
Args:
|
||
x, y: 当前位置
|
||
dx, dy: 跳跃方向
|
||
goal: 目标位置
|
||
|
||
Returns:
|
||
跳点坐标,如果没有找到则返回None
|
||
"""
|
||
# 计算下一个位置
|
||
next_x = x + dx
|
||
next_y = y + dy
|
||
|
||
# 检查边界
|
||
if not (0 <= next_x < self.rows and 0 <= next_y < self.cols):
|
||
return None
|
||
|
||
# 检查障碍物
|
||
if self.grid[next_x][next_y] == 1:
|
||
return None
|
||
|
||
# 如果到达目标点
|
||
if (next_x, next_y) == goal:
|
||
return (next_x, next_y)
|
||
|
||
# 检查是否为跳点(对于对角线移动)
|
||
if dx != 0 and dy != 0:
|
||
# 检查水平和垂直方向是否有强制邻居
|
||
if (self._has_forced_neighbor(next_x, next_y, -dx, dy) or
|
||
self._has_forced_neighbor(next_x, next_y, dx, -dy)):
|
||
return (next_x, next_y)
|
||
# 检查是否为跳点(对于直线移动)
|
||
else:
|
||
# 检查是否有强制邻居
|
||
if self._has_forced_neighbor(next_x, next_y, dx, dy):
|
||
return (next_x, next_y)
|
||
|
||
# 如果是直线移动,继续递归跳跃
|
||
if dx == 0 or dy == 0:
|
||
return self._jump(next_x, next_y, dx, dy, goal)
|
||
# 如果是对角线移动,检查水平和垂直分量
|
||
else:
|
||
# 检查对角线方向的跳点
|
||
if (self._jump(next_x, next_y, dx, 0, goal) is not None or
|
||
self._jump(next_x, next_y, 0, dy, goal) is not None):
|
||
return (next_x, next_y)
|
||
# 继续对角线跳跃
|
||
return self._jump(next_x, next_y, dx, dy, goal)
|
||
|
||
def _has_forced_neighbor(self, x: int, y: int, dx: int, dy: int) -> bool:
|
||
"""
|
||
检查是否存在强制邻居
|
||
|
||
Args:
|
||
x, y: 当前位置
|
||
dx, dy: 检查方向
|
||
|
||
Returns:
|
||
是否存在强制邻居
|
||
"""
|
||
# 检查对角线方向
|
||
if dx != 0 and dy != 0:
|
||
# 检查对角线方向的强制邻居
|
||
if (self._is_blocked(x - dx, y) and not self._is_blocked(x - dx, y + dy)) or \
|
||
(self._is_blocked(x, y - dy) and not self._is_blocked(x + dx, y - dy)):
|
||
return True
|
||
# 检查直线方向
|
||
else:
|
||
# 水平移动
|
||
if dx == 0:
|
||
if (self._is_blocked(x - 1, y) and not self._is_blocked(x - 1, y + dy)) or \
|
||
(self._is_blocked(x + 1, y) and not self._is_blocked(x + 1, y + dy)):
|
||
return True
|
||
# 垂直移动
|
||
else:
|
||
if (self._is_blocked(x, y - 1) and not self._is_blocked(x + dx, y - 1)) or \
|
||
(self._is_blocked(x, y + 1) and not self._is_blocked(x + dx, y + 1)):
|
||
return True
|
||
return False
|
||
|
||
def _is_blocked(self, x: int, y: int) -> bool:
|
||
"""
|
||
检查位置是否被阻挡
|
||
|
||
Args:
|
||
x, y: 位置坐标
|
||
|
||
Returns:
|
||
是否被阻挡
|
||
"""
|
||
return not (0 <= x < self.rows and 0 <= y < self.cols) or self.grid[x][y] == 1
|
||
|
||
def _calculate_move_cost(self, point1: Tuple[int, int], point2: Tuple[int, int]) -> float:
|
||
"""
|
||
计算移动代价
|
||
|
||
Args:
|
||
point1: 起点
|
||
point2: 终点
|
||
|
||
Returns:
|
||
移动代价
|
||
"""
|
||
dx = abs(point1[0] - point2[0])
|
||
dy = abs(point1[1] - point2[1])
|
||
|
||
if dx != 0 and dy != 0: # 对角线移动
|
||
return max(dx, dy) * 1.414
|
||
else: # 直线移动
|
||
return max(dx, dy) * 1.0
|
||
|
||
def _reconstruct_path(self, parent: Dict[Tuple[int, int], Tuple[int, int]], current: Tuple[int, int]) -> List[Tuple[int, int]]:
|
||
"""
|
||
重构路径
|
||
|
||
Args:
|
||
parent: 父节点字典
|
||
current: 当前节点
|
||
|
||
Returns:
|
||
路径坐标列表
|
||
"""
|
||
path = [current]
|
||
while current in parent:
|
||
current = parent[current]
|
||
path.append(current)
|
||
path.reverse()
|
||
return path
|
||
|
||
def get_stats(self) -> Dict[str, int]:
|
||
"""
|
||
获取算法统计信息
|
||
|
||
Returns:
|
||
统计信息字典
|
||
"""
|
||
return self.stats.copy() |