Merge pull request #9745 from gyuho/watch
*: test watch restore in network-partitioned node, clean up fields, logging
gyuho authored May 18, 2018
2 parents 1c904b1 + 31094e5 commit 1a399bd
Showing 21 changed files with 250 additions and 79 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG-3.4.md
@@ -58,6 +58,8 @@ See [code changes](https://github.com/coreos/etcd/compare/v3.3.0...v3.4.0) and [
- e.g. exit with error on `ETCD_INITIAL_CLUSTER_TOKEN=abc etcd --initial-cluster-token=def`.
- e.g. exit with error on `ETCDCTL_ENDPOINTS=abc.com ETCDCTL_API=3 etcdctl endpoint health --endpoints=def.com`.
- Change [`etcdserverpb.AuthRoleRevokePermissionRequest/key,range_end` fields type from `string` to `bytes`](https://github.com/coreos/etcd/pull/9433).
- Rename `etcdserver.ServerConfig.SnapCount` field to `etcdserver.ServerConfig.SnapshotCount`, to be consistent with the flag name `etcd --snapshot-count`.
- Rename `embed.Config.SnapCount` field to [`embed.Config.SnapshotCount`](https://github.com/coreos/etcd/pull/9745), to be consistent with the flag name `etcd --snapshot-count`.
- Change [`embed.Config.CorsInfo` in `*cors.CORSInfo` type to `embed.Config.CORS` in `map[string]struct{}` type](https://github.com/coreos/etcd/pull/9490).
- Remove [`embed.Config.SetupLogging`](https://github.com/coreos/etcd/pull/9572).
- Now logger is set up automatically based on [`embed.Config.Logger`, `embed.Config.LogOutputs`, `embed.Config.Debug` fields](https://github.com/coreos/etcd/pull/9572).
@@ -231,6 +233,7 @@ Note: **v3.5 will deprecate `etcd --log-package-levels` flag for `capnslog`**; `
- Remove [`embed.Config.SetupLogging`](https://github.com/coreos/etcd/pull/9572).
- Now logger is set up automatically based on [`embed.Config.Logger`, `embed.Config.LogOutputs`, `embed.Config.Debug` fields](https://github.com/coreos/etcd/pull/9572).
- Add [`embed.Config.Logger`](https://github.com/coreos/etcd/pull/9518) to support [structured logger `zap`](https://github.com/uber-go/zap) in server-side.
- Rename `embed.Config.SnapCount` field to [`embed.Config.SnapshotCount`](https://github.com/coreos/etcd/pull/9745), to be consistent with the flag name `etcd --snapshot-count`.
- Rename [**`embed.Config.LogOutput`** to **`embed.Config.LogOutputs`**](https://github.com/coreos/etcd/pull/9624) to support multiple log outputs.
- Change [**`embed.Config.LogOutputs`** type from `string` to `[]string`](https://github.com/coreos/etcd/pull/9579) to support multiple log outputs.

24 changes: 24 additions & 0 deletions Documentation/upgrades/upgrade_3_4.md
@@ -90,6 +90,30 @@ if err != nil {
}
```

#### Changed `embed.Config.SnapCount` to `embed.Config.SnapshotCount`

To be consistent with the flag name `etcd --snapshot-count`, the `embed.Config.SnapCount` field has been renamed to `embed.Config.SnapshotCount`:

```diff
import "github.com/coreos/etcd/embed"

cfg := embed.NewConfig()
-cfg.SnapCount = 100000
+cfg.SnapshotCount = 100000
```
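
For a complete picture, here is a minimal embedded-server sketch using the new field name; the data directory, count value, and readiness handling are illustrative and not part of this change:

```go
package main

import (
	"log"

	"github.com/coreos/etcd/embed"
)

func main() {
	cfg := embed.NewConfig()
	cfg.Dir = "default.etcd"   // illustrative data directory
	cfg.SnapshotCount = 100000 // formerly cfg.SnapCount

	e, err := embed.StartEtcd(cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer e.Close()

	<-e.Server.ReadyNotify() // wait until the embedded server is ready to serve
	log.Println("embedded etcd is ready")
}
```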

#### Changed `etcdserver.ServerConfig.SnapCount` to `etcdserver.ServerConfig.SnapshotCount`

To be consistent with the flag name `etcd --snapshot-count`, the `etcdserver.ServerConfig.SnapCount` field has been renamed to `etcdserver.ServerConfig.SnapshotCount`:

```diff
import "github.com/coreos/etcd/etcdserver"

srvcfg := etcdserver.ServerConfig{
- SnapCount: 100000,
+ SnapshotCount: 100000,
```
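
The struct literal above is shown truncated. A self-contained, purely illustrative sketch with the renamed field, which also sets the new `SnapshotCatchUpEntries` option introduced by this pull request, could look like the following; a real `ServerConfig` requires many more fields:

```go
package main

import (
	"fmt"

	"github.com/coreos/etcd/etcdserver"
)

func main() {
	// Sketch only: a real ServerConfig also needs data directories,
	// peer URLs, cluster membership, and other fields.
	srvcfg := etcdserver.ServerConfig{
		SnapshotCount:          100000,                                   // was SnapCount
		SnapshotCatchUpEntries: etcdserver.DefaultSnapshotCatchUpEntries, // override only in tests
	}
	fmt.Println(srvcfg.SnapshotCount, srvcfg.SnapshotCatchUpEntries)
}
```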

#### Changed function signature in package `wal`

Changed `wal` function signatures to support structured logger.
4 changes: 2 additions & 2 deletions contrib/raftexample/raft.go
@@ -71,7 +71,7 @@ type raftNode struct {
httpdonec chan struct{} // signals http server shutdown complete
}

var defaultSnapCount uint64 = 10000
var defaultSnapshotCount uint64 = 10000

// newRaftNode initiates a raft instance and returns a committed log entry
// channel and error channel. Proposals for log updates are sent over the
@@ -95,7 +95,7 @@ func newRaftNode(id int, peers []string, join bool, getSnapshot func() ([]byte,
waldir: fmt.Sprintf("raftexample-%d", id),
snapdir: fmt.Sprintf("raftexample-%d-snap", id),
getSnapshot: getSnapshot,
snapCount: defaultSnapCount,
snapCount: defaultSnapshotCount,
stopc: make(chan struct{}),
httpstopc: make(chan struct{}),
httpdonec: make(chan struct{}),
27 changes: 20 additions & 7 deletions embed/config.go
@@ -111,12 +111,23 @@ func init() {

// Config holds the arguments for configuring an etcd server.
type Config struct {
Name string `json:"name"`
Dir string `json:"data-dir"`
WalDir string `json:"wal-dir"`
SnapCount uint64 `json:"snapshot-count"`
MaxSnapFiles uint `json:"max-snapshots"`
MaxWalFiles uint `json:"max-wals"`
Name string `json:"name"`
Dir string `json:"data-dir"`
WalDir string `json:"wal-dir"`

SnapshotCount uint64 `json:"snapshot-count"`

// SnapshotCatchUpEntries is the number of entries for a slow follower
// to catch-up after compacting the raft storage entries.
// We expect the follower has a millisecond level latency with the leader.
// The max throughput is around 10K. Keep a 5K entries is enough for helping
// follower to catch up.
// WARNING: only change this for tests.
// Always use "DefaultSnapshotCatchUpEntries"
SnapshotCatchUpEntries uint64

MaxSnapFiles uint `json:"max-snapshots"`
MaxWalFiles uint `json:"max-wals"`

// TickMs is the number of milliseconds between heartbeat ticks.
// TODO: decouple tickMs and heartbeat tick (current heartbeat tick = 1).
@@ -342,7 +353,9 @@ func NewConfig() *Config {

Name: DefaultName,

SnapCount: etcdserver.DefaultSnapCount,
SnapshotCount: etcdserver.DefaultSnapshotCount,
SnapshotCatchUpEntries: etcdserver.DefaultSnapshotCatchUpEntries,

MaxTxnOps: DefaultMaxTxnOps,
MaxRequestBytes: DefaultMaxRequestBytes,

2 changes: 1 addition & 1 deletion embed/etcd.go
@@ -163,7 +163,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
PeerURLs: cfg.APUrls,
DataDir: cfg.Dir,
DedicatedWALDir: cfg.WalDir,
SnapCount: cfg.SnapCount,
SnapshotCount: cfg.SnapshotCount,
MaxSnapFiles: cfg.MaxSnapFiles,
MaxWALFiles: cfg.MaxWalFiles,
InitialPeerURLsMap: urlsmap,
2 changes: 1 addition & 1 deletion etcdmain/config.go
@@ -150,7 +150,7 @@ func newConfig() *config {
fs.UintVar(&cfg.ec.MaxSnapFiles, "max-snapshots", cfg.ec.MaxSnapFiles, "Maximum number of snapshot files to retain (0 is unlimited).")
fs.UintVar(&cfg.ec.MaxWalFiles, "max-wals", cfg.ec.MaxWalFiles, "Maximum number of wal files to retain (0 is unlimited).")
fs.StringVar(&cfg.ec.Name, "name", cfg.ec.Name, "Human-readable name for this member.")
fs.Uint64Var(&cfg.ec.SnapCount, "snapshot-count", cfg.ec.SnapCount, "Number of committed transactions to trigger a snapshot to disk.")
fs.Uint64Var(&cfg.ec.SnapshotCount, "snapshot-count", cfg.ec.SnapshotCount, "Number of committed transactions to trigger a snapshot to disk.")
fs.UintVar(&cfg.ec.TickMs, "heartbeat-interval", cfg.ec.TickMs, "Time (in milliseconds) of a heartbeat interval.")
fs.UintVar(&cfg.ec.ElectionMs, "election-timeout", cfg.ec.ElectionMs, "Time (in milliseconds) for an election to timeout.")
fs.BoolVar(&cfg.ec.InitialElectionTickAdvance, "initial-election-tick-advance", cfg.ec.InitialElectionTickAdvance, "Whether to fast-forward initial election ticks on boot for faster election.")
20 changes: 10 additions & 10 deletions etcdmain/config_test.go
@@ -55,7 +55,7 @@ func TestConfigFileMemberFields(t *testing.T) {
MaxSnapFiles uint `json:"max-snapshots"`
MaxWalFiles uint `json:"max-wals"`
Name string `json:"name"`
SnapCount uint64 `json:"snapshot-count"`
SnapshotCount uint64 `json:"snapshot-count"`
LPUrls string `json:"listen-peer-urls"`
LCUrls string `json:"listen-client-urls"`
AcurlsCfgFile string `json:"advertise-client-urls"`
@@ -513,13 +513,13 @@ func mustCreateCfgFile(t *testing.T, b []byte) *os.File {

func validateMemberFlags(t *testing.T, cfg *config) {
wcfg := &embed.Config{
Dir: "testdir",
LPUrls: []url.URL{{Scheme: "http", Host: "localhost:8000"}, {Scheme: "https", Host: "localhost:8001"}},
LCUrls: []url.URL{{Scheme: "http", Host: "localhost:7000"}, {Scheme: "https", Host: "localhost:7001"}},
MaxSnapFiles: 10,
MaxWalFiles: 10,
Name: "testname",
SnapCount: 10,
Dir: "testdir",
LPUrls: []url.URL{{Scheme: "http", Host: "localhost:8000"}, {Scheme: "https", Host: "localhost:8001"}},
LCUrls: []url.URL{{Scheme: "http", Host: "localhost:7000"}, {Scheme: "https", Host: "localhost:7001"}},
MaxSnapFiles: 10,
MaxWalFiles: 10,
Name: "testname",
SnapshotCount: 10,
}

if cfg.ec.Dir != wcfg.Dir {
@@ -534,8 +534,8 @@ func validateMemberFlags(t *testing.T, cfg *config) {
if cfg.ec.Name != wcfg.Name {
t.Errorf("name = %v, want %v", cfg.ec.Name, wcfg.Name)
}
if cfg.ec.SnapCount != wcfg.SnapCount {
t.Errorf("snapcount = %v, want %v", cfg.ec.SnapCount, wcfg.SnapCount)
if cfg.ec.SnapshotCount != wcfg.SnapshotCount {
t.Errorf("snapcount = %v, want %v", cfg.ec.SnapshotCount, wcfg.SnapshotCount)
}
if !reflect.DeepEqual(cfg.ec.LPUrls, wcfg.LPUrls) {
t.Errorf("listen-peer-urls = %v, want %v", cfg.ec.LPUrls, wcfg.LPUrls)
24 changes: 18 additions & 6 deletions etcdserver/config.go
@@ -40,10 +40,21 @@ type ServerConfig struct {
DataDir string
// DedicatedWALDir config will make the etcd to write the WAL to the WALDir
// rather than the dataDir/member/wal.
DedicatedWALDir string
SnapCount uint64
MaxSnapFiles uint
MaxWALFiles uint
DedicatedWALDir string

SnapshotCount uint64

// SnapshotCatchUpEntries is the number of entries for a slow follower
// to catch-up after compacting the raft storage entries.
// We expect the follower has a millisecond level latency with the leader.
// The max throughput is around 10K. Keep a 5K entries is enough for helping
// follower to catch up.
// WARNING: only change this for tests. Always use "DefaultSnapshotCatchUpEntries"
SnapshotCatchUpEntries uint64

MaxSnapFiles uint
MaxWALFiles uint

InitialPeerURLsMap types.URLsMap
InitialClusterToken string
NewCluster bool
@@ -273,7 +284,7 @@ func (c *ServerConfig) print(initial bool) {
}
plog.Infof("heartbeat = %dms", c.TickMs)
plog.Infof("election = %dms", c.ElectionTicks*int(c.TickMs))
plog.Infof("snapshot count = %d", c.SnapCount)
plog.Infof("snapshot count = %d", c.SnapshotCount)
if len(c.DiscoveryURL) != 0 {
plog.Infof("discovery URL= %s", c.DiscoveryURL)
if len(c.DiscoveryProxy) != 0 {
@@ -302,7 +313,8 @@ func (c *ServerConfig) print(initial bool) {
zap.Int("election-tick-ms", c.ElectionTicks),
zap.String("election-timeout", fmt.Sprintf("%v", time.Duration(c.ElectionTicks*int(c.TickMs))*time.Millisecond)),
zap.Bool("initial-election-tick-advance", c.InitialElectionTickAdvance),
zap.Uint64("snapshot-count", c.SnapCount),
zap.Uint64("snapshot-count", c.SnapshotCount),
zap.Uint64("snapshot-catchup-entries", c.SnapshotCatchUpEntries),
zap.Strings("advertise-client-urls", c.getACURLs()),
zap.Strings("initial-advertise-peer-urls", c.getAPURLs()),
zap.Bool("initial", initial),
7 changes: 0 additions & 7 deletions etcdserver/raft.go
@@ -39,13 +39,6 @@ import (
)

const (
// Number of entries for slow follower to catch-up after compacting
// the raft storage entries.
// We expect the follower has a millisecond level latency with the leader.
// The max throughput is around 10K. Keep a 5K entries is enough for helping
// follower to catch up.
numberOfCatchUpEntries = 5000

// The max throughput of etcd will not exceed 100MB/s (100K * 1KB value).
// Assuming the RTT is around 10ms, 1MB max size is large enough.
maxSizePerMsg = 1 * 1024 * 1024
50 changes: 39 additions & 11 deletions etcdserver/server.go
@@ -64,7 +64,14 @@ import (
)

const (
DefaultSnapCount = 100000
DefaultSnapshotCount = 100000

// DefaultSnapshotCatchUpEntries is the number of entries for a slow follower
// to catch-up after compacting the raft storage entries.
// We expect the follower has a millisecond level latency with the leader.
// The max throughput is around 10K. Keep a 5K entries is enough for helping
// follower to catch up.
DefaultSnapshotCatchUpEntries uint64 = 5000

StoreClusterPrefix = "/0"
StoreKeysPrefix = "/1"
@@ -703,14 +710,30 @@ func (s *EtcdServer) Start() {
// This function is just used for testing.
func (s *EtcdServer) start() {
lg := s.getLogger()
if s.Cfg.SnapCount == 0 {
if lg != nil {

if s.Cfg.SnapshotCount == 0 {
if lg != nil {
lg.Info(
"updating snapshot-count to default",
zap.Uint64("given-snapshot-count", s.Cfg.SnapshotCount),
zap.Uint64("updated-snapshot-count", DefaultSnapshotCount),
)
} else {
plog.Infof("set snapshot count to default %d", DefaultSnapCount)
plog.Infof("set snapshot count to default %d", DefaultSnapshotCount)
}
s.Cfg.SnapshotCount = DefaultSnapshotCount
}
if s.Cfg.SnapshotCatchUpEntries == 0 {
if lg != nil {
lg.Info(
"updating snapshot catch-up entries to default",
zap.Uint64("given-snapshot-catchup-entries", s.Cfg.SnapshotCatchUpEntries),
zap.Uint64("updated-snapshot-catchup-entries", DefaultSnapshotCatchUpEntries),
)
}
s.Cfg.SnapCount = DefaultSnapCount
s.Cfg.SnapshotCatchUpEntries = DefaultSnapshotCatchUpEntries
}

s.w = wait.New()
s.applyWait = wait.NewTimeList()
s.done = make(chan struct{})
@@ -743,6 +766,7 @@ func (s *EtcdServer) start() {
plog.Infof("starting server... [version: %v, cluster version: to_be_decided]", version.Version)
}
}

// TODO: if this is an empty log, writes all peer infos
// into the first entry
go s.run()
@@ -1058,7 +1082,8 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
"applying snapshot",
zap.Uint64("current-snapshot-index", ep.snapi),
zap.Uint64("current-applied-index", ep.appliedi),
zap.Uint64("incoming-snapshot-index", apply.snapshot.Metadata.Index),
zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index),
zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term),
)
} else {
plog.Infof("applying snapshot at index %d...", ep.snapi)
@@ -1069,7 +1094,8 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
"applied snapshot",
zap.Uint64("current-snapshot-index", ep.snapi),
zap.Uint64("current-applied-index", ep.appliedi),
zap.Uint64("incoming-snapshot-index", apply.snapshot.Metadata.Index),
zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index),
zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term),
)
} else {
plog.Infof("finished applying incoming snapshot at index %d", ep.snapi)
@@ -1083,6 +1109,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
zap.Uint64("current-snapshot-index", ep.snapi),
zap.Uint64("current-applied-index", ep.appliedi),
zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index),
zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term),
)
} else {
plog.Panicf("snapshot index [%d] should > appliedi[%d] + 1",
@@ -1304,7 +1331,7 @@ func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *apply) {
}

func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) {
if ep.appliedi-ep.snapi <= s.Cfg.SnapCount {
if ep.appliedi-ep.snapi <= s.Cfg.SnapshotCount {
return
}

@@ -1314,7 +1341,7 @@ func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) {
zap.String("local-member-id", s.ID().String()),
zap.Uint64("local-member-applied-index", ep.appliedi),
zap.Uint64("local-member-snapshot-index", ep.snapi),
zap.Uint64("local-member-snapshot-count", s.Cfg.SnapCount),
zap.Uint64("local-member-snapshot-count", s.Cfg.SnapshotCount),
)
} else {
plog.Infof("start to snapshot (applied: %d, lastsnap: %d)", ep.appliedi, ep.snapi)
@@ -2132,9 +2159,10 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {

// keep some in memory log entries for slow followers.
compacti := uint64(1)
if snapi > numberOfCatchUpEntries {
compacti = snapi - numberOfCatchUpEntries
if snapi > s.Cfg.SnapshotCatchUpEntries {
compacti = snapi - s.Cfg.SnapshotCatchUpEntries
}

err = s.r.raftStorage.Compact(compacti)
if err != nil {
// the compaction was done asynchronously with the progress of raft.
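
To make the log-retention arithmetic in the `snapshot` method above concrete, here is a standalone sketch (with illustrative values) of how the compaction index follows `SnapshotCatchUpEntries`:

```go
package main

import "fmt"

// compactIndex mirrors the logic above: after a snapshot at raft index snapi,
// keep the most recent catchUpEntries entries in memory for slow followers and
// compact everything before that point.
func compactIndex(snapi, catchUpEntries uint64) uint64 {
	compacti := uint64(1)
	if snapi > catchUpEntries {
		compacti = snapi - catchUpEntries
	}
	return compacti
}

func main() {
	// With the default of 5000 catch-up entries, a snapshot at index 100000
	// compacts (discards) raft entries before index 95000.
	fmt.Println(compactIndex(100000, 5000)) // 95000
	fmt.Println(compactIndex(3000, 5000))   // 1 (log shorter than the catch-up window)
}
```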