From cfe4e5ac1a0bb073053643a9132f8c2567c8d3ee Mon Sep 17 00:00:00 2001 From: haotian <2421912570@qq.com> Date: Wed, 18 Jun 2025 09:34:54 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9--=E4=BF=AE=E6=94=B9=E5=90=8C?= =?UTF-8?q?=E6=AD=A5=E6=9C=BA=E5=99=A8=E4=BA=BA=E6=95=B0=E6=8D=AE=E8=84=9A?= =?UTF-8?q?=E6=9C=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/core/config.py | 12 ++++--- app/core/database.py | 44 +++++++++++++++++++----- app/services/scheduler.py | 71 +++++++++++++++++++++++++++++++++------ change_log.md | 4 +++ 4 files changed, 108 insertions(+), 23 deletions(-) diff --git a/app/core/config.py b/app/core/config.py index 87b7778..007af76 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -13,10 +13,14 @@ class Settings(BaseSettings): # DB_NAME: str = "kangda_test" # 测试数据库 DB_CHARSET: str = "utf8mb4" - DB_POOL_SIZE: int = 10 - DB_MAX_OVERFLOW: int = 10 - DB_POOL_TIMEOUT: int = 30 - DB_POOL_RECYCLE: int = 1800 + + # 优化连接池配置 + DB_POOL_SIZE: int = 5 # 减小连接池大小 + DB_MAX_OVERFLOW: int = 5 # 减小最大溢出连接数 + DB_POOL_TIMEOUT: int = 60 # 增加连接超时时间 + DB_POOL_RECYCLE: int = 300 # 减少连接回收时间 + DB_POOL_PRE_PING: bool = True # 启用连接前ping + DB_ECHO: bool = False # 关闭SQL语句打印 class Config: env_file = ".env" diff --git a/app/core/database.py b/app/core/database.py index d753eda..0b9f22f 100644 --- a/app/core/database.py +++ b/app/core/database.py @@ -1,5 +1,8 @@ from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession from sqlalchemy.orm import sessionmaker, declarative_base +from sqlalchemy.exc import OperationalError, SQLAlchemyError +import asyncio +import logging from ..kangdaApi.parseConfig import parse_config from .config import settings @@ -13,6 +16,10 @@ from .config import settings # print("sssss", config_kangda) +# 配置日志 +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + # 创建异步引擎 engine = create_async_engine( f"mysql+aiomysql://{settings.DB_USER}:{settings.DB_PASSWORD}@{settings.DB_HOST}:{settings.DB_PORT}/{settings.DB_NAME}", @@ -20,7 +27,8 @@ engine = create_async_engine( max_overflow=settings.DB_MAX_OVERFLOW, pool_timeout=settings.DB_POOL_TIMEOUT, pool_recycle=settings.DB_POOL_RECYCLE, - echo=False + pool_pre_ping=settings.DB_POOL_PRE_PING, + echo=settings.DB_ECHO ) # engine = create_async_engine( @@ -42,12 +50,30 @@ Base = declarative_base() # 获取数据库会话 async def get_db(): - async with async_session() as session: + max_retries = 3 + retry_delay = 5 # 重试延迟(秒) + + for attempt in range(max_retries): try: - yield session - await session.commit() - except Exception: - await session.rollback() - raise - finally: - await session.close() \ No newline at end of file + async with async_session() as session: + try: + yield session + await session.commit() + except Exception as e: + await session.rollback() + raise + finally: + await session.close() + break # 如果成功,跳出重试循环 + + except (OperationalError, SQLAlchemyError) as e: + if attempt < max_retries - 1: # 如果不是最后一次尝试 + logger.warning(f"数据库连接失败 (尝试 {attempt + 1}/{max_retries}): {str(e)}") + await asyncio.sleep(retry_delay) + continue + else: + logger.error(f"数据库连接失败,已达到最大重试次数: {str(e)}") + raise + except Exception as e: + logger.error(f"未知错误: {str(e)}") + raise \ No newline at end of file diff --git a/app/services/scheduler.py b/app/services/scheduler.py index 6290f59..25f68ad 100644 --- a/app/services/scheduler.py +++ b/app/services/scheduler.py @@ -1,38 +1,89 @@ import asyncio from datetime import datetime +import logging +from sqlalchemy.exc import OperationalError, SQLAlchemyError from app.core.database import async_session from app.services.robot_sync_service import robot_sync_service +# 配置日志 +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + class Scheduler: def __init__(self): self.is_running = False + self.last_sync_time = None + self.retry_count = 0 + self.max_retries = 3 + self.retry_delay = 60 # 重试延迟(秒) + self.sync_interval = 300 # 同步间隔(秒) async def start(self): """启动定时任务""" if self.is_running: + logger.info("定时任务已经在运行中") return self.is_running = True + logger.info("定时任务启动") + while self.is_running: try: + current_time = datetime.now() + if self.last_sync_time: + time_diff = (current_time - self.last_sync_time).total_seconds() + logger.info(f"距离上次同步已过去 {time_diff} 秒") + # 同步机器人数据 async with async_session() as session: - # 同步分组信息 - await robot_sync_service.sync_robot_data(session) - - # 同步机器人任务信息 - await robot_sync_service.sync_robot_task(session) - - # 等待10分钟 - await asyncio.sleep(600) + try: + # 同步分组信息 + logger.info("开始同步机器人数据...") + success = await robot_sync_service.sync_robot_data(session) + if not success: + raise Exception("同步机器人数据失败") + + # 同步机器人任务信息 + logger.info("开始同步机器人任务...") + await robot_sync_service.sync_robot_task(session) + + self.last_sync_time = current_time + self.retry_count = 0 # 重置重试计数 + logger.info("同步完成,等待下一次同步") + + except OperationalError as e: + logger.error(f"数据库连接错误: {str(e)}") + if self.retry_count < self.max_retries: + self.retry_count += 1 + logger.info(f"将在 {self.retry_delay} 秒后进行第 {self.retry_count} 次重试") + await asyncio.sleep(self.retry_delay) + continue + else: + logger.error("达到最大重试次数,等待下一次定时同步") + self.retry_count = 0 # 重置重试计数 + + except SQLAlchemyError as e: + logger.error(f"数据库操作错误: {str(e)}") + await asyncio.sleep(self.retry_delay) + + except Exception as e: + logger.error(f"同步过程发生错误: {str(e)}") + await asyncio.sleep(self.retry_delay) + + # 等待下一次同步 + await asyncio.sleep(self.sync_interval) except Exception as e: - print(f"定时任务执行失败: {str(e)}") - await asyncio.sleep(60) # 发生错误时等待1分钟后重试 + logger.error(f"定时任务执行失败: {str(e)}") + await asyncio.sleep(self.retry_delay) def stop(self): """停止定时任务""" self.is_running = False + logger.info("定时任务已停止") # 创建单例 scheduler = Scheduler() \ No newline at end of file diff --git a/change_log.md b/change_log.md index b839e0f..e13a445 100644 --- a/change_log.md +++ b/change_log.md @@ -55,3 +55,7 @@ # 20250617 - 修改 同步数据库脚本 + +# 20250618 + +- 修改同步机器人数据脚本