Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reset read deadline after reading deal proposal message #1479

Merged
merged 2 commits into from
May 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ require (
github.com/zondax/hid v0.9.1 // indirect
github.com/zondax/ledger-go v0.12.1 // indirect
go.uber.org/dig v1.15.0 // indirect
go.uber.org/zap v1.24.0 // indirect
go.uber.org/zap v1.24.0
go4.org v0.0.0-20200411211856-f5505b9728dd // indirect
golang.org/x/mod v0.7.0 // indirect
golang.org/x/net v0.7.0 // indirect
Expand Down
91 changes: 64 additions & 27 deletions storagemarket/lp2pimpl/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"go.uber.org/zap"
)

var log = logging.Logger("boost-net")
Expand All @@ -31,9 +32,19 @@ var propLog = logging.Logger("boost-prop")
const DealProtocolv120ID = "/fil/storage/mk/1.2.0"
const DealProtocolv121ID = "/fil/storage/mk/1.2.1"
const DealStatusV12ProtocolID = "/fil/storage/status/1.2.0"

// The time limit to read a message from the client when the client opens a stream
const providerReadDeadline = 10 * time.Second

// The time limit to write a response to the client
const providerWriteDeadline = 10 * time.Second
const clientReadDeadline = 10 * time.Second

// The time limit to wait for the provider to send a response to a client's request.
// This includes the time it takes for the provider to process the request and
// send a response.
const clientReadDeadline = 60 * time.Second
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI: extrapolating from my logs I went for 90 in ♠️
Not advocating a change in Boost; just sharing that I've seen scary times in an SP log.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok thanks, that's good to know.
I think separately we should look into why third-party tools are taking a long time to respond.


// The time limit to write a message to the provider
const clientWriteDeadline = 10 * time.Second

// DealClientOption is an option for configuring the libp2p storage deal client
Expand Down Expand Up @@ -195,34 +206,44 @@ func (p *DealProvider) Stop() {

// Called when the client opens a libp2p stream with a new deal proposal
func (p *DealProvider) handleNewDealStream(s network.Stream) {
defer s.Close()
start := time.Now()
reqLogUuid := uuid.New()
reqLog := log.With("reqlog-uuid", reqLogUuid.String(), "client-peer", s.Conn().RemotePeer())
reqLog.Debugw("new deal proposal request")

defer func() {
err := s.Close()
if err != nil {
reqLog.Infow("closing stream", "err", err)
}
reqLog.Debugw("handled deal proposal request", "duration", time.Since(start).String())
}()

// Set a deadline on reading from the stream so it doesn't hang
_ = s.SetReadDeadline(time.Now().Add(providerReadDeadline))
defer s.SetReadDeadline(time.Time{}) // nolint

// Read the deal proposal from the stream
var proposal types.DealParams
err := proposal.UnmarshalCBOR(s)
_ = s.SetReadDeadline(time.Time{}) // Clear read deadline so conn doesn't get closed
if err != nil {
log.Warnw("reading storage deal proposal from stream", "err", err)
reqLog.Warnw("reading storage deal proposal from stream", "err", err)
return
}

log.Infow("received deal proposal", "id", proposal.DealUUID, "client-peer", s.Conn().RemotePeer())
reqLog = reqLog.With("id", proposal.DealUUID)
reqLog.Infow("received deal proposal")

// Start executing the deal.
// Note: This method just waits for the deal to be accepted, it doesn't
// wait for deal execution to complete.
startExec := time.Now()
res, err := p.prov.ExecuteDeal(context.Background(), &proposal, s.Conn().RemotePeer())
reqLog.Debugw("processed deal proposal accept")
if err != nil {
log.Warnw("deal proposal failed", "id", proposal.DealUUID, "err", err, "reason", res.Reason)
reqLog.Warnw("deal proposal failed", "err", err, "reason", res.Reason)
}

// Set a deadline on writing to the stream so it doesn't hang
_ = s.SetWriteDeadline(time.Now().Add(providerWriteDeadline))
defer s.SetWriteDeadline(time.Time{}) // nolint

// Log the response
propLog.Infow("send deal proposal response",
"id", proposal.DealUUID,
Expand All @@ -238,44 +259,60 @@ func (p *DealProvider) handleNewDealStream(s network.Stream) {
"start epoch", proposal.ClientDealProposal.Proposal.StartEpoch,
"end epoch", proposal.ClientDealProposal.Proposal.EndEpoch,
"price per epoch", proposal.ClientDealProposal.Proposal.StoragePricePerEpoch,
"duration", time.Since(startExec).String(),
)
_ = p.plDB.InsertLog(p.ctx, proposal, res.Accepted, res.Reason) //nolint:errcheck

// Set a deadline on writing to the stream so it doesn't hang
_ = s.SetWriteDeadline(time.Now().Add(providerWriteDeadline))
defer s.SetWriteDeadline(time.Time{}) // nolint

// Write the response to the client
err = cborutil.WriteCborRPC(s, &types.DealResponse{Accepted: res.Accepted, Message: res.Reason})
if err != nil {
log.Warnw("writing deal response", "id", proposal.DealUUID, "err", err)
return
reqLog.Warnw("writing deal response", "err", err)
}
}

func (p *DealProvider) handleNewDealStatusStream(s network.Stream) {
defer s.Close()
start := time.Now()
reqLogUuid := uuid.New()
reqLog := log.With("reqlog-uuid", reqLogUuid.String(), "client-peer", s.Conn().RemotePeer())
reqLog.Debugw("new deal status request")

defer func() {
err := s.Close()
if err != nil {
reqLog.Infow("closing stream", "err", err)
}
reqLog.Debugw("handled deal status request", "duration", time.Since(start).String())
}()

// Read the deal status request from the stream
_ = s.SetReadDeadline(time.Now().Add(providerReadDeadline))
defer s.SetReadDeadline(time.Time{}) // nolint

var req types.DealStatusRequest
err := req.UnmarshalCBOR(s)
_ = s.SetReadDeadline(time.Time{}) // Clear read deadline so conn doesn't get closed
if err != nil {
log.Warnw("reading deal status request from stream", "err", err)
reqLog.Warnw("reading deal status request from stream", "err", err)
return
}
log.Debugw("received deal status request", "id", req.DealUUID, "client-peer", s.Conn().RemotePeer())
reqLog = reqLog.With("id", req.DealUUID)
reqLog.Debugw("received deal status request")

resp := p.getDealStatus(req)
resp := p.getDealStatus(req, reqLog)
reqLog.Debugw("processed deal status request")

// Set a deadline on writing to the stream so it doesn't hang
_ = s.SetWriteDeadline(time.Now().Add(providerWriteDeadline))
defer s.SetWriteDeadline(time.Time{}) // nolint

if err := cborutil.WriteCborRPC(s, &resp); err != nil {
log.Errorw("failed to write deal status response", "err", err)
return
reqLog.Errorw("failed to write deal status response", "err", err)
}
}

func (p *DealProvider) getDealStatus(req types.DealStatusRequest) types.DealStatusResponse {
func (p *DealProvider) getDealStatus(req types.DealStatusRequest, reqLog *zap.SugaredLogger) types.DealStatusResponse {
errResp := func(err string) types.DealStatusResponse {
return types.DealStatusResponse{DealUUID: req.DealUUID, Error: err}
}
Expand All @@ -286,42 +323,42 @@ func (p *DealProvider) getDealStatus(req types.DealStatusRequest) types.DealStat
}

if err != nil {
log.Errorw("failed to fetch deal status", "err", err)
reqLog.Errorw("failed to fetch deal status", "err", err)
return errResp("failed to fetch deal status")
}

// verify request signature
uuidBytes, err := req.DealUUID.MarshalBinary()
if err != nil {
log.Errorw("failed to serialize request deal UUID", "err", err)
reqLog.Errorw("failed to serialize request deal UUID", "err", err)
return errResp("failed to serialize request deal UUID")
}

clientAddr := pds.ClientDealProposal.Proposal.Client
addr, err := p.fullNode.StateAccountKey(p.ctx, clientAddr, chaintypes.EmptyTSK)
if err != nil {
log.Errorw("failed to get account key for client addr", "client", clientAddr.String(), "err", err)
reqLog.Errorw("failed to get account key for client addr", "client", clientAddr.String(), "err", err)
msg := fmt.Sprintf("failed to get account key for client addr %s", clientAddr.String())
return errResp(msg)
}

err = sigs.Verify(&req.Signature, addr, uuidBytes)
if err != nil {
log.Warnw("signature verification failed", "err", err)
reqLog.Warnw("signature verification failed", "err", err)
return errResp("signature verification failed")
}

signedPropCid, err := pds.SignedProposalCid()
if err != nil {
log.Errorw("getting signed proposal cid", "err", err)
reqLog.Errorw("getting signed proposal cid", "err", err)
return errResp("getting signed proposal cid")
}

bts := p.prov.NBytesReceived(req.DealUUID)

si, err := p.spApi.SectorsStatus(p.ctx, pds.SectorID, false)
if err != nil {
log.Errorw("getting sector status from sealer", "err", err)
reqLog.Errorw("getting sector status from sealer", "err", err)
return errResp("getting sector status from sealer")
}

Expand Down