Skip to content

Commit

Permalink
universe+tapgarden: support batch proof insertion on mint
Browse files Browse the repository at this point in the history
We also want to insert proofs in batches into the local universe when we
mint multiple assets.
  • Loading branch information
guggero committed Aug 18, 2023
1 parent 09b4d8a commit 4f4d331
Show file tree
Hide file tree
Showing 3 changed files with 193 additions and 72 deletions.
172 changes: 111 additions & 61 deletions tapgarden/caretaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -856,8 +856,18 @@ func (b *BatchCaretaker) stateStep(currentState BatchState) (BatchState, error)
// file system as well.
//
// TODO(roasbeef): rely on the upsert here instead
mintingProofBlobs := make(proof.AssetBlobs)
for _, newAsset := range batchCommitment.CommittedAssets() {
var (
committedAssets = batchCommitment.CommittedAssets()
mintingProofBlobs = make(
proof.AssetBlobs, len(committedAssets),
)
universeItems = make(
chan *universe.IssuanceItem,
len(committedAssets),
)
)
for idx := range committedAssets {
newAsset := committedAssets[idx]
assetID := newAsset.ID()
scriptPubKey := newAsset.ScriptKey.PubKey
scriptKey := asset.ToSerialized(scriptPubKey)
Expand Down Expand Up @@ -888,65 +898,105 @@ func (b *BatchCaretaker) stateStep(currentState BatchState) (BatchState, error)

// Before we mark the batch as confirmed below, we'll
// also register the issuance of the new asset with our
// local base universe.
//
// TODO(roasbeef): can combine with minting proof
// creation above?
if b.cfg.Universe != nil {
// The universe ID serves to identifier the
// universe root we want to add this asset to.
// This is either the assetID or the group key.
uniID := universe.Identifier{
AssetID: assetID,
}

groupKey := newAsset.GroupKey
if groupKey != nil {
uniID.GroupKey = &groupKey.GroupPubKey
}

log.Debugf("Registering asset with "+
"universe, key=%v", spew.Sdump(uniID))

// The base key is the set of bytes that keys
// into the universe, this'll be the outpoint.
// where it was created at and the script key
// for that asset.
baseKey := universe.BaseKey{
MintingOutpoint: wire.OutPoint{
Hash: confInfo.Tx.TxHash(),
Index: b.anchorOutputIndex,
},
ScriptKey: &newAsset.ScriptKey,
}

// With both of those assembled, we can now
// register issuance which takes the amount and
// proof of the minting event.
uniGen := universe.GenesisWithGroup{
Genesis: newAsset.Genesis,
}
if groupKey != nil {
uniGen.GroupKey = groupKey
}
mintingLeaf := &universe.MintingLeaf{
GenesisWithGroup: uniGen,

// The universe tree store only the
// asset state transition and not also
// the proof file checksum (as the root
// is effectively a checksum), so we'll
// use just the state transition.
GenesisProof: mintingProof,
Amt: newAsset.Amount,
}
_, err = b.cfg.Universe.RegisterIssuance(
ctx, uniID, baseKey, mintingLeaf,
)
if err != nil {
return 0, fmt.Errorf("unable to "+
"register issuance: %v", err)
}
// local base universe. We skip this step if there is
// no universe configured.
if b.cfg.Universe == nil {
continue
}

// The universe ID serves to identifier the universe
// root we want to add this asset to. This is either the
// assetID or the group key.
uniID := universe.Identifier{
AssetID: assetID,
}

groupKey := newAsset.GroupKey
if groupKey != nil {
uniID.GroupKey = &groupKey.GroupPubKey
}

log.Debugf("Preparing asset for registration with "+
"universe, key=%v", spew.Sdump(uniID))

// The base key is the set of bytes that keys into the
// universe, this'll be the outpoint where it was
// created at and the script key for that asset.
baseKey := universe.BaseKey{
MintingOutpoint: wire.OutPoint{
Hash: confInfo.Tx.TxHash(),
Index: b.anchorOutputIndex,
},
ScriptKey: &newAsset.ScriptKey,
}

// With both of those assembled, we can now register
// issuance which takes the amount and proof of the
// minting event.
uniGen := universe.GenesisWithGroup{
Genesis: newAsset.Genesis,
}
if groupKey != nil {
uniGen.GroupKey = groupKey
}
mintingLeaf := &universe.MintingLeaf{
GenesisWithGroup: uniGen,

// The universe tree store only the asset state
// transition and not also the proof file
// checksum (as the root is effectively a
// checksum), so we'll use just the state
// transition.
GenesisProof: mintingProof,
Amt: newAsset.Amount,
}
universeItems <- &universe.IssuanceItem{
ID: uniID,
Key: baseKey,
Leaf: mintingLeaf,
}
}
close(universeItems)

// Batch insert the new proofs into the local universe, if we
// have one configured.
if b.cfg.Universe != nil {
var (
batchSize = 200
numItems int
uni = b.cfg.Universe
)
err = fn.CollectBatch(
ctx, universeItems, batchSize,
func(ctx context.Context,
batch []*universe.IssuanceItem) error {

numItems += len(batch)
log.Infof("Inserting %d new leaves "+
"(%d of %d) into local "+
"universe", len(batch),
numItems, len(committedAssets))

err := uni.RegisterNewIssuanceBatch(
ctx, batch,
)
if err != nil {
return fmt.Errorf("unable to "+
"register issuance "+
"batch: %w", err)
}

log.Infof("Inserted %d new leaves "+
"(%d of %d) into local "+
"universe", len(batch),
numItems, len(committedAssets))

return nil
},
)
if err != nil {
return 0, fmt.Errorf("unable to register "+
"issuance proofs: %w", err)
}
}

Expand Down
2 changes: 1 addition & 1 deletion tapgarden/planter.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type GardenKit struct {

// Universe is used to register new asset issuance with a local/remote
// base universe instance.
Universe universe.Registrar
Universe universe.BatchRegistrar

// ProofWatcher is used to watch new proofs for their anchor transaction
// to be confirmed safely with a minimum number of confirmations.
Expand Down
91 changes: 81 additions & 10 deletions universe/auto_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type FederationConfig struct {
// LocalRegistrar is the local register. This'll be used to add new
// leaves (minting events) to our local server before pushing them out
// to the federation.
LocalRegistrar Registrar
LocalRegistrar BatchRegistrar

// SyncInterval is the period that we'll use to synchronize with the
// set of Universe servers.
Expand Down Expand Up @@ -73,6 +73,15 @@ type FederationPushReq struct {
err chan error
}

// FederationIssuanceBatchPushReq is used to push out a batch of new issuance
// events to all or some members of the federation.
type FederationIssuanceBatchPushReq struct {
IssuanceBatch []*IssuanceItem

resp chan struct{}
err chan error
}

// FederationEnvoy is used to manage synchronization between the set of
// federated Universe servers. It handles the periodic sync between universe
// servers, and can also be used to push out new locally created proofs to the
Expand All @@ -87,13 +96,16 @@ type FederationEnvoy struct {
stopOnce sync.Once

pushRequests chan *FederationPushReq

batchPushRequests chan *FederationIssuanceBatchPushReq
}

// NewFederationEnvoy creates a new federation envoy from the passed config.
func NewFederationEnvoy(cfg FederationConfig) *FederationEnvoy {
return &FederationEnvoy{
cfg: cfg,
pushRequests: make(chan *FederationPushReq),
cfg: cfg,
pushRequests: make(chan *FederationPushReq),
batchPushRequests: make(chan *FederationIssuanceBatchPushReq),
ContextGuard: &fn.ContextGuard{
DefaultTimeout: DefaultTimeout,
Quit: make(chan struct{}),
Expand Down Expand Up @@ -213,8 +225,8 @@ func (f *FederationEnvoy) syncUniverseState(ctx context.Context,

// pushProofToFederation attempts to push out a new proof to the current
// federation in parallel.
func (f *FederationEnvoy) pushProofToFederation(uniID Identifier,
newProof *IssuanceProof) {
func (f *FederationEnvoy) pushProofToFederation(uniID Identifier, key BaseKey,
leaf *MintingLeaf) {

ctx, cancel := f.WithCtxQuit()
defer cancel()
Expand All @@ -234,7 +246,7 @@ func (f *FederationEnvoy) pushProofToFederation(uniID Identifier,
}

log.Infof("Pushing new proof to %v federation members, proof_key=%v",
len(fedServers), spew.Sdump(newProof.MintingKey))
len(fedServers), spew.Sdump(key))

ctx, cancel = f.WithCtxQuitNoTimeout()
defer cancel()
Expand All @@ -250,7 +262,7 @@ func (f *FederationEnvoy) pushProofToFederation(uniID Identifier,
}

_, err = remoteUniverseServer.RegisterIssuance(
ctx, uniID, newProof.MintingKey, newProof.Leaf,
ctx, uniID, key, leaf,
)
return err
}
Expand Down Expand Up @@ -326,13 +338,13 @@ func (f *FederationEnvoy) syncer() {
// members.
case pushReq := <-f.pushRequests:
ctx, cancel := f.WithCtxQuit()
defer cancel()

// First, we'll attempt to registrar the issuance with
// the local registrar server.
newProof, err := f.cfg.LocalRegistrar.RegisterIssuance(
ctx, pushReq.ID, pushReq.Key, pushReq.Leaf,
)
cancel()
if err != nil {
err := fmt.Errorf("unable to insert proof "+
"into local universe: %w", err)
Expand All @@ -350,7 +362,43 @@ func (f *FederationEnvoy) syncer() {

// With the response sent above, we'll push this out to
// all the Universe servers in the background.
go f.pushProofToFederation(pushReq.ID, newProof)
go f.pushProofToFederation(
pushReq.ID, pushReq.Key, pushReq.Leaf,
)

case pushReq := <-f.batchPushRequests:
ctx, cancel := f.WithCtxQuitNoTimeout()

// First, we'll attempt to registrar the issuance with
// the local registrar server.
err := f.cfg.LocalRegistrar.RegisterNewIssuanceBatch(
ctx, pushReq.IssuanceBatch,
)
cancel()
if err != nil {
err := fmt.Errorf("unable to insert proof "+
"batch into local universe: %w", err)

log.Warnf(err.Error())

pushReq.err <- err
continue
}

// Now that we know we were able to register the proof,
// we'll return back to the caller.
pushReq.resp <- struct{}{}

// With the response sent above, we'll push this out to
// all the Universe servers in the background.
go func() {
for idx := range pushReq.IssuanceBatch {
item := pushReq.IssuanceBatch[idx]
f.pushProofToFederation(
item.ID, item.Key, item.Leaf,
)
}
}()

case <-f.Quit:
return
Expand All @@ -364,7 +412,7 @@ func (f *FederationEnvoy) syncer() {
// sent to the set of active universe servers.
//
// NOTE: This is part of the universe.Registrar interface.
func (f *FederationEnvoy) RegisterIssuance(ctx context.Context, id Identifier,
func (f *FederationEnvoy) RegisterIssuance(_ context.Context, id Identifier,
key BaseKey, leaf *MintingLeaf) (*IssuanceProof, error) {

pushReq := &FederationPushReq{
Expand All @@ -382,6 +430,29 @@ func (f *FederationEnvoy) RegisterIssuance(ctx context.Context, id Identifier,
return fn.RecvResp(pushReq.resp, pushReq.err, f.Quit)
}

// RegisterNewIssuanceBatch inserts a batch of new minting leaves within the
// target universe tree (based on the ID), stored at the base key(s). We assume
// the proofs within the batch have already been checked that they don't yet
// exist in the local database.
//
// NOTE: This is part of the universe.BatchRegistrar interface.
func (f *FederationEnvoy) RegisterNewIssuanceBatch(_ context.Context,
items []*IssuanceItem) error {

pushReq := &FederationIssuanceBatchPushReq{
IssuanceBatch: items,
resp: make(chan struct{}, 1),
err: make(chan error, 1),
}

if !fn.SendOrQuit(f.batchPushRequests, pushReq, f.Quit) {
return fmt.Errorf("unable to push new proof event batch")
}

_, err := fn.RecvResp(pushReq.resp, pushReq.err, f.Quit)
return err
}

// AddServer adds a new set of servers to the federation, then immediately
// performs a new background sync.
func (f *FederationEnvoy) AddServer(addrs ...ServerAddr) error {
Expand Down

0 comments on commit 4f4d331

Please sign in to comment.