Add disk usage metric for value store (#1139)
* Add disk usage metric for value store

Maintain a disk usage metric, percent usage, for the file system that holds the value store (a sketch of the underlying calculation appears below).

This addresses #119 and #484

* test speedup

* Remove useless metrics update signal

Ingestion metrics are updated periodically only if an update has been signaled, and any indexer activity signals an update, so in almost all cases an update is pending anyway. When the indexer has no activity there is still no point in skipping the update: the indexer has nothing better to do, and the metrics should still be refreshed since they can be affected by external factors such as file system usage or storage compaction.

So, remove the update signaling, since it provides no benefit and only causes unnecessary context switches, and simply update the metrics periodically, as sketched below.
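
For reference, this reduces the updater to a plain timer loop. The snippet below is a minimal sketch of that pattern, not the actual Ingester method (which is shown in the ingest.go diff further down); the interval, callback, and shutdown channel are placeholders.

package main

import (
	"log"
	"time"
)

// runMetricsUpdater calls update every interval until done is closed.
// There is no update signaling; metrics are simply refreshed periodically.
func runMetricsUpdater(interval time.Duration, update func(), done <-chan struct{}) {
	t := time.NewTimer(interval)
	defer t.Stop()
	for {
		select {
		case <-t.C:
			update()
			t.Reset(interval)
		case <-done:
			return
		}
	}
}

func main() {
	done := make(chan struct{})
	go runMetricsUpdater(time.Minute, func() { log.Println("metrics updated") }, done)
	// ... rest of the program; close(done) on shutdown to stop the updater.
	time.Sleep(2 * time.Minute)
	close(done)
}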
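
The percent-usage figure itself comes from the registry's ValueStoreUsage call. As an illustration of how such a number can be derived, the sketch below reads statfs for the file system containing a path; it assumes Linux, uses a placeholder path, and is not the actual registry implementation.

package main

import (
	"fmt"
	"syscall"
)

// percentUsed reports how full the file system containing path is, in percent.
func percentUsed(path string) (float64, error) {
	var st syscall.Statfs_t
	if err := syscall.Statfs(path, &st); err != nil {
		return 0, err
	}
	total := st.Blocks * uint64(st.Bsize)
	if total == 0 {
		return 0, fmt.Errorf("no blocks reported for %s", path)
	}
	avail := st.Bavail * uint64(st.Bsize)
	return 100 * float64(total-avail) / float64(total), nil
}

func main() {
	// "/data/valuestore" is a placeholder for the value store location.
	usage, err := percentUsed("/data/valuestore")
	if err != nil {
		fmt.Println("cannot stat file system:", err)
		return
	}
	fmt.Printf("value store file system is %.1f%% full\n", usage)
}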
gammazero authored Jan 13, 2023
1 parent 0f2948d commit 203f381
Showing 5 changed files with 40 additions and 38 deletions.
4 changes: 1 addition & 3 deletions assigner/core/assigner.go
@@ -168,12 +168,10 @@ func NewAssigner(ctx context.Context, cfg config.Assignment, p2pHost host.Host)
replication = len(indexerPool)
}

httpClient := &http.Client{}

a := &Assigner{
assigned: make(map[peer.ID]*assignment),
indexerPool: indexerPool,
httpClient: httpClient,
httpClient: &http.Client{},
p2pHost: p2pHost,
policy: policy,
pollDone: make(chan struct{}),
3 changes: 2 additions & 1 deletion dagsync/subscriber_test.go
@@ -235,6 +235,7 @@ func TestConcurrentSync(t *testing.T) {
func TestSync(t *testing.T) {
err := quick.Check(func(dpsb dagsyncPubSubBuilder, ll llBuilder) bool {
return t.Run("Quickcheck", func(t *testing.T) {
t.Parallel()
pubSys := newHostSystem(t)
subSys := newHostSystem(t)
defer pubSys.close()
@@ -291,6 +292,7 @@ func TestSync(t *testing.T) {
func TestSyncWithHydratedDataStore(t *testing.T) {
err := quick.Check(func(dpsb dagsyncPubSubBuilder, ll llBuilder) bool {
return t.Run("Quickcheck", func(t *testing.T) {
t.Parallel()
pubPrivKey, _, err := crypto.GenerateEd25519Key(cryptorand.Reader)
require.NoError(t, err)

@@ -789,7 +791,6 @@ func (b dagsyncPubSubBuilder) Build(t *testing.T, topicName string, pubSys hostS
pub, err = dtsync.NewPublisher(pubSys.host, pubSys.ds, pubSys.lsys, topicName, dtsync.WithAnnounceSenders(p2pSender))
require.NoError(t, err)
pubAddr = pubSys.host.Addrs()[0]
test.WaitForPublisher(pubSys.host, topicName, subSys.host.ID())
}
require.NoError(t, err)
sub, err := dagsync.NewSubscriber(subSys.host, subSys.ds, subSys.lsys, topicName, nil, subOpts...)
63 changes: 31 additions & 32 deletions internal/ingest/ingest.go
@@ -47,6 +47,8 @@ const (
// adProcessedFrozenPrefix identifies all advertisements processed while in
// frozen mode. Used for unfreezing.
adProcessedFrozenPrefix = "/adF/"
// metricsUpdateInterval determines how often to update ingestion metrics.
metricsUpdateInterval = time.Minute
)

type adProcessedEvent struct {
@@ -91,7 +93,6 @@ type Ingester struct {

batchSize uint32
closeOnce sync.Once
sigUpdate chan struct{}

sub *dagsync.Subscriber
syncTimeout time.Duration
@@ -157,7 +158,6 @@ func NewIngester(cfg config.Ingest, h host.Host, idxr indexer.Interface, reg *re
lsys: mkLinkSystem(ds, reg),
indexer: idxr,
batchSize: uint32(cfg.StoreBatchSize),
sigUpdate: make(chan struct{}, 1),
syncTimeout: time.Duration(cfg.SyncTimeout),
entriesSel: Selectors.EntriesWithLimit(recursionLimit(cfg.EntriesDepthLimit)),
reg: reg,
@@ -474,7 +474,6 @@ func (ing *Ingester) Sync(ctx context.Context, peerID peer.ID, peerAddr multiadd
// future adProcessedEvents. Therefore check the headAdCid to
// see if this was the sync that was started.
if adProcessedEvent.headAdCid == c {
ing.signalMetricsUpdate()
return cid.Undef, adProcessedEvent.err
}
} else if adProcessedEvent.adCid == c {
@@ -673,43 +672,43 @@ func (ing *Ingester) onAdProcessed(peerID peer.ID) (<-chan adProcessedEvent, con
return ch, cncl
}

// signalMetricsUpdate signals that metrics should be updated.
func (ing *Ingester) signalMetricsUpdate() {
select {
case ing.sigUpdate <- struct{}{}:
default:
// Already signaled
}
}

// metricsUpdate periodically updates metrics. This goroutine exits when the
// sigUpdate channel is closed, when Close is called.
// metricsUpdater periodically updates metrics. This goroutine exits when
// pending syncs are canceled, which happens when Close is called.
func (ing *Ingester) metricsUpdater() {
hasUpdate := true
t := time.NewTimer(time.Minute)
t := time.NewTimer(metricsUpdateInterval)

for {
select {
case <-ing.sigUpdate:
hasUpdate = true
case <-t.C:
if hasUpdate {
// Update value store size metric after sync.
size, err := ing.indexer.Size()
// Update value store size metric after sync.
size, err := ing.indexer.Size()
if err != nil {
log.Errorw("Error getting indexer value store size", "err", err)
}
var usage float64
usageStats, err := ing.reg.ValueStoreUsage()
if err != nil {
log.Errorw("Error getting disk usage", "err", err)
} else {
usage = usageStats.Percent
}

if ing.indexCounts != nil {
indexCount, err := ing.indexCounts.Total()
if err != nil {
log.Errorw("Error getting indexer value store size", "err", err)
return
log.Errorw("Error getting index counts", "err", err)
}
if ing.indexCounts != nil {
indexCount, err := ing.indexCounts.Total()
if err != nil {
log.Errorw("Error getting index counts", "err", err)
}
stats.Record(context.Background(), coremetrics.StoreSize.M(size), metrics.IndexCount.M(int64(indexCount)))
}
hasUpdate = false
stats.Record(context.Background(),
coremetrics.StoreSize.M(size),
metrics.IndexCount.M(int64(indexCount)),
metrics.PercentUsage.M(usage))
} else {
stats.Record(context.Background(),
coremetrics.StoreSize.M(size),
metrics.PercentUsage.M(usage))
}
t.Reset(time.Minute)

t.Reset(metricsUpdateInterval)
case <-ing.closePendingSyncs:
// If closing pending syncs, then close metrics updater as well.
t.Stop()
2 changes: 0 additions & 2 deletions internal/ingest/linksystem.go
@@ -504,7 +504,6 @@ func (ing *Ingester) ingestAd(publisherID peer.ID, adCid cid.Cid, ad schema.Adve
ing.indexCounts.AddCount(providerID, ad.ContextID, uint64(mhCount))
}
}
ing.signalMetricsUpdate()

if len(errsIngestingEntryChunks) > 0 {
return adIngestError{adIngestEntryChunkErr, fmt.Errorf("failed to ingest entry chunks: %v", errsIngestingEntryChunks)}
@@ -536,7 +535,6 @@ func (ing *Ingester) ingestEntryChunk(ctx context.Context, ad schema.Advertiseme
return fmt.Errorf("failed processing entries for advertisement: %w", err)
}

ing.signalMetricsUpdate()
return nil
}

6 changes: 6 additions & 0 deletions internal/metrics/server.go
@@ -36,6 +36,7 @@ var (
EntriesSyncLatency = stats.Float64("ingest/entriessynclatency", "How long it took to sync an Ad's entries", stats.UnitMilliseconds)
MhStoreNanoseconds = stats.Int64("ingest/mhstorenanoseconds", "Average nanoseconds to store one multihash", stats.UnitDimensionless)
IndexCount = stats.Int64("provider/indexCount", "Number of indexes stored for all providers", stats.UnitDimensionless)
PercentUsage = stats.Float64("ingest/percentusage", "Percent usage of storage available in value store", stats.UnitDimensionless)
)

// Views
@@ -99,6 +100,10 @@ var (
Measure: IndexCount,
Aggregation: view.LastValue(),
}
percentUsageView = &view.View{
Measure: PercentUsage,
Aggregation: view.LastValue(),
}
)

var log = logging.Logger("indexer/metrics")
@@ -121,6 +126,7 @@ func Start(views []*view.View) http.Handler {
adLoadError,
mhStoreNanosecondsView,
indexCountView,
percentUsageView,
)
if err != nil {
log.Errorf("cannot register metrics default views: %s", err)
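
The new PercentUsage measure above is registered with a LastValue view, so only the most recent reading is exported, giving it gauge semantics rather than a sum or distribution. The standalone sketch below shows that OpenCensus pattern with a made-up reading; it is an illustration, not indexer code.

package main

import (
	"context"
	"log"

	"go.opencensus.io/stats"
	"go.opencensus.io/stats/view"
)

// A float measure with gauge-like semantics: each Record overwrites the
// previously exported value because the view aggregates with LastValue.
var percentUsage = stats.Float64("ingest/percentusage",
	"Percent usage of storage available in value store", stats.UnitDimensionless)

func main() {
	v := &view.View{
		Measure:     percentUsage,
		Aggregation: view.LastValue(),
	}
	if err := view.Register(v); err != nil {
		log.Fatalf("cannot register view: %s", err)
	}
	// 72.5 is a made-up reading; the indexer records the real figure periodically.
	stats.Record(context.Background(), percentUsage.M(72.5))
}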
