修改:同步机器人数据脚本

This commit is contained in:
haotian 2025-06-18 09:34:54 +08:00
parent 51da97e509
commit cfe4e5ac1a
4 changed files with 108 additions and 23 deletions

View File

@ -13,10 +13,14 @@ class Settings(BaseSettings):
# DB_NAME: str = "kangda_test" # 测试数据库
DB_CHARSET: str = "utf8mb4"
DB_POOL_SIZE: int = 10
DB_MAX_OVERFLOW: int = 10
DB_POOL_TIMEOUT: int = 30
DB_POOL_RECYCLE: int = 1800
# 优化连接池配置
DB_POOL_SIZE: int = 5 # 减小连接池大小
DB_MAX_OVERFLOW: int = 5 # 减小最大溢出连接数
DB_POOL_TIMEOUT: int = 60 # 增加连接超时时间
DB_POOL_RECYCLE: int = 300 # 减少连接回收时间
DB_POOL_PRE_PING: bool = True # 启用连接前ping
DB_ECHO: bool = False # 关闭SQL语句打印
class Config:
env_file = ".env"

View File

@ -1,5 +1,8 @@
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker, declarative_base
from sqlalchemy.exc import OperationalError, SQLAlchemyError
import asyncio
import logging
from ..kangdaApi.parseConfig import parse_config
from .config import settings
@ -13,6 +16,10 @@ from .config import settings
# print("sssss", config_kangda)
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# 创建异步引擎
engine = create_async_engine(
f"mysql+aiomysql://{settings.DB_USER}:{settings.DB_PASSWORD}@{settings.DB_HOST}:{settings.DB_PORT}/{settings.DB_NAME}",
@ -20,7 +27,8 @@ engine = create_async_engine(
max_overflow=settings.DB_MAX_OVERFLOW,
pool_timeout=settings.DB_POOL_TIMEOUT,
pool_recycle=settings.DB_POOL_RECYCLE,
echo=False
pool_pre_ping=settings.DB_POOL_PRE_PING,
echo=settings.DB_ECHO
)
# engine = create_async_engine(
@ -42,12 +50,30 @@ Base = declarative_base()
# 获取数据库会话
async def get_db():
    """Async dependency that yields a database session, retrying transient
    connection failures.

    Opens a session from the module-level ``async_session`` factory. On
    ``OperationalError``/``SQLAlchemyError`` raised while *establishing* the
    session, waits ``retry_delay`` seconds and retries up to ``max_retries``
    times. Once the session has been handed to the caller, errors are rolled
    back and re-raised instead of retried: a dependency generator is driven
    exactly once by its consumer, so yielding a second session would never be
    consumed (NOTE(review): confirmed FastAPI-style usage with the caller).

    Yields:
        AsyncSession: an open session; committed on normal exit, rolled back
        on error. The ``async with`` block closes the session in all cases.

    Raises:
        OperationalError | SQLAlchemyError: after the last failed attempt.
        Exception: any non-database error, logged and re-raised unchanged.
    """
    max_retries = 3
    retry_delay = 5  # seconds between connection retries
    for attempt in range(max_retries):
        yielded = False  # becomes True once the caller has received the session
        try:
            async with async_session() as session:
                try:
                    yielded = True
                    yield session
                    await session.commit()
                except Exception:
                    await session.rollback()
                    raise
                # no explicit close(): the async context manager closes the session
            break  # success — leave the retry loop
        except (OperationalError, SQLAlchemyError) as e:
            # Retry only pre-yield connection failures; after the yield the
            # consumer has finished with us, so retrying cannot help.
            if yielded or attempt >= max_retries - 1:
                logger.error(f"数据库连接失败,已达到最大重试次数: {str(e)}")
                raise
            logger.warning(f"数据库连接失败 (尝试 {attempt + 1}/{max_retries}): {str(e)}")
            await asyncio.sleep(retry_delay)
        except Exception as e:
            logger.error(f"未知错误: {str(e)}")
            raise

View File

@ -1,38 +1,89 @@
import asyncio
from datetime import datetime
import logging
from sqlalchemy.exc import OperationalError, SQLAlchemyError
from app.core.database import async_session
from app.services.robot_sync_service import robot_sync_service
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class Scheduler:
    """Background loop that periodically syncs robot data into the database.

    ``start()`` runs until ``stop()`` flips ``is_running``; each iteration
    syncs group data and robot tasks via ``robot_sync_service``, then sleeps
    ``sync_interval`` seconds. Database connection errors are retried up to
    ``max_retries`` times with a ``retry_delay`` pause before falling back to
    the normal schedule.
    """

    def __init__(self):
        self.is_running = False      # loop guard; stop() sets this to False
        self.last_sync_time = None   # datetime of the last successful sync
        self.retry_count = 0         # consecutive connection-error retries
        self.max_retries = 3         # max retries per failure burst
        self.retry_delay = 60        # seconds to wait before retrying
        self.sync_interval = 300     # seconds between successful syncs

    async def start(self):
        """Run the sync loop; calling it while already running is a no-op."""
        if self.is_running:
            logger.info("定时任务已经在运行中")
            return
        self.is_running = True
        logger.info("定时任务启动")
        while self.is_running:
            try:
                current_time = datetime.now()
                if self.last_sync_time:
                    time_diff = (current_time - self.last_sync_time).total_seconds()
                    logger.info(f"距离上次同步已过去 {time_diff}")
                async with async_session() as session:
                    try:
                        # Sync group data first; a falsy result is treated as failure.
                        logger.info("开始同步机器人数据...")
                        success = await robot_sync_service.sync_robot_data(session)
                        if not success:
                            raise Exception("同步机器人数据失败")
                        # Then sync robot task data.
                        logger.info("开始同步机器人任务...")
                        await robot_sync_service.sync_robot_task(session)
                        self.last_sync_time = current_time
                        self.retry_count = 0  # reset after a successful run
                        logger.info("同步完成,等待下一次同步")
                    except OperationalError as e:
                        # Connection-level failure: retry quickly a few times,
                        # then give up until the next scheduled sync.
                        logger.error(f"数据库连接错误: {str(e)}")
                        if self.retry_count < self.max_retries:
                            self.retry_count += 1
                            logger.info(f"将在 {self.retry_delay} 秒后进行第 {self.retry_count} 次重试")
                            await asyncio.sleep(self.retry_delay)
                            continue  # skip the regular interval sleep below
                        logger.error("达到最大重试次数,等待下一次定时同步")
                        self.retry_count = 0
                    except SQLAlchemyError as e:
                        logger.error(f"数据库操作错误: {str(e)}")
                        await asyncio.sleep(self.retry_delay)
                    except Exception as e:
                        logger.error(f"同步过程发生错误: {str(e)}")
                        await asyncio.sleep(self.retry_delay)
                # Wait for the next scheduled sync.
                await asyncio.sleep(self.sync_interval)
            except Exception as e:
                # Outer guard so an unexpected error never kills the loop.
                logger.error(f"定时任务执行失败: {str(e)}")
                await asyncio.sleep(self.retry_delay)

    def stop(self):
        """Ask the running loop to exit after its current iteration."""
        self.is_running = False
        logger.info("定时任务已停止")


# Module-level singleton used by the application.
scheduler = Scheduler()

View File

@ -55,3 +55,7 @@
# 20250617
- 修改 同步数据库脚本
# 20250618
- 修改同步机器人数据脚本