diff --git a/pubsub_test.go b/pubsub_test.go index a89ba67..68ac541 100644 --- a/pubsub_test.go +++ b/pubsub_test.go @@ -3,6 +3,7 @@ package namesys import ( "bytes" "context" + "errors" "fmt" "testing" "time" @@ -146,7 +147,8 @@ func TestEarlyPublish(t *testing.T) { } // Wait for Fetch protocol to retrieve data - time.Sleep(time.Second * 1) + waitForPropagation(ctx, t, vss, key) + for i, vs := range vss { checkValue(ctx, t, i, vs, key, val) } @@ -168,7 +170,7 @@ func TestPubsubPublishSubscribe(t *testing.T) { } // let the flood propagate - time.Sleep(time.Second * 1) + waitForPropagation(ctx, t, vss, key) for i, vs := range vss { checkValue(ctx, t, i, vs, key, val) } @@ -180,7 +182,7 @@ func TestPubsubPublishSubscribe(t *testing.T) { } // let the flood propagate - time.Sleep(time.Second * 1) + waitForPropagation(ctx, t, vss, key) for i, vs := range vss { checkValue(ctx, t, i, vs, key, val) } @@ -193,7 +195,7 @@ func TestPubsubPublishSubscribe(t *testing.T) { } // let the flood propagate - time.Sleep(time.Second * 1) + waitForPropagation(ctx, t, vss, key) for i, vs := range vss { checkValue(ctx, t, i, vs, key, val) } @@ -206,7 +208,7 @@ func TestPubsubPublishSubscribe(t *testing.T) { } // let the flood propagate - time.Sleep(time.Second * 1) + waitForPropagation(ctx, t, vss, key) for i, vs := range vss { checkValue(ctx, t, i, vs, key, val) } @@ -227,7 +229,8 @@ func TestPubsubPublishSubscribe(t *testing.T) { } // let the flood propagate - time.Sleep(time.Second * 1) + waitForPropagation(ctx, t, vss, key2) + waitForPropagation(ctx, t, vss, key) for i, vs := range vss { checkValue(ctx, t, i, vs, key2, nval) checkValue(ctx, t, i, vs, key, val) @@ -254,7 +257,7 @@ func TestPubsubPublishSubscribe(t *testing.T) { } // check that we get the new value - time.Sleep(time.Second * 1) + waitForPropagation(ctx, t, vss, key) for i, vs := range vss { checkValue(ctx, t, i, vs, key, nval) } @@ -392,3 +395,50 @@ func checkValue(ctx context.Context, t *testing.T, i int, vs routing.ValueStore, t.Fatalf("[ValueStore %d] unexpected value: expected '%s', got '%s'", i, val, xval) } } + +func waitForPropagation(ctx context.Context, t *testing.T, vss []*PubsubValueStore, key string) { + t.Helper() + + condition := func(ctx context.Context) (bool, error) { + for _, vs := range vss { + _, err := vs.GetValue(ctx, key) + if err == nil { + continue + } else if errors.Is(err, routing.ErrNotFound) { + // Not propogated yet + return false, nil + } else { + // Some other error occured + return false, err + } + } + + // No errors, all done + return true, nil + } + + err := waitUntil(ctx, condition, 100*time.Millisecond) + if err != nil { + t.Fatalf("[ValueStore] vssolve failed: %v", err) + } +} + +func waitUntil(ctx context.Context, condition func(context.Context) (bool, error), interval time.Duration) error { + tick := time.NewTicker(interval) + defer tick.Stop() + + for { + select { + case <-tick.C: + done, err := condition(ctx) + if err != nil { + return err + } + if done { + return nil + } + case <-ctx.Done(): + return ctx.Err() + } + } +}