Langchain-Chatchat:源码调试与核心流程解析
项目结构概览
项目采用模块化设计,主要包含以下核心目录:
- configs/:存放模型与服务配置文件,如LLM和Embedding模型的参数设定。
- server/:API服务、大模型接口及对话逻辑实现,是系统的核心处理层。
- webui_pages/:前端页面组件,负责用户交互展示。
- startup.py:统一启动脚本,用于快速部署所有服务。
关键子目录说明:
- server/chat/:对话处理模块,涵盖与大模型、知识库、搜索引擎的交互逻辑。
- server/knowledge_base/:知识库管理服务,支持文档上传、向量化存储与检索。
- server/db/models/:数据库模型定义,基于SQLAlchemy构建。
- text_splitter/:多种文本分块策略,适配中文与特定格式文档。
对话请求处理链路分析
当用户在前端提交问题时,触发如下调用路径:
- HTTP POST 请求发送至
/chat/chat接口。 - 后端通过异步函数
chat()处理请求,封装为流式响应。 - 内部生成
chat_iterator()作为异步数据生成器,用于逐步返回模型输出。 - 最终由
StreamingResponse返回 Server-Sent Events 格式的实时响应。
核心处理逻辑如下:
async def chat(
query: str = Body(...),
conversation_id: str = Body(""),
history_len: int = Body(-1),
history: Union[int, List[History]] = Body([]),
stream: bool = Body(False),
model_name: str = Body(LLM_MODELS[0]),
temperature: float = Body(TEMPERATURE),
max_tokens: Optional[int] = Body(None),
prompt_name: str = Body("default")
):
async def chat_iterator() -> AsyncIterable[str]:
# 构建回调处理器,用于捕获生成过程中的每一步输出
callback = AsyncIteratorCallbackHandler()
callbacks = [callback]
# 若存在会话ID,记录当前查询到数据库
if conversation_id:
message_id = add_message_to_db(chat_type="llm_chat", query=query, conversation_id=conversation_id)
# 添加会话级回调,持久化响应内容
conversation_callback = ConversationCallbackHandler(conversation_id, message_id, "llm_chat", query)
callbacks.append(conversation_callback)
# 初始化语言模型实例
model = get_ChatOpenAI(
model_name=model_name,
temperature=temperature,
max_tokens=max_tokens,
callbacks=callbacks
)
# 构造提示模板与历史消息
if isinstance(history, list) and history:
prompt_template = get_prompt_template("llm_chat", prompt_name)
input_msg = History(role="user", content=prompt_template).to_msg_template(False)
chat_prompt = ChatPromptTemplate.from_messages([i.to_msg_template() for i in history] + [input_msg])
elif conversation_id and history_len > 0:
prompt = get_prompt_template("llm_chat", "with_history")
chat_prompt = PromptTemplate.from_template(prompt)
memory = ConversationBufferDBMemory(conversation_id=conversation_id, llm=model, message_limit=history_len)
else:
prompt_template = get_prompt_template("llm_chat", prompt_name)
input_msg = History(role="user", content=prompt_template).to_msg_template(False)
chat_prompt = ChatPromptTemplate.from_messages([input_msg])
# 创建 LLMChain 执行推理
chain = LLMChain(prompt=chat_prompt, llm=model, memory=memory)
# 启动异步任务并等待完成
task = asyncio.create_task(wrap_done(chain.acall({"input": query}), callback.done))
# 流式输出处理
if stream:
async for token in callback.aiter():
yield json.dumps({"text": token, "message_id": message_id}, ensure_ascii=False)
else:
answer = ""
async for token in callback.aiter():
answer += token
yield json.dumps({"text": answer, "message_id": message_id}, ensure_ascii=False)
await task
return StreamingResponse(chat_iterator(), media_type="text/event-stream")
数据库结构与数据持久化
系统使用 SQLite 作为默认数据库,存储路径位于 knowledge_base/info.db。其主要数据表包括:
message:聊天记录表,保存每轮对话的输入与输出。conversation:会话信息表,管理多轮对话上下文。knowledge_base与knowledge_file:分别表示知识库及其文件元数据。
示例实体类定义(server/db/models/message.py):
class MessageModel(Base):
__tablename__ = 'message'
id = Column(String(32), primary_key=True)
conversation_id = Column(String(32), index=True)
chat_type = Column(String(50))
query = Column(String(4096))
response = Column(String(4096))
meta_data = Column(JSON, default={})
feedback_score = Column(Integer, default=-1)
feedback_reason = Column(String(255), default="")
create_time = Column(DateTime, default=func.now())
可通过以下方式查看数据库内容:
from sqlalchemy import create_engine, func
from sqlalchemy.orm import sessionmaker
engine = create_engine('sqlite:///knowledge_base/info.db')
Session = sessionmaker(bind=engine)
session = Session()
try:
tables = session.execute("SELECT name FROM sqlite_master WHERE type='table'")
for table in tables:
print(f"Table: {table[0]}")
messages = session.execute("SELECT query, response FROM message LIMIT 2")
for row in messages:
print(f"Query: {row[0]}, Response: {row[1]}")
finally:
session.close()
调试建议与优化方向
- 使用
logging模块在关键节点添加日志输出,便于追踪执行流程。 - 对
get_prompt_template和get_ChatOpenAI等函数进行断点调试,验证参数传递正确性。 - 在
AsyncIteratorCallbackHandler中插入打印语句,确认 token 生成顺序与完整性。 - 结合 Postman 进行接口测试,验证不同参数组合下的行为一致性。