From 41d7ecd0ac36e0152c5b8cde776673df0f483201 Mon Sep 17 00:00:00 2001 From: Jeff Wendling Date: Wed, 20 Mar 2024 15:12:34 -0400 Subject: [PATCH] private/{ee,}stream: restart on quiescent downloads this change causes downloads to restart if no progress is made on the piece tracker for 5 consecutive seconds. todo in a later change: - testing for quiescent state - e2e testing in failure situation Change-Id: I35684ee610a5c785640aca5d4ef4e6d6a5ee7814 --- private/eestream/bundy.go | 10 ++++++ private/eestream/common.go | 4 +++ private/eestream/stripe.go | 62 ++++++++++++++++++++++++++++++++++++-- private/stream/download.go | 25 +++++++++++++-- 4 files changed, 97 insertions(+), 4 deletions(-) diff --git a/private/eestream/bundy.go b/private/eestream/bundy.go index 01441d53..e80e668d 100644 --- a/private/eestream/bundy.go +++ b/private/eestream/bundy.go @@ -53,6 +53,16 @@ func NewPiecesProgress(minimum, total int32) *PiecesProgress { } } +// ProgressSnapshot returns a snapshot of the current progress. No locks are held +// so it doesn't represent a single point in time in the presence of concurrent +// mutations. +func (y *PiecesProgress) ProgressSnapshot(out []int32) []int32 { + for i := range y.pieceSharesReceived { + out = append(out, y.pieceSharesReceived[i].Load()) + } + return out +} + // SetStripesNeeded tells PiecesProgress what neededShares stripe is needed next. func (y *PiecesProgress) SetStripesNeeded(required int32) { y.stripesNeeded.Store(required) diff --git a/private/eestream/common.go b/private/eestream/common.go index 87f51718..cd0c6ac0 100644 --- a/private/eestream/common.go +++ b/private/eestream/common.go @@ -12,5 +12,9 @@ var ( // Error is the default eestream errs class. Error = errs.Class("eestream") + // QuiescentError is the class of errors returned when a stream is quiescent + // and should be restarted. + QuiescentError = errs.Class("quiescence") + mon = monkit.Package() ) diff --git a/private/eestream/stripe.go b/private/eestream/stripe.go index ad68185e..b28041f1 100644 --- a/private/eestream/stripe.go +++ b/private/eestream/stripe.go @@ -12,16 +12,22 @@ import ( "strings" "sync" "sync/atomic" + "time" "github.com/spacemonkeygo/monkit/v3" + "golang.org/x/exp/slices" "storj.io/common/rpc/rpctracing" "storj.io/common/sync2" "storj.io/infectious" ) -const debugEnabled = false -const maxStripesAhead = 256 // might be interesting to test different values later +const ( + debugEnabled = false + maxStripesAhead = 256 // might be interesting to test different values later + quiescentCheckInterval = time.Second + quiescentIntervalTrigger = 5 // number of quiescent check intervals before triggering +) // pieceReader represents the stream of shares within one piece. type pieceReader struct { @@ -47,6 +53,7 @@ type StripeReader struct { totalStripes int32 errorDetection bool runningPieces atomic.Int32 + quiescent atomic.Bool } // NewStripeReader makes a new StripeReader using the provided map of share @@ -92,11 +99,16 @@ func (s *StripeReader) start() { if debugEnabled { fmt.Println("starting", len(s.pieces), "readers") } + + var pwg sync.WaitGroup s.runningPieces.Store(int32(len(s.pieces))) + for idx := range s.pieces { s.wg.Add(1) + pwg.Add(1) go func(idx int) { defer s.wg.Done() + defer pwg.Done() // whenever a share reader is done, we should wake up the core in case // this share reader just exited unsuccessfully and this represents a @@ -110,6 +122,47 @@ func (s *StripeReader) start() { s.readShares(idx) }(idx) } + + done := make(chan struct{}) + go func() { + pwg.Wait() + close(done) + }() + + s.wg.Add(1) + go func() { + defer s.wg.Done() + + s1 := s.bundy.ProgressSnapshot(nil) + var s2 []int32 + + t := time.NewTicker(quiescentCheckInterval) + defer t.Stop() + + match := 0 + for { + select { + case <-t.C: + s2 = s.bundy.ProgressSnapshot(s2[:0]) + + if !slices.Equal(s1, s2) { + match = 0 + s2, s1 = s1, s2 + continue + } + + match++ + if match == quiescentIntervalTrigger { + s.quiescent.Store(true) + s.stripeReady.Signal() + return + } + + case <-done: + return + } + } + }() } // readShares is the method that does the actual work of reading an individual @@ -265,6 +318,11 @@ func (s *StripeReader) ReadStripes(ctx context.Context, nextStripe int64, out [] ready := make([]int, 0, len(s.pieces)) for { + // check if we were woken from quiescence. if so, error out. + if s.quiescent.Load() { + return nil, 0, QuiescentError.New("") + } + // okay let's tell the bundy clock we're awake and it should be okay to // wake us up again next time we sleep. s.bundy.AcknowledgeNewStripes() diff --git a/private/stream/download.go b/private/stream/download.go index e400ac77..ff15f55e 100644 --- a/private/stream/download.go +++ b/private/stream/download.go @@ -14,12 +14,13 @@ import ( "storj.io/common/storj" "storj.io/eventkit" "storj.io/picobuf" + "storj.io/uplink/private/eestream" "storj.io/uplink/private/metaclient" "storj.io/uplink/private/storage/streams" ) const ( - maxDecryptionRetries = 6 + maxDownloadRetries = 6 ) var ( @@ -37,6 +38,7 @@ type Download struct { closed bool decryptionRetries int + quiescenceRetries int } // NewDownload creates new stream download. @@ -96,6 +98,7 @@ func (download *Download) Read(data []byte) (n int, err error) { } else if encryption.ErrDecryptFailed.Has(err) { evs.Event("decryption-failure", eventkit.Int64("decryption-retries", int64(download.decryptionRetries)), + eventkit.Int64("quiescence-retries", int64(download.quiescenceRetries)), eventkit.Int64("offset", download.offset), eventkit.Int64("length", download.length), eventkit.Bytes("path-checksum", pathChecksum(download.info.EncPath)), @@ -103,7 +106,7 @@ func (download *Download) Read(data []byte) (n int, err error) { eventkit.Bytes("stream-id", maybeSatStreamID(download.info.Object.Stream.ID)), ) - if download.decryptionRetries < maxDecryptionRetries { + if download.decryptionRetries+download.quiescenceRetries < maxDownloadRetries { download.decryptionRetries++ // force us to get new a new collection of limits. @@ -111,6 +114,24 @@ func (download *Download) Read(data []byte) (n int, err error) { err = download.resetReader(true) } + } else if eestream.QuiescentError.Has(err) { + evs.Event("quiescence-failure", + eventkit.Int64("decryption-retries", int64(download.decryptionRetries)), + eventkit.Int64("quiescence-retries", int64(download.quiescenceRetries)), + eventkit.Int64("offset", download.offset), + eventkit.Int64("length", download.length), + eventkit.Bytes("path-checksum", pathChecksum(download.info.EncPath)), + eventkit.String("cipher-suite", download.info.Object.CipherSuite.String()), + eventkit.Bytes("stream-id", maybeSatStreamID(download.info.Object.Stream.ID)), + ) + + if download.decryptionRetries+download.quiescenceRetries < maxDownloadRetries { + download.quiescenceRetries++ + + download.info.DownloadedSegments = nil + + err = download.resetReader(false) + } } return n, err