EG/plugins/user/fluid_simulation/fluids/fluid_dynamics.py
2025-10-30 11:46:41 +08:00

707 lines
22 KiB
Python

"""
流体动力学模块
负责实现流体的基本物理模拟和动力学计算
"""
import time
from typing import Dict, Any, List, Optional, Tuple
import math
import numpy as np
class FluidDynamics:
"""
流体动力学模拟器
负责实现流体的基本物理模拟和动力学计算
"""
def __init__(self, plugin):
"""
初始化流体动力学模拟器
Args:
plugin: 水体和流体模拟插件实例
"""
self.plugin = plugin
self.enabled = False
self.initialized = False
# 流体网格配置
self.grid_config = {
'resolution_x': 64,
'resolution_y': 64,
'resolution_z': 64,
'cell_size': 1.0, # 网格单元大小
'boundary_condition': 'closed' # 边界条件
}
# 物理参数
self.physics_params = {
'gravity': 9.81, # 重力加速度 (m/s²)
'viscosity': 0.001, # 动力粘度 (Pa·s)
'density': 1000.0, # 密度 (kg/m³)
'time_step': 0.016, # 时间步长 (秒)
'solver_iterations': 10 # 求解器迭代次数
}
# 流体状态场
self.velocity_field = None # 速度场
self.pressure_field = None # 压力场
self.density_field = None # 密度场
self.temperature_field = None # 温度场
# 粒子系统
self.fluid_particles = []
self.particle_counter = 0
self.max_particles = 10000
# 边界条件
self.boundary_conditions = {
'left': 'closed',
'right': 'closed',
'top': 'closed',
'bottom': 'closed',
'front': 'closed',
'back': 'closed'
}
# 外力影响
self.external_forces = {
'gravity': (0.0, -9.81, 0.0),
'wind': (0.0, 0.0, 0.0),
'buoyancy': (0.0, 0.0, 0.0)
}
# 模拟统计
self.simulation_stats = {
'steps_completed': 0,
'particles_updated': 0,
'solver_iterations': 0,
'collision_count': 0
}
# 性能优化
self.optimization_settings = {
'adaptive_timestep': True,
'spatial_partitioning': True,
'particle_culling': True,
'lod_simulation': True
}
print("✓ 流体动力学模拟器已创建")
def initialize(self) -> bool:
"""
初始化流体动力学模拟器
Returns:
是否初始化成功
"""
try:
# 初始化流体场
self._initialize_fluid_fields()
# 初始化粒子系统
self._initialize_particle_system()
self.initialized = True
print("✓ 流体动力学模拟器初始化完成")
return True
except Exception as e:
print(f"✗ 流体动力学模拟器初始化失败: {e}")
import traceback
traceback.print_exc()
return False
def enable(self) -> bool:
"""
启用流体动力学模拟器
Returns:
是否启用成功
"""
try:
if not self.initialized:
print("✗ 流体动力学模拟器未初始化")
return False
self.enabled = True
print("✓ 流体动力学模拟器已启用")
return True
except Exception as e:
print(f"✗ 流体动力学模拟器启用失败: {e}")
import traceback
traceback.print_exc()
return False
def disable(self):
"""禁用流体动力学模拟器"""
try:
self.enabled = False
print("✓ 流体动力学模拟器已禁用")
except Exception as e:
print(f"✗ 流体动力学模拟器禁用失败: {e}")
import traceback
traceback.print_exc()
def finalize(self):
"""清理流体动力学模拟器资源"""
try:
self.disable()
self._clear_fluid_fields()
self.fluid_particles.clear()
self.initialized = False
print("✓ 流体动力学模拟器资源已清理")
except Exception as e:
print(f"✗ 流体动力学模拟器资源清理失败: {e}")
import traceback
traceback.print_exc()
def update(self, dt: float):
"""
更新流体动力学模拟器状态
Args:
dt: 时间增量
"""
try:
if not self.enabled:
return
# 根据实际时间步长调整模拟步长
actual_dt = dt * self.plugin.fluid_manager.simulation_config['simulation_speed']
# 执行流体模拟步骤
self._simulate_fluid_dynamics(actual_dt)
# 更新统计信息
self.simulation_stats['steps_completed'] += 1
except Exception as e:
print(f"✗ 流体动力学模拟器更新失败: {e}")
import traceback
traceback.print_exc()
def _initialize_fluid_fields(self):
"""初始化流体场"""
try:
res_x = self.grid_config['resolution_x']
res_y = self.grid_config['resolution_y']
res_z = self.grid_config['resolution_z']
# 初始化速度场 (每个维度一个分量)
self.velocity_field = {
'u': np.zeros((res_x, res_y, res_z)), # x方向速度
'v': np.zeros((res_x, res_y, res_z)), # y方向速度
'w': np.zeros((res_x, res_y, res_z)) # z方向速度
}
# 初始化压力场
self.pressure_field = np.zeros((res_x, res_y, res_z))
# 初始化密度场
self.density_field = np.full((res_x, res_y, res_z), self.physics_params['density'])
# 初始化温度场
self.temperature_field = np.full((res_x, res_y, res_z), 20.0) # 20°C
print("✓ 流体场初始化完成")
except Exception as e:
print(f"✗ 流体场初始化失败: {e}")
raise
def _clear_fluid_fields(self):
"""清理流体场"""
try:
if self.velocity_field:
for component in self.velocity_field.values():
component.fill(0)
self.velocity_field = None
if self.pressure_field is not None:
self.pressure_field.fill(0)
self.pressure_field = None
if self.density_field is not None:
self.density_field.fill(0)
self.density_field = None
if self.temperature_field is not None:
self.temperature_field.fill(0)
self.temperature_field = None
except Exception as e:
print(f"✗ 流体场清理失败: {e}")
def _initialize_particle_system(self):
"""初始化粒子系统"""
try:
# 预分配粒子数组
self.fluid_particles = []
self.particle_counter = 0
print("✓ 粒子系统初始化完成")
except Exception as e:
print(f"✗ 粒子系统初始化失败: {e}")
raise
def _simulate_fluid_dynamics(self, dt: float):
"""
执行流体动力学模拟
Args:
dt: 时间步长
"""
try:
# 应用外力
self._apply_external_forces(dt)
# 更新速度场
self._update_velocity_field(dt)
# 求解压力场 (确保不可压缩性)
self._solve_pressure_field()
# 更新粒子
self._update_fluid_particles(dt)
# 处理边界条件
self._apply_boundary_conditions()
except Exception as e:
print(f"✗ 流体动力学模拟失败: {e}")
def _apply_external_forces(self, dt: float):
"""
应用外力到流体
Args:
dt: 时间步长
"""
try:
if not self.velocity_field:
return
res_x, res_y, res_z = self.velocity_field['u'].shape
# 应用重力
gravity = self.external_forces['gravity']
self.velocity_field['v'] += gravity[1] * dt
# 应用风力
wind = self.external_forces['wind']
self.velocity_field['u'] += wind[0] * dt
self.velocity_field['w'] += wind[2] * dt
# 应用浮力 (基于温度和密度差异)
buoyancy = self.external_forces['buoyancy']
self.velocity_field['v'] += buoyancy[1] * dt
except Exception as e:
print(f"✗ 外力应用失败: {e}")
def _update_velocity_field(self, dt: float):
"""
更新速度场
Args:
dt: 时间步长
"""
try:
if not self.velocity_field:
return
# 添加粘性扩散项
self._apply_viscosity(dt)
# 添加对流项
self._apply_advection(dt)
except Exception as e:
print(f"✗ 速度场更新失败: {e}")
def _apply_viscosity(self, dt: float):
"""
应用粘性扩散
Args:
dt: 时间步长
"""
try:
if not self.velocity_field:
return
viscosity = self.physics_params['viscosity']
cell_size = self.grid_config['cell_size']
# 简化的粘性扩散计算 (拉普拉斯算子)
# 在实际实现中,这会是一个更复杂的计算
pass # 简化处理,实际实现中会计算粘性扩散
except Exception as e:
print(f"✗ 粘性扩散应用失败: {e}")
def _apply_advection(self, dt: float):
"""
应用对流项
Args:
dt: 时间步长
"""
try:
if not self.velocity_field:
return
# 简化的对流计算
# 在实际实现中,这会使用半拉格朗日方法追踪粒子轨迹
pass # 简化处理,实际实现中会计算对流
except Exception as e:
print(f"✗ 对流项应用失败: {e}")
def _solve_pressure_field(self):
"""求解压力场"""
try:
# 使用投影方法确保不可压缩性
# 在实际实现中,这会使用泊松方程求解器
self.simulation_stats['solver_iterations'] += self.physics_params['solver_iterations']
except Exception as e:
print(f"✗ 压力场求解失败: {e}")
def _update_fluid_particles(self, dt: float):
"""
更新流体粒子
Args:
dt: 时间步长
"""
try:
updated_count = 0
for particle in self.fluid_particles:
if particle['active']:
# 更新粒子位置
velocity = particle['velocity']
particle['position'] = (
particle['position'][0] + velocity[0] * dt,
particle['position'][1] + velocity[1] * dt,
particle['position'][2] + velocity[2] * dt
)
# 应用重力
particle['velocity'] = (
velocity[0],
velocity[1] - self.physics_params['gravity'] * dt,
velocity[2]
)
updated_count += 1
self.simulation_stats['particles_updated'] += updated_count
except Exception as e:
print(f"✗ 流体粒子更新失败: {e}")
def _apply_boundary_conditions(self):
"""应用边界条件"""
try:
# 根据边界条件设置边界值
# 在实际实现中,这会处理各种边界条件
pass
except Exception as e:
print(f"✗ 边界条件应用失败: {e}")
def add_fluid_particle(self, position: Tuple[float, float, float],
velocity: Tuple[float, float, float] = (0.0, 0.0, 0.0),
density: float = None) -> int:
"""
添加流体粒子
Args:
position: 粒子位置 (x, y, z)
velocity: 粒子速度 (vx, vy, vz)
density: 粒子密度
Returns:
粒子ID
"""
try:
if len(self.fluid_particles) >= self.max_particles:
print("✗ 粒子数量已达上限")
return -1
if density is None:
density = self.physics_params['density']
particle_id = self.particle_counter
self.particle_counter += 1
particle = {
'id': particle_id,
'position': position,
'velocity': velocity,
'density': density,
'temperature': 20.0,
'active': True,
'creation_time': time.time()
}
self.fluid_particles.append(particle)
return particle_id
except Exception as e:
print(f"✗ 流体粒子添加失败: {e}")
return -1
def remove_fluid_particle(self, particle_id: int) -> bool:
"""
移除流体粒子
Args:
particle_id: 粒子ID
Returns:
是否移除成功
"""
try:
for i, particle in enumerate(self.fluid_particles):
if particle['id'] == particle_id:
self.fluid_particles.pop(i)
return True
print(f"✗ 粒子不存在: {particle_id}")
return False
except Exception as e:
print(f"✗ 流体粒子移除失败: {e}")
return False
def get_fluid_particle(self, particle_id: int) -> Optional[Dict[str, Any]]:
"""
获取流体粒子信息
Args:
particle_id: 粒子ID
Returns:
粒子信息或None
"""
try:
for particle in self.fluid_particles:
if particle['id'] == particle_id:
return particle.copy()
return None
except Exception as e:
print(f"✗ 流体粒子获取失败: {e}")
return None
def set_external_force(self, force_name: str, force_vector: Tuple[float, float, float]):
"""
设置外力
Args:
force_name: 力的名称
force_vector: 力向量 (fx, fy, fz)
"""
try:
if force_name in self.external_forces:
self.external_forces[force_name] = force_vector
print(f"✓ 外力 '{force_name}' 已设置: {force_vector}")
else:
print(f"✗ 无效的外力名称: {force_name}")
except Exception as e:
print(f"✗ 外力设置失败: {e}")
def get_external_forces(self) -> Dict[str, Tuple[float, float, float]]:
"""
获取所有外力
Returns:
外力字典
"""
return self.external_forces.copy()
def set_physics_parameters(self, params: Dict[str, float]):
"""
设置物理参数
Args:
params: 物理参数字典
"""
try:
self.physics_params.update(params)
print(f"✓ 物理参数已更新: {self.physics_params}")
except Exception as e:
print(f"✗ 物理参数设置失败: {e}")
def get_physics_parameters(self) -> Dict[str, float]:
"""
获取物理参数
Returns:
物理参数字典
"""
return self.physics_params.copy()
def set_grid_configuration(self, config: Dict[str, Any]):
"""
设置网格配置
Args:
config: 网格配置字典
"""
try:
old_resolution = (
self.grid_config['resolution_x'],
self.grid_config['resolution_y'],
self.grid_config['resolution_z']
)
self.grid_config.update(config)
new_resolution = (
self.grid_config['resolution_x'],
self.grid_config['resolution_y'],
self.grid_config['resolution_z']
)
# 如果分辨率改变,重新初始化流体场
if old_resolution != new_resolution:
self._initialize_fluid_fields()
print(f"✓ 网格配置已更新: {self.grid_config}")
except Exception as e:
print(f"✗ 网格配置设置失败: {e}")
def get_grid_configuration(self) -> Dict[str, Any]:
"""
获取网格配置
Returns:
网格配置字典
"""
return self.grid_config.copy()
def get_simulation_stats(self) -> Dict[str, int]:
"""
获取模拟统计信息
Returns:
统计信息字典
"""
return self.simulation_stats.copy()
def reset_simulation_stats(self):
"""重置模拟统计信息"""
try:
self.simulation_stats = {
'steps_completed': 0,
'particles_updated': 0,
'solver_iterations': 0,
'collision_count': 0
}
print("✓ 模拟统计信息已重置")
except Exception as e:
print(f"✗ 模拟统计信息重置失败: {e}")
def get_fluid_velocity_at(self, x: int, y: int, z: int) -> Optional[Tuple[float, float, float]]:
"""
获取指定位置的流体速度
Args:
x, y, z: 网格坐标
Returns:
速度向量 (vx, vy, vz) 或 None
"""
try:
if not self.velocity_field:
return None
shape = self.velocity_field['u'].shape
if 0 <= x < shape[0] and 0 <= y < shape[1] and 0 <= z < shape[2]:
vx = self.velocity_field['u'][x, y, z]
vy = self.velocity_field['v'][x, y, z]
vz = self.velocity_field['w'][x, y, z]
return (float(vx), float(vy), float(vz))
return None
except Exception as e:
print(f"✗ 速度获取失败: {e}")
return None
def set_fluid_velocity_at(self, x: int, y: int, z: int, velocity: Tuple[float, float, float]):
"""
设置指定位置的流体速度
Args:
x, y, z: 网格坐标
velocity: 速度向量 (vx, vy, vz)
"""
try:
if not self.velocity_field:
return
shape = self.velocity_field['u'].shape
if 0 <= x < shape[0] and 0 <= y < shape[1] and 0 <= z < shape[2]:
self.velocity_field['u'][x, y, z] = velocity[0]
self.velocity_field['v'][x, y, z] = velocity[1]
self.velocity_field['w'][x, y, z] = velocity[2]
except Exception as e:
print(f"✗ 速度设置失败: {e}")
def calculate_cfl_condition(self) -> float:
"""
计算CFL条件(柯朗-弗里德里希-李维条件)
Returns:
CFL数
"""
try:
if not self.velocity_field:
return 0.0
max_velocity = 0.0
# 计算最大速度
u_max = np.max(np.abs(self.velocity_field['u']))
v_max = np.max(np.abs(self.velocity_field['v']))
w_max = np.max(np.abs(self.velocity_field['w']))
max_velocity = max(u_max, v_max, w_max)
cell_size = self.grid_config['cell_size']
dt = self.physics_params['time_step']
# CFL = (v * dt) / dx
cfl = (max_velocity * dt) / cell_size if cell_size > 0 else 0.0
return cfl
except Exception as e:
print(f"✗ CFL条件计算失败: {e}")
return 0.0
def is_stable(self) -> bool:
"""
检查模拟是否稳定
Returns:
是否稳定
"""
try:
# CFL条件应小于等于1以保证稳定性
cfl = self.calculate_cfl_condition()
return cfl <= 1.0
except Exception as e:
print(f"✗ 稳定性检查失败: {e}")
return False