禁用 gzip 中间件

This commit is contained in:
Tian jianyong 2025-12-17 15:38:42 +08:00
parent 52951871c0
commit b303ae2a44
2 changed files with 30 additions and 70 deletions

View File

@ -1,12 +1,17 @@
from fastapi import FastAPI
from starlette.middleware.gzip import GZipMiddleware
def add_gzip_middleware(app: FastAPI):
"""
添加gzip压缩中间件
添加gzip压缩中间件 - 已禁用
注意: 由于SSE流式响应需要实时传输gzip压缩被禁用
SSE数据流很小压缩意义不大反而会影响实时性
:param app: FastAPI对象
:return:
"""
app.add_middleware(GZipMiddleware, minimum_size=1000, compresslevel=9)
# 暂时禁用GZip中间件以支持SSE流式响应
# 如果需要重新启用,请取消下面的注释
# app.add_middleware(GZipMiddleware, minimum_size=1000, compresslevel=9)
pass

View File

@ -11,78 +11,51 @@ import requests # 同步 HTTP 客户端
# 下面的默认参数可以通过环境变量覆盖,避免把机密写死在代码里。
API_URL = os.environ.get(
"LOCAL_API_URL",
"http://localhost:9099/system/ragflow/converse_with_chat_assistant",
"RAGFLOW_URL",
"http://10.0.0.202:9099/system/ragflow/converse_with_chat_assistant",
)
AUTH_TOKEN = os.environ.get(
"LOCAL_API_TOKEN",
"RAGFLOW_TOKEN",
"Bearer ",
)
DEFAULT_CHAT_ID = os.environ.get(
"LOCAL_CHAT_ID", "db4bb966895b11f08cda0242ac130006"
"RAGFLOW_CHAT_ID", "db4bb966895b11f08cda0242ac130006"
)
DEFAULT_SESSION_ID = os.environ.get(
"LOCAL_SESSION_ID", "38d765e48a3811f0be310242ac130006"
"RAGFLOW_SESSION_ID", "38d765e48a3811f0be310242ac130006"
)
DEFAULT_QUESTION = os.environ.get(
"LOCAL_QUESTION", "你好,请用简洁的语言介绍你自己"
"RAGFLOW_QUESTION", "你好,请用简洁的语言介绍你自己"
)
LOGIN_URL = os.environ.get(
"LOCAL_LOGIN_URL",
"http://localhost:9099/login",
"RAGFLOW_LOGIN_URL",
"http://10.0.0.202:9099/login",
)
def print_log(message: str):
"""打印带时间戳的日志"""
current_time = time.time()
print(f"[CLIENT {current_time:.3f}] {message}")
async def login_get_token() -> str:
"""
获取本地服务token
"""
print_log("开始登录获取token...")
login_start = time.time()
payload = {
"username": "admin",
"password": "admin123"
}
headers = {
"Content-Type": "application/json"
"Authorization": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjoiMSIsInVzZXJfbmFtZSI6ImFkbWluIiwiZGVwdF9uYW1lIjoiXHU3ODE0XHU1M2QxXHU5MGU4XHU5NWU4Iiwic2Vzc2lvbl9pZCI6IjYwNjdlMzkxLTRiNGMtNGUwYy1hZjM5LTVmNzczZjIzMmE3OSIsImxvZ2luX2luZm8iOnsiaXBhZGRyIjpudWxsLCJsb2dpbkxvY2F0aW9uIjoiXHU2NzJhXHU3N2U1IiwiYnJvd3NlciI6Ik90aGVyIiwib3MiOiJPdGhlciIsImxvZ2luVGltZSI6IjIwMjUtMTItMDIgMTA6NTk6NDAifSwiZXhwIjoxNzY3MjM2MzgwfQ.NM9j0emNz8trxU1DX87PVZZaWWHlFFwF7o0Pxop6B5c"
}
async with aiohttp.ClientSession() as session:
# 使用data参数发送form-data而不是json
async with session.post(
LOGIN_URL, json=payload, headers=headers, timeout=60
LOGIN_URL, data=payload, headers=headers, timeout=60
) as resp:
if resp.status == 200:
response_data = await resp.json()
token = response_data.get("token") or response_data.get("access_token")
login_end = time.time()
print_log(f"登录成功,耗时: {login_end - login_start:.3f}s")
if token:
return token
else:
print_log("登录响应无token返回空字符串")
return ""
else:
print_log(f"登录失败,状态码: {resp.status}")
return ""
response_data = await resp.json() # 将响应转换为JSON
token = response_data.get("token") # 获取token字段
return token
async def stream_chat_async(question: str, chat_id: str, session_id: str) -> str:
"""
异步版aiohttp适合已有事件循环的应用不阻塞主线程
"""
client_start = time.time()
print_log(f"开始SSE流式请求 - 问题: {question}")
print_log(f"chat_id: {chat_id}, session_id: {session_id}")
token = await login_get_token()
token_received = time.time()
print_log(f"Token获取完成耗时: {token_received - client_start:.3f}s")
payload = {
"chatId": chat_id,
@ -95,24 +68,15 @@ async def stream_chat_async(question: str, chat_id: str, session_id: str) -> str
"Accept": "text/event-stream",
}
print_log("发送HTTP请求到服务器...")
request_sent = time.time()
print_log(f"请求耗时: {request_sent - token_received:.3f}s")
answer_parts = []
event_type = "message"
first_chunk_received = False
chunk_count = 0
async with aiohttp.ClientSession() as session:
async with session.post(
API_URL, json=payload, headers=headers, timeout=60
) as resp:
response_received = time.time()
print_log(f"HTTP响应接收完成状态码: {resp.status}")
print_log(f"连接建立耗时: {response_received - request_sent:.3f}s")
print_log(f"Content-Type: {resp.headers.get('content-type')}")
print(
f"# status={resp.status} content-type={resp.headers.get('content-type')}")
resp.raise_for_status()
while True:
@ -126,44 +90,35 @@ async def stream_chat_async(question: str, chat_id: str, session_id: str) -> str
event_type = line.split(":", 1)[1].strip() or "message"
continue
if not line.startswith("data:"):
print(f"[skip] {line}")
continue
data_str = line[len("data:"):].strip()
if not data_str:
continue
current_time = time.time()
if not first_chunk_received:
first_chunk_received = True
first_chunk_time = current_time
print_log(f"🎯 首块数据到达,耗时: {first_chunk_time - response_received:.3f}s")
try:
payload_obj = json.loads(data_str)
except json.JSONDecodeError:
print_log(f"[{event_type}] {data_str}")
print(f"[{event_type}] {data_str}")
event_type = "message"
continue
if event_type == "end" or payload_obj.get("status") == "completed":
print_log(f"🏁 流式响应完成,总耗时: {current_time - client_start:.3f}s")
print_log(f"📊 接收到的chunk数量: {chunk_count}")
print("\n[stream] completed")
break
if payload_obj.get("data") is True:
event_type = "message"
continue
if "answer" in payload_obj:
piece = payload_obj.get("answer", "")
chunk_count += 1
answer_parts.append(piece)
# 流式输出:每个片段都单独打印,并且添加延时让效果更明显
print(piece, end="", flush=True)
time.sleep(0.1) # 每个片段间隔100ms让流式效果更明显
else:
print_log(f"[{event_type}] {payload_obj}")
print(f"[{event_type}] {payload_obj}")
event_type = "message"
print("\n\n[CLIENT] 流式响应完成 - 通过SSE实时接收")
print("\n\n[stream completed] - Answer received in real-time via streaming!")
full_answer = "".join(answer_parts)
return full_answer