Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry heartbeat after initial service registration #247

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions discovery/consul/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,16 @@ func (c *Consul) SendHeartbeat(service *discovery.ServiceDefinition) {
log.Infof("%v\nService not registered, registering...", err)
if err = c.registerService(*service); err != nil {
log.Warnf("Service registration failed: %s", err)
return
}
if err = c.registerCheck(*service); err != nil {
log.Warnf("Check registration failed: %s", err)
return
}
// now that we're ensured we're registered, we can push the
// heartbeat again
if err := c.Agent().PassTTL(service.ID, "ok"); err != nil {
log.Errorf("Failed to write heartbeat: %s", err)
}
}
}
Expand Down
9 changes: 1 addition & 8 deletions discovery/consul/consul_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,9 @@ func TestConsulTTLPass(t *testing.T) {
consul, service := setupConsul("service-TestConsulTTLPass")
id := service.ID

consul.SendHeartbeat(service) // force registration
consul.SendHeartbeat(service) // force registration and 1st heartbeat
checks, _ := consul.Agent().Checks()
check := checks[id]
if check.Status != "critical" {
t.Fatalf("status of check %s should be 'critical' but is %s", id, check.Status)
}

consul.SendHeartbeat(service) // write TTL and verify
checks, _ = consul.Agent().Checks()
check = checks[id]
if check.Status != "passing" {
t.Fatalf("status of check %s should be 'passing' but is %s", id, check.Status)
}
Expand Down
6 changes: 6 additions & 0 deletions discovery/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,12 @@ func (c *Etcd) SendHeartbeat(service *discovery.ServiceDefinition) {
log.Infof("Service not registered, registering...")
if err := c.registerService(service); err != nil {
log.Warnf("Error registering service %s: %s", service.Name, err)
return
}
// now that we're ensured we're registered, we can push the
// heartbeat again
if err := c.updateServiceTTL(service); err != nil {
log.Errorf("Failed to write heartbeat: %s", err)
}
}
}
Expand Down
33 changes: 20 additions & 13 deletions discovery/etcd/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,22 +40,16 @@ func TestEtcdParseStringEndpoints(t *testing.T) {
}
}

func TestEtcdTTLPass(t *testing.T) {
func TestEtcdTTLExpires(t *testing.T) {
etcd, service := setupEtcd("service-TestEtcdTTLPass")
id := service.ID

etcd.SendHeartbeat(service) // force registration
if !checkServiceExists(etcd, service) {
t.Fatalf("Expected service %s to be registered, but was not", id)
}

etcd.SendHeartbeat(service) // write TTL and verify
if !checkServiceExists(etcd, service) {
t.Fatalf("Expected service %s to be registered, but was not", id)
etcd.SendHeartbeat(service) // force registration and TTL
if !checkServiceHealthy(etcd, service) {
t.Fatalf("Expected service %s to be registered and healthy, but was not", id)
}

time.Sleep(2 * time.Second)

time.Sleep(2 * time.Second) // wait for TTL to expire
if checkServiceExists(etcd, service) {
t.Fatalf("Expected service %s to be deregistered registered", id)
}
Expand Down Expand Up @@ -90,8 +84,7 @@ func TestEtcdCheckForChanges(t *testing.T) {
if etcd.CheckForUpstreamChanges(backend, "") {
t.Fatalf("First read of %s should show `false` for change", id)
}
etcd.SendHeartbeat(service) // force registration
etcd.SendHeartbeat(service) // write TTL
etcd.SendHeartbeat(service) // force registration and TTL

if !etcd.CheckForUpstreamChanges(backend, "") {
t.Errorf("%v should have changed after first health check TTL", id)
Expand All @@ -114,3 +107,17 @@ func checkServiceExists(etcd *Etcd, service *discovery.ServiceDefinition) bool {
}
return true
}

func checkServiceHealthy(etcd *Etcd, service *discovery.ServiceDefinition) bool {
key := etcd.getNodeKey(service)
if resp, err := etcd.API.Get(context.Background(), key, nil); err != nil {
if etcdErr, ok := err.(client.Error); ok {
return etcdErr.Code != client.ErrorCodeKeyNotFound
}
} else {
if len(resp.Node.Nodes) == 1 {
return true
}
}
return false
}