diff --git a/pkg/util/duration.go b/pkg/util/duration.go new file mode 100644 index 000000000..20141c1c4 --- /dev/null +++ b/pkg/util/duration.go @@ -0,0 +1,89 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +import ( + "encoding/json" + "fmt" + "strconv" + "time" + + "github.com/BurntSushi/toml" + "github.com/pingcap/errors" +) + +var empty = "" +var _ toml.TextMarshaler = Duration(empty) +var _ toml.TextUnmarshaler = (*Duration)(&empty) +var _ json.Marshaler = Duration(empty) +var _ json.Unmarshaler = (*Duration)(&empty) + +// Duration is a wrapper of time.Duration for TOML and JSON. +type Duration string + +// NewDuration creates a Duration from time.Duration. +func NewDuration(duration time.Duration) Duration { + return Duration(duration.String()) +} + +// MarshalJSON returns the duration as a JSON string. +func (d Duration) MarshalJSON() ([]byte, error) { + return []byte(fmt.Sprintf(`"%s"`, d)), nil +} + +// UnmarshalJSON parses a JSON string into the duration. +func (d *Duration) UnmarshalJSON(text []byte) error { + s, err := strconv.Unquote(string(text)) + if err != nil { + return errors.WithStack(err) + } + td := Duration(s) + _, err = td.ParseDuration() + if err != nil { + return errors.WithStack(err) + } + *d = Duration(s) + return nil +} + +// UnmarshalText parses a TOML string into the duration. +func (d *Duration) UnmarshalText(text []byte) error { + var err error + td := Duration(text) + _, err = td.ParseDuration() + if err != nil { + return errors.WithStack(err) + } + *d = Duration(text) + return nil +} + +// MarshalText returns the duration as a JSON string. +func (d Duration) MarshalText() ([]byte, error) { + return []byte(d), nil +} + +// ParseDuration parses gc durations. The default unit is day. +func (d Duration) ParseDuration() (time.Duration, error) { + gc := string(d) + t, err := strconv.ParseUint(gc, 10, 64) + if err == nil { + return time.Duration(t) * 24 * time.Hour, nil + } + gcDuration, err := time.ParseDuration(gc) + if err != nil { + return 0, errors.Annotatef(err, "unsupported gc time %s, etc: use 7 for 7 day, 7h for 7 hour", gc) + } + return gcDuration, nil +} diff --git a/pkg/util/duration_test.go b/pkg/util/duration_test.go new file mode 100644 index 000000000..66728b832 --- /dev/null +++ b/pkg/util/duration_test.go @@ -0,0 +1,42 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +import ( + "time" + + . "github.com/pingcap/check" +) + +type durationSuite struct{} + +var _ = Suite(&durationSuite{}) + +func (s *durationSuite) TestParseDuration(c *C) { + gc := Duration("7") + expectDuration := 7 * 24 * time.Hour + duration, err := gc.ParseDuration() + c.Assert(err, IsNil) + c.Assert(duration, Equals, expectDuration) + + gc = "30m" + expectDuration = 30 * time.Minute + duration, err = gc.ParseDuration() + c.Assert(err, IsNil) + c.Assert(duration, Equals, expectDuration) + + gc = "7d" + _, err = gc.ParseDuration() + c.Assert(err, NotNil) +} diff --git a/pump/config.go b/pump/config.go index cf7f7894a..d9d35d9c1 100644 --- a/pump/config.go +++ b/pump/config.go @@ -37,7 +37,7 @@ const ( defaultListenAddr = "127.0.0.1:8250" defautMaxKafkaSize = 1024 * 1024 * 1024 defaultHeartbeatInterval = 2 - defaultGC = 7 + defaultGC = "7" defaultDataDir = "data.pump" // default interval time to generate fake binlog, the unit is second @@ -64,8 +64,8 @@ type Config struct { EtcdDialTimeout time.Duration DataDir string `toml:"data-dir" json:"data-dir"` HeartbeatInterval int `toml:"heartbeat-interval" json:"heartbeat-interval"` - // pump only stores binlog events whose ts >= current time - GC(day) - GC int `toml:"gc" json:"gc"` + // pump only stores binlog events whose ts >= current time - GC Time. The default unit is day + GC util.Duration `toml:"gc" json:"gc"` LogFile string `toml:"log-file" json:"log-file"` Security security.Config `toml:"security" json:"security"` @@ -99,7 +99,7 @@ func NewConfig() *Config { fs.StringVar(&cfg.EtcdURLs, "pd-urls", defaultEtcdURLs, "a comma separated list of the PD endpoints") fs.StringVar(&cfg.DataDir, "data-dir", "", "the path to store binlog data") fs.IntVar(&cfg.HeartbeatInterval, "heartbeat-interval", defaultHeartbeatInterval, "number of seconds between heartbeat ticks") - fs.IntVar(&cfg.GC, "gc", defaultGC, "recycle binlog files older than gc days") + fs.StringVar((*string)(&cfg.GC), "gc", defaultGC, "recycle binlog files older than gc time. default unit is day. also accept 8h format time(max unit is hour)") fs.StringVar(&cfg.LogLevel, "L", "info", "log level: debug, info, warn, error, fatal") fs.StringVar(&cfg.MetricsAddr, "metrics-addr", "", "prometheus pushgateway address, leaves it empty will disable prometheus push") fs.IntVar(&cfg.MetricsInterval, "metrics-interval", 15, "prometheus client push interval in second, set \"0\" to disable prometheus push") @@ -180,8 +180,12 @@ func (cfg *Config) configFromFile(path string) error { // validate checks whether the configuration is valid func (cfg *Config) validate() error { // check GC - if cfg.GC <= 0 { - return errors.Errorf("GC is %d, must bigger than 0", cfg.GC) + if duration, err := cfg.GC.ParseDuration(); err == nil { + if duration <= 0 { + return errors.Errorf("GC is %s, must bigger than 0", cfg.GC) + } + } else { + return errors.Errorf("parse GC time failed, err: %s", err) } // check ListenAddr diff --git a/pump/config_test.go b/pump/config_test.go index 76dc6a6d4..287ad6466 100644 --- a/pump/config_test.go +++ b/pump/config_test.go @@ -18,9 +18,11 @@ import ( "io/ioutil" "os" "path" + "time" "github.com/BurntSushi/toml" . "github.com/pingcap/check" + "github.com/pingcap/tidb-binlog/pkg/util" ) var _ = Suite(&testConfigSuite{}) @@ -29,7 +31,7 @@ type testConfigSuite struct{} func (s *testConfigSuite) TestValidate(c *C) { cfg := Config{} - cfg.GC = 1 + cfg.GC = util.NewDuration(24 * time.Hour) cfg.ListenAddr = "http://:8250" cfg.EtcdURLs = "http://192.168.10.23:7777" @@ -155,6 +157,100 @@ func (s *testConfigSuite) TestConfigParsingFileWithInvalidArgs(c *C) { c.Assert(err, ErrorMatches, ".*contained unknown configuration options: unrecognized-option-test.*") } +func (s *testConfigSuite) TestConfigParsingIntegerDuration(c *C) { + yc := struct { + ListenAddr string `toml:"addr" json:"addr"` + AdvertiseAddr string `toml:"advertise-addr" json:"advertise-addr"` + EtcdURLs string `toml:"pd-urls" json:"pd-urls"` + BinlogDir string `toml:"data-dir" json:"data-dir"` + GC int `toml:"gc" json:"gc"` + HeartbeatInterval uint `toml:"heartbeat-interval" json:"heartbeat-interval"` + }{ + "192.168.199.100:8260", + "192.168.199.100:8260", + "http://192.168.199.110:2379,http://hostname:2379", + "/tmp/pump", + 5, + 1500, + } + + var buf bytes.Buffer + e := toml.NewEncoder(&buf) + err := e.Encode(yc) + c.Assert(err, IsNil) + + configFilename := path.Join(c.MkDir(), "pump_config_gc_int.toml") + err = ioutil.WriteFile(configFilename, buf.Bytes(), 0644) + c.Assert(err, IsNil) + + args := []string{ + "--config", + configFilename, + "-L", "debug", + } + + cfg := NewConfig() + err = cfg.Parse(args) + c.Assert(err, IsNil) + duration, err := cfg.GC.ParseDuration() + c.Assert(err, IsNil) + c.Assert(duration, Equals, 5*24*time.Hour) + + // test whether gc config can be covered by command lines + args = []string{ + "--config", + configFilename, + "-L", "debug", + "--gc", "3", + } + cfg = NewConfig() + err = cfg.Parse(args) + c.Assert(err, IsNil) + duration, err = cfg.GC.ParseDuration() + c.Assert(err, IsNil) + c.Assert(duration, Equals, 3*24*time.Hour) +} + +func (s *testConfigSuite) TestConfigParsingStringDuration(c *C) { + yc := struct { + ListenAddr string `toml:"addr" json:"addr"` + AdvertiseAddr string `toml:"advertise-addr" json:"advertise-addr"` + EtcdURLs string `toml:"pd-urls" json:"pd-urls"` + BinlogDir string `toml:"data-dir" json:"data-dir"` + GC string `toml:"gc" json:"gc"` + HeartbeatInterval uint `toml:"heartbeat-interval" json:"heartbeat-interval"` + }{ + "192.168.199.100:8260", + "192.168.199.100:8260", + "http://192.168.199.110:2379,http://hostname:2379", + "/tmp/pump", + "30m", + 1500, + } + + var buf bytes.Buffer + e := toml.NewEncoder(&buf) + err := e.Encode(yc) + c.Assert(err, IsNil) + + configFilename := path.Join(c.MkDir(), "pump_config_gc_str.toml") + err = ioutil.WriteFile(configFilename, buf.Bytes(), 0644) + c.Assert(err, IsNil) + + args := []string{ + "--config", + configFilename, + "-L", "debug", + } + + cfg := NewConfig() + err = cfg.Parse(args) + c.Assert(err, IsNil) + duration, err := cfg.GC.ParseDuration() + c.Assert(err, IsNil) + c.Assert(duration, Equals, 30*time.Minute) +} + func mustSuccess(c *C, err error) { c.Assert(err, IsNil) } diff --git a/pump/server.go b/pump/server.go index 43689495c..009694b2c 100644 --- a/pump/server.go +++ b/pump/server.go @@ -107,6 +107,10 @@ func init() { // NewServer returns a instance of pump server func NewServer(cfg *Config) (*Server, error) { + gcDuration, err := cfg.GC.ParseDuration() + if err != nil { + return nil, errors.Trace(err) + } var metrics *util.MetricClient if cfg.MetricsAddr != "" && cfg.MetricsInterval != 0 { metrics = util.NewMetricClient( @@ -173,7 +177,7 @@ func NewServer(cfg *Config) (*Server, error) { cancel: cancel, metrics: metrics, tiStore: tiStore, - gcDuration: time.Duration(cfg.GC) * 24 * time.Hour, + gcDuration: gcDuration, pdCli: pdCli, cfg: cfg, triggerGC: make(chan time.Time), diff --git a/pump/server_test.go b/pump/server_test.go index 8e7d41c7b..cafdb4734 100644 --- a/pump/server_test.go +++ b/pump/server_test.go @@ -579,6 +579,7 @@ func (s *newServerSuite) SetUpTest(c *C) { LogLevel: "debug", MetricsAddr: "192.168.199.100:5000", MetricsInterval: 15, + GC: "7", Security: security.Config{ SSLCA: "/path/to/ca.pem", SSLCert: "/path/to/drainer.pem", @@ -713,7 +714,7 @@ func (s *startServerSuite) TestStartPumpServer(c *C) { ctx: ctx, cancel: cancel, tiStore: nil, - gcDuration: time.Duration(cfg.GC) * 24 * time.Hour, + gcDuration: 24 * time.Hour, pdCli: nil, cfg: cfg, triggerGC: make(chan time.Time),