-
Notifications
You must be signed in to change notification settings - Fork 6
/
starter.go
96 lines (83 loc) · 2.5 KB
/
starter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
// Copyright 2022 The incite Authors. All rights reserved.
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file.
package incite
import (
"context"
"errors"
"time"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
)
// starter is the worker specialization that starts CloudWatch Logs Insights
// queries for chunks. It declares no fields of its own: everything it holds
// comes from the embedded worker, and starter supplies the context,
// manipulate, and release methods.
type starter struct {
	worker
}
// maxTempStartingErrs is the value newStarter assigns to the starter's
// maxTemporaryError field. Presumably it caps how many temporary errors a
// chunk may encounter while starting before it is given up on; confirm
// against the worker implementation.
const maxTempStartingErrs = 10
// newStarter creates the starter belonging to manager m.
//
// The embedded worker is wired with m.start as its in field and m.update as
// its out field, is named "starter", and uses maxTempStartingErrs as its
// maxTemporaryError value. Its regulator is built by makeRegulator from
// m.close, the StartQuery entries of m.RPS and RPSDefaults, and a flag that
// is true unless m.DisableAdaptation is set.
func newStarter(m *mgr) *starter {
	adaptive := !m.DisableAdaptation

	st := new(starter)
	st.worker = worker{
		m:                 m,
		regulator:         makeRegulator(m.close, m.RPS[StartQuery], RPSDefaults[StartQuery], adaptive),
		in:                m.start,
		out:               m.update,
		name:              "starter",
		maxTemporaryError: maxTempStartingErrs,
	}

	// The starter acts as its own manipulator (presumably so the shared
	// worker logic dispatches to the methods defined on starter).
	st.manipulator = st
	return st
}
// context returns the context stored in chunk c.
func (s *starter) context(c *chunk) context.Context {
	return c.ctx
}
// manipulate tries to start the CloudWatch Logs Insights query for chunk c.
//
// It returns nothing, without making a request, when the chunk's stream is
// no longer alive. Otherwise it issues a StartQuery request, records the
// request time in s.lastReq, and returns:
//
//   - throttlingError or temporaryError when the request error is classified
//     as throttling or temporary; c.err holds a *StartQueryError wrapping it.
//   - finished with c.err set to errReduceParallel when the error is
//     classified as a limit-exceeded error.
//   - finished with c.err set to a *StartQueryError for any other request
//     error, or when the response carries no query ID.
//   - finished with c.queryID populated, c.state set to started, and c.err
//     cleared when the query was started successfully.
func (s *starter) manipulate(c *chunk) outcome {
	// Nothing to do for a chunk whose owning stream is no longer alive.
	if alive := c.stream.alive(); !alive {
		return nothing
	}

	// Express the chunk's time range in Insights' format. CWL uses inclusive
	// time ranges while chunks use exclusive ones, so the end is pulled back
	// by one millisecond.
	first := epochMillisecond(c.start)
	last := epochMillisecond(c.end.Add(-1 * time.Millisecond))

	// Issue the StartQuery request and note when it was made.
	req := &cloudwatchlogs.StartQueryInput{
		QueryString:   &c.stream.Text,
		StartTime:     &first,
		EndTime:       &last,
		LogGroupNames: c.stream.groups,
		Limit:         &c.stream.Limit,
	}
	resp, err := s.m.Actions.StartQueryWithContext(c.ctx, req, request.WithAppendUserAgent(version()))
	s.lastReq = time.Now()

	if err != nil {
		// Record the failure on the chunk first; the limit-exceeded case
		// below replaces it with errReduceParallel.
		c.err = &StartQueryError{c.stream.Text, c.start, c.end, err}
		switch class := classifyError(err); class {
		case throttlingClass:
			return throttlingError
		case limitExceededClass:
			s.m.logChunk(c, "exceeded query concurrency limit", "temporary error from CloudWatch Logs: "+err.Error())
			c.err = errReduceParallel
			return finished
		case temporaryClass:
			return temporaryError
		}
		// Any other classification is treated as a permanent failure.
		s.m.logChunk(c, "permanent failure to start", "fatal error from CloudWatch Logs: "+err.Error())
		return finished
	}

	// A response without a query ID is reported as a start failure.
	if resp.QueryId == nil {
		c.err = &StartQueryError{c.stream.Text, c.start, c.end, errors.New(outputMissingQueryIDMsg)}
		s.m.logChunk(c, "nil query ID from CloudWatch Logs for", "")
		return finished
	}

	// Success: remember the query ID and mark the chunk as started.
	c.queryID = *resp.QueryId
	c.state = started
	c.err = nil
	s.m.logChunk(c, "started", "")
	return finished
}
// release logs, through the manager, that chunk c is being released by the
// starter. It performs no other action on the chunk.
func (s *starter) release(c *chunk) {
	s.m.logChunk(c, "releasing startable", "")
}