Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v2.2] Stream Envoy metrics to the cloud #4053

Merged
merged 34 commits into from
Feb 4, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
ed0775f
Prototype to stream Envoy metrics to Ambassador's backend
douglascamata Jan 26, 2022
a298915
Stream Envoy metrics to the cloud
douglascamata Jan 26, 2022
b869f5c
Port over metrics sink to envoy v3
douglascamata Jan 27, 2022
a4f088f
Fix logs and metrics v2 import
douglascamata Jan 27, 2022
4d0d08b
Mount the go build cache in the builder dockerfile
douglascamata Jan 27, 2022
d2025a2
Update Helm chart with grpc port and service
douglascamata Jan 27, 2022
9ca987f
Mark agent's grpc service as required k8s config
douglascamata Jan 27, 2022
4fd1cbc
Update generated files
douglascamata Jan 27, 2022
1c51788
Change all references of CEPC to DCP
douglascamata Jan 27, 2022
8982f80
Added release notes about streaming metrics
douglascamata Jan 28, 2022
6753404
Stream only the metrics we need to the cloud
douglascamata Jan 28, 2022
b2c5378
Merge branch 'master' of github.com:emissary-ingress/emissary into dc…
douglascamata Jan 28, 2022
a42dae0
Force BuildKit on in builder.mk
Jan 21, 2022
b4a346f
Fix mock client for grpc call
douglascamata Jan 28, 2022
7350a57
Properly break out of suffix loop if found match
douglascamata Jan 28, 2022
45be7de
Merge branch 'master' into dcamata/agent-metrics-stream
Jan 31, 2022
0f8bd8a
Merge branch 'master' of github.com:emissary-ingress/emissary into dc…
Feb 2, 2022
af33779
Merge branch 'dcamata/agent-metrics-stream' of github.com:emissary-in…
Feb 2, 2022
e28a361
Change Envoy metrics server from 8123 and 8006
Feb 3, 2022
0a1fccb
Upgrade to metrics v3 transport api
Feb 3, 2022
30e35b6
Fix logic error in envoy metrics filtering
Feb 3, 2022
63ae1a7
Merge branch 'master' of github.com:emissary-ingress/emissary into dc…
Feb 3, 2022
fb36a8c
Update generated files
Feb 3, 2022
d5b19d4
Improve python host/port parsing to work with ipv6
Feb 3, 2022
10a7796
Use dhttp package to start the metrics server
Feb 3, 2022
ee40b51
Start the metrics server with dgroup
Feb 3, 2022
2b7544a
Merge branch 'master' into dcamata/agent-metrics-stream
Feb 4, 2022
902d455
Whoops, missed a commit for the CHANGELOG
Feb 4, 2022
0029397
Merge branch 'dcamata/agent-metrics-stream' of github.com:emissary-in…
Feb 4, 2022
7344154
Merge branch 'master' of github.com:emissary-ingress/emissary into dc…
Feb 4, 2022
d305b81
Return error from metrics-server group
Feb 4, 2022
41d620b
Fix logging in the envoy metrics server
Feb 4, 2022
128bce4
Fix Python type signature of split_host_port
Feb 4, 2022
6473414
Pin pytest back to 6.2.5.
Feb 4, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
envoyMetrics "github.com/datawire/ambassador/v2/pkg/api/envoy/service/metrics/v2"
kflynn marked this conversation as resolved.
Show resolved Hide resolved
"io/ioutil"
"net/http"
"net/url"
Expand Down Expand Up @@ -599,6 +600,24 @@ func (a *Agent) ProcessSnapshot(ctx context.Context, snapshot *snapshotTypes.Sna
return nil
}

func (a *Agent) MetricsRelayHandler(in *envoyMetrics.StreamMetricsMessage) {
ctx := context.Background()
kflynn marked this conversation as resolved.
Show resolved Hide resolved
dlog.Debugf(ctx, "received %d metrics", len(in.GetEnvoyMetrics()))
if a.comm != nil && !a.reportingStopped {
a.ambassadorAPIKeyMutex.Lock()
apikey := a.ambassadorAPIKey
a.ambassadorAPIKeyMutex.Unlock()
outMessage := &agent.StreamMetricsMessage{
Identity: a.agentID,
EnvoyMetrics: in.EnvoyMetrics,
}
dlog.Debugf(ctx, "relaying %d metrics", len(outMessage.GetEnvoyMetrics()))
if err := a.comm.StreamMetrics(ctx, outMessage, apikey); err != nil {
dlog.Errorf(ctx, "Error streaming metrics: %+v", err)
}
}
}

// ClearComm ends the current connection to the Director, if it exists, thereby
// forcing a new connection to be created when needed.
func (a *Agent) ClearComm() {
Expand Down
27 changes: 21 additions & 6 deletions pkg/agent/comm.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"io"
"net/url"
"sync"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
Expand All @@ -19,12 +20,13 @@ import (
const APIKeyMetadataKey = "x-ambassador-api-key"

type RPCComm struct {
conn io.Closer
client agent.DirectorClient
rptWake chan struct{}
retCancel context.CancelFunc
agentID *agent.Identity
directives chan *agent.Directive
conn io.Closer
client agent.DirectorClient
rptWake chan struct{}
retCancel context.CancelFunc
agentID *agent.Identity
directives chan *agent.Directive
metricsStreamWriterMutex sync.Mutex
}

const (
Expand Down Expand Up @@ -189,6 +191,19 @@ func (c *RPCComm) Report(ctx context.Context, report *agent.Snapshot, apiKey str
return nil
}

func (c *RPCComm) StreamMetrics(ctx context.Context, metrics *agent.StreamMetricsMessage, apiKey string) error {
ctx = dlog.WithField(ctx, "agent", "streammetrics")

c.metricsStreamWriterMutex.Lock()
defer c.metricsStreamWriterMutex.Unlock()
ctx = metadata.AppendToOutgoingContext(ctx, APIKeyMetadataKey, apiKey)
streamClient, err := c.client.StreamMetrics(ctx)
if err != nil {
return err
}
return streamClient.Send(metrics)
}

func (c *RPCComm) Directives() <-chan *agent.Directive {
return c.directives
}
56 changes: 56 additions & 0 deletions pkg/agent/envoy_metrics_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package agent

import (
"context"
envoyMetrics "github.com/datawire/ambassador/v2/pkg/api/envoy/service/metrics/v2"
"github.com/datawire/dlib/dlog"
"google.golang.org/grpc"
"io"
"net"
)

type streamHandler func(in *envoyMetrics.StreamMetricsMessage)

type metricsServer struct {
envoyMetrics.MetricsServiceServer
handler streamHandler
}

// NewMetricsServer is the main metricsServer constructor.
func NewMetricsServer(handler streamHandler) *metricsServer {
return &metricsServer{
handler: handler,
}
}

// StartServer will start the metrics gRPC server, listening on :8123
// It is a blocking call until grpcServer.Serve returns.
func (s *metricsServer) StartServer(ctx context.Context) error {
grpcServer := grpc.NewServer()
envoyMetrics.RegisterMetricsServiceServer(grpcServer, s)

listener, err := net.Listen("tcp", ":8123")
kflynn marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
dlog.Errorf(ctx, "metrics service failed to listen: %v", err)
}

dlog.Infof(ctx, "metrics service listening on %s", listener.Addr().String())
return grpcServer.Serve(listener)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should use github.com/datawire/dlib/dhttp, rather than the google.golang.org/grpc HTTP server. (For an example of how to do this, see the cmd/example-envoy-metrics-sink/ which this PR also edits.)

}

// StreamMetrics implements the StreamMetrics rpc call by calling the stream handler on each
// message received.
func (s *metricsServer) StreamMetrics(stream envoyMetrics.MetricsService_StreamMetricsServer) error {
ctx := stream.Context()
dlog.Debug(ctx, "started stream")
for {
in, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
s.handler(in)
}
}