Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add placement APIs for M3Coordinator #1055

Merged
merged 7 commits into from
Oct 10, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions scripts/development/m3_stack/start_m3.sh
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ echo "Validating namespace"
[ "$(curl -sSf localhost:7201/api/v1/namespace | jq .registry.namespaces.prometheus_metrics.indexOptions.enabled)" == true ]
echo "Done validating namespace"

echo "Initializing topology"
echo "Initializing M3DB topology"
curl -vvvsSf -X POST localhost:7201/api/v1/placement/init -d '{
"num_shards": 64,
"replication_factor": 3,
Expand Down Expand Up @@ -70,12 +70,30 @@ curl -vvvsSf -X POST localhost:7201/api/v1/placement/init -d '{
}
]
}'
echo "Done initializing topology"
echo "Done initializing M3DB topology"

echo "Validating topology"
[ "$(curl -sSf localhost:7201/api/v1/placement | jq .placement.instances.m3db_seed.id)" == '"m3db_seed"' ]
echo "Done validating topology"

echo "Initializing M3Coordinator topology"
curl -vvvsSf -X POST localhost:7201/api/v1/services/m3coordinator/placement/init -d '{
"instances": [
{
"id": "coordinator01",
"zone": "embedded",
"endpoint": "coordinator01:7507",
"hostname": "coordinator01",
"port": 7507
}
]
}'
echo "Done initializing M3Coordinator topology"

echo "Validating M3Coordinator topology"
[ "$(curl -sSf localhost:7201/api/v1/services/m3coordinator/placement | jq .placement.instances.coordinator01.id)" == '"coordinator01"' ]
echo "Done validating topology"

echo "Prometheus available at localhost:9090"
echo "Grafana available at localhost:3000"
echo "Run ./stop.sh to shutdown nodes when done"
4 changes: 4 additions & 0 deletions src/query/api/v1/handler/placement/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ var (
// M3AggAddURL is the url for the placement add handler (with the POST method)
// for the M3Agg service.
M3AggAddURL = path.Join(handler.RoutePrefixV1, M3AggServicePlacementPathName)

// M3CoordinatorAddURL is the url for the placement add handler (with the POST method)
// for the M3Coordinator service.
M3CoordinatorAddURL = path.Join(handler.RoutePrefixV1, M3CoordinatorServicePlacementPathName)
)

// AddHandler is the handler for placement adds.
Expand Down
22 changes: 17 additions & 5 deletions src/query/api/v1/handler/placement/add_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,10 @@ func TestPlacementAddHandler_SafeOK(t *testing.T) {
w = httptest.NewRecorder()
req *http.Request
)
if serviceName == M3AggregatorServiceName {
switch serviceName {
case M3AggregatorServiceName:
req = httptest.NewRequest(AddHTTPMethod, M3DBAddURL, strings.NewReader(`{"instances":[{"id": "host1","isolation_group": "rack1","zone": "test","weight": 1,"endpoint": "http://host1:1234","hostname": "host1","port": 1234}]}`))
} else {
default:
req = httptest.NewRequest(AddHTTPMethod, M3DBAddURL, strings.NewReader(`{"instances":[{"id": "host1","isolation_group": "rack1","zone": "test","weight": 1,"endpoint": "http://host1:1234","hostname": "host1","port": 1234}]}`))
}
require.NotNil(t, req)
Expand All @@ -166,7 +167,15 @@ func TestPlacementAddHandler_SafeOK(t *testing.T) {
SetIsSharded(true)
)

if serviceName == M3AggregatorServiceName {
switch serviceName {
case M3CoordinatorServiceName:
existingPlacement = existingPlacement.
SetIsSharded(false).
SetReplicaFactor(1)
newPlacement = existingPlacement.
SetIsSharded(false).
SetReplicaFactor(1)
case M3AggregatorServiceName:
existingPlacement = existingPlacement.
SetIsMirrored(true).
SetReplicaFactor(1)
Expand Down Expand Up @@ -200,9 +209,12 @@ func TestPlacementAddHandler_SafeOK(t *testing.T) {
resp = w.Result()
body, _ = ioutil.ReadAll(resp.Body)

if serviceName == M3AggregatorServiceName {
switch serviceName {
case M3CoordinatorServiceName:
require.Equal(t, `{"placement":{"instances":{"host1":{"id":"host1","isolationGroup":"rack1","zone":"test","weight":1,"endpoint":"http://host1:1234","shards":[],"shardSetId":0,"hostname":"host1","port":1234}},"replicaFactor":1,"numShards":0,"isSharded":false,"cutoverTime":"0","isMirrored":false,"maxShardSetId":0},"version":1}`, string(body))
case M3AggregatorServiceName:
require.Equal(t, `{"placement":{"instances":{},"replicaFactor":1,"numShards":0,"isSharded":true,"cutoverTime":"0","isMirrored":true,"maxShardSetId":0},"version":1}`, string(body))
} else {
default:
require.Equal(t, `{"placement":{"instances":{},"replicaFactor":0,"numShards":0,"isSharded":true,"cutoverTime":"0","isMirrored":false,"maxShardSetId":0},"version":1}`, string(body))
}
require.Equal(t, http.StatusOK, resp.StatusCode)
Expand Down
45 changes: 34 additions & 11 deletions src/query/api/v1/handler/placement/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ const (
M3DBServiceName = "m3db"
// M3AggregatorServiceName is the service name for M3Aggregator.
M3AggregatorServiceName = "m3aggregator"
// M3CoordinatorServiceName is the service name for M3Coordinator.
M3CoordinatorServiceName = "m3coordinator"
// ServicesPathName is the services part of the API path.
ServicesPathName = "services"
// PlacementPathName is the placement part of the API path.
Expand All @@ -75,6 +77,8 @@ const (

defaultM3AggMaxAggregationWindowSize = 5 * time.Minute
defaultM3AggWarmupDuration = time.Minute

m3AggregatorPlacementNamespace = "/placement"
)

var (
Expand All @@ -85,14 +89,17 @@ var (
errM3AggServiceOptionsRequired = errors.New("m3agg service options are required")

allowedServices = allowedServicesSet{
M3DBServiceName: true,
M3AggregatorServiceName: true,
M3DBServiceName: true,
M3AggregatorServiceName: true,
M3CoordinatorServiceName: true,
}

// M3DBServicePlacementPathName is the M3DB service placement API path.
M3DBServicePlacementPathName = path.Join(ServicesPathName, M3DBServiceName, PlacementPathName)
// M3AggServicePlacementPathName is the M3Agg service placement API path.
M3AggServicePlacementPathName = path.Join(ServicesPathName, M3AggregatorServiceName, PlacementPathName)
// M3CoordinatorServicePlacementPathName is the M3Coordinator service placement API path.
M3CoordinatorServicePlacementPathName = path.Join(ServicesPathName, M3CoordinatorServiceName, PlacementPathName)
)

// HandlerOptions is the options struct for the handler.
Expand Down Expand Up @@ -204,7 +211,18 @@ func ServiceWithAlgo(
opts ServiceOptions,
now time.Time,
) (placement.Service, placement.Algorithm, error) {
cs, err := clusterClient.Services(services.NewOverrideOptions())

overrides := services.NewOverrideOptions()
switch opts.ServiceName {
case M3AggregatorServiceName:
overrides = overrides.
SetNamespaceOptions(
overrides.NamespaceOptions().
SetPlacementNamespace(m3AggregatorPlacementNamespace),
)
}

cs, err := clusterClient.Services(overrides)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -237,7 +255,11 @@ func ServiceWithAlgo(
SetIsSharded(true).
SetDryrun(opts.DryRun)

if opts.ServiceName == M3AggregatorServiceName {
switch opts.ServiceName {
case M3CoordinatorServiceName:
pOpts = pOpts.
SetIsSharded(false)
case M3AggregatorServiceName:
var (
maxAggregationWindowSize = opts.M3Agg.MaxAggregationWindowSize
warmupDuration = opts.M3Agg.WarmupDuration
Expand Down Expand Up @@ -314,6 +336,7 @@ func RegisterRoutes(r *mux.Router, opts HandlerOptions) {
r.HandleFunc(DeprecatedM3DBInitURL, deprecatedInitFn).Methods(InitHTTPMethod)
r.HandleFunc(M3DBInitURL, initFn).Methods(InitHTTPMethod)
r.HandleFunc(M3AggInitURL, initFn).Methods(InitHTTPMethod)
r.HandleFunc(M3CoordinatorInitURL, initFn).Methods(InitHTTPMethod)

// Get
var (
Expand All @@ -324,6 +347,7 @@ func RegisterRoutes(r *mux.Router, opts HandlerOptions) {
r.HandleFunc(DeprecatedM3DBGetURL, deprecatedGetFn).Methods(GetHTTPMethod)
r.HandleFunc(M3DBGetURL, getFn).Methods(GetHTTPMethod)
r.HandleFunc(M3AggGetURL, getFn).Methods(GetHTTPMethod)
r.HandleFunc(M3CoordinatorGetURL, getFn).Methods(GetHTTPMethod)

// Delete all
var (
Expand All @@ -334,6 +358,7 @@ func RegisterRoutes(r *mux.Router, opts HandlerOptions) {
r.HandleFunc(DeprecatedM3DBDeleteAllURL, deprecatedDeleteAllFn).Methods(DeleteAllHTTPMethod)
r.HandleFunc(M3DBDeleteAllURL, deleteAllFn).Methods(DeleteAllHTTPMethod)
r.HandleFunc(M3AggDeleteAllURL, deleteAllFn).Methods(DeleteAllHTTPMethod)
r.HandleFunc(M3CoordinatorDeleteAllURL, getFn).Methods(GetHTTPMethod)

// Add
var (
Expand All @@ -344,6 +369,7 @@ func RegisterRoutes(r *mux.Router, opts HandlerOptions) {
r.HandleFunc(DeprecatedM3DBAddURL, deprecatedAddFn).Methods(AddHTTPMethod)
r.HandleFunc(M3DBAddURL, addFn).Methods(AddHTTPMethod)
r.HandleFunc(M3AggAddURL, addFn).Methods(AddHTTPMethod)
r.HandleFunc(M3CoordinatorAddURL, getFn).Methods(GetHTTPMethod)

// Delete
var (
Expand All @@ -354,6 +380,7 @@ func RegisterRoutes(r *mux.Router, opts HandlerOptions) {
r.HandleFunc(DeprecatedM3DBDeleteURL, deprecatedDeleteFn).Methods(DeleteHTTPMethod)
r.HandleFunc(M3DBDeleteURL, deleteFn).Methods(DeleteHTTPMethod)
r.HandleFunc(M3AggDeleteURL, deleteFn).Methods(DeleteHTTPMethod)
r.HandleFunc(M3CoordinatorDeleteURL, getFn).Methods(GetHTTPMethod)
}

func newPlacementCutoverNanosFn(
Expand Down Expand Up @@ -481,14 +508,10 @@ func parseServiceFromRequest(r *http.Request) (string, error) {
for i, c := range components {
if c == "services" && i+1 < len(components) {
service := components[i+1]
switch service {
case M3DBServiceName:
return M3DBServiceName, nil
case M3AggregatorServiceName:
return M3AggregatorServiceName, nil
default:
return "", fmt.Errorf("unknown service: %s", service)
if _, ok := allowedServices[service]; ok {
return service, nil
}
return "", fmt.Errorf("unknown service: %s", service)
}
}

Expand Down
8 changes: 8 additions & 0 deletions src/query/api/v1/handler/placement/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ var (
// M3AggDeleteURL is the url for the placement delete handler for the M3Agg service.
M3AggDeleteURL = path.Join(handler.RoutePrefixV1, M3AggServicePlacementPathName, placementIDPath)

// M3CoordinatorDeleteURL is the url for the placement delete handler for the M3Coordinator service.
M3CoordinatorDeleteURL = path.Join(handler.RoutePrefixV1, M3CoordinatorServicePlacementPathName, placementIDPath)

errEmptyID = errors.New("must specify placement ID to delete")
)

Expand Down Expand Up @@ -96,6 +99,11 @@ func (h *DeleteHandler) ServeHTTP(serviceName string, w http.ResponseWriter, r *

toRemove := []string{id}

switch serviceName {
case M3CoordinatorServiceName:
// There are no unsafe placement changes because M3Coordinator is stateless
force = true
}
var newPlacement placement.Placement
if force {
newPlacement, err = service.RemoveInstances(toRemove)
Expand Down
4 changes: 4 additions & 0 deletions src/query/api/v1/handler/placement/delete_all.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ var (
// M3AggDeleteAllURL is the url for the handler to delete all placements (with the DELETE method)
// for the M3Agg service.
M3AggDeleteAllURL = path.Join(handler.RoutePrefixV1, M3AggServicePlacementPathName)

// M3CoordinatorDeleteAllURL is the url for the handler to delete all placements (with the DELETE method)
// for the M3Coordinator service.
M3CoordinatorDeleteAllURL = path.Join(handler.RoutePrefixV1, M3CoordinatorServicePlacementPathName)
)

// DeleteAllHandler is the handler to delete all placements.
Expand Down
69 changes: 51 additions & 18 deletions src/query/api/v1/handler/placement/delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,16 @@ func TestPlacementDeleteHandler_Safe(t *testing.T) {
req = httptest.NewRequest(DeleteHTTPMethod, "/placement/host1", nil)
)
handler.nowFn = func() time.Time { return time.Unix(0, 0) }
if serviceName == M3AggregatorServiceName {

switch serviceName {
case M3CoordinatorServiceName:
basePlacement = basePlacement.
SetIsSharded(false).
SetReplicaFactor(1)
mockPlacementService.EXPECT().
RemoveInstances([]string{"host1"}).
Return(placement.NewPlacement(), nil)
case M3AggregatorServiceName:
basePlacement = basePlacement.
SetIsMirrored(true)
}
Expand All @@ -102,8 +111,13 @@ func TestPlacementDeleteHandler_Safe(t *testing.T) {
resp := w.Result()
body, err := ioutil.ReadAll(resp.Body)
require.NoError(t, err)
require.Equal(t, http.StatusBadRequest, resp.StatusCode)
require.True(t, strings.Contains(string(body), "instance host1 does not exist"))
switch serviceName {
case M3CoordinatorServiceName:
require.Equal(t, http.StatusOK, resp.StatusCode)
default:
require.True(t, strings.Contains(string(body), "instance host1 does not exist"))
require.Equal(t, http.StatusBadRequest, resp.StatusCode)
}

// Test remove host when placement unsafe
basePlacement = basePlacement.SetInstances([]placement.Instance{
Expand All @@ -115,18 +129,23 @@ func TestPlacementDeleteHandler_Safe(t *testing.T) {
})),
})

w = httptest.NewRecorder()
req = httptest.NewRequest(DeleteHTTPMethod, "/placement/host1", nil)
req = mux.SetURLVars(req, map[string]string{"id": "host1"})
require.NotNil(t, req)
mockPlacementService.EXPECT().Placement().Return(basePlacement, 0, nil)
handler.ServeHTTP(serviceName, w, req)

resp = w.Result()
body, err = ioutil.ReadAll(resp.Body)
require.NoError(t, err)
require.Equal(t, http.StatusBadRequest, resp.StatusCode)
require.Equal(t, `{"error":"instances [host2] do not have all shards available"}`+"\n", string(body))
switch serviceName {
case M3CoordinatorServiceName:
// M3Coordinator placement changes are alway safe because it is stateless
default:
w = httptest.NewRecorder()
req = httptest.NewRequest(DeleteHTTPMethod, "/placement/host1", nil)
req = mux.SetURLVars(req, map[string]string{"id": "host1"})
require.NotNil(t, req)
mockPlacementService.EXPECT().Placement().Return(basePlacement, 0, nil)
handler.ServeHTTP(serviceName, w, req)

resp = w.Result()
body, err = ioutil.ReadAll(resp.Body)
require.NoError(t, err)
require.Equal(t, http.StatusBadRequest, resp.StatusCode)
require.Equal(t, `{"error":"instances [host2] do not have all shards available"}`+"\n", string(body))
}

// Test OK
basePlacement = basePlacement.SetReplicaFactor(2).SetMaxShardSetID(2).SetInstances([]placement.Instance{
Expand All @@ -144,7 +163,18 @@ func TestPlacementDeleteHandler_Safe(t *testing.T) {
shard.NewShard(1).SetState(shard.Available),
})),
})
if serviceName == M3AggregatorServiceName {

switch serviceName {
case M3CoordinatorServiceName:
basePlacement.
SetIsSharded(false).
SetReplicaFactor(1).
SetShards(nil).
SetInstances([]placement.Instance{placement.NewInstance().SetID("host1")})
mockPlacementService.EXPECT().
RemoveInstances([]string{"host1"}).
Return(placement.NewPlacement(), nil)
case M3AggregatorServiceName:
// Need to be mirrored in M3Agg case
basePlacement.SetReplicaFactor(1).SetMaxShardSetID(2).SetInstances([]placement.Instance{
placement.NewInstance().SetID("host1").SetIsolationGroup("a").SetWeight(10).SetShardSetID(0).
Expand All @@ -169,9 +199,12 @@ func TestPlacementDeleteHandler_Safe(t *testing.T) {
resp = w.Result()
body, err = ioutil.ReadAll(resp.Body)
require.NoError(t, err)
if serviceName == M3AggregatorServiceName {
switch serviceName {
case M3CoordinatorServiceName:
require.Equal(t, `{"placement":{"instances":{},"replicaFactor":0,"numShards":0,"isSharded":false,"cutoverTime":"0","isMirrored":false,"maxShardSetId":0},"version":0}`, string(body))
case M3AggregatorServiceName:
require.Equal(t, "{\"placement\":{\"instances\":{\"host1\":{\"id\":\"host1\",\"isolationGroup\":\"a\",\"zone\":\"\",\"weight\":10,\"endpoint\":\"\",\"shards\":[{\"id\":0,\"state\":\"LEAVING\",\"sourceId\":\"\",\"cutoverNanos\":\"0\",\"cutoffNanos\":\"300000000000\"}],\"shardSetId\":0,\"hostname\":\"\",\"port\":0},\"host2\":{\"id\":\"host2\",\"isolationGroup\":\"b\",\"zone\":\"\",\"weight\":10,\"endpoint\":\"\",\"shards\":[{\"id\":0,\"state\":\"INITIALIZING\",\"sourceId\":\"host1\",\"cutoverNanos\":\"300000000000\",\"cutoffNanos\":\"0\"},{\"id\":1,\"state\":\"AVAILABLE\",\"sourceId\":\"\",\"cutoverNanos\":\"0\",\"cutoffNanos\":\"0\"}],\"shardSetId\":1,\"hostname\":\"\",\"port\":0}},\"replicaFactor\":1,\"numShards\":0,\"isSharded\":true,\"cutoverTime\":\"0\",\"isMirrored\":true,\"maxShardSetId\":2},\"version\":2}", string(body))
} else {
default:
require.Equal(t, "{\"placement\":{\"instances\":{\"host1\":{\"id\":\"host1\",\"isolationGroup\":\"a\",\"zone\":\"\",\"weight\":10,\"endpoint\":\"\",\"shards\":[{\"id\":0,\"state\":\"LEAVING\",\"sourceId\":\"\","+
"\"cutoverNanos\":\"0\",\"cutoffNanos\":\"0\"}],\"shardSetId\":0,\"hostname\":\"\",\"port\":0},\"host2\":{\"id\":\"host2\",\"isolationGroup\":\"b\",\"zone\":\"\",\"weight\":10,\"endpoint\":\"\",\"shards\":"+
"[{\"id\":0,\"state\":\"AVAILABLE\",\"sourceId\":\"\",\"cutoverNanos\":\"0\",\"cutoffNanos\":\"0\"},{\"id\":1,\"state\":\"AVAILABLE\",\"sourceId\":\"\",\"cutoverNanos\":\"0\",\"cutoffNanos\":\"0\"}],"+
Expand Down
4 changes: 4 additions & 0 deletions src/query/api/v1/handler/placement/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ var (
// M3AggGetURL is the url for the placement get handler (with the GET method)
// for the M3Agg service.
M3AggGetURL = path.Join(handler.RoutePrefixV1, M3AggServicePlacementPathName)

// M3CoordinatorGetURL is the url for the placement get handler (with the GET method)
// for the M3Coordinator service.
M3CoordinatorGetURL = path.Join(handler.RoutePrefixV1, M3CoordinatorServicePlacementPathName)
)

// GetHandler is the handler for placement gets.
Expand Down
Loading