修改后端 rag 的片段内容重复问题

This commit is contained in:
Tian jianyong 2025-12-17 11:24:19 +08:00
parent 22a7a058e4
commit 86c7f5d182
8 changed files with 176 additions and 2 deletions

View File

@ -211,7 +211,7 @@ class RAGFlowSettings(BaseSettings):
所以使用 BaseSettings 而不是类常量
"""
RAGFLOW_BASE_URL: str = "http://10.0.0.202:82"
RAGFLOW_API_KEY: str = ""
RAGFLOW_API_KEY: str = "ragflow-hlMjRmNzE2ODNiNTExZjA4ZTNlMDI0Mm"
class SearchSettings:
"""搜索服务配置"""

View File

@ -227,17 +227,50 @@ async def converse_with_chat_assistant(
logger.info('ragflow对话命中缓存: chat=%s', converse_params.chat_id)
return ResponseUtil.success(json.loads(cached))
# Revert to Native RAGFlow Service (OpenAI endpoint failed with Auth error)
result = await RAGFlowService.converse_with_chat_assistant_services(converse_params)
if converse_params.stream:
async def stream_response():
    """SSE generator: convert RAGFlow's cumulative 'answer' stream into deltas.

    Native RAGFlow chunks look like {'data': {'answer': 'cumulative text...'}};
    each chunk repeats all text emitted so far, so forwarding chunks verbatim
    duplicates content on the client. Track the last cumulative answer and
    emit only the newly appended suffix.
    """
    last_answer = ""  # cumulative text already forwarded to the client
    first_token_received = False
    start_stream_time = time.perf_counter()
    try:
        async for chunk in result:
            # Log time-to-first-token once per stream.
            if not first_token_received:
                first_token_received = True
                latency = time.perf_counter() - start_stream_time
                logger.info(f"RAGFlow Stream Start Latency: {latency:.3f}s")
            # Native RAGFlow returns cumulative text in 'answer' field
            # Chunk structure: {'data': {'answer': 'Cumulative Text...'}}
            payload = chunk.get('data') if isinstance(chunk, dict) else chunk
            if not payload:
                continue
            body = payload if isinstance(payload, dict) else {'data': payload}
            if isinstance(body, dict) and 'answer' in body:
                # Inspect for errors even in stream
                if isinstance(chunk, dict) and chunk.get('code') and chunk.get('code') != 0:
                    logger.error(f"RAGFlow Stream Error: {chunk}")
                current_answer = body['answer']
                # Calculate Delta. NOTE: the answer chunk is forwarded ONLY
                # here — yielding the full cumulative body before this point
                # would duplicate every fragment on the client.
                if current_answer.startswith(last_answer):
                    delta = current_answer[len(last_answer):]
                    if delta:
                        body['answer'] = delta  # Send only the new part
                        yield format_sse(body)
                    last_answer = current_answer
                else:
                    # Context reset (unlikely but safe fallback): forward as-is.
                    last_answer = current_answer
                    yield format_sse(body)
            else:
                # Non-answer chunks (metadata, references, ...) pass through.
                yield format_sse(body)
        yield format_sse({'status': 'completed'}, event='end')
    except Exception as exc:
        logger.exception('ragflow流式对话异常: %s', exc)

View File

@ -177,5 +177,19 @@ class RAGFlowService:
client = await get_ragflow_client()
return await client.converse_with_chat_assistant(**(converse_params.model_dump()))
# Chat with an assistant via RAGFlow's OpenAI-compatible endpoint.
@classmethod
async def converse_with_chat_assistant_services_openai(cls, converse_params: ConverseWithChatAssistantModel):
    """Send one user question to a chat assistant through the OpenAI-style API.

    Wraps the question in an OpenAI ``messages`` payload and forwards it via
    the shared RAGFlow client; streaming follows the caller's parameters.
    """
    client = await get_ragflow_client()
    # OpenAI chat format: a single user turn carrying the raw question.
    openai_messages = [{"role": "user", "content": converse_params.question}]
    # Model name is a placeholder — the server resolves the real model itself.
    return await client.create_chat_completion(
        chat_id=converse_params.chat_id,
        messages=openai_messages,
        model="ragflow",
        stream=converse_params.stream,
    )

View File

@ -0,0 +1,73 @@
"""Local verification script: re-plays the controller's delta logic on a
native (cumulative) RAGFlow stream and prints only the incremental text."""
import asyncio
import sys
import os
import json

# Make the project root importable when running this file directly.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)) + "/../")

from module_admin.service.ragflow_service import RAGFlowService
from module_admin.entity.vo.ragflow_vo import ConverseWithChatAssistantModel


async def test_fix_logic():
    """Fetch a cumulative stream, then apply the controller delta logic."""
    banner = "=" * 60
    print(banner)
    print("VERIFICATION: Testing Delta Logic Locally")
    print(banner)
    # Use the valid key and chat ID (same chat ID as test_sse.py / environment).
    chat_id = "db4bb966895b11f08cda0242ac130006"
    q = "康达什么时候成立的?"
    params = ConverseWithChatAssistantModel(
        chat_id=chat_id,
        question=q,
        stream=True,
    )
    print(f"Query: {q}")
    print("[1] Fetching Stream from Native Service (Expect Cumulative)...")
    try:
        result_stream = await RAGFlowService.converse_with_chat_assistant_services(params)
        print("[2] Applying Controller Delta Logic...")
        print("-" * 20 + " OUTPUT " + "-" * 20)
        last_answer = ""
        async for chunk in result_stream:
            payload = chunk.get('data') if isinstance(chunk, dict) else chunk
            if not payload:
                continue
            body = payload if isinstance(payload, dict) else {'data': payload}
            if not (isinstance(body, dict) and 'answer' in body):
                continue  # non-answer chunk, ignored
            current_answer = body['answer']
            # --- THIS IS THE LOGIC IN CONTROLLER ---
            if current_answer.startswith(last_answer):
                delta = current_answer[len(last_answer):]
                if delta:
                    print(delta, end="", flush=True)
                last_answer = current_answer
            else:
                # Fallback: stream restarted from a different prefix.
                last_answer = current_answer
                print(f"\n[RESET] {current_answer}", end="", flush=True)
            # ---------------------------------------
        print("\n" + "-" * 60)
        print("Verification Complete.")
    except Exception as e:
        print(f"\nError: {e}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    asyncio.run(test_fix_logic())

View File

@ -0,0 +1,54 @@
"""Direct test of the native RAGFlow service endpoint.

Bypasses the controller's delta logic, so the printed tokens are expected
to be CUMULATIVE (each chunk repeats all text emitted so far)."""
import asyncio
import sys
import os

# Make the project root importable when running this file directly.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)) + "/../")

from module_admin.service.ragflow_service import RAGFlowService
from module_admin.entity.vo.ragflow_vo import ConverseWithChatAssistantModel


async def test_rag_internal():
    """Stream one question through the native service and dump raw chunks."""
    divider = "=" * 60
    print(divider)
    print("Direct RAGFlow Service Test (Native Endpoint)")
    print(divider)
    chat_id = "db4bb966895b11f08cda0242ac130006"
    q = "康达什么时候成立的?"
    print(f"Chat ID: {chat_id}")
    print(f"Query: {q}")
    print("-" * 60)
    params = ConverseWithChatAssistantModel(
        chat_id=chat_id,
        question=q,
        stream=True,
    )
    try:
        # Call the NATIVE SERVICE method directly. This bypasses the
        # Controller Delta Logic, so we expect CUMULATIVE text here.
        result_stream = await RAGFlowService.converse_with_chat_assistant_services(params)
        print("\n[Start Streaming]...")
        async for chunk in result_stream:
            # Native chunk structure: {'data': {'answer': '...'}}
            payload = chunk.get('data') if isinstance(chunk, dict) else chunk
            if isinstance(payload, dict):
                ans = payload.get('answer', '')
                if ans:
                    print(f"Token: {repr(ans)}")
            else:
                print(f"Raw Chunk: {chunk}")
        print("\n[End Streaming]")
    except Exception as e:
        print(f"\nError: {e}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    asyncio.run(test_rag_internal())