Skip to content

Commit

Permalink
refactor: move plugin models to sdk (#699)
Browse files Browse the repository at this point in the history
* refactor: move plugin models to sdk

* refactor: add mock for plugins

* refactor: move mocks to sdk

* refactor: update sdk plugin

* refactor: use plugin models from sdk

* fix: fix failing test

* fix: add missing method

* fix: change version to 0 for sdk

* chore: update sdk version
  • Loading branch information
sbchaos authored Jan 5, 2023
1 parent 5899045 commit fdbec95
Show file tree
Hide file tree
Showing 44 changed files with 987 additions and 975 deletions.
1 change: 1 addition & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ linters:
- sqlclosecheck
- structcheck
- wastedassign
- gomoddirectives

# Following linters should be enabled after fixing the code
- testpackage
Expand Down
2 changes: 1 addition & 1 deletion client/cmd/internal/manage_plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
)

// InitPlugins triggers initialization of all available plugins
func InitPlugins(logLevel config.LogLevel) (*models.RegisteredPlugins, error) {
func InitPlugins(logLevel config.LogLevel) (*models.PluginRepository, error) {
pluginLogLevel := hclog.Info
if logLevel == config.LogLevelDebug {
pluginLogLevel = hclog.Debug
Expand Down
16 changes: 8 additions & 8 deletions client/cmd/internal/survey/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

"github.com/odpf/optimus/client/local"
"github.com/odpf/optimus/client/local/model"
"github.com/odpf/optimus/internal/models"
"github.com/odpf/optimus/sdk/plugin"
)

// JobSurvey defines survey for job specification in general
Expand Down Expand Up @@ -43,7 +43,7 @@ func (*JobSurvey) AskToSelectJobName(jobSpecReader local.SpecReader[*model.JobSp
return selectedJobName, nil
}

func (j *JobSurvey) askCliModSurveyQuestion(ctx context.Context, cliMod models.CommandLineMod, question models.PluginQuestion) (models.PluginAnswers, error) {
func (j *JobSurvey) askCliModSurveyQuestion(ctx context.Context, cliMod plugin.CommandLineMod, question plugin.Question) (plugin.Answers, error) {
surveyPrompt := j.getSurveyPromptFromPluginQuestion(question)

var responseStr string
Expand All @@ -55,8 +55,8 @@ func (j *JobSurvey) askCliModSurveyQuestion(ctx context.Context, cliMod models.C
return nil, fmt.Errorf("AskSurveyQuestion: %w", err)
}

answers := models.PluginAnswers{
models.PluginAnswer{
answers := plugin.Answers{
plugin.Answer{
Question: question,
Value: responseStr,
},
Expand All @@ -78,7 +78,7 @@ func (j *JobSurvey) askCliModSurveyQuestion(ctx context.Context, cliMod models.C
return answers, nil
}

func (*JobSurvey) getSurveyPromptFromPluginQuestion(question models.PluginQuestion) survey.Prompt {
func (*JobSurvey) getSurveyPromptFromPluginQuestion(question plugin.Question) survey.Prompt {
var surveyPrompt survey.Prompt
if len(question.Multiselect) > 0 {
sel := &survey.Select{
Expand All @@ -103,14 +103,14 @@ func (*JobSurvey) getSurveyPromptFromPluginQuestion(question models.PluginQuesti
return surveyPrompt
}

func (j *JobSurvey) getValidatePluginQuestion(ctx context.Context, cliMod models.CommandLineMod, question models.PluginQuestion) survey.Validator {
func (j *JobSurvey) getValidatePluginQuestion(ctx context.Context, cliMod plugin.CommandLineMod, question plugin.Question) survey.Validator {
return func(val interface{}) error {
str, err := j.convertUserInputPluginToString(val)
if err != nil {
return err
}
resp, err := cliMod.ValidateQuestion(ctx, models.ValidateQuestionRequest{
Answer: models.PluginAnswer{
resp, err := cliMod.ValidateQuestion(ctx, plugin.ValidateQuestionRequest{
Answer: plugin.Answer{
Question: question,
Value: str,
},
Expand Down
15 changes: 8 additions & 7 deletions client/cmd/internal/survey/job_addhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/odpf/optimus/client/local/model"
"github.com/odpf/optimus/internal/models"
"github.com/odpf/optimus/sdk/plugin"
)

// JobAddHookSurvey defines survey for job add hook
Expand All @@ -24,7 +25,7 @@ func NewJobAddHookSurvey() *JobAddHookSurvey {
}

// AskToAddHook asks questions to add hook to a job
func (j *JobAddHookSurvey) AskToAddHook(pluginRepo models.PluginRepository, jobSpec *model.JobSpec) (*model.JobSpec, error) {
func (j *JobAddHookSurvey) AskToAddHook(pluginRepo *models.PluginRepository, jobSpec *model.JobSpec) (*model.JobSpec, error) {
newJobSpec := *jobSpec
availableHookNames := j.getAvailableHookNames(pluginRepo)
if len(availableHookNames) == 0 {
Expand Down Expand Up @@ -66,9 +67,9 @@ func (j *JobAddHookSurvey) AskToAddHook(pluginRepo models.PluginRepository, jobS
return &newJobSpec, nil
}

func (*JobAddHookSurvey) getHookConfig(cliMod models.CommandLineMod, answers models.PluginAnswers) (map[string]string, error) {
func (*JobAddHookSurvey) getHookConfig(cliMod plugin.CommandLineMod, answers plugin.Answers) (map[string]string, error) {
ctx := context.Background()
configRequest := models.DefaultConfigRequest{Answers: answers}
configRequest := plugin.DefaultConfigRequest{Answers: answers}
generatedConfigResponse, err := cliMod.DefaultConfig(ctx, configRequest)
if err != nil {
return nil, err
Expand All @@ -84,7 +85,7 @@ func (*JobAddHookSurvey) getHookConfig(cliMod models.CommandLineMod, answers mod
return config, nil
}

func (*JobAddHookSurvey) getAvailableHookNames(pluginRepo models.PluginRepository) []string {
func (*JobAddHookSurvey) getAvailableHookNames(pluginRepo *models.PluginRepository) []string {
var output []string
for _, hook := range pluginRepo.GetHooks() {
output = append(output, hook.Info().Name)
Expand Down Expand Up @@ -113,14 +114,14 @@ func (*JobAddHookSurvey) isSelectedHookAlreadyInJob(jobSpec *model.JobSpec, sele
return false
}

func (j *JobAddHookSurvey) askHookQuestions(ctx context.Context, cliMod models.CommandLineMod, jobName string) (models.PluginAnswers, error) {
questionRequest := models.GetQuestionsRequest{JobName: jobName}
func (j *JobAddHookSurvey) askHookQuestions(ctx context.Context, cliMod plugin.CommandLineMod, jobName string) (plugin.Answers, error) {
questionRequest := plugin.GetQuestionsRequest{JobName: jobName}
questionResponse, err := cliMod.GetQuestions(ctx, questionRequest)
if err != nil {
return nil, err
}

answers := models.PluginAnswers{}
answers := plugin.Answers{}
for _, question := range questionResponse.Questions {
responseAnswer, err := j.jobSurvey.askCliModSurveyQuestion(ctx, cliMod, question)
if err != nil {
Expand Down
21 changes: 11 additions & 10 deletions client/cmd/internal/survey/job_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/odpf/optimus/client/local/model"
"github.com/odpf/optimus/internal/models"
"github.com/odpf/optimus/internal/utils"
"github.com/odpf/optimus/sdk/plugin"
)

const (
Expand Down Expand Up @@ -42,7 +43,7 @@ func NewJobCreateSurvey() *JobCreateSurvey {
}

// AskToCreateJob asks questions to create job
func (j *JobCreateSurvey) AskToCreateJob(pluginRepo models.PluginRepository, jobSpecReader local.SpecReader[*model.JobSpec], jobDir, defaultJobName string) (model.JobSpec, error) {
func (j *JobCreateSurvey) AskToCreateJob(pluginRepo *models.PluginRepository, jobSpecReader local.SpecReader[*model.JobSpec], jobDir, defaultJobName string) (model.JobSpec, error) {
availableTaskNames := j.getAvailableTaskNames(pluginRepo)
if len(availableTaskNames) == 0 {
return model.JobSpec{}, errors.New("no supported task plugin found")
Expand Down Expand Up @@ -85,9 +86,9 @@ func (j *JobCreateSurvey) AskToCreateJob(pluginRepo models.PluginRepository, job
return jobInput, nil
}

func (*JobCreateSurvey) getJobAsset(cliMod models.CommandLineMod, answers models.PluginAnswers) (map[string]string, error) {
func (*JobCreateSurvey) getJobAsset(cliMod plugin.CommandLineMod, answers plugin.Answers) (map[string]string, error) {
ctx := context.Background()
defaultAssetRequest := models.DefaultAssetsRequest{Answers: answers}
defaultAssetRequest := plugin.DefaultAssetsRequest{Answers: answers}
generatedAssetResponse, err := cliMod.DefaultAssets(ctx, defaultAssetRequest)
if err != nil {
return nil, err
Expand All @@ -99,9 +100,9 @@ func (*JobCreateSurvey) getJobAsset(cliMod models.CommandLineMod, answers models
return asset, nil
}

func (*JobCreateSurvey) getTaskConfig(cliMod models.CommandLineMod, answers models.PluginAnswers) (map[string]string, error) {
func (*JobCreateSurvey) getTaskConfig(cliMod plugin.CommandLineMod, answers plugin.Answers) (map[string]string, error) {
ctx := context.Background()
defaultConfigRequest := models.DefaultConfigRequest{Answers: answers}
defaultConfigRequest := plugin.DefaultConfigRequest{Answers: answers}
generatedConfigResponse, err := cliMod.DefaultConfig(ctx, defaultConfigRequest)
if err != nil {
return nil, err
Expand All @@ -116,7 +117,7 @@ func (*JobCreateSurvey) getTaskConfig(cliMod models.CommandLineMod, answers mode
return taskConfig, nil
}

func (*JobCreateSurvey) getAvailableTaskNames(pluginRepo models.PluginRepository) []string {
func (*JobCreateSurvey) getAvailableTaskNames(pluginRepo *models.PluginRepository) []string {
plugins := pluginRepo.GetTasks()
var output []string
for _, task := range plugins {
Expand Down Expand Up @@ -220,23 +221,23 @@ func (j *JobCreateSurvey) askCreateQuestions(questions []*survey.Question) (mode
}, nil
}

func (*JobCreateSurvey) getPluginCliMod(pluginRepo models.PluginRepository, taskName string) (models.CommandLineMod, error) {
func (*JobCreateSurvey) getPluginCliMod(pluginRepo *models.PluginRepository, taskName string) (plugin.CommandLineMod, error) {
plugin, err := pluginRepo.GetByName(taskName)
if err != nil {
return nil, err
}
return plugin.GetSurveyMod(), nil
}

func (j *JobCreateSurvey) askPluginQuestions(cliMod models.CommandLineMod, jobName string) (models.PluginAnswers, error) {
func (j *JobCreateSurvey) askPluginQuestions(cliMod plugin.CommandLineMod, jobName string) (plugin.Answers, error) {
ctx := context.Background()
questionRequest := models.GetQuestionsRequest{JobName: jobName}
questionRequest := plugin.GetQuestionsRequest{JobName: jobName}
questionResponse, err := cliMod.GetQuestions(ctx, questionRequest)
if err != nil {
return nil, err
}

answers := models.PluginAnswers{}
answers := plugin.Answers{}
for _, question := range questionResponse.Questions {
subAnswers, err := j.jobSurvey.askCliModSurveyQuestion(ctx, cliMod, question)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion client/cmd/job/addhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type addHookCommand struct {
jobSurvey *survey.JobSurvey
jobAddHookSurvey *survey.JobAddHookSurvey
namespaceSurvey *survey.NamespaceSurvey
pluginRepo *models.RegisteredPlugins
pluginRepo *models.PluginRepository
}

// NewAddHookCommand initializes command for adding hook
Expand Down
2 changes: 1 addition & 1 deletion client/cmd/job/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type createCommand struct {
clientConfig *config.ClientConfig
namespaceSurvey *survey.NamespaceSurvey
jobCreateSurvey *survey.JobCreateSurvey
pluginRepo *models.RegisteredPlugins
pluginRepo *models.PluginRepository
}

// NewCreateCommand initializes job create command
Expand Down
2 changes: 1 addition & 1 deletion client/cmd/version/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type versionCommand struct {
isWithServer bool
host string

pluginRepo models.PluginRepository
pluginRepo *models.PluginRepository
}

// NewVersionCommand initializes command to get version
Expand Down
3 changes: 2 additions & 1 deletion core/job/handler/v1beta1/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/odpf/optimus/internal/models"
"github.com/odpf/optimus/internal/writer"
pb "github.com/odpf/optimus/protos/odpf/optimus/core/v1beta1"
"github.com/odpf/optimus/sdk/plugin"
)

type JobHandler struct {
Expand All @@ -37,7 +38,7 @@ type JobService interface {
Update(ctx context.Context, jobTenant tenant.Tenant, jobs []*job.Spec) error
Delete(ctx context.Context, jobTenant tenant.Tenant, jobName job.Name, cleanFlag bool, forceFlag bool) (affectedDownstream []job.FullName, err error)
Get(ctx context.Context, jobTenant tenant.Tenant, jobName job.Name) (jobSpec *job.Job, err error)
GetTaskInfo(ctx context.Context, task job.Task) (*models.PluginInfoResponse, error)
GetTaskInfo(ctx context.Context, task job.Task) (*plugin.Info, error)
GetByFilter(ctx context.Context, filters ...filter.FilterOpt) (jobSpecs []*job.Job, err error)
ReplaceAll(ctx context.Context, jobTenant tenant.Tenant, jobs []*job.Spec, jobNamesWithValidationError []job.Name, logWriter writer.LogWriter) error
Refresh(ctx context.Context, projectName tenant.ProjectName, namespaceNames []string, jobNames []string, logWriter writer.LogWriter) error
Expand Down
11 changes: 6 additions & 5 deletions core/job/handler/v1beta1/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/odpf/optimus/internal/models"
"github.com/odpf/optimus/internal/writer"
pb "github.com/odpf/optimus/protos/odpf/optimus/core/v1beta1"
"github.com/odpf/optimus/sdk/plugin"
)

func TestNewJobHandler(t *testing.T) {
Expand Down Expand Up @@ -1793,7 +1794,7 @@ func TestNewJobHandler(t *testing.T) {
JobName: jobA.Spec().Name().String(),
}

taskInfo := &models.PluginInfoResponse{
taskInfo := &plugin.Info{
Name: "bq2bq",
Description: "task info desc",
Image: "odpf/bq2bq:latest",
Expand Down Expand Up @@ -1951,15 +1952,15 @@ func (_m *JobService) GetJobBasicInfo(ctx context.Context, jobTenant tenant.Tena
}

// GetTaskInfo provides a mock function with given fields: ctx, task
func (_m *JobService) GetTaskInfo(ctx context.Context, task job.Task) (*models.PluginInfoResponse, error) {
func (_m *JobService) GetTaskInfo(ctx context.Context, task job.Task) (*plugin.Info, error) {
ret := _m.Called(ctx, task)

var r0 *models.PluginInfoResponse
if rf, ok := ret.Get(0).(func(context.Context, job.Task) *models.PluginInfoResponse); ok {
var r0 *plugin.Info
if rf, ok := ret.Get(0).(func(context.Context, job.Task) *plugin.Info); ok {
r0 = rf(ctx, task)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*models.PluginInfoResponse)
r0 = ret.Get(0).(*plugin.Info)
}
}

Expand Down
6 changes: 3 additions & 3 deletions core/job/service/job_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (
"github.com/odpf/optimus/core/tenant"
"github.com/odpf/optimus/internal/errors"
"github.com/odpf/optimus/internal/lib/tree"
"github.com/odpf/optimus/internal/models"
"github.com/odpf/optimus/internal/writer"
"github.com/odpf/optimus/sdk/plugin"
)

const (
Expand Down Expand Up @@ -45,7 +45,7 @@ func NewJobService(repo JobRepository, pluginService PluginService, upstreamReso
}

type PluginService interface {
Info(context.Context, job.TaskName) (*models.PluginInfoResponse, error)
Info(context.Context, job.TaskName) (*plugin.Info, error)
GenerateDestination(context.Context, *tenant.WithDetails, job.Task) (job.ResourceURN, error)
GenerateUpstreams(ctx context.Context, jobTenant *tenant.WithDetails, spec *job.Spec, dryRun bool) ([]job.ResourceURN, error)
}
Expand Down Expand Up @@ -158,7 +158,7 @@ func (j JobService) Get(ctx context.Context, jobTenant tenant.Tenant, jobName jo
return &job.Job{}, nil
}

func (j JobService) GetTaskInfo(ctx context.Context, task job.Task) (*models.PluginInfoResponse, error) {
func (j JobService) GetTaskInfo(ctx context.Context, task job.Task) (*plugin.Info, error) {
return j.pluginService.Info(ctx, task.Name())
}

Expand Down
11 changes: 6 additions & 5 deletions core/job/service/job_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
optErrors "github.com/odpf/optimus/internal/errors"
"github.com/odpf/optimus/internal/models"
"github.com/odpf/optimus/internal/writer"
"github.com/odpf/optimus/sdk/plugin"
)

func TestJobService(t *testing.T) {
Expand Down Expand Up @@ -1743,7 +1744,7 @@ func TestJobService(t *testing.T) {
pluginService := new(PluginService)
defer pluginService.AssertExpectations(t)

pluginInfoResp := &models.PluginInfoResponse{
pluginInfoResp := &plugin.Info{
Name: "bq2bq",
Description: "plugin desc",
Image: "odpf/bq2bq:latest",
Expand Down Expand Up @@ -2590,15 +2591,15 @@ func (_m *PluginService) GenerateUpstreams(ctx context.Context, jobTenant *tenan
}

// Info provides a mock function with given fields: _a0, _a1
func (_m *PluginService) Info(_a0 context.Context, _a1 job.TaskName) (*models.PluginInfoResponse, error) {
func (_m *PluginService) Info(_a0 context.Context, _a1 job.TaskName) (*plugin.Info, error) {
ret := _m.Called(_a0, _a1)

var r0 *models.PluginInfoResponse
if rf, ok := ret.Get(0).(func(context.Context, job.TaskName) *models.PluginInfoResponse); ok {
var r0 *plugin.Info
if rf, ok := ret.Get(0).(func(context.Context, job.TaskName) *plugin.Info); ok {
r0 = rf(_a0, _a1)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*models.PluginInfoResponse)
r0 = ret.Get(0).(*plugin.Info)
}
}

Expand Down
Loading

0 comments on commit fdbec95

Please sign in to comment.