Skip to content

Commit

Permalink
feat: batch bulk block writes to database (#236)
Browse files Browse the repository at this point in the history
* delay process of blocks in each blockfetch batch until batch is done
* write cached blocks in batches of 10
  • Loading branch information
agaffney authored Nov 15, 2024
1 parent e58dae0 commit 09a0aea
Show file tree
Hide file tree
Showing 2 changed files with 160 additions and 125 deletions.
284 changes: 159 additions & 125 deletions state/chainsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,52 @@ func (ls *LedgerState) handleEventChainsyncBlockHeader(e ChainsyncEvent) error {
}

func (ls *LedgerState) handleEventBlockfetchBlock(e BlockfetchEvent) error {
ls.chainsyncBlockEvents = append(
ls.chainsyncBlockEvents,
e,
)
return nil
}

func (ls *LedgerState) processBlockEvents() error {
batchOffset := 0
for {
batchSize := min(
10, // Chosen to stay well under badger transaction size limit
len(ls.chainsyncBlockEvents)-batchOffset,
)
if batchSize <= 0 {
break
}
// Start a transaction
txn := ls.db.Transaction(true)
err := txn.Do(func(txn *database.Txn) error {
for _, evt := range ls.chainsyncBlockEvents[batchOffset : batchOffset+batchSize] {
if err := ls.processBlockEvent(txn, evt); err != nil {
return err
}
}
return nil
})
if err != nil {
return err
}
batchOffset += batchSize
}
ls.chainsyncBlockEvents = nil
ls.config.Logger.Info(
fmt.Sprintf(
"chain extended, new tip: %x at slot %d",
ls.currentTip.Point.Hash,
ls.currentTip.Point.Slot,
),
"component",
"ledger",
)
return nil
}

func (ls *LedgerState) processBlockEvent(txn *database.Txn, e BlockfetchEvent) error {
tmpBlock := models.Block{
Slot: e.Point.Slot,
Hash: e.Point.Hash,
Expand All @@ -119,77 +165,8 @@ func (ls *LedgerState) handleEventBlockfetchBlock(e BlockfetchEvent) error {
Type: e.Type,
Cbor: e.Block.Cbor(),
}
// Start a transaction
txn := ls.db.Transaction(true)
err := txn.Do(func(txn *database.Txn) error {
// Special handling for genesis block
if ls.currentEpoch.ID == 0 {
// Check for era change
if uint(e.Block.Era().Id) != ls.currentEra.Id {
targetEraId := uint(e.Block.Era().Id)
// Transition through every era between the current and the target era
for nextEraId := ls.currentEra.Id + 1; nextEraId <= targetEraId; nextEraId++ {
if err := ls.transitionToEra(txn, nextEraId, ls.currentEpoch.EpochId, e.Point.Slot); err != nil {
return err
}
}
}
// Create initial epoch record
epochSlotLength, epochLength, err := ls.currentEra.EpochLengthFunc(ls.config.CardanoNodeConfig)
if err != nil {
return err
}
newEpoch := models.Epoch{
EpochId: 0,
EraId: ls.currentEra.Id,
StartSlot: 0,
SlotLength: epochSlotLength,
LengthInSlots: epochLength,
}
if result := txn.Metadata().Create(&newEpoch); result.Error != nil {
return result.Error
}
ls.currentEpoch = newEpoch
ls.config.Logger.Debug(
"added initial epoch to DB",
"epoch", fmt.Sprintf("%+v", newEpoch),
"component", "ledger",
)
}
// Check for epoch rollover
if e.Point.Slot > ls.currentEpoch.StartSlot+uint64(
ls.currentEpoch.LengthInSlots,
) {
// Apply pending pparam updates
if err := ls.applyPParamUpdates(txn, ls.currentEpoch.EpochId, e.Point.Slot); err != nil {
return err
}
// Create next epoch record
epochSlotLength, epochLength, err := ls.currentEra.EpochLengthFunc(ls.config.CardanoNodeConfig)
if err != nil {
return err
}
newEpoch := models.Epoch{
EpochId: ls.currentEpoch.EpochId + 1,
EraId: uint(e.Block.Era().Id),
SlotLength: epochSlotLength,
LengthInSlots: epochLength,
StartSlot: ls.currentEpoch.StartSlot + uint64(
ls.currentEpoch.LengthInSlots,
),
}
if result := txn.Metadata().Create(&newEpoch); result.Error != nil {
return result.Error
}
ls.currentEpoch = newEpoch
ls.metrics.epochNum.Set(float64(newEpoch.EpochId))
ls.config.Logger.Debug(
"added next epoch to DB",
"epoch", fmt.Sprintf("%+v", newEpoch),
"component", "ledger",
)
}
// TODO: track this using protocol params and hard forks
// Special handling for genesis block
if ls.currentEpoch.ID == 0 {
// Check for era change
if uint(e.Block.Era().Id) != ls.currentEra.Id {
targetEraId := uint(e.Block.Era().Id)
Expand All @@ -200,57 +177,118 @@ func (ls *LedgerState) handleEventBlockfetchBlock(e BlockfetchEvent) error {
}
}
}
// Add block to database
if err := ls.addBlock(txn, tmpBlock); err != nil {
return fmt.Errorf("add block: %w", err)
// Create initial epoch record
epochSlotLength, epochLength, err := ls.currentEra.EpochLengthFunc(ls.config.CardanoNodeConfig)
if err != nil {
return err
}
// Process transactions
for _, tx := range e.Block.Transactions() {
// Process consumed UTxOs
for _, consumed := range tx.Consumed() {
if err := ls.consumeUtxo(txn, consumed, e.Point.Slot); err != nil {
return fmt.Errorf("remove consumed UTxO: %w", err)
}
newEpoch := models.Epoch{
EpochId: 0,
EraId: ls.currentEra.Id,
StartSlot: 0,
SlotLength: epochSlotLength,
LengthInSlots: epochLength,
}
if result := txn.Metadata().Create(&newEpoch); result.Error != nil {
return result.Error
}
ls.currentEpoch = newEpoch
ls.config.Logger.Debug(
"added initial epoch to DB",
"epoch", fmt.Sprintf("%+v", newEpoch),
"component", "ledger",
)
}
// Check for epoch rollover
if e.Point.Slot > ls.currentEpoch.StartSlot+uint64(
ls.currentEpoch.LengthInSlots,
) {
// Apply pending pparam updates
if err := ls.applyPParamUpdates(txn, ls.currentEpoch.EpochId, e.Point.Slot); err != nil {
return err
}
// Create next epoch record
epochSlotLength, epochLength, err := ls.currentEra.EpochLengthFunc(ls.config.CardanoNodeConfig)
if err != nil {
return err
}
newEpoch := models.Epoch{
EpochId: ls.currentEpoch.EpochId + 1,
EraId: uint(e.Block.Era().Id),
SlotLength: epochSlotLength,
LengthInSlots: epochLength,
StartSlot: ls.currentEpoch.StartSlot + uint64(
ls.currentEpoch.LengthInSlots,
),
}
if result := txn.Metadata().Create(&newEpoch); result.Error != nil {
return result.Error
}
ls.currentEpoch = newEpoch
ls.metrics.epochNum.Set(float64(newEpoch.EpochId))
ls.config.Logger.Debug(
"added next epoch to DB",
"epoch", fmt.Sprintf("%+v", newEpoch),
"component", "ledger",
)
}
// TODO: track this using protocol params and hard forks
// Check for era change
if uint(e.Block.Era().Id) != ls.currentEra.Id {
targetEraId := uint(e.Block.Era().Id)
// Transition through every era between the current and the target era
for nextEraId := ls.currentEra.Id + 1; nextEraId <= targetEraId; nextEraId++ {
if err := ls.transitionToEra(txn, nextEraId, ls.currentEpoch.EpochId, e.Point.Slot); err != nil {
return err
}
// Process produced UTxOs
for _, produced := range tx.Produced() {
outAddr := produced.Output.Address()
tmpUtxo := models.Utxo{
TxId: produced.Id.Id().Bytes(),
OutputIdx: produced.Id.Index(),
AddedSlot: e.Point.Slot,
PaymentKey: outAddr.PaymentKeyHash().Bytes(),
StakingKey: outAddr.StakeKeyHash().Bytes(),
Cbor: produced.Output.Cbor(),
}
if err := ls.addUtxo(txn, tmpUtxo); err != nil {
return fmt.Errorf("add produced UTxO: %w", err)
}
}
}
// Add block to database
if err := ls.addBlock(txn, tmpBlock); err != nil {
return fmt.Errorf("add block: %w", err)
}
// Process transactions
for _, tx := range e.Block.Transactions() {
// Process consumed UTxOs
for _, consumed := range tx.Consumed() {
if err := ls.consumeUtxo(txn, consumed, e.Point.Slot); err != nil {
return fmt.Errorf("remove consumed UTxO: %w", err)
}
// XXX: generate event for each TX/UTxO?
// Protocol parameter updates
if updateEpoch, paramUpdates := tx.ProtocolParameterUpdates(); updateEpoch > 0 {
for genesisHash, update := range paramUpdates {
tmpUpdate := models.PParamUpdate{
AddedSlot: e.Point.Slot,
Epoch: updateEpoch,
GenesisHash: genesisHash.Bytes(),
Cbor: update.Cbor(),
}
if result := txn.Metadata().Create(&tmpUpdate); result.Error != nil {
return result.Error
}
}
}
// Process produced UTxOs
for _, produced := range tx.Produced() {
outAddr := produced.Output.Address()
tmpUtxo := models.Utxo{
TxId: produced.Id.Id().Bytes(),
OutputIdx: produced.Id.Index(),
AddedSlot: e.Point.Slot,
PaymentKey: outAddr.PaymentKeyHash().Bytes(),
StakingKey: outAddr.StakeKeyHash().Bytes(),
Cbor: produced.Output.Cbor(),
}
// Certificates
if err := ls.processTransactionCertificates(txn, e.Point, tx); err != nil {
return err
if err := ls.addUtxo(txn, tmpUtxo); err != nil {
return fmt.Errorf("add produced UTxO: %w", err)
}
}
return nil
})
if err != nil {
return err
// XXX: generate event for each TX/UTxO?
// Protocol parameter updates
if updateEpoch, paramUpdates := tx.ProtocolParameterUpdates(); updateEpoch > 0 {
for genesisHash, update := range paramUpdates {
tmpUpdate := models.PParamUpdate{
AddedSlot: e.Point.Slot,
Epoch: updateEpoch,
GenesisHash: genesisHash.Bytes(),
Cbor: update.Cbor(),
}
if result := txn.Metadata().Create(&tmpUpdate); result.Error != nil {
return result.Error
}
}
}
// Certificates
if err := ls.processTransactionCertificates(txn, e.Point, tx); err != nil {
return err
}
}
// Generate event
ls.config.EventBus.Publish(
Expand All @@ -263,19 +301,15 @@ func (ls *LedgerState) handleEventBlockfetchBlock(e BlockfetchEvent) error {
},
),
)
ls.config.Logger.Info(
fmt.Sprintf(
"chain extended, new tip: %s at slot %d",
e.Block.Hash(),
e.Block.SlotNumber(),
),
"component",
"ledger",
)
return nil
}

func (ls *LedgerState) handleEventBlockfetchBatchDone(e BlockfetchEvent) error {
// Process pending block events
if err := ls.processBlockEvents(); err != nil {
return err
}
// Check for pending block range request
if !ls.chainsyncBlockfetchWaiting {
ls.chainsyncBlockfetchBusy = false
return nil
Expand Down
1 change: 1 addition & 0 deletions state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ type LedgerState struct {
currentTip ochainsync.Tip
metrics stateMetrics
chainsyncHeaderPoints []ocommon.Point
chainsyncBlockEvents []BlockfetchEvent
chainsyncBlockfetchBusy bool
chainsyncBlockfetchWaiting bool
}
Expand Down

0 comments on commit 09a0aea

Please sign in to comment.