/*
Package keengo sends events to Keen.io asynchronously in batches.
*/
package keengo

import (
	"bytes"
	"encoding/json"
	"log"
	"net/http"
	"time"
)

const (
	// The Keen.IO API URL
	url string = "https://api.keen.io/3.0/projects/"
	// The size of the channel used to queue events to the background goroutine
	channel_size int = 100
	// The background goroutine will send batches of events up to this size
	send_threshold int = 90
)

// analyticsEvent is the type used to queue events to the background goroutine
type analyticsEvent struct {
	eventCollection string
	data            interface{}
}

// Sender batches queued events and posts them to Keen.io from a background goroutine
type Sender struct {
	projectId string
	writeKey  string
	url       string                   // The URL to post events to, including project details
	events    map[string][]interface{} // For batching events as we pull them off the channel
	count     int                      // Number of events batched and ready to send
	channel   chan analyticsEvent      // For queuing events to the background goroutine
	done      chan bool                // For a clean exit
}

/*
NewSender creates a new Sender.

It starts a background goroutine to aggregate and send your events.
*/
func NewSender(projectId, writeKey string) *Sender {
	sender := &Sender{
		projectId: projectId,
		writeKey:  writeKey,
		channel:   make(chan analyticsEvent, channel_size),
		done:      make(chan bool),
	}
	sender.url = url + sender.projectId + "/events?api_key=" + sender.writeKey
	sender.reset()
	go sender.run()
	return sender
}

/*
Queue queues an event to be sent to Keen.io.

info can be anything that is JSON serializable. Events are immediately handed to a background goroutine
for sending. The background goroutine sends everything queued to it in one batch, then waits for new data.
The upshot is that if you send events slowly they are sent immediately and individually, but if you send
events quickly they are batched.
*/
func (sender *Sender) Queue(eventCollection string, info interface{}) {
	sender.channel <- analyticsEvent{eventCollection, info}
}

/*
Close shuts down the Sender and waits for queued events to be sent.
*/
func (sender *Sender) Close() {
	// Closing the channel signals the background goroutine to exit
	close(sender.channel)
	// Wait for the background goroutine to signal that it has flushed all events and exited
	<-sender.done
}
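
// A minimal usage sketch from a caller's point of view (illustrative only: the credential
// placeholders and the "signups" collection name below are not part of this package):
//
//	sender := keengo.NewSender("PROJECT_ID", "WRITE_KEY")
//	defer sender.Close() // flush anything still queued before the program exits
//	sender.Queue("signups", map[string]interface{}{"plan": "free", "referrer": "web"})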

// add appends an event to the map used to batch events
func (sender *Sender) add(event analyticsEvent) bool {
	if event.eventCollection == "" {
		// Zero-value event (e.g. read from a closed channel), don't add it
		return false
	}
	sender.events[event.eventCollection] = append(sender.events[event.eventCollection], event.data)
	sender.count++
	if sender.count >= send_threshold {
		sender.send()
	}
	return true
}

// reset clears the map used to batch events
func (sender *Sender) reset() {
	sender.events = make(map[string][]interface{}, 10)
	sender.count = 0
}

// send posts the events currently batched in sender.events
func (sender *Sender) send() {
	if sender.count == 0 {
		return
	}
	// Whether or not the post succeeds, drop the batched events before returning
	defer sender.reset()
	// Convert the batched events to JSON
	data, err := json.Marshal(sender.events)
	if err != nil {
		log.Printf("Couldn't marshal JSON for analytics. %v\n", err)
		return
	}
	start := time.Now()
	rsp, err := http.Post(sender.url, "application/json", bytes.NewReader(data))
	if err != nil {
		log.Printf("Failed to post analytics events. %v\n", err)
		return
	}
	defer rsp.Body.Close()
	if rsp.StatusCode != http.StatusOK {
		log.Printf("Failure response for analytics post. %d, %s\n", rsp.StatusCode, rsp.Status)
	} else {
		// TODO: remove once analytics has bedded in
		log.Printf("analytics sent in %v\n", time.Since(start))
	}
}
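
// For illustration, the request body is simply the JSON encoding of the events map, keyed by
// collection name. With two hypothetical collections queued, the posted payload would look
// something like this (collection names and fields are made up):
//
//	{
//		"signups":    [{"plan": "free"}, {"plan": "pro"}],
//		"page_views": [{"path": "/"}]
//	}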

// run is the background goroutine: it batches events read from the channel and sends them
func (sender *Sender) run() {
	var event analyticsEvent
	// Block for the first event; once we have one event, try to drain everything left
	for event = range sender.channel {
		sender.add(event)
		// A select with a default case is essentially a non-blocking read from the channel
	Loop:
		for {
			select {
			case event = <-sender.channel:
				// Add the event to those we are batching
				if !sender.add(event) {
					break Loop
				}
			default:
				// Nothing left to batch at present. Send the events we have, then go back
				// to blocking until something shows up
				break Loop
			}
		}
		// Send what we have batched
		sender.send()
	}
	// Signal that the background goroutine has finished
	sender.done <- true
	log.Printf("Analytics exited\n")
}
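
// A sketch of the batching behavior described above (hypothetical caller code, with a made-up
// collection name): events queued faster than the background goroutine can post them are drained
// by run's inner loop and sent together as one batch.
//
//	sender := keengo.NewSender("PROJECT_ID", "WRITE_KEY")
//	for i := 0; i < 50; i++ {
//		sender.Queue("page_views", map[string]interface{}{"path": "/", "n": i})
//	}
//	sender.Close() // blocks until everything queued above has been flushed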