diff --git a/api/api.go b/api/api.go index e9bc405b..d6419aea 100644 --- a/api/api.go +++ b/api/api.go @@ -23,6 +23,7 @@ type API struct { License service.LicenseService Template service.TemplateService Task service.TaskService + Locker service.LockerService *service.AppCombinedService log *log.Logger } @@ -90,6 +91,10 @@ func NewAPI(config *config.CloudConfig) (*API, error) { if err != nil { return nil, err } + lockerService, err := service.NewLockerService(config) + if err != nil { + return nil, err + } return &API{ NS: namespaceService, Node: nodeService, @@ -104,6 +109,7 @@ func NewAPI(config *config.CloudConfig) (*API, error) { License: licenseService, Template: templateService, Task: taskService, + Locker: lockerService, AppCombinedService: acs, log: log.L().With(log.Any("api", "admin")), }, nil diff --git a/api/api_test.go b/api/api_test.go index 520c56a2..3074f430 100644 --- a/api/api_test.go +++ b/api/api_test.go @@ -31,6 +31,7 @@ func TestNewAdminAPI(t *testing.T) { c.Plugin.Module = common.RandString(9) c.Plugin.SyncLinks = []string{common.RandString(9), common.RandString(9)} c.Plugin.Task = common.RandString(9) + c.Plugin.Locker = common.RandString(9) mockCtl := gomock.NewController(t) defer mockCtl.Finish() @@ -93,6 +94,11 @@ func TestNewAdminAPI(t *testing.T) { plugin.RegisterFactory(c.Plugin.Task, func() (plugin.Plugin, error) { return mockTask, nil }) + + mockLocker := mockPlugin.NewMockLocker(mockCtl) + plugin.RegisterFactory(c.Plugin.Locker, func() (plugin.Plugin, error) { + return mockLocker, nil + }) api, err := NewAPI(c) assert.NoError(t, err) assert.NotNil(t, api) diff --git a/common/context.go b/common/context.go index 00871ec2..1526793f 100644 --- a/common/context.go +++ b/common/context.go @@ -1,6 +1,7 @@ package common import ( + "context" "encoding/json" "net/http" "runtime/debug" @@ -148,6 +149,8 @@ func PopulateFailedResponse(cc *Context, err error, abort bool) { // HandlerFunc HandlerFunc type HandlerFunc func(c *Context) (interface{}, error) +type LockFunc func(ctx context.Context, name string, ttl int64) (string, error) +type UnlockFunc func(ctx context.Context, name, version string) // Wrapper Wrapper // TODO: to use gin.HandlerFunc ? @@ -176,6 +179,33 @@ func Wrapper(handler HandlerFunc) func(c *gin.Context) { } } +// WrapperWithLock wrap handler with lock +func WrapperWithLock(lockFunc LockFunc, unlockFunc UnlockFunc) func(c *gin.Context) { + return func(c *gin.Context) { + cc := NewContext(c) + defer func() { + if r := recover(); r != nil { + err, ok := r.(error) + if !ok { + err = Error(ErrUnknown, Field("error", r)) + } + log.L().Info("handle a panic", log.Any(cc.GetTrace()), log.Code(err), log.Error(err), log.Any("panic", string(debug.Stack()))) + PopulateFailedResponse(cc, err, false) + } + }() + ctx := context.Background() + lockName := "namespace_" + cc.GetNamespace() + version, err := lockFunc(ctx, lockName, 0) + if err != nil { + log.L().Error("failed to handler request", log.Any(cc.GetTrace()), log.Code(err), log.Error(err)) + PopulateFailedResponse(cc, err, true) + return + } + defer unlockFunc(ctx, lockName, version) + cc.Next() + } +} + func WrapperRaw(handler HandlerFunc, abort bool) func(c *gin.Context) { return func(c *gin.Context) { cc := NewContext(c) diff --git a/mock/plugin/lock.go b/mock/plugin/lock.go index 5c098572..1b4fcbda 100644 --- a/mock/plugin/lock.go +++ b/mock/plugin/lock.go @@ -5,6 +5,7 @@ package plugin import ( + context "context" gomock "github.com/golang/mock/gomock" reflect "reflect" ) @@ -32,44 +33,43 @@ func (m *MockLocker) EXPECT() *MockLockerMockRecorder { return m.recorder } -// Lock mocks base method -func (m *MockLocker) Lock(arg0, arg1 string) error { +// Close mocks base method +func (m *MockLocker) Close() error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Lock", arg0, arg1) + ret := m.ctrl.Call(m, "Close") ret0, _ := ret[0].(error) return ret0 } -// Lock indicates an expected call of Lock -func (mr *MockLockerMockRecorder) Lock(arg0, arg1 interface{}) *gomock.Call { +// Close indicates an expected call of Close +func (mr *MockLockerMockRecorder) Close() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Lock", reflect.TypeOf((*MockLocker)(nil).Lock), arg0, arg1) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockLocker)(nil).Close)) } -// LockWithExpireTime mocks base method -func (m *MockLocker) LockWithExpireTime(arg0, arg1 string, arg2 int64) error { +// Lock mocks base method +func (m *MockLocker) Lock(arg0 context.Context, arg1 string, arg2 int64) (string, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "LockWithExpireTime", arg0, arg1, arg2) - ret0, _ := ret[0].(error) - return ret0 + ret := m.ctrl.Call(m, "Lock", arg0, arg1, arg2) + ret0, _ := ret[0].(string) + ret1, _ := ret[1].(error) + return ret0, ret1 } -// LockWithExpireTime indicates an expected call of LockWithExpireTime -func (mr *MockLockerMockRecorder) LockWithExpireTime(arg0, arg1, arg2 interface{}) *gomock.Call { +// Lock indicates an expected call of Lock +func (mr *MockLockerMockRecorder) Lock(arg0, arg1, arg2 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LockWithExpireTime", reflect.TypeOf((*MockLocker)(nil).LockWithExpireTime), arg0, arg1, arg2) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Lock", reflect.TypeOf((*MockLocker)(nil).Lock), arg0, arg1, arg2) } // Unlock mocks base method -func (m *MockLocker) Unlock(arg0, arg1 string) error { +func (m *MockLocker) Unlock(arg0 context.Context, arg1, arg2 string) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Unlock", arg0, arg1) - ret0, _ := ret[0].(error) - return ret0 + m.ctrl.Call(m, "Unlock", arg0, arg1, arg2) } // Unlock indicates an expected call of Unlock -func (mr *MockLockerMockRecorder) Unlock(arg0, arg1 interface{}) *gomock.Call { +func (mr *MockLockerMockRecorder) Unlock(arg0, arg1, arg2 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Unlock", reflect.TypeOf((*MockLocker)(nil).Unlock), arg0, arg1) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Unlock", reflect.TypeOf((*MockLocker)(nil).Unlock), arg0, arg1, arg2) } diff --git a/mock/service/locker.go b/mock/service/locker.go new file mode 100644 index 00000000..f4cd2311 --- /dev/null +++ b/mock/service/locker.go @@ -0,0 +1,61 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/baetyl/baetyl-cloud/v2/service (interfaces: LockerService) + +// Package service is a generated GoMock package. +package service + +import ( + context "context" + gomock "github.com/golang/mock/gomock" + reflect "reflect" +) + +// MockLockerService is a mock of LockerService interface +type MockLockerService struct { + ctrl *gomock.Controller + recorder *MockLockerServiceMockRecorder +} + +// MockLockerServiceMockRecorder is the mock recorder for MockLockerService +type MockLockerServiceMockRecorder struct { + mock *MockLockerService +} + +// NewMockLockerService creates a new mock instance +func NewMockLockerService(ctrl *gomock.Controller) *MockLockerService { + mock := &MockLockerService{ctrl: ctrl} + mock.recorder = &MockLockerServiceMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockLockerService) EXPECT() *MockLockerServiceMockRecorder { + return m.recorder +} + +// Lock mocks base method +func (m *MockLockerService) Lock(arg0 context.Context, arg1 string, arg2 int64) (string, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Lock", arg0, arg1, arg2) + ret0, _ := ret[0].(string) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Lock indicates an expected call of Lock +func (mr *MockLockerServiceMockRecorder) Lock(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Lock", reflect.TypeOf((*MockLockerService)(nil).Lock), arg0, arg1, arg2) +} + +// Unlock mocks base method +func (m *MockLockerService) Unlock(arg0 context.Context, arg1, arg2 string) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Unlock", arg0, arg1, arg2) +} + +// Unlock indicates an expected call of Unlock +func (mr *MockLockerServiceMockRecorder) Unlock(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Unlock", reflect.TypeOf((*MockLockerService)(nil).Unlock), arg0, arg1, arg2) +} diff --git a/plugin/default/lock/empty_lock.go b/plugin/default/lock/empty_lock.go index 29bf6b82..9fc470c4 100644 --- a/plugin/default/lock/empty_lock.go +++ b/plugin/default/lock/empty_lock.go @@ -1,6 +1,10 @@ package lock -import "github.com/baetyl/baetyl-cloud/v2/plugin" +import ( + "context" + + "github.com/baetyl/baetyl-cloud/v2/plugin" +) func init() { plugin.RegisterFactory("defaultlocker", New) @@ -12,16 +16,12 @@ func New() (plugin.Plugin, error) { return &emptyLocker{}, nil } -func (l *emptyLocker) Lock(name, value string) error { - return nil -} - -func (l *emptyLocker) LockWithExpireTime(name, value string, expireTime int64) error { - return nil +func (l *emptyLocker) Lock(ctx context.Context, name string, ttl int64) (string, error) { + return "", nil } -func (l *emptyLocker) Unlock(name, value string) error { - return nil +func (l *emptyLocker) Unlock(ctx context.Context, name, version string) { + return } func (l *emptyLocker) Close() error { diff --git a/plugin/default/lock/empty_lock_test.go b/plugin/default/lock/empty_lock_test.go index 8042fb0d..ec136e1b 100644 --- a/plugin/default/lock/empty_lock_test.go +++ b/plugin/default/lock/empty_lock_test.go @@ -1,6 +1,7 @@ package lock import ( + "context" "testing" "github.com/stretchr/testify/assert" @@ -9,14 +10,11 @@ import ( func TestEmptyLocker(t *testing.T) { locker := &emptyLocker{} - err := locker.Lock("", "") + res, err := locker.Lock(context.Background(), "", 0) assert.NoError(t, err) + assert.Equal(t, res, "") - err = locker.LockWithExpireTime("", "", 10) - assert.NoError(t, err) - - err = locker.Unlock("", "") - assert.NoError(t, err) + locker.Unlock(context.Background(), "", "") err = locker.Close() assert.NoError(t, err) diff --git a/plugin/lock.go b/plugin/lock.go index 30ef5d16..6ea9cd2d 100644 --- a/plugin/lock.go +++ b/plugin/lock.go @@ -1,5 +1,10 @@ package plugin +import ( + "context" + "io" +) + //go:generate mockgen -destination=../mock/plugin/lock.go -package=plugin github.com/baetyl/baetyl-cloud/v2/plugin Locker // Locker - the lock manager for baetyl cloud @@ -8,22 +13,16 @@ type Locker interface { // Lock lock the resource, Lock should be paired with Unlock. // PARAMS: // - name: the lock's name + // - ttl: expire time of lock, if 0, use default time. // RETURNS: // error: if has error else nil - Lock(name, value string) error - - // LockWithExpireTime lock the resource with expire time - // PARAMS: - // - name: the lock's name - // - expireTime(seconds): the expire time of the lock, if acquired the lock - // RETURNS: - // error: if has error else nil - LockWithExpireTime(name, value string, expireTime int64) error + Lock(ctx context.Context, name string, ttl int64) (string, error) // Unlock release the lock by name // PARAMS: // - name: the lock's name // RETURNS: // error: if has error else nil - Unlock(name, value string) error + Unlock(ctx context.Context, name, version string) + io.Closer } diff --git a/server/admin_server.go b/server/admin_server.go index 49fad4c7..863aef7a 100644 --- a/server/admin_server.go +++ b/server/admin_server.go @@ -84,7 +84,7 @@ func (s *AdminServer) InitRoute() { { configs := v1.Group("/configs") configs.GET("/:name", common.Wrapper(s.api.GetConfig)) - configs.PUT("/:name", common.Wrapper(s.api.UpdateConfig)) + configs.PUT("/:name", common.WrapperWithLock(s.api.Locker.Lock, s.api.Locker.Unlock), common.Wrapper(s.api.UpdateConfig)) configs.DELETE("/:name", common.WrapperRaw(s.api.ValidateResourceForDeleting, true), common.Wrapper(s.api.DeleteConfig)) configs.POST("", common.WrapperRaw(s.api.ValidateResourceForCreating, true), common.Wrapper(s.api.CreateConfig)) configs.GET("", common.Wrapper(s.api.ListConfig)) @@ -103,7 +103,7 @@ func (s *AdminServer) InitRoute() { { certificate := v1.Group("/certificates") certificate.GET("/:name", common.Wrapper(s.api.GetCertificate)) - certificate.PUT("/:name", common.Wrapper(s.api.UpdateCertificate)) + certificate.PUT("/:name", common.WrapperWithLock(s.api.Locker.Lock, s.api.Locker.Unlock), common.Wrapper(s.api.UpdateCertificate)) certificate.DELETE("/:name", common.WrapperRaw(s.api.ValidateResourceForDeleting, true), common.Wrapper(s.api.DeleteCertificate)) certificate.POST("", common.WrapperRaw(s.api.ValidateResourceForCreating, true), common.Wrapper(s.api.CreateCertificate)) certificate.GET("", common.Wrapper(s.api.ListCertificate)) @@ -124,7 +124,7 @@ func (s *AdminServer) InitRoute() { nodes.PUT("", common.Wrapper(s.api.GetNodes)) nodes.GET("/:name/apps", common.Wrapper(s.api.GetAppByNode)) nodes.GET("/:name/stats", common.Wrapper(s.api.GetNodeStats)) - nodes.PUT("/:name", common.Wrapper(s.api.UpdateNode)) + nodes.PUT("/:name", common.WrapperWithLock(s.api.Locker.Lock, s.api.Locker.Unlock), common.Wrapper(s.api.UpdateNode)) nodes.DELETE("/:name", common.Wrapper(s.api.DeleteNode)) nodes.POST("", s.NodeQuotaHandler, common.Wrapper(s.api.CreateNode)) nodes.GET("", common.Wrapper(s.api.ListNode)) @@ -144,9 +144,9 @@ func (s *AdminServer) InitRoute() { apps.GET("/:name/secrets", common.Wrapper(s.api.GetSysAppSecrets)) apps.GET("/:name/certificates", common.Wrapper(s.api.GetSysAppCertificates)) apps.GET("/:name/registries", common.Wrapper(s.api.GetSysAppRegistries)) - apps.PUT("/:name", common.Wrapper(s.api.UpdateApplication)) + apps.PUT("/:name", common.WrapperWithLock(s.api.Locker.Lock, s.api.Locker.Unlock), common.Wrapper(s.api.UpdateApplication)) apps.DELETE("/:name", common.WrapperRaw(s.api.ValidateResourceForDeleting, true), common.Wrapper(s.api.DeleteApplication)) - apps.POST("", common.WrapperRaw(s.api.ValidateResourceForCreating, true), common.Wrapper(s.api.CreateApplication)) + apps.POST("", common.WrapperRaw(s.api.ValidateResourceForCreating, true), common.WrapperWithLock(s.api.Locker.Lock, s.api.Locker.Unlock), common.Wrapper(s.api.CreateApplication)) apps.GET("", common.Wrapper(s.api.ListApplication)) } { diff --git a/server/admin_server_test.go b/server/admin_server_test.go index edec0cfe..ec8770d7 100644 --- a/server/admin_server_test.go +++ b/server/admin_server_test.go @@ -39,6 +39,7 @@ func initAdminServerMock(t *testing.T) (*AdminServer, *mockPlugin.MockAuth, *moc c.Plugin.Property = common.RandString(9) c.Plugin.Module = common.RandString(9) c.Plugin.Task = common.RandString(9) + c.Plugin.Locker = common.RandString(9) mockCtl := gomock.NewController(t) mockObjectStorage := mockPlugin.NewMockObject(mockCtl) @@ -103,6 +104,11 @@ func initAdminServerMock(t *testing.T) (*AdminServer, *mockPlugin.MockAuth, *moc return mockTask, nil }) + mockLocker := mockPlugin.NewMockLocker(mockCtl) + plugin.RegisterFactory(c.Plugin.Locker, func() (plugin.Plugin, error) { + return mockLocker, nil + }) + mockAPI, err := api.NewAPI(c) assert.NoError(t, err) diff --git a/server/mis_server_test.go b/server/mis_server_test.go index 6e5b1765..ce91c47a 100644 --- a/server/mis_server_test.go +++ b/server/mis_server_test.go @@ -31,6 +31,7 @@ func initMisServerMock(t *testing.T) (*MisServer, *gomock.Controller) { c.Plugin.Property = common.RandString(9) c.Plugin.Module = common.RandString(9) c.Plugin.Task = common.RandString(9) + c.Plugin.Locker = common.RandString(9) mockCtl := gomock.NewController(t) mockObjectStorage := mockPlugin.NewMockObject(mockCtl) @@ -105,6 +106,11 @@ func initMisServerMock(t *testing.T) (*MisServer, *gomock.Controller) { plugin.RegisterFactory(c.Plugin.Task, func() (plugin.Plugin, error) { return mockTask, nil }) + + mockLocker := mockPlugin.NewMockLocker(mockCtl) + plugin.RegisterFactory(c.Plugin.Locker, func() (plugin.Plugin, error) { + return mockLocker, nil + }) mockAPI, err := api.NewAPI(c) assert.NoError(t, err) diff --git a/service/locker.go b/service/locker.go new file mode 100644 index 00000000..f4176355 --- /dev/null +++ b/service/locker.go @@ -0,0 +1,24 @@ +package service + +import ( + "context" + + "github.com/baetyl/baetyl-cloud/v2/config" + "github.com/baetyl/baetyl-cloud/v2/plugin" +) + +//go:generate mockgen -destination=../mock/service/locker.go -package=service github.com/baetyl/baetyl-cloud/v2/service LockerService + +type LockerService interface { + Lock(ctx context.Context, name string, ttl int64) (string, error) + Unlock(ctx context.Context, name, value string) +} + +// NewModuleService +func NewLockerService(config *config.CloudConfig) (LockerService, error) { + locker, err := plugin.GetPlugin(config.Plugin.Locker) + if err != nil { + return nil, err + } + return locker.(plugin.Locker), nil +} diff --git a/task/task.go b/task/task.go index b5d403a1..0ba85c2a 100644 --- a/task/task.go +++ b/task/task.go @@ -89,17 +89,6 @@ func (m *TaskManager) RunTasks() { func (m *TaskManager) runTask(task *models.Task) { defer func() { <-m.concurrency }() - err := m.lock.LockWithExpireTime(task.Name, "", m.config.Lock.ExpireTime) - if err != nil { - log.L().Error("get lock error", - log.Any("name", task.Name), - log.Any("namespace", task.Namespace), - log.Any("resourceType", task.ResourceType), - log.Any("resourceName", task.ResourceName), - log.Error(err)) - return - } - psList := TaskRegister.GetProcessorListByTask(task.RegistrationName) if task.ProcessorsStatus == nil { @@ -131,7 +120,7 @@ func (m *TaskManager) runTask(task *models.Task) { task.Status = models.TaskFinished } task.Version = task.Version + 1 - _, err = m.taskService.UpdateTask(task) + _, err := m.taskService.UpdateTask(task) if err != nil { log.L().Error("update task error", log.Any("name", task.Name),