"""
|
||
日志管理器
|
||
基于文件存储的操作日志管理系统,支持JSON Lines格式
|
||
"""
import os
import json
import uuid
import asyncio
from datetime import datetime, timedelta
from typing import List, Optional, Dict, Any
from pathlib import Path
import logging

from app.models.operation_log import OperationLog, LogFilter, LogType, LogLevel, OperationStatus


BASE_DIR = Path(os.environ.get("CADHUB_BASE_DIR", Path(__file__).resolve().parents[2]))

logger = logging.getLogger(__name__)


class LogManager:
    """Log manager."""

    def __init__(self, log_dir: str = "logs/operation_logs"):
        self.log_dir = Path(log_dir)
        if not self.log_dir.is_absolute():
            self.log_dir = BASE_DIR / self.log_dir
        self.log_dir.mkdir(parents=True, exist_ok=True)

        # Configuration
        self.max_log_days = 30  # Days to keep log files
        self.max_file_size = 50 * 1024 * 1024  # 50 MB

        # In-memory cache
        self._cache: List[OperationLog] = []
        self._cache_max_size = 1000

        # Asynchronous write queue
        self._write_queue = asyncio.Queue()
        self._writer_task = None

    async def start(self):
        """Start the log manager."""
        if self._writer_task is None:
            self._writer_task = asyncio.create_task(self._background_writer())
            logger.info("Log manager started")

    async def stop(self):
        """Stop the log manager."""
        if self._writer_task:
            self._writer_task.cancel()
            try:
                await self._writer_task
            except asyncio.CancelledError:
                pass
            logger.info("Log manager stopped")

    async def log_operation(
        self,
        operation: str,
        details: str,
        log_type: LogType = LogType.SYSTEM_OPERATION,
        user_id: Optional[str] = None,
        client_id: Optional[str] = None,
        **kwargs
    ) -> str:
        """
        Record an operation log entry.

        Args:
            operation: Operation name
            details: Operation details
            log_type: Log type
            user_id: User ID
            client_id: Client ID
            **kwargs: Additional log fields

        Returns:
            The log ID
        """
        log_id = str(uuid.uuid4())

        log_entry = OperationLog(
            id=log_id,
            log_type=log_type,
            operation=operation,
            details=details,
            user_id=user_id,
            client_id=client_id,
            created_at=datetime.now(),
            **kwargs
        )

        # Enqueue for asynchronous writing
        await self._write_queue.put(log_entry)

        # Add to the in-memory cache
        self._add_to_cache(log_entry)

        return log_id

    async def log_system_operation(
        self,
        operation: str,
        details: str,
        status: OperationStatus = OperationStatus.SUCCESS,
        **kwargs
    ) -> str:
        """Record a system operation log entry."""
        # Default operation_category to "system" when not specified
        if "operation_category" not in kwargs:
            kwargs["operation_category"] = "system"

        return await self.log_operation(
            operation=operation,
            details=details,
            log_type=LogType.SYSTEM_OPERATION,
            status=status,
            **kwargs
        )

    async def log_user_operation(
        self,
        operation: str,
        details: str,
        user_id: str,
        client_id: str,
        **kwargs
    ) -> str:
        """Record a user operation log entry."""
        # Default operation_category to "user" when not specified
        if "operation_category" not in kwargs:
            kwargs["operation_category"] = "user"

        return await self.log_operation(
            operation=operation,
            details=details,
            log_type=LogType.USER_OPERATION,
            user_id=user_id,
            client_id=client_id,
            **kwargs
        )

    async def query_logs(self, filter_params: LogFilter) -> List[OperationLog]:
        """
        Query logs.

        Args:
            filter_params: Filter criteria

        Returns:
            A list of matching logs
        """
        # Check the in-memory cache first; it can only satisfy the request when it
        # holds enough matching entries to cover both the offset and the limit
        cached_logs = self._query_from_cache(filter_params)

        if len(cached_logs) >= filter_params.offset + filter_params.limit:
            cached_logs.sort(key=lambda x: x.created_at, reverse=True)
            return cached_logs[filter_params.offset:filter_params.offset + filter_params.limit]

        # Fall back to the log files
        file_logs = await self._query_from_files(filter_params)

        # Merge and deduplicate
        all_logs = self._merge_and_deduplicate(cached_logs, file_logs)

        # Sort and paginate
        all_logs.sort(key=lambda x: x.created_at, reverse=True)

        start_index = filter_params.offset
        end_index = start_index + filter_params.limit

        return all_logs[start_index:end_index]

    async def get_log_by_id(self, log_id: str) -> Optional[OperationLog]:
        """Fetch a log entry by its ID."""
        # Check the cache first
        for log in self._cache:
            if log.id == log_id:
                return log

        # Fall back to the log files
        return await self._find_log_in_files(log_id)

    async def cleanup_old_logs(self):
        """Delete log files older than the retention period."""
        cutoff_date = datetime.now() - timedelta(days=self.max_log_days)

        for log_file in self.log_dir.glob("*.jsonl"):
            try:
                # Extract the date from the file name; rotated files carry an extra
                # "_HHMMSS" suffix, so keep only the date portion
                date_str = log_file.stem.replace("operation_", "").split("_")[0]
                file_date = datetime.strptime(date_str, "%Y-%m-%d")

                if file_date < cutoff_date:
                    log_file.unlink()
                    logger.info(f"Deleted expired log file: {log_file}")

            except (ValueError, OSError) as e:
                logger.warning(f"Error while processing log file {log_file}: {e}")

    def _add_to_cache(self, log_entry: OperationLog):
        """Add a log entry to the in-memory cache."""
        self._cache.append(log_entry)

        # Enforce the cache size limit
        if len(self._cache) > self._cache_max_size:
            self._cache = self._cache[-self._cache_max_size:]

    def _query_from_cache(self, filter_params: LogFilter) -> List[OperationLog]:
        """Query logs from the in-memory cache."""
        results = []

        for log in self._cache:
            if self._matches_filter(log, filter_params):
                results.append(log)

        return results

    async def _query_from_files(self, filter_params: LogFilter) -> List[OperationLog]:
        """Query logs from the log files."""
        results = []

        # Determine which files to search
        files_to_search = self._get_relevant_files(filter_params)

        for file_path in files_to_search:
            try:
                logs = await self._read_logs_from_file(file_path)
                for log in logs:
                    if self._matches_filter(log, filter_params):
                        results.append(log)

            except Exception as e:
                logger.error(f"Error reading log file {file_path}: {e}")

        return results

    def _matches_filter(self, log: OperationLog, filter_params: LogFilter) -> bool:
        """Check whether a log entry matches the filter criteria."""
        if filter_params.log_type and log.log_type != filter_params.log_type:
            return False

        if filter_params.operation and filter_params.operation not in log.operation:
            return False

        if filter_params.user_id and log.user_id != filter_params.user_id:
            return False

        if filter_params.client_id and log.client_id != filter_params.client_id:
            return False

        if filter_params.level and log.level != filter_params.level:
            return False

        if filter_params.start_time and log.created_at < filter_params.start_time:
            return False

        if filter_params.end_time and log.created_at > filter_params.end_time:
            return False

        return True

    def _get_relevant_files(self, filter_params: LogFilter) -> List[Path]:
        """Collect the log files relevant to the query."""
        files = []

        if filter_params.start_time or filter_params.end_time:
            # Narrow down the files by the requested time range
            start_date = filter_params.start_time.date() if filter_params.start_time else None
            end_date = filter_params.end_time.date() if filter_params.end_time else None

            for log_file in sorted(self.log_dir.glob("operation_*.jsonl")):
                try:
                    # Rotated files carry an extra "_HHMMSS" suffix, so keep only the date portion
                    date_str = log_file.stem.replace("operation_", "").split("_")[0]
                    file_date = datetime.strptime(date_str, "%Y-%m-%d").date()

                    if start_date and file_date < start_date:
                        continue
                    if end_date and file_date > end_date:
                        continue

                    files.append(log_file)

                except ValueError:
                    continue
        else:
            # No time range: take every file, newest first
            files = sorted(self.log_dir.glob("operation_*.jsonl"), reverse=True)

        return files

    async def _read_logs_from_file(self, file_path: Path) -> List[OperationLog]:
        """Read log entries from a file."""
        logs = []

        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                for line in f:
                    line = line.strip()
                    if line:
                        try:
                            log_data = json.loads(line)
                            log = OperationLog(**log_data)
                            logs.append(log)
                        except (json.JSONDecodeError, ValueError) as e:
                            logger.warning(f"Error parsing log line: {e}")
        except Exception as e:
            logger.error(f"Error reading log file {file_path}: {e}")

        return logs

    def _merge_and_deduplicate(self, list1: List[OperationLog], list2: List[OperationLog]) -> List[OperationLog]:
        """Merge two log lists, dropping entries with duplicate IDs."""
        seen_ids = set()
        merged = []

        for log_list in [list1, list2]:
            for log in log_list:
                if log.id not in seen_ids:
                    seen_ids.add(log.id)
                    merged.append(log)

        return merged

    async def _find_log_in_files(self, log_id: str) -> Optional[OperationLog]:
        """Search the log files for an entry with the given ID."""
        for log_file in self.log_dir.glob("operation_*.jsonl"):
            try:
                logs = await self._read_logs_from_file(log_file)
                for log in logs:
                    if log.id == log_id:
                        return log
            except Exception as e:
                logger.error(f"Error while searching for log: {e}")

        return None

    async def _background_writer(self):
        """Background task that drains the write queue."""
        try:
            while True:
                log_entry = await self._write_queue.get()
                await self._write_log_to_file(log_entry)
        except asyncio.CancelledError:
            # Flush any remaining entries before shutting down
            while not self._write_queue.empty():
                try:
                    log_entry = self._write_queue.get_nowait()
                    await self._write_log_to_file(log_entry)
                except asyncio.QueueEmpty:
                    break
            raise

    async def _write_log_to_file(self, log_entry: OperationLog):
        """Append a log entry to its daily log file."""
        try:
            # Build the file name from the entry's creation date
            date_str = log_entry.created_at.strftime("%Y-%m-%d")
            filename = f"operation_{date_str}.jsonl"
            file_path = self.log_dir / filename

            # Check the file size
            if file_path.exists() and file_path.stat().st_size > self.max_file_size:
                # File is too large; roll over to a new file
                timestamp = datetime.now().strftime("%H%M%S")
                filename = f"operation_{date_str}_{timestamp}.jsonl"
                file_path = self.log_dir / filename

            # Append the entry as a single JSON line
            log_line = log_entry.model_dump_json(indent=None) + "\n"

            with open(file_path, 'a', encoding='utf-8') as f:
                f.write(log_line)

        except Exception as e:
            logger.error(f"Error writing log file: {e}")


# Global log manager instance
log_manager = LogManager()
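

# The block below is a minimal usage sketch, not a definitive example: it shows the
# intended lifecycle (start the background writer, record an entry, query it back,
# stop). It assumes that LogFilter accepts "limit" and "offset" as constructor fields
# and that the concrete values used ("create_model", "u-123", "cli-456") are
# hypothetical placeholders acceptable to OperationLog.
if __name__ == "__main__":
    async def _demo():
        await log_manager.start()
        await log_manager.log_user_operation(
            operation="create_model",
            details="Created model box-01",
            user_id="u-123",
            client_id="cli-456",
        )
        logs = await log_manager.query_logs(LogFilter(limit=10, offset=0))
        for entry in logs:
            print(entry.operation, entry.created_at)
        await log_manager.stop()

    asyncio.run(_demo())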