Skip to content

Commit

Permalink
Refactor: p2p preheat dragonfly driver (#20922)
Browse files Browse the repository at this point in the history
  • Loading branch information
chlins authored Sep 21, 2024
1 parent 8d52a63 commit c97253f
Show file tree
Hide file tree
Showing 28 changed files with 472 additions and 352 deletions.
3 changes: 3 additions & 0 deletions api/v2.0/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7095,6 +7095,9 @@ definitions:
type: boolean
description: Whether the preheat policy enabled
x-omitempty: false
scope:
type: string
description: The scope of preheat policy
creation_time:
type: string
format: date-time
Expand Down
2 changes: 2 additions & 0 deletions make/migrations/postgresql/0150_2.12.0_schema.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,5 @@ Add new column creator_ref and creator_type for robot table to record the creato
*/
ALTER TABLE robot ADD COLUMN IF NOT EXISTS creator_ref integer default 0;
ALTER TABLE robot ADD COLUMN IF NOT EXISTS creator_type varchar(255);

ALTER TABLE p2p_preheat_policy ADD COLUMN IF NOT EXISTS scope varchar(255);
5 changes: 3 additions & 2 deletions src/controller/p2p/preheat/enforcer.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ func (de *defaultEnforcer) launchExecutions(ctx context.Context, candidates []*s
// Start tasks
count := 0
for _, c := range candidates {
if _, err = de.startTask(ctx, eid, c, insData); err != nil {
if _, err = de.startTask(ctx, eid, c, insData, pl.Scope); err != nil {
// Just log the error and skip
log.Errorf("start task error for preheating image: %s/%s:%s@%s", c.Namespace, c.Repository, c.Tags[0], c.Digest)
continue
Expand All @@ -421,7 +421,7 @@ func (de *defaultEnforcer) launchExecutions(ctx context.Context, candidates []*s
}

// startTask starts the preheat task(job) for the given candidate
func (de *defaultEnforcer) startTask(ctx context.Context, executionID int64, candidate *selector.Candidate, instance string) (int64, error) {
func (de *defaultEnforcer) startTask(ctx context.Context, executionID int64, candidate *selector.Candidate, instance, scope string) (int64, error) {
u, err := de.fullURLGetter(candidate)
if err != nil {
return -1, err
Expand All @@ -441,6 +441,7 @@ func (de *defaultEnforcer) startTask(ctx context.Context, executionID int64, can
ImageName: fmt.Sprintf("%s/%s", candidate.Namespace, candidate.Repository),
Tag: candidate.Tags[0],
Digest: candidate.Digest,
Scope: scope,
}

piData, err := pi.ToJSON()
Expand Down
2 changes: 2 additions & 0 deletions src/controller/p2p/preheat/enforcer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ func mockPolicies() []*po.Schema {
Type: po.TriggerTypeManual,
},
Enabled: true,
Scope: "single_peer",
CreatedAt: time.Now().UTC(),
UpdatedTime: time.Now().UTC(),
}, {
Expand All @@ -235,6 +236,7 @@ func mockPolicies() []*po.Schema {
Trigger: &po.Trigger{
Type: po.TriggerTypeEventBased,
},
Scope: "all_peers",
Enabled: true,
CreatedAt: time.Now().UTC(),
UpdatedTime: time.Now().UTC(),
Expand Down
14 changes: 13 additions & 1 deletion src/pkg/p2p/preheat/instance/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package instance

import (
"context"
"encoding/json"

"github.com/goharbor/harbor/src/lib/q"
dao "github.com/goharbor/harbor/src/pkg/p2p/preheat/dao/instance"
Expand Down Expand Up @@ -114,7 +115,18 @@ func (dm *manager) Update(ctx context.Context, inst *provider.Instance, props ..

// Get implements @Manager.Get
func (dm *manager) Get(ctx context.Context, id int64) (*provider.Instance, error) {
return dm.dao.Get(ctx, id)
ins, err := dm.dao.Get(ctx, id)
if err != nil {
return nil, err
}
// mapping auth data to auth info.
if len(ins.AuthData) > 0 {
if err := json.Unmarshal([]byte(ins.AuthData), &ins.AuthInfo); err != nil {
return nil, err
}
}

return ins, nil
}

// Get implements @Manager.GetByName
Expand Down
5 changes: 5 additions & 0 deletions src/pkg/p2p/preheat/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,11 @@ func (j *Job) Run(ctx job.Context, params job.Parameters) error {
return preheatJobRunningError(errors.Errorf("preheat failed: %s", s))
case provider.PreheatingStatusSuccess:
// Finished
// log the message if received message from provider.
if s.Message != "" {
myLogger.Infof("Preheat job finished, message from provider: \n%s", s.Message)
}

return nil
default:
// do nothing, check again
Expand Down
76 changes: 16 additions & 60 deletions src/pkg/p2p/preheat/models/policy/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,10 @@ package policy

import (
"encoding/json"
"fmt"
"strconv"
"time"

beego_orm "github.com/beego/beego/v2/client/orm"
"github.com/beego/beego/v2/core/validation"

"github.com/goharbor/harbor/src/common/utils"
"github.com/goharbor/harbor/src/lib/errors"
Expand All @@ -32,6 +30,9 @@ func init() {
beego_orm.RegisterModel(&Schema{})
}

// ScopeType represents the preheat scope type.
type ScopeType = string

const (
// Filters:
// Repository : type=Repository value=name text (double star pattern used)
Expand All @@ -57,6 +58,11 @@ const (
TriggerTypeScheduled TriggerType = "scheduled"
// TriggerTypeEventBased represents the event_based trigger type
TriggerTypeEventBased TriggerType = "event_based"

// ScopeTypeSinglePeer represents preheat image to single peer in p2p cluster.
ScopeTypeSinglePeer ScopeType = "single_peer"
// ScopeTypeAllPeers represents preheat image to all peers in p2p cluster.
ScopeTypeAllPeers ScopeType = "all_peers"
)

// Schema defines p2p preheat policy schema
Expand All @@ -72,8 +78,10 @@ type Schema struct {
FiltersStr string `orm:"column(filters)" json:"-"`
Trigger *Trigger `orm:"-" json:"trigger"`
// Use JSON data format (query by trigger type should be supported)
TriggerStr string `orm:"column(trigger)" json:"-"`
Enabled bool `orm:"column(enabled)" json:"enabled"`
TriggerStr string `orm:"column(trigger)" json:"-"`
Enabled bool `orm:"column(enabled)" json:"enabled"`
// Scope decides the preheat scope.
Scope string `orm:"column(scope)" json:"scope"`
CreatedAt time.Time `orm:"column(creation_time)" json:"creation_time"`
UpdatedTime time.Time `orm:"column(update_time)" json:"update_time"`
}
Expand Down Expand Up @@ -127,65 +135,13 @@ func (s *Schema) ValidatePreheatPolicy() error {
WithMessage("invalid cron string for scheduled preheat: %s, error: %v", s.Trigger.Settings.Cron, err)
}
}
return nil
}

// Valid the policy
func (s *Schema) Valid(v *validation.Validation) {
if len(s.Name) == 0 {
_ = v.SetError("name", "cannot be empty")
// validate preheat scope
if s.Scope != "" && s.Scope != ScopeTypeSinglePeer && s.Scope != ScopeTypeAllPeers {
return errors.New(nil).WithCode(errors.BadRequestCode).WithMessage("invalid scope for preheat policy: %s", s.Scope)
}

// valid the filters
for _, filter := range s.Filters {
switch filter.Type {
case FilterTypeRepository, FilterTypeTag, FilterTypeVulnerability:
_, ok := filter.Value.(string)
if !ok {
_ = v.SetError("filters", "the type of filter value isn't string")
break
}
case FilterTypeSignature:
_, ok := filter.Value.(bool)
if !ok {
_ = v.SetError("filers", "the type of signature filter value isn't bool")
break
}
case FilterTypeLabel:
labels, ok := filter.Value.([]interface{})
if !ok {
_ = v.SetError("filters", "the type of label filter value isn't string slice")
break
}
for _, label := range labels {
_, ok := label.(string)
if !ok {
_ = v.SetError("filters", "the type of label filter value isn't string slice")
break
}
}
default:
_ = v.SetError("filters", "invalid filter type")
}
}

// valid trigger
if s.Trigger != nil {
switch s.Trigger.Type {
case TriggerTypeManual, TriggerTypeEventBased:
case TriggerTypeScheduled:
if len(s.Trigger.Settings.Cron) == 0 {
_ = v.SetError("trigger", fmt.Sprintf("the cron string cannot be empty when the trigger type is %s", TriggerTypeScheduled))
} else {
_, err := utils.CronParser().Parse(s.Trigger.Settings.Cron)
if err != nil {
_ = v.SetError("trigger", fmt.Sprintf("invalid cron string for scheduled trigger: %s", s.Trigger.Settings.Cron))
}
}
default:
_ = v.SetError("trigger", "invalid trigger type")
}
}
return nil
}

// Encode encodes policy schema.
Expand Down
98 changes: 11 additions & 87 deletions src/pkg/p2p/preheat/models/policy/policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ package policy
import (
"testing"

"github.com/beego/beego/v2/core/validation"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
)

Expand Down Expand Up @@ -66,92 +64,13 @@ func (p *PolicyTestSuite) TestValidatePreheatPolicy() {
// valid cron string
p.schema.Trigger.Settings.Cron = "0 0 0 1 1 *"
p.NoError(p.schema.ValidatePreheatPolicy())
}

// TestValid tests Valid method.
func (p *PolicyTestSuite) TestValid() {
// policy name is empty, should return error
v := &validation.Validation{}
p.schema.Valid(v)
require.True(p.T(), v.HasErrors(), "no policy name should return one error")
require.Contains(p.T(), v.Errors[0].Error(), "cannot be empty")

// policy with name but with error filter type
p.schema.Name = "policy-test"
p.schema.Filters = []*Filter{
{
Type: "invalid-type",
},
}
v = &validation.Validation{}
p.schema.Valid(v)
require.True(p.T(), v.HasErrors(), "invalid filter type should return one error")
require.Contains(p.T(), v.Errors[0].Error(), "invalid filter type")

filterCases := [][]*Filter{
{
{
Type: FilterTypeSignature,
Value: "invalid-value",
},
},

{
{
Type: FilterTypeTag,
Value: true,
},
},
{
{
Type: FilterTypeLabel,
Value: "invalid-value",
},
},
}
// with valid filter type but with error value type
for _, filters := range filterCases {
p.schema.Filters = filters
v = &validation.Validation{}
p.schema.Valid(v)
require.True(p.T(), v.HasErrors(), "invalid filter value type should return one error")
}

// with valid filter but error trigger type
p.schema.Filters = []*Filter{
{
Type: FilterTypeSignature,
Value: true,
},
}
p.schema.Trigger = &Trigger{
Type: "invalid-type",
}
v = &validation.Validation{}
p.schema.Valid(v)
require.True(p.T(), v.HasErrors(), "invalid trigger type should return one error")
require.Contains(p.T(), v.Errors[0].Error(), "invalid trigger type")

// with valid filter but error trigger value
p.schema.Trigger = &Trigger{
Type: TriggerTypeScheduled,
}
v = &validation.Validation{}
p.schema.Valid(v)
require.True(p.T(), v.HasErrors(), "invalid trigger value should return one error")
require.Contains(p.T(), v.Errors[0].Error(), "the cron string cannot be empty")
// with invalid cron
p.schema.Trigger.Settings.Cron = "1111111111111"
v = &validation.Validation{}
p.schema.Valid(v)
require.True(p.T(), v.HasErrors(), "invalid trigger value should return one error")
require.Contains(p.T(), v.Errors[0].Error(), "invalid cron string for scheduled trigger")

// all is well
p.schema.Trigger.Settings.Cron = "0/12 * * * * *"
v = &validation.Validation{}
p.schema.Valid(v)
require.False(p.T(), v.HasErrors(), "should return nil error")
// invalid preheat scope
p.schema.Scope = "invalid scope"
p.Error(p.schema.ValidatePreheatPolicy())
// valid preheat scope
p.schema.Scope = "single_peer"
p.NoError(p.schema.ValidatePreheatPolicy())
}

// TestDecode tests decode.
Expand All @@ -167,11 +86,14 @@ func (p *PolicyTestSuite) TestDecode() {
Trigger: nil,
TriggerStr: "{\"type\":\"event_based\",\"trigger_setting\":{\"cron\":\"\"}}",
Enabled: false,
Scope: "all_peers",
}
p.NoError(s.Decode())
p.Len(s.Filters, 3)
p.NotNil(s.Trigger)

p.Equal(ScopeTypeAllPeers, s.Scope)

// invalid filter or trigger
s.FiltersStr = ""
s.TriggerStr = "invalid"
Expand Down Expand Up @@ -210,8 +132,10 @@ func (p *PolicyTestSuite) TestEncode() {
},
TriggerStr: "",
Enabled: false,
Scope: "single_peer",
}
p.NoError(s.Encode())
p.Equal(`[{"type":"repository","value":"**"},{"type":"tag","value":"**"},{"type":"label","value":"test"}]`, s.FiltersStr)
p.Equal(`{"type":"event_based","trigger_setting":{}}`, s.TriggerStr)
p.Equal(ScopeTypeSinglePeer, s.Scope)
}
Loading

0 comments on commit c97253f

Please sign in to comment.