# Source: python_websocket/webSocketClient.py (Python; listed by the file viewer as 50 lines, 1.9 KiB)

import websockets
import asyncio
from aioconsole import ainput # 使用异步输入库
class WebSocketClient:
    """Interactive terminal WebSocket client.

    Connects to a server, then concurrently forwards lines typed by the user
    to the server and prints every message received from it.
    """

    # Prompt shown while waiting for user input. It is re-printed after each
    # incoming server message because that message overwrites the prompt line.
    _PROMPT = "Enter message (or 'exit' to quit): "

    def __init__(self, url):
        """Remember the WebSocket URL that ``run`` will connect to."""
        self.url = url

    async def run(self):
        """Connect to ``self.url`` and run the session until either side ends it.

        The session ends as soon as the sender or the receiver finishes: the
        user typed ``exit``, input reached end-of-file, or the connection was
        closed. The other task is then cancelled so the client never stays
        blocked on a prompt for a connection that no longer exists.

        Errors (connection failures included) are reported on stdout instead
        of being raised.
        """
        try:
            async with websockets.connect(self.url) as websocket:
                print("Connected to server.")
                await self._run_until_first_done(
                    self.send_messages(websocket),
                    self.receive_messages(websocket),
                )
        except Exception as e:
            print(f"Error: {e}")

    @staticmethod
    async def _run_until_first_done(*coros):
        """Run ``coros`` concurrently and stop all of them once one finishes.

        Remaining tasks are cancelled and awaited before returning, so no
        task outlives this call. If a task that finished first failed, its
        exception is re-raised to the caller.
        """
        if not coros:
            return
        tasks = [asyncio.create_task(coro) for coro in coros]
        try:
            done, _ = await asyncio.wait(
                tasks, return_when=asyncio.FIRST_COMPLETED
            )
        finally:
            # Also reached when this coroutine itself is cancelled: make sure
            # every child task is stopped and fully unwound.
            for task in tasks:
                if not task.done():
                    task.cancel()
            await asyncio.gather(*tasks, return_exceptions=True)
        for task in done:
            task.result()  # re-raise the failure of a finished task, if any

    async def send_messages(self, websocket):
        """Read lines from the user without blocking and send them to the server.

        Typing ``exit`` (any letter case) closes the connection and stops the
        loop. End-of-input on stdin is handled the same way.
        """
        try:
            while True:
                try:
                    # aioconsole's async input keeps the event loop free, so
                    # incoming messages are still handled while waiting here.
                    message = await ainput(self._PROMPT)
                except EOFError:
                    # stdin was closed; there is nothing more to send.
                    message = "exit"
                if message.lower() == "exit":
                    await websocket.close()
                    break
                await websocket.send(message)
        except asyncio.CancelledError:
            pass  # cancelled by run() once the session is over: exit quietly
        except Exception as e:
            print(f"Send error: {e}")

    async def receive_messages(self, websocket):
        """Print every message from the server, then restore the input prompt."""
        try:
            while True:
                message = await websocket.recv()
                # "\r" + ANSI "erase line" (ESC[K) wipes the prompt currently
                # on screen so the server message gets a clean line.
                print(f"\r\033[K[Server] {message}", flush=True)
                # Show the input prompt again below the message.
                print(self._PROMPT, end="", flush=True)
        except websockets.exceptions.ConnectionClosed:
            print("\nConnection closed.")
        except asyncio.CancelledError:
            pass  # cancelled by run() once the session is over: exit quietly
if __name__ == "__main__":
    # Script entry point: build the client for the fixed server address and
    # drive its session coroutine to completion on a fresh event loop.
    session = WebSocketClient("ws://10.0.0.202:8788").run()
    asyncio.run(session)