Skip to content

Commit

Permalink
Remove useless metrics update signal
Browse files Browse the repository at this point in the history
Ingestion metrics are updated periodically if an update has been signaled. If the indexer has any activity, update is signaled. So in almost all cases the update is signaled. If the indexer has no activity, there is no point in avoiding the update because the indexer has nothing better to do and then the metrics should still be updated as they may be affected by other external factors like fs usage or storage compacting.

So, remove the update signaling since it does not help anything, and only causes unnecessary context switches. Simply update the metrics periodically.
  • Loading branch information
gammazero committed Jan 13, 2023
1 parent 2410192 commit 4a4fc38
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 20 deletions.
24 changes: 6 additions & 18 deletions internal/ingest/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ const (
// adProcessedFrozenPrefix identifies all advertisements processed while in
// frozen mode. Used for unfreezing.
adProcessedFrozenPrefix = "/adF/"
// metricsUpdateInterva determines how ofter to update ingestion metrics.
metricsUpdateInterval = time.Minute
)

type adProcessedEvent struct {
Expand Down Expand Up @@ -91,7 +93,6 @@ type Ingester struct {

batchSize uint32
closeOnce sync.Once
sigUpdate chan struct{}

sub *dagsync.Subscriber
syncTimeout time.Duration
Expand Down Expand Up @@ -157,7 +158,6 @@ func NewIngester(cfg config.Ingest, h host.Host, idxr indexer.Interface, reg *re
lsys: mkLinkSystem(ds, reg),
indexer: idxr,
batchSize: uint32(cfg.StoreBatchSize),
sigUpdate: make(chan struct{}, 1),
syncTimeout: time.Duration(cfg.SyncTimeout),
entriesSel: Selectors.EntriesWithLimit(recursionLimit(cfg.EntriesDepthLimit)),
reg: reg,
Expand Down Expand Up @@ -474,7 +474,6 @@ func (ing *Ingester) Sync(ctx context.Context, peerID peer.ID, peerAddr multiadd
// future adProcessedEvents. Therefore check the headAdCid to
// see if this was the sync that was started.
if adProcessedEvent.headAdCid == c {
ing.signalMetricsUpdate()
return cid.Undef, adProcessedEvent.err
}
} else if adProcessedEvent.adCid == c {
Expand Down Expand Up @@ -673,25 +672,14 @@ func (ing *Ingester) onAdProcessed(peerID peer.ID) (<-chan adProcessedEvent, con
return ch, cncl
}

// signalMetricsUpdate signals that metrics should be updated.
func (ing *Ingester) signalMetricsUpdate() {
select {
case ing.sigUpdate <- struct{}{}:
default:
// Already signaled
}
}

// metricsUpdate periodically updates metrics. This goroutine exits when the
// sigUpdate channel is closed, when Close is called.
// metricsUpdate periodically updates metrics. This goroutine exits when
// canceling pending syncs, when Close is called.
func (ing *Ingester) metricsUpdater() {
hasUpdate := true
t := time.NewTimer(time.Minute)
t := time.NewTimer(metricsUpdateInterval)

for {
select {
case <-ing.sigUpdate:
hasUpdate = true
case <-t.C:
if hasUpdate {
// Update value store size metric after sync.
Expand Down Expand Up @@ -719,7 +707,7 @@ func (ing *Ingester) metricsUpdater() {
}
hasUpdate = false
}
t.Reset(time.Minute)
t.Reset(metricsUpdateInterval)
case <-ing.closePendingSyncs:
// If closing pending syncs, then close metrics updater as well.
t.Stop()
Expand Down
2 changes: 0 additions & 2 deletions internal/ingest/linksystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,6 @@ func (ing *Ingester) ingestAd(publisherID peer.ID, adCid cid.Cid, ad schema.Adve
ing.indexCounts.AddCount(providerID, ad.ContextID, uint64(mhCount))
}
}
ing.signalMetricsUpdate()

if len(errsIngestingEntryChunks) > 0 {
return adIngestError{adIngestEntryChunkErr, fmt.Errorf("failed to ingest entry chunks: %v", errsIngestingEntryChunks)}
Expand Down Expand Up @@ -536,7 +535,6 @@ func (ing *Ingester) ingestEntryChunk(ctx context.Context, ad schema.Advertiseme
return fmt.Errorf("failed processing entries for advertisement: %w", err)
}

ing.signalMetricsUpdate()
return nil
}

Expand Down

0 comments on commit 4a4fc38

Please sign in to comment.