Skip to content

Commit

Permalink
feat(inputs.gnmi): add connection selfstat status before creating client
Browse files Browse the repository at this point in the history
Currently, selfstat does not report the connection status if setting up the subscription fails.
This makes it harder to check the status of every host when the number of hosts per Telegraf instance is high.
  • Loading branch information
Mrflatt committed Nov 11, 2024
1 parent 9565421 commit 4e23111
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 16 deletions.
9 changes: 8 additions & 1 deletion plugins/inputs/gnmi/gnmi.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
_ "embed"
"errors"
"fmt"
"net"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -272,8 +273,14 @@ func (c *GNMI) Start(acc telegraf.Accumulator) error {
go func(addr string) {
defer c.wg.Done()

host, port, err := net.SplitHostPort(addr)
if err != nil {
acc.AddError(fmt.Errorf("unable to parse address %s: %w", addr, err))
return
}
h := handler{
address: addr,
host: host,
port: port,
aliases: c.internalAliases,
tagsubs: c.TagSubscriptions,
maxMsgSize: int(c.MaxMsgSize),
Expand Down
28 changes: 13 additions & 15 deletions plugins/inputs/gnmi/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ import (
const eidJuniperTelemetryHeader = 1

type handler struct {
address string
host string
port string
aliases map[*pathInfo]string
tagsubs []tagSubscription
maxMsgSize int
Expand Down Expand Up @@ -72,7 +73,14 @@ func (h *handler) subscribeGNMI(ctx context.Context, acc telegraf.Accumulator, t
opts = append(opts, grpc.WithKeepaliveParams(h.ClientParameters))
}

client, err := grpc.NewClient(h.address, opts...)
// Used to report the status of the TCP connection to the device. If the
// GNMI connection goes down, but TCP is still up this will still report
// connected until the TCP connection times out.
connectStat := selfstat.Register("gnmi", "grpc_connection_status", map[string]string{"source": h.host})
connectStat.Set(0)

address := net.JoinHostPort(h.host, h.port)
client, err := grpc.NewClient(address, opts...)
if err != nil {
return fmt.Errorf("failed to dial: %w", err)
}
Expand All @@ -88,16 +96,10 @@ func (h *handler) subscribeGNMI(ctx context.Context, acc telegraf.Accumulator, t
if err := subscribeClient.Send(request); err != nil && !errors.Is(err, io.EOF) {
return fmt.Errorf("failed to send subscription request: %w", err)
}

h.log.Debugf("Connection to gNMI device %s established", h.address)

// Used to report the status of the TCP connection to the device. If the
// GNMI connection goes down, but TCP is still up this will still report
// connected until the TCP connection times out.
connectStat := selfstat.Register("gnmi", "grpc_connection_status", map[string]string{"source": h.address})
connectStat.Set(1)
h.log.Debugf("Connection to gNMI device %s established", address)

defer h.log.Debugf("Connection to gNMI device %s closed", h.address)
defer h.log.Debugf("Connection to gNMI device %s closed", address)
for ctx.Err() == nil {
var reply *gnmi.SubscribeResponse
if reply, err = subscribeClient.Recv(); err != nil {
Expand Down Expand Up @@ -164,11 +166,7 @@ func (h *handler) handleSubscribeResponseUpdate(acc telegraf.Accumulator, respon
prefix := newInfoFromPath(response.Update.Prefix)

// Add info to the tags
var err error
headerTags["source"], _, err = net.SplitHostPort(h.address)
if err != nil {
h.log.Errorf("unable to parse address %s: %v", h.address, err)
}
headerTags["source"] = h.host
if !prefix.empty() {
headerTags["path"] = prefix.fullPath()
}
Expand Down

0 comments on commit 4e23111

Please sign in to comment.