Skip to content

Commit

Permalink
Add more poller metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
kegsay committed Jul 7, 2023
1 parent 365ed4c commit e67ba9a
Showing 1 changed file with 40 additions and 12 deletions.
52 changes: 40 additions & 12 deletions sync2/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,16 @@ type V2DataReceiver interface {

// PollerMap is a map of device ID to Poller
type PollerMap struct {
v2Client Client
callbacks V2DataReceiver
pollerMu *sync.Mutex
Pollers map[PollerID]*poller
executor chan func()
executorRunning bool
processHistogramVec *prometheus.HistogramVec
timelineSizeHistogramVec *prometheus.HistogramVec
v2Client Client
callbacks V2DataReceiver
pollerMu *sync.Mutex
Pollers map[PollerID]*poller
executor chan func()
executorRunning bool
processHistogramVec *prometheus.HistogramVec
timelineSizeHistogramVec *prometheus.HistogramVec
numOutstandingSyncReqsGauge prometheus.Gauge
totalNumPollsCounter prometheus.Counter
}

// NewPollerMap makes a new PollerMap. Guarantees that the V2DataReceiver will be called on the same
Expand Down Expand Up @@ -119,7 +121,20 @@ func NewPollerMap(v2Client Client, enablePrometheus bool) *PollerMap {
Buckets: []float64{0.0, 1.0, 2.0, 5.0, 10.0, 20.0, 50.0},
}, []string{"limited"})
prometheus.MustRegister(pm.timelineSizeHistogramVec)

pm.totalNumPollsCounter = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "sliding_sync",
Subsystem: "poller",
Name: "total_num_polls",
Help: "Total number of poll loops iterated.",
})
prometheus.MustRegister(pm.totalNumPollsCounter)
pm.numOutstandingSyncReqsGauge = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "sliding_sync",
Subsystem: "poller",
Name: "num_outstanding_sync_v2_reqs",
Help: "Number of sync v2 requests that have yet to return a response.",
})
prometheus.MustRegister(pm.numOutstandingSyncReqsGauge)
}
return pm
}
Expand Down Expand Up @@ -200,6 +215,8 @@ func (h *PollerMap) EnsurePolling(pid PollerID, accessToken, v2since string, isS
poller = newPoller(pid, accessToken, h.v2Client, h, logger, !needToWait && !isStartup)
poller.processHistogramVec = h.processHistogramVec
poller.timelineSizeVec = h.timelineSizeHistogramVec
poller.numOutstandingSyncReqs = h.numOutstandingSyncReqsGauge
poller.totalNumPolls = h.totalNumPollsCounter
go poller.Poll(v2since)
h.Pollers[pid] = poller

Expand Down Expand Up @@ -351,9 +368,11 @@ type poller struct {
totalChangedDeviceLists int
totalLeftDeviceLists int

pollHistogramVec *prometheus.HistogramVec
processHistogramVec *prometheus.HistogramVec
timelineSizeVec *prometheus.HistogramVec
pollHistogramVec *prometheus.HistogramVec
processHistogramVec *prometheus.HistogramVec
timelineSizeVec *prometheus.HistogramVec
numOutstandingSyncReqs prometheus.Gauge
totalNumPolls prometheus.Counter
}

func newPoller(pid PollerID, accessToken string, client Client, receiver V2DataReceiver, logger zerolog.Logger, initialToDeviceOnly bool) *poller {
Expand Down Expand Up @@ -391,6 +410,9 @@ type pollLoopState struct {
// Returns if the access token gets invalidated or if there was a fatal error processing v2 responses.
// Use WaitUntilInitialSync() to wait until the first poll has been processed.
func (p *poller) Poll(since string) {
if p.totalNumPolls != nil {
p.totalNumPolls.Inc()
}
// Basing the sentry-wrangling on the sentry-go net/http integration, see e.g.
// https://github.com/getsentry/sentry-go/blob/02e712a638c40cd9701ad52d5d1309d65d556ef9/http/sentryhttp.go#L84
// TODO is this the correct way to create hub? Should the cloning be done by the
Expand Down Expand Up @@ -452,7 +474,13 @@ func (p *poller) poll(ctx context.Context, s *pollLoopState) error {
}
start := time.Now()
spanCtx, region := internal.StartSpan(ctx, "DoSyncV2")
if p.numOutstandingSyncReqs != nil {
p.numOutstandingSyncReqs.Inc()
}
resp, statusCode, err := p.client.DoSyncV2(spanCtx, p.accessToken, s.since, s.firstTime, p.initialToDeviceOnly)
if p.numOutstandingSyncReqs != nil {
p.numOutstandingSyncReqs.Dec()
}
region.End()
p.trackRequestDuration(time.Since(start), s.since == "", s.firstTime)
if p.terminated.Load() {
Expand Down

0 comments on commit e67ba9a

Please sign in to comment.