158 lines
6.2 KiB
Python
158 lines
6.2 KiB
Python
import asyncio
|
|
import json
|
|
import os
|
|
from datetime import datetime
|
|
import websockets
|
|
import aiofiles
|
|
from app.kangdaApi.parseConfig import parse_config
|
|
from app.services.event_sync_service import run_sync_event
|
|
from app.services.event_sync_service import EventSyncService
|
|
|
|
class WebSocketClient:
|
|
def __init__(self):
|
|
self.config = parse_config("app/config/config.yaml")
|
|
self.ws_config = self.config.get("ws", {})
|
|
self.message_dir = "logs/websocket_messages"
|
|
self.reconnect_delay = 5 # 重连延迟(秒)
|
|
self.is_connected = False
|
|
self.file_lock = asyncio.Lock() # 文件锁
|
|
self.event_sync_service = EventSyncService()
|
|
os.makedirs(self.message_dir, exist_ok=True)
|
|
|
|
|
|
def get_ws_url(self) -> str:
|
|
"""生成WebSocket连接URL"""
|
|
timestamp = int(datetime.now().timestamp() * 1000)
|
|
user_id = f"{self.ws_config.get('account')}_{timestamp}_{self.ws_config.get('tenantInfoId')}"
|
|
return f"wss://{self.ws_config.get('url')}{user_id}"
|
|
|
|
async def save_message(self, message: str):
|
|
"""保存接收到的消息到文件"""
|
|
# 获取当前日期作为文件名
|
|
current_date = datetime.now().strftime("%Y%m%d")
|
|
filename = f"{self.message_dir}/messages_{current_date}.json"
|
|
|
|
# 准备要保存的消息数据
|
|
message_data = {
|
|
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
|
|
"content": message
|
|
}
|
|
|
|
# 使用文件锁确保并发安全
|
|
async with self.file_lock:
|
|
try:
|
|
# 检查文件是否存在
|
|
if os.path.exists(filename):
|
|
# 如果文件存在,读取现有内容
|
|
async with aiofiles.open(filename, 'r', encoding='utf-8') as f:
|
|
content = await f.read()
|
|
try:
|
|
messages = json.loads(content)
|
|
except json.JSONDecodeError:
|
|
messages = []
|
|
else:
|
|
messages = []
|
|
|
|
# 添加新消息
|
|
messages.append(message_data)
|
|
|
|
# 保存更新后的内容, 不用保存消息
|
|
# async with aiofiles.open(filename, 'w', encoding='utf-8') as f:
|
|
# await f.write(json.dumps(messages, ensure_ascii=False, indent=2))
|
|
|
|
print(f"消息已保存到文件: {filename}")
|
|
|
|
except Exception as e:
|
|
print(f"保存消息时出错: {str(e)}")
|
|
|
|
async def process_message(self, message: str):
|
|
"""处理接收到的消息"""
|
|
try:
|
|
# 保存消息
|
|
await self.save_message(message)
|
|
|
|
# 尝试解析JSON消息
|
|
message_dict = json.loads(message)
|
|
|
|
# 检查是否需要触发事件同步
|
|
if self._should_trigger_sync(message_dict):
|
|
print("触发事件同步...")
|
|
await self.event_sync_service.sync_event(message_dict.get("eventId"))
|
|
|
|
# 机器人状态消息
|
|
elif self._process_robot_status(message_dict):
|
|
print("处理机器人状态消息...")
|
|
|
|
await self.event_sync_service.sync_robot_status(message_dict)
|
|
|
|
# await run_sync_event(t)
|
|
|
|
except json.JSONDecodeError:
|
|
print("收到非JSON格式消息")
|
|
except Exception as e:
|
|
print(f"处理消息时出错: {str(e)}")
|
|
|
|
def _process_robot_status(self, message: dict):
|
|
"""处理机器人状态消息"""
|
|
try:
|
|
# 以onlineStatus判断是否是机器人消息
|
|
return message.get("onlineStatus", None) is not None
|
|
except Exception as e:
|
|
print(f"处理机器人状态消息时出错: {str(e)}")
|
|
return False
|
|
|
|
def _should_trigger_sync(self, message: dict) -> bool:
|
|
"""判断是否需要触发事件同步"""
|
|
try:
|
|
# 示例:当消息类型为"event_update"时触发同步
|
|
# 根据实际业务需求修改判断条件
|
|
return message.get("eventId", None) is not None
|
|
# return message.("type") == "event_update"
|
|
except Exception as e:
|
|
print(f"检查触发条件时出错: {str(e)}")
|
|
return False
|
|
|
|
async def connect_and_listen(self):
|
|
"""连接到WebSocket服务器并监听消息"""
|
|
while True:
|
|
try:
|
|
url = self.get_ws_url()
|
|
print(f"正在连接到WebSocket服务器: {url}")
|
|
|
|
async with websockets.connect(url) as websocket:
|
|
self.is_connected = True
|
|
print("WebSocket连接已建立")
|
|
|
|
while True:
|
|
try:
|
|
# 接收消息
|
|
message = await websocket.recv()
|
|
print(f"{datetime.now()}收到消息: {message[:100]}...") # 只打印前100个字符
|
|
|
|
# 处理消息
|
|
await self.process_message(message)
|
|
|
|
except websockets.exceptions.ConnectionClosed:
|
|
print("WebSocket连接已关闭")
|
|
break
|
|
except Exception as e:
|
|
print(f"处理消息时出错: {str(e)}")
|
|
|
|
except Exception as e:
|
|
self.is_connected = False
|
|
print(f"WebSocket连接出错: {str(e)}")
|
|
print(f"将在 {self.reconnect_delay} 秒后重试...")
|
|
await asyncio.sleep(self.reconnect_delay)
|
|
|
|
async def start(self):
|
|
"""启动WebSocket客户端"""
|
|
print("启动WebSocket客户端...")
|
|
await self.connect_and_listen()
|
|
|
|
async def run_websocket_client():
|
|
"""运行WebSocket客户端"""
|
|
client = WebSocketClient()
|
|
await client.start()
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(run_websocket_client()) |