Skip to content

Commit

Permalink
pump/: Fix get L0 files num and trigger vlog.gcTS step by step (#648) (
Browse files Browse the repository at this point in the history
…#658)

1, Add an API to trigger gc
2, Check L0 files by GetProperty instead of Stats
Stats can't get the right L0 files num, see:
https://github.com/syndtr/goleveldb/pull/283/files
3, call vlog.gcTS step by step to help free space quickly
4, update goleveldb dependency
only one new commit
syndtr/goleveldb@02440ea
Fix the stat about levels of the Stats API
  • Loading branch information
july2993 authored Jul 2, 2019
1 parent 76433a7 commit 4808e90
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 35 deletions.
5 changes: 2 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,13 @@ require (
github.com/soheilhy/cmux v0.1.2
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 // indirect
github.com/spf13/pflag v1.0.3 // indirect
github.com/stretchr/testify v1.3.0 // indirect
github.com/syndtr/goleveldb v1.0.1-0.20190318030020-c3a204f8e965
github.com/syndtr/goleveldb v1.0.1-0.20190625010220-02440ea7a285
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 // indirect
github.com/unrolled/render v0.0.0-20180807193321-4206df6ff701
github.com/zanmato1984/clickhouse v1.3.4-0.20181106115746-3e9a6b9beb12
go.uber.org/atomic v1.3.2 // indirect
go.uber.org/multierr v1.1.0 // indirect
go.uber.org/zap v1.9.1 // indirect
go.uber.org/zap v1.9.1
golang.org/x/crypto v0.0.0-20190513172903-22d7a77e9e5f // indirect
golang.org/x/net v0.0.0-20190522155817-f3200d17e092
golang.org/x/sync v0.0.0-20190423024810-112230192c58
Expand Down
8 changes: 2 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ github.com/cznic/mathutil v0.0.0-20160613104831-78ad7f262603 h1:hhR9hTi0ligs11Jj
github.com/cznic/mathutil v0.0.0-20160613104831-78ad7f262603/go.mod h1:e6NPNENfs9mPDVNRekM7lKScauxd5kXTr1Mfyig6TDM=
github.com/cznic/sortutil v0.0.0-20150617083342-4c7342852e65 h1:hxuZop6tSoOi0sxFzoGGYdRqNrPubyaIf9KoBG9tPiE=
github.com/cznic/sortutil v0.0.0-20150617083342-4c7342852e65/go.mod h1:q2w6Bg5jeox1B+QkJ6Wp/+Vn0G/bo3f1uY7Fn3vivIQ=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
Expand Down Expand Up @@ -181,13 +180,10 @@ github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg=
github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/syndtr/goleveldb v1.0.1-0.20190318030020-c3a204f8e965 h1:1oFLiOyVl+W7bnBzGhf7BbIv9loSFQcieWWYIjLqcAw=
github.com/syndtr/goleveldb v1.0.1-0.20190318030020-c3a204f8e965/go.mod h1:9OrXJhf154huy1nPWmuSrkgjPUtUNhA+Zmy+6AESzuA=
github.com/syndtr/goleveldb v1.0.1-0.20190625010220-02440ea7a285 h1:uSDYjYejelKyceA6DiCsngFof9jAyeaSyX9XC5a1a7Q=
github.com/syndtr/goleveldb v1.0.1-0.20190625010220-02440ea7a285/go.mod h1:9OrXJhf154huy1nPWmuSrkgjPUtUNhA+Zmy+6AESzuA=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 h1:LnC5Kc/wtumK+WB441p7ynQJzVuNRJiqddSIE3IlSEQ=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/twinj/uuid v0.0.0-20150629100731-70cac2bcd273 h1:YqFyfcgqxQqjpRr0SEG0Z555J/3kPqDL/xmRyeAaX/0=
Expand Down
50 changes: 33 additions & 17 deletions pump/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type Server struct {
cancel context.CancelFunc
wg sync.WaitGroup
gcDuration time.Duration
triggerGC chan time.Time
metrics *metricClient
// save the last time we write binlog to Storage
// if long time not write, we can write a fake binlog
Expand Down Expand Up @@ -153,6 +154,7 @@ func NewServer(cfg *Config) (*Server, error) {
gcDuration: time.Duration(cfg.GC) * 24 * time.Hour,
pdCli: pdCli,
cfg: cfg,
triggerGC: make(chan time.Time),
}, nil
}

Expand Down Expand Up @@ -396,6 +398,7 @@ func (s *Server) Start() error {
router.HandleFunc("/state/{nodeID}/{action}", s.ApplyAction).Methods("PUT")
router.HandleFunc("/drainers", s.AllDrainers).Methods("GET")
router.HandleFunc("/debug/binlog/{ts}", s.BinlogByTS).Methods("GET")
router.HandleFunc("/debug/gc/trigger", s.TriggerGC).Methods("POST")
http.Handle("/", router)
prometheus.DefaultGatherer = registry
http.Handle("/metrics", promhttp.Handler())
Expand Down Expand Up @@ -512,30 +515,33 @@ func (s *Server) gcBinlogFile() {
case <-s.ctx.Done():
log.Info("gcBinlogFile exit")
return
case <-s.triggerGC:
log.Info("trigger gc now")
case <-time.After(time.Hour):
if s.gcDuration == 0 {
continue
}
}

safeTSO, err := s.getSaveGCTSOForDrainers()
if err != nil {
log.Warn(err)
continue
}
log.Info("safe ts for drainers: ", safeTSO)
if s.gcDuration == 0 {
continue
}

millisecond := time.Now().Add(-s.gcDuration).UnixNano() / 1000 / 1000
gcTS := int64(oracle.EncodeTSO(millisecond))
if safeTSO < gcTS {
gcTS = safeTSO
}
log.Info("gc ts: ", gcTS)
s.storage.GCTS(gcTS)
safeTSO, err := s.getSafeGCTSOForDrainers()
if err != nil {
log.Warn("get save gc tso for drainers failed: %+v", err)
continue
}
log.Infof("get safe ts for drainers success, ts: %d", safeTSO)

millisecond := time.Now().Add(-s.gcDuration).UnixNano() / 1000 / 1000
gcTS := int64(oracle.EncodeTSO(millisecond))
if safeTSO < gcTS {
gcTS = safeTSO
}
log.Infof("send gc request to storage, ts: %d", gcTS)
s.storage.GCTS(gcTS)
}
}

func (s *Server) getSaveGCTSOForDrainers() (int64, error) {
func (s *Server) getSafeGCTSOForDrainers() (int64, error) {
pumpNode := s.node.(*pumpNode)

drainers, err := pumpNode.Nodes(s.ctx, "drainers")
Expand Down Expand Up @@ -589,6 +595,16 @@ func (s *Server) Status(w http.ResponseWriter, r *http.Request) {
s.PumpStatus().Status(w, r)
}

// TriggerGC is the HTTP handler that asks the pump to start a gc round now.
//
// It makes a single non-blocking attempt to send the current time on
// s.triggerGC. If the send goes through, the response body is
// "trigger gc success"; if nothing is ready to take the value, the handler
// does not wait and replies "gc is working" instead.
func (s *Server) TriggerGC(w http.ResponseWriter, r *http.Request) {
	// Assume the request cannot be delivered until the send proves otherwise.
	reply := "gc is working"

	select {
	case s.triggerGC <- time.Now():
		reply = "trigger gc success"
	default:
		// Channel not ready: keep the fallback reply and never block the caller.
	}

	fmt.Fprintln(w, reply)
}

// BinlogByTS exposes an api to get binlog by ts
func (s *Server) BinlogByTS(w http.ResponseWriter, r *http.Request) {
tsStr := mux.Vars(r)["ts"]
Expand Down
37 changes: 28 additions & 9 deletions pump/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os"
"path"
"reflect"
"strconv"
"sync"
"sync/atomic"
"time"
Expand All @@ -21,6 +22,7 @@ import (
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/syndtr/goleveldb/leveldb/util"
"go.uber.org/zap"
"golang.org/x/sys/unix"
)

Expand Down Expand Up @@ -531,7 +533,6 @@ func (a *Append) GCTS(ts int64) {
// so we forward a little bit to make sure we can get the according P binlog
a.doGCTS(ts - int64(oracle.EncodeTSO(maxTxnTimeoutSecond*1000)))
}()

}

func (a *Append) doGCTS(ts int64) {
Expand All @@ -543,16 +544,23 @@ func (a *Append) doGCTS(ts int64) {
l0Trigger = a.options.KVConfig.CompactionL0Trigger
}

deleteNum := 0

for {
var stats leveldb.DBStats
err := a.metadata.Stats(&stats)
nStr, err := a.metadata.GetProperty("leveldb.num-files-at-level0")
if err != nil {
log.Error(err)
time.Sleep(5 * time.Second)
continue
log.Errorf("get `leveldb.num-files-at-level0` property failed: %+v", err)
return
}

l0Num, err := strconv.Atoi(nStr)
if err != nil {
log.Errorf("parse `leveldb.num-files-at-level0` result to int failed, str: %s err: %+v", nStr, err)
return
}
if len(stats.LevelTablesCounts) > 0 && stats.LevelTablesCounts[0] >= l0Trigger {
log.Info("wait some time to gc cause too many L0 file", stats.LevelTablesCounts[0])

if l0Num >= l0Trigger {
log.Infof("wait some time to gc cause too many L0 file, files: %d", l0Num)
time.Sleep(5 * time.Second)
continue
}
Expand All @@ -564,8 +572,13 @@ func (a *Append) doGCTS(ts int64) {
iter := a.metadata.NewIterator(irange, nil)

deleteBatch := 0
var lastKey []byte

for iter.Next() && deleteBatch < 100 {
batch.Delete(iter.Key())
deleteNum++
lastKey = iter.Key()

if batch.Len() == 1024 {
err := a.metadata.Write(batch, nil)
if err != nil {
Expand All @@ -588,10 +601,16 @@ func (a *Append) doGCTS(ts int64) {
}
break
}

if len(lastKey) > 0 {
a.vlog.gcTS(decodeTSKey(lastKey))
}

log.Info("has delete", zap.Int("delete num", deleteNum))
}

a.vlog.gcTS(ts)
log.Info("finish gc ts: ", ts)
log.Infof("finish gc, ts: %d delete num: %d", ts, deleteNum)
}

// MaxCommitTS implement Storage.MaxCommitTS
Expand Down
3 changes: 3 additions & 0 deletions pump/storage/vlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,8 @@ func (vlog *valueLog) scan(start valuePointer, fn func(vp valuePointer, record *

// delete data <= gcTS
func (vlog *valueLog) gcTS(gcTS int64) {
log.Infof("gc vlog, ts: %d", gcTS)

vlog.filesLock.Lock()
var toDeleteFiles []*logFile

Expand Down Expand Up @@ -423,6 +425,7 @@ func (vlog *valueLog) gcTS(gcTS int64) {
if err != nil {
log.Errorf("remove file %s err: %v", logFile.path, err)
}
log.Infof("remove file, path: %s", logFile.path)
logFile.lock.Unlock()
}
}

0 comments on commit 4808e90

Please sign in to comment.