Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
### 4.9.0

* Added `AsyncSeq.transpose` — transposes an async sequence of sequences, yielding each column as an array; buffers all rows before yielding; mirrors `Seq.transpose`. Raises `InvalidOperationException` if inner sequences have different lengths.

### 4.8.0

* Added `AsyncSeq.mapFoldAsync` — maps each element using an asynchronous folder that also threads an accumulator state, returning both the array of results and the final state; mirrors `Seq.mapFold`.
Expand Down
12 changes: 12 additions & 0 deletions src/FSharp.Control.AsyncSeq/AsyncSeq.fs
Original file line number Diff line number Diff line change
Expand Up @@ -2035,6 +2035,18 @@
let! arr = toArrayAsync source
for i in arr.Length - 1 .. -1 .. 0 do
yield arr.[i] }

/// Transposes the rows and columns of an async sequence of sequences.
/// Buffers the entire source sequence. Raises InvalidOperationException if inner sequences
/// have different lengths. Mirrors Seq.transpose.
let transpose (source: AsyncSeq<seq<'T>>) : AsyncSeq<'T[]> = asyncSeq {
let! rows = toListAsync (source |> map Seq.toArray)
if not rows.IsEmpty then
let firstLen = rows.Head.Length
if rows |> List.exists (fun row -> row.Length <> firstLen) then
invalidOp "The input sequences have different lengths."
for col in 0 .. firstLen - 1 do
yield rows |> List.map (fun row -> row.[col]) |> List.toArray }
#endif

#if !FABLE_COMPILER
Expand Down Expand Up @@ -2397,7 +2409,7 @@

[<CompilerMessage("The result of groupBy must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.", 9999)>]
let groupBy (p:'a -> 'k) (s:AsyncSeq<'a>) : AsyncSeq<'k * AsyncSeq<'a>> =
groupByAsync (p >> async.Return) s

Check warning on line 2412 in src/FSharp.Control.AsyncSeq/AsyncSeq.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.

Check warning on line 2412 in src/FSharp.Control.AsyncSeq/AsyncSeq.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
#endif
#endif

Expand Down
6 changes: 6 additions & 0 deletions src/FSharp.Control.AsyncSeq/AsyncSeq.fsi
Original file line number Diff line number Diff line change
Expand Up @@ -673,6 +673,12 @@ module AsyncSeq =
/// sequence is buffered before yielding any elements, mirroring Seq.rev.
/// This function should not be used with large or infinite sequences.
val rev : source:AsyncSeq<'T> -> AsyncSeq<'T>

/// Transposes the rows and columns of an async sequence of sequences, yielding each
/// column as an array. The entire source sequence is buffered before any column is yielded,
/// mirroring Seq.transpose. Raises InvalidOperationException if inner sequences have
/// different lengths. This function should not be used with large or infinite sequences.
val transpose : source:AsyncSeq<seq<'T>> -> AsyncSeq<'T[]>
#endif

/// Interleaves two async sequences of the same type into a resulting sequence. The provided
Expand Down
66 changes: 66 additions & 0 deletions tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -2000,7 +2000,7 @@
let actual =
ls
|> AsyncSeq.ofSeq
|> AsyncSeq.groupBy p

Check warning on line 2003 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupBy must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.mapAsyncParallel (snd >> AsyncSeq.toListAsync)
Assert.AreEqual(expected, actual)

Expand All @@ -2009,7 +2009,7 @@
let expected = asyncSeq { raise (exn("test")) }
let actual =
asyncSeq { raise (exn("test")) }
|> AsyncSeq.groupBy (fun i -> i % 3)

Check warning on line 2012 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupBy must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.mapAsyncParallel (snd >> AsyncSeq.toListAsync)
Assert.AreEqual(expected, actual)

Expand Down Expand Up @@ -3662,3 +3662,69 @@
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously |> ignore)
|> ignore

// ===== transpose =====

[<Test>]
let ``AsyncSeq.transpose basic 2x3 matrix`` () =
let source =
asyncSeq {
yield seq { yield 1; yield 2; yield 3 }
yield seq { yield 4; yield 5; yield 6 }
}
let result = AsyncSeq.transpose source |> AsyncSeq.toArrayAsync |> Async.RunSynchronously
Assert.AreEqual(3, result.Length)
Assert.AreEqual([| 1; 4 |], result.[0])
Assert.AreEqual([| 2; 5 |], result.[1])
Assert.AreEqual([| 3; 6 |], result.[2])

[<Test>]
let ``AsyncSeq.transpose empty outer sequence yields empty`` () =
let result =
AsyncSeq.empty<seq<int>>
|> AsyncSeq.transpose
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously
Assert.AreEqual([||], result)

[<Test>]
let ``AsyncSeq.transpose single row returns one column per element`` () =
let source = asyncSeq { yield seq { yield 1; yield 2; yield 3 } }
let result = AsyncSeq.transpose source |> AsyncSeq.toArrayAsync |> Async.RunSynchronously
Assert.AreEqual(3, result.Length)
Assert.AreEqual([| 1 |], result.[0])
Assert.AreEqual([| 2 |], result.[1])
Assert.AreEqual([| 3 |], result.[2])

[<Test>]
let ``AsyncSeq.transpose single column returns one row per element`` () =
let source =
asyncSeq {
yield seq { yield 1 }
yield seq { yield 2 }
yield seq { yield 3 }
}
let result = AsyncSeq.transpose source |> AsyncSeq.toArrayAsync |> Async.RunSynchronously
Assert.AreEqual(1, result.Length)
Assert.AreEqual([| 1; 2; 3 |], result.[0])

[<Test>]
let ``AsyncSeq.transpose of singleton rows yields one column`` () =
let source = asyncSeq { yield seq { yield 7 }; yield seq { yield 8 } }
let result = AsyncSeq.transpose source |> AsyncSeq.toArrayAsync |> Async.RunSynchronously
Assert.AreEqual(1, result.Length)
Assert.AreEqual([| 7; 8 |], result.[0])

[<Test>]
let ``AsyncSeq.transpose raises InvalidOperationException for jagged input`` () =
let source =
asyncSeq {
yield seq { yield 1; yield 2 }
yield seq { yield 3 }
}
Assert.Throws<System.InvalidOperationException>(fun () ->
AsyncSeq.transpose source
|> AsyncSeq.toArrayAsync
|> Async.RunSynchronously
|> ignore)
|> ignore