Skip to content

Commit

Permalink
fix: take tacking plan id from dgsourceTPconfig (#5041)
Browse files Browse the repository at this point in the history
* fix: take tacking plan id from dgsourceTPconfig

* fix: test cases update

* chore add test and address review comments

* fix: add check for tracking plan id from rudder typer

* chore: adapting test for recent changes

---------

Co-authored-by: achettyiitr <achetty.iitr@gmail.com>
  • Loading branch information
akashrpo and achettyiitr authored Aug 30, 2024
1 parent 5c38dab commit f6782c3
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 9 deletions.
5 changes: 1 addition & 4 deletions processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1836,10 +1836,7 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf
trackingPlanVersion := source.DgSourceTrackingPlanConfig.TrackingPlan.Version
rudderTyperTPID := misc.MapLookup(singularEvent, "context", "ruddertyper", "trackingPlanId")
rudderTyperTPVersion := misc.MapLookup(singularEvent, "context", "ruddertyper", "trackingPlanVersion")
if rudderTyperTPID != nil && rudderTyperTPVersion != nil {
if id, ok := rudderTyperTPID.(string); ok && id != "" {
trackingPlanID = id
}
if rudderTyperTPID != nil && rudderTyperTPVersion != nil && rudderTyperTPID == trackingPlanID {
if version, ok := rudderTyperTPVersion.(float64); ok && version > 0 {
trackingPlanVersion = int(version)
}
Expand Down
119 changes: 114 additions & 5 deletions processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ func (c *testContext) Finish() {
const (
WriteKeyEnabled = "enabled-write-key"
WriteKeyEnabledNoUT = "enabled-write-key-no-ut"
WriteKeyEnabledTp = "enabled-write-key-tp"
WriteKeyEnabledNoUT2 = "enabled-write-key-no-ut2"
WriteKeyEnabledOnlyUT = "enabled-write-key-only-ut"
SourceIDEnabled = "enabled-source"
Expand All @@ -158,6 +159,8 @@ const (
WriteKeyTransient = "transient-write-key"
SourceIDEnabledNoUT = "enabled-source-no-ut"
SourceIDEnabledNoUTName = "SourceIDEnabledNoUT"
SourceIDEnabledTp = "enabled-source-tp"
SourceIDEnabledTpName = "SourceIDEnabledTp"
SourceIDEnabledOnlyUT = "enabled-source-only-ut"
SourceIDEnabledOnlyUTName = "SourceIDEnabledOnlyUT"
SourceIDEnabledNoUT2 = "enabled-source-no-ut2"
Expand Down Expand Up @@ -195,6 +198,7 @@ var (
SourceIDGCM: SourceIDGCMName,
SourceIDKetchConsent: SourceIDKetchConsentName,
SourceIDTransient: SourceIDTransientName,
SourceIDEnabledTp: SourceIDEnabledTpName,
}
)

Expand Down Expand Up @@ -882,6 +886,36 @@ var sampleBackendConfig = backendconfig.ConfigT{
},
},
},
{
ID: SourceIDEnabledTp,
Name: SourceIDEnabledTpName,
WriteKey: WriteKeyEnabledTp,
Enabled: true,
SourceDefinition: backendconfig.SourceDefinitionT{
Category: "webhook",
},
Destinations: []backendconfig.DestinationT{
{
ID: DestinationIDEnabledA,
Name: "A",
Enabled: true,
IsProcessorEnabled: true,
DestinationDefinition: backendconfig.DestinationDefinitionT{
ID: "enabled-destination-a-definition-id",
Name: "enabled-destination-a-definition-name",
DisplayName: "enabled-destination-a-definition-display-name",
Config: map[string]interface{}{},
},
},
},
DgSourceTrackingPlanConfig: backendconfig.DgSourceTrackingPlanConfigT{
SourceId: SourceIDEnabledTp,
TrackingPlan: backendconfig.TrackingPlanT{
Id: "tracking-plan-id",
Version: 100,
},
},
},
},
Settings: backendconfig.Settings{
EventAuditEnabled: true,
Expand Down Expand Up @@ -912,7 +946,82 @@ var _ = Describe("Tracking Plan Validation", Ordered, func() {
})

Context("RudderTyper", func() {
It("TrackingPlanId and TrackingPlanVersion", func() {
It("Tracking plan id and version from DgSourceTrackingPlanConfig", func() {
mockTransformer := mocksTransformer.NewMockTransformer(c.mockCtrl)
mockTransformer.EXPECT().Validate(gomock.Any(), gomock.Any(), gomock.Any()).Times(1).Return(transformer.Response{})

isolationStrategy, err := isolation.GetStrategy(isolation.ModeNone)
Expect(err).To(BeNil())

processor := NewHandle(config.Default, mockTransformer)
processor.isolationStrategy = isolationStrategy
processor.config.archivalEnabled = config.SingleValueLoader(false)
Setup(processor, c, false, false)

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

Expect(processor.config.asyncInit.WaitContext(ctx)).To(BeNil())
GinkgoT().Log("Processor setup and init done")

_ = processor.processJobsForDest(
"",
subJob{
subJobs: []*jobsdb.JobT{
{
UUID: uuid.New(),
JobID: 1,
CreatedAt: time.Date(2020, 0o4, 28, 23, 26, 0o0, 0o0, time.UTC),
ExpireAt: time.Date(2020, 0o4, 28, 23, 26, 0o0, 0o0, time.UTC),
CustomVal: gatewayCustomVal[0],
EventPayload: createBatchPayload(
WriteKeyEnabledTp,
"2001-01-02T02:23:45.000Z",
[]mockEventData{
{
id: "1",
jobid: 1,
originalTimestamp: "2000-01-02T01:23:45",
expectedOriginalTimestamp: "2000-01-02T01:23:45.000Z",
sentAt: "2000-01-02 01:23",
expectedSentAt: "2000-01-02T01:23:00.000Z",
expectedReceivedAt: "2001-01-02T02:23:45.000Z",
},
},
func(e mockEventData) string {
return fmt.Sprintf(`
{
"rudderId": "some-rudder-id",
"messageId": "message-%[1]s",
"some-property": "property-%[1]s",
"originalTimestamp": %[2]q,
"sentAt": %[3]q
}
`,
e.id,
e.originalTimestamp,
e.sentAt,
)
},
),
EventCount: 1,
LastJobStatus: jobsdb.JobStatusT{},
Parameters: createBatchParameters(SourceIDEnabledTp),
WorkspaceId: sampleWorkspaceID,
},
},
},
)

Expect(c.MockObserver.calls).To(HaveLen(1))
for _, v := range c.MockObserver.calls {
for _, e := range v.events {
Expect(e.Metadata.TrackingPlanId).To(BeEquivalentTo("tracking-plan-id"))
Expect(e.Metadata.TrackingPlanVersion).To(BeEquivalentTo(100)) // from DgSourceTrackingPlanConfig
}
}
})
It("Tracking plan version override from context.ruddertyper", func() {
mockTransformer := mocksTransformer.NewMockTransformer(c.mockCtrl)
mockTransformer.EXPECT().Validate(gomock.Any(), gomock.Any(), gomock.Any()).Times(1).Return(transformer.Response{})

Expand Down Expand Up @@ -941,7 +1050,7 @@ var _ = Describe("Tracking Plan Validation", Ordered, func() {
ExpireAt: time.Date(2020, 0o4, 28, 23, 26, 0o0, 0o0, time.UTC),
CustomVal: gatewayCustomVal[0],
EventPayload: createBatchPayload(
WriteKeyEnabledNoUT,
WriteKeyEnabledTp,
"2001-01-02T02:23:45.000Z",
[]mockEventData{
{
Expand All @@ -964,7 +1073,7 @@ var _ = Describe("Tracking Plan Validation", Ordered, func() {
"sentAt": %[3]q,
"context": {
"ruddertyper": {
"trackingPlanId": "tracking-plan-id",
"trackingPlanId": "tracking-plan-id",
"trackingPlanVersion": 123
}
}
Expand All @@ -978,7 +1087,7 @@ var _ = Describe("Tracking Plan Validation", Ordered, func() {
),
EventCount: 1,
LastJobStatus: jobsdb.JobStatusT{},
Parameters: createBatchParameters(SourceIDEnabledNoUT),
Parameters: createBatchParameters(SourceIDEnabledTp),
WorkspaceId: sampleWorkspaceID,
},
},
Expand All @@ -989,7 +1098,7 @@ var _ = Describe("Tracking Plan Validation", Ordered, func() {
for _, v := range c.MockObserver.calls {
for _, e := range v.events {
Expect(e.Metadata.TrackingPlanId).To(BeEquivalentTo("tracking-plan-id"))
Expect(e.Metadata.TrackingPlanVersion).To(BeEquivalentTo(123))
Expect(e.Metadata.TrackingPlanVersion).To(BeEquivalentTo(123)) // Overridden happens when tracking plan id is same in context.ruddertyper and DgSourceTrackingPlanConfig
}
}
})
Expand Down

0 comments on commit f6782c3

Please sign in to comment.