From 8da4d22b365f6c61585c235a8259caaf7d9589aa Mon Sep 17 00:00:00 2001 From: WizardXiao <89761062+WizardXiao@users.noreply.github.com> Date: Wed, 18 May 2022 18:26:39 +0800 Subject: [PATCH] This is an automated cherry-pick of #5390 Signed-off-by: ti-chi-bot --- dm/dm/master/openapi_view.go | 930 ++++++++++++++ dm/dm/master/openapi_view_test.go | 1187 ++++++++++++++++++ dm/dm/master/server.go | 10 + dm/tests/openapi/client/openapi_source_check | 65 +- dm/tests/openapi/run.sh | 71 +- dm/tests/openapi/tls_conf/ca.pem | 8 + dm/tests/openapi/tls_conf/dm-master1.toml | 16 + dm/tests/openapi/tls_conf/dm-master2.toml | 14 + dm/tests/openapi/tls_conf/dm-worker1.toml | 7 + dm/tests/openapi/tls_conf/dm-worker2.toml | 7 + dm/tests/openapi/tls_conf/dm.key | 8 + dm/tests/openapi/tls_conf/dm.pem | 10 + 12 files changed, 2329 insertions(+), 4 deletions(-) create mode 100644 dm/dm/master/openapi_view.go create mode 100644 dm/dm/master/openapi_view_test.go create mode 100644 dm/tests/openapi/tls_conf/ca.pem create mode 100644 dm/tests/openapi/tls_conf/dm-master1.toml create mode 100644 dm/tests/openapi/tls_conf/dm-master2.toml create mode 100644 dm/tests/openapi/tls_conf/dm-worker1.toml create mode 100644 dm/tests/openapi/tls_conf/dm-worker2.toml create mode 100644 dm/tests/openapi/tls_conf/dm.key create mode 100644 dm/tests/openapi/tls_conf/dm.pem diff --git a/dm/dm/master/openapi_view.go b/dm/dm/master/openapi_view.go new file mode 100644 index 00000000000..048820704ae --- /dev/null +++ b/dm/dm/master/openapi_view.go @@ -0,0 +1,930 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +// this file implement all of the APIs of the DataMigration service. + +package master + +import ( + "crypto/tls" + "encoding/json" + "fmt" + "net/http" + "net/http/httputil" + + "github.com/pingcap/failpoint" + + ginmiddleware "github.com/deepmap/oapi-codegen/pkg/gin-middleware" + "github.com/gin-gonic/gin" + "go.uber.org/zap" + + "github.com/pingcap/tiflow/dm/dm/config" + "github.com/pingcap/tiflow/dm/dm/master/workerrpc" + "github.com/pingcap/tiflow/dm/dm/pb" + "github.com/pingcap/tiflow/dm/openapi" + "github.com/pingcap/tiflow/dm/pkg/conn" + "github.com/pingcap/tiflow/dm/pkg/ha" + "github.com/pingcap/tiflow/dm/pkg/log" + "github.com/pingcap/tiflow/dm/pkg/terror" + "github.com/pingcap/tiflow/dm/pkg/utils" +) + +const ( + docJSONBasePath = "/api/v1/dm.json" +) + +// reverseRequestToLeaderMW reverses request to leader. +func (s *Server) reverseRequestToLeaderMW(tlsCfg *tls.Config) gin.HandlerFunc { + return func(c *gin.Context) { + ctx2 := c.Request.Context() + isLeader, _ := s.isLeaderAndNeedForward(ctx2) + if isLeader { + c.Next() + } else { + // nolint:dogsled + _, _, leaderOpenAPIAddr, err := s.election.LeaderInfo(ctx2) + if err != nil { + _ = c.AbortWithError(http.StatusBadRequest, err) + return + } + + failpoint.Inject("MockNotSetTls", func() { + tlsCfg = nil + }) + // simpleProxy just reverses to leader host + simpleProxy := httputil.ReverseProxy{ + Director: func(req *http.Request) { + if tlsCfg != nil { + req.URL.Scheme = "https" + } else { + req.URL.Scheme = "http" + } + req.URL.Host = leaderOpenAPIAddr + req.Host = leaderOpenAPIAddr + }, + } + if tlsCfg != nil { + transport := http.DefaultTransport.(*http.Transport).Clone() + transport.TLSClientConfig = tlsCfg + simpleProxy.Transport = transport + } + log.L().Info("reverse request to leader", zap.String("Request URL", c.Request.URL.String()), zap.String("leader", leaderOpenAPIAddr), zap.Bool("hasTLS", tlsCfg != nil)) + simpleProxy.ServeHTTP(c.Writer, c.Request) + c.Abort() + } + } +} + +// InitOpenAPIHandles init openapi handlers. +func (s *Server) InitOpenAPIHandles(tlsCfg *tls.Config) error { + swagger, err := openapi.GetSwagger() + if err != nil { + return err + } + // disables swagger server name validation. it seems to work poorly + swagger.Servers = nil + gin.SetMode(gin.ReleaseMode) + r := gin.New() + // middlewares + r.Use(gin.Recovery()) + r.Use(openapi.ZapLogger(log.L().WithFields(zap.String("component", "openapi")).Logger)) + r.Use(s.reverseRequestToLeaderMW(tlsCfg)) + r.Use(terrorHTTPErrorHandler()) + // use validation middleware to check all requests against the OpenAPI schema. + r.Use(ginmiddleware.OapiRequestValidator(swagger)) + // register handlers + openapi.RegisterHandlers(r, s) + s.openapiHandles = r + return nil +} + +// GetDocJSON url is:(GET /api/v1/dm.json). +func (s *Server) GetDocJSON(c *gin.Context) { + var masterURL string + if info, err := s.getClusterInfo(c.Request.Context()); err != nil { + _ = c.Error(err) + return + } else if info.Topology.MasterTopologyList != nil && len(*info.Topology.MasterTopologyList) > 0 { + masterTopos := *info.Topology.MasterTopologyList + protocol := "http" + if useTLS.Load() { + protocol = "https" + } + masterURL = fmt.Sprintf("%s://%s:%d", protocol, masterTopos[0].Host, masterTopos[0].Port) + } + swagger, err := openapi.GetSwagger() + if err != nil { + _ = c.Error(err) + return + } else if masterURL != "" { + for idx := range swagger.Servers { + swagger.Servers[idx].URL = masterURL + } + } + c.JSON(http.StatusOK, swagger) +} + +// GetDocHTML url is:(GET /api/v1/docs). +func (s *Server) GetDocHTML(c *gin.Context) { + html, err := openapi.GetSwaggerHTML(openapi.NewSwaggerConfig(docJSONBasePath, "")) + if err != nil { + _ = c.Error(err) + return + } + c.Writer.WriteHeader(http.StatusOK) + _, err = c.Writer.Write([]byte(html)) + if err != nil { + _ = c.Error(err) + } +} + +// DMAPIGetClusterMasterList get cluster master node list url is:(GET /api/v1/cluster/masters). +func (s *Server) DMAPIGetClusterMasterList(c *gin.Context) { + newCtx := c.Request.Context() + memberMasters, err := s.listMemberMaster(newCtx, nil) + if err != nil { + _ = c.Error(err) + return + } + masterCnt := len(memberMasters.Master.Masters) + masters := make([]openapi.ClusterMaster, masterCnt) + for idx, master := range memberMasters.Master.Masters { + masters[idx] = openapi.ClusterMaster{ + Name: master.GetName(), + Alive: master.GetAlive(), + Addr: master.GetPeerURLs()[0], + Leader: master.GetName() == s.cfg.Name, // only leader can handle request + } + } + resp := &openapi.GetClusterMasterListResponse{Total: masterCnt, Data: masters} + c.IndentedJSON(http.StatusOK, resp) +} + +// DMAPIOfflineMasterNode offline master node url is: (DELETE /api/v1/cluster/masters/{master-name}). +func (s *Server) DMAPIOfflineMasterNode(c *gin.Context, masterName string) { + newCtx := c.Request.Context() + if err := s.deleteMasterByName(newCtx, masterName); err != nil { + _ = c.Error(err) + return + } + c.Status(http.StatusNoContent) +} + +// DMAPIGetClusterWorkerList get cluster worker node list url is: (GET /api/v1/cluster/workers). +func (s *Server) DMAPIGetClusterWorkerList(c *gin.Context) { + memberWorkers := s.listMemberWorker(nil) + workerCnt := len(memberWorkers.Worker.Workers) + workers := make([]openapi.ClusterWorker, workerCnt) + for idx, worker := range memberWorkers.Worker.Workers { + workers[idx] = openapi.ClusterWorker{ + Name: worker.GetName(), + Addr: worker.GetAddr(), + BoundSourceName: worker.GetSource(), + BoundStage: worker.GetStage(), + } + } + resp := &openapi.GetClusterWorkerListResponse{Total: workerCnt, Data: workers} + c.IndentedJSON(http.StatusOK, resp) +} + +// DMAPIOfflineWorkerNode offline worker node url is: (DELETE /api/v1/cluster/workers/{worker-name}). +func (s *Server) DMAPIOfflineWorkerNode(c *gin.Context, workerName string) { + if err := s.scheduler.RemoveWorker(workerName); err != nil { + _ = c.Error(err) + return + } + c.Status(http.StatusNoContent) +} + +// DMAPIGetClusterInfo return cluster id of dm cluster url is: (GET /api/v1/cluster/info). +func (s *Server) DMAPIGetClusterInfo(c *gin.Context) { + info, err := s.getClusterInfo(c.Request.Context()) + if err != nil { + _ = c.Error(err) + return + } + c.IndentedJSON(http.StatusOK, info) +} + +// DMAPIGetClusterInfo return cluster id of dm cluster url is: (PUT /api/v1/cluster/info). +func (s *Server) DMAPIUpdateClusterInfo(c *gin.Context) { + var req openapi.ClusterTopology + if err := c.Bind(&req); err != nil { + _ = c.Error(err) + return + } + info, err := s.updateClusterInfo(c.Request.Context(), &req) + if err != nil { + _ = c.Error(err) + return + } + c.IndentedJSON(http.StatusOK, info) +} + +// DMAPICreateSource url is:(POST /api/v1/sources). +func (s *Server) DMAPICreateSource(c *gin.Context) { + var req openapi.CreateSourceRequest + if err := c.Bind(&req); err != nil { + _ = c.Error(err) + return + } + ctx := c.Request.Context() + source, err := s.createSource(ctx, req) + if err != nil { + _ = c.Error(err) + return + } + c.IndentedJSON(http.StatusCreated, source) +} + +// DMAPIGetSourceList url is:(GET /api/v1/sources). +func (s *Server) DMAPIGetSourceList(c *gin.Context, params openapi.DMAPIGetSourceListParams) { + ctx := c.Request.Context() + sourceList, err := s.listSource(ctx, params) + if err != nil { + _ = c.Error(err) + return + } + resp := openapi.GetSourceListResponse{Total: len(sourceList), Data: sourceList} + c.IndentedJSON(http.StatusOK, resp) +} + +// DMAPIGetSource url is:(GET /api/v1/sources/{source-name}). +func (s *Server) DMAPIGetSource(c *gin.Context, sourceName string, params openapi.DMAPIGetSourceParams) { + ctx := c.Request.Context() + source, err := s.getSource(ctx, sourceName, params) + if err != nil { + if terror.ErrSchedulerSourceCfgNotExist.Equal(err) { + c.Status(http.StatusNotFound) + return + } + _ = c.Error(err) + return + } + c.IndentedJSON(http.StatusOK, source) +} + +// DMAPIGetSourceStatus url is: (GET /api/v1/sources/{source-id}/status). +func (s *Server) DMAPIGetSourceStatus(c *gin.Context, sourceName string) { + ctx := c.Request.Context() + withStatus := true + source, err := s.getSource(ctx, sourceName, openapi.DMAPIGetSourceParams{WithStatus: &withStatus}) + if err != nil { + _ = c.Error(err) + return + } + var resp openapi.GetSourceStatusResponse + // current this source not bound to any worker + if worker := s.scheduler.GetWorkerBySource(sourceName); worker == nil { + resp.Data = append(resp.Data, openapi.SourceStatus{SourceName: sourceName}) + resp.Total = len(resp.Data) + c.IndentedJSON(http.StatusOK, resp) + return + } + resp.Data = append(resp.Data, *source.StatusList...) + resp.Total = len(resp.Data) + c.IndentedJSON(http.StatusOK, resp) +} + +// DMAPIUpdateSource url is:(PUT /api/v1/sources/{source-name}). +func (s *Server) DMAPIUpdateSource(c *gin.Context, sourceName string) { + var req openapi.UpdateSourceRequest + if err := c.Bind(&req); err != nil { + _ = c.Error(err) + return + } + source, err := s.updateSource(c.Request.Context(), sourceName, req) + if err != nil { + _ = c.Error(err) + return + } + c.IndentedJSON(http.StatusOK, source) +} + +// DMAPIDeleteSource url is:(DELETE /api/v1/sources/{source-name}). +func (s *Server) DMAPIDeleteSource(c *gin.Context, sourceName string, params openapi.DMAPIDeleteSourceParams) { + ctx := c.Request.Context() + var force bool + // force means delete source and stop all task of this source + if params.Force != nil && *params.Force { + force = *params.Force + } + if err := s.deleteSource(ctx, sourceName, force); err != nil { + _ = c.Error(err) + return + } + c.Status(http.StatusNoContent) +} + +// DMAPIEnableSource url is:(POST /api/v1/sources/{source-name}/enable). +func (s *Server) DMAPIEnableSource(c *gin.Context, sourceName string) { + ctx := c.Request.Context() + if _, err := s.getSource(ctx, sourceName, openapi.DMAPIGetSourceParams{}); err != nil { + _ = c.Error(err) + return + } + if err := s.enableSource(ctx, sourceName); err != nil { + _ = c.Error(err) + return + } + c.Status(http.StatusOK) +} + +// DMAPIDisableSource url is:(POST /api/v1/sources/{source-name}/disable). +func (s *Server) DMAPIDisableSource(c *gin.Context, sourceName string) { + ctx := c.Request.Context() + if _, err := s.getSource(ctx, sourceName, openapi.DMAPIGetSourceParams{}); err != nil { + _ = c.Error(err) + return + } + if err := s.disableSource(ctx, sourceName); err != nil { + _ = c.Error(err) + return + } + c.Status(http.StatusOK) +} + +// DMAPITransferSource transfer source to another free worker url is: (POST /api/v1/sources/{source-name}/transfer). +func (s *Server) DMAPITransferSource(c *gin.Context, sourceName string) { + var req openapi.WorkerNameRequest + if err := c.Bind(&req); err != nil { + _ = c.Error(err) + return + } + if err := s.transferSource(c.Request.Context(), sourceName, req.WorkerName); err != nil { + _ = c.Error(err) + } +} + +// DMAPIGetSourceSchemaList get source schema list url is: (GET /api/v1/sources/{source-name}/schemas). +func (s *Server) DMAPIGetSourceSchemaList(c *gin.Context, sourceName string) { + baseDB, err := s.getBaseDBBySourceName(sourceName) + if err != nil { + _ = c.Error(err) + return + } + defer baseDB.Close() + schemaList, err := utils.GetSchemaList(c.Request.Context(), baseDB.DB) + if err != nil { + _ = c.Error(err) + return + } + c.IndentedJSON(http.StatusOK, schemaList) +} + +// DMAPIGetSourceTableList get source table list url is: (GET /api/v1/sources/{source-name}/schemas/{schema-name}). +func (s *Server) DMAPIGetSourceTableList(c *gin.Context, sourceName string, schemaName string) { + baseDB, err := s.getBaseDBBySourceName(sourceName) + if err != nil { + _ = c.Error(err) + return + } + defer baseDB.Close() + tableList, err := utils.GetTableList(c.Request.Context(), baseDB.DB, schemaName) + if err != nil { + _ = c.Error(err) + return + } + c.IndentedJSON(http.StatusOK, tableList) +} + +// DMAPIEnableRelay url is:(POST /api/v1/relay/enable). +func (s *Server) DMAPIEnableRelay(c *gin.Context, sourceName string) { + var req openapi.EnableRelayRequest + if err := c.Bind(&req); err != nil { + _ = c.Error(err) + return + } + if err := s.enableRelay(c.Request.Context(), sourceName, req); err != nil { + _ = c.Error(err) + return + } + c.Status(http.StatusOK) +} + +// DMAPIEnableRelay url is:(POST /api/v1/relay/disable). +func (s *Server) DMAPIDisableRelay(c *gin.Context, sourceName string) { + var req openapi.DisableRelayRequest + if err := c.Bind(&req); err != nil { + _ = c.Error(err) + return + } + if err := s.disableRelay(c.Request.Context(), sourceName, req); err != nil { + _ = c.Error(err) + } + c.Status(http.StatusOK) +} + +// DMAPIPurgeRelay url is:(POST /api/v1/relay/purge). +func (s *Server) DMAPIPurgeRelay(c *gin.Context, sourceName string) { + var req openapi.PurgeRelayRequest + if err := c.Bind(&req); err != nil { + _ = c.Error(err) + return + } + if err := s.purgeRelay(c.Request.Context(), sourceName, req); err != nil { + _ = c.Error(err) + } + c.Status(http.StatusOK) +} + +func (s *Server) getBaseDBBySourceName(sourceName string) (*conn.BaseDB, error) { + sourceCfg := s.scheduler.GetSourceCfgByID(sourceName) + if sourceCfg == nil { + return nil, terror.ErrSchedulerSourceCfgNotExist.Generate(sourceName) + } + dbCfg := sourceCfg.GenerateDBConfig() + return conn.DefaultDBProvider.Apply(dbCfg) +} + +// DMAPICreateTask url is:(POST /api/v1/tasks). +func (s *Server) DMAPICreateTask(c *gin.Context) { + var req openapi.CreateTaskRequest + if err := c.Bind(&req); err != nil { + _ = c.Error(err) + return + } + res, err := s.createTask(c.Request.Context(), req) + if err != nil { + _ = c.Error(err) + return + } + c.IndentedJSON(http.StatusCreated, res) +} + +// DMAPIUpdateTask url is: (PUT /api/v1/tasks/{task-name}). +func (s *Server) DMAPIUpdateTask(c *gin.Context, taskName string) { + var req openapi.UpdateTaskRequest + if err := c.Bind(&req); err != nil { + _ = c.Error(err) + return + } + res, err := s.updateTask(c.Request.Context(), req) + if err != nil { + _ = c.Error(err) + return + } + c.IndentedJSON(http.StatusOK, res) +} + +// DMAPIDeleteTask url is:(DELETE /api/v1/tasks). +func (s *Server) DMAPIDeleteTask(c *gin.Context, taskName string, params openapi.DMAPIDeleteTaskParams) { + ctx := c.Request.Context() + var force bool + if params.Force != nil && *params.Force { + force = *params.Force + } + if err := s.deleteTask(ctx, taskName, force); err != nil { + _ = c.Error(err) + return + } + c.Status(http.StatusNoContent) +} + +// DMAPIGetTask url is:(GET /api/v1/tasks/{task-name}). +func (s *Server) DMAPIGetTask(c *gin.Context, taskName string, params openapi.DMAPIGetTaskParams) { + ctx := c.Request.Context() + task, err := s.getTask(ctx, taskName, params) + if err != nil { + if terror.ErrSchedulerSourceCfgNotExist.Equal(err) { + c.Status(http.StatusNotFound) + return + } + _ = c.Error(err) + return + } + c.IndentedJSON(http.StatusOK, task) +} + +// DMAPIGetTaskStatus url is:(GET /api/v1/tasks/{task-name}/status). +func (s *Server) DMAPIGetTaskStatus(c *gin.Context, taskName string, params openapi.DMAPIGetTaskStatusParams) { + ctx := c.Request.Context() + withStatus := true + task, err := s.getTask(ctx, taskName, openapi.DMAPIGetTaskParams{WithStatus: &withStatus}) + if err != nil { + _ = c.Error(err) + return + } + resp := openapi.GetTaskStatusResponse{Total: len(*task.StatusList), Data: *task.StatusList} + c.IndentedJSON(http.StatusOK, resp) +} + +// DMAPIGetTaskList url is:(GET /api/v1/tasks). +func (s *Server) DMAPIGetTaskList(c *gin.Context, params openapi.DMAPIGetTaskListParams) { + ctx := c.Request.Context() + taskList, err := s.listTask(ctx, params) + if err != nil { + _ = c.Error(err) + return + } + resp := openapi.GetTaskListResponse{Total: len(taskList), Data: taskList} + c.IndentedJSON(http.StatusOK, resp) +} + +// DMAPIStartTask url is: (POST /api/v1/tasks/{task-name}/start). +func (s *Server) DMAPIStartTask(c *gin.Context, taskName string) { + var req openapi.StartTaskRequest + if err := c.Bind(&req); err != nil { + _ = c.Error(err) + return + } + ctx := c.Request.Context() + if err := s.startTask(ctx, taskName, req); err != nil { + _ = c.Error(err) + } + c.Status(http.StatusOK) +} + +// DMAPIStopTask url is: (POST /api/v1/tasks/{task-name}/stop). +func (s *Server) DMAPIStopTask(c *gin.Context, taskName string) { + var req openapi.StopTaskRequest + if err := c.Bind(&req); err != nil { + _ = c.Error(err) + return + } + ctx := c.Request.Context() + if err := s.stopTask(ctx, taskName, req); err != nil { + _ = c.Error(err) + } + c.Status(http.StatusOK) +} + +// DMAPIGetSchemaListByTaskAndSource get task source schema list url is: (GET /api/v1/tasks/{task-name}/sources/{source-name}/schemas). +func (s *Server) DMAPIGetSchemaListByTaskAndSource(c *gin.Context, taskName string, sourceName string) { + worker := s.scheduler.GetWorkerBySource(sourceName) + if worker == nil { + _ = c.Error(terror.ErrWorkerNoStart) + return + } + workerReq := workerrpc.Request{ + Type: workerrpc.CmdOperateSchema, + OperateSchema: &pb.OperateWorkerSchemaRequest{ + Op: pb.SchemaOp_ListSchema, + Task: taskName, + Source: sourceName, + }, + } + newCtx := c.Request.Context() + resp, err := worker.SendRequest(newCtx, &workerReq, s.cfg.RPCTimeout) + if err != nil { + _ = c.Error(err) + return + } + if !resp.OperateSchema.Result { + _ = c.Error(terror.ErrOpenAPICommonError.New(resp.OperateSchema.Msg)) + return + } + schemaList := openapi.SchemaNameList{} + if err := json.Unmarshal([]byte(resp.OperateSchema.Msg), &schemaList); err != nil { + _ = c.Error(terror.ErrSchemaTrackerUnMarshalJSON.Delegate(err, resp.OperateSchema.Msg)) + return + } + c.IndentedJSON(http.StatusOK, schemaList) +} + +// DMAPIGetTaskMigrateTargets get task migrate targets list url is: (GET /api/v1/tasks/{task-name}/sources/{source-name}/migrate_targets). +func (s *Server) DMAPIGetTaskMigrateTargets(c *gin.Context, taskName string, sourceName string, params openapi.DMAPIGetTaskMigrateTargetsParams) { + worker := s.scheduler.GetWorkerBySource(sourceName) + if worker == nil { + _ = c.Error(terror.ErrWorkerNoStart) + return + } + var schemaPattern, tablePattern string + if params.SchemaPattern != nil { + schemaPattern = *params.SchemaPattern + } + if params.TablePattern != nil { + tablePattern = *params.TablePattern + } + workerReq := workerrpc.Request{ + Type: workerrpc.CmdOperateSchema, + OperateSchema: &pb.OperateWorkerSchemaRequest{ + Op: pb.SchemaOp_ListMigrateTargets, + Task: taskName, + Source: sourceName, + Schema: schemaPattern, + Table: tablePattern, + }, + } + newCtx := c.Request.Context() + resp, err := worker.SendRequest(newCtx, &workerReq, s.cfg.RPCTimeout) + if err != nil { + _ = c.Error(err) + return + } + if !resp.OperateSchema.Result { + _ = c.Error(terror.ErrOpenAPICommonError.New(resp.OperateSchema.Msg)) + return + } + targets := []openapi.TaskMigrateTarget{} + if err := json.Unmarshal([]byte(resp.OperateSchema.Msg), &targets); err != nil { + _ = c.Error(terror.ErrSchemaTrackerUnMarshalJSON.Delegate(err, resp.OperateSchema.Msg)) + return + } + c.IndentedJSON(http.StatusOK, openapi.GetTaskMigrateTargetsResponse{Data: targets, Total: len(targets)}) +} + +// DMAPIGetTableListByTaskAndSource get task source table list url is: (GET /api/v1/tasks/{task-name}/sources/{source-name}/schemas/{schema-name}). +func (s *Server) DMAPIGetTableListByTaskAndSource(c *gin.Context, taskName string, sourceName string, schemaName string) { + worker := s.scheduler.GetWorkerBySource(sourceName) + if worker == nil { + _ = c.Error(terror.ErrWorkerNoStart) + return + } + workerReq := workerrpc.Request{ + Type: workerrpc.CmdOperateSchema, + OperateSchema: &pb.OperateWorkerSchemaRequest{ + Op: pb.SchemaOp_ListTable, + Task: taskName, + Source: sourceName, + Database: schemaName, + }, + } + newCtx := c.Request.Context() + resp, err := worker.SendRequest(newCtx, &workerReq, s.cfg.RPCTimeout) + if err != nil { + _ = c.Error(err) + return + } + if !resp.OperateSchema.Result { + _ = c.Error(terror.ErrOpenAPICommonError.New(resp.OperateSchema.Msg)) + return + } + tableList := openapi.TableNameList{} + if err := json.Unmarshal([]byte(resp.OperateSchema.Msg), &tableList); err != nil { + _ = c.Error(terror.ErrSchemaTrackerUnMarshalJSON.Delegate(err, resp.OperateSchema.Msg)) + return + } + c.IndentedJSON(http.StatusOK, tableList) +} + +// DMAPIGetTableStructure get task source table structure url is: (GET /api/v1/tasks/{task-name}/sources/{source-name}/schemas/{schema-name}/{table-name}). +func (s *Server) DMAPIGetTableStructure(c *gin.Context, taskName string, sourceName string, schemaName string, tableName string) { + worker := s.scheduler.GetWorkerBySource(sourceName) + if worker == nil { + _ = c.Error(terror.ErrWorkerNoStart) + return + } + workerReq := workerrpc.Request{ + Type: workerrpc.CmdOperateSchema, + OperateSchema: &pb.OperateWorkerSchemaRequest{ + Op: pb.SchemaOp_GetSchema, + Task: taskName, + Source: sourceName, + Database: schemaName, + Table: tableName, + }, + } + newCtx := c.Request.Context() + resp, err := worker.SendRequest(newCtx, &workerReq, s.cfg.RPCTimeout) + if err != nil { + _ = c.Error(err) + return + } + if !resp.OperateSchema.Result { + _ = c.Error(terror.ErrOpenAPICommonError.New(resp.OperateSchema.Msg)) + return + } + taskTableStruct := openapi.GetTaskTableStructureResponse{ + SchemaCreateSql: &resp.OperateSchema.Msg, + SchemaName: &schemaName, + TableName: tableName, + } + c.IndentedJSON(http.StatusOK, taskTableStruct) +} + +// DMAPIDeleteTableStructure delete task source table structure url is: (DELETE /api/v1/tasks/{task-name}/sources/{source-name}/schemas/{schema-name}/{table-name}). +func (s *Server) DMAPIDeleteTableStructure(c *gin.Context, taskName string, sourceName string, schemaName string, tableName string) { + worker := s.scheduler.GetWorkerBySource(sourceName) + if worker == nil { + _ = c.Error(terror.ErrWorkerNoStart) + return + } + workerReq := workerrpc.Request{ + Type: workerrpc.CmdOperateSchema, + OperateSchema: &pb.OperateWorkerSchemaRequest{ + Op: pb.SchemaOp_RemoveSchema, + Task: taskName, + Source: sourceName, + Database: schemaName, + Table: tableName, + }, + } + newCtx := c.Request.Context() + resp, err := worker.SendRequest(newCtx, &workerReq, s.cfg.RPCTimeout) + if err != nil { + _ = c.Error(err) + return + } + if !resp.OperateSchema.Result { + _ = c.Error(terror.ErrOpenAPICommonError.New(resp.OperateSchema.Msg)) + return + } + c.Status(http.StatusNoContent) +} + +// DMAPIOperateTableStructure operate task source table structure url is: (PUT /api/v1/tasks/{task-name}/sources/{source-name}/schemas/{schema-name}/{table-name}). +func (s *Server) DMAPIOperateTableStructure(c *gin.Context, taskName string, sourceName string, schemaName string, tableName string) { + var req openapi.OperateTaskTableStructureRequest + if err := c.Bind(&req); err != nil { + _ = c.Error(err) + return + } + worker := s.scheduler.GetWorkerBySource(sourceName) + if worker == nil { + _ = c.Error(terror.ErrWorkerNoStart) + return + } + opReq := &pb.OperateWorkerSchemaRequest{ + Op: pb.SchemaOp_SetSchema, + Task: taskName, + Source: sourceName, + Database: schemaName, + Table: tableName, + Schema: req.SqlContent, + Sync: *req.Sync, + Flush: *req.Flush, + } + if req.Sync != nil { + opReq.Sync = *req.Sync + } + if req.Flush != nil { + opReq.Flush = *req.Flush + } + workerReq := workerrpc.Request{Type: workerrpc.CmdOperateSchema, OperateSchema: opReq} + newCtx := c.Request.Context() + resp, err := worker.SendRequest(newCtx, &workerReq, s.cfg.RPCTimeout) + if err != nil { + _ = c.Error(err) + return + } + if !resp.OperateSchema.Result { + _ = c.Error(terror.ErrOpenAPICommonError.New(resp.OperateSchema.Msg)) + return + } +} + +// DMAPIConvertTask turns task into the format of a configuration file or vice versa url is: (POST /api/v1/tasks/,). +func (s *Server) DMAPIConvertTask(c *gin.Context) { + var req openapi.ConverterTaskRequest + if err := c.Bind(&req); err != nil { + _ = c.Error(err) + return + } + if req.Task == nil && req.TaskConfigFile == nil { + _ = c.Error(terror.ErrOpenAPICommonError.Generate("request body is invalid one of `task` or `task_config_file` must be entered.")) + return + } + task, taskCfg, err := s.convertTaskConfig(c.Request.Context(), req) + if err != nil { + _ = c.Error(err) + return + } + c.IndentedJSON(http.StatusOK, openapi.ConverterTaskResponse{Task: *task, TaskConfigFile: taskCfg.String()}) +} + +// DMAPIImportTaskTemplate create task_config_template url is: (POST /api/v1/tasks/templates/import). +func (s *Server) DMAPIImportTaskTemplate(c *gin.Context) { + var req openapi.TaskTemplateRequest + if err := c.Bind(&req); err != nil { + _ = c.Error(err) + return + } + resp := openapi.TaskTemplateResponse{ + FailedTaskList: []struct { + ErrorMsg string `json:"error_msg"` + TaskName string `json:"task_name"` + }{}, + SuccessTaskList: []string{}, + } + for _, task := range config.SubTaskConfigsToOpenAPITaskList(s.scheduler.GetALlSubTaskCfgs()) { + if err := ha.PutOpenAPITaskTemplate(s.etcdClient, *task, req.Overwrite); err != nil { + resp.FailedTaskList = append(resp.FailedTaskList, struct { + ErrorMsg string `json:"error_msg"` + TaskName string `json:"task_name"` + }{ + ErrorMsg: err.Error(), + TaskName: task.Name, + }) + } else { + resp.SuccessTaskList = append(resp.SuccessTaskList, task.Name) + } + } + c.IndentedJSON(http.StatusAccepted, resp) +} + +// DMAPICreateTaskTemplate create task_config_template url is: (POST /api/tasks/templates). +func (s *Server) DMAPICreateTaskTemplate(c *gin.Context) { + task := &openapi.Task{} + if err := c.Bind(task); err != nil { + _ = c.Error(err) + return + } + if err := task.Adjust(); err != nil { + _ = c.Error(err) + return + } + // prepare target db config + newCtx := c.Request.Context() + toDBCfg := config.GetTargetDBCfgFromOpenAPITask(task) + if adjustDBErr := adjustTargetDB(newCtx, toDBCfg); adjustDBErr != nil { + _ = c.Error(terror.WithClass(adjustDBErr, terror.ClassDMMaster)) + return + } + if err := ha.PutOpenAPITaskTemplate(s.etcdClient, *task, false); err != nil { + _ = c.Error(err) + return + } + c.IndentedJSON(http.StatusCreated, task) +} + +// DMAPIGetTaskTemplateList get task_config_template list url is: (GET /api/v1/tasks/templates). +func (s *Server) DMAPIGetTaskTemplateList(c *gin.Context) { + TaskConfigList, err := ha.GetAllOpenAPITaskTemplate(s.etcdClient) + if err != nil { + _ = c.Error(err) + return + } + taskList := make([]openapi.Task, len(TaskConfigList)) + for i, TaskConfig := range TaskConfigList { + taskList[i] = *TaskConfig + } + resp := openapi.GetTaskListResponse{Total: len(TaskConfigList), Data: taskList} + c.IndentedJSON(http.StatusOK, resp) +} + +// DMAPIDeleteTaskTemplate delete task_config_template url is: (DELETE /api/v1/tasks/templates/{task-name}). +func (s *Server) DMAPIDeleteTaskTemplate(c *gin.Context, taskName string) { + if err := ha.DeleteOpenAPITaskTemplate(s.etcdClient, taskName); err != nil { + _ = c.Error(err) + return + } + c.Status(http.StatusNoContent) +} + +// DMAPIGetTaskTemplate get task_config_template url is: (GET /api/v1/tasks/templates/{task-name}). +func (s *Server) DMAPIGetTaskTemplate(c *gin.Context, taskName string) { + task, err := ha.GetOpenAPITaskTemplate(s.etcdClient, taskName) + if err != nil { + _ = c.Error(err) + return + } + if task == nil { + _ = c.Error(terror.ErrOpenAPITaskConfigNotExist.Generate(taskName)) + return + } + c.IndentedJSON(http.StatusOK, task) +} + +// DMAPUpdateTaskTemplate update task_config_template url is: (PUT /api/v1/tasks/templates/{task-name}). +func (s *Server) DMAPUpdateTaskTemplate(c *gin.Context, taskName string) { + task := &openapi.Task{} + if err := c.Bind(task); err != nil { + _ = c.Error(err) + return + } + if err := task.Adjust(); err != nil { + _ = c.Error(err) + return + } + newCtx := c.Request.Context() + toDBCfg := config.GetTargetDBCfgFromOpenAPITask(task) + if adjustDBErr := adjustTargetDB(newCtx, toDBCfg); adjustDBErr != nil { + _ = c.Error(terror.WithClass(adjustDBErr, terror.ClassDMMaster)) + return + } + if err := ha.UpdateOpenAPITaskTemplate(s.etcdClient, *task); err != nil { + _ = c.Error(err) + return + } + c.IndentedJSON(http.StatusOK, task) +} + +func terrorHTTPErrorHandler() gin.HandlerFunc { + return func(c *gin.Context) { + c.Next() + gErr := c.Errors.Last() + if gErr == nil { + return + } + var code int + var msg string + if tErr, ok := gErr.Err.(*terror.Error); ok { + code = int(tErr.Code()) + msg = tErr.Error() + } else { + msg = gErr.Error() + } + c.IndentedJSON(http.StatusBadRequest, openapi.ErrorWithMessage{ErrorMsg: msg, ErrorCode: code}) + } +} diff --git a/dm/dm/master/openapi_view_test.go b/dm/dm/master/openapi_view_test.go new file mode 100644 index 00000000000..16abd4d399b --- /dev/null +++ b/dm/dm/master/openapi_view_test.go @@ -0,0 +1,1187 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +// this file implement all of the APIs of the DataMigration service. + +package master + +import ( + "bytes" + "context" + "fmt" + "io" + "net/http" + "net/http/httptest" + "os" + "testing" + "time" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/deepmap/oapi-codegen/pkg/testutil" + "github.com/golang/mock/gomock" + "github.com/pingcap/failpoint" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "github.com/tikv/pd/pkg/tempurl" + + "github.com/pingcap/tiflow/dm/checker" + "github.com/pingcap/tiflow/dm/dm/config" + "github.com/pingcap/tiflow/dm/dm/pb" + "github.com/pingcap/tiflow/dm/dm/pbmock" + "github.com/pingcap/tiflow/dm/openapi" + "github.com/pingcap/tiflow/dm/openapi/fixtures" + "github.com/pingcap/tiflow/dm/pkg/conn" + "github.com/pingcap/tiflow/dm/pkg/ha" + "github.com/pingcap/tiflow/dm/pkg/log" + "github.com/pingcap/tiflow/dm/pkg/terror" + "github.com/pingcap/tiflow/dm/pkg/utils" +) + +// some data for test. +var ( + source1Name = "mysql-replica-01" +) + +func setupTestServer(ctx context.Context, t *testing.T) *Server { + t.Helper() + // create a new cluster + cfg1 := NewConfig() + require.NoError(t, cfg1.FromContent(SampleConfig)) + cfg1.Name = "dm-master-1" + cfg1.DataDir = t.TempDir() + cfg1.MasterAddr = tempurl.Alloc()[len("http://"):] + cfg1.PeerUrls = tempurl.Alloc() + cfg1.AdvertisePeerUrls = cfg1.PeerUrls + cfg1.AdvertiseAddr = cfg1.MasterAddr + cfg1.InitialCluster = fmt.Sprintf("%s=%s", cfg1.Name, cfg1.AdvertisePeerUrls) + cfg1.OpenAPI = true + + s1 := NewServer(cfg1) + require.NoError(t, s1.Start(ctx)) + // wait the first one become the leader + require.True(t, utils.WaitSomething(30, 100*time.Millisecond, func() bool { + return s1.election.IsLeader() && s1.scheduler.Started() + })) + return s1 +} + +// nolint:unparam +func mockRelayQueryStatus( + mockWorkerClient *pbmock.MockWorkerClient, sourceName, workerName string, stage pb.Stage, +) { + queryResp := &pb.QueryStatusResponse{ + Result: true, + SourceStatus: &pb.SourceStatus{ + Worker: workerName, + Source: sourceName, + }, + } + if stage == pb.Stage_Running { + queryResp.SourceStatus.RelayStatus = &pb.RelayStatus{Stage: stage} + } + if stage == pb.Stage_Paused { + queryResp.Result = false + queryResp.Msg = "some error happened" + } + mockWorkerClient.EXPECT().QueryStatus( + gomock.Any(), + &pb.QueryStatusRequest{Name: ""}, + ).Return(queryResp, nil).MaxTimes(maxRetryNum) +} + +// nolint:unparam +func mockPurgeRelay(mockWorkerClient *pbmock.MockWorkerClient) { + resp := &pb.CommonWorkerResponse{Result: true} + mockWorkerClient.EXPECT().PurgeRelay(gomock.Any(), gomock.Any()).Return(resp, nil).MaxTimes(maxRetryNum) +} + +func mockTaskQueryStatus( + mockWorkerClient *pbmock.MockWorkerClient, taskName, sourceName, workerName string, needError bool, +) { + var queryResp *pb.QueryStatusResponse + if needError { + queryResp = &pb.QueryStatusResponse{ + Result: false, + Msg: "some error happened", + SourceStatus: &pb.SourceStatus{ + Worker: workerName, + Source: sourceName, + }, + } + } else { + queryResp = &pb.QueryStatusResponse{ + Result: true, + SourceStatus: &pb.SourceStatus{ + Worker: workerName, + Source: sourceName, + }, + SubTaskStatus: []*pb.SubTaskStatus{ + { + Stage: pb.Stage_Running, + Name: taskName, + Status: &pb.SubTaskStatus_Dump{ + Dump: &pb.DumpStatus{ + CompletedTables: 0.0, + EstimateTotalRows: 10.0, + FinishedBytes: 0.0, + FinishedRows: 5.0, + TotalTables: 1, + }, + }, + }, + }, + } + } + + mockWorkerClient.EXPECT().QueryStatus( + gomock.Any(), + gomock.Any(), + ).Return(queryResp, nil).MaxTimes(maxRetryNum) +} + +func mockCheckSyncConfig(ctx context.Context, cfgs []*config.SubTaskConfig, errCnt, warnCnt int64) (string, error) { + return "", nil +} + +type OpenAPIViewSuite struct { + suite.Suite +} + +func (s *OpenAPIViewSuite) SetupSuite() { + s.NoError(log.InitLogger(&log.Config{})) +} + +func (s *OpenAPIViewSuite) SetupTest() { + checker.CheckSyncConfigFunc = mockCheckSyncConfig + checkAndAdjustSourceConfigFunc = checkAndNoAdjustSourceConfigMock + s.NoError(failpoint.Enable("github.com/pingcap/tiflow/dm/dm/master/MockSkipAdjustTargetDB", `return(true)`)) + s.NoError(failpoint.Enable("github.com/pingcap/tiflow/dm/dm/master/MockSkipRemoveMetaData", `return(true)`)) +} + +func (s *OpenAPIViewSuite) TearDownTest() { + checker.CheckSyncConfigFunc = checker.CheckSyncConfig + checkAndAdjustSourceConfigFunc = checkAndAdjustSourceConfig + s.NoError(failpoint.Disable("github.com/pingcap/tiflow/dm/dm/master/MockSkipAdjustTargetDB")) + s.NoError(failpoint.Disable("github.com/pingcap/tiflow/dm/dm/master/MockSkipRemoveMetaData")) +} + +func (s *OpenAPIViewSuite) TestClusterAPI() { + ctx1, cancel1 := context.WithCancel(context.Background()) + s1 := setupTestServer(ctx1, s.T()) + defer func() { + cancel1() + s1.Close() + }() + + // join a new master node to an existing cluster + cfg2 := NewConfig() + s.Nil(cfg2.FromContent(SampleConfig)) + cfg2.Name = "dm-master-2" + cfg2.DataDir = s.T().TempDir() + cfg2.MasterAddr = tempurl.Alloc()[len("http://"):] + cfg2.PeerUrls = tempurl.Alloc() + cfg2.AdvertisePeerUrls = cfg2.PeerUrls + cfg2.Join = s1.cfg.MasterAddr // join to an existing cluster + cfg2.AdvertiseAddr = cfg2.MasterAddr + s2 := NewServer(cfg2) + ctx2, cancel2 := context.WithCancel(context.Background()) + require.NoError(s.T(), s2.Start(ctx2)) + + defer func() { + cancel2() + s2.Close() + }() + + baseURL := "/api/v1/cluster/" + masterURL := baseURL + "masters" + + result := testutil.NewRequest().Get(masterURL).GoWithHTTPHandler(s.T(), s1.openapiHandles) + s.Equal(http.StatusOK, result.Code()) + var resultMasters openapi.GetClusterMasterListResponse + err := result.UnmarshalBodyToObject(&resultMasters) + s.NoError(err) + s.Equal(2, resultMasters.Total) + s.Equal(s1.cfg.Name, resultMasters.Data[0].Name) + s.Equal(s1.cfg.PeerUrls, resultMasters.Data[0].Addr) + s.True(resultMasters.Data[0].Leader) + s.True(resultMasters.Data[0].Alive) + + // check cluster id + clusterInfoURL := baseURL + "info" + result = testutil.NewRequest().Get(clusterInfoURL).GoWithHTTPHandler(s.T(), s1.openapiHandles) + s.Equal(http.StatusOK, result.Code()) + var info openapi.GetClusterInfoResponse + s.NoError(result.UnmarshalBodyToObject(&info)) + s.Greater(info.ClusterId, uint64(0)) + s.Nil(info.Topology) + + // update topo info + fakeHost := "1.1.1.1" + fakePort := 8261 + masterTopo := []openapi.MasterTopology{{Host: fakeHost, Port: fakePort}} + workerTopo := []openapi.WorkerTopology{{Host: fakeHost, Port: fakePort}} + grafanaTopo := openapi.GrafanaTopology{Host: fakeHost, Port: fakePort} + prometheusTopo := openapi.PrometheusTopology{Host: fakeHost, Port: fakePort} + alertMangerTopo := openapi.AlertManagerTopology{Host: fakeHost, Port: fakePort} + topo := openapi.ClusterTopology{ + MasterTopologyList: &masterTopo, + WorkerTopologyList: &workerTopo, + GrafanaTopology: &grafanaTopo, + AlertManagerTopology: &alertMangerTopo, + PrometheusTopology: &prometheusTopo, + } + result = testutil.NewRequest().Put(clusterInfoURL).WithJsonBody(topo).GoWithHTTPHandler(s.T(), s1.openapiHandles) + s.Equal(http.StatusOK, result.Code()) + info = openapi.GetClusterInfoResponse{} + s.NoError(result.UnmarshalBodyToObject(&info)) + s.EqualValues(&topo, info.Topology) + // get again + result = testutil.NewRequest().Get(clusterInfoURL).GoWithHTTPHandler(s.T(), s1.openapiHandles) + s.Equal(http.StatusOK, result.Code()) + s.NoError(result.UnmarshalBodyToObject(&info)) + s.EqualValues(&topo, info.Topology) + + // offline master-2 with retry + // operate etcd cluster may met `etcdserver: unhealthy cluster`, add some retry + for i := 0; i < 20; i++ { + result = testutil.NewRequest().Delete(fmt.Sprintf("%s/%s", masterURL, s2.cfg.Name)).GoWithHTTPHandler(s.T(), s1.openapiHandles) + if result.Code() == http.StatusBadRequest { + s.Equal(http.StatusBadRequest, result.Code()) + errResp := &openapi.ErrorWithMessage{} + err = result.UnmarshalBodyToObject(errResp) + s.Nil(err) + s.Regexp("etcdserver: unhealthy cluster", errResp.ErrorMsg) + time.Sleep(time.Second) + } else { + s.Equal(http.StatusNoContent, result.Code()) + break + } + } + cancel2() // stop dm-master-2 + + // list master again get one node + result = testutil.NewRequest().Get(masterURL).GoWithHTTPHandler(s.T(), s1.openapiHandles) + s.Equal(http.StatusOK, result.Code()) + s.NoError(result.UnmarshalBodyToObject(&resultMasters)) + s.Equal(1, resultMasters.Total) + + workerName1 := "worker1" + s.NoError(s1.scheduler.AddWorker(workerName1, "172.16.10.72:8262")) + // list worker node + workerURL := baseURL + "workers" + result = testutil.NewRequest().Get(workerURL).GoWithHTTPHandler(s.T(), s1.openapiHandles) + var resultWorkers openapi.GetClusterWorkerListResponse + s.NoError(result.UnmarshalBodyToObject(&resultWorkers)) + s.Equal(1, resultWorkers.Total) + + // offline worker-1 + result = testutil.NewRequest().Delete(fmt.Sprintf("%s/%s", workerURL, workerName1)).GoWithHTTPHandler(s.T(), s1.openapiHandles) + s.Equal(http.StatusNoContent, result.Code()) + // after offline, no worker node + result = testutil.NewRequest().Get(workerURL).GoWithHTTPHandler(s.T(), s1.openapiHandles) + err = result.UnmarshalBodyToObject(&resultWorkers) + s.Nil(err) + s.Equal(0, resultWorkers.Total) + + cancel1() +} + +func (s *OpenAPIViewSuite) TestReverseRequestToLeader() { + ctx1, cancel1 := context.WithCancel(context.Background()) + s1 := setupTestServer(ctx1, s.T()) + defer func() { + cancel1() + s1.Close() + }() + + // join a new master node to an existing cluster + cfg2 := NewConfig() + s.Nil(cfg2.FromContent(SampleConfig)) + cfg2.Name = "dm-master-2" + cfg2.DataDir = s.T().TempDir() + cfg2.MasterAddr = tempurl.Alloc()[len("http://"):] + cfg2.PeerUrls = tempurl.Alloc() + cfg2.AdvertisePeerUrls = cfg2.PeerUrls + cfg2.Join = s1.cfg.MasterAddr // join to an existing cluster + cfg2.AdvertiseAddr = cfg2.MasterAddr + cfg2.OpenAPI = true + s2 := NewServer(cfg2) + ctx2, cancel2 := context.WithCancel(context.Background()) + require.NoError(s.T(), s2.Start(ctx2)) + + defer func() { + cancel2() + s2.Close() + }() + + // wait the second master ready + s.False(utils.WaitSomething(30, 100*time.Millisecond, func() bool { + return s2.election.IsLeader() + })) + + baseURL := "/api/v1/sources" + // list source from leader + result := testutil.NewRequest().Get(baseURL).GoWithHTTPHandler(s.T(), s1.openapiHandles) + s.Equal(http.StatusOK, result.Code()) + var resultListSource openapi.GetSourceListResponse + s.NoError(result.UnmarshalBodyToObject(&resultListSource)) + s.Len(resultListSource.Data, 0) + s.Equal(0, resultListSource.Total) + + // list source from non-leader will get result too + result, err := HTTPTestWithTestResponseRecorder(testutil.NewRequest().Get(baseURL), s2.openapiHandles) + s.NoError(err) + s.Equal(http.StatusOK, result.Code()) + var resultListSource2 openapi.GetSourceListResponse + s.NoError(result.UnmarshalBodyToObject(&resultListSource2)) + s.Len(resultListSource2.Data, 0) + s.Equal(0, resultListSource2.Total) +} + +func (s *OpenAPIViewSuite) TestReverseRequestToHttpsLeader() { + pwd, err := os.Getwd() + require.NoError(s.T(), err) + caPath := pwd + "/tls_for_test/ca.pem" + certPath := pwd + "/tls_for_test/dm.pem" + keyPath := pwd + "/tls_for_test/dm.key" + + // master1 + masterAddr1 := tempurl.Alloc()[len("http://"):] + peerAddr1 := tempurl.Alloc()[len("http://"):] + cfg1 := NewConfig() + require.NoError(s.T(), cfg1.Parse([]string{ + "--name=dm-master-tls-1", + fmt.Sprintf("--data-dir=%s", s.T().TempDir()), + fmt.Sprintf("--master-addr=https://%s", masterAddr1), + fmt.Sprintf("--advertise-addr=https://%s", masterAddr1), + fmt.Sprintf("--peer-urls=https://%s", peerAddr1), + fmt.Sprintf("--advertise-peer-urls=https://%s", peerAddr1), + fmt.Sprintf("--initial-cluster=dm-master-tls-1=https://%s", peerAddr1), + "--ssl-ca=" + caPath, + "--ssl-cert=" + certPath, + "--ssl-key=" + keyPath, + })) + cfg1.OpenAPI = true + s1 := NewServer(cfg1) + ctx1, cancel1 := context.WithCancel(context.Background()) + require.NoError(s.T(), s1.Start(ctx1)) + defer func() { + cancel1() + s1.Close() + }() + // wait the first one become the leader + require.True(s.T(), utils.WaitSomething(30, 100*time.Millisecond, func() bool { + return s1.election.IsLeader() && s1.scheduler.Started() + })) + + // master2 + masterAddr2 := tempurl.Alloc()[len("http://"):] + peerAddr2 := tempurl.Alloc()[len("http://"):] + cfg2 := NewConfig() + require.NoError(s.T(), cfg2.Parse([]string{ + "--name=dm-master-tls-2", + fmt.Sprintf("--data-dir=%s", s.T().TempDir()), + fmt.Sprintf("--master-addr=https://%s", masterAddr2), + fmt.Sprintf("--advertise-addr=https://%s", masterAddr2), + fmt.Sprintf("--peer-urls=https://%s", peerAddr2), + fmt.Sprintf("--advertise-peer-urls=https://%s", peerAddr2), + "--ssl-ca=" + caPath, + "--ssl-cert=" + certPath, + "--ssl-key=" + keyPath, + })) + cfg2.OpenAPI = true + cfg2.Join = s1.cfg.MasterAddr // join to an existing cluster + s2 := NewServer(cfg2) + ctx2, cancel2 := context.WithCancel(context.Background()) + require.NoError(s.T(), s2.Start(ctx2)) + defer func() { + cancel2() + s2.Close() + }() + // wait the second master ready + require.False(s.T(), utils.WaitSomething(30, 100*time.Millisecond, func() bool { + return s2.election.IsLeader() + })) + + baseURL := "/api/v1/sources" + // list source from leader + result := testutil.NewRequest().Get(baseURL).GoWithHTTPHandler(s.T(), s1.openapiHandles) + s.Equal(http.StatusOK, result.Code()) + var resultListSource openapi.GetSourceListResponse + s.NoError(result.UnmarshalBodyToObject(&resultListSource)) + s.Len(resultListSource.Data, 0) + s.Equal(0, resultListSource.Total) + + // with tls, list source not from leader will get result too + result, err = HTTPTestWithTestResponseRecorder(testutil.NewRequest().Get(baseURL), s2.openapiHandles) + s.NoError(err) + s.Equal(http.StatusOK, result.Code()) + var resultListSource2 openapi.GetSourceListResponse + s.NoError(result.UnmarshalBodyToObject(&resultListSource2)) + s.Len(resultListSource2.Data, 0) + s.Equal(0, resultListSource2.Total) + + // without tls, list source not from leader will be 502 + s.NoError(failpoint.Enable("github.com/pingcap/tiflow/dm/dm/master/MockNotSetTls", `return()`)) + result, err = HTTPTestWithTestResponseRecorder(testutil.NewRequest().Get(baseURL), s2.openapiHandles) + s.NoError(err) + s.Equal(http.StatusBadGateway, result.Code()) + s.NoError(failpoint.Disable("github.com/pingcap/tiflow/dm/dm/master/MockNotSetTls")) +} + +// httptest.ResponseRecorder is not http.CloseNotifier, will panic when test reverse proxy. +// We need to implement the interface ourselves. +// ref: https://github.com/gin-gonic/gin/blob/ce20f107f5dc498ec7489d7739541a25dcd48463/context_test.go#L1747-L1765 +type TestResponseRecorder struct { + *httptest.ResponseRecorder + closeChannel chan bool +} + +func (r *TestResponseRecorder) CloseNotify() <-chan bool { + return r.closeChannel +} + +func (r *TestResponseRecorder) closeClient() { + r.closeChannel <- true +} + +func CreateTestResponseRecorder() *TestResponseRecorder { + return &TestResponseRecorder{ + httptest.NewRecorder(), + make(chan bool, 1), + } +} + +func HTTPTestWithTestResponseRecorder(r *testutil.RequestBuilder, handler http.Handler) (*testutil.CompletedRequest, error) { + if r == nil { + return nil, nil + } + if r.Error != nil { + return nil, r.Error + } + var bodyReader io.Reader + if r.Body != nil { + bodyReader = bytes.NewReader(r.Body) + } + + req := httptest.NewRequest(r.Method, r.Path, bodyReader) + for h, v := range r.Headers { + req.Header.Add(h, v) + } + if host, ok := r.Headers["Host"]; ok { + req.Host = host + } + for _, c := range r.Cookies { + req.AddCookie(c) + } + + rec := CreateTestResponseRecorder() + handler.ServeHTTP(rec, req) + + return &testutil.CompletedRequest{ + Recorder: rec.ResponseRecorder, + }, nil +} + +func (s *OpenAPIViewSuite) TestOpenAPIWillNotStartInDefaultConfig() { + // create a new cluster + cfg1 := NewConfig() + s.NoError(cfg1.FromContent(SampleConfig)) + cfg1.Name = "dm-master-1" + cfg1.DataDir = s.T().TempDir() + cfg1.MasterAddr = tempurl.Alloc()[len("http://"):] + cfg1.AdvertiseAddr = cfg1.MasterAddr + cfg1.PeerUrls = tempurl.Alloc() + cfg1.AdvertisePeerUrls = cfg1.PeerUrls + cfg1.InitialCluster = fmt.Sprintf("%s=%s", cfg1.Name, cfg1.AdvertisePeerUrls) + + s1 := NewServer(cfg1) + ctx, cancel := context.WithCancel(context.Background()) + s.NoError(s1.Start(ctx)) + s.Nil(s1.openapiHandles) + cancel() + s1.Close() +} + +func (s *OpenAPIViewSuite) TestTaskTemplatesAPI() { + ctx, cancel := context.WithCancel(context.Background()) + s1 := setupTestServer(ctx, s.T()) + defer func() { + cancel() + s1.Close() + }() + + dbCfg := config.GetDBConfigForTest() + source1 := openapi.Source{ + SourceName: source1Name, + EnableGtid: false, + Host: dbCfg.Host, + Password: &dbCfg.Password, + Port: dbCfg.Port, + User: dbCfg.User, + } + createReq := openapi.CreateSourceRequest{Source: source1} + // create source + sourceURL := "/api/v1/sources" + result := testutil.NewRequest().Post(sourceURL).WithJsonBody(createReq).GoWithHTTPHandler(s.T(), s1.openapiHandles) + // check http status code + s.Equal(http.StatusCreated, result.Code()) + + // create task config template + url := "/api/v1/tasks/templates" + + task, err := fixtures.GenNoShardOpenAPITaskForTest() + s.NoError(err) + // use a valid target db + task.TargetConfig.Host = dbCfg.Host + task.TargetConfig.Port = dbCfg.Port + task.TargetConfig.User = dbCfg.User + task.TargetConfig.Password = dbCfg.Password + + // create one + result = testutil.NewRequest().Post(url).WithJsonBody(task).GoWithHTTPHandler(s.T(), s1.openapiHandles) + s.Equal(http.StatusCreated, result.Code()) + var createTaskResp openapi.Task + s.NoError(result.UnmarshalBodyToObject(&createTaskResp)) + s.Equal(createTaskResp.Name, task.Name) + + // create again will fail + result = testutil.NewRequest().Post(url).WithJsonBody(task).GoWithHTTPHandler(s.T(), s1.openapiHandles) + s.Equal(http.StatusBadRequest, result.Code()) + var errResp openapi.ErrorWithMessage + s.NoError(result.UnmarshalBodyToObject(&errResp)) + s.Equal(int(terror.ErrOpenAPITaskConfigExist.Code()), errResp.ErrorCode) + + // list templates + result = testutil.NewRequest().Get(url).GoWithHTTPHandler(s.T(), s1.openapiHandles) + s.Equal(http.StatusOK, result.Code()) + var resultTaskList openapi.GetTaskListResponse + s.NoError(result.UnmarshalBodyToObject(&resultTaskList)) + s.Equal(1, resultTaskList.Total) + s.Equal(task.Name, resultTaskList.Data[0].Name) + + // get detail + oneURL := fmt.Sprintf("%s/%s", url, task.Name) + result = testutil.NewRequest().Get(oneURL).GoWithHTTPHandler(s.T(), s1.openapiHandles) + s.Equal(http.StatusOK, result.Code()) + var respTask openapi.Task + s.NoError(result.UnmarshalBodyToObject(&respTask)) + s.Equal(task.Name, respTask.Name) + + // get not exist + notExistURL := fmt.Sprintf("%s/%s", url, "notexist") + result = testutil.NewRequest().Get(notExistURL).GoWithHTTPHandler(s.T(), s1.openapiHandles) + s.Equal(http.StatusBadRequest, result.Code()) + s.NoError(result.UnmarshalBodyToObject(&errResp)) + s.Equal(int(terror.ErrOpenAPITaskConfigNotExist.Code()), errResp.ErrorCode) + + // update + task.TaskMode = openapi.TaskTaskModeAll + result = testutil.NewRequest().Put(oneURL).WithJsonBody(task).GoWithHTTPHandler(s.T(), s1.openapiHandles) + s.Equal(http.StatusOK, result.Code()) + s.NoError(result.UnmarshalBodyToObject(&respTask)) + s.Equal(task.Name, respTask.Name) + + // update not exist will fail + task.Name = "notexist" + result = testutil.NewRequest().Put(notExistURL).WithJsonBody(task).GoWithHTTPHandler(s.T(), s1.openapiHandles) + s.Equal(http.StatusBadRequest, result.Code()) + s.NoError(result.UnmarshalBodyToObject(&errResp)) + s.Equal(int(terror.ErrOpenAPITaskConfigNotExist.Code()), errResp.ErrorCode) + + // delete task config template + result = testutil.NewRequest().Delete(oneURL).GoWithHTTPHandler(s.T(), s1.openapiHandles) + s.Equal(http.StatusNoContent, result.Code()) + result = testutil.NewRequest().Get(url).GoWithHTTPHandler(s.T(), s1.openapiHandles) + s.Equal(http.StatusOK, result.Code()) + s.NoError(result.UnmarshalBodyToObject(&resultTaskList)) + s.Equal(0, resultTaskList.Total) +} + +func (s *OpenAPIViewSuite) TestSourceAPI() { + ctx, cancel := context.WithCancel(context.Background()) + s1 := setupTestServer(ctx, s.T()) + defer func() { + cancel() + s1.Close() + }() + + baseURL := "/api/v1/sources" + + dbCfg := config.GetDBConfigForTest() + purgeInterVal := int64(10) + source1 := openapi.Source{ + SourceName: source1Name, + Enable: true, + EnableGtid: false, + Host: dbCfg.Host, + Password: &dbCfg.Password, + Port: dbCfg.Port, + User: dbCfg.User, + Purge: &openapi.Purge{Interval: &purgeInterVal}, + } + createReq := openapi.CreateSourceRequest{Source: source1} + result := testutil.NewRequest().Post(baseURL).WithJsonBody(createReq).GoWithHTTPHandler(s.T(), s1.openapiHandles) + // check http status code + s.Equal(http.StatusCreated, result.Code()) + var resultSource openapi.Source + s.NoError(result.UnmarshalBodyToObject(&resultSource)) + s.Equal(source1.User, resultSource.User) + s.Equal(source1.Host, resultSource.Host) + s.Equal(source1.Port, resultSource.Port) + s.Equal(source1.Password, resultSource.Password) + s.Equal(source1.EnableGtid, resultSource.EnableGtid) + s.Equal(source1.SourceName, resultSource.SourceName) + s.EqualValues(source1.Purge.Interval, resultSource.Purge.Interval) + + // create source with same name will failed + result = testutil.NewRequest().Post(baseURL).WithJsonBody(createReq).GoWithHTTPHandler(s.T(), s1.openapiHandles) + // check http status code + s.Equal(http.StatusBadRequest, result.Code()) + var errResp openapi.ErrorWithMessage + s.NoError(result.UnmarshalBodyToObject(&errResp)) + s.Equal(int(terror.ErrSchedulerSourceCfgExist.Code()), errResp.ErrorCode) + + // get source + source1URL := fmt.Sprintf("%s/%s", baseURL, source1Name) + var source1FromHTTP openapi.Source + result = testutil.NewRequest().Get(source1URL).GoWithHTTPHandler(s.T(), s1.openapiHandles) + s.Equal(http.StatusOK, result.Code()) + s.NoError(result.UnmarshalBodyToObject(&source1FromHTTP)) + s.Equal(source1FromHTTP.SourceName, source1.SourceName) + // update a source + clone := source1 + clone.EnableGtid = true + updateReq := openapi.UpdateSourceRequest{Source: clone} + result = testutil.NewRequest().Put(source1URL).WithJsonBody(updateReq).GoWithHTTPHandler(s.T(), s1.openapiHandles) + s.Equal(http.StatusOK, result.Code()) + s.NoError(result.UnmarshalBodyToObject(&source1FromHTTP)) + s.Equal(source1FromHTTP.EnableGtid, clone.EnableGtid) + + // get source not existed + sourceNotExistedURL := fmt.Sprintf("%s/not_existed", baseURL) + result = testutil.NewRequest().Get(sourceNotExistedURL).GoWithHTTPHandler(s.T(), s1.openapiHandles) + s.Equal(http.StatusNotFound, result.Code()) + // get source status + var source1Status openapi.GetSourceStatusResponse + source1StatusURL := fmt.Sprintf("%s/%s/status", baseURL, source1Name) + result = testutil.NewRequest().Get(source1StatusURL).GoWithHTTPHandler(s.T(), s1.openapiHandles) + s.Equal(http.StatusOK, result.Code()) + s.NoError(result.UnmarshalBodyToObject(&source1Status)) + s.Len(source1Status.Data, 1) + s.Equal(source1.SourceName, source1Status.Data[0].SourceName) + s.Equal("", source1Status.Data[0].WorkerName) // no worker now + + // list source + result = testutil.NewRequest().Get(baseURL).GoWithHTTPHandler(s.T(), s1.openapiHandles) + // check http status code + s.Equal(http.StatusOK, result.Code()) + var resultListSource openapi.GetSourceListResponse + s.NoError(result.UnmarshalBodyToObject(&resultListSource)) + s.Len(resultListSource.Data, 1) + s.Equal(1, resultListSource.Total) + s.Equal(source1.SourceName, resultListSource.Data[0].SourceName) + + // test get source schema and table + _, mockDB, err := conn.InitMockDBFull() + s.NoError(err) + schemaName := "information_schema" + mockDB.ExpectQuery("SHOW DATABASES").WillReturnRows(sqlmock.NewRows([]string{"Database"}).AddRow(schemaName)) + + schemaURL := fmt.Sprintf("%s/%s/schemas", baseURL, source1.SourceName) + result = testutil.NewRequest().Get(schemaURL).GoWithHTTPHandler(s.T(), s1.openapiHandles) + s.Equal(http.StatusOK, result.Code()) + var schemaNameList openapi.SchemaNameList + s.NoError(result.UnmarshalBodyToObject(&schemaNameList)) + s.Len(schemaNameList, 1) + s.Equal(schemaName, schemaNameList[0]) + s.NoError(mockDB.ExpectationsWereMet()) + + _, mockDB, err = conn.InitMockDBFull() + s.NoError(err) + tableName := "CHARACTER_SETS" + mockDB.ExpectQuery("SHOW TABLES FROM " + schemaName).WillReturnRows(sqlmock.NewRows([]string{"Tables_in_information_schema"}).AddRow(tableName)) + tableURL := fmt.Sprintf("%s/%s/schemas/%s", baseURL, source1.SourceName, schemaName) + result = testutil.NewRequest().Get(tableURL).GoWithHTTPHandler(s.T(), s1.openapiHandles) + s.Equal(http.StatusOK, result.Code()) + var tableNameList openapi.TableNameList + s.NoError(result.UnmarshalBodyToObject(&tableNameList)) + s.Len(tableNameList, 1) + s.Equal(tableName, tableNameList[0]) + s.NoError(mockDB.ExpectationsWereMet()) + + ctrl := gomock.NewController(s.T()) + defer ctrl.Finish() + // add mock worker the unbounded sources should be bounded + ctx1, cancel1 := context.WithCancel(ctx) + defer cancel1() + workerName1 := "worker1" + s.NoError(s1.scheduler.AddWorker(workerName1, "172.16.10.72:8262")) + go func(ctx context.Context, workerName string) { + s.NoError(ha.KeepAlive(ctx, s1.etcdClient, workerName, keepAliveTTL)) + }(ctx1, workerName1) + // wait worker ready + s.True(utils.WaitSomething(30, 100*time.Millisecond, func() bool { + w := s1.scheduler.GetWorkerBySource(source1.SourceName) + return w != nil + }), true) + + // mock worker get status relay not started + mockWorkerClient := pbmock.NewMockWorkerClient(ctrl) + mockRelayQueryStatus(mockWorkerClient, source1.SourceName, workerName1, pb.Stage_InvalidStage) + s1.scheduler.SetWorkerClientForTest(workerName1, newMockRPCClient(mockWorkerClient)) + + // get source status again,source should be bounded by worker1,but relay not started + result = testutil.NewRequest().Get(source1StatusURL).GoWithHTTPHandler(s.T(), s1.openapiHandles) + s.Equal(http.StatusOK, result.Code()) + s.NoError(result.UnmarshalBodyToObject(&source1Status)) + s.Equal(source1.SourceName, source1Status.Data[0].SourceName) + s.Equal(workerName1, source1Status.Data[0].WorkerName) // worker1 is bound + s.Nil(source1Status.Data[0].RelayStatus) // not start relay + s.Equal(1, source1Status.Total) + + // list source with status + result = testutil.NewRequest().Get(baseURL+"?with_status=true").GoWithHTTPHandler(s.T(), s1.openapiHandles) + // check http status code + s.Equal(http.StatusOK, result.Code()) + s.NoError(result.UnmarshalBodyToObject(&resultListSource)) + s.Len(resultListSource.Data, 1) + s.Equal(1, resultListSource.Total) + s.Equal(source1.SourceName, resultListSource.Data[0].SourceName) + statusList := *resultListSource.Data[0].StatusList + s.Len(statusList, 1) + status := statusList[0] + s.Equal(workerName1, status.WorkerName) + s.Nil(status.RelayStatus) + + // start relay + enableRelayURL := fmt.Sprintf("%s/relay/enable", source1URL) + result = testutil.NewRequest().Post(enableRelayURL).GoWithHTTPHandler(s.T(), s1.openapiHandles) + // check http status code + s.Equal(http.StatusOK, result.Code()) + relayWorkers, err := s1.scheduler.GetRelayWorkers(source1Name) + s.NoError(err) + s.Len(relayWorkers, 1) + + // mock worker get status relay started + mockWorkerClient = pbmock.NewMockWorkerClient(ctrl) + mockRelayQueryStatus(mockWorkerClient, source1.SourceName, workerName1, pb.Stage_Running) + s1.scheduler.SetWorkerClientForTest(workerName1, newMockRPCClient(mockWorkerClient)) + // get source status again, relay status should not be nil + result = testutil.NewRequest().Get(source1StatusURL).GoWithHTTPHandler(s.T(), s1.openapiHandles) + s.Equal(http.StatusOK, result.Code()) + s.NoError(result.UnmarshalBodyToObject(&source1Status)) + s.Equal(pb.Stage_Running.String(), source1Status.Data[0].RelayStatus.Stage) + + // mock worker get status meet error + mockWorkerClient = pbmock.NewMockWorkerClient(ctrl) + mockRelayQueryStatus(mockWorkerClient, source1.SourceName, workerName1, pb.Stage_Paused) + s1.scheduler.SetWorkerClientForTest(workerName1, newMockRPCClient(mockWorkerClient)) + // get source status again, error message should not be nil + result = testutil.NewRequest().Get(source1StatusURL).GoWithHTTPHandler(s.T(), s1.openapiHandles) + s.Equal(http.StatusOK, result.Code()) + s.NoError(result.UnmarshalBodyToObject(&source1Status)) + s.Regexp("some error happened", *source1Status.Data[0].ErrorMsg) + s.Equal(workerName1, source1Status.Data[0].WorkerName) + + // test list source and filter by enable-relay + result = testutil.NewRequest().Get(baseURL+"?enable_relay=true").GoWithHTTPHandler(s.T(), s1.openapiHandles) + s.Equal(http.StatusOK, result.Code()) + s.NoError(result.UnmarshalBodyToObject(&resultListSource)) + s.Len(resultListSource.Data, 1) + result = testutil.NewRequest().Get(baseURL+"?enable_relay=false").GoWithHTTPHandler(s.T(), s1.openapiHandles) + s.Equal(http.StatusOK, result.Code()) + s.NoError(result.UnmarshalBodyToObject(&resultListSource)) + s.Len(resultListSource.Data, 0) + + // purge relay + purgeRelay := fmt.Sprintf("%s/relay/purge", source1URL) + purgeRelayReq := openapi.PurgeRelayRequest{RelayBinlogName: "binlog.001"} + mockWorkerClient = pbmock.NewMockWorkerClient(ctrl) + mockPurgeRelay(mockWorkerClient) + s1.scheduler.SetWorkerClientForTest(workerName1, newMockRPCClient(mockWorkerClient)) + result = testutil.NewRequest().Post(purgeRelay).WithJsonBody(purgeRelayReq).GoWithHTTPHandler(s.T(), s1.openapiHandles) + s.Equal(http.StatusOK, result.Code()) + + // test disable relay + disableRelayURL := fmt.Sprintf("%s/relay/disable", source1URL) + disableRelayReq := openapi.DisableRelayRequest{} + result = testutil.NewRequest().Post(disableRelayURL).WithJsonBody(disableRelayReq).GoWithHTTPHandler(s.T(), s1.openapiHandles) + s.Equal(http.StatusOK, result.Code()) + relayWorkers, err = s1.scheduler.GetRelayWorkers(source1Name) + s.NoError(err) + s.Len(relayWorkers, 0) + + // mock worker get status relay already stopped + mockWorkerClient = pbmock.NewMockWorkerClient(ctrl) + mockRelayQueryStatus(mockWorkerClient, source1.SourceName, workerName1, pb.Stage_InvalidStage) + s1.scheduler.SetWorkerClientForTest(workerName1, newMockRPCClient(mockWorkerClient)) + // get source status again + result = testutil.NewRequest().Get(source1StatusURL).GoWithHTTPHandler(s.T(), s1.openapiHandles) + s.Equal(http.StatusOK, result.Code()) + source1Status = openapi.GetSourceStatusResponse{} // reset + s.NoError(result.UnmarshalBodyToObject(&source1Status)) + s.Equal(source1.SourceName, source1Status.Data[0].SourceName) + s.Equal(workerName1, source1Status.Data[0].WorkerName) // worker1 is bound + s.Nil(source1Status.Data[0].RelayStatus) // not start relay + s.Equal(1, source1Status.Total) + + // delete source with --force + result = testutil.NewRequest().Delete(fmt.Sprintf("%s/%s?force=true", baseURL, source1.SourceName)).GoWithHTTPHandler(s.T(), s1.openapiHandles) + // check http status code + s.Equal(http.StatusNoContent, result.Code()) + + // delete again will failed + result = testutil.NewRequest().Delete(fmt.Sprintf("%s/%s", baseURL, source1.SourceName)).GoWithHTTPHandler(s.T(), s1.openapiHandles) + s.Equal(http.StatusBadRequest, result.Code()) + var errResp2 openapi.ErrorWithMessage + s.NoError(result.UnmarshalBodyToObject(&errResp2)) + s.Equal(int(terror.ErrSchedulerSourceCfgNotExist.Code()), errResp2.ErrorCode) + + // list source + result = testutil.NewRequest().Get(baseURL).GoWithHTTPHandler(s.T(), s1.openapiHandles) + // check http status code + s.Equal(http.StatusOK, result.Code()) + var resultListSource2 openapi.GetSourceListResponse + s.NoError(result.UnmarshalBodyToObject(&resultListSource2)) + s.Len(resultListSource2.Data, 0) + s.Equal(0, resultListSource2.Total) + + // create with no password + sourceNoPassword := source1 + sourceNoPassword.Password = nil + createReqNoPassword := openapi.CreateSourceRequest{Source: sourceNoPassword} + result = testutil.NewRequest().Post(baseURL).WithJsonBody(createReqNoPassword).GoWithHTTPHandler(s.T(), s1.openapiHandles) + s.Equal(http.StatusCreated, result.Code()) + s.NoError(result.UnmarshalBodyToObject(&resultSource)) + s.Nil(resultSource.Password) + + // update to have password + sourceHasPassword := source1 + updateReqHasPassword := openapi.UpdateSourceRequest{Source: sourceHasPassword} + result = testutil.NewRequest().Put(source1URL).WithJsonBody(updateReqHasPassword).GoWithHTTPHandler(s.T(), s1.openapiHandles) + s.Equal(http.StatusOK, result.Code()) + s.NoError(result.UnmarshalBodyToObject(&source1FromHTTP)) + s.Equal(source1FromHTTP.Password, sourceHasPassword.Password) + + // update with no password, will use old password + updateReqNoPassword := openapi.UpdateSourceRequest{Source: sourceNoPassword} + result = testutil.NewRequest().Put(source1URL).WithJsonBody(updateReqNoPassword).GoWithHTTPHandler(s.T(), s1.openapiHandles) + s.Equal(http.StatusOK, result.Code()) + s.NoError(result.UnmarshalBodyToObject(&source1FromHTTP)) + s.Nil(source1FromHTTP.Password) + // password is old + conf := s1.scheduler.GetSourceCfgByID(source1FromHTTP.SourceName) + s.NotNil(conf) + s.Equal(*sourceHasPassword.Password, conf.From.Password) + + // delete source with --force + result = testutil.NewRequest().Delete(fmt.Sprintf("%s/%s?force=true", baseURL, source1.SourceName)).GoWithHTTPHandler(s.T(), s1.openapiHandles) + s.Equal(http.StatusNoContent, result.Code()) +} + +func (s *OpenAPIViewSuite) testImportTaskTemplate(task *openapi.Task, s1 *Server) { + // test batch import task config + taskBatchImportURL := "/api/v1/tasks/templates/import" + req := openapi.TaskTemplateRequest{Overwrite: false} + result := testutil.NewRequest().Post(taskBatchImportURL).WithJsonBody(req).GoWithHTTPHandler(s.T(), s1.openapiHandles) + s.Equal(http.StatusAccepted, result.Code()) + var resp openapi.TaskTemplateResponse + s.NoError(result.UnmarshalBodyToObject(&resp)) + s.Len(resp.SuccessTaskList, 1) + s.Equal(task.Name, resp.SuccessTaskList[0]) + s.Len(resp.FailedTaskList, 0) + + // import again without overwrite will fail + result = testutil.NewRequest().Post(taskBatchImportURL).WithJsonBody(req).GoWithHTTPHandler(s.T(), s1.openapiHandles) + s.Equal(http.StatusAccepted, result.Code()) + s.NoError(result.UnmarshalBodyToObject(&resp)) + s.Len(resp.SuccessTaskList, 0) + s.Len(resp.FailedTaskList, 1) + s.Equal(task.Name, resp.FailedTaskList[0].TaskName) + + // import again with overwrite will success + req.Overwrite = true + result = testutil.NewRequest().Post(taskBatchImportURL).WithJsonBody(req).GoWithHTTPHandler(s.T(), s1.openapiHandles) + s.NoError(result.UnmarshalBodyToObject(&resp)) + s.Len(resp.SuccessTaskList, 1) + s.Equal(task.Name, resp.SuccessTaskList[0]) + s.Len(resp.FailedTaskList, 0) +} + +func (s *OpenAPIViewSuite) testSourceOperationWithTask(source *openapi.Source, task *openapi.Task, s1 *Server) { + source1URL := fmt.Sprintf("/api/v1/sources/%s", source.SourceName) + disableSource1URL := fmt.Sprintf("%s/disable", source1URL) + enableSource1URL := fmt.Sprintf("%s/enable", source1URL) + transferSource1URL := fmt.Sprintf("%s/transfer", source1URL) + + // disable + result := testutil.NewRequest().Post(disableSource1URL).GoWithHTTPHandler(s.T(), s1.openapiHandles) + s.Equal(http.StatusOK, result.Code()) + s.Equal(pb.Stage_Stopped, s1.scheduler.GetExpectSubTaskStage(task.Name, source1Name).Expect) + + // enable again + result = testutil.NewRequest().Post(enableSource1URL).GoWithHTTPHandler(s.T(), s1.openapiHandles) + s.Equal(http.StatusOK, result.Code()) + s.Equal(pb.Stage_Running, s1.scheduler.GetExpectSubTaskStage(task.Name, source1Name).Expect) + + // test transfer failed,success transfer is tested in IT test + req := openapi.WorkerNameRequest{WorkerName: "not exist"} + result = testutil.NewRequest().Post(transferSource1URL).WithJsonBody(req).GoWithHTTPHandler(s.T(), s1.openapiHandles) + s.Equal(http.StatusBadRequest, result.Code()) + var resp openapi.ErrorWithMessage + s.NoError(result.UnmarshalBodyToObject(&resp)) + s.Equal(int(terror.ErrSchedulerWorkerNotExist.Code()), resp.ErrorCode) +} + +func (s *OpenAPIViewSuite) TestTaskAPI() { + ctx, cancel := context.WithCancel(context.Background()) + s1 := setupTestServer(ctx, s.T()) + ctrl := gomock.NewController(s.T()) + defer func() { + cancel() + s1.Close() + ctrl.Finish() + }() + + dbCfg := config.GetDBConfigForTest() + source1 := openapi.Source{ + Enable: true, + SourceName: source1Name, + EnableGtid: false, + Host: dbCfg.Host, + Password: &dbCfg.Password, + Port: dbCfg.Port, + User: dbCfg.User, + } + // create source + sourceURL := "/api/v1/sources" + createSourceReq := openapi.CreateSourceRequest{Source: source1} + result := testutil.NewRequest().Post(sourceURL).WithJsonBody(createSourceReq).GoWithHTTPHandler(s.T(), s1.openapiHandles) + // check http status code + s.Equal(http.StatusCreated, result.Code()) + + // add mock worker start workers, the unbounded sources should be bounded + ctx1, cancel1 := context.WithCancel(ctx) + defer cancel1() + workerName1 := "worker-1" + s.NoError(s1.scheduler.AddWorker(workerName1, "172.16.10.72:8262")) + go func(ctx context.Context, workerName string) { + s.NoError(ha.KeepAlive(ctx, s1.etcdClient, workerName, keepAliveTTL)) + }(ctx1, workerName1) + // wait worker ready + s.True(utils.WaitSomething(30, 100*time.Millisecond, func() bool { + w := s1.scheduler.GetWorkerBySource(source1.SourceName) + return w != nil + }), true) + + // create task + taskURL := "/api/v1/tasks" + + task, err := fixtures.GenNoShardOpenAPITaskForTest() + s.NoError(err) + // use a valid target db + task.TargetConfig.Host = dbCfg.Host + task.TargetConfig.Port = dbCfg.Port + task.TargetConfig.User = dbCfg.User + task.TargetConfig.Password = dbCfg.Password + + // create task + createTaskReq := openapi.CreateTaskRequest{Task: task} + result = testutil.NewRequest().Post(taskURL).WithJsonBody(createTaskReq).GoWithHTTPHandler(s.T(), s1.openapiHandles) + s.Equal(http.StatusCreated, result.Code()) + var createTaskResp openapi.OperateTaskResponse + s.NoError(result.UnmarshalBodyToObject(&createTaskResp)) + s.Equal(createTaskResp.Task.Name, task.Name) + subTaskM := s1.scheduler.GetSubTaskCfgsByTask(task.Name) + s.Len(subTaskM, 1) + s.Equal(task.Name, subTaskM[source1Name].Name) + + // get task + task1URL := fmt.Sprintf("%s/%s", taskURL, task.Name) + var task1FromHTTP openapi.Task + result = testutil.NewRequest().Get(task1URL).GoWithHTTPHandler(s.T(), s1.openapiHandles) + s.Equal(http.StatusOK, result.Code()) + s.NoError(result.UnmarshalBodyToObject(&task1FromHTTP)) + s.Equal(task1FromHTTP.Name, task.Name) + + // update a task + s.NoError(failpoint.Enable("github.com/pingcap/tiflow/dm/dm/master/scheduler/operateCheckSubtasksCanUpdate", `return("success")`)) + clone := task + batch := 1000 + clone.SourceConfig.IncrMigrateConf.ReplBatch = &batch + updateReq := openapi.UpdateTaskRequest{Task: clone} + result = testutil.NewRequest().Put(task1URL).WithJsonBody(updateReq).GoWithHTTPHandler(s.T(), s1.openapiHandles) + s.Equal(http.StatusOK, result.Code()) + var updateResp openapi.OperateTaskResponse + s.NoError(result.UnmarshalBodyToObject(&updateResp)) + s.EqualValues(updateResp.Task.SourceConfig.IncrMigrateConf.ReplBatch, clone.SourceConfig.IncrMigrateConf.ReplBatch) + s.NoError(failpoint.Disable("github.com/pingcap/tiflow/dm/dm/master/scheduler/operateCheckSubtasksCanUpdate")) + + // list tasks + result = testutil.NewRequest().Get(taskURL).GoWithHTTPHandler(s.T(), s1.openapiHandles) + s.Equal(http.StatusOK, result.Code()) + var resultTaskList openapi.GetTaskListResponse + s.NoError(result.UnmarshalBodyToObject(&resultTaskList)) + s.Equal(1, resultTaskList.Total) + s.Equal(task.Name, resultTaskList.Data[0].Name) + + s.testImportTaskTemplate(&task, s1) + + // start task + startTaskURL := fmt.Sprintf("%s/%s/start", taskURL, task.Name) + result = testutil.NewRequest().Post(startTaskURL).GoWithHTTPHandler(s.T(), s1.openapiHandles) + s.Equal(http.StatusOK, result.Code()) + s.Equal(pb.Stage_Running, s1.scheduler.GetExpectSubTaskStage(task.Name, source1Name).Expect) + + // get task status + mockWorkerClient := pbmock.NewMockWorkerClient(ctrl) + mockTaskQueryStatus(mockWorkerClient, task.Name, source1.SourceName, workerName1, false) + s1.scheduler.SetWorkerClientForTest(workerName1, newMockRPCClient(mockWorkerClient)) + taskStatusURL := fmt.Sprintf("%s/%s/status", taskURL, task.Name) + result = testutil.NewRequest().Get(taskStatusURL).GoWithHTTPHandler(s.T(), s1.openapiHandles) + s.Equal(http.StatusOK, result.Code()) + var resultTaskStatus openapi.GetTaskStatusResponse + s.NoError(result.UnmarshalBodyToObject(&resultTaskStatus)) + s.Equal(1, resultTaskStatus.Total) // only 1 subtask + s.Equal(task.Name, resultTaskStatus.Data[0].Name) + s.Equal(openapi.TaskStageRunning, resultTaskStatus.Data[0].Stage) + s.Equal(workerName1, resultTaskStatus.Data[0].WorkerName) + s.Equal(float64(0), resultTaskStatus.Data[0].DumpStatus.CompletedTables) + s.Equal(int64(1), resultTaskStatus.Data[0].DumpStatus.TotalTables) + s.Equal(float64(10), resultTaskStatus.Data[0].DumpStatus.EstimateTotalRows) + + // get task status with source name + taskStatusURL = fmt.Sprintf("%s/%s/status?source_name_list=%s", taskURL, task.Name, source1Name) + result = testutil.NewRequest().Get(taskStatusURL).GoWithHTTPHandler(s.T(), s1.openapiHandles) + s.Equal(http.StatusOK, result.Code()) + var resultTaskStatusWithStatus openapi.GetTaskStatusResponse + s.NoError(result.UnmarshalBodyToObject(&resultTaskStatusWithStatus)) + s.EqualValues(resultTaskStatus, resultTaskStatusWithStatus) + + // list task with status + result = testutil.NewRequest().Get(taskURL+"?with_status=true").GoWithHTTPHandler(s.T(), s1.openapiHandles) + s.Equal(http.StatusOK, result.Code()) + var resultListTask openapi.GetTaskListResponse + s.NoError(result.UnmarshalBodyToObject(&resultListTask)) + s.Len(resultListTask.Data, 1) + s.Equal(1, resultListTask.Total) + s.NotNil(resultListTask.Data[0].StatusList) + statusList := *resultListTask.Data[0].StatusList + status := statusList[0] + s.Equal(workerName1, status.WorkerName) + s.Equal(task.Name, status.Name) + + // list with filter + result = testutil.NewRequest().Get(taskURL+"?stage=Stopped").GoWithHTTPHandler(s.T(), s1.openapiHandles) + s.Equal(http.StatusOK, result.Code()) + resultListTask = openapi.GetTaskListResponse{} // reset + s.NoError(result.UnmarshalBodyToObject(&resultListTask)) + s.Len(resultListTask.Data, 0) + + result = testutil.NewRequest().Get(taskURL+"?stage=Running").GoWithHTTPHandler(s.T(), s1.openapiHandles) + s.Equal(http.StatusOK, result.Code()) + resultListTask = openapi.GetTaskListResponse{} // reset + s.NoError(result.UnmarshalBodyToObject(&resultListTask)) + s.Len(resultListTask.Data, 1) + + result = testutil.NewRequest().Get(taskURL+"?stage=Running&source_name_list=notsource").GoWithHTTPHandler(s.T(), s1.openapiHandles) + s.Equal(http.StatusOK, result.Code()) + resultListTask = openapi.GetTaskListResponse{} // reset + s.NoError(result.UnmarshalBodyToObject(&resultListTask)) + s.Len(resultListTask.Data, 0) + + result = testutil.NewRequest().Get(taskURL+"?stage=Running&source_name_list="+source1Name).GoWithHTTPHandler(s.T(), s1.openapiHandles) + s.Equal(http.StatusOK, result.Code()) + resultListTask = openapi.GetTaskListResponse{} // reset + s.NoError(result.UnmarshalBodyToObject(&resultListTask)) + s.Len(resultListTask.Data, 1) + + // get task with status + result = testutil.NewRequest().Get(task1URL+"?with_status=true").GoWithHTTPHandler(s.T(), s1.openapiHandles) + s.Equal(http.StatusOK, result.Code()) + s.NoError(result.UnmarshalBodyToObject(&task1FromHTTP)) + s.Equal(task1FromHTTP.Name, task.Name) + statusList = *task1FromHTTP.StatusList + s.Len(statusList, 1) + s.Equal(workerName1, statusList[0].WorkerName) + s.Equal(task.Name, statusList[0].Name) + + // test some error happened on worker + mockWorkerClient = pbmock.NewMockWorkerClient(ctrl) + mockTaskQueryStatus(mockWorkerClient, task.Name, source1.SourceName, workerName1, true) + s1.scheduler.SetWorkerClientForTest(workerName1, newMockRPCClient(mockWorkerClient)) + result = testutil.NewRequest().Get(taskURL+"?with_status=true").GoWithHTTPHandler(s.T(), s1.openapiHandles) + s.Equal(http.StatusOK, result.Code()) + s.NoError(result.UnmarshalBodyToObject(&resultListTask)) + s.Len(resultListTask.Data, 1) + s.Equal(1, resultListTask.Total) + s.NotNil(resultListTask.Data[0].StatusList) + statusList = *resultListTask.Data[0].StatusList + s.Len(statusList, 1) + status = statusList[0] + s.NotNil(status.ErrorMsg) + + // test convertTaskConfig + convertReq := openapi.ConverterTaskRequest{} + convertResp := openapi.ConverterTaskResponse{} + convertURL := fmt.Sprintf("%s/%s", taskURL, "converters") + result = testutil.NewRequest().Post(convertURL).WithJsonBody(convertReq).GoWithHTTPHandler(s.T(), s1.openapiHandles) + s.Equal(http.StatusBadRequest, result.Code()) // not valid req + + // from task to taskConfig + convertReq.Task = &task + result = testutil.NewRequest().Post(convertURL).WithJsonBody(convertReq).GoWithHTTPHandler(s.T(), s1.openapiHandles) + s.Equal(http.StatusOK, result.Code()) + s.NoError(result.UnmarshalBodyToObject(&convertResp)) + s.NotNil(convertResp.Task) + s.NotNil(convertResp.TaskConfigFile) + taskConfigFile := convertResp.TaskConfigFile + + // from taskCfg to task + convertReq.Task = nil + convertReq.TaskConfigFile = &taskConfigFile + result = testutil.NewRequest().Post(convertURL).WithJsonBody(convertReq).GoWithHTTPHandler(s.T(), s1.openapiHandles) + s.Equal(http.StatusOK, result.Code()) + s.NoError(result.UnmarshalBodyToObject(&convertResp)) + s.NotNil(convertResp.Task) + s.NotNil(convertResp.TaskConfigFile) + taskConfigFile2 := convertResp.TaskConfigFile + s.Equal(taskConfigFile2, taskConfigFile) + + s.testSourceOperationWithTask(&source1, &task, s1) + + // stop task + stopTaskURL := fmt.Sprintf("%s/%s/stop", taskURL, task.Name) + stopTaskReq := openapi.StopTaskRequest{} + result = testutil.NewRequest().Post(stopTaskURL).WithJsonBody(stopTaskReq).GoWithHTTPHandler(s.T(), s1.openapiHandles) + s.Equal(http.StatusOK, result.Code()) + s.Equal(pb.Stage_Stopped, s1.scheduler.GetExpectSubTaskStage(task.Name, source1Name).Expect) + + // delete task + result = testutil.NewRequest().Delete(task1URL+"?force=true").GoWithHTTPHandler(s.T(), s1.openapiHandles) + s.Equal(http.StatusNoContent, result.Code()) + subTaskM = s1.scheduler.GetSubTaskCfgsByTask(task.Name) + s.Len(subTaskM, 0) + + // list tasks + result = testutil.NewRequest().Get(taskURL).GoWithHTTPHandler(s.T(), s1.openapiHandles) + s.Equal(http.StatusOK, result.Code()) + resultListTask = openapi.GetTaskListResponse{} // reset + s.NoError(result.UnmarshalBodyToObject(&resultTaskList)) + s.Equal(0, resultTaskList.Total) +} + +func TestOpenAPIViewSuite(t *testing.T) { + suite.Run(t, new(OpenAPIViewSuite)) +} diff --git a/dm/dm/master/server.go b/dm/dm/master/server.go index 42309798856..9eee8aa06b5 100644 --- a/dm/dm/master/server.go +++ b/dm/dm/master/server.go @@ -190,8 +190,18 @@ func (s *Server) Start(ctx context.Context) (err error) { "/status": getStatusHandle(), "/debug/": getDebugHandler(), } +<<<<<<< HEAD if s.cfg.ExperimentalFeatures.OpenAPI { if initOpenAPIErr := s.InitOpenAPIHandles(); initOpenAPIErr != nil { +======= + if s.cfg.OpenAPI { + // tls3 is used to openapi reverse proxy + tls3, err1 := toolutils.NewTLS(s.cfg.SSLCA, s.cfg.SSLCert, s.cfg.SSLKey, s.cfg.AdvertiseAddr, s.cfg.CertAllowedCN) + if err1 != nil { + return terror.ErrMasterTLSConfigNotValid.Delegate(err1) + } + if initOpenAPIErr := s.InitOpenAPIHandles(tls3.TLSConfig()); initOpenAPIErr != nil { +>>>>>>> 359af1861 (DM/Openapi: use reverse proxy instead of redirect (#5390)) return terror.ErrOpenAPICommonError.Delegate(initOpenAPIErr) } userHandles["/api/v1/"] = s.echo diff --git a/dm/tests/openapi/client/openapi_source_check b/dm/tests/openapi/client/openapi_source_check index 5b7c1dfd0c1..f018ad71ec9 100755 --- a/dm/tests/openapi/client/openapi_source_check +++ b/dm/tests/openapi/client/openapi_source_check @@ -1,6 +1,7 @@ #!/usr/bin/env python import sys import requests +import ssl SOURCE1_NAME = "mysql-01" SOURCE2_NAME = "mysql-02" @@ -11,6 +12,10 @@ WORKER2_NAME = "worker2" API_ENDPOINT = "http://127.0.0.1:8261/api/v1/sources" API_ENDPOINT_NOT_LEADER = "http://127.0.0.1:8361/api/v1/sources" +API_ENDPOINT_HTTPS = "https://127.0.0.1:8261/api/v1/sources" +API_ENDPOINT_NOT_LEADER_HTTPS = "https://127.0.0.1:8361/api/v1/sources" + + def create_source_failed(): resp = requests.post(url=API_ENDPOINT) @@ -47,6 +52,41 @@ def create_source2_success(): print("create_source1_success resp=", resp.json()) assert resp.status_code == 201 +<<<<<<< HEAD +======= +def create_source_success_https(ssl_ca, ssl_cert, ssl_key): + req = { + "source": { + "case_sensitive": False, + "enable": True, + "enable_gtid": False, + "host": "127.0.0.1", + "password": "123456", + "port": 3306, + "source_name": SOURCE1_NAME, + "user": "root", + } + } + resp = requests.post(url=API_ENDPOINT_HTTPS, json=req, verify=ssl_ca, cert=(ssl_cert, ssl_key)) + print("create_source_success_https resp=", resp.json()) + assert resp.status_code == 201 + +def update_source1_without_password_success(): + req = { + "source": { + "case_sensitive": False, + "enable": True, + "enable_gtid": False, + "host": "127.0.0.1", + "port": 3306, + "source_name": SOURCE1_NAME, + "user": "root", + } + } + resp = requests.put(url=API_ENDPOINT + "/" + SOURCE1_NAME, json=req) + print("update_source1_without_password_success resp=", resp.json()) + assert resp.status_code == 200 +>>>>>>> 359af1861 (DM/Openapi: use reverse proxy instead of redirect (#5390)) def list_source_success(source_count): resp = requests.get(url=API_ENDPOINT) @@ -55,6 +95,12 @@ def list_source_success(source_count): print("list_source_by_openapi_success resp=", data) assert data["total"] == int(source_count) +def list_source_success_https(source_count, ssl_ca, ssl_cert, ssl_key): + resp = requests.get(url=API_ENDPOINT_HTTPS, verify=ssl_ca, cert=(ssl_cert, ssl_key)) + assert resp.status_code == 200 + data = resp.json() + print("list_source_success_https resp=", data) + assert data["total"] == int(source_count) def list_source_with_status_success(source_count, status_count): resp = requests.get(url=API_ENDPOINT + "?with_status=true") @@ -66,13 +112,19 @@ def list_source_with_status_success(source_count, status_count): assert len(data["data"][i]["status_list"]) == int(status_count) -def list_source_with_redirect(source_count): +def list_source_with_reverse(source_count): resp = requests.get(url=API_ENDPOINT_NOT_LEADER) assert resp.status_code == 200 data = resp.json() - print("list_source_by_openapi_redirect resp=", data) + print("list_source_with_reverse resp=", data) assert data["total"] == int(source_count) +def list_source_with_reverse_https(source_count, ssl_ca, ssl_cert, ssl_key): + resp = requests.get(url=API_ENDPOINT_NOT_LEADER_HTTPS, verify=ssl_ca, cert=(ssl_cert, ssl_key)) + assert resp.status_code == 200 + data = resp.json() + print("list_source_with_reverse_https resp=", data) + assert data["total"] == int(source_count) def delete_source_success(source_name): resp = requests.delete(url=API_ENDPOINT + "/" + source_name) @@ -215,8 +267,15 @@ if __name__ == "__main__": "create_source_failed": create_source_failed, "create_source1_success": create_source1_success, "create_source2_success": create_source2_success, +<<<<<<< HEAD +======= + "create_source_success_https": create_source_success_https, + "update_source1_without_password_success": update_source1_without_password_success, +>>>>>>> 359af1861 (DM/Openapi: use reverse proxy instead of redirect (#5390)) "list_source_success": list_source_success, - "list_source_with_redirect": list_source_with_redirect, + "list_source_success_https": list_source_success_https, + "list_source_with_reverse_https": list_source_with_reverse_https, + "list_source_with_reverse": list_source_with_reverse, "list_source_with_status_success": list_source_with_status_success, "delete_source_failed": delete_source_failed, "delete_source_success": delete_source_success, diff --git a/dm/tests/openapi/run.sh b/dm/tests/openapi/run.sh index 969f4f767b6..29ffbe8bfd7 100644 --- a/dm/tests/openapi/run.sh +++ b/dm/tests/openapi/run.sh @@ -67,7 +67,7 @@ function test_source() { openapi_source_check "delete_source_failed" "mysql-01" # send request to not leader node - openapi_source_check "list_source_with_redirect" 0 + openapi_source_check "list_source_with_reverse" 0 echo ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>TEST OPENAPI: SOURCE SUCCESS" } @@ -294,6 +294,62 @@ function test_noshard_task() { echo ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>TEST OPENAPI: NO SHARD TASK SUCCESS" } +function test_reverse_https() { + echo ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>START TEST OPENAPI: REVERSE HTTPS" + cleanup_data openapi + cleanup_process + + cp $cur/tls_conf/dm-master1.toml $WORK_DIR/ + cp $cur/tls_conf/dm-master2.toml $WORK_DIR/ + cp $cur/tls_conf/dm-worker1.toml $WORK_DIR/ + cp $cur/tls_conf/dm-worker2.toml $WORK_DIR/ + sed -i "s%dir-placeholer%$cur\/tls_conf%g" $WORK_DIR/dm-master1.toml + sed -i "s%dir-placeholer%$cur\/tls_conf%g" $WORK_DIR/dm-master2.toml + sed -i "s%dir-placeholer%$cur\/tls_conf%g" $WORK_DIR/dm-worker1.toml + sed -i "s%dir-placeholer%$cur\/tls_conf%g" $WORK_DIR/dm-worker2.toml + + # run dm-master1 + run_dm_master $WORK_DIR/master1 $MASTER_PORT1 $WORK_DIR/dm-master1.toml + check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT1 "$cur/tls_conf/ca.pem" "$cur/tls_conf/dm.pem" "$cur/tls_conf/dm.key" + # join master2 + run_dm_master $WORK_DIR/master2 $MASTER_PORT2 $WORK_DIR/dm-master2.toml + check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT2 "$cur/tls_conf/ca.pem" "$cur/tls_conf/dm.pem" "$cur/tls_conf/dm.key" + # run dm-worker1 + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $WORK_DIR/dm-worker1.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT "$cur/tls_conf/ca.pem" "$cur/tls_conf/dm.pem" "$cur/tls_conf/dm.key" + # run dm-worker2 + run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $WORK_DIR/dm-worker2.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT "$cur/tls_conf/ca.pem" "$cur/tls_conf/dm.pem" "$cur/tls_conf/dm.key" + + prepare_database + # create source successfully + openapi_source_check "create_source_success_https" "$cur/tls_conf/ca.pem" "$cur/tls_conf/dm.pem" "$cur/tls_conf/dm.key" + + # get source list success + openapi_source_check "list_source_success_https" 1 "$cur/tls_conf/ca.pem" "$cur/tls_conf/dm.pem" "$cur/tls_conf/dm.key" + + # send request to not leader node + openapi_source_check "list_source_with_reverse_https" 1 "$cur/tls_conf/ca.pem" "$cur/tls_conf/dm.pem" "$cur/tls_conf/dm.key" + + cleanup_data openapi + cleanup_process + + # run dm-master1 + run_dm_master $WORK_DIR/master1 $MASTER_PORT1 $cur/conf/dm-master1.toml + check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT1 + # join master2 + run_dm_master $WORK_DIR/master2 $MASTER_PORT2 $cur/conf/dm-master2.toml + check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT2 + # run dm-worker1 + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + # run dm-worker2 + run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT + + echo ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>TEST OPENAPI: REVERSE HTTPS" +} + function test_cluster() { # list master and worker node openapi_cluster_check "list_master_success" 2 @@ -333,6 +389,19 @@ function run() { test_shard_task test_noshard_task +<<<<<<< HEAD +======= + test_task_templates + test_noshard_task_dump_status + test_complex_operations_of_source_and_task + test_task_with_ignore_check_items + test_delete_task_with_stopped_downstream + test_start_task_with_condition + test_stop_task_with_condition + test_reverse_https + + # NOTE: this test case MUST running at last, because it will offline some members of cluster +>>>>>>> 359af1861 (DM/Openapi: use reverse proxy instead of redirect (#5390)) test_cluster } diff --git a/dm/tests/openapi/tls_conf/ca.pem b/dm/tests/openapi/tls_conf/ca.pem new file mode 100644 index 00000000000..9fc215fa83b --- /dev/null +++ b/dm/tests/openapi/tls_conf/ca.pem @@ -0,0 +1,8 @@ +-----BEGIN CERTIFICATE----- +MIIBGDCBwAIJAOjYXLFw5V1HMAoGCCqGSM49BAMCMBQxEjAQBgNVBAMMCWxvY2Fs +aG9zdDAgFw0yMDAzMTcxMjAwMzNaGA8yMjkzMTIzMTEyMDAzM1owFDESMBAGA1UE +AwwJbG9jYWxob3N0MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEglCIJD8uVBfD +kuM+UQP+VA7Srbz17WPLA0Sqc+sQ2p6fT6HYKCW60EXiZ/yEC0925iyVbXEEbX4J +xCc2Heow5TAKBggqhkjOPQQDAgNHADBEAiAILL3Zt/3NFeDW9c9UAcJ9lc92E0ZL +GNDuH6i19Fex3wIgT0ZMAKAFSirGGtcLu0emceuk+zVKjJzmYbsLdpj/JuQ= +-----END CERTIFICATE----- diff --git a/dm/tests/openapi/tls_conf/dm-master1.toml b/dm/tests/openapi/tls_conf/dm-master1.toml new file mode 100644 index 00000000000..d7044b68be9 --- /dev/null +++ b/dm/tests/openapi/tls_conf/dm-master1.toml @@ -0,0 +1,16 @@ +# Master Configuration. +advertise-addr = "127.0.0.1:8261" +initial-cluster = "master1=https://127.0.0.1:8291" +master-addr = ":8261" +name = "master1" +peer-urls = "127.0.0.1:8291" +openapi = true + +ssl-ca = "dir-placeholer/ca.pem" +ssl-cert = "dir-placeholer/dm.pem" +ssl-key = "dir-placeholer/dm.key" +cert-allowed-cn = ["dm"] +auto-compaction-retention = "3s" + + + diff --git a/dm/tests/openapi/tls_conf/dm-master2.toml b/dm/tests/openapi/tls_conf/dm-master2.toml new file mode 100644 index 00000000000..6c9bbffeb68 --- /dev/null +++ b/dm/tests/openapi/tls_conf/dm-master2.toml @@ -0,0 +1,14 @@ +# Master Configuration. +name = "master2" +master-addr = ":8361" +advertise-addr = "127.0.0.1:8361" +peer-urls = "http://127.0.0.1:8292" +join = "127.0.0.1:8261" +openapi = true + +ssl-ca = "dir-placeholer/ca.pem" +ssl-cert = "dir-placeholer/dm.pem" +ssl-key = "dir-placeholer/dm.key" +cert-allowed-cn = ["dm"] +auto-compaction-retention = "3s" + diff --git a/dm/tests/openapi/tls_conf/dm-worker1.toml b/dm/tests/openapi/tls_conf/dm-worker1.toml new file mode 100644 index 00000000000..7e57bcf2744 --- /dev/null +++ b/dm/tests/openapi/tls_conf/dm-worker1.toml @@ -0,0 +1,7 @@ +name = "worker1" +join = "127.0.0.1:8261" + +ssl-ca = "dir-placeholer/ca.pem" +ssl-cert = "dir-placeholer/dm.pem" +ssl-key = "dir-placeholer/dm.key" +cert-allowed-cn = ["dm"] \ No newline at end of file diff --git a/dm/tests/openapi/tls_conf/dm-worker2.toml b/dm/tests/openapi/tls_conf/dm-worker2.toml new file mode 100644 index 00000000000..96301e0cffb --- /dev/null +++ b/dm/tests/openapi/tls_conf/dm-worker2.toml @@ -0,0 +1,7 @@ +name = "worker2" +join = "127.0.0.1:8261" + +ssl-ca = "dir-placeholer/ca.pem" +ssl-cert = "dir-placeholer/dm.pem" +ssl-key = "dir-placeholer/dm.key" +cert-allowed-cn = ["dm"] \ No newline at end of file diff --git a/dm/tests/openapi/tls_conf/dm.key b/dm/tests/openapi/tls_conf/dm.key new file mode 100644 index 00000000000..dfdc077bc4d --- /dev/null +++ b/dm/tests/openapi/tls_conf/dm.key @@ -0,0 +1,8 @@ +-----BEGIN EC PARAMETERS----- +BggqhkjOPQMBBw== +-----END EC PARAMETERS----- +-----BEGIN EC PRIVATE KEY----- +MHcCAQEEICF/GDtVxhTPTP501nOu4jgwGSDY01xN+61xd9MfChw+oAoGCCqGSM49 +AwEHoUQDQgAEgQOv5bQO7xK16vZWhwJqlz2vl19+AXW2Ql7KQyGiBJVSvLbyDLOr +kIeFlHN04iqQ39SKSOSfeGSfRt6doU6IcA== +-----END EC PRIVATE KEY----- diff --git a/dm/tests/openapi/tls_conf/dm.pem b/dm/tests/openapi/tls_conf/dm.pem new file mode 100644 index 00000000000..d4f846e3a22 --- /dev/null +++ b/dm/tests/openapi/tls_conf/dm.pem @@ -0,0 +1,10 @@ +-----BEGIN CERTIFICATE----- +MIIBZDCCAQqgAwIBAgIJAIT/lgXUc1JqMAoGCCqGSM49BAMCMBQxEjAQBgNVBAMM +CWxvY2FsaG9zdDAgFw0yMDAzMTcxMjAwMzNaGA8yMjkzMTIzMTEyMDAzM1owDTEL +MAkGA1UEAwwCZG0wWTATBgcqhkjOPQIBBggqhkjOPQMBBwNCAASBA6/ltA7vErXq +9laHAmqXPa+XX34BdbZCXspDIaIElVK8tvIMs6uQh4WUc3TiKpDf1IpI5J94ZJ9G +3p2hTohwo0owSDAaBgNVHREEEzARgglsb2NhbGhvc3SHBH8AAAEwCwYDVR0PBAQD +AgWgMB0GA1UdJQQWMBQGCCsGAQUFBwMCBggrBgEFBQcDATAKBggqhkjOPQQDAgNI +ADBFAiEAx6ljJ+tNa55ypWLGNqmXlB4UdMmKmE4RSKJ8mmEelfECIG2ZmCE59rv5 +wImM6KnK+vM2QnEiISH3PeYyyRzQzycu +-----END CERTIFICATE-----