Skip to content

Commit

Permalink
EVM-831 Implement state sync relayer without blocktracker (0xPolygon#…
Browse files Browse the repository at this point in the history
  • Loading branch information
igorcrevar authored Oct 17, 2023
1 parent 7433edf commit 4efe1d7
Show file tree
Hide file tree
Showing 23 changed files with 799 additions and 505 deletions.
26 changes: 10 additions & 16 deletions command/server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,8 @@ type Config struct {
JSONLogFormat bool `json:"json_log_format" yaml:"json_log_format"`
CorsAllowedOrigins []string `json:"cors_allowed_origins" yaml:"cors_allowed_origins"`

Relayer bool `json:"relayer" yaml:"relayer"`
NumBlockConfirmations uint64 `json:"num_block_confirmations" yaml:"num_block_confirmations"`
RelayerTrackerPollInterval time.Duration `json:"relayer_tracker_poll_interval" yaml:"relayer_tracker_poll_interval"`
Relayer bool `json:"relayer" yaml:"relayer"`
NumBlockConfirmations uint64 `json:"num_block_confirmations" yaml:"num_block_confirmations"`

ConcurrentRequestsDebug uint64 `json:"concurrent_requests_debug" yaml:"concurrent_requests_debug"`
WebSocketReadLimit uint64 `json:"web_socket_read_limit" yaml:"web_socket_read_limit"`
Expand Down Expand Up @@ -94,10 +93,6 @@ const (
// the connection sends a close message to the peer and returns ErrReadLimit to the application.
DefaultWebSocketReadLimit uint64 = 8192

// DefaultRelayerTrackerPollInterval specifies time interval after which relayer node's event tracker
// polls child chain to get the latest block
DefaultRelayerTrackerPollInterval time.Duration = time.Second

// DefaultMetricsInterval specifies the time interval after which Prometheus metrics will be generated.
// A value of 0 means the metrics are disabled.
DefaultMetricsInterval time.Duration = time.Second * 8
Expand Down Expand Up @@ -132,15 +127,14 @@ func DefaultConfig() *Config {
Headers: &Headers{
AccessControlAllowOrigins: []string{"*"},
},
LogFilePath: "",
JSONRPCBatchRequestLimit: DefaultJSONRPCBatchRequestLimit,
JSONRPCBlockRangeLimit: DefaultJSONRPCBlockRangeLimit,
Relayer: false,
NumBlockConfirmations: DefaultNumBlockConfirmations,
ConcurrentRequestsDebug: DefaultConcurrentRequestsDebug,
WebSocketReadLimit: DefaultWebSocketReadLimit,
RelayerTrackerPollInterval: DefaultRelayerTrackerPollInterval,
MetricsInterval: DefaultMetricsInterval,
LogFilePath: "",
JSONRPCBatchRequestLimit: DefaultJSONRPCBatchRequestLimit,
JSONRPCBlockRangeLimit: DefaultJSONRPCBlockRangeLimit,
Relayer: false,
NumBlockConfirmations: DefaultNumBlockConfirmations,
ConcurrentRequestsDebug: DefaultConcurrentRequestsDebug,
WebSocketReadLimit: DefaultWebSocketReadLimit,
MetricsInterval: DefaultMetricsInterval,
}
}

Expand Down
4 changes: 0 additions & 4 deletions command/server/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,6 @@ func (p *serverParams) initRawParams() error {

p.relayer = p.rawConfig.Relayer

if p.relayer && p.rawConfig.RelayerTrackerPollInterval == 0 {
return helper.ErrBlockTrackerPollInterval
}

return p.initAddresses()
}

Expand Down
9 changes: 3 additions & 6 deletions command/server/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ const (
concurrentRequestsDebugFlag = "concurrent-requests-debug"
webSocketReadLimitFlag = "websocket-read-limit"

relayerTrackerPollIntervalFlag = "relayer-poll-interval"

metricsIntervalFlag = "metrics-interval"
)

Expand Down Expand Up @@ -187,9 +185,8 @@ func (p *serverParams) generateConfig() *server.Config {
JSONLogFormat: p.rawConfig.JSONLogFormat,
LogFilePath: p.logFileLocation,

Relayer: p.relayer,
NumBlockConfirmations: p.rawConfig.NumBlockConfirmations,
RelayerTrackerPollInterval: p.rawConfig.RelayerTrackerPollInterval,
MetricsInterval: p.rawConfig.MetricsInterval,
Relayer: p.relayer,
NumBlockConfirmations: p.rawConfig.NumBlockConfirmations,
MetricsInterval: p.rawConfig.MetricsInterval,
}
}
7 changes: 0 additions & 7 deletions command/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,13 +235,6 @@ func setFlags(cmd *cobra.Command) {
"maximum size in bytes for a message read from the peer by websocket",
)

cmd.Flags().DurationVar(
&params.rawConfig.RelayerTrackerPollInterval,
relayerTrackerPollIntervalFlag,
defaultConfig.RelayerTrackerPollInterval,
"interval (number of seconds) at which relayer's tracker polls for latest block at childchain",
)

cmd.Flags().DurationVar(
&params.rawConfig.MetricsInterval,
metricsIntervalFlag,
Expand Down
6 changes: 6 additions & 0 deletions consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ type Config struct {

// Path is the directory path for the consensus protocol to store information
Path string

// IsRelayer is true if node is relayer
IsRelayer bool

// RPCEndpoint
RPCEndpoint string
}

type Params struct {
Expand Down
42 changes: 42 additions & 0 deletions consensus/polybft/consensus_runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"sync/atomic"
"time"

"github.com/0xPolygon/polygon-edge/consensus"
"github.com/0xPolygon/polygon-edge/consensus/polybft/contractsapi"
bls "github.com/0xPolygon/polygon-edge/consensus/polybft/signer"
"github.com/0xPolygon/polygon-edge/consensus/polybft/validator"
Expand Down Expand Up @@ -82,6 +83,7 @@ type runtimeConfig struct {
txPool txPoolInterface
bridgeTopic topic
numBlockConfirmations uint64
consensusConfig *consensus.Config
}

// consensusRuntime is a struct that provides consensus runtime features like epoch, state and event management
Expand Down Expand Up @@ -121,6 +123,9 @@ type consensusRuntime struct {

eventProvider *EventProvider

// stateSyncRelayer is relayer for commitment events
stateSyncRelayer StateSyncRelayer

// logger instance
logger hcf.Logger
}
Expand Down Expand Up @@ -160,6 +165,10 @@ func newConsensusRuntime(log hcf.Logger, config *runtimeConfig) (*consensusRunti
return nil, err
}

if err := runtime.initStateSyncRelayer(log); err != nil {
return nil, err
}

// we need to call restart epoch on runtime to initialize epoch state
runtime.epoch, err = runtime.restartEpoch(runtime.lastBuiltBlock, dbTx)
if err != nil {
Expand All @@ -175,6 +184,7 @@ func newConsensusRuntime(log hcf.Logger, config *runtimeConfig) (*consensusRunti

// close is used to tear down allocated resources
func (c *consensusRuntime) close() {
c.stateSyncRelayer.Close()
c.stateSyncManager.Close()
}

Expand Down Expand Up @@ -238,6 +248,33 @@ func (c *consensusRuntime) initCheckpointManager(logger hcf.Logger) error {
return nil
}

// initStateSyncRelayer initializes state sync relayer
// if not enabled, then a dummy state sync relayer will be used
func (c *consensusRuntime) initStateSyncRelayer(logger hcf.Logger) error {
if c.config.consensusConfig.IsRelayer {
txRelayer, err := getStateSyncTxRelayer(c.config.consensusConfig.RPCEndpoint, logger)
if err != nil {
return err
}

c.stateSyncRelayer = NewStateSyncRelayer(
txRelayer,
contracts.StateReceiverContract,
c.state.StateSyncStore,
c,
c.config.blockchain,
wallet.NewEcdsaSigner(c.config.Key),
nil,
logger.Named("state_sync_relayer"))
} else {
c.stateSyncRelayer = &dummyStateSyncRelayer{}
}

c.eventProvider.Subscribe(c.stateSyncRelayer)

return c.stateSyncRelayer.Init()
}

// initStakeManager initializes stake manager
func (c *consensusRuntime) initStakeManager(logger hcf.Logger, dbTx *bolt.Tx) error {
rootRelayer, err := txrelayer.NewTxRelayer(txrelayer.WithIPAddress(c.config.PolyBFTConfig.Bridge.JSONRPCEndpoint))
Expand Down Expand Up @@ -369,6 +406,11 @@ func (c *consensusRuntime) OnBlockInserted(fullBlock *types.FullBlock) {
return
}

// handle state sync relayer events that happened in block
if err := c.stateSyncRelayer.PostBlock(postBlock); err != nil {
c.logger.Error("post block callback failed in state sync relayer", "err", err)
}

if isEndOfEpoch {
if epoch, err = c.restartEpoch(fullBlock.Block.Header, dbTx); err != nil {
c.logger.Error("failed to restart epoch after block inserted", "error", err)
Expand Down
16 changes: 9 additions & 7 deletions consensus/polybft/consensus_runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ func TestConsensusRuntime_OnBlockInserted_EndOfEpoch(t *testing.T) {
checkpointManager: &dummyCheckpointManager{},
stakeManager: &dummyStakeManager{},
eventProvider: NewEventProvider(blockchainMock),
stateSyncRelayer: &dummyStateSyncRelayer{},
}
runtime.OnBlockInserted(&types.FullBlock{Block: builtBlock})

Expand Down Expand Up @@ -474,13 +475,14 @@ func Test_NewConsensusRuntime(t *testing.T) {

tmpDir := t.TempDir()
config := &runtimeConfig{
polybftBackend: polybftBackendMock,
State: newTestState(t),
PolyBFTConfig: polyBftConfig,
DataDir: tmpDir,
Key: createTestKey(t),
blockchain: blockchainMock,
bridgeTopic: &mockTopic{},
polybftBackend: polybftBackendMock,
State: newTestState(t),
PolyBFTConfig: polyBftConfig,
DataDir: tmpDir,
Key: createTestKey(t),
blockchain: blockchainMock,
bridgeTopic: &mockTopic{},
consensusConfig: &consensus.Config{},
}

require.NoError(t, config.State.StakeStore.insertFullValidatorSet(validatorSetState{
Expand Down
1 change: 1 addition & 0 deletions consensus/polybft/contractsapi/bindings-gen/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func main() {
[]string{
"commit",
"execute",
"batchExecute",
},
[]string{
"StateSyncResult",
Expand Down
17 changes: 17 additions & 0 deletions consensus/polybft/contractsapi/contractsapi.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions consensus/polybft/polybft.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,7 @@ func (p *Polybft) initRuntime() error {
txPool: p.txPool,
bridgeTopic: p.bridgeTopic,
numBlockConfirmations: p.config.NumBlockConfirmations,
consensusConfig: p.config.Config,
}

runtime, err := newConsensusRuntime(p.logger, runtimeConfig)
Expand Down
5 changes: 4 additions & 1 deletion consensus/polybft/polybft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,10 @@ func TestPolybft_Close(t *testing.T) {
polybft := Polybft{
closeCh: make(chan struct{}),
syncer: syncer,
runtime: &consensusRuntime{stateSyncManager: &dummyStateSyncManager{}},
runtime: &consensusRuntime{
stateSyncManager: &dummyStateSyncManager{},
stateSyncRelayer: &dummyStateSyncRelayer{},
},
}

assert.NoError(t, polybft.Close())
Expand Down
75 changes: 75 additions & 0 deletions consensus/polybft/state_store_state_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ var (
stateSyncProofsBucket = []byte("stateSyncProofs")
// bucket to store message votes (signatures)
messageVotesBucket = []byte("votes")
// bucket to store all state sync relayer events
stateSyncRelayerEventsBucket = []byte("relayerEvents")

// errNotEnoughStateSyncs error message
errNotEnoughStateSyncs = errors.New("there is either a gap or not enough sync events")
Expand All @@ -39,6 +41,9 @@ commitments/
stateSyncProofs/
|--> stateSyncProof.StateSync.Id -> *StateSyncProof (json marshalled)
relayerEvents/
|--> StateSyncRelayerEventData.EventID -> *StateSyncRelayerEventData (json marshalled)
*/

type StateSyncStore struct {
Expand All @@ -59,6 +64,10 @@ func (s *StateSyncStore) initialize(tx *bolt.Tx) error {
return fmt.Errorf("failed to create bucket=%s: %w", string(stateSyncProofsBucket), err)
}

if _, err := tx.CreateBucketIfNotExists(stateSyncRelayerEventsBucket); err != nil {
return fmt.Errorf("failed to create bucket=%s: %w", string(stateSyncRelayerEventsBucket), err)
}

return nil
}

Expand Down Expand Up @@ -365,3 +374,69 @@ func (s *StateSyncStore) getStateSyncProof(stateSyncID uint64) (*StateSyncProof,

return ssp, err
}

// updateStateSyncRelayerEvents updates/remove desired events
func (s *StateSyncStore) updateStateSyncRelayerEvents(
events []*StateSyncRelayerEventData, removeIDs []uint64, dbTx *bolt.Tx) error {
updateFn := func(tx *bolt.Tx) error {
relayerEventsBucket := tx.Bucket(stateSyncRelayerEventsBucket)

for _, evnt := range events {
raw, err := json.Marshal(evnt)
if err != nil {
return err
}

key := common.EncodeUint64ToBytes(evnt.EventID)

if err := relayerEventsBucket.Put(key, raw); err != nil {
return err
}
}

for _, stateSyncEventID := range removeIDs {
stateSyncEventIDKey := common.EncodeUint64ToBytes(stateSyncEventID)

if err := relayerEventsBucket.Delete(stateSyncEventIDKey); err != nil {
return fmt.Errorf("failed to remove state sync relayer event (ID=%d): %w", stateSyncEventID, err)
}
}

return nil
}

if dbTx == nil {
return s.db.Update(func(tx *bolt.Tx) error {
return updateFn(tx)
})
}

return updateFn(dbTx)
}

// getAllAvailableEvents retrieves all StateSyncRelayerEventData that should be sent as a transactions
func (s *StateSyncStore) getAllAvailableEvents(limit int) (result []*StateSyncRelayerEventData, err error) {
if err = s.db.View(func(tx *bolt.Tx) error {
cursor := tx.Bucket(stateSyncRelayerEventsBucket).Cursor()

for k, v := cursor.First(); k != nil; k, v = cursor.Next() {
var event *StateSyncRelayerEventData

if err := json.Unmarshal(v, &event); err != nil {
return err
}

result = append(result, event)

if limit > 0 && len(result) >= limit {
break
}
}

return nil
}); err != nil {
return nil, err
}

return result, nil
}
Loading

0 comments on commit 4efe1d7

Please sign in to comment.