Skip to content
This repository has been archived by the owner on Sep 30, 2024. It is now read-only.

Supporting Consul auto DC KV distribution #819

Merged
merged 14 commits into from
Mar 24, 2019
2 changes: 2 additions & 0 deletions go/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ type Configuration struct {
DiscoveryIgnoreReplicaHostnameFilters []string // Regexp filters to apply to prevent auto-discovering new replicas. Usage: unreachable servers due to firewalls, applications which trigger binlog dumps
ConsulAddress string // Address where Consul HTTP api is found. Example: 127.0.0.1:8500
ConsulAclToken string // ACL token used to write to Consul KV
ConsulCrossDataCenterDistribution bool // should orchestrator automatically auto-deduce all consul DCs and write KVs in all DCs
ZkAddress string // UNSUPPERTED YET. Address where (single or multiple) ZooKeeper servers are found, in `srv1[:port1][,srv2[:port2]...]` format. Default port is 2181. Example: srv-a,srv-b:12181,srv-c
KVClusterMasterPrefix string // Prefix to use for clusters' masters entries in KV stores (internal, consul, ZK), default: "mysql/master"
WebMessage string // If provided, will be shown on all web pages below the title bar
Expand Down Expand Up @@ -417,6 +418,7 @@ func newConfiguration() *Configuration {
DiscoveryIgnoreReplicaHostnameFilters: []string{},
ConsulAddress: "",
ConsulAclToken: "",
ConsulCrossDataCenterDistribution: false,
ZkAddress: "",
KVClusterMasterPrefix: "mysql/master",
WebMessage: "",
Expand Down
22 changes: 22 additions & 0 deletions go/kv/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,25 @@ func (this *consulStore) AddKeyValue(key string, value string) (added bool, err
err = this.PutKeyValue(key, value)
return (err != nil), err
}

func (this *consulStore) DistributePairs(kvPairs [](*KVPair)) (err error) {
if config.Config.ConsulCrossDataCenterDistribution {
datacenters, err := this.client.Catalog().Datacenters()
if err != nil {
return err
}
consulPairs := [](*consulapi.KVPair){}
for _, kvPair := range kvPairs {
consulPairs = append(consulPairs, &consulapi.KVPair{Key: kvPair.Key, Value: []byte(kvPair.Value)})
}
for _, datacenter := range datacenters {
writeOptions := &consulapi.WriteOptions{Datacenter: datacenter}
for _, consulPair := range consulPairs {
if _, e := this.client.KV().Put(consulPair, writeOptions); e != nil {
err = e
}
}
}
}
return err
}
4 changes: 4 additions & 0 deletions go/kv/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,7 @@ func (this *internalKVStore) AddKeyValue(key string, value string) (added bool,
}
return (rowsAffected > 0), nil
}

func (this *internalKVStore) DistributePairs(pairs [](*KVPair)) (err error) {
return nil
}
11 changes: 10 additions & 1 deletion go/kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ func (this *KVPair) String() string {
type KVStore interface {
PutKeyValue(key string, value string) (err error)
GetKeyValue(key string) (value string, found bool, err error)

AddKeyValue(key string, value string) (added bool, err error)
DistributePairs(pairs [](*KVPair)) (err error)
}

var kvMutex sync.Mutex
Expand Down Expand Up @@ -111,3 +111,12 @@ func AddKVPair(kvPair *KVPair) (err error) {
}
return AddValue(kvPair.Key, kvPair.Value)
}

func DistributePairs(pairs [](*KVPair)) (err error) {
for _, store := range getKVStores() {
if err := store.DistributePairs(pairs); err != nil {
return err
}
}
return nil
}
4 changes: 4 additions & 0 deletions go/kv/zk.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,7 @@ func (this *zkStore) AddKeyValue(key string, value string) (added bool, err erro
err = this.PutKeyValue(key, value)
return (err != nil), err
}

func (this *zkStore) DistributePairs(pairs [](*KVPair)) (err error) {
return nil
}
20 changes: 11 additions & 9 deletions go/logic/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,16 +411,12 @@ func InjectPseudoGTIDOnWriters() error {
// stores are updated via failovers.
func SubmitMastersToKvStores(clusterName string, force bool) (kvPairs [](*kv.KVPair), submittedCount int, err error) {
kvPairs, err = inst.GetMastersKVPairs(clusterName)
log.Debugf("kv.SubmitMastersToKvStores, clusterName: %s, force: %+v: numPairs: %+v", clusterName, force, len(kvPairs))
if err != nil {
return kvPairs, submittedCount, log.Errore(err)
}
command := "add-key-value"
applyFunc := kv.AddKVPair
if force {
command = "put-key-value"
applyFunc = kv.PutKVPair
}
var selectedError error
var submitKvPairs [](*kv.KVPair)
for _, kvPair := range kvPairs {
if !force {
// !force: Called periodically to auto-populate KV
Expand All @@ -429,23 +425,29 @@ func SubmitMastersToKvStores(clusterName string, force bool) (kvPairs [](*kv.KVP
// Let's not overload database with queries. Let's not overload raft with events.
continue
}
if v, found, err := kv.GetValue(kvPair.Key); err == nil && found && v == kvPair.Value {
v, found, err := kv.GetValue(kvPair.Key)
if err == nil && found && v == kvPair.Value {
// Already has the right value.
kvFoundCache.Set(kvPair.Key, true, cache.DefaultExpiration)
continue
}
}
submitKvPairs = append(submitKvPairs, kvPair)
}
log.Debugf("kv.SubmitMastersToKvStores: submitKvPairs: %+v", len(submitKvPairs))
for _, kvPair := range submitKvPairs {
if orcraft.IsRaftEnabled() {
_, err = orcraft.PublishCommand(command, kvPair)
_, err = orcraft.PublishCommand("put-key-value", kvPair)
} else {
err = applyFunc(kvPair)
err = kv.PutKVPair(kvPair)
}
if err == nil {
submittedCount++
} else {
selectedError = err
}
}
kv.DistributePairs(submitKvPairs)
return kvPairs, submittedCount, log.Errore(selectedError)
}

Expand Down
5 changes: 4 additions & 1 deletion go/logic/topology_recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -878,7 +878,10 @@ func checkAndRecoverDeadMaster(analysisEntry inst.ReplicationAnalysis, candidate
log.Errore(err)
}
}

{
err := kv.DistributePairs(kvPairs)
log.Errore(err)
}
if !skipProcesses {
// Execute post master-failover processes
executeProcesses(config.Config.PostMasterFailoverProcesses, "PostMasterFailoverProcesses", topologyRecovery, false)
Expand Down