fix(forwarder): reduce memory usage
This commit modifies how the buffer queue manages chunks in order to
reduce memory consumption.

Previously, we appended uncompressed data to a single buffer. On
flush, we copied the data out and handed it to a worker, which was
responsible for compressing the data and posting it over HTTP.

This commit applies two improvements:
- compress data as it is written to the queue (see the sketch below).
  This drastically reduces memory consumption.
- pass the buffer to the channel on flush, rather than copying it to an
  intermediate buffer. There is no justification for copying data from
  one buffer to another.

In a simple test with an 8 MB gzip-compressed AWS Config change
notification, memory consumption went down from ~47 MB to ~6 MB.
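
As a rough, self-contained sketch of the idea (illustrative only; the names
below are not the identifiers used in
pkg/handler/forwarder/s3http/internal/batch/queue.go, which contains the real
implementation), the queue wraps its buffer in a gzip.Writer so each record is
compressed as it is pushed, and the whole buffer is handed to a channel on
flush:

```go
// Minimal sketch of compress-on-write batching. Names are illustrative and
// not taken from this repository.
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
)

type sketchQueue struct {
	buf     *bytes.Buffer      // current chunk (compressed bytes)
	gw      *gzip.Writer       // compresses records as they arrive
	written int                // uncompressed bytes accepted so far
	max     int                // flush threshold in uncompressed bytes
	out     chan *bytes.Buffer // completed batches
}

func (q *sketchQueue) push(record []byte) error {
	if q.buf == nil {
		q.buf = &bytes.Buffer{}
		q.gw, _ = gzip.NewWriterLevel(q.buf, gzip.BestSpeed) // level 1 is always valid
	}
	n, err := q.gw.Write(record) // record is compressed as it is written
	q.written += n
	if err != nil {
		return err
	}
	if q.written >= q.max {
		return q.flush()
	}
	return nil
}

func (q *sketchQueue) flush() error {
	if q.written == 0 {
		return nil
	}
	if err := q.gw.Close(); err != nil { // finish the gzip stream
		return err
	}
	q.out <- q.buf // hand over the buffer itself; no intermediate copy
	q.buf, q.written = nil, 0
	return nil
}

func main() {
	q := &sketchQueue{max: 1 << 20, out: make(chan *bytes.Buffer, 1)}
	_ = q.push([]byte(`{"hello":"world"}` + "\n"))
	_ = q.flush()
	fmt.Println("compressed batch bytes:", (<-q.out).Len())
}
```

Because records never sit in memory uncompressed beyond the current write,
peak usage tracks the compressed batch size rather than the raw payload size.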
jta committed Jun 10, 2024
1 parent 113e800 commit 1aa32b3
Showing 9 changed files with 144 additions and 42 deletions.
43 changes: 43 additions & 0 deletions cmd/testing/forwarderhttp/README.md
@@ -0,0 +1,43 @@
# Forwarder HTTP test utility

This utility exercises the Forwarder lambda code for the case where the backend is an HTTP server.

At a bare minimum, you must provide a list of input files as command-line arguments:

```
go run main.go ./my-test.json ./another-file.csv
```

The output of the program is a directory containing all request bodies
sent to the HTTP server. Each output file is named after the hash of its
contents, which is useful for verifying that code changes do not affect
how data is chunked.

When processing files, the code applies the same presets as the Forwarder
lambda. You may have to use the `content-type` or `content-encoding` flags to
fake either object attribute, since neither is a property of the local
filesystem. For example, to test an AWS Config file, you would set
`content-encoding`:

```
go run main.go \
-content-encoding=gzip \
./123456789012_Config_us-west-2_ConfigHistory_AWS::SSM::ManagedInstanceInventory_20240607T130841Z_20240607T190342Z_1.json.gz
```

In the lambda case, `content-encoding` is already set in S3. In the local
testing case, we must configure it manually.
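
For context, `Content-Encoding` is a stored attribute of the S3 object, which
the lambda receives via the AWS SDK. A minimal sketch (not part of this
utility; bucket and key are placeholders) of reading it with aws-sdk-go-v2:

```go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/s3"
)

func main() {
	ctx := context.Background()
	cfg, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		log.Fatal(err)
	}
	client := s3.NewFromConfig(cfg)

	// HeadObject exposes the stored Content-Encoding attribute. A local file
	// has no such attribute, hence the -content-encoding flag above.
	out, err := client.HeadObject(ctx, &s3.HeadObjectInput{
		Bucket: aws.String("my-bucket"),                       // placeholder
		Key:    aws.String("path/to/config-snapshot.json.gz"), // placeholder
	})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("Content-Encoding:", aws.ToString(out.ContentEncoding))
}
```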

## Profiling

To dump a profile of the executed code, set `-profile`:

```
go run main.go \
-profile=mem \
...
```

You can then explore the profile with `go tool`, e.g.:

```
go tool pprof -http=:8080 forwarder-post/mem.pprof
```
22 changes: 14 additions & 8 deletions pkg/handler/forwarder/s3http/client.go
@@ -30,6 +30,7 @@ type GetObjectAPIClient interface {
type Client struct {
GetObjectAPIClient
RequestBuilder *request.Builder
GzipLevel *int
}

func toGetInput(copyInput *s3.CopyObjectInput) (*s3.GetObjectInput, error) {
@@ -149,15 +150,20 @@ func (c *Client) PutObject(ctx context.Context, params *s3.PutObjectInput, _ ...
return nil, fmt.Errorf("failed to get decoder: %w", err)
}

headers := map[string]string{
"Content-Type": "application/x-ndjson",
}
if c.GzipLevel != nil {
headers["Content-Encoding"] = "gzip"
}

err = batch.Run(ctx, &batch.RunInput{
Decoder: dec,
Decoder: dec,
GzipLevel: c.GzipLevel,
Handler: c.RequestBuilder.With(map[string]string{
"content-type": aws.ToString(params.ContentType),
"key": aws.ToString(params.Key),
}, map[string]string{
"Content-Type": "application/x-ndjson",
},
),
}, headers),
})
if err != nil {
return nil, fmt.Errorf("failed to process: %w", err)
@@ -172,10 +178,10 @@ func New(cfg *Config) (*Client, error) {

return &Client{
GetObjectAPIClient: cfg.GetObjectAPIClient,
GzipLevel: cfg.GzipLevel,
RequestBuilder: &request.Builder{
URL: cfg.DestinationURI,
GzipLevel: cfg.RequestGzipLevel,
Client: cfg.HTTPClient,
URL: cfg.DestinationURI,
Client: cfg.HTTPClient,
},
}, nil
}
1 change: 0 additions & 1 deletion pkg/handler/forwarder/s3http/client_test.go
@@ -96,7 +96,6 @@ func TestClientPut(t *testing.T) {
client, err := s3http.New(&s3http.Config{
DestinationURI: fmt.Sprintf("%s/%s", s.URL, tt.Path),
GetObjectAPIClient: &awstest.S3Client{},
RequestGzipLevel: aws.Int(0),
HTTPClient: s.Client(),
})
if err != nil {
16 changes: 12 additions & 4 deletions pkg/handler/forwarder/s3http/config.go
@@ -1,22 +1,24 @@
package s3http

import (
"compress/gzip"
"errors"
"fmt"
"net/http"
"net/url"
)

var (
ErrInvalidDestination = errors.New("invalid destination URI")
ErrMissingS3Client = errors.New("missing S3 client")
ErrInvalidDestination = errors.New("invalid destination URI")
ErrMissingS3Client = errors.New("missing S3 client")
ErrUnsupportedGzipLevel = errors.New("unsupported compression level")
)

type Config struct {
DestinationURI string // HTTP URI to upload data to
GetObjectAPIClient
HTTPClient *http.Client
RequestGzipLevel *int
HTTPClient *http.Client
GzipLevel *int
}

func (c *Config) Validate() error {
@@ -38,5 +40,11 @@ func (c *Config) Validate() error {
errs = append(errs, ErrMissingS3Client)
}

if c.GzipLevel != nil {
if _, err := gzip.NewWriterLevel(nil, *c.GzipLevel); err != nil {
errs = append(errs, fmt.Errorf("%w: %w", ErrUnsupportedGzipLevel, err))
}
}

return errors.Join(errs...)
}
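
The new check delegates level validation to `compress/gzip` itself:
constructing a writer with an out-of-range level fails immediately, before any
data is written. A standalone illustration (assuming nothing beyond the
standard library):

```go
package main

import (
	"compress/gzip"
	"fmt"
	"io"
)

func main() {
	// Valid levels run from gzip.HuffmanOnly (-2) to gzip.BestCompression (9);
	// anything outside that range is rejected up front.
	for _, level := range []int{gzip.BestSpeed, 200} {
		_, err := gzip.NewWriterLevel(io.Discard, level)
		fmt.Printf("level %d: err = %v\n", level, err)
	}
}
```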
12 changes: 12 additions & 0 deletions pkg/handler/forwarder/s3http/config_test.go
@@ -11,6 +11,10 @@ import (
"github.com/observeinc/aws-sam-apps/pkg/testing/awstest"
)

func ptr[T any](v T) *T {
return &v
}

func TestConfig(t *testing.T) {
testcases := []struct {
s3http.Config
@@ -36,6 +40,14 @@ func TestConfig(t *testing.T) {
// S3 URI not supported
ExpectError: s3http.ErrInvalidDestination,
},
{
Config: s3http.Config{
DestinationURI: "https://test",
GzipLevel: ptr(200),
GetObjectAPIClient: &awstest.S3Client{},
},
ExpectError: s3http.ErrUnsupportedGzipLevel,
},
}

for i, tc := range testcases {
74 changes: 55 additions & 19 deletions pkg/handler/forwarder/s3http/internal/batch/queue.go
@@ -2,6 +2,7 @@ package batch

import (
"bytes"
"compress/gzip"
"context"
"errors"
"fmt"
@@ -16,23 +17,34 @@ var (
)

type QueueConfig struct {
MaxBatchSize int // maximum batch size in bytes
Capacity int // channel capacity
MaxBatchSize int // maximum batch size in bytes
Capacity int // channel capacity
GzipLevel *int // gzip compression level
Delimiter []byte
}

// Queue appends item to buffer until batch size is reached.
type Queue struct {
buffer bytes.Buffer
maxBatchSize int
ch chan *bytes.Buffer // channel containing batches
delimiter []byte

// the chunk currently being appended to
buffer *bytes.Buffer

// chunk may be compressed, so we write and flush via the following
// accessors
writer io.Writer
closer io.Closer

newWriterFunc func(*bytes.Buffer) (io.Writer, io.Closer)
written int
maxBatchSize int
ch chan *bytes.Buffer // channel containing batches
delimiter []byte
}

// Push a record to queue for batching.
// We assume record includes delimiter.
func (q *Queue) Push(ctx context.Context, record []byte) error {
if q.maxBatchSize > 0 && q.buffer.Len()+len(record)+len(q.delimiter) > q.maxBatchSize {
if q.maxBatchSize > 0 && q.written+len(record)+len(q.delimiter) > q.maxBatchSize {
if len(record) > q.maxBatchSize {
return fmt.Errorf("%w: %d", ErrRecordLenExceedsBatchSize, len(record))
}
@@ -42,37 +54,48 @@ func (q *Queue) Push(ctx context.Context, record []byte) error {
}
}

if _, err := q.buffer.Write(record); err != nil {
if q.written == 0 {
buf, ok := bufPool.Get().(*bytes.Buffer)
if !ok {
panic("failed type assertion")
}
buf.Reset()
q.buffer = buf
q.writer, q.closer = q.newWriterFunc(buf)
}

n, err := q.writer.Write(record)
if err != nil {
return fmt.Errorf("failed to buffer record: %w", err)
}
q.written += n

if len(q.delimiter) > 0 {
if _, err := q.buffer.Write(q.delimiter); err != nil {
n, err = q.writer.Write(q.delimiter)
if err != nil {
return fmt.Errorf("failed to buffer delimiter: %w", err)
}
q.written += n
}
return nil
}

func (q *Queue) flush(ctx context.Context) error {
if q.buffer.Len() == 0 {
if q.written == 0 {
return nil
}

buf, ok := bufPool.Get().(*bytes.Buffer)
if !ok {
panic("failed type assertion")
if err := q.closer.Close(); err != nil {
return fmt.Errorf("failed to close buffer: %w", err)
}

if _, err := io.Copy(buf, &q.buffer); err != nil {
return fmt.Errorf("failed to flush: %w", err)
}
q.written = 0

select {
case q.ch <- buf:
case q.ch <- q.buffer:
return nil
case <-ctx.Done():
bufPool.Put(buf)
bufPool.Put(q.buffer)
return fmt.Errorf("cancelled flush: %w", ctx.Err())
}
}
@@ -112,9 +135,22 @@ func NewQueue(cfg *QueueConfig) *Queue {
cfg = &QueueConfig{}
}

return &Queue{
q := &Queue{
newWriterFunc: func(buf *bytes.Buffer) (io.Writer, io.Closer) {
return buf, io.NopCloser(buf)
},
maxBatchSize: cfg.MaxBatchSize,
delimiter: cfg.Delimiter,
ch: make(chan *bytes.Buffer, cfg.Capacity),
}

if cfg.GzipLevel != nil {
q.newWriterFunc = func(buf *bytes.Buffer) (io.Writer, io.Closer) {
gw, _ := gzip.NewWriterLevel(buf, *cfg.GzipLevel)
return gw, gw
}
}

return q

}
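
The hunks above use a `bufPool` whose declaration sits outside the diff
context. A typical definition for this pattern, assumed here rather than taken
from the repository, is a `sync.Pool` of `*bytes.Buffer`:

```go
package batch

import (
	"bytes"
	"sync"
)

// Assumed shape of the pool referenced by Queue; the real declaration is not
// shown in the hunks above.
var bufPool = sync.Pool{
	New: func() any { return new(bytes.Buffer) },
}
```

Pooling lets flushed buffers be returned and reused by later batches, so
buffer allocations stay roughly proportional to the number of in-flight
batches rather than the total volume processed.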
2 changes: 2 additions & 0 deletions pkg/handler/forwarder/s3http/internal/batch/runner.go
@@ -41,6 +41,7 @@ type RunInput struct {
MaxBatchSize *int // maximum size in bytes for each batch
MaxRecordSize *int // maximum size in bytes for each record
CapacityFactor *int // channel capacity, calculated as a multiple of concurrency
GzipLevel *int // gzip compression level for each batch; nil disables compression
}

// Run processes all events from a decoder and feeds them into 1 or more batch handlers.
@@ -75,6 +76,7 @@ func Run(ctx context.Context, r *RunInput) error {
MaxBatchSize: maxBatchSize,
Capacity: capacityFactor * maxConcurrency,
Delimiter: []byte("\n"),
GzipLevel: r.GzipLevel,
})

for range maxConcurrency {
Expand Down
13 changes: 3 additions & 10 deletions pkg/handler/forwarder/s3http/internal/request/builder.go
@@ -1,7 +1,6 @@
package request

import (
"compress/gzip"
"errors"
"net/http"
"net/url"
@@ -38,15 +37,9 @@ func (b *Builder) With(params map[string]string, headers map[string]string) *Han
client = http.DefaultClient
}

gzipLevel := gzip.DefaultCompression
if v := b.GzipLevel; v != nil {
gzipLevel = *v
}

return &Handler{
URL: u.String(),
Headers: headers,
GzipLevel: gzipLevel,
Client: client,
URL: u.String(),
Headers: headers,
Client: client,
}
}
3 changes: 3 additions & 0 deletions pkg/lambda/forwarder/lambda.go
@@ -44,6 +44,8 @@ type Config struct {
OTELTracesExporter string `env:"OTEL_TRACES_EXPORTER,default=none"`
OTELExporterOTLPEndpoint string `env:"OTEL_EXPORTER_OTLP_ENDPOINT"`

S3HTTPGzipLevel *int `env:"S3_HTTP_GZIP_LEVEL,default=1"`

// The following variables are not configurable via environment
HTTPInsecureSkipVerify bool `json:"-"`
AWSS3Client S3Client `json:"-"`
@@ -120,6 +122,7 @@ func New(ctx context.Context, cfg *Config) (*Lambda, error) {
s3Client, err = s3http.New(&s3http.Config{
DestinationURI: cfg.DestinationURI,
GetObjectAPIClient: awsS3Client,
GzipLevel: cfg.S3HTTPGzipLevel,
HTTPClient: tracing.NewHTTPClient(&tracing.HTTPClientConfig{
TracerProvider: tracerProvider,
Logger: &logger,
