From 4732c36d884a523ce0662cf709e69c08395c7c1e Mon Sep 17 00:00:00 2001 From: Atin Malaviya Date: Tue, 25 Nov 2014 11:06:14 -0500 Subject: [PATCH 1/9] Consul Session TTLs The design of the session TTLs is based on the Google Chubby approach (http://research.google.com/archive/chubby-osdi06.pdf). The Session struct has an additional TTL field now. This attaches an implicit heartbeat based failure detector. Tracking of heartbeats is done by the current leader and not persisted via the Raft log. The implication of this is during a leader failover, we do not retain the last heartbeat times. Similar to Chubby, the TTL represents a lower-bound. Consul promises not to terminate a session before the TTL has expired, but is allowed to extend the expiration past it. This enables us to reset the TTL on a leader failover. The TTL is also extended when the client does a heartbeat. Like Chubby, this means a TTL is extended on creation, heartbeat or failover. Additionally, because we must account for time requests are in transit and the relative rates of clocks on the clients and servers, Consul will take the conservative approach of internally multiplying the TTL by 2x. This helps to compensate for network latency and clock skew without violating the contract. Reference: https://docs.google.com/document/d/1Y5-pahLkUaA7Kz4SBU_mehKiyt9yaaUGcBTMZR7lToY/edit?usp=sharing --- command/agent/http.go | 1 + command/agent/session_endpoint.go | 30 +++++ command/agent/session_endpoint_test.go | 149 +++++++++++++++++++++++++ consul/leader.go | 7 ++ consul/server.go | 6 + consul/session_endpoint.go | 38 +++++++ consul/session_ttl.go | 96 ++++++++++++++++ consul/state_store.go | 26 +++++ consul/structs/structs.go | 8 ++ 9 files changed, 361 insertions(+) create mode 100644 consul/session_ttl.go diff --git a/command/agent/http.go b/command/agent/http.go index a7a3c4d58756..8c174711bf70 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -188,6 +188,7 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) { s.mux.HandleFunc("/v1/session/create", s.wrap(s.SessionCreate)) s.mux.HandleFunc("/v1/session/destroy/", s.wrap(s.SessionDestroy)) + s.mux.HandleFunc("/v1/session/renew/", s.wrap(s.SessionRenew)) s.mux.HandleFunc("/v1/session/info/", s.wrap(s.SessionGet)) s.mux.HandleFunc("/v1/session/node/", s.wrap(s.SessionsForNode)) s.mux.HandleFunc("/v1/session/list", s.wrap(s.SessionList)) diff --git a/command/agent/session_endpoint.go b/command/agent/session_endpoint.go index cd9fa7ecdb74..b9b6c6b97f40 100644 --- a/command/agent/session_endpoint.go +++ b/command/agent/session_endpoint.go @@ -40,6 +40,7 @@ func (s *HTTPServer) SessionCreate(resp http.ResponseWriter, req *http.Request) Checks: []string{consul.SerfCheckID}, LockDelay: 15 * time.Second, Behavior: structs.SessionKeysRelease, + TTL: "", }, } s.parseDC(req, &args.Datacenter) @@ -130,6 +131,35 @@ func (s *HTTPServer) SessionDestroy(resp http.ResponseWriter, req *http.Request) return true, nil } +// SessionRenew is used to renew the TTL on an existing TTL session +func (s *HTTPServer) SessionRenew(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + // Mandate a PUT request + if req.Method != "PUT" { + resp.WriteHeader(405) + return nil, nil + } + + args := structs.SessionSpecificRequest{} + if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { + return nil, nil + } + + // Pull out the session id + args.Session = strings.TrimPrefix(req.URL.Path, "/v1/session/renew/") + if args.Session == "" { + resp.WriteHeader(400) + resp.Write([]byte("Missing session")) + return nil, nil + } + + var out structs.IndexedSessions + defer setMeta(resp, &out.QueryMeta) + if err := s.agent.RPC("Session.Renew", &args, &out); err != nil { + return nil, err + } + return out.Sessions, nil +} + // SessionGet is used to get info for a particular session func (s *HTTPServer) SessionGet(resp http.ResponseWriter, req *http.Request) (interface{}, error) { args := structs.SessionSpecificRequest{} diff --git a/command/agent/session_endpoint_test.go b/command/agent/session_endpoint_test.go index 74ec20ebf27a..a30afc0d0c20 100644 --- a/command/agent/session_endpoint_test.go +++ b/command/agent/session_endpoint_test.go @@ -176,6 +176,28 @@ func makeTestSessionDelete(t *testing.T, srv *HTTPServer) string { return sessResp.ID } +func makeTestSessionTTL(t *testing.T, srv *HTTPServer, ttl string) string { + // Create Session with TTL + body := bytes.NewBuffer(nil) + enc := json.NewEncoder(body) + raw := map[string]interface{}{ + "TTL": ttl, + } + enc.Encode(raw) + + req, err := http.NewRequest("PUT", "/v1/session/create", body) + if err != nil { + t.Fatalf("err: %v", err) + } + resp := httptest.NewRecorder() + obj, err := srv.SessionCreate(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + sessResp := obj.(sessionCreateResponse) + return sessResp.ID +} + func TestSessionDestroy(t *testing.T) { httpTest(t, func(srv *HTTPServer) { id := makeTestSession(t, srv) @@ -192,6 +214,133 @@ func TestSessionDestroy(t *testing.T) { }) } +func TestSessionTTL(t *testing.T) { + httpTest(t, func(srv *HTTPServer) { + TTL := "30s" + ttl := 30 * time.Second + + id := makeTestSessionTTL(t, srv, TTL) + + req, err := http.NewRequest("GET", + "/v1/session/info/"+id, nil) + resp := httptest.NewRecorder() + obj, err := srv.SessionGet(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + respObj, ok := obj.(structs.Sessions) + if !ok { + t.Fatalf("should work") + } + if len(respObj) != 1 { + t.Fatalf("bad: %v", respObj) + } + if respObj[0].TTL != TTL { + t.Fatalf("Incorrect TTL: %s", respObj[0].TTL) + } + + // now wait for timeout, it is really 2*TTL, so wait 3*TTL + time.Sleep(ttl * 3) + + req, err = http.NewRequest("GET", + "/v1/session/info/"+id, nil) + resp = httptest.NewRecorder() + obj, err = srv.SessionGet(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + respObj, ok = obj.(structs.Sessions) + if ok { + t.Fatalf("session '%s' should have been destroyed") + } + if len(respObj) != 0 { + t.Fatalf("bad: %v", respObj) + } + }) +} + +func TestSessionTTLRenew(t *testing.T) { + httpTest(t, func(srv *HTTPServer) { + TTL := "30s" + ttl := 30 * time.Second + + id := makeTestSessionTTL(t, srv, TTL) + + req, err := http.NewRequest("GET", + "/v1/session/info/"+id, nil) + resp := httptest.NewRecorder() + obj, err := srv.SessionGet(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + respObj, ok := obj.(structs.Sessions) + if !ok { + t.Fatalf("should work") + } + if len(respObj) != 1 { + t.Fatalf("bad: %v", respObj) + } + if respObj[0].TTL != TTL { + t.Fatalf("Incorrect TTL: %s", respObj[0].TTL) + } + + // Sleep for 45s (since internal effective ttl is really 60s when 30s is specified) + time.Sleep(45 * time.Second) + + req, err = http.NewRequest("PUT", + "/v1/session/renew/"+id, nil) + resp = httptest.NewRecorder() + obj, err = srv.SessionRenew(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + respObj, ok = obj.(structs.Sessions) + if !ok { + t.Fatalf("should work") + } + if len(respObj) != 1 { + t.Fatalf("bad: %v", respObj) + } + + // Sleep for another 45s (since effective ttl is ttl*2, meaning 60s) if renew + // didn't work, session would have got deleted + time.Sleep(45 * time.Second) + + req, err = http.NewRequest("GET", + "/v1/session/info/"+id, nil) + resp = httptest.NewRecorder() + obj, err = srv.SessionGet(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + respObj, ok = obj.(structs.Sessions) + if !ok { + t.Fatalf("session '%s' should have renewed") + } + if len(respObj) != 1 { + t.Fatalf("bad: %v", respObj) + } + + // now wait for timeout and expect session to get destroyed + time.Sleep(ttl * 2) + + req, err = http.NewRequest("GET", + "/v1/session/info/"+id, nil) + resp = httptest.NewRecorder() + obj, err = srv.SessionGet(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + respObj, ok = obj.(structs.Sessions) + if ok { + t.Fatalf("session '%s' should have been destroyed") + } + if len(respObj) != 0 { + t.Fatalf("bad: %v", respObj) + } + }) +} + func TestSessionGet(t *testing.T) { httpTest(t, func(srv *HTTPServer) { id := makeTestSession(t, srv) diff --git a/consul/leader.go b/consul/leader.go index 7f4f378a64ef..c57adb272ef9 100644 --- a/consul/leader.go +++ b/consul/leader.go @@ -61,6 +61,13 @@ func (s *Server) leaderLoop(stopCh chan struct{}) { s.logger.Printf("[ERR] consul: ACL initialization failed: %v", err) } + // Setup Session Timers if we are the leader and need to + if err := s.initializeSessionTimers(); err != nil { + s.logger.Printf("[ERR] consul: Session Timers initialization failed: %v", err) + } + // clear the session timers if we are no longer leader and exit the leaderLoop + defer s.clearAllSessionTimers() + // Reconcile channel is only used once initial reconcile // has succeeded var reconcileCh chan serf.Member diff --git a/consul/server.go b/consul/server.go index cba7f11bad90..f14dc2aa1176 100644 --- a/consul/server.go +++ b/consul/server.go @@ -128,6 +128,12 @@ type Server struct { // which SHOULD only consist of Consul servers serfWAN *serf.Serf + // sessionTimers track the expiration time of each Session that has + // a TTL. On expiration, a SessionDestroy event will occur, and + // destroy the session via standard session destory processing + sessionTimers map[string]*time.Timer + sessionTimersLock sync.RWMutex + shutdown bool shutdownCh chan struct{} shutdownLock sync.Mutex diff --git a/consul/session_endpoint.go b/consul/session_endpoint.go index cfe60ea5702e..b24e71742929 100644 --- a/consul/session_endpoint.go +++ b/consul/session_endpoint.go @@ -36,6 +36,16 @@ func (s *Session) Apply(args *structs.SessionRequest, reply *string) error { default: return fmt.Errorf("Invalid Behavior setting '%s'", args.Session.Behavior) } + if args.Session.TTL != "" { + ttl, err := time.ParseDuration(args.Session.TTL) + if err != nil { + return fmt.Errorf("Session TTL '%s' invalid: %v", args.Session.TTL, err) + } + + if ttl < structs.SessionTTLMin || ttl > structs.SessionTTLMax { + return fmt.Errorf("Invalid Session TTL '%d', must be between [%d-%d] seconds", ttl, structs.SessionTTLMin, structs.SessionTTLMax) + } + } // If this is a create, we must generate the Session ID. This must // be done prior to appending to the raft log, because the ID is not @@ -63,6 +73,13 @@ func (s *Session) Apply(args *structs.SessionRequest, reply *string) error { s.srv.logger.Printf("[ERR] consul.session: Apply failed: %v", err) return err } + + if args.Op == structs.SessionCreate && args.Session.TTL != "" { + s.srv.resetSessionTimer(args.Session.ID, nil) + } else if args.Op == structs.SessionDestroy && args.Session.TTL != "" { + s.srv.clearSessionTimer(args.Session.ID) + } + if respErr, ok := resp.(error); ok { return respErr } @@ -133,3 +150,24 @@ func (s *Session) NodeSessions(args *structs.NodeSpecificRequest, return err }) } + +// Renew is used to renew the TTL on a single session +func (s *Session) Renew(args *structs.SessionSpecificRequest, + reply *structs.IndexedSessions) error { + if done, err := s.srv.forward("Session.Renew", args, args, reply); done { + return err + } + + // Get the local state + state := s.srv.fsm.State() + // Get the session, from local state + index, session, err := state.SessionGet(args.Session) + reply.Index = index + if session != nil { + reply.Sessions = structs.Sessions{session} + // reset the session TTL timer + err = s.srv.resetSessionTimer(args.Session, session) + } + + return err +} diff --git a/consul/session_ttl.go b/consul/session_ttl.go new file mode 100644 index 000000000000..646d02ed70ed --- /dev/null +++ b/consul/session_ttl.go @@ -0,0 +1,96 @@ +package consul + +import ( + "fmt" + "github.com/hashicorp/consul/consul/structs" + "time" +) + +func (s *Server) initializeSessionTimers() error { + s.sessionTimersLock.Lock() + s.sessionTimers = make(map[string]*time.Timer) + s.sessionTimersLock.Unlock() + + // walk the TTL index and resetSessionTimer for each non-zero TTL + state := s.fsm.State() + _, sessions, err := state.SessionListTTL() + if err != nil { + return err + } + for _, session := range sessions { + err := s.resetSessionTimer(session.ID, session) + if err != nil { + return err + } + } + return nil +} + +func (s *Server) resetSessionTimer(id string, session *structs.Session) error { + if session == nil { + var err error + + // find the session + state := s.fsm.State() + _, session, err = state.SessionGet(id) + if err != nil || session == nil { + return fmt.Errorf("Could not find session for '%s'\n", id) + } + } + + if session.TTL != "" { + ttl, err := time.ParseDuration(session.TTL) + if err != nil { + return fmt.Errorf("Invalid Session TTL '%s': %v", session.TTL, err) + } + s.sessionTimersLock.Lock() + if s.sessionTimers == nil { + // should not happen . . . + panic(fmt.Sprintf("Invalid call to resetSessionTimer before creation of session timers in leaderLoop")) + } + defer s.sessionTimersLock.Unlock() + if t := s.sessionTimers[id]; t != nil { + // TBD may modify the session's active TTL based on load here + t.Reset(ttl * structs.SessionTTLMultiplier) + } else { + s.sessionTimers[session.ID] = time.AfterFunc(ttl*structs.SessionTTLMultiplier, func() { + s.sessionTimers[session.ID].Stop() + args := structs.SessionRequest{ + Datacenter: s.config.Datacenter, + Op: structs.SessionDestroy, + } + args.Session.ID = session.ID + + // Apply the update to destroy the session + _, err := s.raftApply(structs.SessionRequestType, args) + if err != nil { + s.logger.Printf("[ERR] consul.session: Apply failed: %v", err) + } + }) + } + } + return nil +} + +func (s *Server) clearSessionTimer(id string) error { + s.sessionTimersLock.Lock() + defer s.sessionTimersLock.Unlock() + if s.sessionTimers[id] != nil { + // stop the session timer and delete from the map + s.sessionTimers[id].Stop() + delete(s.sessionTimers, id) + } + return nil +} + +func (s *Server) clearAllSessionTimers() error { + s.sessionTimersLock.Lock() + defer s.sessionTimersLock.Unlock() + + // stop all timers and clear out the map + for id, t := range s.sessionTimers { + t.Stop() + delete(s.sessionTimers, id) + } + return nil +} diff --git a/consul/state_store.go b/consul/state_store.go index 0173cff697d8..762d2e75199b 100644 --- a/consul/state_store.go +++ b/consul/state_store.go @@ -294,6 +294,10 @@ func (s *StateStore) initialize() error { AllowBlank: true, Fields: []string{"Node"}, }, + "ttl": &MDBIndex{ + AllowBlank: true, + Fields: []string{"TTL"}, + }, }, Decoder: func(buf []byte) interface{} { out := new(structs.Session) @@ -369,6 +373,7 @@ func (s *StateStore) initialize() error { "KVSListKeys": MDBTables{s.kvsTable}, "SessionGet": MDBTables{s.sessionTable}, "SessionList": MDBTables{s.sessionTable}, + "SessionListTTL": MDBTables{s.sessionTable}, "NodeSessions": MDBTables{s.sessionTable}, "ACLGet": MDBTables{s.aclTable}, "ACLList": MDBTables{s.aclTable}, @@ -1336,6 +1341,17 @@ func (s *StateStore) SessionCreate(index uint64, session *structs.Session) error return fmt.Errorf("Invalid Session Behavior setting '%s'", session.Behavior) } + if session.TTL != "" { + ttl, err := time.ParseDuration(session.TTL) + if err != nil { + return fmt.Errorf("Invalid Session TTL '%s': %v", session.TTL, err) + } + + if ttl < structs.SessionTTLMin || ttl > structs.SessionTTLMax { + return fmt.Errorf("Invalid Session TTL '%d', must be between [%d-%d] seconds", ttl, structs.SessionTTLMin, structs.SessionTTLMax) + } + } + // Assign the create index session.CreateIndex = index @@ -1445,6 +1461,16 @@ func (s *StateStore) SessionList() (uint64, []*structs.Session, error) { return idx, out, err } +// SessionListTTL is used to list all the open ttl sessions +func (s *StateStore) SessionListTTL() (uint64, []*structs.Session, error) { + idx, res, err := s.sessionTable.Get("ttl") + out := make([]*structs.Session, len(res)) + for i, raw := range res { + out[i] = raw.(*structs.Session) + } + return idx, out, err +} + // NodeSessions is used to list all the open sessions for a node func (s *StateStore) NodeSessions(node string) (uint64, []*structs.Session, error) { idx, res, err := s.sessionTable.Get("node", node) diff --git a/consul/structs/structs.go b/consul/structs/structs.go index ced8567d2645..1d27e0726580 100644 --- a/consul/structs/structs.go +++ b/consul/structs/structs.go @@ -385,6 +385,12 @@ const ( SessionKeysDelete = "delete" ) +const ( + SessionTTLMin = 10 * time.Second + SessionTTLMax = 3600 * time.Second + SessionTTLMultiplier = 2 +) + // Session is used to represent an open session in the KV store. // This issued to associate node checks with acquired locks. type Session struct { @@ -395,6 +401,7 @@ type Session struct { Checks []string LockDelay time.Duration Behavior SessionBehavior // What to do when session is invalidated + TTL string } type Sessions []*Session @@ -403,6 +410,7 @@ type SessionOp string const ( SessionCreate SessionOp = "create" SessionDestroy = "destroy" + SessionRenew = "renew" ) // SessionRequest is used to operate on sessions From 624c465e2b164e261925f6398a416fe1a4a4c6b7 Mon Sep 17 00:00:00 2001 From: Atin Malaviya Date: Wed, 10 Dec 2014 10:02:23 -0500 Subject: [PATCH 2/9] Added more tests. Also added return of 404 if the session id to renew is not found --- command/agent/session_endpoint.go | 5 + command/agent/session_endpoint_test.go | 15 ++- consul/leader_test.go | 3 + consul/session_endpoint_test.go | 127 +++++++++++++++++++++++++ consul/state_store.go | 10 ++ consul/state_store_test.go | 42 +++++--- 6 files changed, 180 insertions(+), 22 deletions(-) diff --git a/command/agent/session_endpoint.go b/command/agent/session_endpoint.go index b9b6c6b97f40..6de01db37881 100644 --- a/command/agent/session_endpoint.go +++ b/command/agent/session_endpoint.go @@ -156,7 +156,12 @@ func (s *HTTPServer) SessionRenew(resp http.ResponseWriter, req *http.Request) ( defer setMeta(resp, &out.QueryMeta) if err := s.agent.RPC("Session.Renew", &args, &out); err != nil { return nil, err + } else if out.Sessions == nil { + resp.WriteHeader(404) + resp.Write([]byte(fmt.Sprintf("Session id '%s' not found", args.Session))) + return nil, nil } + return out.Sessions, nil } diff --git a/command/agent/session_endpoint_test.go b/command/agent/session_endpoint_test.go index a30afc0d0c20..5a45879b6600 100644 --- a/command/agent/session_endpoint_test.go +++ b/command/agent/session_endpoint_test.go @@ -250,11 +250,8 @@ func TestSessionTTL(t *testing.T) { t.Fatalf("err: %v", err) } respObj, ok = obj.(structs.Sessions) - if ok { - t.Fatalf("session '%s' should have been destroyed") - } if len(respObj) != 0 { - t.Fatalf("bad: %v", respObj) + t.Fatalf("session '%s' should have been destroyed", id) } }) } @@ -315,10 +312,10 @@ func TestSessionTTLRenew(t *testing.T) { } respObj, ok = obj.(structs.Sessions) if !ok { - t.Fatalf("session '%s' should have renewed") + t.Fatalf("session '%s' should have renewed", id) } if len(respObj) != 1 { - t.Fatalf("bad: %v", respObj) + t.Fatalf("session '%s' should have renewed", id) } // now wait for timeout and expect session to get destroyed @@ -332,11 +329,11 @@ func TestSessionTTLRenew(t *testing.T) { t.Fatalf("err: %v", err) } respObj, ok = obj.(structs.Sessions) - if ok { - t.Fatalf("session '%s' should have been destroyed") + if !ok { + t.Fatalf("session '%s' should have destroyed", id) } if len(respObj) != 0 { - t.Fatalf("bad: %v", respObj) + t.Fatalf("session '%s' should have destroyed", id) } }) } diff --git a/consul/leader_test.go b/consul/leader_test.go index 75535d56bb8a..1bd910858bda 100644 --- a/consul/leader_test.go +++ b/consul/leader_test.go @@ -370,6 +370,9 @@ func TestLeader_LeftLeader(t *testing.T) { break } } + if leader == nil { + t.Fatalf("Should have a leader") + } leader.Leave() leader.Shutdown() time.Sleep(100 * time.Millisecond) diff --git a/consul/session_endpoint_test.go b/consul/session_endpoint_test.go index 794975294790..a745d16f1059 100644 --- a/consul/session_endpoint_test.go +++ b/consul/session_endpoint_test.go @@ -5,6 +5,7 @@ import ( "github.com/hashicorp/consul/testutil" "os" "testing" + "time" ) func TestSessionEndpoint_Apply(t *testing.T) { @@ -223,6 +224,132 @@ func TestSessionEndpoint_List(t *testing.T) { } } +func TestSessionEndpoint_Renew(t *testing.T) { + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + client := rpcClient(t, s1) + defer client.Close() + + testutil.WaitForLeader(t, client.Call, "dc1") + + s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"}) + ids := []string{} + for i := 0; i < 5; i++ { + arg := structs.SessionRequest{ + Datacenter: "dc1", + Op: structs.SessionCreate, + Session: structs.Session{ + Node: "foo", + TTL: "10s", + }, + } + var out string + if err := client.Call("Session.Apply", &arg, &out); err != nil { + t.Fatalf("err: %v", err) + } + ids = append(ids, out) + } + + getR := structs.DCSpecificRequest{ + Datacenter: "dc1", + } + + var sessions structs.IndexedSessions + if err := client.Call("Session.List", &getR, &sessions); err != nil { + t.Fatalf("err: %v", err) + } + + if sessions.Index == 0 { + t.Fatalf("Bad: %v", sessions) + } + if len(sessions.Sessions) != 5 { + t.Fatalf("Bad: %v", sessions.Sessions) + } + for i := 0; i < len(sessions.Sessions); i++ { + s := sessions.Sessions[i] + if !strContains(ids, s.ID) { + t.Fatalf("bad: %v", s) + } + if s.Node != "foo" { + t.Fatalf("bad: %v", s) + } + if s.TTL != "30s" { + t.Fatalf("bad: %v", s) + } + } + + // now sleep for ttl - since internally we use ttl*2 to destroy, this is ok + time.Sleep(10 * time.Second) + + // renew 3 out of 5 sessions + for i := 0; i < 3; i++ { + renewR := structs.SessionSpecificRequest{ + Datacenter: "dc1", + Session: ids[i], + } + var session structs.IndexedSessions + if err := client.Call("Session.Renew", &renewR, &session); err != nil { + t.Fatalf("err: %v", err) + } + + if session.Index == 0 { + t.Fatalf("Bad: %v", session) + } + if len(session.Sessions) != 1 { + t.Fatalf("Bad: %v", session.Sessions) + } + + s := session.Sessions[0] + if !strContains(ids, s.ID) { + t.Fatalf("bad: %v", s) + } + if s.Node != "foo" { + t.Fatalf("bad: %v", s) + } + } + + // now sleep for ttl*2 - 3 sessions should still be alive + time.Sleep(20 * time.Second) + + if err := client.Call("Session.List", &getR, &sessions); err != nil { + t.Fatalf("err: %v", err) + } + + if sessions.Index == 0 { + t.Fatalf("Bad: %v", sessions) + } + if len(sessions.Sessions) != 3 { + t.Fatalf("Bad: %v", sessions.Sessions) + } + for i := 0; i < len(sessions.Sessions); i++ { + s := sessions.Sessions[i] + if !strContains(ids, s.ID) { + t.Fatalf("bad: %v", s) + } + if s.Node != "foo" { + t.Fatalf("bad: %v", s) + } + if s.TTL != "30s" { + t.Fatalf("bad: %v", s) + } + } + + // now sleep again for ttl*2 - no sessions should still be alive + time.Sleep(20 * time.Second) + + if err := client.Call("Session.List", &getR, &sessions); err != nil { + t.Fatalf("err: %v", err) + } + + if sessions.Index != 0 { + t.Fatalf("Bad: %v", sessions) + } + if len(sessions.Sessions) != 0 { + t.Fatalf("Bad: %v", sessions.Sessions) + } +} + func TestSessionEndpoint_NodeSessions(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) diff --git a/consul/state_store.go b/consul/state_store.go index 762d2e75199b..f6a9add62d4f 100644 --- a/consul/state_store.go +++ b/consul/state_store.go @@ -1825,6 +1825,16 @@ func (s *StateSnapshot) SessionList() ([]*structs.Session, error) { return out, err } +// SessionListTTL is used to list all the open sessions +func (s *StateSnapshot) SessionListTTL() ([]*structs.Session, error) { + res, err := s.store.sessionTable.GetTxn(s.tx, "ttl") + out := make([]*structs.Session, len(res)) + for i, raw := range res { + out[i] = raw.(*structs.Session) + } + return out, err +} + // ACLList is used to list all of the ACLs func (s *StateSnapshot) ACLList() ([]*structs.ACL, error) { res, err := s.store.aclTable.GetTxn(s.tx, "id") diff --git a/consul/state_store_test.go b/consul/state_store_test.go index 0dba19abda2e..210ef24c596d 100644 --- a/consul/state_store_test.go +++ b/consul/state_store_test.go @@ -703,13 +703,17 @@ func TestStoreSnapshot(t *testing.T) { if ok, err := store.KVSLock(18, d); err != nil || !ok { t.Fatalf("err: %v", err) } + session = &structs.Session{ID: generateUUID(), Node: "baz", TTL: "60s"} + if err := store.SessionCreate(19, session); err != nil { + t.Fatalf("err: %v", err) + } a1 := &structs.ACL{ ID: generateUUID(), Name: "User token", Type: structs.ACLTypeClient, } - if err := store.ACLSet(19, a1); err != nil { + if err := store.ACLSet(20, a1); err != nil { t.Fatalf("err: %v", err) } @@ -718,7 +722,7 @@ func TestStoreSnapshot(t *testing.T) { Name: "User token", Type: structs.ACLTypeClient, } - if err := store.ACLSet(20, a2); err != nil { + if err := store.ACLSet(21, a2); err != nil { t.Fatalf("err: %v", err) } @@ -730,7 +734,7 @@ func TestStoreSnapshot(t *testing.T) { defer snap.Close() // Check the last nodes - if idx := snap.LastIndex(); idx != 20 { + if idx := snap.LastIndex(); idx != 21 { t.Fatalf("bad: %v", idx) } @@ -785,15 +789,27 @@ func TestStoreSnapshot(t *testing.T) { t.Fatalf("missing KVS entries!") } - // Check there are 2 sessions + // Check there are 3 sessions sessions, err := snap.SessionList() if err != nil { t.Fatalf("err: %v", err) } - if len(sessions) != 2 { + if len(sessions) != 3 { t.Fatalf("missing sessions") } + // Check there is 1 session with TTL + sessions, err = snap.SessionListTTL() + if err != nil { + t.Fatalf("err: %v", err) + } + + if len(sessions) < 1 { + t.Fatalf("missing TTL session") + } else if len(sessions) > 1 { + t.Fatalf("too many TTL sessions") + } + // Check for an acl acls, err := snap.ACLList() if err != nil { @@ -804,13 +820,13 @@ func TestStoreSnapshot(t *testing.T) { } // Make some changes! - if err := store.EnsureService(21, "foo", &structs.NodeService{"db", "db", []string{"slave"}, 8000}); err != nil { + if err := store.EnsureService(22, "foo", &structs.NodeService{"db", "db", []string{"slave"}, 8000}); err != nil { t.Fatalf("err: %v", err) } - if err := store.EnsureService(22, "bar", &structs.NodeService{"db", "db", []string{"master"}, 8000}); err != nil { + if err := store.EnsureService(23, "bar", &structs.NodeService{"db", "db", []string{"master"}, 8000}); err != nil { t.Fatalf("err: %v", err) } - if err := store.EnsureNode(23, structs.Node{"baz", "127.0.0.3"}); err != nil { + if err := store.EnsureNode(24, structs.Node{"baz", "127.0.0.3"}); err != nil { t.Fatalf("err: %v", err) } checkAfter := &structs.HealthCheck{ @@ -820,16 +836,16 @@ func TestStoreSnapshot(t *testing.T) { Status: structs.HealthCritical, ServiceID: "db", } - if err := store.EnsureCheck(24, checkAfter); err != nil { + if err := store.EnsureCheck(26, checkAfter); err != nil { t.Fatalf("err: %v", err) } - if err := store.KVSDelete(25, "/web/b"); err != nil { + if err := store.KVSDelete(26, "/web/b"); err != nil { t.Fatalf("err: %v", err) } // Nuke an ACL - if err := store.ACLDelete(26, a1.ID); err != nil { + if err := store.ACLDelete(27, a1.ID); err != nil { t.Fatalf("err: %v", err) } @@ -883,12 +899,12 @@ func TestStoreSnapshot(t *testing.T) { t.Fatalf("missing KVS entries!") } - // Check there are 2 sessions + // Check there are 3 sessions sessions, err = snap.SessionList() if err != nil { t.Fatalf("err: %v", err) } - if len(sessions) != 2 { + if len(sessions) != 3 { t.Fatalf("missing sessions") } From 87e9d855fd072a7eab3e1f02b0efe7d0ae7047ab Mon Sep 17 00:00:00 2001 From: Atin Malaviya Date: Wed, 10 Dec 2014 16:43:15 -0500 Subject: [PATCH 3/9] Added more tests --- consul/session_endpoint_test.go | 41 ++++++-- consul/session_ttl_test.go | 171 ++++++++++++++++++++++++++++++++ 2 files changed, 204 insertions(+), 8 deletions(-) create mode 100644 consul/session_ttl_test.go diff --git a/consul/session_endpoint_test.go b/consul/session_endpoint_test.go index a745d16f1059..b3c2a4fd18df 100644 --- a/consul/session_endpoint_test.go +++ b/consul/session_endpoint_test.go @@ -232,6 +232,7 @@ func TestSessionEndpoint_Renew(t *testing.T) { defer client.Close() testutil.WaitForLeader(t, client.Call, "dc1") + TTL := "10s" s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"}) ids := []string{} @@ -241,7 +242,7 @@ func TestSessionEndpoint_Renew(t *testing.T) { Op: structs.SessionCreate, Session: structs.Session{ Node: "foo", - TTL: "10s", + TTL: TTL, }, } var out string @@ -274,9 +275,10 @@ func TestSessionEndpoint_Renew(t *testing.T) { if s.Node != "foo" { t.Fatalf("bad: %v", s) } - if s.TTL != "30s" { - t.Fatalf("bad: %v", s) + if s.TTL != TTL { + t.Fatalf("bad session TTL: %s %v", s.TTL, s) } + t.Logf("Created session '%s'", s.ID) } // now sleep for ttl - since internally we use ttl*2 to destroy, this is ok @@ -307,10 +309,12 @@ func TestSessionEndpoint_Renew(t *testing.T) { if s.Node != "foo" { t.Fatalf("bad: %v", s) } + + t.Logf("Renewed session '%s'", s.ID) } // now sleep for ttl*2 - 3 sessions should still be alive - time.Sleep(20 * time.Second) + time.Sleep(2 * 10 * time.Second) if err := client.Call("Session.List", &getR, &sessions); err != nil { t.Fatalf("err: %v", err) @@ -319,9 +323,9 @@ func TestSessionEndpoint_Renew(t *testing.T) { if sessions.Index == 0 { t.Fatalf("Bad: %v", sessions) } - if len(sessions.Sessions) != 3 { - t.Fatalf("Bad: %v", sessions.Sessions) - } + + t.Logf("Expect 2 sessions to be destroyed") + for i := 0; i < len(sessions.Sessions); i++ { s := sessions.Sessions[i] if !strContains(ids, s.ID) { @@ -330,9 +334,16 @@ func TestSessionEndpoint_Renew(t *testing.T) { if s.Node != "foo" { t.Fatalf("bad: %v", s) } - if s.TTL != "30s" { + if s.TTL != TTL { t.Fatalf("bad: %v", s) } + if i > 2 { + t.Errorf("session '%s' should be destroyed", s.ID) + } + } + + if len(sessions.Sessions) > 3 { + t.Fatalf("Bad: %v", sessions.Sessions) } // now sleep again for ttl*2 - no sessions should still be alive @@ -346,6 +357,20 @@ func TestSessionEndpoint_Renew(t *testing.T) { t.Fatalf("Bad: %v", sessions) } if len(sessions.Sessions) != 0 { + for i := 0; i < len(sessions.Sessions); i++ { + s := sessions.Sessions[i] + if !strContains(ids, s.ID) { + t.Fatalf("bad: %v", s) + } + if s.Node != "foo" { + t.Fatalf("bad: %v", s) + } + if s.TTL != TTL { + t.Fatalf("bad: %v", s) + } + t.Errorf("session '%s' should be destroyed", s.ID) + } + t.Fatalf("Bad: %v", sessions.Sessions) } } diff --git a/consul/session_ttl_test.go b/consul/session_ttl_test.go new file mode 100644 index 000000000000..c5ca43642ad7 --- /dev/null +++ b/consul/session_ttl_test.go @@ -0,0 +1,171 @@ +package consul + +import ( + "errors" + "fmt" + "os" + "testing" + "time" + + "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/consul/testutil" +) + +func TestServer_sessionTTL(t *testing.T) { + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + + dir2, s2 := testServerDCBootstrap(t, "dc1", false) + defer os.RemoveAll(dir2) + defer s2.Shutdown() + + dir3, s3 := testServerDCBootstrap(t, "dc1", false) + defer os.RemoveAll(dir3) + defer s3.Shutdown() + servers := []*Server{s1, s2, s3} + + // Try to join + addr := fmt.Sprintf("127.0.0.1:%d", + s1.config.SerfLANConfig.MemberlistConfig.BindPort) + if _, err := s2.JoinLAN([]string{addr}); err != nil { + t.Fatalf("err: %v", err) + } + if _, err := s3.JoinLAN([]string{addr}); err != nil { + t.Fatalf("err: %v", err) + } + + for _, s := range servers { + testutil.WaitForResult(func() (bool, error) { + peers, _ := s.raftPeers.Peers() + return len(peers) == 3, nil + }, func(err error) { + t.Fatalf("should have 3 peers") + }) + } + + // Find the leader + var leader *Server + for _, s := range servers { + // check that s.sessionTimers is empty + if len(s.sessionTimers) != 0 { + t.Fatalf("should have no sessionTimers") + } + // find the leader too + if s.IsLeader() { + leader = s + } + } + + if leader == nil { + t.Fatalf("Should have a leader") + } + + client := rpcClient(t, leader) + defer client.Close() + + leader.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"}) + + // create a TTL session + arg := structs.SessionRequest{ + Datacenter: "dc1", + Op: structs.SessionCreate, + Session: structs.Session{ + Node: "foo", + TTL: "10s", + }, + } + var id1 string + if err := client.Call("Session.Apply", &arg, &id1); err != nil { + t.Fatalf("err: %v", err) + } + + // check that leader.sessionTimers has the session id in it + // means initializeSessionTimers was called and resetSessionTimer was called + if len(leader.sessionTimers) == 0 || leader.sessionTimers[id1] == nil { + t.Fatalf("sessionTimers not initialized and does not contain session timer for session") + } + + time.Sleep(100 * time.Millisecond) + leader.Leave() + leader.Shutdown() + + // leader.sessionTimers should be empty due to clearAllSessionTimers getting called + if len(leader.sessionTimers) != 0 { + t.Fatalf("session timers should be empty on the shutdown leader") + } + + time.Sleep(100 * time.Millisecond) + + var remain *Server + for _, s := range servers { + if s == leader { + continue + } + remain = s + testutil.WaitForResult(func() (bool, error) { + peers, _ := s.raftPeers.Peers() + return len(peers) == 2, errors.New(fmt.Sprintf("%v", peers)) + }, func(err error) { + t.Fatalf("should have 2 peers: %v", err) + }) + } + + // Verify the old leader is deregistered + state := remain.fsm.State() + testutil.WaitForResult(func() (bool, error) { + _, found, _ := state.GetNode(leader.config.NodeName) + return !found, nil + }, func(err error) { + t.Fatalf("leader should be deregistered") + }) + + // Find the new leader + for _, s := range servers { + // find the leader too + if s.IsLeader() { + leader = s + } + } + + if leader == nil { + t.Fatalf("Should have a new leader") + } + + // check that new leader.sessionTimers has the session id in it + if len(leader.sessionTimers) == 0 || leader.sessionTimers[id1] == nil { + t.Fatalf("sessionTimers not initialized and does not contain session timer for session") + } + + // create another TTL session with the same parameters + var id2 string + if err := client.Call("Session.Apply", &arg, &id2); err != nil { + t.Fatalf("err: %v", err) + } + + if len(leader.sessionTimers) != 2 { + t.Fatalf("sessionTimes length should be 2") + } + + // destroy the id1 session (test clearSessionTimer) + arg.Op = structs.SessionDestroy + arg.Session.ID = id1 + if err := client.Call("Session.Apply", &arg, &id1); err != nil { + t.Fatalf("err: %v", err) + } + + if len(leader.sessionTimers) != 1 { + t.Fatalf("sessionTimers length should 1") + } + + // destroy the id2 session (test clearSessionTimer) + arg.Op = structs.SessionDestroy + arg.Session.ID = id2 + if err := client.Call("Session.Apply", &arg, &id2); err != nil { + t.Fatalf("err: %v", err) + } + + if len(leader.sessionTimers) != 0 { + t.Fatalf("sessionTimers length should be 0") + } +} From 5229f3b44d23873fe202bed684bd8f3ec9ca7f93 Mon Sep 17 00:00:00 2001 From: Atin Malaviya Date: Wed, 10 Dec 2014 20:49:06 -0500 Subject: [PATCH 4/9] Clean up code based on feedback from armon --- command/agent/session_endpoint.go | 16 +++++- command/agent/session_endpoint_test.go | 19 ++++--- consul/session_endpoint.go | 2 +- consul/session_endpoint_test.go | 46 +++++++++-------- consul/session_ttl.go | 68 ++++++++++++++------------ consul/session_ttl_test.go | 1 + consul/state_store.go | 2 +- consul/structs/structs.go | 1 - 8 files changed, 89 insertions(+), 66 deletions(-) diff --git a/command/agent/session_endpoint.go b/command/agent/session_endpoint.go index 6de01db37881..878a0b5843bf 100644 --- a/command/agent/session_endpoint.go +++ b/command/agent/session_endpoint.go @@ -52,6 +52,21 @@ func (s *HTTPServer) SessionCreate(resp http.ResponseWriter, req *http.Request) resp.Write([]byte(fmt.Sprintf("Request decode failed: %v", err))) return nil, nil } + + if args.Session.TTL != "" { + ttl, err := time.ParseDuration(args.Session.TTL) + if err != nil { + resp.WriteHeader(400) + resp.Write([]byte(fmt.Sprintf("Request TTL decode failed: %v", err))) + return nil, nil + } + + if ttl < structs.SessionTTLMin || ttl > structs.SessionTTLMax { + resp.WriteHeader(400) + resp.Write([]byte(fmt.Sprintf("Request TTL '%s', must be between [%v-%v]", args.Session.TTL, structs.SessionTTLMin, structs.SessionTTLMax))) + return nil, nil + } + } } // Create the session, get the ID @@ -153,7 +168,6 @@ func (s *HTTPServer) SessionRenew(resp http.ResponseWriter, req *http.Request) ( } var out structs.IndexedSessions - defer setMeta(resp, &out.QueryMeta) if err := s.agent.RPC("Session.Renew", &args, &out); err != nil { return nil, err } else if out.Sessions == nil { diff --git a/command/agent/session_endpoint_test.go b/command/agent/session_endpoint_test.go index 5a45879b6600..9d2befb0275d 100644 --- a/command/agent/session_endpoint_test.go +++ b/command/agent/session_endpoint_test.go @@ -216,8 +216,8 @@ func TestSessionDestroy(t *testing.T) { func TestSessionTTL(t *testing.T) { httpTest(t, func(srv *HTTPServer) { - TTL := "30s" - ttl := 30 * time.Second + TTL := "10s" // use the minimum legal ttl + ttl := 10 * time.Second id := makeTestSessionTTL(t, srv, TTL) @@ -258,8 +258,8 @@ func TestSessionTTL(t *testing.T) { func TestSessionTTLRenew(t *testing.T) { httpTest(t, func(srv *HTTPServer) { - TTL := "30s" - ttl := 30 * time.Second + TTL := "10s" // use the minimum legal ttl + ttl := 10 * time.Second id := makeTestSessionTTL(t, srv, TTL) @@ -281,8 +281,8 @@ func TestSessionTTLRenew(t *testing.T) { t.Fatalf("Incorrect TTL: %s", respObj[0].TTL) } - // Sleep for 45s (since internal effective ttl is really 60s when 30s is specified) - time.Sleep(45 * time.Second) + // Sleep to consume some time before renew + time.Sleep(ttl * (structs.SessionTTLMultiplier / 2)) req, err = http.NewRequest("PUT", "/v1/session/renew/"+id, nil) @@ -299,9 +299,8 @@ func TestSessionTTLRenew(t *testing.T) { t.Fatalf("bad: %v", respObj) } - // Sleep for another 45s (since effective ttl is ttl*2, meaning 60s) if renew - // didn't work, session would have got deleted - time.Sleep(45 * time.Second) + // Sleep for ttl * TTL Multiplier + time.Sleep(ttl * structs.SessionTTLMultiplier) req, err = http.NewRequest("GET", "/v1/session/info/"+id, nil) @@ -319,7 +318,7 @@ func TestSessionTTLRenew(t *testing.T) { } // now wait for timeout and expect session to get destroyed - time.Sleep(ttl * 2) + time.Sleep(ttl * structs.SessionTTLMultiplier) req, err = http.NewRequest("GET", "/v1/session/info/"+id, nil) diff --git a/consul/session_endpoint.go b/consul/session_endpoint.go index b24e71742929..98728d8a6a99 100644 --- a/consul/session_endpoint.go +++ b/consul/session_endpoint.go @@ -43,7 +43,7 @@ func (s *Session) Apply(args *structs.SessionRequest, reply *string) error { } if ttl < structs.SessionTTLMin || ttl > structs.SessionTTLMax { - return fmt.Errorf("Invalid Session TTL '%d', must be between [%d-%d] seconds", ttl, structs.SessionTTLMin, structs.SessionTTLMax) + return fmt.Errorf("Invalid Session TTL '%d', must be between [%v=%v]", ttl, structs.SessionTTLMin, structs.SessionTTLMax) } } diff --git a/consul/session_endpoint_test.go b/consul/session_endpoint_test.go index b3c2a4fd18df..635d2479b21c 100644 --- a/consul/session_endpoint_test.go +++ b/consul/session_endpoint_test.go @@ -232,7 +232,8 @@ func TestSessionEndpoint_Renew(t *testing.T) { defer client.Close() testutil.WaitForLeader(t, client.Call, "dc1") - TTL := "10s" + TTL := "10s" // the minimum allowed ttl + ttl := 10 * time.Second s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"}) ids := []string{} @@ -281,8 +282,8 @@ func TestSessionEndpoint_Renew(t *testing.T) { t.Logf("Created session '%s'", s.ID) } - // now sleep for ttl - since internally we use ttl*2 to destroy, this is ok - time.Sleep(10 * time.Second) + // Sleep for time shorter than internal destroy ttl + time.Sleep(ttl * structs.SessionTTLMultiplier / 2) // renew 3 out of 5 sessions for i := 0; i < 3; i++ { @@ -313,21 +314,23 @@ func TestSessionEndpoint_Renew(t *testing.T) { t.Logf("Renewed session '%s'", s.ID) } - // now sleep for ttl*2 - 3 sessions should still be alive - time.Sleep(2 * 10 * time.Second) + // now sleep for 2/3 the internal destroy TTL time for renewed sessions + // which is more than the internal destroy TTL time for the non-renewed sessions + time.Sleep((ttl * structs.SessionTTLMultiplier) * 2.0 / 3.0) - if err := client.Call("Session.List", &getR, &sessions); err != nil { + var sessionsL1 structs.IndexedSessions + if err := client.Call("Session.List", &getR, &sessionsL1); err != nil { t.Fatalf("err: %v", err) } - if sessions.Index == 0 { - t.Fatalf("Bad: %v", sessions) + if sessionsL1.Index == 0 { + t.Fatalf("Bad: %v", sessionsL1) } t.Logf("Expect 2 sessions to be destroyed") - for i := 0; i < len(sessions.Sessions); i++ { - s := sessions.Sessions[i] + for i := 0; i < len(sessionsL1.Sessions); i++ { + s := sessionsL1.Sessions[i] if !strContains(ids, s.ID) { t.Fatalf("bad: %v", s) } @@ -342,23 +345,24 @@ func TestSessionEndpoint_Renew(t *testing.T) { } } - if len(sessions.Sessions) > 3 { - t.Fatalf("Bad: %v", sessions.Sessions) + if len(sessionsL1.Sessions) > 3 { + t.Fatalf("Bad: %v", sessionsL1.Sessions) } // now sleep again for ttl*2 - no sessions should still be alive - time.Sleep(20 * time.Second) + time.Sleep(ttl * structs.SessionTTLMultiplier) - if err := client.Call("Session.List", &getR, &sessions); err != nil { + var sessionsL2 structs.IndexedSessions + if err := client.Call("Session.List", &getR, &sessionsL2); err != nil { t.Fatalf("err: %v", err) } - if sessions.Index != 0 { - t.Fatalf("Bad: %v", sessions) + if sessionsL2.Index == 0 { + t.Fatalf("Bad: %v", sessionsL2) } - if len(sessions.Sessions) != 0 { - for i := 0; i < len(sessions.Sessions); i++ { - s := sessions.Sessions[i] + if len(sessionsL2.Sessions) != 0 { + for i := 0; i < len(sessionsL2.Sessions); i++ { + s := sessionsL2.Sessions[i] if !strContains(ids, s.ID) { t.Fatalf("bad: %v", s) } @@ -370,8 +374,8 @@ func TestSessionEndpoint_Renew(t *testing.T) { } t.Errorf("session '%s' should be destroyed", s.ID) } - - t.Fatalf("Bad: %v", sessions.Sessions) + + t.Fatalf("Bad: %v", sessionsL2.Sessions) } } diff --git a/consul/session_ttl.go b/consul/session_ttl.go index 646d02ed70ed..818573aeb09d 100644 --- a/consul/session_ttl.go +++ b/consul/session_ttl.go @@ -38,37 +38,42 @@ func (s *Server) resetSessionTimer(id string, session *structs.Session) error { } } - if session.TTL != "" { - ttl, err := time.ParseDuration(session.TTL) - if err != nil { - return fmt.Errorf("Invalid Session TTL '%s': %v", session.TTL, err) - } - s.sessionTimersLock.Lock() - if s.sessionTimers == nil { - // should not happen . . . - panic(fmt.Sprintf("Invalid call to resetSessionTimer before creation of session timers in leaderLoop")) - } - defer s.sessionTimersLock.Unlock() - if t := s.sessionTimers[id]; t != nil { - // TBD may modify the session's active TTL based on load here - t.Reset(ttl * structs.SessionTTLMultiplier) - } else { - s.sessionTimers[session.ID] = time.AfterFunc(ttl*structs.SessionTTLMultiplier, func() { - s.sessionTimers[session.ID].Stop() - args := structs.SessionRequest{ - Datacenter: s.config.Datacenter, - Op: structs.SessionDestroy, - } - args.Session.ID = session.ID + if session.TTL == "" { + return nil + } - // Apply the update to destroy the session - _, err := s.raftApply(structs.SessionRequestType, args) - if err != nil { - s.logger.Printf("[ERR] consul.session: Apply failed: %v", err) - } - }) - } + ttl, err := time.ParseDuration(session.TTL) + if err != nil { + return fmt.Errorf("Invalid Session TTL '%s': %v", session.TTL, err) } + if ttl == 0 { + return nil + } + + s.sessionTimersLock.Lock() + if s.sessionTimers == nil { + s.sessionTimers = make(map[string]*time.Timer) + } + defer s.sessionTimersLock.Unlock() + if t := s.sessionTimers[id]; t != nil { + // TBD may modify the session's active TTL based on load here + t.Reset(ttl * structs.SessionTTLMultiplier) + } else { + s.sessionTimers[session.ID] = time.AfterFunc(ttl*structs.SessionTTLMultiplier, func() { + args := structs.SessionRequest{ + Datacenter: s.config.Datacenter, + Op: structs.SessionDestroy, + } + args.Session.ID = session.ID + + // Apply the update to destroy the session + _, err := s.raftApply(structs.SessionRequestType, args) + if err != nil { + s.logger.Printf("[ERR] consul.session: Apply failed: %v", err) + } + }) + } + return nil } @@ -80,6 +85,7 @@ func (s *Server) clearSessionTimer(id string) error { s.sessionTimers[id].Stop() delete(s.sessionTimers, id) } + s.sessionTimers = nil return nil } @@ -88,9 +94,9 @@ func (s *Server) clearAllSessionTimers() error { defer s.sessionTimersLock.Unlock() // stop all timers and clear out the map - for id, t := range s.sessionTimers { + for _, t := range s.sessionTimers { t.Stop() - delete(s.sessionTimers, id) } + s.sessionTimers = nil return nil } diff --git a/consul/session_ttl_test.go b/consul/session_ttl_test.go index c5ca43642ad7..e4db812e3212 100644 --- a/consul/session_ttl_test.go +++ b/consul/session_ttl_test.go @@ -121,6 +121,7 @@ func TestServer_sessionTTL(t *testing.T) { }) // Find the new leader + leader = nil for _, s := range servers { // find the leader too if s.IsLeader() { diff --git a/consul/state_store.go b/consul/state_store.go index f6a9add62d4f..4dd8564c4c48 100644 --- a/consul/state_store.go +++ b/consul/state_store.go @@ -1348,7 +1348,7 @@ func (s *StateStore) SessionCreate(index uint64, session *structs.Session) error } if ttl < structs.SessionTTLMin || ttl > structs.SessionTTLMax { - return fmt.Errorf("Invalid Session TTL '%d', must be between [%d-%d] seconds", ttl, structs.SessionTTLMin, structs.SessionTTLMax) + return fmt.Errorf("Invalid Session TTL '%s', must be between [%v-%v]", session.TTL, structs.SessionTTLMin, structs.SessionTTLMax) } } diff --git a/consul/structs/structs.go b/consul/structs/structs.go index 1d27e0726580..2072780f3696 100644 --- a/consul/structs/structs.go +++ b/consul/structs/structs.go @@ -410,7 +410,6 @@ type SessionOp string const ( SessionCreate SessionOp = "create" SessionDestroy = "destroy" - SessionRenew = "renew" ) // SessionRequest is used to operate on sessions From 3821f3c57bf0c0bc72f0e9c3bc3efea344a1316b Mon Sep 17 00:00:00 2001 From: Atin Malaviya Date: Wed, 10 Dec 2014 20:53:05 -0500 Subject: [PATCH 5/9] Took out StateSnapshot SessionListTTL also --- consul/state_store.go | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/consul/state_store.go b/consul/state_store.go index 4dd8564c4c48..72538bb783e4 100644 --- a/consul/state_store.go +++ b/consul/state_store.go @@ -1825,16 +1825,6 @@ func (s *StateSnapshot) SessionList() ([]*structs.Session, error) { return out, err } -// SessionListTTL is used to list all the open sessions -func (s *StateSnapshot) SessionListTTL() ([]*structs.Session, error) { - res, err := s.store.sessionTable.GetTxn(s.tx, "ttl") - out := make([]*structs.Session, len(res)) - for i, raw := range res { - out[i] = raw.(*structs.Session) - } - return out, err -} - // ACLList is used to list all of the ACLs func (s *StateSnapshot) ACLList() ([]*structs.ACL, error) { res, err := s.store.aclTable.GetTxn(s.tx, "id") From 0f9723e6f83ad8b37f61cc028cc01c6d9ad0ee82 Mon Sep 17 00:00:00 2001 From: Atin Malaviya Date: Wed, 10 Dec 2014 21:04:09 -0500 Subject: [PATCH 6/9] Remove hardcoded wait time in session TTL tests --- command/agent/session_endpoint_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/command/agent/session_endpoint_test.go b/command/agent/session_endpoint_test.go index 9d2befb0275d..86bb7243bd10 100644 --- a/command/agent/session_endpoint_test.go +++ b/command/agent/session_endpoint_test.go @@ -239,8 +239,7 @@ func TestSessionTTL(t *testing.T) { t.Fatalf("Incorrect TTL: %s", respObj[0].TTL) } - // now wait for timeout, it is really 2*TTL, so wait 3*TTL - time.Sleep(ttl * 3) + time.Sleep(ttl * structs.SessionTTLMultiplier + ttl) req, err = http.NewRequest("GET", "/v1/session/info/"+id, nil) From 60915629f6143402bb2ec5ee825abb9a1bcbc056 Mon Sep 17 00:00:00 2001 From: Atin Malaviya Date: Wed, 10 Dec 2014 21:37:06 -0500 Subject: [PATCH 7/9] Took out usage of snapshot SessionListTTL --- command/agent/session_endpoint_test.go | 2 +- consul/state_store_test.go | 16 +++++++--------- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/command/agent/session_endpoint_test.go b/command/agent/session_endpoint_test.go index 86bb7243bd10..1843bcb83757 100644 --- a/command/agent/session_endpoint_test.go +++ b/command/agent/session_endpoint_test.go @@ -239,7 +239,7 @@ func TestSessionTTL(t *testing.T) { t.Fatalf("Incorrect TTL: %s", respObj[0].TTL) } - time.Sleep(ttl * structs.SessionTTLMultiplier + ttl) + time.Sleep(ttl*structs.SessionTTLMultiplier + ttl) req, err = http.NewRequest("GET", "/v1/session/info/"+id, nil) diff --git a/consul/state_store_test.go b/consul/state_store_test.go index 210ef24c596d..c933dbdb0d94 100644 --- a/consul/state_store_test.go +++ b/consul/state_store_test.go @@ -798,16 +798,14 @@ func TestStoreSnapshot(t *testing.T) { t.Fatalf("missing sessions") } - // Check there is 1 session with TTL - sessions, err = snap.SessionListTTL() - if err != nil { - t.Fatalf("err: %v", err) + ttls := 0 + for _, session := range sessions { + if session.TTL != "" { + ttls++ + } } - - if len(sessions) < 1 { - t.Fatalf("missing TTL session") - } else if len(sessions) > 1 { - t.Fatalf("too many TTL sessions") + if ttls != 1 { + t.Fatalf("Wrong number of sessions with TTL") } // Check for an acl From ac540100272b41a8de969f180422e3fddb148c41 Mon Sep 17 00:00:00 2001 From: Atin Malaviya Date: Thu, 11 Dec 2014 05:34:31 -0500 Subject: [PATCH 8/9] Fixed clearSessionTimer, created invalidateSession, added invalid TTL test --- command/agent/session_endpoint_test.go | 78 ++++++++++++++++++++++++++ consul/session_ttl.go | 28 +++++---- 2 files changed, 94 insertions(+), 12 deletions(-) diff --git a/command/agent/session_endpoint_test.go b/command/agent/session_endpoint_test.go index 1843bcb83757..edfa074402f4 100644 --- a/command/agent/session_endpoint_test.go +++ b/command/agent/session_endpoint_test.go @@ -255,6 +255,84 @@ func TestSessionTTL(t *testing.T) { }) } +func TestSessionBadTTL(t *testing.T) { + httpTest(t, func(srv *HTTPServer) { + badTTL := "10z" + + // Create Session with illegal TTL + body := bytes.NewBuffer(nil) + enc := json.NewEncoder(body) + raw := map[string]interface{}{ + "TTL": badTTL, + } + enc.Encode(raw) + + req, err := http.NewRequest("PUT", "/v1/session/create", body) + if err != nil { + t.Fatalf("err: %v", err) + } + resp := httptest.NewRecorder() + obj, err := srv.SessionCreate(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + if obj != nil { + t.Fatalf("illegal TTL '%s' allowed", badTTL) + } + if resp.Code != 400 { + t.Fatalf("Bad response code, should be 400") + } + + // less than SessionTTLMin + body = bytes.NewBuffer(nil) + enc = json.NewEncoder(body) + raw = map[string]interface{}{ + "TTL": "5s", + } + enc.Encode(raw) + + req, err = http.NewRequest("PUT", "/v1/session/create", body) + if err != nil { + t.Fatalf("err: %v", err) + } + resp = httptest.NewRecorder() + obj, err = srv.SessionCreate(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + if obj != nil { + t.Fatalf("illegal TTL '%s' allowed", badTTL) + } + if resp.Code != 400 { + t.Fatalf("Bad response code, should be 400") + } + + // more than SessionTTLMax + body = bytes.NewBuffer(nil) + enc = json.NewEncoder(body) + raw = map[string]interface{}{ + "TTL": "4000s", + } + enc.Encode(raw) + + req, err = http.NewRequest("PUT", "/v1/session/create", body) + if err != nil { + t.Fatalf("err: %v", err) + } + resp = httptest.NewRecorder() + obj, err = srv.SessionCreate(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + if obj != nil { + t.Fatalf("illegal TTL '%s' allowed", badTTL) + } + if resp.Code != 400 { + t.Fatalf("Bad response code, should be 400") + } + }) +} + func TestSessionTTLRenew(t *testing.T) { httpTest(t, func(srv *HTTPServer) { TTL := "10s" // use the minimum legal ttl diff --git a/consul/session_ttl.go b/consul/session_ttl.go index 818573aeb09d..06c572be2591 100644 --- a/consul/session_ttl.go +++ b/consul/session_ttl.go @@ -26,6 +26,21 @@ func (s *Server) initializeSessionTimers() error { return nil } +// invalidate the session when timer expires, called by AfterFunc +func (s *Server) invalidateSession(id string) { + args := structs.SessionRequest{ + Datacenter: s.config.Datacenter, + Op: structs.SessionDestroy, + } + args.Session.ID = id + + // Apply the update to destroy the session + _, err := s.raftApply(structs.SessionRequestType, args) + if err != nil { + s.logger.Printf("[ERR] consul.session: Apply failed: %v", err) + } +} + func (s *Server) resetSessionTimer(id string, session *structs.Session) error { if session == nil { var err error @@ -60,17 +75,7 @@ func (s *Server) resetSessionTimer(id string, session *structs.Session) error { t.Reset(ttl * structs.SessionTTLMultiplier) } else { s.sessionTimers[session.ID] = time.AfterFunc(ttl*structs.SessionTTLMultiplier, func() { - args := structs.SessionRequest{ - Datacenter: s.config.Datacenter, - Op: structs.SessionDestroy, - } - args.Session.ID = session.ID - - // Apply the update to destroy the session - _, err := s.raftApply(structs.SessionRequestType, args) - if err != nil { - s.logger.Printf("[ERR] consul.session: Apply failed: %v", err) - } + s.invalidateSession(session.ID) }) } @@ -85,7 +90,6 @@ func (s *Server) clearSessionTimer(id string) error { s.sessionTimers[id].Stop() delete(s.sessionTimers, id) } - s.sessionTimers = nil return nil } From 31e461e10be8412f35512358e76351f580bbbd10 Mon Sep 17 00:00:00 2001 From: Atin Malaviya Date: Thu, 11 Dec 2014 06:09:53 -0500 Subject: [PATCH 9/9] Add invalidateSession test --- consul/session_ttl_test.go | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/consul/session_ttl_test.go b/consul/session_ttl_test.go index e4db812e3212..d26264f03649 100644 --- a/consul/session_ttl_test.go +++ b/consul/session_ttl_test.go @@ -148,12 +148,8 @@ func TestServer_sessionTTL(t *testing.T) { t.Fatalf("sessionTimes length should be 2") } - // destroy the id1 session (test clearSessionTimer) - arg.Op = structs.SessionDestroy - arg.Session.ID = id1 - if err := client.Call("Session.Apply", &arg, &id1); err != nil { - t.Fatalf("err: %v", err) - } + // destroy the via invalidateSession as if on TTL expiry + leader.invalidateSession(id2) if len(leader.sessionTimers) != 1 { t.Fatalf("sessionTimers length should 1")