Skip to content

Commit

Permalink
chore: include transformationId in transformer client metrics (#5055)
Browse files Browse the repository at this point in the history
  • Loading branch information
lvrach authored Sep 3, 2024
1 parent 907c97c commit bb04781
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 24 deletions.
13 changes: 12 additions & 1 deletion processor/transformer/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,11 +375,22 @@ func (trans *handle) request(ctx context.Context, url, stage string, data []Tran
// endless backoff loop, only nil error or panics inside
_ = backoff.RetryNotify(
func() error {
transformationID := ""
if len(data[0].Destination.Transformations) > 0 {
transformationID = data[0].Destination.Transformations[0].ID
}

respData, statusCode = trans.doPost(ctx, rawJSON, url, stage, stats.Tags{
"destinationType": data[0].Destination.DestinationDefinition.Name,
"destinationId": data[0].Destination.ID,
"sourceId": data[0].Metadata.SourceID,
"transformationId": transformationID,
"stage": stage,

// Legacy tags: to be removed
"dest_type": data[0].Destination.DestinationDefinition.Name,
"dest_id": data[0].Destination.ID,
"src_id": data[0].Metadata.SourceID,
"stage": stage,
})
if statusCode == StatusCPDown {
trans.cpDownGauge.Gauge(1)
Expand Down
81 changes: 58 additions & 23 deletions processor/transformer/transformer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (

"go.uber.org/mock/gomock"

"github.com/rudderlabs/rudder-go-kit/testhelper/rand"
"github.com/rudderlabs/rudder-server/testhelper/backendconfigtest"
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"

"github.com/rudderlabs/rudder-server/utils/types"
Expand All @@ -23,6 +25,7 @@ import (

"github.com/rudderlabs/rudder-go-kit/logger/mock_logger"
"github.com/rudderlabs/rudder-go-kit/stats"
"github.com/rudderlabs/rudder-go-kit/stats/memstats"

backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/gateway/response"
Expand Down Expand Up @@ -147,23 +150,6 @@ func TestTransformer(t *testing.T) {
srv := httptest.NewServer(ft)
defer srv.Close()

tr := handle{}
tr.stat = stats.Default
tr.logger = logger.NOP
tr.conf = config.Default
tr.client = srv.Client()
tr.guardConcurrency = make(chan struct{}, 200)
tr.sentStat = tr.stat.NewStat("transformer_sent", stats.CountType)
tr.receivedStat = tr.stat.NewStat("transformer_received", stats.CountType)
tr.cpDownGauge = tr.stat.NewStat("control_plane_down", stats.GaugeType)
tr.config.timeoutDuration = 1 * time.Second
tr.config.failOnUserTransformTimeout = config.SingleValueLoader(true)
tr.config.failOnError = config.SingleValueLoader(true)

tr.config.maxRetryBackoffInterval = config.SingleValueLoader(1 * time.Second)

tr.config.maxRetry = config.SingleValueLoader(1)

tc := []struct {
batchSize int
eventsCount int
Expand All @@ -180,13 +166,43 @@ func TestTransformer(t *testing.T) {
}

for _, tt := range tc {
statsStore, err := memstats.New()
require.NoError(t, err)

tr := handle{}
tr.stat = statsStore
tr.logger = logger.NOP
tr.conf = config.Default
tr.client = srv.Client()
tr.guardConcurrency = make(chan struct{}, 200)
tr.sentStat = tr.stat.NewStat("transformer_sent", stats.CountType)
tr.receivedStat = tr.stat.NewStat("transformer_received", stats.CountType)
tr.cpDownGauge = tr.stat.NewStat("control_plane_down", stats.GaugeType)
tr.config.timeoutDuration = 1 * time.Second
tr.config.failOnUserTransformTimeout = config.SingleValueLoader(true)
tr.config.failOnError = config.SingleValueLoader(true)
tr.config.maxRetryBackoffInterval = config.SingleValueLoader(1 * time.Second)
tr.config.maxRetry = config.SingleValueLoader(1)

batchSize := tt.batchSize
eventsCount := tt.eventsCount
failEvery := tt.failEvery

events := make([]TransformerEvent, eventsCount)
expectedResponse := Response{}

transformationID := rand.String(10)

destinationConfig := backendconfigtest.NewDestinationBuilder("WEBHOOK").
WithUserTransformation(transformationID, rand.String(10)).Build()

metadata := Metadata{
DestinationType: destinationConfig.DestinationDefinition.Name,
SourceID: rand.String(10),
DestinationID: destinationConfig.ID,
TransformationID: destinationConfig.Transformations[0].ID,
}

for i := range events {
msgID := fmt.Sprintf("messageID-%d", i)
statusCode := http.StatusOK
Expand All @@ -195,14 +211,16 @@ func TestTransformer(t *testing.T) {
statusCode = http.StatusBadRequest
}

metadata := metadata
metadata.MessageID = msgID

events[i] = TransformerEvent{
Metadata: Metadata{
MessageID: msgID,
},
Metadata: metadata,
Message: map[string]interface{}{
"src-key-1": msgID,
"forceStatusCode": statusCode,
},
Destination: destinationConfig,
Credentials: []Credential{
{
ID: "test-credential",
Expand All @@ -214,9 +232,7 @@ func TestTransformer(t *testing.T) {
}

tResp := TransformerResponse{
Metadata: Metadata{
MessageID: msgID,
},
Metadata: metadata,
StatusCode: statusCode,
Output: map[string]interface{}{
"src-key-1": msgID,
Expand All @@ -234,6 +250,25 @@ func TestTransformer(t *testing.T) {

rsp := tr.transform(context.TODO(), events, srv.URL, batchSize, "test-stage")
require.Equal(t, expectedResponse, rsp)

metrics := statsStore.GetByName("processor.transformer_request_time")
if tt.eventsCount > 0 {
require.NotEmpty(t, metrics)
for _, m := range metrics {
require.Equal(t, stats.Tags{
"stage": "test-stage",
"sourceId": metadata.SourceID,
"destinationType": destinationConfig.DestinationDefinition.Name,
"destinationId": destinationConfig.ID,
"transformationId": destinationConfig.Transformations[0].ID,

// Legacy tags: to be removed
"dest_type": destinationConfig.DestinationDefinition.Name,
"dest_id": destinationConfig.ID,
"src_id": metadata.SourceID,
}, m.Tags)
}
}
}
})

Expand Down

0 comments on commit bb04781

Please sign in to comment.