
ring.Lifecycler: Handle when previous ring state is leaving and when the number of tokens has changed #287
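In short, the change below teaches initRing to reconcile the configured token count with whatever a lingering LEAVING entry (and, if configured, the tokens file) still holds, and then re-register the instance as ACTIVE. The following is a simplified sketch of that idea only, not the exact logic of the diff; reconcileTokens and generateToken are illustrative names, not part of the library:

package main

import (
    "fmt"
    "math/rand"
    "sort"
)

// reconcileTokens keeps as many previously owned tokens as possible, tops the
// set up with freshly generated tokens when there are too few, and trims it
// when there are too many. It mirrors the intent of the diff, not its exact branching.
func reconcileTokens(desired int, existing []uint32, generateToken func() uint32) []uint32 {
    tokens := append([]uint32(nil), existing...) // copy so the caller's slice is untouched
    for len(tokens) < desired {
        tokens = append(tokens, generateToken()) // top up with new tokens
    }
    if len(tokens) > desired {
        tokens = tokens[:desired] // adopt only a subset of the old tokens
    }
    sort.Slice(tokens, func(i, j int) bool { return tokens[i] < tokens[j] })
    return tokens
}

func main() {
    leftBehind := []uint32{30, 10, 20} // e.g. tokens of an entry stuck in LEAVING
    fmt.Println(reconcileTokens(5, leftBehind, rand.Uint32))
}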

1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -125,3 +125,4 @@
* [BUGFIX] Memberlist: fix crash when methods from `memberlist.Delegate` interface are called on `*memberlist.KV` before the service is fully started. #244
* [BUGFIX] runtimeconfig: Fix `runtime_config_last_reload_successful` metric after recovery from bad config with exact same config hash as before. #262
* [BUGFIX] Ring status page: fixed the owned tokens percentage value displayed. #282
* [BUGFIX] ring.Lifecycler: Handle when previous ring state is leaving and the number of tokens has changed. #79
48 changes: 43 additions & 5 deletions ring/lifecycler.go
@@ -616,11 +616,49 @@ func (i *Lifecycler) initRing(ctx context.Context) error {
return ringDesc, true, nil
}

// If the ingester failed to clean up its ring entry (e.g. it crashed, or it was
// running with unregister_on_shutdown=false), its previous state can still be LEAVING.
// Move it into ACTIVE to ensure the ingester joins the ring.
if instanceDesc.State == LEAVING && len(instanceDesc.Tokens) == i.cfg.NumTokens {
instanceDesc.State = ACTIVE
if instanceDesc.State == LEAVING {
var tokens Tokens = instanceDesc.Tokens // way of forcing tokens to be of type Tokens instead of []uint32.
setIsActive := true
if len(instanceDesc.Tokens) != i.cfg.NumTokens {
level.Debug(i.logger).Log("msg", "existing entry has different number of tokens", "existingTokens", len(instanceDesc.Tokens), "newTokens", i.cfg.NumTokens)
if len(tokensFromFile) > 0 {
level.Debug(i.logger).Log("msg", "adding tokens from file", "tokens", len(tokensFromFile))
if len(tokensFromFile) > i.cfg.NumTokens {
tokens = tokensFromFile
} else {
needTokens := i.cfg.NumTokens - len(tokensFromFile)
newTokens := GenerateTokens(needTokens, ringDesc.GetTokens())
tokens = append(instanceDesc.Tokens, newTokens...)
sort.Sort(tokens)
}
} else if i.cfg.NumTokens > len(instanceDesc.Tokens) {
needTokens := i.cfg.NumTokens - len(instanceDesc.Tokens)
level.Debug(i.logger).Log("msg", "no tokens in file, generating new ones in addition to those of existing instance", "newTokens", needTokens)
newTokens := GenerateTokens(needTokens, ringDesc.GetTokens())
tokens = append(instanceDesc.Tokens, newTokens...)
sort.Sort(tokens)
} else {
level.Debug(i.logger).Log("msg", "no tokens in file, adopting a subset of existing instance's tokens", "numTokens", i.cfg.NumTokens)
tokens = instanceDesc.Tokens[0:i.cfg.NumTokens]
}
} else {
level.Debug(i.logger).Log("msg", "adopting tokens of existing instance")
}

if setIsActive {
level.Debug(i.logger).Log("msg", "switching state to active")
i.setState(ACTIVE)
} else {
level.Debug(i.logger).Log("msg", "not switching state to active", "state", i.GetState())
}

i.setTokens(tokens)
instanceDesc.State = i.GetState()
instanceDesc.Tokens = tokens
} else {
// We exist in the ring and are not in the LEAVING state, so assume the ring is right and copy the tokens & state out of there.
i.setState(instanceDesc.State)
i.setTokens(instanceDesc.Tokens)
}

// We're taking over this entry, update instanceDesc with our values
261 changes: 261 additions & 0 deletions ring/lifecycler_test.go
@@ -2,8 +2,11 @@ package ring

import (
"context"
"encoding/json"
"fmt"
"math/rand"
"os"
"path/filepath"
"sort"
"testing"
"time"
@@ -237,6 +240,242 @@ func TestLifecycler_InstancesInZoneCount(t *testing.T) {
}
}

// Test Lifecycler when increasing tokens and instance is already in the ring in leaving state.
func TestLifecycler_IncreasingTokensLeavingInstanceInTheRing(t *testing.T) {
ctx := context.Background()

ringStore, closer := consul.NewInMemoryClient(GetCodec(), log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })

var ringConfig Config
flagext.DefaultValues(&ringConfig)
ringConfig.KVStore.Mock = ringStore
r, err := New(ringConfig, "ingester", ringKey, log.NewNopLogger(), nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, r))
t.Cleanup(func() {
assert.NoError(t, services.StopAndAwaitTerminated(ctx, r))
})

tokenDir := t.TempDir()
lifecyclerConfig := testLifecyclerConfig(ringConfig, "ing1")
// Make sure changes are applied instantly
lifecyclerConfig.HeartbeatPeriod = 0
lifecyclerConfig.NumTokens = 128
lifecyclerConfig.TokensFilePath = filepath.Join(tokenDir, "/tokens")

// Simulate ingester with 64 tokens left the ring in LEAVING state
err = r.KVClient.CAS(ctx, ringKey, func(in interface{}) (out interface{}, retry bool, err error) {
ringDesc := NewDesc()
addr, err := GetInstanceAddr(lifecyclerConfig.Addr, lifecyclerConfig.InfNames, nil, lifecyclerConfig.EnableInet6)
if err != nil {
return nil, false, err
}

ringDesc.AddIngester("ing1", addr, lifecyclerConfig.Zone, GenerateTokens(64, nil), LEAVING, time.Now())
return ringDesc, true, nil
})
require.NoError(t, err)

// Start ingester with increased number of tokens
l, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "ingester", ringKey, true, log.NewNopLogger(), nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, l))
t.Cleanup(func() {
assert.NoError(t, services.StopAndAwaitTerminated(ctx, l))
})

// Verify ingester joined, is active, and has 128 tokens
test.Poll(t, time.Second, true, func() interface{} {
d, err := r.KVClient.Get(ctx, ringKey)
require.NoError(t, err)

desc, ok := d.(*Desc)
require.True(t, ok)
ingDesc := desc.Ingesters["ing1"]
t.Log("Polling for new ingester to have become active with 128 tokens", "state", ingDesc.State, "tokens", len(ingDesc.Tokens))
return ingDesc.State == ACTIVE && len(ingDesc.Tokens) == 128
})
}

// Test Lifecycler when decreasing tokens and instance is already in the ring in leaving state.
func TestLifecycler_DecreasingTokensLeavingInstanceInTheRing(t *testing.T) {
ctx := context.Background()

ringStore, closer := consul.NewInMemoryClient(GetCodec(), log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })

var ringConfig Config
flagext.DefaultValues(&ringConfig)
ringConfig.KVStore.Mock = ringStore
r, err := New(ringConfig, "ingester", ringKey, log.NewNopLogger(), nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, r))
t.Cleanup(func() {
assert.NoError(t, services.StopAndAwaitTerminated(ctx, r))
})

tokenDir := t.TempDir()
lifecyclerConfig := testLifecyclerConfig(ringConfig, "ing1")
// Make sure changes are applied instantly
lifecyclerConfig.HeartbeatPeriod = 0
lifecyclerConfig.NumTokens = 64
lifecyclerConfig.TokensFilePath = filepath.Join(tokenDir, "/tokens")

// Simulate ingester with 128 tokens left the ring in LEAVING state
err = r.KVClient.CAS(ctx, ringKey, func(in interface{}) (out interface{}, retry bool, err error) {
ringDesc := NewDesc()
addr, err := GetInstanceAddr(lifecyclerConfig.Addr, lifecyclerConfig.InfNames, nil, lifecyclerConfig.EnableInet6)
if err != nil {
return nil, false, err
}

ringDesc.AddIngester("ing1", addr, lifecyclerConfig.Zone, GenerateTokens(128, nil), LEAVING, time.Now())
return ringDesc, true, nil
})
require.NoError(t, err)

// Start ingester with decreased number of tokens
l, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "ingester", ringKey, true, log.NewNopLogger(), nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, l))
t.Cleanup(func() {
assert.NoError(t, services.StopAndAwaitTerminated(ctx, l))
})

// Verify ingester joined, is active, and has 64 tokens
test.Poll(t, time.Second, true, func() interface{} {
d, err := r.KVClient.Get(ctx, ringKey)
require.NoError(t, err)

desc, ok := d.(*Desc)
require.True(t, ok)
ingDesc := desc.Ingesters["ing1"]
t.Log("Polling for new ingester to have become active with 64 tokens", "state", ingDesc.State, "tokens", len(ingDesc.Tokens))
return ingDesc.State == ACTIVE && len(ingDesc.Tokens) == 64
})
}

// Test Lifecycler when increasing tokens and instance is already in the ring in leaving state and tokensFromFile is not empty.
func TestLifecycler_IncreasingTokensLeavingInstanceInTheRingAndTokensFromFileNotEmpty(t *testing.T) {
ctx := context.Background()

ringStore, closer := consul.NewInMemoryClient(GetCodec(), log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })

var ringConfig Config
flagext.DefaultValues(&ringConfig)
ringConfig.KVStore.Mock = ringStore
r, err := New(ringConfig, "ingester", ringKey, log.NewNopLogger(), nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, r))
t.Cleanup(func() {
assert.NoError(t, services.StopAndAwaitTerminated(ctx, r))
})

tokenDir := t.TempDir()
lifecyclerConfig := testLifecyclerConfig(ringConfig, "ing1")
// Make sure changes are applied instantly
lifecyclerConfig.HeartbeatPeriod = 0
lifecyclerConfig.NumTokens = 128
f, _ := os.Create(filepath.Join(tokenDir, "/tokens"))
_, _ = f.WriteString(GenerateTokenJSONFile(64))
lifecyclerConfig.TokensFilePath = filepath.Join(tokenDir, "/tokens")

// Simulate ingester with 64 tokens left the ring in LEAVING state
err = r.KVClient.CAS(ctx, ringKey, func(in interface{}) (out interface{}, retry bool, err error) {
ringDesc := NewDesc()
addr, err := GetInstanceAddr(lifecyclerConfig.Addr, lifecyclerConfig.InfNames, nil, lifecyclerConfig.EnableInet6)
if err != nil {
return nil, false, err
}

ringDesc.AddIngester("ing1", addr, lifecyclerConfig.Zone, GenerateTokens(64, nil), LEAVING, time.Now())
return ringDesc, true, nil
})
require.NoError(t, err)

// Start ingester with increased number of tokens
l, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "ingester", ringKey, true, log.NewNopLogger(), nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, l))
t.Cleanup(func() {
assert.NoError(t, services.StopAndAwaitTerminated(ctx, l))
})

// Verify ingester joined, is active, and has 128 tokens
test.Poll(t, time.Second, true, func() interface{} {
d, err := r.KVClient.Get(ctx, ringKey)
require.NoError(t, err)

desc, ok := d.(*Desc)
require.True(t, ok)
ingDesc := desc.Ingesters["ing1"]
t.Log("Polling for new ingester to have become active with 128 tokens", "state", ingDesc.State, "tokens", len(ingDesc.Tokens))
return ingDesc.State == ACTIVE && len(ingDesc.Tokens) == 128
})
}

// Test Lifecycler when decreasing tokens and instance is already in the ring in leaving state and tokensFromFile is not empty.
func TestLifecycler_DecreasingTokensLeavingInstanceInTheRingAndTokensFromFileNotEmpty(t *testing.T) {
ctx := context.Background()

ringStore, closer := consul.NewInMemoryClient(GetCodec(), log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })

var ringConfig Config
flagext.DefaultValues(&ringConfig)
ringConfig.KVStore.Mock = ringStore
r, err := New(ringConfig, "ingester", ringKey, log.NewNopLogger(), nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, r))
t.Cleanup(func() {
assert.NoError(t, services.StopAndAwaitTerminated(ctx, r))
})

tokenDir := t.TempDir()
lifecyclerConfig := testLifecyclerConfig(ringConfig, "ing1")
// Make sure changes are applied instantly
lifecyclerConfig.HeartbeatPeriod = 0
lifecyclerConfig.NumTokens = 64
f, _ := os.Create(filepath.Join(tokenDir, "/tokens"))
_, _ = f.WriteString(GenerateTokenJSONFile(128))
lifecyclerConfig.TokensFilePath = filepath.Join(tokenDir, "/tokens")

// Simulate ingester with 64 tokens left the ring in LEAVING state
err = r.KVClient.CAS(ctx, ringKey, func(in interface{}) (out interface{}, retry bool, err error) {
ringDesc := NewDesc()
addr, err := GetInstanceAddr(lifecyclerConfig.Addr, lifecyclerConfig.InfNames, nil, lifecyclerConfig.EnableInet6)
if err != nil {
return nil, false, err
}

ringDesc.AddIngester("ing1", addr, lifecyclerConfig.Zone, GenerateTokens(64, nil), LEAVING, time.Now())
return ringDesc, true, nil
})
require.NoError(t, err)

// Start ingester with decreased number of tokens
l, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "ingester", ringKey, true, log.NewNopLogger(), nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, l))
t.Cleanup(func() {
assert.NoError(t, services.StopAndAwaitTerminated(ctx, l))
})

// Verify ingester joined, is active, and has 64 tokens
test.Poll(t, time.Second, true, func() interface{} {
d, err := r.KVClient.Get(ctx, ringKey)
require.NoError(t, err)

desc, ok := d.(*Desc)
require.True(t, ok)
ingDesc := desc.Ingesters["ing1"]
t.Log("Polling for new ingester to have become active with 64 tokens", "state", ingDesc.State, "tokens", len(ingDesc.Tokens))
return ingDesc.State == ACTIVE && len(ingDesc.Tokens) == 64
})
}

func TestLifecycler_ZonesCount(t *testing.T) {
ringStore, closer := consul.NewInMemoryClient(GetCodec(), log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })
@@ -1164,3 +1403,25 @@ func TestDefaultFinalSleepValue(t *testing.T) {
assert.Equal(t, time.Minute, cfg.FinalSleep)
})
}

// GenerateTokenJSONFile returns the contents of a JSON tokens file with the given number of random tokens, clamped to the range [0, 128].
func GenerateTokenJSONFile(tokens int) string {
type TokenFile struct {
Tokens []int32 `json:"tokens"`
}
if tokens < 0 {
tokens = 0
}
if tokens > 128 {
tokens = 128
}
var tokenFile TokenFile
for i := 0; i < tokens; i++ {
tokenFile.Tokens = append(tokenFile.Tokens, rand.Int31())
}
file, _ := json.Marshal(&tokenFile)
return string(file)
}
3 changes: 1 addition & 2 deletions runutil/runutil.go
@@ -7,7 +7,6 @@ package runutil
import (
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"strings"
@@ -59,7 +58,7 @@ func ExhaustCloseWithErrCapture(err *error, r io.ReadCloser, format string, a ...
// dir except for the ignoreDirs directories.
// NOTE: DeleteAll is not idempotent.
func DeleteAll(dir string, ignoreDirs ...string) error {
entries, err := ioutil.ReadDir(dir)
entries, err := os.ReadDir(dir)
if os.IsNotExist(err) {
return nil
}