package ingest
import (
"context"
"errors"
"fmt"
"net/http"
"path"
"sync"
"sync/atomic"
"time"
"github.com/hashicorp/go-retryablehttp"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
logging "github.com/ipfs/go-log/v2"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/datamodel"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/traversal/selector"
indexer "github.com/ipni/go-indexer-core"
coremetrics "github.com/ipni/go-indexer-core/metrics"
"github.com/ipni/storetheindex/api/v0/ingest/schema"
"github.com/ipni/storetheindex/config"
"github.com/ipni/storetheindex/dagsync"
"github.com/ipni/storetheindex/internal/counter"
"github.com/ipni/storetheindex/internal/metrics"
"github.com/ipni/storetheindex/internal/registry"
"github.com/ipni/storetheindex/peerutil"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
"golang.org/x/time/rate"
)
var log = logging.Logger("indexer/ingest")
// Datastore key prefixes and other constants used to track ingestion state.
const (
// syncPrefix identifies the latest sync for each provider.
syncPrefix = "/sync/"
// adProcessedPrefix identifies all processed advertisements.
adProcessedPrefix = "/adProcessed/"
// adProcessedFrozenPrefix identifies all advertisements processed while in
// frozen mode. Used for unfreezing.
adProcessedFrozenPrefix = "/adF/"
	// metricsUpdateInterval determines how often to update ingestion metrics.
metricsUpdateInterval = time.Minute
)
type adProcessedEvent struct {
publisher peer.ID
// Head of the chain being processed.
headAdCid cid.Cid
	// The actual adCid being processed.
adCid cid.Cid
// A non-nil value indicates failure to process the ad for adCid.
err error
}
type providerID peer.ID
// pendingAnnounce captures an announcement received from a provider that awaits processing.
type pendingAnnounce struct {
addrInfo peer.AddrInfo
nextCid cid.Cid
}
type adInfo struct {
cid cid.Cid
ad schema.Advertisement
resync bool
}
type workerAssignment struct {
	// none represents a nil assignment. Used because a nil value cannot be
	// stored in an atomic.Value.
none bool
adInfos []adInfo
publisher peer.ID
provider peer.ID
}
// Ingester is a type that uses dagsync for the ingestion protocol.
type Ingester struct {
host host.Host
ds datastore.Batching
lsys ipld.LinkSystem
indexer indexer.Interface
batchSize uint32
closeOnce sync.Once
sub *dagsync.Subscriber
syncTimeout time.Duration
entriesSel datamodel.Node
reg *registry.Registry
	// inEvents is used to send an adProcessedEvent to the distributeEvents
	// goroutine when an advertisement is marked complete or errored.
inEvents chan adProcessedEvent
	// outEventsChans maps each publisher to a slice of channels, where each
	// channel delivers a copy of an adProcessedEvent to an onAdProcessed reader.
outEventsChans map[peer.ID][]chan adProcessedEvent
outEventsMutex sync.Mutex
waitForPendingSyncs sync.WaitGroup
closePendingSyncs chan struct{}
cancelWorkers context.CancelFunc
workersCtx context.Context
cancelOnSyncFinished context.CancelFunc
// A map of providers currently being processed. A worker holds the lock of
// a provider while ingesting ads for that provider.
providersBeingProcessed map[peer.ID]chan struct{}
providersBeingProcessedMu sync.Mutex
providerAdChainStaging map[peer.ID]*atomic.Value
closeWorkers chan struct{}
	// toStaging receives sync finished events that are passed to runIngestStep.
toStaging <-chan dagsync.SyncFinished
// toWorkers is used to ask the worker pool to start processing the ad
// chain for a given provider.
toWorkers *Queue
waitForWorkers sync.WaitGroup
workerPoolSize int
activeWorkers int32
// RateLimiting
rateApply peerutil.Policy
rateBurst int
// providersPendingAnnounce maps the provider ID to the latest announcement received from the
// provider that is waiting to be processed.
providersPendingAnnounce sync.Map
rateLimit rate.Limit
rateMutex sync.Mutex
// Multihash minimum length
minKeyLen int
indexCounts *counter.IndexCounts
}
// NewIngester creates a new Ingester that uses a dagsync Subscriber to handle
// communication with providers.
func NewIngester(cfg config.Ingest, h host.Host, idxr indexer.Interface, reg *registry.Registry, ds datastore.Batching, idxCounts *counter.IndexCounts) (*Ingester, error) {
ing := &Ingester{
host: h,
ds: ds,
lsys: mkLinkSystem(ds, reg),
indexer: idxr,
batchSize: uint32(cfg.StoreBatchSize),
syncTimeout: time.Duration(cfg.SyncTimeout),
entriesSel: Selectors.EntriesWithLimit(recursionLimit(cfg.EntriesDepthLimit)),
reg: reg,
inEvents: make(chan adProcessedEvent, 1),
closePendingSyncs: make(chan struct{}),
providersBeingProcessed: make(map[peer.ID]chan struct{}),
providerAdChainStaging: make(map[peer.ID]*atomic.Value),
toWorkers: NewPriorityQueue(),
closeWorkers: make(chan struct{}),
minKeyLen: cfg.MinimumKeyLength,
indexCounts: idxCounts,
}
var err error
ing.rateApply, ing.rateBurst, ing.rateLimit, err = configRateLimit(cfg.RateLimit)
if err != nil {
log.Error(err.Error())
}
// Instantiate retryable HTTP client used by dagsync httpsync.
rclient := &retryablehttp.Client{
HTTPClient: &http.Client{
Timeout: time.Duration(cfg.HttpSyncTimeout),
},
RetryWaitMin: time.Duration(cfg.HttpSyncRetryWaitMin),
RetryWaitMax: time.Duration(cfg.HttpSyncRetryWaitMax),
RetryMax: cfg.HttpSyncRetryMax,
CheckRetry: retryablehttp.DefaultRetryPolicy,
Backoff: retryablehttp.DefaultBackoff,
}
// Create and start pubsub subscriber. This also registers the storage hook
// to index data as it is received.
sub, err := dagsync.NewSubscriber(h, ds, ing.lsys, cfg.PubSubTopic, Selectors.AdSequence,
dagsync.AllowPeer(reg.Allowed),
dagsync.FilterIPs(reg.FilterIPsEnabled()),
dagsync.SyncRecursionLimit(recursionLimit(cfg.AdvertisementDepthLimit)),
dagsync.UseLatestSyncHandler(&syncHandler{ing}),
dagsync.RateLimiter(ing.getRateLimiter),
dagsync.SegmentDepthLimit(int64(cfg.SyncSegmentDepthLimit)),
dagsync.HttpClient(rclient.StandardClient()),
dagsync.BlockHook(ing.generalDagsyncBlockHook),
dagsync.ResendAnnounce(cfg.ResendDirectAnnounce),
)
if err != nil {
log.Errorw("Failed to start pubsub subscriber", "err", err)
return nil, errors.New("ingester subscriber failed")
}
ing.sub = sub
ing.toStaging, ing.cancelOnSyncFinished = ing.sub.OnSyncFinished()
if cfg.IngestWorkerCount == 0 {
return nil, errors.New("ingester worker count must be > 0")
}
ing.workersCtx, ing.cancelWorkers = context.WithCancel(context.Background())
ing.RunWorkers(cfg.IngestWorkerCount)
go ing.runIngesterLoop()
// Start distributor to send SyncFinished messages to interested parties.
go ing.distributeEvents()
go ing.metricsUpdater()
go ing.autoSync()
log.Debugf("Ingester started and all hooks and linksystem registered")
return ing, nil
}
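// generalDagsyncBlockHook determines the next advertisement CID to sync by
// loading the advertisement for the given CID and following its PreviousID
// link, failing the sync if the advertisement cannot be loaded.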
func (ing *Ingester) generalDagsyncBlockHook(_ peer.ID, c cid.Cid, actions dagsync.SegmentSyncActions) {
// The only kind of block we should get by loading CIDs here should be Advertisement.
// Because:
// - the default subscription selector only selects advertisements.
	// - explicit Ingester.Sync only selects advertisements.
// - entries are synced with an explicit selector separate from advertisement syncs and
// should use dagsync.ScopedBlockHook to override this hook and decode chunks
// instead.
//
// Therefore, we only attempt to load advertisements here and signal failure if the
// load fails.
if ad, err := ing.loadAd(c); err != nil {
actions.FailSync(err)
} else if ad.PreviousID != nil {
actions.SetNextSyncCid(ad.PreviousID.(cidlink.Link).Cid)
} else {
actions.SetNextSyncCid(cid.Undef)
}
}
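// getRateLimiter returns the rate limiter to use for the given publisher,
// based on the configured rate-limit policy.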
func (ing *Ingester) getRateLimiter(publisher peer.ID) *rate.Limiter {
ing.rateMutex.Lock()
defer ing.rateMutex.Unlock()
	// If rate limiting is disabled or the publisher is not rate-limited, then
	// return an infinite rate limiter.
if ing.rateLimit == 0 || !ing.rateApply.Eval(publisher) {
return rate.NewLimiter(rate.Inf, 0)
}
// Return rate limiter with rate setting from config.
return rate.NewLimiter(ing.rateLimit, ing.rateBurst)
}
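// Close stops the ingester. It cancels in-progress ingestion, closes the
// dagsync subscriber, dismisses event readers, and shuts down the worker pool
// and background goroutines.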
func (ing *Ingester) Close() error {
// Tell workers to stop ingestion in progress.
ing.cancelWorkers()
// Close dagsync transport.
err := ing.sub.Close()
log.Info("dagsync subscriber stopped")
// Dismiss any event readers.
ing.outEventsMutex.Lock()
for _, chans := range ing.outEventsChans {
for _, ch := range chans {
close(ch)
}
}
ing.outEventsChans = nil
ing.outEventsMutex.Unlock()
ing.closeOnce.Do(func() {
ing.cancelOnSyncFinished()
close(ing.closeWorkers)
ing.waitForWorkers.Wait()
ing.toWorkers.Close()
log.Info("Workers stopped")
close(ing.closePendingSyncs)
ing.waitForPendingSyncs.Wait()
log.Info("Pending sync processing stopped")
// Stop the distribution goroutine.
close(ing.inEvents)
log.Info("Ingester stopped")
})
return err
}
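// Unfreeze resets each publisher's latest processed advertisement to the
// advertisement that was current when the indexer was frozen, and removes the
// records of all advertisements processed while frozen.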
func Unfreeze(unfrozen map[peer.ID]cid.Cid, dstore datastore.Datastore) error {
// Unfreeze is not cancelable since unfrozen data can only be acquired once
// from the registry.
ctx := context.Background()
// Remove all ads processed while frozen.
err := removeProcessedFrozen(ctx, dstore)
if err != nil {
return fmt.Errorf("cannot remove processed ads: %w", err)
}
for pubID, frozenAt := range unfrozen {
// If there was no previous frozen ad, then there was no latest ad
// when the indexer was frozen.
if frozenAt == cid.Undef {
err = dstore.Delete(ctx, datastore.NewKey(syncPrefix+pubID.String()))
if err != nil {
log.Errorw("Unfreeze cannot delete last processed ad", "err", err)
}
continue
}
// Set last processed ad to previous frozen at ad.
err = dstore.Put(ctx, datastore.NewKey(syncPrefix+pubID.String()), frozenAt.Bytes())
if err != nil {
log.Errorw("Unfreeze cannot set advertisement as last processed", "err", err)
}
}
return nil
}
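// removeProcessedFrozen removes the markers for all advertisements processed
// while the indexer was frozen, along with their corresponding processed
// markers.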
func removeProcessedFrozen(ctx context.Context, dstore datastore.Datastore) error {
q := query.Query{
Prefix: adProcessedFrozenPrefix,
KeysOnly: true,
}
results, err := dstore.Query(ctx, q)
if err != nil {
return err
}
ents, err := results.Rest()
if err != nil {
return err
}
for i := range ents {
key := ents[i].Key
err = dstore.Delete(ctx, datastore.NewKey(key))
if err != nil {
return err
}
err = dstore.Delete(ctx, datastore.NewKey(adProcessedPrefix+path.Base(key)))
if err != nil {
return err
}
}
return nil
}
// Sync syncs advertisements, up to the latest advertisement, from a
// publisher. It returns the final CID that was synced by the call to Sync.
//
// Sync works by first fetching each advertisement from the specified peer
// starting at the most recent and traversing to the advertisement last seen by
// the indexer, or until the advertisement depth limit is reached. Then the
// entries in each advertisement are synced and the multihashes in each entry
// are indexed.
//
// The selector used to sync the advertisements is controlled by two
// parameters: depth and resync.
//
// The depth argument specifies the recursion depth limit to use during sync.
// Its value may be less than 0 for no limit, 0 to use the indexer's
// configured value, or greater than 0 for an explicit limit.
//
// The resync argument specifies whether to stop the traversal at the latest
// known advertisement that is already synced. If set to true, the traversal
// does not stop there, and continues until either there are no more
// advertisements left or the recursion depth limit is reached.
//
// The reference to the latest synced advertisement returned by GetLatestSync
// is only updated if the given depth is zero and resync is set to
// false. Otherwise, a custom selector with the given depth limit and stop link
// is constructed and used for traversal. See dagsync.Subscriber.Sync.
//
// The Context argument controls the lifetime of the sync. Canceling it
// cancels the sync and causes Sync to return the context's error.
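//
// A minimal usage sketch (assuming ctx, pubID, and pubAddr are already
// available to the caller):
//
//	adCid, err := ing.Sync(ctx, pubID, pubAddr, 0, false)
//	if err != nil {
//		log.Errorw("Explicit sync failed", "err", err)
//	} else {
//		log.Infow("Synced up to advertisement", "adCid", adCid)
//	}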
func (ing *Ingester) Sync(ctx context.Context, peerID peer.ID, peerAddr multiaddr.Multiaddr, depth int, resync bool) (cid.Cid, error) {
err := peerID.Validate()
if err != nil {
return cid.Undef, errors.New("invalid provider id")
}
log := log.With("publisher", peerID, "address", peerAddr, "depth", depth, "resync", resync)
log.Info("Explicitly syncing the latest advertisement from peer")
var sel ipld.Node
// If depth is non-zero or traversal should not stop at the latest synced,
// then construct a selector to behave accordingly.
if depth != 0 || resync {
sel, err = ing.makeLimitedDepthSelector(peerID, depth, resync)
if err != nil {
return cid.Undef, fmt.Errorf("failed to construct selector for explicit sync: %w", err)
}
}
syncDone, cancel := ing.onAdProcessed(peerID)
defer cancel()
latest, err := ing.GetLatestSync(peerID)
if err != nil {
return cid.Undef, fmt.Errorf("failed to get latest sync: %w", err)
}
// Start syncing. Notifications for the finished sync are sent
// asynchronously. Sync with cid.Undef so that the latest head is queried
// by dagsync via head-publisher.
//
// Note that if the selector is nil the default selector is used where
// traversal stops at the latest known head.
//
// Reference to the latest synced CID is only updated if the given selector
// is nil.
opts := []dagsync.SyncOption{
dagsync.AlwaysUpdateLatest(),
}
if resync {
// If this is a resync, then it is necessary to mark the ad as
// unprocessed so that everything can be reingested from the start of
// this sync. Create a scoped block-hook to do this.
opts = append(opts, dagsync.ScopedBlockHook(func(i peer.ID, c cid.Cid, actions dagsync.SegmentSyncActions) {
err := ing.markAdUnprocessed(c, true)
if err != nil {
log.Errorw("Failed to mark ad as unprocessed", "err", err, "adCid", c)
}
// Call the general hook because scoped block hook overrides the
// subscriber's general block hook.
ing.generalDagsyncBlockHook(i, c, actions)
}))
}
c, err := ing.sub.Sync(ctx, peerID, cid.Undef, sel, peerAddr, opts...)
if err != nil {
return cid.Undef, fmt.Errorf("failed to sync: %w", err)
}
// Do not persist the latest sync here, because that is done after
// processing the ad.
	// If the latest head has already finished syncing, then do not wait for
	// syncDone, since the event will never arrive.
if latest == c && !resync {
log.Infow("Latest advertisement already processed", "adCid", c)
return c, nil
}
log.Debugw("Syncing advertisements up to latest", "adCid", c)
for {
select {
case adProcessedEvent := <-syncDone:
log.Debugw("Synced advertisement", "adCid", adProcessedEvent.adCid)
if adProcessedEvent.err != nil {
// If an error occurred then the adProcessedEvent.adCid will be
// the cid that caused the error, and there will not be any
// future adProcessedEvents. Therefore check the headAdCid to
// see if this was the sync that was started.
if adProcessedEvent.headAdCid == c {
return cid.Undef, adProcessedEvent.err
}
} else if adProcessedEvent.adCid == c {
return c, nil
}
case <-ctx.Done():
return cid.Undef, ctx.Err()
case <-ing.closePendingSyncs:
			// When shutting down the ingester, calls to Sync may return a
			// "sync closed" error, or this error may be returned first,
			// depending on goroutine scheduling.
return cid.Undef, errors.New("sync canceled: service closed")
}
}
}
// Announce sends an announce message directly to dagsync, instead of through
// pubsub.
func (ing *Ingester) Announce(ctx context.Context, nextCid cid.Cid, addrInfo peer.AddrInfo) error {
provider := addrInfo.ID
log := log.With("provider", provider, "cid", nextCid, "addrs", addrInfo.Addrs)
ing.providersBeingProcessedMu.Lock()
pc, ok := ing.providersBeingProcessed[provider]
if !ok {
pc = make(chan struct{}, 1)
ing.providersBeingProcessed[provider] = pc
}
ing.providersBeingProcessedMu.Unlock()
select {
case pc <- struct{}{}:
log.Info("Handling direct announce request")
err := ing.sub.Announce(ctx, nextCid, provider, addrInfo.Addrs)
<-pc
return err
case <-ctx.Done():
return ctx.Err()
default:
ing.providersPendingAnnounce.Store(provider, pendingAnnounce{
addrInfo: addrInfo,
nextCid: nextCid,
})
log.Info("Deferred handling direct announce request")
return nil
}
}
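// makeLimitedDepthSelector constructs a selector that syncs advertisements up
// to the given depth and, unless resync is requested, stops at the latest
// advertisement already synced from peerID.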
func (ing *Ingester) makeLimitedDepthSelector(peerID peer.ID, depth int, resync bool) (ipld.Node, error) {
	// A depth value of less than 1 is treated as no limit.
rLimit := recursionLimit(depth)
log := log.With("depth", depth)
var stopAt ipld.Link
if !resync {
latest, err := ing.GetLatestSync(peerID)
if err != nil {
return nil, err
}
if latest != cid.Undef {
stopAt = cidlink.Link{Cid: latest}
}
}
// The stop link may be nil, in which case it is treated as no stop link.
// Log it regardless for debugging purposes.
log.Debugw("Custom selector constructed for explicit sync", "stopAt", stopAt)
return dagsync.ExploreRecursiveWithStopNode(rLimit, Selectors.AdSequence, stopAt), nil
}
// markAdUnprocessed takes an advertisement CID and marks it as unprocessed.
// This lets the ad be re-ingested, for example when re-ingesting with a
// different depth, or when earlier ads are processed and later ones must be
// reprocessed so that the indexer re-ingests the later ads in the context of
// the earlier ones and remains consistent.
//
// During a sync, this should be called in order from newest to oldest ad, so
// that if marking an ad as unprocessed fails, the constraint is maintained
// that if an ad is processed, all older ads are also processed.
//
// When forResync is true, index counts will not be added to the existing
// index count.
func (ing *Ingester) markAdUnprocessed(adCid cid.Cid, forResync bool) error {
data := []byte{0}
if forResync {
data = []byte{2}
}
return ing.ds.Put(context.Background(), datastore.NewKey(adProcessedPrefix+adCid.String()), data)
}
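// adAlreadyProcessed reports whether the given advertisement has already been
// processed, and whether it is marked for resync.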
func (ing *Ingester) adAlreadyProcessed(adCid cid.Cid) (bool, bool) {
v, err := ing.ds.Get(context.Background(), datastore.NewKey(adProcessedPrefix+adCid.String()))
if err != nil {
if err != datastore.ErrNotFound {
log.Errorw("Failed to read advertisement processed state from datastore", "err", err)
}
return false, false
}
processed := v[0] == byte(1)
resync := v[0] == byte(2)
return processed, resync
}
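// markAdProcessed records the advertisement as processed, removes its cached
// data from the datastore, and stores its CID as the latest sync for the
// publisher. If frozen is true, the ad is also recorded as processed while
// frozen so that Unfreeze can revert it.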
func (ing *Ingester) markAdProcessed(publisher peer.ID, adCid cid.Cid, frozen bool) error {
cidStr := adCid.String()
ctx := context.Background()
if frozen {
err := ing.ds.Put(ctx, datastore.NewKey(adProcessedFrozenPrefix+cidStr), []byte{1})
if err != nil {
return err
}
}
err := ing.ds.Put(ctx, datastore.NewKey(adProcessedPrefix+cidStr), []byte{1})
if err != nil {
return err
}
// This ad is processed, so remove it from the datastore.
err = ing.ds.Delete(ctx, datastore.NewKey(cidStr))
if err != nil {
		// Log the error, but do not return. Continue on to save the processed ad.
log.Errorw("Cannot remove advertisement from datastore", "err", err)
}
return ing.ds.Put(ctx, datastore.NewKey(syncPrefix+publisher.String()), adCid.Bytes())
}
// distributeEvents reads an adProcessedEvent, sent by a peer handler, and
// copies the event to all channels in outEventsChans. This delivers the event
// to all onAdProcessed channel readers.
func (ing *Ingester) distributeEvents() {
for event := range ing.inEvents {
// Send update to all change notification channels.
ing.outEventsMutex.Lock()
outEventsChans, ok := ing.outEventsChans[event.publisher]
if ok {
for _, ch := range outEventsChans {
ch <- event
}
}
ing.outEventsMutex.Unlock()
}
}
// onAdProcessed creates a channel that receives notification when an
// advertisement and all of its content entries have finished syncing.
//
// Doing a manual sync will not always cause a notification if the requested
// advertisement has previously been processed.
//
// Calling the returned cancel function removes the notification channel from
// the list of channels to be notified on changes, and closes the channel to
// allow any reading goroutines to stop waiting on the channel.
func (ing *Ingester) onAdProcessed(peerID peer.ID) (<-chan adProcessedEvent, context.CancelFunc) {
	// The channel is buffered to prevent distributeEvents from blocking if a
	// reader is not reading the channel immediately.
ch := make(chan adProcessedEvent, 1)
ing.outEventsMutex.Lock()
defer ing.outEventsMutex.Unlock()
var outEventsChans []chan adProcessedEvent
if ing.outEventsChans == nil {
ing.outEventsChans = make(map[peer.ID][]chan adProcessedEvent)
} else {
outEventsChans = ing.outEventsChans[peerID]
}
ing.outEventsChans[peerID] = append(outEventsChans, ch)
cncl := func() {
ing.outEventsMutex.Lock()
defer ing.outEventsMutex.Unlock()
outEventsChans, ok := ing.outEventsChans[peerID]
if !ok {
return
}
		for i, ca := range outEventsChans {
			if ca == ch {
				if len(outEventsChans) == 1 {
					// This was the only channel for the peer, so remove the
					// peer's entry entirely.
					if len(ing.outEventsChans) == 1 {
						ing.outEventsChans = nil
					} else {
						delete(ing.outEventsChans, peerID)
					}
				} else {
					outEventsChans[i] = outEventsChans[len(outEventsChans)-1]
					outEventsChans[len(outEventsChans)-1] = nil
					outEventsChans = outEventsChans[:len(outEventsChans)-1]
					ing.outEventsChans[peerID] = outEventsChans
				}
				// Close the channel so that any reading goroutine stops
				// waiting on it, as documented.
				close(ch)
				break
			}
		}
}
return ch, cncl
}
// metricsUpdater periodically updates metrics. This goroutine exits when
// pending syncs are canceled, which happens when Close is called.
func (ing *Ingester) metricsUpdater() {
t := time.NewTimer(metricsUpdateInterval)
for {
select {
case <-t.C:
			// Update the value store size metric.
size, err := ing.indexer.Size()
if err != nil {
log.Errorw("Error getting indexer value store size", "err", err)
}
var usage float64
usageStats, err := ing.reg.ValueStoreUsage()
if err != nil {
log.Errorw("Error getting disk usage", "err", err)
} else {
usage = usageStats.Percent
}
if ing.indexCounts != nil {
indexCount, err := ing.indexCounts.Total()
if err != nil {
log.Errorw("Error getting index counts", "err", err)
}
stats.Record(context.Background(),
coremetrics.StoreSize.M(size),
metrics.IndexCount.M(int64(indexCount)),
metrics.PercentUsage.M(usage))
} else {
stats.Record(context.Background(),
coremetrics.StoreSize.M(size),
metrics.PercentUsage.M(usage))
}
t.Reset(metricsUpdateInterval)
case <-ing.closePendingSyncs:
// If closing pending syncs, then close metrics updater as well.
t.Stop()
return
}
}
}
// removePublisher removes data for the identified publisher. This is done as
// part of removing a provider.
func (ing *Ingester) removePublisher(ctx context.Context, publisherID peer.ID) error {
if publisherID.Validate() != nil {
		// An invalid publisher ID means the registered provider never had a published ad.
return nil
}
ing.sub.RemoveHandler(publisherID)
err := ing.ds.Delete(ctx, datastore.NewKey(syncPrefix+publisherID.String()))
if err != nil {
return fmt.Errorf("could not remove latest sync for publisher %s: %w", publisherID, err)
}
return nil
}
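// autoSync processes provider updates from the registry's sync channel. It
// removes publisher data for deleted providers, and syncs each updated
// provider's publisher in a separate goroutine, skipping providers that
// already have an auto-sync in progress.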
func (ing *Ingester) autoSync() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var autoSyncMutex sync.Mutex
autoSyncInProgress := map[peer.ID]struct{}{}
for provInfo := range ing.reg.SyncChan() {
if provInfo.Deleted() {
if err := ing.removePublisher(ctx, provInfo.Publisher); err != nil {
log.Errorw("Error removing provider", "err", err, "provider", provInfo.AddrInfo.ID)
}
if ing.indexCounts != nil {
ing.indexCounts.RemoveProvider(provInfo.AddrInfo.ID)
}
// Do not remove provider info from core, because that requires
// scanning the entire core valuestore. Instead, let the finder
// delete provider contexts as deleted providers appear in find
// results.
continue
}
		autoSyncMutex.Lock()
		_, already := autoSyncInProgress[provInfo.AddrInfo.ID]
		if !already {
			// Record the in-progress sync while still holding the mutex,
			// since the sync goroutines delete entries concurrently.
			autoSyncInProgress[provInfo.AddrInfo.ID] = struct{}{}
		}
		autoSyncMutex.Unlock()
		if already {
			log.Infow("Auto-sync already in progress", "provider", provInfo.AddrInfo.ID)
			continue
		}
if stopCid := provInfo.StopCid(); stopCid != cid.Undef {
err := ing.markAdProcessed(provInfo.Publisher, stopCid, false)
if err != nil {
				// This error would cause ingestion of everything from the
				// publisher, including the ads covered by the handoff, and is
				// a critical failure. It also means that the datastore is
				// failing to store data, so likely nothing else is working.
log.Errorw("Failed to mark ad from handoff as processed", "err", err)
continue
}
}
		// Attempt to sync the provider at its last known publisher, in a
// separate goroutine.
ing.waitForPendingSyncs.Add(1)
go func(pubID peer.ID, pubAddr multiaddr.Multiaddr, provID peer.ID) {
defer func() {
autoSyncMutex.Lock()
delete(autoSyncInProgress, provID)
autoSyncMutex.Unlock()
ing.waitForPendingSyncs.Done()
}()
log := log.With("provider", provID, "publisher", pubID, "addr", pubAddr)
log.Info("Auto-syncing the latest advertisement with publisher")
_, err := ing.sub.Sync(ctx, pubID, cid.Undef, nil, pubAddr)
if err != nil {
log.Errorw("Failed to auto-sync with publisher", "err", err)
return
}
ing.reg.Saw(provID)
}(provInfo.Publisher, provInfo.PublisherAddr, provInfo.AddrInfo.ID)
}
}
// GetLatestSync returns the latest advertisement CID synced from the given publisher.
func (ing *Ingester) GetLatestSync(publisherID peer.ID) (cid.Cid, error) {
b, err := ing.ds.Get(context.Background(), datastore.NewKey(syncPrefix+publisherID.String()))
if err != nil {
if err == datastore.ErrNotFound {
return cid.Undef, nil
}
return cid.Undef, err
}
_, c, err := cid.CidFromBytes(b)
return c, err
}
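// BatchSize returns the configured store batch size.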
func (ing *Ingester) BatchSize() int {
return int(atomic.LoadUint32(&ing.batchSize))
}
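// SetBatchSize sets the store batch size.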
func (ing *Ingester) SetBatchSize(batchSize int) {
atomic.StoreUint32(&ing.batchSize, uint32(batchSize))
}
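// SetRateLimit replaces the current rate-limit settings with those from the
// given configuration.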
func (ing *Ingester) SetRateLimit(cfgRateLimit config.RateLimit) error {
apply, burst, limit, err := configRateLimit(cfgRateLimit)
if err != nil {
return err
}
ing.rateMutex.Lock()
ing.rateApply = apply
ing.rateBurst = burst
ing.rateLimit = limit
ing.rateMutex.Unlock()
return nil
}
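// RunWorkers adjusts the ingest worker pool to n workers, starting or
// stopping workers as needed.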
func (ing *Ingester) RunWorkers(n int) {
for n > ing.workerPoolSize {
// Start worker.
ing.waitForWorkers.Add(1)
go ing.ingestWorker(ing.workersCtx)
ing.workerPoolSize++
}
for n < ing.workerPoolSize {
// Stop worker.
ing.closeWorkers <- struct{}{}
ing.workerPoolSize--
}
}
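// runIngesterLoop consumes sync finished events from the staging channel and
// runs an ingest step for each event.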
func (ing *Ingester) runIngesterLoop() {
for syncFinishedEvent := range ing.toStaging {
ing.runIngestStep(syncFinishedEvent)
}
}
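// runIngestStep groups the advertisement CIDs from a finished sync by
// provider, stages each group for ingestion, and queues a worker for any
// provider that does not already have a pending assignment.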
func (ing *Ingester) runIngestStep(syncFinishedEvent dagsync.SyncFinished) {
log := log.With("publisher", syncFinishedEvent.PeerID)
// 1. Group the incoming CIDs by provider.
adsGroupedByProvider := map[peer.ID][]adInfo{}
for _, c := range syncFinishedEvent.SyncedCids {
// Group the CIDs by the provider. Most of the time a publisher will
// only publish Ads for one provider, but it's possible that an ad
// chain can include multiple providers.
processed, resync := ing.adAlreadyProcessed(c)
if processed {
			// This ad has been processed, so all earlier ads have already
			// been processed.
break
}
ad, err := ing.loadAd(c)
if err != nil {
stats.Record(context.Background(), metrics.AdLoadError.M(1))
log.Errorw("Failed to load advertisement CID, skipping", "cid", c, "err", err)
continue
}
providerID, err := peer.Decode(ad.Provider)
if err != nil {
log.Errorf("Failed to get provider from ad CID: %s skipping", err)
continue
}
adsGroupedByProvider[providerID] = append(adsGroupedByProvider[providerID], adInfo{
cid: c,
ad: ad,
resync: resync,
})
}
// 2. For each provider put the ad stack to the worker msg channel.
for p, adInfos := range adsGroupedByProvider {
ing.providersBeingProcessedMu.Lock()
if _, ok := ing.providersBeingProcessed[p]; !ok {
ing.providersBeingProcessed[p] = make(chan struct{}, 1)
}
wa, ok := ing.providerAdChainStaging[p]
if !ok {
wa = &atomic.Value{}
ing.providerAdChainStaging[p] = wa
}
ing.providersBeingProcessedMu.Unlock()
oldAssignment := wa.Swap(workerAssignment{
adInfos: adInfos,
publisher: syncFinishedEvent.PeerID,
provider: p,
})
if oldAssignment == nil || oldAssignment.(workerAssignment).none {
// No previous run scheduled a worker to handle this provider, so
// schedule one.
ing.reg.Saw(p)
pushCount := ing.toWorkers.Push(providerID(p))
stats.Record(context.Background(), metrics.AdIngestQueued.M(int64(ing.toWorkers.Length())))
stats.Record(context.Background(), metrics.AdIngestBacklog.M(int64(pushCount)))
}
}
}
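// ingestWorker pops providers from the worker queue, acquires each provider's
// lock, and processes the staged ad chain and any pending announce for that
// provider.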
func (ing *Ingester) ingestWorker(ctx context.Context) {
log.Debug("started ingest worker")
defer ing.waitForWorkers.Done()
for {
select {
case <-ing.closeWorkers:
log.Debug("stopped ingest worker")
return
case provider := <-ing.toWorkers.PopChan():
stats.Record(context.Background(), metrics.AdIngestQueued.M(int64(ing.toWorkers.Length())))
pid := peer.ID(provider)
ing.providersBeingProcessedMu.Lock()
pc := ing.providersBeingProcessed[pid]
ing.providersBeingProcessedMu.Unlock()
activeWorkers := atomic.AddInt32(&ing.activeWorkers, 1)
stats.Record(context.Background(), metrics.AdIngestActive.M(int64(activeWorkers)))
pc <- struct{}{}
ing.ingestWorkerLogic(ctx, pid)
ing.handlePendingAnnounce(ctx, pid)
<-pc
activeWorkers = atomic.AddInt32(&ing.activeWorkers, -1)
stats.Record(context.Background(), metrics.AdIngestActive.M(int64(activeWorkers)))
}
}
}
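// ingestWorkerLogic takes the staged ad chain assignment for the given
// provider and ingests the advertisements that have not yet been processed.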
func (ing *Ingester) ingestWorkerLogic(ctx context.Context, provider peer.ID) {
// Pull out the assignment for this provider. Note that runIngestStep
// populates this atomic.Value.
ing.providersBeingProcessedMu.Lock()
wa := ing.providerAdChainStaging[provider]
ing.providersBeingProcessedMu.Unlock()
assignmentInterface := wa.Swap(workerAssignment{none: true})
if assignmentInterface == nil || assignmentInterface.(workerAssignment).none {
// Note this is here for completeness. This would not happen
// normally. Execution could get here if someone manually calls this
// function outside the ingest loop. Nothing to do – no assignment.
return
}
assignment := assignmentInterface.(workerAssignment)
rmCtxID := make(map[string]struct{})
var skips []int
skip := -1
frozen := ing.reg.Frozen()
// Filter out ads that are already processed, and any earlier ads.
splitAtIndex := len(assignment.adInfos)
for i, ai := range assignment.adInfos {
if ctx.Err() != nil {
log.Infow("Ingest worker canceled while ingesting ads", "provider", provider, "err", ctx.Err())
ing.inEvents <- adProcessedEvent{
publisher: assignment.publisher,
headAdCid: assignment.adInfos[0].cid,
adCid: ai.cid,
err: ctx.Err(),
}
return
}
// Iterate latest to earliest.
processed, resync := ing.adAlreadyProcessed(ai.cid)
if processed {
// This ad is already processed, which means that all earlier ads
// are also processed. Break here and split at this index later.
// The cids before this index are newer and have not been processed
// yet; the cids after are older and have already been processed.
splitAtIndex = i
break
}
if resync {
ai.resync = true
}
ctxIdStr := string(ai.ad.ContextID)
// This ad was deleted by a later remove. Push previous onto skips
// stack, and set latest skip.