analysis.go
package pto3

import (
"bufio"
"compress/bzip2"
"encoding/json"
"fmt"
"io"
"os"
"runtime"
"strings"
)
// ConditionSet tracks conditions seen in analysis output by name.
type ConditionSet map[string]struct{}
// AddCondition adds a condition (by name) to this set
func (cs ConditionSet) AddCondition(condition string) {
cs[condition] = struct{}{}
}
// HasCondition returns true if a given condition is in the condition set
func (cs ConditionSet) HasCondition(condition string) bool {
_, ok := cs[condition]
return ok
}
// Conditions lists all conditions in the set as a string slice,
// suitable for output as normalizer/analyzer metadata
func (cs ConditionSet) Conditions() []string {
out := make([]string, 0)
for k := range cs {
out = append(out, k)
}
return out
}
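
// A minimal usage sketch for ConditionSet (the condition name and mdout map
// are hypothetical): collect condition names while generating observations,
// then list them under an output metadata "_conditions" key:
//
//	cs := make(ConditionSet)
//	cs.AddCondition("pto.example.condition")
//	if cs.HasCondition("pto.example.condition") {
//		mdout["_conditions"] = cs.Conditions()
//	}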
///////////////////////////////////////////////////////////////////////
// ScanningNormalizers
///////////////////////////////////////////////////////////////////////
type serialFiletypeMapEntry struct {
splitFunc bufio.SplitFunc
normFunc SerialNormFunc
finalFunc SerialMetadataFinalizeFunc
}
// ScanningNormalizer is the interface implemented by both the serial and
// parallel scanning normalizers in this package.
type ScanningNormalizer interface {
Normalize(in *os.File, metain io.Reader, out io.Writer) error
}
// SerialScanningNormalizer implements a normalizer whose raw data input can
// be processed using a standard library Scanner, and which must be processed
// in order (i.e., non-concurrently), as the result of the normalization of a
// record may depend (via data stored in the output metadata accumulator) on
// the results of normalization of previous records.
type SerialScanningNormalizer struct {
filetypeMap map[string]serialFiletypeMapEntry
metadataURL string
}
// SerialNormFunc describes a record normalization function for a
// SerialScanningNormalizer; it is called once per record, in order.
type SerialNormFunc func(rec []byte, mdin *RawMetadata, mdout map[string]interface{}) ([]Observation, error)
// SerialMetadataFinalizeFunc is a metadata finalization function for a
// SerialScanningNormalizer; it is called at the end of a normalization with
// input and output metadata, to edit the latter prior to output.
type SerialMetadataFinalizeFunc func(mdin *RawMetadata, mdout map[string]interface{}) error
// NewSerialScanningNormalizer creates a new SerialScanningNormalizer linking
// to analyzer metadata at the given URL.
func NewSerialScanningNormalizer(metadataURL string) *SerialScanningNormalizer {
norm := new(SerialScanningNormalizer)
norm.filetypeMap = make(map[string]serialFiletypeMapEntry)
norm.metadataURL = metadataURL
return norm
}
// RegisterFiletype associates a split function, a record normalization
// function, and an optional metadata finalization function with a filetype
// string; Normalize dispatches on the input file's filetype accordingly.
func (norm *SerialScanningNormalizer) RegisterFiletype(
filetype string,
splitFunc bufio.SplitFunc,
normFunc SerialNormFunc,
finalFunc SerialMetadataFinalizeFunc) {
norm.filetypeMap[filetype] =
serialFiletypeMapEntry{
splitFunc: splitFunc,
normFunc: normFunc,
finalFunc: finalFunc}
}
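
// A minimal usage sketch for a serial scanning normalizer; the filetype
// name, normalization function, and file variables below are hypothetical,
// for illustration only:
//
//	norm := NewSerialScanningNormalizer("https://example.org/analyzer.json")
//	norm.RegisterFiletype("example-lines", bufio.ScanLines,
//		func(rec []byte, mdin *RawMetadata, mdout map[string]interface{}) ([]Observation, error) {
//			// parse one record into zero or more observations here
//			return []Observation{}, nil
//		}, nil)
//	err := norm.Normalize(rawFile, metaFile, os.Stdout)

// Normalize normalizes the raw data file in, described by raw metadata read
// from metain, writing observations followed by a line of output metadata
// to out.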
func (norm *SerialScanningNormalizer) Normalize(in *os.File, metain io.Reader, out io.Writer) error {
// read raw metadata
rmd, err := RawMetadataFromReader(metain, nil)
if err != nil {
return PTOWrapError(err)
}
// copy raw arbitrary metadata to output
omd := make(map[string]interface{})
for k, v := range rmd.Metadata {
omd[k] = v
}
// check filetype for compression
filetype := rmd.Filetype(true)
var scanner *bufio.Scanner
if strings.HasSuffix(filetype, "-bz2") {
scanner = bufio.NewScanner(bzip2.NewReader(in))
filetype = filetype[0 : len(filetype)-4]
} else {
scanner = bufio.NewScanner(in)
}
// lookup file type in registry
fte, ok := norm.filetypeMap[filetype]
if !ok {
return PTOErrorf("no registered handler for filetype %s", filetype)
}
// create condition cache
hasCondition := make(ConditionSet)
// now set up scanner and iterate
scanner.Split(fte.splitFunc)
var recno int
for scanner.Scan() {
recno++
rec := scanner.Bytes()
obsen, err := fte.normFunc(rec, rmd, omd)
if err != nil {
return PTOErrorf("error normalizing record %d: %v", recno, err)
}
for _, o := range obsen {
hasCondition[o.Condition.Name] = struct{}{}
}
if err := WriteObservations(obsen, out); err != nil {
return PTOErrorf("error writing observation from record %d: %v", recno, err)
}
}
if err := scanner.Err(); err != nil {
return PTOWrapError(err)
}
// finalize output metadata if necessary
if fte.finalFunc != nil {
if err := fte.finalFunc(rmd, omd); err != nil {
return PTOErrorf("error finalizing output metadata: %v", err)
}
}
// add conditions
omd["_conditions"] = hasCondition.Conditions()
// add analyzer metadata link
omd["_analyzer"] = norm.metadataURL
// now write output metadata
b, err := json.Marshal(omd)
if err != nil {
return fmt.Errorf("error marshaling metadata: %s", err.Error())
}
if _, err := fmt.Fprintf(out, "%s\n", b); err != nil {
return fmt.Errorf("error writing metadata: %s", err.Error())
}
// all done
return nil
}
type parallelFiletypeMapEntry struct {
splitFunc bufio.SplitFunc
normFunc ParallelNormFunc
mergeFunc ParallelMetadataMergeFunc
}
// ParallelScanningNormalizer implements a normalizer whose raw data input
// can be processed using a standard library Scanner, and whose records may
// be normalized concurrently.
type ParallelScanningNormalizer struct {
filetypeMap map[string]parallelFiletypeMapEntry
metadataURL string
concurrency int
}
// ParallelNormFunc describes a record normalization function for a
// ParallelScanningNormalizer; it may be called concurrently from multiple
// goroutines, and may send partial output metadata on the given channel.
type ParallelNormFunc func(rec []byte, rawmeta *RawMetadata, metachan chan<- map[string]interface{}) ([]Observation, error)
// ParallelMetadataMergeFunc merges partial output metadata sent by a
// ParallelNormFunc into an output metadata accumulator.
type ParallelMetadataMergeFunc func(in map[string]interface{}, accumulator map[string]interface{})
// MergeByOverwrite is the default ParallelMetadataMergeFunc; it copies each
// key from the incoming metadata map into the accumulator, overwriting any
// existing value.
func MergeByOverwrite(in map[string]interface{}, accumulator map[string]interface{}) {
for k, v := range in {
accumulator[k] = v
}
}
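
// A sketch of an alternative merge function that accumulates incoming values
// into slices instead of overwriting them (hypothetical, for illustration):
//
//	func mergeByAppend(in map[string]interface{}, acc map[string]interface{}) {
//		for k, v := range in {
//			prev, _ := acc[k].([]interface{})
//			acc[k] = append(prev, v)
//		}
//	}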
// NewParallelScanningNormalizer creates a new ScanningNormalizer
// that spawns multiple goroutines to process records using the
// ParallelNormFunc. The normalization function must be
// safe to use with multiple goroutines. If concurrency is set to
// 0 it will default to GOMAXPROCS.
func NewParallelScanningNormalizer(metadataURL string, concurrency int) *ParallelScanningNormalizer {
norm := new(ParallelScanningNormalizer)
norm.filetypeMap = make(map[string]parallelFiletypeMapEntry)
norm.metadataURL = metadataURL
norm.concurrency = concurrency
if concurrency == 0 {
// 0 is a special value to GOMAXPROCS and means
// "return current setting without changing it".
norm.concurrency = runtime.GOMAXPROCS(0)
}
return norm
}
// RegisterFiletype associates a split function, a record normalization
// function, and an optional metadata merge function with a filetype string;
// if mergeFunc is nil, MergeByOverwrite is used.
func (norm *ParallelScanningNormalizer) RegisterFiletype(
filetype string,
splitFunc bufio.SplitFunc,
normFunc ParallelNormFunc,
mergeFunc ParallelMetadataMergeFunc) {
if mergeFunc == nil {
mergeFunc = MergeByOverwrite
}
norm.filetypeMap[filetype] =
parallelFiletypeMapEntry{
splitFunc: splitFunc,
normFunc: normFunc,
mergeFunc: mergeFunc}
}
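
// A minimal usage sketch for a parallel scanning normalizer; as above, the
// filetype name, normalization function, and file variables are
// hypothetical. The normalization function must be safe for concurrent use
// by multiple goroutines:
//
//	norm := NewParallelScanningNormalizer("https://example.org/analyzer.json", 0)
//	norm.RegisterFiletype("example-lines", bufio.ScanLines,
//		func(rec []byte, rmd *RawMetadata, mdc chan<- map[string]interface{}) ([]Observation, error) {
//			return []Observation{}, nil
//		}, nil)
//	err := norm.Normalize(rawFile, metaFile, os.Stdout)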
// psnRecord carries a single scanned record and its record number to the
// normalization goroutines.
type psnRecord struct {
n int
bytes []byte
}
// Normalize normalizes the raw data file in, described by raw metadata read
// from metain, fanning records out to concurrent normalization goroutines
// and writing observations followed by a line of output metadata to out.
func (norm *ParallelScanningNormalizer) Normalize(in *os.File, metain io.Reader, out io.Writer) error {
// create channels
// (munt): It is not yet clear how to determine the ideal channel size;
// both oversized and undersized channels can measurably affect performance,
// positively or negatively. Until we know how to calculate this value,
// experimental evidence suggests that a little extra room is at least
// not harmful.
recChan := make(chan *psnRecord, norm.concurrency*norm.concurrency)
obsChan := make(chan []Observation, norm.concurrency*norm.concurrency)
errChan := make(chan error, norm.concurrency)
mdChan := make(chan map[string]interface{}, norm.concurrency)
// create signals
recordComplete := make([]chan struct{}, norm.concurrency)
for i := 0; i < norm.concurrency; i++ {
recordComplete[i] = make(chan struct{})
}
mergeComplete := make(chan struct{})
writeComplete := make(chan struct{})
// create error accumulators
var writeError, outError error
// read raw metadata
rmd, err := RawMetadataFromReader(metain, nil)
if err != nil {
return PTOWrapError(err)
}
// copy raw arbitrary metadata to output
mdOut := make(map[string]interface{})
for k, v := range rmd.Metadata {
mdOut[k] = v
}
// check filetype for compression
filetype := rmd.Filetype(true)
var scanner *bufio.Scanner
if strings.HasSuffix(filetype, "-bz2") {
scanner = bufio.NewScanner(bzip2.NewReader(in))
filetype = filetype[0 : len(filetype)-4]
} else {
scanner = bufio.NewScanner(in)
}
// lookup file type in registry
fte, ok := norm.filetypeMap[filetype]
if !ok {
return PTOErrorf("no registered handler for filetype %s", filetype)
}
scanner.Split(fte.splitFunc)
// create condition cache
hasCondition := make(ConditionSet)
// start merging metadata
go func() {
for mdNext := range mdChan {
fte.mergeFunc(mdNext, mdOut)
}
close(mergeComplete)
}()
// start writing records
go func() {
for obsen := range obsChan {
for _, o := range obsen {
hasCondition[o.Condition.Name] = struct{}{}
}
if err := WriteObservations(obsen, out); err != nil {
writeError = PTOErrorf("error writing observation: %v", err)
break
}
}
close(writeComplete)
}()
// start normalizing records
for i := 0; i < norm.concurrency; i++ {
go func(me int) {
// process all records
for rec := range recChan {
obsen, err := fte.normFunc(rec.bytes, rmd, mdChan)
if err != nil {
errChan <- PTOErrorf("error normalizing record %d: %v (goroutine %d)", rec.n, err, me)
close(recordComplete[me])
return
}
obsChan <- obsen
}
close(recordComplete[me])
}(i)
}
// now go. split and process.
var recno int
for scanner.Scan() {
recno++
recBytes := make([]byte, len(scanner.Bytes()))
copy(recBytes, scanner.Bytes())
select {
case err = <-errChan:
outError = err
goto shutdown
case recChan <- &psnRecord{n: recno, bytes: recBytes}:
// NOP
}
}
if err := scanner.Err(); err != nil {
outError = PTOWrapError(err)
}
shutdown:
// signal shutdown to record normalizers and wait for shutdown
close(recChan)
for i := 0; i < norm.concurrency; i++ {
<-recordComplete[i]
}
// pick up any normalization error reported after the scan loop exited
select {
case err := <-errChan:
if outError == nil {
outError = err
}
default:
}
// signal shutdown to writer and wait for shutdown
close(obsChan)
<-writeComplete
if writeError != nil {
outError = writeError
}
// signal shutdown to merger and wait for shutdown
close(mdChan)
<-mergeComplete
// all goroutines have shut down. did we error?
if outError != nil {
return outError
}
// add conditions
mdOut["_conditions"] = hasCondition.Conditions()
// add analyzer metadata link
mdOut["_analyzer"] = norm.metadataURL
// now write output metadata
b, err := json.Marshal(mdOut)
if err != nil {
return fmt.Errorf("error marshaling metadata: %s", err.Error())
}
if _, err := fmt.Fprintf(out, "%s\n", b); err != nil {
return fmt.Errorf("error writing metadata: %s", err.Error())
}
// all done
return nil
}
///////////////////////////////////////////////////////////////////////
// Analysis Utilities
///////////////////////////////////////////////////////////////////////
// AnalysisSetTable tracks observation sets by ID
type AnalysisSetTable map[int]*ObservationSet
// AddSetFrom adds an observation set from a given observation
func (st AnalysisSetTable) AddSetFrom(obs *Observation) {
if _, ok := st[obs.SetID]; obs.SetID != 0 && !ok {
st[obs.SetID] = obs.Set
}
}
// MergeMetadata creates merged output metadata from the set of incoming
// observation sets, tracking sources and including all metadata keys for
// which the value is the same in each set in the table.
func (st AnalysisSetTable) MergeMetadata() map[string]interface{} {
mdout := make(map[string]interface{})
sources := make([]string, 0)
conflictingKeys := make(map[string]struct{})
for setid := range st {
// track sources
source := st[setid].Link()
if source != "" {
sources = append(sources, source)
}
// inherit arbitrary metadata for all keys without conflict
for k, newval := range st[setid].Metadata {
if _, ok := conflictingKeys[k]; ok {
continue
}
existval, ok := mdout[k]
if !ok {
mdout[k] = newval
} else if fmt.Sprintf("%v", existval) != fmt.Sprintf("%v", newval) {
delete(mdout, k)
conflictingKeys[k] = struct{}{}
}
}
}
if len(sources) > 0 {
mdout["_sources"] = sources
}
return mdout
}
// AnalyzeObservationStream reads observation set metadata and data from a
// file (as created by ptocat) and calls a provided analysis function once per
// observation. It is a convenience function for writing PTO analyzers in Go.
// It returns a table mapping set IDs to observation sets,
// from which metadata can be derived.
func AnalyzeObservationStream(in io.Reader, afn func(obs *Observation) error) (AnalysisSetTable, error) {
// stream in observation sets
scanner := bufio.NewScanner(in)
var lineno int
var currentSet *ObservationSet
var obs *Observation
setTable := make(AnalysisSetTable)
for scanner.Scan() {
lineno++
line := strings.TrimSpace(scanner.Text())
// skip blank lines; indexing line[0] below would panic on them
if line == "" {
continue
}
switch line[0] {
case '{':
// New observation set; cache metadata
currentSet = new(ObservationSet)
if err := currentSet.UnmarshalJSON(scanner.Bytes()); err != nil {
return nil, PTOErrorf("error parsing set on input line %d: %s", lineno, err.Error())
}
case '[':
// New observation; call analysis function
obs = new(Observation)
if err := obs.UnmarshalJSON(scanner.Bytes()); err != nil {
return nil, PTOErrorf("error parsing observation on input line %d: %s", lineno, err.Error())
}
if currentSet == nil {
return nil, PTOErrorf("observation on input line %d without current set", lineno)
} else if currentSet.ID == 0 {
// new current set, cache by ID
currentSet.ID = obs.SetID
obs.Set = currentSet
setTable.AddSetFrom(obs)
} else if currentSet.ID != obs.SetID {
var ok bool
currentSet, ok = setTable[obs.SetID]
if !ok {
return nil, PTOErrorf("observation on input line %d refers to uncached set %x", lineno, obs.SetID)
}
obs.Set = currentSet
} else {
obs.Set = currentSet
}
if err := afn(obs); err != nil {
return nil, PTOWrapError(err)
}
}
}
if err := scanner.Err(); err != nil {
return nil, PTOWrapError(err)
}
return setTable, nil
}
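
// A minimal analyzer sketch using AnalyzeObservationStream together with
// AnalysisSetTable.MergeMetadata; the condition name and derived metadata
// key are hypothetical, for illustration only:
//
//	var count int
//	setTable, err := AnalyzeObservationStream(os.Stdin, func(obs *Observation) error {
//		if obs.Condition.Name == "pto.example.condition" {
//			count++
//		}
//		return nil
//	})
//	if err != nil {
//		log.Fatal(err)
//	}
//	mdout := setTable.MergeMetadata()
//	mdout["example_count"] = count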