Skip to content

Commit

Permalink
client: decouple clients out of executor managers (pingcap#46)
Browse files Browse the repository at this point in the history
* client: decouple clients out of executor managers
* refine pkg
* add tests
* address comment and add more comments
  • Loading branch information
hanfei1991 authored Dec 16, 2021
1 parent 5355972 commit 766eefe
Show file tree
Hide file tree
Showing 17 changed files with 296 additions and 168 deletions.
57 changes: 57 additions & 0 deletions client/client_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package client

import (
"context"
"sync"

"github.com/hanfei1991/microcosm/model"
"github.com/pingcap/ticdc/dm/pkg/log"
"go.uber.org/zap"
)

func NewClientManager() *Manager {
return &Manager{
executors: make(map[model.ExecutorID]ExecutorClient),
}
}

// TODO: We need to consider when to remove executor client and how to process transilient error.
type Manager struct {
mu sync.RWMutex

master *MasterClient
executors map[model.ExecutorID]ExecutorClient
}

func (c *Manager) MasterClient() *MasterClient {
return c.master
}

func (c *Manager) ExecutorClient(id model.ExecutorID) ExecutorClient {
c.mu.RLock()
defer c.mu.RUnlock()
return c.executors[id]
}

// TODO Right now the interface and params are not consistant. We should abstract a "grpc pool"
// interface to maintain a pool of grpc connections.
func (c *Manager) AddMasterClient(ctx context.Context, addrs []string) error {
var err error
c.master, err = NewMasterClient(ctx, addrs)
return err
}

func (c *Manager) AddExecutor(id model.ExecutorID, addr string) error {
c.mu.Lock()
defer c.mu.Unlock()
if _, ok := c.executors[id]; ok {
return nil
}
log.L().Info("client manager adds executor", zap.String("id", string(id)), zap.String("addr", addr))
client, err := newExecutorClient(addr)
if err != nil {
return err
}
c.executors[id] = client
return nil
}
67 changes: 67 additions & 0 deletions client/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package client_test

import (
"context"
"testing"
"time"

"github.com/hanfei1991/microcosm/client"
"github.com/hanfei1991/microcosm/executor"
"github.com/hanfei1991/microcosm/master"
"github.com/hanfei1991/microcosm/pkg/etcdutils"
"github.com/hanfei1991/microcosm/test"
"github.com/stretchr/testify/require"
)

func TestClientManager(t *testing.T) {
test.GlobalTestFlag = true

manager := client.NewClientManager()
require.Nil(t, manager.MasterClient())
require.Nil(t, manager.ExecutorClient("abc"))
ctx := context.Background()
err := manager.AddMasterClient(ctx, []string{"127.0.0.1:1992"})
require.NotNil(t, err)
require.Nil(t, manager.MasterClient())

masterCfg := &master.Config{
Etcd: &etcdutils.ConfigParams{
Name: "master1",
DataDir: "/tmp/df",
},
MasterAddr: "127.0.0.1:1992",
KeepAliveTTL: 20000000 * time.Second,
KeepAliveInterval: 200 * time.Millisecond,
RPCTimeout: time.Second,
}

masterServer, err := master.NewServer(masterCfg, test.NewContext())
require.Nil(t, err)

masterCtx, masterCancel := context.WithCancel(ctx)
defer masterCancel()
err = masterServer.Start(masterCtx)
require.Nil(t, err)

err = manager.AddMasterClient(ctx, []string{"127.0.0.1:1992"})
require.Nil(t, err)
require.NotNil(t, manager.MasterClient())

executorCfg := &executor.Config{
Join: "127.0.0.1:1992",
WorkerAddr: "127.0.0.1:1993",
KeepAliveTTL: 20000000 * time.Second,
KeepAliveInterval: 200 * time.Millisecond,
RPCTimeout: time.Second,
}

execServer := executor.NewServer(executorCfg, test.NewContext())
execCtx, execCancel := context.WithCancel(ctx)
defer execCancel()
err = execServer.Start(execCtx)
require.Nil(t, err)

err = manager.AddExecutor("executor", "127.0.0.1:1993")
require.Nil(t, err)
require.NotNil(t, manager.ExecutorClient("executor"))
}
14 changes: 5 additions & 9 deletions master/cluster/executor_client.go → client/executor_client.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
package cluster
package client

import (
"context"

"github.com/hanfei1991/microcosm/model"
"github.com/hanfei1991/microcosm/pb"
"github.com/hanfei1991/microcosm/pkg/errors"
"github.com/hanfei1991/microcosm/test"
"github.com/hanfei1991/microcosm/test/mock"
"github.com/pingcap/ticdc/dm/pkg/log"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
)

type ExecutorClient interface {
Send(context.Context, model.ExecutorID, *ExecutorRequest) (*ExecutorResponse, error)
Send(context.Context, *ExecutorRequest) (*ExecutorResponse, error)
}

type closeable interface {
Expand All @@ -26,11 +26,7 @@ type executorClient struct {
client pb.ExecutorClient
}

func (c *executorClient) close() error {
return c.conn.Close()
}

func (c *executorClient) send(ctx context.Context, req *ExecutorRequest) (*ExecutorResponse, error) {
func (c *executorClient) Send(ctx context.Context, req *ExecutorRequest) (*ExecutorResponse, error) {
resp := &ExecutorResponse{}
var err error
switch req.Cmd {
Expand Down Expand Up @@ -60,7 +56,7 @@ func newExecutorClient(addr string) (*executorClient, error) {
if test.GlobalTestFlag {
return newExecutorClientForTest(addr)
}
conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithBlock())
conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithConnectParams(grpc.ConnectParams{Backoff: backoff.DefaultConfig}))
if err != nil {
return nil, errors.ErrGrpcBuildConn.GenWithStackByArgs(addr)
}
Expand Down
33 changes: 16 additions & 17 deletions master/client.go → client/master_client.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package master
package client

import (
"context"
Expand All @@ -13,18 +13,14 @@ import (
"google.golang.org/grpc"
)

type Client struct {
type MasterClient struct {
urls []string
leader string
conn closeable
client pb.MasterClient
}

type closeable interface {
Close() error
}

func (c *Client) init(ctx context.Context) error {
func (c *MasterClient) init(ctx context.Context) error {
log.L().Logger.Info("dialing master", zap.String("leader", c.leader))
conn, err := grpc.DialContext(ctx, c.leader, grpc.WithInsecure(), grpc.WithBlock())
if err != nil {
Expand All @@ -35,7 +31,7 @@ func (c *Client) init(ctx context.Context) error {
return nil
}

func (c *Client) initForTest(_ context.Context) error {
func (c *MasterClient) initForTest(_ context.Context) error {
log.L().Logger.Info("dialing master", zap.String("leader", c.leader))
conn, err := mock.Dial(c.leader)
if err != nil {
Expand All @@ -46,8 +42,8 @@ func (c *Client) initForTest(_ context.Context) error {
return nil
}

func NewMasterClient(ctx context.Context, join []string) (*Client, error) {
client := &Client{
func NewMasterClient(ctx context.Context, join []string) (*MasterClient, error) {
client := &MasterClient{
urls: join,
}
client.leader = client.urls[0]
Expand All @@ -57,32 +53,35 @@ func NewMasterClient(ctx context.Context, join []string) (*Client, error) {
} else {
err = client.init(ctx)
}
return client, err
if err != nil {
return nil, err
}
return client, nil
}

// SendHeartbeat to master-server.
func (c *Client) SendHeartbeat(ctx context.Context, req *pb.HeartbeatRequest, timeout time.Duration) (*pb.HeartbeatResponse, error) {
func (c *MasterClient) SendHeartbeat(ctx context.Context, req *pb.HeartbeatRequest, timeout time.Duration) (*pb.HeartbeatResponse, error) {
ctx1, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
return c.client.Heartbeat(ctx1, req)
}

// RegisterExecutor to master-server.
func (c *Client) RegisterExecutor(ctx context.Context, req *pb.RegisterExecutorRequest, timeout time.Duration) (resp *pb.RegisterExecutorResponse, err error) {
func (c *MasterClient) RegisterExecutor(ctx context.Context, req *pb.RegisterExecutorRequest, timeout time.Duration) (resp *pb.RegisterExecutorResponse, err error) {
ctx1, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
return c.client.RegisterExecutor(ctx1, req)
}

func (c *Client) SubmitJob(ctx context.Context, req *pb.SubmitJobRequest) (resp *pb.SubmitJobResponse, err error) {
func (c *MasterClient) SubmitJob(ctx context.Context, req *pb.SubmitJobRequest) (resp *pb.SubmitJobResponse, err error) {
return c.client.SubmitJob(ctx, req)
}

func (c *Client) CancelJob(ctx context.Context, req *pb.CancelJobRequest) (resp *pb.CancelJobResponse, err error) {
func (c *MasterClient) CancelJob(ctx context.Context, req *pb.CancelJobRequest) (resp *pb.CancelJobResponse, err error) {
return c.client.CancelJob(ctx, req)
}

func (c *Client) QueryMetaStore(
func (c *MasterClient) QueryMetaStore(
ctx context.Context, req *pb.QueryMetaStoreRequest, timeout time.Duration,
) (*pb.QueryMetaStoreResponse, error) {
ctx1, cancel := context.WithTimeout(ctx, timeout)
Expand All @@ -92,7 +91,7 @@ func (c *Client) QueryMetaStore(

// RequestForSchedule sends TaskSchedulerRequest to server master and master
// will ask resource manager for resource and allocates executors to given tasks
func (c *Client) RequestForSchedule(
func (c *MasterClient) RequestForSchedule(
ctx context.Context,
req *pb.TaskSchedulerRequest,
timeout time.Duration,
Expand Down
4 changes: 2 additions & 2 deletions cmd/master-client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"os"
"strconv"

"github.com/hanfei1991/microcosm/master"
"github.com/hanfei1991/microcosm/client"
"github.com/hanfei1991/microcosm/master/jobmaster/benchmark"
"github.com/hanfei1991/microcosm/pb"
"github.com/pkg/errors"
Expand All @@ -30,7 +30,7 @@ func main() {
os.Exit(0)
}
ctx := context.Background()
clt, err := master.NewMasterClient(ctx, []string{addr})
clt, err := client.NewMasterClient(ctx, []string{addr})
if err != nil {
fmt.Printf("err: %v", err)
}
Expand Down
6 changes: 3 additions & 3 deletions executor/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
"strings"
"time"

"github.com/hanfei1991/microcosm/client"
"github.com/hanfei1991/microcosm/executor/runtime"
"github.com/hanfei1991/microcosm/master"
"github.com/hanfei1991/microcosm/model"
"github.com/hanfei1991/microcosm/pb"
"github.com/hanfei1991/microcosm/pkg/errors"
Expand All @@ -29,7 +29,7 @@ type Server struct {
testCtx *test.Context

srv *grpc.Server
cli *master.Client
cli *client.MasterClient
sch *runtime.Runtime
info *model.ExecutorInfo

Expand Down Expand Up @@ -259,7 +259,7 @@ func (s *Server) keepalive(ctx context.Context) error {

func (s *Server) selfRegister(ctx context.Context) (err error) {
// Register myself
s.cli, err = master.NewMasterClient(ctx, getJoinURLs(s.cfg.Join))
s.cli, err = client.NewMasterClient(ctx, getJoinURLs(s.cfg.Join))
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 766eefe

Please sign in to comment.