Conversation
Summary of Changes

Hello @superintendent2521, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request represents a significant architectural change, transitioning the application's data persistence layer from MongoDB to PostgreSQL. This involves a comprehensive migration script to move existing data, a re-engineered database abstraction layer that mimics the MongoDB API for minimal code changes in the application, and updates to dependencies and configurations to support the new database system.
Code Review
This pull request initiates a migration from MongoDB to PostgreSQL, introducing a new Python script (scripts/mongo_to_postgres_migration.py) to handle the data transfer by backing up MongoDB and then upserting collections into PostgreSQL JSONB tables. The core database layer (src/database.py) has been rewritten to use asyncpg for PostgreSQL, emulating MongoDB's collection API (e.g., find, aggregate) through in-memory processing of fetched documents and defining JSONB indexes. Supporting changes include adding POSTGRES_DSN to .env, updating requirements.txt with asyncpg and pydantic-settings, and modifying src/server.py to use pydantic-settings. However, the review highlights critical architectural and functional issues:

- The in-memory emulation of query operations in src/database.py will lead to severe performance and memory problems, necessitating a translation to native SQL queries.
- The removal of transactions for page creation in src/services/page_service.py introduces data inconsistency risks.
- The search functionality is broken because the custom _matches_filter does not support $regex.
- The migration script's upsert_documents performs slow one-by-one insertions instead of batch operations.
- The PostgresCursor's synchronous iteration blocks the event loop.
- The new database connection logic lacks the resilience (retry and monitoring) of the previous MongoDB implementation.
I am having trouble creating individual review comments, so my feedback is included below.
src/database.py (260-457)
The current implementation of PostgresCollection and PostgresCursor emulates MongoDB's query functionality by fetching all documents from a table into memory and then performing filtering, sorting, projection, and aggregation in Python. This is a major architectural issue that will lead to severe performance degradation and high memory usage as the database grows. For example, _fetch_docs loads the entire table, and methods like find, count_documents, and aggregate process this in-memory list. This negates the benefits of using a powerful database like PostgreSQL.
The correct approach is to translate the MongoDB-style queries into equivalent SQL queries that can be executed by the database engine.
- Filtering (find) should be translated to a WHERE clause.
- Counting (count_documents) should use SELECT COUNT(*) ... WHERE ....
- Aggregations (aggregate) should be translated to GROUP BY, SUM(), etc.
- Sorting should use ORDER BY.
- Projections should use SELECT col1, col2, ....
This will be a significant refactoring, but it's essential for the application to be performant and scalable.
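As a rough illustration of that direction, here is a minimal sketch of pushing a simple equality filter, sort, and limit down to PostgreSQL over the JSONB doc column. The (id, doc) table layout matches the migration script, but the helper names (build_where, find_docs) are hypothetical and only top-level equality conditions are handled.

import json
from typing import Any, Dict, List, Optional, Tuple

import asyncpg


def build_where(filt: Dict[str, Any]) -> Tuple[str, List[Any]]:
    """Translate simple top-level equality filters into a parameterized WHERE clause."""
    clauses: List[str] = []
    params: List[Any] = []
    for key, value in filt.items():
        # Keys are interpolated into the SQL text, so real code must validate
        # them against a whitelist of known fields.
        params.append(str(value))
        clauses.append(f"(doc->>'{key}') = ${len(params)}")
    return " AND ".join(clauses) or "TRUE", params


async def find_docs(
    pool: asyncpg.Pool,
    table: str,
    filt: Dict[str, Any],
    sort_key: Optional[str] = None,
    limit: Optional[int] = None,
) -> List[Dict[str, Any]]:
    where, params = build_where(filt)
    query = f"SELECT doc FROM {table} WHERE {where}"
    if sort_key:
        query += f" ORDER BY doc->>'{sort_key}'"
    if limit:
        query += f" LIMIT {int(limit)}"
    async with pool.acquire() as conn:
        rows = await conn.fetch(query, *params)
    # asyncpg returns JSONB as text unless a custom codec is registered.
    return [json.loads(row["doc"]) for row in rows]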
src/services/page_service.py (225-244)
The database transaction for creating a new page along with its talk page has been removed. Previously, this was wrapped in client.start_session() and s.start_transaction(), ensuring atomicity. Now, the two create_page calls are independent. If the second call to create the 'talk' page fails, the 'main' page will still have been created, leaving the database in an inconsistent state. Transactions should be reintroduced to ensure both pages are created successfully or neither is.
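For illustration only, a minimal sketch of restoring atomicity with an asyncpg transaction; the pool handle and the insert helper shown here are assumed names, not the PR's actual page_service API.

import json

import asyncpg


async def insert_page(conn: asyncpg.Connection, doc: dict) -> None:
    # Stand-in for the real create_page logic, executed on the transaction's connection.
    await conn.execute(
        "INSERT INTO pages (id, doc) VALUES ($1, $2::jsonb)",
        str(doc["_id"]),
        json.dumps(doc, ensure_ascii=False),
    )


async def create_page_with_talk(pool: asyncpg.Pool, main_doc: dict, talk_doc: dict) -> None:
    async with pool.acquire() as conn:
        async with conn.transaction():
            # Both inserts commit together; if the talk page fails, the main page rolls back.
            await insert_page(conn, main_doc)
            await insert_page(conn, talk_doc)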
src/services/page_service.py (352-355)
The search functionality is broken. This find call uses a $regex operator in the filter. However, the custom _matches_filter function in src/database.py does not implement the $regex operator. As a result, this query will not match any documents, and the search will always return empty results.
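If filtering stays in Python for now, one possible fix is a $regex branch in the matcher, sketched below with a hypothetical helper name; a longer-term option is translating $regex into PostgreSQL's ~ / ~* operators so the filter runs in the database.

import re
from typing import Any, Dict


def matches_regex_condition(value: Any, condition: Dict[str, Any]) -> bool:
    # Handles {'field': {'$regex': 'pattern', '$options': 'i'}} style conditions.
    pattern = condition.get("$regex", "")
    options = condition.get("$options", "")
    flags = re.IGNORECASE if "i" in options else 0
    return isinstance(value, str) and re.search(pattern, value, flags) is not None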
scripts/mongo_to_postgres_migration.py (146-170)
The upsert_documents function inserts documents one by one in a loop. For large collections, this will be very slow due to the overhead of individual INSERT statements. asyncpg supports batch operations via executemany, which is significantly more performant. Consider refactoring this to use executemany to speed up the migration process.
async def upsert_documents(
    pool: asyncpg.Pool, collection: str, documents: Iterable[Dict]
) -> int:
    table_name = table_name_for(collection)
    records_to_insert = []
    for doc in documents:
        doc = dict(doc)
        doc_id = str(doc.get("_id") or doc.get("id") or doc.get("uuid") or "")
        if not doc_id:
            continue
        clean_doc = sanitize_document(doc)
        clean_doc["_id"] = doc_id
        json_payload = json.dumps(clean_doc, ensure_ascii=False)
        records_to_insert.append((doc_id, json_payload))
    if not records_to_insert:
        return 0
    async with pool.acquire() as conn:
        async with conn.transaction():
            stmt = await conn.prepare(
                f"""
                INSERT INTO {table_name} (id, doc)
                VALUES ($1, $2::jsonb)
                ON CONFLICT (id) DO UPDATE SET doc = EXCLUDED.doc
                """
            )
            await stmt.executemany(records_to_insert)
    return len(records_to_insert)

src/database.py (248-254)
The synchronous iterator __iter__ uses asyncio.run(self._ensure_loaded()). This will block the entire event loop and will raise a RuntimeError if called from within an already running async function, as correctly checked. This design makes it impossible to use synchronous iteration on a cursor from within async code (e.g., for doc in my_cursor: inside an async def function). This is a significant limitation and can lead to unexpected errors. It's generally better to require async iteration (async for) for async database drivers and avoid mixing sync/async paradigms in this way.
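As a usage sketch under that recommendation, callers would consume cursors with async for instead of a plain for loop; the collection and field names below are illustrative, and the cursor is assumed to expose __aiter__/__anext__ as described.

from typing import List


async def list_main_branch_titles(pages_collection) -> List[str]:
    titles: List[str] = []
    # Async iteration keeps the event loop running while documents are fetched.
    async for doc in pages_collection.find({"branch": "main"}):
        titles.append(doc.get("title", ""))
    return titles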
src/database.py (114-182)
The database connection logic has lost its resilience. The previous implementation had a connect method with retry logic and a monitor_connection background task to handle disconnects. The new implementation attempts to connect only once, and if it fails, the database remains disconnected. This makes the application less robust to transient network issues or database restarts. Consider reintroducing connection retry logic and a monitoring task.
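A minimal sketch of what that could look like with asyncpg, assuming a plain create_pool call; the retry counts, intervals, and function names are assumptions rather than the previous implementation's exact behaviour.

import asyncio
import logging

import asyncpg

logger = logging.getLogger(__name__)


async def connect_with_retry(dsn: str, attempts: int = 5, base_delay: float = 1.0) -> asyncpg.Pool:
    for attempt in range(1, attempts + 1):
        try:
            return await asyncpg.create_pool(dsn)
        except (OSError, asyncpg.PostgresError) as exc:
            logger.warning("Postgres connect attempt %d/%d failed: %s", attempt, attempts, exc)
            if attempt == attempts:
                raise
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))  # exponential backoff


async def monitor_connection(pool: asyncpg.Pool, interval: float = 30.0) -> None:
    # Background task: ping the pool periodically and log (or trigger reconnect) on failure.
    while True:
        try:
            async with pool.acquire() as conn:
                await conn.execute("SELECT 1")
        except Exception as exc:
            logger.warning("Postgres health check failed: %s", exc)
        await asyncio.sleep(interval)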
/gemini review
Code Review
This pull request represents a significant and impressive migration of the database backend from MongoDB to PostgreSQL. The new database abstraction layer in src/database.py is well-designed, emulating the motor API on top of Postgres's JSONB features. The refactoring of complex queries, such as log aggregation, to use raw SQL will yield substantial performance benefits.
However, the review has identified several critical issues that must be addressed. Most importantly, several unique indexes from the MongoDB schema have not been correctly translated to unique constraints in Postgres, which introduces a serious data integrity risk. Additionally, there is a major feature regression in the page search functionality and a high-severity bug in the update_many implementation. I have provided detailed comments and suggestions for these and other medium-severity issues related to performance and code quality.
INDEX_SPECS = {
    "pages": [
        ("title_branch", "((doc->>'title'), (doc->>'branch'))"),
        ("branch", "((doc->>'branch'))"),
        ("updated_at", "((doc->>'updated_at'))"),
        ("title_trgm", "USING gin ((doc->>'title') gin_trgm_ops)"),
    ],
    "sessions": [
        ("session_id", {"unique": True}),
        ("user_id", {}),
        ("expires_at", {"expireAfterSeconds": 0}),
    ],
    "history": [
        ("title_branch", "((doc->>'title'), (doc->>'branch'))"),
        ("updated_at", "((doc->>'updated_at'))"),
    ],
    "branches": [
        ("page_branch", "((doc->>'page_title'), (doc->>'branch_name'))"),
        ("created_at", "((doc->>'created_at'))"),
        ("branch_name", "((doc->>'branch_name'))"),
    ],
    "users": [
        ("username", "((doc->>'username'))"),
        ("email", "((doc->>'email'))"),
    ],
    "image_hashes": [
        ("filename", {"unique": True}),
        ("sha256", {}),
        ("filename", "((doc->>'filename'))"),
        ("sha256", "((doc->>'sha256'))"),
    ],
    "analytics_events": [
        ([("event_type", 1), ("timestamp", -1)], {}),
        ("timestamp", {}),
        ("query_normalized", {}),
        ("event_type_timestamp", "((doc->>'event_type'), ((doc->>'timestamp')::timestamptz))"),
        ("query_normalized_trgm", "USING gin ((doc->>'query_normalized') gin_trgm_ops)"),
        ("timestamp_only", "(((doc->>'timestamp'))::timestamptz)"),
    ],
    "sessions": [
        ("session_id", "((doc->>'session_id'))"),
        ("expires_at", "(((doc->>'expires_at'))::timestamptz)"),
        ("user_id", "((doc->>'user_id'))"),
    ],
    "settings": [
        ("doc_id", "((doc->>'_id'))"),
    ],
    "system_logs": [
        ("action_timestamp", "((doc->>'action'), ((doc->>'timestamp')::timestamptz))"),
        ("timestamp_only", "(((doc->>'timestamp'))::timestamptz)"),
    ],
}
Several indexes that were unique in the MongoDB schema are no longer defined as unique in the new INDEX_SPECS. This is a critical data integrity issue that could lead to duplicate data. Specifically:
- pages: The index on (title, branch) should be unique.
- users: The indexes on username and email should be unique.
- image_hashes: The index on filename should be unique.
- sessions: The index on session_id should be unique.
Please update the index definitions to include the UNIQUE constraint. I've also renamed the index suffixes to reflect this (e.g., _unique).
INDEX_SPECS = {
    "pages": [
        ("title_branch_unique", "UNIQUE ((doc->>'title'), (doc->>'branch'))"),
        ("branch", "((doc->>'branch'))"),
        ("updated_at", "((doc->>'updated_at'))"),
        ("title_trgm", "USING gin ((doc->>'title') gin_trgm_ops)"),
    ],
    "history": [
        ("title_branch", "((doc->>'title'), (doc->>'branch'))"),
        ("updated_at", "((doc->>'updated_at'))"),
    ],
    "branches": [
        ("page_branch", "((doc->>'page_title'), (doc->>'branch_name'))"),
        ("created_at", "((doc->>'created_at'))"),
        ("branch_name", "((doc->>'branch_name'))"),
    ],
    "users": [
        ("username_unique", "UNIQUE ((doc->>'username'))"),
        ("email_unique", "UNIQUE ((doc->>'email'))"),
    ],
    "image_hashes": [
        ("filename_unique", "UNIQUE ((doc->>'filename'))"),
        ("sha256", "((doc->>'sha256'))"),
    ],
    "analytics_events": [
        ("event_type_timestamp", "((doc->>'event_type'), ((doc->>'timestamp')::timestamptz))"),
        ("query_normalized_trgm", "USING gin ((doc->>'query_normalized') gin_trgm_ops)"),
        ("timestamp_only", "(((doc->>'timestamp'))::timestamptz)"),
    ],
    "sessions": [
        ("session_id_unique", "UNIQUE ((doc->>'session_id'))"),
        ("expires_at", "(((doc->>'expires_at'))::timestamptz)"),
        ("user_id", "((doc->>'user_id'))"),
    ],
    "settings": [("doc_id", "((doc->>'_id'))")],
    "system_logs": [
        ("action_timestamp", "((doc->>'action'), ((doc->>'timestamp')::timestamptz))"),
        ("timestamp_only", "(((doc->>'timestamp'))::timestamptz)"),
    ],
}

async def update_many(
    self,
    filt: Dict[str, Any],
    update: Dict[str, Any],
    *,
    upsert: bool = False,
    connection: Optional[asyncpg.Connection] = None,
) -> UpdateResult:
    await self._ensure_table()
    params: List[Any] = []
    where_clause = self._build_where_clause(filt or {}, params)
    update_expr = self._build_update_expression(update, params)

    query = f"UPDATE {self._table_name} SET doc = {update_expr}"
    if where_clause:
        query += f" WHERE {where_clause}"
    query += " RETURNING 1"

    rows = await self._db.fetch(query, *params, conn=connection)
    matched = len(rows)
    modified = matched
    upserted_id = None

    if upsert and matched == 0:
        base = {k: v for k, v in filt.items() if not isinstance(v, dict)}
        new_doc = _apply_update(base, update)
        result = await self.insert_one(new_doc, connection=connection)
        upserted_id = result.inserted_id
        modified += 1

    return UpdateResult(matched_count=matched, modified_count=modified, upserted_id=upserted_id)
There is a bug in the upsert logic for update_many. When no documents match the filter, it attempts to create a new document. However, the base for this new document is created using base = {k: v for k, v in filt.items() if not isinstance(v, dict)}, which only handles simple equality filters. If the filter contains operators (e.g., {'field': {'$gt': 5}}), the base dictionary will be empty, and the created document will be incorrect. This can lead to data corruption and does not match MongoDB's behavior.
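A sketch of a closer approximation, with a hypothetical helper name: keep plain equalities and {'$eq': ...} conditions from the filter when building the upsert base, and let range or set operators contribute nothing, which is roughly MongoDB's upsert behaviour.

from typing import Any, Dict


def upsert_base_from_filter(filt: Dict[str, Any]) -> Dict[str, Any]:
    base: Dict[str, Any] = {}
    for key, value in (filt or {}).items():
        if key.startswith("$"):
            continue  # skip top-level logical operators like $or
        if isinstance(value, dict):
            if set(value.keys()) == {"$eq"}:
                base[key] = value["$eq"]  # exact-match operator still seeds the new document
            # $gt, $in, etc. contribute nothing to the upserted document
        else:
            base[key] = value
    return base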
@classmethod
async def _maybe_enforce_retention(cls, collection) -> None:
    """Delete analytics events older than the retention window on a fixed cadence."""
    now = _utcnow()
    if cls._last_retention_check and now - cls._last_retention_check < cls._RETENTION_CHECK_INTERVAL:
        return
    cutoff = now - timedelta(days=cls._RETENTION_DAYS)
    try:
        await collection.delete_many({"timestamp": {"$lt": cutoff}})
    except Exception as exc:  # IGNORE W0718
        logger.warning(f"Failed to enforce analytics retention: {exc}")
    cls._last_retention_check = now
Calling _maybe_enforce_retention after every event insertion is inefficient. While the internal time check prevents the DELETE query from running every time, it still adds overhead to every write operation. A more robust approach would be to run this retention logic in a separate, periodic background task (e.g., using asyncio.create_task on application startup) that runs once every few hours.
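For illustration, a sketch of such a task; the interval, retention window, and the get_collection accessor are assumptions.

import asyncio
import logging
from datetime import datetime, timedelta, timezone

logger = logging.getLogger(__name__)


async def retention_loop(get_collection, retention_days: int = 90, interval_seconds: int = 6 * 3600) -> None:
    # Periodically delete analytics events older than the retention window.
    while True:
        cutoff = datetime.now(timezone.utc) - timedelta(days=retention_days)
        try:
            await get_collection("analytics_events").delete_many({"timestamp": {"$lt": cutoff}})
        except Exception as exc:
            logger.warning("Failed to enforce analytics retention: %s", exc)
        await asyncio.sleep(interval_seconds)

# At application startup (e.g. in the startup hook):
# retention_task = asyncio.create_task(retention_loop(db.get_collection))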
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
/gemini review
Code Review
This pull request represents a significant and well-executed migration from MongoDB to PostgreSQL. The introduction of a new database abstraction layer in src/database.py to emulate the MongoDB collection API is a clever approach to minimize changes in the service layer. The migration script is thorough, and many parts of the application have been refactored to leverage PostgreSQL's features, such as the improved search and log aggregation. My review focuses on some correctness issues in the database emulation layer, a few performance considerations, and a regression in observability. Overall, this is a fantastic piece of engineering.
if op == "$exists":
    clauses.append(f"{expr} IS NOT NULL" if raw_expected else f"{expr} IS NULL")
    continue
The implementation of the $exists operator is incorrect. The current logic, {expr} IS NOT NULL, checks whether the value of a field is not SQL NULL. However, in JSONB, if a key does not exist, extracting it with ->> also results in SQL NULL. Therefore, this check cannot distinguish between a non-existent key and a key that exists with a null value.
To correctly check for the existence of a key, you should use the #> operator (which _json_path_expr(as_text=False) provides) and check if the result IS NOT NULL.
Suggested change:

if op == "$exists":
    # Use #> to check for path existence, as #>> returns null for both
    # non-existent keys and keys with an explicit null value.
    expr_for_exists = self._json_path_expr(key, as_text=False)
    clauses.append(f"{expr_for_exists} IS NOT NULL" if raw_expected else f"{expr_for_exists} IS NULL")
    continue
async def insert_one(
    self, document: Dict[str, Any], *, connection: Optional[asyncpg.Connection] = None
) -> InsertOneResult:
    await self._ensure_table()
    doc = dict(document)
    if "_id" not in doc:
        doc["_id"] = str(uuid.uuid4())
    json_payload = json.dumps(_jsonable(doc), ensure_ascii=False)
    await self._db.execute(
        f"""
        INSERT INTO {self._table_name} (id, doc)
        VALUES ($1, $2::jsonb)
        ON CONFLICT (id) DO UPDATE SET doc = EXCLUDED.doc
        """,
        str(doc["_id"]),
        json_payload,
        conn=connection,
    )
    return InsertOneResult(inserted_id=str(doc["_id"]))
The implementation of insert_one uses ON CONFLICT (id) DO UPDATE, which makes it behave as an 'upsert' operation. This differs from the standard pymongo.insert_one behavior, which would raise a DuplicateKeyError on an ID collision. This could lead to unexpected silent overwrites of data if the calling code assumes the original behavior. If the intention is to prevent overwrites, you should remove the ON CONFLICT clause and let the database raise a unique constraint violation error, which can then be handled by the caller.
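A sketch of the stricter variant, with assumed helper and table names: dropping the ON CONFLICT clause lets asyncpg raise UniqueViolationError on a duplicate id, which callers can handle the way they previously handled DuplicateKeyError.

import json
import uuid

import asyncpg


async def insert_one_strict(conn: asyncpg.Connection, table: str, document: dict) -> str:
    doc = dict(document)
    doc.setdefault("_id", str(uuid.uuid4()))
    # No ON CONFLICT clause: a duplicate primary key raises asyncpg.UniqueViolationError
    # instead of silently overwriting the existing row.
    await conn.execute(
        f"INSERT INTO {table} (id, doc) VALUES ($1, $2::jsonb)",
        str(doc["_id"]),
        json.dumps(doc, ensure_ascii=False),
    )
    return str(doc["_id"])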
if group_stage is None:
    # Fall back to a find-like query if no grouping is requested
    combined_sorts: List[tuple[str, int]] = []
    if sort_stage:
        combined_sorts.extend(list(sort_stage.items()))
    combined_sorts.extend(sorts)
    effective_limit = limit_stage if limit_stage is not None else limit
    results = await self._find_docs(match_filter, None, combined_sorts, effective_limit)
    if project_stage:
        results = self._apply_project_stage(results, project_stage)
    return results
In the aggregation pipeline, when no $group stage is present, the implementation falls back to _find_docs. However, it passes projection=None and then applies the $project stage in Python (_apply_project_stage). This is inefficient, as it fetches full documents from the database only to discard fields in the application.
You could improve performance by translating the $project stage into a projection dictionary and passing it to _find_docs. This would allow the database to handle the projection for include-only cases, reducing network traffic and memory usage.
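A small sketch of that translation, with a hypothetical helper name: include-only projections become a projection mapping that _find_docs could turn into a narrower SELECT, while mixed or exclude-only projections keep the existing Python fallback.

from typing import Any, Dict, Optional


def project_stage_to_projection(project_stage: Dict[str, Any]) -> Optional[Dict[str, int]]:
    includes = {field: 1 for field, flag in project_stage.items() if flag in (1, True)}
    excludes = [field for field, flag in project_stage.items()
                if flag in (0, False) and field != "_id"]
    if excludes:
        return None  # exclude-style projections still fall back to in-Python projection
    return includes or None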
Force-pushed from be81c1f to bcde4e7
clusterfuck
/gemini review
long term pr