Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding support for 'watches' #298

Merged
merged 20 commits into from
Aug 22, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 7 additions & 13 deletions command/agent/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"github.com/hashicorp/consul/consul/structs"
"log"
"os/exec"
"runtime"
"sync"
"syscall"
"time"
Expand Down Expand Up @@ -106,18 +105,13 @@ func (c *CheckMonitor) run() {

// check is invoked periodically to perform the script check
func (c *CheckMonitor) check() {
// Determine the shell invocation based on OS
var shell, flag string
if runtime.GOOS == "windows" {
shell = "cmd"
flag = "/C"
} else {
shell = "/bin/sh"
flag = "-c"
}

// Create the command
cmd := exec.Command(shell, flag, c.Script)
cmd, err := ExecScript(c.Script)
if err != nil {
c.Logger.Printf("[ERR] agent: failed to setup invoke '%s': %s", c.Script, err)
c.Notify.UpdateCheck(c.CheckID, structs.HealthUnknown, err.Error())
return
}

// Collect the output
output, _ := circbuf.NewBuffer(CheckBufSize)
Expand All @@ -140,7 +134,7 @@ func (c *CheckMonitor) check() {
time.Sleep(30 * time.Second)
errCh <- fmt.Errorf("Timed out running check '%s'", c.Script)
}()
err := <-errCh
err = <-errCh

// Get the output, add a message about truncation
outputStr := string(output.Bytes())
Expand Down
61 changes: 61 additions & 0 deletions command/agent/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"time"

"github.com/armon/go-metrics"
"github.com/hashicorp/consul/watch"
"github.com/hashicorp/go-syslog"
"github.com/hashicorp/logutils"
"github.com/mitchellh/cli"
Expand All @@ -37,6 +38,7 @@ type Command struct {
ShutdownCh <-chan struct{}
args []string
logFilter *logutils.LevelFilter
logOutput io.Writer
agent *Agent
rpcServer *AgentRPC
httpServer *HTTPServer
Expand Down Expand Up @@ -141,6 +143,25 @@ func (c *Command) readConfig() *Config {
return nil
}

// Compile all the watches
for _, params := range config.Watches {
// Parse the watches, excluding the handler
wp, err := watch.ParseExempt(params, []string{"handler"})
if err != nil {
c.Ui.Error(fmt.Sprintf("Failed to parse watch (%#v): %v", params, err))
return nil
}

// Get the handler
if err := verifyWatchHandler(wp.Exempt["handler"]); err != nil {
c.Ui.Error(fmt.Sprintf("Failed to setup watch handler (%#v): %v", params, err))
return nil
}

// Store the watch plan
config.WatchPlans = append(config.WatchPlans, wp)
}

// Warn if we are in expect mode
if config.BootstrapExpect == 1 {
c.Ui.Error("WARNING: BootstrapExpect Mode is specified as 1; this is the same as Bootstrap mode.")
Expand Down Expand Up @@ -206,6 +227,7 @@ func (c *Command) setupLoggers(config *Config) (*GatedWriter, *logWriter, io.Wri
} else {
logOutput = io.MultiWriter(c.logFilter, logWriter)
}
c.logOutput = logOutput
return logGate, logWriter, logOutput
}

Expand Down Expand Up @@ -377,6 +399,23 @@ func (c *Command) Run(args []string) int {
}
}

// Get the new client listener addr
httpAddr, err := config.ClientListenerAddr(config.Ports.HTTP)
if err != nil {
c.Ui.Error(fmt.Sprintf("Failed to determine HTTP address: %v", err))
}

// Register the watches
for _, wp := range config.WatchPlans {
go func() {
wp.Handler = makeWatchHandler(logOutput, wp.Exempt["handler"])
wp.LogOutput = c.logOutput
if err := wp.Run(httpAddr); err != nil {
c.Ui.Error(fmt.Sprintf("Error running watch: %v", err))
}
}()
}

// Let the agent know we've finished registration
c.agent.StartSync()

Expand Down Expand Up @@ -518,6 +557,28 @@ func (c *Command) handleReload(config *Config) *Config {
}
}

// Get the new client listener addr
httpAddr, err := newConf.ClientListenerAddr(config.Ports.HTTP)
if err != nil {
c.Ui.Error(fmt.Sprintf("Failed to determine HTTP address: %v", err))
}

// Deregister the old watches
for _, wp := range config.WatchPlans {
wp.Stop()
}

// Register the new watches
for _, wp := range newConf.WatchPlans {
go func() {
wp.Handler = makeWatchHandler(c.logOutput, wp.Exempt["handler"])
wp.LogOutput = c.logOutput
if err := wp.Run(httpAddr); err != nil {
c.Ui.Error(fmt.Sprintf("Error running watch: %v", err))
}
}()
}

return newConf
}

Expand Down
28 changes: 28 additions & 0 deletions command/agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"time"

"github.com/hashicorp/consul/consul"
"github.com/hashicorp/consul/watch"
"github.com/mitchellh/mapstructure"
)

Expand Down Expand Up @@ -229,6 +230,11 @@ type Config struct {
// this acts like deny.
ACLDownPolicy string `mapstructure:"acl_down_policy"`

// Watches are used to monitor various endpoints and to invoke a
// handler to act appropriately. These are managed entirely in the
// agent layer using the standard APIs.
Watches []map[string]interface{} `mapstructure:"watches"`

// AEInterval controls the anti-entropy interval. This is how often
// the agent attempts to reconcile it's local state with the server'
// representation of our state. Defaults to every 60s.
Expand All @@ -251,6 +257,9 @@ type Config struct {

// VersionPrerelease is a label for pre-release builds
VersionPrerelease string `mapstructure:"-"`

// WatchPlans contains the compiled watches
WatchPlans []*watch.WatchPlan `mapstructure:"-" json:"-"`
}

type dirEnts []os.FileInfo
Expand Down Expand Up @@ -302,6 +311,19 @@ func (c *Config) ClientListener(port int) (*net.TCPAddr, error) {
return &net.TCPAddr{IP: ip, Port: port}, nil
}

// ClientListenerAddr is used to format an address for a
// port on a ClientAddr, handling the zero IP.
func (c *Config) ClientListenerAddr(port int) (string, error) {
addr, err := c.ClientListener(port)
if err != nil {
return "", err
}
if addr.IP.IsUnspecified() {
addr.IP = net.ParseIP("127.0.0.1")
}
return addr.String(), nil
}

// DecodeConfig reads the configuration from the given reader in JSON
// format and decodes it into a proper Config structure.
func DecodeConfig(r io.Reader) (*Config, error) {
Expand Down Expand Up @@ -648,6 +670,12 @@ func MergeConfig(a, b *Config) *Config {
if b.ACLDefaultPolicy != "" {
result.ACLDefaultPolicy = b.ACLDefaultPolicy
}
if len(b.Watches) != 0 {
result.Watches = append(result.Watches, b.Watches...)
}
if len(b.WatchPlans) != 0 {
result.WatchPlans = append(result.WatchPlans, b.WatchPlans...)
}

// Copy the start join addresses
result.StartJoin = make([]string, 0, len(a.StartJoin)+len(b.StartJoin))
Expand Down
28 changes: 28 additions & 0 deletions command/agent/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,27 @@ func TestDecodeConfig(t *testing.T) {
if config.ACLDefaultPolicy != "deny" {
t.Fatalf("bad: %#v", config)
}

// Watches
input = `{"watches": [{"type":"keyprefix", "prefix":"foo/", "handler":"foobar"}]}`
config, err = DecodeConfig(bytes.NewReader([]byte(input)))
if err != nil {
t.Fatalf("err: %s", err)
}

if len(config.Watches) != 1 {
t.Fatalf("bad: %#v", config)
}

out := config.Watches[0]
exp := map[string]interface{}{
"type": "keyprefix",
"prefix": "foo/",
"handler": "foobar",
}
if !reflect.DeepEqual(out, exp) {
t.Fatalf("bad: %#v", config)
}
}

func TestDecodeConfig_Service(t *testing.T) {
Expand Down Expand Up @@ -538,6 +559,13 @@ func TestMergeConfig(t *testing.T) {
ACLTTLRaw: "15s",
ACLDownPolicy: "deny",
ACLDefaultPolicy: "deny",
Watches: []map[string]interface{}{
map[string]interface{}{
"type": "keyprefix",
"prefix": "foo/",
"handler": "foobar",
},
},
}

c := MergeConfig(a, b)
Expand Down
2 changes: 1 addition & 1 deletion command/agent/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ PARSE:
// _name._tag.service.consul
d.serviceLookup(network, datacenter, labels[n-3][1:], tag, req, resp)

// Consul 0.3 and prior format for SRV queries
// Consul 0.3 and prior format for SRV queries
} else {

// Support "." in the label, re-join all the parts
Expand Down
16 changes: 16 additions & 0 deletions command/agent/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package agent
import (
"math"
"math/rand"
"os/exec"
"runtime"
"time"
)

Expand Down Expand Up @@ -39,3 +41,17 @@ func strContains(l []string, s string) bool {
}
return false
}

// ExecScript returns a command to execute a script
func ExecScript(script string) (*exec.Cmd, error) {
var shell, flag string
if runtime.GOOS == "windows" {
shell = "cmd"
flag = "/C"
} else {
shell = "/bin/sh"
flag = "-c"
}
cmd := exec.Command(shell, flag, script)
return cmd, nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

never returns error?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not currently, but it forces code upstream to check for a possible error

}
80 changes: 80 additions & 0 deletions command/agent/watch_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package agent

import (
"bytes"
"encoding/json"
"fmt"
"io"
"log"
"os"
"strconv"

"github.com/armon/circbuf"
"github.com/hashicorp/consul/watch"
)

const (
// Limit the size of a watch handlers's output to the
// last WatchBufSize. Prevents an enormous buffer
// from being captured
WatchBufSize = 4 * 1024 // 4KB
)

// verifyWatchHandler does the pre-check for our handler configuration
func verifyWatchHandler(params interface{}) error {
if params == nil {
return fmt.Errorf("Must provide watch handler")
}
_, ok := params.(string)
if !ok {
return fmt.Errorf("Watch handler must be a string")
}
return nil
}

// makeWatchHandler returns a handler for the given watch
func makeWatchHandler(logOutput io.Writer, params interface{}) watch.HandlerFunc {
script := params.(string)
logger := log.New(logOutput, "", log.LstdFlags)
fn := func(idx uint64, data interface{}) {
// Create the command
cmd, err := ExecScript(script)
if err != nil {
logger.Printf("[ERR] agent: Failed to setup watch: %v", err)
return
}
cmd.Env = append(os.Environ(),
"CONSUL_INDEX="+strconv.FormatUint(idx, 10),
)

// Collect the output
output, _ := circbuf.NewBuffer(WatchBufSize)
cmd.Stdout = output
cmd.Stderr = output

// Setup the input
var inp bytes.Buffer
enc := json.NewEncoder(&inp)
if err := enc.Encode(data); err != nil {
logger.Printf("[ERR] agent: Failed to encode data for watch '%s': %v", script, err)
return
}
cmd.Stdin = &inp

// Run the handler
if err := cmd.Run(); err != nil {
logger.Printf("[ERR] agent: Failed to invoke watch handler '%s': %v", script, err)
}

// Get the output, add a message about truncation
outputStr := string(output.Bytes())
if output.TotalWritten() > output.Size() {
outputStr = fmt.Sprintf("Captured %d of %d bytes\n...\n%s",
output.Size(), output.TotalWritten(), outputStr)
}

// Log the output
logger.Printf("[DEBUG] agent: watch handler '%s' output: %s", script, outputStr)
}
return fn
}
Loading