162 lines
6.5 KiB
Python
162 lines
6.5 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Collision Checker Module
|
||
|
||
路径规划阶段的碰撞检测工具。
|
||
核心目标:任何碰撞检查都不改变真实仿真世界(无副作用)。
|
||
做到这一点的方法是:在 save/restore 的状态下用 resetJointState 设置“试配置”,
|
||
只触发碰撞检测(performCollisionDetection),而不 stepSimulation。
|
||
"""
|
||
|
||
import numpy as np
|
||
from typing import List, Tuple
|
||
import pybullet as p
|
||
|
||
|
||
class CollisionChecker:
|
||
"""碰撞检测器(规划期无副作用)"""
|
||
|
||
def __init__(self, arm_controller, environment, config_loader):
|
||
# 依赖
|
||
self.arm_controller = arm_controller
|
||
self.environment = environment
|
||
self.config_loader = config_loader
|
||
|
||
# 配置
|
||
cfg = config_loader.get_full_config()
|
||
c_cfg = cfg['path_planning']['collision']
|
||
self.check_resolution = c_cfg['check_resolution']
|
||
self.safety_margin = c_cfg['safety_margin']
|
||
|
||
# 资源句柄
|
||
self.robot_loader = arm_controller.robot_loader
|
||
self.robot_id = self.robot_loader.get_robot_id()
|
||
self.physics_client = self.robot_loader.physics_client
|
||
|
||
def _iter_obstacles(self):
|
||
"""遍历需要考虑的环境障碍物 body ids(墙体各分段、可选 transport 物体)。"""
|
||
# 墙体可能由多个 body 组成
|
||
wall_parts = getattr(self.environment, '_wall_part_ids', []) or []
|
||
for wid in wall_parts:
|
||
if wid is not None:
|
||
yield wid
|
||
# 可选地加入运输物体(若要把它也视为障碍)
|
||
if getattr(self.environment, 'transport_object_id', None) is not None:
|
||
yield self.environment.transport_object_id
|
||
|
||
def check_collision(self, joint_config: List[float]) -> bool:
|
||
"""检测给定关节配置是否发生碰撞(含安全边界)"""
|
||
# 基本有效性
|
||
if not self.arm_controller.validate_joint_configuration(joint_config):
|
||
return True
|
||
|
||
state_id = p.saveState(physicsClientId=self.physics_client)
|
||
try:
|
||
# 将机器人打到“试配置”(仅状态,不经电机/动力学)
|
||
joint_indices = self.robot_loader.joint_indices
|
||
if len(joint_config) != len(joint_indices):
|
||
return True
|
||
for i, jid in enumerate(joint_indices):
|
||
p.resetJointState(self.robot_id, jid, float(joint_config[i]), physicsClientId=self.physics_client)
|
||
|
||
# 触发碰撞检测(不推进世界时间)
|
||
try:
|
||
p.performCollisionDetection(physicsClientId=self.physics_client)
|
||
except Exception:
|
||
pass
|
||
|
||
# 自碰撞(严格接触)
|
||
if len(p.getContactPoints(self.robot_id, self.robot_id, physicsClientId=self.physics_client)) > 0:
|
||
return True
|
||
|
||
# 环境碰撞(含安全边界,使用最近点)
|
||
# 与墙体各分段/其他障碍
|
||
for obs_id in self._iter_obstacles():
|
||
cps = p.getClosestPoints(
|
||
self.robot_id,
|
||
int(obs_id),
|
||
distance=float(max(0.0, self.safety_margin)),
|
||
physicsClientId=self.physics_client,
|
||
)
|
||
if len(cps) > 0:
|
||
return True
|
||
|
||
# 与地面(排除基座)
|
||
if self.environment.ground_plane_id is not None:
|
||
cps = p.getClosestPoints(
|
||
self.robot_id,
|
||
self.environment.ground_plane_id,
|
||
distance=float(max(0.0, self.safety_margin)),
|
||
physicsClientId=self.physics_client,
|
||
)
|
||
for cp in cps:
|
||
# cp[3]: link index on robot
|
||
if int(cp[3]) > 0:
|
||
return True
|
||
|
||
return False
|
||
finally:
|
||
p.restoreState(state_id, physicsClientId=self.physics_client)
|
||
|
||
def check_collision_batch(self, joint_configs: List[List[float]]) -> List[bool]:
|
||
"""批量检查配置是否碰撞(无副作用)"""
|
||
return [self.check_collision(cfg) for cfg in joint_configs]
|
||
|
||
def check_path_collision(self, path: List[List[float]], resolution: float = None) -> bool:
|
||
"""检测整条路径是否存在碰撞"""
|
||
if len(path) < 2:
|
||
return False
|
||
if resolution is None:
|
||
resolution = self.check_resolution
|
||
|
||
for i in range(len(path) - 1):
|
||
start = np.array(path[i])
|
||
end = np.array(path[i + 1])
|
||
distance = np.linalg.norm(end - start)
|
||
num_checks = max(2, int(distance / resolution))
|
||
for j in range(num_checks):
|
||
t = j / (num_checks - 1)
|
||
interpolated = (start + t * (end - start)).tolist()
|
||
if self.check_collision(interpolated):
|
||
return True
|
||
return False
|
||
|
||
def get_obstacle_distance(self, joint_config: List[float]) -> float:
|
||
"""估算到墙体的最近距离(米)"""
|
||
state_id = p.saveState(physicsClientId=self.physics_client)
|
||
try:
|
||
joint_indices = self.robot_loader.joint_indices
|
||
if len(joint_config) != len(joint_indices):
|
||
return float('inf')
|
||
for i, jid in enumerate(joint_indices):
|
||
p.resetJointState(self.robot_id, jid, float(joint_config[i]), physicsClientId=self.physics_client)
|
||
try:
|
||
p.performCollisionDetection(physicsClientId=self.physics_client)
|
||
except Exception:
|
||
pass
|
||
|
||
min_distance = float('inf')
|
||
for obs_id in self._iter_obstacles():
|
||
cps = p.getClosestPoints(
|
||
self.robot_id,
|
||
int(obs_id),
|
||
distance=1.0,
|
||
physicsClientId=self.physics_client,
|
||
)
|
||
for cp in cps:
|
||
d = float(cp[8])
|
||
if d < min_distance:
|
||
min_distance = d
|
||
return min_distance
|
||
finally:
|
||
p.restoreState(state_id, physicsClientId=self.physics_client)
|
||
|
||
def validate_configuration(self, joint_config: List[float]) -> Tuple[bool, str]:
|
||
"""验证配置是否在限位内且无碰撞"""
|
||
is_valid, violations = self.arm_controller.check_joint_limits(joint_config)
|
||
if not is_valid:
|
||
return False, f"Joint limit violations: {', '.join(violations)}"
|
||
if self.check_collision(joint_config):
|
||
return False, "Collision detected"
|
||
return True, ""
|