""" 日志管理器 基于文件存储的操作日志管理系统,支持JSON Lines格式 """ import os import json import uuid import asyncio from datetime import datetime, timedelta from typing import List, Optional, Dict, Any from pathlib import Path import logging from app.models.operation_log import OperationLog, LogFilter, LogType, LogLevel, OperationStatus BASE_DIR = Path(os.environ.get("CADHUB_BASE_DIR", Path(__file__).resolve().parents[2])) logger = logging.getLogger(__name__) class LogManager: """日志管理器""" def __init__(self, log_dir: str = "logs/operation_logs"): self.log_dir = Path(log_dir) if not self.log_dir.is_absolute(): self.log_dir = BASE_DIR / self.log_dir self.log_dir.mkdir(parents=True, exist_ok=True) # 配置参数 self.max_log_days = 30 # 日志保留天数 self.max_file_size = 50 * 1024 * 1024 # 50MB # 内存缓存 self._cache: List[OperationLog] = [] self._cache_max_size = 1000 # 异步写入队列 self._write_queue = asyncio.Queue() self._writer_task = None async def start(self): """启动日志管理器""" if self._writer_task is None: self._writer_task = asyncio.create_task(self._background_writer()) logger.info("日志管理器已启动") async def stop(self): """停止日志管理器""" if self._writer_task: self._writer_task.cancel() try: await self._writer_task except asyncio.CancelledError: pass logger.info("日志管理器已停止") async def log_operation( self, operation: str, details: str, log_type: LogType = LogType.SYSTEM_OPERATION, user_id: Optional[str] = None, client_id: Optional[str] = None, **kwargs ) -> str: """ 记录操作日志 Args: operation: 操作名称 details: 操作详情 log_type: 日志类型 user_id: 用户ID client_id: 客户端ID **kwargs: 其他日志字段 Returns: 日志ID """ log_id = str(uuid.uuid4()) log_entry = OperationLog( id=log_id, log_type=log_type, operation=operation, details=details, user_id=user_id, client_id=client_id, created_at=datetime.now(), **kwargs ) # 添加到写入队列 await self._write_queue.put(log_entry) # 添加到内存缓存 self._add_to_cache(log_entry) return log_id async def log_system_operation( self, operation: str, details: str, status: OperationStatus = OperationStatus.SUCCESS, **kwargs ) -> str: """记录系统操作日志""" # 如果没有指定operation_category,使用默认值"system" if "operation_category" not in kwargs: kwargs["operation_category"] = "system" return await self.log_operation( operation=operation, details=details, log_type=LogType.SYSTEM_OPERATION, status=status, **kwargs ) async def log_user_operation( self, operation: str, details: str, user_id: str, client_id: str, **kwargs ) -> str: """记录用户操作日志""" # 如果没有指定operation_category,使用默认值"user" if "operation_category" not in kwargs: kwargs["operation_category"] = "user" return await self.log_operation( operation=operation, details=details, log_type=LogType.USER_OPERATION, user_id=user_id, client_id=client_id, **kwargs ) async def query_logs(self, filter_params: LogFilter) -> List[OperationLog]: """ 查询日志 Args: filter_params: 过滤条件 Returns: 日志列表 """ # 首先从缓存中查询 cached_logs = self._query_from_cache(filter_params) if len(cached_logs) >= filter_params.limit: return cached_logs[:filter_params.limit] # 从文件中查询 file_logs = await self._query_from_files(filter_params) # 合并和去重 all_logs = self._merge_and_deduplicate(cached_logs, file_logs) # 排序和分页 all_logs.sort(key=lambda x: x.created_at, reverse=True) start_index = filter_params.offset end_index = start_index + filter_params.limit return all_logs[start_index:end_index] async def get_log_by_id(self, log_id: str) -> Optional[OperationLog]: """根据ID获取日志""" # 先从缓存查找 for log in self._cache: if log.id == log_id: return log # 从文件查找 return await self._find_log_in_files(log_id) async def cleanup_old_logs(self): """清理过期日志文件""" cutoff_date = datetime.now() - timedelta(days=self.max_log_days) for log_file in self.log_dir.glob("*.jsonl"): try: # 从文件名提取日期 date_str = log_file.stem.replace("operation_", "") file_date = datetime.strptime(date_str, "%Y-%m-%d") if file_date < cutoff_date: log_file.unlink() logger.info(f"已删除过期日志文件: {log_file}") except (ValueError, OSError) as e: logger.warning(f"处理日志文件 {log_file} 时出错: {e}") def _add_to_cache(self, log_entry: OperationLog): """添加日志到内存缓存""" self._cache.append(log_entry) # 保持缓存大小限制 if len(self._cache) > self._cache_max_size: self._cache = self._cache[-self._cache_max_size:] def _query_from_cache(self, filter_params: LogFilter) -> List[OperationLog]: """从缓存中查询日志""" results = [] for log in self._cache: if self._matches_filter(log, filter_params): results.append(log) return results async def _query_from_files(self, filter_params: LogFilter) -> List[OperationLog]: """从文件中查询日志""" results = [] # 确定要搜索的文件 files_to_search = self._get_relevant_files(filter_params) for file_path in files_to_search: try: logs = await self._read_logs_from_file(file_path) for log in logs: if self._matches_filter(log, filter_params): results.append(log) except Exception as e: logger.error(f"读取日志文件 {file_path} 时出错: {e}") return results def _matches_filter(self, log: OperationLog, filter_params: LogFilter) -> bool: """检查日志是否匹配过滤条件""" if filter_params.log_type and log.log_type != filter_params.log_type: return False if filter_params.operation and filter_params.operation not in log.operation: return False if filter_params.user_id and log.user_id != filter_params.user_id: return False if filter_params.client_id and log.client_id != filter_params.client_id: return False if filter_params.level and log.level != filter_params.level: return False if filter_params.start_time and log.created_at < filter_params.start_time: return False if filter_params.end_time and log.created_at > filter_params.end_time: return False return True def _get_relevant_files(self, filter_params: LogFilter) -> List[Path]: """获取相关的日志文件""" files = [] if filter_params.start_time or filter_params.end_time: # 基于时间范围筛选文件 start_date = filter_params.start_time.date() if filter_params.start_time else None end_date = filter_params.end_time.date() if filter_params.end_time else None for log_file in sorted(self.log_dir.glob("operation_*.jsonl")): try: date_str = log_file.stem.replace("operation_", "") file_date = datetime.strptime(date_str, "%Y-%m-%d").date() if start_date and file_date < start_date: continue if end_date and file_date > end_date: continue files.append(log_file) except ValueError: continue else: # 获取所有文件,按时间倒序 files = sorted(self.log_dir.glob("operation_*.jsonl"), reverse=True) return files async def _read_logs_from_file(self, file_path: Path) -> List[OperationLog]: """从文件读取日志""" logs = [] try: with open(file_path, 'r', encoding='utf-8') as f: for line in f: line = line.strip() if line: try: log_data = json.loads(line) log = OperationLog(**log_data) logs.append(log) except (json.JSONDecodeError, ValueError) as e: logger.warning(f"解析日志行时出错: {e}") except Exception as e: logger.error(f"读取日志文件 {file_path} 时出错: {e}") return logs def _merge_and_deduplicate(self, list1: List[OperationLog], list2: List[OperationLog]) -> List[OperationLog]: """合并并去重日志列表""" seen_ids = set() merged = [] for log_list in [list1, list2]: for log in log_list: if log.id not in seen_ids: seen_ids.add(log.id) merged.append(log) return merged async def _find_log_in_files(self, log_id: str) -> Optional[OperationLog]: """在文件中查找指定ID的日志""" for log_file in self.log_dir.glob("operation_*.jsonl"): try: logs = await self._read_logs_from_file(log_file) for log in logs: if log.id == log_id: return log except Exception as e: logger.error(f"查找日志时出错: {e}") return None async def _background_writer(self): """后台日志写入任务""" try: while True: log_entry = await self._write_queue.get() await self._write_log_to_file(log_entry) except asyncio.CancelledError: # 处理剩余的日志 while not self._write_queue.empty(): try: log_entry = self._write_queue.get_nowait() await self._write_log_to_file(log_entry) except asyncio.QueueEmpty: break raise async def _write_log_to_file(self, log_entry: OperationLog): """将日志写入文件""" try: # 生成文件名 date_str = log_entry.created_at.strftime("%Y-%m-%d") filename = f"operation_{date_str}.jsonl" file_path = self.log_dir / filename # 检查文件大小 if file_path.exists() and file_path.stat().st_size > self.max_file_size: # 文件过大,创建新文件 timestamp = datetime.now().strftime("%H%M%S") filename = f"operation_{date_str}_{timestamp}.jsonl" file_path = self.log_dir / filename # 写入日志 log_line = log_entry.model_dump_json(indent=None) + "\n" with open(file_path, 'a', encoding='utf-8') as f: f.write(log_line) except Exception as e: logger.error(f"写入日志文件时出错: {e}") # 全局日志管理器实例 log_manager = LogManager()