forked from rabbitmq/amqp091-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
connection_test.go
211 lines (167 loc) · 5.73 KB
/
connection_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
// Copyright (c) 2021 VMware, Inc. or its affiliates. All Rights Reserved.
// Copyright (c) 2012-2021, Sean Treadway, SoundCloud Ltd.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build integration
package amqp091
import (
"crypto/tls"
"net"
"sync"
"testing"
"time"
)
// TestRequiredServerLocale checks that the locale list advertised by the
// server (conn.Locales) contains defaultLocale and fails the test otherwise.
func TestRequiredServerLocale(t *testing.T) {
	conn := integrationConnection(t, "AMQP 0-9-1 required server locale")
	// Release the connection on every exit path; deferred calls also run
	// when t.Fatalf stops the test.
	defer conn.Close()

	requiredServerLocale := defaultLocale

	for _, locale := range conn.Locales {
		if locale == requiredServerLocale {
			return
		}
	}

	t.Fatalf("AMQP 0-9-1 server must support at least the %s locale, server sent the following locales: %#v", requiredServerLocale, conn.Locales)
}
// TestDefaultConnectionLocale checks that a connection opened through
// integrationConnection reports defaultLocale in conn.Config.Locale.
func TestDefaultConnectionLocale(t *testing.T) {
	conn := integrationConnection(t, "client default locale")
	// Release the connection on every exit path; deferred calls also run
	// when t.Fatalf stops the test.
	defer conn.Close()

	if conn.Config.Locale != defaultLocale {
		t.Fatalf("Expected default connection locale to be %s, is was: %s", defaultLocale, conn.Config.Locale)
	}
}
// TestChannelOpenOnAClosedConnectionFails closes a connection and then
// requires conn.Channel() to report ErrClosed.
func TestChannelOpenOnAClosedConnectionFails(t *testing.T) {
	conn := integrationConnection(t, "channel on close")
	conn.Close()

	_, openErr := conn.Channel()
	if openErr == ErrClosed {
		// Expected outcome: the closed connection refused the channel.
		return
	}

	t.Fatalf("channel.open on a closed connection %#v is expected to fail", conn)
}
// TestChannelOpenOnAClosedConnectionFails_ReleasesAllocatedChannel verifies
// that a failed channel.open leaves nothing behind: the size of conn.channels
// after the failed call must equal its size before the call.
func TestChannelOpenOnAClosedConnectionFails_ReleasesAllocatedChannel(t *testing.T) {
	conn := integrationConnection(t, "releases channel allocation")
	conn.Close()

	// Snapshot the number of tracked channels before the failing open.
	allocated := len(conn.channels)

	_, openErr := conn.Channel()
	if openErr != ErrClosed {
		t.Fatalf("channel.open on a closed connection %#v is expected to fail", conn)
	}

	if remaining := len(conn.channels); remaining != allocated {
		t.Fatalf("channel.open failed, but the allocated channel was not released")
	}
}
// TestRaceBetweenChannelAndConnectionClose ensures allocating a new channel
// does not race with shutting the connection down.
//
// One goroutine closes the connection while several others try to open (and
// then close) a channel on it. The test waits for all of them, so the
// 10-second watchdog stays armed until the racing calls have returned; if any
// of them blocks, the watchdog panics with "Close deadlock".
//
// See https://github.com/streadway/amqp/issues/251 - thanks to jmalloc for the
// test case.
func TestRaceBetweenChannelAndConnectionClose(t *testing.T) {
	defer time.AfterFunc(10*time.Second, func() { panic("Close deadlock") }).Stop()

	conn := integrationConnection(t, "allocation/shutdown race")

	const openers = 10

	var wg sync.WaitGroup
	wg.Add(openers + 1) // the channel openers plus the connection closer

	go func() {
		defer wg.Done()
		// The result is irrelevant here; only completion matters.
		conn.Close()
	}()

	for i := 0; i < openers; i++ {
		go func() {
			defer wg.Done()
			// Opening may fail once the connection is closed; only a
			// successfully opened channel needs closing.
			ch, err := conn.Channel()
			if err == nil {
				ch.Close()
			}
		}()
	}

	// Without this wait the test would return immediately and the deferred
	// Stop would disarm the watchdog before the goroutines had run.
	wg.Wait()
}
// TestRaceBetweenChannelShutdownAndSend ensures closing a channel
// (channel.shutdown) does not race with calling channel.send() from any other
// goroutines.
//
// One goroutine closes the channel while several others call ch.Ack on it.
// The test waits for all of them, so the 10-second watchdog stays armed until
// the racing calls have returned; if any of them blocks, the watchdog panics
// with "Close deadlock".
//
// See https://github.com/streadway/amqp/pull/253#issuecomment-292464811 for
// more details - thanks to jmalloc again.
func TestRaceBetweenChannelShutdownAndSend(t *testing.T) {
	defer time.AfterFunc(10*time.Second, func() { panic("Close deadlock") }).Stop()

	conn := integrationConnection(t, "channel close/send race")
	defer conn.Close()

	// Fail fast if the channel cannot be opened rather than racing on a
	// channel value returned alongside an error.
	ch, err := conn.Channel()
	if err != nil {
		t.Fatalf("channel.open failed: %s", err)
	}

	const senders = 10

	var wg sync.WaitGroup
	wg.Add(senders + 1) // the senders plus the channel closer

	go func() {
		defer wg.Done()
		// The result is irrelevant here; only completion matters.
		ch.Close()
	}()

	for i := 0; i < senders; i++ {
		go func() {
			defer wg.Done()
			// ch.Ack calls ch.send() internally. Its error is deliberately
			// ignored: the call may legitimately fail once the channel is
			// closed.
			_ = ch.Ack(42, false)
		}()
	}

	// Without this wait the test would return immediately and the deferred
	// Stop would disarm the watchdog before the goroutines had run.
	wg.Wait()
}
// TestQueueDeclareOnAClosedConnectionFails opens a channel, closes the
// underlying connection, and then requires QueueDeclare on that channel to
// return ErrClosed.
func TestQueueDeclareOnAClosedConnectionFails(t *testing.T) {
	conn := integrationConnection(t, "queue declare on close")

	// Fail fast if the channel cannot be opened rather than calling
	// QueueDeclare on a channel value returned alongside an error.
	ch, err := conn.Channel()
	if err != nil {
		conn.Close()
		t.Fatalf("channel.open failed: %s", err)
	}

	conn.Close()

	if _, err := ch.QueueDeclare("an example", false, false, false, false, nil); err != ErrClosed {
		t.Fatalf("queue.declare on a closed connection %#v is expected to return ErrClosed, returned: %#v", conn, err)
	}
}
// TestConcurrentClose calls conn.Close() from many goroutines at once and
// checks the result of each call. A nil error, ErrClosed, or a *net.OpError
// is tolerated and logged; any other error of type *Error fails the test.
func TestConcurrentClose(t *testing.T) {
	const concurrency = 32

	conn := integrationConnection(t, "concurrent close")
	defer conn.Close()

	wg := sync.WaitGroup{}
	wg.Add(concurrency)
	for i := 0; i < concurrency; i++ {
		go func() {
			defer wg.Done()

			err := conn.Close()

			if err == nil {
				t.Log("first concurrent close was successful")
				return
			}

			if err == ErrClosed {
				t.Log("later concurrent close were successful and returned ErrClosed")
				return
			}

			// BUG(st) is this really acceptable? we got a net.OpError before the
			// connection was marked as closed means a race condition between the
			// network connection and handshake state. It should be a package error
			// returned.
			if _, isNetErr := err.(*net.OpError); isNetErr {
				t.Logf("unknown net.OpError during close, ignoring: %+v", err)
				return
			}

			// A different/protocol error occurred indicating a race or missed condition.
			// Errorf is used instead of Fatalf: FailNow-style calls must only be
			// made from the goroutine running the test function, and this
			// closure runs in a goroutine spawned by the test.
			if _, isProtocolErr := err.(*Error); isProtocolErr {
				t.Errorf("Expected no error, or ErrClosed, or a net.OpError from conn.Close(), got %#v (%s) of type %T", err, err, err)
			}
		}()
	}
	wg.Wait()
}
// TestPlaintextDialTLS ensures amqp:// connections succeed when using DialTLS.
//
// The integration URL is parsed first; the test is skipped unless its scheme
// is "amqp", then DialTLS is called with a TLS 1.2 minimum configuration and
// the resulting connection is closed.
func TestPlaintextDialTLS(t *testing.T) {
	uri, parseErr := ParseURI(integrationURLFromEnv())
	if parseErr != nil {
		t.Fatalf("parse URI error: %s", parseErr)
	}

	// We can only test when we have a plaintext listener
	plaintext := uri.Scheme == "amqp"
	if !plaintext {
		t.Skip("requires server listening for plaintext connections")
	}

	tlsConfig := &tls.Config{MinVersion: tls.VersionTLS12}

	conn, dialErr := DialTLS(uri.String(), tlsConfig)
	if dialErr != nil {
		t.Fatalf("unexpected dial error, got %v", dialErr)
	}

	conn.Close()
}
// TestIsClosed exercises the public IsClosed method on a connection: it must
// report false on a freshly opened connection and true after Close.
func TestIsClosed(t *testing.T) {
	conn := integrationConnection(t, "public IsClosed()")

	if closedBefore := conn.IsClosed(); closedBefore {
		t.Fatalf("connection expected to not be marked as closed")
	}

	conn.Close()

	closedAfter := conn.IsClosed()
	if closedAfter {
		return
	}

	t.Fatal("connection expected to be marked as closed")
}
// TestChannelIsClosed will test the public method IsClosed on a channel: it
// must report false on a freshly opened channel and true after ch.Close().
func TestChannelIsClosed(t *testing.T) {
	conn := integrationConnection(t, "public channel.IsClosed()")
	// Release the connection on every exit path; deferred calls also run
	// when t.Fatalf stops the test.
	defer conn.Close()

	// Fail fast if the channel cannot be opened rather than calling methods
	// on a channel value returned alongside an error.
	ch, err := conn.Channel()
	if err != nil {
		t.Fatalf("channel.open failed: %s", err)
	}

	if ch.IsClosed() {
		t.Fatalf("channel expected to not be marked as closed")
	}

	ch.Close()

	if !ch.IsClosed() {
		t.Fatal("channel expected to be marked as closed")
	}
}