Skip to content

Commit

Permalink
engine: keep the same lint with tiflow repo (#379)
Browse files Browse the repository at this point in the history
  • Loading branch information
amyangfei authored May 11, 2022
1 parent a2980f0 commit a22bef5
Show file tree
Hide file tree
Showing 117 changed files with 713 additions and 364 deletions.
15 changes: 14 additions & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -75,5 +75,18 @@ issues:
- errcheck
- gosec
- makezero
- path: lib
linters:
- revive
- path: jobmaster
linters:
- revive
- path: executor/worker
linters:
- revive
include:
- EXC0012 # exported (.+) should have comment( \(or a comment on this block\))? or be unexported)
- EXC0014 # comment on exported (.+) should be of the form "(.+)..."

run:
go: 1.18
go: 1.18
6 changes: 6 additions & 0 deletions client/base_executor_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,26 +80,32 @@ func (c *baseExecutorClientImpl) Send(ctx context.Context, req *ExecutorRequest)
return resp, err
}

// CmdType represents the request type when dispatching task from server master to executor.
type CmdType uint16

// CmdType values.
const (
CmdPreDispatchTask CmdType = 1 + iota
CmdConfirmDispatchTask
)

// ExecutorRequest wraps CmdType and dispatch task request object
type ExecutorRequest struct {
Cmd CmdType
Req interface{}
}

// PreDispatchTask unwraps gRPC PreDispatchTaskRequest from ExecutorRequest
func (e *ExecutorRequest) PreDispatchTask() *pb.PreDispatchTaskRequest {
return e.Req.(*pb.PreDispatchTaskRequest)
}

// ConfirmDispatchTask unwraps gRPC ConfirmDispatchTask from ExecutorRequest
func (e *ExecutorRequest) ConfirmDispatchTask() *pb.ConfirmDispatchTaskRequest {
return e.Req.(*pb.ConfirmDispatchTaskRequest)
}

// ExecutorResponse wraps DispatchTaskResponse object
type ExecutorResponse struct {
Resp interface{}
}
9 changes: 9 additions & 0 deletions client/client_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ type ClientsManager interface {
AddExecutor(id model.ExecutorID, addr string) error
}

// NewClientManager creates a new Manager instance
func NewClientManager() *Manager {
return &Manager{
executors: make(map[model.ExecutorID]ExecutorClient),
}
}

// Manager is used to maintain all clients to server master and executor.
// TODO: We need to consider when to remove executor client and how to process transilient error.
type Manager struct {
mu sync.RWMutex
Expand All @@ -31,16 +33,19 @@ type Manager struct {
executors map[model.ExecutorID]ExecutorClient
}

// MasterClient implements ClientsManager.MasterClient.
func (c *Manager) MasterClient() MasterClient {
return c.master
}

// ExecutorClient implements ClientsManager.ExecutorClient
func (c *Manager) ExecutorClient(id model.ExecutorID) ExecutorClient {
c.mu.RLock()
defer c.mu.RUnlock()
return c.executors[id]
}

// AddMasterClient creates a new master client.
// TODO Right now the interface and params are not consistent. We should abstract a "grpc pool"
// interface to maintain a pool of grpc connections.
func (c *Manager) AddMasterClient(ctx context.Context, addrs []string) error {
Expand All @@ -52,6 +57,9 @@ func (c *Manager) AddMasterClient(ctx context.Context, addrs []string) error {
return err
}

// AddExecutor implements ClientsManager.AddExecutor
// It creates a new executor client for the given executor. If the executor
// client already exists, does nothing.
func (c *Manager) AddExecutor(id model.ExecutorID, addr string) error {
c.mu.Lock()
defer c.mu.Unlock()
Expand All @@ -67,6 +75,7 @@ func (c *Manager) AddExecutor(id model.ExecutorID, addr string) error {
return nil
}

// AddExecutorClient adds an executor client(for a executor) to executor client manager
func (c *Manager) AddExecutorClient(id model.ExecutorID, client ExecutorClient) error {
c.mu.Lock()
defer c.mu.Unlock()
Expand Down
2 changes: 2 additions & 0 deletions client/executor_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
)

// ExecutorClient defines an interface that supports sending gRPC from server
// master to executor.
type ExecutorClient interface {
baseExecutorClient

Expand Down
11 changes: 11 additions & 0 deletions client/master_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ import (
"github.com/hanfei1991/microcosm/test/mock"
)

// DialTimeout is the default timeout for gRPC dialing
const DialTimeout = 5 * time.Second

// MasterClient abstracts an interface that can be used to interact with server master
type MasterClient interface {
UpdateClients(ctx context.Context, urls []string, leaderURL string)
Endpoints() []string
Expand Down Expand Up @@ -44,6 +46,7 @@ type MasterClient interface {
GetLeaderClient() pb.MasterClient
}

// MasterClientImpl implemeents MasterClient interface
type MasterClientImpl struct {
*rpcutil.FailoverRPCClients[pb.MasterClient]
}
Expand All @@ -66,6 +69,7 @@ var mockDialImpl = func(ctx context.Context, addr string) (pb.MasterClient, rpcu
return mock.NewMasterClient(conn), conn, nil
}

// NewMasterClient creates a new MasterClientImpl instance
func NewMasterClient(ctx context.Context, join []string) (*MasterClientImpl, error) {
dialer := dialImpl
if test.GetGlobalTestFlag() {
Expand All @@ -88,22 +92,27 @@ func (c *MasterClientImpl) RegisterExecutor(ctx context.Context, req *pb.Registe
return rpcutil.DoFailoverRPC(ctx, c.FailoverRPCClients, req, pb.MasterClient.RegisterExecutor)
}

// SubmitJob implemeents MasterClient.SubmitJob
func (c *MasterClientImpl) SubmitJob(ctx context.Context, req *pb.SubmitJobRequest) (resp *pb.SubmitJobResponse, err error) {
return rpcutil.DoFailoverRPC(ctx, c.FailoverRPCClients, req, pb.MasterClient.SubmitJob)
}

// QueryJob implemeents MasterClient.QueryJob
func (c *MasterClientImpl) QueryJob(ctx context.Context, req *pb.QueryJobRequest) (resp *pb.QueryJobResponse, err error) {
return rpcutil.DoFailoverRPC(ctx, c.FailoverRPCClients, req, pb.MasterClient.QueryJob)
}

// PauseJob implemeents MasterClient.PauseJob
func (c *MasterClientImpl) PauseJob(ctx context.Context, req *pb.PauseJobRequest) (resp *pb.PauseJobResponse, err error) {
return rpcutil.DoFailoverRPC(ctx, c.FailoverRPCClients, req, pb.MasterClient.PauseJob)
}

// CancelJob implemeents MasterClient.CancelJob
func (c *MasterClientImpl) CancelJob(ctx context.Context, req *pb.CancelJobRequest) (resp *pb.CancelJobResponse, err error) {
return rpcutil.DoFailoverRPC(ctx, c.FailoverRPCClients, req, pb.MasterClient.CancelJob)
}

// QueryMetaStore implemeents MasterClient.QueryMetaStore
func (c *MasterClientImpl) QueryMetaStore(
ctx context.Context, req *pb.QueryMetaStoreRequest, timeout time.Duration,
) (resp *pb.QueryMetaStoreResponse, err error) {
Expand All @@ -124,13 +133,15 @@ func (c *MasterClientImpl) ScheduleTask(
return rpcutil.DoFailoverRPC(ctx1, c.FailoverRPCClients, req, pb.MasterClient.ScheduleTask)
}

// ReportExecutorWorkload implemeents MasterClient.ReportExecutorWorkload
func (c *MasterClientImpl) ReportExecutorWorkload(
ctx context.Context,
req *pb.ExecWorkloadRequest,
) (resp *pb.ExecWorkloadResponse, err error) {
return rpcutil.DoFailoverRPC(ctx, c.FailoverRPCClients, req, pb.MasterClient.ReportExecutorWorkload)
}

// PersistResource implemeents MasterClient.PersistResource
func (c *MasterClientImpl) PersistResource(
ctx context.Context,
req *pb.PersistResourceRequest,
Expand Down
18 changes: 18 additions & 0 deletions client/mock_clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@ import (
"github.com/hanfei1991/microcosm/pb"
)

// MockExecutorClient is a mock implement of ExecutorClient interface
type MockExecutorClient struct {
mu sync.Mutex
mock.Mock
}

var _ ExecutorClient = (*MockExecutorClient)(nil)

// Send implements ExecutorClient.Send
func (c *MockExecutorClient) Send(ctx context.Context, request *ExecutorRequest) (*ExecutorResponse, error) {
c.mu.Lock()
defer c.mu.Unlock()
Expand All @@ -25,6 +27,7 @@ func (c *MockExecutorClient) Send(ctx context.Context, request *ExecutorRequest)
return args.Get(0).(*ExecutorResponse), args.Error(1)
}

// DispatchTask implements ExecutorClient.DispatchTask
func (c *MockExecutorClient) DispatchTask(
ctx context.Context,
args *DispatchTaskArgs,
Expand All @@ -38,18 +41,21 @@ func (c *MockExecutorClient) DispatchTask(
return retArgs.Error(0)
}

// MockServerMasterClient mocks server master gRPC client
type MockServerMasterClient struct {
mu sync.Mutex
mock.Mock
}

// UpdateClients implements MasterClient.UpdateClients
func (c *MockServerMasterClient) UpdateClients(ctx context.Context, urls []string, _ string) {
c.mu.Lock()
defer c.mu.Unlock()

c.Mock.Called(ctx, urls)
}

// Endpoints implements MasterClient.Endpoints
func (c *MockServerMasterClient) Endpoints() []string {
c.mu.Lock()
defer c.mu.Unlock()
Expand All @@ -58,6 +64,7 @@ func (c *MockServerMasterClient) Endpoints() []string {
return args.Get(0).([]string)
}

// Heartbeat implements MasterClient.Heartbeat
func (c *MockServerMasterClient) Heartbeat(
ctx context.Context,
req *pb.HeartbeatRequest,
Expand All @@ -70,6 +77,7 @@ func (c *MockServerMasterClient) Heartbeat(
return args.Get(0).(*pb.HeartbeatResponse), args.Error(1)
}

// RegisterExecutor implements MasterClient.RegisterExecutor
func (c *MockServerMasterClient) RegisterExecutor(
ctx context.Context,
req *pb.RegisterExecutorRequest,
Expand All @@ -82,6 +90,7 @@ func (c *MockServerMasterClient) RegisterExecutor(
return args.Get(0).(*pb.RegisterExecutorResponse), args.Error(1)
}

// SubmitJob implements MasterClient.SubmitJob
func (c *MockServerMasterClient) SubmitJob(ctx context.Context, req *pb.SubmitJobRequest) (resp *pb.SubmitJobResponse, err error) {
c.mu.Lock()
defer c.mu.Unlock()
Expand All @@ -90,6 +99,7 @@ func (c *MockServerMasterClient) SubmitJob(ctx context.Context, req *pb.SubmitJo
return args.Get(0).(*pb.SubmitJobResponse), args.Error(1)
}

// QueryJob implements MasterClient.QueryJob
func (c *MockServerMasterClient) QueryJob(ctx context.Context, req *pb.QueryJobRequest) (resp *pb.QueryJobResponse, err error) {
c.mu.Lock()
defer c.mu.Unlock()
Expand All @@ -98,6 +108,7 @@ func (c *MockServerMasterClient) QueryJob(ctx context.Context, req *pb.QueryJobR
return args.Get(0).(*pb.QueryJobResponse), args.Error(1)
}

// PauseJob implements MasterClient.PauseJob
func (c *MockServerMasterClient) PauseJob(ctx context.Context, req *pb.PauseJobRequest) (resp *pb.PauseJobResponse, err error) {
c.mu.Lock()
defer c.mu.Unlock()
Expand All @@ -106,6 +117,7 @@ func (c *MockServerMasterClient) PauseJob(ctx context.Context, req *pb.PauseJobR
return args.Get(0).(*pb.PauseJobResponse), args.Error(1)
}

// CancelJob implements MasterClient.CancelJob
func (c *MockServerMasterClient) CancelJob(ctx context.Context, req *pb.CancelJobRequest) (resp *pb.CancelJobResponse, err error) {
c.mu.Lock()
defer c.mu.Unlock()
Expand All @@ -114,6 +126,7 @@ func (c *MockServerMasterClient) CancelJob(ctx context.Context, req *pb.CancelJo
return args.Get(0).(*pb.CancelJobResponse), args.Error(1)
}

// QueryMetaStore implements MasterClient.QueryMetaStore
func (c *MockServerMasterClient) QueryMetaStore(
ctx context.Context,
req *pb.QueryMetaStoreRequest,
Expand All @@ -126,6 +139,7 @@ func (c *MockServerMasterClient) QueryMetaStore(
return args.Get(0).(*pb.QueryMetaStoreResponse), args.Error(1)
}

// Close implements MasterClient.Close
func (c *MockServerMasterClient) Close() (err error) {
c.mu.Lock()
defer c.mu.Unlock()
Expand All @@ -134,6 +148,7 @@ func (c *MockServerMasterClient) Close() (err error) {
return args.Error(0)
}

// ReportExecutorWorkload implements MasterClient.ReportExecutorWorkload
func (c *MockServerMasterClient) ReportExecutorWorkload(
ctx context.Context,
req *pb.ExecWorkloadRequest,
Expand All @@ -145,10 +160,12 @@ func (c *MockServerMasterClient) ReportExecutorWorkload(
return args.Get(0).(*pb.ExecWorkloadResponse), args.Error(1)
}

// GetLeaderClient implements MasterClient.GetLeaderClient
func (c *MockServerMasterClient) GetLeaderClient() pb.MasterClient {
panic("implement me")
}

// ScheduleTask implements MasterClient.ScheduleTask
func (c *MockServerMasterClient) ScheduleTask(
ctx context.Context,
req *pb.ScheduleTaskRequest,
Expand All @@ -161,6 +178,7 @@ func (c *MockServerMasterClient) ScheduleTask(
return args.Get(0).(*pb.ScheduleTaskResponse), args.Error(1)
}

// PersistResource implements MasterClient.PersistResource
func (c *MockServerMasterClient) PersistResource(
ctx context.Context,
in *pb.PersistResourceRequest,
Expand Down
4 changes: 4 additions & 0 deletions client/task_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,11 @@ type DispatchTaskArgs struct {
}

type (
// StartWorkerCallback alias to the function that is called after the pre
// dispatch task is successful and before confirm dispatch task.
StartWorkerCallback = func()
// AbortWorkerCallback alias to the function that is called only if the
// failure is guaranteed when creating worker.
AbortWorkerCallback = func(error)
)

Expand Down
18 changes: 9 additions & 9 deletions cmd/demoserver/kvdata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,20 @@ import (
)

func TestDemoLogic(t *testing.T) {
DemoAddress = "127.0.0.1:1234"
DemoDir = "/tmp/data"
WtDir := "/tmp/data1"
demoAddress = "127.0.0.1:1234"
demoDir = "/tmp/data"
wtDir := "/tmp/data1"

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go StartDataService(ctx)
go startDataService(ctx)
defer func() {
os.RemoveAll(DemoDir)
os.RemoveAll(WtDir)
os.RemoveAll(demoDir)
os.RemoveAll(wtDir)
}()
ctx1, cancel1 := context.WithTimeout(ctx, 2*time.Second)
defer cancel1()
conn, err := grpc.DialContext(ctx1, DemoAddress, grpc.WithInsecure(), grpc.WithBlock())
conn, err := grpc.DialContext(ctx1, demoAddress, grpc.WithInsecure(), grpc.WithBlock())
require.Nil(t, err)
demoClt := pb.NewDataRWServiceClient(conn)
// Generate Data
Expand Down Expand Up @@ -69,7 +69,7 @@ func TestDemoLogic(t *testing.T) {
require.Equal(t, false, rlResp.IsEof, i)
require.Equal(t, strs[0][i], string(rlResp.Key))
err = wrClt.Send(&pb.WriteLinesRequest{
Dir: WtDir,
Dir: wtDir,
FileIdx: 0,
Key: rlResp.Key,
Value: rlResp.Val,
Expand Down Expand Up @@ -103,7 +103,7 @@ func TestDemoLogic(t *testing.T) {
require.Equal(t, true, rlResp.IsEof)

result, err := demoClt.CheckDir(ctx, &pb.CheckDirRequest{
Dir: DemoDir,
Dir: demoDir,
})
require.Nil(t, err)
require.Empty(t, result.ErrMsg)
Expand Down
Loading

0 comments on commit a22bef5

Please sign in to comment.