diff --git a/ruoyi-fastapi-backend/config/static_qa.json b/ruoyi-fastapi-backend/config/static_qa.json new file mode 100644 index 0000000..c1e9782 --- /dev/null +++ b/ruoyi-fastapi-backend/config/static_qa.json @@ -0,0 +1,127 @@ +{ + "qa_pairs": [ + { + "question": "你好!你叫什么名字呀?", + "answer": "您好呀!我是厂区专属接待机器人达达,很高兴为您服务~如果您需要了解厂区信息、指引路线或咨询参观流程,都可以跟我说哦!", + "category": "基础问候与身份", + "priority": 10, + "sub_questions": [ + "你叫什么名字", + "你是谁", + "介绍一下你自己" + ], + "variations": [ + "你好达达", + "达达你好" + ] + }, + { + "question": "达达,你能帮我做什么呀?", + "answer": "我能为您提供超多实用帮助呢!比如介绍厂区的发展历史、各功能区域位置指引、参观预约流程讲解~", + "category": "基础问候与身份", + "priority": 10, + "sub_questions": [ + "你能做什么", + "达达有什么功能", + "你能帮我什么" + ], + "variations": [] + }, + { + "question": "达达,咱们厂区什么时候成立的呀?", + "answer": "公司成立于1988年,深交所主板上市公司。公司管理总部位于浦东新区,毗邻张江科学城,在上海奉贤、北京、天津、西安、深圳、成都、唐山、福建邵武等地设有子公司、生产基地及分支机构。", + "category": "厂区信息", + "priority": 9, + "sub_questions": [ + "厂区什么时候成立", + "公司什么时候成立", + "康达新材什么时候成立" + ], + "variations": [] + }, + { + "question": "达达,咱们厂区主要做什么业务呢?", + "answer": "公司业务范围覆盖装备制造、新能源、轨道交通、航空航天、电子信息、半导体、国防军工以及低碳环保等新兴产业等。公司一直以来以技术创新为第一核心竞争力,拥有多项领先、具有竞争力的胶粘剂、显示材料、电磁兼容设备、电源模块技术,其中多个系列的产品性能已经达到或已经超过国内、国际同类产品的水平,为客户提供系统化解决方案。公司未来将大力发展功能性高分子新材料,以胶粘剂为主,特种树脂为支撑,结合自身资源与优势向电子信息材料、高性能复合材料等方向纵深发展转型,完善军工领域的战略布局,打造 \"新材料+军工电子科技\" 上市公司平台,争做细分领域龙头,争创隐形冠军,通过多个外部基地的建设,公司将逐步打造成为一个高端化、信息化、一体化的高分子化工新材料与军工电子产业集团。", + "category": "厂区信息", + "priority": 9, + "sub_questions": [ + "公司主要做什么业务", + "康达新材做什么", + "厂区主营业务是什么" + ], + "variations": [] + }, + { + "question": "达达,咱们厂区获得过什么荣誉吗?", + "answer": "当然有啦!咱们厂区先后获得过 \"2020上海制造业企业100强\"\"高新技术企业\"\"国家认定博士后科研工作站\" 等荣誉,这些荣誉是对咱们产品质量、技术实力和管理水平的肯定,也激励着我们不断进步~", + "category": "厂区信息", + "priority": 9, + "sub_questions": [ + "厂区获得过什么荣誉", + "公司有什么荣誉", + "康达新材获得过什么奖项" + ], + "variations": [] + }, + { + "question": "达达,公司愿景,康达愿景是什么?", + "answer": "未来3-5年,坚守以流动性合理充裕为底线,康达将坚定不移地践行\"新材料+军工科技\"双轮驱动战略,着力优化完善顶层设计,做强做大。新材料板块扩充体量、提升规模、弘扬民族品牌、传承行业精髓;军工科技板块加强产业协同、科技引领、赋能创新。康达将回顾过去,放眼未来,充分发挥混合所有制优势,国有与民营体制有机融合,高度市场化,快速提升企业整体规模和市场场价值康达将有序推进全国范围内研发与生产基地布局,收购兼并与园区建设同步互动、有效补充,\"一带一路\"走出去。沿着国际示化、互联网化、证券化方向迈进,打造多项细分领域隐形冠军。\"不忘初心、牢记使命\"、\"传承融合、创新超越\",弘扬\"专业创造价值\"理念,崇尚\"百年康达\"愿景。", + "category": "厂区信息", + "priority": 9, + "sub_questions": [ + "公司愿景是什么", + "康达愿景是什么", + "康达新材的愿景" + ], + "variations": [] + }, + { + "question": "我想参观厂区,需要提前预约吗?", + "answer": "参观厂区建议提前 1-2 个工作日预约哦,这样方便我们为您安排专属参观路线和讲解人员~", + "category": "参观与指引", + "priority": 9, + "sub_questions": [ + "参观厂区需要预约吗", + "参观需要提前预约吗", + "厂区参观要预约吗" + ], + "variations": [] + }, + { + "question": "我想参观厂区,怎么预约呀?", + "answer": "预约方式如下:关注 \"康达新材\" 官方微信公众号,在 \"预约\" 栏目填写信息提交即可,提交后 1 个工作日内会有工作人员与您联系确认~", + "category": "参观与指引", + "priority": 9, + "sub_questions": [ + "怎么预约参观厂区", + "厂区参观怎么预约", + "预约参观的流程" + ], + "variations": [] + }, + { + "question": "参观生产区需要注意什么呀?", + "answer": "参观生产区有几个小注意事项要跟您说哦:一是禁止携带手机、相机等拍摄设备进入,二是禁止触摸生产设备和产品,三是要跟紧讲解人员,不要随意走动~", + "category": "参观与指引", + "priority": 9, + "sub_questions": [ + "参观生产区注意事项", + "生产区参观要注意什么", + "进生产区有什么要求" + ], + "variations": [] + }, + { + "question": "参观生产区需要穿特殊服装吗?", + "answer": "关于服装,厂区会为参观人员提供一次性防尘帽和鞋套,您只需要穿着日常休闲服装即可,尽量不要穿裙子和高跟鞋,避免行动不便~", + "category": "参观与指引", + "priority": 9, + "sub_questions": [ + "参观需要穿什么衣服", + "生产区参观着装要求", + "参观厂区穿什么" + ], + "variations": [] + } + ] +} diff --git a/ruoyi-fastapi-backend/module_admin/controller/ragflow_controller.py b/ruoyi-fastapi-backend/module_admin/controller/ragflow_controller.py index ff0fd4d..659ef19 100644 --- a/ruoyi-fastapi-backend/module_admin/controller/ragflow_controller.py +++ b/ruoyi-fastapi-backend/module_admin/controller/ragflow_controller.py @@ -23,6 +23,7 @@ from module_admin.entity.vo.ragflow_vo import ( from utils.log_util import logger from utils.response_util import ResponseUtil from utils.semantic_cache_service import get_semantic_cache_service, lookup_question, store_qa_pair +from utils.static_qa_service import get_static_qa_service async def _async_store_qa(chat_id: str, question: str, answer: str, redis) -> None: @@ -69,6 +70,11 @@ async def converse_with_chat_assistant( ): """ 与聊天助手进行对话 - 集成语义缓存版本(支持流式和非流式) + + 匹配流程: + 1. 静态问答匹配 (threshold=0.70) + 2. RAG历史缓存匹配 (threshold=0.60) + 3. RAG服务调用 """ start_time = time.perf_counter() @@ -77,8 +83,40 @@ async def converse_with_chat_assistant( cache_key = None cached_answer = None cache_similarity = 0.0 + cache_source = None - # ========== 语义缓存查找(流式和非流式都支持)========== + # ========== 1. 静态问答匹配 ========== + try: + static_qa_service = get_static_qa_service() + static_match, static_sim = static_qa_service.find_match(converse_params.question, threshold=0.70) + + if static_match: + cached_answer = static_match.get('answer', '') + cache_similarity = static_sim + cache_source = 'static_qa' + logger.info(f'[RAG_SOURCE] 命中静态FAQ | chat_id={converse_params.chat_id} | question={converse_params.question} | similarity={cache_similarity:.2f} | answer_length={len(cached_answer)}') + + # 非流式:直接返回静态问答答案 + if not converse_params.stream: + return ResponseUtil.success(data={'answer': cached_answer, 'from_cache': True, 'similarity': cache_similarity, 'source': 'static_qa'}) + + # 流式:使用静态问答答案进行流式响应 + if converse_params.stream: + logger.info(f'[StaticQA] 流式响应使用静态问答答案,chat_id={converse_params.chat_id}') + return StreamingResponse( + stream_cached_response(cached_answer, converse_params.chat_id, start_time, cache_source='static_qa'), + media_type='text/event-stream', + headers={ + 'Cache-Control': 'no-cache', + 'Connection': 'keep-alive', + 'X-Accel-Buffering': 'no', + 'Transfer-Encoding': 'chunked' + } + ) + except Exception as e: + logger.warning(f'[StaticQA] 静态问答匹配失败: {e}') + + # ========== 2. RAG历史缓存查找 ========== if redis: cache_result = await lookup_question( converse_params.chat_id, @@ -87,17 +125,18 @@ async def converse_with_chat_assistant( ) if cache_result: cached_answer, cache_similarity = cache_result - logger.info(f'[SemanticCache] 命中缓存 (相似度={cache_similarity:.2f}): chat_id={converse_params.chat_id}') + cache_source = 'rag_history' + logger.info(f'[RAG_SOURCE] 命中RAG会话历史 | chat_id={converse_params.chat_id} | question={converse_params.question} | similarity={cache_similarity:.2f} | answer_length={len(cached_answer)}') # 非流式:直接返回缓存答案 if not converse_params.stream: - return ResponseUtil.success(data={'answer': cached_answer, 'from_cache': True, 'similarity': cache_similarity}) + return ResponseUtil.success(data={'answer': cached_answer, 'from_cache': True, 'similarity': cache_similarity, 'source': 'rag_history'}) # 流式:使用缓存答案进行流式响应 if converse_params.stream: - logger.info(f'[SemanticCache] 流式响应使用缓存答案,chat_id={converse_params.chat_id}') + logger.info(f'[SemanticCache] 流式响应使用RAG历史缓存答案,chat_id={converse_params.chat_id}') return StreamingResponse( - stream_cached_response(cached_answer, converse_params.chat_id, start_time), + stream_cached_response(cached_answer, converse_params.chat_id, start_time, cache_source='rag_history'), media_type='text/event-stream', headers={ 'Cache-Control': 'no-cache', @@ -130,6 +169,7 @@ async def converse_with_chat_assistant( # 直接使用同步RAGFlow服务 try: + logger.info(f'[RAG_SOURCE] 调用原生RAG服务 | chat_id={converse_params.chat_id} | question={converse_params.question}') result = RAGFlowService.converse_with_chat_assistant_services(converse_params) # 流式响应 @@ -148,6 +188,12 @@ async def converse_with_chat_assistant( # 非流式响应 response = parse_result(result) + # 记录原生RAG响应日志 + if isinstance(result, dict) and result.get('code') == 0: + answer_data = result.get('data', {}) + answer_text = answer_data.get('answer') if isinstance(answer_data, dict) else str(answer_data) + logger.info(f'[RAG_SOURCE] 原生RAG非流式响应完成 | chat_id={converse_params.chat_id} | answer_length={len(answer_text) if answer_text else 0}') + # 设置语义缓存(存储问答对) if not converse_params.stream and redis and isinstance(result, dict) and result.get('code') == 0: try: @@ -259,8 +305,9 @@ def stream_ragflow_response(result: Generator, chat_id: str, start_time: float) stream_end_time = time.time() end_message = format_sse({'status': 'completed'}, event='end') yield end_message - logger.info(f"[RAG_SERVER {stream_end_time:.3f}] 🏁 流式响应完成,chat_id: {chat_id}") - logger.info(f"[RAG_SERVER {stream_end_time:.3f}] 📊 总共处理chunk数量: {chunk_count}") + logger.info(f'[RAG_SERVER {stream_end_time:.3f}] 流式响应完成,chat_id: {chat_id}') + logger.info(f'[RAG_SERVER {stream_end_time:.3f}] 总共处理chunk数量: {chunk_count}') + logger.info(f'[RAG_SOURCE] 原生RAG流式响应完成 | chat_id={chat_id} | total_chunks={chunk_count} | answer_length={len(last_answer)}') except Exception as exc: error_time = time.time() @@ -331,7 +378,7 @@ async def delete_datasets( return parse_result(result) -def stream_cached_response(cached_answer: str, chat_id: str, start_time: float) -> Generator[str, None, None]: +def stream_cached_response(cached_answer: str, chat_id: str, start_time: float, cache_source: str = 'cache') -> Generator[str, None, None]: """ 流式返回缓存的答案 @@ -339,6 +386,7 @@ def stream_cached_response(cached_answer: str, chat_id: str, start_time: float) cached_answer: 缓存的答案文本 chat_id: 会话ID start_time: 请求开始时间 + cache_source: 缓存来源 ('static_qa' 或 'rag_history') """ import time @@ -380,6 +428,7 @@ def stream_cached_response(cached_answer: str, chat_id: str, start_time: float) end_message = format_sse({ 'status': 'completed', 'from_cache': True, + 'source': cache_source, 'total_time': stream_end_time - server_stream_start }, event='end') yield end_message diff --git a/ruoyi-fastapi-backend/utils/match_service.py b/ruoyi-fastapi-backend/utils/match_service.py new file mode 100644 index 0000000..d50464e --- /dev/null +++ b/ruoyi-fastapi-backend/utils/match_service.py @@ -0,0 +1,247 @@ +import re +import jieba +from typing import List, Tuple, Optional +from loguru import logger + +class MatchService: + def __init__(self): + self._colloquial_map = { + '咋': '怎么', + '咋办': '怎么办', + '咋样': '怎么样', + '啥': '什么', + '啥子': '什么', + '啥时候': '什么时候', + '啥地方': '什么地方', + '哪儿': '哪里', + '咋个': '怎么', + '咋整': '怎么办', + '咋回事': '怎么回事', + '咋回事儿': '怎么回事', + '咋样': '怎么样', + '啥人': '什么人', + '啥东西': '什么东西', + '啥玩意': '什么东西', + '啥情况': '什么情况', + '咋样': '怎么样', + '咋办': '怎么办', + '咋弄': '怎么办', + '咋整': '怎么办', + '咋回事': '怎么回事', + '咋回事儿': '怎么回事', + '咋样': '怎么样', + '啥': '什么', + '啥子': '什么', + '啥时候': '什么时候', + '啥地方': '什么地方', + '啥人': '什么人', + '啥东西': '什么东西', + '啥玩意': '什么东西', + '啥情况': '什么情况', + '啥事': '什么事', + '啥事儿': '什么事', + '啥时候': '什么时候', + '啥地方': '什么地方', + '哪儿': '哪里', + '哪儿个': '哪个', + '咋': '怎么', + '咋办': '怎么办', + '咋样': '怎么样', + '咋个': '怎么', + '咋整': '怎么办', + '咋弄': '怎么办', + '咋回事': '怎么回事', + '咋回事儿': '怎么回事', + '咋样': '怎么样', + '啥': '什么', + '啥子': '什么', + '啥时候': '什么时候', + '啥地方': '什么地方', + '啥人': '什么人', + '啥东西': '什么东西', + '啥玩意': '什么东西', + '啥情况': '什么情况', + '啥事': '什么事', + '啥事儿': '什么事', + } + + self._stop_words = { + '的', '了', '在', '是', '我', '有', '和', '就', '不', '人', '都', '一', + '一个', '上', '也', '很', '到', '说', '要', '去', '你', '会', '着', '没有', + '看', '好', '自己', '这', '那', '吗', '呢', '吧', '啊', '哦', '呀', '嘛', + '啦', '呗', '哇', '嗯', '哎', '唉', '哼', '哦', '噢', '哎哟', '哎呀', + '请问', '请问一下', '请问一下', '请问一下', '我想知道', '我想问', + '帮我', '帮我看', '帮我看一下', '帮我查', '帮我查一下', + '能不能', '可不可以', '是否', '有没有', '是不是', + '麻烦', '麻烦问一下', '麻烦您', '麻烦帮我', + '请问', '请问一下', '请问问', + '我们', '你们', '他们', '她们', '它们', + '这个', '那个', '这些', '那些', + '这种', '那种', '这类', '那类', + '怎么样', '如何', '怎样', + '关于', '对于', '至于', + '以及', '或者', '还是', + '因为', '所以', '但是', '不过', + '如果', '要是', '假如', + '虽然', '尽管', + '而且', '并且', '同时', + '然后', '接着', '之后', + '首先', '其次', '最后', + '总之', '总的来说', + } + + self._question_words = { + '什么', '怎么', '如何', '为什么', '哪儿', '哪里', '哪个', '谁', + '多少', '几', '何时', '什么时候', '怎样', '怎么样', + '是否', '有没有', '是不是', '能不能', '可不可以', + } + + jieba.setLogLevel(jieba.logging.INFO) + + def preprocess_text(self, text: str) -> str: + if not text: + return '' + + text = text.strip() + text = re.sub(r'[^\u4e00-\u9fa5a-zA-Z0-9,。?!、;:""''()【】《》\s]', '', text) + text = re.sub(r'\s+', ' ', text) + + for colloquial, standard in self._colloquial_map.items(): + text = text.replace(colloquial, standard) + + text = re.sub(r'[,。?!、;:""''()【】《》]', ' ', text) + text = re.sub(r'\s+', ' ', text) + + return text.strip() + + def extract_keywords(self, text: str, top_k: int = 10) -> List[str]: + if not text: + return [] + + text = self.preprocess_text(text) + words = jieba.lcut(text) + + keywords = [] + for word in words: + word = word.strip() + if len(word) < 2: + continue + if word in self._stop_words: + continue + if word.isdigit(): + continue + keywords.append(word) + + question_keywords = [] + for word in words: + if word in self._question_words: + question_keywords.append(word) + + keywords = list(set(keywords)) + + word_freq = {} + for word in words: + if word in keywords: + word_freq[word] = word_freq.get(word, 0) + 1 + + keywords.sort(key=lambda x: word_freq.get(x, 0), reverse=True) + + return keywords[:top_k] + + def calculate_similarity(self, q1: str, q2: str) -> float: + if not q1 or not q2: + return 0.0 + + n1 = self.preprocess_text(q1) + n2 = self.preprocess_text(q2) + + if n1 == n2: + return 1.0 + + kw1 = set(self.extract_keywords(q1)) + kw2 = set(self.extract_keywords(q2)) + + if not kw1 or not kw2: + return 0.0 + + intersection = len(kw1 & kw2) + union = len(kw1 | kw2) + jaccard_sim = intersection / union if union > 0 else 0 + + len_sim = 1 - abs(len(n1) - len(n2)) / max(len(n1), len(n2)) + len_sim = max(0, len_sim) + + similarity = 0.7 * jaccard_sim + 0.3 * len_sim + + return similarity + + def find_best_match(self, query: str, candidates: List[Tuple[str, any]], threshold: float = 0.6) -> Tuple[Optional[any], float]: + if not query or not candidates: + return None, 0.0 + + best_match = None + best_similarity = 0.0 + + for candidate_text, candidate_data in candidates: + similarity = self.calculate_similarity(query, candidate_text) + + if similarity > best_similarity: + best_similarity = similarity + best_match = candidate_data + + if best_similarity >= threshold: + return best_match, best_similarity + + return None, best_similarity + + def decompose_question(self, question: str) -> List[str]: + if not question: + return [] + + question = self.preprocess_text(question) + + separators = ['?', '?', '。', '.', ';', ';', ',', ',', '和', '以及', '还有', '另外'] + + sub_questions = [question] + for sep in separators: + new_sub_questions = [] + for sq in sub_questions: + parts = sq.split(sep) + new_sub_questions.extend(parts) + sub_questions = new_sub_questions + + sub_questions = [sq.strip() for sq in sub_questions if sq.strip()] + + unique_sub_questions = [] + seen = set() + for sq in sub_questions: + if sq not in seen: + seen.add(sq) + unique_sub_questions.append(sq) + + return unique_sub_questions + + def calculate_keyword_coverage(self, query: str, candidate: str) -> float: + if not query or not candidate: + return 0.0 + + kw1 = set(self.extract_keywords(query)) + kw2 = set(self.extract_keywords(candidate)) + + if not kw1: + return 0.0 + + if not kw2: + return 0.0 + + coverage = len(kw1 & kw2) / len(kw1) + + return coverage + +_match_service_instance = None + +def get_match_service() -> MatchService: + global _match_service_instance + if _match_service_instance is None: + _match_service_instance = MatchService() + return _match_service_instance diff --git a/ruoyi-fastapi-backend/utils/semantic_cache_service.py b/ruoyi-fastapi-backend/utils/semantic_cache_service.py index f621298..915d367 100644 --- a/ruoyi-fastapi-backend/utils/semantic_cache_service.py +++ b/ruoyi-fastapi-backend/utils/semantic_cache_service.py @@ -24,14 +24,7 @@ import re import logging from typing import Optional, Tuple, Dict, Any, List from dataclasses import dataclass, asdict - -# 尝试导入jieba分词库 -try: - import jieba - JIEBA_AVAILABLE = True -except ImportError: - JIEBA_AVAILABLE = False - logger.warning("jieba未安装,语义匹配将使用简单的关键词提取") +from .match_service import get_match_service # 配置日志 logger = logging.getLogger(__name__) @@ -102,79 +95,18 @@ class SemanticCacheService: def _normalize_question(self, question: str) -> str: """标准化问题文本""" - # 去除首尾空格 - q = question.strip() - # 统一空格 - q = re.sub(r'\s+', ' ', q) - # 统一中文标点 - q = q.replace('?', '?').replace(',', ',').replace('。', '.') - return q + match_service = get_match_service() + return match_service.preprocess_text(question) def _extract_keywords(self, question: str) -> List[str]: - """ - 提取问题关键词(用于语义匹配) - - 策略: - 1. 使用jieba分词(如果可用)提取中文词语 - 2. 保留完整问题作为主要匹配依据 - 3. 提取名词、动词等核心词汇 - 4. 过滤停用词 - """ - # 简单停用词列表 - stopwords = {'的', '是', '了', '在', '有', '和', '与', '或', '吗', '呢', '吧', '啊', '哦', '请问', '能不能', '可以', '怎么', '如何', '什么', '多少', '几个', '哪些', '那个', '这个'} - - if JIEBA_AVAILABLE: - # 使用jieba分词 - words = list(jieba.cut(question)) - - # 过滤停用词、过短的词和标点 - keywords = [w.strip() for w in words - if w.strip() and w.strip() not in stopwords - and len(w.strip()) > 1 - and not re.match(r'^[\s\d\W]+$', w)] - else: - # 分词(简单按字符或词语切分) - words = re.findall(r'[\w\u4e00-\u9fff]+', question.lower()) - - # 过滤停用词和过短的词 - keywords = [w for w in words if w not in stopwords and len(w) > 1] - - return keywords + """提取问题关键词(用于语义匹配)""" + match_service = get_match_service() + return match_service.extract_keywords(question) def _calculate_text_similarity(self, q1: str, q2: str) -> float: - """ - 计算两个问题的文本相似度 - - 使用Jaccard相似度 + 关键词匹配的综合策略 - """ - # 标准化 - n1 = self._normalize_question(q1) - n2 = self._normalize_question(q2) - - # 如果完全相同,直接返回1.0 - if n1 == n2: - return 1.0 - - # 计算关键词重叠度 - kw1 = set(self._extract_keywords(q1)) - kw2 = set(self._extract_keywords(q2)) - - if not kw1 or not kw2: - return 0.0 - - # Jaccard相似度 - intersection = len(kw1 & kw2) - union = len(kw1 | kw2) - jaccard_sim = intersection / union if union > 0 else 0 - - # 长度相似度(惩罚长度差异过大的问题) - len_sim = 1 - abs(len(n1) - len(n2)) / max(len(n1), len(n2)) - len_sim = max(0, len_sim) # 确保非负 - - # 综合相似度(关键词权重0.7,长度权重0.3) - similarity = 0.7 * jaccard_sim + 0.3 * len_sim - - return similarity + """计算两个问题的文本相似度""" + match_service = get_match_service() + return match_service.calculate_similarity(q1, q2) async def lookup( self, diff --git a/ruoyi-fastapi-backend/utils/static_qa_service.py b/ruoyi-fastapi-backend/utils/static_qa_service.py new file mode 100644 index 0000000..edca59a --- /dev/null +++ b/ruoyi-fastapi-backend/utils/static_qa_service.py @@ -0,0 +1,153 @@ +import json +import os +from typing import List, Dict, Optional, Tuple +from loguru import logger +from .match_service import get_match_service + +class StaticQAService: + def __init__(self, config_path: str = None): + if config_path is None: + current_dir = os.path.dirname(os.path.abspath(__file__)) + config_path = os.path.join(os.path.dirname(os.path.dirname(current_dir)), 'config', 'static_qa.json') + + self.config_path = config_path + self.match_service = get_match_service() + self._qa_pairs: List[Dict] = [] + self._flattened_pairs: List[Tuple[str, Dict]] = [] + self._load_qa_data() + + def _load_qa_data(self): + try: + if not os.path.exists(self.config_path): + logger.warning(f'[StaticQA] 配置文件不存在: {self.config_path}') + self._qa_pairs = [] + self._flattened_pairs = [] + return + + with open(self.config_path, 'r', encoding='utf-8') as f: + data = json.load(f) + + self._qa_pairs = data.get('qa_pairs', []) + self._build_flattened_pairs() + + logger.info(f'[StaticQA] 加载了 {len(self._qa_pairs)} 个问答对,{len(self._flattened_pairs)} 个问题变体') + + except Exception as e: + logger.error(f'[StaticQA] 加载配置文件失败: {e}') + self._qa_pairs = [] + self._flattened_pairs = [] + + def _build_flattened_pairs(self): + self._flattened_pairs = [] + + for qa_pair in self._qa_pairs: + main_question = qa_pair.get('question', '') + answer = qa_pair.get('answer', '') + category = qa_pair.get('category', '') + priority = qa_pair.get('priority', 0) + + if not main_question or not answer: + continue + + qa_data = { + 'question': main_question, + 'answer': answer, + 'category': category, + 'priority': priority, + 'source': 'static_qa' + } + + self._flattened_pairs.append((main_question, qa_data)) + + sub_questions = qa_pair.get('sub_questions', []) + for sub_q in sub_questions: + if sub_q: + sub_qa_data = qa_data.copy() + sub_qa_data['is_sub_question'] = True + self._flattened_pairs.append((sub_q, sub_qa_data)) + + variations = qa_pair.get('variations', []) + for var in variations: + if var: + var_qa_data = qa_data.copy() + var_qa_data['is_variation'] = True + self._flattened_pairs.append((var, var_qa_data)) + + def reload(self): + self._load_qa_data() + + def find_match(self, question: str, threshold: float = 0.70) -> Tuple[Optional[Dict], float]: + if not question or not self._flattened_pairs: + return None, 0.0 + + decomposed_questions = self.match_service.decompose_question(question) + + if len(decomposed_questions) > 1: + logger.info(f'[StaticQA] 问题分解为 {len(decomposed_questions)} 个子问题: {decomposed_questions}') + + best_match = None + best_similarity = 0.0 + + for sub_q in decomposed_questions: + match, similarity = self.match_service.find_best_match( + sub_q, + self._flattened_pairs, + threshold=threshold + ) + + if match and similarity > best_similarity: + best_match = match + best_similarity = similarity + + if best_match: + logger.info(f'[StaticQA] 找到匹配 (相似度={best_similarity:.2f}): {best_match.get("question")}') + + return best_match, best_similarity + + def find_all_matches(self, question: str, threshold: float = 0.70, max_results: int = 5) -> List[Tuple[Dict, float]]: + if not question or not self._flattened_pairs: + return [] + + decomposed_questions = self.match_service.decompose_question(question) + + all_matches = [] + + for sub_q in decomposed_questions: + for candidate_text, candidate_data in self._flattened_pairs: + similarity = self.match_service.calculate_similarity(sub_q, candidate_text) + + if similarity >= threshold: + all_matches.append((candidate_data, similarity)) + + all_matches.sort(key=lambda x: (x[1], x[0].get('priority', 0)), reverse=True) + + return all_matches[:max_results] + + def get_all_categories(self) -> List[str]: + categories = set() + for qa_pair in self._qa_pairs: + category = qa_pair.get('category', '') + if category: + categories.add(category) + return sorted(list(categories)) + + def get_qa_by_category(self, category: str) -> List[Dict]: + result = [] + for qa_pair in self._qa_pairs: + if qa_pair.get('category') == category: + result.append(qa_pair) + return result + + def get_qa_count(self) -> int: + return len(self._qa_pairs) + + def get_question_count(self) -> int: + return len(self._flattened_pairs) + +_static_qa_service_instance = None + +def get_static_qa_service(config_path: str = None) -> StaticQAService: + global _static_qa_service_instance + if _static_qa_service_instance is None: + _static_qa_service_instance = StaticQAService(config_path) + return _static_qa_service_instance