Skip to content

Commit

Permalink
Merge pull request #2874 from walterlife/feature/workflow-controller
Browse files Browse the repository at this point in the history
[ISSUE #2180] add workflow controller module
  • Loading branch information
xwm1992 authored Jan 10, 2023
2 parents be19e22 + 77cfded commit ce32019
Show file tree
Hide file tree
Showing 17 changed files with 798 additions and 211 deletions.
125 changes: 125 additions & 0 deletions eventmesh-workflow-go/cmd/controller/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"fmt"
"github.com/apache/incubator-eventmesh/eventmesh-server-go/config"
_ "github.com/apache/incubator-eventmesh/eventmesh-workflow-go/cmd/controller/docs"
pconfig "github.com/apache/incubator-eventmesh/eventmesh-workflow-go/config"
"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/dal"
"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/util"
"github.com/gin-gonic/gin"
swaggerFiles "github.com/swaggo/files"
ginSwagger "github.com/swaggo/gin-swagger"
"log"
"net/http"
)

type Server struct {
server *gin.Engine
workflow *WorkflowController
}

// @title Workflow API
// @version 1.0
// @description This is a workflow server.

// @license.name Apache 2.0
// @license.url http://www.apache.org/licenses/LICENSE-2.0.html
func main() {
s, err := initServer()
if err != nil {
log.Fatal("flow new server fail: " + err.Error())
}
s.router()
if err := s.run(); err != nil {
log.Fatal("run server fail: " + err.Error())
}
}

func initServer() (*Server, error) {
var s Server
if err := s.setupConfig(); err != nil {
return nil, err
}
if err := dal.Open(); err != nil {
return nil, err
}
r := gin.New()
r.Use(cors()).Use(gin.Recovery())
swagger(r)
s.server = r
s.workflow = NewWorkflowController()
return &s, nil
}

func swagger(r *gin.Engine) {
r.GET("/swagger/*any", ginSwagger.WrapHandler(swaggerFiles.Handler))
}

func cors() gin.HandlerFunc {
return func(c *gin.Context) {
method := c.Request.Method
origin := c.Request.Header.Get("Origin")
if origin != "" {
c.Header("Access-Control-Allow-Origin", "*")
c.Header("Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, DELETE, UPDATE")
c.Header("Access-Control-Allow-Headers", "Origin, X-Requested-With, Content-Type, Accept, "+
"Authorization")
c.Header("Access-Control-Expose-Headers", "Content-Length, Access-Control-Allow-Origin, "+
"Access-Control-Allow-Headers, Cache-Control, Content-Language, Content-Type")
c.Header("Access-Control-Allow-Credentials", "true")
}
if method == "OPTIONS" {
c.AbortWithStatus(http.StatusNoContent)
}
c.Next()
}
}

func (s *Server) router() {
s.server.POST("/workflow", s.workflow.Save)
s.server.GET("/workflow", s.workflow.QueryList)
s.server.GET("/workflow/:workflowId", s.workflow.QueryDetail)
s.server.DELETE("/workflow/:workflowId", s.workflow.Delete)
s.server.GET("/workflow/instances", s.workflow.QueryInstances)
}

func (s *Server) setupConfig() error {
config.ServerConfigPath = "./configs/controller.yaml"
// compatible local environment
if !util.Exists(config.ServerConfigPath) {
config.ServerConfigPath = "../configs/controller.yaml"
}
// compatible deploy environment
if !util.Exists(config.ServerConfigPath) {
config.ServerConfigPath = "../conf/controller.yaml"
}
cfg, err := config.LoadConfig(config.ServerConfigPath)
if err != nil {
return err
}
config.SetGlobalConfig(cfg)
if err := config.Setup(cfg); err != nil {
return err
}
return pconfig.Setup(config.ServerConfigPath)
}

func (s *Server) run() error {
return s.server.Run(fmt.Sprintf(":%d", config.GlobalConfig().Server.Port))
}
38 changes: 38 additions & 0 deletions eventmesh-workflow-go/cmd/controller/request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import "github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/dal/model"

// SaveWorkflowRequest save workflow request body
type SaveWorkflowRequest struct {
Workflow model.Workflow `json:"workflow"`
}

// QueryWorkflowsRequest query workflow list request body
type QueryWorkflowsRequest struct {
WorkflowID string `form:"workflow_id" json:"workflow_id"`
Status int `form:"status" json:"status"`
Page int `form:"page" json:"page"` // page num
Size int `form:"size" json:"size"` // page size
}

// QueryWorkflowInstancesRequest query workflow instances request body
type QueryWorkflowInstancesRequest struct {
WorkflowID string `form:"workflow_id" json:"workflow_id"`
Page int `form:"page" json:"page"` // page num
Size int `form:"size" json:"size"` // page size
}
35 changes: 35 additions & 0 deletions eventmesh-workflow-go/cmd/controller/response.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import "github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/dal/model"

// QueryWorkflowsResponse query workflow list response data
type QueryWorkflowsResponse struct {
Workflows []model.Workflow `json:"workflows"`
Total int `json:"total"` // total count
}

// QueryWorkflowResponse query workflow detail response data
type QueryWorkflowResponse struct {
Workflow model.Workflow
}

// QueryWorkflowInstancesResponse query workflow instances response data
type QueryWorkflowInstancesResponse struct {
WorkflowInstances []model.WorkflowInstance `json:"workflow_instances"`
Total int `json:"total"` // total count
}
177 changes: 177 additions & 0 deletions eventmesh-workflow-go/cmd/controller/workflow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/dal"
"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/dal/model"
"github.com/gin-gonic/gin"
"net/http"
)

const (
workflowIDParam = "workflowId"
)

// WorkflowController workflow controller operations
type WorkflowController struct {
workflowDAL dal.WorkflowDAL
}

func NewWorkflowController() *WorkflowController {
c := WorkflowController{}
c.workflowDAL = dal.NewWorkflowDAL()
return &c
}

// Save save a workflow
// @Summary save a workflow
// @Description save a workflow
// @Tags workflow
// @Accept json
// @Produce json
// @Param request body SaveWorkflowRequest true "workflow data"
// @Success 200
// @Failure 400
// @Failure 404
// @Failure 500
// @Router /workflow [post]
func (c *WorkflowController) Save(ctx *gin.Context) {
request := SaveWorkflowRequest{}
if err := ctx.ShouldBind(&request); err != nil {
ctx.JSON(http.StatusBadRequest, err.Error())
return
}
if err := c.workflowDAL.Save(ctx, &request.Workflow); err != nil {
ctx.JSON(http.StatusInternalServerError, err.Error())
return
}
ctx.JSON(http.StatusOK, nil)
}

// QueryList query workflow list
// @Summary query workflow list
// @Description query workflow list
// @Tags workflow
// @Accept json
// @Produce json
// @Param workflow_id query string false "workflow id"
// @Param status query string false "workflow status"
// @Param page query string false "query page"
// @Param size query string false "query size"
// @Success 200 {object} QueryWorkflowsResponse
// @Failure 400
// @Failure 404
// @Failure 500
// @Router /workflow [get]
func (c *WorkflowController) QueryList(ctx *gin.Context) {
request := QueryWorkflowsRequest{}
if err := ctx.ShouldBind(&request); err != nil {
ctx.JSON(http.StatusBadRequest, err.Error())
return
}
res, total, err := c.workflowDAL.SelectList(ctx, &model.QueryParam{
WorkflowID: request.WorkflowID,
Status: request.Status,
Page: request.Page,
Size: request.Size,
})
if err != nil {
ctx.JSON(http.StatusInternalServerError, err.Error())
return
}
ctx.JSON(http.StatusOK, &QueryWorkflowsResponse{Total: total, Workflows: res})
}

// QueryDetail query workflow detail info
// @Summary query workflow detail info
// @Description query workflow detail info
// @Tags workflow
// @Accept json
// @Produce json
// @Param workflowId path string true "workflow id"
// @Success 200 {object} QueryWorkflowResponse
// @Failure 400
// @Failure 404
// @Failure 500
// @Router /workflow/{workflowId} [get]
func (c *WorkflowController) QueryDetail(ctx *gin.Context) {
workflowID := ctx.Param(workflowIDParam)
res, err := c.workflowDAL.Select(ctx, workflowID)
if err != nil {
ctx.JSON(http.StatusInternalServerError, err.Error())
return
}
if res == nil {
ctx.JSON(http.StatusOK, nil)
return
}
ctx.JSON(http.StatusOK, &QueryWorkflowResponse{Workflow: *res})
return
}

// Delete delete a workflow
// @Summary delete a workflow
// @Description delete a workflow
// @Tags workflow
// @Accept json
// @Produce json
// @Param workflowId path string true "workflow id"
// @Success 200 {object} QueryWorkflowsResponse
// @Failure 400
// @Failure 404
// @Failure 500
// @Router /workflow/{workflowId} [delete]
func (c *WorkflowController) Delete(ctx *gin.Context) {
workflowID := ctx.Param(workflowIDParam)
if err := c.workflowDAL.Delete(ctx, workflowID); err != nil {
ctx.JSON(http.StatusInternalServerError, err.Error())
return
}
ctx.JSON(http.StatusOK, nil)
}

// QueryInstances query workflow instances
// @Summary query workflow instances
// @Description query workflow instances
// @Tags workflow
// @Accept json
// @Produce json
// @Param workflow_id query string false "workflow id"
// @Param page query string false "query page"
// @Param size query string false "query size"
// @Success 200 {object} QueryWorkflowInstancesResponse
// @Failure 400
// @Failure 404
// @Failure 500
// @Router /workflow/instances [get]
func (c *WorkflowController) QueryInstances(ctx *gin.Context) {
request := QueryWorkflowInstancesRequest{}
if err := ctx.ShouldBind(&request); err != nil {
ctx.JSON(http.StatusBadRequest, err.Error())
return
}
res, total, err := c.workflowDAL.SelectInstances(ctx, &model.QueryParam{
WorkflowID: request.WorkflowID,
Page: request.Page,
Size: request.Size,
})
if err != nil {
ctx.JSON(http.StatusInternalServerError, err.Error())
return
}
ctx.JSON(http.StatusOK, &QueryWorkflowInstancesResponse{Total: total, WorkflowInstances: res})
}
Loading

0 comments on commit ce32019

Please sign in to comment.