*: test watch restore in network-partitioned node, clean up fields, logging #9745

Merged · 10 commits · May 18, 2018
3 changes: 3 additions & 0 deletions CHANGELOG-3.4.md
@@ -58,6 +58,8 @@ See [code changes](https://github.com/coreos/etcd/compare/v3.3.0...v3.4.0) and [
- e.g. exit with error on `ETCD_INITIAL_CLUSTER_TOKEN=abc etcd --initial-cluster-token=def`.
- e.g. exit with error on `ETCDCTL_ENDPOINTS=abc.com ETCDCTL_API=3 etcdctl endpoint health --endpoints=def.com`.
- Change [`etcdserverpb.AuthRoleRevokePermissionRequest/key,range_end` fields type from `string` to `bytes`](https://github.com/coreos/etcd/pull/9433).
+- Rename `etcdserver.ServerConfig.SnapCount` field to [`etcdserver.ServerConfig.SnapshotCount`](https://github.com/coreos/etcd/pull/9745), to be consistent with the flag name `etcd --snapshot-count`.
+- Rename `embed.Config.SnapCount` field to [`embed.Config.SnapshotCount`](https://github.com/coreos/etcd/pull/9745), to be consistent with the flag name `etcd --snapshot-count`.
- Change [`embed.Config.CorsInfo` in `*cors.CORSInfo` type to `embed.Config.CORS` in `map[string]struct{}` type](https://github.com/coreos/etcd/pull/9490).
- Remove [`embed.Config.SetupLogging`](https://github.com/coreos/etcd/pull/9572).
- Now logger is set up automatically based on [`embed.Config.Logger`, `embed.Config.LogOutputs`, `embed.Config.Debug` fields](https://github.com/coreos/etcd/pull/9572).
@@ -231,6 +233,7 @@ Note: **v3.5 will deprecate `etcd --log-package-levels` flag for `capnslog`**; `
- Remove [`embed.Config.SetupLogging`](https://github.com/coreos/etcd/pull/9572).
- Now logger is set up automatically based on [`embed.Config.Logger`, `embed.Config.LogOutputs`, `embed.Config.Debug` fields](https://github.com/coreos/etcd/pull/9572).
- Add [`embed.Config.Logger`](https://github.com/coreos/etcd/pull/9518) to support [structured logger `zap`](https://github.com/uber-go/zap) in server-side.
+- Rename `embed.Config.SnapCount` field to [`embed.Config.SnapshotCount`](https://github.com/coreos/etcd/pull/9745), to be consistent with the flag name `etcd --snapshot-count`.
- Rename [**`embed.Config.LogOutput`** to **`embed.Config.LogOutputs`**](https://github.com/coreos/etcd/pull/9624) to support multiple log outputs.
- Change [**`embed.Config.LogOutputs`** type from `string` to `[]string`](https://github.com/coreos/etcd/pull/9579) to support multiple log outputs.

24 changes: 24 additions & 0 deletions Documentation/upgrades/upgrade_3_4.md
@@ -90,6 +90,30 @@ if err != nil {
}
```

#### Changed `embed.Config.SnapCount` to `embed.Config.SnapshotCount`

To be consistent with the flag name `etcd --snapshot-count`, the `embed.Config.SnapCount` field has been renamed to `embed.Config.SnapshotCount`:

```diff
import "github.com/coreos/etcd/embed"

cfg := embed.NewConfig()
-cfg.SnapCount = 100000
+cfg.SnapshotCount = 100000
```
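
For reference, here is a minimal sketch of starting an embedded server with the renamed field; the data directory and timeout values are illustrative, not part of this change:

```go
package main

import (
	"log"
	"time"

	"github.com/coreos/etcd/embed"
)

func main() {
	cfg := embed.NewConfig()
	cfg.Dir = "default.etcd"   // illustrative data directory
	cfg.SnapshotCount = 100000 // formerly cfg.SnapCount

	e, err := embed.StartEtcd(cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer e.Close()

	select {
	case <-e.Server.ReadyNotify():
		log.Println("embedded etcd is ready")
	case <-time.After(60 * time.Second):
		log.Fatal("embedded etcd took too long to start")
	}
}
```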

#### Changed `etcdserver.ServerConfig.SnapCount` to `etcdserver.ServerConfig.SnapshotCount`

To be consistent with the flag name `etcd --snapshot-count`, the `etcdserver.ServerConfig.SnapCount` field has been renamed to `etcdserver.ServerConfig.SnapshotCount`:

```diff
import "github.com/coreos/etcd/etcdserver"

srvcfg := etcdserver.ServerConfig{
- SnapCount: 100000,
+ SnapshotCount: 100000,
}
```

#### Changed function signature in package `wal`

Changed `wal` function signatures to support structured logger.
4 changes: 2 additions & 2 deletions contrib/raftexample/raft.go
@@ -71,7 +71,7 @@ type raftNode struct {
httpdonec chan struct{} // signals http server shutdown complete
}

-var defaultSnapCount uint64 = 10000
+var defaultSnapshotCount uint64 = 10000

// newRaftNode initiates a raft instance and returns a committed log entry
// channel and error channel. Proposals for log updates are sent over the
@@ -95,7 +95,7 @@ func newRaftNode(id int, peers []string, join bool, getSnapshot func() ([]byte,
waldir: fmt.Sprintf("raftexample-%d", id),
snapdir: fmt.Sprintf("raftexample-%d-snap", id),
getSnapshot: getSnapshot,
-snapCount: defaultSnapCount,
+snapCount: defaultSnapshotCount,
stopc: make(chan struct{}),
httpstopc: make(chan struct{}),
httpdonec: make(chan struct{}),
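For context, raftexample uses this count to decide when to take a snapshot; a simplified sketch of that check, paraphrased from raftexample rather than taken from this diff:

```go
// Simplified: snapshot once enough entries have been applied since the
// last snapshot, then compact the in-memory raft log behind it.
func (rc *raftNode) maybeTriggerSnapshot() {
	if rc.appliedIndex-rc.snapshotIndex <= rc.snapCount {
		return // not enough new entries since the last snapshot
	}
	// ... save a snapshot of the key-value state, then compact
	// raft log entries below the snapshot index ...
}
```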
27 changes: 20 additions & 7 deletions embed/config.go
@@ -111,12 +111,23 @@ func init() {

// Config holds the arguments for configuring an etcd server.
type Config struct {
-Name string `json:"name"`
-Dir string `json:"data-dir"`
-WalDir string `json:"wal-dir"`
-SnapCount uint64 `json:"snapshot-count"`
-MaxSnapFiles uint `json:"max-snapshots"`
-MaxWalFiles uint `json:"max-wals"`
+Name string `json:"name"`
+Dir string `json:"data-dir"`
+WalDir string `json:"wal-dir"`
+
+SnapshotCount uint64 `json:"snapshot-count"`
+
+// SnapshotCatchUpEntries is the number of entries for a slow follower
+// to catch up on after compacting the raft storage entries.
+// We expect the follower to have millisecond-level latency with the leader.
+// The max throughput is around 10K. Keeping 5K entries is enough to help
+// a follower catch up.
+// WARNING: only change this for tests.
+// Always use "DefaultSnapshotCatchUpEntries".
+SnapshotCatchUpEntries uint64
+
+MaxSnapFiles uint `json:"max-snapshots"`
+MaxWalFiles uint `json:"max-wals"`

// TickMs is the number of milliseconds between heartbeat ticks.
// TODO: decouple tickMs and heartbeat tick (current heartbeat tick = 1).
@@ -342,7 +353,9 @@ func NewConfig() *Config {

Name: DefaultName,

-SnapCount: etcdserver.DefaultSnapCount,
+SnapshotCount: etcdserver.DefaultSnapshotCount,
+SnapshotCatchUpEntries: etcdserver.DefaultSnapshotCatchUpEntries,

MaxTxnOps: DefaultMaxTxnOps,
MaxRequestBytes: DefaultMaxRequestBytes,

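The warning on `SnapshotCatchUpEntries` is what the new watch-restore test relies on: shrinking both knobs forces frequent snapshots and aggressive raft-log compaction, so a network-partitioned member falls behind the retained log and must restore from a leader snapshot when it rejoins. A hypothetical sketch of such test tuning (values are illustrative, not taken from this PR):

```go
// Hypothetical test configuration: tiny snapshot/compaction windows.
cfg := embed.NewConfig()
cfg.SnapshotCount = 10         // snapshot after every 10 applied entries
cfg.SnapshotCatchUpEntries = 5 // keep only 5 entries after compaction
```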
2 changes: 1 addition & 1 deletion embed/etcd.go
@@ -163,7 +163,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
PeerURLs: cfg.APUrls,
DataDir: cfg.Dir,
DedicatedWALDir: cfg.WalDir,
-SnapCount: cfg.SnapCount,
+SnapshotCount: cfg.SnapshotCount,
MaxSnapFiles: cfg.MaxSnapFiles,
MaxWALFiles: cfg.MaxWalFiles,
InitialPeerURLsMap: urlsmap,
2 changes: 1 addition & 1 deletion etcdmain/config.go
@@ -150,7 +150,7 @@ func newConfig() *config {
fs.UintVar(&cfg.ec.MaxSnapFiles, "max-snapshots", cfg.ec.MaxSnapFiles, "Maximum number of snapshot files to retain (0 is unlimited).")
fs.UintVar(&cfg.ec.MaxWalFiles, "max-wals", cfg.ec.MaxWalFiles, "Maximum number of wal files to retain (0 is unlimited).")
fs.StringVar(&cfg.ec.Name, "name", cfg.ec.Name, "Human-readable name for this member.")
-fs.Uint64Var(&cfg.ec.SnapCount, "snapshot-count", cfg.ec.SnapCount, "Number of committed transactions to trigger a snapshot to disk.")
+fs.Uint64Var(&cfg.ec.SnapshotCount, "snapshot-count", cfg.ec.SnapshotCount, "Number of committed transactions to trigger a snapshot to disk.")
fs.UintVar(&cfg.ec.TickMs, "heartbeat-interval", cfg.ec.TickMs, "Time (in milliseconds) of a heartbeat interval.")
fs.UintVar(&cfg.ec.ElectionMs, "election-timeout", cfg.ec.ElectionMs, "Time (in milliseconds) for an election to timeout.")
fs.BoolVar(&cfg.ec.InitialElectionTickAdvance, "initial-election-tick-advance", cfg.ec.InitialElectionTickAdvance, "Whether to fast-forward initial election ticks on boot for faster election.")
20 changes: 10 additions & 10 deletions etcdmain/config_test.go
@@ -55,7 +55,7 @@ func TestConfigFileMemberFields(t *testing.T) {
MaxSnapFiles uint `json:"max-snapshots"`
MaxWalFiles uint `json:"max-wals"`
Name string `json:"name"`
-SnapCount uint64 `json:"snapshot-count"`
+SnapshotCount uint64 `json:"snapshot-count"`
LPUrls string `json:"listen-peer-urls"`
LCUrls string `json:"listen-client-urls"`
AcurlsCfgFile string `json:"advertise-client-urls"`
@@ -513,13 +513,13 @@ func mustCreateCfgFile(t *testing.T, b []byte) *os.File {

func validateMemberFlags(t *testing.T, cfg *config) {
wcfg := &embed.Config{
-Dir: "testdir",
-LPUrls: []url.URL{{Scheme: "http", Host: "localhost:8000"}, {Scheme: "https", Host: "localhost:8001"}},
-LCUrls: []url.URL{{Scheme: "http", Host: "localhost:7000"}, {Scheme: "https", Host: "localhost:7001"}},
-MaxSnapFiles: 10,
-MaxWalFiles: 10,
-Name: "testname",
-SnapCount: 10,
+Dir: "testdir",
+LPUrls: []url.URL{{Scheme: "http", Host: "localhost:8000"}, {Scheme: "https", Host: "localhost:8001"}},
+LCUrls: []url.URL{{Scheme: "http", Host: "localhost:7000"}, {Scheme: "https", Host: "localhost:7001"}},
+MaxSnapFiles: 10,
+MaxWalFiles: 10,
+Name: "testname",
+SnapshotCount: 10,
}

if cfg.ec.Dir != wcfg.Dir {
@@ -534,8 +534,8 @@ func validateMemberFlags(t *testing.T, cfg *config) {
if cfg.ec.Name != wcfg.Name {
t.Errorf("name = %v, want %v", cfg.ec.Name, wcfg.Name)
}
-if cfg.ec.SnapCount != wcfg.SnapCount {
-t.Errorf("snapcount = %v, want %v", cfg.ec.SnapCount, wcfg.SnapCount)
+if cfg.ec.SnapshotCount != wcfg.SnapshotCount {
+t.Errorf("snapcount = %v, want %v", cfg.ec.SnapshotCount, wcfg.SnapshotCount)
}
if !reflect.DeepEqual(cfg.ec.LPUrls, wcfg.LPUrls) {
t.Errorf("listen-peer-urls = %v, want %v", cfg.ec.LPUrls, wcfg.LPUrls)
24 changes: 18 additions & 6 deletions etcdserver/config.go
@@ -40,10 +40,21 @@ type ServerConfig struct {
DataDir string
// DedicatedWALDir config will make the etcd to write the WAL to the WALDir
// rather than the dataDir/member/wal.
-DedicatedWALDir string
-SnapCount uint64
-MaxSnapFiles uint
-MaxWALFiles uint
+DedicatedWALDir string
+
+SnapshotCount uint64
+
+// SnapshotCatchUpEntries is the number of entries for a slow follower
+// to catch up on after compacting the raft storage entries.
+// We expect the follower to have millisecond-level latency with the leader.
+// The max throughput is around 10K. Keeping 5K entries is enough to help
+// a follower catch up.
+// WARNING: only change this for tests. Always use "DefaultSnapshotCatchUpEntries".
+SnapshotCatchUpEntries uint64
+
+MaxSnapFiles uint
+MaxWALFiles uint

InitialPeerURLsMap types.URLsMap
InitialClusterToken string
NewCluster bool
@@ -273,7 +284,7 @@ func (c *ServerConfig) print(initial bool) {
}
plog.Infof("heartbeat = %dms", c.TickMs)
plog.Infof("election = %dms", c.ElectionTicks*int(c.TickMs))
plog.Infof("snapshot count = %d", c.SnapCount)
plog.Infof("snapshot count = %d", c.SnapshotCount)
if len(c.DiscoveryURL) != 0 {
plog.Infof("discovery URL= %s", c.DiscoveryURL)
if len(c.DiscoveryProxy) != 0 {
@@ -302,7 +313,8 @@ func (c *ServerConfig) print(initial bool) {
zap.Int("election-tick-ms", c.ElectionTicks),
zap.String("election-timeout", fmt.Sprintf("%v", time.Duration(c.ElectionTicks*int(c.TickMs))*time.Millisecond)),
zap.Bool("initial-election-tick-advance", c.InitialElectionTickAdvance),
zap.Uint64("snapshot-count", c.SnapCount),
zap.Uint64("snapshot-count", c.SnapshotCount),
zap.Uint64("snapshot-catchup-entries", c.SnapshotCatchUpEntries),
zap.Strings("advertise-client-urls", c.getACURLs()),
zap.Strings("initial-advertise-peer-urls", c.getAPURLs()),
zap.Bool("initial", initial),
7 changes: 0 additions & 7 deletions etcdserver/raft.go
@@ -39,13 +39,6 @@ import (
)

const (
-// Number of entries for slow follower to catch-up after compacting
-// the raft storage entries.
-// We expect the follower has a millisecond level latency with the leader.
-// The max throughput is around 10K. Keep a 5K entries is enough for helping
-// follower to catch up.
-numberOfCatchUpEntries = 5000
-
// The max throughput of etcd will not exceed 100MB/s (100K * 1KB value).
// Assuming the RTT is around 10ms, 1MB max size is large enough.
maxSizePerMsg = 1 * 1024 * 1024
50 changes: 39 additions & 11 deletions etcdserver/server.go
@@ -64,7 +64,14 @@ import (
)

const (
-DefaultSnapCount = 100000
+DefaultSnapshotCount = 100000
+
+// DefaultSnapshotCatchUpEntries is the number of entries for a slow follower
+// to catch up on after compacting the raft storage entries.
+// We expect the follower to have millisecond-level latency with the leader.
+// The max throughput is around 10K. Keeping 5K entries is enough to help
+// a follower catch up.
+DefaultSnapshotCatchUpEntries uint64 = 5000

StoreClusterPrefix = "/0"
StoreKeysPrefix = "/1"
@@ -703,14 +710,30 @@ func (s *EtcdServer) Start() {
// This function is just used for testing.
func (s *EtcdServer) start() {
lg := s.getLogger()
-if s.Cfg.SnapCount == 0 {
+
+if s.Cfg.SnapshotCount == 0 {
if lg != nil {
lg.Info(
"updating snapshot-count to default",
zap.Uint64("given-snapshot-count", s.Cfg.SnapshotCount),
zap.Uint64("updated-snapshot-count", DefaultSnapshotCount),
)
} else {
-plog.Infof("set snapshot count to default %d", DefaultSnapCount)
+plog.Infof("set snapshot count to default %d", DefaultSnapshotCount)
}
-s.Cfg.SnapCount = DefaultSnapCount
+s.Cfg.SnapshotCount = DefaultSnapshotCount
+}
+if s.Cfg.SnapshotCatchUpEntries == 0 {
+if lg != nil {
+lg.Info(
+"updating snapshot catch-up entries to default",
+zap.Uint64("given-snapshot-catchup-entries", s.Cfg.SnapshotCatchUpEntries),
+zap.Uint64("updated-snapshot-catchup-entries", DefaultSnapshotCatchUpEntries),
+)
+}
+s.Cfg.SnapshotCatchUpEntries = DefaultSnapshotCatchUpEntries
}

s.w = wait.New()
s.applyWait = wait.NewTimeList()
s.done = make(chan struct{})
@@ -743,6 +766,7 @@ func (s *EtcdServer) start() {
plog.Infof("starting server... [version: %v, cluster version: to_be_decided]", version.Version)
}
}

// TODO: if this is an empty log, writes all peer infos
// into the first entry
go s.run()
@@ -1058,7 +1082,8 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
"applying snapshot",
zap.Uint64("current-snapshot-index", ep.snapi),
zap.Uint64("current-applied-index", ep.appliedi),
zap.Uint64("incoming-snapshot-index", apply.snapshot.Metadata.Index),
zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index),
zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term),
)
} else {
plog.Infof("applying snapshot at index %d...", ep.snapi)
@@ -1069,7 +1094,8 @@
"applied snapshot",
zap.Uint64("current-snapshot-index", ep.snapi),
zap.Uint64("current-applied-index", ep.appliedi),
zap.Uint64("incoming-snapshot-index", apply.snapshot.Metadata.Index),
zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index),
zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term),
)
} else {
plog.Infof("finished applying incoming snapshot at index %d", ep.snapi)
@@ -1083,6 +1109,7 @@
zap.Uint64("current-snapshot-index", ep.snapi),
zap.Uint64("current-applied-index", ep.appliedi),
zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index),
zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term),
)
} else {
plog.Panicf("snapshot index [%d] should > appliedi[%d] + 1",
@@ -1304,7 +1331,7 @@ func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *apply) {
}

func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) {
-if ep.appliedi-ep.snapi <= s.Cfg.SnapCount {
+if ep.appliedi-ep.snapi <= s.Cfg.SnapshotCount {
return
}

@@ -1314,7 +1341,7 @@ func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) {
zap.String("local-member-id", s.ID().String()),
zap.Uint64("local-member-applied-index", ep.appliedi),
zap.Uint64("local-member-snapshot-index", ep.snapi),
zap.Uint64("local-member-snapshot-count", s.Cfg.SnapCount),
zap.Uint64("local-member-snapshot-count", s.Cfg.SnapshotCount),
)
} else {
plog.Infof("start to snapshot (applied: %d, lastsnap: %d)", ep.appliedi, ep.snapi)
@@ -2132,9 +2159,10 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {

// keep some in memory log entries for slow followers.
compacti := uint64(1)
-if snapi > numberOfCatchUpEntries {
-compacti = snapi - numberOfCatchUpEntries
+if snapi > s.Cfg.SnapshotCatchUpEntries {
+compacti = snapi - s.Cfg.SnapshotCatchUpEntries
}

err = s.r.raftStorage.Compact(compacti)
if err != nil {
// the compaction was done asynchronously with the progress of raft.
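To make the compaction rule above concrete, here is a standalone sketch of the index arithmetic (not etcd code):

```go
// compactIndex mirrors the logic in snapshot(): after snapshotting at
// index snapi, retain the last catchUpEntries entries in the in-memory
// raft log so slow followers can catch up without a full snapshot.
func compactIndex(snapi, catchUpEntries uint64) uint64 {
	compacti := uint64(1)
	if snapi > catchUpEntries {
		compacti = snapi - catchUpEntries
	}
	return compacti
}

// Example: compactIndex(100000, 5000) == 95000; entries below index 95000
// are dropped, and a follower within 5000 entries of the leader can still
// replicate from the retained log.
```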