From 1f953d6e59ca7be4e34633bc1ce8f89b848dcf77 Mon Sep 17 00:00:00 2001 From: Jameson Nash Date: Fri, 8 Dec 2023 20:02:43 +0000 Subject: [PATCH] spawn: permit using IOBuffer at stdout People expect to use this (the docs even almost even suggested it at some point), so it is better to make it work as expected (and better than they can emulate) than to criticize their choices. Also fix a few regressions and handling mistakes in setup_stdios: - #44500 tried to store a Redirectable into a SpawnIO, dropping FileRedirect - CmdRedirect did not allocate a ProcessChain, so it wouldd call setup_stdio then call setup_stdios on the result of that, which is strongly discouraged as setup_stdio(s) should only be called once - BufferStream was missing `check_open` calls before writing, and ignored `Base.reseteof` as a possible means of resuming writing after `closewrite` sends a shutdown message. Currently this fix is disabled because Pkg seems like a bit of a disaster with IO mismanagement. - Add `closewrite` to more methods, and document it. Fixes #39311 Fixes #49234 Closes #49233 Closes #46768 --- NEWS.md | 8 +++ base/coreio.jl | 1 + base/exports.jl | 10 ++++ base/filesystem.jl | 2 + base/io.jl | 12 ++++ base/iobuffer.jl | 1 - base/iostream.jl | 2 + base/process.jl | 131 ++++++++++++++++++++++++++++---------------- base/stream.jl | 17 +++++- src/support/ios.c | 12 +++- test/cmdlineargs.jl | 1 + test/spawn.jl | 3 +- 12 files changed, 146 insertions(+), 54 deletions(-) diff --git a/NEWS.md b/NEWS.md index 0dd4d2f53bc01c..3d3b2d0fef1d13 100644 --- a/NEWS.md +++ b/NEWS.md @@ -72,6 +72,14 @@ New library features write the output to a stream rather than returning a string ([#48625]). * `sizehint!(s, n)` now supports an optional `shrink` argument to disable shrinking ([#51929]). * New function `Docs.hasdoc(module, symbol)` tells whether a name has a docstring ([#52139]). +* Passing an IOBuffer as a stdout argument for Process spawn now works as + expected, synchronized with `wait` or `success`, so a `Base.BufferStream` is + no longer required there for correctness to avoid data-races ([#TBD]). +* After a process exits, `closewrite` will no longer be automatically called on + the stream passed to it. Call `wait` on the process instead to ensure the + content is fully written, then call `closewrite` manually to avoid + data-races. Or use the callback form of `open` to have all that handled + automatically. Standard library changes ------------------------ diff --git a/base/coreio.jl b/base/coreio.jl index 3e508c64a0a64d..7fc608111d5f2f 100644 --- a/base/coreio.jl +++ b/base/coreio.jl @@ -11,6 +11,7 @@ struct DevNull <: IO end const devnull = DevNull() write(::DevNull, ::UInt8) = 1 unsafe_write(::DevNull, ::Ptr{UInt8}, n::UInt)::Int = n +closewrite(::DevNull) = nothing close(::DevNull) = nothing wait_close(::DevNull) = wait() bytesavailable(io::DevNull) = 0 diff --git a/base/exports.jl b/base/exports.jl index 398d828f9cf199..4ec35d1d2b3abd 100644 --- a/base/exports.jl +++ b/base/exports.jl @@ -1155,6 +1155,16 @@ public @locals, @propagate_inbounds, +# IO + # types + BufferStream, + IOServer, + OS_HANDLE, + PipeEndpoint, + TTY, + # functions + reseteof, + # misc notnothing, runtests, diff --git a/base/filesystem.jl b/base/filesystem.jl index d291dd3b316309..d9760ec08a8a95 100644 --- a/base/filesystem.jl +++ b/base/filesystem.jl @@ -143,6 +143,8 @@ function close(f::File) nothing end +closewrite(f::File) = nothing + # sendfile is the most efficient way to copy from a file descriptor function sendfile(dst::File, src::File, src_offset::Int64, bytes::Int) check_open(dst) diff --git a/base/io.jl b/base/io.jl index a2e68882bbc863..fd286297ec0902 100644 --- a/base/io.jl +++ b/base/io.jl @@ -25,6 +25,14 @@ end lock(::IO) = nothing unlock(::IO) = nothing + +""" + reseteof(io) + +Clear the EOF flag from IO so that further reads (and possibly writes) are +again allowed. Note that it may immediately get re-set, if the underlying +stream object is at EOF and cannot be resumed. +""" reseteof(x::IO) = nothing const SZ_UNBUFFERED_IO = 65536 @@ -68,6 +76,10 @@ Shutdown the write half of a full-duplex I/O stream. Performs a [`flush`](@ref) first. Notify the other end that no more data will be written to the underlying file. This is not supported by all IO types. +If implemented, `closewrite` causes subsequent `read` or `eof` calls that would +block to instead throw EOF or return true, respectively. If the stream is +already closed, this is idempotent. + # Examples ```jldoctest julia> io = Base.BufferStream(); # this never blocks, so we can read and write on the same Task diff --git a/base/iobuffer.jl b/base/iobuffer.jl index deb86e774f4e47..121d303973ff07 100644 --- a/base/iobuffer.jl +++ b/base/iobuffer.jl @@ -349,7 +349,6 @@ eof(io::GenericIOBuffer) = (io.ptr-1 == io.size) function closewrite(io::GenericIOBuffer) io.writable = false - # OR throw(_UVError("closewrite", UV_ENOTSOCK)) nothing end diff --git a/base/iostream.jl b/base/iostream.jl index b048d8a5f1d01d..ba422cd692fcd8 100644 --- a/base/iostream.jl +++ b/base/iostream.jl @@ -63,6 +63,8 @@ function close(s::IOStream) systemerror("close", bad) end +closewrite(s::IOStream) = nothing + function flush(s::IOStream) sigatomic_begin() bad = @_lock_ios s ccall(:ios_flush, Cint, (Ptr{Cvoid},), s.ios) != 0 diff --git a/base/process.jl b/base/process.jl index ed51a30ae3cedd..24e676138bfae6 100644 --- a/base/process.jl +++ b/base/process.jl @@ -6,11 +6,12 @@ mutable struct Process <: AbstractPipe in::IO out::IO err::IO + syncd::Vector{Task} exitcode::Int64 termsignal::Int32 exitnotify::ThreadSynchronizer - function Process(cmd::Cmd, handle::Ptr{Cvoid}) - this = new(cmd, handle, devnull, devnull, devnull, + function Process(cmd::Cmd, handle::Ptr{Cvoid}, syncd::Vector{Task}) + this = new(cmd, handle, devnull, devnull, devnull, syncd, typemin(fieldtype(Process, :exitcode)), typemin(fieldtype(Process, :termsignal)), ThreadSynchronizer()) @@ -35,6 +36,15 @@ end pipe_reader(p::ProcessChain) = p.out pipe_writer(p::ProcessChain) = p.in +# a lightweight pair of a child OS_HANDLE and associated Task that will +# complete only after all content has been read from it for synchronizing +# state without the kernel to aide +struct SyncCloseFD + fd + t::Task +end +rawhandle(io::SyncCloseFD) = rawhandle(io.fd) + # release ownership of the libuv handle function uvfinalize(proc::Process) if proc.handle != C_NULL @@ -74,8 +84,8 @@ function _uv_hook_close(proc::Process) nothing end -const SpawnIO = Union{IO, RawFD, OS_HANDLE} -const SpawnIOs = Vector{SpawnIO} # convenience name for readability +const SpawnIO = Union{IO, RawFD, OS_HANDLE, SyncCloseFD} # internal copy of Redirectable, removing FileRedirect and adding SyncCloseFD +const SpawnIOs = Memory{SpawnIO} # convenience name for readability (used for dispatch also to clearly distinguish from Vector{Redirectable}) function as_cpumask(cpus::Vector{UInt16}) n = max(Int(maximum(cpus)), Int(ccall(:uv_cpumask_size, Cint, ()))) @@ -100,6 +110,7 @@ end error("invalid spawn handle $h from $io") end for io in stdio] + syncd = Task[io.t for io in stdio if io isa SyncCloseFD] handle = Libc.malloc(_sizeof_uv_process) disassociate_julia_struct(handle) (; exec, flags, env, dir) = cmd @@ -117,7 +128,7 @@ end cpumask === nothing ? 0 : length(cpumask), @cfunction(uv_return_spawn, Cvoid, (Ptr{Cvoid}, Int64, Int32))) if err == 0 - pp = Process(cmd, handle) + pp = Process(cmd, handle, syncd) associate_julia_struct(handle, pp) else ccall(:jl_forceclose_uv, Cvoid, (Ptr{Cvoid},), handle) # will call free on handle eventually @@ -130,23 +141,24 @@ end return pp end -_spawn(cmds::AbstractCmd) = _spawn(cmds, SpawnIO[]) +_spawn(cmds::AbstractCmd) = _spawn(cmds, SpawnIOs()) -# optimization: we can spawn `Cmd` directly without allocating the ProcessChain -function _spawn(cmd::Cmd, stdios::SpawnIOs) - isempty(cmd.exec) && throw(ArgumentError("cannot spawn empty command")) +function _spawn(cmd::AbstractCmd, stdios::Vector{Redirectable}) pp = setup_stdios(stdios) do stdios - return _spawn_primitive(cmd.exec[1], cmd, stdios) + return _spawn(cmd, stdios) end return pp end +# optimization: we can spawn `Cmd` directly without allocating the ProcessChain +function _spawn(cmd::Cmd, stdios::SpawnIOs) + isempty(cmd.exec) && throw(ArgumentError("cannot spawn empty command")) + return _spawn_primitive(cmd.exec[1], cmd, stdios) +end + # assume that having a ProcessChain means that the stdio are setup function _spawn(cmds::AbstractCmd, stdios::SpawnIOs) - pp = setup_stdios(stdios) do stdios - return _spawn(cmds, stdios, ProcessChain()) - end - return pp + return _spawn(cmds, stdios, ProcessChain()) end # helper function for making a copy of a SpawnIOs, with replacement @@ -212,7 +224,7 @@ end # open the child end of each element of `stdios`, and initialize the parent end -function setup_stdios(f, stdios::SpawnIOs) +function setup_stdios(f, stdios::Vector{Redirectable}) nstdio = length(stdios) open_io = SpawnIOs(undef, nstdio) close_io = falses(nstdio) @@ -295,25 +307,26 @@ function setup_stdio(stdio::IO, child_readable::Bool) child = child_readable ? rd : wr try let in = (child_readable ? parent : stdio), - out = (child_readable ? stdio : parent) - @async try + out = (child_readable ? stdio : parent), + t = @async try write(in, out) catch ex @warn "Process I/O error" exception=(ex, catch_backtrace()) + rethrow() finally close(parent) - child_readable || closewrite(stdio) end + return (SyncCloseFD(child, t), true) end catch close_pipe_sync(child) rethrow() end - return (child, true) end -close_stdio(stdio::OS_HANDLE) = close_pipe_sync(stdio) close_stdio(stdio) = close(stdio) +close_stdio(stdio::OS_HANDLE) = close_pipe_sync(stdio) +close_stdio(stdio::SyncCloseFD) = close_stdio(stdio.fd) # INTERNAL # pad out stdio to have at least three elements, @@ -325,19 +338,19 @@ close_stdio(stdio) = close(stdio) # - An Filesystem.File or IOStream object to redirect the output to # - A FileRedirect, containing a string specifying a filename to be opened for the child -spawn_opts_swallow(stdios::StdIOSet) = SpawnIO[stdios...] -spawn_opts_inherit(stdios::StdIOSet) = SpawnIO[stdios...] +spawn_opts_swallow(stdios::StdIOSet) = Redirectable[stdios...] +spawn_opts_inherit(stdios::StdIOSet) = Redirectable[stdios...] spawn_opts_swallow(in::Redirectable=devnull, out::Redirectable=devnull, err::Redirectable=devnull) = - SpawnIO[in, out, err] + Redirectable[in, out, err] # pass original descriptors to child processes by default, because we might # have already exhausted and closed the libuv object for our standard streams. # ref issue #8529 spawn_opts_inherit(in::Redirectable=RawFD(0), out::Redirectable=RawFD(1), err::Redirectable=RawFD(2)) = - SpawnIO[in, out, err] + Redirectable[in, out, err] function eachline(cmd::AbstractCmd; keep::Bool=false) out = PipeEndpoint() - processes = _spawn(cmd, SpawnIO[devnull, out, stderr]) + processes = _spawn(cmd, Redirectable[devnull, out, stderr]) # if the user consumes all the data, also check process exit status for success ondone = () -> (success(processes) || pipeline_error(processes); nothing) return EachLine(out, keep=keep, ondone=ondone)::EachLine @@ -385,20 +398,20 @@ function open(cmds::AbstractCmd, stdio::Redirectable=devnull; write::Bool=false, stdio === devnull || throw(ArgumentError("no stream can be specified for `stdio` in read-write mode")) in = PipeEndpoint() out = PipeEndpoint() - processes = _spawn(cmds, SpawnIO[in, out, stderr]) + processes = _spawn(cmds, Redirectable[in, out, stderr]) processes.in = in processes.out = out elseif read out = PipeEndpoint() - processes = _spawn(cmds, SpawnIO[stdio, out, stderr]) + processes = _spawn(cmds, Redirectable[stdio, out, stderr]) processes.out = out elseif write in = PipeEndpoint() - processes = _spawn(cmds, SpawnIO[in, stdio, stderr]) + processes = _spawn(cmds, Redirectable[in, stdio, stderr]) processes.in = in else stdio === devnull || throw(ArgumentError("no stream can be specified for `stdio` in no-access mode")) - processes = _spawn(cmds, SpawnIO[devnull, devnull, stderr]) + processes = _spawn(cmds, Redirectable[devnull, devnull, stderr]) end return processes end @@ -415,12 +428,18 @@ function open(f::Function, cmds::AbstractCmd, args...; kwargs...) P = open(cmds, args...; kwargs...) function waitkill(P::Union{Process,ProcessChain}) close(P) - # 0.1 seconds after we hope it dies (from closing stdio), - # we kill the process with SIGTERM (15) - local t = Timer(0.1) do t + # shortly after we hope it starts cleanup and dies (from closing + # stdio), we kill the process with SIGTERM (15) so that we can proceed + # with throwing the error and hope it will exit soon from that + local t = Timer(2) do t process_running(P) && kill(P) end - wait(P) + # pass false to indicate that we do not care about data-races on the + # Julia stdio objects after this point, since we already know this is + # an error path and the state of them is fairly unpredictable anyways + # in that case. Since we closed P some of those should come crumbling + # down already, and we don't want to throw that error here either. + wait(P, false) close(t) end ret = try @@ -430,10 +449,23 @@ function open(f::Function, cmds::AbstractCmd, args...; kwargs...) rethrow() end close(P.in) + closestdio = @async begin + # wait for P to complete (including sync'd), then mark the output streams for EOF (if applicable to that stream type) + wait(P) + err = P.err + applicable(closewrite, err) && closewrite(err) + out = P.out + applicable(closewrite, out) && closewrite(out) + nothing + end + # now verify that the output stream is at EOF, and the user didn't fail to consume it successfully + # (we do not currently verify the user dealt with the stderr stream) if !(eof(P.out)::Bool) waitkill(P) throw(_UVError("open(do)", UV_EPIPE)) end + # make sure to closestdio is completely done to avoid data-races later + wait(closestdio) success(P) || pipeline_error(P) return ret end @@ -650,26 +682,31 @@ function process_status(s::Process) error("process status error") end -function wait(x::Process) - process_exited(x) && return - iolock_begin() +function wait(x::Process, syncd::Bool=true) if !process_exited(x) - preserve_handle(x) - lock(x.exitnotify) - iolock_end() - try - wait(x.exitnotify) - finally - unlock(x.exitnotify) - unpreserve_handle(x) + iolock_begin() + if !process_exited(x) + preserve_handle(x) + lock(x.exitnotify) + iolock_end() + try + wait(x.exitnotify) + finally + unlock(x.exitnotify) + unpreserve_handle(x) + end + else + iolock_end() end - else - iolock_end() + end + # and make sure all sync'd Tasks are complete too + syncd && for t in x.syncd + wait(t) end nothing end -wait(x::ProcessChain) = foreach(wait, x.processes) +wait(x::ProcessChain, syncd::Bool=true) = foreach(p -> wait(p, syncd), x.processes) show(io::IO, p::Process) = print(io, "Process(", p.cmd, ", ", process_status(p), ")") diff --git a/base/stream.jl b/base/stream.jl index 22af8d59359f38..5fd621d229f435 100644 --- a/base/stream.jl +++ b/base/stream.jl @@ -436,7 +436,10 @@ end function closewrite(s::LibuvStream) iolock_begin() - check_open(s) + if !iswritable(x) + iolock_end() + return + end req = Libc.malloc(_sizeof_uv_shutdown) uv_req_set_data(req, C_NULL) # in case we get interrupted before arriving at the wait call err = ccall(:uv_shutdown, Int32, (Ptr{Cvoid}, Ptr{Cvoid}, Ptr{Cvoid}), @@ -1489,7 +1492,7 @@ closewrite(s::BufferStream) = close(s) function close(s::BufferStream) lock(s.cond) do s.status = StatusClosed - notify(s.cond) + notify(s.cond) # aka flush nothing end end @@ -1549,6 +1552,7 @@ stop_reading(s::BufferStream) = nothing write(s::BufferStream, b::UInt8) = write(s, Ref{UInt8}(b)) function unsafe_write(s::BufferStream, p::Ptr{UInt8}, nb::UInt) nwrite = lock(s.cond) do + check_open(s) rv = unsafe_write(s.buffer, p, nb) s.buffer_writes || notify(s.cond) rv @@ -1569,9 +1573,18 @@ end buffer_writes(s::BufferStream, bufsize=0) = (s.buffer_writes = true; s) function flush(s::BufferStream) lock(s.cond) do + check_open(s) notify(s.cond) nothing end end skip(s::BufferStream, n) = skip(s.buffer, n) + +function reseteof(x::BufferStream) + lock(s.cond) do + s.status = StatusOpen + nothing + end + nothing +end diff --git a/src/support/ios.c b/src/support/ios.c index 2c20dcb45e4c42..7f70112c82cc0c 100644 --- a/src/support/ios.c +++ b/src/support/ios.c @@ -602,12 +602,12 @@ int ios_eof(ios_t *s) { if (s->state == bst_rd && s->bpos < s->size) return 0; + if (s->_eof) + return 1; if (s->bm == bm_mem) - return (s->_eof ? 1 : 0); + return 0; if (s->fd == -1) return 1; - if (s->_eof) - return 1; return 0; /* if (_fd_available(s->fd)) @@ -617,6 +617,12 @@ int ios_eof(ios_t *s) */ } +void ios_reseteof(ios_t *s) +{ + if (s->bm != bm_mem && s->fd != -1) + s->_eof = 0; +} + int ios_eof_blocking(ios_t *s) { if (s->state == bst_rd && s->bpos < s->size) diff --git a/test/cmdlineargs.jl b/test/cmdlineargs.jl index 564f3524243699..92bada23cb2582 100644 --- a/test/cmdlineargs.jl +++ b/test/cmdlineargs.jl @@ -71,6 +71,7 @@ end @error "cmd failed" cmd read(io, String) rethrow() end + closewrite(io) return read(io, String) end diff --git a/test/spawn.jl b/test/spawn.jl index dab18573993752..4ae4b3368bef66 100644 --- a/test/spawn.jl +++ b/test/spawn.jl @@ -807,8 +807,9 @@ let text = "input-test-text" out = Base.BufferStream() proc = run(catcmd, IOBuffer(text), out, wait=false) @test proc.out === out - @test read(out, String) == text @test success(proc) + closewrite(out) + @test read(out, String) == text out = PipeBuffer() proc = run(catcmd, IOBuffer(SubString(text)), out)