From 55bf6e515e29522e5a8770d2583743bcc6276bc3 Mon Sep 17 00:00:00 2001 From: simlecode <69969590+simlecode@users.noreply.github.com> Date: Tue, 22 Aug 2023 10:52:58 +0800 Subject: [PATCH] feat: api: add SyncIncomingBlocks --- app/submodule/syncer/syncer_api.go | 4 ++ pkg/chainsync/chainsync.go | 1 + pkg/chainsync/dispatcher/dispatcher.go | 42 +++++++++++++ venus-shared/api/chain/v0/method.md | 63 +++++++++++++++++++ .../api/chain/v0/mock/mock_fullnode.go | 15 +++++ venus-shared/api/chain/v0/proxy_gen.go | 4 ++ venus-shared/api/chain/v0/syncer.go | 3 + venus-shared/api/chain/v1/method.md | 63 +++++++++++++++++++ .../api/chain/v1/mock/mock_fullnode.go | 15 +++++ venus-shared/api/chain/v1/proxy_gen.go | 4 ++ venus-shared/api/chain/v1/syncer.go | 3 + venus-shared/compatible-checks/api-diff.txt | 2 - 12 files changed, 217 insertions(+), 2 deletions(-) diff --git a/app/submodule/syncer/syncer_api.go b/app/submodule/syncer/syncer_api.go index 559ea48785..b23209345f 100644 --- a/app/submodule/syncer/syncer_api.go +++ b/app/submodule/syncer/syncer_api.go @@ -207,3 +207,7 @@ func (sa *syncerAPI) SyncState(ctx context.Context) (*types.SyncState, error) { return syncState, nil } + +func (sa *syncerAPI) SyncIncomingBlocks(ctx context.Context) (<-chan *types.BlockHeader, error) { + return sa.syncer.ChainSyncManager.BlockProposer().IncomingBlocks(ctx) +} diff --git a/pkg/chainsync/chainsync.go b/pkg/chainsync/chainsync.go index 4c4c17c050..3d55f30e00 100644 --- a/pkg/chainsync/chainsync.go +++ b/pkg/chainsync/chainsync.go @@ -25,6 +25,7 @@ type BlockProposer interface { SendHello(ci *types2.ChainInfo) error SendOwnBlock(ci *types2.ChainInfo) error SendGossipBlock(ci *types2.ChainInfo) error + IncomingBlocks(ctx context.Context) (<-chan *types2.BlockHeader, error) } var _ = (BlockProposer)((*dispatcher.Dispatcher)(nil)) diff --git a/pkg/chainsync/dispatcher/dispatcher.go b/pkg/chainsync/dispatcher/dispatcher.go index 1776f61e92..b1eb34ef76 100644 --- a/pkg/chainsync/dispatcher/dispatcher.go +++ b/pkg/chainsync/dispatcher/dispatcher.go @@ -8,6 +8,7 @@ import ( atmoic2 "sync/atomic" "time" + "github.com/filecoin-project/pubsub" "github.com/filecoin-project/venus/pkg/chainsync/types" types2 "github.com/filecoin-project/venus/venus-shared/types" "github.com/streadway/handy/atomic" @@ -23,6 +24,8 @@ const DefaultInQueueSize = 5 // DefaultWorkQueueSize is the bucketSize of the work queue const DefaultWorkQueueSize = 15 +const LocalIncoming = "incoming" + // dispatchSyncer is the interface of the logic syncing incoming chains type dispatchSyncer interface { Head() *types2.TipSet @@ -44,6 +47,7 @@ func NewDispatcherWithSizes(syncer dispatchSyncer, workQueueSize, inQueueSize in registeredCb: func(t *types.Target, err error) {}, cancelControler: list.New(), maxCount: 1, + incomingPubsub: pubsub.New(50), } } @@ -84,6 +88,8 @@ type Dispatcher struct { lk sync.Mutex conCurrent atomic.Int maxCount int64 + + incomingPubsub *pubsub.PubSub } // SyncTracker returns the target tracker of syncing @@ -107,6 +113,7 @@ func (d *Dispatcher) SendGossipBlock(ci *types2.ChainInfo) error { } func (d *Dispatcher) addTracker(ci *types2.ChainInfo) error { + d.incomingPubsub.Pub(ci.Head.Blocks(), LocalIncoming) d.incoming <- &types.Target{ ChainInfo: *ci, Base: d.syncer.Head(), @@ -176,6 +183,41 @@ func (d *Dispatcher) Concurrent() int64 { return d.maxCount } +func (d *Dispatcher) IncomingBlocks(ctx context.Context) (<-chan *types2.BlockHeader, error) { + sub := d.incomingPubsub.Sub(LocalIncoming) + out := make(chan *types2.BlockHeader, 32) + + go func() { + defer func() { + close(out) + + d.incomingPubsub.Unsub(sub) + }() + + for { + select { + case val, ok := <-sub: + if !ok { + return + } + for _, blk := range val.([]*types2.BlockHeader) { + select { + case out <- blk: + case <-ctx.Done(): + return + default: + log.Infof("incoming blocks subscription due to slow reader") + } + } + case <-ctx.Done(): + return + } + } + }() + + return out, nil +} + func (d *Dispatcher) selectTarget(lastTarget *types.Target, ch <-chan struct{}) (*types.Target, bool) { exitFor: for { // we are purpose to consume all notifies in channel diff --git a/venus-shared/api/chain/v0/method.md b/venus-shared/api/chain/v0/method.md index 4aba6edc9b..ca3d53e165 100644 --- a/venus-shared/api/chain/v0/method.md +++ b/venus-shared/api/chain/v0/method.md @@ -178,6 +178,7 @@ curl http://:/rpc/v0 -X POST -H "Content-Type: application/json" -H " * [ChainTipSetWeight](#chaintipsetweight) * [Concurrent](#concurrent) * [SetConcurrent](#setconcurrent) + * [SyncIncomingBlocks](#syncincomingblocks) * [SyncState](#syncstate) * [SyncSubmitBlock](#syncsubmitblock) * [SyncerTracker](#syncertracker) @@ -5633,6 +5634,68 @@ Inputs: Response: `{}` +### SyncIncomingBlocks +SyncIncomingBlocks returns a channel streaming incoming, potentially not +yet synced block headers. + + +Perms: read + +Inputs: `[]` + +Response: +```json +{ + "Miner": "f01234", + "Ticket": { + "VRFProof": "Bw==" + }, + "ElectionProof": { + "WinCount": 9, + "VRFProof": "Bw==" + }, + "BeaconEntries": [ + { + "Round": 42, + "Data": "Ynl0ZSBhcnJheQ==" + } + ], + "WinPoStProof": [ + { + "PoStProof": 8, + "ProofBytes": "Ynl0ZSBhcnJheQ==" + } + ], + "Parents": [ + { + "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" + } + ], + "ParentWeight": "0", + "Height": 10101, + "ParentStateRoot": { + "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" + }, + "ParentMessageReceipts": { + "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" + }, + "Messages": { + "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" + }, + "BLSAggregate": { + "Type": 2, + "Data": "Ynl0ZSBhcnJheQ==" + }, + "Timestamp": 42, + "BlockSig": { + "Type": 2, + "Data": "Ynl0ZSBhcnJheQ==" + }, + "ForkSignaling": 42, + "ParentBaseFee": "0" +} +``` + ### SyncState diff --git a/venus-shared/api/chain/v0/mock/mock_fullnode.go b/venus-shared/api/chain/v0/mock/mock_fullnode.go index 7d150eb7d7..cd5fe6076c 100644 --- a/venus-shared/api/chain/v0/mock/mock_fullnode.go +++ b/venus-shared/api/chain/v0/mock/mock_fullnode.go @@ -2433,6 +2433,21 @@ func (mr *MockFullNodeMockRecorder) StateWaitMsgLimited(arg0, arg1, arg2, arg3 i return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateWaitMsgLimited", reflect.TypeOf((*MockFullNode)(nil).StateWaitMsgLimited), arg0, arg1, arg2, arg3) } +// SyncIncomingBlocks mocks base method. +func (m *MockFullNode) SyncIncomingBlocks(arg0 context.Context) (<-chan *types0.BlockHeader, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SyncIncomingBlocks", arg0) + ret0, _ := ret[0].(<-chan *types0.BlockHeader) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// SyncIncomingBlocks indicates an expected call of SyncIncomingBlocks. +func (mr *MockFullNodeMockRecorder) SyncIncomingBlocks(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SyncIncomingBlocks", reflect.TypeOf((*MockFullNode)(nil).SyncIncomingBlocks), arg0) +} + // SyncState mocks base method. func (m *MockFullNode) SyncState(arg0 context.Context) (*types0.SyncState, error) { m.ctrl.T.Helper() diff --git a/venus-shared/api/chain/v0/proxy_gen.go b/venus-shared/api/chain/v0/proxy_gen.go index ff50b889cb..f295109c4d 100644 --- a/venus-shared/api/chain/v0/proxy_gen.go +++ b/venus-shared/api/chain/v0/proxy_gen.go @@ -713,6 +713,7 @@ type ISyncerStruct struct { ChainTipSetWeight func(ctx context.Context, tsk types.TipSetKey) (big.Int, error) `perm:"read"` Concurrent func(ctx context.Context) int64 `perm:"read"` SetConcurrent func(ctx context.Context, concurrent int64) error `perm:"admin"` + SyncIncomingBlocks func(ctx context.Context) (<-chan *types.BlockHeader, error) `perm:"read"` SyncState func(ctx context.Context) (*types.SyncState, error) `perm:"read"` SyncSubmitBlock func(ctx context.Context, blk *types.BlockMsg) error `perm:"write"` SyncerTracker func(ctx context.Context) *types.TargetTracker `perm:"read"` @@ -729,6 +730,9 @@ func (s *ISyncerStruct) Concurrent(p0 context.Context) int64 { return s.Internal func (s *ISyncerStruct) SetConcurrent(p0 context.Context, p1 int64) error { return s.Internal.SetConcurrent(p0, p1) } +func (s *ISyncerStruct) SyncIncomingBlocks(p0 context.Context) (<-chan *types.BlockHeader, error) { + return s.Internal.SyncIncomingBlocks(p0) +} func (s *ISyncerStruct) SyncState(p0 context.Context) (*types.SyncState, error) { return s.Internal.SyncState(p0) } diff --git a/venus-shared/api/chain/v0/syncer.go b/venus-shared/api/chain/v0/syncer.go index beaa649387..d3a97ee14a 100644 --- a/venus-shared/api/chain/v0/syncer.go +++ b/venus-shared/api/chain/v0/syncer.go @@ -16,4 +16,7 @@ type ISyncer interface { ChainTipSetWeight(ctx context.Context, tsk types.TipSetKey) (big.Int, error) //perm:read SyncSubmitBlock(ctx context.Context, blk *types.BlockMsg) error //perm:write SyncState(ctx context.Context) (*types.SyncState, error) //perm:read + // SyncIncomingBlocks returns a channel streaming incoming, potentially not + // yet synced block headers. + SyncIncomingBlocks(ctx context.Context) (<-chan *types.BlockHeader, error) //perm:read } diff --git a/venus-shared/api/chain/v1/method.md b/venus-shared/api/chain/v1/method.md index 24b4e2ced6..cf9997f5e0 100644 --- a/venus-shared/api/chain/v1/method.md +++ b/venus-shared/api/chain/v1/method.md @@ -225,6 +225,7 @@ curl http://:/rpc/v1 -X POST -H "Content-Type: application/json" -H " * [ChainTipSetWeight](#chaintipsetweight) * [Concurrent](#concurrent) * [SetConcurrent](#setconcurrent) + * [SyncIncomingBlocks](#syncincomingblocks) * [SyncState](#syncstate) * [SyncSubmitBlock](#syncsubmitblock) * [SyncerTracker](#syncertracker) @@ -6768,6 +6769,68 @@ Inputs: Response: `{}` +### SyncIncomingBlocks +SyncIncomingBlocks returns a channel streaming incoming, potentially not +yet synced block headers. + + +Perms: read + +Inputs: `[]` + +Response: +```json +{ + "Miner": "f01234", + "Ticket": { + "VRFProof": "Bw==" + }, + "ElectionProof": { + "WinCount": 9, + "VRFProof": "Bw==" + }, + "BeaconEntries": [ + { + "Round": 42, + "Data": "Ynl0ZSBhcnJheQ==" + } + ], + "WinPoStProof": [ + { + "PoStProof": 8, + "ProofBytes": "Ynl0ZSBhcnJheQ==" + } + ], + "Parents": [ + { + "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" + } + ], + "ParentWeight": "0", + "Height": 10101, + "ParentStateRoot": { + "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" + }, + "ParentMessageReceipts": { + "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" + }, + "Messages": { + "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" + }, + "BLSAggregate": { + "Type": 2, + "Data": "Ynl0ZSBhcnJheQ==" + }, + "Timestamp": 42, + "BlockSig": { + "Type": 2, + "Data": "Ynl0ZSBhcnJheQ==" + }, + "ForkSignaling": 42, + "ParentBaseFee": "0" +} +``` + ### SyncState diff --git a/venus-shared/api/chain/v1/mock/mock_fullnode.go b/venus-shared/api/chain/v1/mock/mock_fullnode.go index d379f37f2a..c06ece4a3a 100644 --- a/venus-shared/api/chain/v1/mock/mock_fullnode.go +++ b/venus-shared/api/chain/v1/mock/mock_fullnode.go @@ -3110,6 +3110,21 @@ func (mr *MockFullNodeMockRecorder) StateWaitMsg(arg0, arg1, arg2, arg3, arg4 in return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateWaitMsg", reflect.TypeOf((*MockFullNode)(nil).StateWaitMsg), arg0, arg1, arg2, arg3, arg4) } +// SyncIncomingBlocks mocks base method. +func (m *MockFullNode) SyncIncomingBlocks(arg0 context.Context) (<-chan *types0.BlockHeader, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SyncIncomingBlocks", arg0) + ret0, _ := ret[0].(<-chan *types0.BlockHeader) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// SyncIncomingBlocks indicates an expected call of SyncIncomingBlocks. +func (mr *MockFullNodeMockRecorder) SyncIncomingBlocks(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SyncIncomingBlocks", reflect.TypeOf((*MockFullNode)(nil).SyncIncomingBlocks), arg0) +} + // SyncState mocks base method. func (m *MockFullNode) SyncState(arg0 context.Context) (*types0.SyncState, error) { m.ctrl.T.Helper() diff --git a/venus-shared/api/chain/v1/proxy_gen.go b/venus-shared/api/chain/v1/proxy_gen.go index 2b306b3f27..0da34f2a59 100644 --- a/venus-shared/api/chain/v1/proxy_gen.go +++ b/venus-shared/api/chain/v1/proxy_gen.go @@ -728,6 +728,7 @@ type ISyncerStruct struct { ChainTipSetWeight func(ctx context.Context, tsk types.TipSetKey) (big.Int, error) `perm:"read"` Concurrent func(ctx context.Context) int64 `perm:"read"` SetConcurrent func(ctx context.Context, concurrent int64) error `perm:"admin"` + SyncIncomingBlocks func(ctx context.Context) (<-chan *types.BlockHeader, error) `perm:"read"` SyncState func(ctx context.Context) (*types.SyncState, error) `perm:"read"` SyncSubmitBlock func(ctx context.Context, blk *types.BlockMsg) error `perm:"write"` SyncerTracker func(ctx context.Context) *types.TargetTracker `perm:"read"` @@ -744,6 +745,9 @@ func (s *ISyncerStruct) Concurrent(p0 context.Context) int64 { return s.Internal func (s *ISyncerStruct) SetConcurrent(p0 context.Context, p1 int64) error { return s.Internal.SetConcurrent(p0, p1) } +func (s *ISyncerStruct) SyncIncomingBlocks(p0 context.Context) (<-chan *types.BlockHeader, error) { + return s.Internal.SyncIncomingBlocks(p0) +} func (s *ISyncerStruct) SyncState(p0 context.Context) (*types.SyncState, error) { return s.Internal.SyncState(p0) } diff --git a/venus-shared/api/chain/v1/syncer.go b/venus-shared/api/chain/v1/syncer.go index fe2b71c916..35a8014c2b 100644 --- a/venus-shared/api/chain/v1/syncer.go +++ b/venus-shared/api/chain/v1/syncer.go @@ -16,4 +16,7 @@ type ISyncer interface { ChainTipSetWeight(ctx context.Context, tsk types.TipSetKey) (big.Int, error) //perm:read SyncSubmitBlock(ctx context.Context, blk *types.BlockMsg) error //perm:write SyncState(ctx context.Context) (*types.SyncState, error) //perm:read + // SyncIncomingBlocks returns a channel streaming incoming, potentially not + // yet synced block headers. + SyncIncomingBlocks(ctx context.Context) (<-chan *types.BlockHeader, error) //perm:read } diff --git a/venus-shared/compatible-checks/api-diff.txt b/venus-shared/compatible-checks/api-diff.txt index 4543ef504f..e9ba8a6b49 100644 --- a/venus-shared/compatible-checks/api-diff.txt +++ b/venus-shared/compatible-checks/api-diff.txt @@ -99,7 +99,6 @@ github.com/filecoin-project/venus/venus-shared/api/chain/v0.FullNode <> github.c > StateReplay {[func(context.Context, types.TipSetKey, cid.Cid) (*types.InvocResult, error) <> func(context.Context, types.TipSetKey, cid.Cid) (*api.InvocResult, error)] base=func out type: #0 input; nested={[*types.InvocResult <> *api.InvocResult] base=pointed type; nested={[types.InvocResult <> api.InvocResult] base=struct field; nested={[types.InvocResult <> api.InvocResult] base=exported field type: #4 field named ExecutionTrace; nested={[types.ExecutionTrace <> types.ExecutionTrace] base=struct field; nested={[types.ExecutionTrace <> types.ExecutionTrace] base=exported fields count: 6 != 4; nested=nil}}}}}} - SyncCheckBad - SyncCheckpoint - - SyncIncomingBlocks - SyncMarkBad - SyncUnmarkAllBad - SyncUnmarkBad @@ -233,7 +232,6 @@ github.com/filecoin-project/venus/venus-shared/api/chain/v1.FullNode <> github.c > StateReplay {[func(context.Context, types.TipSetKey, cid.Cid) (*types.InvocResult, error) <> func(context.Context, types.TipSetKey, cid.Cid) (*api.InvocResult, error)] base=func out type: #0 input; nested={[*types.InvocResult <> *api.InvocResult] base=pointed type; nested={[types.InvocResult <> api.InvocResult] base=struct field; nested={[types.InvocResult <> api.InvocResult] base=exported field type: #4 field named ExecutionTrace; nested={[types.ExecutionTrace <> types.ExecutionTrace] base=struct field; nested={[types.ExecutionTrace <> types.ExecutionTrace] base=exported fields count: 6 != 4; nested=nil}}}}}} - SyncCheckBad - SyncCheckpoint - - SyncIncomingBlocks - SyncMarkBad - SyncUnmarkAllBad - SyncUnmarkBad