From ed409eba42a594a0acaeaffd17c041206d92b20a Mon Sep 17 00:00:00 2001
From: Fabian Reinartz
Date: Tue, 29 May 2018 17:06:42 -0400
Subject: [PATCH] Add WAL directory tailer

Signed-off-by: Fabian Reinartz
---
 tail/tail.go      | 163 ++++++++++++++++++++++++++++++++++++++++++++++
 tail/tail_test.go | 162 +++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 325 insertions(+)
 create mode 100644 tail/tail.go
 create mode 100644 tail/tail_test.go

diff --git a/tail/tail.go b/tail/tail.go
new file mode 100644
index 00000000..ef36aebf
--- /dev/null
+++ b/tail/tail.go
@@ -0,0 +1,163 @@
+// Copyright 2018 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package tail
+
+import (
+	"bytes"
+	"context"
+	"io"
+	"io/ioutil"
+	"path/filepath"
+	"strconv"
+	"time"
+
+	"github.com/pkg/errors"
+	"github.com/prometheus/tsdb"
+	"github.com/prometheus/tsdb/fileutil"
+	"github.com/prometheus/tsdb/wal"
+)
+
+// tailer is an io.ReadCloser that streams the checkpoint and all subsequent
+// segments of a WAL directory as one continuous byte stream.
+type tailer struct {
+	ctx         context.Context
+	dir         string
+	cur         io.ReadCloser // Checkpoint or segment currently being read.
+	nextSegment int           // Index of the segment to open once cur is exhausted.
+}
+
+// Tail the prometheus/tsdb write ahead log in the given directory. Checkpoints
+// are read before reading any WAL segments.
+// Reads block until more data is available and return io.EOF only once ctx
+// is canceled.
+// Tailing may fail if we are racing with the DB itself in deleting obsolete checkpoints
+// and segments. The caller should implement relevant logic to retry in those cases.
+func Tail(ctx context.Context, dir string) (io.ReadCloser, error) {
+	t := &tailer{
+		ctx: ctx,
+		dir: dir,
+	}
+	cpdir, k, err := tsdb.LastCheckpoint(dir)
+	if err == tsdb.ErrNotFound {
+		// No checkpoint yet. Start from an empty reader so that the first
+		// Read immediately moves on to segment 0.
+		t.cur = ioutil.NopCloser(bytes.NewReader(nil))
+		t.nextSegment = 0
+		return t, nil
+	}
+	if err != nil {
+		return nil, errors.Wrap(err, "retrieve last checkpoint")
+	}
+	// Open the entire checkpoint first. It has to be consumed before
+	// the tailer proceeds to any segments.
+	t.cur, err = wal.NewSegmentsReader(cpdir)
+	if err != nil {
+		return nil, errors.Wrap(err, "open checkpoint")
+	}
+	t.nextSegment = k + 1
+	return t, nil
+}
+
+// Close closes the checkpoint or segment that is currently being read.
+func (t *tailer) Close() error {
+	return t.cur.Close()
+}
+
+// Read implements io.Reader. When the current segment is exhausted it waits,
+// with exponential backoff, for the next segment to show up and continues
+// reading from it. It returns io.EOF only after the context was canceled.
+func (t *tailer) Read(b []byte) (int, error) {
+	const maxBackoff = 3 * time.Second
+	backoff := 10 * time.Millisecond
+
+	for {
+		n, err := t.cur.Read(b)
+		if err != io.EOF {
+			return n, err
+		}
+		if n > 0 {
+			// A reader may return data together with io.EOF. Hand the data
+			// out now; the bare EOF is observed again on the next call.
+			return n, nil
+		}
+		select {
+		case <-t.ctx.Done():
+			// We return EOF here. This will make the WAL reader identify a corruption
+			// if we terminate mid stream. But at least we have a clean shutdown if we
+			// really read till the end of a stopped WAL.
+			return 0, io.EOF
+		default:
+		}
+		// Check if the next segment already exists. Then the current
+		// one is really done.
+		// We could do something more sophisticated to save syscalls, but this
+		// seems fine for the expected throughput (<5MB/s).
+		next, err := openSegment(t.dir, t.nextSegment)
+		if err == tsdb.ErrNotFound {
+			// Next segment doesn't exist yet. We'll probably just have to
+			// wait for more data to be written.
+			select {
+			case <-time.After(backoff):
+			case <-t.ctx.Done():
+				return 0, io.EOF
+			}
+			if backoff *= 2; backoff > maxBackoff {
+				backoff = maxBackoff
+			}
+			continue
+		}
+		if err != nil {
+			return 0, errors.Wrap(err, "open next segment")
+		}
+		// The writer may have appended to the current segment between our EOF
+		// above and the creation of the next segment. Read once more so that
+		// those trailing bytes are not skipped.
+		n, err = t.cur.Read(b)
+		if n > 0 || err != io.EOF {
+			next.Close()
+			if err == io.EOF {
+				err = nil
+			}
+			return n, err
+		}
+		// Swap first so that the tailer stays usable if closing fails.
+		prev := t.cur
+		t.cur = next
+		t.nextSegment++
+		if err := prev.Close(); err != nil {
+			return 0, errors.Wrap(err, "close finished segment")
+		}
+	}
+}
+
+// openSegment opens segment n in dir for reading. It returns tsdb.ErrNotFound
+// if no segment with index n or higher exists and an error if n was skipped.
+func openSegment(dir string, n int) (io.ReadCloser, error) {
+	files, err := fileutil.ReadDir(dir)
+	if err != nil {
+		return nil, err
+	}
+	for _, fn := range files {
+		k, err := strconv.Atoi(fn)
+		if err != nil || k < n {
+			continue
+		}
+		if k > n {
+			return nil, errors.Errorf("next segment %d too high, expected %d", k, n)
+		}
+		return wal.OpenReadSegment(filepath.Join(dir, fn))
+	}
+	return nil, tsdb.ErrNotFound
+}
diff --git a/tail/tail_test.go b/tail/tail_test.go
new file mode 100644
index 00000000..4624b8bb
--- /dev/null
+++ b/tail/tail_test.go
@@ -0,0 +1,162 @@
+// Copyright 2018 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package tail
+
+import (
+	"bytes"
+	"context"
+	"io/ioutil"
+	"math/rand"
+	"os"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/prometheus/tsdb/wal"
+)
+
+func TestTailFuzz(t *testing.T) {
+	dir, err := ioutil.TempDir("", "test_tail")
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer os.RemoveAll(dir)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	rc, err := Tail(ctx, dir)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer rc.Close()
+
+	w, err := wal.NewSize(nil, nil, dir, 2*1024*1024)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer w.Close()
+
+	var written [][]byte
+	var read [][]byte
+
+	// Start background writer.
+	const count = 50000
+	writerDone := make(chan struct{})
+	go func() {
+		defer close(writerDone)
+		// Always cancel, even on failure, so that the reader below terminates.
+		defer cancel()
+
+		for i := 0; i < count; i++ {
+			if i%100 == 0 {
+				time.Sleep(time.Duration(rand.Intn(10 * int(time.Millisecond))))
+			}
+			rec := make([]byte, rand.Intn(5337))
+			// t.Fatal must only be called from the test goroutine.
+			if _, err := rand.Read(rec); err != nil {
+				t.Error(err)
+				return
+			}
+			if err := w.Log(rec); err != nil {
+				t.Error(err)
+				return
+			}
+			written = append(written, rec)
+		}
+		// Give the tailer time to catch up before ending the stream.
+		time.Sleep(time.Second)
+	}()
+
+	wr := wal.NewReader(rc)
+
+	for wr.Next() {
+		read = append(read, append([]byte(nil), wr.Record()...))
+	}
+	// Wait for the writer before touching written.
+	<-writerDone
+
+	if wr.Err() != nil {
+		t.Fatal(wr.Err())
+	}
+	if len(written) != len(read) {
+		t.Fatal("didn't read all records")
+	}
+	for i, r := range read {
+		if !bytes.Equal(r, written[i]) {
+			t.Fatalf("record %d doesn't match", i)
+		}
+	}
+}
+
+func BenchmarkTailFuzz(b *testing.B) {
+	dir, err := ioutil.TempDir("", "test_tail")
+	if err != nil {
+		b.Fatal(err)
+	}
+	defer os.RemoveAll(dir)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	rc, err := Tail(ctx, dir)
+	if err != nil {
+		b.Fatal(err)
+	}
+	defer rc.Close()
+
+	w, err := wal.NewSize(nil, nil, dir, 32*1024*1024)
+	if err != nil {
+		b.Fatal(err)
+	}
+	defer w.Close()
+
+	b.SetBytes(4 * 2000) // Average record size times worker count.
+	b.ResetTimer()
+
+	const workers = 4
+	var rec [4000]byte
+
+	var wg sync.WaitGroup
+	// Deferred after w.Close so that all writers are done before the WAL is closed.
+	defer wg.Wait()
+
+	for k := 0; k < workers; k++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for i := 0; i < b.N; i++ {
+				if err := w.Log(rec[:rand.Intn(4000)]); err != nil {
+					// b.Fatal must only be called from the benchmark goroutine.
+					// Cancel so that the reader below does not wait forever.
+					b.Error(err)
+					cancel()
+					return
+				}
+			}
+		}()
+	}
+
+	wr := wal.NewReader(rc)
+
+	for i := 1; wr.Next(); i++ {
+		if i == b.N*workers {
+			break
+		}
+	}
+	if err := wr.Err(); err != nil {
+		b.Fatal(err)
+	}
+}