Skip to content

Commit

Permalink
test(integration): add integration test for master lost during syncin…
Browse files Browse the repository at this point in the history
…g sst files. (#2691)
  • Loading branch information
LindaSummer authored Dec 12, 2024
1 parent 6bbdc55 commit 7cda420
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 4 deletions.
1 change: 1 addition & 0 deletions tests/gocase/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ require (
github.com/tklauser/go-sysconf v0.3.14 // indirect
github.com/tklauser/numcpus v0.9.0 // indirect
github.com/yusufpapurcu/wmi v1.2.4 // indirect
golang.org/x/sync v0.10.0
golang.org/x/sys v0.27.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
6 changes: 2 additions & 4 deletions tests/gocase/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFt
github.com/shoenig/go-m1cpu v0.1.6/go.mod h1:1JJMcUBvfNwpq05QDQVAnx3gUHr9IYF7GNg9SUEw2VQ=
github.com/shoenig/test v0.6.4 h1:kVTaSd7WLz5WZ2IaoM0RSzRsUD+m8wRR+5qvntpn4LU=
github.com/shoenig/test v0.6.4/go.mod h1:byHiCGXqrVaflBLAMq/srcZIHynQPQgeyvkvXnjqq0k=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/tklauser/go-sysconf v0.3.14 h1:g5vzr9iPFFz24v2KZXs/pvpvh8/V9Fw6vQK5ZZb78yU=
Expand All @@ -39,10 +37,10 @@ github.com/tklauser/numcpus v0.9.0 h1:lmyCHtANi8aRUgkckBgoDk1nHCux3n2cgkJLXdQGPD
github.com/tklauser/numcpus v0.9.0/go.mod h1:SN6Nq1O3VychhC1npsWostA+oW+VOQTxZrS604NSRyI=
github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0=
github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c h1:7dEasQXItcW1xKJ2+gg5VOiBnqWrJc+rq0DPKyvvdbY=
golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c/go.mod h1:NQtJDoLvd6faHhE7m4T/1IY708gDefGGjR/iUW8yQQ8=
golang.org/x/exp v0.0.0-20241108190413-2d47ceb2692f h1:XdNn9LlyWAhLVp6P/i8QYBW+hlyhrhei9uErw2B5GJo=
golang.org/x/exp v0.0.0-20241108190413-2d47ceb2692f/go.mod h1:D5SMRVC3C2/4+F/DB1wZsLRnSNimn2Sp/NPsCrsv8ak=
golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
Expand Down
57 changes: 57 additions & 0 deletions tests/gocase/integration/replication/replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,3 +552,60 @@ func TestFullSyncReplication(t *testing.T) {
require.Equal(t, "bar", slaveClient.Get(ctx, "foo").Val())
})
}

func TestSlaveLostMaster(t *testing.T) {
// integration test for #2662 and #2671
ctx := context.Background()

masterSrv := util.StartServer(t, map[string]string{
"cluster-enabled": "yes",
"max-replication-mb": "1",
"rocksdb.compression": "no",
"rocksdb.write_buffer_size": "1",
"rocksdb.target_file_size_base": "1",
})
defer func() { masterSrv.Close() }()
masterClient := masterSrv.NewClient()
defer func() { require.NoError(t, masterClient.Close()) }()
masterNodeID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00"
require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODEID", masterNodeID).Err())

replicaSrv := util.StartServer(t, map[string]string{
"cluster-enabled": "yes",
"replication-connect-timeout-ms": "5000",
"replication-recv-timeout-ms": "5100",
})
defer func() { replicaSrv.Close() }()
replicaClient := replicaSrv.NewClient()
// allow to run the read-only command in the replica
require.NoError(t, replicaClient.ReadOnly(ctx).Err())
defer func() { require.NoError(t, replicaClient.Close()) }()
replicaNodeID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
require.NoError(t, replicaClient.Do(ctx, "clusterx", "SETNODEID", replicaNodeID).Err())

proxyCtx, cancelProxy := context.WithCancel(ctx)
newMasterPort := util.SimpleTCPProxy(proxyCtx, t, fmt.Sprintf("127.0.0.1:%d", masterSrv.Port()), true)

masterNodesInfo := fmt.Sprintf("%s 127.0.0.1 %d master - 0-16383\n%s 127.0.0.1 %d slave %s",
masterNodeID, masterSrv.Port(), replicaNodeID, replicaSrv.Port(), masterNodeID)
clusterNodesInfo := fmt.Sprintf("%s 127.0.0.1 %d master - 0-16383\n%s 127.0.0.1 %d slave %s",
masterNodeID, newMasterPort, replicaNodeID, replicaSrv.Port(), masterNodeID)
unexistNodesInfo := fmt.Sprintf("%s 127.0.0.2 %d master - 0-16383\n%s 127.0.0.1 %d slave %s",
masterNodeID, newMasterPort, replicaNodeID, replicaSrv.Port(), masterNodeID)

require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", masterNodesInfo, "1").Err())
value := strings.Repeat("a", 128*1024)

for i := 0; i < 1024; i++ {
require.NoError(t, masterClient.Set(ctx, fmt.Sprintf("key%d", i), value, 0).Err())
}

require.NoError(t, replicaClient.Do(ctx, "clusterx", "SETNODES", clusterNodesInfo, "1").Err())

time.Sleep(2 * time.Second)
cancelProxy()
start := time.Now()
require.NoError(t, replicaClient.Do(ctx, "clusterx", "SETNODES", unexistNodesInfo, "2").Err())
duration := time.Since(start)
require.Less(t, duration, time.Second*6)
}
80 changes: 80 additions & 0 deletions tests/gocase/util/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,18 @@ package util

import (
"context"
"errors"
"fmt"
"io"
"net"
"regexp"
"strings"
"testing"
"time"

"github.com/redis/go-redis/v9"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)

func FindInfoEntry(rdb *redis.Client, key string, section ...string) string {
Expand Down Expand Up @@ -71,3 +75,79 @@ func Populate(t testing.TB, rdb *redis.Client, prefix string, n, size int) {
_, err := p.Exec(ctx)
require.NoError(t, err)
}

func SimpleTCPProxy(ctx context.Context, t testing.TB, to string, slowdown bool) uint64 {
addr, err := findFreePort()
if err != nil {
t.Fatalf("can't find a free port, %v", err)
}
from := addr.String()

listener, err := net.Listen("tcp", from)
if err != nil {
t.Fatalf("listen to %s failed, err: %v", from, err)
}

copyBytes := func(src, dest io.ReadWriter) func() error {
buffer := make([]byte, 4096)
return func() error {
COPY_LOOP:
for {
select {
case <-ctx.Done():
t.Log("forwarding tcp stream stopped")
break COPY_LOOP
default:
if slowdown {
time.Sleep(time.Millisecond * 100)
}
n, err := src.Read(buffer)
if err != nil {
if errors.Is(err, io.EOF) {
break COPY_LOOP
}
return err
}
_, err = dest.Write(buffer[:n])
if err != nil {
if errors.Is(err, io.EOF) {
break COPY_LOOP
}
return err
}
}
}
return nil
}
}

go func() {
defer listener.Close()
LISTEN_LOOP:
for {
select {
case <-ctx.Done():
break LISTEN_LOOP

default:
conn, err := listener.Accept()
if err != nil {
t.Fatalf("accept conn failed, err: %v", err)
}
dest, err := net.Dial("tcp", to)
if err != nil {
t.Fatalf("accept conn failed, err: %v", err)
}
var errGrp errgroup.Group
errGrp.Go(copyBytes(conn, dest))
errGrp.Go(copyBytes(dest, conn))
err = errGrp.Wait()
if err != nil {
t.Fatalf("forward tcp stream failed, err: %v", err)
}

}
}
}()
return uint64(addr.Port)
}

0 comments on commit 7cda420

Please sign in to comment.