-
Notifications
You must be signed in to change notification settings - Fork 5
/
query.go
307 lines (249 loc) · 7.4 KB
/
query.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
package tormenta
import (
"fmt"
"time"
"github.com/dgraph-io/badger"
"github.com/jpincas/gouuidv6"
)
// Query holds everything needed to run one search: the database connection,
// the entity type and target, the ordering / paging / date-range options,
// any index filters, and the bookkeeping used while the query is prepared
// and executed.
type Query struct {
	// Connection to BadgerDB
	db DB
	// The entity type being searched
	keyRoot []byte
	// Target is the pointer passed into the Query where results will be set
	target interface{}
	// When set, execute fetches only the first matching record into target
	single bool
	// Order by index name
	orderByIndexName []byte
	// Limit number of returned results
	limit int
	// Offset - start returning results N entities from the beginning
	// offsetCounter used to track the offset
	offset, offsetCounter int
	// Reverse the order of searching and returned results
	reverse bool
	// From and To dates for the search
	from, to gouuidv6.UUID
	// Is this a count only search
	countOnly bool
	// A placeholder for errors to be passed down through the Query
	err error
	// Ranges and comparison key
	seekFrom, validTo, compareTo []byte
	// Name of the index to sum over; a quicksum runs only when this and
	// sumTarget are both set
	sumIndexName []byte
	// Handed to the index search as the destination of the quicksum
	sumTarget interface{}
	// Pass-through context
	ctx map[string]interface{}
	// Filter
	filters []filter
	// Built by prepareQuery when the query has no filters
	basicQuery *basicQuery
	// Logical ID combinator
	idsCombinator func(...idList) idList
	// Is already prepared?
	prepared bool
	// Debug flag - presumably switches on debugLog output; confirm against debugLog
	debug bool
}
// Compare reports whether q and cq render to the same text.
// The comparison is purely textual: each query is formatted with fmt.Sprint
// and the two resulting strings are checked for equality.
func (q Query) Compare(cq Query) bool {
	mine := fmt.Sprint(q)
	theirs := fmt.Sprint(cq)
	return mine == theirs
}
// fromUUID converts t into the UUID used for the 'from' end of a date search.
func fromUUID(t time.Time) gouuidv6.UUID {
	// Step back by a single nanosecond from the requested time,
	// which makes the date search inclusive of t itself
	justBefore := t.Add(-time.Nanosecond)
	return gouuidv6.NewFromTime(justBefore)
}
// toUUID converts t into a UUID as-is; unlike fromUUID, the time is not adjusted.
func toUUID(t time.Time) gouuidv6.UUID {
	id := gouuidv6.NewFromTime(t)
	return id
}
// newQuery creates the base Query for target on this database connection.
// The key root is derived from target via KeyRoot.
func (db DB) newQuery(target interface{}) *Query {
	return &Query{
		db:      db,
		keyRoot: KeyRoot(target),
		target:  target,
		// Every query starts life with an empty context
		ctx: make(map[string]interface{}),
		// Unless told otherwise, id lists are combined with a logical AND
		idsCombinator: intersection,
	}
}
// addFilter registers f as one more filter on the query.
func (q *Query) addFilter(f filter) {
	extended := append(q.filters, f)
	q.filters = extended
}
// shouldApplyLimitOffsetToFilter reports whether limit/offset may be handed
// straight to a filter. That is the case only when the query is not ordered
// by an index and has exactly one filter.
func (q Query) shouldApplyLimitOffsetToFilter() bool {
	// Any order-by index rules it out, however many filters there are
	if len(q.orderByIndexName) > 0 {
		return false
	}
	return len(q.filters) == 1
}
// shouldApplyLimitOffsetToBasicQuery reports whether limit/offset may be
// handed to the basic query, which is the case whenever the query is not
// ordered by an index.
func (q Query) shouldApplyLimitOffsetToBasicQuery() bool {
	orderedByIndex := len(q.orderByIndexName) > 0
	return !orderedByIndex
}
// prepareQuery copies the top-level query settings down to whatever will
// actually produce the ids: each filter if there are any, otherwise a freshly
// built basic query stored on q.basicQuery.
func (q *Query) prepareQuery() {
	// Without any filters, a 'basic query' does the work
	if len(q.filters) == 0 {
		basic := &basicQuery{
			from:    q.from,
			to:      q.to,
			reverse: q.reverse,
			keyRoot: q.keyRoot,
		}
		if q.shouldApplyLimitOffsetToBasicQuery() {
			basic.limit, basic.offset = q.limit, q.offset
		}
		q.basicQuery = basic
		return
	}

	// The limit/offset decision is the same for every filter, so take it once
	pushLimitOffset := q.shouldApplyLimitOffsetToFilter()

	// Filters need some of the top level information too
	// (key root, date range, direction and possibly limit/offset)
	for i := range q.filters {
		f := &q.filters[i]
		f.keyRoot = q.keyRoot
		f.reverse = q.reverse
		f.from = q.from
		f.to = q.to
		if pushLimitOffset {
			f.limit = q.limit
			f.offset = q.offset
		}
	}
}
// queryIDs works out the final list of ids matched by the query inside txn.
// The query is prepared first if that has not happened yet. The ids come from
// the basic query when there are no filters, or from every filter in turn,
// and are then passed through q.idsCombinator.
//
// An error already recorded on the query, or returned by any filter, is
// returned together with an empty idList.
func (q *Query) queryIDs(txn *badger.Txn) (idList, error) {
	if !q.prepared {
		q.prepareQuery()
	}

	// Anything that went wrong during query planning and preparation
	// has been parked on the query; surface it before doing any work
	if q.err != nil {
		return idList{}, q.err
	}

	// NO INDEX FILTERS: the basic query supplies the one and only id list
	if len(q.filters) == 0 {
		ids := q.basicQuery.queryIDs(txn)
		return q.idsCombinator(ids), nil
	}

	// INDEX FILTERS: run one after the other, because Badger only supports
	// 1 iterator per transaction. Should that limitation ever go away,
	// this could be done in parallel
	var perFilter []idList
	for _, f := range q.filters {
		ids, err := f.queryIDs(txn)
		// The first filter to fail ends the whole query
		if err != nil {
			return idList{}, err
		}
		perFilter = append(perFilter, ids)
	}

	// Merge the per-filter lists into the final id list,
	// following the required AND/OR logic
	return q.idsCombinator(perFilter...), nil
}
// execute runs the query inside a fresh read-only transaction.
//
// The steps, in order:
//   - collect the matching ids via queryIDs;
//   - if an order-by index is set, run an index search that orders the ids
//     and applies limit/offset (and performs the quicksum when the sum index
//     is the same index);
//   - for count-only queries, return the number of ids;
//   - for quicksum queries, run a separate index search when the sum index
//     differs from the order index, then return the number of ids;
//   - for 'first' queries, load the first id into the target and return 1
//     (or 0 when nothing matched);
//   - otherwise load all the records into the target and return how many
//     were retrieved.
//
// Every exit path reports to debugLog. On error the count returned is 0.
func (q *Query) execute() (int, error) {
	// Start time for debugging, if required
	started := time.Now()

	// fail and done pair the debug log entry with the matching return values
	fail := func(err error) (int, error) {
		q.debugLog(started, 0, err)
		return 0, err
	}
	done := func(n int) (int, error) {
		q.debugLog(started, n, nil)
		return n, nil
	}

	txn := q.db.KV.NewTransaction(false)
	defer txn.Discard()

	ids, err := q.queryIDs(txn)
	if err != nil {
		return fail(err)
	}

	// A sum index plus a sum target is taken to mean a quicksum execution.
	// Whether the sum index doubles as the order index decides where the
	// sum gets computed.
	quickSum := len(q.sumIndexName) > 0 && q.sumTarget != nil
	sumOnOrderIndex := string(q.sumIndexName) == string(q.orderByIndexName)

	// TODO: more conditions to restrict when this is necessary
	if len(q.orderByIndexName) > 0 {
		kind, err := fieldKind(q.target, string(q.orderByIndexName))
		if err != nil {
			return fail(err)
		}

		ordering := indexSearch{
			keyRoot:        q.keyRoot,
			indexName:      q.orderByIndexName,
			indexKind:      kind,
			idsToSearchFor: ids,
			reverse:        q.reverse,
			limit:          q.limit,
			offset:         q.offset,
		}

		// The ordering pass iterates the very index we want to sum,
		// so let it compute the quicksum along the way
		if quickSum && sumOnOrderIndex {
			ordering.sumIndexName = q.sumIndexName
			ordering.sumTarget = q.sumTarget
		}

		// Orders the ids and applies limit/offset
		ids = ordering.execute(txn)
	}

	// Count-only queries are finished at this point
	if q.countOnly {
		return done(len(ids))
	}

	if quickSum {
		// If the sum was on the order index it has been worked out already;
		// otherwise it needs an index iteration of its own
		if !sumOnOrderIndex {
			kind, err := fieldKind(q.target, string(q.sumIndexName))
			if err != nil {
				return fail(err)
			}

			// NOTE(review): limit and offset are passed to this search too.
			// If they were already applied upstream (single filter, basic
			// query or the ordering search) they may take effect a second
			// time here - confirm against indexSearch.execute.
			summing := indexSearch{
				keyRoot:        q.keyRoot,
				indexName:      q.sumIndexName,
				indexKind:      kind,
				idsToSearchFor: ids,
				reverse:        q.reverse,
				limit:          q.limit,
				offset:         q.offset,
				sumIndexName:   q.sumIndexName,
				sumTarget:      q.sumTarget,
			}
			summing.execute(txn)
		}

		// Either way the sum now sits in the sum target, so we are done
		return done(len(ids))
	}

	// 'First' type queries
	if q.single {
		// Make sure at least 1 record was found before trying to set it
		if len(ids) == 0 {
			return done(0)
		}

		// db.get works on a 'Record', so a new one is set up here and
		// its contents are moved onto the target afterwards
		record := newRecord(q.target)
		first := ids[0]

		found, err := q.db.get(txn, record, q.ctx, first)
		if err != nil {
			return fail(err)
		}
		if !found {
			return fail(fmt.Errorf("Could not retrieve record with id: %v", first))
		}

		setSingleResultOntoTarget(q.target, record)
		return done(1)
	}

	// Everything else: fetch the records for the ids and return
	n, err := q.db.getIDsWithContext(txn, q.target, q.ctx, ids...)
	if err != nil {
		return fail(err)
	}
	return done(n)
}