-
Notifications
You must be signed in to change notification settings - Fork 2.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
invoices: refactor InvoiceDB
to eliminate ScanInvoices
#8081
Changes from all commits
1f8065d
8ab267a
d7d385f
8bf7503
bedafca
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1013,64 +1013,58 @@ func TestSettleIndexAmpPayments(t *testing.T) { | |
require.Nil(t, err) | ||
} | ||
|
||
// TestScanInvoices tests that ScanInvoices scans through all stored invoices | ||
// correctly. | ||
func TestScanInvoices(t *testing.T) { | ||
// TestFetchPendingInvoices tests that we can fetch all pending invoices from | ||
// the database using the FetchPendingInvoices method. | ||
func TestFetchPendingInvoices(t *testing.T) { | ||
t.Parallel() | ||
|
||
db, err := MakeTestInvoiceDB(t) | ||
db, err := MakeTestInvoiceDB(t, OptionClock(testClock)) | ||
require.NoError(t, err, "unable to make test db") | ||
|
||
var invoices map[lntypes.Hash]*invpkg.Invoice | ||
callCount := 0 | ||
resetCount := 0 | ||
|
||
// reset is used to reset/initialize results and is called once | ||
// upon calling ScanInvoices and when the underlying transaction is | ||
// retried. | ||
reset := func() { | ||
invoices = make(map[lntypes.Hash]*invpkg.Invoice) | ||
callCount = 0 | ||
resetCount++ | ||
} | ||
|
||
scanFunc := func(paymentHash lntypes.Hash, | ||
invoice *invpkg.Invoice) error { | ||
|
||
invoices[paymentHash] = invoice | ||
callCount++ | ||
|
||
return nil | ||
} | ||
|
||
ctxb := context.Background() | ||
// With an empty DB we expect to not scan any invoices. | ||
require.NoError(t, db.ScanInvoices(ctxb, scanFunc, reset)) | ||
require.Equal(t, 0, len(invoices)) | ||
require.Equal(t, 0, callCount) | ||
require.Equal(t, 1, resetCount) | ||
|
||
numInvoices := 5 | ||
testInvoices := make(map[lntypes.Hash]*invpkg.Invoice) | ||
// Make sure that fetching pending invoices from an empty database | ||
// returns an empty result and no errors. | ||
pending, err := db.FetchPendingInvoices(ctxb) | ||
require.NoError(t, err) | ||
require.Empty(t, pending) | ||
|
||
const numInvoices = 20 | ||
var settleIndex uint64 = 1 | ||
pendingInvoices := make(map[lntypes.Hash]invpkg.Invoice) | ||
|
||
// Now populate the DB and check if we can get all invoices with their | ||
// payment hashes as expected. | ||
for i := 1; i <= numInvoices; i++ { | ||
invoice, err := randInvoice(lnwire.MilliSatoshi(i)) | ||
amt := lnwire.MilliSatoshi(i * 1000) | ||
invoice, err := randInvoice(amt) | ||
require.NoError(t, err) | ||
|
||
invoice.CreationDate = invoice.CreationDate.Add( | ||
time.Duration(i-1) * time.Second, | ||
) | ||
|
||
paymentHash := invoice.Terms.PaymentPreimage.Hash() | ||
testInvoices[paymentHash] = invoice | ||
|
||
_, err = db.AddInvoice(ctxb, invoice, paymentHash) | ||
require.NoError(t, err) | ||
|
||
// Settle every second invoice. | ||
if i%2 == 0 { | ||
pendingInvoices[paymentHash] = *invoice | ||
continue | ||
} | ||
|
||
ref := invpkg.InvoiceRefByHash(paymentHash) | ||
_, err = db.UpdateInvoice(ctxb, ref, nil, getUpdateInvoice(amt)) | ||
require.NoError(t, err) | ||
|
||
settleTestInvoice(invoice, settleIndex) | ||
yyforyongyu marked this conversation as resolved.
Show resolved
Hide resolved
|
||
settleIndex++ | ||
} | ||
|
||
resetCount = 0 | ||
require.NoError(t, db.ScanInvoices(ctxb, scanFunc, reset)) | ||
require.Equal(t, numInvoices, callCount) | ||
require.Equal(t, testInvoices, invoices) | ||
require.Equal(t, 1, resetCount) | ||
// Fetch all pending invoices. | ||
pending, err = db.FetchPendingInvoices(ctxb) | ||
require.NoError(t, err) | ||
require.Equal(t, pendingInvoices, pending) | ||
} | ||
|
||
// TestDuplicateSettleInvoice tests that if we add a new invoice and settle it | ||
|
@@ -3092,6 +3086,65 @@ func TestDeleteInvoices(t *testing.T) { | |
assertInvoiceCount(0) | ||
} | ||
|
||
// TestDeleteCanceledInvoices tests that deleting canceled invoices with the | ||
// specific DeleteCanceledInvoices method works correctly. | ||
func TestDeleteCanceledInvoices(t *testing.T) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it'd be great if we could add tests to cover more branches in this method, but guess it's too much work as most stuff here is not mocked. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, maybe we should return to the test coverage question later on once we have both SQL and KV implementations run against this test suite since I'm afraid many of the branches we'd test will go away as we deprecate the KV implementation. Similarly |
||
t.Parallel() | ||
|
||
db, err := MakeTestInvoiceDB(t) | ||
require.NoError(t, err, "unable to make test db") | ||
|
||
// Updatefunc is used to cancel an invoice. | ||
updateFunc := func(invoice *invpkg.Invoice) ( | ||
*invpkg.InvoiceUpdateDesc, error) { | ||
|
||
return &invpkg.InvoiceUpdateDesc{ | ||
UpdateType: invpkg.CancelInvoiceUpdate, | ||
State: &invpkg.InvoiceStateUpdateDesc{ | ||
NewState: invpkg.ContractCanceled, | ||
}, | ||
}, nil | ||
} | ||
|
||
// Add some invoices to the test db. | ||
ctxb := context.Background() | ||
var invoices []invpkg.Invoice | ||
for i := 0; i < 10; i++ { | ||
invoice, err := randInvoice(lnwire.MilliSatoshi(i + 1)) | ||
require.NoError(t, err) | ||
|
||
paymentHash := invoice.Terms.PaymentPreimage.Hash() | ||
_, err = db.AddInvoice(ctxb, invoice, paymentHash) | ||
require.NoError(t, err) | ||
|
||
// Cancel every second invoice. | ||
if i%2 == 0 { | ||
invoice, err = db.UpdateInvoice( | ||
ctxb, invpkg.InvoiceRefByHash(paymentHash), nil, | ||
updateFunc, | ||
) | ||
require.NoError(t, err) | ||
} else { | ||
invoices = append(invoices, *invoice) | ||
} | ||
} | ||
|
||
// Delete canceled invoices. | ||
require.NoError(t, db.DeleteCanceledInvoices(ctxb)) | ||
|
||
// Query to collect all invoices. | ||
query := invpkg.InvoiceQuery{ | ||
IndexOffset: 0, | ||
NumMaxInvoices: math.MaxUint64, | ||
} | ||
|
||
dbInvoices, err := db.QueryInvoices(ctxb, query) | ||
require.NoError(t, err) | ||
|
||
// Check that we really have the expected invoices. | ||
require.Equal(t, invoices, dbInvoices.Invoices) | ||
} | ||
|
||
// TestAddInvoiceInvalidFeatureDeps asserts that inserting an invoice with | ||
// invalid transitive feature dependencies fails with the appropriate error. | ||
func TestAddInvoiceInvalidFeatureDeps(t *testing.T) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -433,17 +433,18 @@ func fetchInvoiceNumByRef(invoiceIndex, payAddrIndex, setIDIndex kvdb.RBucket, | |
} | ||
} | ||
|
||
// ScanInvoices scans through all invoices and calls the passed scanFunc for | ||
// for each invoice with its respective payment hash. Additionally a reset() | ||
// closure is passed which is used to reset/initialize partial results and also | ||
// to signal if the kvdb.View transaction has been retried. | ||
func (d *DB) ScanInvoices(_ context.Context, scanFunc invpkg.InvScanFunc, | ||
reset func()) error { | ||
|
||
return kvdb.View(d, func(tx kvdb.RTx) error { | ||
// FetchPendingInvoices returns all invoices that have not yet been settled or | ||
// canceled. The returned map is keyed by the payment hash of each respective | ||
// invoice. | ||
func (d *DB) FetchPendingInvoices(_ context.Context) ( | ||
map[lntypes.Hash]invpkg.Invoice, error) { | ||
|
||
result := make(map[lntypes.Hash]invpkg.Invoice) | ||
|
||
err := kvdb.View(d, func(tx kvdb.RTx) error { | ||
invoices := tx.ReadBucket(invoiceBucket) | ||
if invoices == nil { | ||
return invpkg.ErrNoInvoicesCreated | ||
return nil | ||
} | ||
|
||
invoiceIndex := invoices.NestedReadBucket(invoiceIndexBucket) | ||
|
@@ -472,12 +473,23 @@ func (d *DB) ScanInvoices(_ context.Context, scanFunc invpkg.InvScanFunc, | |
return err | ||
} | ||
|
||
var paymentHash lntypes.Hash | ||
copy(paymentHash[:], k) | ||
if invoice.IsPending() { | ||
var paymentHash lntypes.Hash | ||
copy(paymentHash[:], k) | ||
result[paymentHash] = invoice | ||
} | ||
|
||
return scanFunc(paymentHash, &invoice) | ||
return nil | ||
}) | ||
}, reset) | ||
}, func() { | ||
result = make(map[lntypes.Hash]invpkg.Invoice) | ||
}) | ||
|
||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
return result, nil | ||
} | ||
|
||
// QueryInvoices allows a caller to query the invoice database for invoices | ||
|
@@ -2702,6 +2714,102 @@ func delAMPSettleIndex(invoiceNum []byte, invoices, | |
return nil | ||
} | ||
|
||
// DeleteCanceledInvoices deletes all canceled invoices from the database. | ||
func (d *DB) DeleteCanceledInvoices(_ context.Context) error { | ||
return kvdb.Update(d, func(tx kvdb.RwTx) error { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we use batch here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it's probably not a good idea since we scan through all the invoices here and bolt's There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it depends on whether there's an error and how we handle the error, from this bench test, if we distinguish the errors here, each closure is executed exactly once. |
||
invoices := tx.ReadWriteBucket(invoiceBucket) | ||
if invoices == nil { | ||
return nil | ||
} | ||
|
||
invoiceIndex := invoices.NestedReadWriteBucket( | ||
invoiceIndexBucket, | ||
) | ||
if invoiceIndex == nil { | ||
return invpkg.ErrNoInvoicesCreated | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Think we should unify the behaviors here - in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This case is different though right? We do return There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. could it be the case that this is just an empty bucket? like all the invoices are deleted? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, if the bucket is empty we'd still get back the handle. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Cool - then I'm curious in what case will this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it's there to give us some reassurance that about expected existence of those buckets. Similar checks are done for other channeldb methods too. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. cool then I interpret it as a db corruption check. |
||
} | ||
|
||
invoiceAddIndex := invoices.NestedReadWriteBucket( | ||
addIndexBucket, | ||
) | ||
if invoiceAddIndex == nil { | ||
return invpkg.ErrNoInvoicesCreated | ||
} | ||
|
||
payAddrIndex := tx.ReadWriteBucket(payAddrIndexBucket) | ||
|
||
return invoiceIndex.ForEach(func(k, v []byte) error { | ||
// Skip the special numInvoicesKey as that does not | ||
// point to a valid invoice. | ||
if bytes.Equal(k, numInvoicesKey) { | ||
return nil | ||
} | ||
|
||
// Skip sub-buckets. | ||
if v == nil { | ||
return nil | ||
} | ||
|
||
invoice, err := fetchInvoice(v, invoices) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
if invoice.State != invpkg.ContractCanceled { | ||
return nil | ||
} | ||
|
||
// Delete the payment hash from the invoice index. | ||
err = invoiceIndex.Delete(k) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
// Delete payment address index reference if there's a | ||
// valid payment address. | ||
if invoice.Terms.PaymentAddr != invpkg.BlankPayAddr { | ||
// To ensure consistency check that the already | ||
// fetched invoice key matches the one in the | ||
// payment address index. | ||
key := payAddrIndex.Get( | ||
invoice.Terms.PaymentAddr[:], | ||
) | ||
if bytes.Equal(key, k) { | ||
// Delete from the payment address | ||
// index. | ||
if err := payAddrIndex.Delete( | ||
invoice.Terms.PaymentAddr[:], | ||
); err != nil { | ||
return err | ||
} | ||
} | ||
} | ||
|
||
// Remove from the add index. | ||
var addIndexKey [8]byte | ||
byteOrder.PutUint64(addIndexKey[:], invoice.AddIndex) | ||
err = invoiceAddIndex.Delete(addIndexKey[:]) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
// Note that we don't need to delete the invoice from | ||
// the settle index as it is not added until the | ||
// invoice is settled. | ||
|
||
// Now remove all sub invoices. | ||
err = delAMPInvoices(k, invoices) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
// Finally remove the serialized invoice from the | ||
// invoice bucket. | ||
return invoices.Delete(k) | ||
}) | ||
}, func() {}) | ||
} | ||
|
||
// DeleteInvoice attempts to delete the passed invoices from the database in | ||
// one transaction. The passed delete references hold all keys required to | ||
// delete the invoices without also needing to deserialze them. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we also add a case here to test the empty slice returned from
FetchPendingInvoices
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah good idea! Done.