Skip to content

Commit

Permalink
feat: add StringWithConcurrency function
Browse files Browse the repository at this point in the history
  • Loading branch information
mdross95 committed Jun 20, 2024
1 parent 32f15e4 commit f7a85dc
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 4 deletions.
1 change: 1 addition & 0 deletions pkg/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type File interface {
Parse(record []byte) error
Bytes() []byte
String(newline bool) string
StringWithConcurrency(newline bool, concurrency int) string
Validate() error
}

Expand Down
40 changes: 36 additions & 4 deletions pkg/file/file_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ import (
"encoding/json"
"errors"
"fmt"
"math"
"reflect"
"strings"
"sync"
"unicode"

"github.com/moov-io/base/log"
Expand Down Expand Up @@ -254,6 +256,15 @@ func (f *fileInstance) Parse(record []byte) error {

// String writes the File struct to raw string.
func (f *fileInstance) String(isNewLine bool) string {
return f.StringWithConcurrency(isNewLine, 1)
}

// StringWithConcurrency augments String with a given number of concurrent goroutines.
func (f *fileInstance) StringWithConcurrency(isNewLine bool, concurrency int) string {
if concurrency < 1 {
concurrency = 1
}

var buf strings.Builder

newLine := ""
Expand All @@ -266,18 +277,39 @@ func (f *fileInstance) String(isNewLine bool) string {

// Data Block
data := ""
for _, base := range f.Bases {
data += base.String() + newLine
pageSize := int(math.Ceil(float64(len(f.Bases)) / float64(concurrency)))
basePages := [][]lib.Record{}
dataPages := make([]string, concurrency)
for i := 0; i < len(f.Bases); i += pageSize {
end := i + pageSize
if end > len(f.Bases) {
end = len(f.Bases)
}
basePages = append(basePages, f.Bases[i:end])
}
var wg sync.WaitGroup
for i, page := range basePages {
wg.Add(1)
go func(idx int, page []lib.Record) {
defer wg.Done()
data := ""
for _, base := range page {
data += base.String() + newLine
}
dataPages[idx] = data
}(i, page)
}
wg.Wait()
for _, page := range dataPages {
data += page
}

// Trailer Block
trailer := f.Trailer.String()

buf.Grow(len(header) + len(data) + len(trailer))
buf.WriteString(header)
buf.WriteString(data)
buf.WriteString(trailer)

return buf.String()
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/file/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func (t *FileTest) TestJsonWithUnpackedVariableBlocked(c *check.C) {

rawStr := strings.ReplaceAll(string(raw), "\r\n", "\n")
c.Assert(strings.Compare(f.String(true), rawStr), check.Equals, 0)
c.Assert(strings.Compare(f.StringWithConcurrency(true, 2), rawStr), check.Equals, 0)

buf, err := json.Marshal(f)
c.Assert(err, check.IsNil)
Expand Down Expand Up @@ -320,6 +321,7 @@ func (t *FileTest) TestCreateFile(c *check.C) {
c.Assert(err, check.IsNil)

c.Assert(strings.Compare(f.String(false), string(raw)), check.Equals, 0)
c.Assert(strings.Compare(f.StringWithConcurrency(false, 2), string(raw)), check.Equals, 0)
}

func (t *FileTest) TestNewFileFromReader(c *check.C) {
Expand All @@ -333,6 +335,7 @@ func (t *FileTest) TestNewFileFromReader(c *check.C) {
c.Assert(err, check.IsNil)

c.Assert(strings.Compare(f.String(false), string(raw)), check.Equals, 0)
c.Assert(strings.Compare(f.StringWithConcurrency(false, 2), string(raw)), check.Equals, 0)
}

func (t *FileTest) TestCreateFileFailed(c *check.C) {
Expand Down

0 comments on commit f7a85dc

Please sign in to comment.