Skip to content

Commit

Permalink
zstd: Fix zstd in zip decompressor, add options (#539)
Browse files Browse the repository at this point in the history
Decompressor would return error when finishing.

Add options to `ZipDecompressor`.
  • Loading branch information
klauspost authored Mar 23, 2022
1 parent 8dc799d commit fbdf277
Showing 1 changed file with 31 additions and 17 deletions.
48 changes: 31 additions & 17 deletions zstd/zip.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,34 @@ const ZipMethodPKWare = 20
var zipReaderPool sync.Pool

// newZipReader creates a pooled zip decompressor.
func newZipReader(r io.Reader) io.ReadCloser {
dec, ok := zipReaderPool.Get().(*Decoder)
if ok {
dec.Reset(r)
} else {
d, err := NewReader(r, WithDecoderConcurrency(1), WithDecoderLowmem(true))
if err != nil {
panic(err)
func newZipReader(opts ...DOption) func(r io.Reader) io.ReadCloser {
pool := &zipReaderPool
if len(opts) > 0 {
opts = append([]DOption{WithDecoderLowmem(true), WithDecoderMaxWindow(128 << 20)}, opts...)
// Force concurrency 1
opts = append(opts, WithDecoderConcurrency(1))
// Create our own pool
pool = &sync.Pool{}
}
return func(r io.Reader) io.ReadCloser {
dec, ok := pool.Get().(*Decoder)
if ok {
dec.Reset(r)
} else {
d, err := NewReader(r, opts...)
if err != nil {
panic(err)
}
dec = d
}
dec = d
return &pooledZipReader{dec: dec, pool: pool}
}
return &pooledZipReader{dec: dec}
}

type pooledZipReader struct {
mu sync.Mutex // guards Close and Read
dec *Decoder
mu sync.Mutex // guards Close and Read
pool *sync.Pool
dec *Decoder
}

func (r *pooledZipReader) Read(p []byte) (n int, err error) {
Expand All @@ -48,8 +59,8 @@ func (r *pooledZipReader) Read(p []byte) (n int, err error) {
}
dec, err := r.dec.Read(p)
if err == io.EOF {
err = r.dec.Reset(nil)
zipReaderPool.Put(r.dec)
r.dec.Reset(nil)
r.pool.Put(r.dec)
r.dec = nil
}
return dec, err
Expand All @@ -61,7 +72,7 @@ func (r *pooledZipReader) Close() error {
var err error
if r.dec != nil {
err = r.dec.Reset(nil)
zipReaderPool.Put(r.dec)
r.pool.Put(r.dec)
r.dec = nil
}
return err
Expand Down Expand Up @@ -115,6 +126,9 @@ func ZipCompressor(opts ...EOption) func(w io.Writer) (io.WriteCloser, error) {

// ZipDecompressor returns a decompressor that can be registered with zip libraries.
// See ZipCompressor for example.
func ZipDecompressor() func(r io.Reader) io.ReadCloser {
return newZipReader
// Options can be specified. WithDecoderConcurrency(1) is forced,
// and by default a 128MB maximum decompression window is specified.
// The window size can be overridden if required.
func ZipDecompressor(opts ...DOption) func(r io.Reader) io.ReadCloser {
return newZipReader(opts...)
}

0 comments on commit fbdf277

Please sign in to comment.