Skip to content

Commit

Permalink
chore: get processor namespace, instanceID during setup (#4718)
Browse files Browse the repository at this point in the history
  • Loading branch information
Sidddddarth authored May 29, 2024
1 parent 51e3e4e commit 877015e
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 7 deletions.
18 changes: 12 additions & 6 deletions processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,9 @@ type Handle struct {
jobRunIDs config.ValueLoader[[]string]
}

namespace string
instanceID string

adaptiveLimit func(int64) int64
storePlocker kitsync.PartitionLocker

Expand Down Expand Up @@ -396,6 +399,9 @@ func (proc *Handle) Setup(
}
proc.storePlocker = *kitsync.NewPartitionLocker()

proc.namespace = config.GetKubeNamespace()
proc.instanceID = misc.GetInstanceID()

// Stats
proc.statsFactory = stats.Default
proc.tracer = proc.statsFactory.NewTracer("processor")
Expand Down Expand Up @@ -910,13 +916,13 @@ func enhanceWithTimeFields(event *transformer.TransformerEvent, singularEventMap
event.Message["timestamp"] = timestamp.Format(misc.RFC3339Milli)
}

func makeCommonMetadataFromSingularEvent(singularEvent types.SingularEventT, batchEvent *jobsdb.JobT, receivedAt time.Time, source *backendconfig.SourceT, eventParams types.EventParams) *transformer.Metadata {
func (proc *Handle) makeCommonMetadataFromSingularEvent(singularEvent types.SingularEventT, batchEvent *jobsdb.JobT, receivedAt time.Time, source *backendconfig.SourceT, eventParams types.EventParams) *transformer.Metadata {
commonMetadata := transformer.Metadata{}
commonMetadata.SourceID = source.ID
commonMetadata.SourceName = source.Name
commonMetadata.WorkspaceID = source.WorkspaceID
commonMetadata.Namespace = config.GetKubeNamespace()
commonMetadata.InstanceID = misc.GetInstanceID()
commonMetadata.Namespace = proc.namespace
commonMetadata.InstanceID = proc.instanceID
commonMetadata.RudderID = batchEvent.UserID
commonMetadata.JobID = batchEvent.JobID
commonMetadata.MessageID = stringify.Any(singularEvent["messageId"])
Expand Down Expand Up @@ -1670,7 +1676,7 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf
ReceivedAt: receivedAt,
}

commonMetadataFromSingularEvent := makeCommonMetadataFromSingularEvent(
commonMetadataFromSingularEvent := proc.makeCommonMetadataFromSingularEvent(
singularEvent,
batchEvent,
receivedAt,
Expand Down Expand Up @@ -2418,8 +2424,8 @@ func (proc *Handle) transformSrcDest(
SourceType: eventList[0].Metadata.SourceType,
SourceCategory: eventList[0].Metadata.SourceCategory,
WorkspaceID: workspaceID,
Namespace: config.GetKubeNamespace(),
InstanceID: misc.GetInstanceID(),
Namespace: proc.namespace,
InstanceID: proc.instanceID,
DestinationID: destID,
DestinationType: destType,
SourceDefinitionType: eventList[0].Metadata.SourceDefinitionType,
Expand Down
3 changes: 2 additions & 1 deletion processor/processorBenchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ import (
)

func Benchmark_makeCommonMetadataFromSingularEvent(b *testing.B) {
proc := &Handle{}
for i := 0; i < b.N; i++ {
_ = makeCommonMetadataFromSingularEvent(
_ = proc.makeCommonMetadataFromSingularEvent(
dummySingularEvent, &dummyBatchEvent, time.Now(), &backendconfig.SourceT{
WorkspaceID: "test",
SourceDefinition: backendconfig.SourceDefinitionT{
Expand Down

0 comments on commit 877015e

Please sign in to comment.