Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Block on Proof Courier Service Connection Attempt #1203

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion itest/tapd_harness.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,8 @@ func newTapdHarness(t *testing.T, ht *harnessTest, cfg tapdConfig,
BackoffCfg: &hashmailBackoffCfg,
}
finalCfg.UniverseRpcCourier = &proof.UniverseRpcCourierCfg{
BackoffCfg: &universeRpcBackoffCfg,
BackoffCfg: &universeRpcBackoffCfg,
ServiceRequestTimeout: 50 * time.Millisecond,
}

switch typedProofCourier := (opts.proofCourier).(type) {
Expand Down
2 changes: 2 additions & 0 deletions itest/test_harness.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,8 @@ func setupHarnesses(t *testing.T, ht *harnessTest,
// If nothing is specified, we use the universe RPC proof courier by
// default.
default:
t.Logf("Address of universe server as proof courier: %v",
universeServer.service.rpcHost())
proofCourier = universeServer
}

Expand Down
49 changes: 39 additions & 10 deletions proof/courier.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ func serverDialOpts() ([]grpc.DialOption, error) {
tlsConfig := tls.Config{InsecureSkipVerify: true}
transportCredentials := credentials.NewTLS(&tlsConfig)
opts = append(opts, grpc.WithTransportCredentials(transportCredentials))
opts = append(opts, grpc.WithBlock())
ffranr marked this conversation as resolved.
Show resolved Hide resolved

return opts, nil
}
Expand Down Expand Up @@ -654,7 +655,7 @@ func (b *BackoffHandler) Exec(ctx context.Context, proofLocator Locator,
if err != nil {
return err
}
log.Infof("Starting proof transfer backoff procedure for proof "+
log.Infof("Starting proof transfer backoff procedure "+
"(transfer_type=%s, locator_hash=%x)", transferType,
locatorHash[:])

Expand Down Expand Up @@ -710,7 +711,7 @@ func (b *BackoffHandler) Exec(ctx context.Context, proofLocator Locator,
)
subscriberEvent(waitEvent)

log.Debugf("Proof delivery failed with error. Backing off. "+
log.Debugf("Proof transfer failed with error. Backing off. "+
"(transfer_type=%s, locator_hash=%x, backoff=%s, "+
"attempt=%d): %v",
transferType, locatorHash[:], backoff, i, errExec)
Expand Down Expand Up @@ -742,7 +743,7 @@ func (b *BackoffHandler) wait(ctx context.Context, wait time.Duration) error {
case <-time.After(wait):
return nil
case <-ctx.Done():
return fmt.Errorf("context canceled")
return fmt.Errorf("back off handler context done")
}
}

Expand Down Expand Up @@ -1154,10 +1155,17 @@ func (h *HashMailCourier) SetSubscribers(
var _ Courier = (*HashMailCourier)(nil)

// UniverseRpcCourierCfg is the config for the universe RPC proof courier.
//
// nolint:lll
type UniverseRpcCourierCfg struct {
// BackoffCfg configures the behaviour of the proof delivery
// functionality.
BackoffCfg *BackoffCfg

// ServiceRequestTimeout defines the maximum duration we'll wait for
// a courier service to handle our outgoing request during a connection
// attempt, or when delivering or retrieving a proof.
ServiceRequestTimeout time.Duration `long:"servicerequestimeout" description:"The maximum duration we'll wait for a courier service to handle our outgoing request during a connection attempt, or when delivering or retrieving a proof."`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't the hashmail courier / every courier implementation have a similar timeout? And then use it on initialization.

For hashmail I think that would be in NewHashMailBox, you could add the same child ctx object as you're doing for the Universe courier.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you're right! Nice catch. Hashmail is also blocking now, so it would be good to have that timeout.

}

// UniverseRpcCourier is a universe RPC proof courier service handle. It
Expand Down Expand Up @@ -1351,18 +1359,29 @@ func (c *UniverseRpcCourier) DeliverProof(ctx context.Context,
deliverFunc := func() error {
// Connect to the courier service if a connection hasn't
// been established yet.
err := c.ensureConnect(ctx)
subCtx, subCtxCancel := context.WithTimeout(
ctx, c.cfg.ServiceRequestTimeout,
)
defer subCtxCancel()

err := c.ensureConnect(subCtx)
if err != nil {
return fmt.Errorf("unable to connect to "+
"courier service during delivery "+
"attempt: %w", err)
}

// Submit proof to courier.
_, err = c.client.InsertProof(ctx, &unirpc.AssetProof{
subCtx, subCtxCancel = context.WithTimeout(
ctx, c.cfg.ServiceRequestTimeout,
)
defer subCtxCancel()

assetProof := unirpc.AssetProof{
Key: &universeKey,
AssetLeaf: &assetLeaf,
})
}
_, err = c.client.InsertProof(subCtx, &assetProof)
if err != nil {
return fmt.Errorf("error inserting proof "+
"into universe courier service: %w",
Expand Down Expand Up @@ -1415,15 +1434,25 @@ func (c *UniverseRpcCourier) ReceiveProof(ctx context.Context,
receiveFunc := func() error {
// Connect to the courier service if a connection hasn't
// been established yet.
err := c.ensureConnect(ctx)
subCtx, subCtxCancel := context.WithTimeout(
ctx, c.cfg.ServiceRequestTimeout,
)
defer subCtxCancel()

err := c.ensureConnect(subCtx)
if err != nil {
return fmt.Errorf("unable to connect to "+
"courier service during delivery "+
"attempt: %w", err)
"universe RPC courier service during "+
"recieve attempt: %w", err)
}

// Retrieve proof from courier.
resp, err := c.client.QueryProof(ctx, &universeKey)
subCtx, subCtxCancel = context.WithTimeout(
ctx, c.cfg.ServiceRequestTimeout,
)
defer subCtxCancel()

resp, err := c.client.QueryProof(subCtx, &universeKey)
if err != nil {
return fmt.Errorf("error retrieving proof "+
"from universe courier service: %w",
Expand Down
5 changes: 5 additions & 0 deletions sample-tapd.conf
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@
; {s, m, h}.
; custodianproofretrievaldelay=5s

; The maximum duration we'll wait for a proof courier service to handle our
; outgoing request during a connection attempt, or when delivering or retrieving
; a proof.
; universerpccourier.servicerequestimeout=5s

; Network to run on (mainnet, regtest, testnet, simnet, signet)
; network=testnet

Expand Down
5 changes: 5 additions & 0 deletions tapcfg/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ const (
// use for waiting for a receiver to acknowledge a proof transfer.
defaultProofTransferReceiverAckTimeout = time.Hour * 6

// defaultProofCourierServiceResponseTimeout is the default timeout
// we'll use for waiting for a response from the proof courier service.
defaultProofCourierServiceResponseTimeout = time.Second * 5

// defaultUniverseSyncInterval is the default interval that we'll use
// to sync Universe state with the federation.
defaultUniverseSyncInterval = time.Minute * 10
Expand Down Expand Up @@ -423,6 +427,7 @@ func DefaultConfig() Config {
InitialBackoff: defaultProofTransferInitialBackoff,
MaxBackoff: defaultProofTransferMaxBackoff,
},
ServiceRequestTimeout: defaultProofCourierServiceResponseTimeout,
},
CustodianProofRetrievalDelay: defaultProofRetrievalDelay,
Universe: &UniverseConfig{
Expand Down
26 changes: 26 additions & 0 deletions tapfreighter/chain_porter.go
Original file line number Diff line number Diff line change
Expand Up @@ -849,6 +849,10 @@ func (p *ChainPorter) transferReceiverProof(pkg *sendPackage) error {
// those that do not.
reportProofTransfers(notDeliveringOutputs, pendingDeliveryOutputs)

// incompleteDelivery is set to true if any proof delivery attempts fail
// and exceed the maximum backoff limit.
incompleteDelivery := false

deliver := func(ctx context.Context, out TransferOutput) error {
key := out.ScriptKey.PubKey

Expand Down Expand Up @@ -910,6 +914,13 @@ func (p *ChainPorter) transferReceiverProof(pkg *sendPackage) error {
// later.
var backoffExecErr *proof.BackoffExecError
if errors.As(err, &backoffExecErr) {
log.Debugf("Exceeded backoff limit for proof delivery "+
"(script_key=%x, proof_courier_addr=%s)",
key.SerializeCompressed(), out.ProofCourierAddr)

// Set the incomplete delivery flag to true so that we
// can retry the proof transfer state later.
incompleteDelivery = true
return nil
}
if err != nil {
Expand All @@ -927,6 +938,10 @@ func (p *ChainPorter) transferReceiverProof(pkg *sendPackage) error {
"confirmation: %w", err)
}

log.Infof("Transfer output proof delivery complete "+
"(anchor_txid=%v, output_position=%d)",
pkg.OutboundPkg.AnchorTx.TxHash(), out.Position)

return nil
}

Expand Down Expand Up @@ -969,6 +984,17 @@ func (p *ChainPorter) transferReceiverProof(pkg *sendPackage) error {
return firstErr
}

// If the delivery is incomplete, we'll return early so that we can
// retry proof transfer later.
if incompleteDelivery {
log.Debugf("Proof delivery incomplete, will retry executing "+
"the proof transfer state (transfer_anchor_tx_hash=%v)",
pkg.OutboundPkg.AnchorTx.TxHash())

// Return here before setting the transfer to complete.
return nil
}

// At this point, the transfer is fully finalised and successful:
// - The anchoring transaction has been confirmed on-chain.
// - The proof(s) have been delivered to the receiver(s).
Expand Down
Loading