Skip to content

Commit

Permalink
feat: compression and streaming (#37)
Browse files Browse the repository at this point in the history
* feat: compression and streaming

* chore: closing body on defer
  • Loading branch information
fracasula authored May 13, 2024
1 parent 453661d commit 4427253
Show file tree
Hide file tree
Showing 8 changed files with 209 additions and 48 deletions.
95 changes: 95 additions & 0 deletions benchmark_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package cpsdk

import (
"context"
"testing"

"github.com/stretchr/testify/require"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
)

// BenchmarkGetWorkspaceConfigs on free-us-1, last time it showed 371MB of allocations with this technique
func BenchmarkGetWorkspaceConfigs(b *testing.B) {
conf := config.New()
baseURL := conf.GetString("BASE_URL", "https://api.rudderstack.com")
namespace := conf.GetString("NAMESPACE", "free-us-1")
identity := conf.GetString("IDENTITY", "")
if identity == "" {
b.Skip("IDENTITY is not set")
return
}

cpSDK, err := New(
WithBaseUrl(baseURL),
WithLogger(logger.NOP),
WithPollingInterval(0), // Setting the poller interval to 0 to disable the poller
WithNamespaceIdentity(namespace, identity),
)
require.NoError(b, err)
defer cpSDK.Close()

_, err = cpSDK.GetWorkspaceConfigs(context.Background())
require.NoError(b, err)
}

// BenchmarkGetCustomWorkspaceConfigs on free-us-1, last time it showed 88MB of allocations with this technique
func BenchmarkGetCustomWorkspaceConfigs(b *testing.B) {
conf := config.New()
baseURL := conf.GetString("BASE_URL", "https://api.rudderstack.com")
namespace := conf.GetString("NAMESPACE", "free-us-1")
identity := conf.GetString("IDENTITY", "")
if identity == "" {
b.Skip("IDENTITY is not set")
return
}

cpSDK, err := New(
WithBaseUrl(baseURL),
WithLogger(logger.NOP),
WithPollingInterval(0), // Setting the poller interval to 0 to disable the poller
WithNamespaceIdentity(namespace, identity),
)
require.NoError(b, err)
defer cpSDK.Close()

var workspaceConfigs WorkspaceConfigs
err = cpSDK.GetCustomWorkspaceConfigs(context.Background(), &workspaceConfigs)
require.NoError(b, err)
}

type WorkspaceConfigs struct {
Workspaces map[string]*WorkspaceConfig `json:"workspaces"`
SourceDefinitions map[string]*SourceDefinition `json:"sourceDefinitions"`
}

type WorkspaceConfig struct {
Sources map[string]*Source `json:"sources"`
Destinations map[string]*Destination `json:"destinations"`
Connections map[string]*Connection `json:"connections"`
}

type Source struct {
Name string `json:"name"`
WriteKey string `json:"writeKey"`
Enabled bool `json:"enabled"`
DefinitionName string `json:"sourceDefinitionName"`
Deleted bool `json:"deleted"`
}

type Destination struct {
Enabled bool `json:"enabled"`
}

type Connection struct {
SourceID string `json:"sourceId"`
DestinationID string `json:"destinationId"`
Enabled bool `json:"enabled"`
ProcessorEnabled bool `json:"processorEnabled"`
}

type SourceDefinition struct {
Name string `json:"name"`
Category string `json:"category"`
}
39 changes: 31 additions & 8 deletions internal/clients/base/client.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package base

import (
"compress/gzip"
"context"
"fmt"
"io"
Expand Down Expand Up @@ -28,28 +29,50 @@ func (c *Client) Get(ctx context.Context, path string) (*http.Request, error) {
if err != nil {
return nil, err
}

req.Header.Set("Content-Type", "application/json")
req.Header.Set("Accept-Encoding", "gzip")

return req, nil
}

func (c *Client) Send(req *http.Request) ([]byte, error) {
func (c *Client) Send(req *http.Request) (io.ReadCloser, error) {
res, err := c.HTTPClient.Do(req)
if err != nil {
return nil, err
}
defer func() {
httputil.CloseResponse(res)
}()

if res.StatusCode != http.StatusOK {
defer func() { httputil.CloseResponse(res) }()
return nil, fmt.Errorf("unexpected status code: %d", res.StatusCode)
}

data, err := io.ReadAll(res.Body)
if err != nil {
return nil, err
if res.Header.Get("Content-Encoding") == "gzip" {
gzipRdr, err := gzip.NewReader(res.Body)
if err != nil {
return nil, fmt.Errorf("failed to create gzip reader: %w", err)
}
return &gzipReadCloser{
gzipReader: gzipRdr,
resBody: res.Body,
}, nil
}

return data, nil
return res.Body, nil
}

// gzipReadCloser is a ReadCloser that closes both the gzip reader and the response body.
// unfortunately the gzip.Reader does not close the underlying reader when it is closed.
type gzipReadCloser struct {
gzipReader io.ReadCloser
resBody io.ReadCloser
}

func (g *gzipReadCloser) Read(p []byte) (n int, err error) {
return g.gzipReader.Read(p)
}

func (g *gzipReadCloser) Close() error {
defer func() { _ = g.resBody.Close() }()
return g.gzipReader.Close()
}
37 changes: 28 additions & 9 deletions internal/clients/namespace/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,21 @@ package namespace
import (
"context"
"errors"
"fmt"
"io"
"net/http"
"time"

jsoniter "github.com/json-iterator/go"

"github.com/rudderlabs/rudder-cp-sdk/identity"
"github.com/rudderlabs/rudder-cp-sdk/internal/clients/base"
"github.com/rudderlabs/rudder-cp-sdk/modelv2"
"github.com/rudderlabs/rudder-cp-sdk/modelv2/parser"
)

var json = jsoniter.ConfigFastest

type Client struct {
*base.Client

Expand All @@ -25,37 +31,50 @@ func (c *Client) Get(ctx context.Context, path string) (*http.Request, error) {
}

req.SetBasicAuth(c.Identity.Secret, "")

return req, nil
}

func (c *Client) GetRawWorkspaceConfigs(ctx context.Context) ([]byte, error) {
func (c *Client) getWorkspaceConfigsReader(ctx context.Context) (io.ReadCloser, error) {
req, err := c.Get(ctx, "/data-plane/v2/namespaces/"+c.Identity.Namespace+"/config")
if err != nil {
return nil, err
}

data, err := c.Send(req)
if err != nil {
return nil, err
}

return data, nil
return c.Send(req)
}

func (c *Client) GetWorkspaceConfigs(ctx context.Context) (*modelv2.WorkspaceConfigs, error) {
data, err := c.GetRawWorkspaceConfigs(ctx)
reader, err := c.getWorkspaceConfigsReader(ctx)
if err != nil {
return nil, err
}

wcs, err := parser.Parse(data)
defer func() { _ = reader.Close() }()

wcs, err := parser.Parse(reader)
if err != nil {
return nil, err
}

return wcs, nil
}

func (c *Client) GetCustomWorkspaceConfigs(ctx context.Context, object any) error {
reader, err := c.getWorkspaceConfigsReader(ctx)
if err != nil {
return err
}

defer func() { _ = reader.Close() }()

if err = json.NewDecoder(reader).Decode(object); err != nil {
return fmt.Errorf("failed to decode workspace configs: %w", err)
}

return nil
}

func (c *Client) GetUpdatedWorkspaceConfigs(ctx context.Context, updatedAfter time.Time) (*modelv2.WorkspaceConfigs, error) {
return nil, errors.New("not implemented")
}
37 changes: 28 additions & 9 deletions internal/clients/workspace/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,21 @@ package workspace
import (
"context"
"errors"
"fmt"
"io"
"net/http"
"time"

jsoniter "github.com/json-iterator/go"

"github.com/rudderlabs/rudder-cp-sdk/identity"
"github.com/rudderlabs/rudder-cp-sdk/internal/clients/base"
"github.com/rudderlabs/rudder-cp-sdk/modelv2"
"github.com/rudderlabs/rudder-cp-sdk/modelv2/parser"
)

var json = jsoniter.ConfigFastest

type Client struct {
*base.Client

Expand All @@ -25,37 +31,50 @@ func (c *Client) Get(ctx context.Context, path string) (*http.Request, error) {
}

req.SetBasicAuth(c.Identity.WorkspaceToken, "")

return req, nil
}

func (c *Client) GetRawWorkspaceConfigs(ctx context.Context) ([]byte, error) {
func (c *Client) getWorkspaceConfigsReader(ctx context.Context) (io.ReadCloser, error) {
req, err := c.Get(ctx, "/data-plane/v2/workspaceConfig")
if err != nil {
return nil, err
}

data, err := c.Send(req)
if err != nil {
return nil, err
}

return data, nil
return c.Send(req)
}

func (c *Client) GetWorkspaceConfigs(ctx context.Context) (*modelv2.WorkspaceConfigs, error) {
data, err := c.GetRawWorkspaceConfigs(ctx)
reader, err := c.getWorkspaceConfigsReader(ctx)
if err != nil {
return nil, err
}

wcs, err := parser.Parse(data)
defer func() { _ = reader.Close() }()

wcs, err := parser.Parse(reader)
if err != nil {
return nil, err
}

return wcs, nil
}

func (c *Client) GetCustomWorkspaceConfigs(ctx context.Context, object any) error {
reader, err := c.getWorkspaceConfigsReader(ctx)
if err != nil {
return err
}

defer func() { _ = reader.Close() }()

if err = json.NewDecoder(reader).Decode(object); err != nil {
return fmt.Errorf("failed to decode workspace configs: %w", err)
}

return nil
}

func (c *Client) GetUpdatedWorkspaceConfigs(ctx context.Context, updatedAfter time.Time) (*modelv2.WorkspaceConfigs, error) {
return nil, errors.New("not implemented")
}
9 changes: 6 additions & 3 deletions modelv2/parser/parser.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
package parser

import (
"io"

jsoniter "github.com/json-iterator/go"

"github.com/rudderlabs/rudder-cp-sdk/modelv2"
)

var json = jsoniter.ConfigCompatibleWithStandardLibrary
var json = jsoniter.ConfigFastest

func Parse(data []byte) (*modelv2.WorkspaceConfigs, error) {
func Parse(reader io.Reader) (*modelv2.WorkspaceConfigs, error) {
res := &modelv2.WorkspaceConfigs{}

if err := json.Unmarshal(data, res); err != nil {
if err := json.NewDecoder(reader).Decode(res); err != nil {
return nil, err
}

Expand Down
8 changes: 5 additions & 3 deletions modelv2/parser/parser_test.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
package parser_test

import (
"bytes"
"os"
"testing"

"github.com/stretchr/testify/require"

"github.com/rudderlabs/rudder-cp-sdk/modelv2"
"github.com/rudderlabs/rudder-cp-sdk/modelv2/parser"
"github.com/stretchr/testify/require"
)

func TestParse(t *testing.T) {
data, err := os.ReadFile("testdata/workspace_configs.v2.json")
data, err := os.Open("testdata/workspace_configs.v2.json")
require.NoError(t, err)

wcs, err := parser.Parse(data)
Expand Down Expand Up @@ -39,7 +41,7 @@ func TestParse(t *testing.T) {
}

func TestParseError(t *testing.T) {
wcs, err := parser.Parse([]byte(`{ malformed json }`))
wcs, err := parser.Parse(bytes.NewReader([]byte(`{ malformed json }`)))
require.Nil(t, wcs)
require.Error(t, err)
}
Loading

0 comments on commit 4427253

Please sign in to comment.