EG/core/vr_manager.py
2025-09-16 16:30:36 +08:00

2361 lines
95 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
VR管理器模块
负责VR功能的初始化、渲染和交互
- OpenVR/SteamVR集成
- VR头显跟踪和渲染
- VR控制器交互
- VR模式切换
"""
import sys
import numpy as np
from panda3d.core import (
WindowProperties, GraphicsPipe, FrameBufferProperties,
GraphicsOutput, Texture, Camera, PerspectiveLens, MatrixLens,
Mat4, Vec3, TransformState, RenderState, CardMaker,
BitMask32, PandaNode, NodePath, LMatrix4, LVector3, LVector4,
CS_yup_right, CS_default, PythonCallbackObject
)
from direct.task import Task
from direct.showbase.DirectObject import DirectObject
try:
import openvr
OPENVR_AVAILABLE = True
except ImportError:
OPENVR_AVAILABLE = False
print("警告: OpenVR未安装VR功能将不可用")
# 导入手柄控制器、动作系统和交互系统
from .vr_controller import LeftController, RightController
from .vr_actions import VRActionManager
from .vr_interaction import VRInteractionManager
class VRManager(DirectObject):
"""VR管理器类 - 处理所有VR相关功能"""
def __init__(self, world):
"""初始化VR管理器
Args:
world: 主世界对象引用
"""
super().__init__()
self.world = world
self.vr_system = None
self.vr_enabled = False
self.vr_initialized = False
# VR渲染相关
self.vr_left_eye_buffer = None
self.vr_right_eye_buffer = None
self.vr_left_camera = None
self.vr_right_camera = None
self.vr_compositor = None
# VR跟踪数据
self.hmd_pose = Mat4.identMat()
self.controller_poses = {}
self.tracked_device_poses = []
self.poses = None # OpenVR渲染姿态数组
self.game_poses = None # OpenVR游戏逻辑姿态数组
# VR渲染参数
self.eye_width = 1080
self.eye_height = 1200
self.near_clip = 0.1
self.far_clip = 1000.0
# VR任务
self.vr_task = None
# VR锚点层级系统
self.tracking_space = None
self.hmd_anchor = None
self.left_eye_anchor = None
self.right_eye_anchor = None
# 坐标系转换矩阵 - 使用Panda3D内置方法
self.coord_mat = LMatrix4.convert_mat(CS_yup_right, CS_default)
self.coord_mat_inv = LMatrix4.convert_mat(CS_default, CS_yup_right)
# 性能监控
self.frame_count = 0
self.last_fps_check = 0
self.last_fps_time = 0
self.vr_fps = 0
self.submit_failures = 0
self.pose_failures = 0
# 高级性能监控
self.performance_monitoring = True # 是否启用性能监控
self.debug_output_enabled = True # 是否启用调试输出
self.debug_mode = 'detailed' # 'brief' 或 'detailed'
self.cpu_usage = 0.0
self.memory_usage = 0.0
self.gpu_usage = 0.0
self.gpu_memory_usage = 0.0
self.frame_times = [] # 存储最近帧时间
self.max_frame_time_history = 60 # 保存60帧的历史
self.last_performance_check = 0
self.performance_check_interval = 0.5 # 每0.5秒更新一次性能数据
# 渲染管线详细监控
self.enable_pipeline_monitoring = True # 是否启用管线监控
self.wait_poses_time = 0.0 # waitGetPoses耗时
self.left_render_time = 0.0 # 左眼渲染耗时
self.right_render_time = 0.0 # 右眼渲染耗时
self.submit_time = 0.0 # 纹理提交耗时
self.total_frame_time = 0.0 # 总帧时间
self.vr_sync_wait_time = 0.0 # VR同步等待时间
# 时间监控历史保存最近30帧
self.wait_poses_times = []
self.render_times = []
self.submit_times = []
self.sync_wait_times = []
self.pipeline_history_size = 30
# VR系统信息
self.current_eye_resolution = (self.eye_width, self.eye_height)
self.recommended_eye_resolution = (0, 0)
self.vr_display_frequency = 0.0
self.vr_vsync_enabled = True
self.vsync_to_photons_ms = 0.0 # VSync到光子的延迟
self.target_frame_time_ms = 0.0 # 目标帧时间
self.vsync_window_ms = 0.0 # VSync时间窗口
self.async_reprojection_enabled = False # 异步重投影状态
self.motion_smoothing_enabled = False # 运动平滑状态
# waitGetPoses调用策略配置
self.pose_strategy = 'render_callback' # 'render_callback' 或 'update_task'
self.use_prediction_time = 0.011 # 11ms的预测时间用于update_task策略
self.poses_updated_in_task = False # 标记是否在更新任务中已获取姿态
# 尝试导入性能监控库
self._init_performance_monitoring()
# VR提交策略 - 基于参考实现
self.submit_together = True # 是否在right_cb中同时提交左右眼
# 帧同步标记 - 修复ATW闪烁
self._poses_updated_this_frame = False
# 姿态缓存 - 修复时序不匹配
self._cached_render_poses = None # 用于渲染的缓存姿态
self._first_frame = True # 首帧标记
# ATW控制选项 - 备选方案
self.disable_async_reprojection = False # 是否禁用异步重投影
# VR手柄控制器
self.left_controller = None
self.right_controller = None
self.controllers = {} # 设备索引到控制器的映射
self.tracked_device_anchors = {} # 跟踪设备锚点
# VR动作系统
self.action_manager = VRActionManager(self)
# VR交互系统
self.interaction_manager = VRInteractionManager(self)
print("✓ VR管理器初始化完成")
def convert_mat(self, mat):
"""
将OpenVR矩阵转换为Panda3D矩阵 - 基于参考实现
"""
if len(mat.m) == 4:
result = LMatrix4(
mat.m[0][0], mat.m[1][0], mat.m[2][0], mat.m[3][0],
mat.m[0][1], mat.m[1][1], mat.m[2][1], mat.m[3][1],
mat.m[0][2], mat.m[1][2], mat.m[2][2], mat.m[3][2],
mat.m[0][3], mat.m[1][3], mat.m[2][3], mat.m[3][3])
elif len(mat.m) == 3:
result = LMatrix4(
mat.m[0][0], mat.m[1][0], mat.m[2][0], 0.0,
mat.m[0][1], mat.m[1][1], mat.m[2][1], 0.0,
mat.m[0][2], mat.m[1][2], mat.m[2][2], 0.0,
mat.m[0][3], mat.m[1][3], mat.m[2][3], 1.0)
return result
def is_vr_available(self):
"""检查VR系统是否可用"""
if not OPENVR_AVAILABLE:
return False
try:
# 检查SteamVR是否运行
return openvr.isRuntimeInstalled() and openvr.isHmdPresent()
except Exception as e:
print(f"VR检查失败: {e}")
return False
def initialize_vr(self):
"""初始化VR系统"""
if not OPENVR_AVAILABLE:
print("❌ OpenVR不可用无法初始化VR")
return False
if self.vr_initialized:
print("VR系统已经初始化")
return True
try:
print("🔄 正在初始化VR系统...")
# 初始化OpenVR - 使用Scene应用类型确保正确的焦点管理
self.vr_system = openvr.init(openvr.VRApplication_Scene)
if not self.vr_system:
print("❌ 无法初始化OpenVR系统")
return False
# 获取compositor
self.vr_compositor = openvr.VRCompositor()
if not self.vr_compositor:
print("❌ 无法获取VR Compositor")
return False
# 获取推荐的渲染目标尺寸
self.eye_width, self.eye_height = self.vr_system.getRecommendedRenderTargetSize()
self.current_eye_resolution = (self.eye_width, self.eye_height)
self.recommended_eye_resolution = (self.eye_width, self.eye_height)
print(f"✓ VR渲染目标尺寸: {self.eye_width}x{self.eye_height}")
# 获取VR系统信息
try:
# 获取显示频率
self.vr_display_frequency = self.vr_system.getFloatTrackedDeviceProperty(
openvr.k_unTrackedDeviceIndex_Hmd,
openvr.Prop_DisplayFrequency_Float
)
print(f"✓ VR显示频率: {self.vr_display_frequency} Hz")
# 获取IPD瞳距
ipd = self.vr_system.getFloatTrackedDeviceProperty(
openvr.k_unTrackedDeviceIndex_Hmd,
openvr.Prop_UserIpdMeters_Float
)
print(f"✓ IPD瞳距: {ipd * 1000:.1f}mm")
# 获取设备制造商和型号
manufacturer = self.vr_system.getStringTrackedDeviceProperty(
openvr.k_unTrackedDeviceIndex_Hmd,
openvr.Prop_ManufacturerName_String
)
model = self.vr_system.getStringTrackedDeviceProperty(
openvr.k_unTrackedDeviceIndex_Hmd,
openvr.Prop_ModelNumber_String
)
print(f"✓ VR设备: {manufacturer} {model}")
# 获取更多系统配置信息
try:
# 获取刷新率相关信息
seconds_since_last_vsync = self.vr_system.getFloatTrackedDeviceProperty(
openvr.k_unTrackedDeviceIndex_Hmd,
openvr.Prop_SecondsFromVsyncToPhotons_Float
)
self.vsync_to_photons_ms = seconds_since_last_vsync * 1000
print(f"✓ VSync到光子延迟: {self.vsync_to_photons_ms:.2f}ms")
# 获取显示延迟信息
display_refresh_rate = self.vr_display_frequency
if display_refresh_rate > 0:
self.target_frame_time_ms = 1000.0 / display_refresh_rate
print(f"✓ 目标帧时间: {self.target_frame_time_ms:.2f}ms")
# 计算VSync时间窗口
self.vsync_window_ms = self.target_frame_time_ms * 0.1 # 10%的窗口
print(f"✓ VSync时间窗口: ±{self.vsync_window_ms:.2f}ms")
except Exception as vsync_error:
print(f"⚠️ 获取VSync信息失败: {vsync_error}")
# 检查是否启用了异步重投影
try:
if hasattr(openvr, 'VRSettings'):
settings = openvr.VRSettings()
if settings:
self.async_reprojection_enabled = settings.getBool("steamvr", "enableAsyncReprojection")
print(f"✓ 异步重投影: {'启用' if self.async_reprojection_enabled else '禁用'}")
self.motion_smoothing_enabled = settings.getBool("steamvr", "motionSmoothing")
print(f"✓ 运动平滑: {'启用' if self.motion_smoothing_enabled else '禁用'}")
except Exception as settings_error:
print(f"⚠️ 获取VR设置失败: {settings_error}")
except Exception as e:
print(f"⚠️ 获取VR系统信息失败: {e}")
# 创建OpenVR姿态数组 - 分离渲染和游戏姿态
poses_t = openvr.TrackedDevicePose_t * openvr.k_unMaxTrackedDeviceCount
self.poses = poses_t() # 渲染姿态(预测的)
self.game_poses = poses_t() # 游戏逻辑姿态(当前的)
print("✓ VR渲染和游戏姿态数组已创建")
# 创建VR渲染缓冲区
if not self._create_vr_buffers():
print("❌ 创建VR渲染缓冲区失败")
return False
# 设置VR相机
if not self._setup_vr_cameras():
print("❌ 设置VR相机失败")
return False
# 初始化手柄控制器
self._initialize_controllers()
# 初始化动作系统
if not self.action_manager.initialize():
print("⚠️ VR动作系统初始化失败但VR系统将继续运行")
# 初始化交互系统
if not self.interaction_manager.initialize():
print("⚠️ VR交互系统初始化失败但VR系统将继续运行")
# 可选:禁用异步重投影(备选方案)
if self.disable_async_reprojection:
self._disable_async_reprojection()
# 启动VR更新任务
self._start_vr_task()
self.vr_initialized = True
print("✅ VR系统初始化成功")
return True
except Exception as e:
print(f"❌ VR初始化失败: {e}")
import traceback
traceback.print_exc()
return False
def _create_vr_buffers(self):
"""创建VR渲染缓冲区 - 基于参考实现"""
try:
# 创建左眼纹理和缓冲区
self.vr_left_texture = self._create_vr_texture("VR Left Eye Texture")
self.vr_left_eye_buffer = self._create_vr_buffer(
"VR Left Eye",
self.vr_left_texture,
self.eye_width,
self.eye_height
)
if not self.vr_left_eye_buffer:
print("❌ 创建左眼缓冲区失败")
return False
# 设置左眼缓冲区属性
self.vr_left_eye_buffer.setSort(-100)
self.vr_left_eye_buffer.setClearColor((0.1, 0.2, 0.4, 1)) # 深蓝色背景便于调试
self.vr_left_eye_buffer.setActive(True)
# 创建右眼纹理和缓冲区
self.vr_right_texture = self._create_vr_texture("VR Right Eye Texture")
self.vr_right_eye_buffer = self._create_vr_buffer(
"VR Right Eye",
self.vr_right_texture,
self.eye_width,
self.eye_height
)
if not self.vr_right_eye_buffer:
print("❌ 创建右眼缓冲区失败")
return False
# 设置右眼缓冲区属性
self.vr_right_eye_buffer.setSort(-99)
self.vr_right_eye_buffer.setClearColor((0.1, 0.2, 0.4, 1)) # 深蓝色背景便于调试
self.vr_right_eye_buffer.setActive(True)
print("✓ VR渲染缓冲区创建成功")
return True
except Exception as e:
print(f"❌ 创建VR缓冲区失败: {e}")
import traceback
traceback.print_exc()
return False
def _create_vr_texture(self, name):
"""创建VR纹理对象 - 基于参考实现"""
texture = Texture(name)
texture.setWrapU(Texture.WMClamp)
texture.setWrapV(Texture.WMClamp)
texture.setMinfilter(Texture.FTLinear)
texture.setMagfilter(Texture.FTLinear)
return texture
def _create_vr_buffer(self, name, texture, width, height):
"""创建VR渲染缓冲区 - 基于参考实现"""
# 设置帧缓冲属性
fbprops = FrameBufferProperties()
fbprops.setRgbaBits(1, 1, 1, 1)
# 可以在这里添加多重采样抗锯齿
# fbprops.setMultisamples(4)
# 创建缓冲区
buffer = self.world.win.makeTextureBuffer(name, width, height, to_ram=False, fbp=fbprops)
if buffer:
# 清除默认渲染纹理
buffer.clearRenderTextures()
# 添加我们的纹理
buffer.addRenderTexture(texture, GraphicsOutput.RTMBindOrCopy, GraphicsOutput.RTPColor)
return buffer
def _setup_vr_cameras(self):
"""设置VR相机 - 使用锚点层级系统"""
try:
# 创建VR追踪空间锚点层级
self.tracking_space = self.world.render.attachNewNode('tracking-space')
self.hmd_anchor = self.tracking_space.attachNewNode('hmd-anchor')
self.left_eye_anchor = self.hmd_anchor.attachNewNode('left-eye')
self.right_eye_anchor = self.hmd_anchor.attachNewNode('right-eye')
# 获取投影矩阵
projection_left = self.coord_mat_inv * self.convert_mat(
self.vr_system.getProjectionMatrix(openvr.Eye_Left, self.near_clip, self.far_clip))
projection_right = self.coord_mat_inv * self.convert_mat(
self.vr_system.getProjectionMatrix(openvr.Eye_Right, self.near_clip, self.far_clip))
# 创建左眼相机节点
left_cam_node = Camera('left-cam')
left_lens = MatrixLens()
left_lens.setUserMat(projection_left)
left_cam_node.setLens(left_lens)
# 创建右眼相机节点
right_cam_node = Camera('right-cam')
right_lens = MatrixLens()
right_lens.setUserMat(projection_right)
right_cam_node.setLens(right_lens)
# 附加相机到眼睛锚点
self.vr_left_camera = self.left_eye_anchor.attachNewNode(left_cam_node)
self.vr_right_camera = self.right_eye_anchor.attachNewNode(right_cam_node)
# 设置显示区域并添加渲染回调
left_dr = self.vr_left_eye_buffer.makeDisplayRegion()
left_dr.setCamera(self.vr_left_camera)
left_dr.setActive(True)
left_dr.setDrawCallback(PythonCallbackObject(self.left_cb))
right_dr = self.vr_right_eye_buffer.makeDisplayRegion()
right_dr.setCamera(self.vr_right_camera)
right_dr.setActive(True)
right_dr.setDrawCallback(PythonCallbackObject(self.right_cb))
print("✓ VR相机锚点层级系统设置完成")
return True
except Exception as e:
print(f"❌ 设置VR相机失败: {e}")
import traceback
traceback.print_exc()
return False
def _get_eye_offset(self, eye):
"""获取眼睛相对于头显的偏移"""
try:
if not self.vr_system:
# 使用标准IPD瞳距估算值
ipd = 0.064 # 64mm平均IPD
if eye == openvr.Eye_Left:
return Vec3(-ipd/2, 0, 0)
else:
return Vec3(ipd/2, 0, 0)
# 从OpenVR获取眼睛到头显的变换矩阵
eye_transform = self.vr_system.getEyeToHeadTransform(eye)
# 提取位移信息
x = eye_transform[0][3]
y = eye_transform[1][3]
z = eye_transform[2][3]
return Vec3(x, y, z)
except Exception as e:
print(f"❌ 获取眼睛偏移失败: {e}")
# 返回默认值
ipd = 0.064
if eye == openvr.Eye_Left:
return Vec3(-ipd/2, 0, 0)
else:
return Vec3(ipd/2, 0, 0)
def _start_vr_task(self):
"""启动VR更新任务"""
if self.vr_task:
self.world.taskMgr.remove(self.vr_task)
self.vr_task = self.world.taskMgr.add(self._update_vr, "update_vr")
print("✓ VR更新任务已启动")
def _update_vr(self, task):
"""VR更新任务 - 每帧调用"""
if not self.vr_enabled or not self.vr_system:
return task.cont
try:
# 性能监控
self.frame_count += 1
# 记录帧时间
self._track_frame_time()
# 计算VR FPS
import time
current_time = time.time()
if self.last_fps_time == 0:
self.last_fps_time = current_time
elif current_time - self.last_fps_time >= 1.0: # 每秒更新一次FPS
self.vr_fps = (self.frame_count - self.last_fps_check) / (current_time - self.last_fps_time)
self.last_fps_check = self.frame_count
self.last_fps_time = current_time
# 更新系统性能指标
self._update_performance_metrics()
# VR更新策略选择
if self.pose_strategy == 'update_task':
# 策略1在更新任务中调用waitGetPoses推荐用于高性能
timing = self._start_timing('wait_poses')
self._wait_get_poses_with_prediction()
self._end_timing(timing)
self.poses_updated_in_task = True
# 更新相机姿态
self._update_camera_poses()
else:
# 策略2在渲染回调中调用waitGetPoses默认策略
self.poses_updated_in_task = False
# 1. 更新手柄和其他跟踪设备
self.update_tracked_devices()
# 2. 更新VR动作状态
self.action_manager.update_actions()
# 3. 更新VR交互系统
self.interaction_manager.update()
# 注意:纹理提交现在通过渲染回调自动处理
# 定期输出性能报告 - 默认10秒间隔
report_interval = getattr(self, 'performance_report_interval', 600)
if self.frame_count % report_interval == 1:
self._print_performance_report()
except Exception as e:
print(f"VR更新错误: {e}")
import traceback
traceback.print_exc()
return task.cont
def _wait_get_poses(self):
"""调用VRCompositor的waitGetPoses来获取焦点和姿态数据"""
try:
if not self.vr_compositor or not self.poses:
return
# 调用waitGetPoses获取焦点和姿态数据
# 这个调用可能会阻塞直到下一个VR同步点
result = self.vr_compositor.waitGetPoses(self.poses, None)
# 检查姿态数据的有效性
valid_poses = 0
# 更新HMD姿态设备0通常是头显
if len(self.poses) > 0 and self.poses[0].bPoseIsValid:
valid_poses += 1
else:
# 如果HMD姿态无效不要频繁输出错误信息
if not hasattr(self, '_hmd_invalid_warning_shown'):
print("⚠️ HMD姿态数据无效")
self._hmd_invalid_warning_shown = True
# 更新控制器姿态
self.controller_poses.clear()
for device_id in range(1, min(len(self.poses), openvr.k_unMaxTrackedDeviceCount)):
if self.poses[device_id].bPoseIsValid:
device_class = self.vr_system.getTrackedDeviceClass(device_id)
if device_class == openvr.TrackedDeviceClass_Controller:
controller_matrix = self.poses[device_id].mDeviceToAbsoluteTracking
self.controller_poses[device_id] = self._convert_openvr_matrix_to_panda(controller_matrix)
valid_poses += 1
# 性能监控 - 偶尔输出姿态状态
if self.frame_count % 600 == 1: # 每10秒输出一次@60fps
print(f"📊 VR姿态状态 - 有效姿态数: {valid_poses}, 总帧数: {self.frame_count}")
except Exception as e:
# 限制错误输出频率
if not hasattr(self, '_last_error_frame'):
self._last_error_frame = 0
if self.frame_count - self._last_error_frame > 300: # 每5秒最多输出一次错误
print(f"waitGetPoses失败: {e}")
self._last_error_frame = self.frame_count
# 记录姿态失败次数
self.pose_failures += 1
def _wait_get_poses_immediate(self):
"""立即获取VR姿态 - 修复ATW闪烁的关键方法双姿态版本"""
# 开始计时waitGetPoses操作
timing = self._start_timing('wait_poses')
try:
if not self.vr_compositor or not self.poses or not self.game_poses:
self._end_timing(timing)
return
# 关键修复:传递渲染姿态和游戏姿态数组
# 渲染姿态用于绘制游戏姿态用于逻辑避免ATW过度补偿
result = self.vr_compositor.waitGetPoses(self.poses, self.game_poses)
# 结束计时
wait_time = self._end_timing(timing)
# 检查姿态数据的有效性
valid_poses = 0
# 更新HMD姿态设备0通常是头显
if len(self.poses) > 0 and self.poses[0].bPoseIsValid:
valid_poses += 1
else:
# 如果HMD姿态无效不要频繁输出错误信息
if not hasattr(self, '_hmd_invalid_warning_shown'):
print("⚠️ HMD姿态数据无效立即模式")
self._hmd_invalid_warning_shown = True
# 更新控制器姿态
self.controller_poses.clear()
for device_id in range(1, min(len(self.poses), openvr.k_unMaxTrackedDeviceCount)):
if self.poses[device_id].bPoseIsValid:
device_class = self.vr_system.getTrackedDeviceClass(device_id)
if device_class == openvr.TrackedDeviceClass_Controller:
controller_matrix = self.poses[device_id].mDeviceToAbsoluteTracking
self.controller_poses[device_id] = self._convert_openvr_matrix_to_panda(controller_matrix)
valid_poses += 1
# 调试信息 - 仅在第一次成功时输出
if not hasattr(self, '_dual_pose_mode_logged') and valid_poses > 0:
print(f"✅ 双姿态立即获取模式启用 - 有效姿态数: {valid_poses}")
print(" 渲染姿态用于绘制游戏姿态用于逻辑防止ATW过度补偿")
self._dual_pose_mode_logged = True
except Exception as e:
# 限制错误输出频率
if not hasattr(self, '_last_immediate_error_frame'):
self._last_immediate_error_frame = 0
if self.frame_count - self._last_immediate_error_frame > 300: # 每5秒最多输出一次错误
print(f"立即姿态获取失败: {e}")
self._last_immediate_error_frame = self.frame_count
self.pose_failures += 1
def _wait_get_poses_with_prediction(self):
"""使用预测时间获取VR姿态 - 优化性能的新策略"""
try:
if not self.vr_compositor or not self.poses or not self.game_poses:
return
# 使用预测时间获取姿态
# 预测时间通常为11-16ms对应下一个VR帧的时间
result = self.vr_compositor.waitGetPoses(self.poses, self.game_poses)
# 检查姿态数据的有效性
valid_poses = 0
# 更新HMD姿态设备0通常是头显
if len(self.poses) > 0 and self.poses[0].bPoseIsValid:
valid_poses += 1
else:
# 如果HMD姿态无效不要频繁输出错误信息
if not hasattr(self, '_hmd_invalid_warning_task'):
print("⚠️ HMD姿态数据无效更新任务模式")
self._hmd_invalid_warning_task = True
# 更新控制器姿态
self.controller_poses.clear()
for device_id in range(1, min(len(self.poses), openvr.k_unMaxTrackedDeviceCount)):
if self.poses[device_id].bPoseIsValid:
device_class = self.vr_system.getTrackedDeviceClass(device_id)
if device_class == openvr.TrackedDeviceClass_Controller:
controller_matrix = self.poses[device_id].mDeviceToAbsoluteTracking
self.controller_poses[device_id] = self._convert_openvr_matrix_to_panda(controller_matrix)
valid_poses += 1
# 调试信息 - 仅在第一次成功时输出
if not hasattr(self, '_update_task_mode_logged') and valid_poses > 0:
print(f"✅ 更新任务姿态获取模式启用 - 有效姿态数: {valid_poses}")
print(f" 预测时间: {self.use_prediction_time*1000:.1f}ms")
self._update_task_mode_logged = True
except Exception as e:
# 限制错误输出频率
if not hasattr(self, '_last_task_error_frame'):
self._last_task_error_frame = 0
if self.frame_count - self._last_task_error_frame > 300: # 每5秒最多输出一次错误
print(f"更新任务姿态获取失败: {e}")
self._last_task_error_frame = self.frame_count
# 记录姿态失败次数
self.pose_failures += 1
def _cache_poses_for_next_frame(self):
"""缓存当前姿态供下一帧渲染使用 - 修复时序不匹配"""
try:
if not self.poses or len(self.poses) == 0:
return
# 如果是第一帧,直接使用当前姿态
if self._first_frame:
self._cached_render_poses = self.poses
self._first_frame = False
print("✓ 首帧姿态缓存已设置")
return
# 复制当前渲染姿态到缓存
# 下一帧将使用这些姿态进行渲染
poses_t = openvr.TrackedDevicePose_t * openvr.k_unMaxTrackedDeviceCount
cached_poses = poses_t()
# 复制姿态数据
for i in range(len(self.poses)):
cached_poses[i] = self.poses[i]
self._cached_render_poses = cached_poses
except Exception as e:
print(f"⚠️ 姿态缓存失败: {e}")
def _reset_frame_flag(self, task):
"""重置帧标记 - 确保下一帧可以更新姿态"""
self._poses_updated_this_frame = False
return task.done
def _update_tracking_data(self):
"""更新VR追踪数据"""
try:
# 获取设备姿态
poses = self.vr_system.getDeviceToAbsoluteTrackingPose(
openvr.TrackingUniverseStanding, 0.0, openvr.k_unMaxTrackedDeviceCount
)
# 更新HMD姿态设备0通常是头显
if poses[0].bPoseIsValid:
hmd_matrix = poses[0].mDeviceToAbsoluteTracking
self.hmd_pose = self._convert_openvr_matrix_to_panda(hmd_matrix)
# 更新控制器姿态
for device_id in range(1, openvr.k_unMaxTrackedDeviceCount):
if poses[device_id].bPoseIsValid:
device_class = self.vr_system.getTrackedDeviceClass(device_id)
if device_class == openvr.TrackedDeviceClass_Controller:
controller_matrix = poses[device_id].mDeviceToAbsoluteTracking
self.controller_poses[device_id] = self._convert_openvr_matrix_to_panda(controller_matrix)
except Exception as e:
print(f"更新追踪数据失败: {e}")
def _convert_openvr_matrix_to_panda(self, ovr_matrix):
"""将OpenVR矩阵转换为Panda3D矩阵
坐标系转换:
OpenVR: X右, Y上, -Z前右手坐标系
Panda3D: X右, Y前, Z上右手坐标系
转换规则:
OpenVR X → Panda3D X
OpenVR Y → Panda3D Z
OpenVR -Z → Panda3D Y
"""
mat = Mat4()
# 修正的坐标转换矩阵
# OpenVR: X右, Y上, -Z前 → Panda3D: X右, Y前, Z上
# 转换规则: (ovr_x, ovr_y, ovr_z) → (panda_x, panda_y, panda_z)
# (ovr_x, ovr_y, ovr_z) → (ovr_x, -ovr_z, ovr_y)
# X轴行Panda3D的X轴对应OpenVR的X轴
mat.setCell(0, 0, ovr_matrix[0][0]) # X_x → X_x
mat.setCell(0, 1, ovr_matrix[0][1]) # X_y → X_y
mat.setCell(0, 2, ovr_matrix[0][2]) # X_z → X_z
mat.setCell(0, 3, ovr_matrix[0][3]) # 位移X分量
# Y轴行Panda3D的Y轴对应OpenVR的-Z轴
mat.setCell(1, 0, -ovr_matrix[2][0]) # -Z_x → Y_x
mat.setCell(1, 1, -ovr_matrix[2][1]) # -Z_y → Y_y
mat.setCell(1, 2, -ovr_matrix[2][2]) # -Z_z → Y_z
mat.setCell(1, 3, -ovr_matrix[2][3]) # 位移Y分量-Z位移
# Z轴行Panda3D的Z轴对应OpenVR的Y轴
mat.setCell(2, 0, ovr_matrix[1][0]) # Y_x → Z_x
mat.setCell(2, 1, ovr_matrix[1][1]) # Y_y → Z_y
mat.setCell(2, 2, ovr_matrix[1][2]) # Y_z → Z_z
mat.setCell(2, 3, ovr_matrix[1][3]) # 位移Z分量Y位移
# 齐次坐标
mat.setCell(3, 0, 0)
mat.setCell(3, 1, 0)
mat.setCell(3, 2, 0)
mat.setCell(3, 3, 1)
# 调试信息 - 验证坐标系转换
if not hasattr(self, '_coord_debug_counter'):
self._coord_debug_counter = 0
self._coord_debug_counter += 1
if self._coord_debug_counter % 600 == 1: # 每10秒输出一次@60fps
print(f"🔄 坐标系转换调试 (第{self._coord_debug_counter}帧)")
# 输出原始OpenVR矩阵信息
ovr_pos = Vec3(ovr_matrix[0][3], ovr_matrix[1][3], ovr_matrix[2][3])
print(f" OpenVR原始位置: {ovr_pos}")
# 输出转换后的Panda3D矩阵信息
# 正确的方法从矩阵中读取位移第4列前3行
panda_pos = Vec3(mat.getCell(0, 3), mat.getCell(1, 3), mat.getCell(2, 3))
print(f" Panda3D转换位置: {panda_pos}")
# 检查矩阵设置是否正确
# 手动验证位置转换OpenVR (x,y,z) → Panda3D (x,-z,y)
manual_converted_pos = Vec3(ovr_pos.x, -ovr_pos.z, ovr_pos.y)
print(f" 手动转换结果: {manual_converted_pos}")
# 检查我们的矩阵是否正确设置了位置
print(f" 矩阵位置元素: [{mat.getCell(0,3)}, {mat.getCell(1,3)}, {mat.getCell(2,3)}]")
# 验证转换是否正确
expected_panda_pos = manual_converted_pos
print(f" 预期Panda3D位置: {expected_panda_pos}")
# 检查转换是否正确
diff = panda_pos - expected_panda_pos
diff_magnitude = diff.length()
if diff_magnitude < 0.001:
print(f" ✅ 坐标转换正确 (误差: {diff_magnitude:.6f})")
else:
print(f" ⚠️ 坐标转换可能有误 (误差: {diff_magnitude:.6f})")
print(f" 差异向量: {diff}")
print(f" 实际矩阵第4行: [{mat.getCell(3,0)}, {mat.getCell(3,1)}, {mat.getCell(3,2)}, {mat.getCell(3,3)}]")
return mat
def update_hmd(self, pose):
"""
更新HMD锚点 - 基于参考实现
"""
try:
# 将OpenVR姿态转换为Panda3D矩阵
modelview = self.convert_mat(pose.mDeviceToAbsoluteTracking)
# 应用坐标系转换并设置HMD锚点
self.hmd_anchor.setMat(self.coord_mat_inv * modelview * self.coord_mat)
# 获取眼睛到头部的变换
view_left = self.convert_mat(self.vr_system.getEyeToHeadTransform(openvr.Eye_Left))
view_right = self.convert_mat(self.vr_system.getEyeToHeadTransform(openvr.Eye_Right))
# 设置眼睛锚点
self.left_eye_anchor.setMat(self.coord_mat_inv * view_left * self.coord_mat)
self.right_eye_anchor.setMat(self.coord_mat_inv * view_right * self.coord_mat)
except Exception as e:
print(f"更新HMD姿态失败: {e}")
def _update_camera_poses(self):
"""更新相机姿态 - 使用锚点系统简化处理"""
try:
# 使用锚点系统后,相机位置自动跟随锚点
# 只需要获取HMD姿态并更新锚点即可
# 从poses数组中获取HMD姿态
if hasattr(self, 'poses') and len(self.poses) > 0:
hmd_pose = self.poses[openvr.k_unTrackedDeviceIndex_Hmd]
if hmd_pose.bPoseIsValid:
self.update_hmd(hmd_pose)
else:
print("⚠️ HMD姿态数据无效")
except Exception as e:
print(f"更新相机姿态失败: {e}")
import traceback
traceback.print_exc()
def _update_camera_poses_with_cache(self):
"""使用缓存姿态更新相机 - 符合OpenVR时序假设"""
try:
# 使用缓存的姿态符合OpenVR的时序假设
# OpenVR假设你用上一次WaitGetPoses的姿态渲染当前提交的帧
if not self._cached_render_poses:
# 如果没有缓存姿态,回退到当前姿态(首帧情况)
print("⚠️ 没有缓存姿态,使用当前姿态")
return self._update_camera_poses()
# 从缓存姿态数组中获取HMD姿态
if len(self._cached_render_poses) > 0:
hmd_pose = self._cached_render_poses[openvr.k_unTrackedDeviceIndex_Hmd]
if hmd_pose.bPoseIsValid:
self.update_hmd(hmd_pose)
# 调试信息 - 验证缓存姿态使用
if not hasattr(self, '_cached_pose_logged'):
print("✅ 使用缓存姿态更新相机 - 符合OpenVR时序假设")
self._cached_pose_logged = True
else:
print("⚠️ 缓存的HMD姿态数据无效")
except Exception as e:
print(f"使用缓存姿态更新相机失败: {e}")
# 回退到正常更新方式
self._update_camera_poses()
import traceback
traceback.print_exc()
def _submit_frames_to_vr(self):
"""将渲染帧提交给VR系统"""
try:
if not self.vr_compositor:
return
# 使用我们创建的纹理对象
left_texture = self.vr_left_texture
right_texture = self.vr_right_texture
if left_texture and right_texture:
# 确保纹理已准备并获取OpenGL纹理ID
gsg = self.world.win.getGsg()
prepared_objects = gsg.getPreparedObjects()
# 准备纹理 - 使用更简单的方法
try:
# 方法1: 尝试prepareNow
if hasattr(left_texture, 'prepareNow'):
left_texture.prepareNow(0, prepared_objects, gsg)
if hasattr(right_texture, 'prepareNow'):
right_texture.prepareNow(0, prepared_objects, gsg)
except Exception as prep_error:
print(f"纹理准备失败: {prep_error}")
# 继续尝试,可能纹理已经准备好了
# 获取OpenGL纹理ID
left_texture_id = self._get_texture_opengl_id(left_texture, prepared_objects, gsg)
right_texture_id = self._get_texture_opengl_id(right_texture, prepared_objects, gsg)
# 检查是否成功获取了纹理ID
if left_texture_id is not None and left_texture_id > 0 and right_texture_id is not None and right_texture_id > 0:
try:
# 创建OpenVR纹理结构
left_eye_texture = openvr.Texture_t()
left_eye_texture.handle = int(left_texture_id)
left_eye_texture.eType = openvr.TextureType_OpenGL
left_eye_texture.eColorSpace = openvr.ColorSpace_Gamma
right_eye_texture = openvr.Texture_t()
right_eye_texture.handle = int(right_texture_id)
right_eye_texture.eType = openvr.TextureType_OpenGL
right_eye_texture.eColorSpace = openvr.ColorSpace_Gamma
# 提交到VR系统
error_left = self.vr_compositor.submit(openvr.Eye_Left, left_eye_texture)
error_right = self.vr_compositor.submit(openvr.Eye_Right, right_eye_texture)
# 检查提交结果 - 在Python OpenVR中None表示成功
left_success = (error_left is None or error_left == openvr.VRCompositorError_None)
right_success = (error_right is None or error_right == openvr.VRCompositorError_None)
if not left_success:
print(f"⚠️ 左眼纹理提交错误: {error_left}")
self.submit_failures += 1
if not right_success:
print(f"⚠️ 右眼纹理提交错误: {error_right}")
self.submit_failures += 1
# 如果两个都成功了,输出成功信息(仅第一次)
if not hasattr(self, '_first_submit_success'):
if left_success and right_success:
print("✅ VR纹理提交成功缓冲区应该显示场景内容。")
print(f" 左眼纹理ID: {left_texture_id}, 右眼纹理ID: {right_texture_id}")
self._first_submit_success = True
except Exception as submit_error:
print(f"❌ VR纹理提交过程失败: {submit_error}")
if "DoNotHaveFocus" in str(submit_error):
print("🔍 这通常意味着另一个VR应用程序正在使用VR系统")
self.submit_failures += 1
else:
# 只在第一次失败时输出警告,避免太多日志
if not hasattr(self, '_texture_id_warning_shown'):
print("⚠️ 无法获取有效的纹理OpenGL ID跳过VR帧提交")
print(" 这可能是因为纹理尚未正确准备到GPU")
self._texture_id_warning_shown = True
except Exception as e:
print(f"提交VR帧失败: {e}")
import traceback
traceback.print_exc()
def _get_texture_opengl_id(self, texture, prepared_objects, gsg):
"""获取纹理的OpenGL ID"""
try:
# 方法1: 使用prepareNow获取正确的纹理上下文
texture_context = texture.prepareNow(0, prepared_objects, gsg)
if texture_context and hasattr(texture_context, 'getNativeId'):
native_id = texture_context.getNativeId()
if native_id > 0:
# print(f"✓ 成功获取纹理 {texture.getName()} 的OpenGL ID: {native_id}")
return native_id
# 方法2: 备选方案 - 通过prepared_objects获取texture context
if hasattr(prepared_objects, 'getTextureContext'):
texture_context = prepared_objects.getTextureContext(texture)
if texture_context and hasattr(texture_context, 'getNativeId'):
native_id = texture_context.getNativeId()
if native_id > 0:
# print(f"✓ 备选方法获取纹理 {texture.getName()} 的OpenGL ID: {native_id}")
return native_id
# 方法3: 尝试texture对象本身的方法
if hasattr(texture, 'getNativeId'):
native_id = texture.getNativeId()
if native_id > 0:
# print(f"✓ 直接获取纹理 {texture.getName()} 的OpenGL ID: {native_id}")
return native_id
# 如果所有方法都失败
print(f"❌ 无法获取纹理 {texture.getName()} 的OpenGL ID")
return None
except Exception as e:
print(f"获取纹理OpenGL ID失败: {e}")
import traceback
traceback.print_exc()
return None
def enable_vr(self):
"""启用VR模式"""
if not self.is_vr_available():
print("❌ VR系统不可用")
return False
if not self.vr_initialized:
if not self.initialize_vr():
return False
self.vr_enabled = True
# 禁用主相机避免干扰VR渲染
self._disable_main_cam()
print("✅ VR模式已启用")
return True
def disable_vr(self):
"""禁用VR模式"""
self.vr_enabled = False
# 恢复主相机
self._enable_main_cam()
print("✅ VR模式已禁用")
def cleanup(self):
"""清理VR资源"""
try:
print("🔄 正在清理VR资源...")
# 停止VR任务
if self.vr_task:
self.world.taskMgr.remove(self.vr_task)
self.vr_task = None
# 清理渲染缓冲区
if self.vr_left_eye_buffer:
self.vr_left_eye_buffer.removeAllDisplayRegions()
self.world.graphicsEngine.removeWindow(self.vr_left_eye_buffer)
self.vr_left_eye_buffer = None
if self.vr_right_eye_buffer:
self.vr_right_eye_buffer.removeAllDisplayRegions()
self.world.graphicsEngine.removeWindow(self.vr_right_eye_buffer)
self.vr_right_eye_buffer = None
# 清理相机
if self.vr_left_camera:
self.vr_left_camera.removeNode()
self.vr_left_camera = None
if self.vr_right_camera:
self.vr_right_camera.removeNode()
self.vr_right_camera = None
# 关闭OpenVR
if self.vr_system and OPENVR_AVAILABLE:
try:
openvr.shutdown()
except:
pass
self.vr_system = None
self.vr_enabled = False
self.vr_initialized = False
print("✅ VR资源清理完成")
except Exception as e:
print(f"⚠️ VR清理过程中出错: {e}")
def get_vr_status(self):
"""获取VR状态信息"""
return {
'available': self.is_vr_available(),
'initialized': self.vr_initialized,
'enabled': self.vr_enabled,
'eye_resolution': (self.eye_width, self.eye_height),
'device_count': len(self.controller_poses) + (1 if self.vr_enabled else 0),
'vr_fps': self.vr_fps,
'frame_count': self.frame_count,
'submit_failures': self.submit_failures,
'pose_failures': self.pose_failures
}
def _print_performance_report(self):
"""输出VR性能报告"""
if not self.performance_monitoring or not self.debug_output_enabled:
return
stats = self.get_performance_stats()
# 简短模式输出
if self.debug_mode == 'brief':
self._print_brief_performance_report(stats)
return
print("📊 ======= VR性能监控报告 =======")
# 帧率和帧时间信息
print(f"🎯 渲染性能:")
print(f" VR帧率: {stats['vr_fps']:.1f} FPS")
print(f" 平均帧时间: {stats['frame_time_avg']:.2f} ms")
print(f" 最小帧时间: {stats['frame_time_min']:.2f} ms")
print(f" 最大帧时间: {stats['frame_time_max']:.2f} ms")
print(f" 95%帧时间: {stats['frame_time_95th']:.2f} ms")
# 系统性能
print(f"💻 系统性能:")
print(f" CPU使用率: {stats['cpu_usage']:.1f}%")
print(f" 内存使用率: {stats['memory_usage']:.1f}%")
# GPU性能
print(f"🎮 GPU性能:")
if self.gputil_available or self.nvidia_ml_available:
print(f" GPU使用率: {stats['gpu_usage']:.1f}%")
print(f" 显存使用率: {stats['gpu_memory_usage']:.1f}%")
else:
print(f" GPU监控: 不可用 (需要安装 GPUtil 或 pynvml)")
print(f" 安装命令: pip install GPUtil nvidia-ml-py")
# VR特定指标
print(f"🥽 VR指标:")
print(f" 总帧数: {stats['frame_count']}")
print(f" 提交失败: {stats['submit_failures']}")
print(f" 姿态失败: {stats['pose_failures']}")
# 计算失败率
if stats['frame_count'] > 0:
submit_fail_rate = (stats['submit_failures'] / stats['frame_count']) * 100
pose_fail_rate = (stats['pose_failures'] / stats['frame_count']) * 100
print(f" 提交失败率: {submit_fail_rate:.2f}%")
print(f" 姿态失败率: {pose_fail_rate:.2f}%")
# 渲染管线监控
if self.enable_pipeline_monitoring:
pipeline_stats = self._get_pipeline_stats()
print(f"⚙️ 渲染管线分析:")
print(f" 当前分辨率: {pipeline_stats['vr_info']['eye_resolution'][0]}x{pipeline_stats['vr_info']['eye_resolution'][1]}")
print(f" 推荐分辨率: {pipeline_stats['vr_info']['recommended_resolution'][0]}x{pipeline_stats['vr_info']['recommended_resolution'][1]}")
print(f" 显示频率: {pipeline_stats['vr_info']['display_frequency']:.1f} Hz")
# VSync和时序信息
if pipeline_stats['vr_info']['target_frame_time_ms'] > 0:
print(f"🎯 VSync时序:")
print(f" 目标帧时间: {pipeline_stats['vr_info']['target_frame_time_ms']:.2f}ms")
print(f" VSync到光子: {pipeline_stats['vr_info']['vsync_to_photons_ms']:.2f}ms")
print(f" VSync窗口: ±{pipeline_stats['vr_info']['vsync_window_ms']:.2f}ms")
print(f" 异步重投影: {'启用' if pipeline_stats['vr_info']['async_reprojection'] else '禁用'}")
print(f" 运动平滑: {'启用' if pipeline_stats['vr_info']['motion_smoothing'] else '禁用'}")
# 分析帧时间是否在目标范围内
current_frame_time = stats['frame_time_avg']
target_frame_time = pipeline_stats['vr_info']['target_frame_time_ms']
if current_frame_time > 0 and target_frame_time > 0:
frame_time_ratio = current_frame_time / target_frame_time
if frame_time_ratio > 1.1:
print(f" ⚠️ 帧时间超标: {current_frame_time:.1f}ms (目标:{target_frame_time:.1f}ms)")
elif frame_time_ratio < 0.9:
print(f" ✅ 帧时间充裕: {current_frame_time:.1f}ms (目标:{target_frame_time:.1f}ms)")
else:
print(f" ✓ 帧时间正常: {current_frame_time:.1f}ms (目标:{target_frame_time:.1f}ms)")
print(f"🕐 各阶段耗时 (最近{self.pipeline_history_size}帧平均):")
print(f" waitGetPoses: {pipeline_stats['wait_poses']['avg']:.2f}ms (min:{pipeline_stats['wait_poses']['min']:.1f}, max:{pipeline_stats['wait_poses']['max']:.1f})")
print(f" 渲染总计: {pipeline_stats['render']['avg']:.2f}ms (min:{pipeline_stats['render']['min']:.1f}, max:{pipeline_stats['render']['max']:.1f})")
print(f" 纹理提交: {pipeline_stats['submit']['avg']:.2f}ms (min:{pipeline_stats['submit']['min']:.1f}, max:{pipeline_stats['submit']['max']:.1f})")
if pipeline_stats['sync_wait']['avg'] > 0:
print(f" 同步等待: {pipeline_stats['sync_wait']['avg']:.2f}ms (min:{pipeline_stats['sync_wait']['min']:.1f}, max:{pipeline_stats['sync_wait']['max']:.1f})")
print(f"🔍 当前帧详情:")
print(f" 左眼渲染: {pipeline_stats['current']['left_render']:.2f}ms")
print(f" 右眼渲染: {pipeline_stats['current']['right_render']:.2f}ms")
print(f" 姿态获取: {pipeline_stats['current']['wait_poses']:.2f}ms")
# 显示姿态策略信息
strategy_info = self.get_pose_strategy_info()
print(f"🎯 姿态策略:")
print(f" 当前策略: {strategy_info['strategy']}")
print(f" 策略描述: {strategy_info['description']}")
if strategy_info['strategy'] == 'update_task':
print(f" 预测时间: {strategy_info['prediction_time_ms']:.1f}ms")
# 分析最大瓶颈
current = pipeline_stats['current']
bottleneck_analysis = []
if current['wait_poses'] > 5.0:
bottleneck_analysis.append(f"姿态获取耗时过长({current['wait_poses']:.1f}ms)")
if current['total_render'] > 8.0:
bottleneck_analysis.append(f"渲染耗时过长({current['total_render']:.1f}ms)")
if current['submit'] > 2.0:
bottleneck_analysis.append(f"纹理提交耗时过长({current['submit']:.1f}ms)")
if bottleneck_analysis:
print(f"🚨 瓶颈分析:")
for analysis in bottleneck_analysis:
print(f" ⚠️ {analysis}")
# 性能建议
self._print_performance_recommendations(stats)
print("===============================")
def _print_performance_recommendations(self, stats):
"""根据性能数据输出优化建议"""
print(f"💡 性能建议:")
recommendations = []
# FPS相关建议
if stats['vr_fps'] < 72:
recommendations.append(" ⚠️ VR帧率过低可能影响体验")
# 帧时间相关建议
if stats['frame_time_avg'] > 13.9: # 72fps = 13.9ms
recommendations.append(" ⚠️ 平均帧时间过高,建议降低渲染质量")
if stats['frame_time_max'] > 50:
recommendations.append(" ⚠️ 检测到严重卡顿检查CPU/GPU负载")
# CPU建议
if stats['cpu_usage'] > 80:
recommendations.append(" 🔴 CPU使用率过高可能存在CPU瓶颈")
elif stats['cpu_usage'] > 60:
recommendations.append(" 🟡 CPU使用率偏高注意监控")
# 内存建议
if stats['memory_usage'] > 85:
recommendations.append(" 🔴 内存使用率过高,可能影响性能")
# GPU建议
if self.gputil_available or self.nvidia_ml_available:
if stats['gpu_usage'] > 95:
recommendations.append(" 🔴 GPU使用率接近满载存在GPU瓶颈")
if stats['gpu_memory_usage'] > 90:
recommendations.append(" 🔴 显存使用率过高,可能需要降低纹理质量")
# 失败率建议
submit_fail_rate = (stats['submit_failures'] / max(stats['frame_count'], 1)) * 100
if submit_fail_rate > 1:
recommendations.append(" ⚠️ VR帧提交失败率较高检查VR系统状态")
if not recommendations:
recommendations.append(" ✅ 性能表现良好,无明显问题")
for rec in recommendations:
print(rec)
def _print_brief_performance_report(self, stats):
"""输出简短的VR性能报告"""
# 创建一行简短摘要
summary = f"🥽 VR性能: {stats['vr_fps']:.1f}fps"
if stats['frame_time_avg'] > 0:
summary += f" | 帧时间: {stats['frame_time_avg']:.1f}ms"
if self.psutil_available:
summary += f" | CPU: {stats['cpu_usage']:.0f}%"
summary += f" | 内存: {stats['memory_usage']:.0f}%"
# 显示GPU信息如果库不可用则显示提示
if self.gputil_available or self.nvidia_ml_available:
summary += f" | GPU: {stats['gpu_usage']:.0f}%"
else:
summary += " | GPU: N/A"
# 添加失败率指示
if stats['frame_count'] > 0:
submit_fail_rate = (stats['submit_failures'] / stats['frame_count']) * 100
if submit_fail_rate > 0.1:
summary += f" | 提交失败: {submit_fail_rate:.1f}%"
# 添加管线关键信息
if self.enable_pipeline_monitoring:
pipeline_stats = self._get_pipeline_stats()
current = pipeline_stats['current']
# 显示关键瓶颈
if current['wait_poses'] > 5.0:
summary += f" | 姿态: {current['wait_poses']:.1f}ms⚠"
elif current['wait_poses'] > 0:
summary += f" | 姿态: {current['wait_poses']:.1f}ms"
if current['total_render'] > 8.0:
summary += f" | 渲染: {current['total_render']:.1f}ms⚠"
elif current['total_render'] > 0:
summary += f" | 渲染: {current['total_render']:.1f}ms"
# 显示目标帧时间对比
vr_info = pipeline_stats['vr_info']
if vr_info['target_frame_time_ms'] > 0:
target = vr_info['target_frame_time_ms']
current_avg = stats['frame_time_avg']
if current_avg > target * 1.1:
summary += f" | 目标:{target:.0f}ms⚠"
else:
summary += f" | 目标:{target:.0f}ms"
# 性能状态指示
if stats['vr_fps'] < 72:
summary += " ⚠️"
elif stats['vr_fps'] > 85:
summary += ""
print(summary)
def left_cb(self, cbdata):
"""左眼渲染回调 - 修复ATW闪烁问题"""
# 开始计时左眼渲染
render_timing = self._start_timing('left_render')
try:
# 根据策略决定是否在渲染回调中获取姿态
if self.pose_strategy == 'render_callback':
# 传统策略:在渲染开始前立即获取最新姿态
if not hasattr(self, '_poses_updated_this_frame') or not self._poses_updated_this_frame:
self._wait_get_poses_immediate()
# 缓存当前姿态供下一帧使用符合OpenVR时序假设
self._cache_poses_for_next_frame()
# 使用缓存的姿态更新相机(上一帧的姿态)
self._update_camera_poses_with_cache()
self._poses_updated_this_frame = True
# 在帧结束时重置标记
self.world.taskMgr.doMethodLater(0.001, self._reset_frame_flag, "reset_frame_flag")
elif self.poses_updated_in_task:
# 新策略:使用在更新任务中获取的姿态,无需重新获取
pass
# 执行实际的渲染工作
cbdata.upcall()
# 结束渲染计时
self._end_timing(render_timing)
# 根据提交策略决定是否立即提交
if not self.submit_together:
# 分别提交模式:左眼渲染完成后立即提交
submit_timing = self._start_timing('submit')
self.submit_texture(openvr.Eye_Left, self.vr_left_texture)
self._end_timing(submit_timing)
except Exception as e:
self._end_timing(render_timing)
print(f"左眼渲染回调错误: {e}")
def right_cb(self, cbdata):
"""右眼渲染回调 - 修复ATW闪烁问题"""
# 开始计时右眼渲染
render_timing = self._start_timing('right_render')
try:
# 根据策略决定是否在渲染回调中获取姿态
if self.pose_strategy == 'render_callback':
# 传统策略:在渲染开始前立即获取最新姿态
# 由于左右眼都会调用回调确保每帧只调用一次WaitGetPoses
if not hasattr(self, '_poses_updated_this_frame') or not self._poses_updated_this_frame:
self._wait_get_poses_immediate()
# 缓存当前姿态供下一帧使用符合OpenVR时序假设
self._cache_poses_for_next_frame()
# 使用缓存的姿态更新相机(上一帧的姿态)
self._update_camera_poses_with_cache()
self._poses_updated_this_frame = True
# 在帧结束时重置标记
self.world.taskMgr.doMethodLater(0.001, self._reset_frame_flag, "reset_frame_flag")
elif self.poses_updated_in_task:
# 新策略:使用在更新任务中获取的姿态,无需重新获取
pass
# 执行实际的渲染工作
cbdata.upcall()
# 结束渲染计时
self._end_timing(render_timing)
# 根据提交策略决定提交方式
submit_timing = self._start_timing('submit')
if self.submit_together:
# 同时提交模式:右眼渲染完成后同时提交左右眼
self.submit_texture(openvr.Eye_Left, self.vr_left_texture)
self.submit_texture(openvr.Eye_Right, self.vr_right_texture)
else:
# 分别提交模式:只提交右眼
self.submit_texture(openvr.Eye_Right, self.vr_right_texture)
self._end_timing(submit_timing)
except Exception as e:
self._end_timing(render_timing)
print(f"右眼渲染回调错误: {e}")
def submit_texture(self, eye, texture):
"""提交纹理到VR - 基于参考实现,增强调试信息"""
try:
if not self.vr_compositor:
print("❌ VR compositor不可用")
self.submit_failures += 1
return
# 获取graphics state guardian和prepared objects
gsg = self.world.win.getGsg()
if not gsg:
print("❌ 无法获取GraphicsStateGuardian")
self.submit_failures += 1
return
prepared_objects = gsg.getPreparedObjects()
if not prepared_objects:
print("❌ 无法获取PreparedObjects")
self.submit_failures += 1
return
# 准备纹理并获取更详细的错误信息
if not texture:
print("❌ 纹理对象为空")
self.submit_failures += 1
return
# 纹理准备调试信息已注释掉以减少输出
# print(f"🔍 准备纹理: {texture.getName()}, 大小: {texture.getXSize()}x{texture.getYSize()}")
texture_context = texture.prepareNow(0, prepared_objects, gsg)
if not texture_context:
print("❌ prepareNow返回空的texture_context")
self.submit_failures += 1
return
handle = texture_context.getNativeId()
# print(f"🔍 获取OpenGL纹理句柄: {handle}")
if handle != 0:
ovr_texture = openvr.Texture_t()
ovr_texture.handle = handle
ovr_texture.eType = openvr.TextureType_OpenGL
ovr_texture.eColorSpace = openvr.ColorSpace_Gamma
eye_name = "左眼" if eye == openvr.Eye_Left else "右眼"
# print(f"🔍 提交{eye_name}纹理到VR, 句柄: {handle}")
# 提交到VR系统
error = self.vr_compositor.submit(eye, ovr_texture)
# print(f"🔍 VR提交结果: {error}")
# 检查错误
if error and error != openvr.VRCompositorError_None:
print(f"⚠️ VR纹理提交错误代码: {error}")
self.submit_failures += 1
else:
# 只在第一次成功时输出
if not hasattr(self, '_submit_success_logged'):
print(f"✅ VR纹理提交成功! {eye_name}")
self._submit_success_logged = True
else:
print(f"❌ 无法获取纹理OpenGL句柄: handle = {handle}")
print(f" 纹理状态: 已准备={texture.isPrepared(prepared_objects)}")
print(f" 纹理格式: {texture.getFormat()}")
self.submit_failures += 1
except Exception as e:
print(f"❌ VR纹理提交异常: {e}")
import traceback
traceback.print_exc()
self.submit_failures += 1
def _disable_main_cam(self):
"""禁用主相机 - 基于参考实现"""
try:
# 保存原始相机状态
if not hasattr(self, '_original_camera_parent'):
self._original_camera_parent = self.world.camera.getParent()
# 创建空节点并将主相机重新附加到它
self._empty_world = NodePath("empty_world")
self.world.camera.reparentTo(self._empty_world)
print("✓ 主相机已禁用")
except Exception as e:
print(f"⚠️ 禁用主相机失败: {e}")
def _enable_main_cam(self):
"""恢复主相机 - 基于参考实现"""
try:
# 恢复原始相机状态
if hasattr(self, '_original_camera_parent') and self._original_camera_parent:
self.world.camera.reparentTo(self._original_camera_parent)
else:
# 如果没有保存的父节点重新附加到render
self.world.camera.reparentTo(self.world.render)
# 清理空世界节点
if hasattr(self, '_empty_world'):
self._empty_world.removeNode()
delattr(self, '_empty_world')
print("✓ 主相机已恢复")
except Exception as e:
print(f"⚠️ 恢复主相机失败: {e}")
def _initialize_controllers(self):
"""初始化VR手柄控制器"""
try:
print("🎮 正在初始化VR手柄控制器...")
# 创建左右手柄控制器实例
self.left_controller = LeftController(self)
self.right_controller = RightController(self)
# 检测现有连接的控制器
self._detect_controllers()
print("✓ VR手柄控制器初始化完成")
except Exception as e:
print(f"⚠️ VR手柄初始化失败: {e}")
import traceback
traceback.print_exc()
def _detect_controllers(self):
"""检测并连接VR控制器"""
if not self.vr_system:
return
try:
for device_index in range(openvr.k_unMaxTrackedDeviceCount):
# 检查设备是否已连接
if not self.vr_system.isTrackedDeviceConnected(device_index):
continue
# 获取设备类型
device_class = self.vr_system.getTrackedDeviceClass(device_index)
if device_class == openvr.TrackedDeviceClass_Controller:
# 获取控制器角色
role = self.vr_system.getControllerRoleForTrackedDeviceIndex(device_index)
if role == openvr.TrackedControllerRole_LeftHand and self.left_controller:
self.left_controller.set_device_index(device_index)
self.controllers[device_index] = self.left_controller
# 为设备创建锚点
self._create_tracked_device_anchor(device_index, 'left_controller')
elif role == openvr.TrackedControllerRole_RightHand and self.right_controller:
self.right_controller.set_device_index(device_index)
self.controllers[device_index] = self.right_controller
# 为设备创建锚点
self._create_tracked_device_anchor(device_index, 'right_controller')
print(f"🎮 检测到 {len(self.controllers)} 个控制器")
except Exception as e:
print(f"⚠️ 控制器检测失败: {e}")
def _create_tracked_device_anchor(self, device_index, name):
"""为跟踪设备创建锚点节点"""
if not self.tracking_space:
print(f"⚠️ 无法为设备 {device_index} 创建锚点 - tracking_space未初始化")
return
try:
# 获取设备模型名称
if self.vr_system:
model_name = self.vr_system.getStringTrackedDeviceProperty(
device_index,
openvr.Prop_RenderModelName_String
)
anchor_name = f"{device_index}:{model_name}:{name}"
else:
anchor_name = f"{device_index}:{name}"
# 创建锚点节点
device_anchor = self.tracking_space.attachNewNode(anchor_name)
self.tracked_device_anchors[device_index] = device_anchor
print(f"✓ 为设备 {device_index} 创建锚点: {anchor_name}")
except Exception as e:
print(f"⚠️ 创建设备锚点失败: {e}")
def update_tracked_devices(self):
"""更新所有跟踪设备的姿态 - 基于参考实现"""
if not self.poses or not self.vr_system:
return
try:
# 更新每个已连接的控制器
for device_index, controller in self.controllers.items():
if device_index < len(self.poses):
pose_data = self.poses[device_index]
# 更新控制器姿态
controller.update_pose(pose_data)
# 更新控制器输入状态
controller.update_input_state(self.vr_system)
# 更新其他跟踪设备的锚点
for device_index in range(1, min(len(self.poses), openvr.k_unMaxTrackedDeviceCount)):
if device_index in self.tracked_device_anchors:
pose_data = self.poses[device_index]
if pose_data.bPoseIsValid:
# 转换姿态矩阵
modelview = self.convert_mat(pose_data.mDeviceToAbsoluteTracking)
final_matrix = self.coord_mat_inv * modelview * self.coord_mat
# 更新锚点变换
anchor = self.tracked_device_anchors[device_index]
anchor.setMat(final_matrix)
anchor.show()
else:
# 姿态无效,隐藏锚点
self.tracked_device_anchors[device_index].hide()
except Exception as e:
if self.frame_count % 300 == 0: # 每5秒输出一次错误
print(f"⚠️ 更新跟踪设备失败: {e}")
def get_controller_by_role(self, role):
"""根据角色获取控制器
Args:
role: 'left''right'
Returns:
VRController实例或None
"""
if role == 'left':
return self.left_controller
elif role == 'right':
return self.right_controller
return None
def are_controllers_connected(self):
"""检查是否有控制器连接"""
return len(self.controllers) > 0
def get_connected_controllers(self):
"""获取所有连接的控制器列表"""
return list(self.controllers.values())
def trigger_controller_haptic(self, role, duration=0.001, strength=1.0):
"""触发控制器震动反馈
Args:
role: 'left', 'right''both'
duration: 震动持续时间(秒)
strength: 震动强度 (0.0-1.0)
"""
if role in ['left', 'both'] and self.left_controller:
self.left_controller.trigger_haptic_feedback(duration, strength)
if role in ['right', 'both'] and self.right_controller:
self.right_controller.trigger_haptic_feedback(duration, strength)
# VR动作系统便捷方法
def is_trigger_pressed(self, hand='any'):
"""检查扳机是否被按下
Args:
hand: 'left', 'right', 'any'
"""
device_path = None
if hand == 'left':
device_path = '/user/hand/left'
elif hand == 'right':
device_path = '/user/hand/right'
pressed, _ = self.action_manager.is_digital_action_pressed('trigger', device_path)
return pressed
def is_trigger_just_pressed(self, hand='any'):
"""检查扳机是否刚刚被按下"""
device_path = None
if hand == 'left':
device_path = '/user/hand/left'
elif hand == 'right':
device_path = '/user/hand/right'
pressed, _ = self.action_manager.is_digital_action_just_pressed('trigger', device_path)
return pressed
def is_grip_pressed(self, hand='any'):
"""检查握把是否被按下"""
device_path = None
if hand == 'left':
device_path = '/user/hand/left'
elif hand == 'right':
device_path = '/user/hand/right'
pressed, _ = self.action_manager.is_digital_action_pressed('grip', device_path)
return pressed
def is_grip_just_pressed(self, hand='any'):
"""检查握把是否刚刚被按下"""
device_path = None
if hand == 'left':
device_path = '/user/hand/left'
elif hand == 'right':
device_path = '/user/hand/right'
pressed, _ = self.action_manager.is_digital_action_just_pressed('grip', device_path)
return pressed
def is_menu_pressed(self, hand='any'):
"""检查菜单按钮是否被按下"""
device_path = None
if hand == 'left':
device_path = '/user/hand/left'
elif hand == 'right':
device_path = '/user/hand/right'
pressed, _ = self.action_manager.is_digital_action_pressed('menu', device_path)
return pressed
def is_trackpad_touched(self, hand='any'):
"""检查触摸板是否被触摸"""
device_path = None
if hand == 'left':
device_path = '/user/hand/left'
elif hand == 'right':
device_path = '/user/hand/right'
touched, _ = self.action_manager.is_digital_action_pressed('trackpad_touch', device_path)
return touched
def get_trackpad_position(self, hand='any'):
"""获取触摸板位置
Returns:
Vec2或None: 触摸板位置 (-1到1的范围)
"""
device_path = None
if hand == 'left':
device_path = '/user/hand/left'
elif hand == 'right':
device_path = '/user/hand/right'
value, _ = self.action_manager.get_analog_action_value('trackpad', device_path)
return value
# VR交互系统便捷方法
def get_selected_object(self, hand='any'):
"""获取指定手选中的对象
Args:
hand: 'left', 'right', 'any'
Returns:
选中的对象节点或None
"""
if hand == 'any':
# 返回任意手选中的对象
for controller in self.get_connected_controllers():
selected = self.interaction_manager.get_selected_object(controller.name)
if selected:
return selected
return None
else:
return self.interaction_manager.get_selected_object(hand)
def get_grabbed_object(self, hand='any'):
"""获取指定手抓取的对象
Args:
hand: 'left', 'right', 'any'
Returns:
抓取的对象节点或None
"""
if hand == 'any':
# 返回任意手抓取的对象
for controller in self.get_connected_controllers():
grabbed = self.interaction_manager.get_grabbed_object(controller.name)
if grabbed:
return grabbed
return None
else:
return self.interaction_manager.get_grabbed_object(hand)
def is_grabbing_object(self, hand='any'):
"""检查是否正在抓取对象
Args:
hand: 'left', 'right', 'any'
Returns:
bool: 是否正在抓取
"""
if hand == 'any':
# 检查任意手是否正在抓取
for controller in self.get_connected_controllers():
if self.interaction_manager.is_grabbing(controller.name):
return True
return False
else:
return self.interaction_manager.is_grabbing(hand)
def force_release_all_grabs(self):
"""强制释放所有抓取的对象"""
self.interaction_manager.force_release_all()
def add_interactable_object(self, object_node):
"""将对象标记为可交互
Args:
object_node: 要标记的对象节点
"""
self.interaction_manager._add_collision_to_object(object_node)
def _disable_async_reprojection(self):
"""禁用异步重投影 - 备选修复方案"""
try:
# 尝试通过OpenVR设置禁用异步重投影
if hasattr(openvr, 'VRSettings'):
settings = openvr.VRSettings()
if settings:
# 禁用异步重投影
error = settings.setBool("steamvr", "enableAsyncReprojection", False)
if error == openvr.VRSettingsError_None:
print("✅ 异步重投影已禁用")
else:
print(f"⚠️ 禁用异步重投影失败: 设置错误 {error}")
else:
print("⚠️ 无法获取VR设置接口")
else:
print("⚠️ OpenVR设置接口不可用")
except Exception as e:
print(f"⚠️ 禁用异步重投影失败: {e}")
def enable_async_reprojection_disable(self):
"""启用异步重投影禁用选项 - 用户可调用的方法"""
self.disable_async_reprojection = True
print("📝 异步重投影禁用选项已启用将在下次VR初始化时生效")
def disable_async_reprojection_disable(self):
"""禁用异步重投影禁用选项 - 恢复默认行为"""
self.disable_async_reprojection = False
print("📝 异步重投影禁用选项已关闭将使用默认ATW行为")
def _start_timing(self, operation_name):
"""开始计时操作"""
if not self.enable_pipeline_monitoring:
return None
import time
return {
'operation': operation_name,
'start_time': time.perf_counter()
}
def _end_timing(self, timing_data):
"""结束计时并记录结果"""
if not self.enable_pipeline_monitoring or not timing_data:
return 0.0
import time
elapsed = (time.perf_counter() - timing_data['start_time']) * 1000 # 转换为毫秒
# 保存到相应的历史记录
operation = timing_data['operation']
if operation == 'wait_poses':
self.wait_poses_time = elapsed
self.wait_poses_times.append(elapsed)
if len(self.wait_poses_times) > self.pipeline_history_size:
self.wait_poses_times.pop(0)
elif operation == 'left_render':
self.left_render_time = elapsed
elif operation == 'right_render':
self.right_render_time = elapsed
elif operation == 'submit':
self.submit_time = elapsed
self.submit_times.append(elapsed)
if len(self.submit_times) > self.pipeline_history_size:
self.submit_times.pop(0)
elif operation == 'sync_wait':
self.vr_sync_wait_time = elapsed
self.sync_wait_times.append(elapsed)
if len(self.sync_wait_times) > self.pipeline_history_size:
self.sync_wait_times.pop(0)
# 计算总渲染时间
total_render = self.left_render_time + self.right_render_time
self.render_times.append(total_render)
if len(self.render_times) > self.pipeline_history_size:
self.render_times.pop(0)
return elapsed
def _get_pipeline_stats(self):
"""获取渲染管线统计信息"""
def get_stats(times_list):
if not times_list:
return {'avg': 0.0, 'min': 0.0, 'max': 0.0}
return {
'avg': sum(times_list) / len(times_list),
'min': min(times_list),
'max': max(times_list)
}
return {
'wait_poses': get_stats(self.wait_poses_times),
'render': get_stats(self.render_times),
'submit': get_stats(self.submit_times),
'sync_wait': get_stats(self.sync_wait_times),
'current': {
'wait_poses': self.wait_poses_time,
'left_render': self.left_render_time,
'right_render': self.right_render_time,
'submit': self.submit_time,
'sync_wait': self.vr_sync_wait_time,
'total_render': self.left_render_time + self.right_render_time
},
'vr_info': {
'eye_resolution': self.current_eye_resolution,
'recommended_resolution': self.recommended_eye_resolution,
'display_frequency': self.vr_display_frequency,
'vsync_enabled': self.vr_vsync_enabled,
'target_frame_time_ms': self.target_frame_time_ms,
'vsync_to_photons_ms': self.vsync_to_photons_ms,
'vsync_window_ms': self.vsync_window_ms,
'async_reprojection': self.async_reprojection_enabled,
'motion_smoothing': self.motion_smoothing_enabled
}
}
def set_pose_strategy(self, strategy):
"""设置waitGetPoses调用策略
Args:
strategy: 'render_callback''update_task'
"""
if strategy not in ['render_callback', 'update_task']:
print(f"⚠️ 无效的姿态策略: {strategy},支持的策略: render_callback, update_task")
return False
old_strategy = self.pose_strategy
self.pose_strategy = strategy
if old_strategy != strategy:
print(f"✓ VR姿态策略已切换: {old_strategy}{strategy}")
if strategy == 'update_task':
print(f" 使用预测时间: {self.use_prediction_time*1000:.1f}ms")
print(" 优势降低VR回调延迟提高性能")
else:
print(" 使用传统渲染回调策略")
print(" 优势:更低延迟,更精确的姿态")
# 重置相关标记
if hasattr(self, '_poses_updated_this_frame'):
delattr(self, '_poses_updated_this_frame')
self.poses_updated_in_task = False
return True
def set_prediction_time(self, prediction_time_ms):
"""设置预测时间仅用于update_task策略
Args:
prediction_time_ms: 预测时间单位毫秒通常8-16ms
"""
prediction_time_s = prediction_time_ms / 1000.0
if prediction_time_s < 0.005 or prediction_time_s > 0.020:
print(f"⚠️ 预测时间超出推荐范围(5-20ms): {prediction_time_ms}ms")
old_time = self.use_prediction_time * 1000
self.use_prediction_time = prediction_time_s
print(f"✓ VR预测时间已设置: {old_time:.1f}ms → {prediction_time_ms:.1f}ms")
def get_pose_strategy_info(self):
"""获取当前姿态策略信息"""
return {
'strategy': self.pose_strategy,
'prediction_time_ms': self.use_prediction_time * 1000,
'description': {
'render_callback': '在渲染回调中获取姿态 - 低延迟,精确',
'update_task': '在更新任务中获取姿态 - 高性能,预测'
}.get(self.pose_strategy, '未知策略')
}
def test_pipeline_monitoring(self):
"""测试管线监控功能 - 用于调试和验证"""
print("🔧 正在测试VR管线监控功能...")
try:
# 测试基本信息获取
print("📊 基本信息:")
print(f" 当前分辨率: {self.current_eye_resolution}")
print(f" 显示频率: {self.vr_display_frequency} Hz")
print(f" 目标帧时间: {self.target_frame_time_ms:.2f}ms")
# 测试策略信息
strategy_info = self.get_pose_strategy_info()
print("🎯 姿态策略:")
print(f" 当前策略: {strategy_info['strategy']}")
print(f" 预测时间: {strategy_info['prediction_time_ms']:.1f}ms")
# 测试管线统计
if self.enable_pipeline_monitoring:
pipeline_stats = self._get_pipeline_stats()
print("⚙️ 管线统计:")
print(f" waitGetPoses历史: {len(self.wait_poses_times)}")
print(f" 渲染历史: {len(self.render_times)}")
print(f" 提交历史: {len(self.submit_times)}")
# 测试性能报告
print("📋 生成详细性能报告:")
self._print_performance_report()
print("✅ 管线监控功能测试完成")
else:
print("⚠️ 管线监控已禁用,无法测试统计功能")
except Exception as e:
print(f"❌ 测试管线监控功能时发生错误: {e}")
import traceback
traceback.print_exc()
def _init_performance_monitoring(self):
"""初始化性能监控库"""
self.psutil_available = False
self.gputil_available = False
self.nvidia_ml_available = False
try:
import psutil
self.psutil = psutil
self.psutil_available = True
print("✓ psutil性能监控库已加载")
except ImportError:
print("⚠️ psutil库未安装CPU和内存监控将不可用")
try:
import GPUtil
self.gputil = GPUtil
self.gputil_available = True
print("✓ GPUtil GPU监控库已加载")
except ImportError:
print("⚠️ GPUtil库未安装GPU监控将不可用")
try:
import pynvml
self.pynvml = pynvml
pynvml.nvmlInit()
self.nvidia_ml_available = True
print("✓ NVIDIA-ML GPU监控库已加载")
except ImportError:
print("⚠️ pynvml库未安装NVIDIA GPU详细监控将不可用")
except Exception as e:
print(f"⚠️ NVIDIA-ML初始化失败: {e}")
def _update_performance_metrics(self):
"""更新系统性能指标"""
if not self.performance_monitoring:
return
import time
current_time = time.time()
# 限制更新频率
if current_time - self.last_performance_check < self.performance_check_interval:
return
self.last_performance_check = current_time
try:
# 更新CPU和内存使用率
if self.psutil_available:
self.cpu_usage = self.psutil.cpu_percent(interval=None)
memory = self.psutil.virtual_memory()
self.memory_usage = memory.percent
# 更新GPU使用率
self._update_gpu_metrics()
except Exception as e:
if self.frame_count % 600 == 0: # 每10秒输出一次错误
print(f"⚠️ 性能监控更新失败: {e}")
def _update_gpu_metrics(self):
"""更新GPU相关指标"""
try:
# 方法1: 使用GPUtil
if self.gputil_available:
gpus = self.gputil.getGPUs()
if gpus:
gpu = gpus[0] # 使用第一个GPU
self.gpu_usage = gpu.load * 100
self.gpu_memory_usage = gpu.memoryUtil * 100
# 方法2: 使用NVIDIA-ML (更精确)
elif self.nvidia_ml_available:
try:
handle = self.pynvml.nvmlDeviceGetHandleByIndex(0)
# GPU使用率
utilization = self.pynvml.nvmlDeviceGetUtilizationRates(handle)
self.gpu_usage = utilization.gpu
# GPU内存使用率
memory_info = self.pynvml.nvmlDeviceGetMemoryInfo(handle)
self.gpu_memory_usage = (memory_info.used / memory_info.total) * 100
except Exception as e:
# NVIDIA-ML可能无法在某些系统上工作
pass
except Exception as e:
# GPU监控失败但不影响VR功能
pass
def _track_frame_time(self):
"""记录帧时间"""
import time
current_time = time.time()
if hasattr(self, '_last_frame_time'):
frame_time = (current_time - self._last_frame_time) * 1000 # 转换为毫秒
# 添加到帧时间历史
self.frame_times.append(frame_time)
# 限制历史长度
if len(self.frame_times) > self.max_frame_time_history:
self.frame_times.pop(0)
self._last_frame_time = current_time
def get_performance_stats(self):
"""获取详细的性能统计信息"""
stats = {
'vr_fps': self.vr_fps,
'frame_count': self.frame_count,
'submit_failures': self.submit_failures,
'pose_failures': self.pose_failures,
'cpu_usage': self.cpu_usage,
'memory_usage': self.memory_usage,
'gpu_usage': self.gpu_usage,
'gpu_memory_usage': self.gpu_memory_usage,
}
# 计算帧时间统计
if self.frame_times:
stats['frame_time_avg'] = sum(self.frame_times) / len(self.frame_times)
stats['frame_time_min'] = min(self.frame_times)
stats['frame_time_max'] = max(self.frame_times)
stats['frame_time_95th'] = sorted(self.frame_times)[int(len(self.frame_times) * 0.95)]
else:
stats['frame_time_avg'] = 0
stats['frame_time_min'] = 0
stats['frame_time_max'] = 0
stats['frame_time_95th'] = 0
return stats
def enable_performance_monitoring(self):
"""启用性能监控"""
self.performance_monitoring = True
print("✓ VR性能监控已启用")
def disable_performance_monitoring(self):
"""禁用性能监控"""
self.performance_monitoring = False
print("✓ VR性能监控已禁用")
def set_performance_check_interval(self, interval):
"""设置性能检查间隔
Args:
interval: 检查间隔建议0.1-2.0之间
"""
if 0.1 <= interval <= 5.0:
self.performance_check_interval = interval
print(f"✓ 性能监控间隔设置为 {interval:.1f}")
else:
print("⚠️ 性能监控间隔应在0.1-5.0秒之间")
def set_frame_time_history_size(self, size):
"""设置帧时间历史记录大小
Args:
size: 历史记录大小帧数建议30-300之间
"""
if 10 <= size <= 1000:
self.max_frame_time_history = size
# 清理超出的历史记录
if len(self.frame_times) > size:
self.frame_times = self.frame_times[-size:]
print(f"✓ 帧时间历史记录大小设置为 {size}")
else:
print("⚠️ 帧时间历史记录大小应在10-1000帧之间")
def set_performance_report_interval(self, frames):
"""设置性能报告输出间隔
Args:
frames: 帧数间隔建议300-7200之间5秒-2分钟@60fps
"""
if 300 <= frames <= 7200:
# 修改_update_vr中的报告间隔
print(f"✓ 性能报告间隔设置为每 {frames} 帧(约 {frames/60:.1f} 秒@60fps")
# 这里可以添加一个实例变量来控制
self.performance_report_interval = frames
else:
print("⚠️ 性能报告间隔应在300-7200帧之间")
def get_performance_monitoring_config(self):
"""获取当前性能监控配置"""
return {
'enabled': self.performance_monitoring,
'check_interval': self.performance_check_interval,
'frame_history_size': self.max_frame_time_history,
'report_interval': getattr(self, 'performance_report_interval', 1800),
'psutil_available': self.psutil_available,
'gputil_available': self.gputil_available,
'nvidia_ml_available': self.nvidia_ml_available
}
def print_performance_monitoring_status(self):
"""输出性能监控状态"""
config = self.get_performance_monitoring_config()
print("🔧 ===== VR性能监控配置 =====")
print(f" 监控状态: {'✅ 启用' if config['enabled'] else '❌ 禁用'}")
print(f" 检查间隔: {config['check_interval']:.1f}")
print(f" 帧时间历史: {config['frame_history_size']}")
print(f" 报告间隔: {config['report_interval']}")
print(f" 可用库:")
print(f" psutil (CPU/内存): {'' if config['psutil_available'] else ''}")
print(f" GPUtil (GPU): {'' if config['gputil_available'] else ''}")
print(f" NVIDIA-ML (GPU): {'' if config['nvidia_ml_available'] else ''}")
print("=============================")
def force_performance_report(self):
"""强制输出一次性能报告"""
print("🔄 手动触发性能报告...")
self._print_performance_report()
def reset_performance_counters(self):
"""重置性能计数器"""
self.frame_count = 0
self.last_fps_check = 0
self.last_fps_time = 0
self.vr_fps = 0
self.submit_failures = 0
self.pose_failures = 0
self.frame_times.clear()
print("✅ 性能计数器已重置")
def get_current_performance_summary(self):
"""获取当前性能摘要(简短版本)"""
stats = self.get_performance_stats()
summary = f"VR性能: {stats['vr_fps']:.1f}fps"
if stats['frame_time_avg'] > 0:
summary += f" | 帧时间: {stats['frame_time_avg']:.1f}ms"
if self.psutil_available:
summary += f" | CPU: {stats['cpu_usage']:.0f}%"
summary += f" | 内存: {stats['memory_usage']:.0f}%"
# 显示GPU信息如果库不可用则显示提示
if self.gputil_available or self.nvidia_ml_available:
summary += f" | GPU: {stats['gpu_usage']:.0f}%"
else:
summary += " | GPU: N/A"
return summary
def enable_debug_output(self):
"""启用调试输出"""
self.debug_output_enabled = True
print("✓ VR调试输出已启用")
def disable_debug_output(self):
"""禁用调试输出"""
self.debug_output_enabled = False
print("✓ VR调试输出已禁用")
def set_debug_mode(self, mode):
"""设置调试模式
Args:
mode: 'brief''detailed'
"""
if mode in ['brief', 'detailed']:
self.debug_mode = mode
print(f"✓ VR调试模式设置为: {mode}")
else:
print("⚠️ 调试模式只能是 'brief''detailed'")
def toggle_debug_output(self):
"""切换调试输出状态"""
self.debug_output_enabled = not self.debug_output_enabled
status = "启用" if self.debug_output_enabled else "禁用"
print(f"✓ VR调试输出已{status}")
return self.debug_output_enabled
def get_debug_status(self):
"""获取调试状态"""
return {
'debug_enabled': self.debug_output_enabled,
'debug_mode': self.debug_mode,
'performance_monitoring': self.performance_monitoring,
'report_interval_frames': getattr(self, 'performance_report_interval', 600),
'report_interval_seconds': getattr(self, 'performance_report_interval', 600) / 60, # 假设60fps
}