Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/memos/graph_dbs/neo4j_community.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ def add_node(
# Safely process metadata
metadata = _prepare_node_metadata(metadata)

# Flatten info fields to top level (for Neo4j flat structure)
metadata = _flatten_info_fields(metadata)

# Initialize delete_time and delete_record_id fields
metadata.setdefault("delete_time", "")
metadata.setdefault("delete_record_id", "")
Expand Down
9 changes: 9 additions & 0 deletions src/memos/mem_feedback/feedback.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,15 @@ def _single_add_operation(
)

logger.info(f"[Memory Feedback ADD] memory id: {added_ids!s}")
if not added_ids:
logger.warning(
"[Memory Feedback ADD] add returned empty list, memory may not have been persisted"
)
return {
"id": None,
"text": to_add_memory.memory,
"source_doc_id": None,
}
return {
"id": added_ids[0],
"text": to_add_memory.memory,
Expand Down
97 changes: 97 additions & 0 deletions tests/graph_dbs/test_neo4j_community_flatten_info.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
"""
Regression tests for issue #1122:
Neo4jCommunityGraphDB.add_node() must flatten nested 'info' dict in metadata
to avoid Neo4j CypherTypeError on Map-type property values.
"""

import uuid

from datetime import datetime
from unittest.mock import MagicMock, patch

import pytest

from memos.configs.graph_db import Neo4jGraphDBConfig


@pytest.fixture
def neo4j_community_config():
return Neo4jGraphDBConfig(
uri="bolt://localhost:7687",
user="neo4j",
password="test",
db_name="neo4j",
auto_create=False,
use_multi_db=False,
user_name="test-user",
embedding_dimension=3,
)


@pytest.fixture
def neo4j_community_db(neo4j_community_config):
with patch("neo4j.GraphDatabase") as mock_gd:
mock_driver = MagicMock()
mock_gd.driver.return_value = mock_driver

from memos.graph_dbs.neo4j_community import Neo4jCommunityGraphDB

db = object.__new__(Neo4jCommunityGraphDB)
db.config = neo4j_community_config
db.driver = mock_driver
db.db_name = neo4j_community_config.db_name
db.vec_db = MagicMock()
yield db


class TestNeo4jCommunityFlattenInfo:
"""Regression: add_node must flatten nested info dict before passing to Neo4j."""

def test_add_node_flattens_info_field(self, neo4j_community_db):
"""Nested 'info' dict should be flattened to top-level keys in metadata."""
node_id = str(uuid.uuid4())
memory = "User prefers Python for AI development"
now = datetime.utcnow().isoformat()
metadata = {
"embedding": [0.1, 0.2, 0.3],
"created_at": now,
"updated_at": now,
"sources": [],
"info": {
"preference": "python",
},
}

session_mock = neo4j_community_db.driver.session.return_value.__enter__.return_value

neo4j_community_db.add_node(
id=node_id, memory=memory, metadata=metadata, user_name="test-user"
)

call_args = session_mock.run.call_args
passed_metadata = call_args.kwargs.get("metadata", call_args[1].get("metadata", {}))

assert "info" not in passed_metadata, (
f"'info' field was not flattened: metadata={passed_metadata}"
)
assert passed_metadata.get("preference") == "python"

def test_add_node_without_info_field_still_works(self, neo4j_community_db):
"""add_node should work normally when metadata has no 'info' field."""
node_id = str(uuid.uuid4())
memory = "Simple memory"
now = datetime.utcnow().isoformat()
metadata = {
"embedding": [0.1, 0.2, 0.3],
"created_at": now,
"updated_at": now,
"sources": [],
}

session_mock = neo4j_community_db.driver.session.return_value.__enter__.return_value

neo4j_community_db.add_node(
id=node_id, memory=memory, metadata=metadata, user_name="test-user"
)

assert session_mock.run.called
Empty file added tests/mem_feedback/__init__.py
Empty file.
78 changes: 78 additions & 0 deletions tests/mem_feedback/test_feedback_empty_added_ids.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
"""
Regression tests for issue #1122 (Bug #2):
MemFeedback._single_add_operation() must handle empty added_ids
without raising IndexError.
"""

import uuid

from datetime import datetime
from unittest.mock import MagicMock

import pytest

from memos.mem_feedback.feedback import MemFeedback
from memos.memories.textual.item import TextualMemoryItem, TreeNodeTextualMemoryMetadata


def _make_memory_item(memory_text="I prefer Python for AI development"):
"""Create a minimal TextualMemoryItem for testing."""
return TextualMemoryItem(
id=str(uuid.uuid4()),
memory=memory_text,
metadata=TreeNodeTextualMemoryMetadata(
user_id="user1",
memory_type="WorkingMemory",
sources=[],
embedding=[0.1, 0.2, 0.3],
created_at=datetime.now().isoformat(),
background="",
key="test-key",
tags=[],
),
)


@pytest.fixture
def feedback_instance():
"""Create a MemFeedback instance bypassing __init__."""
fb = object.__new__(MemFeedback)
fb.memory_manager = MagicMock()
fb.embedder = MagicMock()
return fb


class TestSingleAddOperationEmptyIds:
"""Test _single_add_operation when memory_manager.add() returns empty list."""

def test_empty_added_ids_returns_none_id(self, feedback_instance):
"""When added_ids is empty, should return {"id": None} instead of IndexError."""
feedback_instance.memory_manager.add.return_value = []

new_item = _make_memory_item()
result = feedback_instance._single_add_operation(
old_memory_item=None,
new_memory_item=new_item,
user_id="user1",
user_name="test-user",
)

assert result["id"] is None
assert result["text"] == new_item.memory
assert result["source_doc_id"] is None

def test_normal_added_ids_returns_first_id(self, feedback_instance):
"""When added_ids has values, should return the first id normally."""
expected_id = str(uuid.uuid4())
feedback_instance.memory_manager.add.return_value = [expected_id]

new_item = _make_memory_item()
result = feedback_instance._single_add_operation(
old_memory_item=None,
new_memory_item=new_item,
user_id="user1",
user_name="test-user",
)

assert result["id"] == expected_id
assert result["text"] == new_item.memory