Skip to content

Commit

Permalink
feat(sd): cleanup/list expired
Browse files Browse the repository at this point in the history
  • Loading branch information
msaf1980 committed Nov 3, 2023
1 parent cef75bd commit ef4d8f2
Show file tree
Hide file tree
Showing 19 changed files with 623 additions and 169 deletions.
32 changes: 18 additions & 14 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,12 @@ type Common struct {
MemoryReturnInterval time.Duration `toml:"memory-return-interval" json:"memory-return-interval" comment:"daemon will return the freed memory to the OS when it>0"`
HeadersToLog []string `toml:"headers-to-log" json:"headers-to-log" comment:"additional request headers to log"`

BaseWeight int `toml:"base_weight" json:"base_weight" comment:"service discovery base weight (on idle)"`
SDType SDType `toml:"service-discovery-type" json:"service-discovery-type" comment:"service discovery type"`
SD string `toml:"service-discovery" json:"service-discovery" comment:"service discovery address (consul)"`
SDNamespace string `toml:"service-discovery-ns" json:"service-discovery-ns" comment:"service discovery namespace (graphite by default)"`
SDDc []string `toml:"service-discovery-ds" json:"service-discovery-ds" comment:"service discovery datacenters (first - is primary, in other register as backup)"`
BaseWeight int `toml:"base_weight" json:"base_weight" comment:"service discovery base weight (on idle)"`
SDType SDType `toml:"service-discovery-type" json:"service-discovery-type" comment:"service discovery type"`
SD string `toml:"service-discovery" json:"service-discovery" comment:"service discovery address (consul)"`
SDNamespace string `toml:"service-discovery-ns" json:"service-discovery-ns" comment:"service discovery namespace (graphite by default)"`
SDDc []string `toml:"service-discovery-ds" json:"service-discovery-ds" comment:"service discovery datacenters (first - is primary, in other register as backup)"`
SDExpire time.Duration `toml:"service-discovery-expire" json:"service-discovery-expire" comment:"service discovery expire duration for cleanup (minimum is 24h, if enabled)"`

FindCacheConfig CacheConfig `toml:"find-cache" json:"find-cache" comment:"find/tags cache config"`

Expand Down Expand Up @@ -278,11 +279,11 @@ type Carbonlink struct {

// Prometheus configuration
type Prometheus struct {
Listen string `toml:"listen" json:"listen" comment:"listen addr for prometheus ui and api"`
ExternalURLRaw string `toml:"external-url" json:"external-url" comment:"allows to set URL for redirect manually"`
ExternalURL *url.URL `toml:"-" json:"-"`
PageTitle string `toml:"page-title" json:"page-title"`
LookbackDelta time.Duration `toml:"lookback-delta" json:"lookback-delta"`
Listen string `toml:"listen" json:"listen" comment:"listen addr for prometheus ui and api"`
ExternalURLRaw string `toml:"external-url" json:"external-url" comment:"allows to set URL for redirect manually"`
ExternalURL *url.URL `toml:"-" json:"-"`
PageTitle string `toml:"page-title" json:"page-title"`
LookbackDelta time.Duration `toml:"lookback-delta" json:"lookback-delta"`
RemoteReadConcurrencyLimit int `toml:"remote-read-concurrency-limit" json:"remote-read-concurrency-limit" comment:"concurrently handled remote read requests"`
}

Expand Down Expand Up @@ -393,10 +394,10 @@ func New() *Config {
TotalTimeout: 500 * time.Millisecond,
},
Prometheus: Prometheus{
ExternalURLRaw: "",
PageTitle: "Prometheus Time Series Collection and Processing Server",
Listen: ":9092",
LookbackDelta: 5 * time.Minute,
ExternalURLRaw: "",
PageTitle: "Prometheus Time Series Collection and Processing Server",
Listen: ":9092",
LookbackDelta: 5 * time.Minute,
RemoteReadConcurrencyLimit: 10,
},
Debug: Debug{
Expand Down Expand Up @@ -722,6 +723,9 @@ func (c *Config) NeedLoadAvgColect() bool {
if c.Common.SDNamespace == "" {
c.Common.SDNamespace = "graphite"
}
if c.Common.SDExpire < 24*time.Hour {
c.Common.SDExpire = 24 * time.Hour
}
return true
}
if c.ClickHouse.RenderAdaptiveQueries > 0 {
Expand Down
2 changes: 2 additions & 0 deletions doc/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,8 @@ Only one tag used as filter for index field Tag1, see graphite_tagged table [str
service-discovery-ns = ""
# service discovery datacenters (first - is primary, in other register as backup)
service-discovery-ds = []
# service discovery expire duration for cleanup (minimum is 24h, if enabled)
service-discovery-expire = "0s"

# find/tags cache config
[common.find-cache]
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ require (
github.com/lomik/zapwriter v0.0.0-20210624082824-c1161d1eb463
github.com/msaf1980/go-expirecache v0.0.2
github.com/msaf1980/go-metrics v0.0.14
github.com/msaf1980/go-stringutils v0.1.4
github.com/msaf1980/go-stringutils v0.1.6
github.com/msaf1980/go-syncutils v0.0.3
github.com/msaf1980/go-timeutils v0.0.3
github.com/pelletier/go-toml v1.9.5
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -585,8 +585,8 @@ github.com/msaf1980/go-metrics v0.0.11/go.mod h1:8VcR8MdyvIJpcVLOVFKbhb27+60tXy0
github.com/msaf1980/go-metrics v0.0.14 h1:gD0kCG5MDbon33Nkz49yW6kz3yu0DHzDN0SxjGTWlTA=
github.com/msaf1980/go-metrics v0.0.14/go.mod h1:8VcR8MdyvIJpcVLOVFKbhb27+60tXy0M+zq7Ag8a6Pw=
github.com/msaf1980/go-stringutils v0.1.2/go.mod h1:AxmV/6JuQUAtZJg5XmYATB5ZwCWgtpruVHY03dswRf8=
github.com/msaf1980/go-stringutils v0.1.4 h1:UwsIT0hplHVucqbknk3CoNqKkmIuSHhsbBldXxyld5U=
github.com/msaf1980/go-stringutils v0.1.4/go.mod h1:AxmV/6JuQUAtZJg5XmYATB5ZwCWgtpruVHY03dswRf8=
github.com/msaf1980/go-stringutils v0.1.6 h1:qri8o+4XLJCJYemHcvJY6xJhrGTmllUoPwayKEj4NSg=
github.com/msaf1980/go-stringutils v0.1.6/go.mod h1:xpicaTIpLAVzL0gUQkciB1zjypDGKsOCI25cKQbRQYA=
github.com/msaf1980/go-syncutils v0.0.3 h1:bd6+yTSB8/CmpG7M6j1gq5sJMyPqecjJcBf19s2Y6u4=
github.com/msaf1980/go-syncutils v0.0.3/go.mod h1:zoZwQNkDATcfKq5lQPK6dmJT7Z01COxw/vd8bcJyC9w=
github.com/msaf1980/go-timeutils v0.0.3 h1:c0NIpJBcU6KoMeMCPdnbGFcaP4sm7VCwoW1cdgsmUkU=
Expand Down
96 changes: 63 additions & 33 deletions graphite-clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"os/signal"
"runtime"
"runtime/debug"
"strings"
"sync"
"syscall"
"time"
Expand All @@ -33,7 +34,6 @@ import (
"github.com/lomik/graphite-clickhouse/prometheus"
"github.com/lomik/graphite-clickhouse/render"
"github.com/lomik/graphite-clickhouse/sd"
"github.com/lomik/graphite-clickhouse/sd/nginx"
"github.com/lomik/graphite-clickhouse/tagger"
)

Expand Down Expand Up @@ -108,7 +108,10 @@ func main() {
)

sdList := flag.Bool("sd-list", false, "List registered nodes in SD")
sdClean := flag.Bool("sd-clean", false, "Cleanup registered nodes in SD")
sdDelete := flag.Bool("sd-delete", false, "Delete registered nodes for this hostname in SD")
sdEvict := flag.String("sd-evict", "", "Delete registered nodes for hostname in SD")
sdClean := flag.Bool("sd-clean", false, "Cleanup expired registered nodes in SD")
sdExpired := flag.Bool("sd-expired", false, "List expired registered nodes in SD")

printVersion := flag.Bool("version", false, "Print version")
verbose := flag.Bool("verbose", false, "Verbose (print config on startup)")
Expand Down Expand Up @@ -137,35 +140,61 @@ func main() {
return
}

if *sdList || *sdClean {
if *sdEvict != "" {
if cfg.Common.SD != "" && cfg.NeedLoadAvgColect() {
var sd sd.SD
var s sd.SD
logger := zapwriter.Default()
switch cfg.Common.SDType {
case config.SDNginx:
sd = nginx.New(cfg.Common.SD, cfg.Common.SDNamespace, "", logger)
default:
panic(fmt.Errorf("service discovery type %q can be registered", cfg.Common.SDType.String()))
if s, err = sd.New(&cfg.Common, *sdEvict, logger); err != nil {
fmt.Fprintf(os.Stderr, "service discovery type %q can be registered", cfg.Common.SDType.String())
os.Exit(1)
}
ts := time.Now().Unix() - 3600
if nodes, err := sd.Nodes(); err == nil {
for _, node := range nodes {
if *sdClean && node.Flags > 0 {
if ts > node.Flags {
fmt.Printf("%s: %s (%s), deleted\n", node.Key, node.Value, time.Unix(node.Flags, 0).UTC().Format(time.RFC3339Nano))
// sd.Delete(node.Key, node.Value)
} else {
fmt.Printf("%s: %s (%s)\n", node.Key, node.Value, time.Unix(node.Flags, 0).UTC().Format(time.RFC3339Nano))
}
} else {
fmt.Printf("%s: %s (%s)\n", node.Key, node.Value, time.Unix(node.Flags, 0).UTC().Format(time.RFC3339Nano))
}
err = s.Clear("", "")
}
return
} else if *sdList || *sdDelete || *sdExpired || *sdClean {
if cfg.Common.SD != "" && cfg.NeedLoadAvgColect() {
var s sd.SD
logger := zapwriter.Default()
if s, err = sd.New(&cfg.Common, "", logger); err != nil {
fmt.Fprintf(os.Stderr, "service discovery type %q can be registered", cfg.Common.SDType.String())
os.Exit(1)
}

// sdList := flag.Bool("sd-list", false, "List registered nodes in SD")
// sdDelete := flag.Bool("sd-delete", false, "Delete registered nodes for this hostname in SD")
// sdEvict := flag.String("sd-evict", "", "Delete registered nodes for hostname in SD")
// sdClean := flag.Bool("sd-clean", false, "Cleanup expired registered nodes in SD")

if *sdDelete {
hostname, _ := os.Hostname()
hostname, _, _ = strings.Cut(hostname, ".")
if err = s.Clear("", ""); err != nil {
fmt.Fprintln(os.Stderr, err.Error())
os.Exit(1)
}
} else if *sdExpired {
if err = sd.Cleanup(&cfg.Common, s, true); err != nil {
fmt.Fprintln(os.Stderr, err.Error())
os.Exit(1)
}
} else if *sdClean {
if err = sd.Cleanup(&cfg.Common, s, false); err != nil {
fmt.Fprintln(os.Stderr, err.Error())
os.Exit(1)
}
} else {
log.Fatal(err)
if nodes, err := s.Nodes(); err == nil {
for _, node := range nodes {
fmt.Printf("%s/%s: %s (%s)\n", s.Namespace(), node.Key, node.Value, time.Unix(node.Flags, 0).UTC().Format(time.RFC3339Nano))
}
} else {
fmt.Fprintln(os.Stderr, err.Error())
os.Exit(1)
}
}
} else {
fmt.Fprintln(os.Stderr, "SD not enabled")
os.Exit(1)
}
return
}
Expand Down Expand Up @@ -288,29 +317,30 @@ func main() {
}
}()

if cfg.Common.SD != "" && cfg.NeedLoadAvgColect() {
go func() {
time.Sleep(time.Millisecond * 100)
sdLogger := localManager.Logger("service discovery")
sd.Register(&cfg.Common, sdLogger)
}()
}

go func() {
stop := make(chan os.Signal, 1)
signal.Notify(stop, syscall.SIGTERM, syscall.SIGINT)
<-stop
logger.Info("stoping graphite-clickhouse")
if cfg.Common.SDType != config.SDNone {
if cfg.Common.SD != "" {
// unregister SD
sd.Stop()
time.Sleep(10 * time.Second)
}
// initiating the shutdown
ctx, _ := context.WithTimeout(context.Background(), time.Second*10)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
srv.Shutdown(ctx)
cancel()
}()

if cfg.Common.SD != "" {
go func() {
time.Sleep(time.Millisecond * 100)
sdLogger := localManager.Logger("service discovery")
sd.Register(cfg, sdLogger)
}()
}

exitWait.Wait()

logger.Info("stop graphite-clickhouse")
Expand Down
34 changes: 26 additions & 8 deletions sd/nginx/nginx.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,12 @@ import (
)

// ErrInvalidKey reports a listed entry that was rejected while reading keys
// from service discovery. key holds the key prefix that was being listed and
// val holds the offending entry.
type ErrInvalidKey struct {
	key string
	val string
}

// Error implements the error interface. The message always names both the
// listed key and the rejected value, so the two can be told apart in logs
// even when val is empty.
func (e ErrInvalidKey) Error() string {
	return "list key '" + e.key + "' is invalid: '" + e.val + "'"
}

var (
Expand Down Expand Up @@ -50,6 +48,7 @@ func splitNode(node string) (dc, host, listen string, ok bool) {
type Nginx struct {
weight int64
hostname string
namespace string
body []byte
backupBody []byte
url stringutils.Builder
Expand All @@ -69,8 +68,9 @@ func New(url, namespace, hostname string, logger *zap.Logger) *Nginx {
backupBody: []byte(`{"backup":1,"max_fails":0}`),
nsEnd: "upstreams/" + namespace + "/",
hostname: hostname,
namespace: namespace,
}
sd.setWeight(0)
sd.setWeight(1)

sd.url.WriteString(url)
sd.url.WriteByte('/')
Expand All @@ -84,6 +84,9 @@ func New(url, namespace, hostname string, logger *zap.Logger) *Nginx {
}

func (sd *Nginx) setWeight(weight int64) {
if weight <= 0 {
weight = 1
}
if sd.weight != weight {
sd.weight = weight
sd.body = sd.body[:0]
Expand All @@ -93,6 +96,10 @@ func (sd *Nginx) setWeight(weight int64) {
}
}

// Namespace returns the value of the receiver's namespace field.
func (sd *Nginx) Namespace() string {
return sd.namespace
}

func (sd *Nginx) List() (nodes []string, err error) {
sd.url.Truncate(sd.pos)
sd.url.WriteString("?recurse")
Expand All @@ -118,7 +125,7 @@ func (sd *Nginx) List() (nodes []string, err error) {
nodes = append(nodes, s)
}
} else {
return nil, ErrInvalidKey{s}
return nil, ErrInvalidKey{key: sd.nsEnd, val: s}
}
} else {
return nil, ErrNoKey
Expand Down Expand Up @@ -169,7 +176,7 @@ func (sd *Nginx) ListMap() (nodes map[string]string, err error) {
}
}
} else {
return nil, ErrInvalidKey{s}
return nil, ErrInvalidKey{key: sd.nsEnd, val: s}
}
} else {
return nil, ErrNoKey
Expand Down Expand Up @@ -225,7 +232,7 @@ func (sd *Nginx) Nodes() (nodes []utils.KV, err error) {
}
nodes = append(nodes, kv)
} else {
return nil, ErrInvalidKey{s}
return nil, ErrInvalidKey{key: sd.nsEnd, val: s}
}
} else {
return nil, ErrNoKey
Expand Down Expand Up @@ -307,6 +314,17 @@ func (sd *Nginx) Update(ip, port string, dc []string, weight int64) error {
return sd.update(ip, port, dc)
}

// DeleteNode truncates the receiver's URL builder to sd.pos, appends node and
// passes the resulting URL to utils.HttpDelete. On failure the error is logged
// together with the part of the URL that follows sd.pos, and then returned.
func (sd *Nginx) DeleteNode(node string) (err error) {
	sd.url.Truncate(sd.pos)
	sd.url.WriteString(node)

	target := sd.url.String()
	err = utils.HttpDelete(target)
	if err == nil {
		return nil
	}

	sd.logger.Error("delete", zap.String("address", target[sd.pos:]), zap.Error(err))
	return err
}

func (sd *Nginx) Delete(ip, port string, dc []string) (err error) {
if len(dc) == 0 {
sd.url.Truncate(sd.pos)
Expand Down
Loading

0 comments on commit ef4d8f2

Please sign in to comment.