forked from Rowland/EG
496 lines
17 KiB
Python
496 lines
17 KiB
Python
from direct.showbase.ShowBaseGlobal import globalClock
|
||
from direct.task.TaskManagerGlobal import taskMgr
|
||
from panda3d.core import Point3, Vec3
|
||
import math
|
||
|
||
|
||
class PatrolSystem:
|
||
"""巡检系统类"""
|
||
|
||
def __init__(self, world):
|
||
"""初始化巡检系统
|
||
|
||
Args:
|
||
world: 核心世界对象引用
|
||
"""
|
||
self.world = world
|
||
|
||
# 巡检状态
|
||
self.is_patrolling = False
|
||
self.patrol_points = [] # 巡检点列表 [(pos, hpr, wait_time), ...]
|
||
self.current_patrol_index = 0
|
||
self.patrol_task = None
|
||
|
||
# 巡检参数
|
||
self.patrol_speed = 5.0 # 巡检移动速度(单位/秒)
|
||
self.patrol_turn_speed = 90.0 # 转向速度(度/秒)
|
||
self.patrol_wait_timer = 0.0
|
||
self.patrol_state = "moving" # "moving", "turning_to_target", "waiting", "turning_back"
|
||
|
||
# 相机状态保存
|
||
self.original_cam_pos = None
|
||
self.original_cam_hpr = None
|
||
|
||
print("✓ 巡检系统初始化完成")
|
||
|
||
def add_patrol_point(self, position, heading=None, wait_time=3.0):
|
||
if heading is None:
|
||
if self.patrol_points:
|
||
last_pos = self.patrol_points[-1][0]
|
||
direction_x = position[0] - last_pos.x
|
||
direction_y = position[1] - last_pos.y
|
||
direction_z = position[2] - last_pos.z
|
||
|
||
import math
|
||
h=math.degrees(math.atan2(-direction_x,-direction_y))
|
||
|
||
distance_xy = math.sqrt(direction_x**2+direction_y**2)
|
||
p = math.degrees(math.atan2(direction_z,distance_xy))
|
||
p = max(-89,min(89,p))
|
||
|
||
r=0
|
||
|
||
heading = (h,p,r)
|
||
|
||
else:
|
||
# 使用当前相机朝向
|
||
current_hpr = self.world.cam.getHpr()
|
||
heading = (current_hpr.x, current_hpr.y, current_hpr.z)
|
||
|
||
pos = Point3(position[0], position[1], position[2])
|
||
hpr = Vec3(heading[0], heading[1], heading[2])
|
||
|
||
self.patrol_points.append((pos, hpr, wait_time))
|
||
print(f"✓ 添加巡检点 {len(self.patrol_points)}: 位置{position}, 朝向{heading}, 停留{wait_time}秒")
|
||
|
||
# 在 PatrolSystem 类中添加以下方法
|
||
|
||
def add_auto_heading_patrol_point(self, position, wait_time=3.0):
|
||
"""添加自动计算朝向的巡检点(朝向路径前进方向)
|
||
|
||
Args:
|
||
position: 相机位置 (x, y, z)
|
||
wait_time: 在该点停留时间(秒)
|
||
"""
|
||
heading = None # 将自动计算朝向
|
||
|
||
# 复用原有的 add_patrol_point 方法
|
||
self.add_patrol_point(position, heading, wait_time)
|
||
|
||
def add_patrol_point_looking_at(self, position, look_at_position, wait_time=3.0):
|
||
"""添加朝向指定位置的巡检点
|
||
|
||
Args:
|
||
position: 相机位置 (x, y, z)
|
||
look_at_position: 相机朝向的目标位置 (x, y, z)
|
||
wait_time: 在该点停留时间(秒)
|
||
"""
|
||
import math
|
||
|
||
# 计算从当前位置到目标位置的方向向量
|
||
direction_x = look_at_position[0] - position[0]
|
||
direction_y = look_at_position[1] - position[1]
|
||
direction_z = look_at_position[2] - position[2]
|
||
|
||
# 计算HPR朝向
|
||
h = math.degrees(math.atan2(-direction_x, -direction_y))
|
||
|
||
distance_xy = math.sqrt(direction_x ** 2 + direction_y ** 2)
|
||
p = math.degrees(math.atan2(direction_z, distance_xy))
|
||
p = max(-89, min(89, p)) # 限制pitch角度在合理范围内
|
||
|
||
r = 0 # roll通常为0
|
||
|
||
heading = (h, p, r)
|
||
self.add_patrol_point(position, heading, wait_time)
|
||
|
||
def clear_patrol_points(self):
|
||
"""清空所有巡检点"""
|
||
self.patrol_points = []
|
||
print("✓ 巡检点已清空")
|
||
|
||
def set_patrol_speed(self, move_speed, turn_speed=None):
|
||
"""设置巡检速度
|
||
|
||
Args:
|
||
move_speed: 移动速度(单位/秒)
|
||
turn_speed: 转向速度(度/秒),如果为None则保持当前值
|
||
"""
|
||
self.patrol_speed = move_speed
|
||
if turn_speed is not None:
|
||
self.patrol_turn_speed = turn_speed
|
||
print(f"✓ 巡检速度已设置: 移动{move_speed}, 转向{turn_speed or self.patrol_turn_speed}")
|
||
|
||
def start_patrol(self):
|
||
"""开始巡检"""
|
||
if not self.patrol_points:
|
||
print("✗ 没有设置巡检点,无法开始巡检")
|
||
return False
|
||
|
||
if self.is_patrolling:
|
||
print("⚠ 巡检已在进行中")
|
||
return True
|
||
|
||
# 保存当前相机状态
|
||
self.original_cam_pos = Point3(self.world.cam.getPos())
|
||
self.original_cam_hpr = Vec3(self.world.cam.getHpr())
|
||
|
||
# 重置巡检状态
|
||
self.current_patrol_index = 0
|
||
self.patrol_state = "moving"
|
||
self.patrol_wait_timer = 0.0
|
||
self.is_patrolling = True
|
||
|
||
# 启动巡检任务
|
||
if self.patrol_task:
|
||
taskMgr.remove(self.patrol_task)
|
||
self.patrol_task = taskMgr.add(self._patrol_task, "patrol_task")
|
||
|
||
print(f"✓ 开始巡检,共{len(self.patrol_points)}个巡检点")
|
||
return True
|
||
|
||
def stop_patrol(self):
|
||
"""停止巡检"""
|
||
if not self.is_patrolling:
|
||
print("⚠ 巡检未在进行中")
|
||
return False
|
||
|
||
# 停止巡检任务
|
||
if self.patrol_task:
|
||
taskMgr.remove(self.patrol_task)
|
||
self.patrol_task = None
|
||
|
||
self.is_patrolling = False
|
||
self.patrol_state = "moving"
|
||
self.patrol_wait_timer = 0.0
|
||
|
||
print("✓ 巡检已停止")
|
||
return True
|
||
|
||
def pause_patrol(self):
|
||
"""暂停巡检"""
|
||
if not self.is_patrolling:
|
||
print("⚠ 巡检未在进行中")
|
||
return False
|
||
|
||
if self.patrol_task:
|
||
taskMgr.remove(self.patrol_task)
|
||
self.patrol_task = None
|
||
|
||
print("✓ 巡检已暂停")
|
||
return True
|
||
|
||
def resume_patrol(self):
|
||
"""恢复巡检"""
|
||
if self.is_patrolling:
|
||
print("⚠ 巡检已在进行中")
|
||
return False
|
||
|
||
if not self.patrol_points:
|
||
print("✗ 没有设置巡检点")
|
||
return False
|
||
|
||
self.is_patrolling = True
|
||
self.patrol_task = taskMgr.add(self._patrol_task, "patrol_task")
|
||
|
||
print("✓ 巡检已恢复")
|
||
return True
|
||
|
||
def reset_to_original_position(self):
|
||
"""重置相机到原始位置"""
|
||
if self.original_cam_pos and self.original_cam_hpr:
|
||
self.world.cam.setPos(self.original_cam_pos)
|
||
self.world.cam.setHpr(self.original_cam_hpr)
|
||
print("✓ 相机已重置到原始位置")
|
||
return True
|
||
else:
|
||
print("✗ 没有保存的原始位置")
|
||
return False
|
||
|
||
def _patrol_task(self, task):
|
||
"""巡检主任务"""
|
||
try:
|
||
if not self.is_patrolling or not self.patrol_points:
|
||
return task.done
|
||
|
||
# 获取当前巡检点
|
||
current_point = self.patrol_points[self.current_patrol_index]
|
||
target_pos, target_hpr, wait_time = current_point
|
||
|
||
# 根据当前状态执行不同操作
|
||
if self.patrol_state == "moving":
|
||
self._handle_moving_state(target_pos)
|
||
elif self.patrol_state == "turning_to_target":
|
||
self._handle_turning_to_target_state(target_hpr)
|
||
elif self.patrol_state == "waiting":
|
||
self._handle_waiting_state(wait_time)
|
||
elif self.patrol_state == "turning_back":
|
||
self._handle_turning_back_state()
|
||
|
||
return task.cont
|
||
|
||
except Exception as e:
|
||
print(f"巡检任务出错: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
return task.done
|
||
|
||
def _handle_moving_state(self, target_pos):
|
||
"""处理移动状态"""
|
||
current_pos = self.world.cam.getPos()
|
||
distance = (target_pos - current_pos).length()
|
||
|
||
if distance < 0.1: # 到达目标点
|
||
print(f"✓ 到达巡检点 {self.current_patrol_index + 1}")
|
||
self.patrol_state = "turning_to_target"
|
||
return
|
||
|
||
# 计算移动方向和距离
|
||
direction = target_pos - current_pos
|
||
direction.normalize()
|
||
|
||
# 计算目标朝向(看向目标点)
|
||
target_hpr = self._look_at_to_hpr(direction)
|
||
current_hpr = self.world.cam.getHpr()
|
||
|
||
# 平滑转向到目标朝向
|
||
h_diff = self._normalize_angle(target_hpr.x - current_hpr.x)
|
||
p_diff = self._normalize_angle(target_hpr.y - current_hpr.y)
|
||
r_diff = self._normalize_angle(target_hpr.z - current_hpr.z)
|
||
|
||
# 计算本帧应转动的角度
|
||
dt = globalClock.getDt()
|
||
turn_amount = self.patrol_turn_speed * dt
|
||
|
||
# 逐步转向目标角度
|
||
new_hpr = Vec3(current_hpr)
|
||
|
||
if abs(h_diff) > turn_amount:
|
||
new_hpr.x += turn_amount if h_diff > 0 else -turn_amount
|
||
else:
|
||
new_hpr.x = target_hpr.x
|
||
|
||
if abs(p_diff) > turn_amount:
|
||
new_hpr.y += turn_amount if p_diff > 0 else -turn_amount
|
||
else:
|
||
new_hpr.y = target_hpr.y
|
||
|
||
if abs(r_diff) > turn_amount:
|
||
new_hpr.z += turn_amount if r_diff > 0 else -turn_amount
|
||
else:
|
||
new_hpr.z = target_hpr.z
|
||
|
||
self.world.cam.setHpr(new_hpr)
|
||
|
||
# 计算本帧应移动的距离
|
||
move_distance = self.patrol_speed * dt
|
||
|
||
# 如果移动距离大于剩余距离,则直接移动到目标点
|
||
if move_distance >= distance:
|
||
self.world.cam.setPos(target_pos)
|
||
else:
|
||
# 否则按方向移动
|
||
new_pos = current_pos + direction * move_distance
|
||
self.world.cam.setPos(new_pos)
|
||
|
||
def _handle_turning_to_target_state(self, target_hpr):
|
||
"""处理转向目标状态"""
|
||
# 检查是否需要朝向下一个点
|
||
if target_hpr == "look_next":
|
||
# 计算朝向下一个点的方向
|
||
next_index = (self.current_patrol_index + 1) % len(self.patrol_points)
|
||
next_point_pos = self.patrol_points[next_index][0]
|
||
|
||
current_pos = self.world.cam.getPos()
|
||
direction = next_point_pos - current_pos
|
||
direction.normalize()
|
||
|
||
# 计算目标朝向
|
||
target_hpr = self._look_at_to_hpr(direction)
|
||
|
||
current_hpr = self.world.cam.getHpr()
|
||
|
||
# 计算角度差
|
||
h_diff = self._normalize_angle(target_hpr.x - current_hpr.x)
|
||
p_diff = self._normalize_angle(target_hpr.y - current_hpr.y)
|
||
r_diff = self._normalize_angle(target_hpr.z - current_hpr.z)
|
||
|
||
# 检查是否已完成转向
|
||
if abs(h_diff) < 1.0 and abs(p_diff) < 1.0 and abs(r_diff) < 1.0:
|
||
print(f"✓ 完成转向,开始停留")
|
||
self.patrol_state = "waiting"
|
||
self.patrol_wait_timer = 0.0
|
||
return
|
||
|
||
# 计算本帧应转动的角度
|
||
dt = globalClock.getDt()
|
||
turn_amount = self.patrol_turn_speed * dt
|
||
|
||
# 逐步转向目标角度
|
||
new_hpr = Vec3(current_hpr)
|
||
|
||
if abs(h_diff) > turn_amount:
|
||
new_hpr.x += turn_amount if h_diff > 0 else -turn_amount
|
||
else:
|
||
new_hpr.x = target_hpr.x
|
||
|
||
if abs(p_diff) > turn_amount:
|
||
new_hpr.y += turn_amount if p_diff > 0 else -turn_amount
|
||
else:
|
||
new_hpr.y = target_hpr.y
|
||
|
||
if abs(r_diff) > turn_amount:
|
||
new_hpr.z += turn_amount if r_diff > 0 else -turn_amount
|
||
else:
|
||
new_hpr.z = target_hpr.z
|
||
|
||
self.world.cam.setHpr(new_hpr)
|
||
|
||
def _handle_waiting_state(self, wait_time):
|
||
"""处理等待状态"""
|
||
self.patrol_wait_timer += globalClock.getDt()
|
||
|
||
if self.patrol_wait_timer >= wait_time:
|
||
print(f"✓ 停留结束,准备转回原朝向")
|
||
self.patrol_state = "turning_back"
|
||
|
||
# 修改 core/patrol_system.py 中的 _handle_turning_back_state 方法
|
||
|
||
def _handle_turning_back_state(self):
|
||
"""处理转回原朝向状态"""
|
||
# 直接完成转向状态,进入移动状态
|
||
print(f"✓ 停留结束,开始移动到下一个点")
|
||
# 移动到下一个巡检点
|
||
next_index = (self.current_patrol_index + 1) % len(self.patrol_points)
|
||
self.current_patrol_index = next_index
|
||
self.patrol_state = "moving"
|
||
return
|
||
|
||
def _normalize_angle(self, angle):
|
||
"""规范化角度到-180到180度之间"""
|
||
while angle > 180:
|
||
angle -= 360
|
||
while angle < -180:
|
||
angle += 360
|
||
return angle
|
||
|
||
def _look_at_to_hpr(self, direction):
|
||
"""将方向向量转换为HPR角度"""
|
||
# 简化的转换,实际应用中可能需要更精确的计算
|
||
h = math.degrees(math.atan2(-direction.x, -direction.y))
|
||
p = math.degrees(math.asin(direction.z))
|
||
return Vec3(h, p, 0)
|
||
|
||
def get_patrol_status(self):
|
||
"""获取巡检状态信息"""
|
||
return {
|
||
"is_patrolling": self.is_patrolling,
|
||
"current_point": self.current_patrol_index,
|
||
"total_points": len(self.patrol_points),
|
||
"state": self.patrol_state,
|
||
"wait_timer": self.patrol_wait_timer
|
||
}
|
||
|
||
def list_patrol_points(self):
|
||
"""列出所有巡检点"""
|
||
if not self.patrol_points:
|
||
print("没有设置巡检点")
|
||
return
|
||
|
||
print(f"巡检点列表 (共{len(self.patrol_points)}个):")
|
||
for i, (pos, hpr, wait_time) in enumerate(self.patrol_points):
|
||
current_marker = " >>>" if i == self.current_patrol_index and self.is_patrolling else ""
|
||
print(f" {i + 1}. 位置:({pos.x:.1f}, {pos.y:.1f}, {pos.z:.1f}) "
|
||
f"朝向:({hpr.x:.1f}, {hpr.y:.1f}, {hpr.z:.1f}) "
|
||
f"停留:{wait_time}秒{current_marker}")
|
||
|
||
def remove_patrol_point(self, index):
|
||
"""移除指定索引的巡检点"""
|
||
if 0 <= index < len(self.patrol_points):
|
||
removed_point = self.patrol_points.pop(index)
|
||
print(
|
||
f"✓ 移除巡检点 {index + 1}: 位置({removed_point[0].x:.1f}, {removed_point[0].y:.1f}, {removed_point[0].z:.1f})")
|
||
|
||
# 调整当前索引
|
||
if self.current_patrol_index >= len(self.patrol_points) and self.patrol_points:
|
||
self.current_patrol_index = len(self.patrol_points) - 1
|
||
elif self.current_patrol_index >= len(self.patrol_points):
|
||
self.current_patrol_index = 0
|
||
else:
|
||
print(f"✗ 无效的巡检点索引: {index}")
|
||
|
||
def insert_patrol_point(self, index, position, heading=None, wait_time=3.0):
|
||
"""在指定位置插入巡检点"""
|
||
if index < 0 or index > len(self.patrol_points):
|
||
print(f"✗ 无效的插入位置: {index}")
|
||
return
|
||
|
||
if heading is None:
|
||
# 使用当前相机朝向
|
||
current_hpr = self.world.cam.getHpr()
|
||
heading = (current_hpr.x, current_hpr.y, current_hpr.z)
|
||
|
||
pos = Point3(position[0], position[1], position[2])
|
||
hpr = Vec3(heading[0], heading[1], heading[2])
|
||
|
||
self.patrol_points.insert(index, (pos, hpr, wait_time))
|
||
print(f"✓ 在位置 {index + 1} 插入巡检点: 位置{position}, 朝向{heading}, 停留{wait_time}秒")
|
||
|
||
def update_patrol_point(self, index, position=None, heading=None, wait_time=None):
|
||
"""更新指定巡检点的信息"""
|
||
if 0 <= index < len(self.patrol_points):
|
||
pos, hpr, wt = self.patrol_points[index]
|
||
|
||
if position is not None:
|
||
pos = Point3(position[0], position[1], position[2])
|
||
if heading is not None:
|
||
hpr = Vec3(heading[0], heading[1], heading[2])
|
||
if wait_time is not None:
|
||
wt = wait_time
|
||
|
||
self.patrol_points[index] = (pos, hpr, wt)
|
||
print(f"✓ 更新巡检点 {index + 1}")
|
||
else:
|
||
print(f"✗ 无效的巡检点索引: {index}")
|
||
|
||
def goto_patrol_point(self, index):
|
||
"""直接跳转到指定巡检点"""
|
||
if not self.patrol_points:
|
||
print("✗ 没有设置巡检点")
|
||
return False
|
||
|
||
if 0 <= index < len(self.patrol_points):
|
||
pos, hpr, _ = self.patrol_points[index]
|
||
self.world.cam.setPos(pos)
|
||
self.world.cam.setHpr(hpr)
|
||
self.current_patrol_index = index
|
||
print(f"✓ 跳转到巡检点 {index + 1}")
|
||
return True
|
||
else:
|
||
print(f"✗ 无效的巡检点索引: {index}")
|
||
return False
|
||
|
||
def cleanup(self):
|
||
"""清理巡检系统资源"""
|
||
self.stop_patrol()
|
||
self.clear_patrol_points()
|
||
self.original_cam_pos = None
|
||
self.original_cam_hpr = None
|
||
print("✓ 巡检系统资源已清理")
|
||
|
||
|
||
# 使用示例和便捷函数
|
||
def create_default_patrol_route(patrol_system):
|
||
"""创建默认的巡检路线(示例)"""
|
||
# 清空现有巡检点
|
||
patrol_system.clear_patrol_points()
|
||
|
||
# 添加一些示例巡检点
|
||
patrol_system.add_patrol_point((0, -20, 5), (0, -15, 0), 2.0) # 点1:前方低位置
|
||
patrol_system.add_patrol_point((0, 0, 10), (0, -30, 0), 3.0) # 点2:中央高位置
|
||
patrol_system.add_patrol_point((15, 10, 5), (-45, -10, 0), 2.5) # 点3:右侧位置
|
||
patrol_system.add_patrol_point((-15, 10, 5), (45, -10, 0), 2.5) # 点4:左侧位置
|
||
|
||
print("✓ 默认巡检路线已创建")
|
||
|