Merge branch 'master' into fix_backoff_misbehavior
zhaoxinyu authored Jul 21, 2022
2 parents 98eca9f + 97f6592 commit 6a699ae
Showing 10 changed files with 117 additions and 13 deletions.
21 changes: 21 additions & 0 deletions cdc/redo/writer/file.go
@@ -182,6 +182,16 @@ func NewWriter(ctx context.Context, cfg *FileWriterConfig, opts ...Option) (*Wri
w.uuidGenerator = uuid.NewGenerator()
}

if len(cfg.Dir) == 0 {
return nil, cerror.WrapError(cerror.ErrRedoFileOp, errors.New("invalid redo dir path"))
}

err := os.MkdirAll(cfg.Dir, common.DefaultDirMode)
if err != nil {
return nil, cerror.WrapError(cerror.ErrRedoFileOp,
errors.Annotatef(err, "can't make dir: %s for redo writing", cfg.Dir))
}

// if we use S3 as the remote storage, a file allocator can be leveraged to
// pre-allocate files for us.
// TODO: test whether this improvement can also be applied to NFS.
@@ -333,6 +343,17 @@ func (w *Writer) close() error {
return cerror.WrapError(cerror.ErrRedoFileOp, err)
}

dirFile, err := os.Open(w.cfg.Dir)
if err != nil {
return cerror.WrapError(cerror.ErrRedoFileOp, err)
}
defer dirFile.Close()
// sync the dir so as to guarantee the renamed file is persisted to disk.
err = dirFile.Sync()
if err != nil {
return cerror.WrapError(cerror.ErrRedoFileOp, err)
}

// We only write content to S3 before closing the local file.
// This way, we no longer need to rename the object in S3.
if w.cfg.S3Storage {
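
The directory fsync added to close() above follows a general crash-consistency rule: after creating or renaming a file, the parent directory's own entry table must also be synced, or the operation can be lost on power failure. A minimal sketch of that rule as a standalone helper (the package and function names here are illustrative, not part of tiflow):

package example

import "os"

// syncDir fsyncs a directory so that metadata changes inside it, such as a
// rename or a newly created file, are durably recorded on disk.
func syncDir(dir string) error {
	d, err := os.Open(dir)
	if err != nil {
		return err
	}
	defer d.Close()
	// Sync on a directory handle flushes the directory entries themselves.
	return d.Sync()
}
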
23 changes: 19 additions & 4 deletions cdc/redo/writer/writer.go
@@ -551,20 +551,35 @@ func (l *LogWriter) flushLogMeta(checkpointTs, resolvedTs uint64) error {
return cerror.WrapError(cerror.ErrRedoFileOp, errors.Annotate(err, "can't make dir for new redo logfile"))
}

metaFile, err := openTruncFile(l.filePath())
// we will create a temp metadata file and then atomically rename it.
tmpFileName := l.filePath() + common.MetaTmpEXT
tmpFile, err := openTruncFile(tmpFileName)
if err != nil {
return cerror.WrapError(cerror.ErrRedoFileOp, err)
}
defer tmpFile.Close()

_, err = metaFile.Write(data)
_, err = tmpFile.Write(data)
if err != nil {
return cerror.WrapError(cerror.ErrRedoFileOp, err)
}
err = metaFile.Sync()
err = tmpFile.Sync()
if err != nil {
return cerror.WrapError(cerror.ErrRedoFileOp, err)
}
err = metaFile.Close()

err = os.Rename(tmpFileName, l.filePath())
if err != nil {
return cerror.WrapError(cerror.ErrRedoFileOp, err)
}

dirFile, err := os.Open(l.cfg.Dir)
if err != nil {
return cerror.WrapError(cerror.ErrRedoFileOp, err)
}
defer dirFile.Close()
// sync the dir so as to guarantee the renamed file is persisted to disk.
err = dirFile.Sync()
if err != nil {
return cerror.WrapError(cerror.ErrRedoFileOp, err)
}
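
The flushLogMeta change above is the standard atomic-replace sequence: write the new metadata to a temporary file, fsync it, rename it over the target, then fsync the parent directory so the rename itself is durable. A rough, self-contained sketch of that sequence under assumed names (atomicWriteFile and the ".tmp" suffix are illustrative, not the tiflow API):

package example

import (
	"os"
	"path/filepath"
)

// atomicWriteFile replaces target with data without ever exposing a partially
// written file to readers or to a crash.
func atomicWriteFile(target string, data []byte) error {
	tmp := target + ".tmp" // hypothetical temp suffix
	f, err := os.OpenFile(tmp, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0o600)
	if err != nil {
		return err
	}
	if _, err := f.Write(data); err != nil {
		f.Close()
		return err
	}
	if err := f.Sync(); err != nil { // persist the temp file's contents
		f.Close()
		return err
	}
	if err := f.Close(); err != nil {
		return err
	}
	// Rename is atomic on POSIX filesystems: readers see the old file or the
	// new one, never a partially written mix.
	if err := os.Rename(tmp, target); err != nil {
		return err
	}
	// Sync the parent directory so the rename survives a crash.
	dir, err := os.Open(filepath.Dir(target))
	if err != nil {
		return err
	}
	defer dir.Close()
	return dir.Sync()
}
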
@@ -70,7 +70,7 @@ func newMockGCCluster() *mockCluster {
leaderVal,
&rpcutil.LeaderClientWithLock[pb.ResourceManagerClient]{},
atomic.NewBool(true),
&rate.Limiter{}))
&rate.Limiter{}, nil))

clientsManager := client.NewClientManager()
resourceTp := resourcetypes.NewLocalFileResourceType(clientsManager)
2 changes: 1 addition & 1 deletion engine/pkg/externalresource/manager/service_test.go
@@ -89,7 +89,7 @@ func newServiceTestSuite(t *testing.T) *serviceTestSuite {
leaderVal,
&rpcutil.LeaderClientWithLock[pb.ResourceManagerClient]{},
atomic.NewBool(true),
&rate.Limiter{}))
&rate.Limiter{}, nil))
return &serviceTestSuite{
service: srvc,
executorInfoProvider: execPro,
34 changes: 29 additions & 5 deletions engine/pkg/rpcutil/server.go
@@ -88,6 +88,29 @@ func (l *LeaderClientWithLock[T]) Close() {
}
}

// rpcLimiter is a customized rate limiter. It delegates to rate.Limiter's Allow
// and additionally honors an allow list, which takes higher priority.
type rpcLimiter struct {
limiter *rate.Limiter
allowList []string
}

func newRPCLimiter(limiter *rate.Limiter, allowList []string) *rpcLimiter {
return &rpcLimiter{
limiter: limiter,
allowList: allowList,
}
}

func (rl *rpcLimiter) Allow(methodName string) bool {
for _, name := range rl.allowList {
if name == methodName {
return true
}
}
return rl.limiter.Allow()
}

// PreRPCHook provides some common functionality that should be executed before
// some RPC, like "forward to leader", "checking rate limit". It should be embedded
// into an RPC server struct and call PreRPCHook.PreRPC() for every RPC method.
@@ -103,7 +126,7 @@ type PreRPCHook[T RPCClientType] struct {
initialized *atomic.Bool

// rate limiter
limiter *rate.Limiter
limiter *rpcLimiter
}

// NewPreRPCHook creates a new PreRPCHook
@@ -113,13 +136,15 @@ func NewPreRPCHook[T RPCClientType](
leaderCli *LeaderClientWithLock[T],
initialized *atomic.Bool,
limiter *rate.Limiter,
rpcLimiterAllowList []string,
) *PreRPCHook[T] {
rpcLim := newRPCLimiter(limiter, rpcLimiterAllowList)
return &PreRPCHook[T]{
id: id,
leader: leader,
leaderCli: leaderCli,
initialized: initialized,
limiter: limiter,
limiter: rpcLim,
}
}

@@ -153,9 +178,8 @@ func (h PreRPCHook[T]) PreRPC(
}

func (h PreRPCHook[T]) logRateLimit(methodName string, req interface{}) {
// TODO: rate limiter based on different sender
if h.limiter.Allow() {
log.Info("", zap.Any("payload", req), zap.String("request", methodName))
if h.limiter.Allow(methodName) {
log.Info("Executing rpc", zap.String("request", methodName), zap.Any("payload", req))
}
}

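
The net effect of the new rpcLimiter is that methods on the allow list are always logged by logRateLimit, while all other methods remain subject to the shared token bucket. A small, hedged usage sketch (the type is re-declared here only so the snippet is self-contained; the real implementation is the one in the diff above):

package main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

// rpcLimiter mirrors the limiter from engine/pkg/rpcutil for illustration.
type rpcLimiter struct {
	limiter   *rate.Limiter
	allowList []string
}

func (rl *rpcLimiter) Allow(methodName string) bool {
	for _, name := range rl.allowList {
		if name == methodName {
			return true // allow-listed methods bypass the token bucket
		}
	}
	return rl.limiter.Allow()
}

func main() {
	lim := &rpcLimiter{
		// One log every 5 seconds for non-allow-listed methods, burst of 3.
		limiter:   rate.NewLimiter(rate.Every(5*time.Second), 3),
		allowList: []string{"SubmitJob", "CancelJob"},
	}
	fmt.Println(lim.Allow("SubmitJob")) // always true
	fmt.Println(lim.Allow("Heartbeat")) // true only until the burst is used up
}
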
19 changes: 18 additions & 1 deletion engine/pkg/rpcutil/server_test.go
@@ -131,12 +131,13 @@ func (s *mockRPCServer) MockRPCWithErrField(ctx context.Context, req *mockRPCReq
// newMockRPCServer returns a mockRPCServer that is ready to use.
func newMockRPCServer() *mockRPCServer {
serverID := "server1"
rpcLim := newRPCLimiter(rate.NewLimiter(rate.Every(time.Second*5), 3), nil)
h := &PreRPCHook[mockRPCClientIface]{
id: serverID,
leader: &atomic.Value{},
leaderCli: &LeaderClientWithLock[mockRPCClientIface]{},
initialized: atomic.NewBool(true),
limiter: rate.NewLimiter(rate.Every(time.Second*5), 3),
limiter: rpcLim,
}
h.leader.Store(&Member{Name: serverID})
return &mockRPCServer{hook: h}
@@ -205,3 +206,19 @@ func TestCheckInitialized(t *testing.T) {
require.NoError(t, err)
require.Equal(t, pb.ErrorCode_MasterNotReady, resp.Err.Code)
}

func TestRPCLimiter(t *testing.T) {
t.Parallel()

allowList := []string{"submit", "cancel"}
rl := rate.NewLimiter(rate.Every(time.Minute*10), 1)
rpcLim := newRPCLimiter(rl, allowList)
require.True(t, rpcLim.Allow(allowList[0]))
require.True(t, rpcLim.Allow(allowList[1]))
require.True(t, rpcLim.Allow("query"))
require.False(t, rpcLim.Allow("query"))
require.True(t, rpcLim.Allow(allowList[0]))
require.True(t, rpcLim.Allow(allowList[1]))
require.False(t, rpcLim.Allow("query"))
require.False(t, rpcLim.Allow("query"))
}
3 changes: 3 additions & 0 deletions engine/servermaster/campaign_test.go
@@ -75,6 +75,7 @@ func TestLeaderLoopSuccess(t *testing.T) {
s.masterCli,
&s.leaderInitialized,
s.rpcLogRL,
nil,
)
s.masterRPCHook = preRPCHook
sessionCfg, err := s.generateSessionConfig()
@@ -135,6 +136,7 @@ func TestLeaderLoopMeetStaleData(t *testing.T) {
s.masterCli,
&s.leaderInitialized,
s.rpcLogRL,
nil,
)
s.masterRPCHook = preRPCHook

@@ -211,6 +213,7 @@ func TestLeaderLoopWatchLeader(t *testing.T) {
s.masterCli,
&s.leaderInitialized,
s.rpcLogRL,
nil,
)
s.masterRPCHook = preRPCHook
s.leaderServiceFn = mockLeaderServiceFn
1 change: 1 addition & 0 deletions engine/servermaster/member_test.go
@@ -119,6 +119,7 @@ func TestUpdateServerMembers(t *testing.T) {
s.masterCli,
&s.leaderInitialized,
s.rpcLogRL,
nil,
)
s.masterRPCHook = preRPCHook
leader, exists := s.masterRPCHook.CheckLeader()
15 changes: 15 additions & 0 deletions engine/servermaster/server.go
@@ -65,6 +65,19 @@ import (
p2pProtocol "github.com/pingcap/tiflow/proto/p2p"
)

// Use a slice instead of a map because, for such a small number of entries,
// a linear slice search is faster than a map lookup.
var masterRPCLimiterAllowList = []string{
"SubmitJob",
"CancelJob",
"ScheduleTask",
}

var resourceRPCLimiterAllowList = []string{
"CreateResource",
"RemoveResource",
}

// Server handles RPC requests for df master.
type Server struct {
etcd *embed.Etcd
@@ -198,6 +211,7 @@ func NewServer(cfg *Config, ctx *test.Context) (*Server, error) {
server.masterCli,
&server.leaderInitialized,
server.rpcLogRL,
masterRPCLimiterAllowList,
)
server.masterRPCHook = masterRPCHook
return server, nil
@@ -534,6 +548,7 @@ func (s *Server) startResourceManager() error {
s.resourceCli,
&s.leaderInitialized,
s.rpcLogRL,
resourceRPCLimiterAllowList,
)
s.resourceManagerService = externRescManager.NewService(
s.frameMetaClient,
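
The comment above justifies a slice over a map: with only a few entries, a linear string comparison avoids hashing and tends to win. A hedged micro-benchmark sketch one could drop into a _test.go file to check that claim (package and variable names are illustrative):

package example

import "testing"

var allowSlice = []string{"SubmitJob", "CancelJob", "ScheduleTask"}

var allowMap = map[string]struct{}{
	"SubmitJob": {}, "CancelJob": {}, "ScheduleTask": {},
}

var sink bool // prevents the compiler from optimizing the lookups away

func BenchmarkSliceSearch(b *testing.B) {
	for i := 0; i < b.N; i++ {
		found := false
		for _, m := range allowSlice {
			if m == "ScheduleTask" {
				found = true
				break
			}
		}
		sink = found
	}
}

func BenchmarkMapLookup(b *testing.B) {
	for i := 0; i < b.N; i++ {
		_, ok := allowMap["ScheduleTask"]
		sink = ok
	}
}
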
10 changes: 9 additions & 1 deletion pkg/fsutil/file_allocator_test.go
@@ -31,13 +31,21 @@ func TestFileAllocateSuccess(t *testing.T) {
}

func TestFileAllocateFailed(t *testing.T) {
// 1. the requested allocation space is larger than the available disk space
fl := NewFileAllocator(t.TempDir(), "test", math.MaxInt64)
defer fl.Close()

f, err := fl.Open()
require.Nil(t, f)
require.NotNil(t, err)
f.Close()
fl.Close()

// 2. the directory does not exist
fl = NewFileAllocator("not-exist-dir", "test", 1024)
f, err = fl.Open()
require.NotNil(t, err)
require.Nil(t, f)
fl.Close()
}

func benchmarkWriteData(b *testing.B, size int, useFileAlloctor bool) {
