Skip to content

Commit

Permalink
lsps2: simplify part handling logic
Browse files Browse the repository at this point in the history
  • Loading branch information
JssDWt committed Sep 21, 2023
1 parent efec935 commit fb97e98
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 102 deletions.
184 changes: 83 additions & 101 deletions lsps2/intercept_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,18 @@ type InterceptorConfig struct {
}

type Interceptor struct {
store Lsps2Store
openingService shared.OpeningService
client lightning.Client
feeEstimator chain.FeeEstimator
config *InterceptorConfig
newPart chan *partState
partAwaitingRegistration chan *awaitingRegistrationEvent
registrationReady chan *registrationReadyEvent
notRegistered chan string
paymentReady chan string
paymentFailure chan *paymentFailureEvent
paymentChanOpened chan *paymentChanOpenedEvent
inflightPayments map[string]*paymentState
store Lsps2Store
openingService shared.OpeningService
client lightning.Client
feeEstimator chain.FeeEstimator
config *InterceptorConfig
newPart chan *partState
registrationReady chan *registrationReadyEvent
notRegistered chan string
paymentReady chan string
paymentFailure chan *paymentFailureEvent
paymentChanOpened chan *paymentChanOpenedEvent
inflightPayments map[string]*paymentState
}

func NewInterceptHandler(
Expand All @@ -62,14 +61,13 @@ func NewInterceptHandler(
feeEstimator: feeEstimator,
config: config,
// TODO: make sure the chan sizes do not lead to deadlocks.
newPart: make(chan *partState, 1000),
partAwaitingRegistration: make(chan *awaitingRegistrationEvent, 1000),
registrationReady: make(chan *registrationReadyEvent, 1000),
notRegistered: make(chan string, 1000),
paymentReady: make(chan string, 1000),
paymentFailure: make(chan *paymentFailureEvent, 1000),
paymentChanOpened: make(chan *paymentChanOpenedEvent, 1000),
inflightPayments: make(map[string]*paymentState),
newPart: make(chan *partState, 1000),
registrationReady: make(chan *registrationReadyEvent, 1000),
notRegistered: make(chan string, 1000),
paymentReady: make(chan string, 1000),
paymentFailure: make(chan *paymentFailureEvent, 1000),
paymentChanOpened: make(chan *paymentChanOpenedEvent, 1000),
inflightPayments: make(map[string]*paymentState),
}
}

Expand Down Expand Up @@ -142,8 +140,6 @@ func (i *Interceptor) Start(ctx context.Context) {
i.handleRegistrationReady(ev)
case paymentId := <-i.notRegistered:
i.handleNotRegistered(paymentId)
case ev := <-i.partAwaitingRegistration:
i.handlePartAwaitingRegistration(ev)
case paymentId := <-i.paymentReady:
i.handlePaymentReady(paymentId)
case ev := <-i.paymentFailure:
Expand All @@ -165,25 +161,6 @@ func (i *Interceptor) handleNewPart(part *partState) {
timeoutChan: make(chan struct{}),
}
i.inflightPayments[paymentId] = payment

go func() {
select {
case <-time.After(i.config.MppTimeout):
// Handle timeout inside the event loop, to make sure there are
// no race conditions, since this timeout watcher is running in
// a goroutine.
i.paymentFailure <- &paymentFailureEvent{
paymentId: paymentId,
code: shared.FAILURE_TEMPORARY_CHANNEL_FAILURE,
}
case <-payment.timeoutChan:
// Stop listening for timeouts when the payment is ready.
}
}()

// Fetch the buy registration in a goroutine, to avoid blocking the
// event loop.
go i.fetchRegistration(part.req.PaymentId(), part.req.Scid)
}

// Check whether we already have this part, because it may have been
Expand Down Expand Up @@ -213,67 +190,46 @@ func (i *Interceptor) handleNewPart(part *partState) {
return
}

i.partAwaitingRegistration <- &awaitingRegistrationEvent{
paymentId: part.req.PaymentId(),
partId: part.req.HtlcId(),
}
}

func (i *Interceptor) fetchRegistration(
paymentId string,
scid lightning.ShortChannelID,
) {
registration, err := i.store.GetBuyRegistration(
context.TODO(),
scid,
)
// If this is the first part for this payment, setup the timeout, and fetch
// the registration.
if !paymentExisted {
go func() {
select {
case <-time.After(i.config.MppTimeout):
// Handle timeout inside the event loop, to make sure there are
// no race conditions, since this timeout watcher is running in
// a goroutine.
i.paymentFailure <- &paymentFailureEvent{
paymentId: paymentId,
code: shared.FAILURE_TEMPORARY_CHANNEL_FAILURE,
}
case <-payment.timeoutChan:
// Stop listening for timeouts when the payment is ready.
}
}()

if err == ErrNotFound {
i.notRegistered <- paymentId
return
// Fetch the buy registration in a goroutine, to avoid blocking the
// event loop.
go i.fetchRegistration(part.req.PaymentId(), part.req.Scid)
}

if err != nil {
log.Printf(
"Failed to get buy registration for %v: %v",
uint64(scid),
err,
)
i.notRegistered <- paymentId
return
}

i.registrationReady <- &registrationReadyEvent{
paymentId: paymentId,
registration: registration,
// If the registration was already fetched, this part might complete the
// payment. Process the part. Otherwise, the part will be processed after
// the registration was fetched.
if payment.registration != nil {
i.processPart(payment, part)
}
}

func (i *Interceptor) handlePartAwaitingRegistration(ev *awaitingRegistrationEvent) {
payment, ok := i.inflightPayments[ev.paymentId]
if !ok {
// This part is already handled.
return
}

part, ok := payment.parts[ev.partId]
if !ok {
// This part is already handled.
return
}

if part.isFinalized {
// This part is already handled.
return
}

if payment.registration == nil {
// The registration is not yet ready, queue the part again.
i.partAwaitingRegistration <- ev
func (i *Interceptor) processPart(payment *paymentState, part *partState) {
if payment.registration.IsComplete {
i.failPart(payment, part, shared.FAILURE_UNKNOWN_NEXT_PEER)
return
}

if payment.registration.IsComplete {
// Fail parts that come in after the payment is already final. To avoid
// inconsistencies in the payment state.
if payment.isFinal {
i.failPart(payment, part, shared.FAILURE_UNKNOWN_NEXT_PEER)
return
}
Expand Down Expand Up @@ -356,13 +312,6 @@ func (i *Interceptor) handlePartAwaitingRegistration(ev *awaitingRegistrationEve
return
}

// Fail parts that come in after the payment is already final. To avoid
// inconsistencies in the payment state.
if payment.isFinal {
i.failPart(payment, part, shared.FAILURE_UNKNOWN_NEXT_PEER)
return
}

// This is a new part. Update the sum of htlcs currently
// in-flight.
payment.incomingSumMsat += part.req.IncomingAmountMsat
Expand All @@ -377,6 +326,36 @@ func (i *Interceptor) handlePartAwaitingRegistration(ev *awaitingRegistrationEve
}
}

func (i *Interceptor) fetchRegistration(
paymentId string,
scid lightning.ShortChannelID,
) {
registration, err := i.store.GetBuyRegistration(
context.TODO(),
scid,
)

if err == ErrNotFound {
i.notRegistered <- paymentId
return
}

if err != nil {
log.Printf(
"Failed to get buy registration for %v: %v",
uint64(scid),
err,
)
i.notRegistered <- paymentId
return
}

i.registrationReady <- &registrationReadyEvent{
paymentId: paymentId,
registration: registration,
}
}

func (i *Interceptor) handleRegistrationReady(ev *registrationReadyEvent) {
payment, ok := i.inflightPayments[ev.paymentId]
if !ok {
Expand All @@ -385,6 +364,9 @@ func (i *Interceptor) handleRegistrationReady(ev *registrationReadyEvent) {
}

payment.registration = ev.registration
for _, part := range payment.parts {
i.processPart(payment, part)
}
}

func (i *Interceptor) handleNotRegistered(paymentId string) {
Expand Down
1 change: 0 additions & 1 deletion lsps2/intercept_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,6 @@ func assertEmpty(t *testing.T, i *Interceptor) {
assert.Empty(t, i.newPart)
assert.Empty(t, i.registrationReady)
assert.Empty(t, i.notRegistered)
assert.Empty(t, i.partAwaitingRegistration)
assert.Empty(t, i.paymentChanOpened)
assert.Empty(t, i.paymentFailure)
assert.Empty(t, i.paymentReady)
Expand Down

0 comments on commit fb97e98

Please sign in to comment.