1
0
forked from Rowland/EG
EG/core/vr_joystick.py
2025-09-29 10:52:39 +08:00

701 lines
27 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
VR摇杆交互系统模块
提供类似SteamVR的摇杆交互功能
- 摇杆左右转向(旋转视角)
- 摇杆向前传送预览(抛物线轨迹)
- 松开摇杆执行传送
- 死区处理和平滑控制
"""
import math
from panda3d.core import Vec2, Vec3, Vec4
from direct.showbase.DirectObject import DirectObject
try:
import openvr
OPENVR_AVAILABLE = True
except ImportError:
OPENVR_AVAILABLE = False
class VRJoystickManager(DirectObject):
"""VR摇杆管理器 - 处理手柄摇杆的转向和传送功能"""
def __init__(self, vr_manager):
"""初始化VR摇杆管理器
Args:
vr_manager: VR管理器实例
"""
super().__init__()
self.vr_manager = vr_manager
self.teleport_system = None # 传送系统引用,稍后初始化
# 摇杆参数配置
self.deadzone = 0.15 # 摇杆死区 (0-1)
self.turn_threshold = 0.3 # 转向激活阈值
self.teleport_threshold = 0.5 # 传送激活阈值
self.turn_sensitivity = 250.0 # 转向灵敏度(度/秒)- 增加速度
self.smooth_turning = True # 是否平滑转向
self.snap_turn_angle = 30.0 # 分段转向角度(度)
# 摇杆状态跟踪
self.left_joystick_state = JoystickState()
self.right_joystick_state = JoystickState()
# 转向状态
self.left_turn_cooldown = 0.0 # 分段转向冷却时间
self.right_turn_cooldown = 0.0
self.snap_turn_cooldown = 0.3 # 分段转向间隔(秒)
# 传送状态
self.active_teleport_controller = None # 正在传送的控制器
self.teleport_preview_active = False # 传送预览是否激活
# 互斥状态管理 - 防止同时触发多种操作
self.interaction_mode = 'none' # 当前交互模式: 'none', 'turning', 'teleporting'
self.left_controller_mode = 'none' # 左手控制器状态
self.right_controller_mode = 'none' # 右手控制器状态
self.mode_lock_timeout = 0.1 # 模式锁定超时时间(秒)
self.left_mode_timer = 0.0 # 左手模式计时器
self.right_mode_timer = 0.0 # 右手模式计时器
print("✓ VR摇杆管理器初始化完成")
def initialize(self, teleport_system):
"""初始化摇杆系统
Args:
teleport_system: VR传送系统实例
"""
self.teleport_system = teleport_system
print("✅ VR摇杆系统初始化成功")
def update(self, dt):
"""更新摇杆系统 - 每帧调用
Args:
dt: 帧间隔时间(秒)
"""
if not self.vr_manager.are_controllers_connected():
return
# 调试计数器
if not hasattr(self, '_debug_frame_count'):
self._debug_frame_count = 0
print("🎮 VR摇杆系统开始更新")
self._debug_frame_count += 1
# 更新转向冷却时间
if self.left_turn_cooldown > 0:
self.left_turn_cooldown -= dt
if self.right_turn_cooldown > 0:
self.right_turn_cooldown -= dt
# 更新互斥状态计时器
self._update_interaction_modes(dt)
# 处理左手控制器摇杆
if self.vr_manager.left_controller:
self._update_controller_joystick(
self.vr_manager.left_controller,
self.left_joystick_state,
'left',
dt
)
# 处理右手控制器摇杆
if self.vr_manager.right_controller:
self._update_controller_joystick(
self.vr_manager.right_controller,
self.right_joystick_state,
'right',
dt
)
# 每5秒输出一次状态报告
if self._debug_frame_count % 300 == 1: # 假设60fps
self._print_debug_status()
def _update_controller_joystick(self, controller, joystick_state, hand, dt):
"""更新单个控制器的摇杆状态
Args:
controller: VR控制器实例
joystick_state: 摇杆状态对象
hand: 'left''right'
dt: 帧间隔时间
"""
# 获取摇杆输入
joystick_input = self._get_joystick_input(controller, hand)
if joystick_input is None:
return
# 应用死区
filtered_input = self._apply_deadzone(joystick_input)
# 更新摇杆状态
joystick_state.update(filtered_input)
# 处理转向(左右移动)
self._handle_turning(filtered_input, hand, dt)
# 处理传送(向前移动)
self._handle_teleport(controller, filtered_input, joystick_state, hand)
def _update_interaction_modes(self, dt):
"""更新互斥交互模式状态"""
# 更新左手模式计时器
if self.left_mode_timer > 0:
self.left_mode_timer -= dt
if self.left_mode_timer <= 0:
self.left_controller_mode = 'none'
# 更新右手模式计时器
if self.right_mode_timer > 0:
self.right_mode_timer -= dt
if self.right_mode_timer <= 0:
self.right_controller_mode = 'none'
# 更新全局交互模式
if self.left_controller_mode == 'none' and self.right_controller_mode == 'none':
if self.interaction_mode != 'none':
# 所有操作结束,恢复自由模式
self.interaction_mode = 'none'
print("🔓 摇杆交互模式解锁,恢复自由操作")
def _set_controller_mode(self, hand, mode):
"""设置控制器交互模式
Args:
hand: 'left''right'
mode: 'turning', 'teleporting', 'none'
"""
if hand == 'left':
if self.left_controller_mode != mode:
old_mode = self.left_controller_mode
self.left_controller_mode = mode
self.left_mode_timer = self.mode_lock_timeout
if mode != 'none':
print(f"🔒 左手控制器: {old_mode}{mode}模式")
else:
print(f"🔓 左手控制器解锁: {old_mode} → 自由")
else:
if self.right_controller_mode != mode:
old_mode = self.right_controller_mode
self.right_controller_mode = mode
self.right_mode_timer = self.mode_lock_timeout
if mode != 'none':
print(f"🔒 右手控制器: {old_mode}{mode}模式")
else:
print(f"🔓 右手控制器解锁: {old_mode} → 自由")
# 更新全局模式
if mode != 'none' and self.interaction_mode != mode:
self.interaction_mode = mode
def _can_use_mode(self, hand, requested_mode):
"""检查是否可以使用指定的交互模式
Args:
hand: 'left''right'
requested_mode: 'turning''teleporting'
Returns:
bool: 是否可以使用该模式
"""
current_mode = self.left_controller_mode if hand == 'left' else self.right_controller_mode
# 如果当前控制器已经是该模式,允许继续
if current_mode == requested_mode:
return True
# 如果当前控制器是空闲的,且全局模式兼容,允许切换
if current_mode == 'none':
if self.interaction_mode == 'none' or self.interaction_mode == requested_mode:
return True
# 其他情况不允许
return False
def _get_joystick_input(self, controller, hand):
"""获取摇杆输入
Args:
controller: VR控制器实例
hand: 'left''right'
Returns:
Vec2: 摇杆位置 (x, y) 或 None
"""
# 直接从控制器读取摇杆输入(绕过动作系统)
joystick_input = Vec2(0, 0)
# 优先读取joystick_pos
if hasattr(controller, 'joystick_pos') and controller.joystick_pos:
joystick_input = Vec2(controller.joystick_pos.x, controller.joystick_pos.y)
# 检查是否有有效输入
if joystick_input.length() > 0.01:
# 调试输出 - 仅在有输入时显示
if not hasattr(self, '_last_debug_time'):
self._last_debug_time = 0
self._debug_counter = 0
self._debug_counter += 1
# 每30帧输出一次调试信息
if self._debug_counter % 30 == 1:
print(f"🎮 {hand}手摇杆输入: ({joystick_input.x:.3f}, {joystick_input.y:.3f})")
return joystick_input
# 备选方案读取touchpad_posQuest等设备
if hasattr(controller, 'touchpad_pos') and controller.touchpad_pos:
touchpad_input = Vec2(controller.touchpad_pos.x, controller.touchpad_pos.y)
# 检查是否有有效输入
if touchpad_input.length() > 0.01:
# 调试输出 - 仅在有输入时显示
if not hasattr(self, '_last_debug_time'):
self._last_debug_time = 0
self._debug_counter = 0
self._debug_counter += 1
# 每30帧输出一次调试信息
if self._debug_counter % 30 == 1:
print(f"🎮 {hand}手触摸板输入: ({touchpad_input.x:.3f}, {touchpad_input.y:.3f})")
return touchpad_input
# 可选:尝试从动作系统获取(如果可用)
if (self.vr_manager.action_manager and
hasattr(self.vr_manager.action_manager, 'get_analog_action_value')):
try:
device_path = f'/user/hand/{hand}'
# 尝试摇杆
joystick_value, _ = self.vr_manager.action_manager.get_analog_action_value('joystick', device_path)
if joystick_value is not None:
return Vec2(joystick_value.x, joystick_value.y)
# 尝试触摸板
trackpad_value, _ = self.vr_manager.action_manager.get_analog_action_value('trackpad', device_path)
if trackpad_value is not None:
return Vec2(trackpad_value.x, trackpad_value.y)
except Exception:
# 静默忽略动作系统错误
pass
return Vec2(0, 0)
def _apply_deadzone(self, input_vec):
"""应用摇杆死区
Args:
input_vec: 原始摇杆输入
Returns:
Vec2: 应用死区后的输入
"""
magnitude = input_vec.length()
if magnitude < self.deadzone:
return Vec2(0, 0)
# 重新映射到 [0, 1] 范围
normalized_magnitude = (magnitude - self.deadzone) / (1.0 - self.deadzone)
normalized_magnitude = min(normalized_magnitude, 1.0)
if magnitude > 0:
direction = input_vec / magnitude
return direction * normalized_magnitude
return Vec2(0, 0)
def _handle_turning(self, input_vec, hand, dt):
"""处理摇杆转向
Args:
input_vec: 摇杆输入向量
hand: 'left''right'
dt: 帧间隔时间
"""
# 检查是否超过转向阈值
if abs(input_vec.x) < self.turn_threshold:
# 没有转向输入,重置该控制器的转向模式
current_mode = self.left_controller_mode if hand == 'left' else self.right_controller_mode
if current_mode == 'turning':
self._set_controller_mode(hand, 'none')
return
# 检查是否可以使用转向模式
if not self._can_use_mode(hand, 'turning'):
# 当前控制器被传送锁定,忽略转向输入
current_mode = self.left_controller_mode if hand == 'left' else self.right_controller_mode
if current_mode != 'turning':
print(f"{hand}手转向被阻止 - 当前模式: {current_mode}")
return
# 激活转向模式
self._set_controller_mode(hand, 'turning')
# 检查冷却时间(分段转向)
cooldown = self.left_turn_cooldown if hand == 'left' else self.right_turn_cooldown
if self.smooth_turning:
# 平滑转向 - 反转方向:摇杆右移(+x)应该向右转(-angle)
turn_amount = -input_vec.x * self.turn_sensitivity * dt
self._apply_rotation(turn_amount)
# 调试输出转向
if not hasattr(self, '_turn_debug_counter'):
self._turn_debug_counter = 0
self._turn_debug_counter += 1
if self._turn_debug_counter % 60 == 1: # 每秒输出一次
print(f"🔄 {hand}手转向: 输入={input_vec.x:.3f}, 角度={turn_amount:.1f}°")
else:
# 分段转向
if cooldown <= 0:
# 反转方向:摇杆右移应该向右转
turn_amount = -self.snap_turn_angle if input_vec.x > 0 else self.snap_turn_angle
self._apply_rotation(turn_amount)
print(f"🔄 {hand}手分段转向: 输入={input_vec.x:.3f}, 角度={turn_amount:.1f}°")
# 设置冷却时间
if hand == 'left':
self.left_turn_cooldown = self.snap_turn_cooldown
else:
self.right_turn_cooldown = self.snap_turn_cooldown
def _apply_rotation(self, angle_degrees):
"""应用旋转到VR跟踪空间
Args:
angle_degrees: 旋转角度(度)
"""
if not self.vr_manager.tracking_space:
return
# 绕Z轴旋转垂直轴
current_h = self.vr_manager.tracking_space.getH()
new_h = current_h + angle_degrees
self.vr_manager.tracking_space.setH(new_h)
def _handle_teleport(self, controller, input_vec, joystick_state, hand):
"""处理摇杆传送
Args:
controller: VR控制器实例
input_vec: 摇杆输入向量
joystick_state: 摇杆状态对象
hand: 'left''right'
"""
# 检查Y轴向前移动是否超过阈值
forward_input = input_vec.y
if forward_input > self.teleport_threshold:
# 检查是否可以使用传送模式
if not self._can_use_mode(hand, 'teleporting'):
# 当前控制器被转向锁定,忽略传送输入
current_mode = self.left_controller_mode if hand == 'left' else self.right_controller_mode
if current_mode != 'teleporting':
print(f"{hand}手传送被阻止 - 当前模式: {current_mode}")
return
# 激活传送模式
self._set_controller_mode(hand, 'teleporting')
# 开始或更新传送预览
if not joystick_state.teleport_active:
joystick_state.teleport_active = True
self.active_teleport_controller = controller
self.teleport_preview_active = True
# 触发震动反馈
controller.trigger_haptic_feedback(0.002, 0.3)
# 计算传送方向
direction = self._calculate_teleport_direction(controller, input_vec)
# 更新传送预览
if self.teleport_system:
if joystick_state.teleport_just_started:
self.teleport_system.start_teleport_preview(controller, direction)
joystick_state.teleport_just_started = False
else:
self.teleport_system.update_teleport_preview(controller, direction)
else:
# 检查是否需要执行传送
if joystick_state.teleport_active:
# 松开摇杆,执行传送
self._execute_teleport(controller)
joystick_state.teleport_active = False
joystick_state.teleport_just_started = True
# 重置该控制器的传送模式
self._set_controller_mode(hand, 'none')
else:
# 没有传送输入,检查是否需要重置传送模式
current_mode = self.left_controller_mode if hand == 'left' else self.right_controller_mode
if current_mode == 'teleporting':
self._set_controller_mode(hand, 'none')
def _calculate_teleport_direction(self, controller, input_vec):
"""计算传送方向向量 - 基于手柄姿态
Args:
controller: VR控制器实例
input_vec: 摇杆输入向量(仅用于激活,不影响方向)
Returns:
Vec3: 世界坐标中的传送方向
"""
# 方法1直接使用手柄指向推荐
if controller and hasattr(controller, 'get_forward_direction'):
try:
# 获取手柄的前向方向
controller_forward = controller.get_forward_direction()
# 确保方向向量在水平面上
horizontal_direction = Vec3(controller_forward.x, controller_forward.y, 0)
if horizontal_direction.length() > 0:
horizontal_direction.normalize()
# 调试输出每30帧输出一次
if not hasattr(self, '_direction_debug_counter'):
self._direction_debug_counter = 0
self._direction_debug_counter += 1
if self._direction_debug_counter % 30 == 1:
# 获取当前tracking_space的旋转角度用于调试
tracking_rotation = 0
if self.vr_manager.tracking_space:
tracking_rotation = self.vr_manager.tracking_space.getH()
print(f"🎯 综合传送方向: 视角旋转({tracking_rotation:.1f}°) + 手柄姿态({controller_forward.x:.2f},{controller_forward.y:.2f},{controller_forward.z:.2f}) → 最终方向({horizontal_direction.x:.2f},{horizontal_direction.y:.2f})")
return horizontal_direction
except Exception as e:
print(f"⚠️ 获取手柄方向失败: {e}")
# 备选方案:使用玩家朝向(保留原逻辑作为备选)
print("🔄 使用备选传送方向计算...")
# 获取玩家当前朝向的变换矩阵
player_transform = None
# 优先使用头显的世界变换
if self.vr_manager.hmd_anchor and hasattr(self.vr_manager, 'world') and self.vr_manager.world:
player_transform = self.vr_manager.hmd_anchor.getMat(self.vr_manager.world.render)
# 备选使用tracking_space的世界变换
elif self.vr_manager.tracking_space and hasattr(self.vr_manager, 'world') and self.vr_manager.world:
player_transform = self.vr_manager.tracking_space.getMat(self.vr_manager.world.render)
if player_transform:
# 从变换矩阵提取前向向量
forward = Vec3(player_transform.getRow3(1)) # Y轴 = 前向
# 确保向量在水平面上
forward.z = 0
if forward.length() > 0:
forward.normalize()
return forward
# 最终备选方案:默认前向
print("⚠️ 无法获取任何朝向,使用默认方向")
return Vec3(0, 1, 0)
def _execute_teleport(self, controller):
"""执行传送
Args:
controller: VR控制器实例
"""
if self.teleport_system and self.teleport_preview_active:
success = self.teleport_system.execute_teleport()
if success:
# 传送成功,触发震动反馈
controller.trigger_haptic_feedback(0.005, 0.8)
else:
# 传送失败,触发不同的震动反馈
controller.trigger_haptic_feedback(0.001, 0.2)
# 停止传送预览
self.teleport_system.stop_teleport_preview()
self.teleport_preview_active = False
self.active_teleport_controller = None
def set_turning_mode(self, smooth=True):
"""设置转向模式
Args:
smooth: True为平滑转向False为分段转向
"""
self.smooth_turning = smooth
print(f"✓ 转向模式设置为: {'平滑转向' if smooth else '分段转向'}")
def set_turn_sensitivity(self, sensitivity):
"""设置转向灵敏度
Args:
sensitivity: 转向灵敏度(度/秒)
"""
self.turn_sensitivity = max(10.0, min(180.0, sensitivity))
print(f"✓ 转向灵敏度设置为: {self.turn_sensitivity}度/秒")
def apply_config(self, config):
"""应用配置
Args:
config: VRJoystickConfig配置实例
"""
try:
self.deadzone = config.deadzone
self.turn_threshold = config.turn_threshold
self.teleport_threshold = config.teleport_threshold
self.turn_sensitivity = config.turn_sensitivity
self.smooth_turning = (config.turn_mode.value == 'smooth')
self.snap_turn_angle = config.snap_turn_angle
self.snap_turn_cooldown = config.snap_turn_cooldown
# 应用传送系统配置
if self.teleport_system and hasattr(config, 'teleport_range'):
self.teleport_system.teleport_range = config.teleport_range
if hasattr(config, 'teleport_arc_resolution'):
self.teleport_system.arc_resolution = config.teleport_arc_resolution
if hasattr(config, 'teleport_initial_velocity'):
self.teleport_system.initial_velocity = config.teleport_initial_velocity
if hasattr(config, 'min_teleport_distance'):
self.teleport_system.min_teleport_distance = config.min_teleport_distance
print("✅ 摇杆配置已成功应用")
except Exception as e:
print(f"⚠️ 应用配置失败: {e}")
def get_current_config(self):
"""获取当前配置
Returns:
dict: 当前配置参数
"""
return {
'deadzone': self.deadzone,
'turn_threshold': self.turn_threshold,
'teleport_threshold': self.teleport_threshold,
'turn_sensitivity': self.turn_sensitivity,
'smooth_turning': self.smooth_turning,
'snap_turn_angle': self.snap_turn_angle,
'snap_turn_cooldown': self.snap_turn_cooldown
}
def _print_debug_status(self):
"""打印调试状态信息"""
try:
print("🔍 ======= VR摇杆调试状态 =======")
# 控制器连接状态
left_connected = self.vr_manager.left_controller is not None
right_connected = self.vr_manager.right_controller is not None
print(f"📱 控制器状态: 左手={left_connected}, 右手={right_connected}")
# 检查控制器属性
if left_connected:
left_ctrl = self.vr_manager.left_controller
has_joystick = hasattr(left_ctrl, 'joystick_pos')
has_touchpad = hasattr(left_ctrl, 'touchpad_pos')
print(f"📊 左手控制器: joystick_pos={has_joystick}, touchpad_pos={has_touchpad}")
if has_joystick and left_ctrl.joystick_pos:
pos = left_ctrl.joystick_pos
print(f" 当前摇杆位置: ({pos.x:.3f}, {pos.y:.3f}, {pos.z:.3f})")
if has_touchpad and left_ctrl.touchpad_pos:
pos = left_ctrl.touchpad_pos
print(f" 当前触摸板位置: ({pos.x:.3f}, {pos.y:.3f}, {pos.z:.3f})")
if right_connected:
right_ctrl = self.vr_manager.right_controller
has_joystick = hasattr(right_ctrl, 'joystick_pos')
has_touchpad = hasattr(right_ctrl, 'touchpad_pos')
print(f"📊 右手控制器: joystick_pos={has_joystick}, touchpad_pos={has_touchpad}")
if has_joystick and right_ctrl.joystick_pos:
pos = right_ctrl.joystick_pos
print(f" 当前摇杆位置: ({pos.x:.3f}, {pos.y:.3f}, {pos.z:.3f})")
if has_touchpad and right_ctrl.touchpad_pos:
pos = right_ctrl.touchpad_pos
print(f" 当前触摸板位置: ({pos.x:.3f}, {pos.y:.3f}, {pos.z:.3f})")
# 摇杆配置
print(f"⚙️ 摇杆配置:")
print(f" 死区: {self.deadzone}")
print(f" 转向阈值: {self.turn_threshold}")
print(f" 传送阈值: {self.teleport_threshold}")
print(f" 转向模式: {'平滑' if self.smooth_turning else '分段'}")
# 动作系统状态
action_mgr_available = (self.vr_manager.action_manager is not None)
print(f"🎯 动作系统: {'可用' if action_mgr_available else '不可用'}")
# 传送系统状态
teleport_available = (self.teleport_system is not None)
print(f"🚀 传送系统: {'可用' if teleport_available else '不可用'}")
print("🔍 ==============================")
except Exception as e:
print(f"⚠️ 调试状态输出失败: {e}")
def cleanup(self):
"""清理摇杆系统资源"""
try:
# 停止任何活跃的传送预览
if self.teleport_preview_active and self.teleport_system:
self.teleport_system.stop_teleport_preview()
self.teleport_preview_active = False
self.active_teleport_controller = None
self.ignoreAll()
print("🧹 VR摇杆系统已清理")
except Exception as e:
print(f"⚠️ 清理摇杆系统失败: {e}")
class JoystickState:
"""摇杆状态跟踪类"""
def __init__(self):
self.current_input = Vec2(0, 0) # 当前摇杆输入
self.previous_input = Vec2(0, 0) # 上一帧摇杆输入
self.teleport_active = False # 传送是否激活
self.teleport_just_started = True # 传送是否刚开始
def update(self, new_input):
"""更新摇杆状态
Args:
new_input: 新的摇杆输入
"""
self.previous_input = Vec2(self.current_input)
self.current_input = Vec2(new_input)
def is_input_changed(self, threshold=0.01):
"""检查输入是否发生变化
Args:
threshold: 变化阈值
Returns:
bool: 是否发生变化
"""
diff = self.current_input - self.previous_input
return diff.length() > threshold