Skip to content

Commit

Permalink
Merge pull request #3636 from hashicorp/esm-changes
Browse files Browse the repository at this point in the history
Add coordinate update endpoint and http/tcp check fields
  • Loading branch information
kyhavlov authored Nov 1, 2017
2 parents c4375d5 + c9c083d commit df5e3fb
Show file tree
Hide file tree
Showing 18 changed files with 379 additions and 37 deletions.
2 changes: 1 addition & 1 deletion agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -1407,7 +1407,7 @@ func (a *Agent) reapServicesInternal() {
}

// reapServices is a long running goroutine that looks for checks that have been
// critical too long and dregisters their associated services.
// critical too long and deregisters their associated services.
func (a *Agent) reapServices() {
for {
select {
Expand Down
4 changes: 3 additions & 1 deletion agent/catalog_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ import (
"github.com/hashicorp/consul/agent/structs"
)

var durations = NewDurationFixer("interval", "timeout", "deregistercriticalserviceafter")

func (s *HTTPServer) CatalogRegister(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
if req.Method != "PUT" {
return nil, MethodNotAllowedError{req.Method, []string{"PUT"}}
}

var args structs.RegisterRequest
if err := decodeBody(req, &args, nil); err != nil {
if err := decodeBody(req, &args, durations.FixupDurations); err != nil {
resp.WriteHeader(http.StatusBadRequest)
fmt.Fprintf(resp, "Request decode failed: %v", err)
return nil, nil
Expand Down
7 changes: 5 additions & 2 deletions agent/consul/catalog_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,11 @@ func (c *Catalog) Register(args *structs.RegisterRequest, reply *struct{}) error
defer metrics.MeasureSince([]string{"catalog", "register"}, time.Now())

// Verify the args.
if args.Node == "" || args.Address == "" {
return fmt.Errorf("Must provide node and address")
if args.Node == "" {
return fmt.Errorf("Must provide node")
}
if args.Address == "" && !args.SkipNodeUpdate {
return fmt.Errorf("Must provide address if SkipNodeUpdate is not set")
}
if args.ID != "" {
if _, err := uuid.ParseUUID(string(args.ID)); err != nil {
Expand Down
39 changes: 39 additions & 0 deletions agent/consul/catalog_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,45 @@ func TestCatalog_RegisterService_InvalidAddress(t *testing.T) {
}
}

func TestCatalog_RegisterService_SkipNodeUpdate(t *testing.T) {
t.Parallel()
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()

// Register a node
arg := structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
}
var out struct{}
err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out)
if err != nil {
t.Fatal(err)
}

// Update it with a blank address, should fail.
arg.Address = ""
arg.Service = &structs.NodeService{
Service: "db",
Port: 8000,
}
err = msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out)
if err == nil || err.Error() != "Must provide address if SkipNodeUpdate is not set" {
t.Fatalf("got error %v want 'Must provide address...'", err)
}

// Set SkipNodeUpdate, should succeed
arg.SkipNodeUpdate = true
err = msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out)
if err != nil {
t.Fatal(err)
}
}

func TestCatalog_Register_NodeID(t *testing.T) {
t.Parallel()
dir1, s1 := testServer(t)
Expand Down
22 changes: 22 additions & 0 deletions agent/coordinate_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,3 +138,25 @@ func filterCoordinates(req *http.Request, in structs.Coordinates) structs.Coordi
}
return out
}

// CoordinateUpdate inserts or updates the LAN coordinate of a node.
func (s *HTTPServer) CoordinateUpdate(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
if req.Method != "PUT" {
return nil, MethodNotAllowedError{req.Method, []string{"PUT"}}
}

args := structs.CoordinateUpdateRequest{}
if err := decodeBody(req, &args, nil); err != nil {
resp.WriteHeader(http.StatusBadRequest)
fmt.Fprintf(resp, "Request decode failed: %v", err)
return nil, nil
}
s.parseDC(req, &args.Datacenter)

var reply struct{}
if err := s.agent.RPC("Coordinate.Update", &args, &reply); err != nil {
return nil, err
}

return nil, nil
}
46 changes: 46 additions & 0 deletions agent/coordinate_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,3 +243,49 @@ func TestCoordinate_Node(t *testing.T) {
t.Fatalf("bad: %v", resp.Code)
}
}

func TestCoordinate_Update(t *testing.T) {
t.Parallel()
a := NewTestAgent(t.Name(), "")
defer a.Shutdown()

// Register the node.
reg := structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
}
var reply struct{}
if err := a.RPC("Catalog.Register", &reg, &reply); err != nil {
t.Fatalf("err: %s", err)
}

// Update the coordinates and wait for it to complete.
coord := coordinate.NewCoordinate(coordinate.DefaultConfig())
coord.Height = -5.0
body := structs.CoordinateUpdateRequest{
Datacenter: "dc1",
Node: "foo",
Coord: coord,
}
req, _ := http.NewRequest("PUT", "/v1/coordinate/update", jsonReader(body))
resp := httptest.NewRecorder()
_, err := a.srv.CoordinateUpdate(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
time.Sleep(300 * time.Millisecond)

// Query back and check the coordinates are present.
args := structs.NodeSpecificRequest{Node: "foo", Datacenter: "dc1"}
var coords structs.IndexedCoordinates
if err := a.RPC("Coordinate.Node", &args, &coords); err != nil {
t.Fatalf("err: %s", err)
}

coordinates := coords.Coordinates
if len(coordinates) != 1 ||
coordinates[0].Node != "foo" {
t.Fatalf("bad: %v", coordinates)
}
}
2 changes: 2 additions & 0 deletions agent/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,10 +140,12 @@ func (s *HTTPServer) handler(enableDebug bool) http.Handler {
handleFuncMetrics("/v1/coordinate/datacenters", s.wrap(s.CoordinateDatacenters))
handleFuncMetrics("/v1/coordinate/nodes", s.wrap(s.CoordinateNodes))
handleFuncMetrics("/v1/coordinate/node/", s.wrap(s.CoordinateNode))
handleFuncMetrics("/v1/coordinate/update", s.wrap(s.CoordinateUpdate))
} else {
handleFuncMetrics("/v1/coordinate/datacenters", s.wrap(coordinateDisabled))
handleFuncMetrics("/v1/coordinate/nodes", s.wrap(coordinateDisabled))
handleFuncMetrics("/v1/coordinate/node/", s.wrap(coordinateDisabled))
handleFuncMetrics("/v1/coordinate/update", s.wrap(coordinateDisabled))
}
handleFuncMetrics("/v1/event/fire/", s.wrap(s.EventFire))
handleFuncMetrics("/v1/event/list", s.wrap(s.EventList))
Expand Down
27 changes: 2 additions & 25 deletions agent/operator_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"
"net/http"
"strconv"
"strings"
"time"

"github.com/hashicorp/consul/agent/structs"
Expand Down Expand Up @@ -220,7 +219,8 @@ func (s *HTTPServer) OperatorAutopilotConfiguration(resp http.ResponseWriter, re
s.parseToken(req, &args.Token)

var conf api.AutopilotConfiguration
if err := decodeBody(req, &conf, FixupConfigDurations); err != nil {
durations := NewDurationFixer("lastcontactthreshold", "serverstabilizationtime")
if err := decodeBody(req, &conf, durations.FixupDurations); err != nil {
resp.WriteHeader(http.StatusBadRequest)
fmt.Fprintf(resp, "Error parsing autopilot config: %v", err)
return nil, nil
Expand Down Expand Up @@ -265,29 +265,6 @@ func (s *HTTPServer) OperatorAutopilotConfiguration(resp http.ResponseWriter, re
}
}

// FixupConfigDurations is used to handle parsing the duration fields in
// the Autopilot config struct
func FixupConfigDurations(raw interface{}) error {
rawMap, ok := raw.(map[string]interface{})
if !ok {
return nil
}
for key, val := range rawMap {
if strings.ToLower(key) == "lastcontactthreshold" ||
strings.ToLower(key) == "serverstabilizationtime" {
// Convert a string value into an integer
if vStr, ok := val.(string); ok {
dur, err := time.ParseDuration(vStr)
if err != nil {
return err
}
rawMap[key] = dur
}
}
}
return nil
}

// OperatorServerHealth is used to get the health of the servers in the local DC
func (s *HTTPServer) OperatorServerHealth(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
if req.Method != "GET" {
Expand Down
13 changes: 13 additions & 0 deletions agent/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,9 +482,22 @@ type HealthCheck struct {
ServiceName string // optional service name
ServiceTags []string // optional service tags

Definition HealthCheckDefinition

RaftIndex
}

type HealthCheckDefinition struct {
HTTP string `json:",omitempty"`
TLSSkipVerify bool `json:",omitempty"`
Header map[string][]string `json:",omitempty"`
Method string `json:",omitempty"`
TCP string `json:",omitempty"`
Interval api.ReadableDuration `json:",omitempty"`
Timeout api.ReadableDuration `json:",omitempty"`
DeregisterCriticalServiceAfter api.ReadableDuration `json:",omitempty"`
}

// IsSame checks if one HealthCheck is the same as another, without looking
// at the Raft information (that's why we didn't call it IsEqual). This is
// useful for seeing if an update would be idempotent for all the functional
Expand Down
55 changes: 55 additions & 0 deletions agent/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"os/signal"
osuser "os/user"
"strconv"
"strings"
"time"

"github.com/hashicorp/consul/types"
"github.com/hashicorp/go-msgpack/codec"
Expand Down Expand Up @@ -113,3 +115,56 @@ func ForwardSignals(cmd *exec.Cmd, logFn func(error), shutdownCh <-chan struct{}
}
}()
}

type durationFixer map[string]bool

func NewDurationFixer(fields ...string) durationFixer {
d := make(map[string]bool)
for _, field := range fields {
d[field] = true
}
return d
}

// FixupDurations is used to handle parsing any field names in the map to time.Durations
func (d durationFixer) FixupDurations(raw interface{}) error {
rawMap, ok := raw.(map[string]interface{})
if !ok {
return nil
}
for key, val := range rawMap {
switch val.(type) {
case map[string]interface{}:
if err := d.FixupDurations(val); err != nil {
return err
}

case []interface{}:
for _, v := range val.([]interface{}) {
if err := d.FixupDurations(v); err != nil {
return err
}
}

case []map[string]interface{}:
for _, v := range val.([]map[string]interface{}) {
if err := d.FixupDurations(v); err != nil {
return err
}
}

default:
if d[strings.ToLower(key)] {
// Convert a string value into an integer
if vStr, ok := val.(string); ok {
dur, err := time.ParseDuration(vStr)
if err != nil {
return err
}
rawMap[key] = dur
}
}
}
}
return nil
}
45 changes: 45 additions & 0 deletions agent/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"os"
"runtime"
"testing"
"time"

"github.com/hashicorp/consul/testutil"
"github.com/pascaldekloe/goe/verify"
)

func TestStringHash(t *testing.T) {
Expand Down Expand Up @@ -74,3 +76,46 @@ func TestSetFilePermissions(t *testing.T) {
t.Fatalf("bad: %s", fi.Mode())
}
}

func TestDurationFixer(t *testing.T) {
obj := map[string]interface{}{
"key1": []map[string]interface{}{
{
"subkey1": "10s",
},
{
"subkey2": "5d",
},
},
"key2": map[string]interface{}{
"subkey3": "30s",
"subkey4": "20m",
},
"key3": "11s",
"key4": "49h",
}
expected := map[string]interface{}{
"key1": []map[string]interface{}{
{
"subkey1": 10 * time.Second,
},
{
"subkey2": "5d",
},
},
"key2": map[string]interface{}{
"subkey3": "30s",
"subkey4": 20 * time.Minute,
},
"key3": "11s",
"key4": 49 * time.Hour,
}

fixer := NewDurationFixer("key4", "subkey1", "subkey4")
if err := fixer.FixupDurations(obj); err != nil {
t.Fatal(err)
}

// Ensure we only processed the intended fieldnames
verify.Values(t, "", obj, expected)
}
1 change: 1 addition & 0 deletions api/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type AgentCheck struct {
Output string
ServiceID string
ServiceName string
Definition HealthCheckDefinition
}

// AgentService represents a service known to the agent
Expand Down
1 change: 1 addition & 0 deletions api/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type CatalogRegistration struct {
Datacenter string
Service *AgentService
Check *AgentCheck
SkipNodeUpdate bool
}

type CatalogDeregistration struct {
Expand Down
Loading

0 comments on commit df5e3fb

Please sign in to comment.