Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry fetch request when libp2p stream is reset #140

Merged
merged 2 commits into from
Dec 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion dagsync/ipnisync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
headschema "github.com/ipni/go-libipni/dagsync/ipnisync/head"
"github.com/ipni/go-libipni/maurl"
"github.com/ipni/go-libipni/mautil"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
libp2phttp "github.com/libp2p/go-libp2p/p2p/http"
"github.com/multiformats/go-multihash"
Expand Down Expand Up @@ -190,7 +191,7 @@ func (s *Syncer) GetHead(ctx context.Context) (cid.Cid, error) {
if s.peerID == "" {
log.Warn("Cannot verify publisher signature without peer ID")
} else if signerID != s.peerID {
return cid.Undef, errors.New("found head signed by an unexpected peer")
return cid.Undef, fmt.Errorf("found head signed by an unexpected peer, peerID: %s, signed-by: %s", s.peerID.String(), signerID.String())
}

// TODO: Check that the returned topic, if any, matches the expected topic.
Expand Down Expand Up @@ -284,6 +285,8 @@ func (s *Syncer) walkFetch(ctx context.Context, rootCid cid.Cid, sel selector.Se
func (s *Syncer) fetch(ctx context.Context, rsrc string, cb func(io.Reader) error) error {
nextURL:
fetchURL := s.rootURL.JoinPath(rsrc)
var doneRetry bool
retry:
req, err := http.NewRequestWithContext(ctx, "GET", fetchURL.String(), nil)
if err != nil {
return err
Expand All @@ -300,6 +303,12 @@ nextURL:
}
goto nextURL
}
if !doneRetry && errors.Is(err, network.ErrReset) {
log.Errorw("stream reset err, retrying", "publisher", s.peerID, "url", fetchURL.String())
// Only retry the same fetch once.
doneRetry = true
goto retry
}
return fmt.Errorf("fetch request failed: %w", err)
}
defer resp.Body.Close()
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/ipfs/go-ipld-format v0.6.0
github.com/ipfs/go-log/v2 v2.5.1
github.com/ipld/go-ipld-prime v0.21.0
github.com/libp2p/go-libp2p v0.32.1
github.com/libp2p/go-libp2p v0.32.2
github.com/libp2p/go-libp2p-pubsub v0.10.0
github.com/libp2p/go-msgio v0.3.0
github.com/mr-tron/base58 v1.2.0
Expand Down Expand Up @@ -112,7 +112,7 @@ require (
github.com/prometheus/procfs v0.9.0 // indirect
github.com/quic-go/qpack v0.4.0 // indirect
github.com/quic-go/qtls-go1-20 v0.3.4 // indirect
github.com/quic-go/quic-go v0.39.3 // indirect
github.com/quic-go/quic-go v0.39.4 // indirect
github.com/quic-go/webtransport-go v0.6.0 // indirect
github.com/raulk/go-watchdog v1.3.0 // indirect
github.com/russross/blackfriday/v2 v2.0.1 // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -300,8 +300,8 @@ github.com/libp2p/go-cidranger v1.1.0 h1:ewPN8EZ0dd1LSnrtuwd4709PXVcITVeuwbag38y
github.com/libp2p/go-cidranger v1.1.0/go.mod h1:KWZTfSr+r9qEo9OkI9/SIEeAtw+NNoU0dXIXt15Okic=
github.com/libp2p/go-flow-metrics v0.1.0 h1:0iPhMI8PskQwzh57jB9WxIuIOQ0r+15PChFGkx3Q3WM=
github.com/libp2p/go-flow-metrics v0.1.0/go.mod h1:4Xi8MX8wj5aWNDAZttg6UPmc0ZrnFNsMtpsYUClFtro=
github.com/libp2p/go-libp2p v0.32.1 h1:wy1J4kZIZxOaej6NveTWCZmHiJ/kY7GoAqXgqNCnPps=
github.com/libp2p/go-libp2p v0.32.1/go.mod h1:hXXC3kXPlBZ1eu8Q2hptGrMB4mZ3048JUoS4EKaHW5c=
github.com/libp2p/go-libp2p v0.32.2 h1:s8GYN4YJzgUoyeYNPdW7JZeZ5Ee31iNaIBfGYMAY4FQ=
github.com/libp2p/go-libp2p v0.32.2/go.mod h1:E0LKe+diV/ZVJVnOJby8VC5xzHF0660osg71skcxJvk=
github.com/libp2p/go-libp2p-asn-util v0.3.0 h1:gMDcMyYiZKkocGXDQ5nsUQyquC9+H+iLEQHwOCZ7s8s=
github.com/libp2p/go-libp2p-asn-util v0.3.0/go.mod h1:B1mcOrKUE35Xq/ASTmQ4tN3LNzVVaMNmq2NACuqyB9w=
github.com/libp2p/go-libp2p-pubsub v0.10.0 h1:wS0S5FlISavMaAbxyQn3dxMOe2eegMfswM471RuHJwA=
Expand Down Expand Up @@ -423,8 +423,8 @@ github.com/quic-go/qpack v0.4.0 h1:Cr9BXA1sQS2SmDUWjSofMPNKmvF6IiIfDRmgU0w1ZCo=
github.com/quic-go/qpack v0.4.0/go.mod h1:UZVnYIfi5GRk+zI9UMaCPsmZ2xKJP7XBUvVyT1Knj9A=
github.com/quic-go/qtls-go1-20 v0.3.4 h1:MfFAPULvst4yoMgY9QmtpYmfij/em7O8UUi+bNVm7Cg=
github.com/quic-go/qtls-go1-20 v0.3.4/go.mod h1:X9Nh97ZL80Z+bX/gUXMbipO6OxdiDi58b/fMC9mAL+k=
github.com/quic-go/quic-go v0.39.3 h1:o3YB6t2SR+HU/pgwF29kJ6g4jJIJEwEZ8CKia1h1TKg=
github.com/quic-go/quic-go v0.39.3/go.mod h1:T09QsDQWjLiQ74ZmacDfqZmhY/NLnw5BC40MANNNZ1Q=
github.com/quic-go/quic-go v0.39.4 h1:PelfiuG7wXEffUT2yceiqz5V6Pc0TA5ruOd1LcmFc1s=
github.com/quic-go/quic-go v0.39.4/go.mod h1:T09QsDQWjLiQ74ZmacDfqZmhY/NLnw5BC40MANNNZ1Q=
github.com/quic-go/webtransport-go v0.6.0 h1:CvNsKqc4W2HljHJnoT+rMmbRJybShZ0YPFDD3NxaZLY=
github.com/quic-go/webtransport-go v0.6.0/go.mod h1:9KjU4AEBqEQidGHNDkZrb8CAa1abRaosM2yGOyiikEc=
github.com/raulk/go-watchdog v1.3.0 h1:oUmdlHxdkXRJlwfG0O9omj8ukerm8MEQavSiDTEtBsk=
Expand Down