From bb744fd14f01223961b0d2c74229a0ce001872e5 Mon Sep 17 00:00:00 2001 From: Jameson Nash Date: Fri, 28 Aug 2015 00:41:09 -0400 Subject: [PATCH] refactor IO types to be less brittle, more flat, and to fix #12829 and #12050 (cherry picked from commit 43e283d6535efa793e968964c75e6d60c43b0120) ref #12839 --- base/REPL.jl | 5 +- base/fs.jl | 15 +- base/io.jl | 45 ++++- base/iobuffer.jl | 1 + base/iostream.jl | 13 +- base/multi.jl | 8 +- base/process.jl | 49 +++--- base/socket.jl | 24 +-- base/stream.jl | 405 ++++++++++++++++++++++----------------------- base/strings/io.jl | 21 ++- test/spawn.jl | 46 ++++- 11 files changed, 348 insertions(+), 284 deletions(-) diff --git a/base/REPL.jl b/base/REPL.jl index df3af0a1d26d1..1616ca8f85e24 100644 --- a/base/REPL.jl +++ b/base/REPL.jl @@ -13,7 +13,6 @@ export StreamREPL import Base: - AsyncStream, Display, display, writemime, @@ -885,7 +884,7 @@ end outstream(s::StreamREPL) = s.stream -StreamREPL(stream::AsyncStream) = StreamREPL(stream, julia_green, Base.text_colors[:white], Base.answer_color()) +StreamREPL(stream::IO) = StreamREPL(stream, julia_green, Base.text_colors[:white], Base.answer_color()) answer_color(r::LineEditREPL) = r.envcolors ? Base.answer_color() : r.answer_color answer_color(r::StreamREPL) = r.answer_color @@ -893,7 +892,7 @@ input_color(r::LineEditREPL) = r.envcolors ? Base.input_color() : r.input_color input_color(r::StreamREPL) = r.input_color -function run_repl(stream::AsyncStream) +function run_repl(stream::IO) repl = @async begin repl_channel = Channel(1) diff --git a/base/fs.jl b/base/fs.jl index db991a47a9206..c890d16402213 100644 --- a/base/fs.jl +++ b/base/fs.jl @@ -231,19 +231,14 @@ function read(f::File, ::Type{UInt8}) return ret%UInt8 end -function read!{T}(f::File, a::Array{T}, nel=length(a)) +function read!(f::File, a::Vector{UInt8}, nel=length(a)) if nel < 0 || nel > length(a) throw(BoundsError()) end - if isbits(T) - nb = nel*sizeof(T) - ret = ccall(:jl_fs_read, Int32, (Int32, Ptr{Void}, Csize_t), - f.handle, a, nb) - uv_error("read",ret) - else - invoke(read, Tuple{IO, Array}, s, a) - end - a + ret = ccall(:jl_fs_read, Int32, (Int32, Ptr{Void}, Csize_t), + f.handle, a, nel) + uv_error("read",ret) + return a end nb_available(f::File) = filesize(f) - position(f) diff --git a/base/io.jl b/base/io.jl index 1203f1244e6dd..ca653d8970dd9 100644 --- a/base/io.jl +++ b/base/io.jl @@ -28,9 +28,6 @@ isreadonly(s) = isreadable(s) && !iswritable(s) ## binary I/O ## -# all subtypes should implement this -write(s::IO, x::UInt8) = error(typeof(s)," does not support byte I/O") - write(io::IO, x) = throw(MethodError(write, (io, x))) function write(io::IO, xs...) local written::Int = 0 @@ -107,9 +104,6 @@ function write(io::IO, s::Symbol) return write(io, pname, Int(ccall(:strlen, Csize_t, (Ptr{UInt8},), pname))) end -# all subtypes should implement this -read(s::IO, ::Type{UInt8}) = error(typeof(s)," does not support byte I/O") - read(s::IO, ::Type{Int8}) = reinterpret(Int8, read(s,UInt8)) function read{T <: Integer}(s::IO, ::Type{T}) @@ -131,9 +125,20 @@ read{T}(s::IO, t::Type{T}, d1::Integer, dims::Integer...) = read{T}(s::IO, ::Type{T}, dims::Dims) = read!(s, Array(T, dims)) +function read!(s::IO, a::Vector{UInt8}) + for i in 1:length(a) + a[i] = read(s, UInt8) + end +end + function read!{T}(s::IO, a::Array{T}) - for i in eachindex(a) - a[i] = read(s, T) + if isbits(T) + nb::Int = length(a) * sizeof(T) + read!(s, reinterpret(UInt8, a, (nb,))) + else + for i in eachindex(a) + a[i] = read(s, T) + end end return a end @@ -219,7 +224,7 @@ function readuntil(s::IO, t::AbstractString) return takebuf_string(out) end - +readline() = readline(STDIN) readline(s::IO) = readuntil(s, '\n') readchomp(x) = chomp!(readall(x)) @@ -308,3 +313,25 @@ ismarked(io::IO) = io.mark >= 0 lock(::IO) = nothing unlock(::IO) = nothing +reseteof(x::IO) = nothing + +const SZ_UNBUFFERED_IO = 65536 +buffer_writes(x::IO, bufsize=SZ_UNBUFFERED_IO) = nothing + +function isopen end +function close end +function flush end +function wait_connected end +function wait_readnb end +function wait_readbyte end +function wait_close end +function nb_available end +function readavailable end +function isreadable end +function iswritable end +function copy end +function eof end + +# all subtypes should implement this +read(s::IO, ::Type{UInt8}) = error(typeof(s)," does not support byte I/O") +write(s::IO, x::UInt8) = error(typeof(s)," does not support byte I/O") diff --git a/base/iobuffer.jl b/base/iobuffer.jl index e6f4914741b1e..0b3c177b02b6e 100644 --- a/base/iobuffer.jl +++ b/base/iobuffer.jl @@ -52,6 +52,7 @@ show(io::IO, b::AbstractIOBuffer) = print(io, "IOBuffer(data=UInt8[...], ", "ptr=", b.ptr, ", ", "mark=", b.mark, ")") +read!(from::AbstractIOBuffer, a::Vector{UInt8}) = read_sub(from, a, 1, length(a)) read!(from::AbstractIOBuffer, a::Array) = read_sub(from, a, 1, length(a)) function read_sub{T}(from::AbstractIOBuffer, a::AbstractArray{T}, offs, nel) diff --git a/base/iostream.jl b/base/iostream.jl index b1c08cfb0be60..e3bdd07329baf 100644 --- a/base/iostream.jl +++ b/base/iostream.jl @@ -170,15 +170,10 @@ function read{T<:Union{UInt16, Int16, UInt32, Int32, UInt64, Int64}}(s::IOStream ccall(:jl_ios_get_nbyte_int, UInt64, (Ptr{Void}, Csize_t), s.ios, sizeof(T)) % T end -function read!{T}(s::IOStream, a::Array{T}) - if isbits(T) - nb = length(a)*sizeof(T) - if ccall(:ios_readall, UInt, - (Ptr{Void}, Ptr{Void}, UInt), s.ios, a, nb) < nb - throw(EOFError()) - end - else - invoke(read!, Tuple{IO, Array}, s, a) +function read!(s::IOStream, a::Vector{UInt8}) + if ccall(:ios_readall, UInt, + (Ptr{Void}, Ptr{Void}, UInt), s.ios, a, sizeof(a)) < sizeof(a) + throw(EOFError()) end a end diff --git a/base/multi.jl b/base/multi.jl index cff41f09549aa..3378654457d59 100644 --- a/base/multi.jl +++ b/base/multi.jl @@ -134,8 +134,8 @@ type Worker c_state::Condition # wait for state changes ct_time::Float64 # creation time - r_stream::AsyncStream - w_stream::AsyncStream + r_stream::IO + w_stream::IO manager::ClusterManager config::WorkerConfig @@ -836,9 +836,9 @@ function process_tcp_streams(r_stream::TCPSocket, w_stream::TCPSocket) message_handler_loop(r_stream, w_stream) end -process_messages(r_stream::AsyncStream, w_stream::AsyncStream) = @schedule message_handler_loop(r_stream, w_stream) +process_messages(r_stream::IO, w_stream::IO) = @schedule message_handler_loop(r_stream, w_stream) -function message_handler_loop(r_stream::AsyncStream, w_stream::AsyncStream) +function message_handler_loop(r_stream::IO, w_stream::IO) global PGRP global cluster_manager diff --git a/base/process.jl b/base/process.jl index 879e4288bf356..c9af9053f9d7d 100644 --- a/base/process.jl +++ b/base/process.jl @@ -86,7 +86,7 @@ immutable FileRedirect end end -immutable DevNullStream <: AsyncStream end +immutable DevNullStream <: IO end const DevNull = DevNullStream() isreadable(::DevNullStream) = false iswritable(::DevNullStream) = true @@ -96,17 +96,24 @@ write{T<:DevNullStream}(::T, args...) = 0 close(::DevNullStream) = nothing flush(::DevNullStream) = nothing copy(::DevNullStream) = DevNull +wait_connected(::DevNullStream) = nothing +wait_readnb(::DevNullStream) = wait() +wait_readbyte(::DevNullStream) = wait() +wait_close(::DevNullStream) = wait() +eof(::DevNullStream) = true uvhandle(::DevNullStream) = C_NULL +uvtype(::DevNullStream) = UV_STREAM + uvhandle(x::Ptr) = x uvtype(::Ptr) = UV_STREAM -uvtype(::DevNullStream) = UV_STREAM # Not actually a pointer, but that's how we pass it through the C API so it's fine uvhandle(x::RawFD) = convert(Ptr{Void}, x.fd % UInt) uvtype(x::RawFD) = UV_RAW_FD -typealias Redirectable Union{AsyncStream, FS.File, FileRedirect, DevNullStream, IOStream, RawFD} +typealias Redirectable Union{IO, FileRedirect, RawFD} +typealias StdIOSet NTuple{3, Union{Redirectable, Ptr{Void}}} # XXX: remove Ptr{Void} once libuv is refactored to use upstream release immutable CmdRedirect <: AbstractCmd cmd::AbstractCmd @@ -197,30 +204,30 @@ pipeline(src::Union{Redirectable,AbstractString}, cmd::AbstractCmd) = pipeline(c pipeline(a, b, c, d...) = pipeline(pipeline(a,b), c, d...) -typealias RawOrBoxedHandle Union{UVHandle,AsyncStream,Redirectable,IOStream} -typealias StdIOSet NTuple{3,RawOrBoxedHandle} - type Process <: AbstractPipe cmd::Cmd handle::Ptr{Void} - in::AsyncStream - out::AsyncStream - err::AsyncStream + in::IO + out::IO + err::IO exitcode::Int64 termsignal::Int32 exitcb::Callback exitnotify::Condition closecb::Callback closenotify::Condition - function Process(cmd::Cmd, handle::Ptr{Void}, in::RawOrBoxedHandle, out::RawOrBoxedHandle, err::RawOrBoxedHandle) - if !isa(in, AsyncStream) || in === DevNull - in=DevNull + function Process(cmd::Cmd, handle::Ptr{Void}, + in::Union{Redirectable, Ptr{Void}}, + out::Union{Redirectable, Ptr{Void}}, + err::Union{Redirectable, Ptr{Void}}) + if !isa(in, IO) + in = DevNull end - if !isa(out, AsyncStream) || out === DevNull - out=DevNull + if !isa(out, IO) + out = DevNull end - if !isa(err, AsyncStream) || err === DevNull - err=DevNull + if !isa(err, IO) + err = DevNull end this = new(cmd, handle, in, out, err, typemin(fieldtype(Process, :exitcode)), @@ -431,7 +438,7 @@ end # | | \ The function to be called once the uv handle is closed # | \ The function to be called once the process exits # \ A set of up to 256 stdio instructions, where each entry can be either: -# | - An AsyncStream to be passed to the child +# | - An IO to be passed to the child # | - DevNull to pass /dev/null # | - An FS.File object to redirect the output to # \ - An ASCIIString specifying a filename to be opened @@ -464,7 +471,7 @@ end eachline(cmd::AbstractCmd) = eachline(cmd, DevNull) # return a Process object to read-to/write-from the pipeline -function open(cmds::AbstractCmd, mode::AbstractString="r", other::AsyncStream=DevNull) +function open(cmds::AbstractCmd, mode::AbstractString="r", other::Redirectable=DevNull) if mode == "r" in = other out = io = Pipe() @@ -502,18 +509,18 @@ function readandwrite(cmds::AbstractCmd) (out, in, processes) end -function readbytes(cmd::AbstractCmd, stdin::AsyncStream=DevNull) +function readbytes(cmd::AbstractCmd, stdin::Redirectable=DevNull) out, procs = open(cmd, "r", stdin) bytes = readbytes(out) !success(procs) && pipeline_error(procs) return bytes end -function readall(cmd::AbstractCmd, stdin::AsyncStream=DevNull) +function readall(cmd::AbstractCmd, stdin::Redirectable=DevNull) return bytestring(readbytes(cmd, stdin)) end -function writeall(cmd::AbstractCmd, stdin::AbstractString, stdout::AsyncStream=DevNull) +function writeall(cmd::AbstractCmd, stdin::AbstractString, stdout::Redirectable=DevNull) open(cmd, "w", stdout) do io write(io, stdin) end diff --git a/base/socket.jl b/base/socket.jl index 01cfa0756d31d..8c5491fe86a66 100644 --- a/base/socket.jl +++ b/base/socket.jl @@ -248,9 +248,7 @@ end ## SOCKETS ## -abstract Socket <: AsyncStream - -type TCPSocket <: Socket +type TCPSocket <: LibuvStream handle::Ptr{Void} status::Int line_buffered::Bool @@ -294,10 +292,7 @@ function TCPSocket() this end -lock(s::TCPSocket) = lock(s.lock) -unlock(s::TCPSocket) = unlock(s.lock) - -type TCPServer <: UVServer +type TCPServer <: LibuvServer handle::Ptr{Void} status::Int ccb::Callback @@ -328,13 +323,8 @@ function TCPServer() this end -isreadable(io::TCPSocket) = true -iswritable(io::TCPSocket) = true - -show(io::IO,sock::TCPSocket) = print(io,"TCPSocket(",uv_status_string(sock),", ", - nb_available(sock.buffer)," bytes waiting)") - -show(io::IO,sock::TCPServer) = print(io,"TCPServer(",uv_status_string(sock),")") +isreadable(io::TCPSocket) = isopen(io) || nb_available(io) > 0 +iswritable(io::TCPSocket) = isopen(io) && io.status != StatusClosing ## VARIOUS METHODS TO BE MOVED TO BETTER LOCATION @@ -365,7 +355,7 @@ _bind(sock::TCPServer, host::IPv6, port::UInt16) = ccall(:jl_tcp_bind6, Int32, ( # UDP -type UDPSocket <: Socket +type UDPSocket <: LibuvStream handle::Ptr{Void} status::Int recvnotify::Condition @@ -694,7 +684,7 @@ end ## -listen(sock::UVServer; backlog::Integer=BACKLOG_DEFAULT) = (uv_error("listen",_listen(sock;backlog=backlog)); sock) +listen(sock::LibuvServer; backlog::Integer=BACKLOG_DEFAULT) = (uv_error("listen",_listen(sock;backlog=backlog)); sock) function listen(addr; backlog::Integer=BACKLOG_DEFAULT) sock = TCPServer() @@ -706,7 +696,7 @@ listen(port::Integer; backlog::Integer=BACKLOG_DEFAULT) = listen(IPv4(UInt32(0)) listen(host::IPAddr, port::Integer; backlog::Integer=BACKLOG_DEFAULT) = listen(InetAddr(host,port);backlog=backlog) listen(cb::Callback,args...; backlog::Integer=BACKLOG_DEFAULT) = (sock=listen(args...;backlog=backlog);sock.ccb=cb;sock) -listen(cb::Callback,sock::Socket; backlog::Integer=BACKLOG_DEFAULT) = (sock.ccb=cb;listen(sock;backlog=backlog)) +listen(cb::Callback,sock::Union{TCPSocket,UDPSocket}; backlog::Integer=BACKLOG_DEFAULT) = (sock.ccb=cb;listen(sock;backlog=backlog)) ## diff --git a/base/stream.jl b/base/stream.jl index b254a1f5e8e76..8027d464f4c21 100644 --- a/base/stream.jl +++ b/base/stream.jl @@ -9,10 +9,35 @@ import .Libc: RawFD, dup ## types ## typealias Callback Union{Function,Bool} -abstract AsyncStream <: IO -abstract UVServer - -typealias UVHandle Ptr{Void} +abstract IOServer +abstract LibuvServer <: IOServer +abstract LibuvStream <: IO + +# IO +# +- AbstractIOBuffer{T<:AbstractArray{UInt8,1}} (not exported) +# +- AbstractPipe (not exported) +# . +- Pipe +# . +- Process (not exported) +# . +- ProcessChain (not exported) +# +- Base64DecodePipe +# +- Base64EncodePipe +# +- BufferStream +# +- DevNullStream (not exported) +# +- Filesystem.File +# +- LibuvStream (not exported) +# . +- PipeEndpoint (not exported) +# . +- TCPSocket +# . +- TTY (not exported) +# . +- UDPSocket +# +- IOBuffer = Base.AbstractIOBuffer{Array{UInt8,1}} +# +- IOStream + +# IOServer +# +- LibuvServer +# . +- PipeServer +# . +- TCPServer + +# Redirectable = Union{IO, FileRedirect, Libc.RawFD} (not exported) # convert UV handle data to julia object, checking for null macro handle_as(hand, typ) @@ -26,10 +51,19 @@ end # A dict of all libuv handles that are being waited on somewhere in the system # and should thus not be garbage collected const uvhandles = ObjectIdDict() - preserve_handle(x) = uvhandles[x] = get(uvhandles,x,0)+1 unpreserve_handle(x) = (v = uvhandles[x]; v == 1 ? pop!(uvhandles,x) : (uvhandles[x] = v-1); nothing) +function stream_wait(x, c...) # for x::LibuvObject + preserve_handle(x) + try + return wait(c...) + finally + unpreserve_handle(x) + end +end + + function uv_sizeof_handle(handle) if !(UV_UNKNOWN_HANDLE < handle < UV_HANDLE_TYPE_MAX) throw(DomainError()) @@ -51,9 +85,9 @@ for r in uv_req_types @eval const $(symbol("_sizeof_"*lowercase(string(r)))) = uv_sizeof_req($r) end -nb_available(s::AsyncStream) = nb_available(s.buffer) +nb_available(s::LibuvStream) = nb_available(s.buffer) -function eof(s::AsyncStream) +function eof(s::LibuvStream) wait_readnb(s,1) !isopen(s) && nb_available(s)<=0 end @@ -102,7 +136,7 @@ uv_req_data(handle) = ccall(:jl_uv_req_data,Ptr{Void},(Ptr{Void},),handle) uv_req_set_data(req,data) = ccall(:jl_uv_req_set_data,Void,(Ptr{Void},Any),req,data) uv_req_set_data(req,data::Ptr{Void}) = ccall(:jl_uv_req_set_data,Void,(Ptr{Void},Ptr{Void}),req,data) -type PipeEndpoint <: AsyncStream +type PipeEndpoint <: LibuvStream handle::Ptr{Void} status::Int buffer::IOBuffer @@ -129,10 +163,7 @@ type PipeEndpoint <: AsyncStream DEFAULT_READ_BUFFER_SZ) end -lock(p::PipeEndpoint) = lock(p.lock) -unlock(p::PipeEndpoint) = unlock(p.lock) - -type PipeServer <: UVServer +type PipeServer <: LibuvServer handle::Ptr{Void} status::Int ccb::Callback @@ -146,20 +177,7 @@ type PipeServer <: UVServer false,Condition()) end -function init_pipe!(pipe::Union{PipeEndpoint,PipeServer}; - readable::Bool = false, - writable::Bool = false, - julia_only::Bool = true) - if pipe.status != StatusUninit - error("pipe is already initialized") - end - if pipe.handle == C_NULL - malloc_julia_pipe!(pipe) - end - uv_error("init_pipe",ccall(:jl_init_pipe, Cint, (Ptr{Void},Int32,Int32,Int32), pipe.handle, writable,readable,julia_only)) - pipe.status = StatusInit - pipe -end +typealias LibuvPipe Union{PipeEndpoint, PipeServer} function PipeServer() handle = Libc.malloc(_sizeof_uv_named_pipe) @@ -174,11 +192,7 @@ function PipeServer() end end -show(io::IO,stream::PipeEndpoint) = print(io,"PipeEndpoint(",uv_status_string(stream),", ", - nb_available(stream.buffer)," bytes waiting)") -show(io::IO,stream::PipeServer) = print(io,"PipeServer(",uv_status_string(stream),")") - -type TTY <: AsyncStream +type TTY <: LibuvStream handle::Ptr{Void} status::Int line_buffered::Bool @@ -219,55 +233,44 @@ function TTY(fd::RawFD; readable::Bool = false) ret end -lock(t::TTY) = lock(t.lock) -unlock(t::TTY) = unlock(t.lock) +show(io::IO,stream::LibuvServer) = print(io, typeof(stream), "(", uv_status_string(stream), ")") +show(io::IO, stream::LibuvStream) = print(io, typeof(stream), "(", uv_status_string(stream), ", ", + nb_available(stream.buffer)," bytes waiting)") -# note that uv_is_readable/writable work for any subtype of -# uv_stream_t, including uv_tty_t and uv_pipe_t -function isreadable(io::Union{PipeEndpoint,TTY}) +# Shared LibuvStream object interface + +function isreadable(io::LibuvStream) nb_available(io) > 0 && return true isopen(io) || return false return ccall(:uv_is_readable, Cint, (Ptr{Void},), io.handle) != 0 end -function iswritable(io::Union{PipeEndpoint,TTY}) + +function iswritable(io::LibuvStream) isopen(io) || return false io.status == StatusClosing && return false return ccall(:uv_is_writable, Cint, (Ptr{Void},), io.handle) != 0 end -nb_available(stream::AsyncStream) = nb_available(stream.buffer) - -show(io::IO,stream::TTY) = print(io,"TTY(",uv_status_string(stream),", ", - nb_available(stream.buffer)," bytes waiting)") - -function println(io::AsyncStream, xs...) - lock(io) - try - invoke(println, Tuple{IO, map(typeof,xs)...}, io, xs...) - finally - unlock(io) - end -end - +nb_available(stream::LibuvStream) = nb_available(stream.buffer) -uvtype(::AsyncStream) = UV_STREAM -uvhandle(stream::AsyncStream) = stream.handle +lock(s::LibuvStream) = lock(s.lock) +unlock(s::LibuvStream) = unlock(s.lock) -convert(T::Type{Ptr{Void}}, s::AsyncStream) = convert(T, s.handle) -handle(s::AsyncStream) = s.handle -handle(s::Ptr{Void}) = s +uvtype(::LibuvStream) = UV_STREAM +uvhandle(stream::LibuvStream) = stream.handle +unsafe_convert(::Type{Ptr{Void}}, s::Union{LibuvStream, LibuvServer}) = s.handle -associate_julia_struct(handle::Ptr{Void},jlobj::ANY) = - ccall(:jl_uv_associate_julia_struct,Void,(Ptr{Void},Any),handle,jlobj) +associate_julia_struct(handle::Ptr{Void}, jlobj::ANY) = + ccall(:jl_uv_associate_julia_struct, Void, (Ptr{Void}, Any), handle, jlobj) disassociate_julia_struct(uv) = disassociate_julia_struct(uv.handle) disassociate_julia_struct(handle::Ptr{Void}) = - handle != C_NULL && ccall(:jl_uv_disassociate_julia_struct,Void,(Ptr{Void},),handle) + handle != C_NULL && ccall(:jl_uv_disassociate_julia_struct, Void, (Ptr{Void},), handle) -function init_stdio(handle) - t = ccall(:jl_uv_handle_type,Int32,(Ptr{Void},),handle) +function init_stdio(handle::Ptr{Void}) + t = ccall(:jl_uv_handle_type, Int32, (Ptr{Void},), handle) if t == UV_FILE - return fdio(ccall(:jl_uv_file_handle,Int32,(Ptr{Void},),handle)) -# Replace ios.c filw with libuv file? + return fdio(ccall(:jl_uv_file_handle, Int32, (Ptr{Void},), handle)) +# Replace ios.c file with libuv file? # return File(RawFD(ccall(:jl_uv_file_handle,Int32,(Ptr{Void},),handle))) else if t == UV_TTY @@ -281,21 +284,12 @@ function init_stdio(handle) end ret.status = StatusOpen ret.line_buffered = false - associate_julia_struct(ret.handle,ret) - finalizer(ret,uvfinalize) + associate_julia_struct(ret.handle, ret) + finalizer(ret, uvfinalize) return ret end end -function stream_wait(x, c...) # for x::LibuvObject - preserve_handle(x) - try - return wait(c...) - finally - unpreserve_handle(x) - end -end - function reinit_stdio() global uv_jl_asynccb = cfunction(uv_asynccb, Void, (Ptr{Void},)) global uv_jl_timercb = cfunction(uv_timercb, Void, (Ptr{Void},)) @@ -318,20 +312,20 @@ function reinit_stdio() global STDERR = init_stdio(ccall(:jl_stderr_stream,Ptr{Void},())) end -function isopen(x::Union{AsyncStream,UVServer}) +function isopen(x::Union{LibuvStream, LibuvServer}) if x.status == StatusUninit || x.status == StatusInit throw(ArgumentError("$x is not initialized")) end x.status != StatusClosed && x.status != StatusEOF end -function check_open(x::Union{AsyncStream,UVServer}) +function check_open(x::Union{LibuvStream, LibuvServer}) if !isopen(x) || x.status == StatusClosing throw(ArgumentError("stream is closed or unusable")) end end -function wait_connected(x) +function wait_connected(x::Union{LibuvStream, LibuvServer}) check_open(x) while x.status == StatusConnecting stream_wait(x, x.connectnotify) @@ -339,10 +333,10 @@ function wait_connected(x) end end -function wait_readbyte(x::AsyncStream, c::UInt8) +function wait_readbyte(x::LibuvStream, c::UInt8) preserve_handle(x) try - while isopen(x) && search(x.buffer,c) <= 0 + while isopen(x) && search(x.buffer, c) <= 0 start_reading(x) # ensure we are reading wait(x.readnotify) end @@ -354,7 +348,7 @@ function wait_readbyte(x::AsyncStream, c::UInt8) end end -function wait_readnb(x::AsyncStream, nb::Int) +function wait_readnb(x::LibuvStream, nb::Int) oldthrottle = x.throttle preserve_handle(x) try @@ -374,16 +368,26 @@ function wait_readnb(x::AsyncStream, nb::Int) end end -function wait_close(x::AsyncStream) +function wait_close(x::Union{LibuvStream, LibuvServer}) if isopen(x) stream_wait(x, x.closenotify) end end +function close(stream::Union{LibuvStream, LibuvServer}) + if isopen(stream) && stream.status != StatusClosing + ccall(:jl_close_uv,Void, (Ptr{Void},), stream.handle) + stream.status = StatusClosing + end + nothing +end + +### Libuv callbacks ### + #from `connect` function uv_connectcb(conn::Ptr{Void}, status::Cint) hand = ccall(:jl_uv_connect_handle, Ptr{Void}, (Ptr{Void},), conn) - sock = @handle_as hand AsyncStream + sock = @handle_as hand LibuvStream @assert sock.status == StatusConnecting if status >= 0 sock.status = StatusOpen @@ -402,16 +406,16 @@ end # from `listen` function uv_connectioncb(stream::Ptr{Void}, status::Cint) - sock = @handle_as stream UVServer + sock = @handle_as stream LibuvServer if status >= 0 err = nothing else err = UVError("connection",status) end - if isa(sock.ccb,Function) - sock.ccb(sock,status) + if isa(sock.ccb, Function) + sock.ccb(sock, status) end - err===nothing ? notify(sock.connectnotify) : notify_error(sock.connectnotify, err) + err === nothing ? notify(sock.connectnotify) : notify_error(sock.connectnotify, err) end ## BUFFER ## @@ -419,7 +423,7 @@ end function alloc_request(buffer::IOBuffer, recommended_size::UInt) ensureroom(buffer, Int(recommended_size)) ptr = buffer.append ? buffer.size + 1 : buffer.ptr - return (pointer(buffer.data, ptr), length(buffer.data)-ptr+1) + return (pointer(buffer.data, ptr), length(buffer.data) - ptr + 1) end function uv_alloc_buf(handle::Ptr{Void}, size::Csize_t, buf::Ptr{Void}) @@ -428,9 +432,9 @@ function uv_alloc_buf(handle::Ptr{Void}, size::Csize_t, buf::Ptr{Void}) ccall(:jl_uv_buf_set_len, Void, (Ptr{Void}, Csize_t), buf, 0) return nothing end - stream = unsafe_pointer_to_objref(hd)::AsyncStream + stream = unsafe_pointer_to_objref(hd)::LibuvStream - (data,newsize) = alloc_buf_hook(stream, UInt(size)) + (data, newsize) = alloc_buf_hook(stream, UInt(size)) ccall(:jl_uv_buf_set_base, Void, (Ptr{Void}, Ptr{Void}), buf, data) ccall(:jl_uv_buf_set_len, Void, (Ptr{Void}, Csize_t), buf, newsize) @@ -438,7 +442,7 @@ function uv_alloc_buf(handle::Ptr{Void}, size::Csize_t, buf::Ptr{Void}) nothing end -alloc_buf_hook(stream::AsyncStream, size::UInt) = alloc_request(stream.buffer, UInt(size)) +alloc_buf_hook(stream::LibuvStream, size::UInt) = alloc_request(stream.buffer, UInt(size)) function notify_filled(buffer::IOBuffer, nread::Int, base::Ptr{Void}, len::UInt) if buffer.append @@ -447,7 +451,8 @@ function notify_filled(buffer::IOBuffer, nread::Int, base::Ptr{Void}, len::UInt) buffer.ptr += nread end end -function notify_filled(stream::AsyncStream, nread::Int) + +function notify_filled(stream::LibuvStream, nread::Int) more = true while more if isa(stream.readcb,Function) @@ -464,7 +469,7 @@ function notify_filled(stream::AsyncStream, nread::Int) end function uv_readcb(handle::Ptr{Void}, nread::Cssize_t, buf::Ptr{Void}) - stream = @handle_as handle AsyncStream + stream = @handle_as handle LibuvStream nread = Int(nread) base = ccall(:jl_uv_buf_base, Ptr{Void}, (Ptr{Void},), buf) len = UInt(ccall(:jl_uv_buf_len, Csize_t, (Ptr{Void},), buf)) @@ -474,7 +479,7 @@ function uv_readcb(handle::Ptr{Void}, nread::Cssize_t, buf::Ptr{Void}) # remind the client that stream.buffer is full notify(stream.readnotify) elseif nread == UV_EOF - if isa(stream,TTY) + if isa(stream, TTY) stream.status = StatusEOF # libuv called stop_reading already notify(stream.readnotify) notify(stream.closenotify) @@ -484,7 +489,7 @@ function uv_readcb(handle::Ptr{Void}, nread::Cssize_t, buf::Ptr{Void}) else # This is a fatal connection error. Shutdown requests as per the usual # close function won't work and libuv will fail with an assertion failure - ccall(:jl_forceclose_uv,Void,(Ptr{Void},),stream.handle) + ccall(:jl_forceclose_uv, Void, (Ptr{Void},), stream) notify_error(stream.readnotify, UVError("readcb",nread)) end else @@ -503,7 +508,6 @@ function uv_readcb(handle::Ptr{Void}, nread::Cssize_t, buf::Ptr{Void}) nothing end -reseteof(x::IO) = nothing function reseteof(x::TTY) if x.status == StatusEOF x.status = StatusOpen @@ -511,7 +515,7 @@ function reseteof(x::TTY) nothing end -function _uv_hook_close(uv::Union{AsyncStream,UVServer}) +function _uv_hook_close(uv::Union{LibuvStream, LibuvServer}) uv.handle = C_NULL uv.status = StatusClosed if isa(uv.closecb, Function) @@ -526,10 +530,10 @@ end ########################################## # Pipe Abstraction -# (composed of two half-pipes) +# (composed of two half-pipes: .in and .out) ########################################## -abstract AbstractPipe <: AsyncStream +abstract AbstractPipe <: IO # allows sharing implementation with Process and ProcessChain type Pipe <: AbstractPipe @@ -549,6 +553,7 @@ show(io::IO,stream::Pipe) = print(io, uv_status_string(stream.in), " => ", uv_status_string(stream.out), ", ", nb_available(stream), " bytes waiting)") + write(io::AbstractPipe, byte::UInt8) = write(io.in, byte) write(io::AbstractPipe, bytes::Vector{UInt8}) = write(io.in, bytes) write{T<:AbstractPipe}(io::T, args...) = write(io.in, args...) @@ -711,8 +716,26 @@ function process_events(block::Bool) end end -## pipe functions ## -function malloc_julia_pipe!(x) +## Functions for PipeEndpoint and PipeServer ## + +function init_pipe!(pipe::LibuvPipe; + readable::Bool = false, + writable::Bool = false, + julia_only::Bool = true) + if pipe.status != StatusUninit + error("pipe is already initialized") + end + if pipe.handle == C_NULL + malloc_julia_pipe!(pipe) + end + uv_error("init_pipe",ccall(:jl_init_pipe, Cint, + (Ptr{Void}, Int32, Int32, Int32), + pipe.handle, writable, readable, julia_only)) + pipe.status = StatusInit + pipe +end + +function malloc_julia_pipe!(x::LibuvPipe) assert(x.handle == C_NULL) x.handle = Libc.malloc(_sizeof_uv_named_pipe) associate_julia_struct(x.handle, x) @@ -726,7 +749,7 @@ end function link_pipe(read_end::Ptr{Void}, readable_julia_only::Bool, write_end::Ptr{Void}, writable_julia_only::Bool, - readpipe::AsyncStream, writepipe::AsyncStream) + readpipe::PipeEndpoint, writepipe::PipeEndpoint) #make the pipe an unbuffered stream for now #TODO: this is probably not freeing memory properly after errors uv_error("init_pipe(read)", @@ -794,26 +817,20 @@ function close_pipe_sync(p::PipeEndpoint) p.status = StatusClosed nothing end -function close_pipe_sync(handle::UVHandle) - ccall(:uv_pipe_close_sync, Void, (UVHandle,), handle) -end -function close(stream::Union{AsyncStream, UVServer}) - if isopen(stream) && stream.status != StatusClosing - ccall(:jl_close_uv,Void, (Ptr{Void},), stream.handle) - stream.status = StatusClosing - end - nothing +function close_pipe_sync(handle::Ptr{Void}) + ccall(:uv_pipe_close_sync, Void, (Ptr{Void},), handle) end -## stream functions ## -function start_reading(stream::AsyncStream) +## Functions for any LibuvStream ## + +function start_reading(stream::LibuvStream) if stream.status == StatusOpen if !isreadable(stream) error("tried to read a stream that is not readable") end - ret = ccall(:uv_read_start,Cint,(Ptr{Void},Ptr{Void},Ptr{Void}), - handle(stream),uv_jl_alloc_buf::Ptr{Void},uv_jl_readcb::Ptr{Void}) + ret = ccall(:uv_read_start, Cint, (Ptr{Void}, Ptr{Void}, Ptr{Void}), + stream, uv_jl_alloc_buf::Ptr{Void}, uv_jl_readcb::Ptr{Void}) stream.status = StatusActive ret elseif stream.status == StatusActive @@ -822,7 +839,8 @@ function start_reading(stream::AsyncStream) Int32(-1) end end -function start_reading(stream::AsyncStream, cb::Function) + +function start_reading(stream::LibuvStream, cb::Function) failure = start_reading(stream) stream.readcb = cb nread = nb_available(stream.buffer) @@ -831,15 +849,16 @@ function start_reading(stream::AsyncStream, cb::Function) end return failure_code end -function start_reading(stream::AsyncStream, cb::Bool) + +function start_reading(stream::LibuvStream, cb::Bool) failure_code = start_reading(stream) stream.readcb = cb return failure_code end -function stop_reading(stream::AsyncStream) +function stop_reading(stream::LibuvStream) if stream.status == StatusActive - ret = ccall(:uv_read_stop,Cint,(Ptr{Void},),stream.handle) + ret = ccall(:uv_read_stop, Cint, (Ptr{Void},), stream) stream.status = StatusOpen ret elseif stream.status == StatusOpen @@ -849,20 +868,12 @@ function stop_reading(stream::AsyncStream) end end -function readbytes(stream::AsyncStream) +function readbytes(stream::LibuvStream) wait_readnb(stream, typemax(Int)) return takebuf_array(stream.buffer) end -function read!{T}(s::AsyncStream, a::Array{T}) - isbits(T) || throw(ArgumentError("read from AsyncStream only supports bits types or arrays of bits types")) - nb = length(a) * sizeof(T) - read!(s, reshape(reinterpret(UInt8, a), nb)) - return a -end - -const SZ_UNBUFFERED_IO=65536 -function read!(s::AsyncStream, a::Vector{UInt8}) +function read!(s::LibuvStream, a::Array{UInt8, 1}) nb = length(a) sbuf = s.buffer @assert sbuf.seekable == false @@ -893,66 +904,44 @@ function read!(s::AsyncStream, a::Vector{UInt8}) return a end -function read{T}(s::AsyncStream, ::Type{T}, dims::Dims) - isbits(T) || throw(ArgumentError("read from AsyncStream only supports bits types or arrays of bits types")) - nb = prod(dims)*sizeof(T) - a = read!(s, Array(UInt8, nb)) - reshape(reinterpret(T, a), dims) -end - -function read(this::AsyncStream,::Type{UInt8}) +function read(this::LibuvStream, ::Type{UInt8}) + wait_readnb(this, 1) buf = this.buffer @assert buf.seekable == false - wait_readnb(this, 1) read(buf, UInt8) end -readline(this::AsyncStream) = readuntil(this, '\n') - -readline() = readline(STDIN) - -function readavailable(this::AsyncStream) +function readavailable(this::LibuvStream) + wait_readnb(this, 1) buf = this.buffer @assert buf.seekable == false - wait_readnb(this, 1) takebuf_array(buf) end -function readuntil(this::AsyncStream, c::UInt8) +function readuntil(this::LibuvStream, c::UInt8) + wait_readbyte(this, c) buf = this.buffer @assert buf.seekable == false - wait_readbyte(this, c) readuntil(buf, c) end -#function finish_read(pipe::PipeEndpoint) -# close(pipe) #handles to UV and ios will be invalid after this point -#end -# -#function finish_read(state::(PipeEndpoint,ByteString)) -# finish_read(state...) -#end - -function uv_write(s::AsyncStream, p, n::Integer) +uv_write(s::LibuvStream, p::Vector{UInt8}) = uv_write(s, pointer(p), UInt(length(p))) +function uv_write(s::LibuvStream, p::Ptr, n::UInt) check_open(s) uvw = Libc.malloc(_sizeof_uv_write) - try - uv_req_set_data(uvw,C_NULL) - err = ccall(:jl_uv_write, - Int32, - (Ptr{Void}, Ptr{Void}, UInt, Ptr{Void}, Ptr{Void}), - handle(s), p, n, uvw, - uv_jl_writecb_task::Ptr{Void}) - if err < 0 - uv_error("write", err) - end - ct = current_task() - uv_req_set_data(uvw,ct) - ct.state = :waiting - stream_wait(ct) - finally - Libc.free(uvw) + uv_req_set_data(uvw,C_NULL) + err = ccall(:jl_uv_write, + Int32, + (Ptr{Void}, Ptr{Void}, UInt, Ptr{Void}, Ptr{Void}), + s, p, n, uvw, + uv_jl_writecb_task::Ptr{Void}) + if err < 0 + uv_error("write", err) end + ct = current_task() + uv_req_set_data(uvw,ct) + ct.state = :waiting + stream_wait(ct) return Int(n) end @@ -960,13 +949,12 @@ end # - smaller writes are buffered, final uv write on flush or when buffer full # - large isbits arrays are unbuffered and written directly -function buffer_or_write(s::AsyncStream, p::Ptr, n::Integer) +function buffer_or_write(s::LibuvStream, p::Ptr, n::Integer) if isnull(s.sendbuf) - return uv_write(s, p, n) - else - buf = get(s.sendbuf) + return uv_write(s, p, UInt(n)) end + buf = get(s.sendbuf) totb = nb_available(buf) + n if totb < buf.maxsize nb = write(buf, p, n) @@ -981,39 +969,37 @@ function buffer_or_write(s::AsyncStream, p::Ptr, n::Integer) return nb end -function flush(s::AsyncStream) +function flush(s::LibuvStream) if isnull(s.sendbuf) return s end buf = get(s.sendbuf) if nb_available(buf) > 0 arr = takebuf_array(buf) # Array of UInt8s - uv_write(s, arr, length(arr)) + uv_write(s, arr) end s end -buffer_writes(s::AsyncStream, bufsize=SZ_UNBUFFERED_IO) = (s.sendbuf=PipeBuffer(bufsize); s) +buffer_writes(s::LibuvStream, bufsize) = (s.sendbuf=PipeBuffer(bufsize); s) -## low-level calls ## +## low-level calls to libuv ## -write(s::AsyncStream, b::UInt8) = write(s, [b]) -write(s::AsyncStream, c::Char) = write(s, string(c)) -function write{T}(s::AsyncStream, a::Array{T}) +write(s::LibuvStream, b::UInt8) = write(s, [b]) +write(s::LibuvStream, c::Char) = write(s, string(c)) +function write{T}(s::LibuvStream, a::Array{T}) if isbits(T) - n = UInt(length(a)*sizeof(T)) - return buffer_or_write(s, pointer(a), n); + n = UInt(length(a) * sizeof(T)) + return buffer_or_write(s, pointer(a), n) else check_open(s) - invoke(write, Tuple{IO, Array},s,a) + invoke(write, Tuple{IO, typeof(a)}, s, a) end end -write(s::AsyncStream, p::Ptr, n::Integer) = buffer_or_write(s, p, n) +write(s::LibuvStream, p::Ptr, n::Integer) = buffer_or_write(s, p, n) function uv_writecb_task(req::Ptr{Void}, status::Cint) - #handle = ccall(:jl_uv_write_handle, Ptr{Void}, (Ptr{Void},), req) - #s = @handle_as handle AsyncStream d = uv_req_data(req) @assert d != C_NULL if status < 0 @@ -1022,10 +1008,12 @@ function uv_writecb_task(req::Ptr{Void}, status::Cint) else schedule(unsafe_pointer_to_objref(d)::Task) end + Libc.free(req) nothing end ## Libuv error handling ## + type UVError <: Exception prefix::AbstractString code::Int32 @@ -1059,7 +1047,7 @@ function accept_nonblock(server::PipeServer) client end -function accept(server::UVServer, client::AsyncStream) +function accept(server::LibuvServer, client::LibuvStream) if server.status != StatusActive throw(ArgumentError("server not connected, make sure \"listen\" has been called")) end @@ -1077,10 +1065,10 @@ end const BACKLOG_DEFAULT = 511 -function _listen(sock::UVServer; backlog::Integer=BACKLOG_DEFAULT) +function _listen(sock::LibuvServer; backlog::Integer=BACKLOG_DEFAULT) check_open(sock) err = ccall(:uv_listen, Cint, (Ptr{Void}, Cint, Ptr{Void}), - sock.handle, backlog, uv_jl_connectioncb::Ptr{Void}) + sock, backlog, uv_jl_connectioncb::Ptr{Void}) sock.status = StatusActive err end @@ -1088,7 +1076,7 @@ end function bind(server::PipeServer, name::AbstractString) @assert server.status == StatusInit err = ccall(:uv_pipe_bind, Int32, (Ptr{Void}, Cstring), - server.handle, name) + server, name) if err != 0 if err != UV_EADDRINUSE && err != UV_EACCES #TODO: this codepath is currently not tested @@ -1118,8 +1106,8 @@ function connect!(sock::PipeEndpoint, path::AbstractString) sock end -function connect(sock::AsyncStream, args...) - connect!(sock,args...) +function connect(sock::LibuvStream, args...) + connect!(sock, args...) wait_connected(sock) sock end @@ -1129,8 +1117,8 @@ end connect(path::AbstractString) = connect(init_pipe!(PipeEndpoint(); readable=false, writable=false, julia_only=true),path) _fd(x::IOStream) = RawFD(fd(x)) -@unix_only _fd(x::AsyncStream) = RawFD(ccall(:jl_uv_handle,Int32,(Ptr{Void},),x.handle)) -@windows_only _fd(x::AsyncStream) = WindowsRawSocket( +@unix_only _fd(x::LibuvStream) = RawFD(ccall(:jl_uv_handle,Int32,(Ptr{Void},),x.handle)) +@windows_only _fd(x::LibuvStream) = WindowsRawSocket( ccall(:jl_uv_handle,Ptr{Void},(Ptr{Void},),x.handle)) for (x,writable,unix_fd,c_symbol) in ((:STDIN,false,0,:jl_uv_stdin),(:STDOUT,true,1,:jl_uv_stdout),(:STDERR,true,2,:jl_uv_stderr)) @@ -1141,11 +1129,11 @@ for (x,writable,unix_fd,c_symbol) in ((:STDIN,false,0,:jl_uv_stdin),(:STDOUT,tru global $x @windows? ( ccall(:SetStdHandle,stdcall,Int32,(Int32,Ptr{Void}), - $(-10-unix_fd), Libc._get_osfhandle(_fd(stream)).handle) : + $(-10-unix_fd), Libc._get_osfhandle(_fd(stream)).handle) ) : ( dup(_fd(stream), RawFD($unix_fd)) ) $x = stream end - function ($f)(handle::Union{AsyncStream,IOStream}) + function ($f)(handle::Union{LibuvStream,IOStream}) $(_f)(handle) unsafe_store!(cglobal($(Expr(:quote,c_symbol)),Ptr{Void}), handle.handle) @@ -1160,13 +1148,13 @@ for (x,writable,unix_fd,c_symbol) in ((:STDIN,false,0,:jl_uv_stdin),(:STDOUT,tru end end -mark(x::AsyncStream) = mark(x.buffer) -unmark(x::AsyncStream) = unmark(x.buffer) -reset(x::AsyncStream) = reset(x.buffer) -ismarked(x::AsyncStream) = ismarked(x.buffer) +mark(x::LibuvStream) = mark(x.buffer) +unmark(x::LibuvStream) = unmark(x.buffer) +reset(x::LibuvStream) = reset(x.buffer) +ismarked(x::LibuvStream) = ismarked(x.buffer) # BufferStream's are non-OS streams, backed by a regular IOBuffer -type BufferStream <: AsyncStream +type BufferStream <: LibuvStream buffer::IOBuffer r_c::Condition close_c::Condition @@ -1177,10 +1165,14 @@ type BufferStream <: AsyncStream BufferStream() = new(PipeBuffer(), Condition(), Condition(), true, false, ReentrantLock()) end -lock(s::BufferStream) = lock(s.lock) -unlock(s::BufferStream) = unlock(s.unlock) isopen(s::BufferStream) = s.is_open close(s::BufferStream) = (s.is_open = false; notify(s.r_c; all=true); notify(s.close_c; all=true); nothing) +read(s::BufferStream, ::Type{UInt8}) = (wait_readnb(s, 1); read(s.buffer, UInt8)) +read!(s::BufferStream, a::Vector{UInt8}) = (wait_readnb(s, length(a)); read!(s.buffer, a)) +nb_available(s::BufferStream) = nb_available(s.buffer) + +isreadable(s::BufferStream) = s.buffer.readable +iswritable(s::BufferStream) = s.buffer.writable function wait_readnb(s::BufferStream, nb::Int) while isopen(s) && nb_available(s.buffer) < nb @@ -1213,6 +1205,11 @@ function write(s::BufferStream, p::Ptr, nb::Integer) rv end +function eof(s::LibuvStream) + wait_readnb(s,1) + !isopen(s) && nb_available(s)<=0 +end + # If buffer_writes is called, it will delay notifying waiters till a flush is called. buffer_writes(s::BufferStream, bufsize=0) = (s.buffer_writes=true; s) flush(s::BufferStream) = (notify(s.r_c; all=true); s) diff --git a/base/strings/io.jl b/base/strings/io.jl index 6bef21cbaa6db..e78cb5ddcd6ce 100644 --- a/base/strings/io.jl +++ b/base/strings/io.jl @@ -2,8 +2,25 @@ ## core text I/O ## -print(io::IO, x) = show(io, x) -print(io::IO, xs...) = for x in xs print(io, x) end +function print(io::IO, x) + lock(io) + try + show(io, x) + finally + unlock(io) + end +end + +function print(io::IO, xs...) + lock(io) + try + for x in xs + print(io, x) + end + finally + unlock(io) + end +end println(io::IO, xs...) = print(io, xs..., '\n') diff --git a/test/spawn.jl b/test/spawn.jl index 1856df40147d1..a0a4083bda9ae 100644 --- a/test/spawn.jl +++ b/test/spawn.jl @@ -249,13 +249,49 @@ let bad = "bad\0name" end # issue #12829 -let out = Pipe() +let out = Pipe(), echo = `$exename -f -e 'print(STDOUT, " 1\t", readall(STDIN))'`, ready = Condition() @test_throws ArgumentError write(out, "not open error") - open(`cat`, "w", out) do io - println(io, 1) + @async begin # spawn writer task + open(echo, "w", out) do in1 + open(echo, "w", out) do in2 + notify(ready) + write(in1, 'h') + write(in2, UInt8['w']) + println(in1, "ello") + write(in2, "orld\n") + end + end + show(out, out) + notify(ready) + @test isreadable(out) + @test iswritable(out) + close(out.in) + @test_throws ArgumentError write(out, "now closed error") + @test isreadable(out) + @test !iswritable(out) + @test isopen(out) end - close(out.in) - @test readline(out) == "1\n" + wait(ready) # wait for writer task to be ready before using `out` + @test nb_available(out) == 0 + @test endswith(readuntil(out, '1'), '1') + @test read(out, UInt8) == '\t' + c = UInt8[0] + @test c == read!(out, c) + Base.wait_readnb(out, 1) + @test nb_available(out) > 0 + ln1 = readline(out) + ln2 = readline(out) + desc = readall(out) + @test !isreadable(out) + @test !iswritable(out) + @test !isopen(out) + @test nb_available(out) == 0 + @test c == ['w'] + @test lstrip(ln2) == "1\thello\n" + @test ln1 == "orld\n" + @test isempty(readbytes(out)) + @test eof(out) + @test desc == "Pipe(open => active, 0 bytes waiting)" end # issue #8529