Skip to content

Commit

Permalink
Move block/header exchange to block package (cosmos#1224)
Browse files Browse the repository at this point in the history
<!--
Please read and fill out this form before submitting your PR.

Please make sure you have reviewed our contributors guide before
submitting your
first PR.
-->

## Overview
Closes: cosmos#1225 

<!-- 
Please provide an explanation of the PR, including the appropriate
context,
background, goal, and rationale. If there is an issue with this
information,
please provide a tl;dr and link the issue. 
-->

## Checklist

<!-- 
Please complete the checklist to ensure that the PR is ready to be
reviewed.

IMPORTANT:
PRs should be left in Draft until the below checklist is completed.
-->

- [x] New and updated code has appropriate documentation
- [ ] New and updated code has new and/or updated testing
- [x] Required CI checks are passing
- [ ] Visual proof for any user facing features like CLI or
documentation updates
- [x] Linked issues closed with keywords
  • Loading branch information
Manav-Aggarwal authored Oct 3, 2023
1 parent 70603f1 commit e3f2b40
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 18 deletions.
9 changes: 7 additions & 2 deletions node/block_exchange.go → block/block_exchange.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package node
package block

import (
"context"
Expand Down Expand Up @@ -72,6 +72,11 @@ func NewBlockExchangeService(ctx context.Context, store ds.TxnDatastore, conf co
}, nil
}

// BlockStore returns the blockstore of the BlockExchangeService
func (bExService *BlockExchangeService) BlockStore() *goheaderstore.Store[*types.Block] {
return bExService.blockStore
}

func (bExService *BlockExchangeService) initBlockStoreAndStartSyncer(ctx context.Context, initial *types.Block) error {
if initial == nil {
return fmt.Errorf("failed to initialize the blockstore and start syncer")
Expand All @@ -87,7 +92,7 @@ func (bExService *BlockExchangeService) initBlockStoreAndStartSyncer(ctx context

// Initialize block store if needed and broadcasts provided block.
// Note: Only returns an error in case block store can't be initialized. Logs error if there's one while broadcasting.
func (bExService *BlockExchangeService) writeToBlockStoreAndBroadcast(ctx context.Context, block *types.Block) error {
func (bExService *BlockExchangeService) WriteToBlockStoreAndBroadcast(ctx context.Context, block *types.Block) error {
// For genesis block initialize the store and start the syncer
if int64(block.Height()) == bExService.genesis.InitialHeight {
if err := bExService.blockStore.Init(ctx, block); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion node/exchange.go → block/exchange.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package node
package block

import (
"sync"
Expand Down
9 changes: 7 additions & 2 deletions node/header_exchange.go → block/header_exchange.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package node
package block

import (
"context"
Expand Down Expand Up @@ -72,6 +72,11 @@ func NewHeaderExchangeService(ctx context.Context, store ds.TxnDatastore, conf c
}, nil
}

// HeaderStore returns the headerstore of the HeaderExchangeService
func (hExService *HeaderExchangeService) HeaderStore() *goheaderstore.Store[*types.SignedHeader] {
return hExService.headerStore
}

func (hExService *HeaderExchangeService) initHeaderStoreAndStartSyncer(ctx context.Context, initial *types.SignedHeader) error {
if initial == nil {
return fmt.Errorf("failed to initialize the headerstore and start syncer")
Expand All @@ -87,7 +92,7 @@ func (hExService *HeaderExchangeService) initHeaderStoreAndStartSyncer(ctx conte

// Initialize header store if needed and broadcasts provided header.
// Note: Only returns an error in case header store can't be initialized. Logs error if there's one while broadcasting.
func (hExService *HeaderExchangeService) writeToHeaderStoreAndBroadcast(ctx context.Context, signedHeader *types.SignedHeader) error {
func (hExService *HeaderExchangeService) WriteToHeaderStoreAndBroadcast(ctx context.Context, signedHeader *types.SignedHeader) error {
// For genesis header initialize the store and start the syncer
if int64(signedHeader.Height()) == hExService.genesis.InitialHeight {
if err := hExService.headerStore.Init(ctx, signedHeader); err != nil {
Expand Down
14 changes: 7 additions & 7 deletions node/full.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ type FullNode struct {
BlockIndexer indexer.BlockIndexer
IndexerService *txindex.IndexerService

hExService *HeaderExchangeService
bExService *BlockExchangeService
hExService *block.HeaderExchangeService
bExService *block.BlockExchangeService

// keep context here only because of API compatibility
// - it's used in `OnStart` (defined in service.Service interface)
Expand Down Expand Up @@ -151,18 +151,18 @@ func newFullNode(
mpIDs := newMempoolIDs()
mp.EnableTxsAvailable()

headerExchangeService, err := NewHeaderExchangeService(ctx, mainKV, conf, genesis, client, logger.With("module", "HeaderExchangeService"))
headerExchangeService, err := block.NewHeaderExchangeService(ctx, mainKV, conf, genesis, client, logger.With("module", "HeaderExchangeService"))
if err != nil {
return nil, fmt.Errorf("HeaderExchangeService initialization error: %w", err)
}

blockExchangeService, err := NewBlockExchangeService(ctx, mainKV, conf, genesis, client, logger.With("module", "BlockExchangeService"))
blockExchangeService, err := block.NewBlockExchangeService(ctx, mainKV, conf, genesis, client, logger.With("module", "BlockExchangeService"))
if err != nil {
return nil, fmt.Errorf("BlockExchangeService initialization error: %w", err)
}

doneBuildingChannel := make(chan struct{})
blockManager, err := block.NewManager(signingKey, conf.BlockManagerConfig, genesis, s, mp, proxyApp.Consensus(), dalc, eventBus, logger.With("module", "BlockManager"), doneBuildingChannel, blockExchangeService.blockStore)
blockManager, err := block.NewManager(signingKey, conf.BlockManagerConfig, genesis, s, mp, proxyApp.Consensus(), dalc, eventBus, logger.With("module", "BlockManager"), doneBuildingChannel, blockExchangeService.BlockStore())
if err != nil {
return nil, fmt.Errorf("BlockManager initialization error: %w", err)
}
Expand Down Expand Up @@ -231,7 +231,7 @@ func (n *FullNode) headerPublishLoop(ctx context.Context) {
for {
select {
case signedHeader := <-n.blockManager.HeaderCh:
err := n.hExService.writeToHeaderStoreAndBroadcast(ctx, signedHeader)
err := n.hExService.WriteToHeaderStoreAndBroadcast(ctx, signedHeader)
if err != nil {
// failed to init or start headerstore
n.Logger.Error(err.Error())
Expand All @@ -247,7 +247,7 @@ func (n *FullNode) blockPublishLoop(ctx context.Context) {
for {
select {
case block := <-n.blockManager.BlockCh:
err := n.bExService.writeToBlockStoreAndBroadcast(ctx, block)
err := n.bExService.WriteToBlockStoreAndBroadcast(ctx, block)
if err != nil {
// failed to init or start blockstore
n.Logger.Error(err.Error())
Expand Down
2 changes: 1 addition & 1 deletion node/full_node_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ func testSingleAggregatorSingleFullNodeTrustedHash(t *testing.T, source Source)
require.NoError(waitForFirstBlock(node1, source))

// Get the trusted hash from node1 and pass it to node2 config
trustedHash, err := node1.hExService.headerStore.GetByHeight(aggCtx, 1)
trustedHash, err := node1.hExService.HeaderStore().GetByHeight(aggCtx, 1)
require.NoError(err)
node2.conf.TrustedHash = trustedHash.Hash().String()
require.NoError(node2.Start())
Expand Down
5 changes: 3 additions & 2 deletions node/light.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/libp2p/go-libp2p/core/crypto"
"go.uber.org/multierr"

"github.com/rollkit/rollkit/block"
"github.com/rollkit/rollkit/config"
"github.com/rollkit/rollkit/p2p"
"github.com/rollkit/rollkit/store"
Expand All @@ -27,7 +28,7 @@ type LightNode struct {

proxyApp proxy.AppConns

hExService *HeaderExchangeService
hExService *block.HeaderExchangeService

ctx context.Context
cancel context.CancelFunc
Expand Down Expand Up @@ -61,7 +62,7 @@ func newLightNode(
return nil, err
}

headerExchangeService, err := NewHeaderExchangeService(ctx, datastore, conf, genesis, client, logger.With("module", "HeaderExchangeService"))
headerExchangeService, err := block.NewHeaderExchangeService(ctx, datastore, conf, genesis, client, logger.With("module", "HeaderExchangeService"))
if err != nil {
return nil, fmt.Errorf("HeaderExchangeService initialization error: %w", err)
}
Expand Down
6 changes: 3 additions & 3 deletions node/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,17 +65,17 @@ func getNodeHeight(node Node, source Source) (uint64, error) {

func getNodeHeightFromHeader(node Node) (uint64, error) {
if fn, ok := node.(*FullNode); ok {
return fn.hExService.headerStore.Height(), nil
return fn.hExService.HeaderStore().Height(), nil
}
if ln, ok := node.(*LightNode); ok {
return ln.hExService.headerStore.Height(), nil
return ln.hExService.HeaderStore().Height(), nil
}
return 0, errors.New("not a full or light node")
}

func getNodeHeightFromBlock(node Node) (uint64, error) {
if fn, ok := node.(*FullNode); ok {
return fn.bExService.blockStore.Height(), nil
return fn.bExService.BlockStore().Height(), nil
}
return 0, errors.New("not a full node")
}
Expand Down

0 comments on commit e3f2b40

Please sign in to comment.