Skip to content

Commit

Permalink
Handle context cancelation during ingestion (#2592)
Browse files Browse the repository at this point in the history
In some places, context cancelation was not handled during ingestion. This caused the indexer to be unresponsive to a shutdown request. This change checks for context cancelation in the select statements that read from or write to channels, so that a blocked channel operation no longer prevents shutdown.

Adds additional debug logging.
  • Loading branch information
gammazero authored Apr 3, 2024
1 parent 021ab38 commit 6bbb360
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 14 deletions.
25 changes: 17 additions & 8 deletions internal/ingest/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -835,7 +835,6 @@ func (ing *Ingester) RunWorkers(n int) {
// assignments are processed preferentially over new advertisement chains.
func (ing *Ingester) ingestWorker(ctx context.Context, syncFinishedEvents <-chan dagsync.SyncFinished, wkrNum int) {
log := log.With("worker", wkrNum)

log.Info("started ingest worker")
defer ing.waitForWorkers.Done()

Expand Down Expand Up @@ -921,7 +920,7 @@ func (ing *Ingester) processRawAdChain(ctx context.Context, syncFinished dagsync
}

publisher := syncFinished.PeerID
log := log.With("publisher", publisher)
log := log.With("publisher", publisher, "worker", wkrNum)
log.Infow("Advertisement chain synced", "length", syncFinished.Count)

var rmCount int64
Expand Down Expand Up @@ -1066,13 +1065,14 @@ func (ing *Ingester) ingestWorkerLogic(ctx context.Context, provider, publisher
count++

if ctx.Err() != nil {
log.Infow("Ingest worker canceled while processing ads", "err", ctx.Err())
log.Infow("Ingest canceled while processing ads", "err", ctx.Err())
ing.inEvents <- adProcessedEvent{
publisher: publisher,
headAdCid: headAdCid,
adCid: ai.cid,
err: ctx.Err(),
}
log.Debug("Sent ad processed event for canceled processing")
return
}

Expand Down Expand Up @@ -1103,6 +1103,7 @@ func (ing *Ingester) ingestWorkerLogic(ctx context.Context, provider, publisher
headAdCid: headAdCid,
adCid: ai.cid,
}
log.Debug("Sent ad processed event for skipped ad")
continue
}

Expand All @@ -1112,7 +1113,7 @@ func (ing *Ingester) ingestWorkerLogic(ctx context.Context, provider, publisher
"progress", fmt.Sprintf("%d of %d", count, total),
"lag", lag)

hasEnts, fromMirror, err := ing.ingestAd(ctx, publisher, ai.cid, ai.resync, frozen, lag, headProvider)
hasEnts, fromMirror, err := ing.ingestAd(ctx, publisher, ai.cid, ai.resync, frozen, lag, headProvider, wkrNum)
if err != nil {
var adIngestErr adIngestError
if errors.As(err, &adIngestErr) {
Expand Down Expand Up @@ -1141,11 +1142,13 @@ func (ing *Ingester) ingestWorkerLogic(ctx context.Context, provider, publisher

// If err still not nil, then this is a non-permanent type of error.
if err != nil {
errText := err.Error()
if errors.Is(err, errInternal) {
errText = errInternal.Error()
if !errors.Is(err, context.Canceled) {
errText := err.Error()
if errors.Is(err, errInternal) {
errText = errInternal.Error()
}
ing.reg.SetLastError(provider, fmt.Errorf("error while ingesting ad %s: %s", ai.cid, errText))
}
ing.reg.SetLastError(provider, fmt.Errorf("error while ingesting ad %s: %s", ai.cid, errText))
log.Errorw("Error while ingesting ad. Bailing early, not ingesting later ads.", "adCid", ai.cid, "err", err, "adsLeftToProcess", i+1)
// Tell anyone waiting that the sync finished for this head because
// of error. TODO(mm) would be better to propagate the error.
Expand All @@ -1155,6 +1158,7 @@ func (ing *Ingester) ingestWorkerLogic(ctx context.Context, provider, publisher
adCid: ai.cid,
err: err,
}
log.Debug("Sent ad processed event with error")
return
}
} else {
Expand All @@ -1171,6 +1175,7 @@ func (ing *Ingester) ingestWorkerLogic(ctx context.Context, provider, publisher

if putMirror {
if fromMirror && ing.mirror.readWriteSame() {
log.Debug("Removing temporary ad data")
// If ad data retrieved from same mirror that is being written
// to, then only clean up the data from local datastore, but do
// not rewrite it to the mirror.
Expand All @@ -1179,6 +1184,7 @@ func (ing *Ingester) ingestWorkerLogic(ctx context.Context, provider, publisher
log.Errorw("Cannot remove advertisement data from datastore", "err", err)
}
} else {
log.Debug("Writing ad to CAR mirror")
// If resyncing and not overwriting, then do not overwrite the
// destination file if it already exists.
preventOverwrite := ai.resync && !ing.overwriteMirrorOnResync
Expand All @@ -1195,11 +1201,14 @@ func (ing *Ingester) ingestWorkerLogic(ctx context.Context, provider, publisher
}
}

log.Debug("Done processing ad")

// Distribute the adProcessedEvent notices to waiting Sync calls.
ing.inEvents <- adProcessedEvent{
publisher: publisher,
headAdCid: headAdCid,
adCid: ai.cid,
}
log.Debug("Sent ad processed event")
}
}
25 changes: 19 additions & 6 deletions internal/ingest/linksystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,8 @@ func verifyAdvertisement(n ipld.Node, reg *registry.Registry) (peer.ID, error) {
// is the source of the indexed content, the provider is where content can be
// retrieved from. It is the provider ID that needs to be stored by the
// indexer.
func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid cid.Cid, resync, frozen bool, lag int, headProvider peer.AddrInfo) (bool, bool, error) {
log := log.With("publisher", publisherID, "adCid", adCid)
func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid cid.Cid, resync, frozen bool, lag int, headProvider peer.AddrInfo, wkrNum int) (bool, bool, error) {
log := log.With("publisher", publisherID, "adCid", adCid, "worker", wkrNum)

ad, err := ing.loadAd(adCid)
if err != nil {
Expand Down Expand Up @@ -202,6 +202,7 @@ func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid ci

var extendedProviders *registry.ExtendedProviders
if ad.ExtendedProvider != nil {
log.Debug("Advertisement has extended providers")
if ad.IsRm {
return false, false, adIngestError{adIngestIndexerErr, fmt.Errorf("rm ads can not have extended providers")}
}
Expand Down Expand Up @@ -263,7 +264,7 @@ func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid ci
log = log.With("contextID", base64.StdEncoding.EncodeToString(ad.ContextID))

if ad.IsRm {
log.Infow("Advertisement is for removal by context id")
log.Info("Advertisement is for removal by context id")
err = ing.indexer.RemoveProviderContext(providerID, ad.ContextID)
if err != nil {
return false, false, adIngestError{adIngestIndexerErr, fmt.Errorf("%w: failed to remove provider context: %w", errInternal, err)}
Expand All @@ -277,6 +278,7 @@ func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid ci
if ad.Entries != schema.NoEntries {
return false, false, adIngestError{adIngestMalformedErr, fmt.Errorf("advertisement missing metadata")}
}
log.Info("Advertisement is for removal by context id")
return false, false, nil
}

Expand All @@ -290,9 +292,9 @@ func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid ci
}

if frozen {
log.Infow("Indexer frozen, advertisement only updates metadata")
log.Info("Indexer frozen, advertisement only updates metadata")
} else {
log.Infow("Advertisement is metadata update only")
log.Info("Advertisement is for metadata update only")
}
err = ing.indexer.Put(value)
if err != nil {
Expand All @@ -306,6 +308,8 @@ func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid ci
return false, false, adIngestError{adIngestMalformedErr, errors.New("advertisement entries link is undefined")}
}

log.Debug("Advertisement has entries to sync")

if ing.syncTimeout != 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, ing.syncTimeout)
Expand All @@ -316,6 +320,7 @@ func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid ci

// If using a CAR reader, then try to get the advertisement CAR file first.
if ing.mirror.canRead() {
log.Debug("Attempting to fetch entries from CAR mirror")
mhCount, err = ing.ingestEntriesFromCar(ctx, ad, providerID, adCid, entriesCid, log)
hasEnts := mhCount != 0
// If entries data successfully read from CAR file.
Expand All @@ -324,6 +329,7 @@ func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid ci
return hasEnts, true, nil
}
if !errors.Is(err, fs.ErrNotExist) {
log.Errorw("Cannot get advertisement from CAR mirror", "err", err)
var adIngestErr adIngestError
if errors.As(err, &adIngestErr) {
switch adIngestErr.state {
Expand All @@ -336,10 +342,15 @@ func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid ci
// serving entries data.
return hasEnts, false, err
}
} else if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return hasEnts, false, err
}
log.Errorw("Cannot get advertisement from car store", "err", err)
// If any other error, proceed and try to fetch from publisher.
} else {
log.Debug("Advertisement not found in CAR mirror")
}
}
log.Debug("Fetching entries from publisher")

// The ad.Entries link can point to either a chain of EntryChunks or a
// HAMT. Sync the very first entry so that we can check which type it is.
Expand Down Expand Up @@ -373,8 +384,10 @@ func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid ci
}

if isHAMT(node) {
log.Info("syncing hamt entries")
mhCount, err = ing.ingestHamtFromPublisher(ctx, ad, publisherID, providerID, entriesCid, log)
} else {
log.Info("syncing entries")
mhCount, err = ing.ingestEntriesFromPublisher(ctx, ad, publisherID, providerID, entriesCid, log)
}
return mhCount != 0, false, err
Expand Down

0 comments on commit 6bbb360

Please sign in to comment.