forked from Benzinga/go-bztcp
-
Notifications
You must be signed in to change notification settings - Fork 0
/
client.go
226 lines (191 loc) · 4.71 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
package bztcp
import (
"bufio"
"context"
"crypto/tls"
"encoding/json"
"net"
"time"
)
// Conn implements a Benzinga TCP connection.
//
// It pairs the underlying socket with a buffered reader over that same
// socket: Recv reads lines through the buffer, while Send writes to the
// socket directly.
type Conn struct {
// socket is the underlying network connection, used for writes, deadlines
// and closing.
socket net.Conn
// reader buffers reads from socket so that whole newline-terminated lines
// can be pulled off the stream.
reader *bufio.Reader
}
// Dial opens a plain TCP connection to the Benzinga server at addr and
// authenticates with user and key. It is shorthand for DialTimeout with
// AuthTimeout as the dial timeout.
func Dial(addr, user, key string) (*Conn, error) {
	conn, err := DialTimeout(addr, user, key, AuthTimeout)
	return conn, err
}
// DialTLS opens a TLS connection to the Benzinga server at addr and
// authenticates with user and key. It is shorthand for DialTimeoutTLS with
// AuthTimeout as the dial timeout.
func DialTLS(addr, user, key string) (*Conn, error) {
	conn, err := DialTimeoutTLS(addr, user, key, AuthTimeout)
	return conn, err
}
// DialTimeout opens a plain TCP connection to addr, giving up on the dial
// after d, and then performs the authentication handshake with user and key
// via NewConn.
//
// The dial error is returned unchanged if the connection cannot be
// established.
func DialTimeout(addr, user, key string, d time.Duration) (*Conn, error) {
	dialer := net.Dialer{Timeout: d}
	socket, err := dialer.Dial("tcp", addr)
	if err != nil {
		return nil, err
	}
	return NewConn(socket, user, key)
}
// DialTimeoutTLS opens a TLS connection to addr, giving up on the dial after
// d, and then performs the authentication handshake with user and key via
// NewConn.
//
// No custom TLS configuration is supplied (a nil config is passed to the tls
// package). The dial error is returned unchanged on failure.
func DialTimeoutTLS(addr, user, key string, d time.Duration) (*Conn, error) {
	dialer := &net.Dialer{Timeout: d}
	socket, err := tls.DialWithDialer(dialer, "tcp", addr, nil)
	if err != nil {
		return nil, err
	}
	return NewConn(socket, user, key)
}
// NewConn wraps an already-established socket in a Conn and authenticates
// over it with user and key.
//
// If authentication fails the socket is closed and the authentication error
// is returned. On success, TCP keep-alive is switched on when the socket is a
// *net.TCPConn; other socket types are left as they are.
func NewConn(socket net.Conn, user, key string) (*Conn, error) {
	conn := &Conn{
		socket: socket,
		reader: bufio.NewReader(socket),
	}

	// Run the handshake first; a socket that fails it is closed here.
	if err := conn.authenticate(user, key); err != nil {
		socket.Close()
		return nil, err
	}

	// Keep-alive is best-effort: only raw TCP sockets are touched and the
	// result of SetKeepAlive is not inspected.
	if tcp, isTCP := socket.(*net.TCPConn); isTCP {
		tcp.SetKeepAlive(true)
	}

	return conn, nil
}
// Stream reads messages from the connection until it fails or ctx is done,
// invoking cb for every "STREAM" message received.
//
// While Stream runs, a background goroutine sends a "PING" every
// PingDuration. That goroutine closes the socket once the context derived
// from ctx is done, which happens both when the caller cancels ctx and when
// Stream returns for any reason.
//
// Stream returns nil if the context was done by the time a read completed;
// otherwise it returns the read error or the JSON decoding error that ended
// the loop. Messages with any status other than "STREAM" (including "PONG")
// are ignored.
func (c *Conn) Stream(ctx context.Context, cb func(d StreamData)) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Pinger: keeps sending PINGs and closes the socket on cancellation so
	// that a blocked Recv in the loop below is released.
	go func() {
		ticker := time.NewTicker(PingDuration)
		defer ticker.Stop()

		for {
			select {
			case <-ctx.Done():
				c.socket.Close()
				return
			case <-ticker.C:
				// The error from Send is discarded here.
				c.Send("PING", PingData{time.Now().Format(TimeFormat)})
			}
		}
	}()

	for {
		msg, err := c.Recv()

		// Cancellation takes priority over whatever Recv returned, so an
		// error caused by the socket being closed is not reported.
		if ctx.Err() != nil {
			return nil
		}
		if err != nil {
			return err
		}

		// Only stream payloads are delivered; everything else is dropped.
		if msg.Status != "STREAM" {
			continue
		}

		var data StreamData
		if err := json.Unmarshal(msg.Data, &data); err != nil {
			return err
		}
		cb(data)
	}
}
// Recv reads one newline-terminated line from the connection and decodes it
// into a Message. It is a low-level primitive; most callers should use
// Stream instead.
//
// A zero Message is returned together with the error when either the read or
// the decode fails.
func (c *Conn) Recv() (Message, error) {
	// Pull the next raw line off the buffered socket.
	raw, err := c.reader.ReadBytes('\n')
	if err != nil {
		return Message{}, err
	}

	// Parse the line into a message.
	var decoded Message
	err = decoded.Decode(raw)
	if err != nil {
		return Message{}, err
	}

	return decoded, nil
}
// Send builds a message from status and body and writes its encoded form to
// the socket. It is a low-level primitive; most callers should use Stream
// instead.
//
// The error is either the one reported by NewMessage while building the
// message or the one reported by the socket write.
func (c *Conn) Send(status string, body interface{}) error {
	// Build the outgoing message.
	msg, err := NewMessage(status, body)
	if err != nil {
		return err
	}

	// Push the encoded bytes onto the wire; the byte count is not needed.
	_, err = c.socket.Write(msg.Encode())
	return err
}
// authenticate performs the login handshake on a freshly opened socket.
//
// A successful exchange looks like this:
//
//	< READY=BZEOT
//	> AUTH: {"username":"bztest","key":"12345"}=BZEOT
//	< CONNECTED=BZEOT
//
// When the credentials are rejected the server answers with an error status
// in place of `CONNECTED`:
//
//   - "INVALID KEY FORMAT": the auth data could not be decoded.
//   - "INVALID KEY": the user or key was not valid.
//
// For example:
//
//	< READY=BZEOT
//	> AUTH: {"username":"bztest",}=BZEOT
//	< INVALID KEY FORMAT=BZEOT
//
// The whole exchange runs under a socket deadline of AuthTimeout. The
// deadline is cleared once the server's reply to AUTH has been read; on the
// earlier error returns it is left in place.
//
// It returns ErrInvalidReady if the first message is not "READY",
// ErrInvalidKeyFormat or ErrInvalidKey for the matching server replies,
// ErrInvalidAuthResponse for any other reply, and passes through any error
// from Recv or Send.
func (c *Conn) authenticate(user, key string) error {
	// Bound the handshake in time.
	c.socket.SetDeadline(time.Now().Add(AuthTimeout))

	// The server speaks first and must announce that it is ready.
	greeting, err := c.Recv()
	if err != nil {
		return err
	}
	if greeting.Status != "READY" {
		return ErrInvalidReady
	}

	// Present the credentials.
	creds := AuthData{
		Username: user,
		Key:      key,
	}
	err = c.Send("AUTH", creds)
	if err != nil {
		return err
	}

	// Wait for the verdict.
	reply, err := c.Recv()
	if err != nil {
		return err
	}

	// Handshake I/O is done; remove the deadline before interpreting the reply.
	c.socket.SetDeadline(time.Time{})

	switch reply.Status {
	case "CONNECTED":
		return nil
	case "INVALID KEY":
		return ErrInvalidKey
	case "INVALID KEY FORMAT":
		return ErrInvalidKeyFormat
	default:
		return ErrInvalidAuthResponse
	}
}