forked from Azure/azure-service-bus-go
-
Notifications
You must be signed in to change notification settings - Fork 2
/
rpc_test.go
118 lines (90 loc) · 3.25 KB
/
rpc_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package servicebus
import (
"context"
"testing"
"github.com/Azure/azure-amqp-common-go/v3/rpc"
"github.com/Azure/go-amqp"
"github.com/stretchr/testify/require"
)
// TestRPCLinkCaching checks the lifecycle of the rpcClient link cache: a link
// created through getCachedLink is stored in the cache, and the cache is
// emptied (and the AMQP close hook invoked) when the client is closed or
// recovered.
func TestRPCLinkCaching(t *testing.T) {
	t.Parallel()

	ctx := context.Background()

	t.Run("WithClose", func(t *testing.T) {
		fake := createFakeRPCClient()

		// Create a link and confirm it is stored in, and then served from,
		// the internal link cache.
		addCachedLink(ctx, t, fake)

		// Close the client itself.
		require.NoError(t, fake.rpcClient.Close())

		require.True(t, fake.amqpClientClosed, "AMQP client was closed")
		require.Empty(t, fake.rpcClient.linkCache, "link cache is cleared when RPC client is closed")
	})

	t.Run("WithRecover", func(t *testing.T) {
		fake := createFakeRPCClient()

		// Create a link and confirm it is stored in, and then served from,
		// the internal link cache.
		addCachedLink(ctx, t, fake)

		// Recover the client instead of closing it.
		require.NoError(t, fake.rpcClient.Recover(ctx))

		require.True(t, fake.amqpClientClosed, "AMQP client was closed")
		require.Empty(t, fake.rpcClient.linkCache, "link cache is cleared when RPC client recovers")

		// Sanity check: make sure nothing breaks if we close afterwards.
		require.NoError(t, fake.rpcClient.Close())
	})
}
// addCachedLink requests a link for a fixed address from the fake's rpcClient
// and asserts that:
//   - exactly one link has been created through the newRPCLink hook,
//   - that link is stored in the client's linkCache under the address, and
//   - a second request for the same address returns the very same instance.
//
// Any failed assertion stops the calling test immediately.
func addCachedLink(ctx context.Context, t *testing.T, fake *fakeRPCClient) {
	// Mark this function as a helper so failures are attributed to the calling
	// (sub)test's line rather than to this function.
	t.Helper()

	const address = "an address"

	createdLink, err := fake.rpcClient.getCachedLink(ctx, address)
	require.NoError(t, err)
	require.Len(t, fake.createdLinks, 1)
	require.NotNil(t, createdLink)

	// The freshly created link must be the one tracked by the internal cache.
	linkFromInternalCache, exists := fake.rpcClient.linkCache[address]
	require.True(t, exists)
	require.Same(t, linkFromInternalCache, createdLink)

	// Asking again must hand back the cached instance.
	sameOldLink, err := fake.rpcClient.getCachedLink(ctx, address)
	require.NoError(t, err)
	require.Same(t, sameOldLink, createdLink, "Same link instance should always be returned")
}
// fakeRPCClient wraps an rpcClient and records what the client does through
// its injected hooks so that tests can assert on it afterwards.
type fakeRPCClient struct {
	// The client under test; its fields and methods are promoted.
	*rpcClient

	// Flags flipped by the newAMQPClient and closeAMQPClient hooks installed
	// by createFakeRPCClient.
	newAMQPClientCreated, amqpClientClosed bool

	// Every link handed out by the newRPCLink hook, in creation order.
	createdLinks []*rpc.Link
}
// createFakeRPCClient returns a fakeRPCClient whose embedded rpcClient starts
// with an empty link cache, a fakeEntityConnector, and hook functions that
// only record their invocation on the fake instead of touching the network.
func createFakeRPCClient() *fakeRPCClient {
	fake := new(fakeRPCClient)

	// Records that a new AMQP client was requested and returns an empty one.
	dialAMQP := func(ctx context.Context, ec entityConnector) (*amqp.Client, func() <-chan struct{}, error) {
		fake.newAMQPClientCreated = true

		// NOTE, it's just a little lie (ie - we'll just pretend cancellation
		// of creds refresh worked INSTANTLY): the channel is already closed
		// by the time it is handed out.
		done := make(chan struct{})
		close(done)

		cancelled := func() <-chan struct{} { return done }
		return &amqp.Client{}, cancelled, nil
	}

	// Creates an empty link, remembers it on the fake, and returns it.
	openLink := func(conn *amqp.Client, address string, opts ...rpc.LinkOption) (*rpc.Link, error) {
		link := &rpc.Link{}
		fake.createdLinks = append(fake.createdLinks, link)
		return link, nil
	}

	// Records that the AMQP client was closed.
	closeAMQP := func() error {
		fake.amqpClientClosed = true
		return nil
	}

	fake.rpcClient = &rpcClient{
		ec:              &fakeEntityConnector{},
		linkCache:       map[string]*rpc.Link{},
		newAMQPClient:   dialAMQP,
		newRPCLink:      openLink,
		closeAMQPClient: closeAMQP,
	}

	return fake
}
// fakeEntityConnector is a stateless stand-in used as the rpcClient's `ec`
// field in these tests. Its methods return fixed values.
// NOTE(review): presumably this satisfies the entityConnector interface — confirm against its definition.
type fakeEntityConnector struct{}
// ManagementPath returns a fixed placeholder path for the fake connector.
func (fc *fakeEntityConnector) ManagementPath() string {
	const managementPath = "ThisIsTheManagementPath"
	return managementPath
}
// Namespace returns a new Namespace on every call whose amqpDial hook always
// succeeds with an empty AMQP client, ignoring the address and options.
func (fc *fakeEntityConnector) Namespace() *Namespace {
	ns := &Namespace{}
	ns.amqpDial = func(addr string, opts ...amqp.ConnOption) (*amqp.Client, error) {
		return &amqp.Client{}, nil
	}
	return ns
}
// getEntity always returns nil; the fake connector has no backing entity.
func (fc *fakeEntityConnector) getEntity() *entity {
	var none *entity
	return none
}