forked from lightninglabs/lndinit
-
Notifications
You must be signed in to change notification settings - Fork 0
/
cmd_wait_ready.go
190 lines (159 loc) · 4.91 KB
/
cmd_wait_ready.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
package main
import (
"context"
"crypto/tls"
"fmt"
"math"
"time"
"github.com/jessevdk/go-flags"
"github.com/lightningnetwork/lnd/lncfg"
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/signal"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
var (
connectionRetryInterval = time.Millisecond * 250
)
type waitReadyCommand struct {
RPCServer string `long:"rpcserver" description:"The host:port of lnd's RPC listener"`
Timeout time.Duration `long:"timeout" description:"The maximum time we'll wait for lnd to become ready; 0 means wait forever"`
}
func newWaitReadyCommand() *waitReadyCommand {
return &waitReadyCommand{
RPCServer: defaultRPCServer,
}
}
func (x *waitReadyCommand) Register(parser *flags.Parser) error {
_, err := parser.AddCommand(
"wait-ready",
"Wait for lnd to be fully ready",
"Wait for lnd to be fully started, unlocked and ready to "+
"serve RPC requests; will wait and block forever "+
"until either lnd reports its status as ready or the "+
"given timeout is reached; the RPC connection to lnd "+
"is re-tried indefinitely and errors are ignored (or "+
"logged in verbose mode) until success or timeout; "+
"requires lnd v0.13.0-beta or later to work",
x,
)
return err
}
func (x *waitReadyCommand) Execute(_ []string) error {
// Since this will potentially run forever, make sure we catch any
// interrupt signals.
shutdown, err := signal.Intercept()
if err != nil {
return fmt.Errorf("error intercepting signals: %v", err)
}
started := time.Now()
timeout := time.Duration(math.MaxInt64)
if x.Timeout > 0 {
timeout = x.Timeout
log("Will time out in %v (%s)", timeout, started.Add(timeout))
}
return waitUntilStatus(
x.RPCServer, lnrpc.WalletState_SERVER_ACTIVE, timeout,
shutdown.ShutdownChannel(),
)
}
func waitUntilStatus(rpcServer string, desiredState lnrpc.WalletState,
timeout time.Duration, shutdown <-chan struct{}) error {
log("Waiting for lnd to become ready (want state %v)", desiredState)
connectionRetryTicker := time.NewTicker(connectionRetryInterval)
timeoutChan := time.After(timeout)
connectionLoop:
for {
log("Attempting to connect to RPC server %s", rpcServer)
conn, err := getStatusConnection(rpcServer)
if err != nil {
log("Connection to lnd not successful: %v", err)
select {
case <-connectionRetryTicker.C:
case <-timeoutChan:
return fmt.Errorf("timeout reached")
case <-shutdown:
return nil
}
continue
}
log("Attempting to subscribe to the wallet state")
statusStream, err := conn.SubscribeState(
context.Background(), &lnrpc.SubscribeStateRequest{},
)
if err != nil {
log("Status subscription for lnd not successful: %v",
err)
select {
case <-connectionRetryTicker.C:
case <-timeoutChan:
return fmt.Errorf("timeout reached")
case <-shutdown:
return nil
}
continue
}
for {
// Have we reached the global timeout yet?
select {
case <-timeoutChan:
return fmt.Errorf("timeout reached")
case <-shutdown:
return nil
default:
}
msg, err := statusStream.Recv()
if err != nil {
log("Error receiving status update: %v", err)
select {
case <-connectionRetryTicker.C:
case <-timeoutChan:
return fmt.Errorf("timeout reached")
case <-shutdown:
return nil
}
// Something went wrong, perhaps lnd shut down
// before becoming active. Let's retry the whole
// connection again.
continue connectionLoop
}
log("Received update from lnd, wallet status is now: "+
"%v", msg.State)
// We've arrived at the final state!
if msg.State == desiredState {
return nil
}
// If we're waiting for a state that is at the very
// beginning of the list (e.g. NON_EXISTENT) then we
// need to return an error if a state further down the
// list is returned, as that would mean we skipped over
// it. The only exception is the WAITING_TO_START since
// that is actually the largest number a state can have.
if msg.State != lnrpc.WalletState_WAITING_TO_START &&
msg.State > desiredState {
return fmt.Errorf("received state %v which "+
"is greater than %v", msg.State,
desiredState)
}
// Let's wait for another status message to arrive.
}
}
}
func getStatusConnection(rpcServer string) (lnrpc.StateClient, error) {
// Don't bother with checking the cert, we're not sending any macaroons
// to the server anyway.
creds := credentials.NewTLS(&tls.Config{InsecureSkipVerify: true})
// We need to use a custom dialer so we can also connect to unix sockets
// and not just TCP addresses.
genericDialer := lncfg.ClientAddressDialer(defaultRPCPort)
opts := []grpc.DialOption{
grpc.WithTransportCredentials(creds),
grpc.WithContextDialer(genericDialer),
}
conn, err := grpc.Dial(rpcServer, opts...)
if err != nil {
return nil, fmt.Errorf("unable to connect to RPC server: %v",
err)
}
return lnrpc.NewStateClient(conn), nil
}