Skip to content

Commit

Permalink
feat(forwarder): add HTTP forwarding support for vpcflowlogs
Browse files Browse the repository at this point in the history
Handle space separated CSV, both for the cases where a content type is
parameterized (e.g. `application/x-csv;delimiter=space` and for
`application/x-aws-vpcflowlogs`)
  • Loading branch information
jta committed May 10, 2024
1 parent 3337cd2 commit 489f7f8
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 26 deletions.
28 changes: 21 additions & 7 deletions handler/forwarder/s3http/internal/decoders/csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,28 @@ import (
"sync"
)

func CSVDecoderFactory(r io.Reader) Decoder {
buffered := bufio.NewReader(r)
csvDecoder := &CSVDecoder{
Reader: csv.NewReader(buffered),
buffered: buffered,
func CSVDecoderFactory(params map[string]string) DecoderFactory {
return func(r io.Reader) Decoder {
buffered := bufio.NewReader(r)
csvDecoder := &CSVDecoder{
Reader: csv.NewReader(buffered),
buffered: buffered,
}
csvDecoder.Reader.FieldsPerRecord = -1

comma := ','
if params["delimiter"] == "space" {
comma = ' '
}
csvDecoder.Reader.Comma = comma
return csvDecoder
}
csvDecoder.Reader.FieldsPerRecord = -1
return csvDecoder
}

func VPCFlowLogDecoderFactory(_ map[string]string) DecoderFactory {
return CSVDecoderFactory(map[string]string{
"delimiter": "space",
})
}

type CSVDecoder struct {
Expand Down
19 changes: 14 additions & 5 deletions handler/forwarder/s3http/internal/decoders/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@ import (
"errors"
"fmt"
"io"
"mime"
)

var (
ErrUnsupportedContentEncoding = errors.New("content encoding not supported: %w")
ErrUnsupportedContentType = errors.New("content type not supported: %w")
)

var decoders = map[string]DecoderFactory{
var decoders = map[string]ParameterizedDecoderFactory{
"": JSONDecoderFactory,
"application/json": JSONDecoderFactory,
"application/x-csv": CSVDecoderFactory,
Expand All @@ -24,26 +25,34 @@ var decoders = map[string]DecoderFactory{
"application/x-aws-change": JSONDecoderFactory,
"application/x-aws-cloudtrail": NestedJSONDecoderFactory,
"application/x-aws-sqs": JSONDecoderFactory,
// "application/x-aws-vpcflowlogs": CSVDecoderFactory,
"application/x-aws-vpcflowlogs": VPCFlowLogDecoderFactory,
}

type Decoder interface {
More() bool
Decode(any) error
}

type DecoderFactory func(io.Reader) Decoder
type (
ParameterizedDecoderFactory func(params map[string]string) DecoderFactory
DecoderFactory func(io.Reader) Decoder
)

func Get(contentEncoding, contentType string) (DecoderFactory, error) {
wrapper, ok := wrappers[contentEncoding]
if !ok {
return nil, fmt.Errorf("%w: %q", ErrUnsupportedContentEncoding, contentEncoding)
}

decoder, ok := decoders[contentType]
mediaType, params, err := mime.ParseMediaType(contentType)
if err != nil {
return nil, fmt.Errorf("failed to parse content type: %w", err)
}

decoder, ok := decoders[mediaType]
if !ok {
return nil, fmt.Errorf("%w: %q", ErrUnsupportedContentType, contentType)
}

return wrapper(decoder), nil
return wrapper(decoder(params)), nil
}
5 changes: 5 additions & 0 deletions handler/forwarder/s3http/internal/decoders/decoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ func TestDecoders(t *testing.T) {
ContentType: "text/csv",
InputFile: "testdata/example.csv",
},
{
ContentType: "application/x-aws-vpcflowlogs",
ContentEncoding: "gzip",
InputFile: "testdata/vpcflowlogs.log.gz",
},
}

for _, tt := range testcases {
Expand Down
32 changes: 18 additions & 14 deletions handler/forwarder/s3http/internal/decoders/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,27 @@ import (
"io"
)

var JSONDecoderFactory = func(r io.Reader) Decoder {
return json.NewDecoder(r)
func JSONDecoderFactory(map[string]string) DecoderFactory {
return func(r io.Reader) Decoder {
return json.NewDecoder(r)
}
}

var NestedJSONDecoderFactory = func(r io.Reader) Decoder {
dec := json.NewDecoder(r)
tok, err := dec.Token()
for err == nil {
if v, ok := tok.(json.Delim); ok {
if v == '[' {
break
func NestedJSONDecoderFactory(map[string]string) DecoderFactory {
return func(r io.Reader) Decoder {
dec := json.NewDecoder(r)
tok, err := dec.Token()
for err == nil {
if v, ok := tok.(json.Delim); ok {
if v == '[' {
break
}
}
tok, err = dec.Token()
}
tok, err = dec.Token()
}
if err != nil {
return &errorDecoder{fmt.Errorf("unexpected token: %w", err)}
if err != nil {
return &errorDecoder{fmt.Errorf("unexpected token: %w", err)}
}
return dec
}
return dec
}
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{"version":"2","account-id":"123456789012","interface-id":"eni-0a200bd23e7ff5610","srcaddr":"-","dstaddr":"-","srcport":"-","dstport":"-","protocol":"-","packets":"-","bytes":"-","start":"1715309031","end":"1715309101","action":"-","log-status":"NODATA"}
{"version":"2","account-id":"123456789012","interface-id":"eni-0e300e04460f51c93","srcaddr":"-","dstaddr":"-","srcport":"-","dstport":"-","protocol":"-","packets":"-","bytes":"-","start":"1715309029","end":"1715309100","action":"-","log-status":"NODATA"}
{"version":"2","account-id":"123456789012","interface-id":"eni-0ad00e2d53896f30f","srcaddr":"-","dstaddr":"-","srcport":"-","dstport":"-","protocol":"-","packets":"-","bytes":"-","start":"1715309028","end":"1715309100","action":"-","log-status":"NODATA"}
{"version":"2","account-id":"123456789012","interface-id":"eni-0e100815a7794913b","srcaddr":"-","dstaddr":"-","srcport":"-","dstport":"-","protocol":"-","packets":"-","bytes":"-","start":"1715309031","end":"1715309101","action":"-","log-status":"NODATA"}
{"version":"2","account-id":"123456789012","interface-id":"eni-0c20026b9d22c2854","srcaddr":"-","dstaddr":"-","srcport":"-","dstport":"-","protocol":"-","packets":"-","bytes":"-","start":"1715309029","end":"1715309100","action":"-","log-status":"NODATA"}
{"version":"2","account-id":"123456789012","interface-id":"eni-00a0047a9b1cab273","srcaddr":"10.0.0.247","dstaddr":"10.0.2.37","srcport":"123","dstport":"44191","protocol":"17","packets":"1","bytes":"76","start":"1715309072","end":"1715309101","action":"ACCEPT","log-status":"OK"}
{"version":"2","account-id":"123456789012","interface-id":"eni-00a0047a9b1cab273","srcaddr":"127.125.190.58","dstaddr":"10.0.0.247","srcport":"123","dstport":"59738","protocol":"17","packets":"1","bytes":"76","start":"1715309072","end":"1715309101","action":"ACCEPT","log-status":"OK"}
{"version":"2","account-id":"123456789012","interface-id":"eni-00a0047a9b1cab273","srcaddr":"52.94.181.45","dstaddr":"10.0.0.247","srcport":"443","dstport":"19299","protocol":"6","packets":"4","bytes":"160","start":"1715309072","end":"1715309101","action":"ACCEPT","log-status":"OK"}
{"version":"2","account-id":"123456789012","interface-id":"eni-00a0047a9b1cab273","srcaddr":"127.203.211.84","dstaddr":"10.0.0.247","srcport":"50350","dstport":"37412","protocol":"6","packets":"1","bytes":"44","start":"1715309072","end":"1715309101","action":"ACCEPT","log-status":"OK"}
{"version":"2","account-id":"123456789012","interface-id":"eni-00a0047a9b1cab273","srcaddr":"10.0.0.247","dstaddr":"10.0.4.73","srcport":"443","dstport":"57770","protocol":"6","packets":"79","bytes":"7998","start":"1715309072","end":"1715309101","action":"ACCEPT","log-status":"OK"}
{"version":"2","account-id":"123456789012","interface-id":"eni-00a0047a9b1cab273","srcaddr":"127.203.211.173","dstaddr":"10.0.0.247","srcport":"57343","dstport":"46498","protocol":"6","packets":"1","bytes":"44","start":"1715309072","end":"1715309101","action":"ACCEPT","log-status":"OK"}
{"version":"2","account-id":"123456789012","interface-id":"eni-00a0047a9b1cab273","srcaddr":"10.0.0.247","dstaddr":"10.0.2.37","srcport":"443","dstport":"56020","protocol":"6","packets":"65","bytes":"12479","start":"1715309072","end":"1715309101","action":"ACCEPT","log-status":"OK"}
{"version":"2","account-id":"123456789012","interface-id":"eni-00a0047a9b1cab273","srcaddr":"127.49.1.80","dstaddr":"10.0.0.247","srcport":"43128","dstport":"81","protocol":"6","packets":"1","bytes":"40","start":"1715309072","end":"1715309101","action":"ACCEPT","log-status":"OK"}
{"version":"2","account-id":"123456789012","interface-id":"eni-00a0047a9b1cab273","srcaddr":"127.161.242.171","dstaddr":"10.0.0.247","srcport":"443","dstport":"3490","protocol":"6","packets":"87","bytes":"14908","start":"1715309072","end":"1715309101","action":"ACCEPT","log-status":"OK"}

0 comments on commit 489f7f8

Please sign in to comment.