import time from datetime import datetime from panda3d.core import ( CollisionTraverser, CollisionHandlerQueue, CollisionNode, CollisionRay, CollisionSphere, BitMask32, Point3, Vec3 ) from direct.task.TaskManagerGlobal import taskMgr class CollisionManager: """统一的碰撞检测管理器""" def __init__(self, world): self.world = world self.traverser = CollisionTraverser("main_traverser") self.queue = CollisionHandlerQueue() # 碰撞掩码定义 self.MASKS = { # === 基础碰撞掩码定义 === # 每个掩码使用不同的位(bit)来标识,可以进行位运算组合 # 'TERRAIN': BitMask32.bit(0), # 地形/地面 - 通常用于地面碰撞检测 # 'UI_ELEMENT': BitMask32.bit(1), # UI元素 - 界面组件的碰撞检测 # 'CAMERA': BitMask32.bit(2), # 摄像机 - 相机的碰撞检测 'MODEL_COLLISION': BitMask32.bit(6), # 模型碰撞 - 通用模型间碰撞检测 } # 碰撞体形状类型 self.COLLISION_SHAPES = { 'SPHERE': 'sphere', 'BOX': 'box', 'CAPSULE': 'capsule', 'PLANE': 'plane', 'POLYGON': 'polygon', 'MESH': 'mesh' } # 掩码组合定义 self.MASK_COMBINATIONS = { } # 碰撞分组管理 self.collision_groups = {} self.group_enabled = {} # 碰撞回调系统 self.collision_callbacks = {} self.collision_filters = {} # 空间分割系统(八叉树) self.spatial_partitioning_enabled = False self.octree = None self.octree_max_depth = 6 self.octree_max_objects = 10 # 连续碰撞检测 self.ccd_enabled = False self.ccd_threshold = 10.0 # 速度阈值 # 性能监控 self.performance_monitor = CollisionPerformanceMonitor() # 模型间碰撞检测配置 self.model_collision_enabled = False self.collision_detection_frequency = 0.1 self.collision_distance_threshold = 0 self.last_collision_check = 0 self.collision_history = [] self.max_collision_history = 100 # 碰撞状态跟踪 - 避免重复打印 self.active_collisions = set() # 当前活跃的碰撞对 # 模型间碰撞检测任务 self.collision_task = None print("✅ 碰撞检测管理器初始化完成") def enableModelCollisionDetection(self, enable=True, frequency=0.1, threshold=0.5): """启用/禁用模型间碰撞检测 Args: enable: 是否启用碰撞检测 frequency: 检测频率(秒) threshold: 碰撞距离阈值 """ self.model_collision_enabled = enable self.collision_detection_frequency = frequency self.collision_distance_threshold = threshold if enable: print(f"✅ 启用模型间碰撞检测 - 频率: {frequency}s, 阈值: {threshold}") self._startCollisionDetectionTask() else: print("❌ 禁用模型间碰撞检测") self._stopCollisionDetectionTask() def _startCollisionDetectionTask(self): """启动碰撞检测任务""" if self.collision_task: taskMgr.remove(self.collision_task) def collisionDetectionTask(task): try: self.detectModelCollisions() return task.again except Exception as e: print(f"❌ 碰撞检测任务异常: {str(e)}") return task.again self.collision_task = taskMgr.doMethodLater( self.collision_detection_frequency, collisionDetectionTask, "modelCollisionDetection" ) def _stopCollisionDetectionTask(self): """停止碰撞检测任务""" if self.collision_task: taskMgr.remove(self.collision_task) self.collision_task = None def detectModelCollisions(self, specific_models=None, log_results=True): """检测模型间碰撞""" if not self.model_collision_enabled and specific_models is None: return [] start_time = time.perf_counter() current_time = datetime.now() models_to_check = specific_models if specific_models else self.world.models if len(models_to_check) < 2: return [] collision_results = [] checked_pairs = set() current_collision_pairs = set() # 遍历所有模型对 for i, model_a in enumerate(models_to_check): for j, model_b in enumerate(models_to_check[i+1:], i+1): pair_key = tuple(sorted([id(model_a), id(model_b)])) if pair_key in checked_pairs: continue checked_pairs.add(pair_key) collision_info = self._checkModelPairCollision(model_a, model_b, current_time) if collision_info: collision_results.append(collision_info) current_collision_pairs.add(pair_key) # 只有新碰撞才打印日志 if log_results and pair_key not in self.active_collisions: self._logCollisionInfo(collision_info) print(f"🆕 新碰撞检测到!") # 检测碰撞结束的情况 ended_collisions = self.active_collisions - current_collision_pairs for pair_key in ended_collisions: print(f"✅ 碰撞结束: 模型对 {pair_key}") # 更新活跃碰撞状态 self.active_collisions = current_collision_pairs # 记录性能和历史 detection_time = time.perf_counter() - start_time self.performance_monitor.recordModelCollisionDetection( detection_time, len(models_to_check), len(collision_results)) if collision_results: self.collision_history.extend(collision_results) if len(self.collision_history) > self.max_collision_history: self.collision_history = self.collision_history[-self.max_collision_history:] return collision_results def _checkModelPairCollision(self, model_a, model_b, timestamp): """使用Panda3D内置碰撞系统检查两个模型是否碰撞""" try: # 检查模型是否有碰撞节点 collision_node_a = self._findCollisionNode(model_a) collision_node_b = self._findCollisionNode(model_b) if not collision_node_a or not collision_node_b: return None # 使用Panda3D的碰撞遍历器 temp_traverser = CollisionTraverser() temp_queue = CollisionHandlerQueue() # 设置碰撞检测 temp_traverser.addCollider(collision_node_a, temp_queue) # 执行碰撞检测 temp_traverser.traverse(model_b) if temp_queue.getNumEntries() > 0: # 有碰撞,获取碰撞信息 entry = temp_queue.getEntry(0) center_a = model_a.getPos(self.world.render) center_b = model_b.getPos(self.world.render) distance = (center_b - center_a).length() return { 'timestamp': timestamp, 'model_a': { 'name': model_a.getName(), 'center': center_a, 'radius': 0, # 不再需要手动计算半径 'node': model_a }, 'model_b': { 'name': model_b.getName(), 'center': center_b, 'radius': 0, 'node': model_b }, 'collision_point': entry.getSurfacePoint(self.world.render), 'distance': distance, 'penetration_depth': 0, # Panda3D会自动处理 'collision_normal': entry.getSurfaceNormal(self.world.render) } return None except Exception as e: print(f"❌ 检测模型对碰撞失败 ({model_a.getName()} vs {model_b.getName()}): {str(e)}") return None def _findCollisionNode(self, model): """查找模型的碰撞节点""" for child in model.getChildren(): if isinstance(child.node(), CollisionNode): return child return None def _logCollisionInfo(self, collision_info): """输出碰撞信息日志""" timestamp = collision_info['timestamp'].strftime('%H:%M:%S.%f')[:-3] model_a = collision_info['model_a'] model_b = collision_info['model_b'] print(f"🔥 [{timestamp}] 检测到碰撞:") print(f" 模型A: {model_a['name']} (中心: {model_a['center']}, 半径: {model_a['radius']:.2f})") print(f" 模型B: {model_b['name']} (中心: {model_b['center']}, 半径: {model_b['radius']:.2f})") print(f" 碰撞点: {collision_info['collision_point']}") print(f" 中心距离: {collision_info['distance']:.3f}") print(f" 穿透深度: {collision_info['penetration_depth']:.3f}") print(f" 碰撞法向: {collision_info['collision_normal']}") def getCollisionHistory(self, limit=None): """获取碰撞历史记录 Args: limit: 返回记录数量限制 Returns: list: 碰撞历史记录 """ if limit: return self.collision_history[-limit:] return self.collision_history.copy() def clearCollisionHistory(self): """清除碰撞历史记录""" self.collision_history.clear() print("✅ 碰撞历史记录已清除") def getCollisionStatistics(self): """获取碰撞统计信息""" if not self.collision_history: return {"total_collisions": 0, "unique_pairs": 0, "most_frequent_pair": None} # 统计碰撞对 collision_pairs = {} for collision in self.collision_history: pair_key = tuple(sorted([ collision['model_a']['name'], collision['model_b']['name'] ])) collision_pairs[pair_key] = collision_pairs.get(pair_key, 0) + 1 most_frequent_pair = max(collision_pairs.items(), key=lambda x: x[1]) return { "total_collisions": len(self.collision_history), "unique_pairs": len(collision_pairs), "most_frequent_pair": { "models": most_frequent_pair[0], "count": most_frequent_pair[1] }, "collision_pairs": collision_pairs } def createCollisionShape(self, model, shape_type='auto', **kwargs): """为模型创建指定类型的碰撞体 Args: model: 模型节点 shape_type: 碰撞体类型 ('auto', 'sphere', 'box', 'capsule', 'plane', 'polygon') **kwargs: 形状特定参数 Returns: CollisionSolid: 创建的碰撞体 """ from panda3d.core import ( CollisionSphere, CollisionBox, CollisionCapsule, CollisionPlane, CollisionPolygon, Plane, Vec3 ) # 获取考虑变换后的实际尺寸和中心 transformed_info = self._getTransformedModelInfo(model) if not transformed_info: # 默认小球体 return CollisionSphere(Point3(0, 0, 0), 1.0) center = transformed_info['center'] radius = transformed_info['radius'] actual_size = transformed_info['size'] scale_factor = transformed_info['scale_factor'] # 自动选择最适合的形状 if shape_type == 'auto': shape_type = self._determineOptimalShape(model, transformed_info) if shape_type == 'sphere': # 优化球形碰撞体 sphere_radius = kwargs.get('radius', radius) # 支持位置偏移 pos_offset = kwargs.get('position_offset', Vec3(0, 0, 0)) sphere_center = Point3(center.x + pos_offset.x, center.y + pos_offset.y, center.z + pos_offset.z) return CollisionSphere(sphere_center, sphere_radius) elif shape_type == 'box': # 优化盒型碰撞体 - 更精确的尺寸和位置控制(考虑缩放) # 获取自定义尺寸,如果没有提供则使用变换后的实际尺寸 width = kwargs.get('width', actual_size.x) length = kwargs.get('length', actual_size.y) height = kwargs.get('height', actual_size.z) # 支持位置偏移 pos_offset = kwargs.get('position_offset', Vec3(0, 0, 0)) box_center = Point3(center.x + pos_offset.x, center.y + pos_offset.y, center.z + pos_offset.z) # 计算盒子的最小和最大点(基于偏移后的中心) half_width = width / 2 half_length = length / 2 half_height = height / 2 min_point = Point3(box_center.x - half_width, box_center.y - half_length, box_center.z - half_height) max_point = Point3(box_center.x + half_width, box_center.y + half_length, box_center.z + half_height) return CollisionBox(min_point, max_point) elif shape_type == 'capsule': # 优化胶囊体碰撞 - 更合理的比例和位置控制(考虑缩放) # 使用变换后的实际高度,或自定义高度 custom_height = kwargs.get('height', actual_size.z) # 更合理的半径计算:基于变换后模型宽度的平均值 default_radius = min(actual_size.x, actual_size.y) / 2.5 # 稍微小于模型的宽度 custom_radius = kwargs.get('radius', min(default_radius, custom_height * 0.4)) # 支持位置偏移 pos_offset = kwargs.get('position_offset', Vec3(0, 0, 0)) capsule_center = Point3(center.x + pos_offset.x, center.y + pos_offset.y, center.z + pos_offset.z) # 计算胶囊体的两个端点(确保半径不会超出高度) effective_height = max(custom_height, custom_radius * 2.1) # 确保高度至少是半径的2倍多一点 point_a = Point3(capsule_center.x, capsule_center.y, capsule_center.z - effective_height/2 + custom_radius) point_b = Point3(capsule_center.x, capsule_center.y, capsule_center.z + effective_height/2 - custom_radius) return CollisionCapsule(point_a, point_b, custom_radius) elif shape_type == 'plane': # 优化平面碰撞 - 支持位置偏移和更灵活的法向量 normal = kwargs.get('normal', Vec3(0, 0, 1)) # 支持位置偏移 pos_offset = kwargs.get('position_offset', Vec3(0, 0, 0)) plane_point = kwargs.get('point', Point3(center.x + pos_offset.x, center.y + pos_offset.y, center.z + pos_offset.z)) # 标准化法向量 normal.normalize() plane = Plane(normal, plane_point) return CollisionPlane(plane) elif shape_type == 'polygon': # === 创建多边形碰撞体 === # CollisionPolygon特点: # - 单个平面多边形,所有顶点必须共面 # - 适用于:地板、墙面、简单平台等平面区域 # - 性能较好,但只能表示平面形状 # # Mesh碰撞体特点: # - 由多个三角形组成的复杂3D形状 # - 适用于:复杂地形、建筑物、不规则物体 # - 性能较差,但可以表示任意复杂形状 vertices = kwargs.get('vertices', []) if len(vertices) >= 3: # 将顶点转换为Point3对象 # 要求:所有顶点必须在同一平面上 collision_poly = CollisionPolygon(*[Point3(*v) for v in vertices]) return collision_poly else: #print("⚠️ 多边形至少需要3个顶点,回退到球体") return CollisionSphere(center, radius) def _determineOptimalShape(self, model, transformed_info): """根据模型特征自动确定最适合的碰撞体形状""" # 获取变换后的模型尺寸比例 size = transformed_info['size'] max_dim = max(size.x, size.y, size.z) min_dim = min(size.x, size.y, size.z) # 根据模型名称和比例判断 model_name = model.getName().lower() # 角色类模型使用胶囊体 if any(keyword in model_name for keyword in ['character', 'human', 'player', 'npc']): return 'capsule' # 建筑、箱子类使用包围盒 if any(keyword in model_name for keyword in ['building', 'box', 'cube', 'wall', 'door']): return 'box' # 地面、平台使用平面 if any(keyword in model_name for keyword in ['ground', 'floor', 'platform', 'terrain']): return 'plane' # 根据尺寸比例判断 aspect_ratio = max_dim / min_dim if min_dim > 0 else 1 if aspect_ratio > 3: # 细长物体用胶囊体 return 'capsule' elif aspect_ratio < 1.5: # 接近球形用球体 return 'sphere' else: # 其他用包围盒 return 'box' def _getTransformedModelInfo(self, model): """获取考虑变换后的模型信息 Args: model: 模型节点 Returns: dict: 包含变换后的尺寸、中心、半径等信息 """ try: # 获取原始包围盒 bounds = model.getBounds() if bounds.isEmpty(): return None # 获取模型的变换矩阵 transform = model.getTransform() scale = model.getScale() # 计算缩放因子(取三轴缩放的平均值) scale_factor = (abs(scale.x) + abs(scale.y) + abs(scale.z)) / 3.0 # 获取原始尺寸 original_size = bounds.getMax() - bounds.getMin() # 应用缩放到尺寸 actual_size = Vec3( original_size.x * abs(scale.x), original_size.y * abs(scale.y), original_size.z * abs(scale.z) ) # 获取变换后的中心点(在世界坐标系中) original_center = bounds.getCenter() if hasattr(model, 'getPos'): # 模型在世界坐标系中的位置 world_center = model.getPos(model.getParent() if model.getParent() else model) center = Point3(world_center.x, world_center.y, world_center.z) else: # 如果无法获取世界位置,使用原始中心 center = original_center # 计算变换后的半径(考虑缩放) original_radius = bounds.getRadius() transformed_radius = original_radius * scale_factor # 调试信息 print(f"🔍 模型 {model.getName()} 变换信息:") print(f" 原始尺寸: {original_size}") print(f" 缩放因子: {scale}") print(f" 变换后尺寸: {actual_size}") print(f" 变换后半径: {transformed_radius:.2f}") return { 'center': center, 'radius': transformed_radius, 'size': actual_size, 'scale_factor': scale_factor, 'original_size': original_size, 'scale': scale, 'transform': transform } except Exception as e: print(f"⚠️ 获取模型变换信息失败: {e}") # 回退到原始包围盒 bounds = model.getBounds() if bounds.isEmpty(): return None original_size = bounds.getMax() - bounds.getMin() return { 'center': bounds.getCenter(), 'radius': bounds.getRadius(), 'size': original_size, 'scale_factor': 1.0, 'original_size': original_size, 'scale': Vec3(1, 1, 1), 'transform': None } def createMouseRay(self, screen_x, screen_y, mask_types=['SELECTABLE']): """创建鼠标射线""" # 组合掩码 combined_mask = BitMask32.allOff() for mask_type in mask_types: if mask_type in self.MASKS: combined_mask |= self.MASKS[mask_type] # 坐标转换 near_point, far_point = self.world.event_handler.robustCoordinateConversion( screen_x, screen_y) if not near_point: return None # 创建射线节点 ray_node = CollisionNode('mouse_ray') ray_node.setFromCollideMask(combined_mask) ray_node.setIntoCollideMask(BitMask32.allOff()) # 创建射线 direction = far_point - near_point direction.normalize() ray = CollisionRay(near_point, direction) ray_node.addSolid(ray) return ray_node, near_point, far_point def performRaycast(self, ray_node, scene_root=None): """执行射线检测""" if scene_root is None: scene_root = self.world.render # 创建临时节点路径 ray_np = self.world.cam.attachNewNode(ray_node) try: # 清除之前的结果 self.queue.clearEntries() # 添加碰撞器 self.traverser.addCollider(ray_np, self.queue) # 执行遍历 start_time = time.perf_counter() self.traverser.traverse(scene_root) detection_time = time.perf_counter() - start_time # 记录性能 self.performance_monitor.recordDetection( detection_time, self.queue.getNumEntries()) # 处理结果 results = [] if self.queue.getNumEntries() > 0: self.queue.sortEntries() for i in range(self.queue.getNumEntries()): entry = self.queue.getEntry(i) results.append({ 'hit_pos': entry.getSurfacePoint(scene_root), 'hit_normal': entry.getSurfaceNormal(scene_root), 'hit_node': entry.getIntoNodePath(), 'distance': entry.getT() }) return results finally: # 清理资源 self.traverser.removeCollider(ray_np) ray_np.removeNode() def getCollisionMask(self, mask_type): """获取碰撞掩码""" return self.MASKS.get(mask_type, BitMask32.allOff()) def createCollisionGroup(self, group_name, mask_types, enabled=True): """创建碰撞分组 Args: group_name: 分组名称 mask_types: 掩码类型列表 enabled: 是否启用 """ combined_mask = BitMask32.allOff() for mask_type in mask_types: if mask_type in self.MASKS: combined_mask |= self.MASKS[mask_type] self.collision_groups[group_name] = { 'mask': combined_mask, 'types': mask_types, 'objects': [] } self.group_enabled[group_name] = enabled print(f"✅ 创建碰撞分组: {group_name}, 掩码: {mask_types}") def addToCollisionGroup(self, group_name, model, collision_node): """将模型添加到碰撞分组""" if group_name in self.collision_groups: self.collision_groups[group_name]['objects'].append({ 'model': model, 'collision_node': collision_node }) def enableCollisionGroup(self, group_name, enabled=True): """启用/禁用碰撞分组""" if group_name in self.group_enabled: self.group_enabled[group_name] = enabled # 更新分组中所有对象的碰撞状态 if group_name in self.collision_groups: for obj in self.collision_groups[group_name]['objects']: collision_node = obj['collision_node'] if enabled: collision_node.show() else: collision_node.hide() print(f"{'✅ 启用' if enabled else '❌ 禁用'}碰撞分组: {group_name}") def registerCollisionCallback(self, mask_a, mask_b, callback_func, filter_func=None): """注册碰撞回调函数 Args: mask_a: 第一个掩码类型 mask_b: 第二个掩码类型 callback_func: 碰撞回调函数 callback(collision_info) filter_func: 过滤函数 filter(model_a, model_b) -> bool """ key = tuple(sorted([mask_a, mask_b])) if key not in self.collision_callbacks: self.collision_callbacks[key] = [] self.collision_callbacks[key].append(callback_func) if filter_func: if key not in self.collision_filters: self.collision_filters[key] = [] self.collision_filters[key].append(filter_func) print(f"✅ 注册碰撞回调: {mask_a} <-> {mask_b}") def _executeCollisionCallbacks(self, collision_info): """执行碰撞回调""" model_a = collision_info['model_a']['node'] model_b = collision_info['model_b']['node'] # 获取模型的掩码类型 mask_a = self._getModelMaskType(model_a) mask_b = self._getModelMaskType(model_b) if mask_a and mask_b: key = tuple(sorted([mask_a, mask_b])) # 检查过滤条件 if key in self.collision_filters: for filter_func in self.collision_filters[key]: if not filter_func(model_a, model_b): return # 执行回调 if key in self.collision_callbacks: for callback_func in self.collision_callbacks[key]: try: callback_func(collision_info) except Exception as e: print(f"❌ 碰撞回调执行失败: {e}") def _getModelMaskType(self, model): """获取模型的掩码类型""" # 从模型标签或属性中获取掩码类型 if model.hasTag('collision_mask_type'): return model.getTag('collision_mask_type') return None def enableSpatialPartitioning(self, enabled=True, max_depth=6, max_objects=10): """启用空间分割优化 Args: enabled: 是否启用 max_depth: 八叉树最大深度 max_objects: 每个节点最大对象数 """ self.spatial_partitioning_enabled = enabled self.octree_max_depth = max_depth self.octree_max_objects = max_objects if enabled: self._buildOctree() print(f"✅ 启用空间分割优化 - 深度: {max_depth}, 对象数: {max_objects}") else: self.octree = None print("❌ 禁用空间分割优化") def _buildOctree(self): """构建八叉树""" if not hasattr(self.world, 'models') or not self.world.models: return # 计算场景边界 min_bound = Point3(float('inf'), float('inf'), float('inf')) max_bound = Point3(float('-inf'), float('-inf'), float('-inf')) for model in self.world.models: bounds = model.getBounds() if not bounds.isEmpty(): model_min = bounds.getMin() model_max = bounds.getMax() min_bound.x = min(min_bound.x, model_min.x) min_bound.y = min(min_bound.y, model_min.y) min_bound.z = min(min_bound.z, model_min.z) max_bound.x = max(max_bound.x, model_max.x) max_bound.y = max(max_bound.y, model_max.y) max_bound.z = max(max_bound.z, model_max.z) # 创建八叉树根节点 self.octree = OctreeNode(min_bound, max_bound, 0, self.octree_max_depth, self.octree_max_objects) # 将模型插入八叉树 for model in self.world.models: self.octree.insert(model) def detectModelCollisionsOptimized(self, specific_models=None, log_results=True): """使用空间分割优化的碰撞检测""" if not self.spatial_partitioning_enabled or not self.octree: return self.detectModelCollisions(specific_models, log_results) start_time = time.perf_counter() current_time = datetime.now() models_to_check = specific_models if specific_models else self.world.models if len(models_to_check) < 2: return [] collision_results = [] current_collision_pairs = set() # 使用八叉树进行优化检测 for model in models_to_check: # 获取可能碰撞的模型列表 potential_collisions = self.octree.query(model) for other_model in potential_collisions: if model == other_model: continue pair_key = tuple(sorted([id(model), id(other_model)])) if pair_key in current_collision_pairs: continue collision_info = self._checkModelPairCollision(model, other_model, current_time) if collision_info: collision_results.append(collision_info) current_collision_pairs.add(pair_key) # 执行回调 self._executeCollisionCallbacks(collision_info) # 只有新碰撞才打印日志 if log_results and pair_key not in self.active_collisions: self._logCollisionInfo(collision_info) print(f"🆕 新碰撞检测到!") # 更新活跃碰撞状态 ended_collisions = self.active_collisions - current_collision_pairs for pair_key in ended_collisions: print(f"✅ 碰撞结束: 模型对 {pair_key}") self.active_collisions = current_collision_pairs # 记录性能 detection_time = time.perf_counter() - start_time self.performance_monitor.recordModelCollisionDetection( detection_time, len(models_to_check), len(collision_results)) return collision_results def cleanup(self): """清理碰撞管理器资源""" self._stopCollisionDetectionTask() self.collision_history.clear() self.active_collisions.clear() print("✅ 碰撞管理器已清理") def setupAdvancedCollision(self, model, shape_type='auto', mask_type='SELECTABLE', group_name=None, **shape_kwargs): """高级碰撞设置方法 Args: model: 模型节点 shape_type: 碰撞体形状类型 mask_type: 掩码类型 group_name: 碰撞分组名称 **shape_kwargs: 形状特定参数 Returns: collision_node_path: 碰撞节点路径 """ # 创建碰撞节点 cNode = CollisionNode(f'collision_{model.getName()}') # 设置掩码 if mask_type in self.MASKS: cNode.setIntoCollideMask(self.MASKS[mask_type]) # 设置模型标签用于回调识别 model.setTag('collision_mask_type', mask_type) # 创建碰撞体 collision_solid = self.createCollisionShape(model, shape_type, **shape_kwargs) cNode.addSolid(collision_solid) # 附加到模型 cNodePath = model.attachNewNode(cNode) # 添加到分组 if group_name: if group_name not in self.collision_groups: self.createCollisionGroup(group_name, [mask_type]) self.addToCollisionGroup(group_name, model, cNodePath) # 根据调试设置决定是否显示 if hasattr(self.world, 'debug_collision') and self.world.debug_collision: cNodePath.show() else: cNodePath.hide() print(f"✅ 为 {model.getName()} 设置高级碰撞 - 形状: {shape_type}, 掩码: {mask_type}") return cNodePath def example_usage(self): """碰撞系统使用示例""" print("\n=== 碰撞系统使用示例 ===") # 1. 创建碰撞分组 self.createCollisionGroup('characters', ['CHARACTER', 'DYNAMIC_OBJECT']) self.createCollisionGroup('environment', ['STATIC_GEOMETRY', 'TERRAIN']) self.createCollisionGroup('pickups', ['PICKUP_ITEM', 'INTERACTIVE']) # 2. 注册碰撞回调 def character_pickup_callback(collision_info): print(f"角色拾取物品: {collision_info['model_a']['name']} -> {collision_info['model_b']['name']}") def character_environment_callback(collision_info): print(f"角色碰撞环境: {collision_info['model_a']['name']} 碰到 {collision_info['model_b']['name']}") self.registerCollisionCallback('CHARACTER', 'PICKUP_ITEM', character_pickup_callback) self.registerCollisionCallback('CHARACTER', 'STATIC_GEOMETRY', character_environment_callback) # 3. 启用空间分割优化 self.enableSpatialPartitioning(enabled=True, max_depth=6, max_objects=8) # 4. 为模型设置不同类型的碰撞体 # model1 = ... # 假设已有模型 # self.setupAdvancedCollision(model1, 'capsule', 'CHARACTER', 'characters') # self.setupAdvancedCollision(model2, 'box', 'PICKUP_ITEM', 'pickups') # self.setupAdvancedCollision(model3, 'plane', 'TERRAIN', 'environment') print("✅ 碰撞系统示例配置完成") class CollisionPerformanceMonitor: """碰撞检测性能监控器""" def __init__(self): self.detection_times = [] self.collision_counts = [] self.model_collision_times = [] # 新增:模型间碰撞检测时间 self.model_collision_data = [] # 新增:模型间碰撞数据 self.max_records = 100 def recordDetection(self, detection_time, collision_count): """记录射线检测性能""" self.detection_times.append(detection_time) self.collision_counts.append(collision_count) # 限制记录数量 if len(self.detection_times) > self.max_records: self.detection_times.pop(0) self.collision_counts.pop(0) def recordModelCollisionDetection(self, detection_time, model_count, collision_count): """记录模型间碰撞检测性能""" self.model_collision_times.append(detection_time) self.model_collision_data.append({ 'model_count': model_count, 'collision_count': collision_count, 'timestamp': time.time() }) # 限制记录数量 if len(self.model_collision_times) > self.max_records: self.model_collision_times.pop(0) self.model_collision_data.pop(0) def getAverageTime(self): """获取平均射线检测时间""" if not self.detection_times: return 0 return sum(self.detection_times) / len(self.detection_times) def getAverageModelCollisionTime(self): """获取平均模型间碰撞检测时间""" if not self.model_collision_times: return 0 return sum(self.model_collision_times) / len(self.model_collision_times) def getPerformanceReport(self): """获取完整性能报告""" report = [] # 射线检测性能 if self.detection_times: avg_time = self.getAverageTime() max_time = max(self.detection_times) avg_collisions = sum(self.collision_counts) / len(self.collision_counts) report.append("=== 射线检测性能 ===") report.append(f"平均检测时间: {avg_time*1000:.2f}ms") report.append(f"最大检测时间: {max_time*1000:.2f}ms") report.append(f"平均碰撞数量: {avg_collisions:.1f}") report.append(f"检测次数: {len(self.detection_times)}") # 模型间碰撞检测性能 if self.model_collision_times: avg_model_time = self.getAverageModelCollisionTime() max_model_time = max(self.model_collision_times) avg_model_count = sum(d['model_count'] for d in self.model_collision_data) / len(self.model_collision_data) total_collisions = sum(d['collision_count'] for d in self.model_collision_data) report.append("\n=== 模型间碰撞检测性能 ===") report.append(f"平均检测时间: {avg_model_time*1000:.2f}ms") report.append(f"最大检测时间: {max_model_time*1000:.2f}ms") report.append(f"平均模型数量: {avg_model_count:.1f}") report.append(f"总碰撞次数: {total_collisions}") report.append(f"检测次数: {len(self.model_collision_times)}") return "\n".join(report) if report else "没有性能数据" class OctreeNode: """八叉树节点""" def __init__(self, min_bound, max_bound, depth, max_depth, max_objects): self.min_bound = min_bound self.max_bound = max_bound self.depth = depth self.max_depth = max_depth self.max_objects = max_objects self.objects = [] self.children = [] self.is_leaf = True def insert(self, model): """插入模型到八叉树""" if not self._contains(model): return False if self.is_leaf: self.objects.append(model) # 如果超过最大对象数且未达到最大深度,则分割 if len(self.objects) > self.max_objects and self.depth < self.max_depth: self._subdivide() # 重新分配对象到子节点 remaining_objects = [] for obj in self.objects: inserted = False for child in self.children: if child.insert(obj): inserted = True break if not inserted: remaining_objects.append(obj) self.objects = remaining_objects else: # 尝试插入到子节点 inserted = False for child in self.children: if child.insert(model): inserted = True break if not inserted: self.objects.append(model) return True def query(self, model): """查询可能与指定模型碰撞的模型列表""" if not self._contains(model): return [] result = list(self.objects) if not self.is_leaf: for child in self.children: result.extend(child.query(model)) return result def _contains(self, model): """检查模型是否在此节点范围内""" bounds = model.getBounds() if bounds.isEmpty(): return False model_min = bounds.getMin() model_max = bounds.getMax() return (model_max.x >= self.min_bound.x and model_min.x <= self.max_bound.x and model_max.y >= self.min_bound.y and model_min.y <= self.max_bound.y and model_max.z >= self.min_bound.z and model_min.z <= self.max_bound.z) def _subdivide(self): """分割节点为8个子节点""" center = Point3( (self.min_bound.x + self.max_bound.x) * 0.5, (self.min_bound.y + self.max_bound.y) * 0.5, (self.min_bound.z + self.max_bound.z) * 0.5 ) # 创建8个子节点 subdivisions = [ (self.min_bound, center), (Point3(center.x, self.min_bound.y, self.min_bound.z), Point3(self.max_bound.x, center.y, center.z)), (Point3(self.min_bound.x, center.y, self.min_bound.z), Point3(center.x, self.max_bound.y, center.z)), (Point3(center.x, center.y, self.min_bound.z), Point3(self.max_bound.x, self.max_bound.y, center.z)), (Point3(self.min_bound.x, self.min_bound.y, center.z), Point3(center.x, center.y, self.max_bound.z)), (Point3(center.x, self.min_bound.y, center.z), Point3(self.max_bound.x, center.y, self.max_bound.z)), (Point3(self.min_bound.x, center.y, center.z), Point3(center.x, self.max_bound.y, self.max_bound.z)), (center, self.max_bound) ] for min_b, max_b in subdivisions: child = OctreeNode(min_b, max_b, self.depth + 1, self.max_depth, self.max_objects) self.children.append(child) self.is_leaf = False