File tree Expand file tree Collapse file tree 2 files changed +32
-13
lines changed
Expand file tree Collapse file tree 2 files changed +32
-13
lines changed Original file line number Diff line number Diff line change @@ -53,23 +53,29 @@ function append_chunk(bs::BufferStream, data::AbstractVector{UInt8})
5353 throw (ArgumentError (" Stream is closed" ))
5454 end
5555
56- if bs. max_len != 0 && length (data) > bs. max_len
57- throw (ArgumentError (" Chunk is too large to fit into this BufferStream!" ))
58- end
59-
6056 # Copy the data so that users can't clobber our internal list
6157 lock (bs. write_cond) do
62- # If we would exceed our maximum length, then we must wait until someone reads from us.
63- while bs. max_len != 0 && length (bs) + length (data) > bs. max_len
64- wait (bs. write_cond)
58+ data_written = 0
59+ while data_written < length (data)
60+ if bs. max_len == 0
61+ space_available = length (data)
62+ else
63+ space_available = bs. max_len - length (bs)
64+ end
65+ if space_available == 0
66+ wait (bs. write_cond)
67+ continue
68+ end
69+ bytes_to_write = min (space_available, length (data) - data_written)
70+ push! (bs. chunks, data[data_written+ 1 : data_written+ bytes_to_write])
71+ # Notify someone who was waiting for some data
72+ lock (bs. read_cond) do
73+ notify (bs. read_cond; all= false )
74+ end
75+ data_written += bytes_to_write
6576 end
66- push! (bs. chunks, data)
6777 end
6878
69- # Notify someone who was waiting for some data
70- lock (bs. read_cond) do
71- notify (bs. read_cond; all= false )
72- end
7379 return length (data)
7480end
7581
Original file line number Diff line number Diff line change 8181
8282@testset " max_len" begin
8383 bs = BufferStream (1000 )
84- @test_throws ArgumentError write (bs, zeros (UInt8, 1001 ))
8584 write (bs, zeros (UInt8, 1000 ))
8685 t_write = @async begin
8786 t_elapsed = @elapsed write (bs, zeros (UInt8, 1 ))
9796 # Consume the rest of that chunk so it can be dropped, allowing the pending write to go through
9897 readavailable (bs)
9998 wait (t_write)
99+ # read the last byte that it outputs
100+ @test length (readavailable (bs)) == 1
101+
102+ # Also test writing a chunk larger than the entire buffer
103+ t_write = @async begin
104+ t_elapsed = @elapsed write (bs, zeros (UInt8, 3000 ))
105+ @test t_elapsed > 0.01
106+ end
107+ sleep (0.05 )
108+ # We need to read three times, as each time we can only read 1000 elements
109+ @test length (readavailable (bs)) == 1000
110+ @test length (readavailable (bs)) == 1000
111+ @test length (readavailable (bs)) == 1000
112+ wait (t_write)
100113end
101114
102115function tee_task (io_in, io_outs... )
You can’t perform that action at this time.
0 commit comments