Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SCADA client is reload-able #1199

Merged
merged 8 commits into from
Aug 26, 2015
2 changes: 2 additions & 0 deletions command/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ func nextConfig() *Config {
idx := int(atomic.AddUint64(&offset, 1))
conf := DefaultConfig()

conf.Version = "a.b"
conf.VersionPrerelease = "c.d"
conf.AdvertiseAddr = "127.0.0.1"
conf.Bootstrap = true
conf.Datacenter = "dc1"
Expand Down
68 changes: 53 additions & 15 deletions command/agent/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type Command struct {
httpServers []*HTTPServer
dnsServer *DNSServer
scadaProvider *scada.Provider
scadaHttp *HTTPServer
}

// readConfig is responsible for setup of our configuration using
Expand Down Expand Up @@ -345,20 +346,14 @@ func (c *Command) setupAgent(config *Config, logOutput io.Writer, logWriter *log
c.rpcServer = NewAgentRPC(agent, rpcListener, logOutput, logWriter)

// Enable the SCADA integration
var scadaList net.Listener
if config.AtlasInfrastructure != "" {
provider, list, err := NewProvider(config, logOutput)
if err != nil {
agent.Shutdown()
c.Ui.Error(fmt.Sprintf("Error starting SCADA connection: %s", err))
return err
}
c.scadaProvider = provider
scadaList = list
if err := c.setupScadaConn(config); err != nil {
agent.Shutdown()
c.Ui.Error(fmt.Sprintf("Error starting SCADA connection: %s", err))
return err
}

if config.Ports.HTTP > 0 || config.Ports.HTTPS > 0 || scadaList != nil {
servers, err := NewHTTPServers(agent, config, scadaList, logOutput)
if config.Ports.HTTP > 0 || config.Ports.HTTPS > 0 {
servers, err := NewHTTPServers(agent, config, logOutput)
if err != nil {
agent.Shutdown()
c.Ui.Error(fmt.Sprintf("Error starting http servers: %s", err))
Expand Down Expand Up @@ -684,9 +679,16 @@ AFTER_MIGRATE:
for _, server := range c.httpServers {
defer server.Shutdown()
}
if c.scadaProvider != nil {
defer c.scadaProvider.Shutdown()
}

// Check and shut down the SCADA listeners at the end
defer func() {
if c.scadaHttp != nil {
c.scadaHttp.Shutdown()
}
if c.scadaProvider != nil {
c.scadaProvider.Shutdown()
}
}()

// Join startup nodes if specified
if err := c.startupJoin(config); err != nil {
Expand Down Expand Up @@ -904,9 +906,45 @@ func (c *Command) handleReload(config *Config) *Config {
}(wp)
}

// Reload SCADA client if we have a change
if newConf.AtlasInfrastructure != config.AtlasInfrastructure ||
newConf.AtlasToken != config.AtlasToken {
if err := c.setupScadaConn(newConf); err != nil {
c.Ui.Error(fmt.Sprintf("Failed reloading SCADA client: %s", err))
return nil
}
}

return newConf
}

// startScadaClient is used to start a new SCADA provider and listener,
// replacing any existing listeners.
func (c *Command) setupScadaConn(config *Config) error {
// Shut down existing SCADA listeners
if c.scadaProvider != nil {
c.scadaProvider.Shutdown()
}
if c.scadaHttp != nil {
c.scadaHttp.Shutdown()
}

// No-op if we don't have an infrastructure
if config.AtlasInfrastructure == "" {
return nil
}

// Create the new provider and listener
c.Ui.Output("Connecting to Atlas: " + config.AtlasInfrastructure)
provider, list, err := NewProvider(config, c.logOutput)
if err != nil {
return err
}
c.scadaProvider = provider
c.scadaHttp = newScadaHttp(c.agent, list)
return nil
}

func (c *Command) Synopsis() string {
return "Runs a Consul agent"
}
Expand Down
53 changes: 53 additions & 0 deletions command/agent/command_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io/ioutil"
"log"
"os"
"strings"
"testing"

"github.com/hashicorp/consul/testutil"
Expand Down Expand Up @@ -246,3 +247,55 @@ func TestSetupAgent_RPCUnixSocket_FileExists(t *testing.T) {
t.Fatalf("bad permissions: %s", fi.Mode())
}
}

func TestSetupScadaConn(t *testing.T) {
// Create a config and assign an infra name
conf1 := nextConfig()
conf1.AtlasInfrastructure = "hashicorp/test1"
conf1.AtlasToken = "abc"

dir, agent := makeAgent(t, conf1)
defer os.RemoveAll(dir)
defer agent.Shutdown()

cmd := &Command{
ShutdownCh: make(chan struct{}),
Ui: new(cli.MockUi),
agent: agent,
}

// First start creates the scada conn
if err := cmd.setupScadaConn(conf1); err != nil {
t.Fatalf("err: %s", err)
}
list := cmd.scadaHttp.listener.(*scadaListener)
if list == nil || list.addr.infra != "hashicorp/test1" {
t.Fatalf("bad: %#v", list)
}
http1 := cmd.scadaHttp
provider1 := cmd.scadaProvider

// Performing setup again tears down original and replaces
// with a new SCADA client.
conf2 := nextConfig()
conf2.AtlasInfrastructure = "hashicorp/test2"
conf2.AtlasToken = "123"
if err := cmd.setupScadaConn(conf2); err != nil {
t.Fatalf("err: %s", err)
}
if cmd.scadaHttp == http1 || cmd.scadaProvider == provider1 {
t.Fatalf("should change: %#v %#v", cmd.scadaHttp, cmd.scadaProvider)
}
list = cmd.scadaHttp.listener.(*scadaListener)
if list == nil || list.addr.infra != "hashicorp/test2" {
t.Fatalf("bad: %#v", list)
}

// Original provider and listener must be closed
if !provider1.IsShutdown() {
t.Fatalf("should be shutdown")
}
if _, err := http1.listener.Accept(); !strings.Contains(err.Error(), "closed") {
t.Fatalf("should be closed")
}
}
39 changes: 20 additions & 19 deletions command/agent/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type HTTPServer struct {

// NewHTTPServers starts new HTTP servers to provide an interface to
// the agent.
func NewHTTPServers(agent *Agent, config *Config, scada net.Listener, logOutput io.Writer) ([]*HTTPServer, error) {
func NewHTTPServers(agent *Agent, config *Config, logOutput io.Writer) ([]*HTTPServer, error) {
var servers []*HTTPServer

if config.Ports.HTTPS > 0 {
Expand Down Expand Up @@ -142,27 +142,28 @@ func NewHTTPServers(agent *Agent, config *Config, scada net.Listener, logOutput
servers = append(servers, srv)
}

if scada != nil {
// Create the mux
mux := http.NewServeMux()

// Create the server
srv := &HTTPServer{
agent: agent,
mux: mux,
listener: scada,
logger: log.New(logOutput, "", log.LstdFlags),
uiDir: config.UiDir,
addr: scadaHTTPAddr,
}
srv.registerHandlers(false) // Never allow debug for SCADA
return servers, nil
}

// Start the server
go http.Serve(scada, mux)
servers = append(servers, srv)
// newScadaHttp creates a new HTTP server wrapping the SCADA
// listener such that HTTP calls can be sent from the brokers.
func newScadaHttp(agent *Agent, list net.Listener) *HTTPServer {
// Create the mux
mux := http.NewServeMux()

// Create the server
srv := &HTTPServer{
agent: agent,
mux: mux,
listener: list,
logger: agent.logger,
addr: scadaHTTPAddr,
}
srv.registerHandlers(false) // Never allow debug for SCADA

return servers, nil
// Start the server
go http.Serve(list, mux)
return srv
}

// tcpKeepAliveListener sets TCP keep-alive timeouts on accepted
Expand Down
37 changes: 35 additions & 2 deletions command/agent/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func makeHTTPServerWithConfig(t *testing.T, cb func(c *Config)) (string, *HTTPSe
t.Fatalf("err: %v", err)
}
conf.UiDir = uiDir
servers, err := NewHTTPServers(agent, conf, nil, agent.logOutput)
servers, err := NewHTTPServers(agent, conf, agent.logOutput)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -148,7 +148,7 @@ func TestHTTPServer_UnixSocket_FileExists(t *testing.T) {
defer os.RemoveAll(dir)

// Try to start the server with the same path anyways.
if _, err := NewHTTPServers(agent, conf, nil, agent.logOutput); err != nil {
if _, err := NewHTTPServers(agent, conf, agent.logOutput); err != nil {
t.Fatalf("err: %s", err)
}

Expand Down Expand Up @@ -516,6 +516,39 @@ func TestACLResolution(t *testing.T) {
})
}

func TestScadaHTTP(t *testing.T) {
// Create the agent
dir, agent := makeAgent(t, nextConfig())
defer os.RemoveAll(dir)
defer agent.Shutdown()

// Create a generic listener
list, err := net.Listen("tcp", ":0")
if err != nil {
t.Fatalf("err: %s", err)
}
defer list.Close()

// Create the SCADA HTTP server
scadaHttp := newScadaHttp(agent, list)

// Returned server uses the listener and scada addr
if scadaHttp.listener != list {
t.Fatalf("bad listener: %#v", scadaHttp)
}
if scadaHttp.addr != scadaHTTPAddr {
t.Fatalf("expected %v, got: %v", scadaHttp.addr, scadaHTTPAddr)
}

// Check that debug endpoints were not enabled. This will cause
// the serve mux to panic if the routes are already handled.
mockFn := func(w http.ResponseWriter, r *http.Request) {}
scadaHttp.mux.HandleFunc("/debug/pprof/", mockFn)
scadaHttp.mux.HandleFunc("/debug/pprof/cmdline", mockFn)
scadaHttp.mux.HandleFunc("/debug/pprof/profile", mockFn)
scadaHttp.mux.HandleFunc("/debug/pprof/symbol", mockFn)
}

// assertIndex tests that X-Consul-Index is set and non-zero
func assertIndex(t *testing.T, resp *httptest.ResponseRecorder) {
header := resp.Header().Get("X-Consul-Index")
Expand Down
2 changes: 1 addition & 1 deletion command/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func testAgentWithConfig(t *testing.T, cb func(c *agent.Config)) *agentWrapper {

conf.Addresses.HTTP = "127.0.0.1"
httpAddr := fmt.Sprintf("127.0.0.1:%d", conf.Ports.HTTP)
http, err := agent.NewHTTPServers(a, conf, nil, os.Stderr)
http, err := agent.NewHTTPServers(a, conf, os.Stderr)
if err != nil {
os.RemoveAll(dir)
t.Fatalf(fmt.Sprintf("err: %v", err))
Expand Down
2 changes: 2 additions & 0 deletions website/source/docs/agent/options.html.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -643,3 +643,5 @@ items which are reloaded include:
* Services
* Watches
* HTTP Client Address
* Atlas Token
* Atlas Infrastructure