Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,8 @@ jobs:
enableCrossOsArchive: true

- name: "Get dependencies"
# Passing --offline here asserts that dependencies have been cached properly.
run: |
dart pub get --offline
dart pub get
dart pub global activate pana

- name: Setup
Expand Down
7 changes: 6 additions & 1 deletion packages/sqlite_async/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@
- Instead of `package:sqlite_async/sqlite3_wasm.dart`, import `package:sqlite3/wasm.dart`.
- Instead of `package:sqlite_async/sqlite3_web.dart`, import `package:sqlite3_web/sqlite3_web.dart`.
- __Breaking__: Remove `SqliteDatabaseMixin` and `SqliteQueries`. Extend `SqliteConnection` instead.

- __Breaking__: Calls locking the database that fail with a timeout now throw an `AbortException` instead of a
`TimeoutException`.
- The `throttle` parameter on `watch` and `onChange` can now be set to `null`. This also introduces `watchUnthrottled`
and `onChangeUnthrottled`, which only buffer on paused subscriptions instead of applying a static timeout.
- Add `abortableReadLock` and `abortableWriteLock`. The methods can be used to acquire a read/write context with a flexible
abort signal instead of a static timeout.

## 0.13.1

Expand Down
1 change: 1 addition & 0 deletions packages/sqlite_async/lib/sqlite_async.dart
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ export 'src/common/abstract_open_factory.dart' hide InternalOpenFactory;
export 'src/common/connection/sync_sqlite_connection.dart';
export 'src/common/mutex.dart';
export 'src/common/sqlite_database.dart' hide SqliteDatabaseImpl;
export 'src/common/timeouts.dart' show AbortException;
export 'src/sqlite_connection.dart';
export 'src/sqlite_migrations.dart';
export 'src/sqlite_options.dart';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@ final class SyncSqliteConnection extends SqliteConnection {
}

@override
Future<T> readLock<T>(Future<T> Function(SqliteReadContext tx) callback,
{Duration? lockTimeout, String? debugContext}) {
Future<T> abortableReadLock<T>(
Future<T> Function(SqliteReadContext tx) callback,
{Future<void>? abortTrigger,
String? debugContext}) {
final task = profileQueries ? TimelineTask() : null;
task?.start('${profilerPrefix}mutex_lock');

Expand All @@ -49,13 +51,15 @@ final class SyncSqliteConnection extends SqliteConnection {
callback,
);
},
timeout: lockTimeout,
abortTrigger: abortTrigger,
);
}

@override
Future<T> writeLock<T>(Future<T> Function(SqliteWriteContext tx) callback,
{Duration? lockTimeout, String? debugContext}) {
Future<T> abortableWriteLock<T>(
Future<T> Function(SqliteWriteContext tx) callback,
{Future<void>? abortTrigger,
String? debugContext}) {
final task = profileQueries ? TimelineTask() : null;
task?.start('${profilerPrefix}mutex_lock');

Expand All @@ -67,7 +71,7 @@ final class SyncSqliteConnection extends SqliteConnection {
callback,
);
},
timeout: lockTimeout,
abortTrigger: abortTrigger,
);
}

Expand Down
49 changes: 30 additions & 19 deletions packages/sqlite_async/lib/src/common/mutex.dart
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
import 'dart:async';

import 'timeouts.dart';

/// An asynchronous mutex.
abstract interface class Mutex {
/// Creates a simple mutex instance that can't be shared between tabs or
/// isolates.
factory Mutex.simple() = _SimpleMutex;

/// timeout is a timeout for acquiring the lock, not for the callback
Future<T> lock<T>(Future<T> Function() callback, {Duration? timeout});
/// Runs [callback] in a critical section.
///
/// If [abortTrigger] completes before the critical section was entered, an
/// [AbortException] is thrown and [callback] will not be invoked.
Future<T> lock<T>(Future<T> Function() callback,
{Future<void>? abortTrigger});
}

class LockError extends Error {
Expand All @@ -24,40 +30,45 @@ class LockError extends Error {
/// Mutex maintains a queue of Future-returning functions that are executed
/// sequentially.
final class _SimpleMutex implements Mutex {
Future<dynamic>? last;
Future<void>? last;

// Hack to make sure the Mutex is not copied to another isolate.
// ignore: unused_field
final Finalizer _f = Finalizer((_) {});

@override
Future<T> lock<T>(Future<T> Function() callback, {Duration? timeout}) async {
Future<T> lock<T>(Future<T> Function() callback,
{Future<void>? abortTrigger}) async {
if (Zone.current[this] != null) {
throw LockError('Recursive lock is not allowed');
}
var zone = Zone.current.fork(zoneValues: {this: true});

return zone.run(() async {
final prev = last;
var previousDidComplete = false;

final completer = Completer<void>.sync();
last = completer.future;
try {
// If there is a previous running block, wait for it
if (prev != null) {
if (timeout != null) {
// This could throw a timeout error
try {
await prev.timeout(timeout);
} catch (error) {
if (error is TimeoutException) {
throw TimeoutException('Failed to acquire lock', timeout);
} else {
rethrow;
final prevOrAbort = Completer<void>.sync();

prev.then((_) {
previousDidComplete = true;
if (!prevOrAbort.isCompleted) prevOrAbort.complete();
});
if (abortTrigger != null) {
abortTrigger.whenComplete(() {
if (!prevOrAbort.isCompleted) {
prevOrAbort.completeError(
AbortException('lock'), StackTrace.current);
}
}
} else {
await prev;
});
}

await prevOrAbort.future;
}

// Run the function and return the result
Expand All @@ -66,7 +77,7 @@ final class _SimpleMutex implements Mutex {
// Cleanup
// waiting for the previous task to be done in case of timeout
void complete() {
// Only mark it unlocked when the last one complete
// Only mark it unlocked when the last one completes
if (identical(last, completer.future)) {
last = null;
}
Expand All @@ -75,8 +86,8 @@ final class _SimpleMutex implements Mutex {

// In case of timeout, wait for the previous one to complete too
// before marking this task as complete
if (prev != null && timeout != null) {
// But we still returns immediately
if (prev != null && !previousDidComplete) {
// But we still return immediately
prev.then((_) {
complete();
}).ignore();
Expand Down
20 changes: 20 additions & 0 deletions packages/sqlite_async/lib/src/common/timeouts.dart
Original file line number Diff line number Diff line change
@@ -1,4 +1,24 @@
/// @docImport '../sqlite_connection.dart';
library;

import 'package:meta/meta.dart';

@internal
extension TimeoutDurationToFuture on Duration {
/// Returns a future that completes with `void` after this duration.
Future<void> get asTimeout => Future.delayed(this);
}

/// An exception thrown when calls to [SqliteConnection.readLock],
/// [SqliteConnection.writeLock] and similar methods are aborted or time out
/// before a connection could be obtained from the pool.
final class AbortException implements Exception {
final String _methodName;

AbortException(this._methodName);

@override
String toString() {
return 'A call to $_methodName has been aborted';
}
}
20 changes: 12 additions & 8 deletions packages/sqlite_async/lib/src/impl/single_connection_database.dart
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,24 @@ final class SingleConnectionDatabase extends SqliteDatabaseImpl {
SqliteOpenFactory get openFactory => throw UnimplementedError();

@override
Future<T> readLock<T>(Future<T> Function(SqliteReadContext tx) callback,
{Duration? lockTimeout, String? debugContext}) {
return connection.readLock(callback,
lockTimeout: lockTimeout, debugContext: debugContext);
Future<T> abortableReadLock<T>(
Future<T> Function(SqliteReadContext tx) callback,
{Future<void>? abortTrigger,
String? debugContext}) {
return connection.abortableReadLock(callback,
abortTrigger: abortTrigger, debugContext: debugContext);
}

@override
Stream<UpdateNotification> get updates => connection.updates;

@override
Future<T> writeLock<T>(Future<T> Function(SqliteWriteContext tx) callback,
{Duration? lockTimeout, String? debugContext}) {
return connection.writeLock(callback,
lockTimeout: lockTimeout, debugContext: debugContext);
Future<T> abortableWriteLock<T>(
Future<T> Function(SqliteWriteContext tx) callback,
{Future<void>? abortTrigger,
String? debugContext}) {
return connection.abortableWriteLock(callback,
abortTrigger: abortTrigger, debugContext: debugContext);
}

@override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ final class NativeSqliteDatabaseImpl extends SqliteDatabaseImpl {
{Duration? lockTimeout}) async {
return _useConnection(
writer: false,
lockTimeout: lockTimeout,
abortTrigger: lockTimeout?.asTimeout,
debugContext: 'readTransaction',
(context) {
return _transactionInLease(context, callback);
Expand All @@ -120,7 +120,7 @@ final class NativeSqliteDatabaseImpl extends SqliteDatabaseImpl {
{Duration? lockTimeout}) {
return _useConnection(
writer: true,
lockTimeout: lockTimeout,
abortTrigger: lockTimeout?.asTimeout,
debugContext: 'writeTransaction',
(context) {
return _transactionInLease(context, callback);
Expand All @@ -137,23 +137,27 @@ final class NativeSqliteDatabaseImpl extends SqliteDatabaseImpl {
}

@override
Future<T> readLock<T>(Future<T> Function(SqliteReadContext tx) callback,
{Duration? lockTimeout, String? debugContext}) async {
Future<T> abortableReadLock<T>(
Future<T> Function(SqliteReadContext tx) callback,
{Future<void>? abortTrigger,
String? debugContext}) async {
return _useConnection(
writer: false,
debugContext: debugContext ?? 'readLock',
lockTimeout: lockTimeout,
abortTrigger: abortTrigger,
(context) => ScopedReadContext.assumeReadLock(context, callback),
);
}

@override
Future<T> writeLock<T>(Future<T> Function(SqliteWriteContext tx) callback,
{Duration? lockTimeout, String? debugContext}) async {
Future<T> abortableWriteLock<T>(
Future<T> Function(SqliteWriteContext tx) callback,
{Future<void>? abortTrigger,
String? debugContext}) async {
return _useConnection(
writer: true,
debugContext: debugContext ?? 'writeLock',
lockTimeout: lockTimeout,
abortTrigger: abortTrigger,
(context) => ScopedWriteContext.assumeWriteLock(context, callback),
);
}
Expand All @@ -162,14 +166,14 @@ final class NativeSqliteDatabaseImpl extends SqliteDatabaseImpl {
Future<T> Function(_LeasedContext context) callback, {
required bool writer,
required String debugContext,
Duration? lockTimeout,
Future<void>? abortTrigger,
}) {
final timeout = lockTimeout?.asTimeout;
return _runInLockContext(debugContext, () async {
final pool = await _pool;
final connection = await (writer
? pool.writer(abortSignal: timeout)
: pool.reader(abortSignal: timeout));
? pool.writer(abortSignal: abortTrigger)
: pool.reader(abortSignal: abortTrigger))
.translateAbortExceptions(debugContext);

try {
final context = _LeasedContext(
Expand Down Expand Up @@ -447,3 +451,10 @@ final class _LeasedContext extends UnscopedContext {
return (db) => db.execute(sql);
}
}

extension<T> on Future<T> {
Future<T> translateAbortExceptions(String debugContext) {
return onError<PoolAbortException>(
(e, s) => Error.throwWithStackTrace(AbortException(debugContext), s));
}
}
Loading
Loading