Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: send connection config to router transform #4903

Merged
merged 8 commits into from
Sep 2, 2024
73 changes: 73 additions & 0 deletions backend-config/backend_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,48 @@ var sampleBackendConfig = ConfigT{
},
}

var sampleConfigWithConnection = ConfigT{
WorkspaceID: sampleWorkspaceID,
Sources: []SourceT{
{
ID: "1",
WriteKey: "d",
Enabled: false,
}, {
ID: "2",
WriteKey: "d2",
Enabled: false,
Destinations: []DestinationT{
{
ID: "d1",
Name: "processor Disabled",
IsProcessorEnabled: false,
}, {
ID: "d2",
Name: "processor Enabled",
IsProcessorEnabled: true,
},
},
},
},
Connections: map[string]Connection{
"1": {
SourceID: "2",
DestinationID: "d1",
Enabled: true,
Config: map[string]interface{}{"key": "value"},
ProcessorEnabled: false,
},
"2": {
SourceID: "2",
DestinationID: "d2",
Enabled: true,
Config: map[string]interface{}{"key2": "value2"},
ProcessorEnabled: true,
},
},
}

// This configuration is assumed by all gateway tests and, is returned on Subscribe of mocked backend config
var sampleFilteredSources = ConfigT{
Sources: []SourceT{
Expand Down Expand Up @@ -264,6 +306,37 @@ func TestConfigUpdate(t *testing.T) {
require.Equal(t, (<-chProcess).Data, map[string]ConfigT{workspaces: sampleFilteredSources})
require.Equal(t, (<-chBackend).Data, map[string]ConfigT{workspaces: sampleBackendConfig})
})

t.Run("new config with connections", func(t *testing.T) {
var (
ctrl = gomock.NewController(t)
ctx, cancel = context.WithCancel(context.Background())
workspaces = "foo"
cacheStore = cache.NewMockCache(ctrl)
)
defer ctrl.Finish()
defer cancel()

wc := NewMockworkspaceConfig(ctrl)
wc.EXPECT().Get(gomock.Eq(ctx)).Return(map[string]ConfigT{workspaces: sampleConfigWithConnection}, nil).Times(1)

var pubSub pubsub.PublishSubscriber
bc := &backendConfigImpl{
eb: &pubSub,
workspaceConfig: wc,
cache: cacheStore,
}
bc.curSourceJSON = map[string]ConfigT{workspaces: sampleBackendConfig2}

chProcess := pubSub.Subscribe(ctx, string(TopicProcessConfig))
chBackend := pubSub.Subscribe(ctx, string(TopicBackendConfig))

bc.configUpdate(ctx)
require.True(t, bc.initialized)
require.Equal(t, (<-chProcess).Data, map[string]ConfigT{workspaces: sampleFilteredSources})
require.Equal(t, (<-chBackend).Data, map[string]ConfigT{workspaces: sampleConfigWithConnection})
require.Equal(t, bc.curSourceJSON[workspaces].Connections, sampleConfigWithConnection.Connections)
})
}

func TestFilterProcessorEnabledDestinations(t *testing.T) {
Expand Down
9 changes: 9 additions & 0 deletions backend-config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,15 @@ type ConfigT struct {
Settings Settings `json:"settings"`
UpdatedAt time.Time `json:"updatedAt"`
Credentials map[string]Credential `json:"credentials"`
Connections map[string]Connection `json:"connections"`
}

type Connection struct {
SourceID string `json:"sourceId"`
DestinationID string `json:"destinationId"`
Enabled bool `json:"enabled"`
Config map[string]interface{} `json:"config"`
ProcessorEnabled bool `json:"processorEnabled"`
}

func (c *ConfigT) SourcesMap() map[string]*SourceT {
Expand Down
59 changes: 58 additions & 1 deletion integration_test/multi_tenant_test/testdata/mtGatewayTest02.json
Original file line number Diff line number Diff line change
Expand Up @@ -53,5 +53,62 @@
}
]
}
]
],
"connections": {
"connedctionID": {
"sourceId": "xxxyyyzzEaEurW247ad9WYZLUyk",
"destinationId": "xxxyyyzzP9kQfzOoKd1tuxchYAG",
"enabled": true,
"config": {
"mapping": {
"enabled": true,
"mapping": {
"alias": {
"enabled": true,
"mapping": {
"email": "email",
"name": "name"
}
},
"group": {
"enabled": true,
"mapping": {
"groupId": "groupId",
"groupName": "groupName"
}
},
"identify": {
"enabled": true,
"mapping": {
"email": "email",
"name": "name"
}
},
"page": {
"enabled": true,
"mapping": {
"pageName": "pageName",
"pageType": "pageType"
}
},
"screen": {
"enabled": true,
"mapping": {
"screenName": "screenName",
"screenType": "screenType"
}
},
"track": {
"enabled": true,
"mapping": {
"eventName": "eventName",
"eventValue": "eventValue"
}
}
}
}
},
"processorEnabled": true
}
}
}
24 changes: 22 additions & 2 deletions processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ type Handle struct {
sourceIdSourceMap map[string]backendconfig.SourceT
workspaceLibrariesMap map[string]backendconfig.LibrariesT
oneTrustConsentCategoriesMap map[string][]string
connectionConfigMap map[connection]backendconfig.Connection
ketchConsentCategoriesMap map[string][]string
destGenericConsentManagementMap map[string]map[string]GenericConsentManagementProviderData
batchDestinations []string
Expand Down Expand Up @@ -816,6 +817,10 @@ func (proc *Handle) loadReloadableConfig(defaultPayloadLimit int64, defaultMaxEv
proc.config.captureEventNameStats = config.GetReloadableBoolVar(false, "Processor.Stats.captureEventName")
}

type connection struct {
sourceID, destinationID string
}

func (proc *Handle) backendConfigSubscriber(ctx context.Context) {
var initDone bool
ch := proc.backendConfig.Subscribe(ctx, backendconfig.TopicProcessConfig)
Expand All @@ -831,8 +836,12 @@ func (proc *Handle) backendConfigSubscriber(ctx context.Context) {
eventAuditEnabled = make(map[string]bool)
credentialsMap = make(map[string][]transformer.Credential)
nonEventStreamSources = make(map[string]bool)
connectionConfigMap = make(map[connection]backendconfig.Connection)
)
for workspaceID, wConfig := range config {
for _, conn := range wConfig.Connections {
connectionConfigMap[connection{sourceID: conn.SourceID, destinationID: conn.DestinationID}] = conn
}
for i := range wConfig.Sources {
source := &wConfig.Sources[i]
sourceIdSourceMap[source.ID] = *source
Expand Down Expand Up @@ -866,6 +875,7 @@ func (proc *Handle) backendConfigSubscriber(ctx context.Context) {
})
}
proc.config.configSubscriberLock.Lock()
proc.config.connectionConfigMap = connectionConfigMap
proc.config.oneTrustConsentCategoriesMap = oneTrustConsentCategoriesMap
proc.config.ketchConsentCategoriesMap = ketchConsentCategoriesMap
proc.config.destGenericConsentManagementMap = destGenericConsentManagementMap
Expand All @@ -889,6 +899,12 @@ func (proc *Handle) getWorkspaceLibraries(workspaceID string) backendconfig.Libr
return proc.config.workspaceLibrariesMap[workspaceID]
}

func (proc *Handle) getConnectionConfig(conn connection) backendconfig.Connection {
proc.config.configSubscriberLock.RLock()
defer proc.config.configSubscriberLock.RUnlock()
return proc.config.connectionConfigMap[conn]
}

func (proc *Handle) getSourceBySourceID(sourceId string) (*backendconfig.SourceT, error) {
var err error
proc.config.configSubscriberLock.RLock()
Expand Down Expand Up @@ -1091,6 +1107,7 @@ func (proc *Handle) getTransformerEvents(
commonMetaData *transformer.Metadata,
eventsByMessageID map[string]types.SingularEventWithReceivedAt,
destination *backendconfig.DestinationT,
connection backendconfig.Connection,
inPU, pu string,
) (
[]transformer.TransformerEvent,
Expand Down Expand Up @@ -1153,6 +1170,7 @@ func (proc *Handle) getTransformerEvents(
Message: userTransformedEvent.Output,
Metadata: *eventMetadata,
Destination: *destination,
Connection: connection,
Credentials: proc.config.credentialsMap[commonMetaData.WorkspaceID],
}
eventsToTransform = append(eventsToTransform, updatedEvent)
Expand Down Expand Up @@ -2005,6 +2023,7 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf
for idx := range enabledDestinationsList {
destination := &enabledDestinationsList[idx]
shallowEventCopy := transformer.TransformerEvent{}
shallowEventCopy.Connection = proc.getConnectionConfig(connection{sourceID: sourceId, destinationID: destination.ID})
shallowEventCopy.Message = singularEvent
shallowEventCopy.Destination = *destination
shallowEventCopy.Libraries = workspaceLibraries
Expand Down Expand Up @@ -2499,6 +2518,7 @@ func (proc *Handle) transformSrcDest(
sourceID, destID := getSourceAndDestIDsFromKey(srcAndDestKey)
sourceName := eventList[0].Metadata.SourceName
destination := &eventList[0].Destination
connection := eventList[0].Connection
workspaceID := eventList[0].Metadata.WorkspaceID
destType := destination.DestinationDefinition.Name
commonMetaData := &transformer.Metadata{
Expand Down Expand Up @@ -2592,7 +2612,7 @@ func (proc *Handle) transformSrcDest(
var successMetrics []*types.PUReportedMetric
var successCountMap map[string]int64
var successCountMetadataMap map[string]MetricMetadata
eventsToTransform, successMetrics, successCountMap, successCountMetadataMap = proc.getTransformerEvents(response, commonMetaData, eventsByMessageID, destination, inPU, types.USER_TRANSFORMER)
eventsToTransform, successMetrics, successCountMap, successCountMetadataMap = proc.getTransformerEvents(response, commonMetaData, eventsByMessageID, destination, connection, inPU, types.USER_TRANSFORMER)
nonSuccessMetrics := proc.getNonSuccessfulMetrics(response, commonMetaData, eventsByMessageID, inPU, types.USER_TRANSFORMER)
droppedJobs = append(droppedJobs, append(proc.getDroppedJobs(response, eventList), append(nonSuccessMetrics.failedJobs, nonSuccessMetrics.filteredJobs...)...)...)
if _, ok := procErrorJobsByDestID[destID]; !ok {
Expand Down Expand Up @@ -2689,7 +2709,7 @@ func (proc *Handle) transformSrcDest(
procErrorJobsByDestID[destID] = make([]*jobsdb.JobT, 0)
}
procErrorJobsByDestID[destID] = append(procErrorJobsByDestID[destID], nonSuccessMetrics.failedJobs...)
eventsToTransform, successMetrics, successCountMap, successCountMetadataMap = proc.getTransformerEvents(response, commonMetaData, eventsByMessageID, destination, inPU, types.EVENT_FILTER)
eventsToTransform, successMetrics, successCountMap, successCountMetadataMap = proc.getTransformerEvents(response, commonMetaData, eventsByMessageID, destination, connection, inPU, types.EVENT_FILTER)
proc.logger.Debug("Supported messages filtering output size", len(eventsToTransform))

// REPORTING - START
Expand Down
3 changes: 2 additions & 1 deletion processor/trackingplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/stats"
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/processor/transformer"
"github.com/rudderlabs/rudder-server/utils/misc"
Expand Down Expand Up @@ -113,7 +114,7 @@ func (proc *Handle) validateEvents(groupedEventsBySourceId map[SourceIDT][]trans
trackingPlanEnabledMap[SourceIDT(sourceID)] = true

var successMetrics []*types.PUReportedMetric
eventsToTransform, successMetrics, _, _ := proc.getTransformerEvents(response, commonMetaData, eventsByMessageID, destination, types.DESTINATION_FILTER, types.TRACKINGPLAN_VALIDATOR) // Note: Sending false for usertransformation enabled is safe because this stage is before user transformation.
eventsToTransform, successMetrics, _, _ := proc.getTransformerEvents(response, commonMetaData, eventsByMessageID, destination, backendconfig.Connection{}, types.DESTINATION_FILTER, types.TRACKINGPLAN_VALIDATOR) // Note: Sending false for usertransformation enabled is safe because this stage is before user transformation.
nonSuccessMetrics := proc.getNonSuccessfulMetrics(response, commonMetaData, eventsByMessageID, types.DESTINATION_FILTER, types.TRACKINGPLAN_VALIDATOR)

validationStat.numValidationSuccessEvents.Count(len(eventsToTransform))
Expand Down
1 change: 1 addition & 0 deletions processor/transformer/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ type TransformerEvent struct {
Message types.SingularEventT `json:"message"`
Metadata Metadata `json:"metadata"`
Destination backendconfig.DestinationT `json:"destination"`
Connection backendconfig.Connection `json:"connection"`
Libraries []backendconfig.LibraryT `json:"libraries"`
Credentials []Credential `json:"credentials"`
}
Expand Down
1 change: 1 addition & 0 deletions router/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ type Handle struct {
oauth oauth.Authorizer
destinationsMapMu sync.RWMutex
destinationsMap map[string]*routerutils.DestinationWithSources // destinationID -> destination
connectionsMap map[types.SourceDest]types.ConnectionWithID
isBackendConfigInitialized bool
backendConfigInitialized chan bool
responseQ chan workerJobStatus
Expand Down
12 changes: 12 additions & 0 deletions router/handle_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,8 +414,19 @@ func (rt *Handle) backendConfigSubscriber() {
ch := rt.backendConfig.Subscribe(context.TODO(), backendconfig.TopicBackendConfig)
for configEvent := range ch {
destinationsMap := map[string]*routerutils.DestinationWithSources{}
connectionsMap := map[types.SourceDest]types.ConnectionWithID{}
configData := configEvent.Data.(map[string]backendconfig.ConfigT)
for _, wConfig := range configData {
for connectionID := range wConfig.Connections {
connection := wConfig.Connections[connectionID]
connectionsMap[types.SourceDest{
SourceID: connection.SourceID,
DestinationID: connection.DestinationID,
}] = types.ConnectionWithID{
ConnectionID: connectionID,
Connection: connection,
}
}
for i := range wConfig.Sources {
source := &wConfig.Sources[i]
for i := range source.Destinations {
Expand Down Expand Up @@ -446,6 +457,7 @@ func (rt *Handle) backendConfigSubscriber() {
}
}
rt.destinationsMapMu.Lock()
rt.connectionsMap = connectionsMap
rt.destinationsMap = destinationsMap
rt.destinationsMapMu.Unlock()
if !rt.isBackendConfigInitialized {
Expand Down
Loading
Loading