Skip to content

Commit

Permalink
feat(traces): implement external data origin detection
Browse files Browse the repository at this point in the history
Signed-off-by: Wassim DHIF <wassim.dhif@datadoghq.com>
  • Loading branch information
wdhif committed Dec 6, 2024
1 parent 68415f1 commit 9ce206d
Show file tree
Hide file tree
Showing 35 changed files with 1,021 additions and 177 deletions.
24 changes: 24 additions & 0 deletions cmd/trace-agent/config/remote/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/DataDog/datadog-go/v5/statsd"

"github.com/DataDog/datadog-agent/comp/core/tagger/origindetection"
rcclient "github.com/DataDog/datadog-agent/pkg/config/remote/client"
pbgo "github.com/DataDog/datadog-agent/pkg/proto/pbgo/core"
"github.com/DataDog/datadog-agent/pkg/trace/api"
Expand Down Expand Up @@ -106,13 +107,36 @@ func getContainerTags(req *http.Request, cfg *config.AgentConfig, provider api.I
if cfg == nil || cfg.ContainerTags == nil {
return nil
}

// Tag using Local Data
if cid := provider.GetContainerID(req.Context(), req.Header); cid != "" {
containerTags, err := cfg.ContainerTags(cid)
if err != nil {
_ = log.Error("Failed getting container tags", err)
}
return containerTags
}

// Tag using External Data
if externalEnv := req.Header.Get("Datadog-External-Env"); externalEnv != "" {
externalData, err := origindetection.ParseExternalData(externalEnv)
if err != nil {
_ = log.Error("Failed parsing external data", err)
}

generatedContainerID, err := cfg.ContainerIDFromExternalData(externalData)
if err != nil {
_ = log.Error("Failed getting container ID from external data", err)
}
log.Errorf("w: Generated ContainerID: %s", generatedContainerID)

containerTags, err := cfg.ContainerTags(generatedContainerID)
if err != nil {
_ = log.Error("Failed getting container tags", err)
}
return containerTags
}

return nil
}

Expand Down
4 changes: 4 additions & 0 deletions comp/api/api/apiimpl/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ func (s *serverSecure) TaggerStreamEntities(req *pb.StreamTagsRequest, srv pb.Ag
return s.taggerServer.TaggerStreamEntities(req, srv)
}

func (s *serverSecure) TaggerGenerateContainerIDFromExternalData(ctx context.Context, req *pb.GenerateContainerIDFromExternalDataRequest) (*pb.GenerateContainerIDFromExternalDataResponse, error) {
return s.taggerServer.TaggerGenerateContainerIDFromExternalData(ctx, req)
}

func (s *serverSecure) TaggerFetchEntity(ctx context.Context, req *pb.FetchEntityRequest) (*pb.FetchEntityResponse, error) {
return s.taggerServer.TaggerFetchEntity(ctx, req)
}
Expand Down
4 changes: 3 additions & 1 deletion comp/core/tagger/def/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package tagger
import (
"context"

"github.com/DataDog/datadog-agent/comp/core/tagger/origindetection"
"github.com/DataDog/datadog-agent/comp/core/tagger/telemetry"
"github.com/DataDog/datadog-agent/comp/core/tagger/types"
taggertypes "github.com/DataDog/datadog-agent/pkg/tagger/types"
Expand All @@ -31,12 +32,13 @@ type Component interface {
Stop() error
ReplayTagger() ReplayTagger
GetTaggerTelemetryStore() *telemetry.Store
Tag(entityID types.EntityID, cardinality types.TagCardinality) ([]string, error)
// LegacyTag has the same behaviour as the Tag method, but it receives the entity id as a string and parses it.
// If possible, avoid using this function, and use the Tag method instead.
// This function exists in order not to break backward compatibility with rtloader and python
// integrations using the tagger
LegacyTag(entity string, cardinality types.TagCardinality) ([]string, error)
Tag(entityID types.EntityID, cardinality types.TagCardinality) ([]string, error)
GenerateContainerIDFromExternalData(externalData origindetection.ExternalData) (string, error)
AccumulateTagsFor(entityID types.EntityID, cardinality types.TagCardinality, tb tagset.TagsAccumulator) error
Standard(entityID types.EntityID) ([]string, error)
List() types.TaggerListResponse
Expand Down
5 changes: 5 additions & 0 deletions comp/core/tagger/impl-noop/tagger.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"

tagger "github.com/DataDog/datadog-agent/comp/core/tagger/def"
"github.com/DataDog/datadog-agent/comp/core/tagger/origindetection"
"github.com/DataDog/datadog-agent/comp/core/tagger/telemetry"
"github.com/DataDog/datadog-agent/comp/core/tagger/types"
taggertypes "github.com/DataDog/datadog-agent/pkg/tagger/types"
Expand Down Expand Up @@ -49,6 +50,10 @@ func (n *noopTagger) LegacyTag(string, types.TagCardinality) ([]string, error) {
return nil, nil
}

func (n *noopTagger) GenerateContainerIDFromExternalData(origindetection.ExternalData) (string, error) {
return "", nil
}

func (n *noopTagger) AccumulateTagsFor(types.EntityID, types.TagCardinality, tagset.TagsAccumulator) error {
return nil
}
Expand Down
80 changes: 80 additions & 0 deletions comp/core/tagger/impl-remote/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/DataDog/datadog-agent/comp/core/config"
log "github.com/DataDog/datadog-agent/comp/core/log/def"
tagger "github.com/DataDog/datadog-agent/comp/core/tagger/def"
"github.com/DataDog/datadog-agent/comp/core/tagger/origindetection"
"github.com/DataDog/datadog-agent/comp/core/tagger/telemetry"
"github.com/DataDog/datadog-agent/comp/core/tagger/types"
"github.com/DataDog/datadog-agent/comp/core/tagger/utils"
Expand All @@ -43,10 +44,18 @@ import (
const (
noTimeout = 0 * time.Minute
streamRecvTimeout = 10 * time.Minute
// External Data Prefixes
// These prefixes are used to build the External Data Environment Variable.
// This variable is then used for Origin Detection.
externalDataInitPrefix = "it-"
externalDataContainerNamePrefix = "cn-"
externalDataPodUIDPrefix = "pu-"
)

var errTaggerStreamNotStarted = errors.New("tagger stream not started")

var errTaggerFailedGenerateContainerIDFromExternalData = errors.New("tagger failed to generate container ID from external data")

// Requires defines the dependencies for the remote tagger.
type Requires struct {
compdef.In
Expand Down Expand Up @@ -82,6 +91,9 @@ type remoteTagger struct {
streamCancel context.CancelFunc
filter *types.Filter

queryCtx context.Context
queryCancel context.CancelFunc

ctx context.Context
cancel context.CancelFunc

Expand All @@ -99,6 +111,12 @@ type Options struct {
Disabled bool
}

type externalData struct {
init bool
containerName string
podUID string
}

// NewComponent returns a remote tagger
func NewComponent(req Requires) (Provides, error) {
remoteTagger, err := newRemoteTagger(req.Params, req.Config, req.Log, req.Telemetry)
Expand Down Expand Up @@ -250,6 +268,68 @@ func (t *remoteTagger) LegacyTag(entity string, cardinality types.TagCardinality
return t.Tag(entityID, cardinality)
}

// GenerateContainerIDFromExternalData returns a container ID for the given external data.
func (t *remoteTagger) GenerateContainerIDFromExternalData(externalData origindetection.ExternalData) (string, error) {
expBackoff := backoff.NewExponentialBackOff()
expBackoff.InitialInterval = 500 * time.Millisecond
expBackoff.MaxInterval = 1 * time.Second
expBackoff.MaxElapsedTime = 15 * time.Second

var containerID string

err := backoff.Retry(func() error {
select {
case <-t.telemetryTicker.C:
t.store.collectTelemetry()
case <-t.ctx.Done():
return &backoff.PermanentError{Err: errTaggerFailedGenerateContainerIDFromExternalData}
default:
}

t.telemetryStore.ExternalDataRequests.Inc()

// Fetch the auth token
token, err := t.options.TokenFetcher()
if err != nil {
_ = t.log.Errorf("unable to fetch auth token, will possibly retry: %s", err)
return err
}

// Create the context with the auth token
t.queryCtx, t.queryCancel = context.WithCancel(
metadata.NewOutgoingContext(t.ctx, metadata.MD{
"authorization": []string{fmt.Sprintf("Bearer %s", token)},
}),
)

// Call the GRPC method to get the container ID from the external data
containerIDResponse, err := t.client.TaggerGenerateContainerIDFromExternalData(t.queryCtx, &pb.GenerateContainerIDFromExternalDataRequest{
Init: externalData.Init,
ContainerName: externalData.ContainerName,
PodUID: externalData.PodUID,
})
if err != nil {
_ = t.log.Errorf("unable to generate container ID from external data, will retry: %s", err)
return err
}

if containerIDResponse == nil {
_ = t.log.Warnf("unable to generate container ID from external data, will retry: %s", err)
return errors.New("containerIDResponse is nil")
}
containerID = containerIDResponse.ContainerID

t.telemetryStore.ExternalDataRequests.Inc()
t.log.Debugf("Container ID generated successfully from external data %+v: %s", externalData, containerID)
return nil
}, expBackoff)

if err != nil {
return "", err
}
return containerID, nil
}

// AccumulateTagsFor returns tags for a given entity at the desired cardinality.
func (t *remoteTagger) AccumulateTagsFor(entityID types.EntityID, cardinality types.TagCardinality, tb tagset.TagsAccumulator) error {
tags, err := t.Tag(entityID, cardinality)
Expand Down
9 changes: 9 additions & 0 deletions comp/core/tagger/impl/local_tagger.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,20 @@ import (
"context"
"fmt"
"sync"
"time"

"github.com/DataDog/datadog-agent/comp/core/config"
"github.com/DataDog/datadog-agent/comp/core/tagger/collectors"
tagger "github.com/DataDog/datadog-agent/comp/core/tagger/def"
"github.com/DataDog/datadog-agent/comp/core/tagger/origindetection"
"github.com/DataDog/datadog-agent/comp/core/tagger/tagstore"
"github.com/DataDog/datadog-agent/comp/core/tagger/telemetry"
"github.com/DataDog/datadog-agent/comp/core/tagger/types"
workloadmeta "github.com/DataDog/datadog-agent/comp/core/workloadmeta/def"
taggertypes "github.com/DataDog/datadog-agent/pkg/tagger/types"
"github.com/DataDog/datadog-agent/pkg/tagset"
"github.com/DataDog/datadog-agent/pkg/util/containers/metrics"
"github.com/DataDog/datadog-agent/pkg/util/optional"
)

// Tagger is the entry class for entity tagging. It hold the tagger collector,
Expand Down Expand Up @@ -99,6 +103,11 @@ func (t *localTagger) Tag(entityID types.EntityID, cardinality types.TagCardinal
return tags.Copy(), nil
}

func (t *localTagger) GenerateContainerIDFromExternalData(externalData origindetection.ExternalData) (string, error) {
metaCollector := metrics.GetProvider(optional.NewOption(t.workloadStore)).GetMetaCollector()
return metaCollector.ContainerIDForPodUIDAndContName(externalData.PodUID, externalData.ContainerName, externalData.Init, time.Second)
}

// LegacyTag has the same behaviour as the Tag method, but it receives the entity id as a string and parses it.
// If possible, avoid using this function, and use the Tag method instead.
// This function exists in order not to break backward compatibility with rtloader and python
Expand Down
5 changes: 5 additions & 0 deletions comp/core/tagger/impl/replay_tagger.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

tagger "github.com/DataDog/datadog-agent/comp/core/tagger/def"
"github.com/DataDog/datadog-agent/comp/core/tagger/origindetection"
"github.com/DataDog/datadog-agent/comp/core/tagger/tagstore"
"github.com/DataDog/datadog-agent/comp/core/tagger/telemetry"
"github.com/DataDog/datadog-agent/comp/core/tagger/types"
Expand Down Expand Up @@ -77,6 +78,10 @@ func (t *replayTagger) LegacyTag(entity string, cardinality types.TagCardinality
return t.Tag(entityID, cardinality)
}

func (t *replayTagger) GenerateContainerIDFromExternalData(origindetection.ExternalData) (string, error) {
return "", nil
}

// AccumulateTagsFor returns tags for a given entity at the desired cardinality.
func (t *replayTagger) AccumulateTagsFor(entityID types.EntityID, cardinality types.TagCardinality, tb tagset.TagsAccumulator) error {
tags := t.store.LookupHashed(entityID, cardinality)
Expand Down
9 changes: 9 additions & 0 deletions comp/core/tagger/impl/tagger.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
log "github.com/DataDog/datadog-agent/comp/core/log/def"
tagger "github.com/DataDog/datadog-agent/comp/core/tagger/def"
taggermock "github.com/DataDog/datadog-agent/comp/core/tagger/mock"
"github.com/DataDog/datadog-agent/comp/core/tagger/origindetection"
"github.com/DataDog/datadog-agent/comp/core/tagger/telemetry"
"github.com/DataDog/datadog-agent/comp/core/tagger/types"
"github.com/DataDog/datadog-agent/comp/core/tagger/utils"
Expand Down Expand Up @@ -467,6 +468,8 @@ func (t *TaggerWrapper) EnrichTags(tb tagset.TagsAccumulator, originInfo taggert
return
}

// TODO (wassim): Try to re-use the origindetection.GenerateContainerIDFromOriginInfo method.
// That would mean not accumulating for every ContainerID but doing it once.
// Tag using Local Data
if originInfo.ContainerIDFromSocket != packets.NoOrigin && len(originInfo.ContainerIDFromSocket) > containerIDFromSocketCutIndex {
containerID := originInfo.ContainerIDFromSocket[containerIDFromSocketCutIndex:]
Expand Down Expand Up @@ -538,6 +541,12 @@ func (t *TaggerWrapper) EnrichTags(tb tagset.TagsAccumulator, originInfo taggert
}
}

// GenerateContainerIDFromExternalData generates a container ID from the external data.
func (t *TaggerWrapper) GenerateContainerIDFromExternalData(externalData origindetection.ExternalData) (string, error) {
metaCollector := metrics.GetProvider(optional.NewOption(t.wmeta)).GetMetaCollector()
return metaCollector.ContainerIDForPodUIDAndContName(externalData.PodUID, externalData.ContainerName, externalData.Init, time.Second)
}

// generateContainerIDFromExternalData generates a container ID from the external data
func (t *TaggerWrapper) generateContainerIDFromExternalData(e externalData, metricsProvider provider.ContainerIDForPodUIDAndContNameRetriever) (string, error) {
return metricsProvider.ContainerIDForPodUIDAndContName(e.podUID, e.containerName, e.init, time.Second)
Expand Down
5 changes: 5 additions & 0 deletions comp/core/tagger/mock/fake_tagger.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"strconv"

tagger "github.com/DataDog/datadog-agent/comp/core/tagger/def"
"github.com/DataDog/datadog-agent/comp/core/tagger/origindetection"
"github.com/DataDog/datadog-agent/comp/core/tagger/tagstore"
"github.com/DataDog/datadog-agent/comp/core/tagger/telemetry"
"github.com/DataDog/datadog-agent/comp/core/tagger/types"
Expand Down Expand Up @@ -118,6 +119,10 @@ func (f *FakeTagger) LegacyTag(entity string, cardinality types.TagCardinality)
return f.Tag(entityID, cardinality)
}

func (f *FakeTagger) GenerateContainerIDFromExternalData(origindetection.ExternalData) (string, error) {
return "", nil
}

// GlobalTags fake implementation
func (f *FakeTagger) GlobalTags(cardinality types.TagCardinality) ([]string, error) {
return f.Tag(types.GetGlobalEntityID(), cardinality)
Expand Down
1 change: 1 addition & 0 deletions comp/core/tagger/origindetection/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
module github.com/DataDog/datadog-agent/comp/core/tagger/origindetection
Loading

0 comments on commit 9ce206d

Please sign in to comment.