diff --git a/config.go b/config.go index 70a93cc..22051d5 100644 --- a/config.go +++ b/config.go @@ -66,6 +66,14 @@ func (cfg *Configuration) getContainer(c string) (Container, error) { return Container{}, errors.New("Container not found") } +func (cfg *Configuration) extractContainerNames() (set map[string]struct{}) { + set = make(map[string]struct{}, len(cfg.Containers)) + for _, container := range cfg.Containers { + set[container.Name] = struct{}{} + } + return set +} + func (cfg *Configuration) compileRegex() { for i, c := range cfg.Containers { cmp := regexp.MustCompile(c.Regex) diff --git a/config_test.go b/config_test.go index 96cdc3d..2b5cbc6 100644 --- a/config_test.go +++ b/config_test.go @@ -97,3 +97,16 @@ func TestSetupDir(t *testing.T) { } } +func TestExtractContainerNames(t *testing.T) { + cfg := &Configuration{ + Containers: []Container{{Name: "c1"}, {Name: "c2"}}, + } + names := cfg.extractContainerNames() + expected := make(map[string]struct{}, len(cfg.Containers)) + expected["c1"] = struct{}{} + expected["c2"] = struct{}{} + if !cmp.Equal(names, expected) { + t.Errorf("expected names to equal %v, received %v instead", expected, names) + } +} + diff --git a/main.go b/main.go index 53e2f7e..021642d 100644 --- a/main.go +++ b/main.go @@ -27,18 +27,21 @@ import ( ) type DNAQuery struct { - cfg *Configuration + *Configuration + containerNames map[string]struct{} } func NewDNAQuery(cfg *Configuration) (*DNAQuery, error) { if len(cfg.Containers) < 1 { return nil, errors.New("Configuration needs at least 1 container") } + dna := &DNAQuery{ - cfg: cfg, + Configuration: cfg, + containerNames: cfg.extractContainerNames(), } - dna.cfg.compileRegex() - err := dna.cfg.setupDirectory() + dna.compileRegex() + err := dna.setupDirectory() if err != nil { return nil, errors.Wrap(err, "Error setting up directory") } @@ -78,12 +81,10 @@ func (d *DNAQuery) readLine(path string) chan [2]string { lineCount++ data := scanner.Bytes() container, _ := jsonparser.GetString(data, "container") - _, err := d.cfg.getContainer(container) - if err != nil { - continue + if _, ok := d.containerNames[container]; ok { + line, _ := jsonparser.GetString(data, "_line") + ch <- [2]string{container, line} } - line, _ := jsonparser.GetString(data, "_line") - ch <- [2]string{container, line} } if err := scanner.Err(); err != nil { log.Fatalln("Error reading log file:", err.Error()) @@ -108,14 +109,13 @@ func (d *DNAQuery) processLine(path string, ch chan [2]string) { line := r[1] var record []string record = append(record, container) - c, err := d.cfg.getContainer(container) + c, err := d.getContainer(container) if err != nil { log.Println("Can't find container config in processLine:", container) continue } result := c.CompiledRegex.FindStringSubmatch(line) if len(result) == 0 { - // log.Println("Found no match for regex in processLine:", container, line) nSkipped++ continue } @@ -141,7 +141,6 @@ func (d *DNAQuery) processLine(path string, ch chan [2]string) { } else { dt, err := time.Parse(c.TimeFormat, result[c.TimeGroup]) if err != nil { - // log.Println("skipping row:", err.Error()) nSkipped++ continue } @@ -165,20 +164,20 @@ func (d *DNAQuery) processLine(path string, ch chan [2]string) { func (d *DNAQuery) getLogfile(logDate string) (logName string) { date, _ := time.Parse("2006-01-02", logDate) - logName = d.cfg.AWS.LogPrefix + "." + logDate + ".json.gz" - localLogName := filepath.Join(d.cfg.Storage.LogDirectory, logName) + logName = d.AWS.LogPrefix + "." + logDate + ".json.gz" + localLogName := filepath.Join(d.Storage.LogDirectory, logName) item := date.Format("2006/01/") + logName awsCfg := &aws.Config{ Region: aws.String("us-east-1"), - Credentials: credentials.NewStaticCredentials(d.cfg.AWS.Key, d.cfg.AWS.Secret, ""), + Credentials: credentials.NewStaticCredentials(d.AWS.Key, d.AWS.Secret, ""), } sess, err := session.NewSession(awsCfg) CheckErr("Unable to create session: ", err) svc := s3.New(sess) obi := &s3.GetObjectInput{ - Bucket: aws.String(d.cfg.AWS.Bucket), + Bucket: aws.String(d.AWS.Bucket), Key: aws.String(item), } ob, err := svc.GetObject(obi) @@ -217,9 +216,9 @@ func (d *DNAQuery) getLogfile(logDate string) (logName string) { func (d *DNAQuery) uploadToGCS(path string, object string) error { log.Println("Starting upload to GCS") - os.Setenv("GOOGLE_CLOUD_PROJECT", d.cfg.GCP.ProjectID) + os.Setenv("GOOGLE_CLOUD_PROJECT", d.GCP.ProjectID) ctx := context.Background() - client, err := storage.NewClient(ctx, option.WithCredentialsFile(d.cfg.GCP.CredentialsFile)) + client, err := storage.NewClient(ctx, option.WithCredentialsFile(d.GCP.CredentialsFile)) CheckErr("Error creating storage client: ", err) f, err := os.Open(path) if err != nil { @@ -229,7 +228,7 @@ func (d *DNAQuery) uploadToGCS(path string, object string) error { stat, _ := f.Stat() log.Printf("Upload size: %f MB\n", float64(stat.Size())/1024/1024) - wc := client.Bucket(d.cfg.GCP.Bucket).Object(object).NewWriter(ctx) + wc := client.Bucket(d.GCP.Bucket).Object(object).NewWriter(ctx) nBytes, nChunks := int64(0), int64(0) r := bufio.NewReader(f) buf := make([]byte, 0, 4*1024) @@ -270,15 +269,15 @@ func (d *DNAQuery) loadInBQ(object string, date string) { date = strings.Replace(date, "-", "_", -1) log.Println("Starting load into BQ") ctx := context.Background() - client, err := bigquery.NewClient(ctx, d.cfg.GCP.ProjectID, - option.WithCredentialsFile(d.cfg.GCP.CredentialsFile)) - CheckErr("Error creating BQ Client: ", err) - myDataset := client.Dataset(d.cfg.GCP.Dataset) + client, err := bigquery.NewClient(ctx, d.GCP.ProjectID, + option.WithCredentialsFile(d.GCP.CredentialsFile)) + CheckErr("Error creating BQ Client: ", err) + myDataset := client.Dataset(d.GCP.Dataset) - templateTable := myDataset.Table(d.cfg.GCP.TemplateTable) + templateTable := myDataset.Table(d.GCP.TemplateTable) - gscURL := "gs://" + d.cfg.GCP.Bucket + "/" + object + gscURL := "gs://" + d.GCP.Bucket + "/" + object gcsRef := bigquery.NewGCSReference(gscURL) tmpTableMeta, err := templateTable.Metadata(ctx) CheckErr("Error getting template table: ", err) @@ -356,5 +355,6 @@ func main() { }, } app.Action = run + app.Version = "0.1.0" app.Run(os.Args) }