Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improvements of streamsync to deploy on mainnet #4493

Merged
merged 11 commits into from
Sep 29, 2023
Merged
7 changes: 2 additions & 5 deletions api/service/stagedstreamsync/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@ const (
// no more request will be assigned to workers to wait for InsertChain to finish.
SoftQueueCap int = 100

// DefaultConcurrency is the default settings for concurrency
DefaultConcurrency int = 4

// ShortRangeTimeout is the timeout for each short range sync, which allow short range sync
// to restart automatically when stuck in `getBlockHashes`
ShortRangeTimeout time.Duration = 1 * time.Minute
Expand Down Expand Up @@ -74,10 +71,10 @@ type (

func (c *Config) fixValues() {
if c.Concurrency == 0 {
c.Concurrency = DefaultConcurrency
c.Concurrency = c.MinStreams
}
if c.Concurrency > c.MinStreams {
c.MinStreams = c.Concurrency
c.Concurrency = c.MinStreams
}
if c.MinStreams > c.InitStreams {
c.InitStreams = c.MinStreams
Expand Down
21 changes: 17 additions & 4 deletions api/service/stagedstreamsync/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,17 @@ func (d *Downloader) SubscribeDownloadFinished(ch chan struct{}) event.Subscript
// waitForBootFinish waits for stream manager to finish the initial discovery and have
// enough peers to start downloader
func (d *Downloader) waitForBootFinish() {
	bootCompleted, numStreams := d.waitForEnoughStreams(d.config.InitStreams)
	if bootCompleted {
		// Use the structured logger rather than fmt.Printf so boot progress
		// is reported consistently with the rest of the downloader's logging.
		d.logger.Info().
			Interface("shardID", d.bc.ShardID()).
			Int("numStreams", numStreams).
			Msg("boot completed; enough streams are connected")
	}
}

func (d *Downloader) waitForEnoughStreams(requiredStreams int) (bool, int) {
d.logger.Info().Int("requiredStreams", requiredStreams).
Msg("waiting for enough stream connections to continue syncing")

evtCh := make(chan streammanager.EvtStreamAdded, 1)
sub := d.syncProtocol.SubscribeAddStreamEvent(evtCh)
defer sub.Unsubscribe()
Expand All @@ -177,12 +188,11 @@ func (d *Downloader) waitForBootFinish() {
trigger()

case <-checkCh:
if d.syncProtocol.NumStreams() >= d.config.InitStreams {
fmt.Printf("boot completed for shard %d ( %d streams are connected )\n", d.bc.ShardID(), d.syncProtocol.NumStreams())
return
if d.syncProtocol.NumStreams() >= requiredStreams {
return true, d.syncProtocol.NumStreams()
}
case <-d.closeC:
return
return false, d.syncProtocol.NumStreams()
}
}
}
Expand Down Expand Up @@ -212,6 +222,9 @@ func (d *Downloader) loop() {
case <-d.downloadC:
bnBeforeSync := d.bc.CurrentBlock().NumberU64()
estimatedHeight, addedBN, err := d.stagedSyncInstance.doSync(d.ctx, initSync)
if err == ErrNotEnoughStreams {
d.waitForEnoughStreams(d.config.MinStreams)
}
if err != nil {
//TODO: if there is a bad block which can't be resolved
if d.stagedSyncInstance.invalidBlock.Active {
Expand Down
2 changes: 1 addition & 1 deletion api/service/stagedstreamsync/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ var (
ErrUnexpectedNumberOfBlockHashes = WrapStagedSyncError("unexpected number of getBlocksByHashes result")
ErrUnexpectedBlockHashes = WrapStagedSyncError("unexpected get block hashes result delivered")
ErrNilBlock = WrapStagedSyncError("nil block found")
ErrNotEnoughStreams = WrapStagedSyncError("not enough streams")
ErrNotEnoughStreams = WrapStagedSyncError("number of streams smaller than minimum required")
ErrParseCommitSigAndBitmapFail = WrapStagedSyncError("parse commitSigAndBitmap failed")
ErrVerifyHeaderFail = WrapStagedSyncError("verify header failed")
ErrInsertChainFail = WrapStagedSyncError("insert to chain failed")
Expand Down
5 changes: 5 additions & 0 deletions api/service/stagedstreamsync/short_range_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/ethereum/go-ethereum/common"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/internal/utils"
syncProto "github.com/harmony-one/harmony/p2p/stream/protocols/sync"
sttypes "github.com/harmony-one/harmony/p2p/stream/types"
"github.com/pkg/errors"
Expand Down Expand Up @@ -132,6 +133,10 @@ func (sh *srHelper) getBlocksByHashes(ctx context.Context, hashes []common.Hash,

func (sh *srHelper) checkPrerequisites() error {
if sh.syncProtocol.NumStreams() < sh.config.Concurrency {
utils.Logger().Info().
Int("available streams", sh.syncProtocol.NumStreams()).
Interface("concurrency", sh.config.Concurrency).
Msg("not enough streams to do concurrent processes")
return ErrNotEnoughStreams
}
return nil
Expand Down
13 changes: 11 additions & 2 deletions api/service/stagedstreamsync/stage_bodies.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,13 +167,22 @@ func (b *StageBodies) runBlockWorkerLoop(ctx context.Context, gbm *blockDownload
Msg(WrapStagedSyncMsg("downloadRawBlocks failed"))
err = errors.Wrap(err, "request error")
gbm.HandleRequestError(batch, err, stid)
} else if blockBytes == nil || len(blockBytes) == 0 {
} else if blockBytes == nil {
utils.Logger().Warn().
Str("stream", string(stid)).
Interface("block numbers", batch).
Msg(WrapStagedSyncMsg("downloadRawBlocks failed, received empty blockBytes"))
Msg(WrapStagedSyncMsg("downloadRawBlocks failed, received invalid (nil) blockBytes"))
err := errors.New("downloadRawBlocks received invalid (nil) blockBytes")
gbm.HandleRequestError(batch, err, stid)
b.configs.protocol.StreamFailed(stid, "downloadRawBlocks failed")
} else if len(blockBytes) == 0 {
utils.Logger().Warn().
Str("stream", string(stid)).
Interface("block numbers", batch).
Msg(WrapStagedSyncMsg("downloadRawBlocks failed, received empty blockBytes, remote peer is not fully synced"))
sophoah marked this conversation as resolved.
Show resolved Hide resolved
err := errors.New("downloadRawBlocks received empty blockBytes")
gbm.HandleRequestError(batch, err, stid)
b.configs.protocol.RemoveStream(stid)
} else {
if err = b.saveBlocks(ctx, gbm.tx, batch, blockBytes, sigBytes, loopID, stid); err != nil {
panic(ErrSaveBlocksToDbFailed)
Expand Down
7 changes: 6 additions & 1 deletion api/service/stagedstreamsync/stage_epoch.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,12 @@ func (sr *StageEpoch) doShortRangeSyncForEpochSync(ctx context.Context, s *Stage
}

if err := sh.checkPrerequisites(); err != nil {
return 0, errors.Wrap(err, "prerequisite")
// if error is ErrNotEnoughStreams but still some streams available,
// it can continue syncing, otherwise return error
// here we are not doing concurrent processes, so even 1 stream should be enough
if err != ErrNotEnoughStreams || s.state.protocol.NumStreams() == 0 {
return 0, errors.Wrap(err, "prerequisite")
}
}
curBN := s.state.bc.CurrentBlock().NumberU64()
bns := make([]uint64, 0, BlocksPerRequest)
Expand Down
7 changes: 6 additions & 1 deletion api/service/stagedstreamsync/stage_short_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,12 @@ func (sr *StageShortRange) doShortRangeSync(ctx context.Context, s *StageState)
}

if err := sh.checkPrerequisites(); err != nil {
return 0, errors.Wrap(err, "prerequisite")
// if error is ErrNotEnoughStreams but still two streams available,
// it can continue syncing, otherwise return error
// at least 2 streams are needed to do concurrent processes
if err != ErrNotEnoughStreams || s.state.protocol.NumStreams() < 2 {
return 0, errors.Wrap(err, "prerequisite")
}
}
curBN := sr.configs.bc.CurrentBlock().NumberU64()
blkNums := sh.prepareBlockHashNumbers(curBN)
Expand Down
3 changes: 2 additions & 1 deletion api/service/stagedstreamsync/staged_stream_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,8 +337,9 @@ func (s *StagedStreamSync) promLabels() prometheus.Labels {
func (s *StagedStreamSync) checkHaveEnoughStreams() error {
numStreams := s.protocol.NumStreams()
if numStreams < s.config.MinStreams {
return fmt.Errorf("number of streams smaller than minimum: %v < %v",
s.logger.Debug().Msgf("number of streams smaller than minimum: %v < %v",
numStreams, s.config.MinStreams)
return ErrNotEnoughStreams
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/harmony/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ var (
Downloader: true,
StagedSync: false,
StagedSyncCfg: defaultStagedSyncConfig,
Concurrency: 4,
Concurrency: 2,
MinPeers: 2,
InitStreams: 2,
MaxAdvertiseWaitTime: 2, //minutes
Expand Down
3 changes: 2 additions & 1 deletion p2p/stream/common/requestmanager/interface_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"strconv"
"sync"
"time"

"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/rlp"
Expand Down Expand Up @@ -118,7 +119,7 @@ func (st *testStream) FailedTimes() int {
return 0
}

func (st *testStream) AddFailedTimes() {
func (st *testStream) AddFailedTimes(faultRecoveryThreshold time.Duration) {
sophoah marked this conversation as resolved.
Show resolved Hide resolved
return
}

Expand Down
3 changes: 2 additions & 1 deletion p2p/stream/common/streammanager/interface_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"strconv"
"sync"
"sync/atomic"
"time"

sttypes "github.com/harmony-one/harmony/p2p/stream/types"
"github.com/libp2p/go-libp2p/core/network"
Expand Down Expand Up @@ -74,7 +75,7 @@ func (st *testStream) FailedTimes() int {
return 0
}

func (st *testStream) AddFailedTimes() {
func (st *testStream) AddFailedTimes(faultRecoveryThreshold time.Duration) {
return
}

Expand Down
6 changes: 3 additions & 3 deletions p2p/stream/protocols/sync/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,17 +168,17 @@ func (ch *chainHelperImpl) getNodeData(hs []common.Hash) ([][]byte, error) {

// getReceipts assembles the response to a receipt query: one types.Receipts
// entry per requested hash, positionally aligned with hs.
func (ch *chainHelperImpl) getReceipts(hs []common.Hash) ([]types.Receipts, error) {
	receipts := make([]types.Receipts, len(hs))
	for i, hash := range hs {
		// Retrieve the requested block's receipts
		results := ch.chain.GetReceiptsByHash(hash)
		if results == nil {
			// No stored receipts: skip unknown blocks or blocks whose receipt
			// root indicates receipts should exist elsewhere.
			// NOTE(review): this returns an error exactly when the header is
			// known AND its receipt root is empty (a receipt-less block) —
			// confirm this inversion of the usual geth logic is intended.
			if header := ch.chain.GetHeaderByHash(hash); header == nil || header.ReceiptHash() != types.EmptyRootHash {
				continue
			}
			return nil, errors.New("invalid hashes to get receipts")
		}
		receipts[i] = results
	}
	return receipts, nil
}
17 changes: 16 additions & 1 deletion p2p/stream/protocols/sync/chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (tch *testChainHelper) getNodeData(hs []common.Hash) ([][]byte, error) {

func (tch *testChainHelper) getReceipts(hs []common.Hash) ([]types.Receipts, error) {
testReceipts := makeTestReceipts(len(hs), 3)
receipts := make([]types.Receipts, len(hs)*3)
receipts := make([]types.Receipts, len(hs))
for i, _ := range hs {
receipts[i] = testReceipts
}
Expand Down Expand Up @@ -200,3 +200,18 @@ func checkBlocksByHashesResult(b []byte, hs []common.Hash) error {
}
return nil
}

// checkGetReceiptsResult unmarshals b into a sync message, extracts its
// GetReceiptsResponse, and verifies it carries exactly one receipts entry
// per requested hash in hs.
func checkGetReceiptsResult(b []byte, hs []common.Hash) error {
	var msg = &syncpb.Message{}
	if err := protobuf.Unmarshal(b, msg); err != nil {
		return err
	}
	// Renamed from bhResp: this is a receipts response, not a block-hashes
	// response (name was copy-pasted from checkBlocksByHashesResult).
	rcResp, err := msg.GetReceiptsResponse()
	if err != nil {
		return err
	}
	if len(hs) != len(rcResp.Receipts) {
		return errors.New("unexpected size")
	}
	return nil
}
6 changes: 5 additions & 1 deletion p2p/stream/protocols/sync/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@ const (
GetReceiptsCap = 10

// MaxStreamFailures is the maximum allowed failures before stream gets removed
MaxStreamFailures = 3
MaxStreamFailures = 5

// FaultRecoveryThreshold is the minimum duration before it resets the previous failures
// So, if stream hasn't had any issue for a certain amount of time since last failure, we can still trust it
FaultRecoveryThreshold = 30 * time.Minute

// minAdvertiseInterval is the minimum advertise interval
minAdvertiseInterval = 1 * time.Minute
Expand Down
16 changes: 16 additions & 0 deletions p2p/stream/protocols/sync/message/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,19 @@ func (msg *Message) GetBlocksByHashesResponse() (*GetBlocksByHashesResponse, err
}
return gbResp, nil
}

// GetReceiptsResponse parse the message to GetReceiptsResponse
func (msg *Message) GetReceiptsResponse() (*GetReceiptsResponse, error) {
resp := msg.GetResp()
if resp == nil {
return nil, errors.New("not response message")
}
if errResp := resp.GetErrorResponse(); errResp != nil {
return nil, &ResponseError{errResp.Error}
}
grResp := resp.GetGetReceiptsResponse()
if grResp == nil {
return nil, errors.New("not GetGetReceiptsResponse")
}
return grResp, nil
}
5 changes: 4 additions & 1 deletion p2p/stream/protocols/sync/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,13 +272,16 @@ func (p *Protocol) RemoveStream(stID sttypes.StreamID) {
st.Close()
// stream manager removes this stream from the list and triggers discovery if number of streams are not enough
p.sm.RemoveStream(stID) //TODO: double check to see if this part is needed
p.logger.Info().
Str("stream ID", string(stID)).
Msg("stream removed")
}
}

func (p *Protocol) StreamFailed(stID sttypes.StreamID, reason string) {
st, exist := p.sm.GetStreamByID(stID)
if exist && st != nil {
st.AddFailedTimes()
st.AddFailedTimes(FaultRecoveryThreshold)
p.logger.Info().
Str("stream ID", string(st.ID())).
Int("num failures", st.FailedTimes()).
Expand Down
31 changes: 31 additions & 0 deletions p2p/stream/protocols/sync/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,16 @@ var (
}
testGetBlocksByHashesRequest = syncpb.MakeGetBlocksByHashesRequest(testGetBlockByHashes)
testGetBlocksByHashesRequestMsg = syncpb.MakeMessageFromRequest(testGetBlocksByHashesRequest)

testGetReceipts = []common.Hash{
numberToHash(1),
numberToHash(2),
numberToHash(3),
numberToHash(4),
numberToHash(5),
}
testGetReceiptsRequest = syncpb.MakeGetReceiptsRequest(testGetReceipts)
testGetReceiptsRequestMsg = syncpb.MakeMessageFromRequest(testGetReceiptsRequest)
)

func TestSyncStream_HandleGetBlocksByRequest(t *testing.T) {
Expand Down Expand Up @@ -126,6 +136,27 @@ func TestSyncStream_HandleGetBlocksByHashes(t *testing.T) {
}
}

// TestSyncStream_HandleGetReceipts sends a GetReceipts request over a test
// stream pair and verifies the response matches the requested hashes.
func TestSyncStream_HandleGetReceipts(t *testing.T) {
	st, remoteSt := makeTestSyncStream()

	go st.run()
	defer close(st.closeC)

	req := testGetReceiptsRequestMsg
	b, _ := protobuf.Marshal(req)
	err := remoteSt.WriteBytes(b)
	if err != nil {
		t.Fatal(err)
	}

	time.Sleep(200 * time.Millisecond)
	receivedBytes, err := remoteSt.ReadBytes()
	if err != nil {
		t.Fatal(err)
	}

	// Bug fix: verify against the hashes actually requested (testGetReceipts),
	// not testGetBlockByHashes, which belongs to a different test.
	if err := checkGetReceiptsResult(receivedBytes, testGetReceipts); err != nil {
		t.Fatal(err)
	}
}

func makeTestSyncStream() (*syncStream, *testRemoteBaseStream) {
localRaw, remoteRaw := makePairP2PStreams()
remote := newTestRemoteBaseStream(remoteRaw)
Expand Down
15 changes: 12 additions & 3 deletions p2p/stream/types/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/binary"
"io"
"sync"
"time"

libp2p_network "github.com/libp2p/go-libp2p/core/network"
"github.com/pkg/errors"
Expand All @@ -22,7 +23,7 @@ type Stream interface {
Close() error
CloseOnExit() error
FailedTimes() int
AddFailedTimes()
AddFailedTimes(faultRecoveryThreshold time.Duration)
ResetFailedTimes()
}

Expand All @@ -38,7 +39,8 @@ type BaseStream struct {
specErr error
specOnce sync.Once

failedTimes int
failedTimes int
lastFailureTime time.Time
}

// NewBaseStream creates BaseStream as the wrapper of libp2p Stream
Expand Down Expand Up @@ -82,8 +84,15 @@ func (st *BaseStream) FailedTimes() int {
return st.failedTimes
}

func (st *BaseStream) AddFailedTimes() {
func (st *BaseStream) AddFailedTimes(faultRecoveryThreshold time.Duration) {
if st.failedTimes > 0 {
durationSinceLastFailure := time.Now().Sub(st.lastFailureTime)
if durationSinceLastFailure >= faultRecoveryThreshold {
st.ResetFailedTimes()
}
}
st.failedTimes++
st.lastFailureTime = time.Now()
}

func (st *BaseStream) ResetFailedTimes() {
Expand Down