From 8f1d19acc983eac0f8e005fccd45c4d01e6b2013 Mon Sep 17 00:00:00 2001 From: Dallas98 <990259227@qq.com> Date: Tue, 17 Mar 2026 01:22:24 +0800 Subject: [PATCH] fix: increase batch delete size and optimize document loading with async --- .../datamate-python/app/module/rag/infra/vectorstore/store.py | 2 +- .../app/module/rag/service/common/batch_processor.py | 3 ++- .../datamate-python/app/module/rag/service/file_processor.py | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/runtime/datamate-python/app/module/rag/infra/vectorstore/store.py b/runtime/datamate-python/app/module/rag/infra/vectorstore/store.py index 58ae56ba..77424bcb 100644 --- a/runtime/datamate-python/app/module/rag/infra/vectorstore/store.py +++ b/runtime/datamate-python/app/module/rag/infra/vectorstore/store.py @@ -26,7 +26,7 @@ logger = logging.getLogger(__name__) -BATCH_DELETE_SIZE = 100 +BATCH_DELETE_SIZE = 500 def _delete_chunks_by_rag_file_id_batched(client, collection_name: str, rag_file_id: str) -> int: diff --git a/runtime/datamate-python/app/module/rag/service/common/batch_processor.py b/runtime/datamate-python/app/module/rag/service/common/batch_processor.py index 46755746..c5d01d79 100644 --- a/runtime/datamate-python/app/module/rag/service/common/batch_processor.py +++ b/runtime/datamate-python/app/module/rag/service/common/batch_processor.py @@ -80,7 +80,8 @@ async def _store_single_batch( documents, doc_ids = chunks_to_documents(batch_chunks, ids=ids) try: - vectorstore.add_documents(documents=documents, ids=doc_ids) + # 使用异步方法避免阻塞事件循环 + await vectorstore.aadd_documents(documents=documents, ids=doc_ids) logger.info("批次 %d-%d 存储成功", batch_start + 1, batch_end) except Exception as e: logger.error( diff --git a/runtime/datamate-python/app/module/rag/service/file_processor.py b/runtime/datamate-python/app/module/rag/service/file_processor.py index ada1e154..f12a0c65 100644 --- a/runtime/datamate-python/app/module/rag/service/file_processor.py +++ b/runtime/datamate-python/app/module/rag/service/file_processor.py @@ -153,7 +153,7 @@ async def _process_single_graph_file( await self._mark_failed(db, file_repo, str(rag_file.id), "文件不存在") return - documents = load_documents(file_path) + documents = await asyncio.to_thread(load_documents, file_path) if not documents: await self._mark_failed(db, file_repo, str(rag_file.id), "文件解析失败,未生成文档") return