-
Notifications
You must be signed in to change notification settings - Fork 1
/
nonzero.go
335 lines (301 loc) · 9.75 KB
/
nonzero.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
package bstore
import (
"fmt"
"reflect"
)
// isZero returns whether v is the zero value for the fields that we store.
// reflect.IsZero cannot be used on structs because it checks private fields as well.
func (ft fieldType) isZero(v reflect.Value) bool {
if !v.IsValid() {
return true
}
if ft.Ptr {
return v.IsZero()
}
switch ft.Kind {
case kindStruct:
for _, f := range ft.structFields {
if !f.Type.isZero(v.FieldByIndex(f.structField.Index)) {
return false
}
}
return true
}
// Use standard IsZero otherwise, also for kindBinaryMarshal.
return v.IsZero()
}
// We ensure nonzero constraints when opening a database. An updated schema, with
// added nonzero constraints, can mean all records have to be checked. With cyclic
// types, we have to take care not to recurse, and for efficiency we want to only
// check fields/types that are affected. Steps:
//
// - Go through each field of the struct, and recurse into the field types,
// gathering the types and newly nonzero fields.
// - Propagate the need for nonzero checks to types that reference the changed
// types.
// - By now, if there was a new nonzero constraint, the top-level type will be
// marked as needing a check, so we'll read through all records and check all the
// immediate newly nonzero fields of a type, and recurse into fields of types that
// are marked as needing a check.
// nonzeroCheckType is tracked per reflect.Type that has been analysed (always the
// non-pointer type, i.e. a pointer is dereferenced). These types can be cyclic. We
// gather them for all types involved, including map and slice types and basic
// types, but "newlyNonzero" and "fields" will only be set for structs.
type nonzeroCheckType struct {
needsCheck bool
newlyNonzero []field // Fields in this type that have a new nonzero constraint themselves.
fields []field // All fields in a struct type.
// Types that reference this type. Used to propagate needsCheck to the top.
referencedBy map[reflect.Type]struct{}
}
func (ct *nonzeroCheckType) markRefBy(t reflect.Type) {
if t != nil {
ct.referencedBy[t] = struct{}{}
}
}
// checkNonzero compares ofields (optional previous type schema) and nfields (new
// type schema) for nonzero struct tags. If an existing field has a new nonzero
// constraint, we verify that there are indeed no nonzero values in the existing
// records. If there are, we return ErrZero. checkNonzero looks at (potentially
// cyclic) types referenced by fields.
func (tx *Tx) checkNonzero(st storeType, tv *typeVersion, ofields, nfields []field) error {
// Gather all new nonzero constraints on fields.
m := map[reflect.Type]*nonzeroCheckType{}
nonzeroCheckGather(m, st.Type, nil, ofields, nfields)
// Propagate the need for a check on all types due to a referenced type having a
// new nonzero constraint.
// todo: this can probably be done more elegantly, with fewer graph walks...
for t, ct := range m {
if ct.needsCheck {
nonzeroCheckPropagate(m, t, t, ct)
}
}
// If needsCheck wasn't propagated to the top-level, there was no new nonzero
// constraint, and we're not going to read all the data. This is the common case
// when opening a database.
if !m[st.Type].needsCheck {
return nil
}
// Read through all data, and check the new nonzero constraint.
// todo optimize: if there are only top-level fields to check, and we have indices on those fields, we can use the index to check this without reading all data.
return checkNonzeroRecords(tx, st, tv, m)
}
// Walk down fields, gathering their types (including those they reference), and
// marking needsCheck if any of a type's immediate field has a new nonzero
// constraint. The need for a check is not propagated to referencing types by this
// function.
func nonzeroCheckGather(m map[reflect.Type]*nonzeroCheckType, t, refBy reflect.Type, ofields, nfields []field) {
ct := m[t]
if ct != nil {
// Already gathered, don't recurse, for cyclic types.
ct.markRefBy(refBy)
return
}
ct = &nonzeroCheckType{
fields: nfields,
referencedBy: map[reflect.Type]struct{}{},
}
ct.markRefBy(refBy)
m[t] = ct
for _, f := range nfields {
// Check if this field is newly nonzero.
var of *field
for i := range ofields {
if f.Name == ofields[i].Name {
of = &ofields[i]
// Compare with existing field.
if f.Nonzero && !of.Nonzero {
ct.newlyNonzero = append(ct.newlyNonzero, f)
ct.needsCheck = true
}
break
}
}
// Check if this is a new field entirely, with nonzero constraint.
if of == nil && f.Nonzero {
ct.newlyNonzero = append(ct.newlyNonzero, f)
ct.needsCheck = true
}
// Descend into referenced types, adding references back to this type.
var oft *fieldType
if of != nil {
oft = &of.Type
}
ft := f.structField.Type
nonzeroCheckGatherFieldType(m, ft, t, oft, f.Type)
}
}
// gather new nonzero constraints for type "t", which is referenced by "refBy" (and
// will be marked as such). type "t" is described by "nft" and optionally
// previously by "oft".
func nonzeroCheckGatherFieldType(m map[reflect.Type]*nonzeroCheckType, t, refBy reflect.Type, oft *fieldType, nft fieldType) {
// If this is a pointer type, dereference the reflect type.
if nft.Ptr {
t = t.Elem()
}
if nft.Kind == kindStruct {
var fofields []field
if oft != nil {
fofields = oft.structFields
}
nonzeroCheckGather(m, t, refBy, fofields, nft.structFields)
}
// Mark this type as gathered, so we don't process it again if we recurse.
ct := m[t]
if ct != nil {
ct.markRefBy(refBy)
return
}
ct = &nonzeroCheckType{
fields: nft.structFields,
referencedBy: map[reflect.Type]struct{}{},
}
ct.markRefBy(refBy)
m[t] = ct
switch nft.Kind {
case kindMap:
var koft, voft *fieldType
if oft != nil {
koft = oft.MapKey
voft = oft.MapValue
}
nonzeroCheckGatherFieldType(m, t.Key(), t, koft, *nft.MapKey)
nonzeroCheckGatherFieldType(m, t.Elem(), t, voft, *nft.MapValue)
case kindSlice:
var loft *fieldType
if oft != nil {
loft = oft.ListElem
}
nonzeroCheckGatherFieldType(m, t.Elem(), t, loft, *nft.ListElem)
case kindArray:
var loft *fieldType
if oft != nil {
loft = oft.ListElem
}
nonzeroCheckGatherFieldType(m, t.Elem(), t, loft, *nft.ListElem)
}
}
// Propagate that type "t" is affected by a new nonzero constrained and needs to be
// checked. The types referencing "t" are in ct.referencedBy. "origt" is the
// starting type for this propagation.
func nonzeroCheckPropagate(m map[reflect.Type]*nonzeroCheckType, origt, t reflect.Type, ct *nonzeroCheckType) {
for rt := range ct.referencedBy {
if rt == origt {
continue // End recursion.
}
m[rt].needsCheck = true
nonzeroCheckPropagate(m, origt, rt, m[rt])
}
}
// checkNonzeroPaths reads through all records of a type, and checks that the fields
// indicated by paths are nonzero. If not, ErrZero is returned.
func checkNonzeroRecords(tx *Tx, st storeType, tv *typeVersion, m map[reflect.Type]*nonzeroCheckType) error {
rb, err := tx.recordsBucket(st.Current.name, st.Current.fillPercent)
if err != nil {
return err
}
ctxDone := tx.ctx.Done()
return rb.ForEach(func(bk, bv []byte) error {
tx.stats.Records.Cursor++
select {
case <-ctxDone:
return tx.ctx.Err()
default:
}
// todo optimize: instead of parsing the full record, use the fieldmap to see if the value is nonzero.
rv, err := st.parseNew(bk, bv)
if err != nil {
return err
}
ct := m[st.Type]
return checkNonzeroFields(m, st.Type, ct.newlyNonzero, ct.fields, rv)
})
}
// checkNonzeroFields checks that the newly nonzero fields of a struct value are
// indeed nonzero, and walks down referenced types, checking the constraint.
func checkNonzeroFields(m map[reflect.Type]*nonzeroCheckType, t reflect.Type, newlyNonzero, fields []field, rv reflect.Value) error {
// Check the newly nonzero fields.
for _, f := range newlyNonzero {
frv := rv.FieldByIndex(f.structField.Index)
if f.Type.isZero(frv) {
return fmt.Errorf("%w: field %q", ErrZero, f.Name)
}
}
// Descend into referenced types.
for _, f := range fields {
switch f.Type.Kind {
case kindMap, kindSlice, kindStruct, kindArray:
ft := f.structField.Type
if err := checkNonzeroFieldType(m, f.Type, ft, rv.FieldByIndex(f.structField.Index)); err != nil {
return err
}
}
}
return nil
}
// checkNonzeroFieldType walks down a value, and checks that its (struct) types
// don't violate nonzero constraints.
// Does not check whether the value itself is nonzero. If required, that has
// already been checked.
func checkNonzeroFieldType(m map[reflect.Type]*nonzeroCheckType, ft fieldType, t reflect.Type, rv reflect.Value) error {
if ft.Ptr {
t = t.Elem()
}
if !m[t].needsCheck {
return nil
}
if ft.Ptr && rv.IsZero() {
return nil
}
if ft.Ptr {
rv = rv.Elem()
}
unptr := func(t reflect.Type, ptr bool) reflect.Type {
if ptr {
return t.Elem()
}
return t
}
switch ft.Kind {
case kindMap:
kt := t.Key()
vt := t.Elem()
checkKey := m[unptr(kt, ft.MapKey.Ptr)].needsCheck
checkValue := m[unptr(vt, ft.MapValue.Ptr)].needsCheck
iter := rv.MapRange()
for iter.Next() {
if checkKey {
if err := checkNonzeroFieldType(m, *ft.MapKey, kt, iter.Key()); err != nil {
return err
}
}
if checkValue {
if err := checkNonzeroFieldType(m, *ft.MapValue, vt, iter.Value()); err != nil {
return err
}
}
}
case kindSlice:
et := t.Elem()
n := rv.Len()
for i := 0; i < n; i++ {
if err := checkNonzeroFieldType(m, *ft.ListElem, et, rv.Index(i)); err != nil {
return err
}
}
case kindArray:
et := t.Elem()
n := ft.ArrayLength
for i := 0; i < n; i++ {
if err := checkNonzeroFieldType(m, *ft.ListElem, et, rv.Index(i)); err != nil {
return err
}
}
case kindStruct:
ct := m[t]
if err := checkNonzeroFields(m, t, ct.newlyNonzero, ct.fields, rv); err != nil {
return err
}
}
return nil
}