Skip to content

Commit

Permalink
chore: send connection config to router transform (#4903)
Browse files Browse the repository at this point in the history
  • Loading branch information
Sidddddarth authored Sep 2, 2024
1 parent f6782c3 commit 4cae96d
Show file tree
Hide file tree
Showing 12 changed files with 252 additions and 27 deletions.
73 changes: 73 additions & 0 deletions backend-config/backend_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,48 @@ var sampleBackendConfig = ConfigT{
},
}

var sampleConfigWithConnection = ConfigT{
WorkspaceID: sampleWorkspaceID,
Sources: []SourceT{
{
ID: "1",
WriteKey: "d",
Enabled: false,
}, {
ID: "2",
WriteKey: "d2",
Enabled: false,
Destinations: []DestinationT{
{
ID: "d1",
Name: "processor Disabled",
IsProcessorEnabled: false,
}, {
ID: "d2",
Name: "processor Enabled",
IsProcessorEnabled: true,
},
},
},
},
Connections: map[string]Connection{
"1": {
SourceID: "2",
DestinationID: "d1",
Enabled: true,
Config: map[string]interface{}{"key": "value"},
ProcessorEnabled: false,
},
"2": {
SourceID: "2",
DestinationID: "d2",
Enabled: true,
Config: map[string]interface{}{"key2": "value2"},
ProcessorEnabled: true,
},
},
}

// This configuration is assumed by all gateway tests and, is returned on Subscribe of mocked backend config
var sampleFilteredSources = ConfigT{
Sources: []SourceT{
Expand Down Expand Up @@ -264,6 +306,37 @@ func TestConfigUpdate(t *testing.T) {
require.Equal(t, (<-chProcess).Data, map[string]ConfigT{workspaces: sampleFilteredSources})
require.Equal(t, (<-chBackend).Data, map[string]ConfigT{workspaces: sampleBackendConfig})
})

t.Run("new config with connections", func(t *testing.T) {
var (
ctrl = gomock.NewController(t)
ctx, cancel = context.WithCancel(context.Background())
workspaces = "foo"
cacheStore = cache.NewMockCache(ctrl)
)
defer ctrl.Finish()
defer cancel()

wc := NewMockworkspaceConfig(ctrl)
wc.EXPECT().Get(gomock.Eq(ctx)).Return(map[string]ConfigT{workspaces: sampleConfigWithConnection}, nil).Times(1)

var pubSub pubsub.PublishSubscriber
bc := &backendConfigImpl{
eb: &pubSub,
workspaceConfig: wc,
cache: cacheStore,
}
bc.curSourceJSON = map[string]ConfigT{workspaces: sampleBackendConfig2}

chProcess := pubSub.Subscribe(ctx, string(TopicProcessConfig))
chBackend := pubSub.Subscribe(ctx, string(TopicBackendConfig))

bc.configUpdate(ctx)
require.True(t, bc.initialized)
require.Equal(t, (<-chProcess).Data, map[string]ConfigT{workspaces: sampleFilteredSources})
require.Equal(t, (<-chBackend).Data, map[string]ConfigT{workspaces: sampleConfigWithConnection})
require.Equal(t, bc.curSourceJSON[workspaces].Connections, sampleConfigWithConnection.Connections)
})
}

func TestFilterProcessorEnabledDestinations(t *testing.T) {
Expand Down
9 changes: 9 additions & 0 deletions backend-config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,15 @@ type ConfigT struct {
Settings Settings `json:"settings"`
UpdatedAt time.Time `json:"updatedAt"`
Credentials map[string]Credential `json:"credentials"`
Connections map[string]Connection `json:"connections"`
}

type Connection struct {
SourceID string `json:"sourceId"`
DestinationID string `json:"destinationId"`
Enabled bool `json:"enabled"`
Config map[string]interface{} `json:"config"`
ProcessorEnabled bool `json:"processorEnabled"`
}

func (c *ConfigT) SourcesMap() map[string]*SourceT {
Expand Down
59 changes: 58 additions & 1 deletion integration_test/multi_tenant_test/testdata/mtGatewayTest02.json
Original file line number Diff line number Diff line change
Expand Up @@ -53,5 +53,62 @@
}
]
}
]
],
"connections": {
"connedctionID": {
"sourceId": "xxxyyyzzEaEurW247ad9WYZLUyk",
"destinationId": "xxxyyyzzP9kQfzOoKd1tuxchYAG",
"enabled": true,
"config": {
"mapping": {
"enabled": true,
"mapping": {
"alias": {
"enabled": true,
"mapping": {
"email": "email",
"name": "name"
}
},
"group": {
"enabled": true,
"mapping": {
"groupId": "groupId",
"groupName": "groupName"
}
},
"identify": {
"enabled": true,
"mapping": {
"email": "email",
"name": "name"
}
},
"page": {
"enabled": true,
"mapping": {
"pageName": "pageName",
"pageType": "pageType"
}
},
"screen": {
"enabled": true,
"mapping": {
"screenName": "screenName",
"screenType": "screenType"
}
},
"track": {
"enabled": true,
"mapping": {
"eventName": "eventName",
"eventValue": "eventValue"
}
}
}
}
},
"processorEnabled": true
}
}
}
24 changes: 22 additions & 2 deletions processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ type Handle struct {
sourceIdSourceMap map[string]backendconfig.SourceT
workspaceLibrariesMap map[string]backendconfig.LibrariesT
oneTrustConsentCategoriesMap map[string][]string
connectionConfigMap map[connection]backendconfig.Connection
ketchConsentCategoriesMap map[string][]string
destGenericConsentManagementMap map[string]map[string]GenericConsentManagementProviderData
batchDestinations []string
Expand Down Expand Up @@ -816,6 +817,10 @@ func (proc *Handle) loadReloadableConfig(defaultPayloadLimit int64, defaultMaxEv
proc.config.captureEventNameStats = config.GetReloadableBoolVar(false, "Processor.Stats.captureEventName")
}

type connection struct {
sourceID, destinationID string
}

func (proc *Handle) backendConfigSubscriber(ctx context.Context) {
var initDone bool
ch := proc.backendConfig.Subscribe(ctx, backendconfig.TopicProcessConfig)
Expand All @@ -831,8 +836,12 @@ func (proc *Handle) backendConfigSubscriber(ctx context.Context) {
eventAuditEnabled = make(map[string]bool)
credentialsMap = make(map[string][]transformer.Credential)
nonEventStreamSources = make(map[string]bool)
connectionConfigMap = make(map[connection]backendconfig.Connection)
)
for workspaceID, wConfig := range config {
for _, conn := range wConfig.Connections {
connectionConfigMap[connection{sourceID: conn.SourceID, destinationID: conn.DestinationID}] = conn
}
for i := range wConfig.Sources {
source := &wConfig.Sources[i]
sourceIdSourceMap[source.ID] = *source
Expand Down Expand Up @@ -866,6 +875,7 @@ func (proc *Handle) backendConfigSubscriber(ctx context.Context) {
})
}
proc.config.configSubscriberLock.Lock()
proc.config.connectionConfigMap = connectionConfigMap
proc.config.oneTrustConsentCategoriesMap = oneTrustConsentCategoriesMap
proc.config.ketchConsentCategoriesMap = ketchConsentCategoriesMap
proc.config.destGenericConsentManagementMap = destGenericConsentManagementMap
Expand All @@ -889,6 +899,12 @@ func (proc *Handle) getWorkspaceLibraries(workspaceID string) backendconfig.Libr
return proc.config.workspaceLibrariesMap[workspaceID]
}

func (proc *Handle) getConnectionConfig(conn connection) backendconfig.Connection {
proc.config.configSubscriberLock.RLock()
defer proc.config.configSubscriberLock.RUnlock()
return proc.config.connectionConfigMap[conn]
}

func (proc *Handle) getSourceBySourceID(sourceId string) (*backendconfig.SourceT, error) {
var err error
proc.config.configSubscriberLock.RLock()
Expand Down Expand Up @@ -1091,6 +1107,7 @@ func (proc *Handle) getTransformerEvents(
commonMetaData *transformer.Metadata,
eventsByMessageID map[string]types.SingularEventWithReceivedAt,
destination *backendconfig.DestinationT,
connection backendconfig.Connection,
inPU, pu string,
) (
[]transformer.TransformerEvent,
Expand Down Expand Up @@ -1153,6 +1170,7 @@ func (proc *Handle) getTransformerEvents(
Message: userTransformedEvent.Output,
Metadata: *eventMetadata,
Destination: *destination,
Connection: connection,
Credentials: proc.config.credentialsMap[commonMetaData.WorkspaceID],
}
eventsToTransform = append(eventsToTransform, updatedEvent)
Expand Down Expand Up @@ -2005,6 +2023,7 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf
for idx := range enabledDestinationsList {
destination := &enabledDestinationsList[idx]
shallowEventCopy := transformer.TransformerEvent{}
shallowEventCopy.Connection = proc.getConnectionConfig(connection{sourceID: sourceId, destinationID: destination.ID})
shallowEventCopy.Message = singularEvent
shallowEventCopy.Destination = *destination
shallowEventCopy.Libraries = workspaceLibraries
Expand Down Expand Up @@ -2499,6 +2518,7 @@ func (proc *Handle) transformSrcDest(
sourceID, destID := getSourceAndDestIDsFromKey(srcAndDestKey)
sourceName := eventList[0].Metadata.SourceName
destination := &eventList[0].Destination
connection := eventList[0].Connection
workspaceID := eventList[0].Metadata.WorkspaceID
destType := destination.DestinationDefinition.Name
commonMetaData := &transformer.Metadata{
Expand Down Expand Up @@ -2592,7 +2612,7 @@ func (proc *Handle) transformSrcDest(
var successMetrics []*types.PUReportedMetric
var successCountMap map[string]int64
var successCountMetadataMap map[string]MetricMetadata
eventsToTransform, successMetrics, successCountMap, successCountMetadataMap = proc.getTransformerEvents(response, commonMetaData, eventsByMessageID, destination, inPU, types.USER_TRANSFORMER)
eventsToTransform, successMetrics, successCountMap, successCountMetadataMap = proc.getTransformerEvents(response, commonMetaData, eventsByMessageID, destination, connection, inPU, types.USER_TRANSFORMER)
nonSuccessMetrics := proc.getNonSuccessfulMetrics(response, commonMetaData, eventsByMessageID, inPU, types.USER_TRANSFORMER)
droppedJobs = append(droppedJobs, append(proc.getDroppedJobs(response, eventList), append(nonSuccessMetrics.failedJobs, nonSuccessMetrics.filteredJobs...)...)...)
if _, ok := procErrorJobsByDestID[destID]; !ok {
Expand Down Expand Up @@ -2689,7 +2709,7 @@ func (proc *Handle) transformSrcDest(
procErrorJobsByDestID[destID] = make([]*jobsdb.JobT, 0)
}
procErrorJobsByDestID[destID] = append(procErrorJobsByDestID[destID], nonSuccessMetrics.failedJobs...)
eventsToTransform, successMetrics, successCountMap, successCountMetadataMap = proc.getTransformerEvents(response, commonMetaData, eventsByMessageID, destination, inPU, types.EVENT_FILTER)
eventsToTransform, successMetrics, successCountMap, successCountMetadataMap = proc.getTransformerEvents(response, commonMetaData, eventsByMessageID, destination, connection, inPU, types.EVENT_FILTER)
proc.logger.Debug("Supported messages filtering output size", len(eventsToTransform))

// REPORTING - START
Expand Down
3 changes: 2 additions & 1 deletion processor/trackingplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/stats"
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/processor/transformer"
"github.com/rudderlabs/rudder-server/utils/misc"
Expand Down Expand Up @@ -113,7 +114,7 @@ func (proc *Handle) validateEvents(groupedEventsBySourceId map[SourceIDT][]trans
trackingPlanEnabledMap[SourceIDT(sourceID)] = true

var successMetrics []*types.PUReportedMetric
eventsToTransform, successMetrics, _, _ := proc.getTransformerEvents(response, commonMetaData, eventsByMessageID, destination, types.DESTINATION_FILTER, types.TRACKINGPLAN_VALIDATOR) // Note: Sending false for usertransformation enabled is safe because this stage is before user transformation.
eventsToTransform, successMetrics, _, _ := proc.getTransformerEvents(response, commonMetaData, eventsByMessageID, destination, backendconfig.Connection{}, types.DESTINATION_FILTER, types.TRACKINGPLAN_VALIDATOR) // Note: Sending false for usertransformation enabled is safe because this stage is before user transformation.
nonSuccessMetrics := proc.getNonSuccessfulMetrics(response, commonMetaData, eventsByMessageID, types.DESTINATION_FILTER, types.TRACKINGPLAN_VALIDATOR)

validationStat.numValidationSuccessEvents.Count(len(eventsToTransform))
Expand Down
1 change: 1 addition & 0 deletions processor/transformer/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ type TransformerEvent struct {
Message types.SingularEventT `json:"message"`
Metadata Metadata `json:"metadata"`
Destination backendconfig.DestinationT `json:"destination"`
Connection backendconfig.Connection `json:"connection"`
Libraries []backendconfig.LibraryT `json:"libraries"`
Credentials []Credential `json:"credentials"`
}
Expand Down
1 change: 1 addition & 0 deletions router/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ type Handle struct {
oauth oauth.Authorizer
destinationsMapMu sync.RWMutex
destinationsMap map[string]*routerutils.DestinationWithSources // destinationID -> destination
connectionsMap map[types.SourceDest]types.ConnectionWithID
isBackendConfigInitialized bool
backendConfigInitialized chan bool
responseQ chan workerJobStatus
Expand Down
12 changes: 12 additions & 0 deletions router/handle_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,8 +414,19 @@ func (rt *Handle) backendConfigSubscriber() {
ch := rt.backendConfig.Subscribe(context.TODO(), backendconfig.TopicBackendConfig)
for configEvent := range ch {
destinationsMap := map[string]*routerutils.DestinationWithSources{}
connectionsMap := map[types.SourceDest]types.ConnectionWithID{}
configData := configEvent.Data.(map[string]backendconfig.ConfigT)
for _, wConfig := range configData {
for connectionID := range wConfig.Connections {
connection := wConfig.Connections[connectionID]
connectionsMap[types.SourceDest{
SourceID: connection.SourceID,
DestinationID: connection.DestinationID,
}] = types.ConnectionWithID{
ConnectionID: connectionID,
Connection: connection,
}
}
for i := range wConfig.Sources {
source := &wConfig.Sources[i]
for i := range source.Destinations {
Expand Down Expand Up @@ -446,6 +457,7 @@ func (rt *Handle) backendConfigSubscriber() {
}
}
rt.destinationsMapMu.Lock()
rt.connectionsMap = connectionsMap
rt.destinationsMap = destinationsMap
rt.destinationsMapMu.Unlock()
if !rt.isBackendConfigInitialized {
Expand Down
Loading

0 comments on commit 4cae96d

Please sign in to comment.