Skip to content

Commit

Permalink
Issue #179: TCP Proxy Support
Browse files Browse the repository at this point in the history
* Add generic TCP proxy support.
* Add support for ReadTimeout and WriteTimeout for the TCP and the TCP+SNI
  proxy.
* Add integration tests for the TCP and TCP+SNI proxy.
* Update the demo server to provide a TCP server.
* Add a tcptest package for generic TCP server testing.

Fixes #1, #178, #179
  • Loading branch information
magiconair committed Mar 25, 2017
1 parent 726eb51 commit a4d7342
Show file tree
Hide file tree
Showing 16 changed files with 593 additions and 111 deletions.
9 changes: 6 additions & 3 deletions config/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,10 @@ func parseListen(cfg string, cs map[string]CertSource, readTimeout, writeTimeout
switch k {
case "proto":
l.Proto = v
if l.Proto != "http" && l.Proto != "https" && l.Proto != "tcp+sni" {
switch l.Proto {
case "tcp", "tcp+sni", "http", "https":
// ok
default:
return Listen{}, fmt.Errorf("unknown protocol %q", v)
}
case "rt": // read timeout
Expand Down Expand Up @@ -300,8 +303,8 @@ func parseListen(cfg string, cs map[string]CertSource, readTimeout, writeTimeout
if l.Proto == "" {
l.Proto = "http"
}
if csName != "" && l.Proto != "https" {
return Listen{}, fmt.Errorf("cert source requires proto 'https'")
if csName != "" && l.Proto != "https" && l.Proto != "tcp" {
return Listen{}, fmt.Errorf("cert source requires proto 'https' or 'tcp'")
}
if csName == "" && l.Proto == "https" {
return Listen{}, fmt.Errorf("proto 'https' requires cert source")
Expand Down
15 changes: 11 additions & 4 deletions config/load_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,13 @@ func TestLoad(t *testing.T) {
return cfg
},
},
{
args: []string{"-proxy.addr", ":5555;proto=tcp"},
cfg: func(cfg *Config) *Config {
cfg.Listen = []Listen{{Addr: ":5555", Proto: "tcp"}}
return cfg
},
},
{
args: []string{"-proxy.addr", ":5555;proto=tcp+sni"},
cfg: func(cfg *Config) *Config {
Expand Down Expand Up @@ -686,16 +693,16 @@ func TestLoad(t *testing.T) {
err: errors.New("proto 'https' requires cert source"),
},
{
desc: "-proxy.addr with cert source and proto 'http' requires proto 'https'",
desc: "-proxy.addr with cert source and proto 'http' requires proto 'https' or 'tcp'",
args: []string{"-proxy.addr", ":5555;cs=name;proto=http", "-proxy.cs", "cs=name;type=path;cert=value"},
cfg: func(cfg *Config) *Config { return nil },
err: errors.New("cert source requires proto 'https'"),
err: errors.New("cert source requires proto 'https' or 'tcp'"),
},
{
desc: "-proxy.addr with cert source and proto 'tcp+sni' requires proto 'https'",
desc: "-proxy.addr with cert source and proto 'tcp+sni' requires proto 'https' or 'tcp'",
args: []string{"-proxy.addr", ":5555;cs=name;proto=tcp+sni", "-proxy.cs", "cs=name;type=path;cert=value"},
cfg: func(cfg *Config) *Config { return nil },
err: errors.New("cert source requires proto 'https'"),
err: errors.New("cert source requires proto 'https' or 'tcp'"),
},
{
args: []string{"-cfg"},
Expand Down
170 changes: 115 additions & 55 deletions demo/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,13 @@
// # websocket server
// ./server -addr 127.0.0.1:6000 -name ws-a -prefix /echo1,/echo2 -proto ws
//
// # tcp server
// ./server -addr 127.0.0.1:7000 -name tcp-a -proto tcp
//
package main

import (
"bufio"
"flag"
"fmt"
"io"
Expand All @@ -39,19 +43,58 @@ import (
"strconv"
"strings"

"github.com/eBay/fabio/proxy/tcp"
"github.com/hashicorp/consul/api"
"golang.org/x/net/websocket"
)

type Server interface {
// embedded server methods
ListenAndServe() error
ListenAndServeTLS(certFile, keyFile string) error

// consul register helpers
Tags() []string
Check() *api.AgentServiceCheck
}

type HTTPServer struct {
*http.Server
tags []string
check *api.AgentServiceCheck
}

func (s *HTTPServer) Check() *api.AgentServiceCheck {
return s.check
}

func (s *HTTPServer) Tags() []string {
return s.tags
}

type TCPServer struct {
*tcp.Server
tags []string
check *api.AgentServiceCheck
}

func (s *TCPServer) Check() *api.AgentServiceCheck {
return s.check
}

func (s *TCPServer) Tags() []string {
return s.tags
}

func main() {
var addr, consul, name, prefix, proto, token string
var certFile, keyFile string
var status int
flag.StringVar(&addr, "addr", "127.0.0.1:5000", "host:port of the service")
flag.StringVar(&consul, "consul", "127.0.0.1:8500", "host:port of the consul agent")
flag.StringVar(&name, "name", filepath.Base(os.Args[0]), "name of the service")
flag.StringVar(&prefix, "prefix", "", "comma-sep list of host/path prefixes to register")
flag.StringVar(&proto, "proto", "http", "protocol for endpoints: http or ws")
flag.StringVar(&prefix, "prefix", "", "comma-sep list of 'host/path' or ':port' prefixes to register")
flag.StringVar(&proto, "proto", "http", "protocol for endpoints: http, ws or tcp")
flag.StringVar(&token, "token", "", "consul ACL token")
flag.StringVar(&certFile, "cert", "", "path to cert file")
flag.StringVar(&keyFile, "key", "", "path to key file")
Expand All @@ -63,54 +106,65 @@ func main() {
os.Exit(1)
}

// register prefixes
prefixes := strings.Split(prefix, ",")
for _, p := range prefixes {
switch proto {
case "http":
http.HandleFunc(p, func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(status)
fmt.Fprintf(w, "Serving %s from %s on %s\n", r.RequestURI, name, addr)
})
case "ws":
http.Handle(p, websocket.Handler(EchoServer))
default:
log.Fatal("Invalid protocol ", proto)
var srv Server
switch proto {
case "http", "ws":
mux := http.NewServeMux()
mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, "OK")
})
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
http.Error(w, "not found", 404)
log.Printf("%s -> 404", r.URL)
})

var tags []string
for _, p := range strings.Split(prefix, ",") {
tags = append(tags, "urlprefix-"+p)
switch proto {
case "http":
mux.HandleFunc(p, func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(status)
fmt.Fprintf(w, "Serving %s from %s on %s\n", r.RequestURI, name, addr)
})
case "ws":
mux.Handle(p, websocket.Handler(WSEchoServer))
}
}
}

// register consul health check endpoint
http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, "OK")
})
var check *api.AgentServiceCheck
if certFile != "" {
check = &api.AgentServiceCheck{TCP: addr, Interval: "2s", Timeout: "1s"}
} else {
check = &api.AgentServiceCheck{HTTP: "http://" + addr + "/health", Interval: "1s", Timeout: "1s"}
}
srv = &HTTPServer{&http.Server{Addr: addr, Handler: mux}, tags, check}

case "tcp":
var tags []string
for _, p := range strings.Split(prefix, ",") {
tags = append(tags, "urlprefix-"+p+" proto=tcp")
}
check := &api.AgentServiceCheck{TCP: addr, Interval: "2s", Timeout: "1s"}
srv = &TCPServer{&tcp.Server{Addr: addr, Handler: tcp.HandlerFunc(TCPEchoHandler)}, tags, check}

http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
http.Error(w, "not found", 404)
log.Printf("%s -> 404", r.URL)
})
default:
log.Fatal("Invalid protocol ", proto)
}

// start http server
// start server
go func() {
log.Printf("Listening on %s serving %s", addr, prefix)

var err error
if certFile != "" {
err = http.ListenAndServeTLS(addr, certFile, keyFile, nil)
err = srv.ListenAndServeTLS(certFile, keyFile)
} else {
err = http.ListenAndServe(addr, nil)
err = srv.ListenAndServe()
}
if err != nil {
log.Fatal(err)
}
}()

// build urlprefix-host/path tag list
// e.g. urlprefix-/foo, urlprefix-/bar, ...
var tags []string
for _, p := range prefixes {
tags = append(tags, "urlprefix-"+p)
}

// get host and port as string/int
host, portstr, err := net.SplitHostPort(addr)
if err != nil {
Expand All @@ -121,30 +175,15 @@ func main() {
log.Fatal(err)
}

var check *api.AgentServiceCheck
if certFile != "" {
check = &api.AgentServiceCheck{
TCP: addr,
Interval: "2s",
Timeout: "1s",
}
} else {
check = &api.AgentServiceCheck{
HTTP: "http://" + addr + "/health",
Interval: "1s",
Timeout: "1s",
}
}

// register service with health check
serviceID := name + "-" + addr
service := &api.AgentServiceRegistration{
ID: serviceID,
Name: name,
Port: port,
Address: host,
Tags: tags,
Check: check,
Tags: srv.Tags(),
Check: srv.Check(),
}

config := &api.Config{Address: consul, Scheme: "http", Token: token}
Expand All @@ -156,7 +195,7 @@ func main() {
if err := client.Agent().ServiceRegister(service); err != nil {
log.Fatal(err)
}
log.Printf("Registered service %q in consul with tags %q", name, strings.Join(tags, ","))
log.Printf("Registered %s service %q in consul with tags %q", proto, name, strings.Join(srv.Tags(), ","))

// run until we get a signal
quit := make(chan os.Signal, 1)
Expand All @@ -170,7 +209,7 @@ func main() {
log.Printf("Deregistered service %q in consul", name)
}

func EchoServer(ws *websocket.Conn) {
func WSEchoServer(ws *websocket.Conn) {
addr := ws.LocalAddr().String()
pfx := []byte("[" + addr + "] ")

Expand All @@ -193,3 +232,24 @@ func EchoServer(ws *websocket.Conn) {
}
log.Printf("ws disconnect on %s", addr)
}

func TCPEchoHandler(c net.Conn) error {
defer c.Close()

addr := c.LocalAddr().String()
_, err := fmt.Fprintf(c, "[%s] Welcome\n", addr)
if err != nil {
return err
}

for {
line, _, err := bufio.NewReader(c).ReadLine()
if err != nil {
return err
}
_, err = fmt.Fprintf(c, "[%s] %s\n", addr, string(line))
if err != nil {
return err
}
}
}
10 changes: 5 additions & 5 deletions fabio.properties
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,8 @@
#
# * http for HTTP based protocols
# * https for HTTPS based protocols
# * tcp+sni for an SNI aware TCP proxy (EXPERIMENTAL)
# * tcp for a raw TCP proxy with or witout TLS support
# * tcp+sni for an SNI aware TCP proxy
#
# If no 'proto' option is specified then the protocol
# is either 'http' or 'https' depending on whether a
Expand All @@ -189,10 +190,6 @@
# extension and then forwards the encrypted traffic
# to the destination without decrypting the traffic.
#
# The TCP+SNI proxy is currently marked as EXPERIMENTAL
# since it needs more real-world testing and an integration
# test.
#
# General options:
#
# rt: Sets the read timeout as a duration value (e.g. '3s')
Expand Down Expand Up @@ -223,6 +220,9 @@
# # HTTPS listener on port 443 with certificate source
# proxy.addr = :443;cs=some-name
#
# # TCP listener on port 1234 with port routing
# proxy.addr = :1234;proto=tcp
#
# # TCP listener on port 443 with SNI routing
# proxy.addr = :443;proto=tcp+sni
#
Expand Down
Loading

0 comments on commit a4d7342

Please sign in to comment.