Skip to content

Commit

Permalink
Retry heartbeat after initial service registration (#247)
Browse files Browse the repository at this point in the history
Sending a heartbeat to a not-yet-registered service fails on both the
`discovery/consul` and `discovery/etcd` backends. Previously we'd catch
the error, register the service, and return, so a passing heartbeat
was only sent on the next health check poll. This change retries the
heartbeat on the newly registered service so that we become healthy more
quickly.
  • Loading branch information
tgross authored Nov 29, 2016
1 parent d93c156 commit 230f651
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 21 deletions.
7 changes: 7 additions & 0 deletions discovery/consul/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,16 @@ func (c *Consul) SendHeartbeat(service *discovery.ServiceDefinition) {
log.Infof("%v\nService not registered, registering...", err)
if err = c.registerService(*service); err != nil {
log.Warnf("Service registration failed: %s", err)
return
}
if err = c.registerCheck(*service); err != nil {
log.Warnf("Check registration failed: %s", err)
return
}
// now that we're ensured we're registered, we can push the
// heartbeat again
if err := c.Agent().PassTTL(service.ID, "ok"); err != nil {
log.Errorf("Failed to write heartbeat: %s", err)
}
}
}
Expand Down
9 changes: 1 addition & 8 deletions discovery/consul/consul_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,9 @@ func TestConsulTTLPass(t *testing.T) {
consul, service := setupConsul("service-TestConsulTTLPass")
id := service.ID

consul.SendHeartbeat(service) // force registration
consul.SendHeartbeat(service) // force registration and 1st heartbeat
checks, _ := consul.Agent().Checks()
check := checks[id]
if check.Status != "critical" {
t.Fatalf("status of check %s should be 'critical' but is %s", id, check.Status)
}

consul.SendHeartbeat(service) // write TTL and verify
checks, _ = consul.Agent().Checks()
check = checks[id]
if check.Status != "passing" {
t.Fatalf("status of check %s should be 'passing' but is %s", id, check.Status)
}
Expand Down
6 changes: 6 additions & 0 deletions discovery/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,12 @@ func (c *Etcd) SendHeartbeat(service *discovery.ServiceDefinition) {
log.Infof("Service not registered, registering...")
if err := c.registerService(service); err != nil {
log.Warnf("Error registering service %s: %s", service.Name, err)
return
}
// now that we're ensured we're registered, we can push the
// heartbeat again
if err := c.updateServiceTTL(service); err != nil {
log.Errorf("Failed to write heartbeat: %s", err)
}
}
}
Expand Down
33 changes: 20 additions & 13 deletions discovery/etcd/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,22 +40,16 @@ func TestEtcdParseStringEndpoints(t *testing.T) {
}
}

func TestEtcdTTLPass(t *testing.T) {
func TestEtcdTTLExpires(t *testing.T) {
etcd, service := setupEtcd("service-TestEtcdTTLPass")
id := service.ID

etcd.SendHeartbeat(service) // force registration
if !checkServiceExists(etcd, service) {
t.Fatalf("Expected service %s to be registered, but was not", id)
}

etcd.SendHeartbeat(service) // write TTL and verify
if !checkServiceExists(etcd, service) {
t.Fatalf("Expected service %s to be registered, but was not", id)
etcd.SendHeartbeat(service) // force registration and TTL
if !checkServiceHealthy(etcd, service) {
t.Fatalf("Expected service %s to be registered and healthy, but was not", id)
}

time.Sleep(2 * time.Second)

time.Sleep(2 * time.Second) // wait for TTL to expire
if checkServiceExists(etcd, service) {
t.Fatalf("Expected service %s to be deregistered registered", id)
}
Expand Down Expand Up @@ -90,8 +84,7 @@ func TestEtcdCheckForChanges(t *testing.T) {
if etcd.CheckForUpstreamChanges(backend, "") {
t.Fatalf("First read of %s should show `false` for change", id)
}
etcd.SendHeartbeat(service) // force registration
etcd.SendHeartbeat(service) // write TTL
etcd.SendHeartbeat(service) // force registration and TTL

if !etcd.CheckForUpstreamChanges(backend, "") {
t.Errorf("%v should have changed after first health check TTL", id)
Expand All @@ -114,3 +107,17 @@ func checkServiceExists(etcd *Etcd, service *discovery.ServiceDefinition) bool {
}
return true
}

// checkServiceHealthy reports whether the service's node key can be read
// from etcd and contains exactly one child node.
//
// Any failure to read the key is reported as unhealthy, whether the key is
// missing, etcd returned some other error, or the request itself failed.
// A lookup that did not succeed cannot confirm that the heartbeat was
// written, so it must never count as "healthy".
func checkServiceHealthy(etcd *Etcd, service *discovery.ServiceDefinition) bool {
	key := etcd.getNodeKey(service)
	resp, err := etcd.API.Get(context.Background(), key, nil)
	if err != nil {
		// Earlier versions returned true for etcd errors other than
		// "key not found", which let a failed read pass as healthy.
		return false
	}
	if resp == nil || resp.Node == nil {
		// Defensive: without a node there is nothing to inspect.
		return false
	}
	// NOTE(review): presumably the single child is the TTL'd entry written
	// by the heartbeat; confirm against registerService/updateServiceTTL.
	return len(resp.Node.Nodes) == 1
}

0 comments on commit 230f651

Please sign in to comment.