diff --git a/src/memos/api/config.py b/src/memos/api/config.py
index 65049b0c2..f296a1bc6 100644
--- a/src/memos/api/config.py
+++ b/src/memos/api/config.py
@@ -437,6 +437,7 @@ def get_embedder_config() -> dict[str, Any]:
             "provider": os.getenv("MOS_EMBEDDER_PROVIDER", "openai"),
             "api_key": os.getenv("MOS_EMBEDDER_API_KEY", "sk-xxxx"),
             "model_name_or_path": os.getenv("MOS_EMBEDDER_MODEL", "text-embedding-3-large"),
+            "embedding_dims": int(os.getenv("EMBEDDING_DIMENSION", "1024")),
             "headers_extra": json.loads(os.getenv("MOS_EMBEDDER_HEADERS_EXTRA", "{}")),
             "base_url": os.getenv("MOS_EMBEDDER_API_BASE", "http://openai.com"),
             "backup_client": os.getenv("MOS_EMBEDDER_BACKUP_CLIENT", "false").lower()
@@ -984,43 +985,71 @@ def create_user_config(user_name: str, user_id: str) -> tuple["MOSConfig", "Gene
     graph_db_backend = os.getenv(
         "GRAPH_DB_BACKEND", os.getenv("NEO4J_BACKEND", "neo4j-community")
     ).lower()
-    if graph_db_backend in graph_db_backend_map:
+    text_mem_type = os.getenv("MOS_TEXT_MEM_TYPE", "tree_text").lower()
+
+    if text_mem_type == "general_text":
+        text_mem_cfg = {
+            "backend": "general_text",
+            "config": {
+                "extractor_llm": {"backend": "openai", "config": openai_config},
+                "vector_db": {
+                    "backend": "qdrant",
+                    "config": {
+                        "collection_name": os.getenv("MOS_GENERAL_TEXT_COLLECTION", "general_text_mem"),
+                        # Must match the embedder's embedding_dims default above.
+                        "vector_dimension": int(os.getenv("EMBEDDING_DIMENSION", "1024")),
+                        "distance_metric": "cosine",
+                        "host": os.getenv("QDRANT_HOST", "localhost"),
+                        "port": int(os.getenv("QDRANT_PORT", "6333")),
+                        "path": os.getenv("QDRANT_PATH"),
+                        "url": os.getenv("QDRANT_URL"),
+                        "api_key": os.getenv("QDRANT_API_KEY"),
+                    },
+                },
+                "embedder": APIConfig.get_embedder_config(),
+            },
+        }
+    elif graph_db_backend in graph_db_backend_map:
+        text_mem_cfg = {
+            "backend": "tree_text",
+            "config": {
+                "extractor_llm": {"backend": "openai", "config": openai_config},
+                "dispatcher_llm": {"backend": "openai", "config": openai_config},
+                "graph_db": {
+                    "backend": graph_db_backend,
+                    "config": graph_db_backend_map[graph_db_backend],
+                },
+                "embedder": APIConfig.get_embedder_config(),
+                "internet_retriever": internet_config,
+                "reranker": APIConfig.get_reranker_config(),
+                "reorganize": os.getenv("MOS_ENABLE_REORGANIZE", "false").lower() == "true",
+                "memory_size": {
+                    "WorkingMemory": int(os.getenv("NEBULAR_WORKING_MEMORY", 20)),
+                    "LongTermMemory": int(os.getenv("NEBULAR_LONGTERM_MEMORY", 1e6)),
+                    "UserMemory": int(os.getenv("NEBULAR_USER_MEMORY", 1e6)),
+                },
+                "search_strategy": {
+                    "fast_graph": os.getenv("FAST_GRAPH", "false") == "true",
+                    "bm25": os.getenv("BM25_CALL", "false") == "true",
+                    "cot": os.getenv("VEC_COT_CALL", "false") == "true",
+                    "fulltext": os.getenv("FULLTEXT_CALL", "false") == "true",
+                },
+                "include_embedding": os.getenv("INCLUDE_EMBEDDING", "false") == "true",
+            },
+        }
+    else:
+        raise ValueError(f"Invalid graph db backend: {graph_db_backend}")
+
+    if text_mem_cfg:
         # Create MemCube config
         default_cube_config = GeneralMemCubeConfig.model_validate(
             {
                 "user_id": user_id,
                 "cube_id": f"{user_name}_default_cube",
-                "text_mem": {
-                    "backend": "tree_text",
-                    "config": {
-                        "extractor_llm": {"backend": "openai", "config": openai_config},
-                        "dispatcher_llm": {"backend": "openai", "config": openai_config},
-                        "graph_db": {
-                            "backend": graph_db_backend,
-                            "config": graph_db_backend_map[graph_db_backend],
-                        },
-                        "embedder": APIConfig.get_embedder_config(),
-                        "internet_retriever": internet_config,
-                        "reranker": APIConfig.get_reranker_config(),
-                        "reorganize": os.getenv("MOS_ENABLE_REORGANIZE", "false").lower()
-                        == "true",
-                        "memory_size": {
-                            "WorkingMemory": int(os.getenv("NEBULAR_WORKING_MEMORY", 20)),
-                            "LongTermMemory": int(os.getenv("NEBULAR_LONGTERM_MEMORY", 1e6)),
-                            "UserMemory": int(os.getenv("NEBULAR_USER_MEMORY", 1e6)),
-                        },
-                        "search_strategy": {
-                            "fast_graph": bool(os.getenv("FAST_GRAPH", "false") == "true"),
-                            "bm25": bool(os.getenv("BM25_CALL", "false") == "true"),
-                            "cot": bool(os.getenv("VEC_COT_CALL", "false") == "true"),
-                            "fulltext": bool(os.getenv("FULLTEXT_CALL", "false") == "true"),
-                        },
-                        "include_embedding": bool(
-                            os.getenv("INCLUDE_EMBEDDING", "false") == "true"
-                        ),
-                    },
-                },
+                "text_mem": text_mem_cfg,
                 "act_mem": {}
                 if os.getenv("ENABLE_ACTIVATION_MEMORY", "false").lower() == "false"
                 else APIConfig.get_activation_vllm_config(),
@@ -1030,8 +1059,6 @@ def create_user_config(user_name: str, user_id: str) -> tuple["MOSConfig", "Gene
                 else APIConfig.get_preference_memory_config(),
             }
         )
-    else:
-        raise ValueError(f"Invalid Neo4j backend: {graph_db_backend}")
 
     default_mem_cube = GeneralMemCube(default_cube_config)
     return default_config, default_mem_cube
@@ -1069,42 +1096,70 @@ def get_default_cube_config() -> "GeneralMemCubeConfig | None":
     graph_db_backend = os.getenv(
         "GRAPH_DB_BACKEND", os.getenv("NEO4J_BACKEND", "neo4j-community")
     ).lower()
-    if graph_db_backend in graph_db_backend_map:
+    text_mem_type = os.getenv("MOS_TEXT_MEM_TYPE", "tree_text").lower()
+
+    if text_mem_type == "general_text":
+        text_mem_cfg = {
+            "backend": "general_text",
+            "config": {
+                "extractor_llm": {"backend": "openai", "config": openai_config},
+                "vector_db": {
+                    "backend": "qdrant",
+                    "config": {
+                        "collection_name": os.getenv("MOS_GENERAL_TEXT_COLLECTION", "general_text_mem"),
+                        # Must match the embedder's embedding_dims default above.
+                        "vector_dimension": int(os.getenv("EMBEDDING_DIMENSION", "1024")),
+                        "distance_metric": "cosine",
+                        "host": os.getenv("QDRANT_HOST", "localhost"),
+                        "port": int(os.getenv("QDRANT_PORT", "6333")),
+                        "path": os.getenv("QDRANT_PATH"),
+                        "url": os.getenv("QDRANT_URL"),
+                        "api_key": os.getenv("QDRANT_API_KEY"),
+                    },
+                },
+                "embedder": APIConfig.get_embedder_config(),
+            },
+        }
+    elif graph_db_backend in graph_db_backend_map:
+        text_mem_cfg = {
+            "backend": "tree_text",
+            "config": {
+                "extractor_llm": {"backend": "openai", "config": openai_config},
+                "dispatcher_llm": {"backend": "openai", "config": openai_config},
+                "graph_db": {
+                    "backend": graph_db_backend,
+                    "config": graph_db_backend_map[graph_db_backend],
+                },
+                "embedder": APIConfig.get_embedder_config(),
+                "reranker": APIConfig.get_reranker_config(),
+                "reorganize": os.getenv("MOS_ENABLE_REORGANIZE", "false").lower() == "true",
+                "internet_retriever": internet_config,
+                "memory_size": {
+                    "WorkingMemory": int(os.getenv("NEBULAR_WORKING_MEMORY", 20)),
+                    "LongTermMemory": int(os.getenv("NEBULAR_LONGTERM_MEMORY", 1e6)),
+                    "UserMemory": int(os.getenv("NEBULAR_USER_MEMORY", 1e6)),
+                },
+                "search_strategy": {
+                    "fast_graph": os.getenv("FAST_GRAPH", "false") == "true",
+                    "bm25": os.getenv("BM25_CALL", "false") == "true",
+                    "cot": os.getenv("VEC_COT_CALL", "false") == "true",
+                    "fulltext": os.getenv("FULLTEXT_CALL", "false") == "true",
+                },
+                "mode": os.getenv("ASYNC_MODE", "sync"),
+                "include_embedding": os.getenv("INCLUDE_EMBEDDING", "false") == "true",
+            },
+        }
+    else:
+        raise ValueError(f"Invalid graph db backend: {graph_db_backend}")
+
+    if text_mem_cfg:
         return GeneralMemCubeConfig.model_validate(
             {
                 "user_id": "default",
                 "cube_id": "default_cube",
-                "text_mem": {
-                    "backend": "tree_text",
-                    "config": {
-                        "extractor_llm": {"backend": "openai", "config": openai_config},
-                        "dispatcher_llm": {"backend": "openai", "config": openai_config},
-                        "graph_db": {
-                            "backend": graph_db_backend,
-                            "config": graph_db_backend_map[graph_db_backend],
-                        },
-                        "embedder": APIConfig.get_embedder_config(),
-                        "reranker": APIConfig.get_reranker_config(),
-                        "reorganize": os.getenv("MOS_ENABLE_REORGANIZE", "false").lower()
-                        == "true",
-                        "internet_retriever": internet_config,
-                        "memory_size": {
-                            "WorkingMemory": int(os.getenv("NEBULAR_WORKING_MEMORY", 20)),
-                            "LongTermMemory": int(os.getenv("NEBULAR_LONGTERM_MEMORY", 1e6)),
-                            "UserMemory": int(os.getenv("NEBULAR_USER_MEMORY", 1e6)),
-                        },
-                        "search_strategy": {
-                            "fast_graph": bool(os.getenv("FAST_GRAPH", "false") == "true"),
-                            "bm25": bool(os.getenv("BM25_CALL", "false") == "true"),
-                            "cot": bool(os.getenv("VEC_COT_CALL", "false") == "true"),
-                            "fulltext": bool(os.getenv("FULLTEXT_CALL", "false") == "true"),
-                        },
-                        "mode": os.getenv("ASYNC_MODE", "sync"),
-                        "include_embedding": bool(
-                            os.getenv("INCLUDE_EMBEDDING", "false") == "true"
-                        ),
-                    },
-                },
+                "text_mem": text_mem_cfg,
                 "act_mem": {}
                 if os.getenv("ENABLE_ACTIVATION_MEMORY", "false").lower() == "false"
                 else APIConfig.get_activation_vllm_config(),
@@ -1114,5 +1169,3 @@ def get_default_cube_config() -> "GeneralMemCubeConfig | None":
                 else APIConfig.get_preference_memory_config(),
             }
         )
-    else:
-        raise ValueError(f"Invalid Neo4j backend: {graph_db_backend}")
"tree_text", - "config": { - "extractor_llm": {"backend": "openai", "config": openai_config}, - "dispatcher_llm": {"backend": "openai", "config": openai_config}, - "graph_db": { - "backend": graph_db_backend, - "config": graph_db_backend_map[graph_db_backend], - }, - "embedder": APIConfig.get_embedder_config(), - "reranker": APIConfig.get_reranker_config(), - "reorganize": os.getenv("MOS_ENABLE_REORGANIZE", "false").lower() - == "true", - "internet_retriever": internet_config, - "memory_size": { - "WorkingMemory": int(os.getenv("NEBULAR_WORKING_MEMORY", 20)), - "LongTermMemory": int(os.getenv("NEBULAR_LONGTERM_MEMORY", 1e6)), - "UserMemory": int(os.getenv("NEBULAR_USER_MEMORY", 1e6)), - }, - "search_strategy": { - "fast_graph": bool(os.getenv("FAST_GRAPH", "false") == "true"), - "bm25": bool(os.getenv("BM25_CALL", "false") == "true"), - "cot": bool(os.getenv("VEC_COT_CALL", "false") == "true"), - "fulltext": bool(os.getenv("FULLTEXT_CALL", "false") == "true"), - }, - "mode": os.getenv("ASYNC_MODE", "sync"), - "include_embedding": bool( - os.getenv("INCLUDE_EMBEDDING", "false") == "true" - ), - }, - }, + "text_mem": text_mem_cfg, "act_mem": {} if os.getenv("ENABLE_ACTIVATION_MEMORY", "false").lower() == "false" else APIConfig.get_activation_vllm_config(), @@ -1114,5 +1169,3 @@ def get_default_cube_config() -> "GeneralMemCubeConfig | None": else APIConfig.get_preference_memory_config(), } ) - else: - raise ValueError(f"Invalid Neo4j backend: {graph_db_backend}") diff --git a/src/memos/api/handlers/add_handler.py b/src/memos/api/handlers/add_handler.py index 3cdbedabf..bcba3c902 100644 --- a/src/memos/api/handlers/add_handler.py +++ b/src/memos/api/handlers/add_handler.py @@ -33,9 +33,11 @@ def __init__(self, dependencies: HandlerDependencies): dependencies: HandlerDependencies instance """ super().__init__(dependencies) - self._validate_dependencies( - "naive_mem_cube", "mem_reader", "mem_scheduler", "feedback_server" - ) + required = ["naive_mem_cube", "mem_reader", "mem_scheduler"] + text_mem = getattr(getattr(dependencies, "naive_mem_cube", None), "text_mem", None) + if hasattr(text_mem, "get_searcher"): + required.append("feedback_server") + self._validate_dependencies(*required) def handle_add_memories(self, add_req: APIADDRequest) -> MemoryResponse: """ diff --git a/src/memos/api/handlers/component_init.py b/src/memos/api/handlers/component_init.py index ba527d602..9f4a588dc 100644 --- a/src/memos/api/handlers/component_init.py +++ b/src/memos/api/handlers/component_init.py @@ -41,6 +41,7 @@ ExtractorFactory, RetrieverFactory, ) +from memos.memories.textual.general import GeneralTextMemory from memos.memories.textual.simple_preference import SimplePreferenceTextMemory from memos.memories.textual.simple_tree import SimpleTreeTextMemory from memos.memories.textual.tree_text_memory.organize.history_manager import MemoryHistoryManager @@ -65,6 +66,28 @@ logger = get_logger(__name__) +class _NullGraphDB: + """No-op graph DB for general_text-only deployments.""" + + def get_user_names_by_memory_ids(self, memory_ids): + return {} + + def exist_user_name(self, user_name): + return False + + def delete_node_by_mem_cube_id(self, mem_cube_id, delete_record_id=None, hard_delete=False): + return None + + def recover_memory_by_mem_cube_id(self, mem_cube_id, delete_record_id=None): + return None + + def search_by_embedding(self, *args, **kwargs): + return [] + + def get_node_by_id(self, *args, **kwargs): + return None + + def _get_default_memory_size(cube_config: Any) -> dict[str, int]: """ Get 
diff --git a/src/memos/api/handlers/component_init.py b/src/memos/api/handlers/component_init.py
index ba527d602..9f4a588dc 100644
--- a/src/memos/api/handlers/component_init.py
+++ b/src/memos/api/handlers/component_init.py
@@ -41,6 +41,7 @@
     ExtractorFactory,
     RetrieverFactory,
 )
+from memos.memories.textual.general import GeneralTextMemory
 from memos.memories.textual.simple_preference import SimplePreferenceTextMemory
 from memos.memories.textual.simple_tree import SimpleTreeTextMemory
 from memos.memories.textual.tree_text_memory.organize.history_manager import MemoryHistoryManager
@@ -65,6 +66,28 @@
 logger = get_logger(__name__)
 
 
+class _NullGraphDB:
+    """No-op graph DB for general_text-only deployments."""
+
+    def get_user_names_by_memory_ids(self, memory_ids):
+        return {}
+
+    def exist_user_name(self, user_name):
+        return False
+
+    def delete_node_by_mem_cube_id(self, mem_cube_id, delete_record_id=None, hard_delete=False):
+        return None
+
+    def recover_memory_by_mem_cube_id(self, mem_cube_id, delete_record_id=None):
+        return None
+
+    def search_by_embedding(self, *args, **kwargs):
+        return []
+
+    def get_node_by_id(self, *args, **kwargs):
+        return None
+
+
 def _get_default_memory_size(cube_config: Any) -> dict[str, int]:
     """
     Get default memory size configuration.
@@ -160,7 +183,8 @@ def init_server() -> dict[str, Any]:
     dingding_enabled = APIConfig.is_dingding_bot_enabled()
 
     # Build component configurations
-    graph_db_config = build_graph_db_config()
+    text_mem_type = os.getenv("MOS_TEXT_MEM_TYPE", "tree_text").lower()
+    graph_db_config = build_graph_db_config() if text_mem_type != "general_text" else None
     llm_config = build_llm_config()
     chat_llm_config = build_chat_llm_config()
     embedder_config = build_embedder_config()
@@ -177,7 +201,7 @@ def init_server() -> dict[str, Any]:
     logger.debug("Component configurations built successfully")
 
     # Create component instances
-    graph_db = GraphStoreFactory.from_config(graph_db_config)
+    graph_db = GraphStoreFactory.from_config(graph_db_config) if graph_db_config else _NullGraphDB()
     vector_db = (
         VecDBFactory.from_config(vector_db_config)
         if os.getenv("ENABLE_PREFERENCE_MEMORY", "false") == "true"
@@ -191,7 +215,11 @@ def init_server() -> dict[str, Any]:
     )
     embedder = EmbedderFactory.from_config(embedder_config)
     nli_client = NLIClient(base_url=nli_client_config["base_url"])
-    memory_history_manager = MemoryHistoryManager(nli_client=nli_client, graph_db=graph_db)
+    memory_history_manager = (
+        MemoryHistoryManager(nli_client=nli_client, graph_db=graph_db)
+        if text_mem_type != "general_text"
+        else None
+    )
 
     # Pass graph_db to mem_reader for recall operations (deduplication, conflict detection)
     mem_reader = MemReaderFactory.from_config(mem_reader_config, graph_db=graph_db)
     reranker = RerankerFactory.from_config(reranker_config)
@@ -204,32 +232,38 @@ def init_server() -> dict[str, Any]:
 
     logger.debug("Core components instantiated")
 
-    # Initialize memory manager
-    memory_manager = MemoryManager(
-        graph_db,
-        embedder,
-        llm,
-        memory_size=_get_default_memory_size(default_cube_config),
-        is_reorganize=getattr(default_cube_config.text_mem.config, "reorganize", False),
-    )
-
-    logger.debug("Memory manager initialized")
-    tokenizer = FastTokenizer()
-    # Initialize text memory
-    text_mem = SimpleTreeTextMemory(
-        llm=llm,
-        embedder=embedder,
-        mem_reader=mem_reader,
-        graph_db=graph_db,
-        reranker=reranker,
-        memory_manager=memory_manager,
-        config=default_cube_config.text_mem.config,
-        internet_retriever=internet_retriever,
-        tokenizer=tokenizer,
-        include_embedding=bool(os.getenv("INCLUDE_EMBEDDING", "false") == "true"),
-    )
-
-    logger.debug("Text memory initialized")
+    text_mem_backend = getattr(default_cube_config.text_mem, "backend", "tree_text")
+    memory_manager = None
+
+    if text_mem_backend == "general_text":
+        text_mem = GeneralTextMemory(config=default_cube_config.text_mem.config)
+        logger.debug("Text memory initialized: general_text")
+    else:
+        # Initialize memory manager for tree_text backend
+        memory_manager = MemoryManager(
+            graph_db,
+            embedder,
+            llm,
+            memory_size=_get_default_memory_size(default_cube_config),
+            is_reorganize=getattr(default_cube_config.text_mem.config, "reorganize", False),
+        )
+
+        logger.debug("Memory manager initialized")
+        tokenizer = FastTokenizer()
+        text_mem = SimpleTreeTextMemory(
+            llm=llm,
+            embedder=embedder,
+            mem_reader=mem_reader,
+            graph_db=graph_db,
+            reranker=reranker,
+            memory_manager=memory_manager,
+            config=default_cube_config.text_mem.config,
+            internet_retriever=internet_retriever,
+            tokenizer=tokenizer,
+            include_embedding=os.getenv("INCLUDE_EMBEDDING", "false") == "true",
+        )
+
+        logger.debug("Text memory initialized: tree_text")
 
     # Initialize preference memory components
     pref_extractor = (
@@ -305,28 +339,33 @@ def init_server() -> dict[str, Any]:
 
     logger.debug("MemCube created")
 
-    tree_mem: TreeTextMemory = naive_mem_cube.text_mem
-    searcher: Searcher = tree_mem.get_searcher(
-        manual_close_internet=os.getenv("ENABLE_INTERNET", "true").lower() == "false",
-        moscube=False,
-        process_llm=mem_reader.llm,
-    )
-    logger.debug("Searcher created")
+    tree_mem = naive_mem_cube.text_mem
+    searcher = None
+    feedback_server = None
+
+    if hasattr(tree_mem, "get_searcher"):
+        searcher = tree_mem.get_searcher(
+            manual_close_internet=os.getenv("ENABLE_INTERNET", "true").lower() == "false",
+            moscube=False,
+            process_llm=mem_reader.llm,
+        )
+        logger.debug("Searcher created")
 
     # Set searcher to mem_reader
     mem_reader.set_searcher(searcher)
 
-    # Initialize feedback server
-    feedback_server = SimpleMemFeedback(
-        llm=llm,
-        embedder=embedder,
-        graph_store=graph_db,
-        memory_manager=memory_manager,
-        mem_reader=mem_reader,
-        searcher=searcher,
-        reranker=feedback_reranker,
-        pref_mem=pref_mem,
-    )
+    # Initialize feedback server for tree_text backend only
+    if memory_manager is not None and searcher is not None:
+        feedback_server = SimpleMemFeedback(
+            llm=llm,
+            embedder=embedder,
+            graph_store=graph_db,
+            memory_manager=memory_manager,
+            mem_reader=mem_reader,
+            searcher=searcher,
+            reranker=feedback_reranker,
+            pref_mem=pref_mem,
+        )
 
     # Initialize Scheduler
     scheduler_config_dict = APIConfig.get_scheduler_config()
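# ---------------------------------------------------------------------------
# Sketch (illustrative, not part of the patch): _NullGraphDB above is a null
# object. Call sites keep invoking graph-DB methods unconditionally and get
# harmless empty results when no graph store is configured, instead of
# sprinkling "if graph_db is not None" checks. The consumer class here is
# hypothetical.
# ---------------------------------------------------------------------------
class NullGraphDB:
    def search_by_embedding(self, *args, **kwargs):
        return []

    def get_node_by_id(self, *args, **kwargs):
        return None

class DedupReader:
    """Never branches on whether a graph backend exists."""

    def __init__(self, graph_db=None):
        self.graph_db = graph_db or NullGraphDB()

    def find_near_duplicates(self, embedding):
        return self.graph_db.search_by_embedding(embedding, top_k=5)

assert DedupReader().find_near_duplicates([0.1, 0.2]) == []
# ---------------------------------------------------------------------------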
diff --git a/src/memos/api/handlers/feedback_handler.py b/src/memos/api/handlers/feedback_handler.py
index 217bca7cd..30a19d723 100644
--- a/src/memos/api/handlers/feedback_handler.py
+++ b/src/memos/api/handlers/feedback_handler.py
@@ -28,7 +28,11 @@ def __init__(self, dependencies: HandlerDependencies):
             dependencies: HandlerDependencies instance
         """
         super().__init__(dependencies)
-        self._validate_dependencies("mem_reader", "mem_scheduler", "searcher", "reranker")
+        required = ["mem_reader", "mem_scheduler", "reranker"]
+        text_mem = getattr(getattr(dependencies, "naive_mem_cube", None), "text_mem", None)
+        if hasattr(text_mem, "get_searcher"):
+            required.append("searcher")
+        self._validate_dependencies(*required)
 
     def handle_feedback_memories(self, feedback_req: APIFeedbackRequest) -> MemoryResponse:
         """
diff --git a/src/memos/api/handlers/formatters_handler.py b/src/memos/api/handlers/formatters_handler.py
index 06c4fd223..d4ddaf51d 100644
--- a/src/memos/api/handlers/formatters_handler.py
+++ b/src/memos/api/handlers/formatters_handler.py
@@ -155,7 +155,7 @@ def separate_knowledge_and_conversation_mem(memories: list[dict[str, Any]]):
     knowledge_mem = []
     conversation_mem = []
     for item in memories:
-        sources = item.get("metadata", {}).get("sources", [])
+        sources = item.get("metadata", {}).get("sources") or []
         if (
             item["metadata"]["memory_type"] != "RawFileMemory"
            and len(sources) > 0
diff --git a/src/memos/api/handlers/memory_handler.py b/src/memos/api/handlers/memory_handler.py
index ef56c7489..d7d921bae 100644
--- a/src/memos/api/handlers/memory_handler.py
+++ b/src/memos/api/handlers/memory_handler.py
@@ -36,6 +36,62 @@
 logger = get_logger(__name__)
 
 
+def _get_text_mem_records(
+    text_mem: Any,
+    mem_cube_id: str,
+    user_id: str | None = None,
+    page: int | None = None,
+    page_size: int | None = None,
+    filter: dict[str, Any] | None = None,
+    memory_type: list[str] | None = None,
+) -> tuple[list[dict[str, Any]], int]:
+    """
+    Compatibility wrapper for text memory `get_all` across backends.
+
+    Tree_text memories return a dict payload with `nodes` and `total_nodes`,
+    while flat backends may still return a bare list. We normalize both to
+    `(nodes, total_nodes)` to keep endpoint behavior stable.
+    """
+    memory_type = memory_type.copy() if memory_type is not None else None
+    try:
+        result = text_mem.get_all(
+            user_name=mem_cube_id,
+            user_id=user_id,
+            page=page,
+            page_size=page_size,
+            filter=filter,
+            memory_type=memory_type,
+        )
+    except TypeError:
+        # Fallbacks for different backend signatures
+        try:
+            result = text_mem.get_all(
+                user_name=mem_cube_id,
+                user_id=user_id,
+                page=page,
+                page_size=page_size,
+                filter=filter,
+            )
+        except TypeError:
+            try:
+                result = text_mem.get_all(user_name=mem_cube_id)
+            except TypeError:
+                result = text_mem.get_all()
+
+    if isinstance(result, dict):
+        nodes = result.get("nodes", [])
+        total_nodes = int(result.get("total_nodes", len(nodes) if isinstance(nodes, list) else 0))
+        return nodes if isinstance(nodes, list) else list(nodes or []), total_nodes
+
+    if isinstance(result, list):
+        return result, len(result)
+
+    logger.error(f"Unsupported get_all return type from {type(text_mem)}: {type(result)}")
+    return [], 0
+
+
 def handle_get_all_memories(
     user_id: str,
     mem_cube_id: str,
@@ -60,11 +116,34 @@ def handle_get_all_memories(
     reformat_memory_list = []
     if memory_type == "text_mem":
-        # Get all text memories from the graph database
-        memories = naive_mem_cube.text_mem.get_all(user_name=mem_cube_id)
+        # Get all text memories from the backend. Tree backends return a graph
+        # dict; general_text returns paginated flat nodes.
+        try:
+            raw_memories = naive_mem_cube.text_mem.get_all(
+                user_name=mem_cube_id,
+                user_id=user_id,
+                page=1,
+                page_size=100,
+            )
+        except TypeError:
+            raw_memories = naive_mem_cube.text_mem.get_all(user_name=mem_cube_id)
+
+        # general_text / flat mode
+        if isinstance(raw_memories, dict) and "nodes" in raw_memories and "edges" not in raw_memories:
+            reformat_memory_list.append(
+                {
+                    "cube_id": mem_cube_id,
+                    "memories": [{"nodes": remove_embedding_recursive(raw_memories.get("nodes", []))}],
+                    "memory_statistics": {"total_nodes": int(raw_memories.get("total_nodes", 0))},
+                }
+            )
+            return MemoryResponse(
+                message="Memories retrieved successfully",
+                data=reformat_memory_list,
+            )
 
-        # Format and convert to tree structure
-        memories_cleaned = remove_embedding_recursive(memories)
+        # tree_text / graph mode
+        memories_cleaned = remove_embedding_recursive(raw_memories)
         custom_type_ratios = {
             "WorkingMemory": 0.20,
             "LongTermMemory": 0.40,
@@ -73,7 +152,6 @@
         tree_result, node_type_count = convert_graph_to_tree_forworkmem(
             memories_cleaned, target_node_count=200, type_ratios=custom_type_ratios
         )
-        # Ensure all node IDs are unique in the tree structure
         tree_result = ensure_unique_tree_ids(tree_result)
         memories_filtered = filter_nodes_by_tree_ids(tree_result, memories_cleaned)
         children = tree_result["children"]
@@ -284,15 +362,15 @@ def handle_get_memories(
 ) -> GetMemoryResponse:
     results: dict[str, Any] = {"text_mem": [], "pref_mem": [], "tool_mem": [], "skill_mem": []}
     text_memory_type = ["WorkingMemory", "LongTermMemory", "UserMemory", "OuterMemory"]
-    text_memories_info = naive_mem_cube.text_mem.get_all(
-        user_name=get_mem_req.mem_cube_id,
+    text_memories, total_text_nodes = _get_text_mem_records(
+        naive_mem_cube.text_mem,
+        get_mem_req.mem_cube_id,
         user_id=get_mem_req.user_id,
         page=get_mem_req.page,
         page_size=get_mem_req.page_size,
         filter=get_mem_req.filter,
         memory_type=text_memory_type,
     )
-    text_memories, total_text_nodes = text_memories_info["nodes"], text_memories_info["total_nodes"]
 
     results["text_mem"] = [
         {
             "cube_id": get_mem_req.mem_cube_id,
@@ -302,18 +380,15 @@
     ]
 
     if get_mem_req.include_tool_memory:
-        tool_memories_info = naive_mem_cube.text_mem.get_all(
-            user_name=get_mem_req.mem_cube_id,
+        tool_memories, total_tool_nodes = _get_text_mem_records(
+            naive_mem_cube.text_mem,
+            get_mem_req.mem_cube_id,
             user_id=get_mem_req.user_id,
             page=get_mem_req.page,
             page_size=get_mem_req.page_size,
             filter=get_mem_req.filter,
             memory_type=["ToolSchemaMemory", "ToolTrajectoryMemory"],
         )
-        tool_memories, total_tool_nodes = (
-            tool_memories_info["nodes"],
-            tool_memories_info["total_nodes"],
-        )
 
         results["tool_mem"] = [
             {
@@ -323,18 +398,15 @@
     ]
 
     if get_mem_req.include_skill_memory:
-        skill_memories_info = naive_mem_cube.text_mem.get_all(
-            user_name=get_mem_req.mem_cube_id,
+        skill_memories, total_skill_nodes = _get_text_mem_records(
+            naive_mem_cube.text_mem,
+            get_mem_req.mem_cube_id,
             user_id=get_mem_req.user_id,
             page=get_mem_req.page,
             page_size=get_mem_req.page_size,
             filter=get_mem_req.filter,
             memory_type=["SkillMemory"],
         )
-        skill_memories, total_skill_nodes = (
-            skill_memories_info["nodes"],
-            skill_memories_info["total_nodes"],
-        )
 
         results["skill_mem"] = [
             {
@@ -471,15 +543,15 @@ def handle_get_memories_dashboard(
     total_preference_nodes = 0
 
     text_memory_type = ["WorkingMemory", "LongTermMemory", "UserMemory", "OuterMemory"]
-    text_memories_info = naive_mem_cube.text_mem.get_all(
-        user_name=get_mem_req.mem_cube_id,
+    text_memories, total_text_nodes = _get_text_mem_records(
+        naive_mem_cube.text_mem,
+        get_mem_req.mem_cube_id,
         user_id=get_mem_req.user_id,
         page=get_mem_req.page,
         page_size=get_mem_req.page_size,
         filter=get_mem_req.filter,
         memory_type=text_memory_type,
     )
-    text_memories, total_text_nodes = text_memories_info["nodes"], text_memories_info["total_nodes"]
 
     # Group text memories by cube_id from metadata.user_name
     text_mem_by_cube: dict[str, list] = {}
@@ -503,18 +575,15 @@
     ]
 
     if get_mem_req.include_tool_memory:
-        tool_memories_info = naive_mem_cube.text_mem.get_all(
-            user_name=get_mem_req.mem_cube_id,
+        tool_memories, total_tool_nodes = _get_text_mem_records(
+            naive_mem_cube.text_mem,
+            get_mem_req.mem_cube_id,
             user_id=get_mem_req.user_id,
             page=get_mem_req.page,
             page_size=get_mem_req.page_size,
             filter=get_mem_req.filter,
             memory_type=["ToolSchemaMemory", "ToolTrajectoryMemory"],
         )
-        tool_memories, total_tool_nodes = (
-            tool_memories_info["nodes"],
-            tool_memories_info["total_nodes"],
-        )
 
         # Group tool memories by cube_id from metadata.user_name
         tool_mem_by_cube: dict[str, list] = {}
@@ -538,18 +607,15 @@
     ]
 
     if get_mem_req.include_skill_memory:
-        skill_memories_info = naive_mem_cube.text_mem.get_all(
-            user_name=get_mem_req.mem_cube_id,
+        skill_memories, total_skill_nodes = _get_text_mem_records(
+            naive_mem_cube.text_mem,
+            get_mem_req.mem_cube_id,
             user_id=get_mem_req.user_id,
             page=get_mem_req.page,
             page_size=get_mem_req.page_size,
             filter=get_mem_req.filter,
             memory_type=["SkillMemory"],
         )
-        skill_memories, total_skill_nodes = (
-            skill_memories_info["nodes"],
-            skill_memories_info["total_nodes"],
-        )
 
         # Group skill memories by cube_id from metadata.user_name
         skill_mem_by_cube: dict[str, list] = {}
diff --git a/src/memos/api/handlers/search_handler.py b/src/memos/api/handlers/search_handler.py
index 8e7785ad5..1c4ddf618 100644
--- a/src/memos/api/handlers/search_handler.py
+++ b/src/memos/api/handlers/search_handler.py
@@ -40,9 +40,11 @@ def __init__(self, dependencies: HandlerDependencies):
             dependencies: HandlerDependencies instance
         """
         super().__init__(dependencies)
-        self._validate_dependencies(
-            "naive_mem_cube", "mem_scheduler", "searcher", "deepsearch_agent"
-        )
+        required = ["naive_mem_cube", "mem_scheduler", "deepsearch_agent"]
+        text_mem = getattr(getattr(dependencies, "naive_mem_cube", None), "text_mem", None)
+        if hasattr(text_mem, "get_searcher"):
+            required.append("searcher")
+        self._validate_dependencies(*required)
 
     def handle_search_memories(self, search_req: APISearchRequest) -> SearchResponse:
         """
@@ -75,12 +77,18 @@ def handle_search_memories(self, search_req: APISearchRequest) -> SearchResponse
         results = self._apply_relativity_threshold(results, search_req_local.relativity)
 
         if search_req_local.dedup == "sim":
-            results = self._dedup_text_memories(results, search_req.top_k)
+            if getattr(self, "searcher", None) is None:
+                self.logger.warning(
+                    "[SearchHandler] dedup=sim requested but searcher is None; skipping dedup"
+                )
+            else:
+                results = self._dedup_text_memories(results, search_req.top_k)
             self._strip_embeddings(results)
         elif search_req_local.dedup == "mmr":
-            pref_top_k = getattr(search_req_local, "pref_top_k", 6)
-            results = self._mmr_dedup_text_memories(results, search_req.top_k, pref_top_k)
+            if getattr(self, "searcher", None) is None:
+                self.logger.warning(
+                    "[SearchHandler] dedup=mmr requested but searcher is None; skipping dedup"
+                )
+            else:
+                pref_top_k = getattr(search_req_local, "pref_top_k", 6)
+                results = self._mmr_dedup_text_memories(results, search_req.top_k, pref_top_k)
             self._strip_embeddings(results)
 
         text_mem = results["text_mem"]
         results["text_mem"] = rerank_knowledge_mem(
diff --git a/src/memos/api/mcp_serve.py b/src/memos/api/mcp_serve.py
index 8f8e70311..62ed9bb10 100644
--- a/src/memos/api/mcp_serve.py
+++ b/src/memos/api/mcp_serve.py
@@ -107,7 +107,7 @@ def load_default_config(user_id="default_user"):
     # Extract mandatory or special params
     openai_api_key = kwargs.pop("openai_api_key", os.getenv("OPENAI_API_KEY"))
     openai_api_base = kwargs.pop("openai_api_base", "https://api.openai.com/v1")
-    text_mem_type = kwargs.pop("text_mem_type", "tree_text")
+    text_mem_type = kwargs.pop("text_mem_type", "general_text")
 
     # Ensure embedder_model has a default value if not set
     if "embedder_model" not in kwargs:
Default: True.", @@ -834,7 +834,7 @@ class GetMemoryRequest(BaseRequest): user_id: str | None = Field(None, description="User ID") include_preference: bool = Field(True, description="Whether to return preference memory") include_tool_memory: bool = Field(True, description="Whether to return tool memory") - include_skill_memory: bool = Field(True, description="Whether to return skill memory") + include_skill_memory: bool = Field(False, description="Whether to return skill memory") filter: dict[str, Any] | None = Field(None, description="Filter for the memory") page: int | None = Field( None, diff --git a/src/memos/api/start_api.py b/src/memos/api/start_api.py index 24a36f017..eb22826ad 100644 --- a/src/memos/api/start_api.py +++ b/src/memos/api/start_api.py @@ -16,8 +16,11 @@ # Configure logging -logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") +logging.basicConfig(level=logging.WARNING, format="%(asctime)s - %(levelname)s - %(message)s") logger = logging.getLogger(__name__) +# Silence verbose INFO loggers that dump full embeddings/LLM responses +for _noisy in ("httpx", "memos.utils", "memos.graph_dbs.neo4j", "memos.embedders", "memos.llms", "memos.mem_reader", "memos.mem_scheduler"): + logging.getLogger(_noisy).setLevel(logging.WARNING) # Load environment variables load_dotenv(override=True) diff --git a/src/memos/embedders/universal_api.py b/src/memos/embedders/universal_api.py index 2b3bd0967..1b9a3097e 100644 --- a/src/memos/embedders/universal_api.py +++ b/src/memos/embedders/universal_api.py @@ -61,10 +61,14 @@ def embed(self, texts: list[str]) -> list[list[float]]: try: async def _create_embeddings(): - return self.client.embeddings.create( - model=getattr(self.config, "model_name_or_path", "text-embedding-3-large"), - input=texts, - ) + kwargs = { + "model": getattr(self.config, "model_name_or_path", "text-embedding-3-large"), + "input": texts, + } + dims = getattr(self.config, "embedding_dims", None) + if dims: + kwargs["dimensions"] = int(dims) + return self.client.embeddings.create(**kwargs) init_time = time.time() response = asyncio.run( @@ -82,14 +86,18 @@ async def _create_embeddings(): try: async def _create_embeddings_backup(): - return self.backup_client.embeddings.create( - model=getattr( + kwargs = { + "model": getattr( self.config, "backup_model_name_or_path", "text-embedding-3-large", ), - input=texts, - ) + "input": texts, + } + dims = getattr(self.config, "embedding_dims", None) + if dims: + kwargs["dimensions"] = int(dims) + return self.backup_client.embeddings.create(**kwargs) init_time = time.time() response = asyncio.run( diff --git a/src/memos/graph_dbs/neo4j_community.py b/src/memos/graph_dbs/neo4j_community.py index 09ad46c42..51bf3e66e 100644 --- a/src/memos/graph_dbs/neo4j_community.py +++ b/src/memos/graph_dbs/neo4j_community.py @@ -135,6 +135,13 @@ def add_nodes_batch(self, nodes: list[dict[str, Any]], user_name: str | None = N metadata = _prepare_node_metadata(metadata) metadata = _flatten_info_fields(metadata) + # Sanitize: Neo4j Community can't store Map/dict values as properties + for k, v in list(metadata.items()): + if isinstance(v, dict): + metadata[k] = json.dumps(v) + elif isinstance(v, list) and v and isinstance(v[0], dict): + metadata[k] = [json.dumps(item) if isinstance(item, dict) else item for item in v] + # Initialize delete_time and delete_record_id fields metadata.setdefault("delete_time", "") metadata.setdefault("delete_record_id", "") diff --git a/src/memos/llms/openai_new.py 
diff --git a/src/memos/llms/openai_new.py b/src/memos/llms/openai_new.py
index 766a17fda..f66e84220 100644
--- a/src/memos/llms/openai_new.py
+++ b/src/memos/llms/openai_new.py
@@ -46,11 +46,17 @@ def generate(self, messages: MessageList, **kwargs) -> str:
         if tool_call_outputs:
             return self.tool_call_parser(tool_call_outputs)
 
-        output_text = getattr(response, "output_text", "")
+        output_text = getattr(response, "output_text", "") or ""
         output_reasoning = [
             item for item in response.output if isinstance(item, ResponseReasoningItem)
         ]
-        summary = output_reasoning[0].summary
+        summary = output_reasoning[0].summary if output_reasoning else None
+
+        if not output_text:
+            logger.warning(
+                "[OpenAI Responses LLM] Empty output_text from model %s",
+                kwargs.get("model_name_or_path", self.config.model_name_or_path),
+            )
 
         if self.config.remove_think_prefix:
             return remove_thinking_tags(output_text)
@@ -134,11 +140,17 @@ def generate(self, messages: MessageList, **kwargs) -> str:
             else NOT_GIVEN,
         )
 
-        output_text = getattr(response, "output_text", "")
+        output_text = getattr(response, "output_text", "") or ""
         output_reasoning = [
             item for item in response.output if isinstance(item, ResponseReasoningItem)
         ]
-        summary = output_reasoning[0].summary
+        summary = output_reasoning[0].summary if output_reasoning else None
+
+        if not output_text:
+            logger.warning(
+                "[Azure Responses LLM] Empty output_text from model %s",
+                self.config.model_name_or_path,
+            )
 
         if self.config.remove_think_prefix:
             return remove_thinking_tags(output_text)
diff --git a/src/memos/log.py b/src/memos/log.py
index c0bb5bf31..1e1b55730 100644
--- a/src/memos/log.py
+++ b/src/memos/log.py
@@ -196,7 +196,7 @@ def close(self):
             "filters": ["package_tree_filter", "context_filter"],
         },
         "file": {
-            "level": "INFO",
+            "level": os.getenv("MEMOS_FILE_LOG_LEVEL", "WARNING"),
             "class": "concurrent_log_handler.ConcurrentTimedRotatingFileHandler",
             "when": "midnight",
             "interval": 1,
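# ---------------------------------------------------------------------------
# Sketch (illustrative, not part of the patch): the openai_new.py hunks above
# guard two assumptions -- output_text may be empty and the reasoning list may
# be absent. The same defensive extraction over a duck-typed response object
# (this is not the OpenAI SDK type; attribute names mirror the code above):
# ---------------------------------------------------------------------------
import logging

logger = logging.getLogger(__name__)

def extract_output(response, model_name: str):
    """Return (text, first_reasoning_summary) without trusting either field."""
    text = getattr(response, "output_text", "") or ""
    reasoning = [
        item for item in getattr(response, "output", [])
        if getattr(item, "summary", None) is not None
    ]
    summary = reasoning[0].summary if reasoning else None
    if not text:
        logger.warning("Empty output_text from model %s", model_name)
    return text, summary
# ---------------------------------------------------------------------------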
""" + if os.getenv("MOS_ENABLE_TOOL_TRAJECTORY_MEMORY", "false").lower() != "true": + return [] + if not fast_memory_items: return [] @@ -1000,23 +1004,25 @@ def _process_multi_modal_data( future_tool = executor.submit( self._process_tool_trajectory_fine, fast_memory_items, info, **kwargs ) - future_skill = executor.submit( - process_skill_memory_fine, - fast_memory_items=fast_memory_items, - info=info, - searcher=self.searcher, - graph_db=self.graph_db, - llm=self.llm, - embedder=self.embedder, - oss_config=self.oss_config, - skills_dir_config=self.skills_dir_config, - **kwargs, - ) + future_skill = None + if os.getenv("MOS_ENABLE_SKILL_MEMORY", "false").lower() == "true": + future_skill = executor.submit( + process_skill_memory_fine, + fast_memory_items=fast_memory_items, + info=info, + searcher=self.searcher, + graph_db=self.graph_db, + llm=self.llm, + embedder=self.embedder, + oss_config=self.oss_config, + skills_dir_config=self.skills_dir_config, + **kwargs, + ) # Collect results fine_memory_items_string_parser = future_string.result() fine_memory_items_tool_trajectory_parser = future_tool.result() - fine_memory_items_skill_memory_parser = future_skill.result() + fine_memory_items_skill_memory_parser = future_skill.result() if future_skill else [] fine_memory_items.extend(fine_memory_items_string_parser) fine_memory_items.extend(fine_memory_items_tool_trajectory_parser) @@ -1067,23 +1073,25 @@ def _process_transfer_multi_modal_data( future_tool = executor.submit( self._process_tool_trajectory_fine, raw_nodes, info, **kwargs ) - future_skill = executor.submit( - process_skill_memory_fine, - raw_nodes, - info, - searcher=self.searcher, - llm=self.llm, - embedder=self.embedder, - graph_db=self.graph_db, - oss_config=self.oss_config, - skills_dir_config=self.skills_dir_config, - **kwargs, - ) + future_skill = None + if os.getenv("MOS_ENABLE_SKILL_MEMORY", "false").lower() == "true": + future_skill = executor.submit( + process_skill_memory_fine, + raw_nodes, + info, + searcher=self.searcher, + llm=self.llm, + embedder=self.embedder, + graph_db=self.graph_db, + oss_config=self.oss_config, + skills_dir_config=self.skills_dir_config, + **kwargs, + ) # Collect results fine_memory_items_string_parser = future_string.result() fine_memory_items_tool_trajectory_parser = future_tool.result() - fine_memory_items_skill_memory_parser = future_skill.result() + fine_memory_items_skill_memory_parser = future_skill.result() if future_skill else [] fine_memory_items.extend(fine_memory_items_string_parser) fine_memory_items.extend(fine_memory_items_tool_trajectory_parser) fine_memory_items.extend(fine_memory_items_skill_memory_parser) diff --git a/src/memos/mem_reader/read_skill_memory/process_skill_memory.py b/src/memos/mem_reader/read_skill_memory/process_skill_memory.py index a9a727b08..a4bc86f37 100644 --- a/src/memos/mem_reader/read_skill_memory/process_skill_memory.py +++ b/src/memos/mem_reader/read_skill_memory/process_skill_memory.py @@ -353,6 +353,8 @@ def _split_task_chunk_by_llm(llm: BaseLLM, messages: MessageList) -> dict[str, M skills_llm = os.getenv("SKILLS_LLM", None) llm_kwargs = {"model_name_or_path": skills_llm} if skills_llm else {} response_text = llm.generate(prompt, **llm_kwargs) + if not response_text: + raise ValueError("LLM returned empty/None response for skill chunking") response_json = json.loads(response_text.replace("```json", "").replace("```", "")) break except Exception as e: @@ -1015,9 +1017,13 @@ def process_skill_memory_fine( rewrite_query: bool = True, oss_config: 
dict[str, Any] | None = None, skills_dir_config: dict[str, Any] | None = None, + # PATCH: skip skill processing for fast migration + _SKIP_SKILL = os.getenv("MEMOS_SKIP_SKILL_PROCESSING", "").lower() in ("1", "true", "yes"), complete_skill_memory: bool = True, **kwargs, ) -> list[TextualMemoryItem]: + if _SKIP_SKILL: + return [] skills_repo_backend = _get_skill_file_storage_location() oss_client, _missing_keys, flag = _skill_init( skills_repo_backend, oss_config, skills_dir_config diff --git a/src/memos/mem_scheduler/general_modules/init_components_for_scheduler.py b/src/memos/mem_scheduler/general_modules/init_components_for_scheduler.py index b103acf3a..d11efbeb7 100644 --- a/src/memos/mem_scheduler/general_modules/init_components_for_scheduler.py +++ b/src/memos/mem_scheduler/general_modules/init_components_for_scheduler.py @@ -28,6 +28,7 @@ ExtractorFactory, RetrieverFactory, ) +from memos.memories.textual.general import GeneralTextMemory from memos.memories.textual.simple_preference import SimplePreferenceTextMemory from memos.memories.textual.simple_tree import SimpleTreeTextMemory from memos.memories.textual.tree_text_memory.organize.manager import MemoryManager @@ -318,32 +319,39 @@ def init_components() -> dict[str, Any]: # Initialize chat llms logger.debug("Core components instantiated") - # Initialize memory manager - memory_manager = MemoryManager( - graph_db, - embedder, - llm, - memory_size=_get_default_memory_size(default_cube_config), - is_reorganize=getattr(default_cube_config.text_mem.config, "reorganize", False), - ) + text_mem_backend = getattr(default_cube_config.text_mem, "backend", "tree_text") + memory_manager = None + + if text_mem_backend == "general_text": + text_mem = GeneralTextMemory(config=default_cube_config.text_mem.config) + logger.debug("Text memory initialized: general_text") + else: + # Initialize memory manager + memory_manager = MemoryManager( + graph_db, + embedder, + llm, + memory_size=_get_default_memory_size(default_cube_config), + is_reorganize=getattr(default_cube_config.text_mem.config, "reorganize", False), + ) - logger.debug("Memory manager initialized") - - tokenizer = FastTokenizer() - # Initialize text memory - text_mem = SimpleTreeTextMemory( - llm=llm, - embedder=embedder, - mem_reader=mem_reader, - graph_db=graph_db, - reranker=reranker, - memory_manager=memory_manager, - config=default_cube_config.text_mem.config, - internet_retriever=internet_retriever, - tokenizer=tokenizer, - ) + logger.debug("Memory manager initialized") + + tokenizer = FastTokenizer() + # Initialize text memory + text_mem = SimpleTreeTextMemory( + llm=llm, + embedder=embedder, + mem_reader=mem_reader, + graph_db=graph_db, + reranker=reranker, + memory_manager=memory_manager, + config=default_cube_config.text_mem.config, + internet_retriever=internet_retriever, + tokenizer=tokenizer, + ) - logger.debug("Text memory initialized") + logger.debug("Text memory initialized: tree_text") # Initialize preference memory components pref_extractor = ( @@ -406,22 +414,33 @@ def init_components() -> dict[str, Any]: para_mem=None, ) - tree_mem: SimpleTreeTextMemory = naive_mem_cube.text_mem - searcher: Searcher = tree_mem.get_searcher( - manual_close_internet=os.getenv("ENABLE_INTERNET", "true").lower() == "false", - moscube=False, - process_llm=mem_reader.llm, - ) - # Initialize feedback server - feedback_server = SimpleMemFeedback( - llm=llm, - embedder=embedder, - graph_store=graph_db, - memory_manager=memory_manager, - mem_reader=mem_reader, - searcher=searcher, - 
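# ---------------------------------------------------------------------------
# Sketch (illustrative, not part of the patch): the mem_reader hunks above
# gate an expensive executor task behind a feature flag while keeping the
# fan-out/fan-in shape intact -- submit-or-None, then join with
# `future.result() if future else []`.
# ---------------------------------------------------------------------------
import concurrent.futures
import os

def skill_pass(items):  # stand-in for process_skill_memory_fine
    return [f"skill:{i}" for i in items]

def process(items):
    with concurrent.futures.ThreadPoolExecutor() as executor:
        future_skill = None
        if os.getenv("MOS_ENABLE_SKILL_MEMORY", "false").lower() == "true":
            future_skill = executor.submit(skill_pass, items)
        # ... other passes are submitted unconditionally here ...
        return future_skill.result() if future_skill else []

assert process(["t1"]) == []  # assuming the flag is unset, the pass never runs
# ---------------------------------------------------------------------------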
diff --git a/src/memos/mem_scheduler/general_modules/init_components_for_scheduler.py b/src/memos/mem_scheduler/general_modules/init_components_for_scheduler.py
index b103acf3a..d11efbeb7 100644
--- a/src/memos/mem_scheduler/general_modules/init_components_for_scheduler.py
+++ b/src/memos/mem_scheduler/general_modules/init_components_for_scheduler.py
@@ -28,6 +28,7 @@
     ExtractorFactory,
     RetrieverFactory,
 )
+from memos.memories.textual.general import GeneralTextMemory
 from memos.memories.textual.simple_preference import SimplePreferenceTextMemory
 from memos.memories.textual.simple_tree import SimpleTreeTextMemory
 from memos.memories.textual.tree_text_memory.organize.manager import MemoryManager
@@ -318,32 +319,39 @@ def init_components() -> dict[str, Any]:
     # Initialize chat llms
     logger.debug("Core components instantiated")
 
-    # Initialize memory manager
-    memory_manager = MemoryManager(
-        graph_db,
-        embedder,
-        llm,
-        memory_size=_get_default_memory_size(default_cube_config),
-        is_reorganize=getattr(default_cube_config.text_mem.config, "reorganize", False),
-    )
-
-    logger.debug("Memory manager initialized")
-
-    tokenizer = FastTokenizer()
-    # Initialize text memory
-    text_mem = SimpleTreeTextMemory(
-        llm=llm,
-        embedder=embedder,
-        mem_reader=mem_reader,
-        graph_db=graph_db,
-        reranker=reranker,
-        memory_manager=memory_manager,
-        config=default_cube_config.text_mem.config,
-        internet_retriever=internet_retriever,
-        tokenizer=tokenizer,
-    )
-
-    logger.debug("Text memory initialized")
+    text_mem_backend = getattr(default_cube_config.text_mem, "backend", "tree_text")
+    memory_manager = None
+
+    if text_mem_backend == "general_text":
+        text_mem = GeneralTextMemory(config=default_cube_config.text_mem.config)
+        logger.debug("Text memory initialized: general_text")
+    else:
+        # Initialize memory manager
+        memory_manager = MemoryManager(
+            graph_db,
+            embedder,
+            llm,
+            memory_size=_get_default_memory_size(default_cube_config),
+            is_reorganize=getattr(default_cube_config.text_mem.config, "reorganize", False),
+        )
+
+        logger.debug("Memory manager initialized")
+
+        tokenizer = FastTokenizer()
+        # Initialize text memory
+        text_mem = SimpleTreeTextMemory(
+            llm=llm,
+            embedder=embedder,
+            mem_reader=mem_reader,
+            graph_db=graph_db,
+            reranker=reranker,
+            memory_manager=memory_manager,
+            config=default_cube_config.text_mem.config,
+            internet_retriever=internet_retriever,
+            tokenizer=tokenizer,
+        )
+
+        logger.debug("Text memory initialized: tree_text")
 
     # Initialize preference memory components
     pref_extractor = (
@@ -406,22 +414,33 @@ def init_components() -> dict[str, Any]:
         para_mem=None,
     )
 
-    tree_mem: SimpleTreeTextMemory = naive_mem_cube.text_mem
-    searcher: Searcher = tree_mem.get_searcher(
-        manual_close_internet=os.getenv("ENABLE_INTERNET", "true").lower() == "false",
-        moscube=False,
-        process_llm=mem_reader.llm,
-    )
-    # Initialize feedback server
-    feedback_server = SimpleMemFeedback(
-        llm=llm,
-        embedder=embedder,
-        graph_store=graph_db,
-        memory_manager=memory_manager,
-        mem_reader=mem_reader,
-        searcher=searcher,
-        reranker=feedback_reranker,
-        pref_mem=pref_mem,
-    )
+    tree_mem = naive_mem_cube.text_mem
+    searcher = None
+    if hasattr(tree_mem, "get_searcher"):
+        searcher = tree_mem.get_searcher(
+            manual_close_internet=os.getenv("ENABLE_INTERNET", "true").lower() == "false",
+            moscube=False,
+            process_llm=mem_reader.llm,
+        )
+
+    # Initialize feedback server for tree_text backend only
+    feedback_server = None
+    if memory_manager is not None and searcher is not None:
+        feedback_server = SimpleMemFeedback(
+            llm=llm,
+            embedder=embedder,
+            graph_store=graph_db,
+            memory_manager=memory_manager,
+            mem_reader=mem_reader,
+            searcher=searcher,
+            reranker=feedback_reranker,
+            pref_mem=pref_mem,
+        )
 
     # Return all components as a dictionary for easy access and extension
-    return {"naive_mem_cube": naive_mem_cube, "feedback_server": feedback_server}
+    return {
+        "naive_mem_cube": naive_mem_cube,
+        "feedback_server": feedback_server,
+        "searcher": searcher,
+        "memory_manager": memory_manager,
+        "text_mem_backend": text_mem_backend,
+    }
diff --git a/src/memos/mem_scheduler/task_schedule_modules/handlers/mem_read_handler.py b/src/memos/mem_scheduler/task_schedule_modules/handlers/mem_read_handler.py
index 20dbb63b2..6ecaa79c8 100644
--- a/src/memos/mem_scheduler/task_schedule_modules/handlers/mem_read_handler.py
+++ b/src/memos/mem_scheduler/task_schedule_modules/handlers/mem_read_handler.py
@@ -80,7 +80,10 @@ def process_message(self, message: ScheduleMessageItem):
         text_mem = mem_cube.text_mem
 
         if not isinstance(text_mem, TreeTextMemory):
-            logger.error("Expected TreeTextMemory but got %s", type(text_mem).__name__)
+            logger.info(
+                "Skipping mem_read task for non-tree backend: %s",
+                type(text_mem).__name__,
+            )
             return
 
         self._process_memories_with_reader(
""" memory_items = [TextualMemoryItem(**m) if isinstance(m, dict) else m for m in memories] - # Memory encode + if not memory_items: + return [] + + # Memory encode (fallback safely if batch embedding returns None/partial) embed_memories = self.embedder.embed([m.memory for m in memory_items]) + if embed_memories is None: + logger.warning("Embedding service returned None; attempting per-item fallback") + embed_memories = [] + for memo in memory_items: + try: + embed_memories.append(self._embed_one_sentence(memo.memory)) + except Exception as exc: + logger.error(f"Failed embedding for memory {memo.id}: {exc}") + continue + + # Create vector db items for successfully embedded memories only + vec_db_items: list[VecDBItem] = [] + added_ids: list[str] = [] + for idx, item in enumerate(memory_items): + if idx >= len(embed_memories): + break + emb = embed_memories[idx] + if emb is None: + logger.warning(f"Skipping memory {item.id}: embedding missing") + continue - # Create vector db items - vec_db_items = [] - for item, emb in zip(memory_items, embed_memories, strict=True): vec_db_items.append( VecDBItem( id=item.id, @@ -99,9 +126,14 @@ def add(self, memories: list[TextualMemoryItem | dict[str, Any]]) -> None: vector=emb, ) ) + added_ids.append(item.id) + + if not vec_db_items: + return [] # Add to vector db self.vector_db.add(vec_db_items) + return added_ids def update(self, memory_id: str, new_memory: TextualMemoryItem | dict[str, Any]) -> None: """Update a memory by memory_id.""" @@ -154,14 +186,86 @@ def get_by_ids(self, memory_ids: list[str]) -> list[TextualMemoryItem]: memories = [TextualMemoryItem(**db_item.payload) for db_item in db_items] return memories - def get_all(self) -> list[TextualMemoryItem]: - """Get all memories. - Returns: - list[TextualMemoryItem]: List of all memories. + def get_all( + self, + user_name: str | None = None, + user_id: str | None = None, + page: int | None = None, + page_size: int | None = None, + filter: dict[str, Any] | None = None, + memory_type: list[str] | None = None, + ) -> dict[str, Any]: + """Get memories with compatibility pagination/filtering. + + Returns a tree_text-compatible payload shape: + {"nodes": [...], "total_nodes": N} """ - all_items = self.vector_db.get_all() - all_memories = [TextualMemoryItem(**memo.payload) for memo in all_items] - return all_memories + if page is None: + page = 1 + if page_size is None: + page_size = int(os.getenv("MOS_GENERAL_TEXT_PAGE_SIZE", "100")) + + page = max(1, int(page)) + page_size = max(1, int(page_size)) + + qdrant_filter: dict[str, Any] = {} + if user_id is not None: + qdrant_filter["metadata.user_id"] = user_id + elif user_name is not None: + qdrant_filter["metadata.mem_cube_id"] = user_name + + # Apply optional flat filter keys directly to payload filter map. + if filter: + for k, v in filter.items(): + qdrant_filter[k] = v + + all_items = self.vector_db.get_by_filter( + qdrant_filter, + scroll_limit=min(max(page_size, 50), 500), + page=page, + page_size=page_size, + include_vectors=False, + ) + all_memories = [TextualMemoryItem(**memo.payload).model_dump() for memo in all_items] + + def _match(item: dict[str, Any]) -> bool: + md = item.get("metadata") or {} + + # Strict scoping to avoid accidental global scans in user-scoped endpoints. 
+ if user_id is not None and md.get("user_id") != user_id: + return False + + if user_name is not None: + md_user_name = md.get("user_name") + md_mem_cube = md.get("mem_cube_id") + if md_user_name is not None and md_user_name != user_name: + return False + if md_mem_cube is not None and md_mem_cube != user_name: + return False + if md_user_name is None and md_mem_cube is None and user_id is None: + return False + + if memory_type: + md_type = md.get("memory_type") + if md_type not in memory_type: + return False + + if filter: + for k, v in filter.items(): + if item.get(k) != v and md.get(k) != v: + return False + + return True + + filtered = [m for m in all_memories if _match(m)] + + # Count is best-effort; if unsupported for nested filters, fallback to page length. + try: + total_nodes = int(self.vector_db.count(qdrant_filter if qdrant_filter else None)) + except Exception: + total_nodes = len(filtered) + + return {"nodes": filtered, "total_nodes": total_nodes} def delete(self, memory_ids: list[str]) -> None: """Delete a memory.""" @@ -216,6 +320,13 @@ def drop( ) -> None: pass + def add_rawfile_nodes_n_edges(self, *args, **kwargs) -> None: + """Compatibility no-op for tree_text API. + + general_text backend does not maintain graph nodes/edges for raw files. + """ + return None + def _embed_one_sentence(self, sentence: str) -> list[float]: """Embed a single sentence.""" return self.embedder.embed([sentence])[0] diff --git a/src/memos/memories/textual/tree_text_memory/organize/reorganizer.py b/src/memos/memories/textual/tree_text_memory/organize/reorganizer.py index b7fb6b1a0..7b381a007 100644 --- a/src/memos/memories/textual/tree_text_memory/organize/reorganizer.py +++ b/src/memos/memories/textual/tree_text_memory/organize/reorganizer.py @@ -150,24 +150,28 @@ def _run_message_consumer_loop(self): ) def _run_structure_organizer_loop(self): """ - Use schedule library to periodically trigger structure optimization. - This runs until the stop flag is set. + Periodically trigger structure optimization. + Runs on new-node events AND every ~5 minutes to mop up orphans. 
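# ---------------------------------------------------------------------------
# Sketch (illustrative, not part of the patch): the add() fallback above embeds
# per item when the batch call returns None. Appending a None placeholder on
# failure is what keeps index i of the vector list aligned with
# memory_items[i]; skipping would silently attach later vectors to the wrong
# memories.
# ---------------------------------------------------------------------------
def embed_with_fallback(embedder, texts):
    try:
        vectors = embedder.embed(texts)
        if vectors is not None:
            return vectors
    except Exception:
        pass
    vectors = []
    for text in texts:
        try:
            vectors.append(embedder.embed([text])[0])
        except Exception:
            vectors.append(None)  # placeholder preserves alignment
    return vectors
# ---------------------------------------------------------------------------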
""" - import schedule + logger.info("Structure optimizer loop started.") + last_periodic_run = 0 + periodic_interval = 300 # 5 minutes - schedule.every(100).seconds.do(self.optimize_structure, scope="LongTermMemory") - schedule.every(100).seconds.do(self.optimize_structure, scope="UserMemory") - - logger.info("Structure optimizer schedule started.") while not getattr(self, "_stop_scheduler", False): if any(self._is_optimizing.values()): time.sleep(1) continue - if self._reorganize_needed: - logger.info("[Reorganizer] Triggering optimize_structure due to new nodes.") + + now = time.time() + should_run = self._reorganize_needed or (now - last_periodic_run >= periodic_interval) + + if should_run: + reason = "new nodes" if self._reorganize_needed else "periodic" + logger.info(f"[Reorganizer] Triggering optimize_structure ({reason}).") self.optimize_structure(scope="LongTermMemory") self.optimize_structure(scope="UserMemory") self._reorganize_needed = False + last_periodic_run = now time.sleep(30) def stop(self): @@ -212,8 +216,8 @@ def optimize_structure( self, scope: str = "LongTermMemory", local_tree_threshold: int = 10, - min_cluster_size: int = 4, - min_group_size: int = 20, + min_cluster_size: int = 2, + min_group_size: int = 3, max_duration_sec: int = 600, user_name: str | None = None, ): @@ -456,7 +460,7 @@ def _local_subcluster( install_command="pip install scikit-learn", install_link="https://scikit-learn.org/stable/install.html", ) - def _partition(self, nodes, min_cluster_size: int = 10, max_cluster_size: int = 20): + def _partition(self, nodes, min_cluster_size: int = 2, max_cluster_size: int = 20): """ Partition nodes by: - If total nodes <= max_cluster_size -> return all nodes in one cluster. diff --git a/src/memos/memories/textual/tree_text_memory/retrieve/searcher.py b/src/memos/memories/textual/tree_text_memory/retrieve/searcher.py index cc269e8c4..2512edd45 100644 --- a/src/memos/memories/textual/tree_text_memory/retrieve/searcher.py +++ b/src/memos/memories/textual/tree_text_memory/retrieve/searcher.py @@ -306,10 +306,9 @@ def _parse_task( ) query = parsed_goal.rephrased_query or query - # if goal has extra memories, embed them too - if parsed_goal.memories: - embed_texts = list(dict.fromkeys([query, *parsed_goal.memories])) - query_embedding = self.embedder.embed(embed_texts) + # Always embed the query; include extra memories from parsed goal if present + embed_texts = list(dict.fromkeys([query, *(parsed_goal.memories or [])])) + query_embedding = self.embedder.embed(embed_texts) return parsed_goal, query_embedding, context, query @timed diff --git a/src/memos/multi_mem_cube/single_cube.py b/src/memos/multi_mem_cube/single_cube.py index d890c77bf..3a36eb90a 100644 --- a/src/memos/multi_mem_cube/single_cube.py +++ b/src/memos/multi_mem_cube/single_cube.py @@ -849,12 +849,12 @@ def _process_text_mem( target_session_id = add_req.session_id or "default_session" # Decide extraction mode: - # - async: always fast (ignore add_req.mode) - # - sync: use add_req.mode == "fast" to switch to fast pipeline, otherwise fine + # - Patched: always use "fine" to ensure embeddings are generated + # - Original fast mode skips embedding generation, breaking Qdrant persistence if sync_mode == "async": - extract_mode = "fast" + extract_mode = "fine" else: # sync - extract_mode = "fast" if add_req.mode == "fast" else "fine" + extract_mode = "fine" if add_req.mode != "fast" else "fine" self.logger.info( "[SingleCubeView] cube=%s Processing text memory " diff --git a/src/memos/settings.py 
diff --git a/src/memos/settings.py b/src/memos/settings.py
index 3b3f05ebd..82e104e25 100644
--- a/src/memos/settings.py
+++ b/src/memos/settings.py
@@ -4,7 +4,7 @@
 
 MEMOS_DIR = Path(os.getenv("MEMOS_BASE_PATH", Path.cwd())) / ".memos"
 
-DEBUG = False
+DEBUG = os.getenv("MEMOS_DEBUG", "false").lower() == "true"
 
 # "memos" or "memos.submodules" ... to filter logs from specific packages
 LOG_FILTER_TREE_PREFIX = ""
diff --git a/src/memos/vec_dbs/qdrant.py b/src/memos/vec_dbs/qdrant.py
index d0853c4af..7befd0249 100644
--- a/src/memos/vec_dbs/qdrant.py
+++ b/src/memos/vec_dbs/qdrant.py
@@ -172,13 +172,13 @@ def _dict_to_filter(self, filter_dict: dict[str, Any]) -> Any:
 
         return models.Filter(must=conditions)
 
-    def get_by_id(self, id: str) -> VecDBItem | None:
+    def get_by_id(self, id: str, include_vectors: bool = False) -> VecDBItem | None:
         """Get a single item by ID."""
         response = self.client.retrieve(
             collection_name=self.config.collection_name,
             ids=[id],
             with_payload=True,
-            with_vectors=True,
+            with_vectors=include_vectors,
         )
 
         if not response:
@@ -191,13 +191,13 @@ def get_by_id(self, id: str) -> VecDBItem | None:
             payload=point.payload,
         )
 
-    def get_by_ids(self, ids: list[str]) -> list[VecDBItem]:
+    def get_by_ids(self, ids: list[str], include_vectors: bool = False) -> list[VecDBItem]:
         """Get multiple items by their IDs."""
         response = self.client.retrieve(
             collection_name=self.config.collection_name,
             ids=ids,
             with_payload=True,
-            with_vectors=True,
+            with_vectors=include_vectors,
         )
 
         if not response:
@@ -212,29 +212,87 @@ def get_by_ids(self, ids: list[str]) -> list[VecDBItem]:
             for point in response
         ]
 
-    def get_by_filter(self, filter: dict[str, Any], scroll_limit: int = 100) -> list[VecDBItem]:
+    def get_by_filter(
+        self,
+        filter: dict[str, Any],
+        scroll_limit: int = 100,
+        page: int | None = None,
+        page_size: int | None = None,
+        include_vectors: bool = False,
+    ) -> list[VecDBItem]:
         """
-        Retrieve all items that match the given filter criteria.
+        Retrieve items matching filter criteria, with optional pagination.
 
         Args:
             filter: Payload filters to match against stored items
             scroll_limit: Maximum number of items to retrieve per scroll request
+            page: 1-based page index for paginated retrieval
+            page_size: Number of items per page when page is provided
+            include_vectors: Whether to include vectors in the response payload
 
         Returns:
-            List of items including vectors and payload that match the filter
+            List of items including payload (and vectors optionally)
         """
         qdrant_filter = self._dict_to_filter(filter) if filter else None
+
+        # Paginated retrieval path: avoid loading the full collection.
+        if page is not None and page_size is not None:
+            page = max(1, int(page))
+            page_size = max(1, int(page_size))
+            start = (page - 1) * page_size
+            end = start + page_size
+
+            selected_points = []
+            seen = 0
+            offset = None
+
+            while len(selected_points) < page_size:
+                points, offset = self.client.scroll(
+                    collection_name=self.config.collection_name,
+                    limit=max(1, min(int(scroll_limit), 1000)),
+                    scroll_filter=qdrant_filter,
+                    offset=offset,
+                    with_vectors=include_vectors,
+                    with_payload=True,
+                )
+
+                if not points:
+                    break
+
+                next_seen = seen + len(points)
+                if next_seen > start:
+                    slice_start = max(0, start - seen)
+                    slice_end = min(len(points), end - seen)
+                    if slice_start < slice_end:
+                        selected_points.extend(points[slice_start:slice_end])
+
+                seen = next_seen
+
+                if offset is None or seen >= end:
+                    break
+
+            logger.info(
+                f"Qdrant retrieve by filter completed with {len(selected_points)} results "
+                f"(page={page}, page_size={page_size})."
+            )
+            return [
+                VecDBItem(
+                    id=point.id,
+                    vector=point.vector,
+                    payload=point.payload,
+                )
+                for point in selected_points
+            ]
+
+        # Legacy full-scan path.
         all_points = []
         offset = None
-
-        # Use scroll to paginate through all matching points
         while True:
             points, offset = self.client.scroll(
                 collection_name=self.config.collection_name,
-                limit=scroll_limit,
+                limit=max(1, min(int(scroll_limit), 1000)),
                 scroll_filter=qdrant_filter,
                 offset=offset,
-                with_vectors=True,
+                with_vectors=include_vectors,
                 with_payload=True,
             )
 
@@ -243,7 +301,6 @@ def get_by_filter(self, filter: dict[str, Any], scroll_limit: int = 100) -> list
 
             all_points.extend(points)
 
-            # Update offset for next iteration
             if offset is None:
                 break
 
@@ -257,9 +314,9 @@ def get_by_filter(self, filter: dict[str, Any], scroll_limit: int = 100) -> list
             for point in all_points
         ]
 
-    def get_all(self, scroll_limit=100) -> list[VecDBItem]:
+    def get_all(self, scroll_limit=100, include_vectors: bool = False) -> list[VecDBItem]:
         """Retrieve all items in the vector database."""
-        return self.get_by_filter({}, scroll_limit=scroll_limit)
+        return self.get_by_filter({}, scroll_limit=scroll_limit, include_vectors=include_vectors)
 
     def count(self, filter: dict[str, Any] | None = None) -> int:
         """Count items in the database, optionally with filter."""