Skip to content
This repository has been archived by the owner on Apr 12, 2022. It is now read-only.

Commit

Permalink
extract container names once instead of always looping to look them up (#13)
Browse files: browse the repository at this point in the history

* embed Configuration for more natural access from DNAQuery
  • Loading branch information
chrisvaughn committed Feb 8, 2018
1 parent e2065a2 commit 1f7b3ac
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 25 deletions.
8 changes: 8 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,14 @@ func (cfg *Configuration) getContainer(c string) (Container, error) {
return Container{}, errors.New("Container not found")
}

// extractContainerNames returns a set keyed by the Name of every entry in
// cfg.Containers. Duplicate names collapse into a single key, and a
// configuration without containers yields an empty, non-nil map.
func (cfg *Configuration) extractContainerNames() (set map[string]struct{}) {
	containers := cfg.Containers
	set = make(map[string]struct{}, len(containers))
	for i := range containers {
		// The empty struct occupies no storage; only key presence matters.
		set[containers[i].Name] = struct{}{}
	}
	return set
}

func (cfg *Configuration) compileRegex() {
for i, c := range cfg.Containers {
cmp := regexp.MustCompile(c.Regex)
Expand Down
13 changes: 13 additions & 0 deletions config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,3 +97,16 @@ func TestSetupDir(t *testing.T) {
}
}

// TestExtractContainerNames verifies that extractContainerNames produces a
// set containing exactly the names of the configured containers.
func TestExtractContainerNames(t *testing.T) {
	cfg := &Configuration{
		Containers: []Container{{Name: "c1"}, {Name: "c2"}},
	}
	want := map[string]struct{}{
		"c1": {},
		"c2": {},
	}

	got := cfg.extractContainerNames()
	if cmp.Equal(got, want) {
		return
	}
	t.Errorf("expected names to equal %v, received %v instead", want, got)
}

50 changes: 25 additions & 25 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,21 @@ import (
)

// DNAQuery couples the loaded Configuration with a precomputed set of
// container names.
// NOTE(review): this listing is a rendered diff, so the removed and added
// field lines appear side by side; confirm the final field list against the
// real main.go.
type DNAQuery struct {
// cfg holds the configuration as a named field. Presumably this is the line
// the commit removes in favour of the embedded field below (TODO confirm).
cfg *Configuration
// Embedded pointer: Configuration's fields and methods are promoted onto
// DNAQuery, so they can be reached as d.AWS, d.getContainer(...), etc.
*Configuration
// containerNames is a set keyed by container name. NewDNAQuery fills it from
// cfg.extractContainerNames(), and readLine looks names up in it.
containerNames map[string]struct{}
}

func NewDNAQuery(cfg *Configuration) (*DNAQuery, error) {
if len(cfg.Containers) < 1 {
return nil, errors.New("Configuration needs at least 1 container")
}

dna := &DNAQuery{
cfg: cfg,
Configuration: cfg,
containerNames: cfg.extractContainerNames(),
}
dna.cfg.compileRegex()
err := dna.cfg.setupDirectory()
dna.compileRegex()
err := dna.setupDirectory()
if err != nil {
return nil, errors.Wrap(err, "Error setting up directory")
}
Expand Down Expand Up @@ -78,12 +81,10 @@ func (d *DNAQuery) readLine(path string) chan [2]string {
lineCount++
data := scanner.Bytes()
container, _ := jsonparser.GetString(data, "container")
_, err := d.cfg.getContainer(container)
if err != nil {
continue
if _, ok := d.containerNames[container]; ok {
line, _ := jsonparser.GetString(data, "_line")
ch <- [2]string{container, line}
}
line, _ := jsonparser.GetString(data, "_line")
ch <- [2]string{container, line}
}
if err := scanner.Err(); err != nil {
log.Fatalln("Error reading log file:", err.Error())
Expand All @@ -108,14 +109,13 @@ func (d *DNAQuery) processLine(path string, ch chan [2]string) {
line := r[1]
var record []string
record = append(record, container)
c, err := d.cfg.getContainer(container)
c, err := d.getContainer(container)
if err != nil {
log.Println("Can't find container config in processLine:", container)
continue
}
result := c.CompiledRegex.FindStringSubmatch(line)
if len(result) == 0 {
// log.Println("Found no match for regex in processLine:", container, line)
nSkipped++
continue
}
Expand All @@ -141,7 +141,6 @@ func (d *DNAQuery) processLine(path string, ch chan [2]string) {
} else {
dt, err := time.Parse(c.TimeFormat, result[c.TimeGroup])
if err != nil {
// log.Println("skipping row:", err.Error())
nSkipped++
continue
}
Expand All @@ -165,20 +164,20 @@ func (d *DNAQuery) processLine(path string, ch chan [2]string) {

func (d *DNAQuery) getLogfile(logDate string) (logName string) {
date, _ := time.Parse("2006-01-02", logDate)
logName = d.cfg.AWS.LogPrefix + "." + logDate + ".json.gz"
localLogName := filepath.Join(d.cfg.Storage.LogDirectory, logName)
logName = d.AWS.LogPrefix + "." + logDate + ".json.gz"
localLogName := filepath.Join(d.Storage.LogDirectory, logName)
item := date.Format("2006/01/") + logName

awsCfg := &aws.Config{
Region: aws.String("us-east-1"),
Credentials: credentials.NewStaticCredentials(d.cfg.AWS.Key, d.cfg.AWS.Secret, ""),
Credentials: credentials.NewStaticCredentials(d.AWS.Key, d.AWS.Secret, ""),
}
sess, err := session.NewSession(awsCfg)
CheckErr("Unable to create session: ", err)

svc := s3.New(sess)
obi := &s3.GetObjectInput{
Bucket: aws.String(d.cfg.AWS.Bucket),
Bucket: aws.String(d.AWS.Bucket),
Key: aws.String(item),
}
ob, err := svc.GetObject(obi)
Expand Down Expand Up @@ -217,9 +216,9 @@ func (d *DNAQuery) getLogfile(logDate string) (logName string) {

func (d *DNAQuery) uploadToGCS(path string, object string) error {
log.Println("Starting upload to GCS")
os.Setenv("GOOGLE_CLOUD_PROJECT", d.cfg.GCP.ProjectID)
os.Setenv("GOOGLE_CLOUD_PROJECT", d.GCP.ProjectID)
ctx := context.Background()
client, err := storage.NewClient(ctx, option.WithCredentialsFile(d.cfg.GCP.CredentialsFile))
client, err := storage.NewClient(ctx, option.WithCredentialsFile(d.GCP.CredentialsFile))
CheckErr("Error creating storage client: ", err)
f, err := os.Open(path)
if err != nil {
Expand All @@ -229,7 +228,7 @@ func (d *DNAQuery) uploadToGCS(path string, object string) error {

stat, _ := f.Stat()
log.Printf("Upload size: %f MB\n", float64(stat.Size())/1024/1024)
wc := client.Bucket(d.cfg.GCP.Bucket).Object(object).NewWriter(ctx)
wc := client.Bucket(d.GCP.Bucket).Object(object).NewWriter(ctx)
nBytes, nChunks := int64(0), int64(0)
r := bufio.NewReader(f)
buf := make([]byte, 0, 4*1024)
Expand Down Expand Up @@ -270,15 +269,15 @@ func (d *DNAQuery) loadInBQ(object string, date string) {
date = strings.Replace(date, "-", "_", -1)
log.Println("Starting load into BQ")
ctx := context.Background()
client, err := bigquery.NewClient(ctx, d.cfg.GCP.ProjectID,
option.WithCredentialsFile(d.cfg.GCP.CredentialsFile))
CheckErr("Error creating BQ Client: ", err)

myDataset := client.Dataset(d.cfg.GCP.Dataset)
client, err := bigquery.NewClient(ctx, d.GCP.ProjectID,
option.WithCredentialsFile(d.GCP.CredentialsFile))
CheckErr("Error creating BQ Client: ", err)
myDataset := client.Dataset(d.GCP.Dataset)

templateTable := myDataset.Table(d.cfg.GCP.TemplateTable)
templateTable := myDataset.Table(d.GCP.TemplateTable)

gscURL := "gs://" + d.cfg.GCP.Bucket + "/" + object
gscURL := "gs://" + d.GCP.Bucket + "/" + object
gcsRef := bigquery.NewGCSReference(gscURL)
tmpTableMeta, err := templateTable.Metadata(ctx)
CheckErr("Error getting template table: ", err)
Expand Down Expand Up @@ -356,5 +355,6 @@ func main() {
},
}
app.Action = run
app.Version = "0.1.0"
app.Run(os.Args)
}

0 comments on commit 1f7b3ac

Please sign in to comment.