Document code to clarify publisher versus provider
Add comments, change variable names, and update log messages to clarify the use of data pertaining to the advertisement publisher versus the content provider. Add additional comments to clarify indexer behavior and explain the advertisement processing workflow.

- Add some minor optimizations
- Fix a minor resource leak with pending announce messages from a publisher that is not a provider.
gammazero committed Mar 11, 2023
1 parent 1936a32 commit a12af89
Showing 5 changed files with 148 additions and 94 deletions.
6 changes: 4 additions & 2 deletions announce/receiver.go
@@ -307,9 +307,11 @@ func (r *Receiver) watch(ctx context.Context) {
close(r.watchDone)
}

// Direct handles a direct announce message, that was not arrived over pubsub.
// Direct handles a direct announce message, that was not received over pubsub.
// The message is resent over pubsub with the original peerID encoded into the
// message extra data.
// message extra data. The peerID and addrs are those of the advertisement
// publisher, since an announce message announces the availability of an
// advertisement and where to retrieve it from.
func (r *Receiver) Direct(ctx context.Context, nextCid cid.Cid, peerID peer.ID, addrs []multiaddr.Multiaddr) error {
log.Infow("Handling direct announce", "peer", peerID)
amsg := Announce{
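To make the publisher-centric semantics of Direct concrete, here is a minimal sketch (not part of this commit). The directReceiver interface mirrors the Direct signature shown above so the sketch does not depend on the repository's import path, and relayAnnounce is a hypothetical helper name.

```go
package announcesketch

import (
	"context"

	"github.com/ipfs/go-cid"
	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/multiformats/go-multiaddr"
)

// directReceiver mirrors the Receiver.Direct method documented above. It is
// declared locally so this sketch does not depend on the announce package's
// import path.
type directReceiver interface {
	Direct(ctx context.Context, nextCid cid.Cid, peerID peer.ID, addrs []multiaddr.Multiaddr) error
}

// relayAnnounce is a hypothetical helper that forwards a non-pubsub announce
// to the Receiver. The peer ID and addresses identify the advertisement
// publisher (where the advertisement can be fetched), not the content
// provider named inside the advertisement.
func relayAnnounce(ctx context.Context, r directReceiver, adCid cid.Cid, pub peer.AddrInfo) error {
	return r.Direct(ctx, adCid, pub.ID, pub.Addrs)
}
```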
31 changes: 20 additions & 11 deletions dagsync/subscriber.go
@@ -110,7 +110,8 @@ type SyncFinished struct {
// Cid is the CID identifying the link that finished and is now the latest
// sync for a specific peer.
Cid cid.Cid
// PeerID identifies the peer this SyncFinished event pertains to.
// PeerID identifies the peer this SyncFinished event pertains to. This is
// the publisher of the advertisement chain.
PeerID peer.ID
// A list of cids that this sync acquired. In order from latest to oldest.
// The latest cid will always be at the beginning.
@@ -127,7 +128,8 @@ type handler struct {
// it should grab this lock to ensure no other process updates that state
// concurrently.
latestSyncMu sync.Mutex
// peerID is the ID of the peer this handler is responsible for.
// peerID is the ID of the peer this handler is responsible for. This is
// the publisher of an advertisement chain.
peerID peer.ID
// pendingCid is a CID queued for async handling.
pendingCid cid.Cid
@@ -369,10 +371,10 @@ func (s *Subscriber) RemoveHandler(peerID peer.ID) bool {
return true
}

// Sync performs a one-off explicit sync with the given peer for a specific CID
// and updates the latest synced link to it. Completing sync may take a
// significant amount of time, so Sync should generally be run in its own
// goroutine.
// Sync performs a one-off explicit sync with the given peer (publisher) for a
// specific CID and updates the latest synced link to it. Completing sync may
// take a significant amount of time, so Sync should generally be run in its
// own goroutine.
//
// If given cid.Undef, the latest root CID is queried from the peer directly
// and used instead. Note that in an event where there is no latest root, i.e.
@@ -463,7 +465,8 @@ func (s *Subscriber) Sync(ctx context.Context, peerID peer.ID, nextCid cid.Cid,
wrapSel = true
}

// Check for existing handler. If none, create one if allowed.
// Check for an existing handler for the specified peer (publisher). If
// none, create one if allowed.
hnd, err := s.getOrCreateHandler(peerID)
if err != nil {
return cid.Undef, err
@@ -484,7 +487,11 @@

if updateLatest {
hnd.subscriber.latestSyncHander.SetLatestSync(hnd.peerID, nextCid)
hnd.subscriber.inEvents <- SyncFinished{Cid: nextCid, PeerID: hnd.peerID, SyncedCids: syncedCids}
hnd.subscriber.inEvents <- SyncFinished{
Cid: nextCid,
PeerID: hnd.peerID,
SyncedCids: syncedCids,
}
}

// The sync succeeded, so let's remember this address in the appropriate
@@ -608,9 +615,11 @@ func (s *Subscriber) watch() {
}
}

// Announce handles a direct announce message, that was not arrived over
// pubsub. The message is resent over pubsub if the Receiver is configured to do so.
// with the original peerID encoded into the message extra data.
// Announce handles a direct announce message, that was not received over
// pubsub. The message is resent over pubsub, if the Receiver is configured to
// do so. The peerID and addrs are those of the advertisement publisher, since
// an announce message announces the availability of an advertisement and where
// to retrieve it from.
func (s *Subscriber) Announce(ctx context.Context, nextCid cid.Cid, peerID peer.ID, peerAddrs []multiaddr.Multiaddr) error {
return s.receiver.Direct(ctx, nextCid, peerID, peerAddrs)
}
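A hedged usage sketch of the Announce entry point documented above (not part of this commit). The announcer interface mirrors Subscriber.Announce as shown in this diff, and announceHead is a hypothetical caller. The companion path, Sync with cid.Undef as used by autoSync in ingest.go below, asks the publisher directly for its latest advertisement head.

```go
package dagsyncsketch

import (
	"context"

	"github.com/ipfs/go-cid"
	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/multiformats/go-multiaddr"
)

// announcer mirrors Subscriber.Announce as shown in the diff above.
type announcer interface {
	Announce(ctx context.Context, nextCid cid.Cid, peerID peer.ID, peerAddrs []multiaddr.Multiaddr) error
}

// announceHead tells the subscriber that the advertisement identified by
// headCid is available from the given publisher. The AddrInfo belongs to the
// advertisement publisher, since an announce only says which advertisement
// exists and where it can be retrieved from.
func announceHead(ctx context.Context, sub announcer, headCid cid.Cid, publisher peer.AddrInfo) error {
	return sub.Announce(ctx, headCid, publisher.ID, publisher.Addrs)
}
```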
126 changes: 86 additions & 40 deletions internal/ingest/ingest.go
@@ -63,8 +63,6 @@ type adProcessedEvent struct {
err error
}

type providerID peer.ID

// pendingAnnounce captures an announcement received from a provider that awaits processing.
type pendingAnnounce struct {
addrInfo peer.AddrInfo
@@ -87,6 +85,23 @@ type workerAssignment struct {
}

// Ingester is a type that uses dagsync for the ingestion protocol.
//
// ## Advertisement Ingestion Constraints
//
// 1. If an Ad is processed, all older ads referenced by this ad (towards the
// start of the ad chain) have also been processed. For example, given some
// chain A <- B <- C, the indexer will never be in the state that it indexed A
// & C but not B.
//
// 2. An indexer will index an Ad chain, but will not make any guarantees about
// consistency in the presence of multiple ad chains for a given provider. For
// example, if a provider publishes two ad chains, chain1 and chain2, at the
// same time, the indexer will apply whichever chain it learns about first, and
// then apply the other chain.
//
// 3. An indexer will not index the same Ad twice. An indexer will be resilient
// to restarts. If the indexer goes down and comes back up it should not break
// constraint 1.
type Ingester struct {
host host.Host
ds datastore.Batching
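The ingestion constraints above imply a particular application order: ads are applied oldest-first, and processing stops at the first failure so a newer ad is never indexed ahead of an older one. A minimal illustrative sketch, not the indexer's actual code (applyFn is a hypothetical callback):

```go
package ingestsketch

import "github.com/ipfs/go-cid"

// applyChain applies a synced ad chain while preserving constraint 1 above.
// The chain arrives newest-first (head first), so it is walked in reverse and
// applied oldest-first. Stopping at the first error guarantees that no ad is
// indexed before every older ad it references has been indexed, even across
// restarts.
func applyChain(chainNewestFirst []cid.Cid, applyFn func(cid.Cid) error) error {
	for i := len(chainNewestFirst) - 1; i >= 0; i-- {
		if err := applyFn(chainNewestFirst[i]); err != nil {
			return err
		}
	}
	return nil
}
```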
@@ -126,8 +141,6 @@ type Ingester struct {
providerAdChainStaging map[peer.ID]*atomic.Value

closeWorkers chan struct{}
// toStaging receives sync finished events used to call to runIngestStep.
toStaging <-chan dagsync.SyncFinished
// toWorkers is used to ask the worker pool to start processing the ad
// chain for a given provider.
toWorkers *Queue
@@ -164,6 +177,10 @@ func NewIngester(cfg config.Ingest, h host.Host, idxr indexer.Interface, reg *re
opts.dsAds = ds
}

if cfg.IngestWorkerCount == 0 {
return nil, errors.New("ingester worker count must be > 0")
}

ing := &Ingester{
host: h,
ds: ds,
@@ -239,15 +256,12 @@
}
ing.sub = sub

ing.toStaging, ing.cancelOnSyncFinished = ing.sub.OnSyncFinished()

if cfg.IngestWorkerCount == 0 {
return nil, errors.New("ingester worker count must be > 0")
}

ing.RunWorkers(cfg.IngestWorkerCount)

go ing.runIngesterLoop()
var syncFinishedEvents <-chan dagsync.SyncFinished
syncFinishedEvents, ing.cancelOnSyncFinished = ing.sub.OnSyncFinished()

go ing.runIngesterLoop(syncFinishedEvents)

// Start distributor to send SyncFinished messages to interested parties.
go ing.distributeEvents()
@@ -517,29 +531,33 @@ func (ing *Ingester) Sync(ctx context.Context, peerID peer.ID, peerAddr multiadd

// Announce sends an announce message directly to dagsync, instead of through
// pubsub.
func (ing *Ingester) Announce(ctx context.Context, nextCid cid.Cid, addrInfo peer.AddrInfo) error {
provider := addrInfo.ID
log := log.With("provider", provider, "cid", nextCid, "addrs", addrInfo.Addrs)
func (ing *Ingester) Announce(ctx context.Context, nextCid cid.Cid, pubAddrInfo peer.AddrInfo) error {
publisher := pubAddrInfo.ID
log := log.With("publisher", publisher, "cid", nextCid, "addrs", pubAddrInfo.Addrs)

// If the publisher is not the same as the provider, then this will not
// wait for the provider to be done processing the ad chain it is working
// on.
ing.providersBeingProcessedMu.Lock()
pc, ok := ing.providersBeingProcessed[provider]
pc, ok := ing.providersBeingProcessed[publisher]
ing.providersBeingProcessedMu.Unlock()
if !ok {
pc = make(chan struct{}, 1)
ing.providersBeingProcessed[provider] = pc
return ing.sub.Announce(ctx, nextCid, publisher, pubAddrInfo.Addrs)
}
ing.providersBeingProcessedMu.Unlock()

// The publisher in the announce message has the same ID as a known
// provider, so defer handling the announce if that provider is busy.
select {
case pc <- struct{}{}:
log.Info("Handling direct announce request")
err := ing.sub.Announce(ctx, nextCid, provider, addrInfo.Addrs)
err := ing.sub.Announce(ctx, nextCid, publisher, pubAddrInfo.Addrs)
<-pc
return err
case <-ctx.Done():
return ctx.Err()
default:
ing.providersPendingAnnounce.Store(provider, pendingAnnounce{
addrInfo: addrInfo,
ing.providersPendingAnnounce.Store(publisher, pendingAnnounce{
addrInfo: pubAddrInfo,
nextCid: nextCid,
})
log.Info("Deferred handling direct announce request")
@@ -813,7 +831,11 @@ func (ing *Ingester) autoSync() {
autoSyncMutex.Unlock()
ing.waitForPendingSyncs.Done()
}()
log := log.With("provider", provID, "publisher", pubID, "addr", pubAddr)
log := log.With("publisher", pubID, "addr", pubAddr)
// Log provider ID if not the same as publisher ID.
if provID != pubID {
log = log.With("provider", provID)
}
log.Info("Auto-syncing the latest advertisement with publisher")

_, err := ing.sub.Sync(ctx, pubID, cid.Undef, nil, pubAddr)
@@ -877,15 +899,29 @@ func (ing *Ingester) RunWorkers(n int) {
}
}

func (ing *Ingester) runIngesterLoop() {
for syncFinishedEvent := range ing.toStaging {
ing.runIngestStep(syncFinishedEvent)
// Handle events from dagsync.Subscriber signaling that a chain has been synced.
func (ing *Ingester) runIngesterLoop(syncFinishedEvents <-chan dagsync.SyncFinished) {
for event := range syncFinishedEvents {
ing.runIngestStep(event)
}
}

func (ing *Ingester) runIngestStep(syncFinishedEvent dagsync.SyncFinished) {
log := log.With("publisher", syncFinishedEvent.PeerID)
publisher := syncFinishedEvent.PeerID
log := log.With("publisher", publisher)
// 1. Group the incoming CIDs by provider.
//
// Serializing on the provider prevents concurrent processing of ads for
// the same provider from different chains. This allows the indexer to do
// the following:
//
// - Detect and skip ads that have come from a different chain. Concurrent
// processing may result in double processing. That would cause unnecessary
// work and inaccuracy of metrics.
//
// - When an already-processed ad is seen, the indexer can abandon the
// remainder of the chain being processed, instead of skipping ads for
// specific providers on a mixed provider chain.
adsGroupedByProvider := map[peer.ID][]adInfo{}
for _, c := range syncFinishedEvent.SyncedCids {
// Group the CIDs by the provider. Most of the time a publisher will
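As a companion to step 1 above, a hedged sketch of the per-provider grouping (the real code above groups adInfo values read from each advertisement; providerOf is a hypothetical lookup standing in for that):

```go
package ingestsketch

import (
	"github.com/ipfs/go-cid"
	"github.com/libp2p/go-libp2p/core/peer"
)

// groupByProvider buckets synced ad CIDs by the content provider named in
// each advertisement, keeping the newest-first order within each bucket.
// Serializing later work per provider is what lets the indexer skip
// already-processed ads without racing another chain for the same provider.
func groupByProvider(syncedCids []cid.Cid, providerOf func(cid.Cid) peer.ID) map[peer.ID][]cid.Cid {
	grouped := make(map[peer.ID][]cid.Cid)
	for _, c := range syncedCids {
		p := providerOf(c)
		grouped[p] = append(grouped[p], c)
	}
	return grouped
}
```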
@@ -918,7 +954,8 @@ func (ing *Ingester) runIngestStep(syncFinishedEvent dagsync.SyncFinished) {
})
}

// 2. For each provider put the ad stack to the worker msg channel.
// 2. For each provider put the ad stack to the worker msg channel. Each ad
// stack contains ads for a single provider, from a single publisher.
for p, adInfos := range adsGroupedByProvider {
ing.providersBeingProcessedMu.Lock()
if _, ok := ing.providersBeingProcessed[p]; !ok {
@@ -933,19 +970,24 @@ func (ing *Ingester) runIngestStep(syncFinishedEvent dagsync.SyncFinished) {

oldAssignment := wa.Swap(workerAssignment{
adInfos: adInfos,
publisher: syncFinishedEvent.PeerID,
publisher: publisher,
provider: p,
})

if oldAssignment == nil || oldAssignment.(workerAssignment).none {
// No previous run scheduled a worker to handle this provider, so
// schedule one.
ing.reg.Saw(p)
pushCount := ing.toWorkers.Push(providerID(p))
pushCount := ing.toWorkers.Push(p)
stats.Record(context.Background(),
metrics.AdIngestQueued.M(int64(ing.toWorkers.Length())),
metrics.AdIngestBacklog.M(int64(pushCount)))
}
// If oldAssignment has adInfos, it is not necessary to merge the old
// and new assignments because the new assignment will already have all
// the adInfos that the old assignment does. If the old assignment was
// not processed yet, then the sync that created the new assignment
// would have traversed the same chain as the old. In other words, any
// existing old assignment is always a subset of a new assignment.
}
}

@@ -1004,11 +1046,17 @@ func (ing *Ingester) ingestWorkerLogic(ctx context.Context, provider peer.ID) {

frozen := ing.reg.Frozen()

log := log.With("publisher", assignment.publisher)
// Log provider ID if not the same as publisher ID.
if provider != assignment.publisher {
log = log.With("provider", provider)
}

// Filter out ads that are already processed, and any earlier ads.
splitAtIndex := len(assignment.adInfos)
for i, ai := range assignment.adInfos {
if ctx.Err() != nil {
log.Infow("Ingest worker canceled while ingesting ads", "provider", provider, "err", ctx.Err())
log.Infow("Ingest worker canceled while ingesting ads", "err", ctx.Err())
ing.inEvents <- adProcessedEvent{
publisher: assignment.publisher,
headAdCid: assignment.adInfos[0].cid,
@@ -1039,15 +1087,13 @@ func (ing *Ingester) ingestWorkerLogic(ctx context.Context, provider peer.ID) {
continue
}
// If this is a remove, do not skip this ad, but skip all earlier
// (deeped in chain) ads with the same context ID.
// (deeper in chain) ads with the same context ID and provider ID.
if ai.ad.IsRm {
rmCtxID[ctxIdStr] = struct{}{}
continue
}
}

log := log.With("publisher", assignment.publisher)

log.Infow("Running worker on ad stack", "headAdCid", assignment.adInfos[0].cid, "numAdsToProcess", splitAtIndex)
var count int
for i := splitAtIndex - 1; i >= 0; i-- {
@@ -1056,7 +1102,7 @@
count++

if ctx.Err() != nil {
log.Infow("Ingest worker canceled while processing ads", "provider", provider, "err", ctx.Err())
log.Infow("Ingest worker canceled while processing ads", "err", ctx.Err())
ing.inEvents <- adProcessedEvent{
publisher: assignment.publisher,
headAdCid: assignment.adInfos[0].cid,
@@ -1121,7 +1167,7 @@
"progress", fmt.Sprintf("%d of %d", count, splitAtIndex),
"lag", lag)

err := ing.ingestAd(assignment.publisher, ai.cid, ai.ad, ai.resync, frozen, lag)
err := ing.ingestAd(ctx, assignment.publisher, ai.cid, ai.ad, ai.resync, frozen, lag)
if err == nil {
// No error at all, this ad was processed successfully.
stats.Record(context.Background(), metrics.AdIngestSuccessCount.M(1))
@@ -1189,16 +1235,16 @@
}
}

func (ing *Ingester) handlePendingAnnounce(ctx context.Context, pid peer.ID) {
func (ing *Ingester) handlePendingAnnounce(ctx context.Context, pubID peer.ID) {
if ctx.Err() != nil {
return
}
log := log.With("provider", pid)
log := log.With("publisher", pubID)
// Process pending announce request if any.
// Note that the pending announce is deleted regardless of whether it was
// successfully processed or not, because the cause of failure may be
// non-recoverable (e.g. an address change) and not removing it would block
// processing of future pending announces.
v, found := ing.providersPendingAnnounce.LoadAndDelete(pid)
v, found := ing.providersPendingAnnounce.LoadAndDelete(pubID)
if !found {
return
}