
Commit

Merge branch 'master' into limit-store
nolouch authored May 22, 2019
2 parents eb0d433 + 1268d8b commit ec079d7
Showing 8 changed files with 235 additions and 49 deletions.
36 changes: 18 additions & 18 deletions README.md
@@ -1,14 +1,14 @@
# PD

[![TravisCI Build Status](https://travis-ci.org/pingcap/pd.svg?branch=master)](https://travis-ci.org/pingcap/pd)
![GitHub release](https://img.shields.io/github/release/pingcap/pd.svg)
[![CircleCI Build Status](https://circleci.com/gh/pingcap/pd.svg?style=shield)](https://circleci.com/gh/pingcap/pd)
[![Go Report Card](https://goreportcard.com/badge/github.com/pingcap/pd)](https://goreportcard.com/report/github.com/pingcap/pd)
[![codecov](https://codecov.io/gh/pingcap/pd/branch/master/graph/badge.svg)](https://codecov.io/gh/pingcap/pd)

PD is the abbreviation for Placement Driver. It is used to manage and schedule the [TiKV](https://github.com/tikv/tikv) cluster.

PD supports distribution and fault-tolerance by embedding [etcd](https://github.com/etcd-io/etcd).

## Build

@@ -19,15 +19,15 @@ PD supports distribution and fault-tolerance by embedding [etcd](https://github.

### Command flags

-See [configuration](https://github.com/pingcap/docs/blob/master/op-guide/configuration.md#placement-driver-pd).
+See [PD Configuration Flags](https://pingcap.com/docs/dev/reference/configuration/pd-server/configuration/#pd-configuration-flags).

### Single Node with default ports

You can run `pd-server` directly on your local machine. If you want to connect to PD from outside,
you can let PD listen on the host IP.

```bash
# Set correct HostIP here.
export HostIP="192.168.199.105"

pd-server --name="pd" \
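    --data-dir="pd" \
    --client-urls="http://${HostIP}:2379" \
    --peer-urls="http://${HostIP}:2380"
# The rest of this block is collapsed in the diff view above; the three flags
# shown here are an editorial sketch of a typical invocation (data directory,
# client URLs, peer URLs), not necessarily the exact flags in the README.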
@@ -60,9 +60,9 @@ X-Etcd-Cluster-Id: 33dc747581249309
{
"clientURLs": [
"http://192.168.199.105:2379"
],
"id": "f62e88a6e81c149",
"name": "pd",
"peerURLs": [
"http://192.168.199.105:2380"
]
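The JSON above is the body returned by PD's members endpoint. A minimal Go sketch of fetching it; the `/pd/api/v1/members` path and the host address are assumptions, since the request itself is collapsed in this diff:

```go
package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Host and path are illustrative; adjust to your deployment.
	resp, err := http.Get("http://192.168.199.105:2379/pd/api/v1/members")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	fmt.Println("X-Etcd-Cluster-Id:", resp.Header.Get("X-Etcd-Cluster-Id"))
	body, _ := io.ReadAll(resp.Body)
	fmt.Println(string(body))
}
```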
@@ -75,20 +75,20 @@

You can use the following command to build a PD image directly:

```bash
docker build -t pingcap/pd .
```

Or you can use the following command to get PD from Docker Hub:

```bash
docker pull pingcap/pd
```

Run a single node with Docker:

```bash
# Set correct HostIP here.
export HostIP="192.168.199.105"

docker run -d -p 2379:2379 -p 2380:2380 --name pd pingcap/pd \
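    --client-urls="http://0.0.0.0:2379" \
    --advertise-client-urls="http://${HostIP}:2379" \
    --peer-urls="http://0.0.0.0:2380" \
    --advertise-peer-urls="http://${HostIP}:2380"
# The remaining arguments are collapsed in the diff view above; the four URL
# flags shown here are an editorial sketch of how such a container is usually
# started, not necessarily the exact arguments in the README.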
@@ -103,9 +103,9 @@

### Cluster

-PD is a component in TiDB project, you must run it with TiDB and TiKV together, see
-[TiDB-Ansible](https://github.com/pingcap/docs/blob/master/op-guide/ansible-deployment.md) to learn
-how to set up the cluster and run them.
+PD is a component in TiDB project, you must run it with TiDB and TiKV together, see
+[TiDB-Ansible](https://pingcap.com/docs/dev/how-to/deploy/orchestrated/ansible/#deploy-tidb-using-ansible)
+to learn how to set up the cluster and run them.

-You can also use [Docker](https://github.com/pingcap/docs/blob/master/op-guide/docker-deployment.md) to
-run the cluster.
+You can also use [Docker](https://pingcap.com/docs/dev/how-to/deploy/orchestrated/docker/#deploy-tidb-using-docker)
+to run the cluster.
16 changes: 16 additions & 0 deletions build.ps1
@@ -0,0 +1,16 @@
# For `--version`
$PD_PKG = "github.com/pingcap/pd"
$GO_LDFLAGS = "-X `"$PD_PKG/server.PDReleaseVersion=$(git describe --tags --dirty)`""
$GO_LDFLAGS += " -X `"$PD_PKG/server.PDBuildTS=$(date -u '+%Y-%m-%d_%I:%M:%S')`""
$GO_LDFLAGS += " -X `"$PD_PKG/server.PDGitHash=$(git rev-parse HEAD)`""
$GO_LDFLAGS += " -X `"$PD_PKG/server.PDGitBranch=$(git rev-parse --abbrev-ref HEAD)`""

# Output binaries
go build -ldflags $GO_LDFLAGS -o bin/pd-server.exe cmd/pd-server/main.go
echo "bin/pd-server.exe"
go build -ldflags $GO_LDFLAGS -o bin/pd-ctl.exe tools/pd-ctl/main.go
echo "bin/pd-ctl.exe"
go build -o bin/pd-tso-bench.exe tools/pd-tso-bench/main.go
echo "bin/pd-tso-bench.exe"
go build -o bin/pd-recover.exe tools/pd-recover/main.go
echo "bin/pd-recover.exe"
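# Assumed usage: run from the repository root in PowerShell, e.g. `.\build.ps1`;
# the four binaries above are written to the .\bin directory.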
2 changes: 1 addition & 1 deletion server/cluster.go
@@ -601,7 +601,7 @@ func (c *RaftCluster) checkOperators() {
log.Info("operator timeout",
zap.Uint64("region-id", op.RegionID()),
zap.Stringer("operator", op))
-opController.RemoveOperator(op)
+opController.RemoveTimeoutOperator(op)
}
}
}
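`RemoveTimeoutOperator` itself is not shown in this commit. As a minimal sketch (an assumption, not PD's actual code), it presumably removes the operator and takes over the timeout accounting that the `server/schedule/operator.go` hunk at the end of this commit drops from `Operator.IsTimeout`:

```go
// Sketch only: the real method lives in package schedule and is not part of
// this diff. The presumed intent is that removing a timed-out operator is now
// the single place that records the "timeout" event.
func (oc *OperatorController) RemoveTimeoutOperator(op *Operator) {
	operatorCounter.WithLabelValues(op.Desc(), "timeout").Inc() // the counter bump removed from IsTimeout
	oc.RemoveOperator(op)
}
```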
90 changes: 90 additions & 0 deletions server/cluster_test.go
@@ -25,6 +25,7 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/pd/server/core"
"github.com/pkg/errors"
"google.golang.org/grpc"
)

@@ -44,6 +45,14 @@ type testClusterSuite struct {
baseCluster
}

// testErrorKV wraps core.KVBase with a Save that always fails.
type testErrorKV struct {
core.KVBase
}

func (kv *testErrorKV) Save(key, value string) error {
return errors.New("save failed")
}

func mustNewGrpcClient(c *C, addr string) pdpb.PDClient {
conn, err := grpc.Dial(strings.TrimPrefix(addr, "http://"), grpc.WithInsecure())

@@ -597,3 +606,84 @@ func (s *testGetStoresSuite) BenchmarkGetStores(c *C) {
s.cluster.core.Stores.GetStores()
}
}

func (s *testClusterSuite) TestSetScheduleOpt(c *C) {
var err error
var cleanup func()
_, s.svr, cleanup, err = NewTestServer(c)
c.Assert(err, IsNil)
mustWaitLeader(c, []*Server{s.svr})
s.grpcPDClient = mustNewGrpcClient(c, s.svr.GetAddr())
defer cleanup()
clusterID := s.svr.clusterID

storeAddr := "127.0.0.1:0"
_, err = s.svr.bootstrapCluster(s.newBootstrapRequest(c, clusterID, storeAddr))
c.Assert(err, IsNil)

_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)

scheduleCfg := opt.load()
replicateCfg := s.svr.GetReplicationConfig()
pdServerCfg := s.svr.scheduleOpt.loadPDServerConfig()

// PUT, GET, and DELETE succeed
replicateCfg.MaxReplicas = 5
scheduleCfg.MaxSnapshotCount = 10
pdServerCfg.UseRegionStorage = true
typ, labelKey, labelValue := "testTyp", "testKey", "testValue"
nsConfig := NamespaceConfig{LeaderScheduleLimit: uint64(200)}

c.Assert(s.svr.SetScheduleConfig(*scheduleCfg), IsNil)
c.Assert(s.svr.SetPDServerConfig(*pdServerCfg), IsNil)
c.Assert(s.svr.SetLabelProperty(typ, labelKey, labelValue), IsNil)
c.Assert(s.svr.SetNamespaceConfig("testNS", nsConfig), IsNil)
c.Assert(s.svr.SetReplicationConfig(*replicateCfg), IsNil)

c.Assert(s.svr.GetReplicationConfig().MaxReplicas, Equals, uint64(5))
c.Assert(s.svr.scheduleOpt.GetMaxSnapshotCount(), Equals, uint64(10))
c.Assert(s.svr.scheduleOpt.loadPDServerConfig().UseRegionStorage, Equals, true)
c.Assert(s.svr.scheduleOpt.loadLabelPropertyConfig()[typ][0].Key, Equals, "testKey")
c.Assert(s.svr.scheduleOpt.loadLabelPropertyConfig()[typ][0].Value, Equals, "testValue")
c.Assert(s.svr.GetNamespaceConfig("testNS").LeaderScheduleLimit, Equals, uint64(200))

c.Assert(s.svr.DeleteNamespaceConfig("testNS"), IsNil)
c.Assert(s.svr.DeleteLabelProperty(typ, labelKey, labelValue), IsNil)

c.Assert(s.svr.GetNamespaceConfig("testNS").LeaderScheduleLimit, Equals, uint64(0))
c.Assert(len(s.svr.scheduleOpt.loadLabelPropertyConfig()[typ]), Equals, 0)

// PUT fails when the underlying KV store returns errors; GET still returns the old values
oldKV := s.svr.kv
s.svr.kv = core.NewKV(&testErrorKV{})
replicateCfg.MaxReplicas = 7
scheduleCfg.MaxSnapshotCount = 20
pdServerCfg.UseRegionStorage = false

c.Assert(s.svr.SetScheduleConfig(*scheduleCfg), NotNil)
c.Assert(s.svr.SetReplicationConfig(*replicateCfg), NotNil)
c.Assert(s.svr.SetPDServerConfig(*pdServerCfg), NotNil)
c.Assert(s.svr.SetLabelProperty(typ, labelKey, labelValue), NotNil)
c.Assert(s.svr.SetNamespaceConfig("testNS", nsConfig), NotNil)

c.Assert(s.svr.GetReplicationConfig().MaxReplicas, Equals, uint64(5))
c.Assert(s.svr.scheduleOpt.GetMaxSnapshotCount(), Equals, uint64(10))
c.Assert(s.svr.scheduleOpt.loadPDServerConfig().UseRegionStorage, Equals, true)
c.Assert(s.svr.GetNamespaceConfig("testNS").LeaderScheduleLimit, Equals, uint64(0))
c.Assert(len(s.svr.scheduleOpt.loadLabelPropertyConfig()[typ]), Equals, 0)

// DELETE fails and the existing values stay in place
s.svr.kv = oldKV
c.Assert(s.svr.SetNamespaceConfig("testNS", nsConfig), IsNil)
c.Assert(s.svr.SetReplicationConfig(*replicateCfg), IsNil)

s.svr.kv = core.NewKV(&testErrorKV{})
c.Assert(s.svr.DeleteLabelProperty(typ, labelKey, labelValue), NotNil)
c.Assert(s.svr.GetNamespaceConfig("testNS").LeaderScheduleLimit, Equals, uint64(200))
c.Assert(s.svr.DeleteNamespaceConfig("testNS"), NotNil)

c.Assert(s.svr.GetNamespaceConfig("testNS").LeaderScheduleLimit, Equals, uint64(200))
c.Assert(s.svr.scheduleOpt.loadLabelPropertyConfig()[typ][0].Key, Equals, "testKey")
c.Assert(s.svr.scheduleOpt.loadLabelPropertyConfig()[typ][0].Value, Equals, "testValue")
}
61 changes: 43 additions & 18 deletions server/option.go
@@ -15,6 +15,7 @@ package server

import (
"reflect"
"sync"
"sync/atomic"
"time"

@@ -28,7 +29,7 @@ import (
type scheduleOption struct {
v atomic.Value
rep *Replication
-ns map[string]*namespaceOption
+ns sync.Map // concurrent map[string]*namespaceOption
labelProperty atomic.Value
clusterVersion atomic.Value
pdServerConfig atomic.Value
@@ -37,10 +38,10 @@ type scheduleOption struct {
func newScheduleOption(cfg *Config) *scheduleOption {
o := &scheduleOption{}
o.store(&cfg.Schedule)
-o.ns = make(map[string]*namespaceOption)
+o.ns = sync.Map{}
for name, nsCfg := range cfg.Namespace {
nsCfg := nsCfg
-o.ns[name] = newNamespaceOption(&nsCfg)
+o.ns.Store(name, newNamespaceOption(&nsCfg))
}
o.rep = newReplication(&cfg.Replication)
o.pdServerConfig.Store(&cfg.PDServerCfg)
@@ -61,8 +62,36 @@ func (o *scheduleOption) GetReplication() *Replication {
return o.rep
}

// getNS returns the namespace option stored under name, if any.
func (o *scheduleOption) getNS(name string) (*namespaceOption, bool) {
if n, ok := o.ns.Load(name); ok {
if n, ok := n.(*namespaceOption); ok {
return n, true
}
}
return nil, false
}

// loadNSConfig snapshots all namespace options into a plain map.
func (o *scheduleOption) loadNSConfig() map[string]NamespaceConfig {
namespaces := make(map[string]NamespaceConfig)
f := func(k, v interface{}) bool {
var kstr string
var ok bool
if kstr, ok = k.(string); !ok {
return false
}
if ns, ok := v.(*namespaceOption); ok {
namespaces[kstr] = *ns.load()
return true
}
return false
}
o.ns.Range(f)

return namespaces
}

func (o *scheduleOption) GetMaxReplicas(name string) int {
-if n, ok := o.ns[name]; ok {
+if n, ok := o.getNS(name); ok {
return n.GetMaxReplicas()
}
return o.rep.GetMaxReplicas()
@@ -105,35 +134,35 @@ func (o *scheduleOption) GetMaxStoreDownTime() time.Duration {
}

func (o *scheduleOption) GetLeaderScheduleLimit(name string) uint64 {
-if n, ok := o.ns[name]; ok {
+if n, ok := o.getNS(name); ok {
return n.GetLeaderScheduleLimit()
}
return o.load().LeaderScheduleLimit
}

func (o *scheduleOption) GetRegionScheduleLimit(name string) uint64 {
-if n, ok := o.ns[name]; ok {
+if n, ok := o.getNS(name); ok {
return n.GetRegionScheduleLimit()
}
return o.load().RegionScheduleLimit
}

func (o *scheduleOption) GetReplicaScheduleLimit(name string) uint64 {
-if n, ok := o.ns[name]; ok {
+if n, ok := o.getNS(name); ok {
return n.GetReplicaScheduleLimit()
}
return o.load().ReplicaScheduleLimit
}

func (o *scheduleOption) GetMergeScheduleLimit(name string) uint64 {
-if n, ok := o.ns[name]; ok {
+if n, ok := o.getNS(name); ok {
return n.GetMergeScheduleLimit()
}
return o.load().MergeScheduleLimit
}

func (o *scheduleOption) GetHotRegionScheduleLimit(name string) uint64 {
-if n, ok := o.ns[name]; ok {
+if n, ok := o.getNS(name); ok {
return n.GetHotRegionScheduleLimit()
}
return o.load().HotRegionScheduleLimit
@@ -276,10 +305,8 @@ func (o *scheduleOption) loadPDServerConfig() *PDServerConfig {
}

func (o *scheduleOption) persist(kv *core.KV) error {
-namespaces := make(map[string]NamespaceConfig)
-for name, ns := range o.ns {
-namespaces[name] = *ns.load()
-}
+namespaces := o.loadNSConfig()

cfg := &Config{
Schedule: *o.load(),
Replication: *o.rep.load(),
@@ -293,10 +320,8 @@ func (o *scheduleOption) reload(kv *core.KV) error {
}

func (o *scheduleOption) reload(kv *core.KV) error {
-namespaces := make(map[string]NamespaceConfig)
-for name, ns := range o.ns {
-namespaces[name] = *ns.load()
-}
+namespaces := o.loadNSConfig()

cfg := &Config{
Schedule: *o.load().clone(),
Replication: *o.rep.load(),
@@ -315,7 +340,7 @@ func (o *scheduleOption) reload(kv *core.KV) error {
o.rep.store(&cfg.Replication)
for name, nsCfg := range cfg.Namespace {
nsCfg := nsCfg
-o.ns[name] = newNamespaceOption(&nsCfg)
+o.ns.Store(name, newNamespaceOption(&nsCfg))
}
o.labelProperty.Store(cfg.LabelProperty)
o.clusterVersion.Store(cfg.ClusterVersion)
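Replacing the plain `map[string]*namespaceOption` with `sync.Map` lets configuration writes (for example `SetNamespaceConfig`, exercised in the test above) run concurrently with the schedulers' reads without an explicit mutex. A minimal, self-contained illustration of the access pattern, using toy types rather than PD's own:

```go
package main

import (
	"fmt"
	"sync"
)

type nsOption struct{ leaderScheduleLimit uint64 }

func main() {
	var ns sync.Map // behaves like a concurrent map[string]*nsOption

	// Write path (roughly what SetNamespaceConfig ends up doing).
	ns.Store("testNS", &nsOption{leaderScheduleLimit: 200})

	// Read path (compare getNS above).
	if v, ok := ns.Load("testNS"); ok {
		fmt.Println("leader-schedule-limit:", v.(*nsOption).leaderScheduleLimit)
	}

	// Snapshot path (compare loadNSConfig above).
	ns.Range(func(k, v interface{}) bool {
		fmt.Printf("%v -> %+v\n", k, *v.(*nsOption))
		return true
	})

	// Delete path (presumably what DeleteNamespaceConfig uses).
	ns.Delete("testNS")
}
```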
1 change: 0 additions & 1 deletion server/schedule/operator.go
@@ -374,7 +374,6 @@ func (o *Operator) IsTimeout() bool {
timeout = time.Since(o.createTime) > LeaderOperatorWaitTime
}
if timeout {
-operatorCounter.WithLabelValues(o.Desc(), "timeout").Inc()
return true
}
return false