[coordinator] [query] Add readiness probe for probing current consistency level achievability #2976

Merged: 10 commits, Dec 4, 2020
10 changes: 10 additions & 0 deletions scripts/docker-integration-tests/prometheus/test.sh
@@ -37,6 +37,13 @@ setup_single_m3db_node
echo "Start Prometheus containers"
docker-compose -f ${COMPOSE_FILE} up -d prometheus01

function test_readiness {
# Check readiness probe eventually succeeds
echo "Check readiness probe eventually succeeds"
ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
'[[ $(curl --write-out "%{http_code}" --silent --output /dev/null 0.0.0.0:7201/ready) -eq "200" ]]'
}

function test_prometheus_remote_read {
# Ensure Prometheus can proxy a Prometheus query
echo "Wait until the remote write endpoint generates and allows for data to be queried"
@@ -384,6 +391,9 @@ function test_series {
'[[ $(curl -s "0.0.0.0:7201/api/v1/series?match[]=prometheus_remote_storage_samples_total&start=-292273086-05-16T16:47:06Z&end=292277025-08-18T07:12:54.999999999Z" | jq -r ".data | length") -eq 1 ]]'
}

echo "Running readiness test"
test_readiness

echo "Running prometheus tests"
test_prometheus_remote_read
test_prometheus_remote_write_multi_namespaces
90 changes: 90 additions & 0 deletions src/dbnode/client/client_mock.go

(Generated file; diff not rendered.)

8 changes: 8 additions & 0 deletions src/dbnode/client/replicated_session.go
@@ -190,6 +190,14 @@ func (s replicatedSession) replicate(params replicatedParams) error {
return s.session.Write(params.namespace, params.id, params.t, params.value, params.unit, params.annotation)
}

func (s *replicatedSession) ReadClusterAvailability() (bool, error) {
return s.session.ReadClusterAvailability()
}

func (s *replicatedSession) WriteClusterAvailability() (bool, error) {
return s.session.WriteClusterAvailability()
}

// Write value to the database for an ID.
func (s replicatedSession) Write(namespace, id ident.ID, t time.Time, value float64, unit xtime.Unit, annotation []byte) error {
return s.replicate(replicatedParams{
80 changes: 79 additions & 1 deletion src/dbnode/client/session.go
@@ -769,6 +769,79 @@ func (s *session) hostQueues(
return queues, replicas, majority, nil
}

func (s *session) WriteClusterAvailability() (bool, error) {
level := s.opts.WriteConsistencyLevel()
return s.clusterAvailability(level)
}

func (s *session) ReadClusterAvailability() (bool, error) {
var convertedConsistencyLevel topology.ConsistencyLevel
level := s.opts.ReadConsistencyLevel()
switch level {
case topology.ReadConsistencyLevelNone:
// Already ready.
return true, nil
case topology.ReadConsistencyLevelOne:
convertedConsistencyLevel = topology.ConsistencyLevelOne
case topology.ReadConsistencyLevelUnstrictMajority:
convertedConsistencyLevel = topology.ConsistencyLevelOne
case topology.ReadConsistencyLevelMajority:
convertedConsistencyLevel = topology.ConsistencyLevelMajority
case topology.ReadConsistencyLevelUnstrictAll:
convertedConsistencyLevel = topology.ConsistencyLevelOne
case topology.ReadConsistencyLevelAll:
convertedConsistencyLevel = topology.ConsistencyLevelAll
default:
return false, fmt.Errorf("unknown consistency level: %d", level)
}
return s.clusterAvailability(convertedConsistencyLevel)
}
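To make the mapping above concrete: a hypothetical in-package helper (not part of this diff) that expresses the switch as the number of available replicas each read level effectively requires per shard. With a replication factor of 3 (majority 2), the unstrict levels need just one reachable replica per shard, majority needs two, and all needs three.

    // minReplicasForRead is an illustrative helper, not part of this PR: it
    // restates the switch above as the per-shard replica availability that
    // ReadClusterAvailability effectively requires.
    func minReplicasForRead(
        level topology.ReadConsistencyLevel,
        replicas, majority int,
    ) (int, error) {
        switch level {
        case topology.ReadConsistencyLevelNone:
            return 0, nil // no availability check needed
        case topology.ReadConsistencyLevelOne,
            topology.ReadConsistencyLevelUnstrictMajority,
            topology.ReadConsistencyLevelUnstrictAll:
            return 1, nil
        case topology.ReadConsistencyLevelMajority:
            return majority, nil
        case topology.ReadConsistencyLevelAll:
            return replicas, nil
        default:
            return 0, fmt.Errorf("unknown consistency level: %d", level)
        }
    }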

func (s *session) clusterAvailability(
level topology.ConsistencyLevel,
) (bool, error) {
s.state.RLock()
queues := s.state.queues
topoMap, err := s.topologyMapWithStateRLock()
s.state.RUnlock()
if err != nil {
return false, err
}

shards := topoMap.ShardSet().AllIDs()
minConnectionCount := s.opts.MinConnectionCount()
replicas := topoMap.Replicas()
majority := topoMap.MajorityReplicas()

for _, shardID := range shards {
shardReplicasAvailable := 0
routeErr := topoMap.RouteShardForEach(shardID, func(idx int, _ shard.Shard, _ topology.Host) {
if queues[idx].ConnectionCount() >= minConnectionCount {
shardReplicasAvailable++
}
})
if routeErr != nil {
return false, routeErr
}
var clusterAvailableForShard bool
switch level {
case topology.ConsistencyLevelAll:
clusterAvailableForShard = shardReplicasAvailable == replicas
case topology.ConsistencyLevelMajority:
clusterAvailableForShard = shardReplicasAvailable >= majority
case topology.ConsistencyLevelOne:
clusterAvailableForShard = shardReplicasAvailable > 0
default:
return false, fmt.Errorf("unknown consistency level: %d", level)
}
Review comment (Collaborator) on lines +822 to +831:

Nit: I would extract this switch out of the loop and compute a minReplicasRequired value in it (which would be topoMap.Replicas(), topoMap.MajorityReplicas(), or 1, respectively). Not for the sake of performance, but for brevity and reduced nesting.

if !clusterAvailableForShard {
return false, nil
}
Review comment (Collaborator) on lines +832 to +834:

... and then, this would simply be:

Suggested change: replace

    if !clusterAvailableForShard {
        return false, nil
    }

with

    if shardReplicasAvailable < minReplicasRequired {
        return false, nil
    }
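For illustration, a minimal in-package sketch of clusterAvailability with the reviewer's suggestion applied; this variant is hypothetical, and the merged diff keeps the switch inside the loop:

    func (s *session) clusterAvailability(
        level topology.ConsistencyLevel,
    ) (bool, error) {
        s.state.RLock()
        queues := s.state.queues
        topoMap, err := s.topologyMapWithStateRLock()
        s.state.RUnlock()
        if err != nil {
            return false, err
        }

        // Compute the per-shard replica requirement once, outside the loop.
        var minReplicasRequired int
        switch level {
        case topology.ConsistencyLevelAll:
            minReplicasRequired = topoMap.Replicas()
        case topology.ConsistencyLevelMajority:
            minReplicasRequired = topoMap.MajorityReplicas()
        case topology.ConsistencyLevelOne:
            minReplicasRequired = 1
        default:
            return false, fmt.Errorf("unknown consistency level: %d", level)
        }

        minConnectionCount := s.opts.MinConnectionCount()
        for _, shardID := range topoMap.ShardSet().AllIDs() {
            shardReplicasAvailable := 0
            routeErr := topoMap.RouteShardForEach(shardID, func(idx int, _ shard.Shard, _ topology.Host) {
                if queues[idx].ConnectionCount() >= minConnectionCount {
                    shardReplicasAvailable++
                }
            })
            if routeErr != nil {
                return false, routeErr
            }
            if shardReplicasAvailable < minReplicasRequired {
                return false, nil
            }
        }
        return true, nil
    }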

}

return true, nil
}

func (s *session) setTopologyWithLock(topoMap topology.Map, queues []hostQueue, replicas, majority int) {
prevQueues := s.state.queues

@@ -1879,9 +1952,14 @@ func (s *session) Replicas() int {

func (s *session) TopologyMap() (topology.Map, error) {
s.state.RLock()
topoMap, err := s.topologyMapWithStateRLock()
s.state.RUnlock()
return topoMap, err
}

func (s *session) topologyMapWithStateRLock() (topology.Map, error) {
status := s.state.status
topoMap := s.state.topoMap

// Make sure the session is open, as that's what sets the initial topology.
if status != statusOpen {
6 changes: 6 additions & 0 deletions src/dbnode/client/types.go
@@ -62,6 +62,12 @@ type Client interface {

// Session can write and read to a cluster.
type Session interface {
// WriteClusterAvailability returns whether cluster is available for writes.
WriteClusterAvailability() (bool, error)

// ReadClusterAvailability returns whether cluster is available for reads.
ReadClusterAvailability() (bool, error)

// Write value to the database for an ID.
Write(namespace, id ident.ID, t time.Time, value float64, unit xtime.Unit, annotation []byte) error

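For context, a hedged sketch of how a coordinator readiness handler might consume the two new Session methods; the handler name, wiring, and error messages are illustrative assumptions, not taken from this diff:

    package main // illustrative placement; the real handler lives in the coordinator

    import (
        "net/http"

        "github.com/m3db/m3/src/dbnode/client"
    )

    // readyHandler reports 200 only when both the configured read and write
    // consistency levels are currently achievable, and 503 otherwise.
    func readyHandler(session client.Session) http.HandlerFunc {
        return func(w http.ResponseWriter, r *http.Request) {
            readsReady, err := session.ReadClusterAvailability()
            if err != nil {
                http.Error(w, err.Error(), http.StatusInternalServerError)
                return
            }
            writesReady, err := session.WriteClusterAvailability()
            if err != nil {
                http.Error(w, err.Error(), http.StatusInternalServerError)
                return
            }
            if !readsReady || !writesReady {
                http.Error(w, "cluster not available at configured consistency levels",
                    http.StatusServiceUnavailable)
                return
            }
            w.WriteHeader(http.StatusOK)
        }
    }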