Skip to content

Commit

Permalink
feat(tagger): implement remote OriginInfo resolution (#31836)
Browse files Browse the repository at this point in the history
Signed-off-by: Wassim DHIF <wassim.dhif@datadoghq.com>
  • Loading branch information
wdhif authored Dec 17, 2024
1 parent 8ef1fd9 commit 9c7198c
Show file tree
Hide file tree
Showing 23 changed files with 1,154 additions and 280 deletions.
6 changes: 6 additions & 0 deletions comp/api/api/apiimpl/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ func (s *serverSecure) TaggerStreamEntities(req *pb.StreamTagsRequest, srv pb.Ag
return s.taggerServer.TaggerStreamEntities(req, srv)
}

// TaggerGenerateContainerIDFromOriginInfo generates a container ID from the Origin Info
// by forwarding the request to the underlying tagger server and returning its
// response and error unchanged.
// The request describes an Origin Info, but only its ExternalData part is used;
// the request keeps this shape for backward compatibility.
func (s *serverSecure) TaggerGenerateContainerIDFromOriginInfo(ctx context.Context, req *pb.GenerateContainerIDFromOriginInfoRequest) (*pb.GenerateContainerIDFromOriginInfoResponse, error) {
return s.taggerServer.TaggerGenerateContainerIDFromOriginInfo(ctx, req)
}

// TaggerFetchEntity forwards the fetch-entity request to the underlying tagger
// server and returns its response and error unchanged.
func (s *serverSecure) TaggerFetchEntity(ctx context.Context, req *pb.FetchEntityRequest) (*pb.FetchEntityResponse, error) {
return s.taggerServer.TaggerFetchEntity(ctx, req)
}
Expand Down
2 changes: 2 additions & 0 deletions comp/core/tagger/def/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package tagger
import (
"context"

"github.com/DataDog/datadog-agent/comp/core/tagger/origindetection"
"github.com/DataDog/datadog-agent/comp/core/tagger/telemetry"
"github.com/DataDog/datadog-agent/comp/core/tagger/types"
taggertypes "github.com/DataDog/datadog-agent/pkg/tagger/types"
Expand Down Expand Up @@ -37,6 +38,7 @@ type Component interface {
// integrations using the tagger
LegacyTag(entity string, cardinality types.TagCardinality) ([]string, error)
Tag(entityID types.EntityID, cardinality types.TagCardinality) ([]string, error)
GenerateContainerIDFromOriginInfo(originInfo origindetection.OriginInfo) (string, error)
AccumulateTagsFor(entityID types.EntityID, cardinality types.TagCardinality, tb tagset.TagsAccumulator) error
Standard(entityID types.EntityID) ([]string, error)
List() types.TaggerListResponse
Expand Down
7 changes: 7 additions & 0 deletions comp/core/tagger/impl-noop/tagger.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"

tagger "github.com/DataDog/datadog-agent/comp/core/tagger/def"
"github.com/DataDog/datadog-agent/comp/core/tagger/origindetection"
"github.com/DataDog/datadog-agent/comp/core/tagger/telemetry"
"github.com/DataDog/datadog-agent/comp/core/tagger/types"
taggertypes "github.com/DataDog/datadog-agent/pkg/tagger/types"
Expand Down Expand Up @@ -49,6 +50,12 @@ func (n *noopTagger) LegacyTag(string, types.TagCardinality) ([]string, error) {
return nil, nil
}

// GenerateContainerIDFromOriginInfo generates a container ID from Origin Info.
// This is a no-op for the noop tagger: the Origin Info is ignored and an empty
// container ID is returned together with a nil error.
func (n *noopTagger) GenerateContainerIDFromOriginInfo(origindetection.OriginInfo) (string, error) {
return "", nil
}

// AccumulateTagsFor is a no-op for the noop tagger: all arguments are ignored,
// nothing is added to the accumulator, and nil is always returned.
func (n *noopTagger) AccumulateTagsFor(types.EntityID, types.TagCardinality, tagset.TagsAccumulator) error {
return nil
}
Expand Down
80 changes: 80 additions & 0 deletions comp/core/tagger/impl-remote/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/DataDog/datadog-agent/comp/core/config"
log "github.com/DataDog/datadog-agent/comp/core/log/def"
tagger "github.com/DataDog/datadog-agent/comp/core/tagger/def"
"github.com/DataDog/datadog-agent/comp/core/tagger/origindetection"
"github.com/DataDog/datadog-agent/comp/core/tagger/telemetry"
"github.com/DataDog/datadog-agent/comp/core/tagger/types"
"github.com/DataDog/datadog-agent/comp/core/tagger/utils"
Expand All @@ -47,6 +48,8 @@ const (

// errTaggerStreamNotStarted is the sentinel error reporting that the tagger
// stream has not been started.
var errTaggerStreamNotStarted = errors.New("tagger stream not started")

// errTaggerFailedGenerateContainerIDFromOriginInfo is the sentinel error returned by
// GenerateContainerIDFromOriginInfo when the tagger context is done before a
// container ID could be generated.
var errTaggerFailedGenerateContainerIDFromOriginInfo = errors.New("tagger failed to generate container ID from origin info")

// Requires defines the dependencies for the remote tagger.
type Requires struct {
compdef.In
Expand Down Expand Up @@ -75,13 +78,17 @@ type remoteTagger struct {
log log.Component

conn *grpc.ClientConn
token string
client pb.AgentSecureClient
stream pb.AgentSecure_TaggerStreamEntitiesClient

streamCtx context.Context
streamCancel context.CancelFunc
filter *types.Filter

queryCtx context.Context
queryCancel context.CancelFunc

ctx context.Context
cancel context.CancelFunc

Expand Down Expand Up @@ -250,6 +257,79 @@ func (t *remoteTagger) LegacyTag(entity string, cardinality types.TagCardinality
return t.Tag(entityID, cardinality)
}

// GenerateContainerIDFromOriginInfo returns a container ID for the given Origin Info.
// This function currently only uses the External Data from the Origin Info to generate the container ID.
//
// The container ID is requested from the remote tagger over gRPC. Failed
// attempts are retried with an exponential backoff (500ms initial interval,
// 1s max interval, 15s max elapsed time); retrying stops permanently as soon
// as the tagger context is done. The auth token is fetched lazily on the first
// attempt that needs it and cached on the tagger.
//
// The outcome of the call is recorded in the OriginInfoRequests telemetry
// counter under the "success" or "failed" label.
func (t *remoteTagger) GenerateContainerIDFromOriginInfo(originInfo origindetection.OriginInfo) (string, error) {
	fail := true
	defer func() {
		if fail {
			t.telemetryStore.OriginInfoRequests.Inc("failed")
		} else {
			t.telemetryStore.OriginInfoRequests.Inc("success")
		}
	}()

	expBackoff := backoff.NewExponentialBackOff()
	expBackoff.InitialInterval = 500 * time.Millisecond
	expBackoff.MaxInterval = 1 * time.Second
	expBackoff.MaxElapsedTime = 15 * time.Second

	var containerID string

	err := backoff.Retry(func() error {
		// Stop retrying for good once the tagger itself is shutting down.
		select {
		case <-t.ctx.Done():
			return &backoff.PermanentError{Err: errTaggerFailedGenerateContainerIDFromOriginInfo}
		default:
		}

		// Fetch the auth token
		if t.token == "" {
			var authError error
			t.token, authError = t.options.TokenFetcher()
			if authError != nil {
				_ = t.log.Errorf("unable to fetch auth token, will possibly retry: %s", authError)
				return authError
			}
		}

		// Create the context with the auth token.
		// The context is local to this attempt and cancelled when the attempt
		// ends: storing it on the tagger would leave one never-cancelled child
		// of t.ctx behind per attempt and would let concurrent callers
		// overwrite each other's context.
		queryCtx, queryCancel := context.WithCancel(
			metadata.NewOutgoingContext(t.ctx, metadata.MD{
				"authorization": []string{fmt.Sprintf("Bearer %s", t.token)},
			}),
		)
		defer queryCancel()

		// Call the gRPC method to get the container ID from the origin info
		containerIDResponse, err := t.client.TaggerGenerateContainerIDFromOriginInfo(queryCtx, &pb.GenerateContainerIDFromOriginInfoRequest{
			ExternalData: &pb.GenerateContainerIDFromOriginInfoRequest_ExternalData{
				Init:          &originInfo.ExternalData.Init,
				ContainerName: &originInfo.ExternalData.ContainerName,
				PodUID:        &originInfo.ExternalData.PodUID,
			},
		})
		if err != nil {
			_ = t.log.Errorf("unable to generate container ID from origin info, will retry: %s", err)
			return err
		}

		if containerIDResponse == nil {
			// err is nil on this path, so it must not be formatted into the message.
			_ = t.log.Warnf("unable to generate container ID from origin info, will retry: empty response")
			return errors.New("containerIDResponse is nil")
		}
		containerID = containerIDResponse.ContainerID

		fail = false
		t.log.Debugf("Container ID generated successfully from origin info %+v: %s", originInfo, containerID)
		return nil
	}, expBackoff)

	if err != nil {
		return "", err
	}
	return containerID, nil
}

// AccumulateTagsFor returns tags for a given entity at the desired cardinality.
func (t *remoteTagger) AccumulateTagsFor(entityID types.EntityID, cardinality types.TagCardinality, tb tagset.TagsAccumulator) error {
tags, err := t.Tag(entityID, cardinality)
Expand Down
10 changes: 10 additions & 0 deletions comp/core/tagger/impl/local_tagger.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,20 @@ import (
"context"
"fmt"
"sync"
"time"

"github.com/DataDog/datadog-agent/comp/core/config"
"github.com/DataDog/datadog-agent/comp/core/tagger/collectors"
tagger "github.com/DataDog/datadog-agent/comp/core/tagger/def"
"github.com/DataDog/datadog-agent/comp/core/tagger/origindetection"
"github.com/DataDog/datadog-agent/comp/core/tagger/tagstore"
"github.com/DataDog/datadog-agent/comp/core/tagger/telemetry"
"github.com/DataDog/datadog-agent/comp/core/tagger/types"
workloadmeta "github.com/DataDog/datadog-agent/comp/core/workloadmeta/def"
taggertypes "github.com/DataDog/datadog-agent/pkg/tagger/types"
"github.com/DataDog/datadog-agent/pkg/tagset"
"github.com/DataDog/datadog-agent/pkg/util/containers/metrics"
"github.com/DataDog/datadog-agent/pkg/util/optional"
)

// Tagger is the entry class for entity tagging. It holds the tagger collector,
Expand Down Expand Up @@ -99,6 +103,12 @@ func (t *localTagger) Tag(entityID types.EntityID, cardinality types.TagCardinal
return tags.Copy(), nil
}

// GenerateContainerIDFromOriginInfo resolves a container ID from the External
// Data (pod UID, container name and init flag) carried by the Origin Info.
// The lookup is delegated to the meta collector of the metrics provider built
// on top of the tagger's workload store.
func (t *localTagger) GenerateContainerIDFromOriginInfo(originInfo origindetection.OriginInfo) (string, error) {
	external := originInfo.ExternalData

	provider := metrics.GetProvider(optional.NewOption(t.workloadStore))
	collector := provider.GetMetaCollector()

	// NOTE(review): time.Second is presumably the cache validity accepted by
	// the collector — confirm against ContainerIDForPodUIDAndContName.
	return collector.ContainerIDForPodUIDAndContName(external.PodUID, external.ContainerName, external.Init, time.Second)
}

// LegacyTag has the same behaviour as the Tag method, but it receives the entity id as a string and parses it.
// If possible, avoid using this function, and use the Tag method instead.
// This function exists in order not to break backward compatibility with rtloader and python
Expand Down
7 changes: 7 additions & 0 deletions comp/core/tagger/impl/replay_tagger.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

tagger "github.com/DataDog/datadog-agent/comp/core/tagger/def"
"github.com/DataDog/datadog-agent/comp/core/tagger/origindetection"
"github.com/DataDog/datadog-agent/comp/core/tagger/tagstore"
"github.com/DataDog/datadog-agent/comp/core/tagger/telemetry"
"github.com/DataDog/datadog-agent/comp/core/tagger/types"
Expand Down Expand Up @@ -77,6 +78,12 @@ func (t *replayTagger) LegacyTag(entity string, cardinality types.TagCardinality
return t.Tag(entityID, cardinality)
}

// GenerateContainerIDFromOriginInfo generates a container ID from Origin Info.
// This is a no-op for the replay tagger: the Origin Info is ignored and an
// empty container ID is returned together with a nil error.
func (t *replayTagger) GenerateContainerIDFromOriginInfo(origindetection.OriginInfo) (string, error) {
return "", nil
}

// AccumulateTagsFor returns tags for a given entity at the desired cardinality.
func (t *replayTagger) AccumulateTagsFor(entityID types.EntityID, cardinality types.TagCardinality, tb tagset.TagsAccumulator) error {
tags := t.store.LookupHashed(entityID, cardinality)
Expand Down
8 changes: 7 additions & 1 deletion comp/core/tagger/impl/tagger.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
log "github.com/DataDog/datadog-agent/comp/core/log/def"
tagger "github.com/DataDog/datadog-agent/comp/core/tagger/def"
taggermock "github.com/DataDog/datadog-agent/comp/core/tagger/mock"
"github.com/DataDog/datadog-agent/comp/core/tagger/origindetection"
"github.com/DataDog/datadog-agent/comp/core/tagger/telemetry"
"github.com/DataDog/datadog-agent/comp/core/tagger/types"
"github.com/DataDog/datadog-agent/comp/core/tagger/utils"
Expand Down Expand Up @@ -538,7 +539,12 @@ func (t *TaggerWrapper) EnrichTags(tb tagset.TagsAccumulator, originInfo taggert
}
}

// GenerateContainerIDFromOriginInfo generates a container ID from Origin Info.
// The call is delegated to the wrapper's default tagger, whose result and
// error are returned unchanged.
func (t *TaggerWrapper) GenerateContainerIDFromOriginInfo(originInfo origindetection.OriginInfo) (string, error) {
return t.defaultTagger.GenerateContainerIDFromOriginInfo(originInfo)
}

// generateContainerIDFromExternalData generates a container ID from the External Data.
// It asks metricsProvider to resolve the pod UID, container name and init flag
// held by e, and returns the provider's result and error unchanged.
// NOTE(review): time.Second is presumably a cache validity duration — confirm
// against the ContainerIDForPodUIDAndContNameRetriever contract.
func (t *TaggerWrapper) generateContainerIDFromExternalData(e externalData, metricsProvider provider.ContainerIDForPodUIDAndContNameRetriever) (string, error) {
return metricsProvider.ContainerIDForPodUIDAndContName(e.podUID, e.containerName, e.init, time.Second)
}
Expand Down
6 changes: 6 additions & 0 deletions comp/core/tagger/mock/fake_tagger.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"strconv"

tagger "github.com/DataDog/datadog-agent/comp/core/tagger/def"
"github.com/DataDog/datadog-agent/comp/core/tagger/origindetection"
"github.com/DataDog/datadog-agent/comp/core/tagger/tagstore"
"github.com/DataDog/datadog-agent/comp/core/tagger/telemetry"
"github.com/DataDog/datadog-agent/comp/core/tagger/types"
Expand Down Expand Up @@ -118,6 +119,11 @@ func (f *FakeTagger) LegacyTag(entity string, cardinality types.TagCardinality)
return f.Tag(entityID, cardinality)
}

// GenerateContainerIDFromOriginInfo fake implementation.
// It ignores the Origin Info and always returns an empty container ID and a nil error.
func (f *FakeTagger) GenerateContainerIDFromOriginInfo(origindetection.OriginInfo) (string, error) {
return "", nil
}

// GlobalTags fake implementation.
// It returns the result of Tag for the global entity ID at the requested cardinality.
func (f *FakeTagger) GlobalTags(cardinality types.TagCardinality) ([]string, error) {
return f.Tag(types.GetGlobalEntityID(), cardinality)
}
Expand Down
14 changes: 14 additions & 0 deletions comp/core/tagger/origindetection/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
module github.com/DataDog/datadog-agent/comp/core/tagger/origindetection

go 1.22.0

require github.com/stretchr/testify v1.10.0

require (
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/rogpeppe/go-internal v1.13.1 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
23 changes: 23 additions & 0 deletions comp/core/tagger/origindetection/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

84 changes: 84 additions & 0 deletions comp/core/tagger/origindetection/origindetection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

// TODO: A lot of the code in this file is currently duplicated in the taggertypes package.
// We will need to move all the code from taggertypes to this file and remove the taggertypes package.

// Package origindetection contains the types and functions used for Origin Detection.
package origindetection

import (
"strconv"
"strings"
)

// ProductOrigin identifies the product that sent the entity.
type ProductOrigin int

// Known product origins, numbered sequentially from zero.
const (
	// ProductOriginDogStatsDLegacy is the ProductOrigin for DogStatsD in Legacy mode.
	// TODO: remove this when dogstatsd_origin_detection_unified is enabled by default
	ProductOriginDogStatsDLegacy ProductOrigin = iota
	// ProductOriginDogStatsD is the ProductOrigin for DogStatsD.
	ProductOriginDogStatsD
	// ProductOriginAPM is the ProductOrigin for APM.
	ProductOriginAPM
)

// External Data prefixes.
// These prefixes are used to build the External Data Environment Variable.
const (
	// ExternalDataInitPrefix is the prefix for the Init flag in the External Data.
	ExternalDataInitPrefix = "it-"
	// ExternalDataContainerNamePrefix is the prefix for the Container Name in the External Data.
	ExternalDataContainerNamePrefix = "cn-"
	// ExternalDataPodUIDPrefix is the prefix for the Pod UID in the External Data.
	ExternalDataPodUIDPrefix = "pu-"
)

// OriginInfo contains the Origin Detection information.
// It groups the data sent by the client (LocalData), the data generated by the
// Admission Controller (ExternalData), the cardinality, and the product origin.
type OriginInfo struct {
LocalData LocalData // LocalData is the origin data generated by the client.
ExternalData ExternalData // ExternalData is the origin data generated by the Admission Controller.
Cardinality string // Cardinality is the cardinality of the resolved origin.
ProductOrigin ProductOrigin // ProductOrigin is the product that sent the origin information.
}

// LocalData is the origin data that is generated by the client and sent to the Agent.
type LocalData struct {
ProcessID uint32 // ProcessID of the container process on the host.
ContainerID string // ContainerID sent from the client.
Inode uint64 // Inode is the Cgroup inode of the container.
PodUID string // PodUID of the pod sent from the client.
}

// ExternalData is the origin data generated by the Admission Controller and sent to the Agent.
// Its string form is a comma-separated list of prefixed items (see the
// ExternalData*Prefix constants), which ParseExternalData decodes.
type ExternalData struct {
Init bool // Init is true if the container is an init container.
ContainerName string // ContainerName is the name of the container as seen by the Admission Controller.
PodUID string // PodUID is the UID of the pod as seen by the Admission Controller.
}

// GenerateContainerIDFromExternalData is the signature of a function that
// generates a container ID from the external data. It returns the container ID
// as a string, or an error.
type GenerateContainerIDFromExternalData func(externalData ExternalData) (string, error)

// ParseExternalData parses the external data string into an ExternalData struct.
//
// externalEnv is a comma-separated list of items, each identified by its prefix:
//   - ExternalDataInitPrefix: the Init flag, parsed with strconv.ParseBool
//   - ExternalDataContainerNamePrefix: the container name
//   - ExternalDataPodUIDPrefix: the pod UID
//
// Items with an unknown prefix are ignored, and when an item is repeated the
// last occurrence wins. An empty externalEnv yields a zero ExternalData and a
// nil error.
//
// If an Init item cannot be parsed as a boolean, the first such error is
// returned. It is not cleared by a later, valid Init item. The remaining
// fields are still populated so callers can keep using the values that did
// parse.
func ParseExternalData(externalEnv string) (ExternalData, error) {
	if externalEnv == "" {
		return ExternalData{}, nil
	}

	var (
		externalData ExternalData
		parsingError error
	)
	for _, item := range strings.Split(externalEnv, ",") {
		switch {
		case strings.HasPrefix(item, ExternalDataInitPrefix):
			isInit, err := strconv.ParseBool(item[len(ExternalDataInitPrefix):])
			externalData.Init = isInit
			// Keep the first failure: a later well-formed item must not hide
			// the fact that the input contained an invalid Init value.
			if err != nil && parsingError == nil {
				parsingError = err
			}
		case strings.HasPrefix(item, ExternalDataContainerNamePrefix):
			externalData.ContainerName = item[len(ExternalDataContainerNamePrefix):]
		case strings.HasPrefix(item, ExternalDataPodUIDPrefix):
			externalData.PodUID = item[len(ExternalDataPodUIDPrefix):]
		}
	}
	return externalData, parsingError
}
Loading

0 comments on commit 9c7198c

Please sign in to comment.