Skip to content

Commit

Permalink
Pipe plumbing
Browse files Browse the repository at this point in the history
- Add store of pipes in the app
- Add pipe type, handling impedance mismatch, used in app and probe.
- App <-> Probe pipes have their own websockets.
- Add pipe websocket endpoint in app.
- Pipe IDs are strings, lose the request/response IDs, and give the json encoder lowercase field names.
- Add simple golang ws client, for testing.
- Pipe lifecycle plumbing.
- Ref count and timeout both ends of pipes in the app
- Deal with POST /api/pipe/:pid?_method=delete
- Add end-to-end unit test for pipes.
- Add test for timing out pipes.
- Update go-docker client to tomwilkie/go-dockerclient
- Backend work for non-raw ttys
- Close pipes when they close themselves in the probe
- Ensure all http connections are done before returning from client.Stop()
  • Loading branch information
tomwilkie committed Dec 10, 2015
1 parent 9c2c2b9 commit ac9c011
Show file tree
Hide file tree
Showing 19 changed files with 828 additions and 118 deletions.
16 changes: 9 additions & 7 deletions app/controls.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ func RegisterControlRoutes(router *mux.Router) {
type controlHandler struct {
id int64
client *rpc.Client
codec *xfer.JSONWebsocketCodec
}

type controlRouter struct {
sync.Mutex
probes map[string]controlHandler
}

func (ch *controlHandler) handle(req xfer.Request) xfer.Response {
Expand All @@ -34,11 +40,6 @@ func (ch *controlHandler) handle(req xfer.Request) xfer.Response {
return res
}

type controlRouter struct {
sync.Mutex
probes map[string]controlHandler
}

func (cr *controlRouter) get(probeID string) (controlHandler, bool) {
cr.Lock()
defer cr.Unlock()
Expand Down Expand Up @@ -79,15 +80,15 @@ func (cr *controlRouter) handleControl(w http.ResponseWriter, r *http.Request) {
}

result := handler.handle(xfer.Request{
ID: rand.Int63(),
AppID: UniqueID,
NodeID: nodeID,
Control: control,
})
if result.Error != "" {
respondWith(w, http.StatusBadRequest, result.Error)
return
}
respondWith(w, http.StatusOK, result.Value)
respondWith(w, http.StatusOK, result)
}

// handleProbeWS accepts websocket connections from the probe and registers
Expand All @@ -110,6 +111,7 @@ func (cr *controlRouter) handleProbeWS(w http.ResponseWriter, r *http.Request) {
client := rpc.NewClientWithCodec(codec)
handler := controlHandler{
id: rand.Int63(),
codec: codec,
client: client,
}

Expand Down
22 changes: 11 additions & 11 deletions app/controls_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,7 @@ func TestControl(t *testing.T) {
probeConfig := xfer.ProbeConfig{
ProbeID: "foo",
}
client, err := xfer.NewAppClient(probeConfig, ip+":"+port, ip+":"+port)
if err != nil {
t.Fatal(err)
}
defer client.Stop()

client.ControlConnection(xfer.ControlHandlerFunc(func(req xfer.Request) xfer.Response {
controlHandler := xfer.ControlHandlerFunc(func(req xfer.Request) xfer.Response {
if req.NodeID != "nodeid" {
t.Fatalf("'%s' != 'nodeid'", req.NodeID)
}
Expand All @@ -47,7 +41,13 @@ func TestControl(t *testing.T) {
return xfer.Response{
Value: "foo",
}
}))
})
client, err := xfer.NewAppClient(probeConfig, ip+":"+port, ip+":"+port, controlHandler)
if err != nil {
t.Fatal(err)
}
client.ControlConnection()
defer client.Stop()

time.Sleep(100 * time.Millisecond)

Expand All @@ -59,12 +59,12 @@ func TestControl(t *testing.T) {
t.Fatal(err)
}

var response string
var response xfer.Response
if err := json.NewDecoder(resp.Body).Decode(&response); err != nil {
t.Fatal(err)
}

if response != "foo" {
t.Fatalf("'%s' != 'foo'", response)
if response.Value != "foo" {
t.Fatalf("'%s' != 'foo'", response.Value)
}
}
194 changes: 194 additions & 0 deletions app/pipes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
package app

import (
"io"
"log"
"net/http"
"sync"
"time"

"github.com/gorilla/mux"

"github.com/weaveworks/scope/common/mtime"
"github.com/weaveworks/scope/xfer"
)

const (
gcInterval = 30 * time.Second // we check all the pipes every 30s
pipeTimeout = 1 * time.Minute // pipes are closed when a client hasn't been connected for 1 minute
gcTimeout = 1 * time.Minute // after another 1 minute, tombstoned pipes are forgotten
)

// PipeRouter connects incoming and outgoing pipes.
type PipeRouter struct {
sync.Mutex
wait sync.WaitGroup
quit chan struct{}
pipes map[string]*pipe
}

// for each end of the pipe, we keep a reference count & lastUsedTIme,
// such that we can timeout pipes when either end is inactive.
type end struct {
refCount int
lastUsedTime time.Time
}

type pipe struct {
ui, probe end
tombstoneTime time.Time

xfer.Pipe
}

// RegisterPipeRoutes registers the pipe routes
func RegisterPipeRoutes(router *mux.Router) *PipeRouter {
pipeRouter := &PipeRouter{
quit: make(chan struct{}),
pipes: map[string]*pipe{},
}
pipeRouter.wait.Add(1)
go pipeRouter.gcLoop()
router.Methods("GET").Path("/api/pipe/{pipeID}").HandlerFunc(pipeRouter.handleWs(func(p *pipe) (*end, io.ReadWriter) {
uiEnd, _ := p.Ends()
return &p.ui, uiEnd
}))
router.Methods("GET").Path("/api/pipe/{pipeID}/probe").HandlerFunc(pipeRouter.handleWs(func(p *pipe) (*end, io.ReadWriter) {
_, probeEnd := p.Ends()
return &p.probe, probeEnd
}))
router.Methods("DELETE").Path("/api/pipe/{pipeID}").HandlerFunc(pipeRouter.deletePipe)
router.Methods("POST").Path("/api/pipe/{pipeID}").HandlerFunc(pipeRouter.deletePipe)
return pipeRouter
}

// Stop stops the pipeRouter
func (pr *PipeRouter) Stop() {
close(pr.quit)
pr.wait.Wait()
}

func (pr *PipeRouter) gcLoop() {
defer pr.wait.Done()
ticker := time.Tick(gcInterval)
for {
select {
case <-pr.quit:
return
case <-ticker:
}

pr.timeoutPipes()
pr.gcPipes()
}
}

func (pr *PipeRouter) timeoutPipes() {
pr.Lock()
defer pr.Unlock()
now := mtime.Now()
for id, pipe := range pr.pipes {
if pipe.Closed() || (pipe.ui.refCount > 0 && pipe.probe.refCount > 0) {
continue
}

if (pipe.ui.refCount == 0 && now.Sub(pipe.ui.lastUsedTime) >= pipeTimeout) ||
(pipe.probe.refCount == 0 && now.Sub(pipe.probe.lastUsedTime) >= pipeTimeout) {
log.Printf("Timing out pipe %s", id)
pipe.Close()
pipe.tombstoneTime = now
}
}
}

func (pr *PipeRouter) gcPipes() {
pr.Lock()
defer pr.Unlock()
now := mtime.Now()
for pipeID, pipe := range pr.pipes {
if pipe.Closed() && now.Sub(pipe.tombstoneTime) >= gcTimeout {
delete(pr.pipes, pipeID)
}
}
}

func (pr *PipeRouter) getOrCreatePipe(id string) (*pipe, bool) {
pr.Lock()
defer pr.Unlock()
p, ok := pr.pipes[id]
if !ok {
log.Printf("Creating pipe id %s", id)
p = &pipe{
ui: end{lastUsedTime: mtime.Now()},
probe: end{lastUsedTime: mtime.Now()},
Pipe: xfer.NewPipe(),
}
pr.pipes[id] = p
}
if p.Closed() {
return nil, false
}
return p, true
}

func (pr *PipeRouter) getPipeRef(id string, pipe *pipe, end *end) bool {
pr.Lock()
defer pr.Unlock()
if pipe.Closed() {
return false
}
end.refCount++
return true
}

func (pr *PipeRouter) putPipeRef(id string, pipe *pipe, end *end) {
pr.Lock()
defer pr.Unlock()

end.refCount--
if end.refCount != 0 {
return
}

if !pipe.Closed() {
end.lastUsedTime = mtime.Now()
}
}

func (pr *PipeRouter) handleWs(endSelector func(*pipe) (*end, io.ReadWriter)) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
pipeID := mux.Vars(r)["pipeID"]
pipe, ok := pr.getOrCreatePipe(pipeID)
if !ok {
http.NotFound(w, r)
return
}

endRef, endIO := endSelector(pipe)
if !pr.getPipeRef(pipeID, pipe, endRef) {
http.NotFound(w, r)
return
}
defer pr.putPipeRef(pipeID, pipe, endRef)

conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Printf("Error upgrading to websocket: %v", err)
return
}
defer conn.Close()

pipe.CopyToWebsocket(endIO, conn)
}
}

func (pr *PipeRouter) deletePipe(w http.ResponseWriter, r *http.Request) {
pipeID := mux.Vars(r)["pipeID"]
pipe, ok := pr.getOrCreatePipe(pipeID)
if ok && pr.getPipeRef(pipeID, pipe, &pipe.ui) {
log.Printf("Closing pipe %s", pipeID)
pipe.Close()
pipe.tombstoneTime = mtime.Now()
pr.putPipeRef(pipeID, pipe, &pipe.ui)
}
}
Loading

0 comments on commit ac9c011

Please sign in to comment.