Skip to content

Commit

Permalink
More consistent use of test library
Browse files Browse the repository at this point in the history
  • Loading branch information
gammazero committed Mar 10, 2023
1 parent cd3f95a commit 2e37434
Show file tree
Hide file tree
Showing 12 changed files with 239 additions and 603 deletions.
45 changes: 12 additions & 33 deletions command/init_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"testing"

"github.com/ipni/storetheindex/config"
"github.com/stretchr/testify/require"
"github.com/urfave/cli/v2"
)

Expand Down Expand Up @@ -34,19 +35,13 @@ func TestInit(t *testing.T) {
)

err := app.RunContext(ctx, []string{"storetheindex", "init", "-listen-admin", badAddr})
if err == nil {
t.Fatal("expected error")
}
require.Error(t, err)

err = app.RunContext(ctx, []string{"storetheindex", "init", "-listen-finder", badAddr})
if err == nil {
t.Fatal("expected error")
}
require.Error(t, err)

err = app.RunContext(ctx, []string{"storetheindex", "init", "-listen-ingest", badAddr})
if err == nil {
t.Fatal("expected error")
}
require.Error(t, err)

args := []string{
"storetheindex", "init",
Expand All @@ -57,33 +52,17 @@ func TestInit(t *testing.T) {
"-store", storeType,
}
err = app.RunContext(ctx, args)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)

cfg, err := config.Load("")
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)

if cfg.Addresses.Finder != goodAddr {
t.Error("finder listen address was not configured")
}
if cfg.Addresses.Ingest != goodAddr2 {
t.Error("ingest listen address was not configured")
}
if cfg.Indexer.CacheSize != cacheSize {
t.Error("cache size was tno configured")
}
if cfg.Indexer.ValueStoreType != storeType {
t.Error("value store type was not configured")
}
if cfg.Ingest.PubSubTopic != topicName {
t.Errorf("expected %s for pubsub topic, got %s", topicName, cfg.Ingest.PubSubTopic)
}
if cfg.Version != config.Version {
t.Error("did not init config with correct version")
}
require.Equal(t, goodAddr, cfg.Addresses.Finder, "finder listen address was not configured")
require.Equal(t, goodAddr2, cfg.Addresses.Ingest, "ingest listen address was not configured")
require.Equal(t, cacheSize, cfg.Indexer.CacheSize, "cache size was tno configured")
require.Equal(t, storeType, cfg.Indexer.ValueStoreType, "value store type was not configured")
require.Equal(t, topicName, cfg.Ingest.PubSubTopic)
require.Equal(t, config.Version, cfg.Version)

t.Log(cfg.String())
}
37 changes: 12 additions & 25 deletions dagsync/announce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,7 @@ func TestAnnounceReplace(t *testing.T) {
// Have the subscriber receive an announce. This is the same as if it was
// published by the publisher without having to wait for it to arrive.
err = sub.Announce(context.Background(), firstCid, srcHost.ID(), srcHost.Addrs())
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)
t.Log("Sent announce for first CID", firstCid)

// This first announce should start the handler goroutine and clear the
Expand Down Expand Up @@ -113,15 +111,10 @@ func TestAnnounceReplace(t *testing.T) {
case <-time.After(updateTimeout):
t.Fatal("timed out waiting for sync to propagate")
case downstream, open := <-watcher:
if !open {
t.Fatal("event channle closed without receiving event")
}
if !downstream.Cid.Equals(firstCid) {
t.Fatalf("sync returned unexpected first cid %s, expected %s", downstream.Cid, firstCid)
}
if _, err = dstStore.Get(context.Background(), datastore.NewKey(downstream.Cid.String())); err != nil {
t.Fatalf("data not in receiver store: %s", err)
}
require.True(t, open, "event channle closed without receiving event")
require.Equal(t, firstCid, downstream.Cid, "sync returned unexpected first cid")
_, err = dstStore.Get(context.Background(), datastore.NewKey(downstream.Cid.String()))
require.NoError(t, err, "data not in receiver store")
t.Log("Received sync notification for first CID:", firstCid)
}

Expand All @@ -130,26 +123,20 @@ func TestAnnounceReplace(t *testing.T) {
case <-time.After(updateTimeout):
t.Fatal("timed out waiting for sync to propagate")
case downstream, open := <-watcher:
if !open {
t.Fatal("event channle closed without receiving event")
}
if !downstream.Cid.Equals(lastCid) {
t.Fatalf("sync returned unexpected last cid %s, expected %s", downstream.Cid, lastCid)
}
if _, err = dstStore.Get(context.Background(), datastore.NewKey(downstream.Cid.String())); err != nil {
t.Fatalf("data not in receiver store: %s", err)
}
require.True(t, open, "event channle closed without receiving event")
require.Equal(t, lastCid, downstream.Cid, "sync returned unexpected last cid")
_, err = dstStore.Get(context.Background(), datastore.NewKey(downstream.Cid.String()))
require.NoError(t, err, "data not in receiver store")
t.Log("Received sync notification for last CID:", lastCid)
}

// Validate that no additional updates happen.
select {
case <-time.After(3 * time.Second):
case changeEvent, open := <-watcher:
if open {
t.Fatalf("no exchange should have been performed, but got change from peer %s for cid %s",
changeEvent.PeerID, changeEvent.Cid)
}
require.Falsef(t, open,
"no exchange should have been performed, but got change from peer %s for cid %s",
changeEvent.PeerID, changeEvent.Cid)
}
}

Expand Down
125 changes: 37 additions & 88 deletions dagsync/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package dagsync_test
import (
"context"
"crypto/rand"
"strings"
"testing"
"time"

Expand All @@ -21,6 +20,7 @@ import (
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/require"
)

type httpTestEnv struct {
Expand All @@ -36,17 +36,13 @@ type httpTestEnv struct {

func setupPublisherSubscriber(t *testing.T, subscriberOptions []dagsync.Option) httpTestEnv {
srcPrivKey, _, err := ic.GenerateECDSAKeyPair(rand.Reader)
if err != nil {
t.Fatal("Err generating private key", err)
}
require.NoError(t, err, "Err generating private key")

srcHost = test.MkTestHost(libp2p.Identity(srcPrivKey))
srcStore := dssync.MutexWrap(datastore.NewMapDatastore())
srcLinkSys := test.MkLinkSystem(srcStore)
httpPub, err := httpsync.NewPublisher("127.0.0.1:0", srcLinkSys, srcPrivKey)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)
t.Cleanup(func() {
httpPub.Close()
})
Expand All @@ -57,9 +53,7 @@ func setupPublisherSubscriber(t *testing.T, subscriberOptions []dagsync.Option)
dstHost := test.MkTestHost()

sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLinkSys, testTopic, nil, subscriberOptions...)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)
t.Cleanup(func() {
sub.Close()
})
Expand All @@ -86,62 +80,41 @@ func TestManualSync(t *testing.T) {
te := setupPublisherSubscriber(t, []dagsync.Option{dagsync.BlockHook(blockHook)})

rootLnk, err := test.Store(te.srcStore, basicnode.NewString("hello world"))
if err != nil {
t.Fatal(err)
}
if err := te.pub.UpdateRoot(context.Background(), rootLnk.(cidlink.Link).Cid); err != nil {
t.Fatal(err)
}
require.NoError(t, err)
err = te.pub.UpdateRoot(context.Background(), rootLnk.(cidlink.Link).Cid)
require.NoError(t, err)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

syncCid, err := te.sub.Sync(ctx, te.srcHost.ID(), cid.Undef, nil, te.pubAddr)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)

if !syncCid.Equals(rootLnk.(cidlink.Link).Cid) {
t.Fatalf("didn't get expected cid. expected %s, got %s", rootLnk, syncCid)
}
require.Equal(t, rootLnk.(cidlink.Link).Cid, syncCid)

_, ok := blocksSeenByHook[syncCid]
if !ok {
t.Fatal("hook did not get", syncCid)
}
require.True(t, ok, "hook did not get", syncCid)
}

func TestSyncHttpFailsUnexpectedPeer(t *testing.T) {
te := setupPublisherSubscriber(t, nil)

rootLnk, err := test.Store(te.srcStore, basicnode.NewString("hello world"))
if err != nil {
t.Fatal(err)
}
if err := te.pub.UpdateRoot(context.Background(), rootLnk.(cidlink.Link).Cid); err != nil {
t.Fatal(err)
}
require.NoError(t, err)
err = te.pub.UpdateRoot(context.Background(), rootLnk.(cidlink.Link).Cid)
require.NoError(t, err)

ctx, cancel := context.WithTimeout(context.Background(), updateTimeout)

defer cancel()
_, otherPubKey, err := ic.GenerateECDSAKeyPair(rand.Reader)
if err != nil {
t.Fatal("failed to make another peerid")
}
require.NoError(t, err, "failed to make another peerid")
otherPeerID, err := peer.IDFromPublicKey(otherPubKey)
if err != nil {
t.Fatal("failed to make another peerid")
}
require.NoError(t, err, "failed to make another peerid")

// This fails because the head msg is signed by srcHost.ID(), but we are asking this to check if it's signed by otherPeerID.
_, err = te.sub.Sync(ctx, otherPeerID, cid.Undef, nil, te.pubAddr)
if err == nil {
t.Fatalf("expected error, got nil")
}
if !strings.Contains(err.Error(), "unexpected peer") {
t.Fatalf("expected error to contain the string 'unexpected peer', got %s", err.Error())
}
require.ErrorContains(t, err, "unexpected peer")
}

func TestSyncFnHttp(t *testing.T) {
Expand All @@ -165,13 +138,8 @@ func TestSyncFnHttp(t *testing.T) {
ctx, syncncl := context.WithTimeout(context.Background(), time.Second)
defer syncncl()

var err error
if _, err = te.sub.Sync(ctx, te.srcHost.ID(), cids[0], nil, te.pubAddr); err == nil {
t.Fatal("expected error when no content to sync")
}
if !strings.Contains(err.Error(), "failed to traverse requested dag") {
t.Fatalf("expected error to contain the string 'failed to traverse requested dag', got %s", err.Error())
}
_, err := te.sub.Sync(ctx, te.srcHost.ID(), cids[0], nil, te.pubAddr)
require.ErrorContains(t, err, "failed to traverse requested dag")
syncncl()

select {
Expand All @@ -182,9 +150,8 @@ func TestSyncFnHttp(t *testing.T) {

// Assert the latestSync is updated by explicit sync when cid and selector are unset.
newHead := chainLnks[0].(cidlink.Link).Cid
if err = te.pub.UpdateRoot(context.Background(), newHead); err != nil {
t.Fatal(err)
}
err = te.pub.UpdateRoot(context.Background(), newHead)
require.NoError(t, err)

lnk := chainLnks[1]

Expand All @@ -194,59 +161,41 @@ func TestSyncFnHttp(t *testing.T) {
ctx, syncncl = context.WithTimeout(context.Background(), updateTimeout)
defer syncncl()
syncCid, err := te.sub.Sync(ctx, te.srcHost.ID(), lnk.(cidlink.Link).Cid, nil, te.pubAddr)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)

if !syncCid.Equals(lnk.(cidlink.Link).Cid) {
t.Fatalf("sync'd cid unexpected %s vs %s", syncCid, lnk)
}
if _, err = te.dstStore.Get(context.Background(), datastore.NewKey(syncCid.String())); err != nil {
t.Fatalf("data not in receiver store: %v", err)
}
require.Equal(t, lnk.(cidlink.Link).Cid, syncCid, "sync'd cid unexpected")
_, err = te.dstStore.Get(context.Background(), datastore.NewKey(syncCid.String()))
require.NoError(t, err, "data not in receiver store")
syncncl()

_, ok := blocksSeenByHook[lnk.(cidlink.Link).Cid]
if !ok {
t.Fatal("block hook did not see link cid")
}
if blockHookCalls != 11 {
t.Fatalf("expected 11 block hook calls, got %d", blockHookCalls)
}
require.True(t, ok, "block hook did not see link cid")
require.Equal(t, 11, blockHookCalls)

// Assert the latestSync is not updated by explicit sync when cid is set
if te.sub.GetLatestSync(te.srcHost.ID()) != nil && assertLatestSyncEquals(te.sub, te.srcHost.ID(), curLatestSync.(cidlink.Link).Cid) != nil {
t.Fatal("Sync should not update latestSync")
if te.sub.GetLatestSync(te.srcHost.ID()) != nil {
err = assertLatestSyncEquals(te.sub, te.srcHost.ID(), curLatestSync.(cidlink.Link).Cid)
require.NoError(t, err, "Sync should not update latestSync")
}

ctx, syncncl = context.WithTimeout(context.Background(), updateTimeout)
defer syncncl()
syncCid, err = te.sub.Sync(ctx, te.srcHost.ID(), cid.Undef, nil, te.pubAddr)
if err != nil {
t.Fatal(err)
}
if !syncCid.Equals(newHead) {
t.Fatalf("sync'd cid unexpected %s vs %s", syncCid, lnk)
}
if _, err = te.dstStore.Get(context.Background(), datastore.NewKey(syncCid.String())); err != nil {
t.Fatalf("data not in receiver store: %v", err)
}
require.NoError(t, err)
require.Equal(t, newHead, syncCid, "sync'd cid unexpected")
_, err = te.dstStore.Get(context.Background(), datastore.NewKey(syncCid.String()))
require.NoError(t, err, "data not in receiver store")
syncncl()

select {
case <-time.After(updateTimeout):
t.Fatal("timed out waiting for sync from published update")
case syncFin, open := <-watcher:
if !open {
t.Fatal("sync finished channel closed with no event")
}
if syncFin.Cid != newHead {
t.Fatalf("Should have been updated to %s, got %s", newHead, syncFin.Cid)
}
require.True(t, open, "sync finished channel closed with no event")
require.Equal(t, newHead, syncFin.Cid)
}
cancelWatcher()

if err = assertLatestSyncEquals(te.sub, te.srcHost.ID(), newHead); err != nil {
t.Fatal(err)
}
err = assertLatestSyncEquals(te.sub, te.srcHost.ID(), newHead)
require.NoError(t, err)
}
Loading

0 comments on commit 2e37434

Please sign in to comment.