From c6b88a2c5d79f85357c18d7c04330ae96bcfd882 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20Mur=C3=A9?= Date: Mon, 20 Jul 2020 20:17:29 +0200 Subject: [PATCH] wire a context in most of the data pipeline --- queue/queue.go | 8 ++++---- queue/queue_test.go | 2 +- simple/reprovide_test.go | 14 +++++++++----- 3 files changed, 14 insertions(+), 10 deletions(-) diff --git a/queue/queue.go b/queue/queue.go index 2c33502..7f840b6 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -97,7 +97,7 @@ func (q *Queue) work() { c, err = cid.Parse(head.Value) if err != nil { log.Warningf("error parsing queue entry cid with key (%s), removing it from queue: %s", head.Key, err) - err = q.ds.Delete(k) + err = q.ds.Delete(q.ctx, k) if err != nil { log.Errorf("error deleting queue entry with key (%s), due to error (%s), stopping provider", head.Key, err) return @@ -120,12 +120,12 @@ func (q *Queue) work() { keyPath := fmt.Sprintf("%d/%s", time.Now().UnixNano(), c.String()) nextKey := datastore.NewKey(keyPath) - if err := q.ds.Put(nextKey, toQueue.Bytes()); err != nil { + if err := q.ds.Put(q.ctx, nextKey, toQueue.Bytes()); err != nil { log.Errorf("Failed to enqueue cid: %s", err) continue } case dequeue <- c: - err := q.ds.Delete(k) + err := q.ds.Delete(q.ctx, k) if err != nil { log.Errorf("Failed to delete queued cid %s with key %s: %s", c, k, err) @@ -141,7 +141,7 @@ func (q *Queue) work() { func (q *Queue) getQueueHead() (*query.Entry, error) { qry := query.Query{Orders: []query.Order{query.OrderByKey{}}, Limit: 1} - results, err := q.ds.Query(qry) + results, err := q.ds.Query(q.ctx, qry) if err != nil { return nil, err } diff --git a/queue/queue_test.go b/queue/queue_test.go index 819fa90..127ac95 100644 --- a/queue/queue_test.go +++ b/queue/queue_test.go @@ -72,7 +72,7 @@ func TestMangledData(t *testing.T) { // put bad data in the queue queueKey := datastore.NewKey("/test/0") - err = queue.ds.Put(queueKey, []byte("borked")) + err = queue.ds.Put(ctx, queueKey, []byte("borked")) if err != nil { t.Fatal(err) } diff --git a/simple/reprovide_test.go b/simple/reprovide_test.go index 3858baf..7d025b3 100644 --- a/simple/reprovide_test.go +++ b/simple/reprovide_test.go @@ -33,14 +33,18 @@ func setupRouting(t *testing.T) (clA, clB mock.Client, idA, idB peer.ID) { return clA, clB, iidA.ID(), iidB.ID() } -func setupDag(t *testing.T) (nodes []cid.Cid, bstore blockstore.Blockstore) { +func setupDag(t *testing.T, ctx context.Context) (nodes []cid.Cid, bstore blockstore.Blockstore) { bstore = blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) for _, data := range []string{"foo", "bar"} { + + // TODO: I think this is now an incorrect encoding after the update to newer + // go-cid or related. Tests breaks because of that. + blk, err := cbor.WrapObject(data, mh.SHA2_256, -1) if err != nil { t.Fatal(err) } - err = bstore.Put(blk) + err = bstore.Put(ctx, blk) if err != nil { t.Fatal(err) } @@ -52,7 +56,7 @@ func setupDag(t *testing.T) (nodes []cid.Cid, bstore blockstore.Blockstore) { if err != nil { t.Fatal(err) } - err = bstore.Put(blk) + err = bstore.Put(ctx, blk) if err != nil { t.Fatal(err) } @@ -83,7 +87,7 @@ func testReprovide(t *testing.T, trigger func(r *Reprovider, ctx context.Context defer cancel() clA, clB, idA, _ := setupRouting(t) - nodes, bstore := setupDag(t) + nodes, bstore := setupDag(t, ctx) keyProvider := NewBlockstoreProvider(bstore) reprov := NewReprovider(ctx, time.Hour, clA, keyProvider) @@ -193,7 +197,7 @@ func TestReprovidePinned(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - nodes, bstore := setupDag(t) + nodes, bstore := setupDag(t, ctx) dag := merkledag.NewDAGService(bsrv.New(bstore, offline.Exchange(bstore)))