Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tapdb: allow batch inserting proofs when syncing universe, remove write lock for RegisterIssuance #449

Merged
merged 13 commits into from
Sep 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/main.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ jobs:
########################
integration-test-postgres:
name: run itests postgres
runs-on: ubuntu-latest
runs-on: ubuntu-20.04
steps:
- name: git checkout
uses: actions/checkout@v3
Expand Down
14 changes: 0 additions & 14 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -179,20 +179,6 @@ itest-only-trace: aperture-dir
rm -rf itest/regtest; date
$(GOTEST) ./itest -v -tags="$(ITEST_TAGS)" $(TEST_FLAGS) $(ITEST_FLAGS) -loglevel=trace -btcdexec=./btcd-itest -logdir=regtest

optional-itest: build-itest optional-itest-only

optional-itest-trace: build-itest optional-itest-only-trace

optional-itest-only: aperture-dir
@$(call print, "Running integration tests with ${backend} backend.")
rm -rf itest/regtest; date
$(GOTEST) ./itest -v -tags="$(ITEST_TAGS)" $(TEST_FLAGS) $(ITEST_FLAGS) -optional -btcdexec=./btcd-itest -logdir=regtest

optional-itest-only-trace: aperture-dir
@$(call print, "Running integration tests with ${backend} backend.")
rm -rf itest/regtest; date
$(GOTEST) ./itest -v -tags="$(ITEST_TAGS)" $(TEST_FLAGS) $(ITEST_FLAGS) -optional -loglevel=trace -btcdexec=./btcd-itest -logdir=regtest

aperture-dir:
ifeq ($(UNAME_S),Linux)
mkdir -p $$HOME/.aperture
Expand Down
35 changes: 33 additions & 2 deletions fn/recv.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package fn

import (
"context"
"fmt"
"time"
)
Expand Down Expand Up @@ -40,8 +41,6 @@ func RecvResp[T any](r <-chan T, e <-chan error, q <-chan struct{}) (T, error) {
//
// NOTE: This function closes the channel to be able to collect all items at
// once.
//
// TODO(roasbeef): instead could take a number of items to recv?
func Collect[T any](c chan T) []T {
close(c)

Expand All @@ -52,3 +51,35 @@ func Collect[T any](c chan T) []T {

return out
}

// CollectBatch reads from the given channel and returns batchSize items at a
// time and a boolean that indicates whether we expect more items to be sent
// on the channel. If the context is canceled, the function returns the items
// that have been read so far and the context's error.
//
// NOTE: The channel MUST be closed for this function to return.
func CollectBatch[V any](ctx context.Context, values <-chan V,
batchSize int, cb func(ctx context.Context, batch []V) error) error {

batch := make([]V, 0, batchSize)
for {
select {
case v, ok := <-values:
if !ok {
return cb(ctx, batch)
}
batch = append(batch, v)

if len(batch) == batchSize {
err := cb(ctx, batch)
if err != nil {
return err
}
batch = make([]V, 0, batchSize)
}

case <-ctx.Done():
return ctx.Err()
}
}
}
61 changes: 61 additions & 0 deletions fn/recv_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package fn

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"
)

var (
testTimeout = 100 * time.Millisecond
)

func TestCollectBatch(t *testing.T) {
t.Parallel()

ctxb := context.Background()
ctxt, cancel := context.WithTimeout(ctxb, testTimeout)
defer cancel()

// First, test the expected normal case where we receive all the items
// and the channel is closed.
var (
c = make(chan int, 10)
numReceived = 0
)

for i := 0; i < 10; i++ {
c <- i
}
close(c)

err := CollectBatch(
ctxt, c, 3, func(ctx context.Context, batch []int) error {
numReceived += len(batch)

return nil
},
)
require.NoError(t, err)
require.Equal(t, 10, numReceived)

// If we don't close the channel, then we expect to run into the
// timeout and only receive 9 out of 10 items (the last batch is never
// completed).
c = make(chan int, 10)
numReceived = 0
for i := 0; i < 10; i++ {
c <- i
}
err = CollectBatch(
ctxt, c, 3, func(ctx context.Context, batch []int) error {
numReceived += len(batch)

return nil
},
)
require.ErrorIs(t, err, context.DeadlineExceeded)
require.Equal(t, 9, numReceived)
}
4 changes: 2 additions & 2 deletions itest/mint_batch_stress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@ func testMintBatch100StressTest(t *harnessTest) {

func testMintBatch1kStressTest(t *harnessTest) {
batchSize := 1_000
timeout := defaultWaitTimeout * 10
timeout := defaultWaitTimeout * 20

testMintBatchNStressTest(t, batchSize, timeout)
}

func testMintBatch10kStressTest(t *harnessTest) {
batchSize := 10_000
timeout := defaultWaitTimeout * 100
timeout := defaultWaitTimeout * 200

testMintBatchNStressTest(t, batchSize, timeout)
}
Expand Down
10 changes: 9 additions & 1 deletion itest/tapd_harness.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,14 @@ var (
// to use when starting a tap daemon.
dbbackend = flag.String("dbbackend", "sqlite", "Set the database "+
"backend to use when starting a tap daemon.")

// postgresTimeout is a command line flag for specifying the amount of
// time to allow the postgres fixture to run in total. Needs to be
// increased for long-running tests.
postgresTimeout = flag.Duration("postgrestimeout",
tapdb.DefaultPostgresFixtureLifetime, "The amount of time to "+
"allow the postgres fixture to run in total. Needs "+
"to be increased for long-running tests.")
)

const (
Expand Down Expand Up @@ -109,7 +117,7 @@ func newTapdHarness(ht *harnessTest, cfg tapdConfig,

case tapcfg.DatabaseBackendPostgres:
fixture := tapdb.NewTestPgFixture(
ht.t, tapdb.DefaultPostgresFixtureLifetime, !*noDelete,
ht.t, *postgresTimeout, !*noDelete,
)
ht.t.Cleanup(func() {
if !*noDelete {
Expand Down
7 changes: 7 additions & 0 deletions make/testing_flags.mk
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ ifneq ($(nodelete),)
ITEST_FLAGS += -nodelete
endif

# Run the optional tests.
ifneq ($(optional),)
guggero marked this conversation as resolved.
Show resolved Hide resolved
ITEST_FLAGS += -optional -postgrestimeout=240m
endif

# Run itests with specified db backend.
ifneq ($(dbbackend),)
ITEST_FLAGS += -dbbackend=$(dbbackend)
Expand Down Expand Up @@ -69,6 +74,8 @@ endif
# test command. If not, we set 60m (up from the default 10m).
ifneq ($(timeout),)
TEST_FLAGS += -test.timeout=$(timeout)
else ifneq ($(optional),)
TEST_FLAGS += -test.timeout=240m
else
TEST_FLAGS += -test.timeout=60m
endif
Expand Down
16 changes: 7 additions & 9 deletions rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -2589,25 +2589,23 @@ func (r *rpcServer) AssetLeafKeys(ctx context.Context,
func marshalAssetLeaf(ctx context.Context, keys KeyLookup,
assetLeaf *universe.MintingLeaf) (*unirpc.AssetLeaf, error) {

// In order to display the full asset, we'll parse the genesis
// proof so we can map that to the asset being proved.
var assetProof proof.Proof
if err := assetProof.Decode(
bytes.NewReader(assetLeaf.GenesisProof),
); err != nil {
// In order to display the full asset, we'll also encode the genesis
// proof.
var buf bytes.Buffer
if err := assetLeaf.GenesisProof.Encode(&buf); err != nil {
return nil, err
}

rpcAsset, err := MarshalAsset(
ctx, &assetProof.Asset, false, true, keys,
ctx, &assetLeaf.GenesisProof.Asset, false, true, keys,
)
if err != nil {
return nil, err
}

return &unirpc.AssetLeaf{
Asset: rpcAsset,
IssuanceProof: assetLeaf.GenesisProof[:],
IssuanceProof: buf.Bytes(),
}, nil
}

Expand Down Expand Up @@ -2857,7 +2855,7 @@ func unmarshalAssetLeaf(leaf *unirpc.AssetLeaf) (*universe.MintingLeaf, error) {
Genesis: assetProof.Asset.Genesis,
GroupKey: assetProof.Asset.GroupKey,
},
GenesisProof: leaf.IssuanceProof,
GenesisProof: &assetProof,
Amt: assetProof.Asset.Amount,
}, nil
}
Expand Down
4 changes: 4 additions & 0 deletions tapcfg/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ const (
// to sync Universe state with the federation.
defaultUniverseSyncInterval = time.Minute * 10

// defaultUniverseSyncBatchSize is the default number of proofs we'll
// sync in a single batch.
defaultUniverseSyncBatchSize = 200

// defaultReOrgSafeDepth is the default number of confirmations we'll
// wait for before considering a transaction safely buried in the chain.
defaultReOrgSafeDepth = 6
Expand Down
8 changes: 5 additions & 3 deletions tapcfg/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ func genServerConfig(cfg *Config, cfgLogger btclog.Logger,
LocalDiffEngine: baseUni,
NewRemoteDiffEngine: tap.NewRpcUniverseDiff,
LocalRegistrar: baseUni,
SyncBatchSize: defaultUniverseSyncBatchSize,
})

federationMembers := cfg.Universe.FederationServers
Expand Down Expand Up @@ -284,9 +285,10 @@ func genServerConfig(cfg *Config, cfgLogger btclog.Logger,
GenSigner: tap.NewLndRpcGenSigner(
lndServices,
),
ProofFiles: proofFileStore,
Universe: universeFederation,
ProofWatcher: reOrgWatcher,
ProofFiles: proofFileStore,
Universe: universeFederation,
ProofWatcher: reOrgWatcher,
UniversePushBatchSize: defaultUniverseSyncBatchSize,
},
BatchTicker: ticker.NewForce(cfg.BatchMintingInterval),
ProofUpdates: proofArchive,
Expand Down
61 changes: 48 additions & 13 deletions tapdb/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package tapdb
import (
"context"
"database/sql"
"math"
prand "math/rand"
"time"

Expand All @@ -21,9 +22,16 @@ const (
// repetition.
DefaultNumTxRetries = 10

// DefaultRetryDelay is the default delay between retries. This will be
// used to generate a random delay between 0 and this value.
DefaultRetryDelay = time.Millisecond * 50
// DefaultInitialRetryDelay is the default initial delay between
// retries. This will be used to generate a random delay between -50%
// and +50% of this value, so 20 to 60 milliseconds. The retry will be
// doubled after each attempt until we reach DefaultMaxRetryDelay. We
// start with a random value to avoid multiple goroutines that are
// created at the same time to effectively retry at the same time.
DefaultInitialRetryDelay = time.Millisecond * 40

// DefaultMaxRetryDelay is the default maximum delay between retries.
DefaultMaxRetryDelay = time.Second * 3
)

// TxOptions represents a set of options one can use to control what type of
Expand Down Expand Up @@ -91,23 +99,50 @@ type BatchedQuerier interface {
// executor. This can be used to do things like retry a transaction due to an
// error a certain amount of times.
type txExecutorOptions struct {
numRetries int
retryDelay time.Duration
numRetries int
initialRetryDelay time.Duration
maxRetryDelay time.Duration
}

// defaultTxExecutorOptions returns the default options for the transaction
// executor.
func defaultTxExecutorOptions() *txExecutorOptions {
return &txExecutorOptions{
numRetries: DefaultNumTxRetries,
retryDelay: DefaultRetryDelay,
numRetries: DefaultNumTxRetries,
initialRetryDelay: DefaultInitialRetryDelay,
maxRetryDelay: DefaultMaxRetryDelay,
}
}

// randRetryDelay returns a random retry delay between 0 and the configured max
// delay.
func (t *txExecutorOptions) randRetryDelay() time.Duration {
return time.Duration(prand.Int63n(int64(t.retryDelay))) //nolint:gosec
// randRetryDelay returns a random retry delay between -50% and +50%
// of the configured delay that is doubled for each attempt and capped at a max
// value.
func (t *txExecutorOptions) randRetryDelay(attempt int) time.Duration {
halfDelay := t.initialRetryDelay / 2
randDelay := prand.Int63n(int64(t.initialRetryDelay)) //nolint:gosec
guggero marked this conversation as resolved.
Show resolved Hide resolved

// 50% plus 0%-100% gives us the range of 50%-150%.
initialDelay := halfDelay + time.Duration(randDelay)

// If this is the first attempt, we just return the initial delay.
if attempt == 0 {
return initialDelay
}

// For each subsequent delay, we double the initial delay. This still
// gives us a somewhat random delay, but it still increases with each
// attempt. If we double something n times, that's the same as
// multiplying the value with 2^n. We limit the power to 32 to avoid
// overflows.
factor := time.Duration(math.Pow(2, math.Min(float64(attempt), 32)))
actualDelay := initialDelay * factor

// Cap the delay at the maximum configured value.
if actualDelay > t.maxRetryDelay {
guggero marked this conversation as resolved.
Show resolved Hide resolved
return t.maxRetryDelay
}

return actualDelay
}

// TxExecutorOption is a functional option that allows us to pass in optional
Expand All @@ -126,7 +161,7 @@ func WithTxRetries(numRetries int) TxExecutorOption {
// to wait before a transaction is retried.
func WithTxRetryDelay(delay time.Duration) TxExecutorOption {
return func(o *txExecutorOptions) {
o.retryDelay = delay
o.initialRetryDelay = delay
}
}

Expand Down Expand Up @@ -171,7 +206,7 @@ func (t *TransactionExecutor[Q]) ExecTx(ctx context.Context,
txOptions TxOptions, txBody func(Q) error) error {

waitBeforeRetry := func(attemptNumber int) {
retryDelay := t.opts.randRetryDelay()
retryDelay := t.opts.randRetryDelay(attemptNumber)

log.Tracef("Retrying transaction due to tx serialization "+
"error, attempt_number=%v, delay=%v", attemptNumber,
Expand Down
Loading