Skip to content

Commit

Permalink
test: added e2e test case for issue 14571: etcd doesn't load auth inf…
Browse files Browse the repository at this point in the history
…o when recovering from a snapshot

Signed-off-by: Benjamin Wang <wachao@vmware.com>
  • Loading branch information
ahrtr committed Oct 30, 2022
1 parent 17cb291 commit bd7405a
Show file tree
Hide file tree
Showing 2 changed files with 205 additions and 0 deletions.
197 changes: 197 additions & 0 deletions tests/e2e/ctl_v3_auth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,15 @@ package e2e
import (
"context"
"fmt"
"net/url"
"os"
"syscall"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.etcd.io/etcd/client/v3"
)

Expand Down Expand Up @@ -72,6 +76,10 @@ func TestCtlV3AuthJWTExpire(t *testing.T) { testCtl(t, authTestJWTExpi
func TestCtlV3AuthRevisionConsistency(t *testing.T) { testCtl(t, authTestRevisionConsistency) }
func TestCtlV3AuthTestCacheReload(t *testing.T) { testCtl(t, authTestCacheReload) }

func TestCtlV3AuthRecoverFromSnapshot(t *testing.T) {
testCtl(t, authTestRecoverSnapshot, withCfg(*newConfigNoTLS()), withQuorum(), withSnapshotCount(5))
}

func authEnableTest(cx ctlCtx) {
if err := authEnable(cx); err != nil {
cx.t.Fatal(err)
Expand Down Expand Up @@ -1289,3 +1297,192 @@ func authTestCacheReload(cx ctlCtx) {
cx.t.Fatal(err)
}
}

// Verify that etcd works after recovering from a snapshot.
// Refer to https://github.com/etcd-io/etcd/issues/14571.
func authTestRecoverSnapshot(cx ctlCtx) {
roles := []authRole{
{
role: "role0",
permission: clientv3.PermissionType(clientv3.PermReadWrite),
key: "foo",
},
}

users := []authUser{
{
user: "root",
pass: "rootPass",
role: "root",
},
{
user: "user0",
pass: "user0Pass",
role: "role0",
},
}

cx.t.Log("setup and enable auth")
setupAuth(cx, roles, users)

// create a client with root user
cx.t.Log("create a client with root user")
cliRoot, err := clientv3.New(clientv3.Config{Endpoints: cx.epc.EndpointsV3(), Username: "root", Password: "rootPass", DialTimeout: 3 * time.Second})
if err != nil {
cx.t.Fatal(err)
}
defer cliRoot.Close()

// write more than SnapshotCount keys, so that at least one snapshot is created
cx.t.Log("Write enough key/value to trigger a snapshot")
for i := 0; i <= 6; i++ {
if _, err := cliRoot.Put(context.TODO(), fmt.Sprintf("key_%d", i), fmt.Sprintf("value_%d", i)); err != nil {
cx.t.Fatalf("failed to Put (%v)", err)
}
}

// add a new member into the cluster
// Refer to https://github.com/etcd-io/etcd/blob/17cb291f1515d0a4d712acdf396a1f2874f172bf/tests/e2e/cluster_test.go#L238
var (
idx = 3
name = fmt.Sprintf("test-%d", idx)
port = cx.cfg.basePort + 5*idx
curlHost = fmt.Sprintf("localhost:%d", port)
nodeClientURL = url.URL{Scheme: cx.cfg.clientScheme(), Host: curlHost}
nodePeerURL = url.URL{Scheme: cx.cfg.peerScheme(), Host: fmt.Sprintf("localhost:%d", port+1)}
initialCluster = cx.epc.procs[0].Config().initialCluster + "," + fmt.Sprintf("%s=%s", name, nodePeerURL.String())
)
cx.t.Logf("Adding a new member: %s", nodePeerURL.String())
// Must wait at least 5 seconds, otherwise it will always get an
// "etcdserver: unhealthy cluster" response, please refer to link below,
// https://github.com/etcd-io/etcd/blob/17cb291f1515d0a4d712acdf396a1f2874f172bf/server/etcdserver/server.go#L1611
assert.Eventually(cx.t, func() bool {
if _, err := cliRoot.MemberAdd(context.TODO(), []string{nodePeerURL.String()}); err != nil {
cx.t.Logf("Failed to add member, peelURL: %s, error: %v", nodePeerURL.String(), err)
return false
}
return true
}, 8*time.Second, 2*time.Second)

cx.t.Logf("Starting the new member: %s", nodePeerURL.String())
newProc, err := runEtcdNode(name, cx.t.TempDir(), nodeClientURL.String(), nodePeerURL.String(), "existing", initialCluster)
require.NoError(cx.t, err)
defer newProc.Stop()

// create a client with user "user0", and connects to the new member
cx.t.Log("create a client with user 'user0'")
cliUser, err := clientv3.New(clientv3.Config{Endpoints: []string{nodeClientURL.String()}, Username: "user0", Password: "user0Pass", DialTimeout: 3 * time.Second})
if err != nil {
cx.t.Fatal(err)
}
defer cliUser.Close()

// write data using the cliUser, expect no error
cx.t.Log("Write a key/value using user 'user0'")
_, err = cliUser.Put(context.TODO(), "foo", "bar")
require.NoError(cx.t, err)

//verify all nodes have the same revision and hash
var endpoints []string
for _, proc := range cx.epc.procs {
endpoints = append(endpoints, proc.Config().acurl)
}
endpoints = append(endpoints, nodeClientURL.String())
cx.t.Log("Verify all members have the same revision and hash")
assert.Eventually(cx.t, func() bool {
hashKvs, err := hashKVs(endpoints, cliRoot)
if err != nil {
cx.t.Logf("failed to get HashKV: %v", err)
return false
}

if len(hashKvs) != 4 {
cx.t.Logf("expected 4 hashkv responses, but got: %d", len(hashKvs))
return false
}

if !(hashKvs[0].Header.Revision == hashKvs[1].Header.Revision &&
hashKvs[0].Header.Revision == hashKvs[2].Header.Revision &&
hashKvs[0].Header.Revision == hashKvs[3].Header.Revision) {
cx.t.Logf("Got different revisions, [%d, %d, %d, %d]",
hashKvs[0].Header.Revision,
hashKvs[1].Header.Revision,
hashKvs[2].Header.Revision,
hashKvs[3].Header.Revision)
return false
}

assert.Equal(cx.t, hashKvs[0].Hash, hashKvs[1].Hash)
assert.Equal(cx.t, hashKvs[0].Hash, hashKvs[2].Hash)
assert.Equal(cx.t, hashKvs[0].Hash, hashKvs[3].Hash)

return true
}, 5*time.Second, 100*time.Millisecond)
}

type authRole struct {
role string
permission clientv3.PermissionType
key string
keyEnd string
}

type authUser struct {
user string
pass string
role string
}

func setupAuth(cx ctlCtx, roles []authRole, users []authUser) {
endpoint := cx.epc.procs[0].EndpointsV3()[0]

// create a client
c, err := clientv3.New(clientv3.Config{Endpoints: []string{endpoint}, DialTimeout: 3 * time.Second})
if err != nil {
cx.t.Fatal(err)
}
defer c.Close()

// create roles
for _, r := range roles {
// add role
if _, err = c.RoleAdd(context.TODO(), r.role); err != nil {
cx.t.Fatal(err)
}

// grant permission to role
if _, err = c.RoleGrantPermission(context.TODO(), r.role, r.key, r.keyEnd, r.permission); err != nil {
cx.t.Fatal(err)
}
}

// create users
for _, u := range users {
// add user
if _, err = c.UserAdd(context.TODO(), u.user, u.pass); err != nil {
cx.t.Fatal(err)
}

// grant role to user
if _, err = c.UserGrantRole(context.TODO(), u.user, u.role); err != nil {
cx.t.Fatal(err)
}
}

// enable auth
if _, err = c.AuthEnable(context.TODO()); err != nil {
cx.t.Fatal(err)
}
}

func hashKVs(endpoints []string, cli *clientv3.Client) ([]*clientv3.HashKVResponse, error) {
var retHashKVs []*clientv3.HashKVResponse
for _, ep := range endpoints {
resp, err := cli.HashKV(context.TODO(), ep, 0)
if err != nil {
return nil, err
}
retHashKVs = append(retHashKVs, resp)
}
return retHashKVs, nil
}
8 changes: 8 additions & 0 deletions tests/e2e/ctl_v3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,14 @@ func withMaxConcurrentStreams(streams uint32) ctlOption {
}
}

// This function must be called after the `withCfg`, otherwise its value
// may be overwritten by `withCfg`.
func withSnapshotCount(snapshotCount int) ctlOption {
return func(cx *ctlCtx) {
cx.cfg.snapshotCount = snapshotCount
}
}

func testCtl(t *testing.T, testFunc func(ctlCtx), opts ...ctlOption) {
testCtlWithOffline(t, testFunc, nil, opts...)
}
Expand Down

0 comments on commit bd7405a

Please sign in to comment.