diff --git a/api/v2.0/swagger.yaml b/api/v2.0/swagger.yaml index 48c604b1557..c6ad6db13d4 100644 --- a/api/v2.0/swagger.yaml +++ b/api/v2.0/swagger.yaml @@ -7095,6 +7095,9 @@ definitions: type: boolean description: Whether the preheat policy enabled x-omitempty: false + scope: + type: string + description: The scope of preheat policy creation_time: type: string format: date-time diff --git a/make/migrations/postgresql/0150_2.12.0_schema.up.sql b/make/migrations/postgresql/0150_2.12.0_schema.up.sql index ce167b83ebc..808a00160da 100644 --- a/make/migrations/postgresql/0150_2.12.0_schema.up.sql +++ b/make/migrations/postgresql/0150_2.12.0_schema.up.sql @@ -3,3 +3,5 @@ Add new column creator_ref and creator_type for robot table to record the creato */ ALTER TABLE robot ADD COLUMN IF NOT EXISTS creator_ref integer default 0; ALTER TABLE robot ADD COLUMN IF NOT EXISTS creator_type varchar(255); + +ALTER TABLE p2p_preheat_policy ADD COLUMN IF NOT EXISTS scope varchar(255); \ No newline at end of file diff --git a/src/controller/p2p/preheat/enforcer.go b/src/controller/p2p/preheat/enforcer.go index 2a0cc058535..005c60809f5 100644 --- a/src/controller/p2p/preheat/enforcer.go +++ b/src/controller/p2p/preheat/enforcer.go @@ -402,7 +402,7 @@ func (de *defaultEnforcer) launchExecutions(ctx context.Context, candidates []*s // Start tasks count := 0 for _, c := range candidates { - if _, err = de.startTask(ctx, eid, c, insData); err != nil { + if _, err = de.startTask(ctx, eid, c, insData, pl.Scope); err != nil { // Just log the error and skip log.Errorf("start task error for preheating image: %s/%s:%s@%s", c.Namespace, c.Repository, c.Tags[0], c.Digest) continue @@ -421,7 +421,7 @@ func (de *defaultEnforcer) launchExecutions(ctx context.Context, candidates []*s } // startTask starts the preheat task(job) for the given candidate -func (de *defaultEnforcer) startTask(ctx context.Context, executionID int64, candidate *selector.Candidate, instance string) (int64, error) { +func (de *defaultEnforcer) startTask(ctx context.Context, executionID int64, candidate *selector.Candidate, instance, scope string) (int64, error) { u, err := de.fullURLGetter(candidate) if err != nil { return -1, err @@ -441,6 +441,7 @@ func (de *defaultEnforcer) startTask(ctx context.Context, executionID int64, can ImageName: fmt.Sprintf("%s/%s", candidate.Namespace, candidate.Repository), Tag: candidate.Tags[0], Digest: candidate.Digest, + Scope: scope, } piData, err := pi.ToJSON() diff --git a/src/controller/p2p/preheat/enforcer_test.go b/src/controller/p2p/preheat/enforcer_test.go index 9d6ed5f2434..bbaadc9e607 100644 --- a/src/controller/p2p/preheat/enforcer_test.go +++ b/src/controller/p2p/preheat/enforcer_test.go @@ -210,6 +210,7 @@ func mockPolicies() []*po.Schema { Type: po.TriggerTypeManual, }, Enabled: true, + Scope: "single_peer", CreatedAt: time.Now().UTC(), UpdatedTime: time.Now().UTC(), }, { @@ -235,6 +236,7 @@ func mockPolicies() []*po.Schema { Trigger: &po.Trigger{ Type: po.TriggerTypeEventBased, }, + Scope: "all_peers", Enabled: true, CreatedAt: time.Now().UTC(), UpdatedTime: time.Now().UTC(), diff --git a/src/pkg/p2p/preheat/instance/manager.go b/src/pkg/p2p/preheat/instance/manager.go index acb43b0c9e1..3f4418a1013 100644 --- a/src/pkg/p2p/preheat/instance/manager.go +++ b/src/pkg/p2p/preheat/instance/manager.go @@ -16,6 +16,7 @@ package instance import ( "context" + "encoding/json" "github.com/goharbor/harbor/src/lib/q" dao "github.com/goharbor/harbor/src/pkg/p2p/preheat/dao/instance" @@ -114,7 +115,18 @@ func (dm *manager) Update(ctx context.Context, inst *provider.Instance, props .. // Get implements @Manager.Get func (dm *manager) Get(ctx context.Context, id int64) (*provider.Instance, error) { - return dm.dao.Get(ctx, id) + ins, err := dm.dao.Get(ctx, id) + if err != nil { + return nil, err + } + // mapping auth data to auth info. + if len(ins.AuthData) > 0 { + if err := json.Unmarshal([]byte(ins.AuthData), &ins.AuthInfo); err != nil { + return nil, err + } + } + + return ins, nil } // Get implements @Manager.GetByName diff --git a/src/pkg/p2p/preheat/job.go b/src/pkg/p2p/preheat/job.go index 7321260b196..ee68a6272d3 100644 --- a/src/pkg/p2p/preheat/job.go +++ b/src/pkg/p2p/preheat/job.go @@ -191,6 +191,11 @@ func (j *Job) Run(ctx job.Context, params job.Parameters) error { return preheatJobRunningError(errors.Errorf("preheat failed: %s", s)) case provider.PreheatingStatusSuccess: // Finished + // log the message if received message from provider. + if s.Message != "" { + myLogger.Infof("Preheat job finished, message from provider: \n%s", s.Message) + } + return nil default: // do nothing, check again diff --git a/src/pkg/p2p/preheat/models/policy/policy.go b/src/pkg/p2p/preheat/models/policy/policy.go index 1a59bdddf40..b9e410b1c25 100644 --- a/src/pkg/p2p/preheat/models/policy/policy.go +++ b/src/pkg/p2p/preheat/models/policy/policy.go @@ -16,12 +16,10 @@ package policy import ( "encoding/json" - "fmt" "strconv" "time" beego_orm "github.com/beego/beego/v2/client/orm" - "github.com/beego/beego/v2/core/validation" "github.com/goharbor/harbor/src/common/utils" "github.com/goharbor/harbor/src/lib/errors" @@ -32,6 +30,9 @@ func init() { beego_orm.RegisterModel(&Schema{}) } +// ScopeType represents the preheat scope type. +type ScopeType = string + const ( // Filters: // Repository : type=Repository value=name text (double star pattern used) @@ -57,6 +58,11 @@ const ( TriggerTypeScheduled TriggerType = "scheduled" // TriggerTypeEventBased represents the event_based trigger type TriggerTypeEventBased TriggerType = "event_based" + + // ScopeTypeSinglePeer represents preheat image to single peer in p2p cluster. + ScopeTypeSinglePeer ScopeType = "single_peer" + // ScopeTypeAllPeers represents preheat image to all peers in p2p cluster. + ScopeTypeAllPeers ScopeType = "all_peers" ) // Schema defines p2p preheat policy schema @@ -72,8 +78,10 @@ type Schema struct { FiltersStr string `orm:"column(filters)" json:"-"` Trigger *Trigger `orm:"-" json:"trigger"` // Use JSON data format (query by trigger type should be supported) - TriggerStr string `orm:"column(trigger)" json:"-"` - Enabled bool `orm:"column(enabled)" json:"enabled"` + TriggerStr string `orm:"column(trigger)" json:"-"` + Enabled bool `orm:"column(enabled)" json:"enabled"` + // Scope decides the preheat scope. + Scope string `orm:"column(scope)" json:"scope"` CreatedAt time.Time `orm:"column(creation_time)" json:"creation_time"` UpdatedTime time.Time `orm:"column(update_time)" json:"update_time"` } @@ -127,65 +135,13 @@ func (s *Schema) ValidatePreheatPolicy() error { WithMessage("invalid cron string for scheduled preheat: %s, error: %v", s.Trigger.Settings.Cron, err) } } - return nil -} -// Valid the policy -func (s *Schema) Valid(v *validation.Validation) { - if len(s.Name) == 0 { - _ = v.SetError("name", "cannot be empty") + // validate preheat scope + if s.Scope != "" && s.Scope != ScopeTypeSinglePeer && s.Scope != ScopeTypeAllPeers { + return errors.New(nil).WithCode(errors.BadRequestCode).WithMessage("invalid scope for preheat policy: %s", s.Scope) } - // valid the filters - for _, filter := range s.Filters { - switch filter.Type { - case FilterTypeRepository, FilterTypeTag, FilterTypeVulnerability: - _, ok := filter.Value.(string) - if !ok { - _ = v.SetError("filters", "the type of filter value isn't string") - break - } - case FilterTypeSignature: - _, ok := filter.Value.(bool) - if !ok { - _ = v.SetError("filers", "the type of signature filter value isn't bool") - break - } - case FilterTypeLabel: - labels, ok := filter.Value.([]interface{}) - if !ok { - _ = v.SetError("filters", "the type of label filter value isn't string slice") - break - } - for _, label := range labels { - _, ok := label.(string) - if !ok { - _ = v.SetError("filters", "the type of label filter value isn't string slice") - break - } - } - default: - _ = v.SetError("filters", "invalid filter type") - } - } - - // valid trigger - if s.Trigger != nil { - switch s.Trigger.Type { - case TriggerTypeManual, TriggerTypeEventBased: - case TriggerTypeScheduled: - if len(s.Trigger.Settings.Cron) == 0 { - _ = v.SetError("trigger", fmt.Sprintf("the cron string cannot be empty when the trigger type is %s", TriggerTypeScheduled)) - } else { - _, err := utils.CronParser().Parse(s.Trigger.Settings.Cron) - if err != nil { - _ = v.SetError("trigger", fmt.Sprintf("invalid cron string for scheduled trigger: %s", s.Trigger.Settings.Cron)) - } - } - default: - _ = v.SetError("trigger", "invalid trigger type") - } - } + return nil } // Encode encodes policy schema. diff --git a/src/pkg/p2p/preheat/models/policy/policy_test.go b/src/pkg/p2p/preheat/models/policy/policy_test.go index e2310933a6a..71a1e9ca1bd 100644 --- a/src/pkg/p2p/preheat/models/policy/policy_test.go +++ b/src/pkg/p2p/preheat/models/policy/policy_test.go @@ -17,8 +17,6 @@ package policy import ( "testing" - "github.com/beego/beego/v2/core/validation" - "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" ) @@ -66,92 +64,13 @@ func (p *PolicyTestSuite) TestValidatePreheatPolicy() { // valid cron string p.schema.Trigger.Settings.Cron = "0 0 0 1 1 *" p.NoError(p.schema.ValidatePreheatPolicy()) -} - -// TestValid tests Valid method. -func (p *PolicyTestSuite) TestValid() { - // policy name is empty, should return error - v := &validation.Validation{} - p.schema.Valid(v) - require.True(p.T(), v.HasErrors(), "no policy name should return one error") - require.Contains(p.T(), v.Errors[0].Error(), "cannot be empty") - - // policy with name but with error filter type - p.schema.Name = "policy-test" - p.schema.Filters = []*Filter{ - { - Type: "invalid-type", - }, - } - v = &validation.Validation{} - p.schema.Valid(v) - require.True(p.T(), v.HasErrors(), "invalid filter type should return one error") - require.Contains(p.T(), v.Errors[0].Error(), "invalid filter type") - - filterCases := [][]*Filter{ - { - { - Type: FilterTypeSignature, - Value: "invalid-value", - }, - }, - - { - { - Type: FilterTypeTag, - Value: true, - }, - }, - { - { - Type: FilterTypeLabel, - Value: "invalid-value", - }, - }, - } - // with valid filter type but with error value type - for _, filters := range filterCases { - p.schema.Filters = filters - v = &validation.Validation{} - p.schema.Valid(v) - require.True(p.T(), v.HasErrors(), "invalid filter value type should return one error") - } - // with valid filter but error trigger type - p.schema.Filters = []*Filter{ - { - Type: FilterTypeSignature, - Value: true, - }, - } - p.schema.Trigger = &Trigger{ - Type: "invalid-type", - } - v = &validation.Validation{} - p.schema.Valid(v) - require.True(p.T(), v.HasErrors(), "invalid trigger type should return one error") - require.Contains(p.T(), v.Errors[0].Error(), "invalid trigger type") - - // with valid filter but error trigger value - p.schema.Trigger = &Trigger{ - Type: TriggerTypeScheduled, - } - v = &validation.Validation{} - p.schema.Valid(v) - require.True(p.T(), v.HasErrors(), "invalid trigger value should return one error") - require.Contains(p.T(), v.Errors[0].Error(), "the cron string cannot be empty") - // with invalid cron - p.schema.Trigger.Settings.Cron = "1111111111111" - v = &validation.Validation{} - p.schema.Valid(v) - require.True(p.T(), v.HasErrors(), "invalid trigger value should return one error") - require.Contains(p.T(), v.Errors[0].Error(), "invalid cron string for scheduled trigger") - - // all is well - p.schema.Trigger.Settings.Cron = "0/12 * * * * *" - v = &validation.Validation{} - p.schema.Valid(v) - require.False(p.T(), v.HasErrors(), "should return nil error") + // invalid preheat scope + p.schema.Scope = "invalid scope" + p.Error(p.schema.ValidatePreheatPolicy()) + // valid preheat scope + p.schema.Scope = "single_peer" + p.NoError(p.schema.ValidatePreheatPolicy()) } // TestDecode tests decode. @@ -167,11 +86,14 @@ func (p *PolicyTestSuite) TestDecode() { Trigger: nil, TriggerStr: "{\"type\":\"event_based\",\"trigger_setting\":{\"cron\":\"\"}}", Enabled: false, + Scope: "all_peers", } p.NoError(s.Decode()) p.Len(s.Filters, 3) p.NotNil(s.Trigger) + p.Equal(ScopeTypeAllPeers, s.Scope) + // invalid filter or trigger s.FiltersStr = "" s.TriggerStr = "invalid" @@ -210,8 +132,10 @@ func (p *PolicyTestSuite) TestEncode() { }, TriggerStr: "", Enabled: false, + Scope: "single_peer", } p.NoError(s.Encode()) p.Equal(`[{"type":"repository","value":"**"},{"type":"tag","value":"**"},{"type":"label","value":"test"}]`, s.FiltersStr) p.Equal(`{"type":"event_based","trigger_setting":{}}`, s.TriggerStr) + p.Equal(ScopeTypeSinglePeer, s.Scope) } diff --git a/src/pkg/p2p/preheat/provider/dragonfly.go b/src/pkg/p2p/preheat/provider/dragonfly.go index 996594f5a76..32b3b6bc11e 100644 --- a/src/pkg/p2p/preheat/provider/dragonfly.go +++ b/src/pkg/p2p/preheat/provider/dragonfly.go @@ -15,37 +15,139 @@ package provider import ( + "bytes" "encoding/json" "fmt" - "net/http" "strings" + "time" - common_http "github.com/goharbor/harbor/src/common/http" "github.com/goharbor/harbor/src/lib" "github.com/goharbor/harbor/src/lib/errors" "github.com/goharbor/harbor/src/pkg/p2p/preheat/models/provider" "github.com/goharbor/harbor/src/pkg/p2p/preheat/provider/auth" "github.com/goharbor/harbor/src/pkg/p2p/preheat/provider/client" + + "github.com/olekukonko/tablewriter" +) + +const ( + // dragonflyHealthPath is the health check path for dragonfly openapi. + dragonflyHealthPath = "/healthy" + + // dragonflyJobPath is the job path for dragonfly openapi. + dragonflyJobPath = "/oapi/v1/jobs" ) const ( - healthCheckEndpoint = "/_ping" - preheatEndpoint = "/preheats" - preheatTaskEndpoint = "/preheats/{task_id}" - dragonflyPending = "WAITING" - dragonflyFailed = "FAILED" + // dragonflyJobPendingState is the pending state of the job, which means + // the job is waiting to be processed and running. + dragonflyJobPendingState = "PENDING" + + // dragonflyJobSuccessState is the success state of the job, which means + // the job is processed successfully. + dragonflyJobSuccessState = "SUCCESS" + + // dragonflyJobFailureState is the failure state of the job, which means + // the job is processed failed. + dragonflyJobFailureState = "FAILURE" ) -type dragonflyPreheatCreateResp struct { - ID string `json:"ID"` +type dragonflyCreateJobRequest struct { + // Type is the job type, support preheat. + Type string `json:"type" binding:"required"` + + // Args is the preheating args. + Args dragonflyCreateJobRequestArgs `json:"args" binding:"omitempty"` + + // SchedulerClusterIDs is the scheduler cluster ids for preheating. + SchedulerClusterIDs []uint `json:"scheduler_cluster_ids" binding:"omitempty"` } -type dragonflyPreheatInfo struct { - ID string `json:"ID"` - StartTime string `json:"startTime,omitempty"` - FinishTime string `json:"finishTime,omitempty"` - ErrorMsg string `json:"errorMsg"` - Status string +type dragonflyCreateJobRequestArgs struct { + // Type is the preheating type, support image and file. + Type string `json:"type"` + + // URL is the image url for preheating. + URL string `json:"url"` + + // Tag is the tag for preheating. + Tag string `json:"tag"` + + // FilteredQueryParams is the filtered query params for preheating. + FilteredQueryParams string `json:"filtered_query_params"` + + // Headers is the http headers for authentication. + Headers map[string]string `json:"headers"` + + // Scope is the scope for preheating, default is single_peer. + Scope string `json:"scope"` + + // BatchSize is the batch size for preheating all peers, default is 50. + ConcurrentCount int64 `json:"concurrent_count"` + + // Timeout is the timeout for preheating, default is 30 minutes. + Timeout time.Duration `json:"timeout"` +} + +type dragonflyJobResponse struct { + // ID is the job id. + ID int `json:"id"` + + // CreatedAt is the job created time. + CreatedAt time.Time `json:"created_at"` + + // UpdatedAt is the job updated time. + UpdatedAt time.Time `json:"updated_at"` + + // State is the job state, support PENDING, SUCCESS, FAILURE. + State string `json:"state"` + + // Results is the job results. + Result struct { + + // JobStates is the job states, including each job state. + JobStates []struct { + + // Error is the job error message. + Error string `json:"error"` + + // Results is the job results. + Results []struct { + + // SuccessTasks is the success tasks. + SuccessTasks []*struct { + + // URL is the url of the task, which is the blob url. + URL string `json:"url"` + + // Hostname is the hostname of the task. + Hostname string `json:"hostname"` + + // IP is the ip of the task. + IP string `json:"ip"` + } `json:"success_tasks"` + + // FailureTasks is the failure tasks. + FailureTasks []*struct { + + // URL is the url of the task, which is the blob url. + URL string `json:"url"` + + // Hostname is the hostname of the task. + Hostname string `json:"hostname"` + + // IP is the ip of the task. + IP string `json:"ip"` + + // Description is the failure description. + Description string `json:"description"` + } `json:"failure_tasks"` + + // SchedulerClusterID is the scheduler cluster id. + SchedulerClusterID uint `json:"scheduler_cluster_id"` + } `json:"results"` + } `json:"job_states"` + } `json:"result"` } // DragonflyDriver implements the provider driver interface for Alibaba dragonfly. @@ -59,10 +161,10 @@ func (dd *DragonflyDriver) Self() *Metadata { return &Metadata{ ID: "dragonfly", Name: "Dragonfly", - Icon: "https://raw.githubusercontent.com/alibaba/Dragonfly/master/docs/images/logo.png", - Version: "0.10.1", - Source: "https://github.com/alibaba/Dragonfly", - Maintainers: []string{"Jin Zhang/taiyun.zj@alibaba-inc.com"}, + Icon: "https://raw.githubusercontent.com/dragonflyoss/Dragonfly2/master/docs/images/logo/dragonfly-linear.png", + Version: "2.1.57", + Source: "https://github.com/dragonflyoss/Dragonfly2", + Maintainers: []string{"chlins.zhang@gmail.com", "gaius.qi@gmail.com"}, } } @@ -72,13 +174,13 @@ func (dd *DragonflyDriver) GetHealth() (*DriverStatus, error) { return nil, errors.New("missing instance metadata") } - url := fmt.Sprintf("%s%s", strings.TrimSuffix(dd.instance.Endpoint, "/"), healthCheckEndpoint) + url := fmt.Sprintf("%s%s", strings.TrimSuffix(dd.instance.Endpoint, "/"), dragonflyHealthPath) url, err := lib.ValidateHTTPURL(url) if err != nil { return nil, err } - _, err = client.GetHTTPClient(dd.instance.Insecure).Get(url, dd.getCred(), nil, nil) - if err != nil { + + if _, err = client.GetHTTPClient(dd.instance.Insecure).Get(url, dd.getCred(), nil, nil); err != nil { // Unhealthy return nil, err } @@ -99,97 +201,112 @@ func (dd *DragonflyDriver) Preheat(preheatingImage *PreheatImage) (*PreheatingSt return nil, errors.New("no image specified") } - taskStatus := provider.PreheatingStatusPending // default - url := fmt.Sprintf("%s%s", strings.TrimSuffix(dd.instance.Endpoint, "/"), preheatEndpoint) - bytes, err := client.GetHTTPClient(dd.instance.Insecure).Post(url, dd.getCred(), preheatingImage, nil) + // Construct the preheat job request by the given parameters of the preheating image . + req := &dragonflyCreateJobRequest{ + Type: "preheat", + // TODO: Support set SchedulerClusterIDs, FilteredQueryParam, ConcurrentCount and Timeout. + Args: dragonflyCreateJobRequestArgs{ + Type: preheatingImage.Type, + URL: preheatingImage.URL, + Headers: headerToMapString(preheatingImage.Headers), + Scope: preheatingImage.Scope, + }, + } + + url := fmt.Sprintf("%s%s", strings.TrimSuffix(dd.instance.Endpoint, "/"), dragonflyJobPath) + data, err := client.GetHTTPClient(dd.instance.Insecure).Post(url, dd.getCred(), req, nil) if err != nil { - if httpErr, ok := err.(*common_http.Error); ok && httpErr.Code == http.StatusAlreadyReported { - // If the resource was preheated already with empty task ID, we should set preheat status to success. - // Otherwise later querying for the task - taskStatus = provider.PreheatingStatusSuccess - } else { - return nil, err - } + return nil, err } - result := &dragonflyPreheatCreateResp{} - if err := json.Unmarshal(bytes, result); err != nil { + resp := &dragonflyJobResponse{} + if err := json.Unmarshal(data, resp); err != nil { return nil, err } return &PreheatingStatus{ - TaskID: result.ID, - Status: taskStatus, + TaskID: fmt.Sprintf("%d", resp.ID), + Status: provider.PreheatingStatusPending, + StartTime: resp.CreatedAt.Format(time.RFC3339), + FinishTime: resp.UpdatedAt.Format(time.RFC3339), }, nil } // CheckProgress implements @Driver.CheckProgress. func (dd *DragonflyDriver) CheckProgress(taskID string) (*PreheatingStatus, error) { - status, err := dd.getProgressStatus(taskID) - if err != nil { - return nil, err + if dd.instance == nil { + return nil, errors.New("missing instance metadata") } - // If preheat job already exists - if strings.Contains(status.ErrorMsg, "preheat task already exists, id:") { - if taskID, err = getTaskExistedFromErrMsg(status.ErrorMsg); err != nil { - return nil, err - } - if status, err = dd.getProgressStatus(taskID); err != nil { - return nil, err - } + if taskID == "" { + return nil, errors.New("no task ID") } - if status.Status == dragonflyPending { - status.Status = provider.PreheatingStatusPending - } else if status.Status == dragonflyFailed { - status.Status = provider.PreheatingStatusFail + url := fmt.Sprintf("%s%s/%s", strings.TrimSuffix(dd.instance.Endpoint, "/"), dragonflyJobPath, taskID) + data, err := client.GetHTTPClient(dd.instance.Insecure).Get(url, dd.getCred(), nil, nil) + if err != nil { + return nil, err } - res := &PreheatingStatus{ - Status: status.Status, - TaskID: taskID, - } - if status.StartTime != "" { - res.StartTime = status.StartTime - } - if status.FinishTime != "" { - res.FinishTime = status.FinishTime + resp := &dragonflyJobResponse{} + if err := json.Unmarshal(data, resp); err != nil { + return nil, err } - return res, nil -} + var ( + successMessage string + errorMessage string + ) -func getTaskExistedFromErrMsg(msg string) (string, error) { - begin := strings.Index(msg, "preheat task already exists, id:") + 32 - end := strings.LastIndex(msg, "\"}") - if end-begin <= 0 { - return "", errors.Errorf("can't find existed task id by error msg:%s", msg) - } - return msg[begin:end], nil -} + var state string + switch resp.State { + case dragonflyJobPendingState: + state = provider.PreheatingStatusRunning + case dragonflyJobSuccessState: + state = provider.PreheatingStatusSuccess -func (dd *DragonflyDriver) getProgressStatus(taskID string) (*dragonflyPreheatInfo, error) { - if dd.instance == nil { - return nil, errors.New("missing instance metadata") - } + var buffer bytes.Buffer + table := tablewriter.NewWriter(&buffer) + table.SetHeader([]string{"Blob URL", "Hostname", "IP", "Cluster ID", "State", "Error Message"}) + for _, jobState := range resp.Result.JobStates { + for _, result := range jobState.Results { + // Write the success tasks records to the table. + for _, successTask := range result.SuccessTasks { + table.Append([]string{successTask.URL, successTask.Hostname, successTask.IP, fmt.Sprint(result.SchedulerClusterID), dragonflyJobSuccessState, ""}) + } - if len(taskID) == 0 { - return nil, errors.New("no task ID") - } + // Write the failure tasks records to the table. + for _, failureTask := range result.FailureTasks { + table.Append([]string{failureTask.URL, failureTask.Hostname, failureTask.IP, fmt.Sprint(result.SchedulerClusterID), dragonflyJobFailureState, failureTask.Description}) + } + } + } - path := strings.Replace(preheatTaskEndpoint, "{task_id}", taskID, 1) - url := fmt.Sprintf("%s%s", strings.TrimSuffix(dd.instance.Endpoint, "/"), path) - bytes, err := client.GetHTTPClient(dd.instance.Insecure).Get(url, dd.getCred(), nil, nil) - if err != nil { - return nil, err - } + table.Render() + successMessage = buffer.String() + case dragonflyJobFailureState: + var errs errors.Errors + state = provider.PreheatingStatusFail + for _, jobState := range resp.Result.JobStates { + errs = append(errs, errors.New(jobState.Error)) + } - status := &dragonflyPreheatInfo{} - if err := json.Unmarshal(bytes, status); err != nil { - return nil, err + if len(errs) > 0 { + errorMessage = errs.Error() + } + default: + state = provider.PreheatingStatusFail + errorMessage = fmt.Sprintf("unknown state: %s", resp.State) } - return status, nil + + return &PreheatingStatus{ + TaskID: fmt.Sprintf("%d", resp.ID), + Status: state, + Message: successMessage, + Error: errorMessage, + StartTime: resp.CreatedAt.Format(time.RFC3339), + FinishTime: resp.UpdatedAt.Format(time.RFC3339), + }, nil } func (dd *DragonflyDriver) getCred() *auth.Credential { @@ -198,3 +315,14 @@ func (dd *DragonflyDriver) getCred() *auth.Credential { Data: dd.instance.AuthInfo, } } + +func headerToMapString(header map[string]interface{}) map[string]string { + m := make(map[string]string) + for k, v := range header { + if s, ok := v.(string); ok { + m[k] = s + } + } + + return m +} diff --git a/src/pkg/p2p/preheat/provider/dragonfly_test.go b/src/pkg/p2p/preheat/provider/dragonfly_test.go index e7bfa658feb..e407cc77905 100644 --- a/src/pkg/p2p/preheat/provider/dragonfly_test.go +++ b/src/pkg/p2p/preheat/provider/dragonfly_test.go @@ -79,64 +79,41 @@ func (suite *DragonflyTestSuite) TestGetHealth() { // TestPreheat tests Preheat method. func (suite *DragonflyTestSuite) TestPreheat() { - // preheat first time st, err := suite.driver.Preheat(&PreheatImage{ Type: "image", ImageName: "busybox", Tag: "latest", URL: "https://harbor.com", Digest: "sha256:f3c97e3bd1e27393eb853a5c90b1132f2cda84336d5ba5d100c720dc98524c82", + Scope: "single_peer", }) require.NoError(suite.T(), err, "preheat image") - suite.Equal("dragonfly-id", st.TaskID, "preheat image result") - - // preheat the same image second time - st, err = suite.driver.Preheat(&PreheatImage{ - Type: "image", - ImageName: "busybox", - Tag: "latest", - URL: "https://harbor.com", - Digest: "sha256:f3c97e3bd1e27393eb853a5c90b1132f2cda84336d5ba5d100c720dc98524c82", - }) - require.NoError(suite.T(), err, "preheat image") - suite.Equal("", st.TaskID, "preheat image result") - - // preheat image digest is empty - st, err = suite.driver.Preheat(&PreheatImage{ - ImageName: "", - }) - require.Error(suite.T(), err, "preheat image") + suite.Equal(provider.PreheatingStatusPending, st.Status, "preheat status") + suite.Equal("0", st.TaskID, "task id") + suite.NotEmptyf(st.StartTime, "start time") + suite.NotEmptyf(st.FinishTime, "finish time") } // TestCheckProgress tests CheckProgress method. func (suite *DragonflyTestSuite) TestCheckProgress() { - st, err := suite.driver.CheckProgress("dragonfly-id") - require.NoError(suite.T(), err, "get preheat status") + st, err := suite.driver.CheckProgress("1") + require.NoError(suite.T(), err, "get image") + suite.Equal(provider.PreheatingStatusRunning, st.Status, "preheat status") + suite.Equal("1", st.TaskID, "task id") + suite.NotEmptyf(st.StartTime, "start time") + suite.NotEmptyf(st.FinishTime, "finish time") + + st, err = suite.driver.CheckProgress("2") + require.NoError(suite.T(), err, "get image") suite.Equal(provider.PreheatingStatusSuccess, st.Status, "preheat status") + suite.Equal("2", st.TaskID, "task id") + suite.NotEmptyf(st.StartTime, "start time") + suite.NotEmptyf(st.FinishTime, "finish time") - // preheat job exit but returns no id - st, err = suite.driver.CheckProgress("preheat-job-exist-with-no-id") - require.Error(suite.T(), err, "get preheat status") - - // preheat job exit returns id but get info with that failed - st, err = suite.driver.CheckProgress("preheat-job-exist-with-id-1") - require.Error(suite.T(), err, "get preheat status") - - // preheat job normal failed - st, err = suite.driver.CheckProgress("preheat-job-normal-failed") - require.NoError(suite.T(), err, "get preheat status") + st, err = suite.driver.CheckProgress("3") + require.NoError(suite.T(), err, "get image") suite.Equal(provider.PreheatingStatusFail, st.Status, "preheat status") - - // instance is empty - testDriver := &DragonflyDriver{} - st, err = testDriver.CheckProgress("") - require.Error(suite.T(), err, "get preheat status") - - // preheat job with no task id - st, err = suite.driver.CheckProgress("") - require.Error(suite.T(), err, "get preheat status") - - // preheat job with err json response - st, err = suite.driver.CheckProgress("preheat-job-err-body-json") - require.Error(suite.T(), err, "get preheat status") + suite.Equal("3", st.TaskID, "task id") + suite.NotEmptyf(st.StartTime, "start time") + suite.NotEmptyf(st.FinishTime, "finish time") } diff --git a/src/pkg/p2p/preheat/provider/driver.go b/src/pkg/p2p/preheat/provider/driver.go index 2702c6e318f..0c87bdda0a5 100644 --- a/src/pkg/p2p/preheat/provider/driver.go +++ b/src/pkg/p2p/preheat/provider/driver.go @@ -77,6 +77,7 @@ type DriverStatus struct { type PreheatingStatus struct { TaskID string `json:"task_id"` Status string `json:"status"` + Message string `json:"message,omitempty"` Error string `json:"error,omitempty"` StartTime string `json:"start_time"` FinishTime string `json:"finish_time"` diff --git a/src/pkg/p2p/preheat/provider/mock.go b/src/pkg/p2p/preheat/provider/mock.go index 3ff3973bbe6..eb3c7b6c251 100644 --- a/src/pkg/p2p/preheat/provider/mock.go +++ b/src/pkg/p2p/preheat/provider/mock.go @@ -16,10 +16,10 @@ package provider import ( "encoding/json" + "fmt" "io" "net/http" "net/http/httptest" - "strings" "time" "github.com/goharbor/harbor/src/pkg/p2p/preheat/models/notification" @@ -32,126 +32,146 @@ var preheatMap = make(map[string]struct{}) func MockDragonflyProvider() *httptest.Server { return httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { switch r.RequestURI { - case healthCheckEndpoint: + case dragonflyHealthPath: if r.Method != http.MethodGet { w.WriteHeader(http.StatusNotImplemented) return } w.WriteHeader(http.StatusOK) - case preheatEndpoint: + case dragonflyJobPath: if r.Method != http.MethodPost { w.WriteHeader(http.StatusNotImplemented) return } - data, err := io.ReadAll(r.Body) + var resp = &dragonflyJobResponse{ + ID: 0, + State: dragonflyJobPendingState, + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + } + + bytes, err := json.Marshal(resp) if err != nil { w.WriteHeader(http.StatusInternalServerError) _, _ = w.Write([]byte(err.Error())) return } - image := &PreheatImage{} - if err := json.Unmarshal(data, image); err != nil { + if _, err := w.Write(bytes); err != nil { w.WriteHeader(http.StatusInternalServerError) _, _ = w.Write([]byte(err.Error())) return } - if image.ImageName == "" { - w.WriteHeader(http.StatusBadRequest) + w.WriteHeader(http.StatusOK) + case fmt.Sprintf("%s/%s", dragonflyJobPath, "0"): + if r.Method != http.MethodGet { + w.WriteHeader(http.StatusNotImplemented) return } - if _, ok := preheatMap[image.Digest]; ok { - w.WriteHeader(http.StatusAlreadyReported) - _, _ = w.Write([]byte(`{"ID":""}`)) - return + var resp = &dragonflyJobResponse{ + ID: 1, + State: dragonflyJobSuccessState, + CreatedAt: time.Now(), + UpdatedAt: time.Now(), } - preheatMap[image.Digest] = struct{}{} - - if image.Type == "image" && - image.URL == "https://harbor.com" && - image.ImageName == "busybox" && - image.Tag == "latest" { - w.WriteHeader(http.StatusOK) - _, _ = w.Write([]byte(`{"ID":"dragonfly-id"}`)) + bytes, err := json.Marshal(resp) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + _, _ = w.Write([]byte(err.Error())) return } - w.WriteHeader(http.StatusBadRequest) - case strings.Replace(preheatTaskEndpoint, "{task_id}", "dragonfly-id", 1): + if _, err := w.Write(bytes); err != nil { + w.WriteHeader(http.StatusInternalServerError) + _, _ = w.Write([]byte(err.Error())) + return + } + case fmt.Sprintf("%s/%s", dragonflyJobPath, "1"): if r.Method != http.MethodGet { w.WriteHeader(http.StatusNotImplemented) return } - status := &dragonflyPreheatInfo{ - ID: "dragonfly-id", - StartTime: time.Now().UTC().String(), - FinishTime: time.Now().Add(5 * time.Minute).UTC().String(), - Status: "SUCCESS", + + var resp = &dragonflyJobResponse{ + ID: 1, + State: dragonflyJobPendingState, + CreatedAt: time.Now(), + UpdatedAt: time.Now(), } - bytes, _ := json.Marshal(status) - _, _ = w.Write(bytes) - case strings.Replace(preheatTaskEndpoint, "{task_id}", "preheat-job-exist-with-no-id", 1): - if r.Method != http.MethodGet { - w.WriteHeader(http.StatusNotImplemented) + + bytes, err := json.Marshal(resp) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + _, _ = w.Write([]byte(err.Error())) return } - status := &dragonflyPreheatInfo{ - ID: "preheat-exist-with-no-id", - StartTime: time.Now().UTC().String(), - FinishTime: time.Now().Add(5 * time.Minute).UTC().String(), - Status: "FAILED", - ErrorMsg: "{\"Code\":208,\"Msg\":\"preheat task already exists, id:\"}", + + if _, err := w.Write(bytes); err != nil { + w.WriteHeader(http.StatusInternalServerError) + _, _ = w.Write([]byte(err.Error())) + return } - bytes, _ := json.Marshal(status) - _, _ = w.Write(bytes) - case strings.Replace(preheatTaskEndpoint, "{task_id}", "preheat-job-normal-failed", 1): + + w.WriteHeader(http.StatusOK) + case fmt.Sprintf("%s/%s", dragonflyJobPath, "2"): if r.Method != http.MethodGet { w.WriteHeader(http.StatusNotImplemented) return } - status := &dragonflyPreheatInfo{ - ID: "preheat-job-exist-with-id-1", - StartTime: time.Now().UTC().String(), - FinishTime: time.Now().Add(5 * time.Minute).UTC().String(), - Status: "FAILED", - ErrorMsg: "{\"Code\":208,\"Msg\":\"some msg\"}", + + var resp = &dragonflyJobResponse{ + ID: 2, + State: dragonflyJobSuccessState, + CreatedAt: time.Now(), + UpdatedAt: time.Now(), } - bytes, _ := json.Marshal(status) - _, _ = w.Write(bytes) - case strings.Replace(preheatTaskEndpoint, "{task_id}", "preheat-job-exist-with-id-1", 1): - if r.Method != http.MethodGet { - w.WriteHeader(http.StatusNotImplemented) + + bytes, err := json.Marshal(resp) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + _, _ = w.Write([]byte(err.Error())) return } - status := &dragonflyPreheatInfo{ - ID: "preheat-job-exist-with-id-1", - StartTime: time.Now().UTC().String(), - FinishTime: time.Now().Add(5 * time.Minute).UTC().String(), - Status: "FAILED", - ErrorMsg: "{\"Code\":208,\"Msg\":\"preheat task already exists, id:preheat-job-exist-with-id-1-1\"}", + + if _, err := w.Write(bytes); err != nil { + w.WriteHeader(http.StatusInternalServerError) + _, _ = w.Write([]byte(err.Error())) + return } - bytes, _ := json.Marshal(status) - _, _ = w.Write(bytes) - case strings.Replace(preheatTaskEndpoint, "{task_id}", "preheat-job-exist-with-id-1-1", 1): + + w.WriteHeader(http.StatusOK) + case fmt.Sprintf("%s/%s", dragonflyJobPath, "3"): if r.Method != http.MethodGet { w.WriteHeader(http.StatusNotImplemented) return } - w.WriteHeader(http.StatusInternalServerError) - case strings.Replace(preheatTaskEndpoint, "{task_id}", "preheat-job-err-body-json", 1): - if r.Method != http.MethodGet { - w.WriteHeader(http.StatusNotImplemented) + + var resp = &dragonflyJobResponse{ + ID: 3, + State: dragonflyJobFailureState, + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + } + + bytes, err := json.Marshal(resp) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + _, _ = w.Write([]byte(err.Error())) return } - bodyStr := "\"err body\"" - _, _ = w.Write([]byte(bodyStr)) - default: - w.WriteHeader(http.StatusNotImplemented) + + if _, err := w.Write(bytes); err != nil { + w.WriteHeader(http.StatusInternalServerError) + _, _ = w.Write([]byte(err.Error())) + return + } + + w.WriteHeader(http.StatusOK) } })) } diff --git a/src/pkg/p2p/preheat/provider/preheat_image.go b/src/pkg/p2p/preheat/provider/preheat_image.go index e690d038544..8b576f96dec 100644 --- a/src/pkg/p2p/preheat/provider/preheat_image.go +++ b/src/pkg/p2p/preheat/provider/preheat_image.go @@ -45,6 +45,9 @@ type PreheatImage struct { // Digest of the preheating image Digest string `json:"digest"` + + // Scope indicates the preheat scope. + Scope string `json:"scope,omitempty"` } // FromJSON build preheating image from the given data. diff --git a/src/portal/src/app/base/project/p2p-provider/add-p2p-policy/add-p2p-policy.component.html b/src/portal/src/app/base/project/p2p-provider/add-p2p-policy/add-p2p-policy.component.html index 4dd78313eef..ea8c21937d8 100644 --- a/src/portal/src/app/base/project/p2p-provider/add-p2p-policy/add-p2p-policy.component.html +++ b/src/portal/src/app/base/project/p2p-provider/add-p2p-policy/add-p2p-policy.component.html @@ -457,6 +457,30 @@