Compressor Optimizer (#367)
* refactor: synchronous optimization

---------

Signed-off-by: Arya Tabaie <arya.pourtabatabaie@gmail.com>
Co-authored-by: Arya Tabaie <15056835+Tabaie@users.noreply.github.com>
Tabaie and Tabaie authored Dec 24, 2024
1 parent 972cc85 commit 3805acb
Showing 2 changed files with 72 additions and 58 deletions.
104 changes: 72 additions & 32 deletions prover/lib/compressor/blob/v1/blob_maker.go
@@ -8,15 +8,13 @@ import (
"slices"
"strings"

"github.com/consensys/compress/lzss"
fr381 "github.com/consensys/gnark-crypto/ecc/bls12-381/fr"
"github.com/consensys/linea-monorepo/prover/lib/compressor/blob/dictionary"
"github.com/consensys/linea-monorepo/prover/lib/compressor/blob/encode"

fr381 "github.com/consensys/gnark-crypto/ecc/bls12-381/fr"
"github.com/sirupsen/logrus"

"github.com/consensys/compress/lzss"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rlp"
"github.com/sirupsen/logrus"
)

const (
@@ -39,7 +37,7 @@ type BlobMaker struct {
dict []byte // dictionary used for compression
dictStore dictionary.Store // dictionary store comprising only dict, used for decompression sanity checks

Header Header
header Header

// contains currentBlob data from latest **valid** call to Write
// that is the header (uncompressed) and the body (compressed)
@@ -74,7 +72,7 @@ func NewBlobMaker(dataLimit int, dictPath string) (*BlobMaker, error) {
if err != nil {
return nil, err
}
copy(blobMaker.Header.DictChecksum[:], dictChecksum)
copy(blobMaker.header.DictChecksum[:], dictChecksum)

blobMaker.compressor, err = lzss.NewCompressor(dict)
if err != nil {
@@ -89,18 +87,18 @@ func NewBlobMaker(dataLimit int, dictPath string) (*BlobMaker, error) {

// StartNewBatch starts a new batch of blocks.
func (bm *BlobMaker) StartNewBatch() {
bm.Header.sealBatch()
bm.header.sealBatch()
}

// Reset resets the bm to its initial state.
func (bm *BlobMaker) Reset() {
bm.Header.resetTable()
bm.header.resetTable()
bm.currentBlobLength = 0
bm.buf.Reset()
bm.packBuffer.Reset()
bm.compressor.Reset()

bm.StartNewBatch()
bm.header.sealBatch()
}

// Len returns the length of the compressed data, which includes the header.
@@ -121,14 +119,14 @@ func (bm *BlobMaker) Bytes() []byte {
if err != nil {
var sbb strings.Builder
fmt.Fprintf(&sbb, "invalid blob: %v\n", err)
fmt.Fprintf(&sbb, "header: %v\n", bm.Header)
fmt.Fprintf(&sbb, "header: %v\n", bm.header)
fmt.Fprintf(&sbb, "bm.currentBlobLength: %v\n", bm.currentBlobLength)
fmt.Fprintf(&sbb, "bm.currentBlob: %x\n", bm.currentBlob[:bm.currentBlobLength])

panic(sbb.String())
}
// compare the header
if !header.Equals(&bm.Header) {
if !header.Equals(&bm.header) {
panic("invalid blob: header mismatch")
}
if !bytes.Equal(rawBlocks, bm.compressor.WrittenBytes()) {
@@ -141,6 +139,7 @@ func (bm *BlobMaker) Bytes() []byte {
// Write attempts to append the RLP block to the current batch.
// if forceReset is set, this will NOT append the bytes but will still return true if the chunk could have been appended
func (bm *BlobMaker) Write(rlpBlock []byte, forceReset bool) (ok bool, err error) {
prevLen := bm.compressor.Written()

// decode the RLP block.
var block types.Block
Expand Down Expand Up @@ -169,17 +168,16 @@ func (bm *BlobMaker) Write(rlpBlock []byte, forceReset bool) (ok bool, err error
if innerErr := bm.compressor.Revert(); innerErr != nil {
return false, fmt.Errorf("when reverting compressor because writing failed: %w\noriginal error: %w", innerErr, err)
}
bm.Header.removeLastBlock()
return false, fmt.Errorf("when writing block to compressor: %w", err)
}

// increment length of the current batch
bm.Header.addBlock(blockLen)
bm.header.addBlock(blockLen)
// write the header to get its length.
bm.buf.Reset()
if _, err = bm.Header.WriteTo(&bm.buf); err != nil {
if _, err = bm.header.WriteTo(&bm.buf); err != nil {
// the only possible error is an underlying writer error (shouldn't happen since we use a simple in-memory buffer)
bm.Header.removeLastBlock()
bm.header.removeLastBlock()
return false, fmt.Errorf("when writing header to buffer: %w", err)
}

@@ -191,45 +189,80 @@ func (bm *BlobMaker) Write(rlpBlock []byte, forceReset bool) (ok bool, err error
if err := bm.compressor.Revert(); err != nil {
return false, fmt.Errorf("when reverting compressor because uncompressed blob is > maxUncompressedSize: %w", err)
}
bm.Header.removeLastBlock()
bm.header.removeLastBlock()
return false, nil
}

fitsInBlob := func() bool {
return encode.PackAlignSize(bm.buf.Len()+bm.compressor.Len(), fr381.Bits-1) <= bm.Limit
}

payload := bm.compressor.WrittenBytes()
recompressionAttempted := false
revert := func() error { // from this point on, we may have recompressed the entire payload in one go
// that makes the compressor's own Revert method unusable.
bm.header.removeLastBlock()
if !recompressionAttempted { // fast path for most "CanWrite" calls
return bm.compressor.Revert()
}
// we can't use the compressor's own Revert method because we tried to compress in one go.
bm.compressor.Reset()
_, err := bm.compressor.Write(payload[:prevLen])
return wrapError(err, "reverting the compressor")
}

// check that the header + the compressed data fits in the blob
fitsInBlob := encode.PackAlignSize(bm.buf.Len()+bm.compressor.Len(), fr381.Bits-1) <= bm.Limit
if !fitsInBlob {
// first thing to check is if we bypass compression, would that fit?
if !fitsInBlob() {
recompressionAttempted = true

// first thing to check is whether we can fit the block if we recompress everything in one go, known to achieve a higher ratio.
bm.compressor.Reset()
if _, err = bm.compressor.Write(payload); err != nil {
err = fmt.Errorf("when recompressing the blob: %w", err)

if innerErr := revert(); innerErr != nil {
err = fmt.Errorf("%w\n\tto recover from write failure: %w", innerErr, err)
}

return false, err
}
if fitsInBlob() {
goto bypass
}

// that didn't work. a "desperate" attempt is not to compress at all.
if bm.compressor.ConsiderBypassing() {
// we can bypass compression and get a better ratio.
// let's check if now we fit in the blob.
if encode.PackAlignSize(bm.buf.Len()+bm.compressor.Len(), fr381.Bits-1) <= bm.Limit {
if fitsInBlob() {
goto bypass
}
}

// discard.
if err = bm.compressor.Revert(); err != nil {
if err = revert(); err != nil {
return false, fmt.Errorf("when reverting compressor because blob is full: %w", err)
}
bm.Header.removeLastBlock()
return false, nil
}
bypass:
if forceReset {
// we don't want to append the data, but we could have.
if err = bm.compressor.Revert(); err != nil {
return false, fmt.Errorf("when reverting compressor (blob is not full but forceReset == true): %w", err)
if err = revert(); err != nil {
return false, fmt.Errorf("%w\nreverting because forceReset == true even though the blob isn't full", err)
}
bm.Header.removeLastBlock()
return true, nil
}

// copy the compressed data to the blob
bm.packBuffer.Reset()
n2, err := encode.PackAlign(&bm.packBuffer, bm.buf.Bytes(), fr381.Bits-1, encode.WithAdditionalInput(bm.compressor.Bytes()))
if err != nil {
bm.compressor.Revert()
bm.Header.removeLastBlock()
err = fmt.Errorf("when packing blob: %w", err)
innerErr := revert()
if innerErr != nil {
err = fmt.Errorf("%w\n\twhen attempting to recover from: %w", innerErr, err)
}
return false, fmt.Errorf("when packing blob: %w", err)
}
bm.currentBlobLength = int(n2)
@@ -241,9 +274,9 @@ bypass:
// Clone returns a (almost) deep copy of the bm -- this is used for test purposes.
func (bm *BlobMaker) Clone() *BlobMaker {
deepCopy := *bm
deepCopy.Header.BatchSizes = make([]int, len(bm.Header.BatchSizes))
deepCopy.header.BatchSizes = make([]int, len(bm.header.BatchSizes))

copy(deepCopy.Header.BatchSizes, bm.Header.BatchSizes)
copy(deepCopy.header.BatchSizes, bm.header.BatchSizes)

return &deepCopy
}
@@ -259,10 +292,10 @@ func (bm *BlobMaker) Equals(other *BlobMaker) bool {
if !bytes.Equal(bm.currentBlob[:bm.currentBlobLength], other.currentBlob[:other.currentBlobLength]) {
return false
}
if len(bm.Header.BatchSizes) != len(other.Header.BatchSizes) {
if len(bm.header.BatchSizes) != len(other.header.BatchSizes) {
return false
}
if !slices.Equal(bm.Header.BatchSizes, other.Header.BatchSizes) {
if !slices.Equal(bm.header.BatchSizes, other.header.BatchSizes) {
return false
}
return true
Expand Down Expand Up @@ -410,3 +443,10 @@ func (bm *BlobMaker) RawCompressedSize(data []byte) (int, error) {

return n, nil
}

func wrapError(err error, format string, args ...any) error {
if err == nil {
return nil
}
return fmt.Errorf(format+": %w", append(args, err)...)
}
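
The heart of this change is the revert-by-replay rollback: once Write has recompressed the entire payload in one go, the compressor's incremental Revert no longer matches its internal state, so the only safe rollback is to reset and replay the previously accepted input. Below is a minimal sketch of the pattern, written against a hypothetical compressor interface that mirrors the lzss methods used in the diff (Write, Reset, Revert, Written, WrittenBytes); it illustrates the idea and is not the production code.

// Minimal sketch of the revert-by-replay rollback used in Write above.
// The interface is hypothetical; it mirrors the lzss.Compressor methods
// this diff relies on.
package sketch

import "fmt"

type compressor interface {
	Write(p []byte) (int, error)
	Reset()               // drop all compressor state
	Revert() error        // undo the most recent Write only
	Written() int         // number of input bytes accepted so far
	WrittenBytes() []byte // the input bytes accepted so far
}

// appendChunk tries to add chunk to the stream; fits reports whether the
// current output still fits the blob. It returns (false, nil) when the
// chunk simply does not fit, leaving the compressor as it was before.
func appendChunk(c compressor, chunk []byte, fits func() bool) (bool, error) {
	prevLen := c.Written() // snapshot before anything else
	if _, err := c.Write(chunk); err != nil {
		if innerErr := c.Revert(); innerErr != nil {
			return false, fmt.Errorf("reverting after failed write: %w (original error: %w)", innerErr, err)
		}
		return false, err
	}
	if fits() {
		return true, nil
	}

	// Recompress everything in one go: a single Write over the whole
	// payload typically compresses better than many incremental ones.
	payload := c.WrittenBytes()
	c.Reset()
	if _, err := c.Write(payload); err != nil {
		return false, fmt.Errorf("recompressing payload: %w", err)
	}
	if fits() {
		return true, nil
	}

	// The incremental Revert is now unusable (its notion of "last write"
	// is the whole payload), so replay the pre-chunk input instead.
	c.Reset()
	_, err := c.Write(payload[:prevLen])
	return false, err
}

The fast path still uses the cheap incremental Revert; only the rare recompression path pays an extra pass over the retained prefix, which is why Write snapshots prevLen before doing anything else.
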
26 changes: 0 additions & 26 deletions prover/lib/compressor/blob/v1/test_utils/blob_maker_testing.go
@@ -10,10 +10,7 @@ import (
"path/filepath"
"strings"

"github.com/consensys/compress/lzss"
fr381 "github.com/consensys/gnark-crypto/ecc/bls12-381/fr"
"github.com/consensys/linea-monorepo/prover/backend/execution"
"github.com/consensys/linea-monorepo/prover/lib/compressor/blob/encode"
v1 "github.com/consensys/linea-monorepo/prover/lib/compressor/blob/v1"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -81,29 +78,6 @@ func RandIntn(n int) int { // TODO @Tabaie remove
return int(binary.BigEndian.Uint64(b[:]) % uint64(n))
}

func EmptyBlob(t require.TestingT) []byte {
var headerB bytes.Buffer

repoRoot, err := GetRepoRootPath()
assert.NoError(t, err)
// Init bm
bm, err := v1.NewBlobMaker(1000, filepath.Join(repoRoot, "prover/lib/compressor/compressor_dict.bin"))
assert.NoError(t, err)

if _, err = bm.Header.WriteTo(&headerB); err != nil {
panic(err)
}

compressor, err := lzss.NewCompressor(GetDict(t))
assert.NoError(t, err)

var bb bytes.Buffer
if _, err = encode.PackAlign(&bb, headerB.Bytes(), fr381.Bits-1, encode.WithAdditionalInput(compressor.Bytes())); err != nil {
panic(err)
}
return bb.Bytes()
}

func SingleBlockBlob(t require.TestingT) []byte {
testBlocks, bm := TestBlocksAndBlobMaker(t)

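
Both the fitsInBlob closure in blob_maker.go and the removed EmptyBlob helper funnel through encode.PackAlign, which packs an arbitrary byte stream into BLS12-381 scalar field elements so that every 32-byte word stays below the field modulus, leaving fr381.Bits-1 = 254 usable bits per element. A back-of-envelope version of the size computation (an approximation for intuition only; encode.PackAlignSize is authoritative and may account for padding this sketch ignores):

// packAlignSizeApprox estimates the packed size of nbBytes of input,
// assuming 254 usable bits per element and 32 bytes per serialized
// element. Illustrative approximation only.
func packAlignSizeApprox(nbBytes int) int {
	const usableBits = 254 // fr381.Bits - 1 for BLS12-381
	nbElems := (8*nbBytes + usableBits - 1) / usableBits // ceil(8n / 254)
	return 32 * nbElems // each element serializes to 32 bytes
}

In other words, the fit check in Write asks whether header bytes plus compressed bytes, after this roughly 0.8% packing overhead, still come in under bm.Limit.
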

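For orientation, here is a minimal end-to-end use of the exported surface touched by this commit (with Header now unexported, callers interact only with the methods below). The data limit, dictionary path, and the omitted retry of a non-fitting block are placeholder choices for illustration:

package main

import (
	"log"

	v1 "github.com/consensys/linea-monorepo/prover/lib/compressor/blob/v1"
)

func main() {
	// placeholder limit and dictionary path
	bm, err := v1.NewBlobMaker(128*1024, "compressor_dict.bin")
	if err != nil {
		log.Fatal(err)
	}

	var blocks [][]byte // RLP-encoded blocks, obtained elsewhere
	for _, rlpBlock := range blocks {
		ok, err := bm.Write(rlpBlock, false)
		if err != nil {
			log.Fatal(err)
		}
		if !ok {
			// blob is full: ship it, reset, and (omitted here) retry the block
			emit(bm.Bytes())
			bm.Reset()
		}
	}
	bm.StartNewBatch() // seal the current batch in the header
	emit(bm.Bytes())
}

func emit(blob []byte) { log.Printf("blob of %d bytes", len(blob)) }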