1
0
forked from Rowland/EG
EG/core/alvr_streamer.py
2025-08-13 09:30:16 +08:00

509 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
ALVR串流处理器
负责与ALVR服务器通信和视频流传输
支持Quest等VR头显的无线串流
"""
import socket
import struct
import threading
import json
import time
import subprocess
import psutil
from direct.showbase.DirectObject import DirectObject
from panda3d.core import Texture, PNMImage
class ALVRStreamer(DirectObject):
"""ALVR串流处理器"""
def __init__(self, world, vr_manager):
super().__init__()
self.world = world
self.vr_manager = vr_manager
# ALVR服务器配置
self.alvr_server_ip = "127.0.0.1"
self.alvr_server_port = 9943
self.alvr_streaming_port = 9944
# 连接状态
self.connected = False
self.streaming = False
self.server_socket = None
self.streaming_socket = None
# 流媒体配置
self.stream_width = 2880 # Quest 2 推荐分辨率
self.stream_height = 1700
self.stream_fps = 72
self.bitrate = 150 # Mbps
self.codec = "h264"
# 线程管理
self.connection_thread = None
self.streaming_thread = None
self.running = False
# 性能统计
self.frame_count = 0
self.last_fps_time = time.time()
self.current_fps = 0
self.latency = 0
print("✓ ALVR串流处理器初始化完成")
def initialize(self):
"""初始化ALVR串流"""
try:
# 检查ALVR服务器是否运行
if not self._check_alvr_server():
print("ALVR服务器未运行尝试启动...")
if not self._start_alvr_server():
print("无法启动ALVR服务器")
return False
# 连接到ALVR服务器
if not self._connect_to_server():
print("无法连接到ALVR服务器")
return False
# 配置流媒体设置
self._configure_streaming()
# 启动串流线程
self._start_streaming_threads()
print("✓ ALVR串流初始化成功")
return True
except Exception as e:
print(f"ALVR初始化错误: {str(e)}")
return False
def _check_alvr_server(self):
"""检查ALVR服务器是否运行"""
try:
# 检查进程
for proc in psutil.process_iter(['pid', 'name', 'cmdline']):
if 'alvr' in proc.info['name'].lower():
return True
# 尝试连接端口
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(2)
result = sock.connect_ex((self.alvr_server_ip, self.alvr_server_port))
sock.close()
return result == 0
except Exception as e:
print(f"检查ALVR服务器错误: {str(e)}")
return False
def _start_alvr_server(self):
"""启动ALVR服务器"""
try:
# 尝试启动ALVR服务器
# 这里需要根据实际的ALVR安装路径调整
alvr_paths = [
"/usr/local/bin/alvr_server",
"/usr/bin/alvr_server",
"C:/Program Files/ALVR/alvr_server.exe",
"C:/ALVR/alvr_server.exe"
]
for path in alvr_paths:
try:
subprocess.Popen([path], shell=True)
time.sleep(3) # 等待服务器启动
if self._check_alvr_server():
return True
except FileNotFoundError:
continue
return False
except Exception as e:
print(f"启动ALVR服务器错误: {str(e)}")
return False
def _connect_to_server(self):
"""连接到ALVR服务器"""
try:
# 创建TCP连接
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_socket.settimeout(5)
self.server_socket.connect((self.alvr_server_ip, self.alvr_server_port))
# 发送握手消息
handshake_data = {
"type": "handshake",
"client_name": "Panda3D_VR_Engine",
"version": "1.0",
"capabilities": {
"video": True,
"audio": True,
"tracking": True,
"haptics": True
}
}
self._send_message(handshake_data)
# 接收响应
response = self._receive_message()
if response and response.get("type") == "handshake_response":
self.connected = True
print("✓ 已连接到ALVR服务器")
return True
return False
except Exception as e:
print(f"连接ALVR服务器错误: {str(e)}")
return False
def _send_message(self, data):
"""发送消息到ALVR服务器"""
try:
message = json.dumps(data).encode('utf-8')
length = struct.pack('<I', len(message))
self.server_socket.send(length + message)
except Exception as e:
print(f"发送消息错误: {str(e)}")
def _receive_message(self):
"""从ALVR服务器接收消息"""
try:
# 接收消息长度
length_data = self.server_socket.recv(4)
if not length_data:
return None
length = struct.unpack('<I', length_data)[0]
# 接收消息内容
message_data = b''
while len(message_data) < length:
chunk = self.server_socket.recv(length - len(message_data))
if not chunk:
return None
message_data += chunk
return json.loads(message_data.decode('utf-8'))
except Exception as e:
print(f"接收消息错误: {str(e)}")
return None
def _configure_streaming(self):
"""配置流媒体设置"""
try:
# 发送流媒体配置
config_data = {
"type": "stream_config",
"video": {
"width": self.stream_width,
"height": self.stream_height,
"fps": self.stream_fps,
"bitrate": self.bitrate,
"codec": self.codec
},
"audio": {
"enabled": True,
"sample_rate": 48000,
"channels": 2
}
}
self._send_message(config_data)
# 接收配置响应
response = self._receive_message()
if response and response.get("type") == "config_response":
if response.get("status") == "success":
print("✓ 流媒体配置成功")
return True
return False
except Exception as e:
print(f"配置流媒体错误: {str(e)}")
return False
def _start_streaming_threads(self):
"""启动串流线程"""
self.running = True
# 启动连接管理线程
self.connection_thread = threading.Thread(target=self._connection_handler)
self.connection_thread.daemon = True
self.connection_thread.start()
# 启动流媒体线程
self.streaming_thread = threading.Thread(target=self._streaming_handler)
self.streaming_thread.daemon = True
self.streaming_thread.start()
def _connection_handler(self):
"""连接处理线程"""
while self.running:
try:
if self.connected:
# 发送心跳
heartbeat = {"type": "heartbeat", "timestamp": time.time()}
self._send_message(heartbeat)
# 接收消息
response = self._receive_message()
if response:
self._handle_server_message(response)
time.sleep(0.1)
except Exception as e:
print(f"连接处理错误: {str(e)}")
self.connected = False
time.sleep(1)
def _streaming_handler(self):
"""流媒体处理线程"""
while self.running:
try:
if self.connected and self.streaming:
# 获取VR渲染帧
frame_data = self._get_vr_frame()
if frame_data:
# 发送帧数据
self._send_frame(frame_data)
# 更新性能统计
self._update_performance_stats()
time.sleep(1.0 / self.stream_fps)
except Exception as e:
print(f"流媒体处理错误: {str(e)}")
time.sleep(0.1)
def _handle_server_message(self, message):
"""处理服务器消息"""
msg_type = message.get("type")
if msg_type == "start_streaming":
self.streaming = True
print("✓ 开始VR串流")
elif msg_type == "stop_streaming":
self.streaming = False
print("✓ 停止VR串流")
elif msg_type == "client_connected":
print(f"✓ VR客户端已连接: {message.get('client_info', {})}")
elif msg_type == "client_disconnected":
print("✓ VR客户端已断开")
elif msg_type == "tracking_data":
self._handle_tracking_data(message.get("data"))
elif msg_type == "haptic_feedback":
self._handle_haptic_feedback(message.get("data"))
def _handle_tracking_data(self, tracking_data):
"""处理跟踪数据"""
if not tracking_data:
return
# 更新VR管理器的跟踪数据
# 这里可以处理从ALVR客户端发送的跟踪数据
pass
def _handle_haptic_feedback(self, haptic_data):
"""处理触觉反馈"""
if not haptic_data:
return
# 处理触觉反馈请求
# 这里可以控制VR控制器的震动等
pass
def _get_vr_frame(self):
"""获取VR渲染帧"""
try:
if not self.vr_manager.is_vr_enabled():
return None
# 获取左右眼纹理
left_texture = self.vr_manager.eye_textures.get('left')
right_texture = self.vr_manager.eye_textures.get('right')
if not left_texture or not right_texture:
return None
# 合成立体帧
frame_data = self._compose_stereo_frame(left_texture, right_texture)
return frame_data
except Exception as e:
print(f"获取VR帧错误: {str(e)}")
return None
def _compose_stereo_frame(self, left_texture, right_texture):
"""合成立体帧"""
try:
# 创建组合图像
combined_image = PNMImage(self.stream_width, self.stream_height)
# 获取左右眼图像
left_image = PNMImage()
right_image = PNMImage()
if left_texture.store(left_image) and right_texture.store(right_image):
# 将左右眼图像合并Side-by-Side布局
left_width = self.stream_width // 2
# 缩放左眼图像到左半部分
left_scaled = PNMImage(left_width, self.stream_height)
left_scaled.quickFilterFrom(left_image)
combined_image.copySubImage(left_scaled, 0, 0)
# 缩放右眼图像到右半部分
right_scaled = PNMImage(left_width, self.stream_height)
right_scaled.quickFilterFrom(right_image)
combined_image.copySubImage(right_scaled, left_width, 0)
# 转换为字节数据
return combined_image.makeRamImage()
return None
except Exception as e:
print(f"合成立体帧错误: {str(e)}")
return None
def _send_frame(self, frame_data):
"""发送帧数据"""
try:
if not self.server_socket:
return
# 创建帧消息
frame_message = {
"type": "video_frame",
"timestamp": time.time(),
"width": self.stream_width,
"height": self.stream_height,
"format": "rgb",
"data": frame_data.hex() # 转换为十六进制字符串
}
self._send_message(frame_message)
except Exception as e:
print(f"发送帧错误: {str(e)}")
def _update_performance_stats(self):
"""更新性能统计"""
self.frame_count += 1
current_time = time.time()
if current_time - self.last_fps_time >= 1.0:
self.current_fps = self.frame_count
self.frame_count = 0
self.last_fps_time = current_time
def start_streaming(self):
"""开始串流"""
if not self.connected:
print("未连接到ALVR服务器")
return False
start_message = {"type": "start_streaming"}
self._send_message(start_message)
return True
def stop_streaming(self):
"""停止串流"""
if not self.connected:
return
stop_message = {"type": "stop_streaming"}
self._send_message(stop_message)
self.streaming = False
def send_haptic_feedback(self, controller_id, duration, intensity):
"""发送触觉反馈"""
if not self.connected:
return
haptic_message = {
"type": "haptic_feedback",
"controller_id": controller_id,
"duration": duration,
"intensity": intensity
}
self._send_message(haptic_message)
def get_streaming_status(self):
"""获取串流状态"""
return {
"connected": self.connected,
"streaming": self.streaming,
"fps": self.current_fps,
"latency": self.latency,
"resolution": f"{self.stream_width}x{self.stream_height}",
"bitrate": self.bitrate
}
def set_stream_quality(self, width, height, fps, bitrate):
"""设置串流质量"""
self.stream_width = width
self.stream_height = height
self.stream_fps = fps
self.bitrate = bitrate
# 如果正在串流,重新配置
if self.streaming:
self._configure_streaming()
def shutdown(self):
"""关闭串流"""
print("关闭ALVR串流...")
self.running = False
self.streaming = False
# 发送断开消息
if self.connected:
disconnect_message = {"type": "disconnect"}
self._send_message(disconnect_message)
# 关闭连接
if self.server_socket:
self.server_socket.close()
if self.streaming_socket:
self.streaming_socket.close()
# 等待线程结束
if self.connection_thread:
self.connection_thread.join(timeout=2)
if self.streaming_thread:
self.streaming_thread.join(timeout=2)
self.connected = False
print("✓ ALVR串流已关闭")
def is_connected(self):
"""检查是否连接"""
return self.connected
def is_streaming(self):
"""检查是否在串流"""
return self.streaming