Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

swarm/network, swarm/storage: Preserve opentracing contexts #19022

Merged
merged 8 commits into from
Feb 8, 2019
36 changes: 19 additions & 17 deletions swarm/network/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type Fetcher struct {
requestC chan uint8 // channel for incoming requests (with the hopCount value in it)
searchTimeout time.Duration
skipCheck bool
ctx context.Context
}

type Request struct {
Expand Down Expand Up @@ -109,29 +110,30 @@ func NewFetcherFactory(request RequestFunc, skipCheck bool) *FetcherFactory {
// contain the peers which are actively requesting this chunk, to make sure we
// don't request back the chunks from them.
// The created Fetcher is started and returned.
func (f *FetcherFactory) New(ctx context.Context, source storage.Address, peersToSkip *sync.Map) storage.NetFetcher {
fetcher := NewFetcher(source, f.request, f.skipCheck)
go fetcher.run(ctx, peersToSkip)
func (f *FetcherFactory) New(ctx context.Context, source storage.Address, peers *sync.Map) storage.NetFetcher {
fetcher := NewFetcher(ctx, source, f.request, f.skipCheck)
go fetcher.run(peers)
return fetcher
}

// NewFetcher creates a new Fetcher for the given chunk address using the given request function.
func NewFetcher(addr storage.Address, rf RequestFunc, skipCheck bool) *Fetcher {
func NewFetcher(ctx context.Context, addr storage.Address, rf RequestFunc, skipCheck bool) *Fetcher {
return &Fetcher{
addr: addr,
protoRequestFunc: rf,
offerC: make(chan *enode.ID),
requestC: make(chan uint8),
searchTimeout: defaultSearchTimeout,
skipCheck: skipCheck,
ctx: ctx,
}
}

// Offer is called when an upstream peer offers the chunk via syncing as part of `OfferedHashesMsg` and the node does not have the chunk locally.
func (f *Fetcher) Offer(ctx context.Context, source *enode.ID) {
func (f *Fetcher) Offer(source *enode.ID) {
// First we need to have this select to make sure that we return if context is done
select {
case <-ctx.Done():
case <-f.ctx.Done():
return
default:
}
Expand All @@ -140,15 +142,15 @@ func (f *Fetcher) Offer(ctx context.Context, source *enode.ID) {
// push to offerC instead if offerC is available (see number 2 in https://golang.org/ref/spec#Select_statements)
select {
case f.offerC <- source:
case <-ctx.Done():
case <-f.ctx.Done():
}
}

// Request is called when an upstream peer request the chunk as part of `RetrieveRequestMsg`, or from a local request through FileStore, and the node does not have the chunk locally.
func (f *Fetcher) Request(ctx context.Context, hopCount uint8) {
func (f *Fetcher) Request(hopCount uint8) {
// First we need to have this select to make sure that we return if context is done
select {
case <-ctx.Done():
case <-f.ctx.Done():
return
default:
}
Expand All @@ -162,13 +164,13 @@ func (f *Fetcher) Request(ctx context.Context, hopCount uint8) {
// push to offerC instead if offerC is available (see number 2 in https://golang.org/ref/spec#Select_statements)
select {
case f.requestC <- hopCount + 1:
case <-ctx.Done():
case <-f.ctx.Done():
}
}

// start prepares the Fetcher
// it keeps the Fetcher alive within the lifecycle of the passed context
func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
func (f *Fetcher) run(peers *sync.Map) {
var (
doRequest bool // determines if retrieval is initiated in the current iteration
wait *time.Timer // timer for search timeout
Expand Down Expand Up @@ -219,7 +221,7 @@ func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
doRequest = requested

// all Fetcher context closed, can quit
case <-ctx.Done():
case <-f.ctx.Done():
log.Trace("terminate fetcher", "request addr", f.addr)
// TODO: send cancellations to all peers left over in peers map (i.e., those we requested from)
return
Expand All @@ -228,7 +230,7 @@ func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
// need to issue a new request
if doRequest {
var err error
sources, err = f.doRequest(ctx, gone, peers, sources, hopCount)
sources, err = f.doRequest(gone, peers, sources, hopCount)
if err != nil {
log.Info("unable to request", "request addr", f.addr, "err", err)
}
Expand Down Expand Up @@ -266,7 +268,7 @@ func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
// * the peer's address is added to the set of peers to skip
// * the peer's address is removed from prospective sources, and
// * a go routine is started that reports on the gone channel if the peer is disconnected (or terminated their streamer)
func (f *Fetcher) doRequest(ctx context.Context, gone chan *enode.ID, peersToSkip *sync.Map, sources []*enode.ID, hopCount uint8) ([]*enode.ID, error) {
func (f *Fetcher) doRequest(gone chan *enode.ID, peersToSkip *sync.Map, sources []*enode.ID, hopCount uint8) ([]*enode.ID, error) {
var i int
var sourceID *enode.ID
var quit chan struct{}
Expand All @@ -283,7 +285,7 @@ func (f *Fetcher) doRequest(ctx context.Context, gone chan *enode.ID, peersToSki
for i = 0; i < len(sources); i++ {
req.Source = sources[i]
var err error
sourceID, quit, err = f.protoRequestFunc(ctx, req)
sourceID, quit, err = f.protoRequestFunc(f.ctx, req)
if err == nil {
// remove the peer from known sources
// Note: we can modify the source although we are looping on it, because we break from the loop immediately
Expand All @@ -297,7 +299,7 @@ func (f *Fetcher) doRequest(ctx context.Context, gone chan *enode.ID, peersToSki
if !foundSource {
req.Source = nil
var err error
sourceID, quit, err = f.protoRequestFunc(ctx, req)
sourceID, quit, err = f.protoRequestFunc(f.ctx, req)
if err != nil {
// if no peers found to request from
return sources, err
Expand All @@ -314,7 +316,7 @@ func (f *Fetcher) doRequest(ctx context.Context, gone chan *enode.ID, peersToSki
select {
case <-quit:
gone <- sourceID
case <-ctx.Done():
case <-f.ctx.Done():
}
}()
return sources, nil
Expand Down
113 changes: 54 additions & 59 deletions swarm/network/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,21 +69,21 @@ func (m *mockRequester) doRequest(ctx context.Context, request *Request) (*enode
func TestFetcherSingleRequest(t *testing.T) {
requester := newMockRequester()
addr := make([]byte, 32)
fetcher := NewFetcher(addr, requester.doRequest, true)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

fetcher := NewFetcher(ctx, addr, requester.doRequest, true)

peers := []string{"a", "b", "c", "d"}
peersToSkip := &sync.Map{}
for _, p := range peers {
peersToSkip.Store(p, time.Now())
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

go fetcher.run(ctx, peersToSkip)
go fetcher.run(peersToSkip)

rctx := context.Background()
fetcher.Request(rctx, 0)
fetcher.Request(0)

select {
case request := <-requester.requestC:
Expand Down Expand Up @@ -115,20 +115,19 @@ func TestFetcherSingleRequest(t *testing.T) {
func TestFetcherCancelStopsFetcher(t *testing.T) {
requester := newMockRequester()
addr := make([]byte, 32)
fetcher := NewFetcher(addr, requester.doRequest, true)

peersToSkip := &sync.Map{}

ctx, cancel := context.WithCancel(context.Background())

fetcher := NewFetcher(ctx, addr, requester.doRequest, true)

peersToSkip := &sync.Map{}

// we start the fetcher, and then we immediately cancel the context
go fetcher.run(ctx, peersToSkip)
go fetcher.run(peersToSkip)
cancel()

rctx, rcancel := context.WithTimeout(ctx, 100*time.Millisecond)
defer rcancel()
// we call Request with an active context
fetcher.Request(rctx, 0)
fetcher.Request(0)

// fetcher should not initiate request, we can only check by waiting a bit and making sure no request is happening
select {
Expand All @@ -140,23 +139,23 @@ func TestFetcherCancelStopsFetcher(t *testing.T) {

// TestFetchCancelStopsRequest tests that calling a Request function with a cancelled context does not initiate a request
func TestFetcherCancelStopsRequest(t *testing.T) {
t.Skip("since context is now per fetcher, this test is likely redundant")

requester := newMockRequester(100 * time.Millisecond)
addr := make([]byte, 32)
fetcher := NewFetcher(addr, requester.doRequest, true)

peersToSkip := &sync.Map{}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// we start the fetcher with an active context
go fetcher.run(ctx, peersToSkip)
fetcher := NewFetcher(ctx, addr, requester.doRequest, true)

rctx, rcancel := context.WithCancel(context.Background())
rcancel()
peersToSkip := &sync.Map{}

// we start the fetcher with an active context
go fetcher.run(peersToSkip)

// we call Request with a cancelled context
fetcher.Request(rctx, 0)
fetcher.Request(0)

// fetcher should not initiate request, we can only check by waiting a bit and making sure no request is happening
select {
Expand All @@ -166,8 +165,7 @@ func TestFetcherCancelStopsRequest(t *testing.T) {
}

// if there is another Request with active context, there should be a request, because the fetcher itself is not cancelled
rctx = context.Background()
fetcher.Request(rctx, 0)
fetcher.Request(0)

select {
case <-requester.requestC:
Expand All @@ -182,19 +180,19 @@ func TestFetcherCancelStopsRequest(t *testing.T) {
func TestFetcherOfferUsesSource(t *testing.T) {
requester := newMockRequester(100 * time.Millisecond)
addr := make([]byte, 32)
fetcher := NewFetcher(addr, requester.doRequest, true)

peersToSkip := &sync.Map{}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

fetcher := NewFetcher(ctx, addr, requester.doRequest, true)

peersToSkip := &sync.Map{}

// start the fetcher
go fetcher.run(ctx, peersToSkip)
go fetcher.run(peersToSkip)

rctx := context.Background()
// call the Offer function with the source peer
fetcher.Offer(rctx, &sourcePeerID)
fetcher.Offer(&sourcePeerID)

// fetcher should not initiate request
select {
Expand All @@ -204,8 +202,7 @@ func TestFetcherOfferUsesSource(t *testing.T) {
}

// call Request after the Offer
rctx = context.Background()
fetcher.Request(rctx, 0)
fetcher.Request(0)

// there should be exactly 1 request coming from fetcher
var request *Request
Expand Down Expand Up @@ -234,19 +231,19 @@ func TestFetcherOfferUsesSource(t *testing.T) {
func TestFetcherOfferAfterRequestUsesSourceFromContext(t *testing.T) {
requester := newMockRequester(100 * time.Millisecond)
addr := make([]byte, 32)
fetcher := NewFetcher(addr, requester.doRequest, true)

peersToSkip := &sync.Map{}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

fetcher := NewFetcher(ctx, addr, requester.doRequest, true)

peersToSkip := &sync.Map{}

// start the fetcher
go fetcher.run(ctx, peersToSkip)
go fetcher.run(peersToSkip)

// call Request first
rctx := context.Background()
fetcher.Request(rctx, 0)
fetcher.Request(0)

// there should be a request coming from fetcher
var request *Request
Expand All @@ -260,7 +257,7 @@ func TestFetcherOfferAfterRequestUsesSourceFromContext(t *testing.T) {
}

// after the Request call Offer
fetcher.Offer(context.Background(), &sourcePeerID)
fetcher.Offer(&sourcePeerID)

// there should be a request coming from fetcher
select {
Expand All @@ -283,21 +280,21 @@ func TestFetcherOfferAfterRequestUsesSourceFromContext(t *testing.T) {
func TestFetcherRetryOnTimeout(t *testing.T) {
requester := newMockRequester()
addr := make([]byte, 32)
fetcher := NewFetcher(addr, requester.doRequest, true)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

fetcher := NewFetcher(ctx, addr, requester.doRequest, true)
// set searchTimeOut to low value so the test is quicker
fetcher.searchTimeout = 250 * time.Millisecond

peersToSkip := &sync.Map{}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// start the fetcher
go fetcher.run(ctx, peersToSkip)
go fetcher.run(peersToSkip)

// call the fetch function with an active context
rctx := context.Background()
fetcher.Request(rctx, 0)
fetcher.Request(0)

// after 100ms the first request should be initiated
time.Sleep(100 * time.Millisecond)
Expand Down Expand Up @@ -339,7 +336,7 @@ func TestFetcherFactory(t *testing.T) {

fetcher := fetcherFactory.New(context.Background(), addr, peersToSkip)

fetcher.Request(context.Background(), 0)
fetcher.Request(0)

// check if the created fetchFunction really starts a fetcher and initiates a request
select {
Expand All @@ -353,21 +350,21 @@ func TestFetcherFactory(t *testing.T) {
func TestFetcherRequestQuitRetriesRequest(t *testing.T) {
requester := newMockRequester()
addr := make([]byte, 32)
fetcher := NewFetcher(addr, requester.doRequest, true)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

fetcher := NewFetcher(ctx, addr, requester.doRequest, true)

// make sure the searchTimeout is long so it is sure the request is not
// retried because of timeout
fetcher.searchTimeout = 10 * time.Second

peersToSkip := &sync.Map{}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

go fetcher.run(ctx, peersToSkip)
go fetcher.run(peersToSkip)

rctx := context.Background()
fetcher.Request(rctx, 0)
fetcher.Request(0)

select {
case <-requester.requestC:
Expand Down Expand Up @@ -460,17 +457,15 @@ func TestRequestSkipPeerPermanent(t *testing.T) {
func TestFetcherMaxHopCount(t *testing.T) {
requester := newMockRequester()
addr := make([]byte, 32)
fetcher := NewFetcher(addr, requester.doRequest, true)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

peersToSkip := &sync.Map{}
fetcher := NewFetcher(ctx, addr, requester.doRequest, true)

go fetcher.run(ctx, peersToSkip)
peersToSkip := &sync.Map{}

rctx := context.Background()
fetcher.Request(rctx, maxHopCount)
go fetcher.run(peersToSkip)

// if hopCount is already at max no request should be initiated
select {
Expand Down
Loading