From 4109984717ab9a202c78948a8bba05d370cf2a2a Mon Sep 17 00:00:00 2001 From: Mark Kremer Date: Thu, 8 Aug 2024 12:25:35 +0200 Subject: [PATCH] Make flac.NewSeek() use a buffered reader too (#72) * Make flac.NewSeek() use a buffered reader Which is similar to how flac.New() works. * Test ReadSeeker * Rewrite ReadSeeker tests * Fix error message inconsistencies --- flac.go | 6 +- internal/bufseekio/readseeker.go | 152 +++++++++++++++ internal/bufseekio/readseeker_test.go | 266 ++++++++++++++++++++++++++ 3 files changed, 422 insertions(+), 2 deletions(-) create mode 100644 internal/bufseekio/readseeker.go create mode 100644 internal/bufseekio/readseeker_test.go diff --git a/flac.go b/flac.go index 20d4531..030090d 100644 --- a/flac.go +++ b/flac.go @@ -35,6 +35,7 @@ import ( "os" "github.com/mewkiz/flac/frame" + "github.com/mewkiz/flac/internal/bufseekio" "github.com/mewkiz/flac/meta" ) @@ -96,7 +97,8 @@ func New(r io.Reader) (stream *Stream, err error) { // will not be buffered, which might result in performance issues. Using an // in-memory buffer like *bytes.Reader should work well. func NewSeek(rs io.ReadSeeker) (stream *Stream, err error) { - stream = &Stream{r: rs, seekTableSize: defaultSeekTableSize} + br := bufseekio.NewReadSeeker(rs) + stream = &Stream{r: br, seekTableSize: defaultSeekTableSize} // Verify FLAC signature and parse the StreamInfo metadata block. block, err := stream.parseStreamInfo() @@ -121,7 +123,7 @@ func NewSeek(rs io.ReadSeeker) (stream *Stream, err error) { } // Record file offset of the first frame header. - stream.dataStart, err = rs.Seek(0, io.SeekCurrent) + stream.dataStart, err = br.Seek(0, io.SeekCurrent) return stream, err } diff --git a/internal/bufseekio/readseeker.go b/internal/bufseekio/readseeker.go new file mode 100644 index 0000000..9678eda --- /dev/null +++ b/internal/bufseekio/readseeker.go @@ -0,0 +1,152 @@ +package bufseekio + +import ( + "errors" + "io" +) + +const ( + defaultBufSize = 4096 +) + +// ReadSeeker implements buffering for an io.ReadSeeker object. +// ReadSeeker is based on bufio.Reader with Seek functionality added +// and unneeded functionality removed. +type ReadSeeker struct { + buf []byte + pos int64 // absolute start position of buf + rd io.ReadSeeker // read-seeker provided by the client + r, w int // buf read and write positions within buf + err error +} + +const minReadBufferSize = 16 + +// NewReadSeekerSize returns a new ReadSeeker whose buffer has at least the specified +// size. If the argument io.ReadSeeker is already a ReadSeeker with large enough +// size, it returns the underlying ReadSeeker. +func NewReadSeekerSize(rd io.ReadSeeker, size int) *ReadSeeker { + // Is it already a Reader? + b, ok := rd.(*ReadSeeker) + if ok && len(b.buf) >= size { + return b + } + if size < minReadBufferSize { + size = minReadBufferSize + } + r := new(ReadSeeker) + r.reset(make([]byte, size), rd) + return r +} + +// NewReadSeeker returns a new ReadSeeker whose buffer has the default size. +func NewReadSeeker(rd io.ReadSeeker) *ReadSeeker { + return NewReadSeekerSize(rd, defaultBufSize) +} + +var errNegativeRead = errors.New("bufseekio: reader returned negative count from Read") + +func (b *ReadSeeker) reset(buf []byte, r io.ReadSeeker) { + *b = ReadSeeker{ + buf: buf, + rd: r, + } +} + +func (b *ReadSeeker) readErr() error { + err := b.err + b.err = nil + return err +} + +// Read reads data into p. +// It returns the number of bytes read into p. +// The bytes are taken from at most one Read on the underlying Reader, +// hence n may be less than len(p). +// To read exactly len(p) bytes, use io.ReadFull(b, p). +// If the underlying Reader can return a non-zero count with io.EOF, +// then this Read method can do so as well; see the [io.Reader] docs. +func (b *ReadSeeker) Read(p []byte) (n int, err error) { + n = len(p) + if n == 0 { + if b.buffered() > 0 { + return 0, nil + } + return 0, b.readErr() + } + if b.r == b.w { + if b.err != nil { + return 0, b.readErr() + } + if len(p) >= len(b.buf) { + // Large read, empty buffer. + // Read directly into p to avoid copy. + n, b.err = b.rd.Read(p) + if n < 0 { + panic(errNegativeRead) + } + b.pos += int64(n) + return n, b.readErr() + } + // One read. + b.pos += int64(b.r) + b.r = 0 + b.w = 0 + n, b.err = b.rd.Read(b.buf) + if n < 0 { + panic(errNegativeRead) + } + if n == 0 { + return 0, b.readErr() + } + b.w += n + } + + // copy as much as we can + // Note: if the slice panics here, it is probably because + // the underlying reader returned a bad count. See issue 49795. + n = copy(p, b.buf[b.r:b.w]) + b.r += n + return n, nil +} + +// buffered returns the number of bytes that can be read from the current buffer. +func (b *ReadSeeker) buffered() int { return b.w - b.r } + +func (b *ReadSeeker) Seek(offset int64, whence int) (int64, error) { + // The stream.Seek() implementation makes heavy use of seeking with offset 0 + // to obtain the current position; let's optimize for it. + if offset == 0 && whence == io.SeekCurrent { + return b.position(), nil + } + // When seeking from the end, the absolute position isn't known by ReadSeeker + // so the current buffer cannot be used. Seeking cannot be avoided. + if whence == io.SeekEnd { + return b.seek(offset, whence) + } + // Calculate the absolute offset. + abs := offset + if whence == io.SeekCurrent { + abs += b.position() + } + // Check if the offset is within buf. + if abs >= b.pos && abs < b.pos+int64(b.w) { + b.r = int(abs - b.pos) + return abs, nil + } + + return b.seek(abs, io.SeekStart) +} + +func (b *ReadSeeker) seek(offset int64, whence int) (int64, error) { + b.r = 0 + b.w = 0 + var err error + b.pos, err = b.rd.Seek(offset, whence) + return b.pos, err +} + +// position returns the absolute read offset. +func (b *ReadSeeker) position() int64 { + return b.pos + int64(b.r) +} diff --git a/internal/bufseekio/readseeker_test.go b/internal/bufseekio/readseeker_test.go new file mode 100644 index 0000000..06b162d --- /dev/null +++ b/internal/bufseekio/readseeker_test.go @@ -0,0 +1,266 @@ +package bufseekio + +import ( + "bytes" + "errors" + "io" + "reflect" + "testing" +) + +func TestNewReadSeekerSize(t *testing.T) { + buf := bytes.NewReader(make([]byte, 100)) + + // Test custom buffer size. + if rs := NewReadSeekerSize(buf, 20); len(rs.buf) != 20 { + t.Fatalf("want %d got %d", 20, len(rs.buf)) + } + + // Test too small buffer size. + if rs := NewReadSeekerSize(buf, 1); len(rs.buf) != minReadBufferSize { + t.Fatalf("want %d got %d", minReadBufferSize, len(rs.buf)) + } + + // Test reuse existing ReadSeeker. + rs := NewReadSeekerSize(buf, 20) + if rs2 := NewReadSeekerSize(rs, 5); rs != rs2 { + t.Fatal("expected ReadSeeker to be reused but got a different ReadSeeker") + } +} + +func TestNewReadSeeker(t *testing.T) { + buf := bytes.NewReader(make([]byte, 100)) + if rs := NewReadSeeker(buf); len(rs.buf) != defaultBufSize { + t.Fatalf("want %d got %d", defaultBufSize, len(rs.buf)) + } +} + +func TestReadSeeker_Read(t *testing.T) { + data := make([]byte, 100) + for i := range data { + data[i] = byte(i) + } + rs := NewReadSeekerSize(bytes.NewReader(data), 20) + if len(rs.buf) != 20 { + t.Fatal("the buffer size was changed and the validity of this test has become unknown") + } + + // Test small read. + got := make([]byte, 5) + if n, err := rs.Read(got); err != nil || n != 5 || !reflect.DeepEqual(got, []byte{0, 1, 2, 3, 4}) { + t.Fatalf("want n read %d got %d, want buffer %v got %v, err=%v", 5, n, []byte{0, 1, 2, 3, 4}, got, err) + } + if p, err := rs.Seek(0, io.SeekCurrent); err != nil || p != 5 { + t.Fatalf("want %d got %d, err=%v", 5, p, err) + } + + // Test big read with initially filled buffer. + got = make([]byte, 25) + if n, err := rs.Read(got); err != nil || n != 15 || !reflect.DeepEqual(got, []byte{5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}) { + t.Fatalf("want n read %d got %d, want buffer %v got %v, err=%v", 15, n, []byte{5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, got, err) + } + if p, err := rs.Seek(0, io.SeekCurrent); err != nil || p != 20 { + t.Fatalf("want %d got %d, err=%v", 20, p, err) + } + + // Test big read with initially empty buffer. + if n, err := rs.Read(got); err != nil || n != 25 || !reflect.DeepEqual(got, []byte{20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44}) { + t.Fatalf("want n read %d got %d, want buffer %v got %v, err=%v", 25, n, []byte{20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44}, got, err) + } + if p, err := rs.Seek(0, io.SeekCurrent); err != nil || p != 45 { + t.Fatalf("want %d got %d, err=%v", 45, p, err) + } + + // Test EOF. + if p, err := rs.Seek(98, io.SeekStart); err != nil || p != 98 { + t.Fatalf("want %d got %d, err=%v", 98, p, err) + } + got = make([]byte, 5) + if n, err := rs.Read(got); err != nil || n != 2 || !reflect.DeepEqual(got, []byte{98, 99, 0, 0, 0}) { + t.Fatalf("want n read %d got %d, want buffer %v got %v, err=%v", 2, n, []byte{98, 99, 0, 0, 0}, got, err) + } + if n, err := rs.Read(got); err != io.EOF || n != 0 { + t.Fatalf("want n read %d got %d, err=%v", 0, n, err) + } + + // Test source that returns bytes and an error at the same time. + rs = NewReadSeekerSize(&readAndError{bytes: []byte{2, 3, 5}}, 20) + if len(rs.buf) != 20 { + t.Fatal("the buffer size was changed and the validity of this test has become unknown") + } + got = make([]byte, 5) + if n, err := rs.Read(got); err != nil || n != 3 || !reflect.DeepEqual(got, []byte{2, 3, 5, 0, 0}) { + t.Fatalf("want n read %d got %d, want buffer %v got %v, err=%v", 3, n, []byte{2, 3, 5, 0, 0}, got, err) + } + if n, err := rs.Read(got); err != expectedErr || n != 0 { + t.Fatalf("want n read %d got %d, want error %v, got %v", 0, n, expectedErr, err) + } + + // Test read nothing with an empty buffer and a queued error. + rs = NewReadSeekerSize(&readAndError{bytes: []byte{2, 3, 5}}, 20) + if len(rs.buf) != 20 { + t.Fatal("the buffer size was changed and the validity of this test has become unknown") + } + got = make([]byte, 3) + if n, err := rs.Read(got); err != nil || n != 3 || !reflect.DeepEqual(got, []byte{2, 3, 5}) { + t.Fatalf("want n read %d got %d, want buffer %v got %v, err=%v", 3, n, []byte{2, 3, 5}, got, err) + } + if n, err := rs.Read(nil); err != expectedErr || n != 0 { + t.Fatalf("want n read %d got %d, want error %v, got %v", 0, n, expectedErr, err) + } + if n, err := rs.Read(nil); err != nil || n != 0 { + t.Fatalf("want n read %d got %d, err=%v", 0, n, err) + } + + // Test read nothing with a non-empty buffer and a queued error. + rs = NewReadSeekerSize(&readAndError{bytes: []byte{2, 3, 5}}, 20) + if len(rs.buf) != 20 { + t.Fatal("the buffer size was changed and the validity of this test has become unknown") + } + got = make([]byte, 1) + if n, err := rs.Read(got); err != nil || n != 1 || !reflect.DeepEqual(got, []byte{2}) { + t.Fatalf("want n read %d got %d, want buffer %v got %v, err=%v", 1, n, []byte{}, got, err) + } + if n, err := rs.Read(nil); err != nil || n != 0 { + t.Fatalf("want n read %d got %d, err=%v", 0, n, err) + } +} + +var expectedErr = errors.New("expected error") + +type readAndError struct { + bytes []byte +} + +func (r *readAndError) Read(p []byte) (n int, err error) { + for i, b := range r.bytes { + p[i] = b + } + return len(r.bytes), expectedErr +} + +func (r *readAndError) Seek(offset int64, whence int) (int64, error) { + panic("not implemented") +} + +func TestReadSeeker_Seek(t *testing.T) { + data := make([]byte, 100) + for i := range data { + data[i] = byte(i) + } + r := &seekRecorder{rs: bytes.NewReader(data)} + rs := NewReadSeekerSize(r, 20) + if len(rs.buf) != 20 { + t.Fatal("the buffer size was changed and the validity of this test has become unknown") + } + + got := make([]byte, 5) + + // Test with io.SeekStart + if p, err := rs.Seek(10, io.SeekStart); err != nil || p != 10 { + t.Fatalf("want %d got %d, err=%v", 10, p, err) + } + r.assertSeeked(t, []seekRecord{{10, io.SeekStart}}) + if n, err := rs.Read(got); err != nil || n != 5 || !reflect.DeepEqual(got, []byte{10, 11, 12, 13, 14}) { + t.Fatalf("want n read %d got %d, want buffer %v got %v, err=%v", 5, n, []byte{10, 11, 12, 13, 14}, got, err) + } + if p, err := rs.Seek(0, io.SeekCurrent); err != nil || p != 15 { + t.Fatalf("want %d got %d, err=%v", 15, p, err) + } + r.assertSeeked(t, nil) + + // Test with io.SeekCurrent and positive offset within buffer. + if p, err := rs.Seek(5, io.SeekCurrent); err != nil || p != 20 { + t.Fatalf("want %d got %d, err=%v", 20, p, err) + } + r.assertSeeked(t, nil) + if n, err := rs.Read(got); err != nil || n != 5 || !reflect.DeepEqual(got, []byte{20, 21, 22, 23, 24}) { + t.Fatalf("want n read %d got %d, want buffer %v got %v, err=%v", 5, n, []byte{20, 21, 22, 23, 24}, got, err) + } + if p, err := rs.Seek(0, io.SeekCurrent); err != nil || p != 25 { + t.Fatalf("want %d got %d, err=%v", 25, p, err) + } + + // Test with io.SeekCurrent and negative offset within buffer. + if p, err := rs.Seek(-10, io.SeekCurrent); err != nil || p != 15 { + t.Fatalf("want %d got %d, err=%v", 15, p, err) + } + r.assertSeeked(t, nil) + if n, err := rs.Read(got); err != nil || n != 5 || !reflect.DeepEqual(got, []byte{15, 16, 17, 18, 19}) { + t.Fatalf("want n read %d got %d, want buffer %v got %v, err=%v", 5, n, []byte{15, 16, 17, 18, 19}, got, err) + } + if p, err := rs.Seek(0, io.SeekCurrent); err != nil || p != 20 { + t.Fatalf("want %d got %d, err=%v", 20, p, err) + } + + // Test with io.SeekCurrent and positive offset outside buffer. + if p, err := rs.Seek(30, io.SeekCurrent); err != nil || p != 50 { + t.Fatalf("want %d got %d, err=%v", 50, p, err) + } + r.assertSeeked(t, []seekRecord{{50, io.SeekStart}}) + if n, err := rs.Read(got); err != nil || n != 5 || !reflect.DeepEqual(got, []byte{50, 51, 52, 53, 54}) { + t.Fatalf("want n read %d got %d, want buffer %v got %v, err=%v", 5, n, []byte{50, 51, 52, 53, 54}, got, err) + } + if p, err := rs.Seek(0, io.SeekCurrent); err != nil || p != 55 { + t.Fatalf("want %d got %d, err=%v", 55, p, err) + } + + // Test seek with io.SeekEnd within buffer. + if p, err := rs.Seek(-45, io.SeekEnd); err != nil || p != 55 { + t.Fatalf("want %d got %d, err=%v", 55, p, err) + } + r.assertSeeked(t, []seekRecord{{-45, io.SeekEnd}}) + if n, err := rs.Read(got); err != nil || n != 5 || !reflect.DeepEqual(got, []byte{55, 56, 57, 58, 59}) { + t.Fatalf("want n read %d got %d, want buffer %v got %v, err=%v", 5, n, []byte{55, 56, 57, 58, 59}, got, err) + } + if p, err := rs.Seek(0, io.SeekCurrent); err != nil || p != 60 { + t.Fatalf("want %d got %d, err=%v", 60, p, err) + } + + // Test seek with error. + if _, err := rs.Seek(-100, io.SeekStart); err == nil || err.Error() != "bytes.Reader.Seek: negative position" { + t.Fatalf("want error 'bytes.Reader.Seek: negative position' got %v", err) + } + r.assertSeeked(t, []seekRecord{{-100, io.SeekStart}}) + + // Test seek after error. + if p, err := rs.Seek(10, io.SeekStart); err != nil || p != 10 { + t.Fatalf("want %d got %d, err=%v", 10, p, err) + } + r.assertSeeked(t, []seekRecord{{10, io.SeekStart}}) + if n, err := rs.Read(got); err != nil || n != 5 || !reflect.DeepEqual(got, []byte{10, 11, 12, 13, 14}) { + t.Fatalf("want n read %d got %d, want buffer %v got %v, err=%v", 5, n, []byte{10, 11, 12, 13, 14}, got, err) + } +} + +type seekRecord struct { + offset int64 + whence int +} + +type seekRecorder struct { + rs io.ReadSeeker + seeks []seekRecord +} + +func (r *seekRecorder) Read(p []byte) (n int, err error) { + return r.rs.Read(p) +} + +func (r *seekRecorder) Seek(offset int64, whence int) (int64, error) { + r.seeks = append(r.seeks, seekRecord{offset: offset, whence: whence}) + return r.rs.Seek(offset, whence) +} + +func (r *seekRecorder) assertSeeked(t *testing.T, expected []seekRecord) { + t.Helper() + + if !reflect.DeepEqual(expected, r.seeks) { + t.Fatalf("seek mismatch; expected %#v, got %#v", expected, r.seeks) + } + r.reset() +} + +func (r *seekRecorder) reset() { + r.seeks = nil +}