Skip to content

Commit

Permalink
itest: refactor send test subscription handling
Browse files Browse the repository at this point in the history
In this commit we slightly refactor the event stream subscription
handling in the send integration test to make sure we correctly time out
in case we don't receive the required events.
  • Loading branch information
guggero committed Mar 19, 2024
1 parent 958916e commit 4a1c14d
Show file tree
Hide file tree
Showing 2 changed files with 155 additions and 83 deletions.
158 changes: 75 additions & 83 deletions itest/send_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"context"
"encoding/hex"
"fmt"
"io"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -41,10 +39,7 @@ func testBasicSendUnidirectional(t *harnessTest) {
)

// Subscribe to receive assent send events from primary tapd node.
eventNtfns, err := t.tapd.SubscribeSendAssetEventNtfns(
ctxb, &tapdevrpc.SubscribeSendAssetEventNtfnsRequest{},
)
require.NoError(t.t, err)
events := SubscribeSendEvents(t.t, t.tapd)

// Test to ensure that we execute the transaction broadcast state.
// This test is executed in a goroutine to ensure that we can receive
Expand All @@ -64,10 +59,9 @@ func testBasicSendUnidirectional(t *harnessTest) {
}

timeout := 2 * defaultProofTransferReceiverAckTimeout
ctx, cancel := context.WithTimeout(ctxb, timeout)
defer cancel()

assertAssetSendNtfsEvent(
t, ctx, eventNtfns, targetEventSelector, numSends,
t, events, timeout, targetEventSelector, numSends,
)
}()

Expand Down Expand Up @@ -122,7 +116,7 @@ func testBasicSendUnidirectional(t *harnessTest) {
}

// Close event stream.
err = eventNtfns.CloseSend()
err = events.CloseSend()
require.NoError(t.t, err)

wg.Wait()
Expand All @@ -146,10 +140,7 @@ func testRestartReceiverCheckBalance(t *harnessTest) {
)

// Subscribe to receive assent send events from primary tapd node.
eventNtfns, err := t.tapd.SubscribeSendAssetEventNtfns(
ctxb, &tapdevrpc.SubscribeSendAssetEventNtfnsRequest{},
)
require.NoError(t.t, err)
events := SubscribeSendEvents(t.t, t.tapd)

// Test to ensure that we execute the transaction broadcast state.
// This test is executed in a goroutine to ensure that we can receive
Expand All @@ -169,10 +160,9 @@ func testRestartReceiverCheckBalance(t *harnessTest) {
}

timeout := 2 * defaultProofTransferReceiverAckTimeout
ctx, cancel := context.WithTimeout(ctxb, timeout)
defer cancel()

assertAssetSendNtfsEvent(
t, ctx, eventNtfns, targetEventSelector, 1,
t, events, timeout, targetEventSelector, 1,
)
}()

Expand Down Expand Up @@ -236,7 +226,7 @@ func testRestartReceiverCheckBalance(t *harnessTest) {
AssertNonInteractiveRecvComplete(t.t, recvTapd, 1)

// Close event stream.
err = eventNtfns.CloseSend()
err = events.CloseSend()
require.NoError(t.t, err)

wg.Wait()
Expand Down Expand Up @@ -587,10 +577,7 @@ func testReattemptFailedSendHashmailCourier(t *harnessTest) {
)

// Subscribe to receive asset send events from primary tapd node.
eventNtfns, err := sendTapd.SubscribeSendAssetEventNtfns(
ctxb, &tapdevrpc.SubscribeSendAssetEventNtfnsRequest{},
)
require.NoError(t.t, err)
events := SubscribeSendEvents(t.t, sendTapd)

// Test to ensure that we receive the expected number of backoff wait
// event notifications.
Expand Down Expand Up @@ -622,11 +609,9 @@ func testReattemptFailedSendHashmailCourier(t *harnessTest) {
defaultProofTransferReceiverAckTimeout
// Add overhead buffer to context timeout.
timeout += 5 * time.Second
ctx, cancel := context.WithTimeout(ctxb, timeout)
defer cancel()

assertAssetSendNtfsEvent(
t, ctx, eventNtfns, targetEventSelector,
t, events, timeout, targetEventSelector,
expectedEventCount,
)
}()
Expand Down Expand Up @@ -687,10 +672,7 @@ func testReattemptFailedSendUniCourier(t *harnessTest) {
)

// Subscribe to receive asset send events from the sending tapd node.
eventNtfns, err := sendTapd.SubscribeSendAssetEventNtfns(
ctxb, &tapdevrpc.SubscribeSendAssetEventNtfnsRequest{},
)
require.NoError(t.t, err)
events := SubscribeSendEvents(t.t, sendTapd)

// Test to ensure that we receive the expected number of backoff wait
// event notifications.
Expand Down Expand Up @@ -722,11 +704,9 @@ func testReattemptFailedSendUniCourier(t *harnessTest) {
defaultProofTransferReceiverAckTimeout
// Add overhead buffer to context timeout.
timeout += 5 * time.Second
ctx, cancel := context.WithTimeout(ctxb, timeout)
defer cancel()

assertAssetSendNtfsEvent(
t, ctx, eventNtfns, targetEventSelector,
t, events, timeout, targetEventSelector,
expectedEventCount,
)
}()
Expand Down Expand Up @@ -846,10 +826,7 @@ func testReattemptFailedReceiveUniCourier(t *harnessTest) {
// Subscribe to receive asset receive events from receiving tapd node.
// We'll use these events to ensure that the receiver node is making
// multiple attempts to retrieve the asset proof.
eventNtfns, err := receiveTapd.SubscribeReceiveAssetEventNtfns(
ctxb, &tapdevrpc.SubscribeReceiveAssetEventNtfnsRequest{},
)
require.NoError(t.t, err)
events := SubscribeReceiveEvents(t.t, receiveTapd)

// Test to ensure that we receive the minimum expected number of backoff
// wait event notifications.
Expand Down Expand Up @@ -883,13 +860,11 @@ func testReattemptFailedReceiveUniCourier(t *harnessTest) {
defaultProofTransferReceiverAckTimeout
// Add overhead buffer to context timeout.
timeout += 5 * time.Second
ctx, cancel := context.WithTimeout(ctxb, timeout)
defer cancel()

// Assert that the receiver tapd node has accomplished our minimum
// expected number of backoff procedure receive attempts.
assertAssetRecvNtfsEvent(
t, ctx, eventNtfns, targetEventSelector, expectedEventCount,
t, timeout, events, targetEventSelector, expectedEventCount,
)

t.Logf("Finished waiting for the receiving tapd node to complete " +
Expand All @@ -911,7 +886,7 @@ func testReattemptFailedReceiveUniCourier(t *harnessTest) {
// transfer and publishes an asset recv complete event.
t.Logf("Check for asset recv complete event from receiver tapd node")
assertAssetRecvCompleteEvent(
t, ctxb, 5*time.Second, recvAddr.Encoded, eventNtfns,
t, 5*time.Second, recvAddr.Encoded, events,
)
}

Expand Down Expand Up @@ -947,10 +922,7 @@ func testOfflineReceiverEventuallyReceives(t *harnessTest) {
recvTapd := t.tapd

// Subscribe to receive asset send events from primary tapd node.
eventNtfns, err := sendTapd.SubscribeSendAssetEventNtfns(
ctxb, &tapdevrpc.SubscribeSendAssetEventNtfnsRequest{},
)
require.NoError(t.t, err)
events := SubscribeSendEvents(t.t, sendTapd)

// Test to ensure that we receive the expected number of backoff wait
// event notifications.
Expand Down Expand Up @@ -979,11 +951,9 @@ func testOfflineReceiverEventuallyReceives(t *harnessTest) {

// Events must be received before a timeout.
timeout := 5 * time.Second
ctx, cancel := context.WithTimeout(ctxb, timeout)
defer cancel()

assertAssetSendNtfsEvent(
t, ctx, eventNtfns, targetEventSelector,
t, events, timeout, targetEventSelector,
expectedEventCount,
)
}()
Expand Down Expand Up @@ -1034,36 +1004,48 @@ func testOfflineReceiverEventuallyReceives(t *harnessTest) {
// assertAssetSendNtfsEvent asserts that the given asset send event notification
// was received. This function will block until the event is received or the
// event stream is closed.
func assertAssetSendNtfsEvent(t *harnessTest, ctx context.Context,
eventNtfns tapdevrpc.TapDev_SubscribeSendAssetEventNtfnsClient,
func assertAssetSendNtfsEvent(t *harnessTest,
stream *eventSubscription[*tapdevrpc.SendAssetEvent],
timeout time.Duration,
targetEventSelector func(*tapdevrpc.SendAssetEvent) bool,
expectedCount int) {

success := make(chan struct{})
timeoutChan := time.After(timeout)

// To make sure we don't forever hang on receiving on the stream, we'll
// cancel it after the timeout.
go func() {
select {
case <-timeoutChan:
stream.cancel()

case <-success:
}
}()

countFound := 0
for {
// Ensure that the context has not been cancelled.
require.NoError(t.t, ctx.Err())
select {
case <-stream.Context().Done():
require.NoError(t.t, stream.Context().Err())

if countFound == expectedCount {
break
default:
}

event, err := eventNtfns.Recv()
if countFound == expectedCount {
break
}

// Break if we get an EOF, which means the stream was
// closed.
//
// Use string comparison here because the RPC protocol
// does not transport wrapped error structures.
if err != nil &&
strings.Contains(err.Error(), io.EOF.Error()) {
event, err := stream.Recv()
if err != nil {
require.NoError(t.t, err)

break
}

// If err is not EOF, then we expect it to be nil.
require.NoError(t.t, err)

// Check for target state.
if targetEventSelector(event) {
countFound++
Expand All @@ -1076,35 +1058,48 @@ func assertAssetSendNtfsEvent(t *harnessTest, ctx context.Context,
// assertAssetRecvNtfsEvent asserts that the given asset receive event
// notification was received. This function will block until the event is
// received or the event stream is closed.
func assertAssetRecvNtfsEvent(t *harnessTest, ctx context.Context,
eventNtfns tapdevrpc.TapDev_SubscribeReceiveAssetEventNtfnsClient,
func assertAssetRecvNtfsEvent(t *harnessTest, timeout time.Duration,
stream *eventSubscription[*tapdevrpc.ReceiveAssetEvent],
targetEventSelector func(event *tapdevrpc.ReceiveAssetEvent) bool,
expectedCount int) {

success := make(chan struct{})
timeoutChan := time.After(timeout)

// To make sure we don't forever hang on receiving on the stream, we'll
// cancel it after the timeout.
go func() {
select {
case <-timeoutChan:
stream.cancel()

case <-success:
}
}()

countFound := 0
for {
// Ensure that the context has not been cancelled.
require.NoError(t.t, ctx.Err())
select {
case <-stream.Context().Done():
require.NoError(t.t, stream.Context().Err())

if countFound == expectedCount {
break
default:
}

event, err := eventNtfns.Recv()

// Break if we get an EOF, which means the stream was
// closed.
//
// Use string comparison here because the RPC protocol
// does not transport wrapped error structures.
if err != nil &&
strings.Contains(err.Error(), io.EOF.Error()) {
if countFound == expectedCount {
close(success)

break
}

// If err is not EOF, then we expect it to be nil.
require.NoError(t.t, err)
event, err := stream.Recv()
if err != nil {
require.NoError(t.t, err)

break
}

// Check for target state.
if targetEventSelector(event) {
Expand All @@ -1120,12 +1115,9 @@ func assertAssetRecvNtfsEvent(t *harnessTest, ctx context.Context,
// assertAssetRecvNtfsEvent asserts that the given asset receive complete event
// notification was received. This function will block until the event is
// received or the event stream is closed.
func assertAssetRecvCompleteEvent(t *harnessTest, ctxb context.Context,
func assertAssetRecvCompleteEvent(t *harnessTest,
timeout time.Duration, encodedAddr string,
eventNtfns tapdevrpc.TapDev_SubscribeReceiveAssetEventNtfnsClient) {

ctx, cancel := context.WithTimeout(ctxb, timeout)
defer cancel()
stream *eventSubscription[*tapdevrpc.ReceiveAssetEvent]) {

eventSelector := func(event *tapdevrpc.ReceiveAssetEvent) bool {
switch eventTyped := event.Event.(type) {
Expand All @@ -1137,7 +1129,7 @@ func assertAssetRecvCompleteEvent(t *harnessTest, ctxb context.Context,
}
}

assertAssetRecvNtfsEvent(t, ctx, eventNtfns, eventSelector, 1)
assertAssetRecvNtfsEvent(t, timeout, stream, eventSelector, 1)
}

// testMultiInputSendNonInteractiveSingleID tests that we can properly
Expand Down
Loading

0 comments on commit 4a1c14d

Please sign in to comment.