""" JPS算法实现 Jump Point Search (JPS) 是一种优化的A*算法,专门用于网格地图 """ import heapq from typing import List, Tuple, Optional, Dict, Set class JPS: """ JPS算法实现 Jump Point Search (JPS) 是一种优化的A*算法,专门用于网格地图 """ def __init__(self): """初始化JPS算法""" self.stats = { 'nodes_visited': 0, 'max_queue_size': 0 } def find_path(self, grid: List[List[int]], start: Tuple[int, int], goal: Tuple[int, int]) -> Optional[List[Tuple[int, int]]]: """ 使用JPS算法查找路径 Args: grid: 网格地图,0表示可通行,1表示障碍物 start: 起点坐标 (row, col) goal: 终点坐标 (row, col) Returns: 路径坐标列表,如果找不到路径则返回None """ # 重置统计信息 self.stats = { 'nodes_visited': 0, 'max_queue_size': 0 } # 获取网格尺寸 self.rows = len(grid) self.cols = len(grid[0]) if self.rows > 0 else 0 self.grid = grid # 检查起点和终点是否有效 if not (0 <= start[0] < self.rows and 0 <= start[1] < self.cols): return None if not (0 <= goal[0] < self.rows and 0 <= goal[1] < self.cols): return None if grid[start[0]][start[1]] == 1 or grid[goal[0]][goal[1]] == 1: return None # 初始化开放列表(优先队列) open_list = [] heapq.heappush(open_list, (0, start)) # 初始化关闭列表 closed_list: Set[Tuple[int, int]] = set() # 初始化距离和父节点字典 g_score: Dict[Tuple[int, int], float] = {start: 0} f_score: Dict[Tuple[int, int], float] = {start: self._manhattan_distance(start, goal)} parent: Dict[Tuple[int, int], Tuple[int, int]] = {} while open_list: # 更新统计信息 self.stats['nodes_visited'] += 1 self.stats['max_queue_size'] = max(self.stats['max_queue_size'], len(open_list)) # 取出f_score最小的节点 current = heapq.heappop(open_list)[1] # 如果到达目标节点 if current == goal: # 重构路径 path = self._reconstruct_path(parent, current) return path # 将当前节点加入关闭列表 closed_list.add(current) # 获取当前节点的跳点邻居 successors = self._get_jump_point_successors(current, goal, g_score[current], parent) # 处理所有跳点邻居 for successor, move_cost in successors: if successor in closed_list: continue tentative_g_score = g_score[current] + move_cost if successor not in g_score or tentative_g_score < g_score[successor]: parent[successor] = current g_score[successor] = tentative_g_score f_score[successor] = g_score[successor] + self._manhattan_distance(successor, goal) heapq.heappush(open_list, (f_score[successor], successor)) # 未找到路径 return None def _manhattan_distance(self, point1: Tuple[int, int], point2: Tuple[int, int]) -> float: """计算曼哈顿距离""" return abs(point1[0] - point2[0]) + abs(point1[1] - point2[1]) def _get_jump_point_successors(self, current: Tuple[int, int], goal: Tuple[int, int], current_g: float, parent: Dict[Tuple[int, int], Tuple[int, int]]) -> List[Tuple[Tuple[int, int], float]]: """ 获取跳点邻居 Args: current: 当前节点 goal: 目标节点 current_g: 当前节点的g值 parent: 父节点字典 Returns: 跳点邻居列表 [(节点, 移动代价), ...] """ successors = [] # 如果当前节点有父节点,则沿着父节点方向寻找跳点 if current in parent: parent_node = parent[current] dx = current[0] - parent_node[0] dy = current[1] - parent_node[1] # 标准化方向 dx = 1 if dx > 0 else (-1 if dx < 0 else 0) dy = 1 if dy > 0 else (-1 if dy < 0 else 0) # 沿着该方向寻找跳点 jump_point = self._jump(current[0], current[1], dx, dy, goal) if jump_point: move_cost = self._calculate_move_cost(current, jump_point) successors.append((jump_point, move_cost)) else: # 当前节点是起点,检查所有方向 directions = [ (-1, 0), (1, 0), (0, -1), (0, 1), # 上下左右 (-1, -1), (-1, 1), (1, -1), (1, 1) # 对角线 ] for dx, dy in directions: jump_point = self._jump(current[0], current[1], dx, dy, goal) if jump_point: move_cost = self._calculate_move_cost(current, jump_point) successors.append((jump_point, move_cost)) return successors def _jump(self, x: int, y: int, dx: int, dy: int, goal: Tuple[int, int]) -> Optional[Tuple[int, int]]: """ 沿着给定方向跳跃寻找跳点 Args: x, y: 当前位置 dx, dy: 跳跃方向 goal: 目标位置 Returns: 跳点坐标,如果没有找到则返回None """ # 计算下一个位置 next_x = x + dx next_y = y + dy # 检查边界 if not (0 <= next_x < self.rows and 0 <= next_y < self.cols): return None # 检查障碍物 if self.grid[next_x][next_y] == 1: return None # 如果到达目标点 if (next_x, next_y) == goal: return (next_x, next_y) # 检查是否为跳点(对于对角线移动) if dx != 0 and dy != 0: # 检查水平和垂直方向是否有强制邻居 if (self._has_forced_neighbor(next_x, next_y, -dx, dy) or self._has_forced_neighbor(next_x, next_y, dx, -dy)): return (next_x, next_y) # 检查是否为跳点(对于直线移动) else: # 检查是否有强制邻居 if self._has_forced_neighbor(next_x, next_y, dx, dy): return (next_x, next_y) # 如果是直线移动,继续递归跳跃 if dx == 0 or dy == 0: return self._jump(next_x, next_y, dx, dy, goal) # 如果是对角线移动,检查水平和垂直分量 else: # 检查对角线方向的跳点 if (self._jump(next_x, next_y, dx, 0, goal) is not None or self._jump(next_x, next_y, 0, dy, goal) is not None): return (next_x, next_y) # 继续对角线跳跃 return self._jump(next_x, next_y, dx, dy, goal) def _has_forced_neighbor(self, x: int, y: int, dx: int, dy: int) -> bool: """ 检查是否存在强制邻居 Args: x, y: 当前位置 dx, dy: 检查方向 Returns: 是否存在强制邻居 """ # 检查对角线方向 if dx != 0 and dy != 0: # 检查对角线方向的强制邻居 if (self._is_blocked(x - dx, y) and not self._is_blocked(x - dx, y + dy)) or \ (self._is_blocked(x, y - dy) and not self._is_blocked(x + dx, y - dy)): return True # 检查直线方向 else: # 水平移动 if dx == 0: if (self._is_blocked(x - 1, y) and not self._is_blocked(x - 1, y + dy)) or \ (self._is_blocked(x + 1, y) and not self._is_blocked(x + 1, y + dy)): return True # 垂直移动 else: if (self._is_blocked(x, y - 1) and not self._is_blocked(x + dx, y - 1)) or \ (self._is_blocked(x, y + 1) and not self._is_blocked(x + dx, y + 1)): return True return False def _is_blocked(self, x: int, y: int) -> bool: """ 检查位置是否被阻挡 Args: x, y: 位置坐标 Returns: 是否被阻挡 """ return not (0 <= x < self.rows and 0 <= y < self.cols) or self.grid[x][y] == 1 def _calculate_move_cost(self, point1: Tuple[int, int], point2: Tuple[int, int]) -> float: """ 计算移动代价 Args: point1: 起点 point2: 终点 Returns: 移动代价 """ dx = abs(point1[0] - point2[0]) dy = abs(point1[1] - point2[1]) if dx != 0 and dy != 0: # 对角线移动 return max(dx, dy) * 1.414 else: # 直线移动 return max(dx, dy) * 1.0 def _reconstruct_path(self, parent: Dict[Tuple[int, int], Tuple[int, int]], current: Tuple[int, int]) -> List[Tuple[int, int]]: """ 重构路径 Args: parent: 父节点字典 current: 当前节点 Returns: 路径坐标列表 """ path = [current] while current in parent: current = parent[current] path.append(current) path.reverse() return path def get_stats(self) -> Dict[str, int]: """ 获取算法统计信息 Returns: 统计信息字典 """ return self.stats.copy()