Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
### 4.5.0

* Added `AsyncSeq.last` — returns the last element of the sequence; raises `InvalidOperationException` if empty, mirroring `Seq.last`.
* Added `AsyncSeq.item` — returns the element at the specified index; raises `ArgumentException` if out of bounds, mirroring `Seq.item`.
* Added `AsyncSeq.tryItem` — returns the element at the specified index as `option`, or `None` if the index is out of bounds, mirroring `Seq.tryItem`.

### 4.4.0

* Added `AsyncSeq.findAsync` — async-predicate variant of `AsyncSeq.find`; raises `KeyNotFoundException` if no match, mirroring `Seq.find`.
Expand Down
26 changes: 26 additions & 0 deletions src/FSharp.Control.AsyncSeq/AsyncSeq.fs
Original file line number Diff line number Diff line change
Expand Up @@ -1066,6 +1066,32 @@
| None -> return raise (System.InvalidOperationException("The input sequence was empty."))
| Some v -> return v }

let last (source : AsyncSeq<'T>) = async {
let! result = tryLast source
match result with
| None -> return raise (System.InvalidOperationException("The input sequence was empty."))
| Some v -> return v }

let tryItem (index : int) (source : AsyncSeq<'T>) = async {
if index < 0 then return None
else
use ie = source.GetEnumerator()
let! first = ie.MoveNext()
let b = ref first
let i = ref 0
while b.Value.IsSome && !i < index do
let! next = ie.MoveNext()
b := next
i := !i + 1
if !i = index then return b.Value
else return None }

let item (index : int) (source : AsyncSeq<'T>) = async {
let! result = tryItem index source
match result with
| None -> return raise (System.ArgumentException(sprintf "The input sequence has an insufficient number of elements. index = %d" index))
| Some v -> return v }

let exactlyOne (source : AsyncSeq<'T>) = async {
use ie = source.GetEnumerator()
let! first = ie.MoveNext()
Expand Down Expand Up @@ -2221,7 +2247,7 @@

[<CompilerMessage("The result of groupBy must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.", 9999)>]
let groupBy (p:'a -> 'k) (s:AsyncSeq<'a>) : AsyncSeq<'k * AsyncSeq<'a>> =
groupByAsync (p >> async.Return) s

Check warning on line 2250 in src/FSharp.Control.AsyncSeq/AsyncSeq.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.

Check warning on line 2250 in src/FSharp.Control.AsyncSeq/AsyncSeq.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
#endif
#endif

Expand Down
12 changes: 12 additions & 0 deletions src/FSharp.Control.AsyncSeq/AsyncSeq.fsi
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,18 @@ module AsyncSeq =
/// given asynchronous sequence (or None if the sequence is empty).
val tryLast : source:AsyncSeq<'T> -> Async<'T option>

/// Asynchronously returns the last element of the asynchronous sequence.
/// Raises InvalidOperationException if the sequence is empty, mirroring Seq.last.
val last : source:AsyncSeq<'T> -> Async<'T>

/// Asynchronously returns the element at the specified index in the asynchronous sequence.
/// Raises ArgumentException if the index is out of bounds, mirroring Seq.item.
val item : index:int -> source:AsyncSeq<'T> -> Async<'T>

/// Asynchronously returns the element at the specified index in the asynchronous sequence,
/// or None if the index is out of bounds, mirroring Seq.tryItem.
val tryItem : index:int -> source:AsyncSeq<'T> -> Async<'T option>

/// Asynchronously returns the first element that was generated by the
/// given asynchronous sequence (or the specified default value).
val firstOrDefault : ``default``:'T -> source:AsyncSeq<'T> -> Async<'T>
Expand Down
65 changes: 65 additions & 0 deletions tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -2000,7 +2000,7 @@
let actual =
ls
|> AsyncSeq.ofSeq
|> AsyncSeq.groupBy p

Check warning on line 2003 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupBy must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.mapAsyncParallel (snd >> AsyncSeq.toListAsync)
Assert.AreEqual(expected, actual)

Expand All @@ -2009,7 +2009,7 @@
let expected = asyncSeq { raise (exn("test")) }
let actual =
asyncSeq { raise (exn("test")) }
|> AsyncSeq.groupBy (fun i -> i % 3)

Check warning on line 2012 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupBy must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.mapAsyncParallel (snd >> AsyncSeq.toListAsync)
Assert.AreEqual(expected, actual)

Expand Down Expand Up @@ -3209,3 +3209,68 @@
|> AsyncSeq.forallAsync (fun _ -> async { return false })
|> Async.RunSynchronously
Assert.IsTrue(result)

// ===== last =====

[<Test>]
let ``AsyncSeq.last returns last element`` () =
let source = asyncSeq { yield 1; yield 2; yield 3 }
let result = AsyncSeq.last source |> Async.RunSynchronously
Assert.AreEqual(3, result)

[<Test>]
let ``AsyncSeq.last on singleton returns that element`` () =
let result = AsyncSeq.last (AsyncSeq.singleton 42) |> Async.RunSynchronously
Assert.AreEqual(42, result)

[<Test>]
let ``AsyncSeq.last raises on empty sequence`` () =
Assert.Throws<InvalidOperationException>(fun () ->
AsyncSeq.last AsyncSeq.empty<int> |> Async.RunSynchronously |> ignore) |> ignore

// ===== item =====

[<Test>]
let ``AsyncSeq.item returns element at index 0`` () =
let source = asyncSeq { yield 10; yield 20; yield 30 }
let result = AsyncSeq.item 0 source |> Async.RunSynchronously
Assert.AreEqual(10, result)

[<Test>]
let ``AsyncSeq.item returns element at index 2`` () =
let source = asyncSeq { yield 10; yield 20; yield 30 }
let result = AsyncSeq.item 2 source |> Async.RunSynchronously
Assert.AreEqual(30, result)

[<Test>]
let ``AsyncSeq.item raises when index out of bounds`` () =
Assert.Throws<ArgumentException>(fun () ->
AsyncSeq.item 5 (AsyncSeq.ofSeq [1;2;3]) |> Async.RunSynchronously |> ignore) |> ignore

[<Test>]
let ``AsyncSeq.item raises when index negative`` () =
Assert.Throws<ArgumentException>(fun () ->
AsyncSeq.item -1 (AsyncSeq.ofSeq [1;2;3]) |> Async.RunSynchronously |> ignore) |> ignore

// ===== tryItem =====

[<Test>]
let ``AsyncSeq.tryItem returns Some for valid index`` () =
let source = asyncSeq { yield 10; yield 20; yield 30 }
let result = AsyncSeq.tryItem 1 source |> Async.RunSynchronously
Assert.AreEqual(Some 20, result)

[<Test>]
let ``AsyncSeq.tryItem returns None for out-of-bounds index`` () =
let result = AsyncSeq.tryItem 10 (AsyncSeq.ofSeq [1;2;3]) |> Async.RunSynchronously
Assert.AreEqual(None, result)

[<Test>]
let ``AsyncSeq.tryItem returns None for negative index`` () =
let result = AsyncSeq.tryItem -1 (AsyncSeq.ofSeq [1;2;3]) |> Async.RunSynchronously
Assert.AreEqual(None, result)

[<Test>]
let ``AsyncSeq.tryItem returns None on empty sequence`` () =
let result = AsyncSeq.tryItem 0 AsyncSeq.empty<int> |> Async.RunSynchronously
Assert.AreEqual(None, result)