from direct.showbase.ShowBaseGlobal import globalClock from direct.task.TaskManagerGlobal import taskMgr from panda3d.core import Point3, Vec3 import math class PatrolSystem: """巡检系统类""" def __init__(self, world): """初始化巡检系统 Args: world: 核心世界对象引用 """ self.world = world # 巡检状态 self.is_patrolling = False self.patrol_points = [] # 巡检点列表 [(pos, hpr, wait_time), ...] self.current_patrol_index = 0 self.patrol_task = None # 巡检参数 self.patrol_speed = 5.0 # 巡检移动速度(单位/秒) self.patrol_turn_speed = 90.0 # 转向速度(度/秒) self.patrol_wait_timer = 0.0 self.patrol_state = "moving" # "moving", "turning_to_target", "waiting", "turning_back" # 相机状态保存 self.original_cam_pos = None self.original_cam_hpr = None print("✓ 巡检系统初始化完成") def add_patrol_point(self, position, heading=None, wait_time=3.0): if heading is None: if self.patrol_points: last_pos = self.patrol_points[-1][0] direction_x = position[0] - last_pos.x direction_y = position[1] - last_pos.y direction_z = position[2] - last_pos.z import math h=math.degrees(math.atan2(-direction_x,-direction_y)) distance_xy = math.sqrt(direction_x**2+direction_y**2) p = math.degrees(math.atan2(direction_z,distance_xy)) p = max(-89,min(89,p)) r=0 heading = (h,p,r) else: # 使用当前相机朝向 current_hpr = self.world.cam.getHpr() heading = (current_hpr.x, current_hpr.y, current_hpr.z) pos = Point3(position[0], position[1], position[2]) hpr = Vec3(heading[0], heading[1], heading[2]) self.patrol_points.append((pos, hpr, wait_time)) print(f"✓ 添加巡检点 {len(self.patrol_points)}: 位置{position}, 朝向{heading}, 停留{wait_time}秒") # 在 PatrolSystem 类中添加以下方法 def add_auto_heading_patrol_point(self, position, wait_time=3.0): """添加自动计算朝向的巡检点(朝向路径前进方向) Args: position: 相机位置 (x, y, z) wait_time: 在该点停留时间(秒) """ heading = None # 将自动计算朝向 # 复用原有的 add_patrol_point 方法 self.add_patrol_point(position, heading, wait_time) def add_patrol_point_looking_at(self, position, look_at_position, wait_time=3.0): """添加朝向指定位置的巡检点 Args: position: 相机位置 (x, y, z) look_at_position: 相机朝向的目标位置 (x, y, z) wait_time: 在该点停留时间(秒) """ import math # 计算从当前位置到目标位置的方向向量 direction_x = look_at_position[0] - position[0] direction_y = look_at_position[1] - position[1] direction_z = look_at_position[2] - position[2] # 计算HPR朝向 h = math.degrees(math.atan2(-direction_x, -direction_y)) distance_xy = math.sqrt(direction_x ** 2 + direction_y ** 2) p = math.degrees(math.atan2(direction_z, distance_xy)) p = max(-89, min(89, p)) # 限制pitch角度在合理范围内 r = 0 # roll通常为0 heading = (h, p, r) self.add_patrol_point(position, heading, wait_time) def clear_patrol_points(self): """清空所有巡检点""" self.patrol_points = [] print("✓ 巡检点已清空") def set_patrol_speed(self, move_speed, turn_speed=None): """设置巡检速度 Args: move_speed: 移动速度(单位/秒) turn_speed: 转向速度(度/秒),如果为None则保持当前值 """ self.patrol_speed = move_speed if turn_speed is not None: self.patrol_turn_speed = turn_speed print(f"✓ 巡检速度已设置: 移动{move_speed}, 转向{turn_speed or self.patrol_turn_speed}") def start_patrol(self): """开始巡检""" if not self.patrol_points: print("✗ 没有设置巡检点,无法开始巡检") return False if self.is_patrolling: print("⚠ 巡检已在进行中") return True # 保存当前相机状态 self.original_cam_pos = Point3(self.world.cam.getPos()) self.original_cam_hpr = Vec3(self.world.cam.getHpr()) # 重置巡检状态 self.current_patrol_index = 0 self.patrol_state = "moving" self.patrol_wait_timer = 0.0 self.is_patrolling = True # 启动巡检任务 if self.patrol_task: taskMgr.remove(self.patrol_task) self.patrol_task = taskMgr.add(self._patrol_task, "patrol_task") print(f"✓ 开始巡检,共{len(self.patrol_points)}个巡检点") return True def stop_patrol(self): """停止巡检""" if not self.is_patrolling: print("⚠ 巡检未在进行中") return False # 停止巡检任务 if self.patrol_task: taskMgr.remove(self.patrol_task) self.patrol_task = None self.is_patrolling = False self.patrol_state = "moving" self.patrol_wait_timer = 0.0 print("✓ 巡检已停止") return True def pause_patrol(self): """暂停巡检""" if not self.is_patrolling: print("⚠ 巡检未在进行中") return False if self.patrol_task: taskMgr.remove(self.patrol_task) self.patrol_task = None print("✓ 巡检已暂停") return True def resume_patrol(self): """恢复巡检""" if self.is_patrolling: print("⚠ 巡检已在进行中") return False if not self.patrol_points: print("✗ 没有设置巡检点") return False self.is_patrolling = True self.patrol_task = taskMgr.add(self._patrol_task, "patrol_task") print("✓ 巡检已恢复") return True def reset_to_original_position(self): """重置相机到原始位置""" if self.original_cam_pos and self.original_cam_hpr: self.world.cam.setPos(self.original_cam_pos) self.world.cam.setHpr(self.original_cam_hpr) print("✓ 相机已重置到原始位置") return True else: print("✗ 没有保存的原始位置") return False def _patrol_task(self, task): """巡检主任务""" try: if not self.is_patrolling or not self.patrol_points: return task.done # 获取当前巡检点 current_point = self.patrol_points[self.current_patrol_index] target_pos, target_hpr, wait_time = current_point # 根据当前状态执行不同操作 if self.patrol_state == "moving": self._handle_moving_state(target_pos) elif self.patrol_state == "turning_to_target": self._handle_turning_to_target_state(target_hpr) elif self.patrol_state == "waiting": self._handle_waiting_state(wait_time) elif self.patrol_state == "turning_back": self._handle_turning_back_state() return task.cont except Exception as e: print(f"巡检任务出错: {e}") import traceback traceback.print_exc() return task.done def _handle_moving_state(self, target_pos): """处理移动状态""" current_pos = self.world.cam.getPos() distance = (target_pos - current_pos).length() if distance < 0.1: # 到达目标点 print(f"✓ 到达巡检点 {self.current_patrol_index + 1}") self.patrol_state = "turning_to_target" return # 计算移动方向和距离 direction = target_pos - current_pos direction.normalize() # 计算目标朝向(看向目标点) target_hpr = self._look_at_to_hpr(direction) current_hpr = self.world.cam.getHpr() # 平滑转向到目标朝向 h_diff = self._normalize_angle(target_hpr.x - current_hpr.x) p_diff = self._normalize_angle(target_hpr.y - current_hpr.y) r_diff = self._normalize_angle(target_hpr.z - current_hpr.z) # 计算本帧应转动的角度 dt = globalClock.getDt() turn_amount = self.patrol_turn_speed * dt # 逐步转向目标角度 new_hpr = Vec3(current_hpr) if abs(h_diff) > turn_amount: new_hpr.x += turn_amount if h_diff > 0 else -turn_amount else: new_hpr.x = target_hpr.x if abs(p_diff) > turn_amount: new_hpr.y += turn_amount if p_diff > 0 else -turn_amount else: new_hpr.y = target_hpr.y if abs(r_diff) > turn_amount: new_hpr.z += turn_amount if r_diff > 0 else -turn_amount else: new_hpr.z = target_hpr.z self.world.cam.setHpr(new_hpr) # 计算本帧应移动的距离 move_distance = self.patrol_speed * dt # 如果移动距离大于剩余距离,则直接移动到目标点 if move_distance >= distance: self.world.cam.setPos(target_pos) else: # 否则按方向移动 new_pos = current_pos + direction * move_distance self.world.cam.setPos(new_pos) def _handle_turning_to_target_state(self, target_hpr): """处理转向目标状态""" # 检查是否需要朝向下一个点 if target_hpr == "look_next": # 计算朝向下一个点的方向 next_index = (self.current_patrol_index + 1) % len(self.patrol_points) next_point_pos = self.patrol_points[next_index][0] current_pos = self.world.cam.getPos() direction = next_point_pos - current_pos direction.normalize() # 计算目标朝向 target_hpr = self._look_at_to_hpr(direction) current_hpr = self.world.cam.getHpr() # 计算角度差 h_diff = self._normalize_angle(target_hpr.x - current_hpr.x) p_diff = self._normalize_angle(target_hpr.y - current_hpr.y) r_diff = self._normalize_angle(target_hpr.z - current_hpr.z) # 检查是否已完成转向 if abs(h_diff) < 1.0 and abs(p_diff) < 1.0 and abs(r_diff) < 1.0: print(f"✓ 完成转向,开始停留") self.patrol_state = "waiting" self.patrol_wait_timer = 0.0 return # 计算本帧应转动的角度 dt = globalClock.getDt() turn_amount = self.patrol_turn_speed * dt # 逐步转向目标角度 new_hpr = Vec3(current_hpr) if abs(h_diff) > turn_amount: new_hpr.x += turn_amount if h_diff > 0 else -turn_amount else: new_hpr.x = target_hpr.x if abs(p_diff) > turn_amount: new_hpr.y += turn_amount if p_diff > 0 else -turn_amount else: new_hpr.y = target_hpr.y if abs(r_diff) > turn_amount: new_hpr.z += turn_amount if r_diff > 0 else -turn_amount else: new_hpr.z = target_hpr.z self.world.cam.setHpr(new_hpr) def _handle_waiting_state(self, wait_time): """处理等待状态""" self.patrol_wait_timer += globalClock.getDt() if self.patrol_wait_timer >= wait_time: print(f"✓ 停留结束,准备转回原朝向") self.patrol_state = "turning_back" # 修改 core/patrol_system.py 中的 _handle_turning_back_state 方法 def _handle_turning_back_state(self): """处理转回原朝向状态""" # 直接完成转向状态,进入移动状态 print(f"✓ 停留结束,开始移动到下一个点") # 移动到下一个巡检点 next_index = (self.current_patrol_index + 1) % len(self.patrol_points) self.current_patrol_index = next_index self.patrol_state = "moving" return def _normalize_angle(self, angle): """规范化角度到-180到180度之间""" while angle > 180: angle -= 360 while angle < -180: angle += 360 return angle def _look_at_to_hpr(self, direction): """将方向向量转换为HPR角度""" # 简化的转换,实际应用中可能需要更精确的计算 h = math.degrees(math.atan2(-direction.x, -direction.y)) p = math.degrees(math.asin(direction.z)) return Vec3(h, p, 0) def get_patrol_status(self): """获取巡检状态信息""" return { "is_patrolling": self.is_patrolling, "current_point": self.current_patrol_index, "total_points": len(self.patrol_points), "state": self.patrol_state, "wait_timer": self.patrol_wait_timer } def list_patrol_points(self): """列出所有巡检点""" if not self.patrol_points: print("没有设置巡检点") return print(f"巡检点列表 (共{len(self.patrol_points)}个):") for i, (pos, hpr, wait_time) in enumerate(self.patrol_points): current_marker = " >>>" if i == self.current_patrol_index and self.is_patrolling else "" print(f" {i + 1}. 位置:({pos.x:.1f}, {pos.y:.1f}, {pos.z:.1f}) " f"朝向:({hpr.x:.1f}, {hpr.y:.1f}, {hpr.z:.1f}) " f"停留:{wait_time}秒{current_marker}") def remove_patrol_point(self, index): """移除指定索引的巡检点""" if 0 <= index < len(self.patrol_points): removed_point = self.patrol_points.pop(index) print( f"✓ 移除巡检点 {index + 1}: 位置({removed_point[0].x:.1f}, {removed_point[0].y:.1f}, {removed_point[0].z:.1f})") # 调整当前索引 if self.current_patrol_index >= len(self.patrol_points) and self.patrol_points: self.current_patrol_index = len(self.patrol_points) - 1 elif self.current_patrol_index >= len(self.patrol_points): self.current_patrol_index = 0 else: print(f"✗ 无效的巡检点索引: {index}") def insert_patrol_point(self, index, position, heading=None, wait_time=3.0): """在指定位置插入巡检点""" if index < 0 or index > len(self.patrol_points): print(f"✗ 无效的插入位置: {index}") return if heading is None: # 使用当前相机朝向 current_hpr = self.world.cam.getHpr() heading = (current_hpr.x, current_hpr.y, current_hpr.z) pos = Point3(position[0], position[1], position[2]) hpr = Vec3(heading[0], heading[1], heading[2]) self.patrol_points.insert(index, (pos, hpr, wait_time)) print(f"✓ 在位置 {index + 1} 插入巡检点: 位置{position}, 朝向{heading}, 停留{wait_time}秒") def update_patrol_point(self, index, position=None, heading=None, wait_time=None): """更新指定巡检点的信息""" if 0 <= index < len(self.patrol_points): pos, hpr, wt = self.patrol_points[index] if position is not None: pos = Point3(position[0], position[1], position[2]) if heading is not None: hpr = Vec3(heading[0], heading[1], heading[2]) if wait_time is not None: wt = wait_time self.patrol_points[index] = (pos, hpr, wt) print(f"✓ 更新巡检点 {index + 1}") else: print(f"✗ 无效的巡检点索引: {index}") def goto_patrol_point(self, index): """直接跳转到指定巡检点""" if not self.patrol_points: print("✗ 没有设置巡检点") return False if 0 <= index < len(self.patrol_points): pos, hpr, _ = self.patrol_points[index] self.world.cam.setPos(pos) self.world.cam.setHpr(hpr) self.current_patrol_index = index print(f"✓ 跳转到巡检点 {index + 1}") return True else: print(f"✗ 无效的巡检点索引: {index}") return False def cleanup(self): """清理巡检系统资源""" self.stop_patrol() self.clear_patrol_points() self.original_cam_pos = None self.original_cam_hpr = None print("✓ 巡检系统资源已清理") # 使用示例和便捷函数 def create_default_patrol_route(patrol_system): """创建默认的巡检路线(示例)""" # 清空现有巡检点 patrol_system.clear_patrol_points() # 添加一些示例巡检点 patrol_system.add_patrol_point((0, -20, 5), (0, -15, 0), 2.0) # 点1:前方低位置 patrol_system.add_patrol_point((0, 0, 10), (0, -30, 0), 3.0) # 点2:中央高位置 patrol_system.add_patrol_point((15, 10, 5), (-45, -10, 0), 2.5) # 点3:右侧位置 patrol_system.add_patrol_point((-15, 10, 5), (45, -10, 0), 2.5) # 点4:左侧位置 print("✓ 默认巡检路线已创建")