Skip to content

Commit

Permalink
Handle additional errors as permanent (#240)
Browse files Browse the repository at this point in the history
* update version
* Fix issue #241
* Handle "content not found" error
  • Loading branch information
gammazero authored Feb 25, 2022
1 parent 21920c4 commit 0c51f62
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 27 deletions.
27 changes: 27 additions & 0 deletions internal/ingest/error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package ingest

import (
"fmt"
)

type adIngestState string

type adIngestError struct {
state adIngestState
err error
}

const (
adIngestIndexerErr adIngestState = "indexerErr"
adIngestDecodingErr adIngestState = "decodingErr"
adIngestMalformedErr adIngestState = "malformedErr"
adIngestRegisterProviderErr adIngestState = "registerErr"
adIngestSyncEntriesErr adIngestState = "syncEntriesErr"
adIngestContentNotFound adIngestState = "contentNotFound"
// Happens if there is an error during ingest of an entry chunk (rather than fetching it).
adIngestEntryChunkErr adIngestState = "ingestEntryChunkErr"
)

func (e adIngestError) Error() string {
return fmt.Sprintf("%s: %s", e.state, e.err)
}
2 changes: 1 addition & 1 deletion internal/ingest/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -711,7 +711,7 @@ func (ing *Ingester) ingestWorkerLogic(msg toWorkerMsg) {
var adIngestErr adIngestError
if errors.As(err, &adIngestErr) {
switch adIngestErr.state {
case adIngestDecodingErr, adIngestMalformedErr, adIngestEntryChunkErr:
case adIngestDecodingErr, adIngestMalformedErr, adIngestEntryChunkErr, adIngestContentNotFound:
// These error cases are permament. e.g. if we try again later we will hit the same error. So we log and drop this error.
log.Errorw("Skipping ad because of a permanant error", "adCid", ai.cid, "err", err, "errKind", adIngestErr.state)
err = nil
Expand Down
33 changes: 8 additions & 25 deletions internal/ingest/linksystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"io"
"strings"
"time"

indexer "github.com/filecoin-project/go-indexer-core"
Expand Down Expand Up @@ -174,33 +175,12 @@ func (ing *Ingester) ingestEntryChunk(publisher peer.ID, adCid cid.Cid, ad schem
err = ing.indexContentBlock(adCid, ad, publisher, node)
if err != nil {
return fmt.Errorf("failed processing entries for advertisement: %w", err)
} else {
log.Info("Done indexing content in entry block")
ing.signalMetricsUpdate()
}

return nil
}

type adIngestError struct {
state adIngestState
err error
}

type adIngestState string

const (
adIngestIndexerErr adIngestState = "indexerErr"
adIngestDecodingErr adIngestState = "decodingErr"
adIngestMalformedErr adIngestState = "malformedErr"
adIngestRegisterProviderErr adIngestState = "registerErr"
adIngestSyncEntriesErr adIngestState = "syncEntriesErr"
// Happens if there is an error during ingest of an entry chunk (rather than fetching it).
adIngestEntryChunkErr adIngestState = "ingestEntryChunkErr"
)
log.Info("Done indexing content in entry block")
ing.signalMetricsUpdate()

func (e adIngestError) Error() string {
return fmt.Sprintf("%s: %s\n", e.state, e.err)
return nil
}

// ingestAd fetches all the entries for a single advertisement and processes
Expand Down Expand Up @@ -306,6 +286,9 @@ func (ing *Ingester) ingestAd(publisher peer.ID, adCid cid.Cid, ad schema.Advert
}
}))
if err != nil {
if strings.Contains(err.Error(), "datatransfer failed: content not found") {
return adIngestError{adIngestContentNotFound, fmt.Errorf("failed to sync entries: %w", err)}
}
return adIngestError{adIngestSyncEntriesErr, fmt.Errorf("failed to sync entries: %w", err)}
}

Expand All @@ -317,7 +300,7 @@ func (ing *Ingester) ingestAd(publisher peer.ID, adCid cid.Cid, ad schema.Advert
ing.signalMetricsUpdate()

if len(errsIngestingEntryChunks) > 0 {
return adIngestError{adIngestSyncEntriesErr, fmt.Errorf("failed to ingest entry chunks: %v", errsIngestingEntryChunks)}
return adIngestError{adIngestEntryChunkErr, fmt.Errorf("failed to ingest entry chunks: %v", errsIngestingEntryChunks)}
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion version.json
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{
"version": "v0.3.4"
"version": "v0.3.5"
}

0 comments on commit 0c51f62

Please sign in to comment.