Skip to content

Commit

Permalink
refactor(forwarder): refactor DecoderFactory (#255)
Browse files Browse the repository at this point in the history
Simplify DecoderFactory by allowing caller to pass in parameters
alongside io.Reader.
  • Loading branch information
jta authored May 14, 2024
1 parent 23cac43 commit 925b7df
Show file tree
Hide file tree
Showing 8 changed files with 58 additions and 67 deletions.
4 changes: 2 additions & 2 deletions handler/forwarder/s3http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,13 @@ func (c *Client) PutObject(ctx context.Context, params *s3.PutObjectInput, _ ...
logger := logr.FromContextOrDiscard(ctx)
logger.V(6).Info("processing PutObject", "putObjectInput", params)

dec, err := decoders.Get(aws.ToString(params.ContentEncoding), aws.ToString(params.ContentType))
dec, err := decoders.Get(aws.ToString(params.ContentEncoding), aws.ToString(params.ContentType), params.Body)
if err != nil {
return nil, fmt.Errorf("failed to get decoder: %w", err)
}

err = batch.Run(ctx, &batch.RunInput{
Decoder: dec(params.Body),
Decoder: dec,
Handler: c.RequestBuilder.With(map[string]string{
"content-type": aws.ToString(params.ContentType),
"key": aws.ToString(params.Key),
Expand Down
12 changes: 5 additions & 7 deletions handler/forwarder/s3http/internal/decoders/cloudwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,11 @@ import (
"github.com/aws/aws-lambda-go/events"
)

func CloudWatchLogsDecoderFactory(map[string]string) DecoderFactory {
return func(r io.Reader) Decoder {
buffered := bufio.NewReader(r)
return &CloudWatchLogsDecoder{
buffered: buffered,
decoder: json.NewDecoder(buffered),
}
func CloudWatchLogsDecoderFactory(r io.Reader, _ map[string]string) Decoder {
buffered := bufio.NewReader(r)
return &CloudWatchLogsDecoder{
buffered: buffered,
decoder: json.NewDecoder(buffered),
}
}

Expand Down
52 changes: 26 additions & 26 deletions handler/forwarder/s3http/internal/decoders/csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,36 +14,36 @@ import (

var ErrUnsupportedDelimiter = errors.New("unsupported delimiter")

func CSVDecoderFactory(params map[string]string) DecoderFactory {
return func(r io.Reader) Decoder {
buffered := bufio.NewReader(r)
csvDecoder := &CSVDecoder{
Reader: csv.NewReader(buffered),
buffered: buffered,
}
csvDecoder.Reader.FieldsPerRecord = -1
func CSVDecoderFactory(r io.Reader, params map[string]string) Decoder {
buffered := bufio.NewReader(r)
csvDecoder := &CSVDecoder{
Reader: csv.NewReader(buffered),
buffered: buffered,
}
csvDecoder.Reader.FieldsPerRecord = -1

var delimiter rune
switch params["delimiter"] {
case "space":
delimiter = ' '
case "tab":
delimiter = '\t'
case "comma", "":
delimiter = ','
default:
err := fmt.Errorf("%w: %q", ErrUnsupportedDelimiter, params["delimiter"])
return &errorDecoder{err}
}
csvDecoder.Reader.Comma = delimiter
return csvDecoder
var delimiter rune
switch params["delimiter"] {
case "space":
delimiter = ' '
case "tab":
delimiter = '\t'
case "comma", "":
delimiter = ','
default:
err := fmt.Errorf("%w: %q", ErrUnsupportedDelimiter, params["delimiter"])
return &errorDecoder{err}
}
csvDecoder.Reader.Comma = delimiter
return csvDecoder
}

func VPCFlowLogDecoderFactory(_ map[string]string) DecoderFactory {
return CSVDecoderFactory(map[string]string{
"delimiter": "space",
})
func VPCFlowLogDecoderFactory(r io.Reader, params map[string]string) Decoder {
if _, ok := params["delimiter"]; !ok {
params["delimiter"] = "space"
}

return CSVDecoderFactory(r, params)
}

type CSVDecoder struct {
Expand Down
9 changes: 4 additions & 5 deletions handler/forwarder/s3http/internal/decoders/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ var (
ErrUnsupportedContentType = errors.New("content type not supported: %w")
)

var decoders = map[string]ParameterizedDecoderFactory{
var decoders = map[string]DecoderFactory{
"": JSONDecoderFactory,
"application/json": JSONDecoderFactory,
"application/x-csv": CSVDecoderFactory,
Expand All @@ -34,11 +34,10 @@ type Decoder interface {
}

type (
ParameterizedDecoderFactory func(params map[string]string) DecoderFactory
DecoderFactory func(io.Reader) Decoder
DecoderFactory func(io.Reader, map[string]string) Decoder
)

func Get(contentEncoding, contentType string) (DecoderFactory, error) {
func Get(contentEncoding, contentType string, r io.Reader) (Decoder, error) {
wrapper, ok := wrappers[contentEncoding]
if !ok {
return nil, fmt.Errorf("%w: %q", ErrUnsupportedContentEncoding, contentEncoding)
Expand All @@ -54,5 +53,5 @@ func Get(contentEncoding, contentType string) (DecoderFactory, error) {
return nil, fmt.Errorf("%w: %q", ErrUnsupportedContentType, contentType)
}

return wrapper(decoder(params)), nil
return wrapper(decoder)(r, params), nil
}
4 changes: 2 additions & 2 deletions handler/forwarder/s3http/internal/decoders/decoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,15 @@ func TestDecoders(t *testing.T) {
t.Run(tt.InputFile, func(t *testing.T) {
t.Parallel()

fn, err := decoders.Get(tt.ContentEncoding, tt.ContentType)
dec, err := decoders.Get(tt.ContentEncoding, tt.ContentType, readFile(t, tt.InputFile))
if err != nil {
t.Fatal(err)
}

var buf bytes.Buffer

enc := json.NewEncoder(&buf)
for dec := fn(readFile(t, tt.InputFile)); dec.More(); {
for dec.More() {
var v json.RawMessage
if err := dec.Decode(&v); err != nil {
t.Fatal(err)
Expand Down
32 changes: 14 additions & 18 deletions handler/forwarder/s3http/internal/decoders/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,23 @@ import (
"io"
)

func JSONDecoderFactory(map[string]string) DecoderFactory {
return func(r io.Reader) Decoder {
return json.NewDecoder(r)
}
func JSONDecoderFactory(r io.Reader, _ map[string]string) Decoder {
return json.NewDecoder(r)
}

func NestedJSONDecoderFactory(map[string]string) DecoderFactory {
return func(r io.Reader) Decoder {
dec := json.NewDecoder(r)
tok, err := dec.Token()
for err == nil {
if v, ok := tok.(json.Delim); ok {
if v == '[' {
break
}
func NestedJSONDecoderFactory(r io.Reader, _ map[string]string) Decoder {
dec := json.NewDecoder(r)
tok, err := dec.Token()
for err == nil {
if v, ok := tok.(json.Delim); ok {
if v == '[' {
break
}
tok, err = dec.Token()
}
if err != nil {
return &errorDecoder{fmt.Errorf("unexpected token: %w", err)}
}
return dec
tok, err = dec.Token()
}
if err != nil {
return &errorDecoder{fmt.Errorf("unexpected token: %w", err)}
}
return dec
}
8 changes: 3 additions & 5 deletions handler/forwarder/s3http/internal/decoders/text.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,9 @@ import (
"strconv"
)

func TextDecoderFactory(map[string]string) DecoderFactory {
return func(r io.Reader) Decoder {
return &TextDecoder{
Reader: bufio.NewReader(r),
}
func TextDecoderFactory(r io.Reader, _ map[string]string) Decoder {
return &TextDecoder{
Reader: bufio.NewReader(r),
}
}

Expand Down
4 changes: 2 additions & 2 deletions handler/forwarder/s3http/internal/decoders/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ var wrappers = map[string]Wrapper{
type Wrapper func(DecoderFactory) DecoderFactory

func GzipWrapper(fn DecoderFactory) DecoderFactory {
return func(r io.Reader) Decoder {
return func(r io.Reader, params map[string]string) Decoder {
gr, err := gzip.NewReader(r)
if err != nil {
return &errorDecoder{fmt.Errorf("failed to read gzip: %w", err)}
}
defer gr.Close()
return fn(gr)
return fn(gr, params)
}
}

0 comments on commit 925b7df

Please sign in to comment.