Skip to content

Commit

Permalink
feat(forwarder-agent): Forward timers to otel cols
Browse files Browse the repository at this point in the history
Forwarder Agent forwards loggregator timers to OTel Collectors as OTLP
traces.

Signed-off-by: Andrew Crump <andrew.crump@broadcom.com>
Signed-off-by: Rebecca Roberts <rebecca.roberts@broadcom.com>
Signed-off-by: Matthew Kocher <matthew.kocher@broadcom.com>
  • Loading branch information
ctlong authored and acrmp committed Apr 16, 2024
1 parent 3a27e2c commit ff273ff
Show file tree
Hide file tree
Showing 13 changed files with 2,530 additions and 155 deletions.
61 changes: 61 additions & 0 deletions src/cmd/forwarder-agent/app/app_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
. "github.com/onsi/gomega"
"github.com/onsi/gomega/types"
colmetricspb "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
coltracepb "go.opentelemetry.io/proto/otlp/collector/trace/v1"
"google.golang.org/grpc"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protoreflect"
Expand Down Expand Up @@ -255,3 +256,63 @@ func (s *spyOtelColMetricServer) Export(_ context.Context, req *colmetricspb.Exp
func (s *spyOtelColMetricServer) close() {
s.srv.Stop()
}

// A fake OTel Collector trace gRPC server that captures requests made to it.
type spyOtelColTraceServer struct {
coltracepb.UnimplementedTraceServiceServer

srv *grpc.Server
addr string

requests chan *coltracepb.ExportTraceServiceRequest
}

// Creates a spyOtelColTraceServer, starts it listening on a random port,
// registers it as a gRPC service, and writes out a temp file for the forwarder
// agent to recognize it as a destination. The cfgPath it accepts is the folder
// under which to write the temp file.
func startSpyOtelColTraceServer(cfgPath string, tc *testhelper.TestCerts, commonName string) *spyOtelColTraceServer {
serverCreds, err := plumbing.NewServerCredentials(
tc.Cert(commonName),
tc.Key(commonName),
tc.CA(),
)
ExpectWithOffset(1, err).NotTo(HaveOccurred())

lis, err := net.Listen("tcp", "127.0.0.1:")
ExpectWithOffset(1, err).NotTo(HaveOccurred())

s := &spyOtelColTraceServer{
srv: grpc.NewServer(grpc.Creds(serverCreds)),
requests: make(chan *coltracepb.ExportTraceServiceRequest, 10000),
addr: lis.Addr().String(),
}

coltracepb.RegisterTraceServiceServer(s.srv, s)
go s.srv.Serve(lis) //nolint:errcheck

port, err := strconv.Atoi(strings.Split(s.addr, ":")[1])
ExpectWithOffset(1, err).NotTo(HaveOccurred())

dir, err := os.MkdirTemp(cfgPath, "")
ExpectWithOffset(1, err).ToNot(HaveOccurred())
tmpfn := filepath.Join(dir, "ingress_port.yml")

contents := fmt.Sprintf(`---
ingress: %d
protocol: otelcol
`, port)
err = os.WriteFile(tmpfn, []byte(contents), 0600)
ExpectWithOffset(1, err).ToNot(HaveOccurred())

return s
}

func (s *spyOtelColTraceServer) Export(_ context.Context, req *coltracepb.ExportTraceServiceRequest) (*coltracepb.ExportTraceServiceResponse, error) {
s.requests <- req
return &coltracepb.ExportTraceServiceResponse{}, nil
}

func (s *spyOtelColTraceServer) close() {
s.srv.Stop()
}
41 changes: 33 additions & 8 deletions src/cmd/forwarder-agent/app/forwarder_agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (

"code.cloudfoundry.org/loggregator-agent-release/src/pkg/config"
colmetricspb "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
coltracepb "go.opentelemetry.io/proto/otlp/collector/trace/v1"
"google.golang.org/protobuf/proto"

"code.cloudfoundry.org/go-loggregator/v9"
"code.cloudfoundry.org/go-loggregator/v9/rpc/loggregator_v2"
Expand Down Expand Up @@ -349,32 +351,36 @@ var _ = Describe("App", func() {
})

Context("when an OTel Collector is registered to forward to", func() {
var otelServer *spyOtelColMetricServer
var (
otelMetricsServer *spyOtelColMetricServer
otelTraceServer *spyOtelColTraceServer
)

BeforeEach(func() {
otelServer = startSpyOtelColMetricServer(ingressCfgPath, agentCerts, "otel-collector")
otelMetricsServer = startSpyOtelColMetricServer(ingressCfgPath, agentCerts, "otel-collector")
otelTraceServer = startSpyOtelColTraceServer(ingressCfgPath, agentCerts, "otel-collector")
})

AfterEach(func() {
otelServer.close()
otelMetricsServer.close()
otelTraceServer.close()
})

DescribeTable("some envelopes are not forwarded",
func(e *loggregator_v2.Envelope) {
ingressClient.Emit(e)
Consistently(otelServer.requests, 3).ShouldNot(Receive())
Consistently(otelMetricsServer.requests, 3).ShouldNot(Receive())
},
Entry("drops logs", &loggregator_v2.Envelope{Message: &loggregator_v2.Envelope_Log{}}),
Entry("drops events", &loggregator_v2.Envelope{Message: &loggregator_v2.Envelope_Event{}}),
Entry("drops timers", &loggregator_v2.Envelope{Message: &loggregator_v2.Envelope_Timer{}}),
)

It("forwards counters", func() {
name := "test-counter-name"
ingressClient.EmitCounter(name)

var req *colmetricspb.ExportMetricsServiceRequest
Eventually(otelServer.requests).Should(Receive(&req))
Eventually(otelMetricsServer.requests).Should(Receive(&req))

metric := req.ResourceMetrics[0].ScopeMetrics[0].Metrics[0]
Expect(metric.GetName()).To(Equal(name))
Expand All @@ -385,16 +391,35 @@ var _ = Describe("App", func() {
ingressClient.EmitGauge(loggregator.WithGaugeValue(name, 20.2, "test-unit"))

var req *colmetricspb.ExportMetricsServiceRequest
Eventually(otelServer.requests).Should(Receive(&req))
Eventually(otelMetricsServer.requests).Should(Receive(&req))

metric := req.ResourceMetrics[0].ScopeMetrics[0].Metrics[0]
Expect(metric.GetName()).To(Equal(name))
})

It("forwards timers", func() {
name := "test-timer-name"
ingressClient.EmitTimer(name, time.Now(), time.Now().Add(time.Second), func(m proto.Message) {
switch e := m.(type) {
case *loggregator_v2.Envelope:
e.Tags["trace_id"] = "beefdeadbeefdeadbeefdeadbeefdead"
e.Tags["span_id"] = "deadbeefdeadbeef"
default:
panic(fmt.Sprintf("unsupported Message type: %T", m))
}
})

var req *coltracepb.ExportTraceServiceRequest
Eventually(otelTraceServer.requests).Should(Receive(&req))

trace := req.ResourceSpans[0].ScopeSpans[0].Spans[0]
Expect(trace.GetName()).To(Equal(name))
})

It("emits an expired metric", func() {
et := map[string]string{
"protocol": "otelcol",
"destination": otelServer.addr,
"destination": otelMetricsServer.addr,
}

Eventually(agentMetrics.HasMetric).WithArguments("egress_expired_total", et).Should(BeTrue())
Expand Down
73 changes: 0 additions & 73 deletions src/pkg/otelcolclient/metric_batcher.go

This file was deleted.

54 changes: 0 additions & 54 deletions src/pkg/otelcolclient/metric_batcher_test.go

This file was deleted.

Loading

0 comments on commit ff273ff

Please sign in to comment.