diff --git a/server/mergin/sync/models.py b/server/mergin/sync/models.py
index f1c6cbfd..5f4aa967 100644
--- a/server/mergin/sync/models.py
+++ b/server/mergin/sync/models.py
@@ -2,12 +2,14 @@
 #
 # SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-MerginMaps-Commercial
 from __future__ import annotations
+from contextlib import contextmanager
 import json
 import logging
 import os
+import threading
 import time
 import uuid
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, timezone
 from enum import Enum
 from typing import Optional, List, Dict, Set, Tuple
 from dataclasses import dataclass, asdict
@@ -17,11 +19,11 @@ from flask_login import current_user
 from pygeodiff import GeoDiff
 from sqlalchemy import text, null, desc, nullslast, tuple_
-from sqlalchemy.dialects.postgresql import ARRAY, BIGINT, UUID, JSONB, ENUM
+from sqlalchemy.dialects.postgresql import ARRAY, BIGINT, UUID, JSONB, ENUM, insert
 from sqlalchemy.types import String
 from sqlalchemy.ext.hybrid import hybrid_property
 from pygeodiff.geodifflib import GeoDiffLibError, GeoDiffLibConflictError
-from flask import current_app
+from flask import Flask, current_app

 from .files import (
     DeltaChangeMerged,
@@ -44,7 +46,6 @@ LOG_BASE,
     Checkpoint,
     generate_checksum,
-    Toucher,
     get_chunk_location,
     get_project_path,
     is_supported_type,
@@ -1805,6 +1806,11 @@ class Upload(db.Model):
         db.Integer, db.ForeignKey("user.id", ondelete="CASCADE"), nullable=True
     )
     created = db.Column(db.DateTime, default=datetime.utcnow)
+    # time of the last ping, used to determine whether the upload is still active
+    last_ping = db.Column(db.DateTime, nullable=False, default=datetime.utcnow)
+    transaction_id = db.Column(
+        UUID(as_uuid=True), unique=True, nullable=False, index=True
+    )

     user = db.relationship("User")
     project = db.relationship(
@@ -1822,28 +1828,173 @@ def __init__(self, project: Project, version: int, changes: dict, user_id: int):
         self.version = version
         self.changes = ChangesSchema().dump(changes)
         self.user_id = user_id
+        self.transaction_id = str(uuid.uuid4())
+
+    @classmethod
+    def create_upload(
+        cls, project_id: str, version: int, changes: dict, user_id: int
+    ) -> Upload | None:
+        """Create an upload session: either insert a new record or take over an existing stale one under a new transaction id.
+        The old transaction folder is removed and a new one is created.
+        """
+        now = datetime.now(timezone.utc).replace(tzinfo=None)
+        expiration = current_app.config["LOCKFILE_EXPIRATION"]
+        new_tx_id = str(uuid.uuid4())
+
+        # CTE captures the existing row's transaction_id BEFORE the upsert (pre-statement snapshot)
+        # NULL in RETURNING means fresh INSERT, non-NULL means we took over a stale upload
+        existing_cte = (
+            db.select(Upload.transaction_id)
+            .where(
+                Upload.project_id == project_id,
+                Upload.version == version,
+            )
+            .cte("existing")
+        )
+
+        stmt = (
+            insert(Upload)
+            .values(
+                id=str(uuid.uuid4()),
+                transaction_id=new_tx_id,
+                project_id=project_id,
+                version=version,
+                user_id=user_id,
+                last_ping=now,
+                changes=ChangesSchema().dump(changes),
+            )
+            .add_cte(existing_cte)
+        )
+
+        upsert_stmt = stmt.on_conflict_do_update(
+            constraint="uq_upload_project_id",
+            set_={
+                "transaction_id": new_tx_id,
+                "user_id": user_id,
+                "last_ping": now,
+                "changes": ChangesSchema().dump(changes),
+            },
+            # ONLY update if the existing row is stale
+            where=(Upload.last_ping < (now - timedelta(seconds=expiration))),
+        )
+
+        upsert_stmt = upsert_stmt.returning(
+            Upload,
+            db.select(existing_cte.c.transaction_id)
+            .scalar_subquery()
+            .label("old_transaction_id"),
+        )
+
+        result = db.session.execute(upsert_stmt).fetchone()
+        db.session.commit()
+
+        # nothing returned means the WHERE guard did not match, i.e. an active upload still holds the slot
+        if not result:
+            return
+
+        upload = result.Upload
+        old_transaction_id = result.old_transaction_id
+
+        try:
+            os.makedirs(upload.upload_dir)
+
+            # old_transaction_id is NULL on fresh INSERT, set to old UUID when taking over a stale upload
+            if old_transaction_id:
+                upload.project.sync_failed(
+                    "", "push_lost", "Push artefact removed by subsequent push", user_id
+                )
+                if os.path.exists(
+                    os.path.join(
+                        upload.project.storage.project_dir,
+                        "tmp",
+                        str(old_transaction_id),
+                    )
+                ):
+                    move_to_tmp(
+                        os.path.join(
+                            upload.project.storage.project_dir,
+                            "tmp",
+                            str(old_transaction_id),
+                        ),
+                        str(old_transaction_id),
+                    )
+        except OSError as err:
+            # filesystem setup failed after the DB row was already committed.
+            # delete the row immediately so the next attempt isn't blocked until expiration.
+            db.session.delete(upload)
+            db.session.commit()
+            logging.error(f"Failed to create upload directory: {err}")
+            return
+
+        return upload

     @property
     def upload_dir(self):
-        return os.path.join(self.project.storage.project_dir, "tmp", self.id)
+        return os.path.join(
+            self.project.storage.project_dir, "tmp", str(self.transaction_id)
+        )

-    @property
-    def lockfile(self):
-        return os.path.join(self.upload_dir, "lockfile")
-
-    def is_active(self):
-        """Check if upload is still active because there was a ping (lockfile update) from underlying process"""
-        return os.path.exists(self.lockfile) and (
-            time.time() - os.path.getmtime(self.lockfile)
-            < current_app.config["LOCKFILE_EXPIRATION"]
+    def _heartbeat_task(self, app: Flask, stop_event: threading.Event, timeout: int):
+        """
+        Background task: runs as a thread and is compatible with both Sync (direct) and Gevent (monkey-patched) worker types.
+        Uses a fresh engine connection to stay pool-efficient.
+        """
+        # manual context push is required for background execution
+        with app.app_context():
+            while not stop_event.is_set():
+                try:
+                    # db.engine.begin() opens a short transaction on a pooled connection and returns it to the pool as soon as the block exits
+                    with db.engine.begin() as conn:
+                        conn.execute(
+                            db.text(
+                                "UPDATE upload SET last_ping = NOW() WHERE id = :id"
+                            ),
+                            {"id": self.id},
+                        )
+                except Exception as e:
+                    logging.exception(
+                        f"Upload heartbeat failed for ID {self.project_id} and version {self.version}: {e}"
+                    )
+
+                # wait up to `timeout` seconds, but wake up immediately if stop_event is set
+                stop_event.wait(timeout)
+
+    @contextmanager
+    def heartbeat(self, timeout: int = 5):
+        """
+        Context manager to be used inside a Flask route.
+
+        Example of usage:
+        -----------------
+        with upload.heartbeat(interval):
+            do_something_slow
+        """
+        # we need to pass a real Flask app object to the thread
+        app = current_app._get_current_object()
+        stop_event = threading.Event()
+
+        bg = threading.Thread(
+            target=self._heartbeat_task, args=(app, stop_event, timeout), daemon=True
         )
+        bg.start()
+        try:
+            yield
+        finally:
+            # signal the loop to stop
+            stop_event.set()
+
+            # wait for the task to finish its last SQL call.
+            # in Gevent, this yields to other requests (non-blocking), while in Sync, this blocks the current thread for up to 2s;
+            # this protects the main thread / greenlet from zombie background threads
+            bg.join(timeout=2)
+
     def clear(self):
         """Clean up pending upload.
         Uploaded files and table records are removed, and another upload can start.
         """
         try:
-            move_to_tmp(self.upload_dir, self.id)
+            move_to_tmp(self.upload_dir, str(self.transaction_id))
             db.session.delete(self)
             db.session.commit()
         except Exception:
@@ -1864,7 +2015,7 @@ def process_chunks(
         to_remove = [i.path for i in file_changes if i.change == PushChangeType.DELETE]
         current_files = [f for f in self.project.files if f.path not in to_remove]

-        with Toucher(self.lockfile, 5):
+        with self.heartbeat(5):
             for f in file_changes:
                 if f.change == PushChangeType.DELETE:
                     continue
diff --git a/server/mergin/sync/permissions.py b/server/mergin/sync/permissions.py
index e155020a..4fe3ad54 100644
--- a/server/mergin/sync/permissions.py
+++ b/server/mergin/sync/permissions.py
@@ -271,8 +271,10 @@ def check_project_permissions(
     return None


-def get_upload(transaction_id):
-    upload = Upload.query.get_or_404(transaction_id)
+def get_upload_or_fail(transaction_id: str) -> Upload:
+    if not is_valid_uuid(transaction_id):
+        abort(404)
+    upload = Upload.query.filter_by(transaction_id=transaction_id).first_or_404()
     # upload to 'removed' projects is forbidden
     if upload.project.removed_at:
         abort(404)
@@ -280,8 +282,7 @@
     if upload.user_id != current_user.id:
         abort(403, "You do not have permissions for ongoing upload")

-    upload_dir = os.path.join(upload.project.storage.project_dir, "tmp", transaction_id)
-    return upload, upload_dir
+    return upload


 def projects_query(permission, as_admin=True, public=True):
diff --git a/server/mergin/sync/public_api.yaml b/server/mergin/sync/public_api.yaml
index 5227b562..157e8262 100644
--- a/server/mergin/sync/public_api.yaml
+++ b/server/mergin/sync/public_api.yaml
@@ -699,7 +699,7 @@ paths:
         - do integrity check comparing uploaded file sizes with what was expected
        - move uploaded files to new version dir and applying sync changes (e.g.
geodiff apply_changeset) - bump up version in database - - remove artifacts (chunks, lockfile) by moving them to tmp directory" + - remove artifacts (chunks) by moving them to tmp directory" operationId: push_finish parameters: - name: transaction_id diff --git a/server/mergin/sync/public_api_controller.py b/server/mergin/sync/public_api_controller.py index 8e2b0ea8..8f142e71 100644 --- a/server/mergin/sync/public_api_controller.py +++ b/server/mergin/sync/public_api_controller.py @@ -25,7 +25,7 @@ from pygeodiff import GeoDiffLibError from flask_login import current_user from sqlalchemy import and_, desc, asc -from sqlalchemy.exc import IntegrityError +from sqlalchemy.exc import IntegrityError, SQLAlchemyError from gevent import sleep import base64 from werkzeug.exceptions import HTTPException, Conflict @@ -70,12 +70,11 @@ require_project, projects_query, ProjectPermissions, - get_upload, + get_upload_or_fail, require_project_by_uuid, ) from .utils import ( generate_checksum, - Toucher, get_ip, get_user_agent, generate_location, @@ -775,13 +774,6 @@ def project_push(namespace, project_name): if all(len(changes[key]) == 0 for key in changes.keys()): abort(400, "No changes") - # reject upload early if there is another one already running - pending_upload = Upload.query.filter_by( - project_id=project.id, version=version - ).first() - if pending_upload and pending_upload.is_active(): - abort(400, "Another process is running. Please try later.") - try: ChangesSchema().validate(changes) upload_changes = ChangesSchema().dump(changes) @@ -813,47 +805,22 @@ def project_push(namespace, project_name): if requested_storage > ws.storage: return StorageLimitHit(current_usage, ws.storage).response(422) - upload = Upload(project, version, upload_changes, current_user.id) - db.session.add(upload) try: - # Creating upload transaction with different project's version is possible. - db.session.commit() + upload = Upload.create_upload( + project.id, version, upload_changes, current_user.id + ) + if not upload: + abort(400, "Another process is running. Please try later.") + logging.info( - f"Upload transaction {upload.id} created for project: {project.id}, version: {version}" + f"Upload transaction {upload.transaction_id} created for project: {project.id}, version: {version}" ) - except IntegrityError: + except (IntegrityError, SQLAlchemyError) as err: db.session.rollback() - # check and clean dangling uploads or abort - for current_upload in project.uploads.all(): - if current_upload.is_active(): - abort(400, "Another process is running. Please try later.") - db.session.delete(current_upload) - db.session.commit() - # previous push attempt is definitely lost - project.sync_failed( - "", - "push_lost", - "Push artefact removed by subsequent push", - current_user.id, - ) - - # Try again after cleanup - db.session.add(upload) - try: - db.session.commit() - logging.info( - f"Upload transaction {upload.id} created for project: {project.id}, version: {version}" - ) - move_to_tmp(upload.upload_dir) - except IntegrityError as err: - logging.error(f"Failed to create upload session: {str(err)}") - abort(422, "Failed to create upload session. Please try later.") + logging.exception(f"Failed to create upload: {str(err)}") + abort(422, "Failed to create upload session. 
Please try later.") - # Create transaction folder and lockfile - os.makedirs(upload.upload_dir) - open(upload.lockfile, "w").close() - - # Update immediately without uploading of new/modified files and remove transaction/lockfile after successful commit + # Update immediately without uploading of new/modified files and remove transaction after successful commit if not (changes["added"] or changes["updated"]): next_version = version + 1 file_changes = files_changes_from_upload( @@ -876,7 +843,7 @@ def project_push(namespace, project_name): db.session.commit() logging.info( f"A project version {ProjectVersion.to_v_name(next_version)} for project: {project.id} created. " - f"Transaction id: {upload.id}. No upload." + f"Transaction id: {upload.transaction_id}. No upload." ) project_version_created.send(pv) push_finished.send(pv) @@ -884,7 +851,7 @@ def project_push(namespace, project_name): except IntegrityError as err: db.session.rollback() logging.exception( - f"Failed to upload a new project version using transaction id: {upload.id}: {str(err)}" + f"Failed to upload a new project version using transaction id: {upload.transaction_id}: {str(err)}" ) abort(422, "Failed to upload a new project version. Please try later.") except gevent.timeout.Timeout: @@ -893,7 +860,7 @@ def project_push(namespace, project_name): finally: upload.clear() - return {"transaction": upload.id}, 200 + return {"transaction": upload.transaction_id}, 200 @auth_required @@ -910,7 +877,7 @@ def chunk_upload(transaction_id, chunk_id): :rtype: Dict """ - upload, upload_dir = get_upload(transaction_id) + upload = get_upload_or_fail(transaction_id) request.view_args["project"] = upload.project chunks = [] for file in upload.changes["added"] + upload.changes["updated"]: @@ -919,8 +886,8 @@ def chunk_upload(transaction_id, chunk_id): if chunk_id not in chunks: abort(404) - dest = os.path.join(upload_dir, "chunks", chunk_id) - with Toucher(upload.lockfile, 30): + dest = os.path.join(upload.upload_dir, "chunks", chunk_id) + with upload.heartbeat(30): try: # we could have used request.data here, but it could eventually cause OOM issue save_to_file(request.stream, dest, current_app.config["MAX_CHUNK_SIZE"]) @@ -945,14 +912,14 @@ def push_finish(transaction_id): - do integrity check comparing uploaded file sizes with what was expected - move uploaded files to new version dir and applying sync changes (e.g. geodiff apply_changeset) - bump up version in database - - remove artifacts (chunks, lockfile) by moving them to tmp directory + - remove artifacts (chunks) by moving them to tmp directory :param transaction_id: Transaction id. 
:type transaction_id: str :rtype: None """ - upload, upload_dir = get_upload(transaction_id) + upload = get_upload_or_fail(transaction_id) request.view_args["project"] = upload.project project = upload.project next_version = project.next_version() @@ -991,7 +958,7 @@ def push_finish(transaction_id): abort(422, f"Failed to create new version: {msg}") - files_dir = os.path.join(upload_dir, "files", v_next_version) + files_dir = os.path.join(upload.upload_dir, "files", v_next_version) target_dir = os.path.join(project.storage.project_dir, v_next_version) if os.path.exists(target_dir): pv = ProjectVersion.query.filter_by( @@ -1009,39 +976,57 @@ def push_finish(transaction_id): move_to_tmp(target_dir) try: - user_agent = get_user_agent(request) - device_id = get_device_id(request) - pv = ProjectVersion( - project, - next_version, - current_user.id, - file_changes, - get_ip(request), - user_agent, - device_id, - ) - db.session.add(pv) - db.session.add(project) - db.session.commit() + # let's keep upload alive until all work is done so no one else can claim it + with upload.heartbeat(5): + user_agent = get_user_agent(request) + device_id = get_device_id(request) + pv = ProjectVersion( + project, + next_version, + current_user.id, + file_changes, + get_ip(request), + user_agent, + device_id, + ) + db.session.add(pv) + db.session.add(project) - # let's move uploaded files where they are expected to be - os.renames(files_dir, version_dir) + # move files before committing so a filesystem failure leaves the DB clean + if os.path.exists(files_dir): + os.renames(files_dir, version_dir) - logging.info( - f"Push finished for project: {project.id}, project version: {v_next_version}, transaction id: {transaction_id}." - ) - project_version_created.send(pv) - push_finished.send(pv) - except (psycopg2.Error, FileNotFoundError, IntegrityError) as err: + db.session.commit() + + logging.info( + f"Push finished for project: {project.id}, project version: {v_next_version}, transaction id: {transaction_id}." 
+ ) + project_version_created.send(pv) + push_finished.send(pv) + except (psycopg2.Error, OSError, IntegrityError) as err: db.session.rollback() logging.exception( f"Failed to finish push for project: {project.id}, project version: {v_next_version}, " f"transaction id: {transaction_id}.: {str(err)}" ) + if ( + os.path.exists(version_dir) + and not ProjectVersion.query.filter_by( + project_id=project.id, name=next_version + ).count() + ): + move_to_tmp(version_dir) abort(422, "Failed to create new version: {}".format(str(err))) # catch exception during pg transaction so we can rollback and prevent PendingRollbackError during upload clean up except gevent.timeout.Timeout: db.session.rollback() + if ( + os.path.exists(version_dir) + and not ProjectVersion.query.filter_by( + project_id=project.id, name=next_version + ).count() + ): + move_to_tmp(version_dir) raise finally: # remove artifacts @@ -1061,10 +1046,8 @@ def push_cancel(transaction_id): :rtype: None """ - upload, upload_dir = get_upload(transaction_id) - db.session.delete(upload) - db.session.commit() - move_to_tmp(upload_dir) + upload = get_upload_or_fail(transaction_id) + upload.clear() return NoContent, 200 diff --git a/server/mergin/sync/public_api_v2_controller.py b/server/mergin/sync/public_api_v2_controller.py index 9a82a211..ebd909ad 100644 --- a/server/mergin/sync/public_api_v2_controller.py +++ b/server/mergin/sync/public_api_v2_controller.py @@ -15,8 +15,7 @@ from flask import abort, jsonify, current_app from flask_login import current_user from marshmallow import ValidationError -from sqlalchemy.exc import IntegrityError -from sqlalchemy.orm.exc import ObjectDeletedError +from sqlalchemy.exc import IntegrityError, SQLAlchemyError from .schemas_v2 import BatchErrorSchema, ProjectSchema as ProjectSchemaV2 from ..app import db @@ -241,11 +240,6 @@ def create_project_version(id): if pv and pv.name != version: return ProjectVersionExists(version, pv.name).response(409) - # reject push if there is another one already running - pending_upload = Upload.query.filter_by(project_id=project.id).first() - if pending_upload and pending_upload.is_active(): - return AnotherUploadRunning().response(409) - try: ChangesSchema().validate(changes) upload_changes = ChangesSchema().dump(changes) @@ -296,88 +290,78 @@ def create_project_version(id): return NoContent, 204 try: - # while processing data, block other uploads - upload = Upload(project, version, upload_changes, current_user.id) - db.session.add(upload) - # Creating blocking upload can fail, e.g. 
in case of racing condition - db.session.commit() - except IntegrityError: - db.session.rollback() - # check and clean dangling blocking uploads or abort - for current_upload in project.uploads.all(): - if current_upload.is_active(): - return AnotherUploadRunning().response(409) - db.session.delete(current_upload) - db.session.commit() - # previous push attempt is definitely lost - project.sync_failed( - "", - "push_lost", - "Push artefact removed by subsequent push", - current_user.id, - ) - - try: - # Try again after cleanup - upload = Upload(project, version, upload_changes, current_user.id) - db.session.add(upload) - db.session.commit() - move_to_tmp(upload.upload_dir) - except IntegrityError as err: - logging.error(f"Failed to create upload session: {str(err)}") + upload = Upload.create_upload( + project.id, version, upload_changes, current_user.id + ) + if not upload: return AnotherUploadRunning().response(409) + except (IntegrityError, SQLAlchemyError) as err: + db.session.rollback() + logging.exception(f"Failed to create upload: {str(err)}") + return UploadError().response(422) + except OSError as err: + logging.exception(f"Failed to create upload directory: {str(err)}") + return UploadError().response(422) - # Create transaction folder and lockfile - os.makedirs(upload.upload_dir) - open(upload.lockfile, "w").close() - + # this is the heavy work of processing upload data file_changes, errors = upload.process_chunks(use_shared_chunk_dir=True) # files consistency or geodiff related issues, project push would never succeed, whole upload is aborted if errors: upload.clear() return DataSyncError(failed_files=errors).response(422) - upload_deleted = False + if os.path.exists(version_dir): + if ProjectVersion.query.filter_by( + project_id=project.id, name=next_version + ).count(): + return UploadError( + error=f"Version {v_next_version} already exists" + ).response(409) + move_to_tmp(version_dir) + try: - pv = ProjectVersion( - project, - next_version, - current_user.id, - file_changes, - get_ip(request), - get_user_agent(request), - get_device_id(request), - ) - db.session.add(pv) - db.session.add(project) - db.session.commit() - - # let's move uploaded files where they are expected to be - if to_be_added_files or to_be_updated_files: - temp_files_dir = os.path.join(upload.upload_dir, "files", v_next_version) - os.renames(temp_files_dir, version_dir) - - # remove used chunks - # get chunks from added and updated files - chunks_ids = [] - for file in to_be_added_files + to_be_updated_files: - file_chunks = file.get("chunks", []) - chunks_ids.extend(file_chunks) - remove_transaction_chunks.delay(chunks_ids) - - logging.info( - f"Push finished for project: {project.id}, project version: {v_next_version}." 
- ) - project_version_created.send(pv) - push_finished.send(pv) + # let's keep upload alive until all work is done so no one else can claim it + with upload.heartbeat(5): + pv = ProjectVersion( + project, + next_version, + current_user.id, + file_changes, + get_ip(request), + get_user_agent(request), + get_device_id(request), + ) + db.session.add(pv) + db.session.add(project) + + # move files before committing so a filesystem failure leaves the DB clean + if to_be_added_files or to_be_updated_files: + temp_files_dir = os.path.join( + upload.upload_dir, "files", v_next_version + ) + os.renames(temp_files_dir, version_dir) + + db.session.commit() + + # remove used chunks only after commit — chunks belong to the now-committed version + if to_be_added_files or to_be_updated_files: + chunks_ids = [] + for file in to_be_added_files + to_be_updated_files: + file_chunks = file.get("chunks", []) + chunks_ids.extend(file_chunks) + remove_transaction_chunks.delay(chunks_ids) + + logging.info( + f"Push finished for project: {project.id}, project version: {v_next_version}." + ) + project_version_created.send(pv) + push_finished.send(pv) except ( psycopg2.Error, - FileNotFoundError, + OSError, IntegrityError, - ObjectDeletedError, ) as err: db.session.rollback() - upload_deleted = isinstance(err, ObjectDeletedError) logging.exception( f"Failed to finish push for project: {project.id}, project version: {v_next_version}: {str(err)}" ) @@ -401,9 +385,8 @@ def create_project_version(id): move_to_tmp(version_dir) raise finally: - # remove artifacts only if upload object is still valid - if not upload_deleted: - upload.clear() + # remove upload artifacts + upload.clear() result = ProjectSchemaV2().dump(project) result["files"] = ProjectFileSchema( diff --git a/server/mergin/sync/schemas.py b/server/mergin/sync/schemas.py index 4eecca0c..da18f7db 100644 --- a/server/mergin/sync/schemas.py +++ b/server/mergin/sync/schemas.py @@ -132,7 +132,7 @@ def _role(self, obj): return role.value def _uploads(self, obj): - return [u.id for u in obj.project.uploads.all()] + return [u.transaction_id for u in obj.project.uploads.all()] def _permissions(self, obj): return project_user_permissions(obj.project) @@ -180,7 +180,7 @@ def _role(self, obj): return role.value def _uploads(self, obj): - return [u.id for u in obj.uploads.all()] + return [u.transaction_id for u in obj.uploads.all()] class Meta: model = Project diff --git a/server/mergin/sync/utils.py b/server/mergin/sync/utils.py index 9e89eb7c..48966457 100644 --- a/server/mergin/sync/utils.py +++ b/server/mergin/sync/utils.py @@ -57,53 +57,6 @@ def generate_checksum(file, chunk_size=4096): checksum.update(chunk) -class Toucher: - """ - Helper class to periodically update modification time of file during - execution of longer lasting task. 
- - Example of usage: - ----------------- - with Toucher(file, interval): - do_something_slow - - """ - - def __init__(self, lockfile, interval): - self.lockfile = lockfile - self.interval = interval - self.running = False - self.timer = None - - def __enter__(self): - self.acquire() - - def __exit__(self, type, value, tb): # pylint: disable=W0612,W0622 - self.release() - - def release(self): - self.running = False - if self.timer: - self.timer.cancel() - self.timer = None - - def acquire(self): - self.running = True - self.touch_lockfile() - - def touch_lockfile(self): - # do an NFS ACCESS procedure request to clear the attribute cache (for various pods to actually see the file) - # https://docs.aws.amazon.com/efs/latest/ug/troubleshooting-efs-general.html#custom-nfs-settings-write-delays - os.access(self.lockfile, os.W_OK) - with open(self.lockfile, "a"): - os.utime(self.lockfile, None) - - sleep(0) # to unblock greenlet - if self.running: - self.timer = Timer(self.interval, self.touch_lockfile) - self.timer.start() - - def is_qgis(path: str) -> bool: """ Check if file is a QGIS project file. diff --git a/server/mergin/tests/test_db_hooks.py b/server/mergin/tests/test_db_hooks.py index 044294c5..27aadb5b 100644 --- a/server/mergin/tests/test_db_hooks.py +++ b/server/mergin/tests/test_db_hooks.py @@ -114,8 +114,7 @@ def test_remove_project(client, diff_project): # set up mergin_user = User.query.filter_by(username=DEFAULT_USER[0]).first() project_dir = Path(diff_project.storage.project_dir) - upload = Upload(diff_project, 10, [], mergin_user.id) - db.session.add(upload) + upload = Upload.create_upload(diff_project.id, 10, [], mergin_user.id) project_id = diff_project.id user = add_user("user", "user") access_request = AccessRequest(diff_project, user.id) diff --git a/server/mergin/tests/test_project_controller.py b/server/mergin/tests/test_project_controller.py index d1981391..07d8fe77 100644 --- a/server/mergin/tests/test_project_controller.py +++ b/server/mergin/tests/test_project_controller.py @@ -404,7 +404,7 @@ def test_add_project(client, app, data, expected): "mergin", _get_changes_with_diff(test_project_dir) ) upload_chunks(upload_dir, upload.changes) - resp = client.post("/v1/project/push/finish/{}".format(upload.id)) + resp = client.post("/v1/project/push/finish/{}".format(upload.transaction_id)) assert resp.status_code == 200 # add TEMPLATES user and make him creator of test_project (to become template) @@ -508,7 +508,7 @@ def test_delete_project(client): with open(os.path.join(upload_dir, "chunks", chunk), "wb") as out_file: out_file.write(in_file.read(CHUNK_SIZE)) - resp = client.post("/v1/project/push/finish/{}".format(upload.id)) + resp = client.post("/v1/project/push/finish/{}".format(upload.transaction_id)) assert resp.status_code == 200 # try force delete for active project @@ -1119,7 +1119,7 @@ def test_push_to_new_project(client): assert resp.status_code == 200 upload_id = resp.json["transaction"] - upload = Upload.query.filter_by(id=upload_id).first() + upload = Upload.query.filter_by(transaction_id=upload_id).first() blacklisted_file = all( added["path"] != "test_dir/test4.txt" for added in upload.changes["added"] ) @@ -1210,6 +1210,52 @@ def test_push_integrity_error(client, app): assert failure.error_details == "No changes" +def test_stale_upload_takeover(client, app): + """Stale upload (last_ping expired) is atomically replaced by a new one. 
+ + Verifies that: + - the new upload gets a fresh transaction_id + - the old upload directory is cleaned up + - a push_lost failure is recorded for the abandoned upload + """ + project = Project.query.filter_by( + name=test_project, workspace_id=test_workspace_id + ).first() + user = User.query.filter_by(username="mergin").first() + changes = _get_changes(test_project_dir) + changes["added"] = changes["removed"] = [] + + # create initial upload and record its identity + upload = Upload.create_upload(project.id, 1, changes, user.id) + old_tx_id = upload.transaction_id + old_upload_dir = upload.upload_dir + assert os.path.exists(old_upload_dir) + + # backdate last_ping to make the upload appear stale + db.session.execute( + db.text( + "UPDATE upload SET last_ping = NOW() - :expiry * INTERVAL '1 second' WHERE id = :id" + ), + { + "id": upload.id, + "expiry": client.application.config["LOCKFILE_EXPIRATION"] + 1, + }, + ) + db.session.commit() + + # takeover — should succeed and replace the stale upload + new_upload = Upload.create_upload(project.id, 1, changes, user.id) + assert new_upload is not None + assert new_upload.transaction_id != old_tx_id + assert os.path.exists(new_upload.upload_dir) + # old directory was moved away + assert not os.path.exists(old_upload_dir) + # push_lost was recorded for the abandoned upload + failure = SyncFailuresHistory.query.filter_by(project_id=project.id).first() + assert failure.error_type == "push_lost" + assert failure.error_details == "Push artefact removed by subsequent push" + + def test_exceed_data_limit(client): project = Project.query.filter_by( name=test_project, workspace_id=test_workspace_id @@ -1288,13 +1334,8 @@ def create_transaction(username, changes, version=1): project = Project.query.filter_by( name=test_project, workspace_id=test_workspace_id ).first() - upload = Upload(project, version, changes, user.id) - db.session.add(upload) - db.session.commit() - upload_dir = os.path.join(upload.project.storage.project_dir, "tmp", upload.id) - os.makedirs(upload_dir) - open(os.path.join(upload_dir, "lockfile"), "w").close() - return upload, upload_dir + upload = Upload.create_upload(project.id, version, changes, user.id) + return upload, upload.upload_dir def remove_transaction(transaction_id): @@ -1310,7 +1351,7 @@ def test_chunk_upload(client, app): changes = _get_changes(test_project_dir) upload, upload_dir = create_transaction("mergin", changes) chunk_id = upload.changes["added"][0]["chunks"][0] - url = "/v1/project/push/chunk/{}/{}".format(upload.id, chunk_id) + url = "/v1/project/push/chunk/{}/{}".format(upload.transaction_id, chunk_id) with open(os.path.join(test_project_dir, "test_dir", "test4.txt"), "rb") as file: data = file.read(CHUNK_SIZE) checksum = hashlib.sha1() @@ -1319,6 +1360,7 @@ def test_chunk_upload(client, app): resp = client.post(url, data=data, headers=headers) assert resp.status_code == 200 assert resp.json["checksum"] == checksum.hexdigest() + assert os.path.exists(os.path.join(upload_dir, "chunks", chunk_id)) # tests to send bigger chunk than allowed app.config["MAX_CHUNK_SIZE"] = 10 * CHUNK_SIZE @@ -1331,6 +1373,8 @@ def test_chunk_upload(client, app): failure = SyncFailuresHistory.query.filter_by(project_id=upload.project.id).first() assert failure.error_type == "chunk_upload" assert failure.error_details == "Too big chunk" + # residual after upload was removed + assert not os.path.exists(os.path.join(upload_dir, "chunks", chunk_id)) # tests with transaction with no uploads expected changes = 
_get_changes(test_project_dir) @@ -1341,9 +1385,8 @@ def test_chunk_upload(client, app): resp2 = client.post(url, data=data, headers=headers) assert resp2.status_code == 404 assert SyncFailuresHistory.query.count() == 1 - - # cleanup - shutil.rmtree(upload_dir) + # we do not have any chunks, so parent dir was removed as well + assert not os.path.exists(os.path.join(upload_dir)) def upload_chunks(upload_dir, changes, src_dir=test_project_dir): @@ -1365,7 +1408,9 @@ def test_push_finish(client): changes = _get_changes(test_project_dir) upload, upload_dir = create_transaction("mergin", changes) - resp = client.post(f"/v1/project/push/finish/{upload.id}", headers=json_headers) + resp = client.post( + f"/v1/project/push/finish/{upload.transaction_id}", headers=json_headers + ) assert resp.status_code == 422 assert "corrupted_files" in resp.json["detail"].keys() assert not os.path.exists(os.path.join(upload_dir, "files", "test.txt")) @@ -1386,7 +1431,7 @@ def test_push_finish(client): chunks.append(chunk) resp2 = client.post( - f"/v1/project/push/finish/{upload.id}", + f"/v1/project/push/finish/{upload.transaction_id}", headers={**json_headers, "User-Agent": "Werkzeug"}, ) assert resp2.status_code == 200 @@ -1413,7 +1458,7 @@ def test_push_finish(client): db.session.commit() upload, upload_dir = create_transaction(user.username, changes) - url = "/v1/project/push/finish/{}".format(upload.id) + url = "/v1/project/push/finish/{}".format(upload.transaction_id) db.session.add(upload) db.session.commit() # still log in as mergin user @@ -1427,7 +1472,7 @@ def test_push_finish(client): def test_push_close(client): changes = _get_changes(test_project_dir) upload, upload_dir = create_transaction("mergin", changes) - url = "/v1/project/push/cancel/{}".format(upload.id) + url = "/v1/project/push/cancel/{}".format(upload.transaction_id) resp = client.post(url) assert resp.status_code == 200 @@ -1470,12 +1515,12 @@ def test_whole_push_process(client): assert resp.status_code == 200 assert "transaction" in resp.json.keys() - upload = Upload.query.get(resp.json["transaction"]) + upload = Upload.query.filter_by(transaction_id=resp.json["transaction"]).first() assert upload # assert we can get project info with active upload resp = client.get(f"/v1/project/{test_workspace_name}/{upload.project.name}") assert resp.status_code == 200 - assert upload.id in resp.json["uploads"] + assert str(upload.transaction_id) in resp.json["uploads"] assert ( client.get( f"/v1/project/{test_workspace_name}/{upload.project.name}?version=v1" @@ -1486,7 +1531,7 @@ def test_whole_push_process(client): # push upload: upload file chunks for file in changes["added"]: for chunk_id in file["chunks"]: - url = "/v1/project/push/chunk/{}/{}".format(upload.id, chunk_id) + url = "/v1/project/push/chunk/{}/{}".format(upload.transaction_id, chunk_id) with open(os.path.join(test_dir, file["path"]), "rb") as f: data = f.read(CHUNK_SIZE) checksum = hashlib.sha1() @@ -1498,7 +1543,7 @@ def test_whole_push_process(client): assert resp.json["checksum"] == checksum.hexdigest() # push finish: call server to concatenate chunks and finish upload - resp = client.post(f"/v1/project/push/finish/{upload.id}") + resp = client.post(f"/v1/project/push/finish/{upload.transaction_id}") assert resp.status_code == 200 project = Project.query.filter_by( name=test_project, workspace_id=test_workspace_id @@ -1526,7 +1571,7 @@ def test_push_diff_finish(client): changes = _get_changes_with_diff(test_project_dir) upload, upload_dir = create_transaction("mergin", changes) 
upload_chunks(upload_dir, upload.changes) - resp = client.post("/v1/project/push/finish/{}".format(upload.id)) + resp = client.post("/v1/project/push/finish/{}".format(upload.transaction_id)) assert resp.status_code == 200 # check there are not any changes between local modified file and server patched file (using geodiff) geodiff = GeoDiff() @@ -1550,7 +1595,7 @@ def test_push_diff_finish(client): upload, upload_dir = create_transaction("mergin", changes, 2) upload_chunks(upload_dir, upload.changes) - resp = client.post("/v1/project/push/finish/{}".format(upload.id)) + resp = client.post("/v1/project/push/finish/{}".format(upload.transaction_id)) assert resp.status_code == 422 assert ( "GEODIFF ERROR: Nothing inserted (this should never happen)" @@ -1559,10 +1604,10 @@ def test_push_diff_finish(client): error = resp.json["detail"] # try again to make sure geodiff logs are related only to recent event - client.post("/v1/project/push/cancel/{}".format(upload.id)) + client.post("/v1/project/push/cancel/{}".format(upload.transaction_id)) upload, upload_dir = create_transaction("mergin", changes, 2) upload_chunks(upload_dir, upload.changes) - resp = client.post("/v1/project/push/finish/{}".format(upload.id)) + resp = client.post("/v1/project/push/finish/{}".format(upload.transaction_id)) assert resp.status_code == 422 assert resp.json["detail"] == error @@ -1570,7 +1615,7 @@ def test_push_diff_finish(client): changes = _get_changes_with_diff_0_size(test_project_dir) upload, upload_dir = create_transaction("mergin", changes, 3) upload_chunks(upload_dir, upload.changes) - resp = client.post("/v1/project/push/finish/{}".format(upload.id)) + resp = client.post("/v1/project/push/finish/{}".format(upload.transaction_id)) assert resp.status_code == 422 @@ -1596,7 +1641,7 @@ def test_push_no_diff_finish(client): } upload, upload_dir = create_transaction("mergin", changes) upload_chunks(upload_dir, upload.changes, src_dir=working_dir) - resp = client.post("/v1/project/push/finish/{}".format(upload.id)) + resp = client.post("/v1/project/push/finish/{}".format(upload.transaction_id)) assert resp.status_code == 200 # check diff file was generated by server, and it is in file history latest_version = upload.project.get_latest_version() @@ -1636,7 +1681,7 @@ def test_push_no_diff_finish(client): } upload, upload_dir = create_transaction("mergin", changes, version=2) upload_chunks(upload_dir, upload.changes, src_dir=working_dir) - resp = client.post("/v1/project/push/finish/{}".format(upload.id)) + resp = client.post("/v1/project/push/finish/{}".format(upload.transaction_id)) assert resp.status_code == 200 latest_version = upload.project.get_latest_version() assert all( @@ -1708,7 +1753,7 @@ def test_clone_project(client, data, username, expected): "mergin", _get_changes_with_diff(test_project_dir) ) upload_chunks(upload_dir, upload.changes) - resp = client.post("/v1/project/push/finish/{}".format(upload.id)) + resp = client.post("/v1/project/push/finish/{}".format(upload.transaction_id)) assert resp.status_code == 200 endpoint = "/v1/project/clone/{}/{}".format(test_workspace_name, test_project) @@ -1850,7 +1895,7 @@ def test_optimize_storage(app, client, diff_project): with open(os.path.join(upload_dir, "chunks", chunk), "wb") as out_file: out_file.write(in_file.read(CHUNK_SIZE)) - resp = client.post(f"/v1/project/push/finish/{upload.id}") + resp = client.post(f"/v1/project/push/finish/{upload.transaction_id}") assert resp.status_code == 200 assert os.path.exists(optimize_v4) @@ -2212,16 +2257,16 @@ def 
test_inactive_project(client, diff_project): upload, upload_dir = create_transaction("mergin", _get_changes(test_project_dir)) chunk_id = upload.changes["added"][0]["chunks"][0] resp = client.post( - f"/v1/project/push/chunk/{upload.id}/{chunk_id}", + f"/v1/project/push/chunk/{upload.transaction_id}/{chunk_id}", data=data, headers={"Content-Type": "application/octet-stream"}, ) assert resp.status_code == 404 - resp = client.post(f"/v1/project/push/finish/{upload.id}") + resp = client.post(f"/v1/project/push/finish/{upload.transaction_id}") assert resp.status_code == 404 - resp = client.post(f"/v1/project/push/cancel/{upload.id}") + resp = client.post(f"/v1/project/push/cancel/{upload.transaction_id}") assert resp.status_code == 404 # delete project again @@ -2322,7 +2367,7 @@ def test_project_version_integrity(client): "__init__", side_effect=IntegrityError("Project version already exists", None, None), ): - resp = client.post("/v1/project/push/finish/{}".format(upload.id)) + resp = client.post("/v1/project/push/finish/{}".format(upload.transaction_id)) assert resp.status_code == 422 assert "Failed to create new version" in resp.json["detail"] failure = SyncFailuresHistory.query.filter_by( @@ -2381,7 +2426,7 @@ def _get_user_agent(): changes = _get_changes(test_project_dir) upload, upload_dir = create_transaction("mergin", changes) upload_chunks(upload_dir, upload.changes) - resp = client.post("/v1/project/push/finish/{}".format(upload.id)) + resp = client.post("/v1/project/push/finish/{}".format(upload.transaction_id)) assert resp.status_code == 200 @@ -2417,12 +2462,12 @@ def test_delete_diff_file(client): } upload, upload_dir = create_transaction("mergin", changes) upload_chunks(upload_dir, upload.changes) - client.post(f"/v1/project/push/finish/{upload.id}") + client.post(f"/v1/project/push/finish/{upload.transaction_id}") changes = _get_changes_with_diff(test_project_dir) upload, upload_dir = create_transaction("mergin", changes, version=2) upload_chunks(upload_dir, upload.changes) - client.post(f"/v1/project/push/finish/{upload.id}") + client.post(f"/v1/project/push/finish/{upload.transaction_id}") fh = FileHistory.query.filter_by( project_version_name=upload.project.latest_version, @@ -2568,12 +2613,12 @@ def test_supported_file_upload(client): headers=json_headers, ) assert resp.status_code == 200 - upload = Upload.query.get(resp.json["transaction"]) + upload = Upload.query.filter_by(transaction_id=resp.json["transaction"]).first() assert upload # Even chunks are correctly uploaded for file in changes["added"]: for chunk_id in file["chunks"]: - url = "/v1/project/push/chunk/{}/{}".format(upload.id, chunk_id) + url = "/v1/project/push/chunk/{}/{}".format(upload.transaction_id, chunk_id) with open(os.path.join(TMP_DIR, file["path"]), "rb") as f: data = f.read(CHUNK_SIZE) checksum = hashlib.sha1() @@ -2584,7 +2629,7 @@ def test_supported_file_upload(client): assert resp.status_code == 200 assert resp.json["checksum"] == checksum.hexdigest() # Unsupported file type is revealed when reconstructed from chunks - based on the mime type - and upload is refused - resp = client.post(f"/v1/project/push/finish/{upload.id}") + resp = client.post(f"/v1/project/push/finish/{upload.transaction_id}") assert resp.status_code == 400 assert ( resp.json["detail"] @@ -2617,8 +2662,8 @@ def test_locked_project(client, diff_project): assert resp.headers["Content-Type"] == "application/problem+json" assert resp.json["code"] == "ProjectLocked" # to play safe push finish is also blocked - upload, upload_dir = 
create_transaction("mergin", changes) - url = "/v1/project/push/finish/{}".format(upload.id) + upload, _ = create_transaction("mergin", changes) + url = "/v1/project/push/finish/{}".format(upload.transaction_id) resp = client.post(url, headers=json_headers) assert resp.status_code == 422 diff --git a/server/mergin/tests/test_public_api_v2.py b/server/mergin/tests/test_public_api_v2.py index 7a87b1d0..56caa7ff 100644 --- a/server/mergin/tests/test_public_api_v2.py +++ b/server/mergin/tests/test_public_api_v2.py @@ -1031,12 +1031,7 @@ def test_create_version_failures(client): data = {"version": "v1", "changes": _get_changes_without_added(test_project_dir)} # somebody else is syncing - upload = Upload(project, 1, _get_changes(test_project_dir), 1) - db.session.add(upload) - db.session.commit() - os.makedirs(upload.upload_dir) - open(upload.lockfile, "w").close() - + upload = Upload.create_upload(project.id, 1, _get_changes(test_project_dir), 1) response = client.post(f"v2/projects/{project.id}/versions", json=data) assert response.status_code == 409 assert response.json["code"] == AnotherUploadRunning.code @@ -1073,16 +1068,6 @@ def test_create_version_failures(client): assert response.status_code == 422 assert response.json["code"] == UploadError.code - # try to finish the transaction which would fail on existing Upload integrity error, e.g. race conditions - with patch.object( - Upload, - "__init__", - side_effect=IntegrityError("Cannot insert upload", None, None), - ): - response = client.post(f"v2/projects/{project.id}/versions", json=data) - assert response.status_code == 409 - assert response.json["code"] == AnotherUploadRunning.code - # try to finish the transaction which would fail on unexpected integrity error # patch of ChangesSchema is just a workaround to trigger and error with patch.object( @@ -1094,46 +1079,6 @@ def test_create_version_failures(client): assert response.status_code == 409 -def test_create_version_object_deleted_error(client): - """Test that ObjectDeletedError during push returns 422 without secondary exception""" - project = Project.query.filter_by( - workspace_id=test_workspace_id, name=test_project - ).first() - - data = { - "version": "v1", - "changes": { - "added": [], - "removed": [ - file_info(test_project_dir, "base.gpkg"), - ], - "updated": [], - }, - } - - # Create a real ObjectDeletedError by using internal SQLAlchemy state - def raise_object_deleted(*args, **kwargs): - # Create a minimal state-like object that ObjectDeletedError can use - class FakeState: - class_ = Upload - - def obj(self): - return None - - raise ObjectDeletedError(FakeState()) - - with patch.object( - ProjectVersion, - "__init__", - side_effect=raise_object_deleted, - ): - response = client.post(f"v2/projects/{project.id}/versions", json=data) - - # Should return 422 UploadError, not 500 from secondary exception - assert response.status_code == 422 - assert response.json["code"] == UploadError.code - - def test_upload_chunk(client): """Test pushing a chunk to a project""" project = Project.query.filter_by( diff --git a/server/migrations/community/e3a7f2b1c94d_add_upload_last_ping.py b/server/migrations/community/e3a7f2b1c94d_add_upload_last_ping.py new file mode 100644 index 00000000..d53d4440 --- /dev/null +++ b/server/migrations/community/e3a7f2b1c94d_add_upload_last_ping.py @@ -0,0 +1,29 @@ +"""Add last_ping to upload + +Revision ID: e3a7f2b1c94d +Revises: e3f1a9b2c4d6 +Create Date: 2026-04-14 00:00:00.000000 + +""" + +from alembic import op +import sqlalchemy as sa + + +# 
revision identifiers, used by Alembic.
+revision = "e3a7f2b1c94d"
+down_revision = "e3f1a9b2c4d6"
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    op.add_column("upload", sa.Column("last_ping", sa.DateTime(), nullable=True))
+    # backfill existing rows before adding NOT NULL constraint
+    op.execute("UPDATE upload SET last_ping = NOW() WHERE last_ping IS NULL")
+    op.alter_column("upload", "last_ping", nullable=False)
+
+
+def downgrade():
+    # downgraded code falls back to lockfiles, which will not exist - make sure all uploads are finished before downgrading
+    op.drop_column("upload", "last_ping")
diff --git a/server/migrations/community/f1d9e4a7b823_add_upload_transaction_id.py b/server/migrations/community/f1d9e4a7b823_add_upload_transaction_id.py
new file mode 100644
index 00000000..d3797b8b
--- /dev/null
+++ b/server/migrations/community/f1d9e4a7b823_add_upload_transaction_id.py
@@ -0,0 +1,36 @@
+"""Add transaction_id to upload
+
+Revision ID: f1d9e4a7b823
+Revises: e3a7f2b1c94d
+Create Date: 2026-04-14 00:00:00.000000
+
+"""
+
+from alembic import op
+import sqlalchemy as sa
+from sqlalchemy.dialects.postgresql import UUID
+
+
+# revision identifiers, used by Alembic.
+revision = "f1d9e4a7b823"
+down_revision = "e3a7f2b1c94d"
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    op.add_column(
+        "upload", sa.Column("transaction_id", UUID(as_uuid=True), nullable=True)
+    )
+    # backfill existing rows before adding NOT NULL constraint
+    op.execute("UPDATE upload SET transaction_id = id WHERE transaction_id IS NULL")
+    op.alter_column("upload", "transaction_id", nullable=False)
+    op.create_index(
+        op.f("ix_upload_transaction_id"), "upload", ["transaction_id"], unique=True
+    )
+
+
+def downgrade():
+    op.drop_index(op.f("ix_upload_transaction_id"), table_name="upload")
+    # the column is dropped but orphan transaction folders may remain - make sure the upload table is empty before downgrading
+    op.drop_column("upload", "transaction_id")
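
For clarity, a minimal sketch of how a route is expected to consume the three possible outcomes of Upload.create_upload and keep the session alive while working. This is illustrative only, not part of the diff; do_heavy_processing and start_push are hypothetical placeholders, and a Flask app context is assumed.

    from mergin.sync.models import Upload

    def start_push(project, version, changes, user):
        # hypothetical route helper; assumes an active Flask app context
        upload = Upload.create_upload(project.id, version, changes, user.id)
        if upload is None:
            # the row exists and its last_ping is fresh: the ON CONFLICT ... WHERE
            # guard matched nothing, RETURNING was empty, another push is active
            return None
        # fresh INSERT (old_transaction_id was NULL) or takeover of a stale row:
        # either way we own a new transaction_id and an empty transaction folder
        with upload.heartbeat(timeout=5):
            do_heavy_processing(upload)  # hypothetical placeholder for slow work
        return upload.transaction_id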