From 2ce07e5800b158a116d9d2e4104da0dc1e705251 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Wed, 1 Apr 2026 10:28:09 +0200 Subject: [PATCH 1/3] Make throttle optional --- packages/sqlite_async/CHANGELOG.md | 2 + .../lib/src/sqlite_connection.dart | 39 ++++++++++++-- .../lib/src/update_notification.dart | 21 +++++--- .../test/update_notification_test.dart | 53 +++++++++++++++++++ 4 files changed, 104 insertions(+), 11 deletions(-) diff --git a/packages/sqlite_async/CHANGELOG.md b/packages/sqlite_async/CHANGELOG.md index be27193..0f01c1a 100644 --- a/packages/sqlite_async/CHANGELOG.md +++ b/packages/sqlite_async/CHANGELOG.md @@ -15,6 +15,8 @@ - Instead of `package:sqlite_async/sqlite3_wasm.dart`, import `package:sqlite3/wasm.dart`. - Instead of `package:sqlite_async/sqlite3_web.dart`, import `package:sqlite3_web/sqlite3_web.dart`. - __Breaking__: Remove `SqliteDatabaseMixin` and `SqliteQueries`. Extend `SqliteConnection` instead. +- The `throttle` parameter on `watch` and `onChange` can now be set to `null`. This also introduces `watchUnthrottled` + and `onChangeUnthrottled`, which only buffer on paused subscriptions instead of applying a static timeout. ## 0.13.1 diff --git a/packages/sqlite_async/lib/src/sqlite_connection.dart b/packages/sqlite_async/lib/src/sqlite_connection.dart index 05c7533..b38e29a 100644 --- a/packages/sqlite_async/lib/src/sqlite_connection.dart +++ b/packages/sqlite_async/lib/src/sqlite_connection.dart @@ -173,8 +173,14 @@ abstract class SqliteConnection implements SqliteWriteContext { /// /// This is preferred over [watch] when multiple queries need to be performed /// together when data is changed. + /// + /// While the stream can efficiently handle backpressure (paused subscriptions + /// will not be notified multiple times afer resuming, they'll receive a + /// single aggregated event instead), it also installs a throttle of 30 during + /// which the stream will be paused automtically. To disable this behavior, + /// pass `null` for `throttle` or se [onChangeUnthrottled] explicitly. Stream onChange(Iterable? tables, - {Duration throttle = const Duration(milliseconds: 30), + {Duration? throttle = const Duration(milliseconds: 30), bool triggerImmediately = true}) { final filteredStream = tables != null ? updates.transform(UpdateNotification.filterTablesTransformer(tables)) @@ -185,15 +191,28 @@ abstract class SqliteConnection implements SqliteWriteContext { return throttledStream; } + /// Like [onChange], but without a defaut throttle duration. + /// + /// The stream still respects backpressure for paused subscriptions with an + /// efficient internal buffer of update notifications. + Stream onChangeUnthrottled(Iterable? tables, + {bool triggerImmediately = true}) { + return onChange(tables, + triggerImmediately: triggerImmediately, throttle: null); + } + /// Execute a read query every time the source tables are modified. /// - /// Use [throttle] to specify the minimum interval between queries. + /// Use [throttle] to specify the minimum interval between queries. It can + /// also be set to `null`, in which case the stream will only be throttled + /// when its subscription is paused. [watchUnthrottled] can also be used to + /// make that more explicit. /// /// Source tables are automatically detected using `EXPLAIN QUERY PLAN`. Stream watch( String sql, { List parameters = const [], - Duration throttle = const Duration(milliseconds: 30), + Duration? throttle = const Duration(milliseconds: 30), Iterable? triggerOnTables, }) { Stream watchInner(Iterable trigger) { @@ -212,6 +231,20 @@ abstract class SqliteConnection implements SqliteWriteContext { } } + /// Like [watch], but without a default throttle. + Stream watchUnthrottled( + String sql, { + List parameters = const [], + Iterable? triggerOnTables, + }) { + return watch( + sql, + parameters: parameters, + triggerOnTables: triggerOnTables, + throttle: null, + ); + } + /// Takes a read lock, without starting a transaction. /// /// The lock only applies to a single [SqliteConnection], and multiple diff --git a/packages/sqlite_async/lib/src/update_notification.dart b/packages/sqlite_async/lib/src/update_notification.dart index 21f3541..b43010d 100644 --- a/packages/sqlite_async/lib/src/update_notification.dart +++ b/packages/sqlite_async/lib/src/update_notification.dart @@ -50,7 +50,7 @@ class UpdateNotification { /// /// Use [addOne] to immediately send one update to the output stream. static Stream throttleStream( - Stream input, Duration timeout, + Stream input, Duration? timeout, {UpdateNotification? addOne}) { return _throttleStream( input: input, @@ -70,8 +70,11 @@ class UpdateNotification { } } -/// Throttles anĀ [input] stream to not emit events more often than with a -/// frequency of 1/[timeout]. +/// Throttles stream with by efficiently buffering upstream events while a +/// downstream subscription is paused. +/// +/// Additionally, if a [timeout] is set, the [input] stream is throttled to not +/// emit events more often than with a frequency of 1/[timeout]. /// /// When an event is received and no timeout window is active, it is forwarded /// downstream and a timeout window is started. For events received within a @@ -89,7 +92,7 @@ class UpdateNotification { /// earlier than after [timeout]). Stream _throttleStream({ required Stream input, - required Duration timeout, + required Duration? timeout, required bool throttleFirst, required T Function(T, T) add, required T? addOne, @@ -135,10 +138,12 @@ Stream _throttleStream({ } setTimeout = () { - activeTimeoutWindow = Timer(timeout, () { - activeTimeoutWindow = null; - maybeEmit(); - }); + if (timeout != null) { + activeTimeoutWindow = Timer(timeout, () { + activeTimeoutWindow = null; + maybeEmit(); + }); + } }; void onData(T data) { diff --git a/packages/sqlite_async/test/update_notification_test.dart b/packages/sqlite_async/test/update_notification_test.dart index 05c94c6..e6f67b8 100644 --- a/packages/sqlite_async/test/update_notification_test.dart +++ b/packages/sqlite_async/test/update_notification_test.dart @@ -187,6 +187,59 @@ void main() { expect(control.pendingTimers, isEmpty); }); }); + + group('without timeout', () { + test('can forward notifications unchanged', () { + fakeAsync((control) { + final source = StreamController(sync: true); + final a = UpdateNotification({'a'}); + final b = UpdateNotification({'b'}); + final c = UpdateNotification({'c'}); + + final received = []; + UpdateNotification.throttleStream(source.stream, null) + .listen(received.add); + expect(received, isEmpty); + + source.add(a); + expect(identical(received.last, a), isTrue); + source.add(b); + expect(identical(received.last, b), isTrue); + source.add(c); + expect(identical(received.last, c), isTrue); + }); + }); + + test('can accumulate results while listener is paused', () { + fakeAsync((control) { + final source = StreamController(sync: true); + final received = []; + final subscription = + UpdateNotification.throttleStream(source.stream, null) + .listen(received.add); + expect(source.hasListener, isTrue); + + subscription.pause(); + // We should keep the source subscription active since we want to + // buffer update notifications in throttleStream. + expect(source.isPaused, isFalse); + + source.add(UpdateNotification({'a'})); + source.add(UpdateNotification({'a', 'b'})); + source.add(UpdateNotification({'a'})); + + control.flushTimers(); + expect(received, isEmpty); + + subscription.resume(); + expect(received, isEmpty); + control.flushMicrotasks(); + expect(received, [ + UpdateNotification({'a', 'b'}) + ]); + }); + }); + }); }); test('filter tables', () async { From 335e1bc535c57a69f93a6eac508952f22da73307 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Wed, 1 Apr 2026 11:32:08 +0200 Subject: [PATCH 2/3] Add abortable lock methods --- .github/workflows/test.yaml | 3 +- packages/sqlite_async/CHANGELOG.md | 5 +- packages/sqlite_async/lib/sqlite_async.dart | 1 + .../connection/sync_sqlite_connection.dart | 16 +++--- .../sqlite_async/lib/src/common/mutex.dart | 49 ++++++++++++------- .../sqlite_async/lib/src/common/timeouts.dart | 20 ++++++++ .../src/impl/single_connection_database.dart | 20 +++++--- .../database/native_sqlite_database.dart | 35 ++++++++----- .../lib/src/sqlite_connection.dart | 44 ++++++++++++++++- .../sqlite_async/lib/src/web/connection.dart | 7 +++ .../sqlite_async/lib/src/web/database.dart | 46 ++++++++++++----- .../src/web/database/async_web_database.dart | 23 +++++++-- .../sqlite_async/lib/src/web/web_mutex.dart | 44 ++++++++++------- packages/sqlite_async/test/basic_test.dart | 28 +++++++++-- packages/sqlite_async/test/mutex_test.dart | 17 ++++--- .../sqlite_async/test/native/basic_test.dart | 3 +- .../test/utils/test_utils_impl.dart | 9 ++++ 17 files changed, 272 insertions(+), 98 deletions(-) diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index df88fdd..50e4d2b 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -85,9 +85,8 @@ jobs: enableCrossOsArchive: true - name: "Get dependencies" - # Passing --offline here asserts that dependencies have been cached properly. run: | - dart pub get --offline + dart pub get dart pub global activate pana - name: Setup diff --git a/packages/sqlite_async/CHANGELOG.md b/packages/sqlite_async/CHANGELOG.md index 0f01c1a..4435384 100644 --- a/packages/sqlite_async/CHANGELOG.md +++ b/packages/sqlite_async/CHANGELOG.md @@ -15,9 +15,12 @@ - Instead of `package:sqlite_async/sqlite3_wasm.dart`, import `package:sqlite3/wasm.dart`. - Instead of `package:sqlite_async/sqlite3_web.dart`, import `package:sqlite3_web/sqlite3_web.dart`. - __Breaking__: Remove `SqliteDatabaseMixin` and `SqliteQueries`. Extend `SqliteConnection` instead. +- __Breaking__: Calls locking the database that file with a timeout now throw an `AbortException` instead of a + `TimeoutException`. - The `throttle` parameter on `watch` and `onChange` can now be set to `null`. This also introduces `watchUnthrottled` and `onChangeUnthrottled`, which only buffer on paused subscriptions instead of applying a static timeout. - +- Add `abortableReadLock` and `abortableWriteLock`. The methods can be used to acquire a read/write context with a flexible + abort signal instead of a static timeout. ## 0.13.1 diff --git a/packages/sqlite_async/lib/sqlite_async.dart b/packages/sqlite_async/lib/sqlite_async.dart index c378aa9..7f280b2 100644 --- a/packages/sqlite_async/lib/sqlite_async.dart +++ b/packages/sqlite_async/lib/sqlite_async.dart @@ -7,6 +7,7 @@ export 'src/common/abstract_open_factory.dart' hide InternalOpenFactory; export 'src/common/connection/sync_sqlite_connection.dart'; export 'src/common/mutex.dart'; export 'src/common/sqlite_database.dart' hide SqliteDatabaseImpl; +export 'src/common/timeouts.dart' show AbortException; export 'src/sqlite_connection.dart'; export 'src/sqlite_migrations.dart'; export 'src/sqlite_options.dart'; diff --git a/packages/sqlite_async/lib/src/common/connection/sync_sqlite_connection.dart b/packages/sqlite_async/lib/src/common/connection/sync_sqlite_connection.dart index 4dcaeb1..7d04fbf 100644 --- a/packages/sqlite_async/lib/src/common/connection/sync_sqlite_connection.dart +++ b/packages/sqlite_async/lib/src/common/connection/sync_sqlite_connection.dart @@ -36,8 +36,10 @@ final class SyncSqliteConnection extends SqliteConnection { } @override - Future readLock(Future Function(SqliteReadContext tx) callback, - {Duration? lockTimeout, String? debugContext}) { + Future abortableReadLock( + Future Function(SqliteReadContext tx) callback, + {Future? abortTrigger, + String? debugContext}) { final task = profileQueries ? TimelineTask() : null; task?.start('${profilerPrefix}mutex_lock'); @@ -49,13 +51,15 @@ final class SyncSqliteConnection extends SqliteConnection { callback, ); }, - timeout: lockTimeout, + abortTrigger: abortTrigger, ); } @override - Future writeLock(Future Function(SqliteWriteContext tx) callback, - {Duration? lockTimeout, String? debugContext}) { + Future abortableWriteLock( + Future Function(SqliteWriteContext tx) callback, + {Future? abortTrigger, + String? debugContext}) { final task = profileQueries ? TimelineTask() : null; task?.start('${profilerPrefix}mutex_lock'); @@ -67,7 +71,7 @@ final class SyncSqliteConnection extends SqliteConnection { callback, ); }, - timeout: lockTimeout, + abortTrigger: abortTrigger, ); } diff --git a/packages/sqlite_async/lib/src/common/mutex.dart b/packages/sqlite_async/lib/src/common/mutex.dart index c93a917..ecbc93f 100644 --- a/packages/sqlite_async/lib/src/common/mutex.dart +++ b/packages/sqlite_async/lib/src/common/mutex.dart @@ -1,13 +1,19 @@ import 'dart:async'; +import 'timeouts.dart'; + /// An asynchronous mutex. abstract interface class Mutex { /// Creates a simple mutex instance that can't be shared between tabs or /// isolates. factory Mutex.simple() = _SimpleMutex; - /// timeout is a timeout for acquiring the lock, not for the callback - Future lock(Future Function() callback, {Duration? timeout}); + /// Runs [callback] in a critical section. + /// + /// If [abortTrigger] completes before the critical section was entered, an + /// [AbortException] is thrown and [callback] will not be invoked. + Future lock(Future Function() callback, + {Future? abortTrigger}); } class LockError extends Error { @@ -24,14 +30,15 @@ class LockError extends Error { /// Mutex maintains a queue of Future-returning functions that are executed /// sequentially. final class _SimpleMutex implements Mutex { - Future? last; + Future? last; // Hack to make sure the Mutex is not copied to another isolate. // ignore: unused_field final Finalizer _f = Finalizer((_) {}); @override - Future lock(Future Function() callback, {Duration? timeout}) async { + Future lock(Future Function() callback, + {Future? abortTrigger}) async { if (Zone.current[this] != null) { throw LockError('Recursive lock is not allowed'); } @@ -39,25 +46,29 @@ final class _SimpleMutex implements Mutex { return zone.run(() async { final prev = last; + var previousDidComplete = false; + final completer = Completer.sync(); last = completer.future; try { // If there is a previous running block, wait for it if (prev != null) { - if (timeout != null) { - // This could throw a timeout error - try { - await prev.timeout(timeout); - } catch (error) { - if (error is TimeoutException) { - throw TimeoutException('Failed to acquire lock', timeout); - } else { - rethrow; + final prevOrAbort = Completer.sync(); + + prev.then((_) { + previousDidComplete = true; + if (!prevOrAbort.isCompleted) prevOrAbort.complete(); + }); + if (abortTrigger != null) { + abortTrigger.whenComplete(() { + if (!prevOrAbort.isCompleted) { + prevOrAbort.completeError( + AbortException('lock'), StackTrace.current); } - } - } else { - await prev; + }); } + + await prevOrAbort.future; } // Run the function and return the result @@ -66,7 +77,7 @@ final class _SimpleMutex implements Mutex { // Cleanup // waiting for the previous task to be done in case of timeout void complete() { - // Only mark it unlocked when the last one complete + // Only mark it unlocked when the last one completes if (identical(last, completer.future)) { last = null; } @@ -75,8 +86,8 @@ final class _SimpleMutex implements Mutex { // In case of timeout, wait for the previous one to complete too // before marking this task as complete - if (prev != null && timeout != null) { - // But we still returns immediately + if (prev != null && !previousDidComplete) { + // But we still return immediately prev.then((_) { complete(); }).ignore(); diff --git a/packages/sqlite_async/lib/src/common/timeouts.dart b/packages/sqlite_async/lib/src/common/timeouts.dart index 8bdeec5..9bb0632 100644 --- a/packages/sqlite_async/lib/src/common/timeouts.dart +++ b/packages/sqlite_async/lib/src/common/timeouts.dart @@ -1,4 +1,24 @@ +/// @docImport '../sqlite_connection.dart'; +library; + +import 'package:meta/meta.dart'; + +@internal extension TimeoutDurationToFuture on Duration { /// Returns a future that completes with `void` after this duration. Future get asTimeout => Future.delayed(this); } + +/// An exception thrown when calls to [SqliteConnection.readLock], +/// [SqliteConnection.writeLock] and similar methods are aborted or time out +/// before a connection could be obtained from the pool. +final class AbortException implements Exception { + final String _methodName; + + AbortException(this._methodName); + + @override + String toString() { + return 'A call to $_methodName has been aborted'; + } +} diff --git a/packages/sqlite_async/lib/src/impl/single_connection_database.dart b/packages/sqlite_async/lib/src/impl/single_connection_database.dart index b28074f..2bb0328 100644 --- a/packages/sqlite_async/lib/src/impl/single_connection_database.dart +++ b/packages/sqlite_async/lib/src/impl/single_connection_database.dart @@ -32,20 +32,24 @@ final class SingleConnectionDatabase extends SqliteDatabaseImpl { SqliteOpenFactory get openFactory => throw UnimplementedError(); @override - Future readLock(Future Function(SqliteReadContext tx) callback, - {Duration? lockTimeout, String? debugContext}) { - return connection.readLock(callback, - lockTimeout: lockTimeout, debugContext: debugContext); + Future abortableReadLock( + Future Function(SqliteReadContext tx) callback, + {Future? abortTrigger, + String? debugContext}) { + return connection.abortableReadLock(callback, + abortTrigger: abortTrigger, debugContext: debugContext); } @override Stream get updates => connection.updates; @override - Future writeLock(Future Function(SqliteWriteContext tx) callback, - {Duration? lockTimeout, String? debugContext}) { - return connection.writeLock(callback, - lockTimeout: lockTimeout, debugContext: debugContext); + Future abortableWriteLock( + Future Function(SqliteWriteContext tx) callback, + {Future? abortTrigger, + String? debugContext}) { + return connection.abortableWriteLock(callback, + abortTrigger: abortTrigger, debugContext: debugContext); } @override diff --git a/packages/sqlite_async/lib/src/native/database/native_sqlite_database.dart b/packages/sqlite_async/lib/src/native/database/native_sqlite_database.dart index 4411fa6..1db701b 100644 --- a/packages/sqlite_async/lib/src/native/database/native_sqlite_database.dart +++ b/packages/sqlite_async/lib/src/native/database/native_sqlite_database.dart @@ -99,7 +99,7 @@ final class NativeSqliteDatabaseImpl extends SqliteDatabaseImpl { {Duration? lockTimeout}) async { return _useConnection( writer: false, - lockTimeout: lockTimeout, + abortTrigger: lockTimeout?.asTimeout, debugContext: 'readTransaction', (context) { return _transactionInLease(context, callback); @@ -120,7 +120,7 @@ final class NativeSqliteDatabaseImpl extends SqliteDatabaseImpl { {Duration? lockTimeout}) { return _useConnection( writer: true, - lockTimeout: lockTimeout, + abortTrigger: lockTimeout?.asTimeout, debugContext: 'writeTransaction', (context) { return _transactionInLease(context, callback); @@ -137,23 +137,27 @@ final class NativeSqliteDatabaseImpl extends SqliteDatabaseImpl { } @override - Future readLock(Future Function(SqliteReadContext tx) callback, - {Duration? lockTimeout, String? debugContext}) async { + Future abortableReadLock( + Future Function(SqliteReadContext tx) callback, + {Future? abortTrigger, + String? debugContext}) async { return _useConnection( writer: false, debugContext: debugContext ?? 'readLock', - lockTimeout: lockTimeout, + abortTrigger: abortTrigger, (context) => ScopedReadContext.assumeReadLock(context, callback), ); } @override - Future writeLock(Future Function(SqliteWriteContext tx) callback, - {Duration? lockTimeout, String? debugContext}) async { + Future abortableWriteLock( + Future Function(SqliteWriteContext tx) callback, + {Future? abortTrigger, + String? debugContext}) async { return _useConnection( writer: true, debugContext: debugContext ?? 'writeLock', - lockTimeout: lockTimeout, + abortTrigger: abortTrigger, (context) => ScopedWriteContext.assumeWriteLock(context, callback), ); } @@ -162,14 +166,14 @@ final class NativeSqliteDatabaseImpl extends SqliteDatabaseImpl { Future Function(_LeasedContext context) callback, { required bool writer, required String debugContext, - Duration? lockTimeout, + Future? abortTrigger, }) { - final timeout = lockTimeout?.asTimeout; return _runInLockContext(debugContext, () async { final pool = await _pool; final connection = await (writer - ? pool.writer(abortSignal: timeout) - : pool.reader(abortSignal: timeout)); + ? pool.writer(abortSignal: abortTrigger) + : pool.reader(abortSignal: abortTrigger)) + .translateAbortExceptions(debugContext); try { final context = _LeasedContext( @@ -447,3 +451,10 @@ final class _LeasedContext extends UnscopedContext { return (db) => db.execute(sql); } } + +extension on Future { + Future translateAbortExceptions(String debugContext) { + return onError( + (e, s) => Error.throwWithStackTrace(AbortException(debugContext), s)); + } +} diff --git a/packages/sqlite_async/lib/src/sqlite_connection.dart b/packages/sqlite_async/lib/src/sqlite_connection.dart index b38e29a..fa6801d 100644 --- a/packages/sqlite_async/lib/src/sqlite_connection.dart +++ b/packages/sqlite_async/lib/src/sqlite_connection.dart @@ -1,4 +1,5 @@ /// @docImport 'common/sqlite_database.dart'; +/// @docImport 'common/timeouts.dart'; library; import 'dart:async'; @@ -9,6 +10,7 @@ import 'package:sqlite_async/src/update_notification.dart'; import 'common/connection/sync_sqlite_connection.dart'; import 'common/mutex.dart'; +import 'common/timeouts.dart'; import 'utils/shared_utils.dart'; /// Abstract class representing calls available in a read-only or read-write context. @@ -251,8 +253,27 @@ abstract class SqliteConnection implements SqliteWriteContext { /// connections may hold read locks at the same time. /// /// In most cases, [readTransaction] should be used instead. + /// + /// If a timeout is set and no read connection becomes available in time, an + /// [AbortException] will be thrown. Future readLock(Future Function(SqliteReadContext tx) callback, - {Duration? lockTimeout, String? debugContext}); + {Duration? lockTimeout, String? debugContext}) { + return abortableReadLock(callback, + debugContext: debugContext, abortTrigger: lockTimeout?.asTimeout); + } + + /// Takes a read lock, without starting a transaction. + /// + /// The lock only applies to a single [SqliteConnection], and multiple + /// connections may hold read locks at the same time. + /// + /// If [abortTrigger] is set and completes before the database was able to + /// obtain a read lock, an [AbortException] will be thrown. + Future abortableReadLock( + Future Function(SqliteReadContext tx) callback, { + Future? abortTrigger, + String? debugContext, + }); /// Takes a global lock, without starting a transaction. /// @@ -261,8 +282,27 @@ abstract class SqliteConnection implements SqliteWriteContext { /// The lock applies to all [SqliteConnection] instances for a [SqliteDatabase]. /// Locks for separate [SqliteDatabase] instances on the same database file /// may be held concurrently. + /// + /// If a timeout is set and no read connection becomes available in time, an + /// [AbortException] will be thrown. Future writeLock(Future Function(SqliteWriteContext tx) callback, - {Duration? lockTimeout, String? debugContext}); + {Duration? lockTimeout, String? debugContext}) { + return abortableWriteLock(callback, + debugContext: debugContext, abortTrigger: lockTimeout?.asTimeout); + } + + /// Takes a global lock, without starting a transaction. + /// + /// The lock applies to all [SqliteConnection] instances for a [SqliteDatabase]. + /// Locks for separate [SqliteDatabase] instances on the same database file + /// may be held concurrently. + /// + /// If [abortTrigger] is set and completes before the database was able to + /// obtain a read lock, an [AbortException] will be thrown. + Future abortableWriteLock( + Future Function(SqliteWriteContext tx) callback, + {Future? abortTrigger, + String? debugContext}); Future close(); diff --git a/packages/sqlite_async/lib/src/web/connection.dart b/packages/sqlite_async/lib/src/web/connection.dart index 99da467..475962e 100644 --- a/packages/sqlite_async/lib/src/web/connection.dart +++ b/packages/sqlite_async/lib/src/web/connection.dart @@ -79,6 +79,13 @@ abstract class WebSqliteConnection implements SqliteConnection { Future writeLock(Future Function(SqliteWriteContext tx) callback, {Duration? lockTimeout, String? debugContext, bool? flush}); + @override + Future abortableWriteLock( + Future Function(SqliteWriteContext tx) callback, + {Future? abortTrigger, + String? debugContext, + bool? flush}); + /// Same as [SqliteConnection.writeTransaction]. /// /// Has an additional [flush] (defaults to true). This can be set to false diff --git a/packages/sqlite_async/lib/src/web/database.dart b/packages/sqlite_async/lib/src/web/database.dart index 1f8e0b3..0f81732 100644 --- a/packages/sqlite_async/lib/src/web/database.dart +++ b/packages/sqlite_async/lib/src/web/database.dart @@ -4,7 +4,8 @@ import 'dart:js_interop'; import 'package:meta/meta.dart'; import 'package:sqlite3/common.dart'; -import 'package:sqlite3_web/sqlite3_web.dart'; +import 'package:sqlite3_web/sqlite3_web.dart' hide AbortException; +import 'package:sqlite3_web/sqlite3_web.dart' as sqlite_web; import 'package:sqlite_async/sqlite_async.dart'; import 'package:sqlite_async/src/utils/profiler.dart'; import 'package:sqlite_async/src/web/database/broadcast_updates.dart'; @@ -89,8 +90,10 @@ final class WebDatabase extends SqliteDatabaseImpl } @override - Future readLock(Future Function(SqliteReadContext tx) callback, - {Duration? lockTimeout, String? debugContext}) async { + Future abortableReadLock( + Future Function(SqliteReadContext tx) callback, + {Future? abortTrigger, + String? debugContext}) { // Since there is only a single physical connection per database on the web, // we can't enable concurrent readers to a writer. Even supporting multiple // readers alone isn't safe, since these readers could start read @@ -98,7 +101,7 @@ final class WebDatabase extends SqliteDatabaseImpl // `COMMIT` statements if they were to start their own transactions. return _lockInternal( (unscoped) => ScopedReadContext.assumeReadLock(unscoped, callback), - lockTimeout: lockTimeout, + abortTrigger: abortTrigger, debugContext: debugContext, flush: false, ); @@ -119,31 +122,45 @@ final class WebDatabase extends SqliteDatabaseImpl ); }, flush: flush ?? true, - lockTimeout: lockTimeout, + abortTrigger: lockTimeout?.asTimeout, ); } @override Future writeLock(Future Function(SqliteWriteContext tx) callback, - {Duration? lockTimeout, String? debugContext, bool? flush}) async { + {Duration? lockTimeout, String? debugContext, bool? flush}) { + return abortableWriteLock( + callback, + abortTrigger: lockTimeout?.asTimeout, + debugContext: debugContext, + flush: flush, + ); + } + + @override + Future abortableWriteLock( + Future Function(SqliteWriteContext tx) callback, + {Future? abortTrigger, + String? debugContext, + bool? flush}) async { return await _lockInternal( (unscoped) { return ScopedWriteContext.assumeWriteLock(unscoped, callback); }, flush: flush ?? true, debugContext: debugContext, - lockTimeout: lockTimeout, + abortTrigger: abortTrigger, ); } Future _lockInternal( Future Function(_UnscopedContext) callback, { required bool flush, - Duration? lockTimeout, + Future? abortTrigger, String? debugContext, }) async { if (_mutex case var mutex?) { - return await mutex.lock(timeout: lockTimeout, () async { + return await mutex.lock(abortTrigger: abortTrigger, () async { final context = _UnscopedContext(this, null); try { return await callback(context); @@ -154,7 +171,7 @@ final class WebDatabase extends SqliteDatabaseImpl } }); } else { - return await _database.requestLock(abortTrigger: lockTimeout?.asTimeout, + return await _database.requestLock(abortTrigger: abortTrigger, (token) async { final context = _UnscopedContext(this, token); try { @@ -164,7 +181,7 @@ final class WebDatabase extends SqliteDatabaseImpl await this.flush(); } } - }); + }).translateAbortExceptions(debugContext ?? 'lock'); } } @@ -322,3 +339,10 @@ Future wrapSqliteException(Future Function() callback) async { rethrow; } } + +extension on Future { + Future translateAbortExceptions(String debugContext) { + return onError( + (e, s) => Error.throwWithStackTrace(AbortException(debugContext), s)); + } +} diff --git a/packages/sqlite_async/lib/src/web/database/async_web_database.dart b/packages/sqlite_async/lib/src/web/database/async_web_database.dart index c7704ea..caea217 100644 --- a/packages/sqlite_async/lib/src/web/database/async_web_database.dart +++ b/packages/sqlite_async/lib/src/web/database/async_web_database.dart @@ -84,12 +84,14 @@ final class AsyncWebDatabaseImpl extends SqliteDatabaseImpl } @override - Future readLock(Future Function(SqliteReadContext tx) callback, - {Duration? lockTimeout, String? debugContext}) async { + Future abortableReadLock( + Future Function(SqliteReadContext tx) callback, + {Future? abortTrigger, + String? debugContext}) async { await isInitialized; return _runZoned(() { - return _connection.readLock(callback, - lockTimeout: lockTimeout, debugContext: debugContext); + return _connection.abortableReadLock(callback, + abortTrigger: abortTrigger, debugContext: debugContext); }, debugContext: debugContext ?? 'execute()'); } @@ -103,6 +105,19 @@ final class AsyncWebDatabaseImpl extends SqliteDatabaseImpl }, debugContext: debugContext ?? 'execute()'); } + @override + Future abortableWriteLock( + Future Function(SqliteWriteContext tx) callback, + {Future? abortTrigger, + String? debugContext, + bool? flush}) async { + await isInitialized; + return _runZoned(() { + return _connection.abortableWriteLock(callback, + abortTrigger: abortTrigger, debugContext: debugContext, flush: flush); + }, debugContext: debugContext ?? 'execute()'); + } + @override Future writeTransaction( Future Function(SqliteWriteContext tx) callback, diff --git a/packages/sqlite_async/lib/src/web/web_mutex.dart b/packages/sqlite_async/lib/src/web/web_mutex.dart index 546b462..e165b92 100644 --- a/packages/sqlite_async/lib/src/web/web_mutex.dart +++ b/packages/sqlite_async/lib/src/web/web_mutex.dart @@ -9,6 +9,8 @@ import 'package:web/web.dart'; import 'package:sqlite_async/src/common/mutex.dart'; +import '../common/timeouts.dart'; + @JS('navigator') external Navigator get _navigator; @@ -30,24 +32,25 @@ class WebMutexImpl implements Mutex { "${DateTime.now().microsecondsSinceEpoch}-${Random().nextDouble()}"; @override - Future lock(Future Function() callback, {Duration? timeout}) { + Future lock(Future Function() callback, + {Future? abortTrigger}) { if (_navigator.has('locks')) { - return _webLock(callback, timeout: timeout); + return _webLock(callback, abortTrigger: abortTrigger); } else { - return _fallbackLock(callback, timeout: timeout); + return _fallbackLock(callback, abortTrigger: abortTrigger); } } /// Locks the callback with a standard Mutex from the `mutex` package Future _fallbackLock(Future Function() callback, - {Duration? timeout}) { - return fallback.lock(callback, timeout: timeout); + {Future? abortTrigger}) { + return fallback.lock(callback, abortTrigger: abortTrigger); } /// Locks the callback with web Navigator locks Future _webLock(Future Function() callback, - {Duration? timeout}) async { - final lock = await _getWebLock(timeout); + {Future? abortTrigger}) async { + final lock = await _getWebLock(abortTrigger); try { final result = await callback(); return result; @@ -61,28 +64,31 @@ class WebMutexImpl implements Mutex { /// which is represented as a [HeldLock]. This hold can be used when wrapping the Dart /// callback to manage the JS lock. /// This is inspired and adapted from https://github.com/simolus3/sqlite3.dart/blob/7bdca77afd7be7159dbef70fd1ac5aa4996211a9/sqlite3_web/lib/src/locks.dart#L6 - Future _getWebLock(Duration? timeout) { + Future _getWebLock(Future? abortTrigger) { final gotLock = Completer.sync(); // Navigator locks can be timed out by using an AbortSignal final controller = AbortController(); - Timer? timer; - - if (timeout != null) { - timer = Timer(timeout, () { - gotLock - .completeError(TimeoutException('Failed to acquire lock', timeout)); - controller.abort('Timeout'.toJS); + if (abortTrigger != null) { + abortTrigger.whenComplete(() { + if (!gotLock.isCompleted) { + gotLock.completeError(AbortException('getWebLock')); + controller.abort('aborted in Dart'.toJS); + } }); } // If timeout occurred before the lock is available, then this callback should not be called. JSPromise jsCallback(JSAny lock) { - timer?.cancel(); - // Give the Held lock something to mark this Navigator lock as completed - final jsCompleter = Completer.sync(); - gotLock.complete(HeldLock._(jsCompleter)); + final jsCompleter = Completer.sync(); + if (!gotLock.isCompleted) { + gotLock.complete(HeldLock._(jsCompleter)); + } else { + // Already aborted, return the navigator lock asap, + jsCompleter.complete(); + } + return jsCompleter.future.toJS; } diff --git a/packages/sqlite_async/test/basic_test.dart b/packages/sqlite_async/test/basic_test.dart index 1295be3..c9e3421 100644 --- a/packages/sqlite_async/test/basic_test.dart +++ b/packages/sqlite_async/test/basic_test.dart @@ -292,14 +292,32 @@ void main() { await expectLater( () => db.writeLock( lockTimeout: Duration(milliseconds: 200), (_) async => {}), - throwsA(anything), + throwsAbortException, ); await completion; - }, onPlatform: { - 'browser': Skip( - 'Web locks are managed with a shared worker, which does not support timeouts', - ) + }); + + test('custom abort triggers', () async { + final releaseLock = Completer(); + final hasLock = Completer(); + + final db = await testUtils.setupDatabase(path: path); + db.withAllConnections((writer, readers) async { + hasLock.complete(); + await releaseLock.future; + }); + + await hasLock.future; + + await expectLater( + db.abortableReadLock((_) async {}, abortTrigger: Future.value(null)), + throwsAbortException, + ); + await expectLater( + db.abortableWriteLock((_) async {}, abortTrigger: Future.value(null)), + throwsAbortException, + ); }); test('execute single statement with RETURNING populates ResultSet', diff --git a/packages/sqlite_async/test/mutex_test.dart b/packages/sqlite_async/test/mutex_test.dart index 577957d..bf7611a 100644 --- a/packages/sqlite_async/test/mutex_test.dart +++ b/packages/sqlite_async/test/mutex_test.dart @@ -32,19 +32,22 @@ void main() { }); }); - test('Timeout should throw a TimeoutException', () async { + test('abort should throw a AbortException', () async { final m = Mutex.simple(); m.lock(() async { await Future.delayed(Duration(milliseconds: 300)); }); await expectLater( - m.lock(() async { + m.lock( + () async { print('This should not get executed'); - }, timeout: Duration(milliseconds: 200)), - throwsA((e) => - e is TimeoutException && - e.message!.contains('Failed to acquire lock'))); + }, + abortTrigger: Future.delayed(const Duration(milliseconds: 200)), + ), + throwsA(isAbortException().having((e) => e.toString(), 'toString()', + contains('A call to lock has been aborted'))), + ); }); test('In-time timeout should function normally', () async { @@ -57,7 +60,7 @@ void main() { await m.lock(() async { results.add(2); - }, timeout: Duration(milliseconds: 200)); + }, abortTrigger: Future.delayed(const Duration(milliseconds: 200))); expect(results, equals([1, 2])); }); diff --git a/packages/sqlite_async/test/native/basic_test.dart b/packages/sqlite_async/test/native/basic_test.dart index b1820fd..721bf73 100644 --- a/packages/sqlite_async/test/native/basic_test.dart +++ b/packages/sqlite_async/test/native/basic_test.dart @@ -4,7 +4,6 @@ library; import 'dart:async'; import 'package:sqlite3/common.dart' as sqlite; -import 'package:sqlite3_connection_pool/sqlite3_connection_pool.dart'; import 'package:sqlite_async/native.dart'; import 'package:sqlite_async/sqlite_async.dart'; import 'package:test/test.dart'; @@ -343,7 +342,7 @@ void main() { await db.readLock((tx) async { await tx.get('select 1'); }, lockTimeout: const Duration(milliseconds: 2)); - }, throwsA(isA())); + }, throwsAbortException); await Future.wait([f1, f2]); }); diff --git a/packages/sqlite_async/test/utils/test_utils_impl.dart b/packages/sqlite_async/test/utils/test_utils_impl.dart index 3406d1e..5cdf084 100644 --- a/packages/sqlite_async/test/utils/test_utils_impl.dart +++ b/packages/sqlite_async/test/utils/test_utils_impl.dart @@ -1,5 +1,14 @@ +import 'package:sqlite_async/sqlite_async.dart'; +import 'package:test/test.dart'; + export 'stub_test_utils.dart' // ignore: uri_does_not_exist if (dart.library.io) 'native_test_utils.dart' // ignore: uri_does_not_exist if (dart.library.js_interop) 'web_test_utils.dart'; + +TypeMatcher isAbortException() { + return isA(); +} + +Matcher get throwsAbortException => throwsA(isAbortException()); From 68c0af4a322422587df387f2744c38aec3b94e39 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Wed, 1 Apr 2026 11:57:16 +0200 Subject: [PATCH 3/3] Fix typos --- packages/sqlite_async/CHANGELOG.md | 2 +- packages/sqlite_async/lib/src/sqlite_connection.dart | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/packages/sqlite_async/CHANGELOG.md b/packages/sqlite_async/CHANGELOG.md index 4435384..0b19248 100644 --- a/packages/sqlite_async/CHANGELOG.md +++ b/packages/sqlite_async/CHANGELOG.md @@ -15,7 +15,7 @@ - Instead of `package:sqlite_async/sqlite3_wasm.dart`, import `package:sqlite3/wasm.dart`. - Instead of `package:sqlite_async/sqlite3_web.dart`, import `package:sqlite3_web/sqlite3_web.dart`. - __Breaking__: Remove `SqliteDatabaseMixin` and `SqliteQueries`. Extend `SqliteConnection` instead. -- __Breaking__: Calls locking the database that file with a timeout now throw an `AbortException` instead of a +- __Breaking__: Calls locking the database that fail with a timeout now throw an `AbortException` instead of a `TimeoutException`. - The `throttle` parameter on `watch` and `onChange` can now be set to `null`. This also introduces `watchUnthrottled` and `onChangeUnthrottled`, which only buffer on paused subscriptions instead of applying a static timeout. diff --git a/packages/sqlite_async/lib/src/sqlite_connection.dart b/packages/sqlite_async/lib/src/sqlite_connection.dart index fa6801d..7228bbe 100644 --- a/packages/sqlite_async/lib/src/sqlite_connection.dart +++ b/packages/sqlite_async/lib/src/sqlite_connection.dart @@ -177,10 +177,10 @@ abstract class SqliteConnection implements SqliteWriteContext { /// together when data is changed. /// /// While the stream can efficiently handle backpressure (paused subscriptions - /// will not be notified multiple times afer resuming, they'll receive a + /// will not be notified multiple times after resuming, they'll receive a /// single aggregated event instead), it also installs a throttle of 30 during - /// which the stream will be paused automtically. To disable this behavior, - /// pass `null` for `throttle` or se [onChangeUnthrottled] explicitly. + /// which the stream will be paused automatically. To disable this behavior, + /// pass `null` for `throttle` or use [onChangeUnthrottled] explicitly. Stream onChange(Iterable? tables, {Duration? throttle = const Duration(milliseconds: 30), bool triggerImmediately = true}) { @@ -298,7 +298,7 @@ abstract class SqliteConnection implements SqliteWriteContext { /// may be held concurrently. /// /// If [abortTrigger] is set and completes before the database was able to - /// obtain a read lock, an [AbortException] will be thrown. + /// obtain the write lock, an [AbortException] will be thrown. Future abortableWriteLock( Future Function(SqliteWriteContext tx) callback, {Future? abortTrigger,