diff --git a/tapdb/universe_forest.go b/tapdb/universe_forest.go index 88903c05f..384b0c885 100644 --- a/tapdb/universe_forest.go +++ b/tapdb/universe_forest.go @@ -273,3 +273,70 @@ func (b *BaseUniverseForest) RegisterIssuance(ctx context.Context, return issuanceProof, nil } + +// RegisterBatchIssuance inserts a new minting leaf batch within the multiverse +// tree and the universe tree that corresponds to the given base key(s). +func (b *BaseUniverseForest) RegisterBatchIssuance(ctx context.Context, + items []*universe.IssuanceItem) error { + + insertProof := func(item *universe.IssuanceItem, + dbTx BaseUniverseForestStore) error { + + // Register issuance in the asset (group) specific universe + // tree. + _, universeRoot, err := universeRegisterIssuance( + ctx, dbTx, item.ID, item.Key, item.Leaf, + item.MetaReveal, + ) + if err != nil { + return err + } + + // Retrieve a handle to the multiverse tree so that we can + // update the tree by inserting a new issuance. + multiverseTree := mssmt.NewCompactedTree( + newTreeStoreWrapperTx(dbTx, multiverseNS), + ) + + // Construct a leaf node for insertion into the multiverse tree. + // The leaf node includes a reference to the lower tree via the + // lower tree root hash. + universeRootHash := universeRoot.NodeHash() + assetGroupSum := universeRoot.NodeSum() + + leafNode := mssmt.NewLeafNode( + universeRootHash[:], assetGroupSum, + ) + + // Use asset ID (or asset group hash) as the upper tree leaf + // node key. This is the same as the asset specific universe ID. + leafNodeKey := item.ID.Bytes() + + _, err = multiverseTree.Insert(ctx, leafNodeKey, leafNode) + if err != nil { + return err + } + + return nil + } + + var writeTx BaseUniverseForestOptions + dbErr := b.db.ExecTx( + ctx, &writeTx, func(store BaseUniverseForestStore) error { + for idx := range items { + item := items[idx] + err := insertProof(item, store) + if err != nil { + return err + } + } + + return nil + }, + ) + if dbErr != nil { + return dbErr + } + + return nil +} diff --git a/tapdb/universe_stats.go b/tapdb/universe_stats.go index c9862e51d..dc0025a72 100644 --- a/tapdb/universe_stats.go +++ b/tapdb/universe_stats.go @@ -145,6 +145,37 @@ func (u *UniverseStats) LogSyncEvent(ctx context.Context, }) } +// LogSyncEvents logs sync events for the target universe. +func (u *UniverseStats) LogSyncEvents(ctx context.Context, + uniIDs ...universe.Identifier) error { + + var writeOpts UniverseStatsOptions + return u.db.ExecTx(ctx, &writeOpts, func(db UniverseStatsStore) error { + for idx := range uniIDs { + uniID := uniIDs[idx] + + var groupKeyXOnly []byte + if uniID.GroupKey != nil { + groupKeyXOnly = schnorr.SerializePubKey( + uniID.GroupKey, + ) + } + + err := db.InsertNewSyncEvent(ctx, NewSyncEvent{ + EventTime: u.clock.Now(), + EventTimestamp: u.clock.Now().UTC().Unix(), + AssetID: uniID.AssetID[:], + GroupKeyXOnly: groupKeyXOnly, + }) + if err != nil { + return err + } + } + + return nil + }) +} + // LogNewProofEvent logs a new proof insertion event for the target universe. func (u *UniverseStats) LogNewProofEvent(ctx context.Context, uniID universe.Identifier, key universe.BaseKey) error { @@ -165,6 +196,36 @@ func (u *UniverseStats) LogNewProofEvent(ctx context.Context, }) } +// LogNewProofEvents logs new proof insertion events for the target universe. +func (u *UniverseStats) LogNewProofEvents(ctx context.Context, + uniIDs ...universe.Identifier) error { + + var writeTxOpts UniverseStatsOptions + return u.db.ExecTx(ctx, &writeTxOpts, func(db UniverseStatsStore) error { + for idx := range uniIDs { + uniID := uniIDs[idx] + var groupKeyXOnly []byte + if uniID.GroupKey != nil { + groupKeyXOnly = schnorr.SerializePubKey( + uniID.GroupKey, + ) + } + + err := db.InsertNewProofEvent(ctx, NewProofEvent{ + EventTime: u.clock.Now(), + EventTimestamp: u.clock.Now().UTC().Unix(), + AssetID: uniID.AssetID[:], + GroupKeyXOnly: groupKeyXOnly, + }) + if err != nil { + return err + } + } + + return nil + }) +} + // AggregateSyncStats returns stats aggregated over all assets within the // Universe. func (u *UniverseStats) AggregateSyncStats( diff --git a/universe/base.go b/universe/base.go index e8e37acae..4c7a0df1b 100644 --- a/universe/base.go +++ b/universe/base.go @@ -9,6 +9,7 @@ import ( "github.com/btcsuite/btcd/btcec/v2/schnorr" "github.com/davecgh/go-spew/spew" + "github.com/lightninglabs/taproot-assets/fn" "github.com/lightninglabs/taproot-assets/proof" ) @@ -249,6 +250,49 @@ func (a *MintingArchive) verifyIssuanceProof(ctx context.Context, return assetSnapshot, nil } +// RegisterNewIssuanceBatch inserts a batch of new minting leaves within the +// target universe tree (based on the ID), stored at the base key(s). We assume +// the proofs within the batch have already been checked that they don't yet +// exist in the local database. +func (a *MintingArchive) RegisterNewIssuanceBatch(ctx context.Context, + items []*IssuanceItem) error { + + log.Debugf("Verifying %d new proofs for insertion into Universe", + len(items)) + + for idx := range items { + assetSnapshot, err := a.verifyIssuanceProof(ctx, items[idx]) + if err != nil { + return err + } + + items[idx].MetaReveal = assetSnapshot.MetaReveal + } + + log.Debugf("Inserting %d verified proofs into Universe", len(items)) + err := a.cfg.UniverseForest.RegisterBatchIssuance(ctx, items) + if err != nil { + return fmt.Errorf("unable to register new issuance proofs: %w", + err) + } + + // Log a sync event for the newly inserted leaf in the background as an + // async goroutine. + ids := fn.Map(items, func(item *IssuanceItem) Identifier { + return item.ID + }) + go func() { + err := a.cfg.UniverseStats.LogNewProofEvents( + context.Background(), ids..., + ) + if err != nil { + log.Warnf("unable to log new proof events: %v", err) + } + }() + + return nil +} + // FetchIssuanceProof attempts to fetch an issuance proof for the target base // leaf based on the universe identifier (assetID/groupKey). func (a *MintingArchive) FetchIssuanceProof(ctx context.Context, id Identifier, diff --git a/universe/interface.go b/universe/interface.go index 3571684ad..6887fe6d5 100644 --- a/universe/interface.go +++ b/universe/interface.go @@ -246,6 +246,8 @@ type BaseMultiverse interface { leaf *MintingLeaf, metaReveal *proof.MetaReveal) (*IssuanceProof, error) + RegisterBatchIssuance(ctx context.Context, items []*IssuanceItem) error + // FetchIssuanceProof returns an issuance proof for the target key. If // the key doesn't have a script key specified, then all the proofs for // the minting outpoint will be returned. If neither are specified, then @@ -283,6 +285,19 @@ type IssuanceItem struct { MetaReveal *proof.MetaReveal } +// BatchRegistrar is an interface that allows a caller to register a batch of +// issuance items within a base universe. +type BatchRegistrar interface { + Registrar + + // RegisterNewIssuanceBatch inserts a batch of new minting leaves within + // the target universe tree (based on the ID), stored at the base + // key(s). We assume the proofs within the batch have already been + // checked that they don't yet exist in the local database. + RegisterNewIssuanceBatch(ctx context.Context, + items []*IssuanceItem) error +} + const ( // DefaultUniverseRPCPort is the default port that the universe RPC is // hosted on. @@ -683,11 +698,18 @@ type Telemetry interface { LogSyncEvent(ctx context.Context, uniID Identifier, key BaseKey) error + // LogSyncEvents logs sync events for the target universe. + LogSyncEvents(ctx context.Context, uniIDs ...Identifier) error + // LogNewProofEvent logs a new proof insertion event for the target // universe. LogNewProofEvent(ctx context.Context, uniID Identifier, key BaseKey) error + // LogNewProofEvents logs new proof insertion events for the target + // universe. + LogNewProofEvents(ctx context.Context, uniIDs ...Identifier) error + // QuerySyncStats attempts to query the stats for the target universe. // For a given asset ID, tag, or type, the set of universe stats is // returned which lists information such as the total number of syncs diff --git a/universe/syncer.go b/universe/syncer.go index 2d26b1ffa..19e0118ae 100644 --- a/universe/syncer.go +++ b/universe/syncer.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "sync" "github.com/davecgh/go-spew/spew" "github.com/lightninglabs/taproot-assets/fn" @@ -30,7 +31,7 @@ type SimpleSyncCfg struct { // LocalRegistrar is the registrar tied to a local Universe instance. // This is used to insert new proof into the local DB as a result of // the diff operation. - LocalRegistrar Registrar + LocalRegistrar BatchRegistrar } // SimpleSyncer is a simple implementation of the Syncer interface. It's based @@ -167,7 +168,12 @@ func (s *SimpleSyncer) syncRoot(ctx context.Context, remoteRoot BaseRoot, // Now that we know where the divergence is, we can fetch the issuance // proofs from the remote party. - newLeaves := make(chan *MintingLeaf, len(keysToFetch)) + var ( + batchSize = 100 + currentBatch = make([]*IssuanceItem, 0, batchSize) + fetchedLeaves = make([]*IssuanceItem, 0, len(keysToFetch)) + mutex sync.Mutex + ) err = fn.ParSlice( ctx, keysToFetch, func(ctx context.Context, key BaseKey) error { newProof, err := diffEngine.FetchIssuanceProof( @@ -195,22 +201,41 @@ func (s *SimpleSyncer) syncRoot(ctx context.Context, remoteRoot BaseRoot, // TODO(roasbeef): inclusion w/ root here, also that // it's the expected asset ID - log.Infof("UniverseRoot(%v): inserting new leaf", - uniID.String()) - log.Tracef("UniverseRoot(%v): inserting new leaf for "+ - "key=%v", uniID.String(), spew.Sdump(key)) + item := &IssuanceItem{ + ID: uniID, + Key: key, + Leaf: leafProof.Leaf, + } + + mutex.Lock() + defer mutex.Unlock() - // TODO(roasbeef): this is actually giving a lagging - // proof for each of them - _, err = s.cfg.LocalRegistrar.RegisterIssuance( - ctx, uniID, key, leafProof.Leaf, - ) - if err != nil { - return fmt.Errorf("unable to register "+ - "issuance proof: %w", err) + currentBatch = append(currentBatch, item) + fetchedLeaves = append(fetchedLeaves, item) + if len(currentBatch) == batchSize { + log.Infof("UniverseRoot(%v): inserting %d "+ + "new leaves", uniID.String(), + len(currentBatch)) + + reg := s.cfg.LocalRegistrar + err = reg.RegisterNewIssuanceBatch( + ctx, currentBatch, + ) + if err != nil { + return fmt.Errorf("unable to register "+ + "issuance proofs: %w", err) + } + + currentBatch = make( + []*IssuanceItem, 0, batchSize, + ) + + log.Debugf("UniverseRoot(%v): inserted %d "+ + "new leaves, now have %d of %d", + uniID.String(), batchSize, + len(fetchedLeaves), len(keysToFetch)) } - newLeaves <- leafProof.Leaf return nil }, ) @@ -228,13 +253,16 @@ func (s *SimpleSyncer) syncRoot(ctx context.Context, remoteRoot BaseRoot, result <- AssetSyncDiff{ OldUniverseRoot: localRoot, NewUniverseRoot: remoteRoot, - NewLeafProofs: fn.Collect(newLeaves), + NewLeafProofs: fn.Map( + fetchedLeaves, func(i *IssuanceItem) *MintingLeaf { + return i.Leaf + }, + ), } log.Infof("Sync for UniverseRoot(%v) complete!", uniID.String()) log.Tracef("Sync for UniverseRoot(%v) complete! New "+ - "universe_root=%v", uniID.String(), - spew.Sdump(remoteRoot)) + "universe_root=%v", uniID.String(), spew.Sdump(remoteRoot)) return nil }