EG/plugins/user/procedural_terrain_generation/utils/terrain_utils.py
2025-12-12 16:16:15 +08:00

1651 lines
59 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
地形工具类
提供各种地形处理和分析工具
"""
import numpy as np
import math
from typing import Dict, Any, List, Tuple, Optional
import json
import time
class TerrainUtils:
"""
地形工具类
提供各种地形处理和分析工具,包括地形分析、过滤、转换等功能
"""
def __init__(self, plugin):
"""
初始化地形工具类
Args:
plugin: 程序化地形生成插件实例
"""
self.plugin = plugin
self.enabled = False
self.initialized = False
# 工具配置
self.tool_params = {
'analysis_resolution': 64,
'filter_radius': 3,
'erosion_iterations': 10,
'smoothing_iterations': 5,
'slope_threshold': 0.3,
'curvature_threshold': 0.1
}
# 分析结果缓存
self.analysis_cache = {}
# 统计信息
self.stats = {
'tools_used': 0,
'total_processing_time': 0.0,
'average_processing_time': 0.0,
'cache_hits': 0,
'cache_misses': 0
}
print("✓ 地形工具类已创建")
def initialize(self) -> bool:
"""
初始化地形工具类
Returns:
是否初始化成功
"""
try:
self.initialized = True
print("✓ 地形工具类初始化完成")
return True
except Exception as e:
print(f"✗ 地形工具类初始化失败: {e}")
import traceback
traceback.print_exc()
return False
def enable(self) -> bool:
"""
启用地形工具类
Returns:
是否启用成功
"""
try:
if not self.initialized:
print("✗ 地形工具类未初始化")
return False
self.enabled = True
print("✓ 地形工具类已启用")
return True
except Exception as e:
print(f"✗ 地形工具类启用失败: {e}")
import traceback
traceback.print_exc()
return False
def disable(self):
"""禁用地形工具类"""
try:
self.enabled = False
print("✓ 地形工具类已禁用")
except Exception as e:
print(f"✗ 地形工具类禁用失败: {e}")
import traceback
traceback.print_exc()
def finalize(self):
"""清理地形工具类资源"""
try:
self.disable()
self.initialized = False
print("✓ 地形工具类资源已清理")
except Exception as e:
print(f"✗ 地形工具类资源清理失败: {e}")
import traceback
traceback.print_exc()
def update(self, dt: float):
"""
更新地形工具类状态
Args:
dt: 时间增量
"""
# 处理更新逻辑
pass
def analyze_terrain_slope(self, heightmap: np.ndarray) -> np.ndarray:
"""
分析地形坡度
Args:
heightmap: 高度图数据
Returns:
坡度图数据 (弧度)
"""
try:
processing_start_time = time.time()
if not self.enabled:
print("✗ 地形工具类未启用")
return np.zeros_like(heightmap, dtype=np.float32)
print("✓ 开始分析地形坡度...")
height, width = heightmap.shape
slope_map = np.zeros((height, width), dtype=np.float32)
# 计算每个点的坡度
for y in range(height):
for x in range(width):
# 计算梯度
slope = self._calculate_slope_at_point(heightmap, x, y)
slope_map[y, x] = slope
# 更新统计信息
processing_time = time.time() - processing_start_time
self._update_stats(processing_time)
print(f"✓ 地形坡度分析完成,耗时: {processing_time:.3f}")
return slope_map
except Exception as e:
print(f"✗ 地形坡度分析失败: {e}")
import traceback
traceback.print_exc()
return np.zeros_like(heightmap, dtype=np.float32)
def _calculate_slope_at_point(self, heightmap: np.ndarray, x: int, y: int) -> float:
"""
计算指定点的坡度
Args:
heightmap: 高度图数据
x: X坐标
y: Y坐标
Returns:
坡度值 (弧度)
"""
try:
height, width = heightmap.shape
# 计算梯度
dx = 0.0
dy = 0.0
# X方向梯度
if x > 0 and x < width - 1:
dx = (heightmap[y, x + 1] - heightmap[y, x - 1]) / 2.0
elif x == 0:
dx = heightmap[y, x + 1] - heightmap[y, x]
else:
dx = heightmap[y, x] - heightmap[y, x - 1]
# Y方向梯度
if y > 0 and y < height - 1:
dy = (heightmap[y + 1, x] - heightmap[y - 1, x]) / 2.0
elif y == 0:
dy = heightmap[y + 1, x] - heightmap[y, x]
else:
dy = heightmap[y, x] - heightmap[y - 1, x]
# 计算坡度(梯度的模)
gradient_magnitude = math.sqrt(dx * dx + dy * dy)
slope = math.atan(gradient_magnitude)
return slope
except Exception as e:
print(f"✗ 点坡度计算失败: {e}")
return 0.0
def analyze_terrain_curvature(self, heightmap: np.ndarray) -> np.ndarray:
"""
分析地形曲率
Args:
heightmap: 高度图数据
Returns:
曲率图数据
"""
try:
processing_start_time = time.time()
if not self.enabled:
print("✗ 地形工具类未启用")
return np.zeros_like(heightmap, dtype=np.float32)
print("✓ 开始分析地形曲率...")
height, width = heightmap.shape
curvature_map = np.zeros((height, width), dtype=np.float32)
# 计算每个点的曲率
for y in range(1, height - 1):
for x in range(1, width - 1):
# 使用拉普拉斯算子计算曲率
center = heightmap[y, x]
laplacian = (
heightmap[y-1, x] + heightmap[y+1, x] +
heightmap[y, x-1] + heightmap[y, x+1] - 4 * center
)
curvature_map[y, x] = laplacian
# 更新统计信息
processing_time = time.time() - processing_start_time
self._update_stats(processing_time)
print(f"✓ 地形曲率分析完成,耗时: {processing_time:.3f}")
return curvature_map
except Exception as e:
print(f"✗ 地形曲率分析失败: {e}")
import traceback
traceback.print_exc()
return np.zeros_like(heightmap, dtype=np.float32)
def analyze_terrain_aspect(self, heightmap: np.ndarray) -> np.ndarray:
"""
分析地形坡向
Args:
heightmap: 高度图数据
Returns:
坡向图数据 (弧度)
"""
try:
processing_start_time = time.time()
if not self.enabled:
print("✗ 地形工具类未启用")
return np.zeros_like(heightmap, dtype=np.float32)
print("✓ 开始分析地形坡向...")
height, width = heightmap.shape
aspect_map = np.zeros((height, width), dtype=np.float32)
# 计算每个点的坡向
for y in range(1, height - 1):
for x in range(1, width - 1):
# 计算梯度
dx = (heightmap[y, x + 1] - heightmap[y, x - 1]) / 2.0
dy = (heightmap[y + 1, x] - heightmap[y - 1, x]) / 2.0
# 计算坡向
if dx == 0 and dy == 0:
aspect = 0.0
else:
aspect = math.atan2(dy, dx)
# 转换为0-2π范围
if aspect < 0:
aspect += 2 * math.pi
aspect_map[y, x] = aspect
# 更新统计信息
processing_time = time.time() - processing_start_time
self._update_stats(processing_time)
print(f"✓ 地形坡向分析完成,耗时: {processing_time:.3f}")
return aspect_map
except Exception as e:
print(f"✗ 地形坡向分析失败: {e}")
import traceback
traceback.print_exc()
return np.zeros_like(heightmap, dtype=np.float32)
def analyze_terrain_rugosity(self, heightmap: np.ndarray, window_size: int = 3) -> np.ndarray:
"""
分析地形粗糙度
Args:
heightmap: 高度图数据
window_size: 窗口大小
Returns:
粗糙度图数据
"""
try:
processing_start_time = time.time()
if not self.enabled:
print("✗ 地形工具类未启用")
return np.zeros_like(heightmap, dtype=np.float32)
print("✓ 开始分析地形粗糙度...")
height, width = heightmap.shape
rugosity_map = np.zeros((height, width), dtype=np.float32)
half_window = window_size // 2
# 计算每个点的粗糙度
for y in range(half_window, height - half_window):
for x in range(half_window, width - half_window):
# 计算局部窗口内的标准差
window = heightmap[y-half_window:y+half_window+1, x-half_window:x+half_window+1]
rugosity = np.std(window)
rugosity_map[y, x] = rugosity
# 更新统计信息
processing_time = time.time() - processing_start_time
self._update_stats(processing_time)
print(f"✓ 地形粗糙度分析完成,耗时: {processing_time:.3f}")
return rugosity_map
except Exception as e:
print(f"✗ 地形粗糙度分析失败: {e}")
import traceback
traceback.print_exc()
return np.zeros_like(heightmap, dtype=np.float32)
def smooth_terrain(self, heightmap: np.ndarray, iterations: int = None) -> np.ndarray:
"""
平滑地形
Args:
heightmap: 高度图数据
iterations: 平滑迭代次数
Returns:
平滑后的高度图数据
"""
try:
processing_start_time = time.time()
if not self.enabled:
print("✗ 地形工具类未启用")
return heightmap
if iterations is None:
iterations = self.tool_params['smoothing_iterations']
print(f"✓ 开始平滑地形 ({iterations} 次迭代)...")
smoothed = heightmap.copy()
height, width = smoothed.shape
# 多次平滑迭代
for iteration in range(iterations):
new_heightmap = smoothed.copy()
# 应用3x3平均滤波器
for y in range(1, height - 1):
for x in range(1, width - 1):
# 计算3x3邻域的平均值
avg_value = (
smoothed[y-1, x-1] + smoothed[y-1, x] + smoothed[y-1, x+1] +
smoothed[y, x-1] + smoothed[y, x] + smoothed[y, x+1] +
smoothed[y+1, x-1] + smoothed[y+1, x] + smoothed[y+1, x+1]
) / 9.0
new_heightmap[y, x] = avg_value
smoothed = new_heightmap
# 更新统计信息
processing_time = time.time() - processing_start_time
self._update_stats(processing_time)
print(f"✓ 地形平滑完成,耗时: {processing_time:.3f}")
return smoothed
except Exception as e:
print(f"✗ 地形平滑失败: {e}")
import traceback
traceback.print_exc()
return heightmap
def filter_terrain(self, heightmap: np.ndarray, filter_type: str = 'gaussian',
radius: int = None) -> np.ndarray:
"""
过滤地形
Args:
heightmap: 高度图数据
filter_type: 滤波器类型 ('gaussian', 'median', 'bilateral')
radius: 滤波器半径
Returns:
过滤后的高度图数据
"""
try:
processing_start_time = time.time()
if not self.enabled:
print("✗ 地形工具类未启用")
return heightmap
if radius is None:
radius = self.tool_params['filter_radius']
print(f"✓ 开始{filter_type}滤波地形 (半径: {radius})...")
filtered = heightmap.copy()
height, width = filtered.shape
kernel_size = radius * 2 + 1
if filter_type == 'gaussian':
filtered = self._apply_gaussian_filter(filtered, radius)
elif filter_type == 'median':
filtered = self._apply_median_filter(filtered, radius)
elif filter_type == 'bilateral':
filtered = self._apply_bilateral_filter(filtered, radius)
else:
print(f"✗ 不支持的滤波器类型: {filter_type}")
return heightmap
# 更新统计信息
processing_time = time.time() - processing_start_time
self._update_stats(processing_time)
print(f"✓ 地形{filter_type}滤波完成,耗时: {processing_time:.3f}")
return filtered
except Exception as e:
print(f"✗ 地形滤波失败: {e}")
import traceback
traceback.print_exc()
return heightmap
def _apply_gaussian_filter(self, heightmap: np.ndarray, radius: int) -> np.ndarray:
"""应用高斯滤波"""
try:
filtered = heightmap.copy()
height, width = filtered.shape
kernel_size = radius * 2 + 1
# 创建高斯核
kernel = np.zeros((kernel_size, kernel_size), dtype=np.float32)
sigma = radius / 3.0
for y in range(kernel_size):
for x in range(kernel_size):
dx = x - radius
dy = y - radius
kernel[y, x] = math.exp(-(dx*dx + dy*dy) / (2 * sigma * sigma))
kernel = kernel / np.sum(kernel)
# 应用滤波
for y in range(radius, height - radius):
for x in range(radius, width - radius):
value = 0.0
for ky in range(kernel_size):
for kx in range(kernel_size):
ny = y + ky - radius
nx = x + kx - radius
value += heightmap[ny, nx] * kernel[ky, kx]
filtered[y, x] = value
return filtered
except Exception as e:
print(f"✗ 高斯滤波应用失败: {e}")
return heightmap
def _apply_median_filter(self, heightmap: np.ndarray, radius: int) -> np.ndarray:
"""应用中值滤波"""
try:
filtered = heightmap.copy()
height, width = filtered.shape
kernel_size = radius * 2 + 1
# 应用滤波
for y in range(radius, height - radius):
for x in range(radius, width - radius):
values = []
for ky in range(kernel_size):
for kx in range(kernel_size):
ny = y + ky - radius
nx = x + kx - radius
values.append(heightmap[ny, nx])
filtered[y, x] = np.median(values)
return filtered
except Exception as e:
print(f"✗ 中值滤波应用失败: {e}")
return heightmap
def _apply_bilateral_filter(self, heightmap: np.ndarray, radius: int) -> np.ndarray:
"""应用双边滤波"""
try:
filtered = heightmap.copy()
height, width = filtered.shape
kernel_size = radius * 2 + 1
# 双边滤波参数
spatial_sigma = radius / 3.0
intensity_sigma = 0.1
# 应用滤波
for y in range(radius, height - radius):
for x in range(radius, width - radius):
weighted_sum = 0.0
weight_sum = 0.0
center_value = heightmap[y, x]
for ky in range(kernel_size):
for kx in range(kernel_size):
ny = y + ky - radius
nx = x + kx - radius
neighbor_value = heightmap[ny, nx]
# 空间权重
dx = kx - radius
dy = ky - radius
spatial_weight = math.exp(-(dx*dx + dy*dy) / (2 * spatial_sigma * spatial_sigma))
# 强度权重
intensity_diff = abs(neighbor_value - center_value)
intensity_weight = math.exp(-(intensity_diff * intensity_diff) / (2 * intensity_sigma * intensity_sigma))
# 总权重
weight = spatial_weight * intensity_weight
weighted_sum += neighbor_value * weight
weight_sum += weight
if weight_sum > 0:
filtered[y, x] = weighted_sum / weight_sum
return filtered
except Exception as e:
print(f"✗ 双边滤波应用失败: {e}")
return heightmap
def normalize_heightmap(self, heightmap: np.ndarray, min_value: float = 0.0,
max_value: float = 1.0) -> np.ndarray:
"""
归一化高度图
Args:
heightmap: 高度图数据
min_value: 最小值
max_value: 最大值
Returns:
归一化后的高度图数据
"""
try:
processing_start_time = time.time()
if not self.enabled:
print("✗ 地形工具类未启用")
return heightmap
print("✓ 开始归一化高度图...")
# 计算当前范围
current_min = np.min(heightmap)
current_max = np.max(heightmap)
if current_max == current_min:
# 如果所有值相同,返回常量图
normalized = np.full_like(heightmap, (min_value + max_value) / 2.0)
else:
# 归一化到0-1范围
normalized_01 = (heightmap - current_min) / (current_max - current_min)
# 缩放到指定范围
normalized = normalized_01 * (max_value - min_value) + min_value
# 更新统计信息
processing_time = time.time() - processing_start_time
self._update_stats(processing_time)
print(f"✓ 高度图归一化完成,耗时: {processing_time:.3f}")
return normalized
except Exception as e:
print(f"✗ 高度图归一化失败: {e}")
import traceback
traceback.print_exc()
return heightmap
def invert_heightmap(self, heightmap: np.ndarray) -> np.ndarray:
"""
反转高度图
Args:
heightmap: 高度图数据
Returns:
反转后的高度图数据
"""
try:
processing_start_time = time.time()
if not self.enabled:
print("✗ 地形工具类未启用")
return heightmap
print("✓ 开始反转高度图...")
# 反转高度图
inverted = 1.0 - heightmap
# 更新统计信息
processing_time = time.time() - processing_start_time
self._update_stats(processing_time)
print(f"✓ 高度图反转完成,耗时: {processing_time:.3f}")
return inverted
except Exception as e:
print(f"✗ 高度图反转失败: {e}")
import traceback
traceback.print_exc()
return heightmap
def blend_heightmaps(self, heightmaps: List[np.ndarray], weights: List[float] = None) -> np.ndarray:
"""
混合多个高度图
Args:
heightmaps: 高度图列表
weights: 权重列表如果为None则平均分配权重
Returns:
混合后的高度图数据
"""
try:
processing_start_time = time.time()
if not self.enabled:
print("✗ 地形工具类未启用")
return np.array([])
if not heightmaps:
print("✗ 高度图列表为空")
return np.array([])
if len(heightmaps) == 1:
return heightmaps[0].copy()
print(f"✓ 开始混合 {len(heightmaps)} 个高度图...")
# 确保所有高度图尺寸相同
base_shape = heightmaps[0].shape
for hm in heightmaps:
if hm.shape != base_shape:
print("✗ 所有高度图必须具有相同的尺寸")
return heightmaps[0].copy()
# 处理权重
if weights is None:
weights = [1.0 / len(heightmaps)] * len(heightmaps)
elif len(weights) != len(heightmaps):
print("✗ 权重数量必须与高度图数量相同")
return heightmaps[0].copy()
# 归一化权重
total_weight = sum(weights)
if total_weight > 0:
weights = [w / total_weight for w in weights]
# 混合高度图
blended = np.zeros(base_shape, dtype=np.float32)
for i, (hm, weight) in enumerate(zip(heightmaps, weights)):
blended += hm * weight
# 更新统计信息
processing_time = time.time() - processing_start_time
self._update_stats(processing_time)
print(f"✓ 高度图混合完成,耗时: {processing_time:.3f}")
return blended
except Exception as e:
print(f"✗ 高度图混合失败: {e}")
import traceback
traceback.print_exc()
# 返回第一个高度图的副本
return heightmaps[0].copy() if heightmaps else np.array([])
def generate_terrain_mask(self, heightmap: np.ndarray, condition: str,
threshold: float = 0.5) -> np.ndarray:
"""
生成地形掩码
Args:
heightmap: 高度图数据
condition: 条件 ('above', 'below', 'equal', 'between')
threshold: 阈值
Returns:
掩码数据 (0-1范围)
"""
try:
processing_start_time = time.time()
if not self.enabled:
print("✗ 地形工具类未启用")
return np.zeros_like(heightmap, dtype=np.float32)
print(f"✓ 开始生成地形掩码 (条件: {condition}, 阈值: {threshold})...")
mask = np.zeros_like(heightmap, dtype=np.float32)
if condition == 'above':
mask = np.where(heightmap > threshold, 1.0, 0.0)
elif condition == 'below':
mask = np.where(heightmap < threshold, 1.0, 0.0)
elif condition == 'equal':
mask = np.where(np.abs(heightmap - threshold) < 0.01, 1.0, 0.0)
elif condition == 'between':
# 假设阈值是一个范围 [min, max]
if isinstance(threshold, (list, tuple)) and len(threshold) == 2:
min_val, max_val = threshold
mask = np.where((heightmap >= min_val) & (heightmap <= max_val), 1.0, 0.0)
else:
mask = np.where(np.abs(heightmap - threshold) < 0.1, 1.0, 0.0)
else:
print(f"✗ 不支持的条件: {condition}")
return mask
# 更新统计信息
processing_time = time.time() - processing_start_time
self._update_stats(processing_time)
print(f"✓ 地形掩码生成完成,耗时: {processing_time:.3f}")
return mask
except Exception as e:
print(f"✗ 地形掩码生成失败: {e}")
import traceback
traceback.print_exc()
return np.zeros_like(heightmap, dtype=np.float32)
def apply_terrain_mask(self, heightmap: np.ndarray, mask: np.ndarray,
operation: str = 'multiply') -> np.ndarray:
"""
应用地形掩码
Args:
heightmap: 高度图数据
mask: 掩码数据
operation: 操作 ('multiply', 'add', 'subtract', 'replace')
Returns:
处理后的高度图数据
"""
try:
processing_start_time = time.time()
if not self.enabled:
print("✗ 地形工具类未启用")
return heightmap
# 确保掩码尺寸匹配
if mask.shape != heightmap.shape:
print("✗ 掩码尺寸与高度图不匹配")
return heightmap
print(f"✓ 开始应用地形掩码 (操作: {operation})...")
result = heightmap.copy()
if operation == 'multiply':
result = heightmap * mask
elif operation == 'add':
result = heightmap + mask
elif operation == 'subtract':
result = heightmap - mask
elif operation == 'replace':
# 只在掩码为1的地方替换
result = np.where(mask > 0.5, mask, heightmap)
else:
print(f"✗ 不支持的操作: {operation}")
return heightmap
# 更新统计信息
processing_time = time.time() - processing_start_time
self._update_stats(processing_time)
print(f"✓ 地形掩码应用完成,耗时: {processing_time:.3f}")
return result
except Exception as e:
print(f"✗ 地形掩码应用失败: {e}")
import traceback
traceback.print_exc()
return heightmap
def calculate_terrain_statistics(self, heightmap: np.ndarray) -> Dict[str, float]:
"""
计算地形统计信息
Args:
heightmap: 高度图数据
Returns:
统计信息字典
"""
try:
processing_start_time = time.time()
if not self.enabled:
print("✗ 地形工具类未启用")
return {}
print("✓ 开始计算地形统计信息...")
# 计算基本统计信息
stats = {
'min_height': float(np.min(heightmap)),
'max_height': float(np.max(heightmap)),
'mean_height': float(np.mean(heightmap)),
'std_height': float(np.std(heightmap)),
'median_height': float(np.median(heightmap)),
'total_area': float(heightmap.size),
'width': heightmap.shape[1],
'height': heightmap.shape[0]
}
# 计算坡度统计
slope_map = self.analyze_terrain_slope(heightmap)
stats['mean_slope'] = float(np.mean(slope_map))
stats['max_slope'] = float(np.max(slope_map))
stats['std_slope'] = float(np.std(slope_map))
# 计算曲率统计
curvature_map = self.analyze_terrain_curvature(heightmap)
stats['mean_curvature'] = float(np.mean(curvature_map))
stats['max_curvature'] = float(np.max(curvature_map))
stats['std_curvature'] = float(np.std(curvature_map))
# 计算粗糙度统计
rugosity_map = self.analyze_terrain_rugosity(heightmap)
stats['mean_rugosity'] = float(np.mean(rugosity_map))
stats['max_rugosity'] = float(np.max(rugosity_map))
stats['std_rugosity'] = float(np.std(rugosity_map))
# 更新统计信息
processing_time = time.time() - processing_start_time
self._update_stats(processing_time)
print(f"✓ 地形统计信息计算完成,耗时: {processing_time:.3f}")
return stats
except Exception as e:
print(f"✗ 地形统计信息计算失败: {e}")
import traceback
traceback.print_exc()
return {}
def _update_stats(self, processing_time: float):
"""更新统计信息"""
self.stats['tools_used'] += 1
self.stats['total_processing_time'] += processing_time
if self.stats['tools_used'] > 0:
self.stats['average_processing_time'] = (
self.stats['total_processing_time'] / self.stats['tools_used']
)
def get_stats(self) -> Dict[str, Any]:
"""
获取统计信息
Returns:
统计信息字典
"""
return self.stats.copy()
def reset_stats(self):
"""重置统计信息"""
self.stats = {
'tools_used': 0,
'total_processing_time': 0.0,
'average_processing_time': 0.0,
'cache_hits': 0,
'cache_misses': 0
}
print("✓ 地形工具类统计信息已重置")
def set_tool_parameters(self, params: Dict[str, Any]):
"""
设置工具参数
Args:
params: 参数字典
"""
self.tool_params.update(params)
print(f"✓ 地形工具参数已更新: {self.tool_params}")
def export_terrain_data(self, heightmap: np.ndarray, filename: str,
metadata: Dict[str, Any] = None) -> bool:
"""
导出地形数据
Args:
heightmap: 高度图数据
filename: 文件名
metadata: 元数据
Returns:
是否导出成功
"""
try:
print(f"✓ 开始导出地形数据到: {filename}")
# 创建导出数据
export_data = {
'heightmap': heightmap.tolist(),
'shape': heightmap.shape,
'dtype': str(heightmap.dtype),
'metadata': metadata or {},
'export_time': time.time()
}
# 保存为JSON文件
with open(filename, 'w', encoding='utf-8') as f:
json.dump(export_data, f, ensure_ascii=False, indent=2)
print(f"✓ 地形数据导出完成")
return True
except Exception as e:
print(f"✗ 地形数据导出失败: {e}")
import traceback
traceback.print_exc()
return False
def import_terrain_data(self, filename: str) -> Optional[np.ndarray]:
"""
导入地形数据
Args:
filename: 文件名
Returns:
高度图数据或None
"""
try:
print(f"✓ 开始导入地形数据从: {filename}")
# 读取JSON文件
with open(filename, 'r', encoding='utf-8') as f:
import_data = json.load(f)
# 恢复高度图数据
heightmap_data = import_data['heightmap']
heightmap = np.array(heightmap_data, dtype=np.float32)
print(f"✓ 地形数据导入完成,尺寸: {heightmap.shape}")
return heightmap
except Exception as e:
print(f"✗ 地形数据导入失败: {e}")
import traceback
traceback.print_exc()
return None
def resample_heightmap(self, heightmap: np.ndarray, new_width: int, new_height: int) -> np.ndarray:
"""
重采样高度图
Args:
heightmap: 原始高度图数据
new_width: 新宽度
new_height: 新高度
Returns:
重采样后的高度图数据
"""
try:
processing_start_time = time.time()
if not self.enabled:
print("✗ 地形工具类未启用")
return heightmap
print(f"✓ 开始重采样高度图到 {new_width}x{new_height}...")
old_height, old_width = heightmap.shape
# 创建新的高度图
resampled = np.zeros((new_height, new_width), dtype=np.float32)
# 双线性插值重采样
for y in range(new_height):
for x in range(new_width):
# 计算在原图中的对应位置
old_x = x * (old_width - 1) / (new_width - 1)
old_y = y * (old_height - 1) / (new_height - 1)
# 双线性插值
resampled[y, x] = self._bilinear_interpolate(heightmap, old_x, old_y)
# 更新统计信息
processing_time = time.time() - processing_start_time
self._update_stats(processing_time)
print(f"✓ 高度图重采样完成,耗时: {processing_time:.3f}")
return resampled
except Exception as e:
print(f"✗ 高度图重采样失败: {e}")
import traceback
traceback.print_exc()
return heightmap
def _bilinear_interpolate(self, heightmap: np.ndarray, x: float, y: float) -> float:
"""
双线性插值
Args:
heightmap: 高度图数据
x: X坐标
y: Y坐标
Returns:
插值结果
"""
try:
height, width = heightmap.shape
# 获取整数部分
x0 = int(math.floor(x))
y0 = int(math.floor(y))
x1 = min(x0 + 1, width - 1)
y1 = min(y0 + 1, height - 1)
# 获取小数部分
fx = x - x0
fy = y - y0
# 双线性插值
top = heightmap[y0, x0] * (1 - fx) + heightmap[y0, x1] * fx
bottom = heightmap[y1, x0] * (1 - fx) + heightmap[y1, x1] * fx
result = top * (1 - fy) + bottom * fy
return result
except Exception as e:
print(f"✗ 双线性插值失败: {e}")
return 0.0
def generate_terrain_preview(self, heightmap: np.ndarray, width: int = 256, height: int = 256) -> np.ndarray:
"""
生成地形预览图
Args:
heightmap: 高度图数据
width: 预览图宽度
height: 预览图高度
Returns:
预览图数据 (RGB格式)
"""
try:
processing_start_time = time.time()
if not self.enabled:
print("✗ 地形工具类未启用")
return np.zeros((height, width, 3), dtype=np.uint8)
print("✓ 开始生成地形预览图...")
# 重采样到预览尺寸
preview_heightmap = self.resample_heightmap(heightmap, width, height)
# 创建RGB预览图
preview = np.zeros((height, width, 3), dtype=np.uint8)
# 根据高度生成颜色
for y in range(height):
for x in range(width):
height_value = preview_heightmap[y, x]
# 根据高度选择颜色
if height_value < 0.2:
# 水域 - 蓝色
r, g, b = 0, 0, int(128 + height_value * 127)
elif height_value < 0.3:
# 浅水/海滩 - 浅蓝色
r, g, b = int(height_value * 255), int(height_value * 255), 255
elif height_value < 0.5:
# 平原 - 绿色
r, g, b = 0, int(100 + height_value * 155), 0
elif height_value < 0.7:
# 丘陵 - 棕色
r, g, b = int(139 * height_value), int(69 * height_value), int(19 * height_value)
elif height_value < 0.9:
# 山地 - 灰色
r, g, b = int(100 + height_value * 155), int(100 + height_value * 155), int(100 + height_value * 155)
else:
# 雪山 - 白色
r, g, b = 255, 255, 255
preview[y, x] = [r, g, b]
# 更新统计信息
processing_time = time.time() - processing_start_time
self._update_stats(processing_time)
print(f"✓ 地形预览图生成完成,耗时: {processing_time:.3f}")
return preview
except Exception as e:
print(f"✗ 地形预览图生成失败: {e}")
import traceback
traceback.print_exc()
return np.zeros((height, width, 3), dtype=np.uint8)
def calculate_terrain_volume(self, heightmap: np.ndarray, cell_size: float = 1.0) -> float:
"""
计算地形体积
Args:
heightmap: 高度图数据
cell_size: 网格单元大小
Returns:
体积值
"""
try:
processing_start_time = time.time()
if not self.enabled:
print("✗ 地形工具类未启用")
return 0.0
print("✓ 开始计算地形体积...")
# 计算总体积(假设每个网格单元是柱体)
volume = np.sum(heightmap) * cell_size * cell_size
# 更新统计信息
processing_time = time.time() - processing_start_time
self._update_stats(processing_time)
print(f"✓ 地形体积计算完成,耗时: {processing_time:.3f}")
return float(volume)
except Exception as e:
print(f"✗ 地形体积计算失败: {e}")
import traceback
traceback.print_exc()
return 0.0
def calculate_terrain_area(self, heightmap: np.ndarray, cell_size: float = 1.0) -> float:
"""
计算地形表面积
Args:
heightmap: 高度图数据
cell_size: 网格单元大小
Returns:
表面积值
"""
try:
processing_start_time = time.time()
if not self.enabled:
print("✗ 地形工具类未启用")
return 0.0
print("✓ 开始计算地形表面积...")
height, width = heightmap.shape
total_area = 0.0
# 计算每个网格单元的表面积
for y in range(height - 1):
for x in range(width - 1):
# 获取四个角点的高度
h00 = heightmap[y, x]
h01 = heightmap[y, x + 1]
h10 = heightmap[y + 1, x]
h11 = heightmap[y + 1, x + 1]
# 计算两个三角形的面积
# 三角形1: (0,0,h00), (0,1,h01), (1,0,h10)
dx1 = cell_size
dy1 = 0.0
dz1 = h01 - h00
dv1 = np.array([dx1, dy1, dz1])
dx2 = 0.0
dy2 = cell_size
dz2 = h10 - h00
dv2 = np.array([dx2, dy2, dz2])
cross1 = np.cross(dv1, dv2)
area1 = 0.5 * np.linalg.norm(cross1)
# 三角形2: (1,1,h11), (0,1,h01), (1,0,h10)
dx3 = 0.0
dy3 = -cell_size
dz3 = h11 - h01
dv3 = np.array([dx3, dy3, dz3])
dx4 = -cell_size
dy4 = 0.0
dz4 = h11 - h10
dv4 = np.array([dx4, dy4, dz4])
cross2 = np.cross(dv3, dv4)
area2 = 0.5 * np.linalg.norm(cross2)
total_area += area1 + area2
# 更新统计信息
processing_time = time.time() - processing_start_time
self._update_stats(processing_time)
print(f"✓ 地形表面积计算完成,耗时: {processing_time:.3f}")
return total_area
except Exception as e:
print(f"✗ 地形表面积计算失败: {e}")
import traceback
traceback.print_exc()
return 0.0
def detect_terrain_features(self, heightmap: np.ndarray) -> Dict[str, np.ndarray]:
"""
检测地形特征
Args:
heightmap: 高度图数据
Returns:
特征检测结果字典
"""
try:
processing_start_time = time.time()
if not self.enabled:
print("✗ 地形工具类未启用")
return {}
print("✓ 开始检测地形特征...")
# 检测山峰
peaks = self._detect_peaks(heightmap)
# 检测山谷
valleys = self._detect_valleys(heightmap)
# 检测山脊
ridges = self._detect_ridges(heightmap)
# 检测河道
rivers = self._detect_rivers(heightmap)
# 更新统计信息
processing_time = time.time() - processing_start_time
self._update_stats(processing_time)
print(f"✓ 地形特征检测完成,耗时: {processing_time:.3f}")
return {
'peaks': peaks,
'valleys': valleys,
'ridges': ridges,
'rivers': rivers
}
except Exception as e:
print(f"✗ 地形特征检测失败: {e}")
import traceback
traceback.print_exc()
return {}
def _detect_peaks(self, heightmap: np.ndarray) -> np.ndarray:
"""检测山峰"""
try:
height, width = heightmap.shape
peaks = np.zeros_like(heightmap, dtype=np.float32)
# 简化的山峰检测
for y in range(1, height - 1):
for x in range(1, width - 1):
center = heightmap[y, x]
is_peak = True
# 检查是否比所有邻居都高
for dy in [-1, 0, 1]:
for dx in [-1, 0, 1]:
if dy == 0 and dx == 0:
continue
if heightmap[y + dy, x + dx] >= center:
is_peak = False
break
if not is_peak:
break
if is_peak and center > 0.7: # 高度阈值
peaks[y, x] = center
return peaks
except Exception as e:
print(f"✗ 山峰检测失败: {e}")
return np.zeros_like(heightmap, dtype=np.float32)
def _detect_valleys(self, heightmap: np.ndarray) -> np.ndarray:
"""检测山谷"""
try:
height, width = heightmap.shape
valleys = np.zeros_like(heightmap, dtype=np.float32)
# 简化的山谷检测
for y in range(1, height - 1):
for x in range(1, width - 1):
center = heightmap[y, x]
is_valley = True
# 检查是否比所有邻居都低
for dy in [-1, 0, 1]:
for dx in [-1, 0, 1]:
if dy == 0 and dx == 0:
continue
if heightmap[y + dy, x + dx] <= center:
is_valley = False
break
if not is_valley:
break
if is_valley and center < 0.3: # 高度阈值
valleys[y, x] = 1.0 - center
return valleys
except Exception as e:
print(f"✗ 山谷检测失败: {e}")
return np.zeros_like(heightmap, dtype=np.float32)
def _detect_ridges(self, heightmap: np.ndarray) -> np.ndarray:
"""检测山脊"""
try:
# 简化的山脊检测
slope_map = self.analyze_terrain_slope(heightmap)
ridge_map = np.zeros_like(heightmap, dtype=np.float32)
# 山脊通常是坡度变化较大的地方
ridge_map = np.where(slope_map > self.tool_params['slope_threshold'], slope_map, 0)
return ridge_map
except Exception as e:
print(f"✗ 山脊检测失败: {e}")
return np.zeros_like(heightmap, dtype=np.float32)
def _detect_rivers(self, heightmap: np.ndarray) -> np.ndarray:
"""检测河道"""
try:
# 简化的河道检测
curvature_map = self.analyze_terrain_curvature(heightmap)
river_map = np.zeros_like(heightmap, dtype=np.float32)
# 河道通常在负曲率区域(凹陷处)
river_map = np.where(curvature_map < -self.tool_params['curvature_threshold'],
np.abs(curvature_map), 0)
return river_map
except Exception as e:
print(f"✗ 河道检测失败: {e}")
return np.zeros_like(heightmap, dtype=np.float32)
def fill_terrain_holes(self, heightmap: np.ndarray, hole_threshold: float = 0.01) -> np.ndarray:
"""
填补地形空洞
Args:
heightmap: 高度图数据
hole_threshold: 空洞阈值
Returns:
处理后的高度图数据
"""
try:
processing_start_time = time.time()
if not self.enabled:
print("✗ 地形工具类未启用")
return heightmap
print("✓ 开始填补地形空洞...")
filled = heightmap.copy()
height, width = filled.shape
# 简化的空洞填补算法
for y in range(1, height - 1):
for x in range(1, width - 1):
center = filled[y, x]
# 如果中心点是空洞
if center < hole_threshold:
# 计算邻居的平均值
neighbors = []
for dy in [-1, 0, 1]:
for dx in [-1, 0, 1]:
if dy == 0 and dx == 0:
continue
neighbor_value = filled[y + dy, x + dx]
if neighbor_value >= hole_threshold: # 只考虑非空洞邻居
neighbors.append(neighbor_value)
if neighbors:
filled[y, x] = np.mean(neighbors)
# 更新统计信息
processing_time = time.time() - processing_start_time
self._update_stats(processing_time)
print(f"✓ 地形空洞填补完成,耗时: {processing_time:.3f}")
return filled
except Exception as e:
print(f"✗ 地形空洞填补失败: {e}")
import traceback
traceback.print_exc()
return heightmap
def clamp_heightmap(self, heightmap: np.ndarray, min_value: float = 0.0,
max_value: float = 1.0) -> np.ndarray:
"""
限制高度图范围
Args:
heightmap: 高度图数据
min_value: 最小值
max_value: 最大值
Returns:
处理后的高度图数据
"""
try:
processing_start_time = time.time()
if not self.enabled:
print("✗ 地形工具类未启用")
return heightmap
print("✓ 开始限制高度图范围...")
# 限制范围
clamped = np.clip(heightmap, min_value, max_value)
# 更新统计信息
processing_time = time.time() - processing_start_time
self._update_stats(processing_time)
print(f"✓ 高度图范围限制完成,耗时: {processing_time:.3f}")
return clamped
except Exception as e:
print(f"✗ 高度图范围限制失败: {e}")
import traceback
traceback.print_exc()
return heightmap
def add_terrain_noise(self, heightmap: np.ndarray, noise_amount: float = 0.01,
seed: int = None) -> np.ndarray:
"""
添加噪声到地形
Args:
heightmap: 高度图数据
noise_amount: 噪声强度
seed: 随机种子
Returns:
处理后的高度图数据
"""
try:
processing_start_time = time.time()
if not self.enabled:
print("✗ 地形工具类未启用")
return heightmap
print("✓ 开始添加噪声到地形...")
# 设置随机种子
if seed is not None:
np.random.seed(seed)
# 生成噪声
noise = np.random.normal(0, noise_amount, heightmap.shape).astype(np.float32)
# 添加噪声
noisy = heightmap + noise
# 更新统计信息
processing_time = time.time() - processing_start_time
self._update_stats(processing_time)
print(f"✓ 地形噪声添加完成,耗时: {processing_time:.3f}")
return noisy
except Exception as e:
print(f"✗ 地形噪声添加失败: {e}")
import traceback
traceback.print_exc()
return heightmap
def create_terrain_from_function(self, width: int, height: int,
function: callable) -> np.ndarray:
"""
根据函数创建地形
Args:
width: 宽度
height: 高度
function: 生成函数 (x, y) -> height
Returns:
生成的高度图数据
"""
try:
processing_start_time = time.time()
if not self.enabled:
print("✗ 地形工具类未启用")
return np.zeros((height, width), dtype=np.float32)
print("✓ 开始根据函数创建地形...")
# 创建高度图
heightmap = np.zeros((height, width), dtype=np.float32)
# 应用函数
for y in range(height):
for x in range(width):
# 将坐标标准化到[-1, 1]范围
norm_x = (x / (width - 1)) * 2.0 - 1.0
norm_y = (y / (height - 1)) * 2.0 - 1.0
heightmap[y, x] = function(norm_x, norm_y)
# 更新统计信息
processing_time = time.time() - processing_start_time
self._update_stats(processing_time)
print(f"✓ 函数地形创建完成,耗时: {processing_time:.3f}")
return heightmap
except Exception as e:
print(f"✗ 函数地形创建失败: {e}")
import traceback
traceback.print_exc()
return np.zeros((height, width), dtype=np.float32)
def cache_analysis_result(self, key: str, result: Any):
"""
缓存分析结果
Args:
key: 缓存键
result: 分析结果
"""
try:
self.analysis_cache[key] = {
'result': result,
'timestamp': time.time()
}
print(f"✓ 分析结果已缓存: {key}")
except Exception as e:
print(f"✗ 分析结果缓存失败: {e}")
def get_cached_analysis_result(self, key: str) -> Optional[Any]:
"""
获取缓存的分析结果
Args:
key: 缓存键
Returns:
缓存的分析结果或None
"""
try:
if key in self.analysis_cache:
self.stats['cache_hits'] += 1
return self.analysis_cache[key]['result']
else:
self.stats['cache_misses'] += 1
return None
except Exception as e:
print(f"✗ 缓存分析结果获取失败: {e}")
return None
def clear_analysis_cache(self):
"""清空分析缓存"""
try:
self.analysis_cache.clear()
print("✓ 分析缓存已清空")
except Exception as e:
print(f"✗ 分析缓存清空失败: {e}")
def get_cache_stats(self) -> Dict[str, int]:
"""
获取缓存统计信息
Returns:
缓存统计信息
"""
return {
'cache_size': len(self.analysis_cache),
'cache_hits': self.stats['cache_hits'],
'cache_misses': self.stats['cache_misses'],
'hit_rate': self.stats['cache_hits'] / max(1, self.stats['cache_hits'] + self.stats['cache_misses'])
}