Add chunked encoding of large directories (#16)
When the reused decompression buffers are warm, it provides a nice speedup:

```
BEFORE:

BenchmarkFindSerialized/100-32         	   84632	     14027 ns/op	        24.28 b/file	         84632 files/s	    5062 B/op	       4 allocs/op
BenchmarkFindSerialized/1000-32        	   13371	     88087 ns/op	        23.32 b/file	         13371 files/s	   49398 B/op	       4 allocs/op
BenchmarkFindSerialized/10000-32       	    1491	    769408 ns/op	        20.66 b/file	          1491 files/s	  451106 B/op	       4 allocs/op
BenchmarkFindSerialized/100000-32      	     163	   7350855 ns/op	        20.25 b/file	         163.0 files/s	 4499216 B/op	       5 allocs/op
BenchmarkFindSerialized/1000000-32     	      15	 175437187 ns/op	        20.16 b/file	         7.500 files/s	44978624 B/op	       6 allocs/op

AFTER:
BenchmarkFindSerialized/100-32         	   89673	     13144 ns/op	        24.28 b/file	      89673 files/s	     216 B/op	       4 allocs/op
BenchmarkFindSerialized/1000-32        	   15356	     78862 ns/op	        23.32 b/file	      15356 files/s	     220 B/op	       4 allocs/op
BenchmarkFindSerialized/10000-32       	    1663	    719424 ns/op	        20.66 b/file	       1663 files/s	     228 B/op	       4 allocs/op
BenchmarkFindSerialized/100000-32      	     170	   6894286 ns/op	        20.25 b/file	       170.0 files/s	 26760 B/op	       4 allocs/op
BenchmarkFindSerialized/1000000-32     	      16	  67916169 ns/op	        20.16 b/file	       16.00 files/s	 2812044 B/op	   4 allocs/op
```
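The large drop in allocations (e.g. 44978624 B/op → 2812044 B/op at one million files) comes from reusing decompression buffers across calls. Below is a minimal, self-contained sketch of the `sync.Pool` pattern this diff adds around zstd decoding; the names `bufPool` and `decode` are illustrative, not from the commit:

```
package main

import (
	"fmt"
	"sync"

	"github.com/klauspost/compress/zstd"
)

// Pool of byte slices reused as zstd decompression destinations, so
// repeated calls ("when warm") avoid reallocating large buffers.
var bufPool sync.Pool

var dec, _ = zstd.NewReader(nil)

func decode(compressed []byte) ([]byte, error) {
	// A nil slice from an empty pool is fine; DecodeAll allocates as needed.
	dst, _ := bufPool.Get().([]byte)
	out, err := dec.DecodeAll(compressed, dst[:0])
	if err != nil {
		bufPool.Put(dst) // return the buffer even on failure
		return nil, err
	}
	return out, nil // callers put `out` back in the pool when done with it
}

func main() {
	enc, _ := zstd.NewWriter(nil)
	payload := enc.EncodeAll([]byte("hello warm buffers"), nil)
	out, err := decode(payload)
	fmt.Println(string(out), err)
	bufPool.Put(out)
}
```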

* Add new chunked format for faster FindSerialized.

It kicks in at 25,000 directory entries and ensures that performance stays roughly linear as directories grow; the chunk-splitting logic is sketched below.

Bumps index limit to 1B.
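The splitting loop in `Serialize` (see file.go below) emits fixed 25,000-entry chunks, except that a remainder between one and two chunk sizes is halved so the final two chunks come out balanced instead of leaving a tiny tail. A standalone sketch of just the size arithmetic, with the invented helper `chunkSizes`:

```
package main

import "fmt"

// chunkN mirrors the threshold in the diff: directories with fewer
// entries keep the previous single-blob format.
const chunkN = 25000

// chunkSizes reproduces the splitting arithmetic from Serialize.
func chunkSizes(n int) []int {
	var sizes []int
	for left := n; left > 0; {
		todo := left
		if left < chunkN*2 && left > chunkN {
			todo = left / 2 // balance the final two chunks
		} else if left > chunkN {
			todo = chunkN
		}
		sizes = append(sizes, todo)
		left -= todo
	}
	return sizes
}

func main() {
	fmt.Println(chunkSizes(60000)) // [25000 17500 17500]
}
```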

```
BenchmarkFindSerialized/100-32         	   88186	     13926 ns/op	        24.28 b/file	     88186 files/s	     216 B/op	       4 allocs/op
BenchmarkFindSerialized/1000-32        	   14157	     82875 ns/op	        23.32 b/file	     14157 files/s	     220 B/op	       4 allocs/op
BenchmarkFindSerialized/10000-32       	    1562	    757547 ns/op	        20.66 b/file	      1562 files/s	     225 B/op	       4 allocs/op
BenchmarkFindSerialized/100000-32      	     567	   2101872 ns/op	        21.66 b/file	       567.0 files/s	 1164419 B/op	      10 allocs/op
BenchmarkFindSerialized/1000000-32     	     583	   2035905 ns/op	        21.73 b/file	       583.0 files/s	 1181472 B/op	      47 allocs/op
```
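Lookups stay fast beyond the threshold (see the benchmarks above) because `FindSerialized` scans only the small chunk headers and decompresses at most the one chunk whose `[First, Last]` name range can contain the target. A schematic sketch of that scan, using a plain slice where the real code reads a stream of msgp-encoded headers:

```
package main

import "fmt"

// chunkHeader is a simplified stand-in for the commit's chunk struct.
type chunkHeader struct {
	First, Last string // first and last file name stored in the chunk
	Payload     []byte // compressed entries, decoded only on a hit
}

// findChunk returns the only chunk that can contain name.
// Chunks are ordered by name range, so the scan can stop early.
func findChunk(chunks []chunkHeader, name string) (*chunkHeader, bool) {
	for i := range chunks {
		c := &chunks[i]
		if name < c.First {
			return nil, false // ordered: no later chunk can match
		}
		if name <= c.Last {
			return c, true // decompress only this chunk's payload
		}
	}
	return nil, false
}

func main() {
	chunks := []chunkHeader{
		{First: "a.txt", Last: "m.txt"},
		{First: "n.txt", Last: "z.txt"},
	}
	c, ok := findChunk(chunks, "p.txt")
	fmt.Println(ok, c.First)
}
```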
klauspost authored Aug 19, 2024
1 parent a7d408b commit 44c138d
Showing 10 changed files with 526 additions and 36 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/go.yml
@@ -14,7 +14,7 @@ jobs:
     runs-on: ${{ matrix.os }}
     strategy:
       matrix:
-        go-version: [1.21.x,1.22.x]
+        go-version: [1.21.x,1.22.x,1.23.x]
         os: [ubuntu-latest]
     steps:
     - name: Set up Go ${{ matrix.go-version }} on ${{ matrix.os }}
@@ -30,6 +30,6 @@ jobs:
       env:
         GO111MODULE: on
       run: |
-        curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.59.0
+        curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.60.1
         $(go env GOPATH)/bin/golangci-lint run --timeout=5m --config ./.golangci.yml
         go test -race ./...
9 changes: 4 additions & 5 deletions .golangci.yml
@@ -1,7 +1,8 @@
 linters-settings:
-  golint:
-    min-confidence: 0
-
+  staticcheck:
+    checks:
+      - all
+      - '-SA6002' # disable the rule SA6002, slices are fine.
   misspell:
     locale: US
 
@@ -24,5 +25,3 @@ issues:
     - comment on exported method
     - should have comment or be unexported
     - error strings should not be capitalized or end with punctuation or a newline
-service:
-  golangci-lint-version: 1.59.0 # use the fixed version to not introduce new linters unexpectedly
236 changes: 217 additions & 19 deletions file.go
@@ -25,6 +25,7 @@ import (
 	"path/filepath"
 	"sort"
 	"strings"
+	"sync"
 
 	"github.com/klauspost/compress/zstd"
 	"github.com/tinylib/msgp/msgp"
@@ -92,6 +93,7 @@ type files []File
 const currentVerPlain = 1
 const currentVerCompressed = 2
 const currentVerCompressedStructs = 3
+const currentVerCompressedStructsChunked = 4
 
 var zstdEnc, _ = zstd.NewWriter(nil, zstd.WithWindowSize(128<<10), zstd.WithEncoderConcurrency(2), zstd.WithEncoderLevel(zstd.SpeedBetterCompression))
 var zstdDec, _ = zstd.NewReader(nil, zstd.WithDecoderLowmem(true), zstd.WithDecoderConcurrency(2), zstd.WithDecoderMaxMemory(MaxIndexSize), zstd.WithDecoderMaxWindow(8<<20))
@@ -132,7 +134,44 @@ func (f Files) Serialize() ([]byte, error) {
 		res = append(res, currentVerCompressed)
 		return zstdEnc.EncodeAll(payload, res), nil
 	}
+	const chunkN = 25000
+	if len(f) < chunkN {
+		return f.encodeAoS(nil)
+	}
+	f.SortByName()
+	res := append([]byte{}, currentVerCompressedStructsChunked)
+	left := f
+	var tmp []byte
+	for len(left) > 0 {
+		todo := left
+		if len(left) < chunkN*2 && len(left) > chunkN {
+			todo = left[:len(left)/2]
+		} else if len(left) > chunkN {
+			todo = left[:chunkN]
+		}
+		ch := chunk{
+			Files: len(todo),
+			First: todo[0].Name,
+			Last:  todo[len(todo)-1].Name,
+		}
+		// Sort each chunk for smaller offsets.
+		todo.Sort()
+		var err error
+		tmp, err = todo.encodeAoS(tmp[:0])
+		if err != nil {
+			return nil, err
+		}
+		ch.Payload = tmp
+		res, err = ch.MarshalMsg(res)
+		if err != nil {
+			return nil, err
+		}
+		left = left[len(todo):]
+	}
+	return res, nil
+}
 
+func (f Files) encodeAoS(res []byte) ([]byte, error) {
 	// Encode many files as struct of arrays...
 	x := filesAsStructs{
 		Names: make([][]byte, len(f)),
Expand Down Expand Up @@ -173,7 +212,6 @@ func (f Files) Serialize() ([]byte, error) {
if err != nil {
return nil, err
}
res := make([]byte, 0, len(payload))
res = append(res, currentVerCompressedStructs)
return zstdEnc.EncodeAll(payload, res), nil
}
@@ -209,6 +247,20 @@ func (f Files) Sort() {
 	}
 }
 
+// SortByName will sort files by file name in zip file.
+func (f Files) SortByName() {
+	less := func(i, j int) bool {
+		a, b := f[i], f[j]
+		if a.Name != b.Name {
+			return a.Name < b.Name
+		}
+		return a.Offset < b.Offset
+	}
+	if !sort.SliceIsSorted(f, less) {
+		sort.Slice(f, less)
+	}
+}
+
 // Find the file with the provided name.
 // Search is linear.
 func (f Files) Find(name string) *File {
@@ -243,41 +295,52 @@ func (f Files) StripFlags(mask uint16) {
 	}
 }
 
+var decompBuffer sync.Pool
+
 // unpackPayload unpacks and optionally decompresses the payload.
-func unpackPayload(b []byte) ([]byte, bool, error) {
+func unpackPayload(b []byte) (payload []byte, typ byte, newBuf bool, err error) {
 	if len(b) < 1 {
-		return nil, false, io.ErrUnexpectedEOF
+		return nil, 0, false, io.ErrUnexpectedEOF
 	}
 	if len(b) > MaxIndexSize {
-		return nil, false, ErrMaxSizeExceeded
+		return nil, 0, false, ErrMaxSizeExceeded
 	}
 	var out []byte
-	switch b[0] {
-	case currentVerPlain:
+	typ = b[0]
+	switch typ {
+	case currentVerPlain, currentVerCompressedStructsChunked:
 		out = b[1:]
 	case currentVerCompressed, currentVerCompressedStructs:
-		decoded, err := zstdDec.DecodeAll(b[1:], nil)
+		newBuf = true
+		dst, _ := decompBuffer.Get().([]byte)
+		// It is ok if we get a nil buffer, the decoder will allocate.
+
+		decoded, err := zstdDec.DecodeAll(b[1:], dst[:0])
 		if err != nil {
 			switch err {
 			case zstd.ErrDecoderSizeExceeded, zstd.ErrWindowSizeExceeded:
 				err = ErrMaxSizeExceeded
 			}
-			return nil, false, err
+			decompBuffer.Put(dst)
+			return nil, typ, false, err
 		}
 		out = decoded
 	default:
-		return nil, false, errors.New("unknown version")
+		return nil, typ, false, errors.New("unknown version")
 	}
-	return out, b[0] == currentVerCompressedStructs, nil
+	return out, typ, newBuf, nil
 }
 
 // DeserializeFiles will de-serialize the files.
 func DeserializeFiles(b []byte) (Files, error) {
-	b, structs, err := unpackPayload(b)
+	b, typ, newBuf, err := unpackPayload(b)
 	if err != nil {
 		return nil, err
 	}
-	if !structs {
+	if newBuf {
+		defer decompBuffer.Put(b)
+	}
+	if typ == currentVerCompressed || typ == currentVerPlain {
 		var dst files
 		// Check number of files.
 		nFiles, _, err := msgp.ReadArrayHeaderBytes(b)
@@ -290,15 +353,53 @@ func DeserializeFiles(b []byte) (Files, error) {
 		_, err = dst.UnmarshalMsg(b)
 		return Files(dst), err
 	}
+	if typ == currentVerCompressedStructsChunked {
+		var dst Files
+		var c chunk
+		tmp, _ := decompBuffer.Get().([]byte)
+		defer func() {
+			if tmp != nil {
+				decompBuffer.Put(tmp)
+			}
+		}()
+		for len(b) > 0 {
+			b, err = c.UnmarshalMsgZC(b)
+			if err != nil {
+				return nil, err
+			}
+			if len(c.Payload) == 0 {
+				return nil, errors.New("missing payload")
+			}
+			switch c.Payload[0] {
+			case currentVerCompressedStructs:
+				tmp, err = zstdDec.DecodeAll(c.Payload[1:], tmp[:0])
+				if err != nil {
+					return nil, err
+				}
+				dst, err = deserializeAoS(tmp, dst)
+				if err != nil {
+					return nil, err
+				}
+			default:
+				return nil, errors.New("unknown version")
+			}
+		}
+		return dst, nil
+	}
+	return deserializeAoS(b, nil)
+}
 
+func deserializeAoS(b []byte, files Files) (Files, error) {
 	var dst filesAsStructs
+	var err error
 	if _, err = dst.UnmarshalMsg(b); err != nil {
 		return nil, err
 	}
 
-	files := make(Files, len(dst.Names))
+	start := len(files)
+	files = append(files, make(Files, len(dst.Names))...)
+	add := files[start:]
 	var cur File
-	for i := range files {
+	for i := range add {
 		cur = File{
 			Name: string(dst.Names[i]),
 			CompressedSize64: uint64(dst.CSizes[i] + int64(cur.CompressedSize64)),
@@ -310,17 +411,18 @@ func DeserializeFiles(b []byte) (Files, error) {
 		if i == 0 {
 			cur.Offset = dst.Offsets[i]
 		} else {
-			cur.Offset = dst.Offsets[i] + files[i-1].Offset + int64(files[i-1].CompressedSize64) + fileHeaderLen + int64(len(files[i-1].Name)) + dataDescriptorLen
+			cur.Offset = dst.Offsets[i] + add[i-1].Offset + int64(add[i-1].CompressedSize64) + fileHeaderLen + int64(len(add[i-1].Name)) + dataDescriptorLen
 		}
 		if len(dst.Custom[i]) > 0 {
 			if cur.Custom, err = readCustomData(dst.Custom[i]); err != nil {
 				return nil, err
 			}
 		}
-		files[i] = cur
+		add[i] = cur
+
 	}
 	return files, err
 
 }
 
 func readCustomData(bts []byte) (dst map[string]string, err error) {
@@ -360,11 +462,14 @@ func readCustomData(bts []byte) (dst map[string]string, err error) {
 // Expected speed scales O(n) for n files.
 // Returns nil, io.EOF if not found.
 func FindSerialized(b []byte, name string) (*File, error) {
-	buf, structs, err := unpackPayload(b)
+	buf, typ, newBuf, err := unpackPayload(b)
 	if err != nil {
 		return nil, err
 	}
-	if !structs {
+	if newBuf {
+		defer decompBuffer.Put(buf)
+	}
+	if typ == currentVerCompressed || typ == currentVerPlain {
 		n, buf, err := msgp.ReadArrayHeaderBytes(buf)
 		if err != nil {
 			return nil, err
@@ -385,6 +490,38 @@ func FindSerialized(b []byte, name string) (*File, error) {
 		return nil, io.EOF
 	}
 
+	if typ == currentVerCompressedStructsChunked {
+		var c chunk
+		for len(buf) > 0 {
+			buf, err = c.UnmarshalMsgZC(buf)
+			if err != nil {
+				return nil, err
+			}
+			if name < c.First {
+				return nil, io.EOF
+			}
+			if name >= c.First && name <= c.Last {
+				if len(c.Payload) == 0 {
+					return nil, errors.New("missing payload")
+				}
+				buf = c.Payload
+				if buf[0] != currentVerCompressedStructs {
+					return nil, errors.New("unknown chunk")
+				}
+				dst, _ := decompBuffer.Get().([]byte)
+				defer decompBuffer.Put(dst)
+				buf, err = zstdDec.DecodeAll(buf[1:], dst[:0])
+				if err != nil {
+					return nil, err
+				}
+				break
+			}
+			if len(buf) == 0 {
+				return nil, io.EOF
+			}
+		}
+	}
+
 	// Files are packed as an array of arrays...
 	idx := -1
 	var zb0001 uint32
@@ -812,3 +949,64 @@ func (z *filesAsStructs) UnmarshalMsg(bts []byte) (o []byte, err error) {
 	o = bts
 	return
 }
+
+type chunk struct {
+	Files   int
+	First   string
+	Last    string
+	Payload []byte
+}
+
+// UnmarshalMsgZC unmarshals, but does a zero-copy read of byte slices.
+func (z *chunk) UnmarshalMsgZC(bts []byte) (o []byte, err error) {
+	var field []byte
+	_ = field
+	var zb0001 uint32
+	zb0001, bts, err = msgp.ReadMapHeaderBytes(bts)
+	if err != nil {
+		err = msgp.WrapError(err)
+		return
+	}
+	for zb0001 > 0 {
+		zb0001--
+		field, bts, err = msgp.ReadMapKeyZC(bts)
+		if err != nil {
+			err = msgp.WrapError(err)
+			return
+		}
+		switch msgp.UnsafeString(field) {
+		case "Files":
+			z.Files, bts, err = msgp.ReadIntBytes(bts)
+			if err != nil {
+				err = msgp.WrapError(err, "Files")
+				return
+			}
+		case "First":
+			z.First, bts, err = msgp.ReadStringBytes(bts)
+			if err != nil {
+				err = msgp.WrapError(err, "First")
+				return
+			}
+		case "Last":
+			z.Last, bts, err = msgp.ReadStringBytes(bts)
+			if err != nil {
+				err = msgp.WrapError(err, "Last")
+				return
+			}
+		case "Payload":
+			z.Payload, bts, err = msgp.ReadBytesZC(bts)
+			if err != nil {
+				err = msgp.WrapError(err, "Payload")
+				return
+			}
+		default:
+			bts, err = msgp.Skip(bts)
+			if err != nil {
+				err = msgp.WrapError(err)
+				return
+			}
+		}
+	}
+	o = bts
+	return
+}
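For context, a hypothetical round-trip of the public API this diff extends, assuming the `github.com/klauspost/zipindex` module path: with 25,000 or more entries `Serialize` emits the new chunked format transparently, and `FindSerialized` behaves the same for callers either way.

```
package main

import (
	"fmt"
	"log"

	"github.com/klauspost/zipindex"
)

func main() {
	// Build an index large enough (>= 25000 entries) to trigger
	// the chunked encoding added in this commit.
	var files zipindex.Files
	for i := 0; i < 30000; i++ {
		files = append(files, zipindex.File{Name: fmt.Sprintf("file-%05d.txt", i)})
	}
	payload, err := files.Serialize()
	if err != nil {
		log.Fatal(err)
	}
	// Look up a single name without deserializing the whole index;
	// FindSerialized returns io.EOF when the name is absent.
	f, err := zipindex.FindSerialized(payload, "file-12345.txt")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("found:", f.Name)
}
```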