forked from donovanhide/eventsource
-
Notifications
You must be signed in to change notification settings - Fork 8
/
decoder.go
178 lines (162 loc) · 5.01 KB
/
decoder.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
package eventsource
import (
"bufio"
"io"
"strconv"
"strings"
"time"
)
type publication struct {
id, event, data, lastEventID string
retry int64
}
//nolint:revive,stylecheck // should be ID; retained for backward compatibility
func (s *publication) Id() string { return s.id }
func (s *publication) Event() string { return s.event }
func (s *publication) Data() string { return s.data }
func (s *publication) Retry() int64 { return s.retry }
// LastEventID is from a separate interface, EventWithLastID
func (s *publication) LastEventID() string { return s.lastEventID }
// A Decoder is capable of reading Events from a stream.
type Decoder struct {
linesCh <-chan string
errorCh <-chan error
readTimeout time.Duration
lastEventID string
}
// DecoderOption is a common interface for optional configuration parameters that can be
// used in creating a Decoder.
type DecoderOption interface {
apply(s *Decoder)
}
type readTimeoutDecoderOption time.Duration
func (o readTimeoutDecoderOption) apply(d *Decoder) {
d.readTimeout = time.Duration(o)
}
type lastEventIDDecoderOption string
func (o lastEventIDDecoderOption) apply(d *Decoder) {
d.lastEventID = string(o)
}
// DecoderOptionReadTimeout returns an option that sets the read timeout interval for a
// Decoder when the Decoder is created. If the Decoder does not receive new data within this
// length of time, it will return an error. By default, there is no read timeout.
func DecoderOptionReadTimeout(timeout time.Duration) DecoderOption {
return readTimeoutDecoderOption(timeout)
}
// DecoderOptionLastEventID returns an option that sets the last event ID property for a
// Decoder when the Decoder is created. This allows the last ID to be included in new
// events if they do not override it.
func DecoderOptionLastEventID(lastEventID string) DecoderOption {
return lastEventIDDecoderOption(lastEventID)
}
// NewDecoder returns a new Decoder instance that reads events with the given io.Reader.
func NewDecoder(r io.Reader) *Decoder {
bufReader := bufio.NewReader(newNormaliser(r))
linesCh, errorCh := newLineStreamChannel(bufReader)
return &Decoder{
linesCh: linesCh,
errorCh: errorCh,
}
}
// NewDecoderWithOptions returns a new Decoder instance that reads events with the given
// io.Reader, with optional configuration parameters.
func NewDecoderWithOptions(r io.Reader, options ...DecoderOption) *Decoder {
d := NewDecoder(r)
for _, o := range options {
o.apply(d)
}
return d
}
// Decode reads the next Event from a stream (and will block until one
// comes in).
// Graceful disconnects (between events) are indicated by an io.EOF error.
// Any error occurring mid-event is considered non-graceful and will
// show up as some other error (most likely io.ErrUnexpectedEOF).
func (dec *Decoder) Decode() (Event, error) {
pub := new(publication)
inDecoding := false
var timeoutTimer *time.Timer
var timeoutCh <-chan time.Time
if dec.readTimeout > 0 {
timeoutTimer = time.NewTimer(dec.readTimeout)
defer timeoutTimer.Stop()
timeoutCh = timeoutTimer.C
}
ReadLoop:
for {
select {
case line := <-dec.linesCh:
if timeoutTimer != nil {
if !timeoutTimer.Stop() {
<-timeoutCh
}
timeoutTimer.Reset(dec.readTimeout)
}
if line == "\n" && inDecoding {
// the empty line signals the end of an event
break ReadLoop
} else if line == "\n" && !inDecoding {
// only a newline was sent, so we don't want to publish an empty event but try to read again
continue ReadLoop
}
line = strings.TrimSuffix(line, "\n")
if strings.HasPrefix(line, ":") {
continue ReadLoop
}
sections := strings.SplitN(line, ":", 2)
field, value := sections[0], ""
if len(sections) == 2 {
value = strings.TrimPrefix(sections[1], " ")
}
inDecoding = true
switch field {
case "event":
pub.event = value
case "data":
pub.data += value + "\n"
case "id":
if !strings.ContainsRune(value, 0) {
pub.id = value
dec.lastEventID = value
}
case "retry":
pub.retry, _ = strconv.ParseInt(value, 10, 64)
}
case err := <-dec.errorCh:
if err == io.ErrUnexpectedEOF && !inDecoding {
// if we're not in the middle of an event then just return EOF
err = io.EOF
} else if err == io.EOF && inDecoding {
// if we are in the middle of an event then EOF is unexpected
err = io.ErrUnexpectedEOF
}
return nil, err
case <-timeoutCh:
return nil, ErrReadTimeout
}
}
pub.data = strings.TrimSuffix(pub.data, "\n")
pub.lastEventID = dec.lastEventID
return pub, nil
}
/**
* Returns a channel that will receive lines of text as they are read. On any error
* from the underlying reader, it stops and posts the error to a second channel.
*/
func newLineStreamChannel(r *bufio.Reader) (<-chan string, <-chan error) {
linesCh := make(chan string)
errorCh := make(chan error)
go func() {
defer close(linesCh)
defer close(errorCh)
for {
line, err := r.ReadString('\n')
if err != nil {
errorCh <- err
return
}
linesCh <- line
}
}()
return linesCh, errorCh
}