Skip to content

Commit

Permalink
Merge pull request #253 from msaf1980/fix/sd
Browse files Browse the repository at this point in the history
feat(sd): default namespace is graphite, reduce weight if overload
  • Loading branch information
msaf1980 authored Nov 3, 2023
2 parents c47e8c8 + ef4d8f2 commit b05bb9c
Show file tree
Hide file tree
Showing 22 changed files with 650 additions and 185 deletions.
32 changes: 18 additions & 14 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,12 @@ type Common struct {
MemoryReturnInterval time.Duration `toml:"memory-return-interval" json:"memory-return-interval" comment:"daemon will return the freed memory to the OS when it>0"`
HeadersToLog []string `toml:"headers-to-log" json:"headers-to-log" comment:"additional request headers to log"`

BaseWeight int `toml:"base_weight" json:"base_weight" comment:"service discovery base weight (on idle)"`
SDType SDType `toml:"service-discovery-type" json:"service-discovery-type" comment:"service discovery type"`
SD string `toml:"service-discovery" json:"service-discovery" comment:"service discovery address (consul)"`
SDNamespace string `toml:"service-discovery-ns" json:"service-discovery-ns" comment:"service discovery namespace (graphite by default)"`
SDDc []string `toml:"service-discovery-ds" json:"service-discovery-ds" comment:"service discovery datacenters (first - is primary, in other register as backup)"`
BaseWeight int `toml:"base_weight" json:"base_weight" comment:"service discovery base weight (on idle)"`
SDType SDType `toml:"service-discovery-type" json:"service-discovery-type" comment:"service discovery type"`
SD string `toml:"service-discovery" json:"service-discovery" comment:"service discovery address (consul)"`
SDNamespace string `toml:"service-discovery-ns" json:"service-discovery-ns" comment:"service discovery namespace (graphite by default)"`
SDDc []string `toml:"service-discovery-ds" json:"service-discovery-ds" comment:"service discovery datacenters (first - is primary, in other register as backup)"`
SDExpire time.Duration `toml:"service-discovery-expire" json:"service-discovery-expire" comment:"service discovery expire duration for cleanup (minimum is 24h, if enabled)"`

FindCacheConfig CacheConfig `toml:"find-cache" json:"find-cache" comment:"find/tags cache config"`

Expand Down Expand Up @@ -278,11 +279,11 @@ type Carbonlink struct {

// Prometheus configuration
type Prometheus struct {
Listen string `toml:"listen" json:"listen" comment:"listen addr for prometheus ui and api"`
ExternalURLRaw string `toml:"external-url" json:"external-url" comment:"allows to set URL for redirect manually"`
ExternalURL *url.URL `toml:"-" json:"-"`
PageTitle string `toml:"page-title" json:"page-title"`
LookbackDelta time.Duration `toml:"lookback-delta" json:"lookback-delta"`
Listen string `toml:"listen" json:"listen" comment:"listen addr for prometheus ui and api"`
ExternalURLRaw string `toml:"external-url" json:"external-url" comment:"allows to set URL for redirect manually"`
ExternalURL *url.URL `toml:"-" json:"-"`
PageTitle string `toml:"page-title" json:"page-title"`
LookbackDelta time.Duration `toml:"lookback-delta" json:"lookback-delta"`
RemoteReadConcurrencyLimit int `toml:"remote-read-concurrency-limit" json:"remote-read-concurrency-limit" comment:"concurrently handled remote read requests"`
}

Expand Down Expand Up @@ -393,10 +394,10 @@ func New() *Config {
TotalTimeout: 500 * time.Millisecond,
},
Prometheus: Prometheus{
ExternalURLRaw: "",
PageTitle: "Prometheus Time Series Collection and Processing Server",
Listen: ":9092",
LookbackDelta: 5 * time.Minute,
ExternalURLRaw: "",
PageTitle: "Prometheus Time Series Collection and Processing Server",
Listen: ":9092",
LookbackDelta: 5 * time.Minute,
RemoteReadConcurrencyLimit: 10,
},
Debug: Debug{
Expand Down Expand Up @@ -722,6 +723,9 @@ func (c *Config) NeedLoadAvgColect() bool {
if c.Common.SDNamespace == "" {
c.Common.SDNamespace = "graphite"
}
if c.Common.SDExpire < 24*time.Hour {
c.Common.SDExpire = 24 * time.Hour
}
return true
}
if c.ClickHouse.RenderAdaptiveQueries > 0 {
Expand Down
2 changes: 2 additions & 0 deletions doc/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,8 @@ Only one tag used as filter for index field Tag1, see graphite_tagged table [str
service-discovery-ns = ""
# service discovery datacenters (first - is primary, in other register as backup)
service-discovery-ds = []
# service discovery expire duration for cleanup (minimum is 24h, if enabled)
service-discovery-expire = "0s"

# find/tags cache config
[common.find-cache]
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ require (
github.com/lomik/zapwriter v0.0.0-20210624082824-c1161d1eb463
github.com/msaf1980/go-expirecache v0.0.2
github.com/msaf1980/go-metrics v0.0.14
github.com/msaf1980/go-stringutils v0.1.4
github.com/msaf1980/go-stringutils v0.1.6
github.com/msaf1980/go-syncutils v0.0.3
github.com/msaf1980/go-timeutils v0.0.3
github.com/pelletier/go-toml v1.9.5
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -585,8 +585,8 @@ github.com/msaf1980/go-metrics v0.0.11/go.mod h1:8VcR8MdyvIJpcVLOVFKbhb27+60tXy0
github.com/msaf1980/go-metrics v0.0.14 h1:gD0kCG5MDbon33Nkz49yW6kz3yu0DHzDN0SxjGTWlTA=
github.com/msaf1980/go-metrics v0.0.14/go.mod h1:8VcR8MdyvIJpcVLOVFKbhb27+60tXy0M+zq7Ag8a6Pw=
github.com/msaf1980/go-stringutils v0.1.2/go.mod h1:AxmV/6JuQUAtZJg5XmYATB5ZwCWgtpruVHY03dswRf8=
github.com/msaf1980/go-stringutils v0.1.4 h1:UwsIT0hplHVucqbknk3CoNqKkmIuSHhsbBldXxyld5U=
github.com/msaf1980/go-stringutils v0.1.4/go.mod h1:AxmV/6JuQUAtZJg5XmYATB5ZwCWgtpruVHY03dswRf8=
github.com/msaf1980/go-stringutils v0.1.6 h1:qri8o+4XLJCJYemHcvJY6xJhrGTmllUoPwayKEj4NSg=
github.com/msaf1980/go-stringutils v0.1.6/go.mod h1:xpicaTIpLAVzL0gUQkciB1zjypDGKsOCI25cKQbRQYA=
github.com/msaf1980/go-syncutils v0.0.3 h1:bd6+yTSB8/CmpG7M6j1gq5sJMyPqecjJcBf19s2Y6u4=
github.com/msaf1980/go-syncutils v0.0.3/go.mod h1:zoZwQNkDATcfKq5lQPK6dmJT7Z01COxw/vd8bcJyC9w=
github.com/msaf1980/go-timeutils v0.0.3 h1:c0NIpJBcU6KoMeMCPdnbGFcaP4sm7VCwoW1cdgsmUkU=
Expand Down
96 changes: 63 additions & 33 deletions graphite-clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"os/signal"
"runtime"
"runtime/debug"
"strings"
"sync"
"syscall"
"time"
Expand All @@ -33,7 +34,6 @@ import (
"github.com/lomik/graphite-clickhouse/prometheus"
"github.com/lomik/graphite-clickhouse/render"
"github.com/lomik/graphite-clickhouse/sd"
"github.com/lomik/graphite-clickhouse/sd/nginx"
"github.com/lomik/graphite-clickhouse/tagger"
)

Expand Down Expand Up @@ -108,7 +108,10 @@ func main() {
)

sdList := flag.Bool("sd-list", false, "List registered nodes in SD")
sdClean := flag.Bool("sd-clean", false, "Cleanup registered nodes in SD")
sdDelete := flag.Bool("sd-delete", false, "Delete registered nodes for this hostname in SD")
sdEvict := flag.String("sd-evict", "", "Delete registered nodes for hostname in SD")
sdClean := flag.Bool("sd-clean", false, "Cleanup expired registered nodes in SD")
sdExpired := flag.Bool("sd-expired", false, "List expired registered nodes in SD")

printVersion := flag.Bool("version", false, "Print version")
verbose := flag.Bool("verbose", false, "Verbose (print config on startup)")
Expand Down Expand Up @@ -137,35 +140,61 @@ func main() {
return
}

if *sdList || *sdClean {
if *sdEvict != "" {
if cfg.Common.SD != "" && cfg.NeedLoadAvgColect() {
var sd sd.SD
var s sd.SD
logger := zapwriter.Default()
switch cfg.Common.SDType {
case config.SDNginx:
sd = nginx.New(cfg.Common.SD, cfg.Common.SDNamespace, "", logger)
default:
panic(fmt.Errorf("service discovery type %q can be registered", cfg.Common.SDType.String()))
if s, err = sd.New(&cfg.Common, *sdEvict, logger); err != nil {
fmt.Fprintf(os.Stderr, "service discovery type %q can be registered", cfg.Common.SDType.String())
os.Exit(1)
}
ts := time.Now().Unix() - 3600
if nodes, err := sd.Nodes(); err == nil {
for _, node := range nodes {
if *sdClean && node.Flags > 0 {
if ts > node.Flags {
fmt.Printf("%s: %s (%s), deleted\n", node.Key, node.Value, time.Unix(node.Flags, 0).UTC().Format(time.RFC3339Nano))
// sd.Delete(node.Key, node.Value)
} else {
fmt.Printf("%s: %s (%s)\n", node.Key, node.Value, time.Unix(node.Flags, 0).UTC().Format(time.RFC3339Nano))
}
} else {
fmt.Printf("%s: %s (%s)\n", node.Key, node.Value, time.Unix(node.Flags, 0).UTC().Format(time.RFC3339Nano))
}
err = s.Clear("", "")
}
return
} else if *sdList || *sdDelete || *sdExpired || *sdClean {
if cfg.Common.SD != "" && cfg.NeedLoadAvgColect() {
var s sd.SD
logger := zapwriter.Default()
if s, err = sd.New(&cfg.Common, "", logger); err != nil {
fmt.Fprintf(os.Stderr, "service discovery type %q can be registered", cfg.Common.SDType.String())
os.Exit(1)
}

// sdList := flag.Bool("sd-list", false, "List registered nodes in SD")
// sdDelete := flag.Bool("sd-delete", false, "Delete registered nodes for this hostname in SD")
// sdEvict := flag.String("sd-evict", "", "Delete registered nodes for hostname in SD")
// sdClean := flag.Bool("sd-clean", false, "Cleanup expired registered nodes in SD")

if *sdDelete {
hostname, _ := os.Hostname()
hostname, _, _ = strings.Cut(hostname, ".")
if err = s.Clear("", ""); err != nil {
fmt.Fprintln(os.Stderr, err.Error())
os.Exit(1)
}
} else if *sdExpired {
if err = sd.Cleanup(&cfg.Common, s, true); err != nil {
fmt.Fprintln(os.Stderr, err.Error())
os.Exit(1)
}
} else if *sdClean {
if err = sd.Cleanup(&cfg.Common, s, false); err != nil {
fmt.Fprintln(os.Stderr, err.Error())
os.Exit(1)
}
} else {
log.Fatal(err)
if nodes, err := s.Nodes(); err == nil {
for _, node := range nodes {
fmt.Printf("%s/%s: %s (%s)\n", s.Namespace(), node.Key, node.Value, time.Unix(node.Flags, 0).UTC().Format(time.RFC3339Nano))
}
} else {
fmt.Fprintln(os.Stderr, err.Error())
os.Exit(1)
}
}
} else {
fmt.Fprintln(os.Stderr, "SD not enabled")
os.Exit(1)
}
return
}
Expand Down Expand Up @@ -288,29 +317,30 @@ func main() {
}
}()

if cfg.Common.SD != "" && cfg.NeedLoadAvgColect() {
go func() {
time.Sleep(time.Millisecond * 100)
sdLogger := localManager.Logger("service discovery")
sd.Register(&cfg.Common, sdLogger)
}()
}

go func() {
stop := make(chan os.Signal, 1)
signal.Notify(stop, syscall.SIGTERM, syscall.SIGINT)
<-stop
logger.Info("stoping graphite-clickhouse")
if cfg.Common.SDType != config.SDNone {
if cfg.Common.SD != "" {
// unregister SD
sd.Stop()
time.Sleep(10 * time.Second)
}
// initiating the shutdown
ctx, _ := context.WithTimeout(context.Background(), time.Second*10)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
srv.Shutdown(ctx)
cancel()
}()

if cfg.Common.SD != "" {
go func() {
time.Sleep(time.Millisecond * 100)
sdLogger := localManager.Logger("service discovery")
sd.Register(cfg, sdLogger)
}()
}

exitWait.Wait()

logger.Info("stop graphite-clickhouse")
Expand Down
10 changes: 8 additions & 2 deletions load_avg/load_avg.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,21 @@ func Store(f float64) {
}

func Weight(n int, l float64) int64 {
if n <= 0 || l >= 2.0 {
return 1
}
// (1 / normalized_load_avg - 1)
l = math.Round(10*l) / 10
if l == 0 {
return 2 * int64(n)
}
if l > 1.0 {
l *= 4
}
l = math.Log10(l)
w := int64(n) - int64(float64(n)*l)
if w < 0 {
return 0
if w <= 0 {
return 1
}
return w
}
26 changes: 14 additions & 12 deletions load_avg/load_avg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ func TestWeight(t *testing.T) {
{n: 100, l: 0.5, want: 130},
{n: 100, l: 0.9, want: 104},
{n: 100, l: 1, want: 100},
{n: 100, l: 1.1, want: 96},
{n: 100, l: 2, want: 70},
{n: 100, l: 4, want: 40},
{n: 100, l: 9, want: 5},
{n: 100, l: 10, want: 0},
{n: 100, l: 20, want: 0},
{n: 100, l: 1.1, want: 36},
{n: 100, l: 1.9, want: 12},
{n: 100, l: 2, want: 1},
{n: 100, l: 9, want: 1},
{n: 100, l: 10, want: 1},
{n: 100, l: 20, want: 1},
// n : 1000
{n: 1000, l: 0, want: 2000},
{n: 1000, l: 0.1, want: 1999},
Expand All @@ -33,12 +33,14 @@ func TestWeight(t *testing.T) {
{n: 1000, l: 0.5, want: 1301},
{n: 1000, l: 0.9, want: 1045},
{n: 1000, l: 1, want: 1000},
{n: 1000, l: 1.1, want: 959},
{n: 1000, l: 2, want: 699},
{n: 1000, l: 4, want: 398},
{n: 1000, l: 9, want: 46},
{n: 1000, l: 10, want: 0},
{n: 1000, l: 20, want: 0},
{n: 1000, l: 1.1, want: 357},
{n: 1000, l: 1.9, want: 120},
{n: 1000, l: 2, want: 1},
{n: 1000, l: 3, want: 1},
{n: 1000, l: 4, want: 1},
{n: 1000, l: 9, want: 1},
{n: 1000, l: 10, want: 1},
{n: 1000, l: 20, want: 1},
}
for _, tt := range tests {
t.Run(fmt.Sprintf("%d#%f", tt.n, tt.l), func(t *testing.T) {
Expand Down
Loading

0 comments on commit b05bb9c

Please sign in to comment.