Skip to content

Commit

Permalink
core/rawdb: freezer batch should implement the offset commit, ref bnb…
Browse files Browse the repository at this point in the history
…-chain/bsc#1005

Signed-off-by: Delweng <delweng@gmail.com>
  • Loading branch information
jsvisa committed Feb 21, 2023
1 parent 3a05b3b commit ecffeda
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 22 deletions.
2 changes: 1 addition & 1 deletion core/rawdb/freezer.go
Original file line number Diff line number Diff line change
Expand Up @@ -712,7 +712,7 @@ func (f *freezer) MigrateTable(kind string, convert convertLegacyFn) error {
return err
}
var (
batch = newTable.newBatch()
batch = newTable.newBatch(f.offset)
out []byte
start = time.Now()
logged = time.Now()
Expand Down
13 changes: 8 additions & 5 deletions core/rawdb/freezer_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type freezerBatch struct {
func newFreezerBatch(f *freezer) *freezerBatch {
batch := &freezerBatch{tables: make(map[string]*freezerTableBatch, len(f.tables))}
for kind, table := range f.tables {
batch.tables[kind] = table.newBatch()
batch.tables[kind] = table.newBatch(f.offset)
}
return batch
}
Expand Down Expand Up @@ -91,11 +91,12 @@ type freezerTableBatch struct {
indexBuffer []byte
curItem uint64 // expected index of next append
totalBytes int64 // counts written bytes since reset
offset uint64
}

// newBatch creates a new batch for the freezer table.
func (t *freezerTable) newBatch() *freezerTableBatch {
batch := &freezerTableBatch{t: t}
func (t *freezerTable) newBatch(offset uint64) *freezerTableBatch {
batch := &freezerTableBatch{t: t, offset: offset}
if !t.noCompression {
batch.sb = new(snappyBuffer)
}
Expand All @@ -107,7 +108,8 @@ func (t *freezerTable) newBatch() *freezerTableBatch {
func (batch *freezerTableBatch) reset() {
batch.dataBuffer = batch.dataBuffer[:0]
batch.indexBuffer = batch.indexBuffer[:0]
batch.curItem = atomic.LoadUint64(&batch.t.items)
curItem := batch.t.items + batch.offset
batch.curItem = atomic.LoadUint64(&curItem)
batch.totalBytes = 0
}

Expand Down Expand Up @@ -201,7 +203,8 @@ func (batch *freezerTableBatch) commit() error {

// Update headBytes of table.
batch.t.headBytes += dataSize
atomic.StoreUint64(&batch.t.items, batch.curItem)
items := batch.curItem - batch.offset
atomic.StoreUint64(&batch.t.items, items)

// Update metrics.
batch.t.sizeGauge.Inc(dataSize + indexSize)
Expand Down
2 changes: 1 addition & 1 deletion core/rawdb/freezer_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -931,7 +931,7 @@ func (t *freezerTable) dumpIndex(w io.Writer, start, stop int64) {
// Fill adds empty data till given number (convenience method for backward compatibilty)
func (t *freezerTable) Fill(number uint64) error {
if t.items < number {
b := t.newBatch()
b := t.newBatch(0)
log.Info("Filling all data into freezer for backward compatablity", "name", t.name, "items", t.items, "number", number)
for t.items < number {
if err := b.Append(t.items, nil); err != nil {
Expand Down
26 changes: 13 additions & 13 deletions core/rawdb/freezer_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func TestFreezerBasicsClosing(t *testing.T) {
// In-between writes, the table is closed and re-opened.
for x := 0; x < 255; x++ {
data := getChunk(15, x)
batch := f.newBatch()
batch := f.newBatch(0)
require.NoError(t, batch.AppendRaw(uint64(x), data))
require.NoError(t, batch.commit())
f.Close()
Expand Down Expand Up @@ -227,7 +227,7 @@ func TestFreezerRepairDanglingHeadLarge(t *testing.T) {
t.Errorf("Expected error for missing index entry")
}
// We should now be able to store items again, from item = 1
batch := f.newBatch()
batch := f.newBatch(0)
for x := 1; x < 0xff; x++ {
require.NoError(t, batch.AppendRaw(uint64(x), getChunk(15, ^x)))
}
Expand Down Expand Up @@ -417,7 +417,7 @@ func TestFreezerRepairFirstFile(t *testing.T) {
t.Fatal(err)
}
// Write 80 bytes, splitting out into two files
batch := f.newBatch()
batch := f.newBatch(0)
require.NoError(t, batch.AppendRaw(0, getChunk(40, 0xFF)))
require.NoError(t, batch.AppendRaw(1, getChunk(40, 0xEE)))
require.NoError(t, batch.commit())
Expand Down Expand Up @@ -455,7 +455,7 @@ func TestFreezerRepairFirstFile(t *testing.T) {
}

// Write 40 bytes
batch := f.newBatch()
batch := f.newBatch(0)
require.NoError(t, batch.AppendRaw(1, getChunk(40, 0xDD)))
require.NoError(t, batch.commit())

Expand Down Expand Up @@ -512,7 +512,7 @@ func TestFreezerReadAndTruncate(t *testing.T) {
f.truncateHead(0)

// Write the data again
batch := f.newBatch()
batch := f.newBatch(0)
for x := 0; x < 30; x++ {
require.NoError(t, batch.AppendRaw(uint64(x), getChunk(15, ^x)))
}
Expand All @@ -534,7 +534,7 @@ func TestFreezerOffset(t *testing.T) {
}

// Write 6 x 20 bytes, splitting out into three files
batch := f.newBatch()
batch := f.newBatch(0)
require.NoError(t, batch.AppendRaw(0, getChunk(20, 0xFF)))
require.NoError(t, batch.AppendRaw(1, getChunk(20, 0xEE)))

Expand Down Expand Up @@ -598,7 +598,7 @@ func TestFreezerOffset(t *testing.T) {
t.Log(f.dumpIndexString(0, 100))

// It should allow writing item 6.
batch := f.newBatch()
batch := f.newBatch(0)
require.NoError(t, batch.AppendRaw(6, getChunk(20, 0x99)))
require.NoError(t, batch.commit())

Expand Down Expand Up @@ -676,7 +676,7 @@ func TestTruncateTail(t *testing.T) {
}

// Write 7 x 20 bytes, splitting out into four files
batch := f.newBatch()
batch := f.newBatch(0)
require.NoError(t, batch.AppendRaw(0, getChunk(20, 0xFF)))
require.NoError(t, batch.AppendRaw(1, getChunk(20, 0xEE)))
require.NoError(t, batch.AppendRaw(2, getChunk(20, 0xdd)))
Expand Down Expand Up @@ -791,7 +791,7 @@ func TestTruncateHead(t *testing.T) {
}

// Write 7 x 20 bytes, splitting out into four files
batch := f.newBatch()
batch := f.newBatch(0)
require.NoError(t, batch.AppendRaw(0, getChunk(20, 0xFF)))
require.NoError(t, batch.AppendRaw(1, getChunk(20, 0xEE)))
require.NoError(t, batch.AppendRaw(2, getChunk(20, 0xdd)))
Expand All @@ -816,7 +816,7 @@ func TestTruncateHead(t *testing.T) {
})

// Append new items
batch = f.newBatch()
batch = f.newBatch(0)
require.NoError(t, batch.AppendRaw(4, getChunk(20, 0xbb)))
require.NoError(t, batch.AppendRaw(5, getChunk(20, 0xaa)))
require.NoError(t, batch.AppendRaw(6, getChunk(20, 0x11)))
Expand Down Expand Up @@ -880,7 +880,7 @@ func getChunk(size int, b int) []byte {
func writeChunks(t *testing.T, ft *freezerTable, n int, length int) {
t.Helper()

batch := ft.newBatch()
batch := ft.newBatch(0)
for i := 0; i < n; i++ {
if err := batch.AppendRaw(uint64(i), getChunk(length, i)); err != nil {
t.Fatalf("AppendRaw(%d, ...) returned error: %v", i, err)
Expand Down Expand Up @@ -1076,7 +1076,7 @@ func TestFreezerReadonly(t *testing.T) {

// Case 5: Now write some data via a batch.
// This should fail either during AppendRaw or Commit
batch := f.newBatch()
batch := f.newBatch(0)
writeErr := batch.AppendRaw(32, make([]byte, 1))
if writeErr == nil {
writeErr = batch.commit()
Expand Down Expand Up @@ -1231,7 +1231,7 @@ func runRandTest(rt randTest) bool {
}

case opAppend:
batch := f.newBatch()
batch := f.newBatch(0)
for i := 0; i < len(step.items); i++ {
batch.AppendRaw(step.items[i], step.blobs[i])
}
Expand Down
4 changes: 2 additions & 2 deletions core/rawdb/freezer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,12 +268,12 @@ func TestFreezerReadonlyValidate(t *testing.T) {
t.Fatal("can't open freezer", err)
}
var item = make([]byte, 1024)
aBatch := f.tables["a"].newBatch()
aBatch := f.tables["a"].newBatch(0)
require.NoError(t, aBatch.AppendRaw(0, item))
require.NoError(t, aBatch.AppendRaw(1, item))
require.NoError(t, aBatch.AppendRaw(2, item))
require.NoError(t, aBatch.commit())
bBatch := f.tables["b"].newBatch()
bBatch := f.tables["b"].newBatch(0)
require.NoError(t, bBatch.AppendRaw(0, item))
require.NoError(t, bBatch.commit())
if f.tables["a"].items != 3 {
Expand Down

0 comments on commit ecffeda

Please sign in to comment.