EG/plugins/user/matchmaking_system/algorithms/algorithm_manager.py
2025-10-30 15:01:29 +08:00

616 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
算法管理器模块
实现不同的匹配算法(技能匹配、快速匹配等)
"""
import time
import random
from typing import Dict, Any, List, Optional
from collections import defaultdict
class AlgorithmManager:
"""
算法管理器
实现不同的匹配算法(技能匹配、快速匹配等)
"""
def __init__(self, plugin):
"""
初始化算法管理器
Args:
plugin: 匹配系统插件实例
"""
self.plugin = plugin
self.enabled = False
self.initialized = False
# 算法配置
self.algorithm_config = {
"default_algorithm": "skill_based",
"available_algorithms": ["skill_based", "quick", "team_balanced", "custom"],
"skill_based_config": {
"skill_range": 100, # 技能匹配范围
"max_wait_time": 120.0, # 最大等待时间(秒)
"expand_range_over_time": True, # 随时间扩展匹配范围
"range_expansion_rate": 10 # 每秒扩展范围
},
"team_balanced_config": {
"balance_threshold": 0.1, # 平衡阈值
"max_balance_attempts": 5 # 最大平衡尝试次数
},
"quick_match_config": {
"max_queue_time": 30.0, # 最大排队时间
"min_players": 2, # 最少玩家数
"max_players": 8 # 最多玩家数
}
}
# 算法状态
self.algorithm_state = {
"active_algorithm": "skill_based",
"total_matches": 0,
"successful_matches": 0,
"failed_matches": 0,
"average_match_quality": 0.0
}
# 玩家数据存储
self.player_data = {} # 玩家技能、统计数据等
# 算法统计
self.algorithm_stats = {
"algorithms_run": 0,
"matches_created": 0,
"players_matched": 0,
"algorithm_errors": 0
}
# 回调函数
self.algorithm_callbacks = {
"algorithm_selected": [],
"match_quality_calculated": [],
"algorithm_error": []
}
# 时间戳记录
self.last_algorithm_run = 0.0
self.last_stats_reset = 0.0
print("✓ 算法管理器已创建")
def initialize(self) -> bool:
"""
初始化算法管理器
Returns:
是否初始化成功
"""
try:
print("正在初始化算法管理器...")
# 注册默认算法
self._register_default_algorithms()
self.initialized = True
print("✓ 算法管理器初始化完成")
return True
except Exception as e:
print(f"✗ 算法管理器初始化失败: {e}")
self.algorithm_stats["algorithm_errors"] += 1
import traceback
traceback.print_exc()
return False
def _register_default_algorithms(self):
"""注册默认算法"""
try:
print("✓ 默认算法已注册")
except Exception as e:
print(f"✗ 默认算法注册失败: {e}")
self.algorithm_stats["algorithm_errors"] += 1
def enable(self) -> bool:
"""
启用算法管理器
Returns:
是否启用成功
"""
try:
if not self.initialized:
print("✗ 算法管理器未初始化")
return False
self.enabled = True
print("✓ 算法管理器已启用")
return True
except Exception as e:
print(f"✗ 算法管理器启用失败: {e}")
self.algorithm_stats["algorithm_errors"] += 1
import traceback
traceback.print_exc()
return False
def disable(self):
"""禁用算法管理器"""
try:
self.enabled = False
print("✓ 算法管理器已禁用")
except Exception as e:
print(f"✗ 算法管理器禁用失败: {e}")
self.algorithm_stats["algorithm_errors"] += 1
import traceback
traceback.print_exc()
def finalize(self):
"""清理算法管理器资源"""
try:
# 禁用算法管理器
if self.enabled:
self.disable()
# 清理回调
self.algorithm_callbacks.clear()
self.initialized = False
print("✓ 算法管理器资源已清理")
except Exception as e:
print(f"✗ 算法管理器资源清理失败: {e}")
import traceback
traceback.print_exc()
def update(self, dt: float):
"""
更新算法管理器状态
Args:
dt: 时间增量(秒)
"""
try:
if not self.enabled:
return
self.last_algorithm_run = time.time()
except Exception as e:
print(f"✗ 算法管理器更新失败: {e}")
self.algorithm_stats["algorithm_errors"] += 1
import traceback
traceback.print_exc()
def set_active_algorithm(self, algorithm_name: str) -> bool:
"""
设置活动算法
Args:
algorithm_name: 算法名称
Returns:
是否设置成功
"""
try:
if algorithm_name not in self.algorithm_config["available_algorithms"]:
print(f"✗ 不支持的算法: {algorithm_name}")
return False
self.algorithm_state["active_algorithm"] = algorithm_name
print(f"✓ 活动算法已设置为: {algorithm_name}")
# 触发算法选择回调
self._trigger_algorithm_callback("algorithm_selected", {
"algorithm": algorithm_name,
"timestamp": time.time()
})
return True
except Exception as e:
print(f"✗ 活动算法设置失败: {e}")
self.algorithm_stats["algorithm_errors"] += 1
return False
def run_matching_algorithm(self, players: List[Dict[str, Any]], algorithm: str = None) -> Optional[List[List[Dict[str, Any]]]]:
"""
运行匹配算法
Args:
players: 玩家列表
algorithm: 算法名称(可选,默认使用活动算法)
Returns:
匹配结果玩家分组列表或None
"""
try:
if not self.enabled:
return None
# 使用指定算法或活动算法
if algorithm is None:
algorithm = self.algorithm_state["active_algorithm"]
# 验证算法支持
if algorithm not in self.algorithm_config["available_algorithms"]:
print(f"✗ 不支持的算法: {algorithm}")
self.algorithm_stats["algorithm_errors"] += 1
return None
self.algorithm_stats["algorithms_run"] += 1
# 根据算法类型运行相应算法
if algorithm == "skill_based":
result = self._run_skill_based_matching(players)
elif algorithm == "quick":
result = self._run_quick_matching(players)
elif algorithm == "team_balanced":
result = self._run_team_balanced_matching(players)
else:
# 默认使用技能匹配
result = self._run_skill_based_matching(players)
if result:
self.algorithm_state["successful_matches"] += 1
self.algorithm_state["total_matches"] += 1
self.algorithm_stats["matches_created"] += 1
# 计算匹配质量
match_quality = self._calculate_match_quality(result)
self.algorithm_state["average_match_quality"] = (
self.algorithm_state["average_match_quality"] * (self.algorithm_state["successful_matches"] - 1) +
match_quality
) / self.algorithm_state["successful_matches"]
# 触发匹配质量计算回调
self._trigger_algorithm_callback("match_quality_calculated", {
"quality": match_quality,
"algorithm": algorithm,
"timestamp": time.time()
})
else:
self.algorithm_state["failed_matches"] += 1
return result
except Exception as e:
print(f"✗ 匹配算法运行失败: {e}")
self.algorithm_stats["algorithm_errors"] += 1
return None
def _run_skill_based_matching(self, players: List[Dict[str, Any]]) -> Optional[List[List[Dict[str, Any]]]]:
"""
运行基于技能的匹配算法
Args:
players: 玩家列表
Returns:
匹配结果或None
"""
try:
if not players:
return None
config = self.algorithm_config["skill_based_config"]
skill_range = config["skill_range"]
# 按技能水平排序玩家
sorted_players = sorted(players, key=lambda p: p.get("skill_level", 0))
# 简化实现:将技能相近的玩家分组
groups = []
current_group = [sorted_players[0]]
for i in range(1, len(sorted_players)):
current_player = sorted_players[i]
last_player = current_group[-1]
# 检查技能差异是否在范围内
skill_diff = abs(current_player.get("skill_level", 0) - last_player.get("skill_level", 0))
if skill_diff <= skill_range and len(current_group) < 8: # 最多8人一组
current_group.append(current_player)
else:
# 创建新组
groups.append(current_group)
current_group = [current_player]
# 添加最后一组
if current_group:
groups.append(current_group)
print(f"✓ 技能匹配完成: 创建了 {len(groups)} 个组")
return groups
except Exception as e:
print(f"✗ 技能匹配算法失败: {e}")
self.algorithm_stats["algorithm_errors"] += 1
return None
def _run_quick_matching(self, players: List[Dict[str, Any]]) -> Optional[List[List[Dict[str, Any]]]]:
"""
运行快速匹配算法
Args:
players: 玩家列表
Returns:
匹配结果或None
"""
try:
if not players:
return None
config = self.algorithm_config["quick_match_config"]
min_players = config["min_players"]
max_players = config["max_players"]
# 简化实现:随机分组
groups = []
player_pool = players.copy()
random.shuffle(player_pool)
while player_pool:
group_size = min(random.randint(min_players, max_players), len(player_pool))
group = player_pool[:group_size]
groups.append(group)
player_pool = player_pool[group_size:]
print(f"✓ 快速匹配完成: 创建了 {len(groups)} 个组")
return groups
except Exception as e:
print(f"✗ 快速匹配算法失败: {e}")
self.algorithm_stats["algorithm_errors"] += 1
return None
def _run_team_balanced_matching(self, players: List[Dict[str, Any]]) -> Optional[List[List[Dict[str, Any]]]]:
"""
运行团队平衡匹配算法
Args:
players: 玩家列表
Returns:
匹配结果或None
"""
try:
if not players or len(players) < 2:
return None
# 简化实现:创建两个平衡的团队
# 按技能排序
sorted_players = sorted(players, key=lambda p: p.get("skill_level", 0), reverse=True)
team1 = []
team2 = []
team1_skill = 0
team2_skill = 0
# 贪心算法分配玩家到技能平衡的团队
for player in sorted_players:
player_skill = player.get("skill_level", 0)
if team1_skill <= team2_skill:
team1.append(player)
team1_skill += player_skill
else:
team2.append(player)
team2_skill += player_skill
groups = [team1, team2] if team1 and team2 else [sorted_players]
print(f"✓ 团队平衡匹配完成: 创建了 {len(groups)} 个组")
return groups
except Exception as e:
print(f"✗ 团队平衡匹配算法失败: {e}")
self.algorithm_stats["algorithm_errors"] += 1
return None
def _calculate_match_quality(self, groups: List[List[Dict[str, Any]]]) -> float:
"""
计算匹配质量
Args:
groups: 玩家分组
Returns:
匹配质量分数0-1
"""
try:
if not groups:
return 0.0
total_quality = 0.0
group_count = 0
for group in groups:
if len(group) < 2:
continue
# 计算组内技能差异
skills = [p.get("skill_level", 0) for p in group]
avg_skill = sum(skills) / len(skills)
variance = sum((s - avg_skill) ** 2 for s in skills) / len(skills)
std_dev = variance ** 0.5
# 转换为质量分数(标准差越小质量越高)
max_expected_std = 100.0 # 假设最大预期标准差
quality = max(0.0, 1.0 - (std_dev / max_expected_std))
total_quality += quality
group_count += 1
return total_quality / group_count if group_count > 0 else 0.0
except Exception as e:
print(f"✗ 匹配质量计算失败: {e}")
self.algorithm_stats["algorithm_errors"] += 1
return 0.0
def update_player_skill(self, player_id: str, skill_level: float, stats: Dict[str, Any] = None):
"""
更新玩家技能
Args:
player_id: 玩家ID
skill_level: 技能等级
stats: 玩家统计数据
"""
try:
if player_id not in self.player_data:
self.player_data[player_id] = {
"skill_history": [],
"match_stats": {}
}
player_info = self.player_data[player_id]
player_info["current_skill"] = skill_level
player_info["skill_history"].append({
"skill": skill_level,
"timestamp": time.time()
})
if stats:
player_info["match_stats"].update(stats)
except Exception as e:
print(f"✗ 玩家技能更新失败: {e}")
self.algorithm_stats["algorithm_errors"] += 1
def get_player_skill(self, player_id: str) -> Optional[float]:
"""
获取玩家技能
Args:
player_id: 玩家ID
Returns:
玩家技能等级或None
"""
try:
if player_id in self.player_data:
return self.player_data[player_id].get("current_skill")
return None
except Exception as e:
print(f"✗ 玩家技能获取失败: {e}")
self.algorithm_stats["algorithm_errors"] += 1
return None
def get_algorithm_stats(self) -> Dict[str, Any]:
"""
获取算法统计信息
Returns:
算法统计字典
"""
return {
"state": self.algorithm_state.copy(),
"stats": self.algorithm_stats.copy(),
"config": self.algorithm_config.copy(),
"tracked_players": len(self.player_data)
}
def reset_stats(self):
"""重置算法统计信息"""
try:
self.algorithm_stats = {
"algorithms_run": 0,
"matches_created": 0,
"players_matched": 0,
"algorithm_errors": 0
}
self.algorithm_state["total_matches"] = 0
self.algorithm_state["successful_matches"] = 0
self.algorithm_state["failed_matches"] = 0
self.algorithm_state["average_match_quality"] = 0.0
print("✓ 算法统计信息已重置")
except Exception as e:
print(f"✗ 算法统计信息重置失败: {e}")
def set_algorithm_config(self, config: Dict[str, Any]) -> bool:
"""
设置算法配置
Args:
config: 算法配置字典
Returns:
是否设置成功
"""
try:
self.algorithm_config.update(config)
print(f"✓ 算法配置已更新: {self.algorithm_config}")
return True
except Exception as e:
print(f"✗ 算法配置设置失败: {e}")
return False
def get_algorithm_config(self) -> Dict[str, Any]:
"""
获取算法配置
Returns:
算法配置字典
"""
return self.algorithm_config.copy()
def _trigger_algorithm_callback(self, callback_type: str, data: Dict[str, Any]):
"""
触发算法回调
Args:
callback_type: 回调类型
data: 回调数据
"""
try:
if callback_type in self.algorithm_callbacks:
for callback in self.algorithm_callbacks[callback_type]:
try:
callback(data)
except Exception as e:
print(f"✗ 算法回调执行失败: {callback_type} - {e}")
except Exception as e:
print(f"✗ 算法回调触发失败: {e}")
def register_algorithm_callback(self, callback_type: str, callback: callable):
"""
注册算法回调
Args:
callback_type: 回调类型
callback: 回调函数
"""
try:
if callback_type in self.algorithm_callbacks:
self.algorithm_callbacks[callback_type].append(callback)
print(f"✓ 算法回调已注册: {callback_type}")
else:
print(f"✗ 无效的回调类型: {callback_type}")
except Exception as e:
print(f"✗ 算法回调注册失败: {e}")
def unregister_algorithm_callback(self, callback_type: str, callback: callable):
"""
注销算法回调
Args:
callback_type: 回调类型
callback: 回调函数
"""
try:
if callback_type in self.algorithm_callbacks:
if callback in self.algorithm_callbacks[callback_type]:
self.algorithm_callbacks[callback_type].remove(callback)
print(f"✓ 算法回调已注销: {callback_type}")
else:
print(f"✗ 无效的回调类型: {callback_type}")
except Exception as e:
print(f"✗ 算法回调注销失败: {e}")