Skip to content

Commit

Permalink
Merge pull request #11738 from gyuho/wal-metrics
Browse files Browse the repository at this point in the history
wal: add "etcd_wal_writes_bytes_total"
  • Loading branch information
gyuho authored Apr 1, 2020
2 parents 054b0e5 + 381e77c commit 5ceb701
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 8 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG-3.5.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ Note that any `etcd_debugging_*` metrics are experimental and subject to change.
- Change [`etcd_cluster_version`](https://github.com/etcd-io/etcd/pull/11254) Prometheus metrics to include only major and minor version.
- Add [`etcd_debugging_mvcc_total_put_size_in_bytes`](https://github.com/etcd-io/etcd/pull/11374) Prometheus metric.
- Add [`etcd_server_client_requests_total` with `"type"` and `"client_api_version"` labels](https://github.com/etcd-io/etcd/pull/11687).
- Add [`etcd_wal_write_bytes_total`](https://github.com/etcd-io/etcd/pull/11738).

### etcd server

Expand Down Expand Up @@ -121,6 +122,10 @@ Note that any `etcd_debugging_*` metrics are experimental and subject to change.
- https://github.com/etcd-io/etcd/issues/11495
- https://github.com/etcd-io/etcd/issues/11730

### Package `wal`

- Add [`etcd_wal_write_bytes_total`](https://github.com/etcd-io/etcd/pull/11738).

### etcdctl v3

- Fix `etcdctl member add` command to prevent potential timeout. ([PR#11194](https://github.com/etcd-io/etcd/pull/11194) and [PR#11638](https://github.com/etcd-io/etcd/pull/11638))
Expand Down
17 changes: 14 additions & 3 deletions pkg/ioutil/pagewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,23 @@ func (pw *PageWriter) Write(p []byte) (n int, err error) {
return n, werr
}

// Flush flushes buffered data.
func (pw *PageWriter) Flush() error {
_, err := pw.flush()
return err
}

// FlushN flushes buffered data and returns the number of written bytes.
func (pw *PageWriter) FlushN() (int, error) {
return pw.flush()
}

func (pw *PageWriter) flush() (int, error) {
if pw.bufferedBytes == 0 {
return nil
return 0, nil
}
_, err := pw.w.Write(pw.buf[:pw.bufferedBytes])
n, err := pw.w.Write(pw.buf[:pw.bufferedBytes])
pw.pageOffset = (pw.pageOffset + pw.bufferedBytes) % pw.pageBytes
pw.bufferedBytes = 0
return err
return n, err
}
12 changes: 8 additions & 4 deletions wal/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ func (e *encoder) encode(rec *walpb.Record) error {
if padBytes != 0 {
data = append(data, make([]byte, padBytes)...)
}
_, err = e.bw.Write(data)
n, err = e.bw.Write(data)
walWriteBytes.Add(float64(n))
return err
}

Expand All @@ -108,13 +109,16 @@ func encodeFrameSize(dataBytes int) (lenField uint64, padBytes int) {

func (e *encoder) flush() error {
e.mu.Lock()
defer e.mu.Unlock()
return e.bw.Flush()
n, err := e.bw.FlushN()
e.mu.Unlock()
walWriteBytes.Add(float64(n))
return err
}

func writeUint64(w io.Writer, n uint64, buf []byte) error {
// http://golang.org/src/encoding/binary/binary.go
binary.LittleEndian.PutUint64(buf, n)
_, err := w.Write(buf)
nv, err := w.Write(buf)
walWriteBytes.Add(float64(nv))
return err
}
8 changes: 8 additions & 0 deletions wal/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,16 @@ var (
// highest bucket start of 0.001 sec * 2^13 == 8.192 sec
Buckets: prometheus.ExponentialBuckets(0.001, 2, 14),
})

walWriteBytes = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "etcd",
Subsystem: "disk",
Name: "wal_write_bytes_total",
Help: "Total number of bytes written in WAL.",
})
)

func init() {
prometheus.MustRegister(walFsyncSec)
prometheus.MustRegister(walWriteBytes)
}
4 changes: 3 additions & 1 deletion wal/repair.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ import (
"io"
"os"
"path/filepath"
"time"

"go.etcd.io/etcd/pkg/fileutil"
"go.etcd.io/etcd/wal/walpb"

"go.uber.org/zap"
)

Expand Down Expand Up @@ -86,10 +86,12 @@ func Repair(lg *zap.Logger, dirpath string) bool {
return false
}

start := time.Now()
if err = fileutil.Fsync(f.File); err != nil {
lg.Warn("failed to fsync", zap.String("path", f.Name()), zap.Error(err))
return false
}
walFsyncSec.Observe(time.Since(start).Seconds())

lg.Info("repaired", zap.String("path", f.Name()), zap.Error(io.ErrUnexpectedEOF))
return true
Expand Down
5 changes: 5 additions & 0 deletions wal/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ func Create(lg *zap.Logger, dirpath string, metadata []byte) (*WAL, error) {
)
return nil, perr
}
start := time.Now()
if perr = fileutil.Fsync(pdir); perr != nil {
lg.Warn(
"failed to fsync the parent data directory file",
Expand All @@ -202,6 +203,8 @@ func Create(lg *zap.Logger, dirpath string, metadata []byte) (*WAL, error) {
)
return nil, perr
}
walFsyncSec.Observe(time.Since(start).Seconds())

if perr = pdir.Close(); perr != nil {
lg.Warn(
"failed to close the parent data directory file",
Expand Down Expand Up @@ -647,9 +650,11 @@ func (w *WAL) cut() error {
if err = os.Rename(newTail.Name(), fpath); err != nil {
return err
}
start := time.Now()
if err = fileutil.Fsync(w.dirFile); err != nil {
return err
}
walFsyncSec.Observe(time.Since(start).Seconds())

// reopen newTail with its new path so calls to Name() match the wal filename format
newTail.Close()
Expand Down

0 comments on commit 5ceb701

Please sign in to comment.