#!/usr/bin/env python3 """ Path Optimizer Module 路径优化模块,对RRT*生成的路径进行平滑和优化。 包括路径简化、平滑处理、速度规划等功能。 错误处理:失败立即抛出异常,无后备方案 """ import numpy as np from typing import List, Tuple, Optional, Dict, Any # 路径优化参数 SHORTCUT_ITERATIONS = 5 MAX_SHORTCUT_DISTANCE = 0.15 DENSIFICATION_STEP = 0.05 SMOOTHING_FACTOR = 0.5 class PathOptimizer: """路径优化器""" def __init__(self, arm_controller, config_loader): """ 初始化路径优化器 Args: arm_controller: 机械臂控制器 config_loader: 配置加载器 """ self.arm_controller = arm_controller self.config_loader = config_loader # 优化参数(使用文件内常量) self.shortcut_iterations = SHORTCUT_ITERATIONS self.max_shortcut_distance = MAX_SHORTCUT_DISTANCE self.densification_step = DENSIFICATION_STEP self.smoothing_factor = SMOOTHING_FACTOR # 从配置读取跨文件共享参数 config = config_loader.get_full_config() execution_config = config['path_planning']['execution'] self.velocity_scaling = execution_config['velocity_scaling'] self.position_tolerance = execution_config['position_tolerance'] collision_config = config['path_planning']['collision'] self.check_resolution = collision_config['check_resolution'] # 获取关节数量 self.dof = self.arm_controller.kinematics_engine.get_num_joints() # 路径最小点数(起点+终点) self.min_path_points = len(["start", "end"]) # 从逻辑推导:路径至少需要起点和终点 def optimize_path(self, path: List[List[float]], collision_checker) -> List[List[float]]: """ 优化路径 Args: path: 原始路径点列表 collision_checker: 碰撞检测器 Returns: 优化后的路径点列表 Raises: ValueError: 路径无效 RuntimeError: 优化失败 """ if len(path) < 2: raise ValueError("Path must have at least 2 points") # 步骤1: 路径简化(移除冗余点) simplified = self._simplify_path(path, collision_checker) # 步骤2: 捷径优化(限制距离) shortcut = self._shortcut_path(simplified, collision_checker) # 步骤3: 路径密集化(保证轨迹贴合) dense = self._densify_path(shortcut, collision_checker) # 步骤4: 路径平滑 smoothed = self._smooth_path(dense, collision_checker) return smoothed def _simplify_path(self, path: List[List[float]], collision_checker) -> List[List[float]]: """ 简化路径,移除不必要的中间点 Args: path: 原始路径 collision_checker: 碰撞检测器 Returns: 简化后的路径 """ if len(path) <= 2: return path simplified = [path[0]] current_idx = 0 while current_idx < len(path) - 1: # 找到从当前点能直接到达的最远点 farthest_idx = current_idx + 1 for idx in range(current_idx + 2, len(path)): # 检查距离限制 distance = np.linalg.norm( np.array(path[idx]) - np.array(path[current_idx]) ) if distance > self.max_shortcut_distance: break # 检查直接连接是否无碰撞 if self._is_edge_collision_free( path[current_idx], path[idx], collision_checker ): farthest_idx = idx else: break simplified.append(path[farthest_idx]) current_idx = farthest_idx return simplified def _shortcut_path(self, path: List[List[float]], collision_checker) -> List[List[float]]: """ 捷径优化,尝试直接连接不相邻的点 Args: path: 输入路径 collision_checker: 碰撞检测器 Returns: 优化后的路径 """ optimized = path.copy() for _ in range(self.shortcut_iterations): if len(optimized) <= 2: break # 随机选择两个不相邻的点 i = np.random.randint(0, len(optimized) - 2) j = np.random.randint(i + 2, len(optimized)) # 检查距离限制 distance = np.linalg.norm( np.array(optimized[j]) - np.array(optimized[i]) ) if distance > self.max_shortcut_distance: continue # 尝试直接连接 if self._is_edge_collision_free( optimized[i], optimized[j], collision_checker ): # 删除中间点 optimized = optimized[:i+1] + optimized[j:] return optimized def _densify_path(self, path: List[List[float]], collision_checker) -> List[List[float]]: """ 密集化路径,确保相邻点间距小于阈值 Args: path: 输入路径 collision_checker: 碰撞检测器 Returns: 密集化后的路径 """ if len(path) <= 2: return path densified = [path[0]] for i in range(len(path) - 1): current = np.array(path[i]) next_point = np.array(path[i + 1]) # 计算两点间距离 distance = np.linalg.norm(next_point - current) # 如果距离超过阈值,插入中间点 if distance > self.densification_step: num_segments = int(np.ceil(distance / self.densification_step)) for j in range(1, num_segments): ratio = j / num_segments interpolated = current + ratio * (next_point - current) # 检查插值点是否有碰撞 if not collision_checker.check_collision(interpolated.tolist()): densified.append(interpolated.tolist()) else: raise RuntimeError(f"Densification created collision at segment {i}") densified.append(path[i + 1]) return densified def _smooth_path(self, path: List[List[float]], collision_checker) -> List[List[float]]: """ 平滑路径 Args: path: 输入路径 collision_checker: 碰撞检测器 Returns: 平滑后的路径 """ if len(path) <= 2: return path # 使用三次样条插值进行平滑 path_array = np.array(path) num_points = len(path) # 创建参数化变量(基于路径长度) distances = [0] for i in range(1, num_points): dist = np.linalg.norm(path_array[i] - path_array[i-1]) distances.append(distances[-1] + dist) # 归一化距离 total_distance = distances[-1] if total_distance < self.position_tolerance: return path t_original = np.array(distances) / total_distance # 计算平滑后的点数(基于平滑因子) num_smooth_points = max(self.min_path_points, int(num_points * (1.0 + self.smoothing_factor))) t_smooth = np.linspace(0, 1, num_smooth_points) # 对每个关节维度进行插值 smoothed_path = [] for t in t_smooth: # 线性插值(保证路径可行性) idx = np.searchsorted(t_original, t) if idx == 0: point = path[0] elif idx >= len(path): point = path[-1] else: # 在两点间线性插值 t0 = t_original[idx-1] t1 = t_original[idx] alpha = (t - t0) / (t1 - t0) if t1 > t0 else 0 point = (1 - alpha) * np.array(path[idx-1]) + alpha * np.array(path[idx]) # 验证点的有效性 # 确保 point 是列表格式 if isinstance(point, np.ndarray): point = point.tolist() elif not isinstance(point, list): point = list(point) if not collision_checker.check_collision(point): smoothed_path.append(point) # 验证平滑结果 if len(smoothed_path) < 2: raise RuntimeError(f"Path smoothing failed: only {len(smoothed_path)} valid points generated") return smoothed_path def _is_edge_collision_free(self, start: List[float], end: List[float], collision_checker) -> bool: """ 检查两点间的边是否无碰撞 Args: start: 起点配置 end: 终点配置 collision_checker: 碰撞检测器 Returns: True if edge is collision free """ start_array = np.array(start) end_array = np.array(end) distance = np.linalg.norm(end_array - start_array) # 根据分辨率计算检查点数 num_checks = max(self.min_path_points, int(distance / self.check_resolution)) for i in range(num_checks + 1): t = i / num_checks interpolated = start_array + t * (end_array - start_array) if collision_checker.check_collision(interpolated.tolist()): return False return True def add_velocity_profile(self, path: List[List[float]]) -> List[Dict[str, Any]]: """ 为路径添加速度规划 Args: path: 路径点列表 Returns: 带速度信息的路径点列表 """ if len(path) < 2: raise ValueError("Path must have at least 2 points") trajectory = [] # 计算总路径长度 total_distance = 0 distances = [0] for i in range(1, len(path)): dist = np.linalg.norm( np.array(path[i]) - np.array(path[i-1]) ) total_distance += dist distances.append(total_distance) # 计算总时间(基于速度缩放) total_time = total_distance / self.velocity_scaling # 为每个点分配时间戳和速度 for i, point in enumerate(path): # 时间戳 if i == 0: timestamp = 0 elif i == len(path) - 1: timestamp = total_time else: timestamp = (distances[i] / total_distance) * total_time # 速度(除起止点外) if i == 0 or i == len(path) - 1: velocity = [0.0] * self.dof else: # 计算前后段的方向 prev_dir = np.array(path[i]) - np.array(path[i-1]) next_dir = np.array(path[i+1]) - np.array(path[i]) # 平均速度方向 avg_dir = (prev_dir + next_dir) / 2 # 归一化并应用速度缩放 norm = np.linalg.norm(avg_dir) if norm > self.position_tolerance: velocity = (avg_dir / norm * self.velocity_scaling).tolist() else: velocity = [0.0] * self.dof trajectory.append({ 'position': point, 'velocity': velocity, 'timestamp': timestamp }) return trajectory def validate_trajectory(self, trajectory: List[Dict[str, Any]]) -> Tuple[bool, str]: """ 验证轨迹的有效性 Args: trajectory: 轨迹点列表 Returns: (是否有效, 错误信息) """ if len(trajectory) < 2: return False, "Trajectory must have at least 2 points" # 检查时间戳单调递增 for i in range(1, len(trajectory)): if trajectory[i]['timestamp'] <= trajectory[i-1]['timestamp']: return False, f"Non-monotonic timestamps at index {i}" # 检查关节限位 for i, point in enumerate(trajectory): is_valid, violations = self.arm_controller.check_joint_limits( point['position'] ) if not is_valid: return False, f"Joint limit violations at point {i}: {violations}" # 检查速度连续性 for i in range(1, len(trajectory) - 1): prev_vel = np.array(trajectory[i-1]['velocity']) curr_vel = np.array(trajectory[i]['velocity']) next_vel = np.array(trajectory[i+1]['velocity']) # 速度变化不应过大(基于配置的容差) max_change = self.velocity_scaling * self.smoothing_factor if np.linalg.norm(curr_vel - prev_vel) > max_change: return False, f"Velocity discontinuity at point {i}" if np.linalg.norm(next_vel - curr_vel) > max_change: return False, f"Velocity discontinuity at point {i+1}" return True, ""