Skip to content

Commit

Permalink
tapd+universe: support batch proof issuance
Browse files Browse the repository at this point in the history
  • Loading branch information
guggero committed Aug 11, 2023
1 parent bd91f21 commit c8d3c82
Show file tree
Hide file tree
Showing 5 changed files with 240 additions and 18 deletions.
67 changes: 67 additions & 0 deletions tapdb/universe_forest.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,3 +273,70 @@ func (b *BaseUniverseForest) RegisterIssuance(ctx context.Context,

return issuanceProof, nil
}

// RegisterBatchIssuance inserts a new minting leaf batch within the multiverse
// tree and the universe tree that corresponds to the given base key(s).
func (b *BaseUniverseForest) RegisterBatchIssuance(ctx context.Context,
items []*universe.IssuanceItem) error {

insertProof := func(item *universe.IssuanceItem,
dbTx BaseUniverseForestStore) error {

// Register issuance in the asset (group) specific universe
// tree.
_, universeRoot, err := universeRegisterIssuance(
ctx, dbTx, item.ID, item.Key, item.Leaf,
item.MetaReveal,
)
if err != nil {
return err
}

// Retrieve a handle to the multiverse tree so that we can
// update the tree by inserting a new issuance.
multiverseTree := mssmt.NewCompactedTree(
newTreeStoreWrapperTx(dbTx, multiverseNS),
)

// Construct a leaf node for insertion into the multiverse tree.
// The leaf node includes a reference to the lower tree via the
// lower tree root hash.
universeRootHash := universeRoot.NodeHash()
assetGroupSum := universeRoot.NodeSum()

leafNode := mssmt.NewLeafNode(
universeRootHash[:], assetGroupSum,
)

// Use asset ID (or asset group hash) as the upper tree leaf
// node key. This is the same as the asset specific universe ID.
leafNodeKey := item.ID.Bytes()

_, err = multiverseTree.Insert(ctx, leafNodeKey, leafNode)
if err != nil {
return err
}

return nil
}

var writeTx BaseUniverseForestOptions
dbErr := b.db.ExecTx(
ctx, &writeTx, func(store BaseUniverseForestStore) error {
for idx := range items {
item := items[idx]
err := insertProof(item, store)
if err != nil {
return err
}
}

return nil
},
)
if dbErr != nil {
return dbErr
}

return nil
}
61 changes: 61 additions & 0 deletions tapdb/universe_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,37 @@ func (u *UniverseStats) LogSyncEvent(ctx context.Context,
})
}

// LogSyncEvents logs sync events for the target universe.
func (u *UniverseStats) LogSyncEvents(ctx context.Context,
uniIDs ...universe.Identifier) error {

var writeOpts UniverseStatsOptions
return u.db.ExecTx(ctx, &writeOpts, func(db UniverseStatsStore) error {
for idx := range uniIDs {
uniID := uniIDs[idx]

var groupKeyXOnly []byte
if uniID.GroupKey != nil {
groupKeyXOnly = schnorr.SerializePubKey(
uniID.GroupKey,
)
}

err := db.InsertNewSyncEvent(ctx, NewSyncEvent{
EventTime: u.clock.Now(),
EventTimestamp: u.clock.Now().UTC().Unix(),
AssetID: uniID.AssetID[:],
GroupKeyXOnly: groupKeyXOnly,
})
if err != nil {
return err
}
}

return nil
})
}

// LogNewProofEvent logs a new proof insertion event for the target universe.
func (u *UniverseStats) LogNewProofEvent(ctx context.Context,
uniID universe.Identifier, key universe.BaseKey) error {
Expand All @@ -165,6 +196,36 @@ func (u *UniverseStats) LogNewProofEvent(ctx context.Context,
})
}

// LogNewProofEvents logs new proof insertion events for the target universe.
func (u *UniverseStats) LogNewProofEvents(ctx context.Context,
uniIDs ...universe.Identifier) error {

var writeTxOpts UniverseStatsOptions
return u.db.ExecTx(ctx, &writeTxOpts, func(db UniverseStatsStore) error {
for idx := range uniIDs {
uniID := uniIDs[idx]
var groupKeyXOnly []byte
if uniID.GroupKey != nil {
groupKeyXOnly = schnorr.SerializePubKey(
uniID.GroupKey,
)
}

err := db.InsertNewProofEvent(ctx, NewProofEvent{
EventTime: u.clock.Now(),
EventTimestamp: u.clock.Now().UTC().Unix(),
AssetID: uniID.AssetID[:],
GroupKeyXOnly: groupKeyXOnly,
})
if err != nil {
return err
}
}

return nil
})
}

// AggregateSyncStats returns stats aggregated over all assets within the
// Universe.
func (u *UniverseStats) AggregateSyncStats(
Expand Down
44 changes: 44 additions & 0 deletions universe/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/btcsuite/btcd/btcec/v2/schnorr"
"github.com/davecgh/go-spew/spew"
"github.com/lightninglabs/taproot-assets/fn"
"github.com/lightninglabs/taproot-assets/proof"
)

Expand Down Expand Up @@ -249,6 +250,49 @@ func (a *MintingArchive) verifyIssuanceProof(ctx context.Context,
return assetSnapshot, nil
}

// RegisterNewIssuanceBatch inserts a batch of new minting leaves within the
// target universe tree (based on the ID), stored at the base key(s). We assume
// the proofs within the batch have already been checked that they don't yet
// exist in the local database.
func (a *MintingArchive) RegisterNewIssuanceBatch(ctx context.Context,
items []*IssuanceItem) error {

log.Debugf("Verifying %d new proofs for insertion into Universe",
len(items))

for idx := range items {
assetSnapshot, err := a.verifyIssuanceProof(ctx, items[idx])
if err != nil {
return err
}

items[idx].MetaReveal = assetSnapshot.MetaReveal
}

log.Debugf("Inserting %d verified proofs into Universe", len(items))
err := a.cfg.UniverseForest.RegisterBatchIssuance(ctx, items)
if err != nil {
return fmt.Errorf("unable to register new issuance proofs: %w",
err)
}

// Log a sync event for the newly inserted leaf in the background as an
// async goroutine.
ids := fn.Map(items, func(item *IssuanceItem) Identifier {
return item.ID
})
go func() {
err := a.cfg.UniverseStats.LogNewProofEvents(
context.Background(), ids...,
)
if err != nil {
log.Warnf("unable to log new proof events: %v", err)
}
}()

return nil
}

// FetchIssuanceProof attempts to fetch an issuance proof for the target base
// leaf based on the universe identifier (assetID/groupKey).
func (a *MintingArchive) FetchIssuanceProof(ctx context.Context, id Identifier,
Expand Down
22 changes: 22 additions & 0 deletions universe/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,8 @@ type BaseMultiverse interface {
leaf *MintingLeaf,
metaReveal *proof.MetaReveal) (*IssuanceProof, error)

RegisterBatchIssuance(ctx context.Context, items []*IssuanceItem) error

// FetchIssuanceProof returns an issuance proof for the target key. If
// the key doesn't have a script key specified, then all the proofs for
// the minting outpoint will be returned. If neither are specified, then
Expand Down Expand Up @@ -283,6 +285,19 @@ type IssuanceItem struct {
MetaReveal *proof.MetaReveal
}

// BatchRegistrar is an interface that allows a caller to register a batch of
// issuance items within a base universe.
type BatchRegistrar interface {
Registrar

// RegisterNewIssuanceBatch inserts a batch of new minting leaves within
// the target universe tree (based on the ID), stored at the base
// key(s). We assume the proofs within the batch have already been
// checked that they don't yet exist in the local database.
RegisterNewIssuanceBatch(ctx context.Context,
items []*IssuanceItem) error
}

const (
// DefaultUniverseRPCPort is the default port that the universe RPC is
// hosted on.
Expand Down Expand Up @@ -683,11 +698,18 @@ type Telemetry interface {
LogSyncEvent(ctx context.Context, uniID Identifier,
key BaseKey) error

// LogSyncEvents logs sync events for the target universe.
LogSyncEvents(ctx context.Context, uniIDs ...Identifier) error

// LogNewProofEvent logs a new proof insertion event for the target
// universe.
LogNewProofEvent(ctx context.Context, uniID Identifier,
key BaseKey) error

// LogNewProofEvents logs new proof insertion events for the target
// universe.
LogNewProofEvents(ctx context.Context, uniIDs ...Identifier) error

// QuerySyncStats attempts to query the stats for the target universe.
// For a given asset ID, tag, or type, the set of universe stats is
// returned which lists information such as the total number of syncs
Expand Down
64 changes: 46 additions & 18 deletions universe/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"sync"

"github.com/davecgh/go-spew/spew"
"github.com/lightninglabs/taproot-assets/fn"
Expand All @@ -30,7 +31,7 @@ type SimpleSyncCfg struct {
// LocalRegistrar is the registrar tied to a local Universe instance.
// This is used to insert new proof into the local DB as a result of
// the diff operation.
LocalRegistrar Registrar
LocalRegistrar BatchRegistrar
}

// SimpleSyncer is a simple implementation of the Syncer interface. It's based
Expand Down Expand Up @@ -167,7 +168,12 @@ func (s *SimpleSyncer) syncRoot(ctx context.Context, remoteRoot BaseRoot,

// Now that we know where the divergence is, we can fetch the issuance
// proofs from the remote party.
newLeaves := make(chan *MintingLeaf, len(keysToFetch))
var (
batchSize = 100
currentBatch = make([]*IssuanceItem, 0, batchSize)
fetchedLeaves = make([]*IssuanceItem, 0, len(keysToFetch))
mutex sync.Mutex
)
err = fn.ParSlice(
ctx, keysToFetch, func(ctx context.Context, key BaseKey) error {
newProof, err := diffEngine.FetchIssuanceProof(
Expand Down Expand Up @@ -195,22 +201,41 @@ func (s *SimpleSyncer) syncRoot(ctx context.Context, remoteRoot BaseRoot,
// TODO(roasbeef): inclusion w/ root here, also that
// it's the expected asset ID

log.Infof("UniverseRoot(%v): inserting new leaf",
uniID.String())
log.Tracef("UniverseRoot(%v): inserting new leaf for "+
"key=%v", uniID.String(), spew.Sdump(key))
item := &IssuanceItem{
ID: uniID,
Key: key,
Leaf: leafProof.Leaf,
}

mutex.Lock()
defer mutex.Unlock()

// TODO(roasbeef): this is actually giving a lagging
// proof for each of them
_, err = s.cfg.LocalRegistrar.RegisterIssuance(
ctx, uniID, key, leafProof.Leaf,
)
if err != nil {
return fmt.Errorf("unable to register "+
"issuance proof: %w", err)
currentBatch = append(currentBatch, item)
fetchedLeaves = append(fetchedLeaves, item)
if len(currentBatch) == batchSize {
log.Infof("UniverseRoot(%v): inserting %d "+
"new leaves", uniID.String(),
len(currentBatch))

reg := s.cfg.LocalRegistrar
err = reg.RegisterNewIssuanceBatch(
ctx, currentBatch,
)
if err != nil {
return fmt.Errorf("unable to register "+
"issuance proofs: %w", err)
}

currentBatch = make(
[]*IssuanceItem, 0, batchSize,
)

log.Debugf("UniverseRoot(%v): inserted %d "+
"new leaves, now have %d of %d",
uniID.String(), batchSize,
len(fetchedLeaves), len(keysToFetch))
}

newLeaves <- leafProof.Leaf
return nil
},
)
Expand All @@ -228,13 +253,16 @@ func (s *SimpleSyncer) syncRoot(ctx context.Context, remoteRoot BaseRoot,
result <- AssetSyncDiff{
OldUniverseRoot: localRoot,
NewUniverseRoot: remoteRoot,
NewLeafProofs: fn.Collect(newLeaves),
NewLeafProofs: fn.Map(
fetchedLeaves, func(i *IssuanceItem) *MintingLeaf {
return i.Leaf
},
),
}

log.Infof("Sync for UniverseRoot(%v) complete!", uniID.String())
log.Tracef("Sync for UniverseRoot(%v) complete! New "+
"universe_root=%v", uniID.String(),
spew.Sdump(remoteRoot))
"universe_root=%v", uniID.String(), spew.Sdump(remoteRoot))

return nil
}
Expand Down

0 comments on commit c8d3c82

Please sign in to comment.