Skip to content

Commit

Permalink
io: implement WriterTo for pipes
Browse files Browse the repository at this point in the history
Implements the WriterTo interface for io pipes to avoid intermediate buffers
when copying with `io.Copy`.

Updates golang#34624
  • Loading branch information
Steven Allen committed Oct 6, 2019
1 parent 9c1f14f commit df0dee0
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 0 deletions.
39 changes: 39 additions & 0 deletions src/io/pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,36 @@ func (p *pipe) Read(b []byte) (n int, err error) {
}
}

func (p *pipe) WriteTo(w Writer) (n int64, err error) {
for {
select {
case <-p.done:
err := p.readCloseError()
if err == EOF {
err = nil
}
return n, err
default:
}

select {
case bw := <-p.wrCh:
nr, err := w.Write(bw)
p.rdCh <- nr
n += int64(nr)
if err != nil {
return n, err
}
case <-p.done:
err := p.readCloseError()
if err == EOF {
err = nil
}
return n, err
}
}
}

func (p *pipe) readCloseError() error {
rerr := p.rerr.Load()
if werr := p.werr.Load(); rerr == nil && werr != nil {
Expand Down Expand Up @@ -127,6 +157,15 @@ func (r *PipeReader) Read(data []byte) (n int, err error) {
return r.p.Read(data)
}

// WriteTo implements the standard WriterTo interface:
// it forwards data from the pipe to the provided writer until the write end is
// closed.
// If the write end is closed with an error or the provided writer returns an
// error, that error is returned.
func (r *PipeReader) WriteTo(w Writer) (n int64, err error) {
return r.p.WriteTo(w)
}

// Close closes the reader; subsequent writes to the
// write half of the pipe will return the error ErrClosedPipe.
func (r *PipeReader) Close() error {
Expand Down
41 changes: 41 additions & 0 deletions src/io/pipe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,3 +421,44 @@ func sortBytesInGroups(b []byte, n int) []byte {
sort.Slice(groups, func(i, j int) bool { return bytes.Compare(groups[i], groups[j]) < 0 })
return bytes.Join(groups, nil)
}

// Test WriteTo
func TestPipeCopy(t *testing.T) {
done := make(chan struct{})
r, w := Pipe()

var expected bytes.Buffer
go func() {
defer close(done)
defer w.Close()
for i := 0; i < 10; i++ {
n1, _ := fmt.Fprintf(&expected, "message: %d\n", i)
n2, err := fmt.Fprintf(w, "message: %d\n", i)
if err != nil {
t.Errorf("write: %v", err)
return
}
if n1 != n2 {
t.Errorf("expected to write %d bytes, wrote %d", n1, n2)
}
}
}()

var actual bytes.Buffer
n, err := Copy(&actual, r)
if err != nil {
t.Errorf("read: %v", err)
}

// cleanup
r.Close()
<-done

if int(n) != actual.Len() {
t.Errorf("amount copied doesn't match amount read: %d != %d", n, actual.Len())
}

if !bytes.Equal(actual.Bytes(), expected.Bytes()) {
t.Errorf("did not read expected data")
}
}

0 comments on commit df0dee0

Please sign in to comment.