diff --git a/README.md b/README.md index 4767e66..bfee08d 100644 --- a/README.md +++ b/README.md @@ -20,26 +20,25 @@ USAGE: balance-agent [global options] command [command options] [arguments...] VERSION: - v0.0.22 + v0.0.38 COMMANDS: help, h Shows a list of commands or help for one command GLOBAL OPTIONS: - --apikey value api key - --rpcserver value host:port of ln daemon (default: "localhost:10009") - --lnddir value path to lnd's base directory (default: "/home/user/lnd") - --tlscertpath value path to TLS certificate (default: "/home/user/lnd/tls.cert") - --chain value, -c value the chain lnd is running on e.g. bitcoin (default: "bitcoin") - --network value, -n value the network lnd is running on e.g. mainnet, testnet, etc. (default: "mainnet") - --macaroonpath value path to macaroon file - --allowedentropy value allowed entropy in bits for channel balances (default: 64) - --interval value interval to poll - 10s, 1m, 10m or 1h (default: "10s") - --private report private data as well (default: false) - --preferipv4 If you have the choice between IPv6 and IPv4 prefer IPv4 (default: false) - --verbosity value log level for V logs (default: 0) - --help, -h show help - --version, -v print the version + --allowedentropy value allowed entropy in bits for channel balances (default: 64) + --apikey value api key + --interval value interval to poll - 10s, 1m, 10m or 1h (default: "10s") + --lnddir value path to lnd's base directory (default: "/home/user/.lnd") + --macaroonpath value path to macaroon file + --private report private data as well (default: false) + --preferipv4 If you have the choice between IPv6 and IPv4 prefer IPv4 (default: false) + --rpcserver value host:port of ln daemon (default: "localhost:10009") + --tlscertpath value path to TLS certificate (default: "/home/user/.lnd/tls.cert") + --channel-whitelist value Path to file containing a whitelist of channels + --verbosity value log level for V logs (default: 0) + --help, -h show help + --version, -v print the version ``` It tries the best to have sane defaults so you can just start it up on your node without further hassle. @@ -129,11 +128,39 @@ Usage: docker run -v /tmp:/tmp -e API_KEY=changeme ghcr.io/bolt-observer/agent:v0.0.35 ``` +## Filtering on agent side + +You can limit what channnels are reported using `--channel-whitelist` option. It specifies a local file path to be used as a whitelist of what channels to report. +When adding `--private` to `--channel-whitelist` this means every private channel AND whatever is listed in the file. There is also `--public` to allow all public channels. +Using the options without `--channel-whitelist` makes no sense since by default all public channels are reported however it has to be explicit with `--channel-whitelist` in order +to automatically allow all public channels (beside what is allowed through the file). +If you set `--private` and `--public` then no matter what you add to the `--channel-whitelist` file everything will be reported. + +The file looks like this: + +``` +# Comments start with a # character +# You can list pubkeys for example: +0288037d3f0bdcfb240402b43b80cdc32e41528b3e2ebe05884aff507d71fca71a # bolt.observer +# which means any channel where peer pubkey is this +# or you can specify a specific short channel id e.g., +759930760125546497 +# too, invalid lines like +whatever +# will be ignored (and logged as a warning, aliases also don't work!) +# Validity for channel id is not checked (it just has to be numeric), thus: +1337 +# is perfectly valid (altho it won't match and thus allow the reporting of +# any additional channel). +# Empty files means nothing - in whitelist context: do not report anything. +``` + ## Components Internally we use: * [channelchecker](./channelchecker): an abstraction for checking all channels * [nodeinfo](./nodeinfo): this can basically report `lncli getnodeinfo` for your node - it is used by the agent so we have a full view of node info & channels +* [filter](./filter): this is used to filter specific channels on the agent side * [checkermonitoring](./checkermonitoring): is used for reporting metrics via Graphite (not used directly in balance-agent here) * [lightning_api](./lightning_api): an abstraction around lightning node API (that furthermore heavily depends on common code from [lnd](https://github.com/lightningnetwork/lnd)) diff --git a/channelchecker/channelchecker.go b/channelchecker/channelchecker.go index 8f26f29..c8c7110 100644 --- a/channelchecker/channelchecker.go +++ b/channelchecker/channelchecker.go @@ -14,6 +14,7 @@ import ( checkermonitoring "github.com/bolt-observer/agent/checkermonitoring" entities "github.com/bolt-observer/agent/entities" + "github.com/bolt-observer/agent/filter" api "github.com/bolt-observer/agent/lightning_api" common_entities "github.com/bolt-observer/go_common/entities" utils "github.com/bolt-observer/go_common/utils" @@ -109,6 +110,12 @@ func (c *ChannelChecker) Subscribe( settings.NoopInterval = c.keepAliveInterval } + if settings.Filter == nil { + glog.V(3).Infof("Filter was nil, allowing everything") + f, _ := filter.NewAllowAllFilter() + settings.Filter = f + } + c.globalSettings.Set(info.IdentityPubkey+uniqueId, Settings{ identifier: entities.NodeIdentifier{Identifier: pubKey, UniqueId: uniqueId}, settings: settings, @@ -140,6 +147,11 @@ func (c *ChannelChecker) GetState( return nil, errors.New("invalid pubkey") } + if settings.Filter == nil { + f, _ := filter.NewAllowAllFilter() + settings.Filter = f + } + resp, err := c.checkOne(entities.NodeIdentifier{Identifier: pubKey, UniqueId: uniqueId}, getApi, settings, true, false) if err != nil { return nil, err @@ -156,7 +168,9 @@ func (c *ChannelChecker) getChannelList( api api.LightingApiCalls, info *api.InfoApi, precisionBits int, - allowPrivateChans bool) ([]entities.ChannelBalance, SetOfChanIds, error) { + allowPrivateChans bool, + filter filter.FilterInterface, +) ([]entities.ChannelBalance, SetOfChanIds, error) { defer c.monitoring.MetricsTimer("channellist", map[string]string{"pubkey": info.IdentityPubkey})() @@ -180,6 +194,12 @@ func (c *ChannelChecker) getChannelList( for _, channel := range channels.Channels { if channel.Private && !allowPrivateChans { + glog.V(3).Infof("Skipping private channel %v", channel.ChanId) + continue + } + + if !filter.AllowChanId(channel.ChanId) && !filter.AllowPubKey(channel.RemotePubkey) && !filter.AllowSpecial(channel.Private) { + glog.V(3).Infof("Filtering channel %v", channel.ChanId) continue } @@ -474,7 +494,7 @@ func (c *ChannelChecker) checkOne( identifier.Identifier = info.IdentityPubkey } - channelList, set, err := c.getChannelList(api, info, settings.AllowedEntropy, settings.AllowPrivateChannels) + channelList, set, err := c.getChannelList(api, info, settings.AllowedEntropy, settings.AllowPrivateChannels, settings.Filter) if err != nil { c.monitoring.MetricsReport("checkone", "failure", map[string]string{"pubkey": pubkey}) return nil, err diff --git a/channelchecker/channelchecker_test.go b/channelchecker/channelchecker_test.go index 26251fa..7746a27 100644 --- a/channelchecker/channelchecker_test.go +++ b/channelchecker/channelchecker_test.go @@ -14,6 +14,7 @@ import ( miniredis "github.com/alicebob/miniredis/v2" agent_entities "github.com/bolt-observer/agent/entities" + "github.com/bolt-observer/agent/filter" lightning_api "github.com/bolt-observer/agent/lightning_api" entities "github.com/bolt-observer/go_common/entities" utils "github.com/bolt-observer/go_common/utils" @@ -216,6 +217,129 @@ func TestBasicFlow(t *testing.T) { } } +func TestBasicFlowFilterOne(t *testing.T) { + pubKey, api, d := initTest(t) + + d.HttpApi.DoFunc = func(req *http.Request) (*http.Response, error) { + contents := "" + if strings.Contains(req.URL.Path, "v1/getinfo") { + contents = getInfoJson("02b67e55fb850d7f7d77eb71038362bc0ed0abd5b7ee72cc4f90b16786c69b9256") + } else if strings.Contains(req.URL.Path, "v1/channels") { + contents = getChannelJson(1337, false, true) + } + + r := ioutil.NopCloser(bytes.NewReader([]byte(contents))) + + return &http.Response{ + StatusCode: 200, + Body: r, + }, nil + } + + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(15*time.Second)) + + c := NewDefaultChannelChecker(ctx, time.Duration(0), true, false, nil) + // Make everything a bit faster + c.OverrideLoopInterval(1 * time.Second) + was_called := false + + f, _ := filter.NewUnitTestFilter() + fd := f.(*filter.UnitTestFilter) + fd.AddAllowChanId(1) + fd.AddAllowChanId(1337) + + c.Subscribe( + pubKey, "random_id", + func() lightning_api.LightingApiCalls { return api }, + agent_entities.ReportingSettings{ + AllowedEntropy: 64, + PollInterval: agent_entities.SECOND, + AllowPrivateChannels: true, + Filter: f, + }, + func(ctx context.Context, report *agent_entities.ChannelBalanceReport) bool { + if len(report.ChangedChannels) == 1 && report.UniqueId == "random_id" { + was_called = true + } + + cancel() + return true + }, + ) + + c.EventLoop() + + select { + case <-time.After(5 * time.Second): + t.Fatal("Took too long") + case <-ctx.Done(): + if !was_called { + t.Fatalf("Callback was not correctly invoked") + } + } +} + +func TestBasicFlowFilterTwo(t *testing.T) { + pubKey, api, d := initTest(t) + + d.HttpApi.DoFunc = func(req *http.Request) (*http.Response, error) { + contents := "" + if strings.Contains(req.URL.Path, "v1/getinfo") { + contents = getInfoJson("02b67e55fb850d7f7d77eb71038362bc0ed0abd5b7ee72cc4f90b16786c69b9256") + } else if strings.Contains(req.URL.Path, "v1/channels") { + contents = getChannelJson(1337, false, true) + } + + r := ioutil.NopCloser(bytes.NewReader([]byte(contents))) + + return &http.Response{ + StatusCode: 200, + Body: r, + }, nil + } + + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(15*time.Second)) + + c := NewDefaultChannelChecker(ctx, time.Duration(0), true, false, nil) + // Make everything a bit faster + c.OverrideLoopInterval(1 * time.Second) + was_called := false + + f, _ := filter.NewUnitTestFilter() + fd := f.(*filter.UnitTestFilter) + fd.AddAllowPubKey("02004c625d622245606a1ea2c1c69cfb4516b703b47945a3647713c05fe4aaeb1c") + + c.Subscribe( + pubKey, "random_id", + func() lightning_api.LightingApiCalls { return api }, + agent_entities.ReportingSettings{ + AllowedEntropy: 64, + PollInterval: agent_entities.SECOND, + AllowPrivateChannels: true, + Filter: f, + }, + func(ctx context.Context, report *agent_entities.ChannelBalanceReport) bool { + if len(report.ChangedChannels) == 2 && report.UniqueId == "random_id" { + was_called = true + } + + cancel() + return true + }, + ) + + c.EventLoop() + + select { + case <-time.After(5 * time.Second): + t.Fatal("Took too long") + case <-ctx.Done(): + if !was_called { + t.Fatalf("Callback was not correctly invoked") + } + } +} + func TestContextCanBeNil(t *testing.T) { pubKey, api, d := initTest(t) diff --git a/cmd/balance-agent/main.go b/cmd/balance-agent/main.go index 5231640..bd67755 100644 --- a/cmd/balance-agent/main.go +++ b/cmd/balance-agent/main.go @@ -11,10 +11,12 @@ import ( "net" "net/http" "os" + "os/signal" "os/user" "path/filepath" "strings" "sync" + "syscall" "time" "github.com/btcsuite/btcd/btcutil" @@ -23,6 +25,7 @@ import ( channelchecker "github.com/bolt-observer/agent/channelchecker" "github.com/bolt-observer/agent/checkermonitoring" + "github.com/bolt-observer/agent/filter" api "github.com/bolt-observer/agent/lightning_api" "github.com/bolt-observer/agent/nodeinfo" entities "github.com/bolt-observer/go_common/entities" @@ -36,6 +39,7 @@ const ( defaultChainSubDir = "chain" defaultTLSCertFilename = "tls.cert" defaultMacaroonFilename = "readonly.macaroon" + whitelist = "channel-whitelist" ) var ( @@ -204,59 +208,58 @@ func getApp() *cli.App { app.Version = GitRevision app.Flags = []cli.Flag{ + &cli.IntFlag{ + Name: "allowedentropy", + Usage: "allowed entropy in bits for channel balances", + Value: 64, + }, &cli.StringFlag{ Name: "apikey", Value: "", Usage: "api key", }, &cli.StringFlag{ - Name: "rpcserver", - Value: defaultRPCHostPort, - Usage: "host:port of ln daemon", + Name: "interval", + Usage: "interval to poll - 10s, 1m, 10m or 1h", + Value: "10s", }, &cli.StringFlag{ Name: "lnddir", Value: defaultLndDir, Usage: "path to lnd's base directory", }, - &cli.StringFlag{ - Name: "tlscertpath", - Value: defaultTLSCertPath, - Usage: "path to TLS certificate", - }, - &cli.StringFlag{ - Name: "chain, c", - Usage: "the chain lnd is running on e.g. bitcoin", - Value: "bitcoin", - }, - &cli.StringFlag{ - Name: "network, n", - Usage: "the network lnd is running on e.g. mainnet, " + - "testnet, etc.", - Value: "mainnet", - }, &cli.StringFlag{ Name: "macaroonpath", Usage: "path to macaroon file", }, - &cli.IntFlag{ - Name: "allowedentropy", - Usage: "allowed entropy in bits for channel balances", - Value: 64, - }, - &cli.StringFlag{ - Name: "interval", - Usage: "interval to poll - 10s, 1m, 10m or 1h", - Value: "10s", - }, &cli.BoolFlag{ Name: "private", Usage: "report private data as well (default: false)", }, + &cli.BoolFlag{ + Name: "public", + Usage: fmt.Sprintf("report public data - useful with %s (default: false)", whitelist), + Hidden: true, + }, &cli.BoolFlag{ Name: "preferipv4", Usage: "If you have the choice between IPv6 and IPv4 prefer IPv4 (default: false)", }, + &cli.StringFlag{ + Name: "rpcserver", + Value: defaultRPCHostPort, + Usage: "host:port of ln daemon", + }, + &cli.StringFlag{ + Name: "tlscertpath", + Value: defaultTLSCertPath, + Usage: "path to TLS certificate", + }, + &cli.StringFlag{ + Name: whitelist, + Usage: "Path to file containing a whitelist of channels", + Hidden: true, + }, &cli.BoolFlag{ Name: "userest", Usage: "Use REST API when true instead of gRPC", @@ -303,6 +306,20 @@ func getApp() *cli.App { Usage: "Ignore CLN socket", Hidden: true, }, + + &cli.StringFlag{ + Name: "chain, c", + Usage: "the chain lnd is running on e.g. bitcoin", + Value: "bitcoin", + Hidden: true, + }, + &cli.StringFlag{ + Name: "network, n", + Usage: "the network lnd is running on e.g. mainnet, " + + "testnet, etc.", + Value: "mainnet", + Hidden: true, + }, } app.Flags = append(app.Flags, glogFlags...) @@ -508,6 +525,54 @@ func mkGetLndApi(ctx *cli.Context) agent_entities.NewApiCall { } } +func reloadConfig(ctx context.Context, f *filter.FileFilter) { + glog.Info("Reloading configuration...") + + if f != nil { + err := f.Reload() + if err != nil { + glog.Warningf("Error while reloading configuration %v", err) + } + } +} + +func signalHandler(ctx context.Context, f filter.FilterInterface) { + ff, ok := f.(*filter.FileFilter) + + signal_chan := make(chan os.Signal, 1) + exit_chan := make(chan int) + + signal.Notify(signal_chan, + syscall.SIGHUP) + go func() { + for { + select { + case s := <-signal_chan: + switch s { + case syscall.SIGHUP: + if ok { + reloadConfig(ctx, ff) + } else { + reloadConfig(ctx, nil) + } + + case syscall.SIGTERM: + exit_chan <- 0 + case syscall.SIGQUIT: + exit_chan <- 0 + default: + fmt.Println("Unknown signal.") + exit_chan <- 1 + } + case <-ctx.Done(): + return + } + } + }() + code := <-exit_chan + os.Exit(code) +} + func checker(ctx *cli.Context) error { apiKey = utils.GetEnvWithDefault("API_KEY", "") if apiKey == "" { @@ -515,12 +580,44 @@ func checker(ctx *cli.Context) error { } if apiKey == "" && (ctx.String("url") != "" || ctx.String("nodeurl") != "") { - return fmt.Errorf("missing API key") + // We don't return error here since we don't want glog to handle it + fmt.Fprintf(os.Stderr, "missing API key (use --apikey or set API_KEY environment variable)\n") + os.Exit(1) } + ct := context.Background() + + var err error + + f, _ := filter.NewAllowAllFilter() + if ctx.String(whitelist) != "" { + if _, err = os.Stat(ctx.String(whitelist)); err != nil { + // We don't return error here since we don't want glog to handle it + fmt.Fprintf(os.Stderr, "%s points to non-existing file", whitelist) + os.Exit(1) + } + + o := filter.None + + if ctx.Bool("private") { + o |= filter.AllowAllPrivate + } + + if ctx.Bool("public") { + o |= filter.AllowAllPublic + } + + f, err = filter.NewFilterFromFile(ct, ctx.String(whitelist), o) + if err != nil { + return err + } + } + + go signalHandler(ct, f) + url = ctx.String("url") nodeurl = ctx.String("nodeurl") - private = ctx.Bool("private") + private = ctx.Bool("private") || ctx.String(whitelist) != "" interval, err := getInterval(ctx, "interval") if err != nil { @@ -534,7 +631,6 @@ func checker(ctx *cli.Context) error { preferipv4 = ctx.Bool("preferipv4") - ct := context.Background() infochecker := nodeinfo.NewNodeInfo(ct, checkermonitoring.NewNopCheckerMonitoring("nodeinfo")) c := channelchecker.NewDefaultChannelChecker(ct, ctx.Duration("keepalive"), ctx.Bool("smooth"), ctx.Bool("checkgraph"), checkermonitoring.NewNopCheckerMonitoring("channelchecker")) @@ -548,14 +644,14 @@ func checker(ctx *cli.Context) error { nodeinterval = agent_entities.TEN_SECONDS } - settings := agent_entities.ReportingSettings{PollInterval: interval, AllowedEntropy: ctx.Int("allowedentropy"), AllowPrivateChannels: ctx.Bool("private")} + settings := agent_entities.ReportingSettings{PollInterval: interval, AllowedEntropy: ctx.Int("allowedentropy"), AllowPrivateChannels: private, Filter: f} if settings.PollInterval == agent_entities.MANUAL_REQUEST { - infochecker.GetState("", ctx.String("uniqueid"), private, agent_entities.MANUAL_REQUEST, mkGetLndApi(ctx), infoCallback) + infochecker.GetState("", ctx.String("uniqueid"), private, agent_entities.MANUAL_REQUEST, mkGetLndApi(ctx), infoCallback, f) time.Sleep(1 * time.Second) c.GetState("", ctx.String("uniqueid"), mkGetLndApi(ctx), settings, balanceCallback) } else { - err := infochecker.Subscribe("", ctx.String("uniqueid"), private, nodeinterval, mkGetLndApi(ctx), infoCallback) + err := infochecker.Subscribe("", ctx.String("uniqueid"), private, nodeinterval, mkGetLndApi(ctx), infoCallback, f) if err != nil { return err } diff --git a/entities/balance.go b/entities/balance.go index 853fe5d..daddb29 100644 --- a/entities/balance.go +++ b/entities/balance.go @@ -6,6 +6,7 @@ import ( "strings" "time" + "github.com/bolt-observer/agent/filter" entities "github.com/bolt-observer/go_common/entities" ) @@ -84,11 +85,12 @@ func (i *Interval) UnmarshalJSON(s []byte) (err error) { type BalanceReportCallback func(ctx context.Context, report *ChannelBalanceReport) bool type ReportingSettings struct { - GraphPollInterval time.Duration `json:"-"` - NoopInterval time.Duration `json:"-"` // If that much time has passed send null report - PollInterval Interval `json:"poll_interval"` - AllowedEntropy int `json:"allowed_entropy"` // 64 bits is the default - AllowPrivateChannels bool `json:"allow_private_channels"` // default is false + GraphPollInterval time.Duration `json:"-"` + NoopInterval time.Duration `json:"-"` // If that much time has passed send null report + Filter filter.FilterInterface `json:"-"` + PollInterval Interval `json:"poll_interval"` + AllowedEntropy int `json:"allowed_entropy"` // 64 bits is the default + AllowPrivateChannels bool `json:"allow_private_channels"` // default is false } type ChannelBalanceReport struct { diff --git a/filter/README.md b/filter/README.md new file mode 100644 index 0000000..8d6ee81 --- /dev/null +++ b/filter/README.md @@ -0,0 +1,4 @@ +# Filter + +This is a simple interface to filter based on channel id or (peer) pubkey. +Currently just a whitelist of what to report is supported but it is easily extendible to blacklists too. diff --git a/filter/dummy.go b/filter/dummy.go new file mode 100644 index 0000000..119f27e --- /dev/null +++ b/filter/dummy.go @@ -0,0 +1,48 @@ +package filter + +type AllowAllFilter struct { + Filter +} + +func NewAllowAllFilter() (FilterInterface, error) { + return &AllowAllFilter{}, nil +} + +func (f *AllowAllFilter) AllowPubKey(id string) bool { + return true +} + +func (f *AllowAllFilter) AllowChanId(id uint64) bool { + return true +} + +func (f *AllowAllFilter) AllowSpecial(private bool) bool { + return true +} + +type UnitTestFilter struct { + Filter +} + +func NewUnitTestFilter() (FilterInterface, error) { + f := &UnitTestFilter{ + Filter: Filter{ + chanIdWhitelist: make(map[uint64]struct{}), + nodeIdWhitelist: make(map[string]struct{}), + }, + } + + return f, nil +} + +func (u *UnitTestFilter) AddAllowPubKey(id string) { + u.nodeIdWhitelist[id] = struct{}{} +} + +func (u *UnitTestFilter) AddAllowChanId(id uint64) { + u.chanIdWhitelist[id] = struct{}{} +} + +func (f *UnitTestFilter) ChangeOptions(options Options) { + f.Options = options +} diff --git a/filter/filter.go b/filter/filter.go new file mode 100644 index 0000000..9f8329a --- /dev/null +++ b/filter/filter.go @@ -0,0 +1,156 @@ +package filter + +import ( + "bufio" + "context" + "os" + "regexp" + "strconv" + "strings" + "sync" + + utils "github.com/bolt-observer/go_common/utils" + "github.com/fsnotify/fsnotify" + "github.com/golang/glog" +) + +type FileFilter struct { + Filter + WhitelistFilePath string + Mutex sync.Mutex + DefaultOptions Options +} + +func (f *FileFilter) Reload() error { + f.Mutex.Lock() + defer f.Mutex.Unlock() + + r := regexp.MustCompile(`\s*#.*$`) + + readFile, err := os.Open(f.WhitelistFilePath) + + if err != nil { + return err + } + + defer readFile.Close() + + f.nodeIdWhitelist = make(map[string]struct{}) + f.chanIdWhitelist = make(map[uint64]struct{}) + f.Options = f.DefaultOptions + + fileScanner := bufio.NewScanner(readFile) + fileScanner.Split(bufio.ScanLines) + for fileScanner.Scan() { + line := strings.Trim(fileScanner.Text(), " ") + + if strings.HasPrefix("#", line) { + continue + } + + line = strings.Trim(r.ReplaceAllString(line, ""), " \t\r\n") + if line == "" { + continue + } + + if utils.ValidatePubkey(line) { + f.nodeIdWhitelist[line] = struct{}{} + } else if strings.ToLower(line) == "private" { + f.Options |= AllowAllPrivate + } else if strings.ToLower(line) == "public" { + f.Options |= AllowAllPublic + } else { + val, err := strconv.ParseUint(line, 10, 64) + if err != nil { + glog.Warningf("Invalid line %s", line) + continue + } + + f.chanIdWhitelist[val] = struct{}{} + } + } + + glog.V(3).Infof("Filter reloaded") + + return nil +} + +func NewFilterFromFile(ctx context.Context, filePath string, options Options) (FilterInterface, error) { + f := &FileFilter{ + WhitelistFilePath: filePath, + } + + f.DefaultOptions = options + + err := f.Reload() + if err != nil { + return nil, err + } + + watcher, err := fsnotify.NewWatcher() + if err != nil { + return nil, err + } + err = watcher.Add(filePath) + if err != nil { + return nil, err + } + + go func(watcher *fsnotify.Watcher) { + for { + select { + case event, ok := <-watcher.Events: + if !ok { + continue + } + + f.Reload() + + if event.Op&fsnotify.Rename == fsnotify.Rename || event.Op&fsnotify.Remove == fsnotify.Remove { + // Happens when you save the changes via text editor + err := watcher.Add(event.Name) + if err != nil { + glog.Warningf("Watcher error %v\n", err) + } + } + case err := <-watcher.Errors: + glog.Warningf("Watcher error %v\n", err) + case <-ctx.Done(): + if watcher != nil { + watcher.Close() + } + return + } + } + }(watcher) + + return f, nil +} + +func (f *FileFilter) AllowPubKey(id string) bool { + f.Mutex.Lock() + defer f.Mutex.Unlock() + _, ok := f.nodeIdWhitelist[id] + + return ok +} + +func (f *FileFilter) AllowChanId(id uint64) bool { + f.Mutex.Lock() + defer f.Mutex.Unlock() + + _, ok := f.chanIdWhitelist[id] + + return ok +} + +func (f *FileFilter) AllowSpecial(private bool) bool { + f.Mutex.Lock() + defer f.Mutex.Unlock() + + if private { + return f.Options&AllowAllPrivate == AllowAllPrivate + } else { + return f.Options&AllowAllPublic == AllowAllPublic + } +} diff --git a/filter/filter_test.go b/filter/filter_test.go new file mode 100644 index 0000000..096cf60 --- /dev/null +++ b/filter/filter_test.go @@ -0,0 +1,73 @@ +package filter + +import ( + "context" + "os" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestFile(t *testing.T) { + f, err := os.CreateTemp("", "tmpfile-") + if err != nil { + t.Fatalf("Got error %v\n", err) + } + + defer f.Close() + defer os.Remove(f.Name()) + + data := []byte(` + ### + # Demo file + ### + 1 + 2 + 3 # Comment + # 4 + `) + if _, err := f.Write(data); err != nil { + t.Fatalf("Got error %v\n", err) + } + + fil, err := NewFilterFromFile(context.TODO(), f.Name(), None) + if err != nil { + t.Fatalf("Got error %v\n", err) + } + + for _, chanid := range []uint64{1, 2, 3} { + assert.Equal(t, true, fil.AllowChanId(chanid), "Should be allowed") + } + + for _, chanid := range []uint64{4, 1337} { + assert.Equal(t, false, fil.AllowChanId(chanid), "Should not be allowed") + } +} + +func TestOptions(t *testing.T) { + f, err := os.CreateTemp("", "tmpfile-") + if err != nil { + t.Fatalf("Got error %v\n", err) + } + + defer f.Close() + defer os.Remove(f.Name()) + + data := []byte(` + ### + # Demo file + ### + public + `) + if _, err := f.Write(data); err != nil { + t.Fatalf("Got error %v\n", err) + } + + fil, err := NewFilterFromFile(context.TODO(), f.Name(), AllowAllPrivate) + if err != nil { + t.Fatalf("Got error %v\n", err) + } + + assert.Equal(t, true, fil.AllowSpecial(true), "Private channels should be allowed") + assert.Equal(t, true, fil.AllowSpecial(false), "Publlic channels should be allowed") +} diff --git a/filter/interface.go b/filter/interface.go new file mode 100644 index 0000000..586ad89 --- /dev/null +++ b/filter/interface.go @@ -0,0 +1,43 @@ +package filter + +type Options uint8 + +const ( + None Options = 1 << iota + AllowAllPrivate + AllowAllPublic +) + +type FilterInterface interface { + AllowPubKey(id string) bool + AllowChanId(id uint64) bool + AllowSpecial(private bool) bool +} + +type Filter struct { + Options Options + chanIdWhitelist map[uint64]struct{} + nodeIdWhitelist map[string]struct{} +} + +// TODO: yeah it would be easier if we had just Allow(chan) or sth + +func (f *Filter) AllowPubKey(id string) bool { + _, ok := f.nodeIdWhitelist[id] + + return ok +} + +func (f *Filter) AllowChanId(id uint64) bool { + _, ok := f.chanIdWhitelist[id] + + return ok +} + +func (f *Filter) AllowSpecial(private bool) bool { + if private { + return f.Options&AllowAllPrivate == AllowAllPrivate + } else { + return f.Options&AllowAllPublic == AllowAllPublic + } +} diff --git a/go.mod b/go.mod index c43e446..6564786 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/bolt-observer/go_common v0.0.8 github.com/bolt-observer/graphite-golang v0.0.0-20221216134649-94ce8603f234 github.com/btcsuite/btcd/btcutil v1.1.2 + github.com/fsnotify/fsnotify v1.4.9 github.com/getsentry/sentry-go v0.13.0 github.com/go-redis/redis v6.15.9+incompatible github.com/golang/glog v1.0.0 diff --git a/lightning_api/api.go b/lightning_api/api.go index c3756e1..2b47e70 100644 --- a/lightning_api/api.go +++ b/lightning_api/api.go @@ -204,6 +204,7 @@ func getNodeInfoFullTemplate(l LightingApiCalls, threshUseDescribeGraph int, ctx continue } c, err := l.GetChanInfo(ctx, ch.ChanId) + if err != nil { glog.Warningf("Could not get channel info for %v: %v", ch.ChanId, err) extendedNodeInfo.NumChannels -= 1 diff --git a/nodeinfo/nodeinfo.go b/nodeinfo/nodeinfo.go index e22798a..04fb71d 100644 --- a/nodeinfo/nodeinfo.go +++ b/nodeinfo/nodeinfo.go @@ -12,6 +12,8 @@ import ( checkermonitoring "github.com/bolt-observer/agent/checkermonitoring" entities "github.com/bolt-observer/agent/entities" + "github.com/bolt-observer/agent/filter" + "github.com/bolt-observer/agent/lightning_api" common_entities "github.com/bolt-observer/go_common/entities" utils "github.com/bolt-observer/go_common/utils" "github.com/golang/glog" @@ -81,13 +83,15 @@ func (c *NodeInfo) GetState( private bool, PollInterval entities.Interval, getApi entities.NewApiCall, - optCallback entities.InfoCallback) (*entities.InfoReport, error) { + optCallback entities.InfoCallback, + filter filter.FilterInterface, +) (*entities.InfoReport, error) { if pubKey != "" && !utils.ValidatePubkey(pubKey) { return nil, errors.New("invalid pubkey") } - resp, err := c.checkOne(entities.NodeIdentifier{Identifier: pubKey, UniqueId: uniqueId}, getApi, private) + resp, err := c.checkOne(entities.NodeIdentifier{Identifier: pubKey, UniqueId: uniqueId}, getApi, private, filter) if err != nil { return nil, err } @@ -106,7 +110,9 @@ func (c *NodeInfo) Subscribe( private bool, PollInterval entities.Interval, getApi entities.NewApiCall, - callback entities.InfoCallback) error { + callback entities.InfoCallback, + f filter.FilterInterface, +) error { if pubKey != "" && !utils.ValidatePubkey(pubKey) { return errors.New("invalid pubkey") @@ -127,6 +133,10 @@ func (c *NodeInfo) Subscribe( return fmt.Errorf("pubkey and reported pubkey are not the same %s vs %s", info.IdentityPubkey, pubKey) } + if f == nil { + f, _ = filter.NewAllowAllFilter() + } + c.globalSettings.Set(info.IdentityPubkey+uniqueId, Settings{ identifier: entities.NodeIdentifier{Identifier: pubKey, UniqueId: uniqueId}, lastCheck: time.Time{}, @@ -134,6 +144,7 @@ func (c *NodeInfo) Subscribe( getApi: getApi, hash: 0, private: private, + filter: f, }) return nil @@ -184,7 +195,7 @@ func (c *NodeInfo) checkAll() bool { toBeCheckedBy := s.lastCheck.Add(s.interval.Duration()) if toBeCheckedBy.Before(now) { - resp, err := c.checkOne(s.identifier, s.getApi, s.private) + resp, err := c.checkOne(s.identifier, s.getApi, s.private, s.filter) if err != nil { glog.Warningf("Failed to check %v: %v", s.identifier.GetId(), err) continue @@ -235,11 +246,34 @@ func (c *NodeInfo) checkAll() bool { return true } +func applyFilter(info *lightning_api.NodeInfoApiExtended, filter filter.FilterInterface) *lightning_api.NodeInfoApiExtended { + ret := &lightning_api.NodeInfoApiExtended{ + NodeInfoApi: info.NodeInfoApi, + Channels: make([]lightning_api.NodeChannelApiExtended, 0), + } + + for _, c := range info.Channels { + nodeAllowed := (filter.AllowPubKey(c.Node1Pub) && info.Node.PubKey != c.Node1Pub) || (filter.AllowPubKey(c.Node2Pub) && info.Node.PubKey != c.Node2Pub) + chanAllowed := filter.AllowChanId(c.ChannelId) + + if nodeAllowed || chanAllowed || filter.AllowSpecial(c.Private) { + ret.Channels = append(ret.Channels, c) + } else { + ret.NumChannels -= 1 + ret.TotalCapacity -= c.Capacity + } + } + + return ret +} + // checkOne checks one specific node func (c *NodeInfo) checkOne( identifier entities.NodeIdentifier, getApi entities.NewApiCall, - private bool) (*entities.InfoReport, error) { + private bool, + filter filter.FilterInterface, +) (*entities.InfoReport, error) { pubkey := identifier.GetId() if pubkey == "" { @@ -260,6 +294,9 @@ func (c *NodeInfo) checkOne( c.monitoring.MetricsReport("checkone", "failure", map[string]string{"pubkey": pubkey}) return nil, fmt.Errorf("failed to call GetNodeInfoFull %v", err) } + + info = applyFilter(info, filter) + if len(info.Channels) != int(info.NumChannels) { c.monitoring.MetricsReport("checkone", "failure", map[string]string{"pubkey": pubkey}) return nil, fmt.Errorf("bad NodeInfo obtained %d channels vs. num_channels %d - %v", len(info.Channels), info.NumChannels, info) diff --git a/nodeinfo/nodeinfo_test.go b/nodeinfo/nodeinfo_test.go index b594ddb..68a7ea4 100644 --- a/nodeinfo/nodeinfo_test.go +++ b/nodeinfo/nodeinfo_test.go @@ -12,6 +12,7 @@ import ( "time" agent_entities "github.com/bolt-observer/agent/entities" + "github.com/bolt-observer/agent/filter" lightning_api "github.com/bolt-observer/agent/lightning_api" entities "github.com/bolt-observer/go_common/entities" utils "github.com/bolt-observer/go_common/utils" @@ -31,7 +32,7 @@ func getChannelJson(remote uint64, private, active bool) string { "channels": [ { "chan_id": "1", - "capacity": "50000", + "capacity": "10000", "local_balance": "7331", "remote_balance": "%d", "commit_fee": "2345", @@ -122,10 +123,17 @@ func getNodeInfoJson(pubKey string) string { `, pubKey) } -func getChanInfo() string { - return ` - {"channel_id":"810130063083110402", "chan_point":"72003042c278217521ce91dd11ac96ee1ece398c304b514aa3bff9e05329b126:2", "last_update":1661455399, "node1_pub":"02004c625d622245606a1ea2c1c69cfb4516b703b47945a3647713c05fe4aaeb1c", "node2_pub":"02b67e55fb850d7f7d77eb71038362bc0ed0abd5b7ee72cc4f90b16786c69b9256", "capacity":"50000", "node1_policy":{"time_lock_delta":40, "min_htlc":"1000", "fee_base_msat":"1000", "fee_rate_milli_msat":"1", "disabled":false, "max_htlc_msat":"49500000", "last_update":1661455399}, "node2_policy":{"time_lock_delta":40, "min_htlc":"1000", "fee_base_msat":"1000", "fee_rate_milli_msat":"1", "disabled":false, "max_htlc_msat":"49500000", "last_update":1661395514}} - ` +func getChanInfo(url string) string { + s := strings.ReplaceAll(url, "/v1/graph/edge/", "") + id, err := strconv.Atoi(s) + + if err != nil { + return "" + } + + return fmt.Sprintf(` + {"channel_id":"%d", "chan_point":"72003042c278217521ce91dd11ac96ee1ece398c304b514aa3bff9e05329b126:2", "last_update":1661455399, "node1_pub":"02004c625d622245606a1ea2c1c69cfb4516b703b47945a3647713c05fe4aaeb1c", "node2_pub":"02b67e55fb850d7f7d77eb71038362bc0ed0abd5b7ee72cc4f90b16786c69b9256", "capacity":"%d", "node1_policy":{"time_lock_delta":40, "min_htlc":"1000", "fee_base_msat":"1000", "fee_rate_milli_msat":"1", "disabled":false, "max_htlc_msat":"49500000", "last_update":1661455399}, "node2_policy":{"time_lock_delta":40, "min_htlc":"1000", "fee_base_msat":"1000", "fee_rate_milli_msat":"1", "disabled":false, "max_htlc_msat":"49500000", "last_update":1661395514}} + `, id, id*10000) } func initTest(t *testing.T) (string, lightning_api.LightingApiCalls, *lightning_api.LndRestLightningApi) { @@ -185,11 +193,13 @@ func TestSubscription(t *testing.T) { return } + f, _ := filter.NewAllowAllFilter() err := c.Subscribe( pubKey, "random_id", true, agent_entities.SECOND, func() lightning_api.LightingApiCalls { return api }, nil, + f, ) if err != nil { t.Fatalf("Subscribe failed: %v", err) @@ -224,7 +234,7 @@ func TestBasicFlow(t *testing.T) { } else if strings.Contains(req.URL.Path, "v1/channels") { contents = getChannelJson(1337, false, true) } else if strings.Contains(req.URL.Path, "v1/graph/edge") { - contents = getChanInfo() + contents = getChanInfo(req.URL.Path) } else if strings.Contains(req.URL.Path, "v1/graph/node") { contents = getNodeInfoJson("02b67e55fb850d7f7d77eb71038362bc0ed0abd5b7ee72cc4f90b16786c69b9256") } @@ -245,6 +255,7 @@ func TestBasicFlow(t *testing.T) { c.OverrideLoopInterval(1 * time.Second) was_called := false + f, _ := filter.NewAllowAllFilter() c.Subscribe( pubKey, "random_id", true, agent_entities.SECOND, @@ -254,6 +265,70 @@ func TestBasicFlow(t *testing.T) { cancel() return true }, + f, + ) + + c.EventLoop() + + select { + case <-time.After(5 * time.Second): + t.Fatal("Took too long") + case <-ctx.Done(): + if !was_called { + t.Fatalf("Callback was not correctly invoked") + } + } +} + +func TestBasicFlowFilter(t *testing.T) { + pubKey, api, d := initTest(t) + + d.HttpApi.DoFunc = func(req *http.Request) (*http.Response, error) { + contents := "" + if strings.Contains(req.URL.Path, "v1/getinfo") { + contents = getInfoJson("02b67e55fb850d7f7d77eb71038362bc0ed0abd5b7ee72cc4f90b16786c69b9256") + } else if strings.Contains(req.URL.Path, "v1/channels") { + contents = getChannelJson(1337, false, true) + } else if strings.Contains(req.URL.Path, "v1/graph/edge") { + contents = getChanInfo(req.URL.Path) + } else if strings.Contains(req.URL.Path, "v1/graph/node") { + contents = getNodeInfoJson("02b67e55fb850d7f7d77eb71038362bc0ed0abd5b7ee72cc4f90b16786c69b9256") + } + + r := ioutil.NopCloser(bytes.NewReader([]byte(contents))) + + return &http.Response{ + StatusCode: 200, + Body: r, + }, nil + } + + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(15*time.Second)) + + c := NewNodeInfo(ctx, nil) + + // Make everything a bit faster + c.OverrideLoopInterval(1 * time.Second) + was_called := false + + f, _ := filter.NewUnitTestFilter() + fd := f.(*filter.UnitTestFilter) + fd.AddAllowChanId(1) + fd.AddAllowChanId(1337) + + c.Subscribe( + pubKey, "random_id", true, + agent_entities.SECOND, + func() lightning_api.LightingApiCalls { return api }, + func(ctx context.Context, report *agent_entities.InfoReport) bool { + if report.NumChannels == 1 && report.TotalCapacity == 10000 { + was_called = true + cancel() + } + + return true + }, + f, ) c.EventLoop() @@ -278,7 +353,7 @@ func TestContextCanBeNil(t *testing.T) { } else if strings.Contains(req.URL.Path, "v1/channels") { contents = getChannelJson(1337, false, true) } else if strings.Contains(req.URL.Path, "v1/graph/edge") { - contents = getChanInfo() + contents = getChanInfo(req.URL.Path) } else if strings.Contains(req.URL.Path, "v1/graph/node") { contents = getNodeInfoJson("02b67e55fb850d7f7d77eb71038362bc0ed0abd5b7ee72cc4f90b16786c69b9256") } @@ -297,6 +372,8 @@ func TestContextCanBeNil(t *testing.T) { c.OverrideLoopInterval(1 * time.Second) was_called := false + f, _ := filter.NewAllowAllFilter() + c.Subscribe( pubKey, "random_id", true, agent_entities.SECOND, @@ -305,6 +382,7 @@ func TestContextCanBeNil(t *testing.T) { was_called = true return true }, + f, ) go c.EventLoop() @@ -327,7 +405,7 @@ func TestGetState(t *testing.T) { } else if strings.Contains(req.URL.Path, "v1/channels") { contents = getChannelJson(1337, false, true) } else if strings.Contains(req.URL.Path, "v1/graph/edge") { - contents = getChanInfo() + contents = getChanInfo(req.URL.Path) } else if strings.Contains(req.URL.Path, "v1/graph/node") { contents = getNodeInfoJson("02b67e55fb850d7f7d77eb71038362bc0ed0abd5b7ee72cc4f90b16786c69b9256") } @@ -345,11 +423,14 @@ func TestGetState(t *testing.T) { c := NewNodeInfo(ctx, nil) + f, _ := filter.NewAllowAllFilter() + resp, err := c.GetState( pubKey, "random_id", true, agent_entities.SECOND, func() lightning_api.LightingApiCalls { return api }, nil, + f, ) if err != nil { @@ -373,7 +454,7 @@ func TestGetStateCallback(t *testing.T) { } else if strings.Contains(req.URL.Path, "v1/channels") { contents = getChannelJson(1337, false, true) } else if strings.Contains(req.URL.Path, "v1/graph/edge") { - contents = getChanInfo() + contents = getChanInfo(req.URL.Path) } else if strings.Contains(req.URL.Path, "v1/graph/node") { contents = getNodeInfoJson("02b67e55fb850d7f7d77eb71038362bc0ed0abd5b7ee72cc4f90b16786c69b9256") } @@ -393,6 +474,8 @@ func TestGetStateCallback(t *testing.T) { var callresp *agent_entities.InfoReport callresp = nil + f, _ := filter.NewAllowAllFilter() + resp, err := c.GetState( pubKey, "random_id", true, agent_entities.SECOND, @@ -401,6 +484,7 @@ func TestGetStateCallback(t *testing.T) { callresp = report return true }, + f, ) if err != nil { diff --git a/nodeinfo/settings.go b/nodeinfo/settings.go index 522b8e0..bc6ce87 100644 --- a/nodeinfo/settings.go +++ b/nodeinfo/settings.go @@ -5,6 +5,7 @@ import ( "time" entities "github.com/bolt-observer/agent/entities" + "github.com/bolt-observer/agent/filter" ) type GlobalSettings struct { @@ -56,4 +57,5 @@ type Settings struct { getApi entities.NewApiCall interval entities.Interval private bool + filter filter.FilterInterface }