1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
| # src/chunker.py
from dataclasses import dataclass
@dataclass
class Chunk:
    """A fragment of a source document produced by chunking."""

    content: str    # text of this fragment
    metadata: dict  # inherited from the parent document, plus chunk-level fields
    chunk_id: str   # unique identifier: doc_id + chunk index

    def __repr__(self) -> str:
        """Render id, content and metadata in a compact single line."""
        return (
            f"Chunk(id={self.chunk_id}, "
            f"content={self.content}, "
            f"metadata={self.metadata})"
        )
class DocumentChunker:
    """Split documents into chunks for retrieval (RAG).

    The chunking strategy has a large impact on RAG quality:
    * Chunks too large: too much irrelevant text, retrieval precision drops.
    * Chunks too small: semantic context is lost, answers get fragmented.
    * No overlap: information straddling a boundary may be cut off and missed.
    * Too much overlap: wasted storage, duplicated hits, wasted tokens.

    Recommended starting parameters: chunk_size=500, overlap=100;
    tune based on observed retrieval quality.
    """

    def __init__(self, chunk_size: int = 500, chunk_overlap: int = 100,
                 min_chunk_size: int = 50,
                 ):
        # chunk_size:     target chunk length in characters.
        # chunk_overlap:  characters shared between consecutive chunks.
        # min_chunk_size: fragments shorter than this are dropped.
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.min_chunk_size = min_chunk_size

    # ===============================
    # Strategy 1: fixed-size chunking
    # ===============================
    def chunk_by_size(self, doc) -> list[Chunk]:
        """Split into fixed-size chunks with a sliding-window overlap.

        Simplest strategy; good for quick prototypes.

            chunk_size = 500, overlap = 100
            doc:  |---------500--------|
                              |--------500-------|
                              100-char overlap

        Fixes vs. the previous version:
        * metadata key is "end_char" (was misspelled "end_chart");
        * `end` is clamped to the text length so offsets are accurate;
        * the loop breaks after consuming the tail instead of emitting a
          final chunk fully contained in the previous one;
        * the window is guaranteed to advance even when the boundary
          search moves `end` back inside the overlap region (previously
          this could walk backwards and loop forever).
        """
        text = doc.content
        chunks: list[Chunk] = []
        start = 0
        chunk_index = 0
        while start < len(text):
            end = min(start + self.chunk_size, len(text))
            # Try to cut on a sentence boundary instead of mid-sentence.
            if end < len(text):
                # Look backwards from `end` for the nearest period/newline.
                boundary = self._find_boundary(text, end)
                if boundary > start + self.min_chunk_size:
                    end = boundary
            chunk_text = text[start:end].strip()
            if len(chunk_text) >= self.min_chunk_size:
                chunks.append(Chunk(
                    content=chunk_text,
                    metadata={
                        **doc.metadata,
                        "chunk_index": chunk_index,
                        "start_char": start,
                        "end_char": end,
                    },
                    chunk_id=f"{doc.doc_id}::chunk_{chunk_index}",
                ))
                chunk_index += 1
            if end >= len(text):
                break  # tail consumed; don't re-chunk the overlap region
            # Next window starts `overlap` chars before the current end,
            # but must always move forward to guarantee termination.
            next_start = end - self.chunk_overlap
            start = next_start if next_start > start else end
        return chunks

    # ==========================================================
    # Strategy 2: section chunking (Markdown / structured docs)
    # ==========================================================
    def chunk_by_section(self, doc) -> list[Chunk]:
        """Split on Markdown headings.

        For structured documents (notes, technical docs), heading-based
        chunks keep each section semantically complete, which usually
        works better than fixed-size chunking.

            # Heading 1    -> its own chunk
            ## Heading 1.1 -> its own chunk

        Sections longer than 2 * chunk_size are re-split with
        chunk_by_size.
        """
        chunks: list[Chunk] = []
        current_section: list[str] = []
        current_heading = ""
        chunk_index = 0

        def flush() -> None:
            # Emit the accumulated section as one chunk if it is big enough.
            nonlocal chunk_index
            section_text = "\n".join(current_section).strip()
            if len(section_text) >= self.min_chunk_size:
                chunks.append(Chunk(
                    content=section_text,
                    metadata={
                        **doc.metadata,
                        "chunk_index": chunk_index,
                        "heading": current_heading,
                    },
                    chunk_id=f"{doc.doc_id}::section_{chunk_index}",
                ))
                chunk_index += 1

        for line in doc.content.split("\n"):
            # A heading line closes the previous section and opens a new one.
            if line.strip().startswith("#"):
                if current_section:
                    flush()
                current_heading = line.strip().lstrip("#").strip()
                current_section = [line]
            else:
                current_section.append(line)
        # Flush the final section.
        if current_section:
            flush()

        # Second pass: re-split any section that is far above chunk_size.
        final_chunks: list[Chunk] = []
        for chunk in chunks:
            if len(chunk.content) > self.chunk_size * 2:
                # Wrap the oversized section as a document of the same type
                # and reuse fixed-size chunking on it.
                sub_doc = type(doc)(
                    content=chunk.content,
                    metadata=chunk.metadata,
                    doc_id=chunk.chunk_id,
                )
                final_chunks.extend(self.chunk_by_size(sub_doc))
            else:
                final_chunks.append(chunk)
        return final_chunks

    # =====================================================
    # Strategy 3: recursive character chunking (LangChain default)
    # =====================================================
    def chunk_recursive(self, doc) -> list[Chunk]:
        """Recursive chunking: try separators from coarse to fine.

        Separator priority: paragraph > line > sentence > word.  Split by
        paragraphs first; any piece still too long is split with the next
        separator, and so on.  This is the core idea behind LangChain's
        RecursiveCharacterTextSplitter.
        """
        separators = ["\n\n", "\n", "。", ".", "!", "!", "?", "?", " "]
        return self._recursive_split(
            text=doc.content,
            separators=separators,
            doc=doc,
        )

    def _recursive_split(self, text: str, separators: list[str], doc, chunk_index: int = 0) -> list[Chunk]:
        """Core recursive splitting logic.

        Consistency fix: fragments of exactly min_chunk_size are now kept
        (`>=`), matching chunk_by_size and the base case below
        (previously two of the three size checks used a strict `>`).
        """
        chunks: list[Chunk] = []
        # Base case: the text already fits into a single chunk.
        if len(text) <= self.chunk_size:
            if len(text) >= self.min_chunk_size:
                chunks.append(Chunk(
                    content=text.strip(),
                    metadata={**doc.metadata, "chunk_index": chunk_index},
                    chunk_id=f"{doc.doc_id}::recursive_{chunk_index}",
                ))
            return chunks
        # Split on the coarsest separator still available; with none left,
        # fall back to splitting into single characters.
        sep = separators[0] if separators else ""
        parts = text.split(sep) if sep else list(text)
        current_chunk = ""
        for part in parts:
            candidate = current_chunk + sep + part if current_chunk else part
            if len(candidate) > self.chunk_size:
                # Current chunk is full: emit it.
                if current_chunk:
                    if len(current_chunk) >= self.min_chunk_size:
                        chunks.append(Chunk(
                            content=current_chunk.strip(),
                            metadata={**doc.metadata, "chunk_index": chunk_index},
                            chunk_id=f"{doc.doc_id}::recursive_{chunk_index}",
                        ))
                        chunk_index += 1
                # A single part that is itself too long recurses on the
                # next, finer separator.
                if len(part) > self.chunk_size and len(separators) > 1:
                    sub_chunks = self._recursive_split(
                        part, separators[1:], doc, chunk_index
                    )
                    chunks.extend(sub_chunks)
                    chunk_index += len(sub_chunks)
                    current_chunk = ""
                else:
                    current_chunk = part
            else:
                current_chunk = candidate
        # Flush the trailing chunk.
        if current_chunk and len(current_chunk) >= self.min_chunk_size:
            chunks.append(Chunk(
                content=current_chunk.strip(),
                metadata={**doc.metadata, "chunk_index": chunk_index},
                chunk_id=f"{doc.doc_id}::recursive_{chunk_index}",
            ))
        return chunks

    # ================================================================
    # Helpers
    # ================================================================
    def _find_boundary(self, text: str, position: int, window: int = 100) -> int:
        """Return the index just past the sentence boundary nearest to
        (and not after) `position`, searching up to `window` chars back.

        Falls back to `position` itself when no separator is found.
        """
        search_start = max(position - window, 0)
        search_text = text[search_start:position + window]
        # Try separators from strongest (paragraph break) to weakest.
        for sep in ["\n\n", "\n", "。", ".", "!", "!", "?", "?"]:
            # rfind's upper bound keeps the cut at or before `position`.
            idx = search_text.rfind(sep, 0, position - search_start + 1)
            if idx != -1:
                return search_start + idx + len(sep)
        return position  # no suitable boundary; cut at the raw position