432 lines
16 KiB
Python
432 lines
16 KiB
Python
"""
|
||
VR手柄管理模块
|
||
|
||
基于panda3d-openvr参考实现,提供完整的VR手柄追踪和交互功能:
|
||
- 手柄位置和姿态追踪
|
||
- 按钮和触摸板输入处理
|
||
- 手柄可视化和射线显示
|
||
- 震动反馈支持
|
||
"""
|
||
|
||
from panda3d.core import (
|
||
NodePath, PandaNode, Vec3, Mat4, LVector3, LMatrix4,
|
||
GeomNode, LineSegs, CardMaker, Texture, RenderState,
|
||
TransparencyAttrib, ColorAttrib, Vec4
|
||
)
|
||
from direct.actor.Actor import Actor
|
||
from direct.showbase.DirectObject import DirectObject
|
||
|
||
try:
|
||
import openvr
|
||
OPENVR_AVAILABLE = True
|
||
except ImportError:
|
||
OPENVR_AVAILABLE = False
|
||
|
||
# 导入可视化器
|
||
from .vr_visualization import VRControllerVisualizer
|
||
|
||
|
||
class VRController(DirectObject):
|
||
"""VR手柄基类 - 管理单个手柄的追踪和交互"""
|
||
|
||
def __init__(self, vr_manager, name, hand_path, device_index=None):
|
||
"""初始化VR手柄
|
||
|
||
Args:
|
||
vr_manager: VR管理器实例
|
||
name: 手柄名称 ('left' 或 'right')
|
||
hand_path: OpenVR手部路径 ('/user/hand/left' 或 '/user/hand/right')
|
||
device_index: OpenVR设备索引(可选)
|
||
"""
|
||
super().__init__()
|
||
|
||
self.vr_manager = vr_manager
|
||
self.name = name
|
||
self.hand_path = hand_path
|
||
self.device_index = device_index
|
||
|
||
# 手柄状态
|
||
self.is_connected = False
|
||
self.is_pose_valid = False
|
||
self.pose = Mat4.identMat()
|
||
self.velocity = Vec3(0, 0, 0)
|
||
self.angular_velocity = Vec3(0, 0, 0)
|
||
|
||
# 按钮状态
|
||
self.button_states = {}
|
||
self.previous_button_states = {}
|
||
self.trigger_value = 0.0
|
||
self.grip_value = 0.0
|
||
self.touchpad_pos = Vec3(0, 0, 0)
|
||
self.touchpad_touched = False
|
||
|
||
# 摇杆状态 - 用于传送和转向交互
|
||
self.joystick_pos = Vec3(0, 0, 0) # 摇杆位置 (x, y, 0)
|
||
self.joystick_touched = False # 摇杆是否被触摸
|
||
self.joystick_pressed = False # 摇杆是否被按下
|
||
self.previous_joystick_pos = Vec3(0, 0, 0) # 上一帧摇杆位置
|
||
|
||
# 3D节点和可视化
|
||
self.anchor_node = None
|
||
self.visualizer = None
|
||
self.ray_length = 10.0
|
||
|
||
# 初始化
|
||
self._create_anchor()
|
||
self._create_visualizer()
|
||
|
||
print(f"✓ {name}手柄控制器初始化完成")
|
||
|
||
def _create_anchor(self):
|
||
"""创建手柄锚点节点"""
|
||
if self.vr_manager.tracking_space:
|
||
self.anchor_node = self.vr_manager.tracking_space.attachNewNode(f'{self.name}-controller')
|
||
self.anchor_node.hide() # 初始隐藏,直到获得有效姿态
|
||
|
||
def _create_visualizer(self):
|
||
"""创建手柄可视化器"""
|
||
if self.anchor_node and hasattr(self.vr_manager, 'world'):
|
||
self.visualizer = VRControllerVisualizer(self, self.vr_manager.world.render)
|
||
elif self.anchor_node:
|
||
# 如果没有世界对象,使用基础渲染节点
|
||
from panda3d.core import NodePath
|
||
render = NodePath('render')
|
||
self.visualizer = VRControllerVisualizer(self, render)
|
||
|
||
def set_device_index(self, device_index):
|
||
"""设置OpenVR设备索引"""
|
||
self.device_index = device_index
|
||
self.is_connected = True
|
||
print(f"📱 {self.name}手柄连接 (设备索引: {device_index})")
|
||
|
||
def update_pose(self, pose_data):
|
||
"""更新手柄姿态
|
||
|
||
Args:
|
||
pose_data: OpenVR TrackedDevicePose_t数据
|
||
"""
|
||
if not pose_data.bPoseIsValid:
|
||
self.is_pose_valid = False
|
||
if self.anchor_node:
|
||
self.anchor_node.hide()
|
||
return
|
||
|
||
self.is_pose_valid = True
|
||
|
||
# 转换OpenVR矩阵到Panda3D
|
||
if hasattr(self.vr_manager, 'convert_mat') and hasattr(self.vr_manager, 'coord_mat_inv') and hasattr(self.vr_manager, 'coord_mat'):
|
||
modelview = self.vr_manager.convert_mat(pose_data.mDeviceToAbsoluteTracking)
|
||
self.pose = self.vr_manager.coord_mat_inv * modelview * self.vr_manager.coord_mat
|
||
else:
|
||
# 直接使用矩阵数据
|
||
m = pose_data.mDeviceToAbsoluteTracking.m
|
||
self.pose = LMatrix4(
|
||
m[0][0], m[1][0], m[2][0], m[3][0],
|
||
m[0][1], m[1][1], m[2][1], m[3][1],
|
||
m[0][2], m[1][2], m[2][2], m[3][2],
|
||
m[0][3], m[1][3], m[2][3], m[3][3]
|
||
)
|
||
|
||
# 更新锚点变换
|
||
if self.anchor_node:
|
||
self.anchor_node.setMat(self.pose)
|
||
self.anchor_node.show()
|
||
|
||
# 更新可视化
|
||
if self.visualizer:
|
||
self.visualizer.update()
|
||
|
||
# 更新速度信息
|
||
vel = pose_data.vVelocity
|
||
self.velocity = Vec3(vel[0], vel[1], vel[2])
|
||
|
||
ang_vel = pose_data.vAngularVelocity
|
||
self.angular_velocity = Vec3(ang_vel[0], ang_vel[1], ang_vel[2])
|
||
|
||
def update_input_state(self, vr_system):
|
||
"""更新输入状态
|
||
|
||
Args:
|
||
vr_system: OpenVR系统实例
|
||
"""
|
||
if not self.is_connected or not OPENVR_AVAILABLE or not vr_system:
|
||
return
|
||
|
||
# 保存上一帧的按钮状态和摇杆位置
|
||
self.previous_button_states = self.button_states.copy()
|
||
self.previous_joystick_pos = Vec3(self.joystick_pos)
|
||
|
||
# 获取控制器状态
|
||
try:
|
||
result, state = vr_system.getControllerState(self.device_index)
|
||
if result:
|
||
# 更新按钮状态 - 使用正确的OpenVR属性名
|
||
for i in range(openvr.k_EButton_Max):
|
||
button_mask = 1 << i
|
||
# OpenVR Python绑定中使用ulButtonPressed而不是rButtonPressed
|
||
self.button_states[i] = (state.ulButtonPressed & button_mask) != 0
|
||
|
||
# 更新轴状态(扳机、握把、触摸板、摇杆)
|
||
# 兼容不同版本的OpenVR Python绑定
|
||
axis_data = None
|
||
if hasattr(state, 'rAxis'):
|
||
axis_data = state.rAxis # 旧版本使用rAxis
|
||
elif hasattr(state, 'vAxis'):
|
||
axis_data = state.vAxis # 新版本使用vAxis
|
||
|
||
if axis_data is not None and len(axis_data) > 0:
|
||
# 调试输出 - 显示所有轴数据(仅当有变化时)
|
||
self._debug_axis_data(axis_data)
|
||
|
||
# 扳机轴通常在axis[1].x
|
||
if len(axis_data) > 1:
|
||
self.trigger_value = axis_data[1].x
|
||
|
||
# 触摸板/摇杆轴通常在axis[0]
|
||
if len(axis_data) > 0:
|
||
self.touchpad_pos = Vec3(axis_data[0].x, axis_data[0].y, 0)
|
||
# 摇杆和触摸板通常使用同一个轴,但可以区分设备类型
|
||
self.joystick_pos = Vec3(axis_data[0].x, axis_data[0].y, 0)
|
||
|
||
# 额外检查其他轴(某些控制器可能将摇杆分配到不同轴)
|
||
if len(axis_data) > 2:
|
||
# 有些控制器可能在axis[2]有摇杆数据
|
||
axis2_magnitude = abs(axis_data[2].x) + abs(axis_data[2].y)
|
||
if axis2_magnitude > 0.1: # 如果有显著输入,使用这个轴作为摇杆
|
||
self.joystick_pos = Vec3(axis_data[2].x, axis_data[2].y, 0)
|
||
|
||
# 检查axis[3]和axis[4](Quest控制器可能使用这些轴)
|
||
for axis_idx in range(3, min(len(axis_data), 5)):
|
||
axis_magnitude = abs(axis_data[axis_idx].x) + abs(axis_data[axis_idx].y)
|
||
if axis_magnitude > 0.1: # 如果有显著输入
|
||
# 覆盖之前的摇杆数据,使用最有活动的轴
|
||
self.joystick_pos = Vec3(axis_data[axis_idx].x, axis_data[axis_idx].y, 0)
|
||
# 调试输出
|
||
if not hasattr(self, '_last_axis_notify'):
|
||
self._last_axis_notify = {}
|
||
if self._last_axis_notify.get(axis_idx, 0) == 0:
|
||
print(f"🎮 {self.name}手检测到axis[{axis_idx}]活动: ({axis_data[axis_idx].x:.3f}, {axis_data[axis_idx].y:.3f})")
|
||
self._last_axis_notify[axis_idx] = 60 # 60帧后再次提醒
|
||
else:
|
||
self._last_axis_notify[axis_idx] -= 1
|
||
|
||
# 触摸板和摇杆触摸状态
|
||
self.touchpad_touched = (state.ulButtonTouched & (1 << openvr.k_EButton_SteamVR_Touchpad)) != 0
|
||
|
||
# 摇杆触摸状态(检查多个可能的按钮)
|
||
joystick_touch_mask = 0
|
||
if hasattr(openvr, 'k_EButton_Joystick'):
|
||
joystick_touch_mask |= (1 << openvr.k_EButton_Joystick)
|
||
if hasattr(openvr, 'k_EButton_Thumbstick'):
|
||
joystick_touch_mask |= (1 << openvr.k_EButton_Thumbstick)
|
||
|
||
self.joystick_touched = (state.ulButtonTouched & joystick_touch_mask) != 0 or self.touchpad_touched
|
||
|
||
# 摇杆按下状态
|
||
joystick_press_mask = 0
|
||
if hasattr(openvr, 'k_EButton_Joystick'):
|
||
joystick_press_mask |= (1 << openvr.k_EButton_Joystick)
|
||
if hasattr(openvr, 'k_EButton_Thumbstick'):
|
||
joystick_press_mask |= (1 << openvr.k_EButton_Thumbstick)
|
||
|
||
self.joystick_pressed = (state.ulButtonPressed & joystick_press_mask) != 0
|
||
|
||
except Exception as e:
|
||
# 减少错误输出频率
|
||
if not hasattr(self, '_last_input_error_frame'):
|
||
self._last_input_error_frame = 0
|
||
|
||
# 获取当前帧数(通过VR管理器)
|
||
current_frame = getattr(self.vr_manager, 'frame_count', 0)
|
||
|
||
# 每5秒最多输出一次错误(300帧@60fps)
|
||
if current_frame - self._last_input_error_frame > 300:
|
||
print(f"⚠️ 更新{self.name}手柄输入状态失败: {e}")
|
||
self._last_input_error_frame = current_frame
|
||
|
||
def is_button_pressed(self, button_id):
|
||
"""检查按钮是否被按下"""
|
||
return self.button_states.get(button_id, False)
|
||
|
||
def is_button_just_pressed(self, button_id):
|
||
"""检查按钮是否刚刚被按下(上升沿)"""
|
||
current = self.button_states.get(button_id, False)
|
||
previous = self.previous_button_states.get(button_id, False)
|
||
return current and not previous
|
||
|
||
def is_button_just_released(self, button_id):
|
||
"""检查按钮是否刚刚被释放(下降沿)"""
|
||
current = self.button_states.get(button_id, False)
|
||
previous = self.previous_button_states.get(button_id, False)
|
||
return not current and previous
|
||
|
||
def is_trigger_pressed(self, threshold=0.1):
|
||
"""检查扳机是否被按下"""
|
||
return self.trigger_value > threshold
|
||
|
||
def is_grip_pressed(self, threshold=0.1):
|
||
"""检查握把是否被按下"""
|
||
return self.grip_value > threshold
|
||
|
||
def show_ray(self, show=True):
|
||
"""显示或隐藏交互射线"""
|
||
if self.visualizer:
|
||
if show:
|
||
self.visualizer.show_ray()
|
||
else:
|
||
self.visualizer.hide_ray()
|
||
|
||
def set_ray_color(self, color):
|
||
"""设置射线颜色"""
|
||
if self.visualizer and len(color) >= 3:
|
||
from panda3d.core import Vec4
|
||
color_vec = Vec4(color[0], color[1], color[2], color[3] if len(color) > 3 else 1.0)
|
||
self.visualizer.set_ray_color(color_vec)
|
||
|
||
def trigger_haptic_feedback(self, duration=0.001, strength=1.0):
|
||
"""触发震动反馈
|
||
|
||
Args:
|
||
duration: 震动持续时间(秒)
|
||
strength: 震动强度 (0.0-1.0)
|
||
"""
|
||
if not self.is_connected or not OPENVR_AVAILABLE:
|
||
return
|
||
|
||
try:
|
||
if hasattr(self.vr_manager, 'vr_system') and self.vr_manager.vr_system:
|
||
# OpenVR的震动API
|
||
duration_microseconds = int(duration * 1000000)
|
||
self.vr_manager.vr_system.triggerHapticPulse(
|
||
self.device_index,
|
||
0, # axis ID (通常为0)
|
||
int(strength * 3999) # 强度 (0-3999)
|
||
)
|
||
except Exception as e:
|
||
print(f"⚠️ {self.name}手柄震动反馈失败: {e}")
|
||
|
||
def get_world_position(self):
|
||
"""获取手柄在世界坐标系中的位置"""
|
||
if self.anchor_node:
|
||
return self.anchor_node.getPos(self.vr_manager.world.render)
|
||
return Vec3(0, 0, 0)
|
||
|
||
def get_world_rotation(self):
|
||
"""获取手柄在世界坐标系中的旋转"""
|
||
if self.anchor_node:
|
||
return self.anchor_node.getHpr(self.vr_manager.world.render)
|
||
return Vec3(0, 0, 0)
|
||
|
||
def get_forward_direction(self):
|
||
"""获取手柄指向的方向向量(包含视角转向)"""
|
||
if self.anchor_node:
|
||
# 获取相对于世界坐标系的方向,包含tracking_space的旋转
|
||
if hasattr(self.vr_manager, 'world') and self.vr_manager.world:
|
||
# 使用世界变换,包含所有父节点的旋转
|
||
world_transform = self.anchor_node.getMat(self.vr_manager.world.render)
|
||
forward = Vec3(world_transform.getRow3(1)) # Y轴 = 前方
|
||
else:
|
||
# 备选:使用局部变换
|
||
forward = Vec3(self.anchor_node.getMat().getRow3(1))
|
||
|
||
if forward.length() > 0:
|
||
return forward.normalized()
|
||
return Vec3(0, 1, 0)
|
||
|
||
def is_joystick_touched(self):
|
||
"""检查摇杆是否被触摸"""
|
||
return self.joystick_touched
|
||
|
||
def is_joystick_pressed(self):
|
||
"""检查摇杆是否被按下"""
|
||
return self.joystick_pressed
|
||
|
||
def get_joystick_position(self):
|
||
"""获取摇杆位置
|
||
|
||
Returns:
|
||
Vec3: 摇杆位置 (x, y, 0),范围 [-1, 1]
|
||
"""
|
||
return Vec3(self.joystick_pos)
|
||
|
||
def get_joystick_delta(self):
|
||
"""获取摇杆位置变化
|
||
|
||
Returns:
|
||
Vec3: 摇杆位置变化向量
|
||
"""
|
||
return self.joystick_pos - self.previous_joystick_pos
|
||
|
||
def is_joystick_moved(self, threshold=0.01):
|
||
"""检查摇杆是否移动
|
||
|
||
Args:
|
||
threshold: 移动阈值
|
||
|
||
Returns:
|
||
bool: 是否移动
|
||
"""
|
||
delta = self.get_joystick_delta()
|
||
return delta.length() > threshold
|
||
|
||
def _debug_axis_data(self, axis_data):
|
||
"""调试输出轴数据"""
|
||
try:
|
||
# 只在有活动时输出调试信息
|
||
has_activity = False
|
||
active_axes = []
|
||
|
||
for i, axis in enumerate(axis_data):
|
||
magnitude = abs(axis.x) + abs(axis.y)
|
||
if magnitude > 0.01: # 检测到活动
|
||
has_activity = True
|
||
active_axes.append(f"axis[{i}]: ({axis.x:.3f}, {axis.y:.3f})")
|
||
|
||
if has_activity:
|
||
# 初始化调试计数器
|
||
if not hasattr(self, '_debug_axis_counter'):
|
||
self._debug_axis_counter = 0
|
||
|
||
self._debug_axis_counter += 1
|
||
|
||
# 每30帧输出一次详细信息
|
||
if self._debug_axis_counter % 30 == 1:
|
||
print(f"🔍 {self.name}手轴数据调试:")
|
||
print(f" 总轴数: {len(axis_data)}")
|
||
print(f" 活跃轴: {', '.join(active_axes)}")
|
||
|
||
# 显示所有轴的当前值(不管是否活跃)
|
||
all_axes = []
|
||
for i, axis in enumerate(axis_data):
|
||
all_axes.append(f"[{i}]:({axis.x:.3f},{axis.y:.3f})")
|
||
print(f" 所有轴: {' '.join(all_axes)}")
|
||
|
||
except Exception as e:
|
||
print(f"⚠️ 轴数据调试失败: {e}")
|
||
|
||
def cleanup(self):
|
||
"""清理资源"""
|
||
self.ignoreAll()
|
||
|
||
if self.visualizer:
|
||
self.visualizer.cleanup()
|
||
|
||
if self.anchor_node:
|
||
self.anchor_node.removeNode()
|
||
|
||
self.is_connected = False
|
||
print(f"🧹 {self.name}手柄控制器已清理")
|
||
|
||
|
||
class LeftController(VRController):
|
||
"""左手控制器"""
|
||
|
||
def __init__(self, vr_manager):
|
||
super().__init__(vr_manager, 'left', '/user/hand/left')
|
||
|
||
|
||
class RightController(VRController):
|
||
"""右手控制器"""
|
||
|
||
def __init__(self, vr_manager):
|
||
super().__init__(vr_manager, 'right', '/user/hand/right') |