Skip to content

Commit

Permalink
cln_plugin: handle timeouts on replayed htlcs
Browse files Browse the repository at this point in the history
  • Loading branch information
JssDWt committed Sep 14, 2023
1 parent c6a5798 commit efec935
Showing 1 changed file with 44 additions and 7 deletions.
51 changes: 44 additions & 7 deletions cln_plugin/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,13 +165,21 @@ func (s *server) HtlcStream(stream proto.ClnPlugin_HtlcStreamServer) error {
return fmt.Errorf("already subscribed")
}

s.htlcStream = stream

newTimeout := time.Now().Add(s.subscriberTimeout)
// Replay in-flight htlcs in fifo order
for pair := s.inflightHtlcs.Oldest(); pair != nil; pair = pair.Next() {
sendHtlcAccepted(stream, pair.Value)
err := sendHtlcAccepted(stream, pair.Value)
if err != nil {
s.mtx.Unlock()
return err
}

// Reset the subscriber timeout for this htlc.
pair.Value.timeout = newTimeout
}

s.htlcStream = stream

// Notify listeners that a new subscriber is active. Replace the chan with
// a new one immediately in case this subscriber is dropped later.
close(s.htlcnewSubscriber)
Expand Down Expand Up @@ -232,10 +240,6 @@ func (s *server) listenHtlcRequests() {
// Attempts to send a htlc_accepted message to the grpc client. The message will
// be held until a subscriber is active, or the subscriber timeout expires.
func (s *server) handleHtlcAccepted(msg *htlcAcceptedMsg) {
s.mtx.Lock()
s.inflightHtlcs.Set(msg.id, msg)
s.mtx.Unlock()

for {
s.mtx.Lock()
stream := s.htlcStream
Expand Down Expand Up @@ -273,12 +277,22 @@ func (s *server) handleHtlcAccepted(msg *htlcAcceptedMsg) {
}
}

// Add the htlc to in-flight htlcs
s.mtx.Lock()
s.inflightHtlcs.Set(msg.id, msg)

// There is a subscriber. Attempt to send the htlc_accepted message.
err := sendHtlcAccepted(stream, msg)

// If there is no error, we're done.
if err == nil {
s.mtx.Unlock()
return
} else {
// Remove the htlc from inflight htlcs again on error, so it won't
// get replayed twice in a row.
s.inflightHtlcs.Delete(msg.id)
s.mtx.Unlock()
}

// If we end up here, there was an error sending the message to the
Expand Down Expand Up @@ -320,6 +334,11 @@ func (s *server) recvHtlcResolution() *proto.HtlcResolution {
s.mtx.Lock()
stream := s.htlcStream
ns := s.htlcnewSubscriber
oldestHtlc := s.inflightHtlcs.Oldest()
var htlcTimeout time.Duration = 1 << 62 // practically infinite
if oldestHtlc != nil {
htlcTimeout = time.Until(oldestHtlc.Value.timeout)
}
s.mtx.Unlock()

if stream == nil {
Expand All @@ -331,6 +350,24 @@ func (s *server) recvHtlcResolution() *proto.HtlcResolution {
case <-ns:
log.Printf("New subscription available for htlc receive, continue receive.")
continue
case <-time.After(htlcTimeout):
log.Printf(
"WARNING: htlc with id '%s' timed out after '%v' waiting "+
"for grpc subscriber: %+v",
oldestHtlc.Value.id,
s.subscriberTimeout,
oldestHtlc.Value.htlc,
)

// If the subscriber timeout expires while holding a htlc
// we short circuit the htlc by sending the default result
// (continue) to cln.
return &proto.HtlcResolution{
Correlationid: oldestHtlc.Value.id,
Outcome: &proto.HtlcResolution_Continue{
Continue: &proto.HtlcContinue{},
},
}
}
}

Expand Down

0 comments on commit efec935

Please sign in to comment.