Skip to content

Commit

Permalink
feat: add mock config BE support for transformer
Browse files Browse the repository at this point in the history
  • Loading branch information
mihir20 committed Feb 21, 2024
1 parent 9f118b2 commit b15ec75
Show file tree
Hide file tree
Showing 8 changed files with 263 additions and 7 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:
- uses: codecov/codecov-action@v3
test-package:
name: Test package
runs-on: 'ubuntu-20.04'
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
github.com/go-chi/chi/v5 v5.0.11
github.com/go-redis/redis/v8 v8.11.5
github.com/golang/mock v1.6.0
github.com/google/uuid v1.6.0
github.com/joho/godotenv v1.5.1
github.com/json-iterator/go v1.1.12
github.com/lib/pq v1.10.9
Expand Down Expand Up @@ -82,7 +83,6 @@ require (
github.com/golang/snappy v0.0.4 // indirect
github.com/google/s2a-go v0.1.7 // indirect
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
github.com/googleapis/gax-go/v2 v2.12.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 // indirect
Expand Down
8 changes: 8 additions & 0 deletions testhelper/docker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package docker

import (
"strconv"
"strings"
"testing"

"github.com/ory/dockertest/v3/docker"
Expand All @@ -20,3 +21,10 @@ func GetHostPort(t testing.TB, port string, container *docker.Container) int {
}
return 0
}

// ToInternalDockerHost replaces localhost and 127.0.0.1 with host.docker.internal
func ToInternalDockerHost(url string) string {
url = strings.ReplaceAll(url, "localhost", "host.docker.internal")
url = strings.ReplaceAll(url, "127.0.0.1", "host.docker.internal")
return url
}
64 changes: 60 additions & 4 deletions testhelper/docker/resource/transformer/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ import (
"errors"
"fmt"
"net/http"
"strings"

kithttptest "github.com/rudderlabs/rudder-go-kit/testhelper/httptest"

dockerTestHelper "github.com/rudderlabs/rudder-go-kit/testhelper/docker"

"github.com/ory/dockertest/v3"
"github.com/ory/dockertest/v3/docker"
Expand All @@ -21,12 +26,62 @@ type config struct {
repository string
tag string
exposedPorts []string
env []string
envs []string
extraHosts []string
}

func (c *config) updateBackendConfigURL(url string) {
found := false
for i, env := range c.envs {
if strings.HasPrefix(env, "CONFIG_BACKEND_URL=") {
found = true
c.envs[i] = fmt.Sprintf("CONFIG_BACKEND_URL=%s", url)
}
}
if !found {
c.envs = append(c.envs, fmt.Sprintf("CONFIG_BACKEND_URL=%s", url))
}

Check warning on line 43 in testhelper/docker/resource/transformer/transformer.go

View check run for this annotation

Codecov / codecov/patch

testhelper/docker/resource/transformer/transformer.go#L42-L43

Added lines #L42 - L43 were not covered by tests
}

// WithTransformations will mock BE config to set transformation for given transformation versionID to transformation function map
//
// - events with transformationVersionID not present in map will not be transformed and transformer will return 404 for those requests
//
// - WithTransformations should not be used with WithConfigBackendURL option
//
// - only javascript transformation functions are supported
//
// e.g.
//
// WithTransformations(map[string]string{
// "transform-version-id-1": `export function transformEvent(event, metadata) {
// event.transformed=true
// return event;
// }`,
// })
func WithTransformations(transformations map[string]string, cleaner resource.Cleaner) func(*config) {
return func(conf *config) {
mux := http.NewServeMux()
mockBackendConfigServer := &mockHttpServer{Transformations: transformations}
mux.HandleFunc(getByVersionIdEndPoint, mockBackendConfigServer.handleGetByVersionId)
backendConfigSvc := kithttptest.NewServer(mux)

conf.updateBackendConfigURL(dockerTestHelper.ToInternalDockerHost(backendConfigSvc.URL))
if conf.extraHosts == nil {
conf.extraHosts = make([]string, 0)
}
conf.extraHosts = append(conf.extraHosts, "host.docker.internal:host-gateway")
cleaner.Cleanup(func() {
backendConfigSvc.Close()
})
}
}

// WithConfigBackendURL lets transformer use custom backend config server for transformations
// WithConfigBackendURL should not be used with WithTransformations option
func WithConfigBackendURL(url string) func(*config) {
return func(conf *config) {
conf.env = []string{fmt.Sprintf("CONFIG_BACKEND_URL=%s", url)}
conf.updateBackendConfigURL(url)
}
}

Expand All @@ -44,7 +99,7 @@ func Setup(pool *dockertest.Pool, d resource.Cleaner, opts ...func(conf *config)
repository: "rudderstack/rudder-transformer",
tag: "latest",
exposedPorts: []string{"9090"},
env: []string{
envs: []string{
"CONFIG_BACKEND_URL=https://api.rudderstack.com",
},
}
Expand All @@ -64,7 +119,8 @@ func Setup(pool *dockertest.Pool, d resource.Cleaner, opts ...func(conf *config)
Repository: conf.repository,
Tag: conf.tag,
ExposedPorts: conf.exposedPorts,
Env: conf.env,
Env: conf.envs,
ExtraHosts: conf.extraHosts,
})
if err != nil {
return nil, err
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package transformer

import (
"fmt"
"net/http"
"strings"

"github.com/google/uuid"
)

const (
getByVersionIdEndPoint = "/transformation/getByVersionId"
versionIDKey = "versionId"
)

type mockHttpServer struct {
Transformations map[string]string
}

func (m *mockHttpServer) handleGetByVersionId(w http.ResponseWriter, r *http.Request) {
transformationVersionId := r.URL.Query().Get(versionIDKey)
transformationCode, ok := m.Transformations[transformationVersionId]
if !ok {
w.WriteHeader(http.StatusNotFound)
return
}

Check warning on line 26 in testhelper/docker/resource/transformer/transformer_backend_config.go

View check run for this annotation

Codecov / codecov/patch

testhelper/docker/resource/transformer/transformer_backend_config.go#L24-L26

Added lines #L24 - L26 were not covered by tests
transformationCode = sanitiseTransformationCode(transformationCode)
w.WriteHeader(http.StatusOK)
_, err := w.Write([]byte(fmt.Sprintf(`{
"id": "%s",
"createdAt": "2023-03-27T11:40:00.894Z",
"updatedAt": "2023-03-27T11:40:00.894Z",
"versionId": "%s",
"name": "Add Transformed field",
"description": "",
"code": "%s",
"secretsVersion": null,
"codeVersion": "1",
"language": "javascript",
"imports": [],
"secrets": {}
}`, uuid.NewString(), transformationVersionId, transformationCode)))
if err != nil {
return
}

Check warning on line 45 in testhelper/docker/resource/transformer/transformer_backend_config.go

View check run for this annotation

Codecov / codecov/patch

testhelper/docker/resource/transformer/transformer_backend_config.go#L44-L45

Added lines #L44 - L45 were not covered by tests
w.WriteHeader(http.StatusInternalServerError)
}

func sanitiseTransformationCode(transformationCode string) string {
sanitisedTransformationCode := strings.ReplaceAll(transformationCode, "\t", " ")
sanitisedTransformationCode = strings.ReplaceAll(sanitisedTransformationCode, "\n", "\\n")
return sanitisedTransformationCode
}
49 changes: 48 additions & 1 deletion testhelper/docker/resource/transformer/transformer_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package transformer_test

import (
"bytes"
"io"
"net/http"
"net/url"
"testing"
Expand All @@ -15,7 +17,7 @@ func TestSetup(t *testing.T) {
pool, err := dockertest.NewPool("")
require.NoError(t, err)

t.Run("check get endpoints", func(t *testing.T) {
t.Run("test get endpoints", func(t *testing.T) {
tests := []struct {
name string
tag string
Expand Down Expand Up @@ -57,4 +59,49 @@ func TestSetup(t *testing.T) {
})
}
})

t.Run("test custom transformation", func(t *testing.T) {
transformations := map[string]string{
"2Nazu8t4ujUC0Dzc4pBFbjmOijx": `export function transformEvent(event, metadata) {
event.transformed=true
return event;
}`,
}
transformerContainer, err := transformer.Setup(pool, t, transformer.WithTransformations(transformations, t))
require.NoError(t, err)

transformerURL, err := url.JoinPath(transformerContainer.TransformerURL, "customTransform")
require.NoError(t, err)

rawReq := []byte(`[{"message":{
"userId": "identified_user_id",
"anonymousId":"anonymousId_1",
"messageId":"messageId_1",
"type": "track",
"event": "Product Reviewed",
"properties": {
"review_id": "12345",
"product_id" : "123",
"rating" : 3.5,
"review_body" : "Average product, expected much more."
}
},"metadata":{"sourceId":"xxxyyyzzEaEurW247ad9WYZLUyk","workspaceId":"fyJvxaglFrCFxsBcLiSPBCmgpWK",
"messageId":"messageId_1"},"destination":{"Transformations":[{"VersionID":"2Nazu8t4ujUC0Dzc4pBFbjmOijx","ID":""}]}}]`)
req, reqErr := http.NewRequest(http.MethodPost, transformerURL, bytes.NewBuffer(rawReq))
if reqErr != nil {
return
}
req.Header.Set("Content-Type", "application/json; charset=utf-8")
req.Header.Set("X-Feature-Gzip-Support", "?1")
// Header to let transformer know that the client understands event filter code
req.Header.Set("X-Feature-Filter-Code", "?1")

resp, err := http.DefaultClient.Do(req)
defer func() { _ = resp.Body.Close() }()
require.NoError(t, err)
// require.Equal(t, http.StatusOK, resp.StatusCode)
respData, err := io.ReadAll(resp.Body)
require.NoError(t, err)
require.Contains(t, string(respData), `"transformed":true`)
})
}
47 changes: 47 additions & 0 deletions testhelper/httptest/httptest.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package httptest

import (
"fmt"
"net"
"net/http"
nethttptest "net/http/httptest"
)

// NewServer starts a new httptest server that listens on all interfaces, contrary to the standard net/httptest.Server that listens only on localhost.
// This is useful when you want to access the test http server from within a docker container.
func NewServer(handler http.Handler) *Server {
ts := newUnStartedServer(handler)
ts.Start()
return ts
}

// Simple net/httptest.Server wrapper
type Server struct {
*nethttptest.Server
}

func (s *Server) Start() {
s.Server.Start()
_, port, err := net.SplitHostPort(s.Listener.Addr().String())
if err != nil {
panic(fmt.Sprintf("httptest: failed to parse listener address: %v", err))

Check warning on line 27 in testhelper/httptest/httptest.go

View check run for this annotation

Codecov / codecov/patch

testhelper/httptest/httptest.go#L27

Added line #L27 was not covered by tests
}
s.URL = fmt.Sprintf("http://%s:%s", "localhost", port)
}

func newUnStartedServer(handler http.Handler) *Server {
return &Server{&nethttptest.Server{
Listener: newListener(),
Config: &http.Server{Handler: handler},
}}
}

func newListener() net.Listener {
l, err := net.Listen("tcp", "0.0.0.0:0")
if err != nil {
if l, err = net.Listen("tcp6", "[::]:0"); err != nil {
panic(fmt.Sprintf("httptest: failed to listen on a port: %v", err))

Check warning on line 43 in testhelper/httptest/httptest.go

View check run for this annotation

Codecov / codecov/patch

testhelper/httptest/httptest.go#L42-L43

Added lines #L42 - L43 were not covered by tests
}
}
return l
}
45 changes: 45 additions & 0 deletions testhelper/httptest/httptest_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package httptest_test

import (
"io"
"net"
"net/http"
"net/url"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/rudderlabs/rudder-go-kit/httputil"
kithttptest "github.com/rudderlabs/rudder-go-kit/testhelper/httptest"
)

func TestServer(t *testing.T) {
httpServer := kithttptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write([]byte("Hello, world!"))
}))
defer httpServer.Close()

httpServerParsedURL, err := url.Parse(httpServer.URL)
require.NoError(t, err)

_, httpServerPort, err := net.SplitHostPort(httpServerParsedURL.Host)
require.NoError(t, err)

var (
body []byte
statusCode int
)
require.Eventually(t, func() bool {
resp, err := http.Get("http://0.0.0.0:" + httpServerPort)
defer func() { httputil.CloseResponse(resp) }()
if err == nil {
statusCode = resp.StatusCode
body, err = io.ReadAll(resp.Body)
}
return err == nil
}, 5*time.Second, 10*time.Millisecond, "failed to connect to proxy")

require.Equal(t, http.StatusOK, statusCode)
require.Equal(t, "Hello, world!", string(body))
}

0 comments on commit b15ec75

Please sign in to comment.