Skip to content

Commit

Permalink
fix: add messageId, rudderId and type based on new schema
Browse files Browse the repository at this point in the history
  • Loading branch information
BonapartePC committed Sep 27, 2024
1 parent 100b3d9 commit ca7caad
Show file tree
Hide file tree
Showing 4 changed files with 222 additions and 42 deletions.
169 changes: 140 additions & 29 deletions gateway/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1715,7 +1715,7 @@ var _ = Describe("Gateway", func() {
)
internalBatchPayload := fmt.Sprintf(`[{
"properties": {
"messageID": "messageID",
"requestType": "dummyRequestType",
"routingKey": "anonymousId_header<<>>anonymousId_1<<>>identified_user_id",
"workspaceID": %q,
"userID": %q,
Expand All @@ -1739,7 +1739,7 @@ var _ = Describe("Gateway", func() {
internalBatchPayload := func() string {
return fmt.Sprintf(`{
"properties": {
"messageID": %q,
"requestType": "dummyRequestType",
"routingKey": "anonymousId_header<<>>anonymousId_1<<>>identified_user_id",
"workspaceID": %q,
"userID": %q,
Expand All @@ -1751,7 +1751,7 @@ var _ = Describe("Gateway", func() {
"traceID": "traceID"
},
"payload": %s
}`, uuid.NewString(), workspaceID, userID, sourceID, validData)
}`, workspaceID, userID, sourceID, validData)
}
return []byte(fmt.Sprintf(`[%s,%s]`, internalBatchPayload(), internalBatchPayload()))
}
Expand Down Expand Up @@ -1999,30 +1999,6 @@ var _ = Describe("Gateway", func() {
Expect(failedEventStat).To(BeNil())
})

It("request failed message validation error", func() {
req, err := http.NewRequest(http.MethodPost, internalBatchEndpoint, bytes.NewBuffer([]byte(`[{}]`)))
Expect(err).To(BeNil())
resp, err := client.Do(req)
Expect(err).To(BeNil())
Expect(http.StatusBadRequest, resp.StatusCode)
respData, err := io.ReadAll(resp.Body)
defer httputil.CloseResponse(resp)
Expect(err).To(BeNil())
Expect(string(respData)).Should(ContainSubstring(response.InvalidStreamMessage))
failedRequestStat := statStore.Get("gateway.write_key_failed_requests", map[string]string{
"writeKey": "",
"reqType": "internalBatch",
"reason": response.InvalidStreamMessage,
"workspaceId": "",
"sourceID": "",
"sourceType": "",
"sdkVersion": "",
"source": "",
})
Expect(failedRequestStat).To(Not(BeNil()))
Expect(failedRequestStat.Values()).To(Equal([]float64{1}))
})

It("request success - suppressed user", func() {
payload := createInternalBatchPayload(SuppressedUserID, SourceIDEnabled)
req, err := http.NewRequest(http.MethodPost, internalBatchEndpoint, bytes.NewBuffer(payload))
Expand Down Expand Up @@ -2150,7 +2126,7 @@ var _ = Describe("Gateway", func() {

It("doesn't override if receivedAt or request_ip already exists in payload", func() {
properties := stream.MessageProperties{
MessageID: "messageID",
RequestType: "track",
RoutingKey: "anonymousId_header<<>>anonymousId_1<<>>identified_user_id",
WorkspaceID: "workspaceID",
SourceID: "sourceID",
Expand Down Expand Up @@ -2199,7 +2175,7 @@ var _ = Describe("Gateway", func() {

It("adds receivedAt and request_ip in the request payload if it's not already present", func() {
properties := stream.MessageProperties{
MessageID: "messageID",
RequestType: "track",
RoutingKey: "anonymousId_header<<>>anonymousId_1<<>>identified_user_id",
WorkspaceID: "workspaceID",
SourceID: "sourceID",
Expand Down Expand Up @@ -2241,6 +2217,141 @@ var _ = Describe("Gateway", func() {
Expect(job.Batch[0].ReceivedAt).To(ContainSubstring("2024-01-01T01:01:01.000Z"))
Expect(job.Batch[0].RequestIP).To(ContainSubstring("dummyIP"))
})

It("adds messageID, rudderId in the request payload if it's not already present", func() {
properties := stream.MessageProperties{
RequestType: "track",
RoutingKey: "anonymousId_header<<>>anonymousId_1<<>>identified_user_id",
WorkspaceID: "workspaceID",
SourceID: "sourceID",
ReceivedAt: time.Date(2024, 1, 1, 1, 1, 1, 1, time.UTC),
RequestIP: "dummyIP",
DestinationID: "destinationID",
}
msg := stream.Message{
Properties: properties,
Payload: []byte(`{}`),
}
messages := []stream.Message{msg}
payload, err := json.Marshal(messages)
Expect(err).To(BeNil())
req := &webRequestT{
reqType: "batch",
authContext: rCtxEnabled,
done: make(chan<- string),
requestPayload: payload,
}
jobsWithStats, err := gateway.extractJobsFromInternalBatchPayload("batch", req.requestPayload)
Expect(err).To(BeNil())
Expect(jobsWithStats).To(HaveLen(1))
Expect(jobsWithStats[0].stat).To(Equal(gwstats.SourceStat{
SourceID: "sourceID",
WorkspaceID: "workspaceID",
ReqType: "batch",
}))

var job struct {
Batch []struct {
MessageID string `json:"messageID"`
RudderID string `json:"rudderId"`
} `json:"batch"`
}
err = json.Unmarshal(jobsWithStats[0].job.EventPayload, &job)
Expect(err).To(BeNil())
Expect(job.Batch).To(HaveLen(1))
Expect(job.Batch[0].MessageID).To(Not(BeEmpty()))
Expect(job.Batch[0].RudderID).To(Not(BeEmpty()))
})

It("doesn't override if messageID already exists in payload", func() {
properties := stream.MessageProperties{
RequestType: "track",
RoutingKey: "anonymousId_header<<>>anonymousId_1<<>>identified_user_id",
WorkspaceID: "workspaceID",
SourceID: "sourceID",
ReceivedAt: time.Date(2024, 1, 1, 1, 1, 1, 1, time.UTC),
RequestIP: "dummyIP",
DestinationID: "destinationID",
}
msg := stream.Message{
Properties: properties,
Payload: []byte(`{"messageId": "dummyMessageID"}`),
}
messages := []stream.Message{msg}
payload, err := json.Marshal(messages)
Expect(err).To(BeNil())

req := &webRequestT{
reqType: "batch",
authContext: rCtxEnabled,
done: make(chan<- string),
requestPayload: payload,
}
jobsWithStats, err := gateway.extractJobsFromInternalBatchPayload("batch", req.requestPayload)
Expect(err).To(BeNil())
Expect(jobsWithStats).To(HaveLen(1))
Expect(jobsWithStats[0].stat).To(Equal(
gwstats.SourceStat{
SourceID: "sourceID",
WorkspaceID: "workspaceID",
ReqType: "batch",
},
))

var job struct {
Batch []struct {
MessageID string `json:"messageId"`
} `json:"batch"`
}
jobForm := jobsWithStats[0].job
err = json.Unmarshal(jobForm.EventPayload, &job)
Expect(err).To(BeNil())
Expect(job.Batch).To(HaveLen(1))
Expect(job.Batch[0].MessageID).To(ContainSubstring("dummyMessageID"))
})

It("adds type and batch in the request payload if it's a non batch call in RequestType Properties", func() {
properties := stream.MessageProperties{
RequestType: "track",
RoutingKey: "anonymousId_header<<>>anonymousId_1<<>>identified_user_id",
WorkspaceID: "workspaceID",
SourceID: "sourceID",
ReceivedAt: time.Date(2024, 1, 1, 1, 1, 1, 1, time.UTC),
RequestIP: "dummyIP",
DestinationID: "destinationID",
}
msg := stream.Message{
Properties: properties,
Payload: []byte(`{}`),
}
messages := []stream.Message{msg}
payload, err := json.Marshal(messages)
Expect(err).To(BeNil())
req := &webRequestT{
reqType: "batch",
authContext: rCtxEnabled,
done: make(chan<- string),
requestPayload: payload,
}
jobsWithStats, err := gateway.extractJobsFromInternalBatchPayload("batch", req.requestPayload)
Expect(err).To(BeNil())
Expect(jobsWithStats).To(HaveLen(1))
Expect(jobsWithStats[0].stat).To(Equal(gwstats.SourceStat{
SourceID: "sourceID",
WorkspaceID: "workspaceID",
ReqType: "batch",
}))

var job struct {
Batch []struct {
Type string `json:"type"`
} `json:"batch"`
}
err = json.Unmarshal(jobsWithStats[0].job.EventPayload, &job)
Expect(err).To(BeNil())
Expect(job.Batch).To(HaveLen(1))
Expect(job.Batch[0].Type).To(ContainSubstring("track"))
})
})
})

Expand Down
84 changes: 74 additions & 10 deletions gateway/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"sync"
"time"

"github.com/buger/jsonparser"
jsoniter "github.com/json-iterator/go"

"github.com/google/uuid"
Expand Down Expand Up @@ -401,11 +402,9 @@ func (gw *Handle) getJobDataFromRequest(req *webRequestT) (jobData *jobFromReq,
}

// hashing combination of userIDFromReq + anonIDFromReq, using colon as a delimiter
var rudderId uuid.UUID
rudderId, err = kituuid.GetMD5UUID(userIDFromReq + ":" + anonIDFromReq)
rudderId, err := getRudderId(userIDFromReq, anonIDFromReq)
if err != nil {
err = errors.New(response.NonIdentifiableRequest)
return
return jobData, err
}
toSet["rudderId"] = rudderId
if _, ok := toSet["receivedAt"]; !ok {
Expand Down Expand Up @@ -779,7 +778,45 @@ func (gw *Handle) extractJobsFromInternalBatchPayload(reqType string, body []byt

for _, msg := range messages {
stat := gwstats.SourceStat{ReqType: reqType}
err := gw.streamMsgValidator(&msg)
msgPayload := msg.Payload
// TODO: get rid of this check
if msg.Properties.RequestType != "" {
switch msg.Properties.RequestType {
case "batch", "replay", "retl", "import":
default:
msgPayload, err = sjson.SetBytes(msgPayload, "type", msg.Properties.RequestType)
if err != nil {
stat.RequestEventsFailed(1, response.NotRudderEvent)
stat.Report(gw.stats)
return nil, errors.New(response.NotRudderEvent)
}
}
}

anonIDFromReq := sanitizeAndTrim(string(getJSONValueBytes(msgPayload, "anonymousId")))
userIDFromReq := sanitizeAndTrim(string(getJSONValueBytes(msgPayload, "userId")))
messageID, changed := getMessageID(msgPayload)
if changed {
msgPayload, err = jsonparser.Set(msgPayload, []byte(`"`+messageID+`"`), "messageId")
if err != nil {
stat.RequestFailed(response.NotRudderEvent)
stat.Report(gw.stats)
return nil, errors.New(response.NotRudderEvent)
}
}
rudderId, err := getRudderId(userIDFromReq, anonIDFromReq)
if err != nil {
stat.RequestFailed(response.NotRudderEvent)
stat.Report(gw.stats)
return nil, errors.New(response.NotRudderEvent)
}
msgPayload, err = jsonparser.Set(msgPayload, []byte(`"`+rudderId.String()+`"`), "rudderId")
if err != nil {
stat.RequestFailed(response.NotRudderEvent)
stat.Report(gw.stats)
return nil, errors.New(response.NotRudderEvent)
}
err = gw.streamMsgValidator(&msg)
if err != nil {
gw.logger.Errorn("invalid message in request", logger.NewErrorField(err))
stat.RequestEventsFailed(1, response.InvalidStreamMessage)
Expand All @@ -790,7 +827,7 @@ func (gw *Handle) extractJobsFromInternalBatchPayload(reqType string, body []byt
if !ok {
// only live-events will not work if writeKey is not found
gw.logger.Errorn("unable to get writeKey for job",
logger.NewStringField("messageId", msg.Properties.MessageID),
logger.NewStringField("messageId", messageID),
obskit.SourceID(msg.Properties.SourceID))
}
stat.SourceID = msg.Properties.SourceID
Expand All @@ -817,7 +854,7 @@ func (gw *Handle) extractJobsFromInternalBatchPayload(reqType string, body []byt
}).Since(msg.Properties.ReceivedAt)

jobsDBParams := params{
MessageID: msg.Properties.MessageID,
MessageID: messageID,
SourceID: msg.Properties.SourceID,
SourceJobRunID: msg.Properties.SourceJobRunID,
SourceTaskRunID: msg.Properties.SourceTaskRunID,
Expand All @@ -838,14 +875,14 @@ func (gw *Handle) extractJobsFromInternalBatchPayload(reqType string, body []byt
)
}

msg.Payload, err = fillReceivedAt(msg.Payload, msg.Properties.ReceivedAt)
msgPayload, err = fillReceivedAt(msgPayload, msg.Properties.ReceivedAt)
if err != nil {
err = fmt.Errorf("filling receivedAt: %w", err)
stat.RequestEventsFailed(1, err.Error())
stat.Report(gw.stats)
return nil, fmt.Errorf("filling receivedAt: %w", err)
}
msg.Payload, err = fillRequestIP(msg.Payload, msg.Properties.RequestIP)
msgPayload, err = fillRequestIP(msgPayload, msg.Properties.RequestIP)
if err != nil {
err = fmt.Errorf("filling request_ip: %w", err)
stat.RequestEventsFailed(1, err.Error())
Expand All @@ -854,7 +891,7 @@ func (gw *Handle) extractJobsFromInternalBatchPayload(reqType string, body []byt
}

eventBatch := singularEventBatch{
Batch: []json.RawMessage{msg.Payload},
Batch: []json.RawMessage{msgPayload},
ReceivedAt: msg.Properties.ReceivedAt.Format(misc.RFC3339Milli),
RequestIP: msg.Properties.RequestIP,
WriteKey: writeKey,
Expand Down Expand Up @@ -888,6 +925,33 @@ func (gw *Handle) extractJobsFromInternalBatchPayload(reqType string, body []byt
return res, nil
}

func getMessageID(event []byte) (string, bool) {
messageID := string(getJSONValueBytes(event, "messageId"))
sanitizedMessageID := sanitizeAndTrim(messageID)
if sanitizedMessageID == "" {
return uuid.New().String(), true
}
return sanitizedMessageID, messageID != sanitizedMessageID
}

func getRudderId(userIDFromReq, anonIDFromReq string) (uuid.UUID, error) {
rudderId, err := kituuid.GetMD5UUID(userIDFromReq + ":" + anonIDFromReq)
if err != nil {
err = errors.New(response.NonIdentifiableRequest)
return rudderId, err
}
return rudderId, nil
}

func getJSONValueBytes(payload []byte, keys ...string) []byte {
value, _, _, _ := jsonparser.Get(payload, keys...)
return value
}

func sanitizeAndTrim(str string) string {
return strings.TrimSpace(sanitize.Unicode(str))
}

func fillReceivedAt(event []byte, receivedAt time.Time) ([]byte, error) {
if !gjson.GetBytes(event, "receivedAt").Exists() {
return sjson.SetBytes(event, "receivedAt", receivedAt.Format(misc.RFC3339Milli))
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ require (
github.com/apache/pulsar-client-go v0.13.1
github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de
github.com/aws/aws-sdk-go v1.55.5
github.com/buger/jsonparser v1.1.1
github.com/cenkalti/backoff v2.2.1+incompatible
github.com/cenkalti/backoff/v4 v4.3.0
github.com/confluentinc/confluent-kafka-go/v2 v2.5.3
Expand Down Expand Up @@ -80,7 +81,7 @@ require (
github.com/rudderlabs/compose-test v0.1.3
github.com/rudderlabs/rudder-go-kit v0.42.0
github.com/rudderlabs/rudder-observability-kit v0.0.3
github.com/rudderlabs/rudder-schemas v0.5.2
github.com/rudderlabs/rudder-schemas v0.5.3-0.20240927013310-2c5ae88df954
github.com/rudderlabs/rudder-transformer/go v0.0.0-20240910055720-f77d2ab4125a
github.com/rudderlabs/sql-tunnels v0.1.7
github.com/rudderlabs/sqlconnect-go v1.10.0
Expand Down
Loading

0 comments on commit ca7caad

Please sign in to comment.