Skip to content

Commit

Permalink
private/{ee,}stream: restart on quiescent downloads
Browse files Browse the repository at this point in the history
this change causes downloads to restart if no progress
is made on the piece tracker for 5 consecutive seconds.

todo in a later change:
   - testing for quiescent state
   - e2e testing in failure situation

Change-Id: I35684ee610a5c785640aca5d4ef4e6d6a5ee7814
  • Loading branch information
zeebo authored and jtolio committed Apr 9, 2024
1 parent 76e9d53 commit 41d7ecd
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 4 deletions.
10 changes: 10 additions & 0 deletions private/eestream/bundy.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,16 @@ func NewPiecesProgress(minimum, total int32) *PiecesProgress {
}
}

// ProgressSnapshot appends the current received-share counter of every piece
// to out, in piece order, and returns the extended slice. Callers can pass a
// truncated slice (for example buf[:0]) to reuse its backing storage. Each
// counter is loaded individually and no lock is taken, so under concurrent
// mutation the result does not describe a single point in time.
func (y *PiecesProgress) ProgressSnapshot(out []int32) []int32 {
	for piece := range y.pieceSharesReceived {
		received := y.pieceSharesReceived[piece].Load()
		out = append(out, received)
	}
	return out
}

// SetStripesNeeded tells PiecesProgress what neededShares stripe is needed next.
func (y *PiecesProgress) SetStripesNeeded(required int32) {
y.stripesNeeded.Store(required)
Expand Down
4 changes: 4 additions & 0 deletions private/eestream/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,9 @@ var (
// Error is the default eestream errs class.
Error = errs.Class("eestream")

// QuiescentError is the class of errors returned when a stream is quiescent
// and should be restarted.
QuiescentError = errs.Class("quiescence")

mon = monkit.Package()
)
62 changes: 60 additions & 2 deletions private/eestream/stripe.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,22 @@ import (
"strings"
"sync"
"sync/atomic"
"time"

"github.com/spacemonkeygo/monkit/v3"
"golang.org/x/exp/slices"

"storj.io/common/rpc/rpctracing"
"storj.io/common/sync2"
"storj.io/infectious"
)

const debugEnabled = false
const maxStripesAhead = 256 // might be interesting to test different values later
// Tunables for stripe reading and for the quiescence watchdog started by
// StripeReader.start.
const (
// debugEnabled gates the fmt.Println debug output in this file.
debugEnabled = false
maxStripesAhead = 256 // might be interesting to test different values later
// quiescentCheckInterval is the period of the ticker on which the watchdog
// takes a new progress snapshot and compares it with the previous one.
quiescentCheckInterval = time.Second
// quiescentIntervalTrigger is how many consecutive unchanged snapshots the
// watchdog must observe before it marks the reader quiescent.
quiescentIntervalTrigger = 5 // number of quiescent check intervals before triggering
)

// pieceReader represents the stream of shares within one piece.
type pieceReader struct {
Expand All @@ -47,6 +53,7 @@ type StripeReader struct {
totalStripes int32
errorDetection bool
runningPieces atomic.Int32
quiescent atomic.Bool
}

// NewStripeReader makes a new StripeReader using the provided map of share
Expand Down Expand Up @@ -92,11 +99,16 @@ func (s *StripeReader) start() {
if debugEnabled {
fmt.Println("starting", len(s.pieces), "readers")
}

var pwg sync.WaitGroup
s.runningPieces.Store(int32(len(s.pieces)))

for idx := range s.pieces {
s.wg.Add(1)
pwg.Add(1)
go func(idx int) {
defer s.wg.Done()
defer pwg.Done()

// whenever a share reader is done, we should wake up the core in case
// this share reader just exited unsuccessfully and this represents a
Expand All @@ -110,6 +122,47 @@ func (s *StripeReader) start() {
s.readShares(idx)
}(idx)
}

done := make(chan struct{})
go func() {
pwg.Wait()
close(done)
}()

s.wg.Add(1)
go func() {
defer s.wg.Done()

s1 := s.bundy.ProgressSnapshot(nil)
var s2 []int32

t := time.NewTicker(quiescentCheckInterval)
defer t.Stop()

match := 0
for {
select {
case <-t.C:
s2 = s.bundy.ProgressSnapshot(s2[:0])

if !slices.Equal(s1, s2) {
match = 0
s2, s1 = s1, s2
continue
}

match++
if match == quiescentIntervalTrigger {
s.quiescent.Store(true)
s.stripeReady.Signal()
return
}

case <-done:
return
}
}
}()
}

// readShares is the method that does the actual work of reading an individual
Expand Down Expand Up @@ -265,6 +318,11 @@ func (s *StripeReader) ReadStripes(ctx context.Context, nextStripe int64, out []
ready := make([]int, 0, len(s.pieces))

for {
// check if we were woken from quiescence. if so, error out.
if s.quiescent.Load() {
return nil, 0, QuiescentError.New("")
}

// okay let's tell the bundy clock we're awake and it should be okay to
// wake us up again next time we sleep.
s.bundy.AcknowledgeNewStripes()
Expand Down
25 changes: 23 additions & 2 deletions private/stream/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@ import (
"storj.io/common/storj"
"storj.io/eventkit"
"storj.io/picobuf"
"storj.io/uplink/private/eestream"
"storj.io/uplink/private/metaclient"
"storj.io/uplink/private/storage/streams"
)

const (
maxDecryptionRetries = 6
maxDownloadRetries = 6
)

var (
Expand All @@ -37,6 +38,7 @@ type Download struct {
closed bool

decryptionRetries int
quiescenceRetries int
}

// NewDownload creates new stream download.
Expand Down Expand Up @@ -96,21 +98,40 @@ func (download *Download) Read(data []byte) (n int, err error) {
} else if encryption.ErrDecryptFailed.Has(err) {
evs.Event("decryption-failure",
eventkit.Int64("decryption-retries", int64(download.decryptionRetries)),
eventkit.Int64("quiescence-retries", int64(download.quiescenceRetries)),
eventkit.Int64("offset", download.offset),
eventkit.Int64("length", download.length),
eventkit.Bytes("path-checksum", pathChecksum(download.info.EncPath)),
eventkit.String("cipher-suite", download.info.Object.CipherSuite.String()),
eventkit.Bytes("stream-id", maybeSatStreamID(download.info.Object.Stream.ID)),
)

if download.decryptionRetries < maxDecryptionRetries {
if download.decryptionRetries+download.quiescenceRetries < maxDownloadRetries {
download.decryptionRetries++

// force us to get a new collection of limits.
download.info.DownloadedSegments = nil

err = download.resetReader(true)
}
} else if eestream.QuiescentError.Has(err) {
evs.Event("quiescence-failure",
eventkit.Int64("decryption-retries", int64(download.decryptionRetries)),
eventkit.Int64("quiescence-retries", int64(download.quiescenceRetries)),
eventkit.Int64("offset", download.offset),
eventkit.Int64("length", download.length),
eventkit.Bytes("path-checksum", pathChecksum(download.info.EncPath)),
eventkit.String("cipher-suite", download.info.Object.CipherSuite.String()),
eventkit.Bytes("stream-id", maybeSatStreamID(download.info.Object.Stream.ID)),
)

if download.decryptionRetries+download.quiescenceRetries < maxDownloadRetries {
download.quiescenceRetries++

download.info.DownloadedSegments = nil

err = download.resetReader(false)
}
}

return n, err
Expand Down

0 comments on commit 41d7ecd

Please sign in to comment.