diff --git a/state/chainsync.go b/state/chainsync.go index efff9f5..6a4ed88 100644 --- a/state/chainsync.go +++ b/state/chainsync.go @@ -110,6 +110,52 @@ func (ls *LedgerState) handleEventChainsyncBlockHeader(e ChainsyncEvent) error { } func (ls *LedgerState) handleEventBlockfetchBlock(e BlockfetchEvent) error { + ls.chainsyncBlockEvents = append( + ls.chainsyncBlockEvents, + e, + ) + return nil +} + +func (ls *LedgerState) processBlockEvents() error { + batchOffset := 0 + for { + batchSize := min( + 10, // Chosen to stay well under badger transaction size limit + len(ls.chainsyncBlockEvents)-batchOffset, + ) + if batchSize <= 0 { + break + } + // Start a transaction + txn := ls.db.Transaction(true) + err := txn.Do(func(txn *database.Txn) error { + for _, evt := range ls.chainsyncBlockEvents[batchOffset : batchOffset+batchSize] { + if err := ls.processBlockEvent(txn, evt); err != nil { + return err + } + } + return nil + }) + if err != nil { + return err + } + batchOffset += batchSize + } + ls.chainsyncBlockEvents = nil + ls.config.Logger.Info( + fmt.Sprintf( + "chain extended, new tip: %x at slot %d", + ls.currentTip.Point.Hash, + ls.currentTip.Point.Slot, + ), + "component", + "ledger", + ) + return nil +} + +func (ls *LedgerState) processBlockEvent(txn *database.Txn, e BlockfetchEvent) error { tmpBlock := models.Block{ Slot: e.Point.Slot, Hash: e.Point.Hash, @@ -119,77 +165,8 @@ func (ls *LedgerState) handleEventBlockfetchBlock(e BlockfetchEvent) error { Type: e.Type, Cbor: e.Block.Cbor(), } - // Start a transaction - txn := ls.db.Transaction(true) - err := txn.Do(func(txn *database.Txn) error { - // Special handling for genesis block - if ls.currentEpoch.ID == 0 { - // Check for era change - if uint(e.Block.Era().Id) != ls.currentEra.Id { - targetEraId := uint(e.Block.Era().Id) - // Transition through every era between the current and the target era - for nextEraId := ls.currentEra.Id + 1; nextEraId <= targetEraId; nextEraId++ { - if err := ls.transitionToEra(txn, nextEraId, ls.currentEpoch.EpochId, e.Point.Slot); err != nil { - return err - } - } - } - // Create initial epoch record - epochSlotLength, epochLength, err := ls.currentEra.EpochLengthFunc(ls.config.CardanoNodeConfig) - if err != nil { - return err - } - newEpoch := models.Epoch{ - EpochId: 0, - EraId: ls.currentEra.Id, - StartSlot: 0, - SlotLength: epochSlotLength, - LengthInSlots: epochLength, - } - if result := txn.Metadata().Create(&newEpoch); result.Error != nil { - return result.Error - } - ls.currentEpoch = newEpoch - ls.config.Logger.Debug( - "added initial epoch to DB", - "epoch", fmt.Sprintf("%+v", newEpoch), - "component", "ledger", - ) - } - // Check for epoch rollover - if e.Point.Slot > ls.currentEpoch.StartSlot+uint64( - ls.currentEpoch.LengthInSlots, - ) { - // Apply pending pparam updates - if err := ls.applyPParamUpdates(txn, ls.currentEpoch.EpochId, e.Point.Slot); err != nil { - return err - } - // Create next epoch record - epochSlotLength, epochLength, err := ls.currentEra.EpochLengthFunc(ls.config.CardanoNodeConfig) - if err != nil { - return err - } - newEpoch := models.Epoch{ - EpochId: ls.currentEpoch.EpochId + 1, - EraId: uint(e.Block.Era().Id), - SlotLength: epochSlotLength, - LengthInSlots: epochLength, - StartSlot: ls.currentEpoch.StartSlot + uint64( - ls.currentEpoch.LengthInSlots, - ), - } - if result := txn.Metadata().Create(&newEpoch); result.Error != nil { - return result.Error - } - ls.currentEpoch = newEpoch - ls.metrics.epochNum.Set(float64(newEpoch.EpochId)) - ls.config.Logger.Debug( - "added next epoch to DB", - "epoch", fmt.Sprintf("%+v", newEpoch), - "component", "ledger", - ) - } - // TODO: track this using protocol params and hard forks + // Special handling for genesis block + if ls.currentEpoch.ID == 0 { // Check for era change if uint(e.Block.Era().Id) != ls.currentEra.Id { targetEraId := uint(e.Block.Era().Id) @@ -200,57 +177,118 @@ func (ls *LedgerState) handleEventBlockfetchBlock(e BlockfetchEvent) error { } } } - // Add block to database - if err := ls.addBlock(txn, tmpBlock); err != nil { - return fmt.Errorf("add block: %w", err) + // Create initial epoch record + epochSlotLength, epochLength, err := ls.currentEra.EpochLengthFunc(ls.config.CardanoNodeConfig) + if err != nil { + return err } - // Process transactions - for _, tx := range e.Block.Transactions() { - // Process consumed UTxOs - for _, consumed := range tx.Consumed() { - if err := ls.consumeUtxo(txn, consumed, e.Point.Slot); err != nil { - return fmt.Errorf("remove consumed UTxO: %w", err) - } + newEpoch := models.Epoch{ + EpochId: 0, + EraId: ls.currentEra.Id, + StartSlot: 0, + SlotLength: epochSlotLength, + LengthInSlots: epochLength, + } + if result := txn.Metadata().Create(&newEpoch); result.Error != nil { + return result.Error + } + ls.currentEpoch = newEpoch + ls.config.Logger.Debug( + "added initial epoch to DB", + "epoch", fmt.Sprintf("%+v", newEpoch), + "component", "ledger", + ) + } + // Check for epoch rollover + if e.Point.Slot > ls.currentEpoch.StartSlot+uint64( + ls.currentEpoch.LengthInSlots, + ) { + // Apply pending pparam updates + if err := ls.applyPParamUpdates(txn, ls.currentEpoch.EpochId, e.Point.Slot); err != nil { + return err + } + // Create next epoch record + epochSlotLength, epochLength, err := ls.currentEra.EpochLengthFunc(ls.config.CardanoNodeConfig) + if err != nil { + return err + } + newEpoch := models.Epoch{ + EpochId: ls.currentEpoch.EpochId + 1, + EraId: uint(e.Block.Era().Id), + SlotLength: epochSlotLength, + LengthInSlots: epochLength, + StartSlot: ls.currentEpoch.StartSlot + uint64( + ls.currentEpoch.LengthInSlots, + ), + } + if result := txn.Metadata().Create(&newEpoch); result.Error != nil { + return result.Error + } + ls.currentEpoch = newEpoch + ls.metrics.epochNum.Set(float64(newEpoch.EpochId)) + ls.config.Logger.Debug( + "added next epoch to DB", + "epoch", fmt.Sprintf("%+v", newEpoch), + "component", "ledger", + ) + } + // TODO: track this using protocol params and hard forks + // Check for era change + if uint(e.Block.Era().Id) != ls.currentEra.Id { + targetEraId := uint(e.Block.Era().Id) + // Transition through every era between the current and the target era + for nextEraId := ls.currentEra.Id + 1; nextEraId <= targetEraId; nextEraId++ { + if err := ls.transitionToEra(txn, nextEraId, ls.currentEpoch.EpochId, e.Point.Slot); err != nil { + return err } - // Process produced UTxOs - for _, produced := range tx.Produced() { - outAddr := produced.Output.Address() - tmpUtxo := models.Utxo{ - TxId: produced.Id.Id().Bytes(), - OutputIdx: produced.Id.Index(), - AddedSlot: e.Point.Slot, - PaymentKey: outAddr.PaymentKeyHash().Bytes(), - StakingKey: outAddr.StakeKeyHash().Bytes(), - Cbor: produced.Output.Cbor(), - } - if err := ls.addUtxo(txn, tmpUtxo); err != nil { - return fmt.Errorf("add produced UTxO: %w", err) - } + } + } + // Add block to database + if err := ls.addBlock(txn, tmpBlock); err != nil { + return fmt.Errorf("add block: %w", err) + } + // Process transactions + for _, tx := range e.Block.Transactions() { + // Process consumed UTxOs + for _, consumed := range tx.Consumed() { + if err := ls.consumeUtxo(txn, consumed, e.Point.Slot); err != nil { + return fmt.Errorf("remove consumed UTxO: %w", err) } - // XXX: generate event for each TX/UTxO? - // Protocol parameter updates - if updateEpoch, paramUpdates := tx.ProtocolParameterUpdates(); updateEpoch > 0 { - for genesisHash, update := range paramUpdates { - tmpUpdate := models.PParamUpdate{ - AddedSlot: e.Point.Slot, - Epoch: updateEpoch, - GenesisHash: genesisHash.Bytes(), - Cbor: update.Cbor(), - } - if result := txn.Metadata().Create(&tmpUpdate); result.Error != nil { - return result.Error - } - } + } + // Process produced UTxOs + for _, produced := range tx.Produced() { + outAddr := produced.Output.Address() + tmpUtxo := models.Utxo{ + TxId: produced.Id.Id().Bytes(), + OutputIdx: produced.Id.Index(), + AddedSlot: e.Point.Slot, + PaymentKey: outAddr.PaymentKeyHash().Bytes(), + StakingKey: outAddr.StakeKeyHash().Bytes(), + Cbor: produced.Output.Cbor(), } - // Certificates - if err := ls.processTransactionCertificates(txn, e.Point, tx); err != nil { - return err + if err := ls.addUtxo(txn, tmpUtxo); err != nil { + return fmt.Errorf("add produced UTxO: %w", err) } } - return nil - }) - if err != nil { - return err + // XXX: generate event for each TX/UTxO? + // Protocol parameter updates + if updateEpoch, paramUpdates := tx.ProtocolParameterUpdates(); updateEpoch > 0 { + for genesisHash, update := range paramUpdates { + tmpUpdate := models.PParamUpdate{ + AddedSlot: e.Point.Slot, + Epoch: updateEpoch, + GenesisHash: genesisHash.Bytes(), + Cbor: update.Cbor(), + } + if result := txn.Metadata().Create(&tmpUpdate); result.Error != nil { + return result.Error + } + } + } + // Certificates + if err := ls.processTransactionCertificates(txn, e.Point, tx); err != nil { + return err + } } // Generate event ls.config.EventBus.Publish( @@ -263,19 +301,15 @@ func (ls *LedgerState) handleEventBlockfetchBlock(e BlockfetchEvent) error { }, ), ) - ls.config.Logger.Info( - fmt.Sprintf( - "chain extended, new tip: %s at slot %d", - e.Block.Hash(), - e.Block.SlotNumber(), - ), - "component", - "ledger", - ) return nil } func (ls *LedgerState) handleEventBlockfetchBatchDone(e BlockfetchEvent) error { + // Process pending block events + if err := ls.processBlockEvents(); err != nil { + return err + } + // Check for pending block range request if !ls.chainsyncBlockfetchWaiting { ls.chainsyncBlockfetchBusy = false return nil diff --git a/state/state.go b/state/state.go index 55cda95..f23509b 100644 --- a/state/state.go +++ b/state/state.go @@ -67,6 +67,7 @@ type LedgerState struct { currentTip ochainsync.Tip metrics stateMetrics chainsyncHeaderPoints []ocommon.Point + chainsyncBlockEvents []BlockfetchEvent chainsyncBlockfetchBusy bool chainsyncBlockfetchWaiting bool }