
refactor: Rag enhancement#5945

Open
kawayiYokami wants to merge 5 commits into AstrBotDevs:master from kawayiYokami:rag-enhancement

Conversation

@kawayiYokami (Contributor) commented Mar 9, 2026

Implements #5262

Modifications

  • Refactor the knowledge base's underlying architecture to improve stability and robustness

  • Add kb_db_sqlite.py, a knowledge base database abstraction layer

  • Add index_rebuilder.py, an index rebuilder

  • Add structure_parser.py, a structure parser

  • Optimize the FAISS vector database implementation (vec_db.py, document_storage.py, embedding_storage.py)

  • Improve the knowledge base manager and helper class (kb_mgr.py, kb_helper.py)

  • Add knowledge base regression tests in tests/core/test_kb_architecture_refactor.py

  • Add an architecture document, docs/knowledge-base-architecture.md

  • This is NOT a breaking change.


Screenshots or Test Results

[screenshot: test results]

Unit tests and short-term real-world usage tests have both been carried out.

However, the frontend has not been modified yet, so rebuilding the vector index when the frontend actively switches the embedding model has not yet been tested.


Checklist

  • 😊 If this PR introduces new features, they have been discussed with the maintainers via an Issue / email beforehand.
  • 👀 My changes are well tested, and the "verification steps" and "screenshots" are provided above.
  • 🤓 I have made sure no new dependency libraries are introduced.
  • 😮 My changes introduce no malicious code.


Summary by Sourcery

Refine the knowledge base architecture to support provider-specific FAISS indexes, structured document indexing, and safe online index rebuilding while keeping backups and retrieval flows compatible.

New Features:

  • Introduce provider-isolated FAISS index files with runtime hot-switching between embedding providers per knowledge base.
  • Add a structured indexing mode that parses documents into hierarchical sections, stores them separately, and exposes a tool to read section bodies on demand from LLM flows.
  • Expose background index rebuild tasks with progress tracking and a dashboard API to monitor provider-switch rebuild operations.
  • Allow configuring a default index mode per knowledge base and per-upload document index mode via the dashboard API.

Enhancements:

  • Extend the SQLite knowledge base schema with active index provider, default index mode, per-document index mode, and structured section tables plus corresponding migrations.
  • Harden the FAISS vector DB implementation with an index-switch lock, safe index swapping, and additional document metadata such as indexing flags and source hashes.
  • Propagate retrieval chunk metadata through sparse, fusion, and manager layers so consumers can distinguish structured vs flat results and react accordingly.
  • Refactor session/global KB resolution logic and add structured-doc checks so tools are only registered when relevant.
  • Update backup export/import to handle multiple provider-specific FAISS index files and the new knowledge base schema version.

Documentation:

  • Add a detailed knowledge base architecture document describing provider-isolated indexes, structured indexing, migrations, and data flows.

Tests:

  • Add comprehensive tests covering index rebuilding behavior, structured parsing, doc section upsert/lookup semantics, FAISS index switching and locking, structured upload progress, and rebuild task lifecycle management.
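The index-switch locking mentioned in the Enhancements above can be sketched with plain `asyncio` (a toy model with hypothetical names, not the real `FaissVecDB` API): every read and every switch takes the same lock, so a retrieval never observes a half-swapped index.

```python
import asyncio

class VecDBSketch:
    """Tiny model of serializing reads against an index hot-switch."""

    def __init__(self) -> None:
        self._index_switch_lock = asyncio.Lock()
        self.active_index = "index.old.faiss"
        self.events: list[tuple[str, str]] = []

    async def retrieve(self, query: str) -> str:
        async with self._index_switch_lock:  # readers wait out any switch
            self.events.append(("retrieve", self.active_index))
            return self.active_index

    async def switch_index(self, new_path: str) -> None:
        async with self._index_switch_lock:  # swap is atomic w.r.t. readers
            self.events.append(("switch", new_path))
            self.active_index = new_path

async def main() -> "VecDBSketch":
    db = VecDBSketch()
    # A switch and a retrieve racing: the lock forces one to finish first.
    await asyncio.gather(db.switch_index("index.new.faiss"), db.retrieve("q"))
    return db

db = asyncio.run(main())
```

The same pattern generalizes to `insert`, `insert_batch`, and `delete`: any operation touching the index acquires the lock for its whole critical section.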

@dosubot dosubot bot added the size:XXL This PR changes 1000+ lines, ignoring generated files. label Mar 9, 2026
@gemini-code-assist (Contributor)

Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly upgrades the knowledge base system by introducing advanced capabilities for managing and interacting with indexed data. It enables dynamic switching between different embedding models without requiring a full re-index, and provides a novel approach to indexing structured documents, allowing LLMs to retrieve information more efficiently and contextually. These changes aim to make the knowledge base more flexible, scalable, and intelligent in its retrieval mechanisms.

Highlights

  • Knowledge Base Architecture Refactoring: The core knowledge base architecture was refactored to enhance stability and robustness, introducing new concepts like provider-isolated FAISS indexes and structured document indexing.
  • Provider-Isolated FAISS Indexes: Implemented support for hot-swapping embedding models without full re-indexing. FAISS index files are now named with the provider ID (e.g., index.{provider_id}.faiss), allowing multiple indexes to coexist. An incremental index rebuilder (index_rebuilder.py) was added to manage this process, with progress tracking available via a new API endpoint.
  • Structured Document Indexing: Introduced a new indexing mode for documents with clear hierarchical structures (e.g., Markdown). Instead of chunking, sections are indexed by their path, and an LLM tool (ReadDocumentSectionTool) can be used to retrieve full section content on demand, reducing token usage and improving context.
  • Database Schema Migrations: New database migrations (v2 and v3) were added for the knowledge_bases and kb_documents tables to support new fields like active_index_provider_id, default_index_mode, and index_mode. The documents table in the vector database also gained is_indexed and source_hash fields.
  • New LLM Tool for Structured Documents: A READ_DOCUMENT_SECTION_TOOL was added, allowing the LLM to specifically request the full content of a structured document section using its document ID and section path. This tool is dynamically enabled when structured documents are present.
  • Enhanced FAISS Vector Database Operations: The FAISS vector database implementation was optimized with an asynchronous lock (asyncio.Lock) to ensure thread safety during index operations (insert, retrieve, delete, switch). New methods for retrieving all integer IDs and documents by IDs were added to the document storage.
  • Comprehensive Documentation and Testing: A detailed architecture document (docs/knowledge-base-architecture.md) was added to explain the new design. New regression tests (tests/core/test_kb_architecture_refactor.py) were also included to validate the refactored knowledge base components.
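As a rough illustration of the provider-isolated naming scheme highlighted above, a path builder might look like the following sketch (hypothetical normalization rule; the actual helpers in `index_path.py` may differ in detail):

```python
import re
from pathlib import Path

def normalize_provider_id(provider_id: str) -> str:
    """Map a provider ID to a file-system-safe token (hypothetical rule)."""
    safe = re.sub(r"[^A-Za-z0-9_-]", "_", provider_id)
    return safe or "default"

def build_index_path(kb_dir: Path, provider_id: str) -> Path:
    """Build a per-provider index file name, e.g. index.my_provider.faiss,
    so indexes for multiple providers can coexist in one kb directory."""
    return kb_dir / f"index.{normalize_provider_id(provider_id)}.faiss"
```

Because each provider gets its own file, switching providers only changes which file is opened; the old index stays on disk and can be reused if the user switches back.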
Changelog
  • astrbot/core/astr_main_agent.py
    • Imported READ_DOCUMENT_SECTION_TOOL for structured document access.
    • Added logic to dynamically apply READ_DOCUMENT_SECTION_TOOL if structured documents are present in the active knowledge bases.
  • astrbot/core/astr_main_agent_resources.py
    • Added ReadDocumentSectionTool class, defining its name, description, and parameters for reading structured knowledge base sections.
    • Implemented read_knowledge_base_section function to fetch section content from the knowledge base.
    • Refactored knowledge base name resolution into a new helper function _resolve_kb_names_for_session.
  • astrbot/core/backup/constants.py
    • Imported DocSection model.
    • Added doc_sections to the MODEL_MAP for backup and restore operations.
  • astrbot/core/backup/exporter.py
    • Modified _export_faiss_index to export all FAISS index files matching index*.faiss, supporting provider-specific naming.
    • Updated the kb_db schema version to v3 in the manifest generation.
  • astrbot/core/backup/importer.py
    • Modified _import_knowledge_bases to import all FAISS index files matching index*.faiss, ensuring compatibility with new naming conventions.
  • astrbot/core/db/vec_db/faiss_impl/document_storage.py
    • Added sha256 import for content hashing.
    • Introduced is_indexed and source_hash fields to the Document model.
    • Added ALTER TABLE statements in initialize to add new columns for backward compatibility.
    • Updated insert_document and insert_documents_batch to populate is_indexed and source_hash fields.
    • Added get_all_int_ids and get_documents_by_int_ids methods for retrieving document IDs and documents by their integer IDs.
  • astrbot/core/db/vec_db/faiss_impl/embedding_storage.py
    • Ensured directory existence for the FAISS index path during initialization.
    • Added get_all_ids method to retrieve all integer IDs stored in the FAISS index.
    • Implemented close method to persist the index and release in-memory resources.
    • Modified save_index to check if self.path is set before saving.
  • astrbot/core/db/vec_db/faiss_impl/vec_db.py
    • Imported asyncio for asynchronous operations.
    • Added an _index_switch_lock to protect concurrent access to the FAISS index.
    • Wrapped insert, insert_batch, retrieve, delete, and delete_documents methods with the _index_switch_lock.
    • Implemented switch_index method for hot-swapping to a new FAISS index and embedding provider.
    • Modified close method to also close the embedding_storage.
  • astrbot/core/knowledge_base/index_path.py
    • Added new file to define utility functions for normalizing provider IDs and building file-system-safe FAISS index paths.
  • astrbot/core/knowledge_base/index_rebuilder.py
    • Added new file to implement the IndexRebuilder class, which handles incremental synchronization of vector indexes for new embedding providers.
  • astrbot/core/knowledge_base/kb_db_sqlite.py
    • Imported DocSection, build_index_path, OperationalError, and ProgrammingError.
    • Added migrate_to_v2 and migrate_to_v3 methods to handle database schema updates for knowledge bases.
    • Implemented has_structured_docs to check for structured indexed documents.
    • Added upsert_doc_section and get_doc_section methods for managing structured document sections.
  • astrbot/core/knowledge_base/kb_helper.py
    • Imported numpy, DocSection, build_index_path, and StructureParser.
    • Added last_rebuild_task_id attribute for tracking index rebuild tasks.
    • Implemented get_embedding_provider_by_id, get_active_embedding_provider_id, get_active_index_path methods for managing embedding providers and index paths.
    • Added persist_kb method to save knowledge base metadata changes.
    • Modified _ensure_vec_db to use the active index path.
    • Added index_mode parameter to upload_document and introduced _upload_document_structured for handling structured document uploads.
  • astrbot/core/knowledge_base/kb_mgr.py
    • Imported asyncio, uuid, and IndexRebuilder.
    • Added attributes rebuild_tasks, kb_rebuild_task_map, _running_rebuild_tasks, and index_rebuilder for managing index rebuild processes.
    • Implemented _validate_index_mode for validating index mode strings.
    • Integrated migrate_to_v2 and migrate_to_v3 calls during initialization.
    • Added logic to automatically start index rebuilding if embedding_provider_id and active_index_provider_id differ on load.
    • Added default_index_mode parameter to create_kb and update_kb methods.
    • Implemented has_structured_docs_by_names, read_document_section, start_rebuild_index, and get_rebuild_progress methods.
    • Included metadata in retrieval results and modified _format_context to display structured document markers.
    • Added task cancellation logic to terminate for running rebuild tasks.
  • astrbot/core/knowledge_base/models.py
    • Added active_index_provider_id and default_index_mode fields to the KnowledgeBase model.
    • Added index_mode field to the KBDocument model.
    • Introduced a new DocSection model for storing structured document section information.
  • astrbot/core/knowledge_base/retrieval/manager.py
    • Included metadata in the RetrievalResult dictionary.
  • astrbot/core/knowledge_base/retrieval/rank_fusion.py
    • Added metadata field to the FusedResult class.
    • Updated the fuse method to pass metadata along with fused results.
  • astrbot/core/knowledge_base/retrieval/sparse_retriever.py
    • Added metadata field to the SparseResult class.
    • Updated the retrieve method to include metadata in sparse retrieval results.
  • astrbot/dashboard/routes/knowledge_base.py
    • Added a new GET route /kb/rebuild/progress for querying index rebuild status.
    • Added index_mode parameter to _background_upload_task and upload_document methods.
    • Included default_index_mode parameter in create_kb and update_kb API calls.
    • Modified update_kb response to optionally include rebuild_task_id.
    • Implemented get_rebuild_progress method to expose index rebuilding status via the dashboard API.
  • astrbot/dashboard/utils.py
    • Modified generate_tsne_visualization to use kb_helper.get_active_index_path() for locating the active FAISS index.
  • docs/knowledge-base-architecture.md
    • Added a new documentation file detailing the design and implementation of the knowledge base architecture enhancements.
  • tests/core/test_kb_architecture_refactor.py
    • Added a new test file containing unit tests for the refactored knowledge base architecture, including StructureParser, IndexRebuilder, KBSQLiteDatabase section handling, FaissVecDB index switching, and KBHelper structured uploads.

@kawayiYokami kawayiYokami changed the title Rag enhancement refactor: Rag enhancement Mar 9, 2026
@dosubot dosubot bot added the feature:knowledge-base The bug / feature is about knowledge base label Mar 9, 2026
@sourcery-ai sourcery-ai bot (Contributor) left a comment

Hey - I've found 4 issues, and left some high level feedback:

  • In IndexRebuilder.sync you build a new EmbeddingStorage and mutate it (delete/insert) but never call save_index/close on that instance before FaissVecDB.switch_index creates a second EmbeddingStorage on the same path, so the rebuilt index is never persisted to disk; consider either reusing the same storage in switch_index or explicitly saving/closing the temporary storage before switching.
  • The structured upload path in KBHelper._upload_document_structured calls upsert_doc_section in a loop, each opening its own transaction; this could be significantly faster and reduce DB overhead if you batch-insert or upsert all sections in a single session.begin() block.
  • In IndexRebuilder.sync you call provider.get_embeddings in a Python loop per batch instead of using the existing get_embeddings_batch API, which will limit throughput on large rebuilds; switching to the batched interface would align with the rest of the vector code and improve rebuild performance.
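As a sketch of the batching suggestion in the second bullet (hypothetical table schema; the real `doc_sections` model has more columns and goes through the async session layer), a single-transaction upsert could look like:

```python
import sqlite3

def upsert_sections_batch(conn: sqlite3.Connection, rows: list[tuple]) -> None:
    """Upsert every (doc_id, section_path, section_body) row in ONE
    transaction, instead of opening a transaction per section."""
    with conn:  # one BEGIN/COMMIT around the whole batch
        conn.executemany(
            """
            INSERT INTO doc_sections (doc_id, section_path, section_body)
            VALUES (?, ?, ?)
            ON CONFLICT(doc_id, section_path) DO UPDATE SET
                section_body = excluded.section_body
            """,
            rows,
        )
```

With SQLAlchemy the equivalent is wrapping all `session.add`/upsert calls in a single `session.begin()` block so the commit (and its fsync) happens once per document rather than once per section.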
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- In `IndexRebuilder.sync` you build a new `EmbeddingStorage` and mutate it (delete/insert) but never call `save_index`/`close` on that instance before `FaissVecDB.switch_index` creates a *second* `EmbeddingStorage` on the same path, so the rebuilt index is never persisted to disk; consider either reusing the same storage in `switch_index` or explicitly saving/closing the temporary storage before switching.
- The structured upload path in `KBHelper._upload_document_structured` calls `upsert_doc_section` in a loop, each opening its own transaction; this could be significantly faster and reduce DB overhead if you batch-insert or upsert all sections in a single `session.begin()` block.
- In `IndexRebuilder.sync` you call `provider.get_embeddings` in a Python loop per batch instead of using the existing `get_embeddings_batch` API, which will limit throughput on large rebuilds; switching to the batched interface would align with the rest of the vector code and improve rebuild performance.

## Individual Comments

### Comment 1
<location path="astrbot/core/knowledge_base/kb_db_sqlite.py" line_range="177-186" />
<code_context>
     ) -> None:
         """导出 FAISS 索引文件"""
         try:
-            index_path = kb_helper.kb_dir / "index.faiss"
-            if index_path.exists():
</code_context>
<issue_to_address>
**suggestion (bug_risk):** The v3 migration swallows all exceptions, which can hide real migration failures.

`migrate_to_v3` wraps each `ALTER`/`UPDATE` in `except Exception: pass`, which suppresses all failures (including syntax, permissions, or data issues) and can leave migrations partially applied with no signal.

Instead, only handle the expected idempotency cases explicitly (e.g. specific DB errors for duplicate/missing columns, like in `migrate_to_v2`), and log and re-raise any other exceptions so failed migrations are visible to operators.
</issue_to_address>
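A minimal sketch of the suggested pattern, assuming SQLite reports an already-added column via `OperationalError` with a "duplicate column" message (the real `migrate_to_v3` runs through the project's async session, not raw `sqlite3`):

```python
import logging
import sqlite3

logger = logging.getLogger(__name__)

def apply_migration_step(conn: sqlite3.Connection, ddl: str) -> None:
    """Run one migration statement; tolerate only the expected
    duplicate-column error, and log + re-raise everything else."""
    try:
        conn.execute(ddl)
    except sqlite3.OperationalError as e:
        if "duplicate column" in str(e).lower():
            return  # already applied on a previous run: idempotent
        logger.error("migration step failed: %s (%s)", ddl, e)
        raise
```

Re-running the same `ALTER TABLE` is then a no-op, while a genuinely broken statement (bad table name, permissions, corrupt data) still surfaces to operators instead of being silently swallowed.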

### Comment 2
<location path="astrbot/core/knowledge_base/kb_mgr.py" line_range="38" />
<code_context>
         self._session_deleted_callback_registered = False

         self.kb_insts: dict[str, KBHelper] = {}
+        self.rebuild_tasks: dict[str, dict] = {}
+        self.kb_rebuild_task_map: dict[str, str] = {}
+        self._running_rebuild_tasks: dict[str, asyncio.Task] = {}
</code_context>
<issue_to_address>
**issue (complexity):** Consider introducing a typed RebuildTask dataclass and central registries to replace ad-hoc dicts and closures in the index rebuild flow while keeping the external behavior unchanged.

By introducing a small, typed abstraction instead of relying on ad-hoc dicts and nested closures, you can significantly reduce the complexity of the rebuild flow while keeping existing behavior.

### 1. Replace weakly typed dicts with a `@dataclass`

Currently:

- `rebuild_tasks: dict[str, dict]`
- `kb_rebuild_task_map: dict[str, str]`
- `_running_rebuild_tasks: dict[str, asyncio.Task]`

all track different facets of the same concept via dicts with magic keys (`"status"`, `"stage"`, etc.). You can centralize this state in a dataclass with explicit fields:

```python
# somewhere near top-level
from dataclasses import dataclass, field
from typing import Optional, Literal

RebuildStatus = Literal["processing", "completed", "failed"]
RebuildStage = Literal["prepare", "finished"]

@dataclass
class RebuildTask:
    task_id: str
    kb_id: str
    provider_id: str
    status: RebuildStatus = "processing"
    stage: RebuildStage = "prepare"
    current: int = 0
    total: int = 0
    error: Optional[str] = None
    task: Optional[asyncio.Task] = field(default=None, repr=False)
```

Then, in `__init__`, use a registry keyed by `task_id` plus a thin kb→task mapping:

```python
self._rebuild_tasks: dict[str, RebuildTask] = {}
self._kb_to_task_id: dict[str, str] = {}
```

### 2. Simplify `start_rebuild_index` and progress updates

Refactor `start_rebuild_index` to operate on `RebuildTask` instead of free-form dicts, and avoid nested closures that capture outer state:

```python
async def start_rebuild_index(
    self,
    kb_id: str,
    new_provider_id: str,
    batch_size: int = 32,
) -> str:
    kb_helper = await self.get_kb(kb_id)
    if not kb_helper:
        raise ValueError("知识库不存在")

    existing_task_id = self._kb_to_task_id.get(kb_id)
    if existing_task_id:
        existing = self._rebuild_tasks.get(existing_task_id)
        if existing and existing.status == "processing":
            return existing_task_id

    task_id = str(uuid.uuid4())
    kb_helper.last_rebuild_task_id = task_id

    rebuild_task = RebuildTask(
        task_id=task_id,
        kb_id=kb_id,
        provider_id=new_provider_id,
    )
    self._kb_to_task_id[kb_id] = task_id
    self._rebuild_tasks[task_id] = rebuild_task

    async def progress(stage: str, current: int, total: int) -> None:
        t = self._rebuild_tasks.get(task_id)
        if not t:
            return
        t.stage = stage  # type: ignore[assignment]
        t.current = current
        t.total = total

    async def runner() -> None:
        try:
            await self.index_rebuilder.sync(
                kb_helper=kb_helper,
                new_provider_id=new_provider_id,
                batch_size=batch_size,
                progress_callback=progress,
            )
            t = self._rebuild_tasks.get(task_id)
            if t:
                t.status = "completed"
                t.stage = "finished"
        except Exception as e:
            logger.error(
                "Index rebuild failed for kb %s provider %s: %s",
                kb_id,
                new_provider_id,
                e,
            )
            logger.error(traceback.format_exc())
            t = self._rebuild_tasks.get(task_id)
            if t:
                t.status = "failed"
                t.error = str(e)
        finally:
            task = rebuild_task.task
            if task and not task.done():
                task.cancel()
            self._running_rebuild_tasks.pop(task_id, None)

    rebuild_task.task = asyncio.create_task(runner())
    self._running_rebuild_tasks[task_id] = rebuild_task.task
    return task_id
```

This preserves the same behavior while:

- Removing unstructured dicts and magic strings.
- Aggregating all state for one rebuild into a single object.
- Making the progress-callback logic clearer and safer.

### 3. Simplify `get_rebuild_progress`

With the dataclass in place, `get_rebuild_progress` no longer needs to manipulate dicts directly:

```python
def get_rebuild_progress(
    self,
    kb_id: str | None = None,
    task_id: str | None = None,
) -> dict | None:
    target_task_id = task_id or (kb_id and self._kb_to_task_id.get(kb_id))
    if not target_task_id:
        return None

    t = self._rebuild_tasks.get(target_task_id)
    if not t:
        return None

    # Keep the external API unchanged: return a dict
    return {
        "task_id": t.task_id,
        "kb_id": t.kb_id,
        "provider_id": t.provider_id,
        "status": t.status,
        "stage": t.stage,
        "current": t.current,
        "total": t.total,
        "error": t.error,
    }
```

This way:

- The internal model is strongly typed and centrally managed.
- The external API remains exactly the same (still returns a dict).

### 4. Clean up the rebuild-related parts of `terminate`

`terminate` can cancel based on the typed tasks:

```python
async def terminate(self) -> None:
    for t in list(self._rebuild_tasks.values()):
        if t.task and not t.task.done():
            t.task.cancel()
    self._running_rebuild_tasks.clear()

    # existing kb cleanup remains...
```

These focused changes unify job state and reduce the cognitive load of managing index rebuilds, while preserving all functionality (including the dict-based public progress interface and the automatic rebuild behavior).
</issue_to_address>

### Comment 3
<location path="astrbot/core/db/vec_db/faiss_impl/vec_db.py" line_range="222" />
<code_context>
+                metadata_filters=metadata_filters,
+            )
+
+    async def switch_index(
+        self,
+        index_store_path: str,
</code_context>
<issue_to_address>
**issue (complexity):** Consider tightening the critical section of `_index_switch_lock` in `switch_index` so it only protects the atomic swap of index-related fields, not the entire creation and teardown of storages.

By tightening the critical section in `switch_index`, you can reduce both complexity and lock contention without changing behavior.

Currently `switch_index` does *everything* under `_index_switch_lock`: creating the new storage (which can be slow) and closing the old one. This is exactly the problem the review points at: the lock is held across long-running I/O, serializing unrelated operations and making the lock's purpose harder to understand.

You can keep the semantics but hold the lock only for the *field swap*: create the new storage before acquiring the lock, and close the old storage after releasing it:

```python
async def switch_index(
    self,
    index_store_path: str,
    embedding_provider: EmbeddingProvider,
    rerank_provider: RerankProvider | None = None,
) -> None:
    """Hot-switch to a new FAISS index and embedding provider."""
    # 1. Create the new storage before taking the lock.
    try:
        new_storage = EmbeddingStorage(
            embedding_provider.get_dim(),
            index_store_path,
        )
    except Exception:
        # If creation fails, leave the old storage untouched.
        raise

    # 2. Hold the lock only for the atomic swap of references.
    async with self._index_switch_lock:
        old_storage = self.embedding_storage

        self.index_store_path = index_store_path
        self.embedding_provider = embedding_provider
        self.rerank_provider = rerank_provider
        self.embedding_storage = new_storage

    # 3. Close the old storage after releasing the lock.
    if old_storage:
        await old_storage.close()
```

This:

- Keeps all other methods serialized against index switches (no behavior change).
- Makes the swap of `embedding_storage`/`embedding_provider` atomic and easy to reason about.

while removing unnecessary lock contention and conceptual complexity: `_index_switch_lock` now clearly protects only the pointers that say "which index is in use", rather than the full lifecycle of storage creation and teardown.
</issue_to_address>

### Comment 4
<location path="astrbot/core/knowledge_base/kb_helper.py" line_range="387" />
<code_context>

             raise e

+    async def _upload_document_structured(
+        self,
+        file_name: str,
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting the shared embedding and indexing workflow into a dedicated helper so that the flat and structured upload paths both reuse the same logic instead of duplicating it.

You can reduce branching and duplication by extracting a small "embed + persist + stats/refresh" pipeline helper that both the flat and structured paths call.

Currently, both `upload_document` and `_upload_document_structured`:

- generate a `doc_id`
- build `chunk_ids`, `contents`, and `metadatas`
- call `get_ep()` and fetch embeddings in batches with progress reporting
- write to `vec_db.document_storage` and `vec_db.embedding_storage`
- create and persist a `KBDocument`
- update knowledge base stats and refresh the KB/document

Only the logic for building `contents` + `metadatas` + sections differs. You can wrap the common tail logic in a helper and keep the mode-specific logic lean.

For example:

```python
async def _index_chunks(
    self,
    *,
    doc_id: str,
    contents: list[str],
    metadatas: list[dict],
    index_mode: str,
    file_name: str,
    file_type: str,
    file_size: int,
    batch_size: int,
    tasks_limit: int,
    max_retries: int,
    progress_callback=None,
) -> KBDocument:
    await self._ensure_vec_db()
    ep = await self.get_ep()

    async def embedding_progress(current: int, total: int) -> None:
        if progress_callback:
            await progress_callback("embedding", current, total)

    vectors = await ep.get_embeddings_batch(
        contents,
        batch_size=batch_size,
        tasks_limit=tasks_limit,
        max_retries=max_retries,
        progress_callback=embedding_progress if progress_callback else None,
    )

    chunk_ids = [str(uuid.uuid4()) for _ in contents]
    int_ids = await self.vec_db.document_storage.insert_documents_batch(
        chunk_ids, contents, metadatas,
    )
    await self.vec_db.embedding_storage.insert_batch(
        vectors=np.array(vectors, dtype=np.float32),
        ids=int_ids,
    )

    kb_doc = KBDocument(
        doc_id=doc_id,
        kb_id=self.kb.kb_id,
        doc_name=file_name,
        file_type=file_type,
        file_size=file_size,
        file_path="",
        index_mode=index_mode,
        chunk_count=len(contents),
        media_count=0,
    )
    async with self.kb_db.get_db() as session:
        async with session.begin():
            session.add(kb_doc)
        await session.refresh(kb_doc)

    await self.kb_db.update_kb_stats(kb_id=self.kb.kb_id, vec_db=self.vec_db)  # type: ignore[arg-type]
    await self.refresh_kb()
    await self.refresh_document(doc_id)
    return kb_doc
```

Then each mode is mainly responsible for preparing its inputs:

**Structured mode:**

```python
async def _upload_document_structured(...):
    if file_content is None:
        raise ValueError("结构化上传需要 file_content")

    doc_id = str(uuid.uuid4())
    parser = await select_parser(f".{file_type}")
    if progress_callback:
        await progress_callback("parsing", 0, 100)
    parse_result = await parser.parse(file_content, file_name)
    if progress_callback:
        await progress_callback("parsing", 100, 100)

    structure_parser = StructureParser()
    if progress_callback:
        await progress_callback("chunking", 0, 100)
    nodes = await structure_parser.parse_structure(parse_result.text, file_type)
    sections = structure_parser.flatten(nodes)
    if not sections:
        logger.warning("Structured parse failed, fallback to flat mode for %s", file_name)
        return await self.upload_document(
            file_name=file_name,
            file_content=file_content,
            file_type=file_type,
            index_mode="flat",
            batch_size=batch_size,
            tasks_limit=tasks_limit,
            max_retries=max_retries,
            progress_callback=progress_callback,
        )

    contents = [section.path for section in sections]
    metadatas = [
        {
            "kb_id": self.kb.kb_id,
            "kb_doc_id": doc_id,
            "chunk_index": i,
            "index_mode": "structure",
            "section_path": section.path,
            "section_level": section.level,
        }
        for i, section in enumerate(sections)
    ]
    if progress_callback:
        await progress_callback("chunking", 100, 100)

    kb_doc = await self._index_chunks(
        doc_id=doc_id,
        contents=contents,
        metadatas=metadatas,
        index_mode="structure",
        file_name=file_name,
        file_type=file_type,
        file_size=len(file_content),
        batch_size=batch_size,
        tasks_limit=tasks_limit,
        max_retries=max_retries,
        progress_callback=progress_callback,
    )

    for i, section in enumerate(sections):
        await self.kb_db.upsert_doc_section(
            DocSection(
                doc_id=doc_id,
                kb_id=self.kb.kb_id,
                section_path=section.path,
                section_level=section.level,
                section_title=section.title,
                section_body=section.body,
                parent_section_id=None,
                sort_order=i,
            ),
        )
    return kb_doc
```

**Flat mode:** in `upload_document`, once you have built `chunks_text` and the corresponding flat metadatas:

```python
contents = chunks_text
metadatas = [
    {
        "kb_id": self.kb.kb_id,
        "kb_doc_id": doc_id,
        "chunk_index": i,
        "index_mode": "flat",
    }
    for i, _ in enumerate(chunks_text)
]

kb_doc = await self._index_chunks(
    doc_id=doc_id,
    contents=contents,
    metadatas=metadatas,
    index_mode="flat",
    file_name=file_name,
    file_type=file_type,
    file_size=file_size,
    batch_size=batch_size,
    tasks_limit=tasks_limit,
    max_retries=max_retries,
    progress_callback=progress_callback,
)
```

This keeps all current behavior (`index_mode`, structured sections, progress reporting) while centralizing the critical "indexing pipeline" in one place. Future changes (e.g. swapping the vector DB, adjusting the metadata structure, or changing stats/refresh behavior) only need to touch `_index_chunks`, which lowers complexity and reduces the risk of subtle drift between the flat and structured paths.
</issue_to_address>

Original comment in English

Hey - I've found 4 issues, and left some high level feedback:

  • In IndexRebuilder.sync you build a new EmbeddingStorage and mutate it (delete/insert) but never call save_index/close on that instance before FaissVecDB.switch_index creates a second EmbeddingStorage on the same path, so the rebuilt index is never persisted to disk; consider either reusing the same storage in switch_index or explicitly saving/closing the temporary storage before switching.
  • The structured upload path in KBHelper._upload_document_structured calls upsert_doc_section in a loop, each opening its own transaction; this could be significantly faster and reduce DB overhead if you batch-insert or upsert all sections in a single session.begin() block.
  • In IndexRebuilder.sync you call provider.get_embeddings in a Python loop per batch instead of using the existing get_embeddings_batch API, which will limit throughput on large rebuilds; switching to the batched interface would align with the rest of the vector code and improve rebuild performance.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- In `IndexRebuilder.sync` you build a new `EmbeddingStorage` and mutate it (delete/insert) but never call `save_index`/`close` on that instance before `FaissVecDB.switch_index` creates a *second* `EmbeddingStorage` on the same path, so the rebuilt index is never persisted to disk; consider either reusing the same storage in `switch_index` or explicitly saving/closing the temporary storage before switching.
- The structured upload path in `KBHelper._upload_document_structured` calls `upsert_doc_section` in a loop, each opening its own transaction; this could be significantly faster and reduce DB overhead if you batch-insert or upsert all sections in a single `session.begin()` block.
- In `IndexRebuilder.sync` you call `provider.get_embeddings` in a Python loop per batch instead of using the existing `get_embeddings_batch` API, which will limit throughput on large rebuilds; switching to the batched interface would align with the rest of the vector code and improve rebuild performance.

## Individual Comments

### Comment 1
<location path="astrbot/core/knowledge_base/kb_db_sqlite.py" line_range="177-186" />
<code_context>
     ) -> None:
         """导出 FAISS 索引文件"""
         try:
-            index_path = kb_helper.kb_dir / "index.faiss"
-            if index_path.exists():
</code_context>
<issue_to_address>
**suggestion (bug_risk):** v3 migration swallows all exceptions, which can hide real migration failures.

`migrate_to_v3` wraps each `ALTER`/`UPDATE` in `except Exception: pass`, which suppresses all failures (including syntax, permissions, or data issues) and can leave migrations partially applied with no signal.

Instead, only handle the expected idempotency cases explicitly (e.g. specific DB errors for duplicate/missing columns, like in `migrate_to_v2`), and log + re-raise any other exceptions so failed migrations are visible to operators.
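As a standalone illustration of that pattern with plain `sqlite3` (the table and column here are hypothetical, not the real knowledge-base schema): tolerate only the known "duplicate column" error so reruns are idempotent, and let everything else propagate.

```python
import sqlite3


def add_column_idempotent(conn: sqlite3.Connection, table: str, column_def: str) -> None:
    # Apply ALTER TABLE, tolerating only the "column already exists" case;
    # any other failure propagates so operators can see it.
    try:
        conn.execute(f"ALTER TABLE {table} ADD COLUMN {column_def}")
    except sqlite3.OperationalError as e:
        if "duplicate column name" not in str(e):
            raise  # real failure: surface it instead of swallowing


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE kb (id TEXT)")
add_column_idempotent(conn, "kb", "index_mode TEXT DEFAULT 'flat'")
add_column_idempotent(conn, "kb", "index_mode TEXT DEFAULT 'flat'")  # no-op on rerun
cols = [row[1] for row in conn.execute("PRAGMA table_info(kb)")]
print(cols)  # → ['id', 'index_mode']
```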
</issue_to_address>

### Comment 2
<location path="astrbot/core/knowledge_base/kb_mgr.py" line_range="38" />
<code_context>
         self._session_deleted_callback_registered = False

         self.kb_insts: dict[str, KBHelper] = {}
+        self.rebuild_tasks: dict[str, dict] = {}
+        self.kb_rebuild_task_map: dict[str, str] = {}
+        self._running_rebuild_tasks: dict[str, asyncio.Task] = {}
</code_context>
<issue_to_address>
**issue (complexity):** Consider introducing a typed RebuildTask dataclass and central registries to replace ad‑hoc dicts and closures in the index rebuild flow while keeping the external behavior unchanged.

You can significantly reduce complexity in the rebuild flow by introducing a small typed abstraction instead of ad‑hoc dicts and nested closures, while keeping all behavior intact.

### 1. Replace loosely‑typed dicts with a `@dataclass`

Right now:

- `rebuild_tasks: dict[str, dict]`
- `kb_rebuild_task_map: dict[str, str]`
- `_running_rebuild_tasks: dict[str, asyncio.Task]`

all track aspects of the same concept using dicts with magic keys (`"status"`, `"stage"`, etc.). You can centralize this into a dataclass with explicit fields:

```python
# somewhere near top-level
from dataclasses import dataclass, field
from typing import Optional, Literal

RebuildStatus = Literal["processing", "completed", "failed"]
RebuildStage = Literal["prepare", "finished"]

@dataclass
class RebuildTask:
    task_id: str
    kb_id: str
    provider_id: str
    status: RebuildStatus = "processing"
    stage: RebuildStage = "prepare"
    current: int = 0
    total: int = 0
    error: Optional[str] = None
    task: Optional[asyncio.Task] = field(default=None, repr=False)
```

Then, in `__init__`, use a single registry keyed by `task_id` plus a thin kb→task mapping:

```python
self._rebuild_tasks: dict[str, RebuildTask] = {}
self._kb_to_task_id: dict[str, str] = {}
```

### 2. Simplify `start_rebuild_index` and progress updates

Refactor `start_rebuild_index` to work with `RebuildTask` instead of free‑form dicts and to avoid nested closures capturing outer state:

```python
async def start_rebuild_index(
    self,
    kb_id: str,
    new_provider_id: str,
    batch_size: int = 32,
) -> str:
    kb_helper = await self.get_kb(kb_id)
    if not kb_helper:
        raise ValueError("知识库不存在")

    existing_task_id = self._kb_to_task_id.get(kb_id)
    if existing_task_id:
        existing = self._rebuild_tasks.get(existing_task_id)
        if existing and existing.status == "processing":
            return existing_task_id

    task_id = str(uuid.uuid4())
    kb_helper.last_rebuild_task_id = task_id

    rebuild_task = RebuildTask(
        task_id=task_id,
        kb_id=kb_id,
        provider_id=new_provider_id,
    )
    self._kb_to_task_id[kb_id] = task_id
    self._rebuild_tasks[task_id] = rebuild_task

    async def progress(stage: str, current: int, total: int) -> None:
        t = self._rebuild_tasks.get(task_id)
        if not t:
            return
        t.stage = stage  # type: ignore[assignment]
        t.current = current
        t.total = total

    async def runner() -> None:
        try:
            await self.index_rebuilder.sync(
                kb_helper=kb_helper,
                new_provider_id=new_provider_id,
                batch_size=batch_size,
                progress_callback=progress,
            )
            t = self._rebuild_tasks.get(task_id)
            if t:
                t.status = "completed"
                t.stage = "finished"
        except Exception as e:
            logger.error(
                "Index rebuild failed for kb %s provider %s: %s",
                kb_id,
                new_provider_id,
                e,
            )
            logger.error(traceback.format_exc())
            t = self._rebuild_tasks.get(task_id)
            if t:
                t.status = "failed"
                t.error = str(e)
        finally:
            # the runner must not cancel its own task; just drop it from the registry
            self._running_rebuild_tasks.pop(task_id, None)

    rebuild_task.task = asyncio.create_task(runner())
    self._running_rebuild_tasks[task_id] = rebuild_task.task
    return task_id
```

This keeps the same behavior but:

- Removes unstructured dicts and magic strings.
- Groups all state about a rebuild into one object.
- Makes the progress callback logic clearer and safer.

### 3. Simplify `get_rebuild_progress`

With the dataclass, `get_rebuild_progress` no longer needs to manipulate dicts directly:

```python
def get_rebuild_progress(
    self,
    kb_id: str | None = None,
    task_id: str | None = None,
) -> dict | None:
    target_task_id = task_id or (kb_id and self._kb_to_task_id.get(kb_id))
    if not target_task_id:
        return None

    t = self._rebuild_tasks.get(target_task_id)
    if not t:
        return None

    # Keep external API unchanged: return a dict
    return {
        "task_id": t.task_id,
        "kb_id": t.kb_id,
        "provider_id": t.provider_id,
        "status": t.status,
        "stage": t.stage,
        "current": t.current,
        "total": t.total,
        "error": t.error,
    }
```

This way:

- The internal model is strongly typed and centralized.
- The external API remains exactly as it was (still returns a dict).

### 4. Clean up `terminate` for rebuild tasks

`terminate` can cancel using the typed tasks:

```python
async def terminate(self) -> None:
    for t in list(self._rebuild_tasks.values()):
        if t.task and not t.task.done():
            t.task.cancel()
    self._running_rebuild_tasks.clear()

    # existing kb cleanup remains...
```

These focused changes keep all functionality (including the public dict-based progress interface and auto‑rebuild behavior) while consolidating job state and reducing the cognitive load around rebuild index management.
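A condensed, runnable check of the dataclass sketch above (the `runner` body is a trivial stand-in for `index_rebuilder.sync`; only the state-transition pattern is the point):

```python
import asyncio
from dataclasses import dataclass, field
from typing import Literal, Optional


@dataclass
class RebuildTask:
    task_id: str
    status: Literal["processing", "completed", "failed"] = "processing"
    current: int = 0
    total: int = 0
    task: Optional[asyncio.Task] = field(default=None, repr=False)


async def main() -> tuple[str, int]:
    t = RebuildTask("t1")

    async def runner() -> None:
        # mutate the shared task object, as the progress callback does above
        t.current, t.total = 5, 5
        t.status = "completed"

    t.task = asyncio.create_task(runner())
    await t.task
    return t.status, t.current


status, current = asyncio.run(main())
print(status, current)  # → completed 5
```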
</issue_to_address>

### Comment 3
<location path="astrbot/core/db/vec_db/faiss_impl/vec_db.py" line_range="222" />
<code_context>
+                metadata_filters=metadata_filters,
+            )
+
+    async def switch_index(
+        self,
+        index_store_path: str,
</code_context>
<issue_to_address>
**issue (complexity):** Consider tightening the `_index_switch_lock` critical section in `switch_index` so it only guards the atomic swap of index-related fields instead of the whole storage creation and teardown process.

You can reduce both complexity and contention without changing behavior by tightening the critical section in `switch_index`.

Currently, `switch_index` does *everything* under `_index_switch_lock`: creating the new storage (which may be slow) and closing the old one. That’s exactly what the reviewer is pointing at: the lock is held across long-running I/O, serializing unrelated operations and making the lock’s purpose harder to reason about.

You can keep the same semantics but only hold the lock around the *field swap*; new storage is created before acquiring the lock, and old storage is closed after releasing it:

```python
async def switch_index(
    self,
    index_store_path: str,
    embedding_provider: EmbeddingProvider,
    rerank_provider: RerankProvider | None = None,
) -> None:
    """Hot-switch to a new FAISS index and embedding provider."""
    # 1. Create new storage *before* taking the lock.
    # If creation raises, we simply never swap, so the old storage stays intact.
    new_storage = EmbeddingStorage(
        embedding_provider.get_dim(),
        index_store_path,
    )

    # 2. Only keep the lock for the atomic swap of references.
    async with self._index_switch_lock:
        old_storage = self.embedding_storage

        self.index_store_path = index_store_path
        self.embedding_provider = embedding_provider
        self.rerank_provider = rerank_provider
        self.embedding_storage = new_storage

    # 3. Close the old storage after releasing the lock.
    if old_storage:
        await old_storage.close()
```

This keeps:

- All other methods still serialized vs. index switches (no functional change).
- The swap of `embedding_storage`/`embedding_provider` atomic and easy to reason about.

But it removes unnecessary lock contention and reduces conceptual complexity: `_index_switch_lock` now clearly protects just the “which index are we using?” pointers, not the entire lifecycle of storage creation and teardown.
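The pattern is easy to verify in isolation. In this standalone sketch, `Storage` and `VecDB` are stubs standing in for `EmbeddingStorage` and `FaissVecDB`; the lock guards only the pointer swap, with creation and teardown outside it:

```python
import asyncio


class Storage:
    """Stand-in for EmbeddingStorage; real construction/teardown do disk I/O."""

    def __init__(self, path: str) -> None:
        self.path = path
        self.closed = False

    async def close(self) -> None:
        self.closed = True


class VecDB:
    def __init__(self) -> None:
        self._index_switch_lock = asyncio.Lock()
        self.embedding_storage = Storage("old.faiss")

    async def switch_index(self, path: str) -> Storage:
        new_storage = Storage(path)           # slow creation, outside the lock
        async with self._index_switch_lock:   # lock guards only the swap
            old_storage = self.embedding_storage
            self.embedding_storage = new_storage
        await old_storage.close()             # teardown, outside the lock
        return old_storage


async def main() -> tuple[str, bool]:
    db = VecDB()
    old = await db.switch_index("new.faiss")
    return db.embedding_storage.path, old.closed


current_path, old_closed = asyncio.run(main())
print(current_path, old_closed)  # → new.faiss True
```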
</issue_to_address>

### Comment 4
<location path="astrbot/core/knowledge_base/kb_helper.py" line_range="387" />
<code_context>

             raise e

+    async def _upload_document_structured(
+        self,
+        file_name: str,
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting the shared embedding and indexing workflow into a dedicated helper so both flat and structured upload paths reuse the same logic instead of duplicating it.

You can reduce the branching/duplication by extracting the common “embed + persist + stats/refresh” pipeline into a small helper that both flat and structured paths call.

Right now, both `upload_document` and `_upload_document_structured`:

- Generate `doc_id`
- Build `chunk_ids`, `contents`, `metadatas`
- Call `get_ep()` and do batched embeddings with progress
- Write to `vec_db.document_storage` and `vec_db.embedding_storage`
- Create and persist a `KBDocument`
- Update KB stats and refresh KB/doc

Only the “how to build `contents` + `metadatas` + sections” differs. You can encapsulate the common tail in a helper and keep the mode-specific logic small.

For example:

```python
async def _index_chunks(
    self,
    *,
    doc_id: str,
    contents: list[str],
    metadatas: list[dict],
    index_mode: str,
    file_name: str,
    file_type: str,
    file_size: int,
    batch_size: int,
    tasks_limit: int,
    max_retries: int,
    progress_callback=None,
) -> KBDocument:
    await self._ensure_vec_db()
    ep = await self.get_ep()

    async def embedding_progress(current: int, total: int) -> None:
        if progress_callback:
            await progress_callback("embedding", current, total)

    vectors = await ep.get_embeddings_batch(
        contents,
        batch_size=batch_size,
        tasks_limit=tasks_limit,
        max_retries=max_retries,
        progress_callback=embedding_progress if progress_callback else None,
    )

    chunk_ids = [str(uuid.uuid4()) for _ in contents]
    int_ids = await self.vec_db.document_storage.insert_documents_batch(
        chunk_ids, contents, metadatas,
    )
    await self.vec_db.embedding_storage.insert_batch(
        vectors=np.array(vectors, dtype=np.float32),
        ids=int_ids,
    )

    kb_doc = KBDocument(
        doc_id=doc_id,
        kb_id=self.kb.kb_id,
        doc_name=file_name,
        file_type=file_type,
        file_size=file_size,
        file_path="",
        index_mode=index_mode,
        chunk_count=len(contents),
        media_count=0,
    )
    async with self.kb_db.get_db() as session:
        async with session.begin():
            session.add(kb_doc)
        await session.refresh(kb_doc)

    await self.kb_db.update_kb_stats(kb_id=self.kb.kb_id, vec_db=self.vec_db)  # type: ignore[arg-type]
    await self.refresh_kb()
    await self.refresh_document(doc_id)
    return kb_doc
```

Then the two modes become mostly about preparing inputs:

**Structured:**

```python
async def _upload_document_structured(...):
    if file_content is None:
        raise ValueError("结构化上传需要 file_content")

    doc_id = str(uuid.uuid4())
    parser = await select_parser(f".{file_type}")
    if progress_callback:
        await progress_callback("parsing", 0, 100)
    parse_result = await parser.parse(file_content, file_name)
    if progress_callback:
        await progress_callback("parsing", 100, 100)

    structure_parser = StructureParser()
    if progress_callback:
        await progress_callback("chunking", 0, 100)
    nodes = await structure_parser.parse_structure(parse_result.text, file_type)
    sections = structure_parser.flatten(nodes)
    if not sections:
        logger.warning("Structured parse failed, fallback to flat mode for %s", file_name)
        return await self.upload_document(
            file_name=file_name,
            file_content=file_content,
            file_type=file_type,
            index_mode="flat",
            batch_size=batch_size,
            tasks_limit=tasks_limit,
            max_retries=max_retries,
            progress_callback=progress_callback,
        )

    contents = [section.path for section in sections]
    metadatas = [
        {
            "kb_id": self.kb.kb_id,
            "kb_doc_id": doc_id,
            "chunk_index": i,
            "index_mode": "structure",
            "section_path": section.path,
            "section_level": section.level,
        }
        for i, section in enumerate(sections)
    ]
    if progress_callback:
        await progress_callback("chunking", 100, 100)

    kb_doc = await self._index_chunks(
        doc_id=doc_id,
        contents=contents,
        metadatas=metadatas,
        index_mode="structure",
        file_name=file_name,
        file_type=file_type,
        file_size=len(file_content),
        batch_size=batch_size,
        tasks_limit=tasks_limit,
        max_retries=max_retries,
        progress_callback=progress_callback,
    )

    for i, section in enumerate(sections):
        await self.kb_db.upsert_doc_section(
            DocSection(
                doc_id=doc_id,
                kb_id=self.kb.kb_id,
                section_path=section.path,
                section_level=section.level,
                section_title=section.title,
                section_body=section.body,
                parent_section_id=None,
                sort_order=i,
            ),
        )
    return kb_doc
```

**Flat:**

Inside `upload_document`, after you’ve built `chunks_text` and flat metadatas:

```python
contents = chunks_text
metadatas = [
    {
        "kb_id": self.kb.kb_id,
        "kb_doc_id": doc_id,
        "chunk_index": i,
        "index_mode": "flat",
    }
    for i, _ in enumerate(chunks_text)
]

kb_doc = await self._index_chunks(
    doc_id=doc_id,
    contents=contents,
    metadatas=metadatas,
    index_mode="flat",
    file_name=file_name,
    file_type=file_type,
    file_size=file_size,
    batch_size=batch_size,
    tasks_limit=tasks_limit,
    max_retries=max_retries,
    progress_callback=progress_callback,
)
```

This keeps all current behavior (`index_mode`, structured sections, progress reporting) but concentrates the critical “indexing pipeline” into one place. Future changes (e.g., switching vector DB, changing metadata shape, altering stats/refresh behavior) will only need to be done in `_index_chunks`, reducing complexity and the chance of subtle drift between flat and structured paths.
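Relatedly, the per-section `upsert_doc_section` loop flagged in the overall comments can run as one transaction. A standalone `sqlite3` sketch of that batching (the table and columns are illustrative, not the real schema):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE doc_section (doc_id TEXT, sort_order INTEGER, body TEXT)"
)

sections = [("doc1", i, f"body {i}") for i in range(3)]
with conn:  # one transaction for the whole batch, not one per section
    conn.executemany(
        "INSERT INTO doc_section (doc_id, sort_order, body) VALUES (?, ?, ?)",
        sections,
    )

count = conn.execute("SELECT COUNT(*) FROM doc_section").fetchone()[0]
print(count)  # → 3
```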
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.



raise e

async def _upload_document_structured(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (complexity): 建议将共享的向量化和索引工作流抽取为一个专用 helper,使“平铺上传”和“结构化上传”都复用同一套逻辑,而不是各自重复实现。

你可以通过抽取一个小的“embed + 持久化 + 统计/刷新”流水线 helper,让平铺和结构化两种路径都调用它,从而减少分支和重复。

目前,upload_document_upload_document_structured 都会:

  • 生成 doc_id
  • 构建 chunk_idscontentsmetadatas
  • 调用 get_ep() 并带进度地批量获取 embeddings
  • 写入 vec_db.document_storagevec_db.embedding_storage
  • 创建并持久化 KBDocument
  • 更新知识库统计并刷新 KB/文档

只有“如何构建 contents + metadatas + sections” 的逻辑不同。你可以把公共的尾部逻辑封装到一个 helper 中,让模式相关的逻辑保持精简。

例如:

async def _index_chunks(
    self,
    *,
    doc_id: str,
    contents: list[str],
    metadatas: list[dict],
    index_mode: str,
    file_name: str,
    file_type: str,
    file_size: int,
    batch_size: int,
    tasks_limit: int,
    max_retries: int,
    progress_callback=None,
) -> KBDocument:
    await self._ensure_vec_db()
    ep = await self.get_ep()

    async def embedding_progress(current: int, total: int) -> None:
        if progress_callback:
            await progress_callback("embedding", current, total)

    vectors = await ep.get_embeddings_batch(
        contents,
        batch_size=batch_size,
        tasks_limit=tasks_limit,
        max_retries=max_retries,
        progress_callback=embedding_progress if progress_callback else None,
    )

    chunk_ids = [str(uuid.uuid4()) for _ in contents]
    int_ids = await self.vec_db.document_storage.insert_documents_batch(
        chunk_ids, contents, metadatas,
    )
    await self.vec_db.embedding_storage.insert_batch(
        vectors=np.array(vectors, dtype=np.float32),
        ids=int_ids,
    )

    kb_doc = KBDocument(
        doc_id=doc_id,
        kb_id=self.kb.kb_id,
        doc_name=file_name,
        file_type=file_type,
        file_size=file_size,
        file_path="",
        index_mode=index_mode,
        chunk_count=len(contents),
        media_count=0,
    )
    async with self.kb_db.get_db() as session:
        async with session.begin():
            session.add(kb_doc)
        await session.refresh(kb_doc)

    await self.kb_db.update_kb_stats(kb_id=self.kb.kb_id, vec_db=self.vec_db)  # type: ignore[arg-type]
    await self.refresh_kb()
    await self.refresh_document(doc_id)
    return kb_doc

然后,两种模式主要只负责准备输入:

结构化模式:

async def _upload_document_structured(...):
    if file_content is None:
        raise ValueError("结构化上传需要 file_content")

    doc_id = str(uuid.uuid4())
    parser = await select_parser(f".{file_type}")
    if progress_callback:
        await progress_callback("parsing", 0, 100)
    parse_result = await parser.parse(file_content, file_name)
    if progress_callback:
        await progress_callback("parsing", 100, 100)

    structure_parser = StructureParser()
    if progress_callback:
        await progress_callback("chunking", 0, 100)
    nodes = await structure_parser.parse_structure(parse_result.text, file_type)
    sections = structure_parser.flatten(nodes)
    if not sections:
        logger.warning("Structured parse failed, fallback to flat mode for %s", file_name)
        return await self.upload_document(
            file_name=file_name,
            file_content=file_content,
            file_type=file_type,
            index_mode="flat",
            batch_size=batch_size,
            tasks_limit=tasks_limit,
            max_retries=max_retries,
            progress_callback=progress_callback,
        )

    contents = [section.path for section in sections]
    metadatas = [
        {
            "kb_id": self.kb.kb_id,
            "kb_doc_id": doc_id,
            "chunk_index": i,
            "index_mode": "structure",
            "section_path": section.path,
            "section_level": section.level,
        }
        for i, section in enumerate(sections)
    ]
    if progress_callback:
        await progress_callback("chunking", 100, 100)

    kb_doc = await self._index_chunks(
        doc_id=doc_id,
        contents=contents,
        metadatas=metadatas,
        index_mode="structure",
        file_name=file_name,
        file_type=file_type,
        file_size=len(file_content),
        batch_size=batch_size,
        tasks_limit=tasks_limit,
        max_retries=max_retries,
        progress_callback=progress_callback,
    )

    for i, section in enumerate(sections):
        await self.kb_db.upsert_doc_section(
            DocSection(
                doc_id=doc_id,
                kb_id=self.kb.kb_id,
                section_path=section.path,
                section_level=section.level,
                section_title=section.title,
                section_body=section.body,
                parent_section_id=None,
                sort_order=i,
            ),
        )
    return kb_doc

平铺模式:

upload_document 中,当你已经构建好 chunks_text 和对应的平铺 metadatas 后:

contents = chunks_text
metadatas = [
    {
        "kb_id": self.kb.kb_id,
        "kb_doc_id": doc_id,
        "chunk_index": i,
        "index_mode": "flat",
    }
    for i, _ in enumerate(chunks_text)
]

kb_doc = await self._index_chunks(
    doc_id=doc_id,
    contents=contents,
    metadatas=metadatas,
    index_mode="flat",
    file_name=file_name,
    file_type=file_type,
    file_size=file_size,
    batch_size=batch_size,
    tasks_limit=tasks_limit,
    max_retries=max_retries,
    progress_callback=progress_callback,
)

This keeps all current behavior (index_mode, structured sections, progress reporting) while concentrating the critical indexing pipeline in one place. Future changes (e.g., switching the vector DB, changing the metadata shape, altering stats/refresh behavior) only need to touch _index_chunks, reducing complexity and the chance of subtle drift between the flat and structured paths.
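The shape of that refactor can be sketched with a stripped-down toy (hypothetical names, no real embedding or storage): both upload modes reduce to input preparation plus one call into the shared helper.

```python
# Toy illustration (not the actual AstrBot code): both upload modes only
# prepare (contents, metadatas) and delegate to one shared indexing helper.

def index_chunks(contents, metadatas, index_mode):
    # Stand-in for the real embed + persist + stats/refresh tail.
    return {"mode": index_mode, "chunks": len(contents), "metadatas": metadatas}

def upload_flat(chunks_text):
    metas = [{"chunk_index": i, "index_mode": "flat"} for i in range(len(chunks_text))]
    return index_chunks(chunks_text, metas, "flat")

def upload_structured(sections):
    # sections: list of (path, level) pairs, as produced by the structure parser
    metas = [
        {"chunk_index": i, "index_mode": "structure",
         "section_path": path, "section_level": level}
        for i, (path, level) in enumerate(sections)
    ]
    return index_chunks([path for path, _ in sections], metas, "structure")

print(upload_flat(["chunk a", "chunk b"])["chunks"])        # 2
print(upload_structured([("Chapter 1 > API", 2)])["mode"])  # structure
```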


Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request introduces a significant and well-designed enhancement to the knowledge base (RAG) architecture. The key features, including provider-isolated FAISS indexes for hot-switching embedding models, structured document indexing, and safe online index rebuilding, are impressive and add a lot of flexibility. The addition of detailed architecture documentation and comprehensive tests is also highly commendable.

My review has identified a few minor issues:

  • A potential for silent failures in the database migration logic due to overly broad exception handling.
  • An inconsistency in the path separator used for structured documents between the implementation and the documentation.
  • A small typo in the new architecture document.

Overall, this is an excellent contribution that greatly improves the knowledge base system's capabilities and robustness. The provided feedback aims to polish these final details.

Note: Security Review did not run due to the size of the PR.

Comment on lines +259 to +306
async def migrate_to_v3(self) -> None:
    """Perform the knowledge base database v3 migration.

    Changes:
    - knowledge_bases.default_index_mode
    - kb_documents.index_mode
    """
    async with self.get_db() as session:
        session: AsyncSession
        async with session.begin():
            try:
                await session.execute(
                    text(
                        "ALTER TABLE knowledge_bases "
                        "ADD COLUMN default_index_mode VARCHAR(20) DEFAULT 'flat'",
                    ),
                )
            except Exception:
                pass
            try:
                await session.execute(
                    text(
                        "ALTER TABLE kb_documents "
                        "ADD COLUMN index_mode VARCHAR(20) DEFAULT 'flat'",
                    ),
                )
            except Exception:
                pass
            try:
                await session.execute(
                    text(
                        "UPDATE knowledge_bases SET default_index_mode = 'flat' "
                        "WHERE default_index_mode IS NULL OR default_index_mode = ''",
                    ),
                )
            except Exception:
                pass
            try:
                await session.execute(
                    text(
                        "UPDATE kb_documents SET index_mode = 'flat' "
                        "WHERE index_mode IS NULL OR index_mode = ''",
                    ),
                )
            except Exception:
                pass
            await session.commit()

medium

The try...except Exception: pass blocks in this migration function are too broad and can silently swallow unexpected errors, making debugging difficult. For robustness, please consider adopting the pattern from migrate_to_v2, where specific exceptions like (ProgrammingError, OperationalError) are caught and inspected. This ensures that only expected errors (like a column already existing) are ignored, while other potential issues are properly logged or raised.
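A minimal, self-contained sketch of that narrower pattern (helper names are hypothetical; in the real code the check would wrap `session.execute` and inspect SQLAlchemy's `ProgrammingError`/`OperationalError`): only the expected "duplicate column" failure is swallowed, and everything else propagates.

```python
def is_duplicate_column_error(exc: Exception) -> bool:
    # SQLite reports a re-applied ALTER TABLE as "duplicate column name: ...";
    # treat only that as a benign, already-migrated state.
    return "duplicate column" in str(exc).lower()

def run_migration_step(execute, statement: str) -> None:
    # Hypothetical helper: run one DDL statement, ignore only the expected
    # "column already exists" error, and re-raise anything unexpected.
    try:
        execute(statement)
    except Exception as exc:
        if not is_duplicate_column_error(exc):
            raise

# Simulated executor standing in for session.execute(text(...)).
applied = []

def fake_execute(stmt: str) -> None:
    if applied:  # second attempt: the column already exists
        raise RuntimeError("duplicate column name: index_mode")
    applied.append(stmt)

ddl = "ALTER TABLE kb_documents ADD COLUMN index_mode VARCHAR(20) DEFAULT 'flat'"
run_migration_step(fake_execute, ddl)
run_migration_step(fake_execute, ddl)  # swallowed: duplicate column
print(len(applied))  # 1
```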

stack.pop()

parent_path = stack[-1].path if stack else ""
path = f"{parent_path} > {title}" if parent_path else title
medium

There appears to be an inconsistency in the section path separator. This implementation uses > as the separator. However, the architecture documentation (docs/knowledge-base-architecture.md) and the ReadDocumentSectionTool's description in astrbot/core/astr_main_agent_resources.py both use / in their examples (e.g., 'Chapter 1/API'). This could cause confusion and issues for the astr_kb_read_section tool. To ensure consistency, please align the separator across the implementation, documentation, and tool descriptions. Using > is a good choice to avoid ambiguity with file path separators, but all related code and documentation should reflect this.
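One low-cost way to enforce that alignment (a sketch with hypothetical names): route every producer and consumer of section paths through a single shared constant, so the implementation, the tool description, and the docs cannot drift independently.

```python
SECTION_PATH_SEP = " > "  # single source of truth for the section path separator

def build_section_path(parent_path: str, title: str) -> str:
    # Same shape as the parser snippet above, but via the shared constant.
    return f"{parent_path}{SECTION_PATH_SEP}{title}" if parent_path else title

def split_section_path(path: str) -> list[str]:
    # The read-side tool splits with the same constant, so both sides agree.
    return path.split(SECTION_PATH_SEP)

path = build_section_path(build_section_path("", "Chapter 1"), "API")
print(path)                      # Chapter 1 > API
print(split_section_path(path))  # ['Chapter 1', 'API']
```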


> Related issue: #5262
> Branch: `rag-enhancement`
> Last updated: 2026-03-08
medium

There seems to be a typo in the 'last updated' date. The year is set to 2026, which is in the future.

Contributor Author

Poor Hachimi


Labels

feature:knowledge-base (the bug / feature is about knowledge base), size:XXL (this PR changes 1000+ lines, ignoring generated files)
