diff --git a/ruoyi-fastapi-backend/middlewares/gzip_middleware.py b/ruoyi-fastapi-backend/middlewares/gzip_middleware.py index eb371ce..4a70a75 100644 --- a/ruoyi-fastapi-backend/middlewares/gzip_middleware.py +++ b/ruoyi-fastapi-backend/middlewares/gzip_middleware.py @@ -1,12 +1,17 @@ from fastapi import FastAPI -from starlette.middleware.gzip import GZipMiddleware def add_gzip_middleware(app: FastAPI): """ - 添加gzip压缩中间件 - + 添加gzip压缩中间件 - 已禁用 + + 注意: 由于SSE流式响应需要实时传输,gzip压缩被禁用 + SSE数据流很小,压缩意义不大,反而会影响实时性 + :param app: FastAPI对象 :return: """ - app.add_middleware(GZipMiddleware, minimum_size=1000, compresslevel=9) + # 暂时禁用GZip中间件以支持SSE流式响应 + # 如果需要重新启用,请取消下面的注释 + # app.add_middleware(GZipMiddleware, minimum_size=1000, compresslevel=9) + pass \ No newline at end of file diff --git a/ruoyi-fastapi-backend/test/test_sse.py b/ruoyi-fastapi-backend/test/test_sse.py index 1e4ea6f..4d7b142 100644 --- a/ruoyi-fastapi-backend/test/test_sse.py +++ b/ruoyi-fastapi-backend/test/test_sse.py @@ -11,78 +11,51 @@ import requests # 同步 HTTP 客户端 # 下面的默认参数可以通过环境变量覆盖,避免把机密写死在代码里。 API_URL = os.environ.get( - "LOCAL_API_URL", - "http://localhost:9099/system/ragflow/converse_with_chat_assistant", + "RAGFLOW_URL", + "http://10.0.0.202:9099/system/ragflow/converse_with_chat_assistant", ) AUTH_TOKEN = os.environ.get( - "LOCAL_API_TOKEN", + "RAGFLOW_TOKEN", "Bearer ", ) DEFAULT_CHAT_ID = os.environ.get( - "LOCAL_CHAT_ID", "db4bb966895b11f08cda0242ac130006" + "RAGFLOW_CHAT_ID", "db4bb966895b11f08cda0242ac130006" ) DEFAULT_SESSION_ID = os.environ.get( - "LOCAL_SESSION_ID", "38d765e48a3811f0be310242ac130006" + "RAGFLOW_SESSION_ID", "38d765e48a3811f0be310242ac130006" ) DEFAULT_QUESTION = os.environ.get( - "LOCAL_QUESTION", "你好,请用简洁的语言介绍你自己" + "RAGFLOW_QUESTION", "你好,请用简洁的语言介绍你自己" ) LOGIN_URL = os.environ.get( - "LOCAL_LOGIN_URL", - "http://localhost:9099/login", + "RAGFLOW_LOGIN_URL", + "http://10.0.0.202:9099/login", ) -def print_log(message: str): - """打印带时间戳的日志""" - current_time = time.time() - print(f"[CLIENT {current_time:.3f}] {message}") - - async def login_get_token() -> str: - """ - 获取本地服务token - """ - print_log("开始登录获取token...") - login_start = time.time() - payload = { "username": "admin", "password": "admin123" } headers = { - "Content-Type": "application/json" + "Authorization": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjoiMSIsInVzZXJfbmFtZSI6ImFkbWluIiwiZGVwdF9uYW1lIjoiXHU3ODE0XHU1M2QxXHU5MGU4XHU5NWU4Iiwic2Vzc2lvbl9pZCI6IjYwNjdlMzkxLTRiNGMtNGUwYy1hZjM5LTVmNzczZjIzMmE3OSIsImxvZ2luX2luZm8iOnsiaXBhZGRyIjpudWxsLCJsb2dpbkxvY2F0aW9uIjoiXHU2NzJhXHU3N2U1IiwiYnJvd3NlciI6Ik90aGVyIiwib3MiOiJPdGhlciIsImxvZ2luVGltZSI6IjIwMjUtMTItMDIgMTA6NTk6NDAifSwiZXhwIjoxNzY3MjM2MzgwfQ.NM9j0emNz8trxU1DX87PVZZaWWHlFFwF7o0Pxop6B5c" } async with aiohttp.ClientSession() as session: + # 使用data参数发送form-data而不是json async with session.post( - LOGIN_URL, json=payload, headers=headers, timeout=60 + LOGIN_URL, data=payload, headers=headers, timeout=60 ) as resp: - if resp.status == 200: - response_data = await resp.json() - token = response_data.get("token") or response_data.get("access_token") - login_end = time.time() - print_log(f"登录成功,耗时: {login_end - login_start:.3f}s") - if token: - return token - else: - print_log("登录响应无token,返回空字符串") - return "" - else: - print_log(f"登录失败,状态码: {resp.status}") - return "" + response_data = await resp.json() # 将响应转换为JSON + token = response_data.get("token") # 获取token字段 + return token async def stream_chat_async(question: str, chat_id: str, session_id: str) -> str: """ 异步版(aiohttp),适合已有事件循环的应用,不阻塞主线程。 """ - client_start = time.time() - print_log(f"开始SSE流式请求 - 问题: {question}") - print_log(f"chat_id: {chat_id}, session_id: {session_id}") - token = await login_get_token() - token_received = time.time() - print_log(f"Token获取完成,耗时: {token_received - client_start:.3f}s") payload = { "chatId": chat_id, @@ -95,24 +68,15 @@ async def stream_chat_async(question: str, chat_id: str, session_id: str) -> str "Accept": "text/event-stream", } - print_log("发送HTTP请求到服务器...") - request_sent = time.time() - print_log(f"请求耗时: {request_sent - token_received:.3f}s") - answer_parts = [] event_type = "message" - first_chunk_received = False - chunk_count = 0 async with aiohttp.ClientSession() as session: async with session.post( API_URL, json=payload, headers=headers, timeout=60 ) as resp: - response_received = time.time() - print_log(f"HTTP响应接收完成,状态码: {resp.status}") - print_log(f"连接建立耗时: {response_received - request_sent:.3f}s") - print_log(f"Content-Type: {resp.headers.get('content-type')}") - + print( + f"# status={resp.status} content-type={resp.headers.get('content-type')}") resp.raise_for_status() while True: @@ -126,44 +90,35 @@ async def stream_chat_async(question: str, chat_id: str, session_id: str) -> str event_type = line.split(":", 1)[1].strip() or "message" continue if not line.startswith("data:"): + print(f"[skip] {line}") continue data_str = line[len("data:"):].strip() if not data_str: continue - - current_time = time.time() - - if not first_chunk_received: - first_chunk_received = True - first_chunk_time = current_time - print_log(f"🎯 首块数据到达,耗时: {first_chunk_time - response_received:.3f}s") - try: payload_obj = json.loads(data_str) except json.JSONDecodeError: - print_log(f"[{event_type}] {data_str}") + print(f"[{event_type}] {data_str}") event_type = "message" continue if event_type == "end" or payload_obj.get("status") == "completed": - print_log(f"🏁 流式响应完成,总耗时: {current_time - client_start:.3f}s") - print_log(f"📊 接收到的chunk数量: {chunk_count}") + print("\n[stream] completed") break if payload_obj.get("data") is True: event_type = "message" continue if "answer" in payload_obj: piece = payload_obj.get("answer", "") - chunk_count += 1 answer_parts.append(piece) # 流式输出:每个片段都单独打印,并且添加延时让效果更明显 print(piece, end="", flush=True) time.sleep(0.1) # 每个片段间隔100ms,让流式效果更明显 else: - print_log(f"[{event_type}] {payload_obj}") + print(f"[{event_type}] {payload_obj}") event_type = "message" - print("\n\n[CLIENT] 流式响应完成 - 通过SSE实时接收!") + print("\n\n[stream completed] - Answer received in real-time via streaming!") full_answer = "".join(answer_parts) return full_answer