kangda/app/services/websocket_service.py

159 lines
6.3 KiB
Python

import asyncio
import json
import os
from datetime import datetime
import websockets
import aiofiles
from app.kangdaApi.parseConfig import parse_config
from app.services.event_sync_service import run_sync_event
from app.services.event_sync_service import EventSyncService
class WebSocketClient:
def __init__(self):
self.config = parse_config("app/config/config.yaml")
self.ws_config = self.config.get("ws", {})
self.message_dir = "logs/websocket_messages"
self.reconnect_delay = 5 # 重连延迟(秒)
self.is_connected = False
self.file_lock = asyncio.Lock() # 文件锁
self.event_sync_service = EventSyncService()
os.makedirs(self.message_dir, exist_ok=True)
def get_ws_url(self) -> str:
"""生成WebSocket连接URL"""
timestamp = int(datetime.now().timestamp() * 1000)
user_id = f"{self.ws_config.get('account')}_{timestamp}_{self.ws_config.get('tenantInfoId')}"
return f"wss://{self.ws_config.get('url')}{user_id}"
async def save_message(self, message: str):
"""保存接收到的消息到文件"""
# 获取当前日期作为文件名
current_date = datetime.now().strftime("%Y%m%d")
filename = f"{self.message_dir}/messages_{current_date}.json"
# 准备要保存的消息数据
message_data = {
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"content": message
}
# 使用文件锁确保并发安全
async with self.file_lock:
try:
# 检查文件是否存在
if os.path.exists(filename):
# 如果文件存在,读取现有内容
async with aiofiles.open(filename, 'r', encoding='utf-8') as f:
content = await f.read()
try:
messages = json.loads(content)
except json.JSONDecodeError:
messages = []
else:
messages = []
# 添加新消息
messages.append(message_data)
# 保存更新后的内容, 不用保存消息
# async with aiofiles.open(filename, 'w', encoding='utf-8') as f:
# await f.write(json.dumps(messages, ensure_ascii=False, indent=2))
# print(f"消息已保存到文件: {filename}")
except Exception as e:
print(f"保存消息时出错: {str(e)}")
async def process_message(self, message: str):
"""处理接收到的消息"""
try:
# 保存消息
await self.save_message(message)
# 尝试解析JSON消息
message_dict = json.loads(message)
# 检查是否需要触发事件同步
if self._should_trigger_sync(message_dict):
print("触发事件同步...")
print(f"{datetime.now()}收到消息: {message}") # 只打印前100个字符
await self.event_sync_service.sync_event(message_dict.get("eventId"))
# 机器人状态消息
elif self._process_robot_status(message_dict):
# print("处理机器人状态消息...")
# print(f"{datetime.now()}收到消息: {message}") # 只打印前100个字符
await self.event_sync_service.sync_robot_status(message_dict)
# await run_sync_event(t)
except json.JSONDecodeError:
print("收到非JSON格式消息")
except Exception as e:
print(f"处理消息时出错: {str(e)}")
def _process_robot_status(self, message: dict):
"""处理机器人状态消息"""
try:
# 以onlineStatus判断是否是机器人消息
return message.get("onlineStatus", None) is not None
except Exception as e:
print(f"处理机器人状态消息时出错: {str(e)}")
return False
def _should_trigger_sync(self, message: dict) -> bool:
"""判断是否需要触发事件同步"""
try:
# 示例:当消息类型为"event_update"时触发同步
# 根据实际业务需求修改判断条件
return message.get("eventId", None) is not None
# return message.("type") == "event_update"
except Exception as e:
print(f"检查触发条件时出错: {str(e)}")
return False
async def connect_and_listen(self):
"""连接到WebSocket服务器并监听消息"""
while True:
try:
url = self.get_ws_url()
print(f"正在连接到WebSocket服务器: {url}")
async with websockets.connect(url) as websocket:
self.is_connected = True
print("WebSocket连接已建立")
while True:
try:
# 接收消息
message = await websocket.recv()
# 处理消息
await self.process_message(message)
except websockets.exceptions.ConnectionClosed:
print("WebSocket连接已关闭")
break
except Exception as e:
print(f"处理消息时出错: {str(e)}")
except Exception as e:
self.is_connected = False
print(f"WebSocket连接出错: {str(e)}")
print(f"将在 {self.reconnect_delay} 秒后重试...")
await asyncio.sleep(self.reconnect_delay)
async def start(self):
"""启动WebSocket客户端"""
print("启动WebSocket客户端...")
await self.connect_and_listen()
async def run_websocket_client():
"""运行WebSocket客户端"""
client = WebSocketClient()
await client.start()
if __name__ == "__main__":
asyncio.run(run_websocket_client())