EG/core/patrol_system.py
2025-09-17 15:42:42 +08:00

496 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from direct.showbase.ShowBaseGlobal import globalClock
from direct.task.TaskManagerGlobal import taskMgr
from panda3d.core import Point3, Vec3
import math
class PatrolSystem:
"""巡检系统类"""
def __init__(self, world):
"""初始化巡检系统
Args:
world: 核心世界对象引用
"""
self.world = world
# 巡检状态
self.is_patrolling = False
self.patrol_points = [] # 巡检点列表 [(pos, hpr, wait_time), ...]
self.current_patrol_index = 0
self.patrol_task = None
# 巡检参数
self.patrol_speed = 5.0 # 巡检移动速度(单位/秒)
self.patrol_turn_speed = 90.0 # 转向速度(度/秒)
self.patrol_wait_timer = 0.0
self.patrol_state = "moving" # "moving", "turning_to_target", "waiting", "turning_back"
# 相机状态保存
self.original_cam_pos = None
self.original_cam_hpr = None
print("✓ 巡检系统初始化完成")
def add_patrol_point(self, position, heading=None, wait_time=3.0):
if heading is None:
if self.patrol_points:
last_pos = self.patrol_points[-1][0]
direction_x = position[0] - last_pos.x
direction_y = position[1] - last_pos.y
direction_z = position[2] - last_pos.z
import math
h=math.degrees(math.atan2(-direction_x,-direction_y))
distance_xy = math.sqrt(direction_x**2+direction_y**2)
p = math.degrees(math.atan2(direction_z,distance_xy))
p = max(-89,min(89,p))
r=0
heading = (h,p,r)
else:
# 使用当前相机朝向
current_hpr = self.world.cam.getHpr()
heading = (current_hpr.x, current_hpr.y, current_hpr.z)
pos = Point3(position[0], position[1], position[2])
hpr = Vec3(heading[0], heading[1], heading[2])
self.patrol_points.append((pos, hpr, wait_time))
print(f"✓ 添加巡检点 {len(self.patrol_points)}: 位置{position}, 朝向{heading}, 停留{wait_time}")
# 在 PatrolSystem 类中添加以下方法
def add_auto_heading_patrol_point(self, position, wait_time=3.0):
"""添加自动计算朝向的巡检点(朝向路径前进方向)
Args:
position: 相机位置 (x, y, z)
wait_time: 在该点停留时间(秒)
"""
heading = None # 将自动计算朝向
# 复用原有的 add_patrol_point 方法
self.add_patrol_point(position, heading, wait_time)
def add_patrol_point_looking_at(self, position, look_at_position, wait_time=3.0):
"""添加朝向指定位置的巡检点
Args:
position: 相机位置 (x, y, z)
look_at_position: 相机朝向的目标位置 (x, y, z)
wait_time: 在该点停留时间(秒)
"""
import math
# 计算从当前位置到目标位置的方向向量
direction_x = look_at_position[0] - position[0]
direction_y = look_at_position[1] - position[1]
direction_z = look_at_position[2] - position[2]
# 计算HPR朝向
h = math.degrees(math.atan2(-direction_x, -direction_y))
distance_xy = math.sqrt(direction_x ** 2 + direction_y ** 2)
p = math.degrees(math.atan2(direction_z, distance_xy))
p = max(-89, min(89, p)) # 限制pitch角度在合理范围内
r = 0 # roll通常为0
heading = (h, p, r)
self.add_patrol_point(position, heading, wait_time)
def clear_patrol_points(self):
"""清空所有巡检点"""
self.patrol_points = []
print("✓ 巡检点已清空")
def set_patrol_speed(self, move_speed, turn_speed=None):
"""设置巡检速度
Args:
move_speed: 移动速度(单位/秒)
turn_speed: 转向速度(度/秒如果为None则保持当前值
"""
self.patrol_speed = move_speed
if turn_speed is not None:
self.patrol_turn_speed = turn_speed
print(f"✓ 巡检速度已设置: 移动{move_speed}, 转向{turn_speed or self.patrol_turn_speed}")
def start_patrol(self):
"""开始巡检"""
if not self.patrol_points:
print("✗ 没有设置巡检点,无法开始巡检")
return False
if self.is_patrolling:
print("⚠ 巡检已在进行中")
return True
# 保存当前相机状态
self.original_cam_pos = Point3(self.world.cam.getPos())
self.original_cam_hpr = Vec3(self.world.cam.getHpr())
# 重置巡检状态
self.current_patrol_index = 0
self.patrol_state = "moving"
self.patrol_wait_timer = 0.0
self.is_patrolling = True
# 启动巡检任务
if self.patrol_task:
taskMgr.remove(self.patrol_task)
self.patrol_task = taskMgr.add(self._patrol_task, "patrol_task")
print(f"✓ 开始巡检,共{len(self.patrol_points)}个巡检点")
return True
def stop_patrol(self):
"""停止巡检"""
if not self.is_patrolling:
print("⚠ 巡检未在进行中")
return False
# 停止巡检任务
if self.patrol_task:
taskMgr.remove(self.patrol_task)
self.patrol_task = None
self.is_patrolling = False
self.patrol_state = "moving"
self.patrol_wait_timer = 0.0
print("✓ 巡检已停止")
return True
def pause_patrol(self):
"""暂停巡检"""
if not self.is_patrolling:
print("⚠ 巡检未在进行中")
return False
if self.patrol_task:
taskMgr.remove(self.patrol_task)
self.patrol_task = None
print("✓ 巡检已暂停")
return True
def resume_patrol(self):
"""恢复巡检"""
if self.is_patrolling:
print("⚠ 巡检已在进行中")
return False
if not self.patrol_points:
print("✗ 没有设置巡检点")
return False
self.is_patrolling = True
self.patrol_task = taskMgr.add(self._patrol_task, "patrol_task")
print("✓ 巡检已恢复")
return True
def reset_to_original_position(self):
"""重置相机到原始位置"""
if self.original_cam_pos and self.original_cam_hpr:
self.world.cam.setPos(self.original_cam_pos)
self.world.cam.setHpr(self.original_cam_hpr)
print("✓ 相机已重置到原始位置")
return True
else:
print("✗ 没有保存的原始位置")
return False
def _patrol_task(self, task):
"""巡检主任务"""
try:
if not self.is_patrolling or not self.patrol_points:
return task.done
# 获取当前巡检点
current_point = self.patrol_points[self.current_patrol_index]
target_pos, target_hpr, wait_time = current_point
# 根据当前状态执行不同操作
if self.patrol_state == "moving":
self._handle_moving_state(target_pos)
elif self.patrol_state == "turning_to_target":
self._handle_turning_to_target_state(target_hpr)
elif self.patrol_state == "waiting":
self._handle_waiting_state(wait_time)
elif self.patrol_state == "turning_back":
self._handle_turning_back_state()
return task.cont
except Exception as e:
print(f"巡检任务出错: {e}")
import traceback
traceback.print_exc()
return task.done
def _handle_moving_state(self, target_pos):
"""处理移动状态"""
current_pos = self.world.cam.getPos()
distance = (target_pos - current_pos).length()
if distance < 0.1: # 到达目标点
print(f"✓ 到达巡检点 {self.current_patrol_index + 1}")
self.patrol_state = "turning_to_target"
return
# 计算移动方向和距离
direction = target_pos - current_pos
direction.normalize()
# 计算目标朝向(看向目标点)
target_hpr = self._look_at_to_hpr(direction)
current_hpr = self.world.cam.getHpr()
# 平滑转向到目标朝向
h_diff = self._normalize_angle(target_hpr.x - current_hpr.x)
p_diff = self._normalize_angle(target_hpr.y - current_hpr.y)
r_diff = self._normalize_angle(target_hpr.z - current_hpr.z)
# 计算本帧应转动的角度
dt = globalClock.getDt()
turn_amount = self.patrol_turn_speed * dt
# 逐步转向目标角度
new_hpr = Vec3(current_hpr)
if abs(h_diff) > turn_amount:
new_hpr.x += turn_amount if h_diff > 0 else -turn_amount
else:
new_hpr.x = target_hpr.x
if abs(p_diff) > turn_amount:
new_hpr.y += turn_amount if p_diff > 0 else -turn_amount
else:
new_hpr.y = target_hpr.y
if abs(r_diff) > turn_amount:
new_hpr.z += turn_amount if r_diff > 0 else -turn_amount
else:
new_hpr.z = target_hpr.z
self.world.cam.setHpr(new_hpr)
# 计算本帧应移动的距离
move_distance = self.patrol_speed * dt
# 如果移动距离大于剩余距离,则直接移动到目标点
if move_distance >= distance:
self.world.cam.setPos(target_pos)
else:
# 否则按方向移动
new_pos = current_pos + direction * move_distance
self.world.cam.setPos(new_pos)
def _handle_turning_to_target_state(self, target_hpr):
"""处理转向目标状态"""
# 检查是否需要朝向下一个点
if target_hpr == "look_next":
# 计算朝向下一个点的方向
next_index = (self.current_patrol_index + 1) % len(self.patrol_points)
next_point_pos = self.patrol_points[next_index][0]
current_pos = self.world.cam.getPos()
direction = next_point_pos - current_pos
direction.normalize()
# 计算目标朝向
target_hpr = self._look_at_to_hpr(direction)
current_hpr = self.world.cam.getHpr()
# 计算角度差
h_diff = self._normalize_angle(target_hpr.x - current_hpr.x)
p_diff = self._normalize_angle(target_hpr.y - current_hpr.y)
r_diff = self._normalize_angle(target_hpr.z - current_hpr.z)
# 检查是否已完成转向
if abs(h_diff) < 1.0 and abs(p_diff) < 1.0 and abs(r_diff) < 1.0:
print(f"✓ 完成转向,开始停留")
self.patrol_state = "waiting"
self.patrol_wait_timer = 0.0
return
# 计算本帧应转动的角度
dt = globalClock.getDt()
turn_amount = self.patrol_turn_speed * dt
# 逐步转向目标角度
new_hpr = Vec3(current_hpr)
if abs(h_diff) > turn_amount:
new_hpr.x += turn_amount if h_diff > 0 else -turn_amount
else:
new_hpr.x = target_hpr.x
if abs(p_diff) > turn_amount:
new_hpr.y += turn_amount if p_diff > 0 else -turn_amount
else:
new_hpr.y = target_hpr.y
if abs(r_diff) > turn_amount:
new_hpr.z += turn_amount if r_diff > 0 else -turn_amount
else:
new_hpr.z = target_hpr.z
self.world.cam.setHpr(new_hpr)
def _handle_waiting_state(self, wait_time):
"""处理等待状态"""
self.patrol_wait_timer += globalClock.getDt()
if self.patrol_wait_timer >= wait_time:
print(f"✓ 停留结束,准备转回原朝向")
self.patrol_state = "turning_back"
# 修改 core/patrol_system.py 中的 _handle_turning_back_state 方法
def _handle_turning_back_state(self):
"""处理转回原朝向状态"""
# 直接完成转向状态,进入移动状态
print(f"✓ 停留结束,开始移动到下一个点")
# 移动到下一个巡检点
next_index = (self.current_patrol_index + 1) % len(self.patrol_points)
self.current_patrol_index = next_index
self.patrol_state = "moving"
return
def _normalize_angle(self, angle):
"""规范化角度到-180到180度之间"""
while angle > 180:
angle -= 360
while angle < -180:
angle += 360
return angle
def _look_at_to_hpr(self, direction):
"""将方向向量转换为HPR角度"""
# 简化的转换,实际应用中可能需要更精确的计算
h = math.degrees(math.atan2(-direction.x, -direction.y))
p = math.degrees(math.asin(direction.z))
return Vec3(h, p, 0)
def get_patrol_status(self):
"""获取巡检状态信息"""
return {
"is_patrolling": self.is_patrolling,
"current_point": self.current_patrol_index,
"total_points": len(self.patrol_points),
"state": self.patrol_state,
"wait_timer": self.patrol_wait_timer
}
def list_patrol_points(self):
"""列出所有巡检点"""
if not self.patrol_points:
print("没有设置巡检点")
return
print(f"巡检点列表 (共{len(self.patrol_points)}个):")
for i, (pos, hpr, wait_time) in enumerate(self.patrol_points):
current_marker = " >>>" if i == self.current_patrol_index and self.is_patrolling else ""
print(f" {i + 1}. 位置:({pos.x:.1f}, {pos.y:.1f}, {pos.z:.1f}) "
f"朝向:({hpr.x:.1f}, {hpr.y:.1f}, {hpr.z:.1f}) "
f"停留:{wait_time}{current_marker}")
def remove_patrol_point(self, index):
"""移除指定索引的巡检点"""
if 0 <= index < len(self.patrol_points):
removed_point = self.patrol_points.pop(index)
print(
f"✓ 移除巡检点 {index + 1}: 位置({removed_point[0].x:.1f}, {removed_point[0].y:.1f}, {removed_point[0].z:.1f})")
# 调整当前索引
if self.current_patrol_index >= len(self.patrol_points) and self.patrol_points:
self.current_patrol_index = len(self.patrol_points) - 1
elif self.current_patrol_index >= len(self.patrol_points):
self.current_patrol_index = 0
else:
print(f"✗ 无效的巡检点索引: {index}")
def insert_patrol_point(self, index, position, heading=None, wait_time=3.0):
"""在指定位置插入巡检点"""
if index < 0 or index > len(self.patrol_points):
print(f"✗ 无效的插入位置: {index}")
return
if heading is None:
# 使用当前相机朝向
current_hpr = self.world.cam.getHpr()
heading = (current_hpr.x, current_hpr.y, current_hpr.z)
pos = Point3(position[0], position[1], position[2])
hpr = Vec3(heading[0], heading[1], heading[2])
self.patrol_points.insert(index, (pos, hpr, wait_time))
print(f"✓ 在位置 {index + 1} 插入巡检点: 位置{position}, 朝向{heading}, 停留{wait_time}")
def update_patrol_point(self, index, position=None, heading=None, wait_time=None):
"""更新指定巡检点的信息"""
if 0 <= index < len(self.patrol_points):
pos, hpr, wt = self.patrol_points[index]
if position is not None:
pos = Point3(position[0], position[1], position[2])
if heading is not None:
hpr = Vec3(heading[0], heading[1], heading[2])
if wait_time is not None:
wt = wait_time
self.patrol_points[index] = (pos, hpr, wt)
print(f"✓ 更新巡检点 {index + 1}")
else:
print(f"✗ 无效的巡检点索引: {index}")
def goto_patrol_point(self, index):
"""直接跳转到指定巡检点"""
if not self.patrol_points:
print("✗ 没有设置巡检点")
return False
if 0 <= index < len(self.patrol_points):
pos, hpr, _ = self.patrol_points[index]
self.world.cam.setPos(pos)
self.world.cam.setHpr(hpr)
self.current_patrol_index = index
print(f"✓ 跳转到巡检点 {index + 1}")
return True
else:
print(f"✗ 无效的巡检点索引: {index}")
return False
def cleanup(self):
"""清理巡检系统资源"""
self.stop_patrol()
self.clear_patrol_points()
self.original_cam_pos = None
self.original_cam_hpr = None
print("✓ 巡检系统资源已清理")
# 使用示例和便捷函数
def create_default_patrol_route(patrol_system):
"""创建默认的巡检路线(示例)"""
# 清空现有巡检点
patrol_system.clear_patrol_points()
# 添加一些示例巡检点
patrol_system.add_patrol_point((0, -20, 5), (0, -15, 0), 2.0) # 点1前方低位置
patrol_system.add_patrol_point((0, 0, 10), (0, -30, 0), 3.0) # 点2中央高位置
patrol_system.add_patrol_point((15, 10, 5), (-45, -10, 0), 2.5) # 点3右侧位置
patrol_system.add_patrol_point((-15, 10, 5), (45, -10, 0), 2.5) # 点4左侧位置
print("✓ 默认巡检路线已创建")