diff --git a/core/rawdb/freezer.go b/core/rawdb/freezer.go index b08db8eace..29f4416f49 100644 --- a/core/rawdb/freezer.go +++ b/core/rawdb/freezer.go @@ -712,7 +712,7 @@ func (f *freezer) MigrateTable(kind string, convert convertLegacyFn) error { return err } var ( - batch = newTable.newBatch() + batch = newTable.newBatch(f.offset) out []byte start = time.Now() logged = time.Now() diff --git a/core/rawdb/freezer_batch.go b/core/rawdb/freezer_batch.go index 864a7f5e98..044bad7da1 100644 --- a/core/rawdb/freezer_batch.go +++ b/core/rawdb/freezer_batch.go @@ -37,7 +37,7 @@ type freezerBatch struct { func newFreezerBatch(f *freezer) *freezerBatch { batch := &freezerBatch{tables: make(map[string]*freezerTableBatch, len(f.tables))} for kind, table := range f.tables { - batch.tables[kind] = table.newBatch() + batch.tables[kind] = table.newBatch(f.offset) } return batch } @@ -91,11 +91,12 @@ type freezerTableBatch struct { indexBuffer []byte curItem uint64 // expected index of next append totalBytes int64 // counts written bytes since reset + offset uint64 } // newBatch creates a new batch for the freezer table. -func (t *freezerTable) newBatch() *freezerTableBatch { - batch := &freezerTableBatch{t: t} +func (t *freezerTable) newBatch(offset uint64) *freezerTableBatch { + batch := &freezerTableBatch{t: t, offset: offset} if !t.noCompression { batch.sb = new(snappyBuffer) } @@ -107,7 +108,8 @@ func (t *freezerTable) newBatch() *freezerTableBatch { func (batch *freezerTableBatch) reset() { batch.dataBuffer = batch.dataBuffer[:0] batch.indexBuffer = batch.indexBuffer[:0] - batch.curItem = atomic.LoadUint64(&batch.t.items) + curItem := batch.t.items + batch.offset + batch.curItem = atomic.LoadUint64(&curItem) batch.totalBytes = 0 } @@ -201,7 +203,8 @@ func (batch *freezerTableBatch) commit() error { // Update headBytes of table. batch.t.headBytes += dataSize - atomic.StoreUint64(&batch.t.items, batch.curItem) + items := batch.curItem - batch.offset + atomic.StoreUint64(&batch.t.items, items) // Update metrics. batch.t.sizeGauge.Inc(dataSize + indexSize) diff --git a/core/rawdb/freezer_table.go b/core/rawdb/freezer_table.go index a076ded067..a01d8dfd9a 100644 --- a/core/rawdb/freezer_table.go +++ b/core/rawdb/freezer_table.go @@ -931,7 +931,7 @@ func (t *freezerTable) dumpIndex(w io.Writer, start, stop int64) { // Fill adds empty data till given number (convenience method for backward compatibilty) func (t *freezerTable) Fill(number uint64) error { if t.items < number { - b := t.newBatch() + b := t.newBatch(0) log.Info("Filling all data into freezer for backward compatablity", "name", t.name, "items", t.items, "number", number) for t.items < number { if err := b.Append(t.items, nil); err != nil { diff --git a/core/rawdb/freezer_table_test.go b/core/rawdb/freezer_table_test.go index 0bddcf7211..5cbb140b04 100644 --- a/core/rawdb/freezer_table_test.go +++ b/core/rawdb/freezer_table_test.go @@ -99,7 +99,7 @@ func TestFreezerBasicsClosing(t *testing.T) { // In-between writes, the table is closed and re-opened. for x := 0; x < 255; x++ { data := getChunk(15, x) - batch := f.newBatch() + batch := f.newBatch(0) require.NoError(t, batch.AppendRaw(uint64(x), data)) require.NoError(t, batch.commit()) f.Close() @@ -227,7 +227,7 @@ func TestFreezerRepairDanglingHeadLarge(t *testing.T) { t.Errorf("Expected error for missing index entry") } // We should now be able to store items again, from item = 1 - batch := f.newBatch() + batch := f.newBatch(0) for x := 1; x < 0xff; x++ { require.NoError(t, batch.AppendRaw(uint64(x), getChunk(15, ^x))) } @@ -417,7 +417,7 @@ func TestFreezerRepairFirstFile(t *testing.T) { t.Fatal(err) } // Write 80 bytes, splitting out into two files - batch := f.newBatch() + batch := f.newBatch(0) require.NoError(t, batch.AppendRaw(0, getChunk(40, 0xFF))) require.NoError(t, batch.AppendRaw(1, getChunk(40, 0xEE))) require.NoError(t, batch.commit()) @@ -455,7 +455,7 @@ func TestFreezerRepairFirstFile(t *testing.T) { } // Write 40 bytes - batch := f.newBatch() + batch := f.newBatch(0) require.NoError(t, batch.AppendRaw(1, getChunk(40, 0xDD))) require.NoError(t, batch.commit()) @@ -512,7 +512,7 @@ func TestFreezerReadAndTruncate(t *testing.T) { f.truncateHead(0) // Write the data again - batch := f.newBatch() + batch := f.newBatch(0) for x := 0; x < 30; x++ { require.NoError(t, batch.AppendRaw(uint64(x), getChunk(15, ^x))) } @@ -534,7 +534,7 @@ func TestFreezerOffset(t *testing.T) { } // Write 6 x 20 bytes, splitting out into three files - batch := f.newBatch() + batch := f.newBatch(0) require.NoError(t, batch.AppendRaw(0, getChunk(20, 0xFF))) require.NoError(t, batch.AppendRaw(1, getChunk(20, 0xEE))) @@ -598,7 +598,7 @@ func TestFreezerOffset(t *testing.T) { t.Log(f.dumpIndexString(0, 100)) // It should allow writing item 6. - batch := f.newBatch() + batch := f.newBatch(0) require.NoError(t, batch.AppendRaw(6, getChunk(20, 0x99))) require.NoError(t, batch.commit()) @@ -676,7 +676,7 @@ func TestTruncateTail(t *testing.T) { } // Write 7 x 20 bytes, splitting out into four files - batch := f.newBatch() + batch := f.newBatch(0) require.NoError(t, batch.AppendRaw(0, getChunk(20, 0xFF))) require.NoError(t, batch.AppendRaw(1, getChunk(20, 0xEE))) require.NoError(t, batch.AppendRaw(2, getChunk(20, 0xdd))) @@ -791,7 +791,7 @@ func TestTruncateHead(t *testing.T) { } // Write 7 x 20 bytes, splitting out into four files - batch := f.newBatch() + batch := f.newBatch(0) require.NoError(t, batch.AppendRaw(0, getChunk(20, 0xFF))) require.NoError(t, batch.AppendRaw(1, getChunk(20, 0xEE))) require.NoError(t, batch.AppendRaw(2, getChunk(20, 0xdd))) @@ -816,7 +816,7 @@ func TestTruncateHead(t *testing.T) { }) // Append new items - batch = f.newBatch() + batch = f.newBatch(0) require.NoError(t, batch.AppendRaw(4, getChunk(20, 0xbb))) require.NoError(t, batch.AppendRaw(5, getChunk(20, 0xaa))) require.NoError(t, batch.AppendRaw(6, getChunk(20, 0x11))) @@ -880,7 +880,7 @@ func getChunk(size int, b int) []byte { func writeChunks(t *testing.T, ft *freezerTable, n int, length int) { t.Helper() - batch := ft.newBatch() + batch := ft.newBatch(0) for i := 0; i < n; i++ { if err := batch.AppendRaw(uint64(i), getChunk(length, i)); err != nil { t.Fatalf("AppendRaw(%d, ...) returned error: %v", i, err) @@ -1076,7 +1076,7 @@ func TestFreezerReadonly(t *testing.T) { // Case 5: Now write some data via a batch. // This should fail either during AppendRaw or Commit - batch := f.newBatch() + batch := f.newBatch(0) writeErr := batch.AppendRaw(32, make([]byte, 1)) if writeErr == nil { writeErr = batch.commit() @@ -1231,7 +1231,7 @@ func runRandTest(rt randTest) bool { } case opAppend: - batch := f.newBatch() + batch := f.newBatch(0) for i := 0; i < len(step.items); i++ { batch.AppendRaw(step.items[i], step.blobs[i]) } diff --git a/core/rawdb/freezer_test.go b/core/rawdb/freezer_test.go index 3379736047..e8857042fb 100644 --- a/core/rawdb/freezer_test.go +++ b/core/rawdb/freezer_test.go @@ -268,12 +268,12 @@ func TestFreezerReadonlyValidate(t *testing.T) { t.Fatal("can't open freezer", err) } var item = make([]byte, 1024) - aBatch := f.tables["a"].newBatch() + aBatch := f.tables["a"].newBatch(0) require.NoError(t, aBatch.AppendRaw(0, item)) require.NoError(t, aBatch.AppendRaw(1, item)) require.NoError(t, aBatch.AppendRaw(2, item)) require.NoError(t, aBatch.commit()) - bBatch := f.tables["b"].newBatch() + bBatch := f.tables["b"].newBatch(0) require.NoError(t, bBatch.AppendRaw(0, item)) require.NoError(t, bBatch.commit()) if f.tables["a"].items != 3 {