T47.2 RAG 技术系列补充 ✅

一、Agentic RAG 概述

1.1 什么是 Agentic RAG

Agentic RAG 是将 Agent 框架RAG 系统深度融合的新一代检索增强生成架构。与传统 RAG 相比,Agentic RAG 的核心特点是主动性和智能性

特性 传统 RAG Agentic RAG
检索时机 被动响应查询 主动判断是否需要检索
检索策略 固定策略 动态选择和调整
错误处理 自我反思和重试
多跳推理 单次检索 迭代检索和推理
知识源 单一 多源融合

1.2 为什么需要 Agentic RAG

传统 RAG 的痛点:

1
2
3
4
5
6
传统 RAG 痛点:
├── 查询理解单一:无法处理复杂多意图查询
├── 检索策略固定:无法根据结果动态调整
├── 缺乏自我纠错:无法识别和修正错误检索
├── 多跳能力弱:无法处理需要多步推理的问题
└── 知识源割裂:无法有效融合多源异构数据

二、Agentic RAG 架构设计

2.1 整体架构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
┌──────────────────────────────────────────────────────────────────┐
│ Agentic RAG 整体架构 │
└──────────────────────────────────────────────────────────────────┘

┌──────────────────────────────────────────────────────────────────┐
│ 应用层 │
├──────────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 智能客服 │ │ 知识问答 │ │ 文档助手 │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└──────────────────────────┬───────────────────────────────────────┘

┌──────────────────────────┼───────────────────────────────────────┐
│ Agent 层 │
├──────────────────────────────────────────────────────────────────┤
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Orchestrator Agent │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │ 规划器 │ │ 路由器 │ │ 反思器 │ │ 执行器 │ │ │
│ │ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │ │
│ └─────────────────────────────────────────────────────────┘ │
└──────────────────────────┬───────────────────────────────────────┘

┌──────────────────────────┼───────────────────────────────────────┐
│ RAG 执行层 │
├──────────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 查询重写 │ │ 迭代检索 │ │ 答案生成 │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└──────────────────────────┬───────────────────────────────────────┘

┌──────────────────────────┼───────────────────────────────────────┐
│ 知识层 │
├──────────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 向量数据库 │ │ 知识图谱 │ │ 结构化数据库│ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└──────────────────────────────────────────────────────────────────┘

2.2 核心组件详细设计

2.2.1 Orchestrator Agent

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class OrchestratorAgent:
"""
编排器 Agent - Agentic RAG 的核心决策中心
"""

def __init__(self, llm):
self.llm = llm
self.planner = PlannerModule(llm)
self.router = RouterModule(llm)
self.reflector = ReflectorModule(llm)
self.executor = ExecutorModule(llm)

def process(self, query: str, context: dict = None) -> dict:
"""
处理用户查询的完整流程
"""
# 1. 查询理解与规划
plan = self.planner.create_plan(query, context)

# 2. 路由决策
routes = self.router.decide_routes(plan)

# 3. 执行检索
results = self.executor.execute_routes(routes, plan)

# 4. 反思与优化
optimized_results = self.reflector.reflect_and_optimize(
query, results
)

# 5. 生成最终答案
answer = self._generate_answer(query, optimized_results)

return {
'answer': answer,
'sources': optimized_results['sources'],
'reasoning': optimized_results['reasoning'],
'confidence': optimized_results['confidence']
}

2.2.2 Planner Module

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
class PlannerModule:
"""
规划器 - 分解任务,制定检索策略
"""

def __init__(self, llm):
self.llm = llm

def create_plan(self, query: str, context: dict = None) -> dict:
"""
创建任务执行计划
"""
prompt = f"""
用户查询:{query}

请分析这个查询,并制定检索计划:

1. 查询类型分类:
- 事实性查询(需要具体事实)
- 解释性查询(需要概念理解)
- 分析性查询(需要综合分析)
- 多跳查询(需要多个事实关联)

2. 所需知识源:
- 哪些知识库需要检索
- 检索优先级

3. 分解子问题(如需要多跳):
- 列出需要逐步回答的子问题

4. 预期答案形式:
- 简短回答 / 详细解释 / 对比分析 / 步骤说明
"""

response = self.llm.generate(prompt)
return self._parse_plan(response)

def _parse_plan(self, response: str) -> dict:
"""解析 LLM 生成的计划"""
# 实现计划解析逻辑
pass

2.2.3 Router Module

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
class RouterModule:
"""
路由器 - 决定查询的检索路径
"""

def __init__(self, llm, knowledge_sources: Dict[str, Any]):
self.llm = llm
self.knowledge_sources = knowledge_sources

def decide_routes(self, plan: dict) -> List[dict]:
"""
决定检索路由
"""
routes = []

for sub_task in plan['sub_tasks']:
# 判断需要查询哪些知识源
required_sources = self._identify_sources(sub_task)

# 确定检索策略
strategy = self._select_strategy(sub_task, required_sources)

routes.append({
'task': sub_task,
'sources': required_sources,
'strategy': strategy
})

return routes

def _identify_sources(self, task: dict) -> List[str]:
"""识别任务需要的知识源"""
prompt = f"""
子任务:{task['description']}

可用知识源:
{', '.join(self.knowledge_sources.keys())}

请确定这个子任务需要检索哪些知识源。
"""
response = self.llm.generate(prompt)
# 解析知识源列表
return []

def _select_strategy(self, task: dict, sources: List[str]) -> dict:
"""选择检索策略"""
return {
'type': 'semantic', # 或 'keyword', 'hybrid', 'graph'
'top_k': 10,
'rerank': True,
'max_iterations': 3
}

2.2.4 Reflector Module

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
class ReflectorModule:
"""
反思器 - 评估和优化检索结果
"""

def __init__(self, llm):
self.llm = llm

def reflect_and_optimize(
self,
query: str,
results: dict
) -> dict:
"""
反思检索结果并进行优化
"""
# 1. 评估结果相关性
relevance_scores = self._evaluate_relevance(query, results)

# 2. 检查是否满足查询需求
sufficiency_check = self._check_sufficiency(
query, results, relevance_scores
)

# 3. 如不满足,生成优化策略
if not sufficiency_check['passed']:
optimization = self._generate_optimization(
query, results, sufficiency_check
)
return self._apply_optimization(results, optimization)

return results

def _evaluate_relevance(self, query: str, results: dict) -> List[dict]:
"""评估每条结果与查询的相关性"""
scored_results = []

for doc in results.get('documents', []):
prompt = f"""
查询:{query}
文档内容:{doc['content'][:500]}

评估这篇文档与查询的相关性,给出 0-1 的分数和简要理由。
"""
response = self.llm.generate(prompt)
# 解析分数
score = self._parse_score(response)
scored_results.append({
'doc': doc,
'score': score
})

return sorted(scored_results, key=lambda x: x['score'], reverse=True)

def _check_sufficiency(self, query: str, results: dict, scores: List[dict]) -> dict:
"""检查结果是否足够回答查询"""
high_score_count = sum(1 for s in scores if s['score'] > 0.7)

return {
'passed': high_score_count >= 2,
'missing_aspects': self._identify_gaps(query, results)
}

def _generate_optimization(self, query: str, results: dict, check: dict) -> dict:
"""生成优化策略"""
return {
'action': 'rewrite_and_retry',
'new_queries': ['改写后的查询1', '改写后的查询2'],
'focus_areas': check['missing_aspects']
}

三、迭代检索实现

3.1 迭代检索流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
┌──────────────────────────────────────────────────────────────────┐
│ 迭代检索流程 │
└──────────────────────────────────────────────────────────────────┘

┌─────────────┐
│ 初始查询 │
└──────┬──────┘


┌─────────────────────────────────────────┐
│ 第1轮检索 │
│ 查询 → 检索 → 评估 → 相关? │
└─────────────────┬───────────────────────┘

┌─────────┴─────────┐
│ │
Yes No
│ │
▼ ▼
┌─────────┐ ┌─────────────┐
│ 生成答案 │ │ 查询重写 │
└────┬────┘ └──────┬──────┘
│ │
│ ┌─────┴─────┐
│ 最多3轮 否
│ │
│ ▼
│ ┌─────────────┐
│ │ 生成答案 │
│ │(部分信息) │
│ └─────────────┘
│ │
└────────────┤


┌───────────────┐
│ 最终答案 │
└───────────────┘

3.2 代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
class IterativeRetriever:
"""
迭代检索器 - 支持多轮检索优化
"""

def __init__(
self,
vector_store,
kg_store,
reranker,
max_iterations: int = 3
):
self.vector_store = vector_store
self.kg_store = kg_store
self.reranker = reranker
self.max_iterations = max_iterations

def retrieve(self, query: str) -> List[Document]:
"""
执行迭代检索
"""
all_results = []
current_query = query
context = []

for iteration in range(self.max_iterations):
# 1. 语义向量检索
vector_results = self.vector_store.search(
current_query,
top_k=20
)

# 2. 知识图谱检索(如果适用)
kg_results = []
if self._requires_kg(query):
kg_results = self.kg_store.search(current_query)

# 3. 合并结果
merged = self._merge_results(vector_results, kg_results)

# 4. 重排序
reranked = self.reranker.rerank(query, merged)

# 5. 评估是否足够
if self._is_sufficient(reranked, query):
all_results.extend(reranked[:10])
break

# 6. 生成下一轮查询
context = reranked[:5]
current_query = self._rewrite_query(query, context)
all_results.extend(reranked[:5])

# 去重并返回
return self._deduplicate(all_results)

def _requires_kg(self, query: str) -> bool:
"""判断是否需要知识图谱检索"""
kg_keywords = ['关系', '关联', '公司', '人物', '因果']
return any(kw in query for kw in kg_keywords)

def _merge_results(
self,
vector_results: List[Document],
kg_results: List[Document]
) -> List[Document]:
"""合并向量检索和知识图谱检索结果"""
seen = set()
merged = []

for doc in vector_results + kg_results:
doc_id = doc.get('id') or doc.get('chunk_id')
if doc_id not in seen:
seen.add(doc_id)
merged.append(doc)

return merged

def _rewrite_query(
self,
original_query: str,
context: List[Document]
) -> str:
"""基于已有结果重写查询"""
context_text = "\n".join([
f"- {doc['content'][:200]}" for doc in context
])

prompt = f"""
原始查询:{original_query}

已检索到的上下文:
{context_text}

基于已有信息,生成一个更好的检索查询,
以找到原始查询缺失的信息。
"""
return self.llm.generate(prompt)

def _is_sufficient(
self,
results: List[Document],
query: str
) -> bool:
"""判断检索结果是否足够回答查询"""
if len(results) < 3:
return False

# 检查 top 结果的相关性
top_relevance = results[0].get('relevance', 0)
return top_relevance > 0.8

四、多知识源融合

4.1 异构知识源类型

知识源 数据格式 检索方式 适用场景
向量数据库 文本 Embedding 语义相似度 非结构化文档
知识图谱 RDF/图结构 图遍历 实体关系
SQL 数据库 结构化表 SQL 查询 业务数据
搜索引擎 Web 内容 关键词+语义 实时信息
文档库 PDF/Word 全文检索 合同/报告

4.2 融合策略实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
class MultiSourceFusion:
"""
多知识源融合
"""

def __init__(self, sources: Dict[str, Any]):
self.sources = sources

def fuse(self, query: str, source_results: Dict[str, List]) -> List[dict]:
"""
融合多个知识源的检索结果

Args:
query: 用户查询
source_results: 各知识源的检索结果字典
"""
# 1. 统一格式化
normalized = self._normalize(source_results)

# 2. 计算跨源相关性
scored = self._cross_source_scoring(query, normalized)

# 3. 去除重复
deduplicated = self._deduplicate_by_content(scored)

# 4. 排序输出
return sorted(
deduplicated,
key=lambda x: x['final_score'],
reverse=True
)[:20]

def _normalize(self, source_results: Dict[str, List]) -> List[dict]:
"""将不同来源的结果统一格式化"""
normalized = []

for source_name, results in source_results.items():
for item in results:
normalized.append({
'source': source_name,
'content': item.get('content', ''),
'metadata': item.get('metadata', {}),
'original_score': item.get('score', 0),
'embedding': item.get('embedding', None)
})

return normalized

def _cross_source_scoring(self, query: str, items: List[dict]) -> List[dict]:
"""跨源评分"""
for item in items:
# 基础相关性分数
base_score = item['original_score']

# 来源权重
source_weight = self._get_source_weight(item['source'])

# 内容质量分数
quality_score = self._assess_quality(item['content'])

# 多源一致性分数(如有多个来源提到相似内容)
consistency_score = self._check_consistency(item, items)

# 综合分数
item['final_score'] = (
base_score * source_weight * 0.4 +
quality_score * 0.3 +
consistency_score * 0.3
)

return items

def _get_source_weight(self, source: str) -> float:
"""知识源权重配置"""
weights = {
'knowledge_graph': 1.2, # 知识图谱权重较高
'vector_store': 1.0,
'sql_database': 1.1,
'web_search': 0.8
}
return weights.get(source, 1.0)

五、企业级部署方案

5.1 高可用架构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
┌──────────────────────────────────────────────────────────────────┐
│ 高可用部署架构 │
└──────────────────────────────────────────────────────────────────┘

┌─────────────┐
│ 负载均衡 │
│ (Nginx) │
└──────┬──────┘

┌─────────────────┼─────────────────┐
│ │ │
┌─────┴─────┐ ┌─────┴─────┐ ┌─────┴─────┐
│ API Pod │ │ API Pod │ │ API Pod │
│ (x3) │ │ (x3) │ │ (x3) │
└─────┬─────┘ └─────┬─────┘ └─────┬─────┘
│ │ │
└─────────────────┼─────────────────┘

┌───────────┴───────────┐
│ │
┌─────┴─────┐ ┌─────┴─────┐
│ Redis │ │ Kafka │
│ (缓存) │ │ (消息) │
└───────────┘ └───────────┘

┌─────────────────────┼─────────────────────┐
│ │ │
┌─────┴─────┐ ┌─────┴─────┐ ┌─────┴─────┐
│ 向量存储 │ │ 知识图谱 │ │ 对象存储 │
│ (Pinecone)│ │ (Neo4j) │ │ (S3) │
└───────────┘ └───────────┘ └───────────┘

5.2 性能优化策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
class PerformanceOptimizer:
"""
性能优化器
"""

def __init__(self):
self.cache = RedisCache()
self.query_optimizer = QueryOptimizer()

async def optimize_retrieval(self, query: str) -> dict:
"""
优化检索性能
"""
# 1. 检查缓存
cached = await self.cache.get(query)
if cached:
return {'result': cached, 'from_cache': True}

# 2. 查询优化
optimized_query = self.query_optimizer.optimize(query)

# 3. 并行执行多个检索
results = await self._parallel_retrieval(optimized_query)

# 4. 异步后处理
processed = await self._async_postprocess(results)

# 5. 缓存结果
await self.cache.set(query, processed, ttl=3600)

return {'result': processed, 'from_cache': False}

async def _parallel_retrieval(self, query: str) -> dict:
"""并行执行多个知识源检索"""
tasks = [
self.vector_store.search(query),
self.kg_store.search(query),
self.sql_store.query(query)
]

results = await asyncio.gather(*tasks, return_exceptions=True)

return {
'vector': results[0] if not isinstance(results[0], Exception) else [],
'kg': results[1] if not isinstance(results[1], Exception) else [],
'sql': results[2] if not isinstance(results[2], Exception) else []
}

5.3 监控与告警

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 监控指标配置
metrics:
retrieval:
- name: retrieval_latency_p99
threshold: 500ms
alert: true
- name: cache_hit_rate
threshold: < 0.6
alert: true
- name: retrieval_error_rate
threshold: > 0.01
alert: true

quality:
- name: answer_relevance_score
threshold: < 0.7
alert: true
- name: citation_accuracy
threshold: < 0.9
alert: true

六、行业应用案例

6.1 金融投研分析

场景:投资银行的研报分析助手

架构特点

  • 多源融合:研报、新闻、财务数据、舆情
  • 知识图谱:公司、人物、行业的关联关系
  • 迭代检索:复杂投资问题多步推理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# 金融领域配置
financial_config = {
'knowledge_sources': {
'research_reports': {
'type': 'vector_store',
'weight': 1.2,
'top_k': 10
},
'financial_kg': {
'type': 'knowledge_graph',
'weight': 1.3,
'entities': ['company', 'person', 'industry']
},
'market_data': {
'type': 'sql_database',
'weight': 1.1
},
'news_feed': {
'type': 'web_search',
'weight': 0.9,
'recency': '7d'
}
},
'retrieval': {
'max_iterations': 4,
'rerank_model': 'bge-reranker-base'
}
}

6.2 医疗诊断辅助

场景:医生诊疗决策支持

架构特点

  • 严格的质量控制
  • 可解释的检索结果
  • 多语言支持(中英文医学文献)

6.3 法律合同审查

场景:企业法务的合同风险识别

架构特点

  • 条款级别的精确检索
  • 风险标签和分级
  • 知识图谱:法律条文关联

七、总结

Agentic RAG 代表了 RAG 技术的未来发展方向,其核心价值在于:

  1. 智能化:从被动检索到主动决策
  2. 迭代优化:通过反思持续提升结果质量
  3. 多源融合:打破知识孤岛
  4. 企业级:完整的部署和监控方案

企业在落地 Agentic RAG 时,建议:

  • 从小规模开始:先在单一场景验证
  • 注重数据质量:Garbage in, garbage out
  • 建立评估体系:持续监控和优化
  • 渐进式演进:根据业务需求逐步扩展

相关阅读: