Skip to content

Commit

Permalink
fix: gateway internal batch endpoint stats (#5089)
Browse files Browse the repository at this point in the history
  • Loading branch information
Sidddddarth authored Sep 16, 2024
1 parent 40a5446 commit bbaaabd
Show file tree
Hide file tree
Showing 3 changed files with 314 additions and 77 deletions.
244 changes: 237 additions & 7 deletions gateway/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/rudderlabs/rudder-server/app"
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/enterprise/suppress-user/model"
gwstats "github.com/rudderlabs/rudder-server/gateway/internal/stats"
gwtypes "github.com/rudderlabs/rudder-server/gateway/internal/types"
"github.com/rudderlabs/rudder-server/gateway/response"
webhookModel "github.com/rudderlabs/rudder-server/gateway/webhook/model"
Expand Down Expand Up @@ -1837,6 +1838,96 @@ var _ = Describe("Gateway", func() {
},
},
}))
Expect(statStore.GetByName("gateway.write_key_events")).To(Equal([]memstats.Metric{
{
Name: "gateway.write_key_events",
Tags: map[string]string{
"source": "",
"writeKey": WriteKeyEnabled,
"reqType": "internalBatch",
"workspaceId": WorkspaceID,
"sourceID": SourceIDEnabled,
"sourceType": "",
"sdkVersion": "",
},
Value: 1,
},
}))
Expect(statStore.GetByName("gateway.write_key_successful_events")).To(Equal([]memstats.Metric{
{
Name: "gateway.write_key_successful_events",
Tags: map[string]string{
"source": "",
"writeKey": WriteKeyEnabled,
"reqType": "internalBatch",
"workspaceId": WorkspaceID,
"sourceID": SourceIDEnabled,
"sourceType": "",
"sdkVersion": "",
},
Value: 1,
},
}))
Expect(statStore.GetByName("gateway.write_key_requests")).To(Equal([]memstats.Metric{
{
Name: "gateway.write_key_requests",
Tags: map[string]string{
"workspaceId": WorkspaceID,
"sourceID": SourceIDEnabled,
"sourceType": "",
"sdkVersion": "",
"source": "",
"writeKey": WriteKeyEnabled,
"reqType": "internalBatch",
},
Value: 1,
},
}))
Expect(statStore.GetByName("gateway.write_key_successful_requests")).To(Equal([]memstats.Metric{
{
Name: "gateway.write_key_successful_requests",
Tags: map[string]string{
"source": "",
"writeKey": WriteKeyEnabled,
"reqType": "internalBatch",
"workspaceId": WorkspaceID,
"sourceID": SourceIDEnabled,
"sourceType": "",
"sdkVersion": "",
},
Value: 1,
},
}))
Expect(statStore.GetByName("gateway.write_key_failed_requests")).To(Equal([]memstats.Metric{
{
Name: "gateway.write_key_failed_requests",
Tags: map[string]string{
"source": "",
"writeKey": WriteKeyEnabled,
"reqType": "internalBatch",
"workspaceId": WorkspaceID,
"sourceID": SourceIDEnabled,
"sourceType": "",
"sdkVersion": "",
},
Value: 0,
},
}))
Expect(statStore.GetByName("gateway.write_key_failed_events")).To(Equal([]memstats.Metric{
{
Name: "gateway.write_key_failed_events",
Tags: map[string]string{
"source": "",
"writeKey": WriteKeyEnabled,
"reqType": "internalBatch",
"workspaceId": WorkspaceID,
"sourceID": SourceIDEnabled,
"sourceType": "",
"sdkVersion": "",
},
Value: 0,
},
}))
})

It("Successful request, without debugger", func() {
Expand All @@ -1859,6 +1950,18 @@ var _ = Describe("Gateway", func() {
defer httputil.CloseResponse(resp)
Expect(err).To(BeNil())
Expect(string(respData)).Should(ContainSubstring(response.NotRudderEvent))
failedRequestStat := statStore.Get("gateway.write_key_failed_requests", map[string]string{
"writeKey": "",
"reqType": "internalBatch",
"reason": response.NotRudderEvent,
"workspaceId": "",
"sourceID": "",
"sourceType": "",
"sdkVersion": "",
"source": "",
})
Expect(failedRequestStat).To(Not(BeNil()))
Expect(failedRequestStat.Values()).To(Equal([]float64{1}))
})

It("request failed unmarshall error", func() {
Expand All @@ -1871,6 +1974,29 @@ var _ = Describe("Gateway", func() {
defer httputil.CloseResponse(resp)
Expect(err).To(BeNil())
Expect(string(respData)).Should(ContainSubstring(response.InvalidJSON))
failedRequestStat := statStore.Get("gateway.write_key_failed_requests", map[string]string{
"writeKey": "",
"reqType": "internalBatch",
"reason": response.InvalidJSON,
"workspaceId": "",
"sourceID": "",
"sourceType": "",
"sdkVersion": "",
"source": "",
})
Expect(failedRequestStat).To(Not(BeNil()))
Expect(failedRequestStat.Values()).To(Equal([]float64{1}))
failedEventStat := statStore.Get("gateway.write_key_failed_events", map[string]string{
"writeKey": "",
"reqType": "internalBatch",
"reason": response.InvalidJSON,
"workspaceId": "",
"sourceID": "",
"sourceType": "",
"sdkVersion": "",
"source": "",
})
Expect(failedEventStat).To(BeNil())
})

It("request failed message validation error", func() {
Expand All @@ -1883,6 +2009,18 @@ var _ = Describe("Gateway", func() {
defer httputil.CloseResponse(resp)
Expect(err).To(BeNil())
Expect(string(respData)).Should(ContainSubstring(response.InvalidStreamMessage))
failedRequestStat := statStore.Get("gateway.write_key_failed_requests", map[string]string{
"writeKey": "",
"reqType": "internalBatch",
"reason": response.InvalidStreamMessage,
"workspaceId": "",
"sourceID": "",
"sourceType": "",
"sdkVersion": "",
"source": "",
})
Expect(failedRequestStat).To(Not(BeNil()))
Expect(failedRequestStat.Values()).To(Equal([]float64{1}))
})

It("request success - suppressed user", func() {
Expand All @@ -1892,6 +2030,17 @@ var _ = Describe("Gateway", func() {
resp, err := client.Do(req)
Expect(err).To(BeNil())
Expect(http.StatusOK, resp.StatusCode)
successfulReqStat := statStore.Get("gateway.write_key_successful_requests", map[string]string{
"writeKey": WriteKeyEnabled,
"reqType": "internalBatch",
"workspaceId": WorkspaceID,
"sourceID": SourceIDEnabled,
"sourceType": "",
"sdkVersion": "",
"source": "",
})
Expect(successfulReqStat).To(Not(BeNil()))
Expect(successfulReqStat.Values()).To(Equal([]float64{1}))
})

It("request success - multiple messages", func() {
Expand All @@ -1903,6 +2052,39 @@ var _ = Describe("Gateway", func() {
resp, err := client.Do(req)
Expect(err).To(BeNil())
Expect(http.StatusOK, resp.StatusCode)
successfulReqStat := statStore.Get("gateway.write_key_successful_requests", map[string]string{
"writeKey": WriteKeyEnabled,
"reqType": "internalBatch",
"workspaceId": WorkspaceID,
"sourceID": SourceIDEnabled,
"sourceType": "",
"sdkVersion": "",
"source": "",
})
Expect(successfulReqStat).To(Not(BeNil()))
Expect(successfulReqStat.LastValue()).To(Equal(float64(3)))
successfulEventStat := statStore.Get("gateway.write_key_successful_events", map[string]string{
"writeKey": WriteKeyEnabled,
"reqType": "internalBatch",
"workspaceId": WorkspaceID,
"sourceID": SourceIDEnabled,
"sourceType": "",
"sdkVersion": "",
"source": "",
})
Expect(successfulEventStat).To(Not(BeNil()))
Expect(successfulEventStat.LastValue()).To(Equal(float64(3)))
eventsStat := statStore.Get("gateway.write_key_events", map[string]string{
"writeKey": WriteKeyEnabled,
"reqType": "internalBatch",
"workspaceId": WorkspaceID,
"sourceID": SourceIDEnabled,
"sourceType": "",
"sdkVersion": "",
"source": "",
})
Expect(eventsStat).To(Not(BeNil()))
Expect(eventsStat.Values()).To(Equal([]float64{1, 2, 3}))
})

It("request failed db error", func() {
Expand All @@ -1913,6 +2095,41 @@ var _ = Describe("Gateway", func() {
resp, err := client.Do(req)
Expect(err).To(BeNil())
Expect(http.StatusInternalServerError, resp.StatusCode)
failedReqStat := statStore.Get("gateway.write_key_failed_requests", map[string]string{
"writeKey": WriteKeyEnabled,
"reqType": "internalBatch",
"workspaceId": WorkspaceID,
"sourceID": SourceIDEnabled,
"sourceType": "",
"sdkVersion": "",
"source": "",
"reason": "storeFailed",
})
Expect(failedReqStat).To(Not(BeNil()))
Expect(failedReqStat.Values()).To(Equal([]float64{1}))
failedEventStat := statStore.Get("gateway.write_key_failed_events", map[string]string{
"writeKey": WriteKeyEnabled,
"reqType": "internalBatch",
"workspaceId": WorkspaceID,
"sourceID": SourceIDEnabled,
"sourceType": "",
"sdkVersion": "",
"source": "",
"reason": "storeFailed",
})
Expect(failedEventStat).To(Not(BeNil()))
Expect(failedEventStat.Values()).To(Equal([]float64{1}))
eventsStat := statStore.Get("gateway.write_key_events", map[string]string{
"writeKey": WriteKeyEnabled,
"reqType": "internalBatch",
"workspaceId": WorkspaceID,
"sourceID": SourceIDEnabled,
"sourceType": "",
"sdkVersion": "",
"source": "",
})
Expect(eventsStat).To(Not(BeNil()))
Expect(eventsStat.Values()).To(Equal([]float64{1, 2, 3, 4}))
})
})

Expand Down Expand Up @@ -1955,17 +2172,25 @@ var _ = Describe("Gateway", func() {
done: make(chan<- string),
requestPayload: payload,
}
jobForm, err := gateway.extractJobsFromInternalBatchPayload("batch", req.requestPayload)
Expect(err).To(BeNil())
jobsWithStats, err := gateway.extractJobsFromInternalBatchPayload("batch", req.requestPayload)
Expect(err).To(BeNil())
Expect(jobsWithStats).To(HaveLen(1))
Expect(jobsWithStats[0].stat).To(Equal(
gwstats.SourceStat{
SourceID: "sourceID",
WorkspaceID: "workspaceID",
ReqType: "batch",
},
))

var job struct {
Batch []struct {
ReceivedAt string `json:"receivedAt"`
RequestIP string `json:"request_ip"`
} `json:"batch"`
}
Expect(jobForm).To(HaveLen(1))
err = json.Unmarshal(jobForm[0].EventPayload, &job)
jobForm := jobsWithStats[0].job
err = json.Unmarshal(jobForm.EventPayload, &job)
Expect(err).To(BeNil())
Expect(job.Batch).To(HaveLen(1))
Expect(job.Batch[0].ReceivedAt).To(ContainSubstring("dummyReceivedAtFromPayload"))
Expand Down Expand Up @@ -1995,17 +2220,22 @@ var _ = Describe("Gateway", func() {
done: make(chan<- string),
requestPayload: payload,
}
jobForm, err := gateway.extractJobsFromInternalBatchPayload("batch", req.requestPayload)
jobsWithStats, err := gateway.extractJobsFromInternalBatchPayload("batch", req.requestPayload)
Expect(err).To(BeNil())
Expect(jobsWithStats).To(HaveLen(1))
Expect(jobsWithStats[0].stat).To(Equal(gwstats.SourceStat{
SourceID: "sourceID",
WorkspaceID: "workspaceID",
ReqType: "batch",
}))

var job struct {
Batch []struct {
ReceivedAt string `json:"receivedAt"`
RequestIP string `json:"request_ip"`
} `json:"batch"`
}
Expect(jobForm).To(HaveLen(1))
err = json.Unmarshal(jobForm[0].EventPayload, &job)
err = json.Unmarshal(jobsWithStats[0].job.EventPayload, &job)
Expect(err).To(BeNil())
Expect(job.Batch).To(HaveLen(1))
Expect(job.Batch[0].ReceivedAt).To(ContainSubstring("2024-01-01T01:01:01.000Z"))
Expand Down
Loading

0 comments on commit bbaaabd

Please sign in to comment.