EG/demo/video_integration.py
2025-07-02 09:49:59 +08:00

444 lines
17 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
视频播放集成模块
用于在现有的Panda3D编辑器中添加视频播放功能
"""
from panda3d.core import (
MovieTexture, CardMaker, Texture, PNMImage,
TextureStage, NodePath, Vec3
)
import os
import math
class VideoManager:
    """Video manager meant to be attached to an existing ``MyWorld`` instance.

    The manager creates textured nodes (flat screens, billboards and an
    inside-out cube used as a panorama surrogate) and keeps one dict per node
    in ``video_objects`` with the keys ``node``, ``texture``, ``path``,
    ``name``, ``type`` and ``is_playing`` (plus ``task_name`` for procedural
    textures).

    The world object passed to the constructor is used through these
    attributes: ``render``, ``models``, ``updateSceneTree()`` and ``taskMgr``.

    All lookups are by name and act on the first entry carrying that name,
    so callers should give every video a unique name.
    """

    # Entry types whose texture is a MovieTexture (play/stop/setTime/...).
    _MOVIE_TYPES = ('movie', 'spherical')
    # Entry types whose texture is redrawn every frame by a task.
    _TASK_TYPES = ('animated', 'panoramic')

    def __init__(self, world_instance):
        """Store the owning world and initialise the (empty) video registry."""
        self.world = world_instance
        self.video_objects = []  # one dict per managed video node
        self.supported_formats = ['.mp4', '.avi', '.mov', '.mkv', '.webm']

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _find_video(self, video_name):
        """Return the first registered entry named *video_name*, else None."""
        for video_obj in self.video_objects:
            if video_obj['name'] == video_name:
                return video_obj
        return None

    def _register_node(self, node):
        """Add *node* to the world's model list and refresh the scene tree."""
        self.world.models.append(node)
        self.world.updateSceneTree()

    def _attach_movie(self, node, video_path, name, entry_type):
        """Load *video_path* into a MovieTexture, apply it to *node* and play.

        Returns True and records a registry entry of type *entry_type* when
        the file could be read; returns False (recording nothing) otherwise.
        """
        movie_texture = MovieTexture(name)
        if not movie_texture.read(video_path):
            return False
        node.setTexture(movie_texture)
        movie_texture.play()
        self.video_objects.append({
            'node': node,
            'texture': movie_texture,
            'path': video_path,
            'name': name,
            'type': entry_type,
            'is_playing': True,
        })
        return True

    def _start_texture_task(self, video_obj, replace=False):
        """Start the per-frame task that redraws a procedural texture.

        With ``replace=True`` any task already registered under the entry's
        task name is removed first, so repeated play requests never stack
        duplicate updaters.
        """
        if video_obj['type'] == 'animated':
            updater = self._update_animated_texture
        else:
            updater = self._update_panoramic_texture
        task_name = video_obj['task_name']
        texture = video_obj['texture']
        name = video_obj['name']
        if replace:
            self.world.taskMgr.remove(task_name)
        self.world.taskMgr.add(
            lambda task: updater(task, texture, name), task_name
        )

    # ------------------------------------------------------------------
    # Creation
    # ------------------------------------------------------------------
    def create_video_screen(self, pos=(0, 0, 0), size=(4, 3), video_path=None, name="video_screen"):
        """Create a flat, centred card showing a video.

        Args:
            pos: (x, y, z) position of the card under ``world.render``.
            size: (width, height) of the card.
            video_path: File to play. When it is missing or does not exist
                an animated procedural texture is shown instead; when it
                exists but cannot be read a checkerboard placeholder is used.
            name: Node name, also the key used by the playback methods.

        Returns:
            The created node, or None if anything raised.
        """
        try:
            # Build the card geometry centred on its origin.
            cm = CardMaker(f"video_screen_{len(self.video_objects)}")
            half_w, half_h = size[0] / 2, size[1] / 2
            cm.setFrame(-half_w, half_w, -half_h, half_h)
            video_screen = self.world.render.attachNewNode(cm.generate())
            video_screen.setPos(*pos)
            video_screen.setName(name)

            loaded = False
            if video_path and os.path.exists(video_path):
                loaded = self._attach_movie(video_screen, video_path, name, 'movie')
                if not loaded:
                    print(f"✗ 无法加载视频: {video_path}")
                    self._create_placeholder_texture(video_screen, name)
            else:
                # No usable file: show a procedural animation as a demo.
                self._create_animated_texture(video_screen, name)

            # Register every screen (also fallback ones) so the editor's
            # model list and scene tree stay in sync with the scene graph.
            self._register_node(video_screen)
            if loaded:
                print(f"✓ 视频屏幕创建成功: {name}")
            return video_screen
        except Exception as e:
            print(f"创建视频屏幕失败: {str(e)}")
            return None

    def create_video_billboard(self, pos=(0, 0, 0), size=(2, 1.5), video_path=None, name="video_billboard"):
        """Create a video screen and put it in billboard-axis mode.

        Takes the same arguments as ``create_video_screen`` and returns the
        node it produced (None on failure).
        """
        try:
            video_screen = self.create_video_screen(pos, size, video_path, name)
            if video_screen:
                # Billboard mode keeps the card turned towards the camera.
                video_screen.setBillboardAxis()
                print(f"✓ 视频广告牌创建成功: {name}")
            return video_screen
        except Exception as e:
            print(f"创建视频广告牌失败: {str(e)}")
            return None

    def create_spherical_video(self, pos=(0, 0, 0), radius=10, video_path=None, name="spherical_video"):
        """Create an inside-out surround surface showing a video.

        The surface is the six-card cube built by ``_create_sphere_geometry``.
        If *video_path* is missing, does not exist, or cannot be read, a
        procedural panoramic texture is used instead.

        Returns:
            The root node of the surface, or None if anything raised.
        """
        try:
            sphere = self._create_sphere_geometry(radius)
            sphere.reparentTo(self.world.render)
            sphere.setPos(*pos)
            sphere.setName(name)
            # Meant to be viewed from inside: render both sides and mirror X.
            sphere.setTwoSided(True)
            sphere.setScale(-1, 1, 1)

            has_file = bool(video_path) and os.path.exists(video_path)
            loaded = has_file and self._attach_movie(sphere, video_path, name, 'spherical')
            if not loaded:
                if has_file:
                    print(f"✗ 无法加载视频: {video_path}")
                self._create_panoramic_texture(sphere, name)

            self._register_node(sphere)
            print(f"✓ 球形全景视频创建成功: {name}")
            return sphere
        except Exception as e:
            print(f"创建球形全景视频失败: {str(e)}")
            return None

    def _create_sphere_geometry(self, radius):
        """Build the surround geometry and return its root node.

        Simplified stand-in for a sphere: six square cards of half-extent
        *radius* arranged as a cube under a node attached to ``world.render``.
        """
        cm = CardMaker("sphere_face")
        cm.setFrame(-radius, radius, -radius, radius)
        sphere_root = self.world.render.attachNewNode("sphere")
        # (position, hpr) for each of the six cube faces.
        faces = [
            (Vec3(0, radius, 0), Vec3(0, 0, 0)),      # front
            (Vec3(0, -radius, 0), Vec3(0, 180, 0)),   # back
            (Vec3(radius, 0, 0), Vec3(0, 90, 0)),     # right
            (Vec3(-radius, 0, 0), Vec3(0, -90, 0)),   # left
            (Vec3(0, 0, radius), Vec3(-90, 0, 0)),    # top
            (Vec3(0, 0, -radius), Vec3(90, 0, 0)),    # bottom
        ]
        for face_pos, face_hpr in faces:
            face = sphere_root.attachNewNode(cm.generate())
            face.setPos(face_pos)
            face.setHpr(face_hpr)
        return sphere_root

    def _create_placeholder_texture(self, node, name):
        """Apply a static 256x256 grey checkerboard to *node* and register it.

        The entry is recorded with type ``'placeholder'`` and is never
        playing.
        """
        texture = Texture()
        texture.setup2dTexture(256, 256, Texture.TUnsignedByte, Texture.FRgba)
        img = PNMImage(256, 256, 4)
        for y in range(256):
            for x in range(256):
                # 32-pixel checker cells alternating light and dark grey.
                shade = 0.8 if (x // 32 + y // 32) % 2 else 0.3
                img.setXel(x, y, shade, shade, shade)
                img.setAlpha(x, y, 1.0)
        texture.load(img)
        node.setTexture(texture)
        self.video_objects.append({
            'node': node,
            'texture': texture,
            'path': None,
            'name': name,
            'type': 'placeholder',
            'is_playing': False,
        })

    def _create_animated_texture(self, node, name):
        """Apply a 256x256 procedural texture to *node* and start animating it.

        The texture is redrawn by the task ``animate_texture_<name>``; the
        entry is recorded with type ``'animated'``.
        """
        texture = Texture()
        texture.setup2dTexture(256, 256, Texture.TUnsignedByte, Texture.FRgba)
        node.setTexture(texture)
        video_obj = {
            'node': node,
            'texture': texture,
            'path': None,
            'name': name,
            'type': 'animated',
            'is_playing': True,
            'task_name': f"animate_texture_{name}",
        }
        self._start_texture_task(video_obj)
        self.video_objects.append(video_obj)

    def _create_panoramic_texture(self, node, name):
        """Apply a 512x256 procedural texture to *node* and start animating it.

        The texture is redrawn by the task ``animate_panoramic_<name>``; the
        entry is recorded with type ``'panoramic'``.
        """
        texture = Texture()
        texture.setup2dTexture(512, 256, Texture.TUnsignedByte, Texture.FRgba)
        node.setTexture(texture)
        video_obj = {
            'node': node,
            'texture': texture,
            'path': None,
            'name': name,
            'type': 'panoramic',
            'is_playing': True,
            'task_name': f"animate_panoramic_{name}",
        }
        self._start_texture_task(video_obj)
        self.video_objects.append(video_obj)

    # ------------------------------------------------------------------
    # Per-frame texture updaters (task callbacks)
    # ------------------------------------------------------------------
    def _update_animated_texture(self, task, texture, name):
        """Redraw the ripple pattern for the current task time.

        Returns ``task.cont`` to keep running, or ``task.done`` if drawing
        raised. *name* is accepted for signature compatibility and unused.
        """
        try:
            t = task.time
            img = PNMImage(256, 256, 4)
            for y in range(256):
                dist_y = (y - 128) / 128.0
                # Blue depends only on the row, so compute it once per row.
                b = (math.sin(dist_y * 4 + t * 2.5) + 1) * 0.5
                for x in range(256):
                    dist_x = (x - 128) / 128.0
                    distance = math.sqrt(dist_x**2 + dist_y**2)
                    # Concentric wave moving outwards from the centre.
                    wave = math.sin(distance * 8 - t * 3)
                    r = (wave + 1) * 0.5
                    g = (math.sin(dist_x * 4 + t * 2) + 1) * 0.5
                    img.setXel(x, y, r, g, b)
                    img.setAlpha(x, y, 1.0)
            texture.load(img)
            return task.cont
        except Exception as e:
            print(f"更新动画纹理失败: {str(e)}")
            return task.done

    def _update_panoramic_texture(self, task, texture, name):
        """Redraw the rotating panorama pattern for the current task time.

        Returns ``task.cont`` to keep running, or ``task.done`` if drawing
        raised. *name* is accepted for signature compatibility and unused.
        """
        try:
            t = task.time
            img = PNMImage(512, 256, 4)
            for y in range(256):
                v = y / 256.0  # latitude fraction in [0, 1)
                height = (v - 0.5) * math.pi
                # Blue depends only on the row, so compute it once per row.
                b = (math.sin(height * 6 + t) + 1) * 0.5
                for x in range(512):
                    u = x / 512.0  # longitude fraction in [0, 1)
                    angle = u * 2 * math.pi + t * 0.5
                    r = (math.sin(angle * 3) + 1) * 0.5
                    g = (math.cos(angle * 2 + height * 4) + 1) * 0.5
                    img.setXel(x, y, r, g, b)
                    img.setAlpha(x, y, 1.0)
            texture.load(img)
            return task.cont
        except Exception as e:
            print(f"更新全景纹理失败: {str(e)}")
            return task.done

    # ------------------------------------------------------------------
    # Playback control
    # ------------------------------------------------------------------
    def play_video(self, video_name):
        """Start or resume the video named *video_name* (no-op if unknown)."""
        video_obj = self._find_video(video_name)
        if video_obj is None:
            return
        texture = video_obj['texture']
        if video_obj['type'] in self._MOVIE_TYPES:
            if hasattr(texture, 'play'):
                texture.play()
                video_obj['is_playing'] = True
                print(f"播放视频: {video_name}")
        elif video_obj['type'] in self._TASK_TYPES:
            if 'task_name' in video_obj:
                # replace=True: never leave two updaters on one texture.
                self._start_texture_task(video_obj, replace=True)
                video_obj['is_playing'] = True
                print(f"恢复动画: {video_name}")

    def pause_video(self, video_name):
        """Pause the video named *video_name* (no-op if unknown).

        Movie textures are stopped in place; procedural textures have their
        update task removed.
        """
        video_obj = self._find_video(video_name)
        if video_obj is None:
            return
        texture = video_obj['texture']
        if video_obj['type'] in self._MOVIE_TYPES:
            if hasattr(texture, 'stop'):
                texture.stop()
                video_obj['is_playing'] = False
                print(f"暂停视频: {video_name}")
        elif video_obj['type'] in self._TASK_TYPES:
            if 'task_name' in video_obj:
                self.world.taskMgr.remove(video_obj['task_name'])
                video_obj['is_playing'] = False
                print(f"暂停动画: {video_name}")

    def stop_video(self, video_name):
        """Stop the video named *video_name* (no-op if unknown).

        Movie textures are stopped and rewound to time 0; procedural
        textures have their update task removed.
        """
        video_obj = self._find_video(video_name)
        if video_obj is None:
            return
        texture = video_obj['texture']
        if video_obj['type'] in self._MOVIE_TYPES:
            if hasattr(texture, 'stop'):
                texture.stop()
            if hasattr(texture, 'setTime'):
                texture.setTime(0)
            video_obj['is_playing'] = False
            print(f"停止视频: {video_name}")
        elif video_obj['type'] in self._TASK_TYPES:
            if 'task_name' in video_obj:
                self.world.taskMgr.remove(video_obj['task_name'])
                video_obj['is_playing'] = False
                print(f"停止动画: {video_name}")

    def set_video_speed(self, video_name, speed):
        """Set the play rate of a movie-backed video (ignored for others)."""
        video_obj = self._find_video(video_name)
        if video_obj is None:
            return
        texture = video_obj['texture']
        if video_obj['type'] in self._MOVIE_TYPES and hasattr(texture, 'setPlayRate'):
            texture.setPlayRate(speed)
            print(f"设置视频 {video_name} 播放速度: {speed}x")

    def remove_video(self, video_name):
        """Stop the named video, detach its node and forget it.

        Does nothing when no video with that name is registered.
        """
        for index, video_obj in enumerate(self.video_objects):
            if video_obj['name'] != video_name:
                continue
            self.stop_video(video_name)
            node = video_obj['node']
            if node in self.world.models:
                self.world.models.remove(node)
            # Detach unconditionally so the node never outlives its entry.
            node.removeNode()
            del self.video_objects[index]
            self.world.updateSceneTree()
            print(f"移除视频: {video_name}")
            break

    # ------------------------------------------------------------------
    # Queries
    # ------------------------------------------------------------------
    def get_video_info(self, video_name):
        """Return a dict describing the named video, or None if unknown.

        The dict always has ``name``, ``type``, ``is_playing`` and ``path``.
        Movie-backed entries whose texture exposes ``getVideoLength`` also
        get ``length``, ``current_time`` and ``play_rate``.
        """
        video_obj = self._find_video(video_name)
        if video_obj is None:
            return None
        info = {
            'name': video_obj['name'],
            'type': video_obj['type'],
            'is_playing': video_obj['is_playing'],
            'path': video_obj.get('path', 'N/A'),
        }
        texture = video_obj['texture']
        if video_obj['type'] in self._MOVIE_TYPES and hasattr(texture, 'getVideoLength'):
            info['length'] = texture.getVideoLength()
            info['current_time'] = texture.getTime()
            info['play_rate'] = texture.getPlayRate()
        return info

    def list_videos(self):
        """Return a name/type/is_playing summary dict for every video."""
        return [
            {
                'name': video_obj['name'],
                'type': video_obj['type'],
                'is_playing': video_obj['is_playing'],
            }
            for video_obj in self.video_objects
        ]

    def is_video_file(self, filepath):
        """Return True if *filepath* has a supported video extension.

        The comparison is case-insensitive; empty or None paths give False.
        """
        if not filepath:
            return False
        ext = os.path.splitext(filepath)[1].lower()
        return ext in self.supported_formats
# Usage example (integrating into the existing MyWorld class):
"""
# In MyWorld.__init__, add:
self.video_manager = VideoManager(self)

# In the file drag-and-drop handler, add video support:
def dropEvent(self, event):
    if event.mimeData().hasUrls():
        for url in event.mimeData().urls():
            filepath = url.toLocalFile()
            if self.world.video_manager.is_video_file(filepath):
                # Create a video screen
                self.world.video_manager.create_video_screen(
                    pos=(0, 0, 2),
                    video_path=filepath,
                    name=f"video_{len(self.world.video_manager.video_objects)}"
                )
            elif filepath.lower().endswith(('.egg', '.bam', '.obj', '.fbx')):
                self.world.importModel(filepath)
"""