Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 167 additions & 16 deletions server/mergin/sync/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@
#
# SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-MerginMaps-Commercial
from __future__ import annotations
from contextlib import contextmanager
import json
import logging
import os
import threading
import time
import uuid
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import Optional, List, Dict, Set, Tuple
from dataclasses import dataclass, asdict
Expand All @@ -17,11 +19,11 @@
from flask_login import current_user
from pygeodiff import GeoDiff
from sqlalchemy import text, null, desc, nullslast, tuple_
from sqlalchemy.dialects.postgresql import ARRAY, BIGINT, UUID, JSONB, ENUM
from sqlalchemy.dialects.postgresql import ARRAY, BIGINT, UUID, JSONB, ENUM, insert
from sqlalchemy.types import String
from sqlalchemy.ext.hybrid import hybrid_property
from pygeodiff.geodifflib import GeoDiffLibError, GeoDiffLibConflictError
from flask import current_app
from flask import Flask, current_app

from .files import (
DeltaChangeMerged,
Expand All @@ -44,7 +46,6 @@
LOG_BASE,
Checkpoint,
generate_checksum,
Toucher,
get_chunk_location,
get_project_path,
is_supported_type,
Expand Down Expand Up @@ -1805,6 +1806,11 @@ class Upload(db.Model):
db.Integer, db.ForeignKey("user.id", ondelete="CASCADE"), nullable=True
)
created = db.Column(db.DateTime, default=datetime.utcnow)
# last ping time to determine if upload is still active
last_ping = db.Column(db.DateTime, nullable=False, default=datetime.utcnow)
transaction_id = db.Column(
UUID(as_uuid=True), unique=True, nullable=False, index=True
)

user = db.relationship("User")
project = db.relationship(
Expand All @@ -1822,28 +1828,173 @@ def __init__(self, project: Project, version: int, changes: dict, user_id: int):
self.version = version
self.changes = ChangesSchema().dump(changes)
self.user_id = user_id
self.transaction_id = str(uuid.uuid4())

@classmethod
def create_upload(
    cls, project_id: str, version: int, changes: dict, user_id: int
) -> Upload | None:
    """Create an upload session for (project_id, version).

    A single atomic INSERT ... ON CONFLICT DO UPDATE guarantees that
    concurrent pushes cannot both acquire the slot:

    - no row exists -> fresh INSERT, new upload returned
    - a stale row exists (last_ping older than LOCKFILE_EXPIRATION seconds)
      -> the row is taken over with a new transaction id and the old
      transaction folder is moved to tmp for cleanup
    - an active row exists -> the conditional update matches nothing and
      None is returned (caller should reject the push)

    None is also returned when the upload directory cannot be created; in
    that case the just-committed DB row is deleted immediately so the slot
    is not blocked until expiration.
    """
    now = datetime.now(timezone.utc).replace(tzinfo=None)
    expiration = current_app.config["LOCKFILE_EXPIRATION"]
    new_tx_id = str(uuid.uuid4())
    # serialize once - used both by INSERT values and the conflict UPDATE
    serialized_changes = ChangesSchema().dump(changes)

    # CTE captures the existing row's transaction_id BEFORE the upsert
    # (pre-statement snapshot): NULL in RETURNING means fresh INSERT,
    # non-NULL means we took over a stale upload
    existing_cte = (
        db.select(Upload.transaction_id)
        .where(
            Upload.project_id == project_id,
            Upload.version == version,
        )
        .cte("existing")
    )

    stmt = (
        insert(Upload)
        .values(
            id=str(uuid.uuid4()),
            transaction_id=new_tx_id,
            project_id=project_id,
            version=version,
            user_id=user_id,
            last_ping=now,
            changes=serialized_changes,
        )
        .add_cte(existing_cte)
    )

    upsert_stmt = stmt.on_conflict_do_update(
        constraint="uq_upload_project_id",
        set_={
            "transaction_id": new_tx_id,
            "user_id": user_id,
            "last_ping": now,
            "changes": serialized_changes,
        },
        # ONLY update if the existing row is stale (no recent ping)
        where=(Upload.last_ping < (now - timedelta(seconds=expiration))),
    )

    upsert_stmt = upsert_stmt.returning(
        Upload,
        db.select(existing_cte.c.transaction_id)
        .scalar_subquery()
        .label("old_transaction_id"),
    )

    result = db.session.execute(upsert_stmt).fetchone()
    db.session.commit()

    # if nothing returned, the WHERE clause failed: an active upload
    # already holds the slot
    if not result:
        return None

    upload = result.Upload
    old_transaction_id = result.old_transaction_id

    try:
        os.makedirs(upload.upload_dir)

        # old_transaction_id is NULL on fresh INSERT, set to the old UUID
        # when taking over a stale upload
        if old_transaction_id:
            upload.project.sync_failed(
                "", "push_lost", "Push artefact removed by subsequent push", user_id
            )
            old_tx_dir = os.path.join(
                upload.project.storage.project_dir,
                "tmp",
                str(old_transaction_id),
            )
            if os.path.exists(old_tx_dir):
                move_to_tmp(old_tx_dir, str(old_transaction_id))
    except OSError as err:
        # filesystem setup failed after the DB row was already committed.
        # delete the row immediately so the next attempt isn't blocked
        # until expiration
        db.session.delete(upload)
        db.session.commit()
        logging.error(f"Failed to create upload directory: {err}")
        return None

    return upload

@property
def upload_dir(self):
    """Filesystem directory holding this upload's artefacts.

    Keyed by transaction_id (not the row id), so a taken-over upload gets
    a fresh directory distinct from the previous owner's.
    """
    return os.path.join(
        self.project.storage.project_dir, "tmp", str(self.transaction_id)
    )

@property
def lockfile(self):
    """Full path of the 'lockfile' marker inside this upload's directory."""
    base_dir = self.upload_dir
    return os.path.join(base_dir, "lockfile")

def is_active(self):
"""Check if upload is still active because there was a ping (lockfile update) from underlying process"""
return os.path.exists(self.lockfile) and (
time.time() - os.path.getmtime(self.lockfile)
< current_app.config["LOCKFILE_EXPIRATION"]
def _heartbeat_task(self, app: Flask, stop_event: threading.Event, timeout: int):
    """Periodically bump this upload's last_ping until stop_event is set.

    Runs as a background Thread; compatible with sync (direct) and gevent
    (monkey-patched) worker types. Each tick opens a short-lived engine
    connection so a pooled connection is not held between pings.

    :param app: real Flask app object (required to push an app context
        outside the request thread)
    :param stop_event: signals the loop to terminate
    :param timeout: seconds between pings; the wait wakes up immediately
        when stop_event is set
    """
    # manual context push is required for background execution
    with app.app_context():
        while not stop_event.is_set():
            try:
                # db.engine.begin() commits and returns the connection to
                # the pool immediately after the UPDATE
                with db.engine.begin() as conn:
                    conn.execute(
                        db.text(
                            "UPDATE upload SET last_ping = NOW() WHERE id = :id"
                        ),
                        {"id": self.id},
                    )
            except Exception as e:
                # keep the loop alive: a transient DB error must not kill
                # the heartbeat of a long-running upload
                logging.exception(
                    f"Upload heartbeat failed for upload {self.id} "
                    f"(project {self.project_id}, version {self.version}): {e}"
                )

            # wait `timeout` seconds, but wake up immediately on stop_event
            stop_event.wait(timeout)

@contextmanager
def heartbeat(self, timeout: int = 5):
    """
    Context manager keeping this upload's last_ping fresh while the body runs.

    Intended for use inside a Flask route, e.g.:

        with upload.heartbeat(interval):
            do_something_slow
    """
    # the background thread needs the real Flask app object, not the proxy
    flask_app = current_app._get_current_object()
    stop_signal = threading.Event()

    worker = threading.Thread(
        target=self._heartbeat_task,
        args=(flask_app, stop_signal, timeout),
        daemon=True,
    )
    worker.start()

    try:
        yield
    finally:
        # tell the loop to stop, then wait for its last SQL call to finish.
        # Under gevent the join yields to other requests (non-blocking);
        # under sync workers it blocks the current thread for up to 2s.
        # This protects the main thread / greenlet from zombie bg processes.
        stop_signal.set()
        worker.join(timeout=2)

def clear(self):
"""Clean up pending upload.
Uploaded files and table records are removed, and another upload can start.
"""
try:
move_to_tmp(self.upload_dir, self.id)
move_to_tmp(self.upload_dir, str(self.transaction_id))
db.session.delete(self)
db.session.commit()
except Exception:
Expand All @@ -1864,7 +2015,7 @@ def process_chunks(
to_remove = [i.path for i in file_changes if i.change == PushChangeType.DELETE]
current_files = [f for f in self.project.files if f.path not in to_remove]

with Toucher(self.lockfile, 5):
with self.heartbeat(5):
for f in file_changes:
if f.change == PushChangeType.DELETE:
continue
Expand Down
9 changes: 5 additions & 4 deletions server/mergin/sync/permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,17 +271,18 @@ def check_project_permissions(
return None


def get_upload_or_fail(transaction_id: str) -> Upload:
    """Fetch a pending upload by its transaction id or abort the request.

    Aborts with 404 when the id is not a valid UUID, when no upload matches,
    or when the target project has been removed; aborts with 403 when the
    upload belongs to a different user.
    """
    if not is_valid_uuid(transaction_id):
        abort(404)
    upload = Upload.query.filter_by(transaction_id=transaction_id).first_or_404()
    # upload to 'removed' projects is forbidden
    if upload.project.removed_at:
        abort(404)

    if upload.user_id != current_user.id:
        abort(403, "You do not have permissions for ongoing upload")

    return upload


def projects_query(permission, as_admin=True, public=True):
Expand Down
2 changes: 1 addition & 1 deletion server/mergin/sync/public_api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -699,7 +699,7 @@ paths:
- do integrity check comparing uploaded file sizes with what was expected
- move uploaded files to new version dir and applying sync changes (e.g. geodiff apply_changeset)
- bump up version in database
- remove artifacts (chunks, lockfile) by moving them to tmp directory"
- remove artifacts (chunks) by moving them to tmp directory"
operationId: push_finish
parameters:
- name: transaction_id
Expand Down
Loading
Loading