From 489f7f864ece712ea4584bb4ae50637f8c2bed38 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Taveira=20Ara=C3=BAjo?= Date: Thu, 9 May 2024 20:03:57 -0700 Subject: [PATCH] feat(forwarder): add HTTP forwarding support for vpcflowlogs Handle space-separated CSV, both for the case where a content type is parameterized (e.g. `application/x-csv;delimiter=space`) and for `application/x-aws-vpcflowlogs`. --- .../forwarder/s3http/internal/decoders/csv.go | 28 +++++++++++---- .../s3http/internal/decoders/decoder.go | 19 ++++++++--- .../s3http/internal/decoders/decoder_test.go | 5 +++ .../s3http/internal/decoders/json.go | 32 ++++++++++-------- .../decoders/testdata/vpcflowlogs.log.gz | Bin 0 -> 429 bytes .../testdata/vpcflowlogs.log.gz.golden | 14 ++++++++ 6 files changed, 72 insertions(+), 26 deletions(-) create mode 100644 handler/forwarder/s3http/internal/decoders/testdata/vpcflowlogs.log.gz create mode 100644 handler/forwarder/s3http/internal/decoders/testdata/vpcflowlogs.log.gz.golden diff --git a/handler/forwarder/s3http/internal/decoders/csv.go b/handler/forwarder/s3http/internal/decoders/csv.go index 9f01e5e2..13c33252 100644 --- a/handler/forwarder/s3http/internal/decoders/csv.go +++ b/handler/forwarder/s3http/internal/decoders/csv.go @@ -10,14 +10,28 @@ import ( "sync" ) -func CSVDecoderFactory(r io.Reader) Decoder { - buffered := bufio.NewReader(r) - csvDecoder := &CSVDecoder{ - Reader: csv.NewReader(buffered), - buffered: buffered, +func CSVDecoderFactory(params map[string]string) DecoderFactory { + return func(r io.Reader) Decoder { + buffered := bufio.NewReader(r) + csvDecoder := &CSVDecoder{ + Reader: csv.NewReader(buffered), + buffered: buffered, + } + csvDecoder.Reader.FieldsPerRecord = -1 + + comma := ',' + if params["delimiter"] == "space" { + comma = ' ' + } + csvDecoder.Reader.Comma = comma + return csvDecoder } - csvDecoder.Reader.FieldsPerRecord = -1 - return csvDecoder +} + +func VPCFlowLogDecoderFactory(_ map[string]string) DecoderFactory { 
return CSVDecoderFactory(map[string]string{ + "delimiter": "space", + }) } type CSVDecoder struct { diff --git a/handler/forwarder/s3http/internal/decoders/decoder.go b/handler/forwarder/s3http/internal/decoders/decoder.go index 76a40858..a63cef1c 100644 --- a/handler/forwarder/s3http/internal/decoders/decoder.go +++ b/handler/forwarder/s3http/internal/decoders/decoder.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "io" + "mime" ) var ( @@ -11,7 +12,7 @@ var ( ErrUnsupportedContentType = errors.New("content type not supported: %w") ) -var decoders = map[string]DecoderFactory{ +var decoders = map[string]ParameterizedDecoderFactory{ "": JSONDecoderFactory, "application/json": JSONDecoderFactory, "application/x-csv": CSVDecoderFactory, @@ -24,7 +25,7 @@ var decoders = map[string]DecoderFactory{ "application/x-aws-change": JSONDecoderFactory, "application/x-aws-cloudtrail": NestedJSONDecoderFactory, "application/x-aws-sqs": JSONDecoderFactory, - // "application/x-aws-vpcflowlogs": CSVDecoderFactory, + "application/x-aws-vpcflowlogs": VPCFlowLogDecoderFactory, } type Decoder interface { @@ -32,7 +33,10 @@ type Decoder interface { Decode(any) error } -type DecoderFactory func(io.Reader) Decoder +type ( + ParameterizedDecoderFactory func(params map[string]string) DecoderFactory + DecoderFactory func(io.Reader) Decoder +) func Get(contentEncoding, contentType string) (DecoderFactory, error) { wrapper, ok := wrappers[contentEncoding] @@ -40,10 +44,15 @@ func Get(contentEncoding, contentType string) (DecoderFactory, error) { return nil, fmt.Errorf("%w: %q", ErrUnsupportedContentEncoding, contentEncoding) } - decoder, ok := decoders[contentType] + mediaType, params, err := mime.ParseMediaType(contentType) + if err != nil { + return nil, fmt.Errorf("failed to parse content type: %w", err) + } + + decoder, ok := decoders[mediaType] if !ok { return nil, fmt.Errorf("%w: %q", ErrUnsupportedContentType, contentType) } - return wrapper(decoder), nil + return wrapper(decoder(params)), 
nil } diff --git a/handler/forwarder/s3http/internal/decoders/decoder_test.go b/handler/forwarder/s3http/internal/decoders/decoder_test.go index 65e86b36..cbd68413 100644 --- a/handler/forwarder/s3http/internal/decoders/decoder_test.go +++ b/handler/forwarder/s3http/internal/decoders/decoder_test.go @@ -43,6 +43,11 @@ func TestDecoders(t *testing.T) { ContentType: "text/csv", InputFile: "testdata/example.csv", }, + { + ContentType: "application/x-aws-vpcflowlogs", + ContentEncoding: "gzip", + InputFile: "testdata/vpcflowlogs.log.gz", + }, } for _, tt := range testcases { diff --git a/handler/forwarder/s3http/internal/decoders/json.go b/handler/forwarder/s3http/internal/decoders/json.go index d2ab6e0b..1107aac1 100644 --- a/handler/forwarder/s3http/internal/decoders/json.go +++ b/handler/forwarder/s3http/internal/decoders/json.go @@ -6,23 +6,27 @@ import ( "io" ) -var JSONDecoderFactory = func(r io.Reader) Decoder { - return json.NewDecoder(r) +func JSONDecoderFactory(map[string]string) DecoderFactory { + return func(r io.Reader) Decoder { + return json.NewDecoder(r) + } } -var NestedJSONDecoderFactory = func(r io.Reader) Decoder { - dec := json.NewDecoder(r) - tok, err := dec.Token() - for err == nil { - if v, ok := tok.(json.Delim); ok { - if v == '[' { - break +func NestedJSONDecoderFactory(map[string]string) DecoderFactory { + return func(r io.Reader) Decoder { + dec := json.NewDecoder(r) + tok, err := dec.Token() + for err == nil { + if v, ok := tok.(json.Delim); ok { + if v == '[' { + break + } } + tok, err = dec.Token() } - tok, err = dec.Token() - } - if err != nil { - return &errorDecoder{fmt.Errorf("unexpected token: %w", err)} + if err != nil { + return &errorDecoder{fmt.Errorf("unexpected token: %w", err)} + } + return dec } - return dec } diff --git a/handler/forwarder/s3http/internal/decoders/testdata/vpcflowlogs.log.gz b/handler/forwarder/s3http/internal/decoders/testdata/vpcflowlogs.log.gz new file mode 100644 index 
0000000000000000000000000000000000000000..3edd964369455e623c6b6290315889ef749ee1f3 GIT binary patch literal 429 zcmV;e0aE@SiwFpgj6G%m18i?+0JW4&Zrm^oh4(#$E-(T~QKGmUq+JzAy2u4&SuO?* z#sk|4(A$@upzTc1v;pEm2K4_vK9TTKo94rKbfwnu@wn^`y*nJ2HVvh=buv%2^!?=e zc{%qsKaJC}&d>d68kez-5AIazx3qDKWE^A*~FQ6=PUfswqj z$BdriC63vGQBrluvBL!0`6CFP5PjxLV1y9C1!22u-YkHPh%d6oUc%X36cI3#sC}BZ z_lMW?D8Z>Meb6RWAYUKala++J@DF2@OE@n)tFl X1MBK9n{gZcJA*#}%75gJ`vm|1Cnenb literal 0 HcmV?d00001 diff --git a/handler/forwarder/s3http/internal/decoders/testdata/vpcflowlogs.log.gz.golden b/handler/forwarder/s3http/internal/decoders/testdata/vpcflowlogs.log.gz.golden new file mode 100644 index 00000000..1768bb10 --- /dev/null +++ b/handler/forwarder/s3http/internal/decoders/testdata/vpcflowlogs.log.gz.golden @@ -0,0 +1,14 @@ +{"version":"2","account-id":"123456789012","interface-id":"eni-0a200bd23e7ff5610","srcaddr":"-","dstaddr":"-","srcport":"-","dstport":"-","protocol":"-","packets":"-","bytes":"-","start":"1715309031","end":"1715309101","action":"-","log-status":"NODATA"} +{"version":"2","account-id":"123456789012","interface-id":"eni-0e300e04460f51c93","srcaddr":"-","dstaddr":"-","srcport":"-","dstport":"-","protocol":"-","packets":"-","bytes":"-","start":"1715309029","end":"1715309100","action":"-","log-status":"NODATA"} +{"version":"2","account-id":"123456789012","interface-id":"eni-0ad00e2d53896f30f","srcaddr":"-","dstaddr":"-","srcport":"-","dstport":"-","protocol":"-","packets":"-","bytes":"-","start":"1715309028","end":"1715309100","action":"-","log-status":"NODATA"} +{"version":"2","account-id":"123456789012","interface-id":"eni-0e100815a7794913b","srcaddr":"-","dstaddr":"-","srcport":"-","dstport":"-","protocol":"-","packets":"-","bytes":"-","start":"1715309031","end":"1715309101","action":"-","log-status":"NODATA"} 
+{"version":"2","account-id":"123456789012","interface-id":"eni-0c20026b9d22c2854","srcaddr":"-","dstaddr":"-","srcport":"-","dstport":"-","protocol":"-","packets":"-","bytes":"-","start":"1715309029","end":"1715309100","action":"-","log-status":"NODATA"} +{"version":"2","account-id":"123456789012","interface-id":"eni-00a0047a9b1cab273","srcaddr":"10.0.0.247","dstaddr":"10.0.2.37","srcport":"123","dstport":"44191","protocol":"17","packets":"1","bytes":"76","start":"1715309072","end":"1715309101","action":"ACCEPT","log-status":"OK"} +{"version":"2","account-id":"123456789012","interface-id":"eni-00a0047a9b1cab273","srcaddr":"127.125.190.58","dstaddr":"10.0.0.247","srcport":"123","dstport":"59738","protocol":"17","packets":"1","bytes":"76","start":"1715309072","end":"1715309101","action":"ACCEPT","log-status":"OK"} +{"version":"2","account-id":"123456789012","interface-id":"eni-00a0047a9b1cab273","srcaddr":"52.94.181.45","dstaddr":"10.0.0.247","srcport":"443","dstport":"19299","protocol":"6","packets":"4","bytes":"160","start":"1715309072","end":"1715309101","action":"ACCEPT","log-status":"OK"} +{"version":"2","account-id":"123456789012","interface-id":"eni-00a0047a9b1cab273","srcaddr":"127.203.211.84","dstaddr":"10.0.0.247","srcport":"50350","dstport":"37412","protocol":"6","packets":"1","bytes":"44","start":"1715309072","end":"1715309101","action":"ACCEPT","log-status":"OK"} +{"version":"2","account-id":"123456789012","interface-id":"eni-00a0047a9b1cab273","srcaddr":"10.0.0.247","dstaddr":"10.0.4.73","srcport":"443","dstport":"57770","protocol":"6","packets":"79","bytes":"7998","start":"1715309072","end":"1715309101","action":"ACCEPT","log-status":"OK"} +{"version":"2","account-id":"123456789012","interface-id":"eni-00a0047a9b1cab273","srcaddr":"127.203.211.173","dstaddr":"10.0.0.247","srcport":"57343","dstport":"46498","protocol":"6","packets":"1","bytes":"44","start":"1715309072","end":"1715309101","action":"ACCEPT","log-status":"OK"} 
+{"version":"2","account-id":"123456789012","interface-id":"eni-00a0047a9b1cab273","srcaddr":"10.0.0.247","dstaddr":"10.0.2.37","srcport":"443","dstport":"56020","protocol":"6","packets":"65","bytes":"12479","start":"1715309072","end":"1715309101","action":"ACCEPT","log-status":"OK"} +{"version":"2","account-id":"123456789012","interface-id":"eni-00a0047a9b1cab273","srcaddr":"127.49.1.80","dstaddr":"10.0.0.247","srcport":"43128","dstport":"81","protocol":"6","packets":"1","bytes":"40","start":"1715309072","end":"1715309101","action":"ACCEPT","log-status":"OK"} +{"version":"2","account-id":"123456789012","interface-id":"eni-00a0047a9b1cab273","srcaddr":"127.161.242.171","dstaddr":"10.0.0.247","srcport":"443","dstport":"3490","protocol":"6","packets":"87","bytes":"14908","start":"1715309072","end":"1715309101","action":"ACCEPT","log-status":"OK"}