Qanything 2.0 文件解析服务核心机制详解
本文聚焦于Qanything 2.0版本中的文件解析服务,该服务被设计为独立模块运行。相比于1.x版本将文件上传与解析合并处理(通过/api/local_doc_qa/upload_files接口同步完成),2.0版本将其解耦,提升了系统的可维护性和扩展性。
服务启动与架构
文件解析服务的启动命令如下,它运行在端口8110:
nohup python3 -u qanything_kernel/dependent_server/insert_files_serve/insert_files_server.py --port 8110 --workers 1 > /workspace/QAnything/logs/debug_logs/insert_files_server.log 2>&1 &
核心函数分析
该服务主要由两个异步函数构成:check_and_process 和 process_data,它们协同完成文件的智能解析与向量化存储。
1. check_and_process:任务调度器
服务初始化时,通过 app.listener('before_server_start') 装饰器注册了一个钩子函数setup_workers。该函数首先建立数据库连接池,随后将 check_and_process 作为一个长期运行的后台任务添加到应用中:
@app.listener('before_server_start')
async def setup_workers(app, loop):
app.ctx.pool = await aiomysql.create_pool(
**db_config,
minsize=1,
maxsize=16,
loop=loop,
autocommit=False,
init_command='SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED'
)
app.add_task(check_and_process(app.ctx.pool))
check_and_process 的核心是一个无限循环。它不断查询File表中 status='gray' 的记录(表示文件已上传但未解析),每次只取一条,然后将其交给 process_data 函数处理。
2. process_data:解析与向量化引擎
该函数负责文件内容的实际处理,执行两个关键步骤:
步骤一:实例化核心解析对象
首先为每个文件创建一个 LocalFileForInsert 实例,该类定义在 qanything_kernel/core/retriever/general_document.py 中:
local_file = LocalFileForInsert(
user_id,
kb_id,
file_id,
file_location,
file_name,
file_url,
chunk_size,
mysql_client
)
步骤二:文件切片
调用 local_file.split_file_to_docs() 方法将原始文件转换为文档片段(doc chunks)。不同文件类型采用不同的解析策略:
- 图片类型(png、jpg、jpeg):详见专题文章《Qanything 2.0 图片解析逻辑》。
- PDF类型:使用专用解析模块(详情将在后续文章介绍)。
步骤三:向量化与存储
执行 retriever.insert_documents(local_file.docs, chunk_size) 完成后续处理:
- 对文档片段进行分割(split),确保粒度符合
chunk_size参数。 - 通过BCE-Embedding模型对每个片段进行向量化。
- 将生成的向量数据存入Milvus向量数据库,以便后续的语义检索。
关键数据流
整个文件解析服务的处理流程可概括为:
- 监控File表中
status='gray'的任务 - 实例化
LocalFileForInsert对象 - 根据文件类型执行对应的解析逻辑
- 将解析后的文档切成适当大小的片段
- 向量化后写入Milvus数据库
这种异步、流式的处理方式保证了系统在高并发上传场景下的稳定性。