diff --git a/LICENSES/vendor/github.com/agiledragon/gomonkey/v2/LICENSE b/LICENSES/vendor/github.com/agiledragon/gomonkey/v2/LICENSE new file mode 100644 index 0000000000..7adc86eb25 --- /dev/null +++ b/LICENSES/vendor/github.com/agiledragon/gomonkey/v2/LICENSE @@ -0,0 +1,25 @@ += vendor/github.com/agiledragon/gomonkey/v2 licensed under: = + +MIT License + +Copyright (c) 2018 Zhang Xiaolong + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. + += vendor/github.com/agiledragon/gomonkey/v2/LICENSE 9bd88aaa83a25e41d110ebfa6571e8cf diff --git a/go.mod b/go.mod index 017baf1d72..e44663b43c 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module volcano.sh/volcano go 1.17 require ( + github.com/agiledragon/gomonkey/v2 v2.9.0 github.com/fsnotify/fsnotify v1.4.9 github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 github.com/hashicorp/go-multierror v1.0.0 diff --git a/go.sum b/go.sum index 61e86db46e..7785ed8aa4 100644 --- a/go.sum +++ b/go.sum @@ -68,6 +68,8 @@ github.com/PuerkitoBio/purell v1.1.1 h1:WEQqlqaGbrPkxLJWfBwQmfEAE1Z7ONdDLqrN38tN github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0= github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 h1:d+Bc7a5rLufV/sSk/8dngufqelfh6jnri85riMAaF/M= github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE= +github.com/agiledragon/gomonkey/v2 v2.9.0 h1:PDiKKybR596O6FHW+RVSG0Z7uGCBNbmbUXh3uCNQ7Hc= +github.com/agiledragon/gomonkey/v2 v2.9.0/go.mod h1:ap1AmDzcVOAz1YpeJ3TCzIgstoaWLA6jbbgxfB4w2iY= github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= diff --git a/pkg/scheduler/actions/allocate/allocate_test.go b/pkg/scheduler/actions/allocate/allocate_test.go index d2d0e5e408..907858b929 100644 --- a/pkg/scheduler/actions/allocate/allocate_test.go +++ b/pkg/scheduler/actions/allocate/allocate_test.go @@ -17,14 +17,20 @@ limitations under the License. 
package allocate import ( + "context" + "fmt" + "k8s.io/apimachinery/pkg/api/resource" "reflect" "testing" - "time" + "volcano.sh/volcano/pkg/scheduler/plugins/gang" + "volcano.sh/volcano/pkg/scheduler/plugins/priority" + + "github.com/agiledragon/gomonkey/v2" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/tools/record" - schedulingv1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1" "volcano.sh/volcano/cmd/scheduler/app/options" "volcano.sh/volcano/pkg/scheduler/api" @@ -37,6 +43,13 @@ import ( ) func TestAllocate(t *testing.T) { + var tmp *cache.SchedulerCache + patches := gomonkey.ApplyMethod(reflect.TypeOf(tmp), "AddBindTask", func(scCache *cache.SchedulerCache, task *api.TaskInfo) error { + scCache.Binder.Bind(nil, []*api.TaskInfo{task}) + return nil + }) + defer patches.Reset() + framework.RegisterPluginBuilder("drf", drf.New) framework.RegisterPluginBuilder("proportion", proportion.New) @@ -224,6 +237,7 @@ func TestAllocate(t *testing.T) { Recorder: record.NewFakeRecorder(100), } + for _, node := range test.nodes { schedulerCache.AddNode(node) } @@ -261,17 +275,198 @@ func TestAllocate(t *testing.T) { allocate.Execute(ssn) - for i := 0; i < len(test.expected); i++ { - select { - case <-binder.Channel: - case <-time.After(3 * time.Second): - t.Errorf("Failed to get binding request.") - } - } - if !reflect.DeepEqual(test.expected, binder.Binds) { t.Errorf("expected: %v, got %v ", test.expected, binder.Binds) } }) } } + +func TestAllocateWithDynamicPVC(t *testing.T) { + var tmp *cache.SchedulerCache + patches := gomonkey.ApplyMethod(reflect.TypeOf(tmp), "AddBindTask", func(scCache *cache.SchedulerCache, task *api.TaskInfo) error { + scCache.VolumeBinder.BindVolumes(task, task.PodVolumes) + scCache.Binder.Bind(nil, []*api.TaskInfo{task}) + return nil + }) + defer patches.Reset() + + framework.RegisterPluginBuilder("gang", gang.New) + framework.RegisterPluginBuilder("priority", priority.New) + + options.ServerOpts = &options.ServerOption{ + MinNodesToFind: 100, + MinPercentageOfNodesToFind: 5, + PercentageOfNodesToFind: 100, + } + + defer framework.CleanupPluginBuilders() + + queue := &schedulingv1.Queue{ + ObjectMeta: metav1.ObjectMeta{ + Name: "c1", + }, + Spec: schedulingv1.QueueSpec{ + Weight: 1, + }, + } + pg := &schedulingv1.PodGroup{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pg1", + Namespace: "c1", + }, + Spec: schedulingv1.PodGroupSpec{ + Queue: "c1", + MinMember: 2, + }, + Status: schedulingv1.PodGroupStatus{ + Phase: schedulingv1.PodGroupInqueue, + }, + } + kubeClient := fake.NewSimpleClientset() + pvc, pv, sc := util.BuildDynamicPVC("c1", "pvc1", v1.ResourceList{ + v1.ResourceStorage: resource.MustParse("1Gi"), + }) + kubeClient.StorageV1().StorageClasses().Create(context.TODO(), sc, metav1.CreateOptions{}) + kubeClient.CoreV1().PersistentVolumeClaims(pvc.Namespace).Create(context.TODO(), pvc, metav1.CreateOptions{}) + pvcs := []*v1.PersistentVolumeClaim{pvc} + for i := 1; i <= 5; i++ { + tmp := pvc.DeepCopy() + tmp.Name = fmt.Sprintf("pvc%d", i) + pvcs = append(pvcs, tmp) + kubeClient.CoreV1().PersistentVolumeClaims(pvc.Namespace).Create(context.TODO(), tmp, metav1.CreateOptions{}) + } + fakeVolumeBinder := util.NewFakeVolumeBinder(kubeClient) + + allocate := New() + + tests := []struct { + name string + pods []*v1.Pod + nodes []*v1.Node + pvs []*v1.PersistentVolume + expectedBind map[string]string + expectedActions map[string][]string + }{ + { + name: "resource not match", + pods: []*v1.Pod{ + 
util.BuildPodWithPVC("c1", "p1", "", v1.PodPending, util.BuildResourceList("1", "1G"), pvc, "pg1", make(map[string]string), make(map[string]string)), + util.BuildPodWithPVC("c1", "p2", "", v1.PodPending, util.BuildResourceList("1", "1G"), pvcs[2], "pg1", make(map[string]string), make(map[string]string)), + }, + nodes: []*v1.Node{ + util.BuildNode("n1", util.BuildResourceList("1", "4Gi"), make(map[string]string)), + }, + expectedBind: map[string]string{}, + expectedActions: map[string][]string{ + "c1/p1": {"GetPodVolumes", "AllocateVolumes", "RevertVolumes"}, + }, + }, + { + name: "node changed with enough resource", + pods: []*v1.Pod{ + util.BuildPodWithPVC("c1", "p1", "", v1.PodPending, util.BuildResourceList("1", "1G"), pvc, "pg1", make(map[string]string), make(map[string]string)), + util.BuildPodWithPVC("c1", "p2", "", v1.PodPending, util.BuildResourceList("1", "1G"), pvcs[2], "pg1", make(map[string]string), make(map[string]string)), + }, + nodes: []*v1.Node{ + util.BuildNode("n2", util.BuildResourceList("2", "4Gi"), make(map[string]string)), + }, + expectedBind: map[string]string{ + "c1/p1": "n2", + "c1/p2": "n2", + }, + expectedActions: map[string][]string{ + "c1/p1": {"GetPodVolumes", "AllocateVolumes", "DynamicProvisions"}, + "c1/p2": {"GetPodVolumes", "AllocateVolumes", "DynamicProvisions"}, + }, + }, + { + name: "pvc with matched pv", + pods: []*v1.Pod{ + util.BuildPodWithPVC("c1", "p3", "", v1.PodPending, util.BuildResourceList("1", "1G"), pvcs[3], "pg1", make(map[string]string), make(map[string]string)), + util.BuildPodWithPVC("c1", "p4", "", v1.PodPending, util.BuildResourceList("1", "1G"), pvcs[4], "pg1", make(map[string]string), make(map[string]string)), + }, + pvs: []*v1.PersistentVolume{ + pv, + }, + nodes: []*v1.Node{ + util.BuildNode("n3", util.BuildResourceList("2", "4Gi"), make(map[string]string)), + }, + expectedBind: map[string]string{ + "c1/p3": "n3", + "c1/p4": "n3", + }, + expectedActions: map[string][]string{ + "c1/p3": {"GetPodVolumes", "AllocateVolumes", "StaticBindings"}, + "c1/p4": {"GetPodVolumes", "AllocateVolumes", "DynamicProvisions"}, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + binder := &util.FakeBinder{ + Binds: map[string]string{}, + Channel: make(chan string), + } + schedulerCache := &cache.SchedulerCache{ + Nodes: make(map[string]*api.NodeInfo), + Jobs: make(map[api.JobID]*api.JobInfo), + Queues: make(map[api.QueueID]*api.QueueInfo), + Binder: binder, + StatusUpdater: &util.FakeStatusUpdater{}, + VolumeBinder: fakeVolumeBinder, + Recorder: record.NewFakeRecorder(100), + } + + schedulerCache.AddQueueV1beta1(queue) + schedulerCache.AddPodGroupV1beta1(pg) + for i, pod := range test.pods { + priority := int32(-i) + pod.Spec.Priority = &priority + schedulerCache.AddPod(pod) + } + for _, pv := range test.pvs { + kubeClient.CoreV1().PersistentVolumes().Create(context.TODO(), pv, metav1.CreateOptions{}) + } + for _, node := range test.nodes { + schedulerCache.AddNode(node) + } + + trueValue := true + ssn := framework.OpenSession(schedulerCache, []conf.Tier{ + { + Plugins: []conf.PluginOption{ + { + Name: "priority", + EnabledJobReady: &trueValue, + EnabledPredicate: &trueValue, + EnabledJobPipelined: &trueValue, + EnabledTaskOrder: &trueValue, + }, + { + Name: "gang", + EnabledJobReady: &trueValue, + EnabledPredicate: &trueValue, + EnabledJobPipelined: &trueValue, + EnabledTaskOrder: &trueValue, + }, + }, + }, + }, nil) + defer framework.CloseSession(ssn) + + allocate.Execute(ssn) + for _, pv := range test.pvs { + 
kubeClient.CoreV1().PersistentVolumes().Delete(context.TODO(), pv.Name, metav1.DeleteOptions{}) + } + if !reflect.DeepEqual(test.expectedBind, binder.Binds) { + t.Errorf("expected: %v, got %v ", test.expectedBind, binder.Binds) + } + if !reflect.DeepEqual(test.expectedActions, fakeVolumeBinder.Actions) { + t.Errorf("expected: %v, got %v ", test.expectedActions, fakeVolumeBinder.Actions) + } + fakeVolumeBinder.Actions = make(map[string][]string) + }) + } +} diff --git a/pkg/scheduler/cache/cache.go b/pkg/scheduler/cache/cache.go index f1612f8c08..ef2ec57b19 100644 --- a/pkg/scheduler/cache/cache.go +++ b/pkg/scheduler/cache/cache.go @@ -19,6 +19,9 @@ package cache import ( "context" "fmt" + "os" + "strconv" + "strings" "sync" "time" @@ -53,6 +56,7 @@ import ( vcinformerv1 "volcano.sh/apis/pkg/client/informers/externalversions/scheduling/v1beta1" "volcano.sh/volcano/cmd/scheduler/app/options" schedulingapi "volcano.sh/volcano/pkg/scheduler/api" + "volcano.sh/volcano/pkg/scheduler/metrics" ) func init() { @@ -111,6 +115,10 @@ type SchedulerCache struct { deletedJobs workqueue.RateLimitingInterface informerFactory informers.SharedInformerFactory + + BindFlowChannel chan *schedulingapi.TaskInfo + bindCache []*schedulingapi.TaskInfo + batchNum int } type defaultBinder struct { } //Bind will send bind request to api server -func (db *defaultBinder) Bind(p *v1.Pod, hostname string) error { - if err := db.kubeclient.CoreV1().Pods(p.Namespace).Bind(context.TODO(), - &v1.Binding{ - ObjectMeta: metav1.ObjectMeta{Namespace: p.Namespace, Name: p.Name, UID: p.UID, Annotations: p.Annotations}, - Target: v1.ObjectReference{ - Kind: "Node", - Name: hostname, +func (db *defaultBinder) Bind(kubeClient *kubernetes.Clientset, tasks []*schedulingapi.TaskInfo) (error, []*schedulingapi.TaskInfo) { + var errTasks []*schedulingapi.TaskInfo + for _, task := range tasks { + p := task.Pod + if err := kubeClient.CoreV1().Pods(p.Namespace).Bind(context.TODO(), + &v1.Binding{ + ObjectMeta: metav1.ObjectMeta{Namespace: p.Namespace, Name: p.Name, UID: p.UID, Annotations: p.Annotations}, + Target: v1.ObjectReference{ + Kind: "Node", + Name: task.NodeName, + }, }, - }, - metav1.CreateOptions{}); err != nil { - klog.Errorf("Failed to bind pod <%v/%v>: %#v", p.Namespace, p.Name, err) - return err + metav1.CreateOptions{}); err != nil { + klog.Errorf("Failed to bind pod <%v/%v> to node %s: %#v", p.Namespace, p.Name, task.NodeName, err) + errTasks = append(errTasks, task) + } } - return nil + + if len(errTasks) > 0 { + return fmt.Errorf("failed to bind pods"), errTasks + } + + return nil, nil +} + +func NewBinder() *defaultBinder { + return &defaultBinder{} } type defaultEvictor struct { @@ -247,15 +268,38 @@ func (dvb *defaultVolumeBinder) AllocateVolumes(task *schedulingapi.TaskInfo, ho return err } +// RevertVolumes cleans the cache generated by AllocateVolumes +func (dvb *defaultVolumeBinder) RevertVolumes(task *schedulingapi.TaskInfo, podVolumes *volumescheduling.PodVolumes) { + if podVolumes != nil { + klog.Infof("Revert assumed volumes for task %v/%v on node %s", task.Namespace, task.Name, task.NodeName) + dvb.volumeBinder.RevertAssumedPodVolumes(podVolumes) + task.VolumeReady = false + task.PodVolumes = nil + } +} + // GetPodVolumes get pod volume on the host func (dvb *defaultVolumeBinder) GetPodVolumes(task *schedulingapi.TaskInfo, node *v1.Node) (podVolumes *volumescheduling.PodVolumes, err error) { - boundClaims, claimsToBind, _, err :=
dvb.volumeBinder.GetPodVolumes(task.Pod) + boundClaims, claimsToBind, unboundClaimsImmediate, err := dvb.volumeBinder.GetPodVolumes(task.Pod) if err != nil { return nil, err } + if len(unboundClaimsImmediate) > 0 { + return nil, fmt.Errorf("pod has unbound immediate PersistentVolumeClaims") + } + + podVolumes, reasons, err := dvb.volumeBinder.FindPodVolumes(task.Pod, boundClaims, claimsToBind, node) + if err != nil { + return nil, err + } else if len(reasons) > 0 { + var errors []string + for _, reason := range reasons { + errors = append(errors, string(reason)) + } + return nil, fmt.Errorf(strings.Join(errors, ",")) + } - podVolumes, _, err = dvb.volumeBinder.FindPodVolumes(task.Pod, boundClaims, claimsToBind, node) return podVolumes, err } @@ -318,8 +362,15 @@ func newSchedulerCache(config *rest.Config, schedulerName string, defaultQueue s broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: eventClient.CoreV1().Events("")}) sc.Recorder = broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: schedulerName}) - sc.Binder = &defaultBinder{ - kubeclient: sc.kubeClient, + sc.BindFlowChannel = make(chan *schedulingapi.TaskInfo, 5000) + sc.Binder = GetBindMethod() + + var batchNum int + batchNum, err = strconv.Atoi(os.Getenv("BATCH_BIND_NUM")) + if err == nil && batchNum > 0 { + sc.batchNum = batchNum + } else { + sc.batchNum = 1 } sc.Evictor = &defaultEvictor{ @@ -448,6 +499,8 @@ func (sc *SchedulerCache) Run(stopCh <-chan struct{}) { // Cleanup jobs. go wait.Until(sc.processCleanupJob, 0, stopCh) + + go wait.Until(sc.processBindTask, time.Millisecond*20, stopCh) } // WaitForCacheSync sync the cache with the api server @@ -545,60 +598,24 @@ func (sc *SchedulerCache) Evict(taskInfo *schedulingapi.TaskInfo, reason string) } // Bind binds task to the target host. -func (sc *SchedulerCache) Bind(taskInfo *schedulingapi.TaskInfo, hostname string) error { - sc.Mutex.Lock() - defer sc.Mutex.Unlock() - - job, task, err := sc.findJobAndTask(taskInfo) - - if err != nil { - return err - } - - node, found := sc.Nodes[hostname] - if !found { - return fmt.Errorf("failed to bind Task %v to host %v, host does not exist", - task.UID, hostname) - } - - originalStatus := task.Status - if err := job.UpdateTaskStatus(task, schedulingapi.Binding); err != nil { - return err - } - - // Add task to the node. - if err := node.AddTask(task); err != nil { - // After failing to update task to a node we need to revert task status from Releasing, - // otherwise task might be stuck in the Releasing state indefinitely. 
- if err := job.UpdateTaskStatus(task, originalStatus); err != nil { - klog.Errorf("Task <%s/%s> will be resynchronized after failing to revert status "+ - "from %s to %s after failing to update Task on Node <%s>: %v", - task.Namespace, task.Name, task.Status, originalStatus, node.Name, err) - sc.resyncTask(task) - } - return err - } - - p := task.Pod - go func() { - taskID := schedulingapi.PodKey(p) - - sc.Lock() - node.AddBindingTask(taskID) - sc.Unlock() - - defer func() { - sc.Lock() - node.RemoveBindingTask(taskID) - sc.Unlock() - }() - - if err := sc.Binder.Bind(p, hostname); err != nil { - sc.resyncTask(task) +func (sc *SchedulerCache) Bind(tasks []*schedulingapi.TaskInfo) error { + go func(taskArray []*schedulingapi.TaskInfo) { + tmp := time.Now() + err, errTasks := sc.Binder.Bind(sc.kubeClient, taskArray) + if err == nil { + klog.V(3).Infof("bind ok, latency %v", time.Since(tmp)) + for _, task := range tasks { + sc.Recorder.Eventf(task.Pod, v1.EventTypeNormal, "Scheduled", "Successfully assigned %v/%v to %v", + task.Namespace, task.Name, task.NodeName) + } } else { - sc.Recorder.Eventf(p, v1.EventTypeNormal, "Scheduled", "Successfully assigned %v/%v to %v", p.Namespace, p.Name, hostname) + for _, task := range errTasks { + klog.V(2).Infof("resyncTask task %s", task.Name) + sc.VolumeBinder.RevertVolumes(task, task.PodVolumes) + sc.resyncTask(task) + } } - }() + }(tasks) return nil } @@ -618,6 +635,11 @@ func (sc *SchedulerCache) BindVolumes(task *schedulingapi.TaskInfo, podVolumes * return sc.VolumeBinder.BindVolumes(task, podVolumes) } +// RevertVolumes cleans the cache generated by AllocateVolumes +func (sc *SchedulerCache) RevertVolumes(task *schedulingapi.TaskInfo, podVolumes *volumescheduling.PodVolumes) { + sc.VolumeBinder.RevertVolumes(task, podVolumes) +} + // Client returns the kubernetes clientSet func (sc *SchedulerCache) Client() kubernetes.Interface { return sc.kubeClient @@ -737,6 +759,95 @@ func (sc *SchedulerCache) processResyncTask() { } } +func (sc *SchedulerCache) AddBindTask(taskInfo *schedulingapi.TaskInfo) error { + klog.V(5).Infof("add bind task %v/%v", taskInfo.Namespace, taskInfo.Name) + sc.Mutex.Lock() + defer sc.Mutex.Unlock() + job, task, err := sc.findJobAndTask(taskInfo) + if err != nil { + return err + } + + node, found := sc.Nodes[taskInfo.NodeName] + if !found { + return fmt.Errorf("failed to bind Task %v to host %v, host does not exist", + task.UID, taskInfo.NodeName) + } + + originalStatus := task.Status + if err := job.UpdateTaskStatus(task, schedulingapi.Binding); err != nil { + return err + } + + // Add task to the node. + if err := node.AddTask(task); err != nil { + // After failing to update task to a node we need to revert task status from Releasing, + // otherwise task might be stuck in the Releasing state indefinitely.
+ if err := job.UpdateTaskStatus(task, originalStatus); err != nil { + klog.Errorf("Task <%s/%s> will be resynchronized after failing to revert status "+ + "from %s to %s after failing to update Task on Node <%s>: %v", + task.Namespace, task.Name, task.Status, originalStatus, node.Name, err) + sc.resyncTask(task) + } + return err + } + + sc.BindFlowChannel <- taskInfo + + return nil +} + +func (sc *SchedulerCache) processBindTask() { + for { + select { + case taskInfo, ok := <-sc.BindFlowChannel: + if !ok { + return + } + + sc.bindCache = append(sc.bindCache, taskInfo) + if len(sc.bindCache) == sc.batchNum { + sc.BindTask() + } + } + + if len(sc.BindFlowChannel) == 0 { + break + } + } + + if len(sc.bindCache) == 0 { + return + } + + sc.BindTask() +} + +func (sc *SchedulerCache) BindTask() { + klog.V(5).Infof("batch bind task count %d", len(sc.bindCache)) + for _, task := range sc.bindCache { + if err := sc.VolumeBinder.BindVolumes(task, task.PodVolumes); err != nil { + klog.Errorf("task %s/%s bind Volumes failed: %#v", task.Namespace, task.Name, err) + sc.VolumeBinder.RevertVolumes(task, task.PodVolumes) + sc.resyncTask(task) + return + } + } + + bindTasks := make([]*schedulingapi.TaskInfo, len(sc.bindCache)) + copy(bindTasks, sc.bindCache) + if err := sc.Bind(bindTasks); err != nil { + return + } + + for _, task := range sc.bindCache { + metrics.UpdateTaskScheduleDuration(metrics.Duration(task.Pod.CreationTimestamp.Time)) + } + + sc.bindCache = sc.bindCache[0:0] + return +} + // Snapshot returns the complete snapshot of the cluster from cache func (sc *SchedulerCache) Snapshot() *schedulingapi.ClusterInfo { sc.Mutex.Lock() diff --git a/pkg/scheduler/cache/cache_test.go b/pkg/scheduler/cache/cache_test.go index 91f3ba480d..c7d9f2c0a2 100644 --- a/pkg/scheduler/cache/cache_test.go +++ b/pkg/scheduler/cache/cache_test.go @@ -146,6 +146,7 @@ func TestSchedulerCache_Bind_NodeWithSufficientResources(t *testing.T) { Binds: map[string]string{}, Channel: make(chan string), }, + BindFlowChannel: make(chan *api.TaskInfo, 5000), } pod := buildPod("c1", "p1", "", v1.PodPending, buildResourceList("1000m", "1G"), @@ -160,8 +161,8 @@ if err := cache.addTask(task); err != nil { t.Errorf("failed to add task %v", err) } - - err := cache.Bind(task, "n1") + task.NodeName = "n1" + err := cache.AddBindTask(task) if err != nil { t.Errorf("failed to bind pod to node: %v", err) } @@ -177,6 +178,7 @@ func TestSchedulerCache_Bind_NodeWithInsufficientResources(t *testing.T) { Binds: map[string]string{}, Channel: make(chan string), }, + BindFlowChannel: make(chan *api.TaskInfo, 5000), } pod := buildPod("c1", "p1", "", v1.PodPending, buildResourceList("5000m", "50G"), @@ -193,10 +195,11 @@ t.Errorf("failed to add task %v", err) } + task.NodeName = "n1" taskBeforeBind := task.Clone() nodeBeforeBind := cache.Nodes["n1"].Clone() - err := cache.Bind(task, "n1") + err := cache.AddBindTask(task) if err == nil { t.Errorf("expected bind to fail for node with insufficient resources") } diff --git a/pkg/scheduler/cache/factory.go b/pkg/scheduler/cache/factory.go new file mode 100644 index 0000000000..63625f1037 --- /dev/null +++ b/pkg/scheduler/cache/factory.go @@ -0,0 +1,17 @@ +package cache + +// bindMethodMap holds the registered Binder +var bindMethodMap Binder + +// RegisterBindMethod registers the bind method +func RegisterBindMethod(binder Binder) { + bindMethodMap = binder +} + +func
GetBindMethod() Binder { + return bindMethodMap +} + +func init() { + RegisterBindMethod(NewBinder()) +} diff --git a/pkg/scheduler/cache/interface.go b/pkg/scheduler/cache/interface.go index 84f555e9c8..4882272ede 100644 --- a/pkg/scheduler/cache/interface.go +++ b/pkg/scheduler/cache/interface.go @@ -38,9 +38,9 @@ type Cache interface { // WaitForCacheSync waits for all cache synced WaitForCacheSync(stopCh <-chan struct{}) bool - // Bind binds Task to the target host. + // AddBindTask binds Task to the target host. // TODO(jinzhej): clean up expire Tasks. - Bind(task *api.TaskInfo, hostname string) error + AddBindTask(task *api.TaskInfo) error // Evict evicts the task to release resources. Evict(task *api.TaskInfo, reason string) error @@ -52,6 +52,7 @@ type Cache interface { // UpdateJobStatus puts job in backlog for a while. UpdateJobStatus(job *api.JobInfo, updatePG bool) (*api.JobInfo, error) + // GetPodVolumes gets pod volumes on the host GetPodVolumes(task *api.TaskInfo, node *v1.Node) (*scheduling.PodVolumes, error) // AllocateVolumes allocates volume on the host to the task @@ -60,6 +61,9 @@ type Cache interface { // BindVolumes binds volumes to the task BindVolumes(task *api.TaskInfo, volumes *scheduling.PodVolumes) error + // RevertVolumes cleans the cache generated by AllocateVolumes + RevertVolumes(task *api.TaskInfo, podVolumes *scheduling.PodVolumes) + // Client returns the kubernetes clientSet, which can be used by plugins Client() kubernetes.Interface @@ -73,13 +77,14 @@ type Cache interface { // VolumeBinder interface for allocate and bind volumes type VolumeBinder interface { GetPodVolumes(task *api.TaskInfo, node *v1.Node) (*scheduling.PodVolumes, error) + RevertVolumes(task *api.TaskInfo, podVolumes *scheduling.PodVolumes) AllocateVolumes(task *api.TaskInfo, hostname string, podVolumes *scheduling.PodVolumes) error BindVolumes(task *api.TaskInfo, podVolumes *scheduling.PodVolumes) error } //Binder interface for binding task and hostname type Binder interface { - Bind(task *v1.Pod, hostname string) error + Bind(kubeClient *kubernetes.Clientset, tasks []*api.TaskInfo) (error, []*api.TaskInfo) } // Evictor interface for evict pods diff --git a/pkg/scheduler/framework/session.go b/pkg/scheduler/framework/session.go index ac492c9244..b3237d3294 100644 --- a/pkg/scheduler/framework/session.go +++ b/pkg/scheduler/framework/session.go @@ -27,8 +27,6 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/record" "k8s.io/klog" - volumescheduling "k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding" - "volcano.sh/apis/pkg/apis/scheduling" schedulingscheme "volcano.sh/apis/pkg/apis/scheduling/scheme" vcv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1" @@ -272,7 +270,7 @@ func (ssn *Session) Pipeline(task *api.TaskInfo, hostname string) error { } //Allocate the task to the node in the session -func (ssn *Session) Allocate(task *api.TaskInfo, nodeInfo *api.NodeInfo) error { +func (ssn *Session) Allocate(task *api.TaskInfo, nodeInfo *api.NodeInfo) (err error) { podVolumes, err := ssn.cache.GetPodVolumes(task, nodeInfo.Node) if err != nil { return err } @@ -282,6 +280,11 @@ func (ssn *Session) Allocate(task *api.TaskInfo, nodeInfo *api.NodeInfo) error { if err := ssn.cache.AllocateVolumes(task, hostname, podVolumes); err != nil { return err } + defer func() { + if err != nil { + ssn.cache.RevertVolumes(task, podVolumes) + } + }() task.Pod.Spec.NodeName = hostname task.PodVolumes = podVolumes @@ -327,23 +330,21 @@ func (ssn *Session) Allocate(task *api.TaskInfo,
nodeInfo *api.NodeInfo) error { if ssn.JobReady(job) { for _, task := range job.TaskStatusIndex[api.Allocated] { - if err := ssn.dispatch(task, podVolumes); err != nil { + if err := ssn.dispatch(task); err != nil { klog.Errorf("Failed to dispatch task <%v/%v>: %v", task.Namespace, task.Name, err) return err } } + } else { + ssn.cache.RevertVolumes(task, podVolumes) } return nil } -func (ssn *Session) dispatch(task *api.TaskInfo, volumes *volumescheduling.PodVolumes) error { - if err := ssn.cache.BindVolumes(task, volumes); err != nil { - return err - } - - if err := ssn.cache.Bind(task, task.NodeName); err != nil { +func (ssn *Session) dispatch(task *api.TaskInfo) error { + if err := ssn.cache.AddBindTask(task); err != nil { return err } diff --git a/pkg/scheduler/framework/statement.go b/pkg/scheduler/framework/statement.go index 3956fdf8c9..48fcbaabec 100644 --- a/pkg/scheduler/framework/statement.go +++ b/pkg/scheduler/framework/statement.go @@ -226,7 +226,7 @@ func (s *Statement) unpipeline(task *api.TaskInfo) error { } // Allocate the task to node -func (s *Statement) Allocate(task *api.TaskInfo, nodeInfo *api.NodeInfo) error { +func (s *Statement) Allocate(task *api.TaskInfo, nodeInfo *api.NodeInfo) (err error) { podVolumes, err := s.ssn.cache.GetPodVolumes(task, nodeInfo.Node) if err != nil { return err @@ -236,6 +236,11 @@ func (s *Statement) Allocate(task *api.TaskInfo, nodeInfo *api.NodeInfo) error { if err := s.ssn.cache.AllocateVolumes(task, hostname, podVolumes); err != nil { return err } + defer func() { + if err != nil { + s.ssn.cache.RevertVolumes(task, podVolumes) + } + }() task.Pod.Spec.NodeName = hostname task.PodVolumes = podVolumes @@ -289,15 +294,10 @@ func (s *Statement) Allocate(task *api.TaskInfo, nodeInfo *api.NodeInfo) error { } func (s *Statement) allocate(task *api.TaskInfo) error { - if err := s.ssn.cache.BindVolumes(task, task.PodVolumes); err != nil { + if err := s.ssn.cache.AddBindTask(task); err != nil { return err } - if err := s.ssn.cache.Bind(task, task.NodeName); err != nil { - return err - } - - // Update status in session if job, found := s.ssn.Jobs[task.Job]; found { if err := job.UpdateTaskStatus(task, api.Binding); err != nil { klog.Errorf("Failed to update task <%v/%v> status to %v in Session <%v>: %v", @@ -316,6 +316,8 @@ func (s *Statement) allocate(task *api.TaskInfo) error { // unallocate the pod for task func (s *Statement) unallocate(task *api.TaskInfo) error { + s.ssn.cache.RevertVolumes(task, task.PodVolumes) + // Update status in session job, found := s.ssn.Jobs[task.Job] if found { @@ -388,6 +390,7 @@ func (s *Statement) Commit() { case Allocate: err := s.allocate(op.task) if err != nil { + s.ssn.cache.RevertVolumes(op.task, op.task.PodVolumes) klog.Errorf("Failed to allocate task: for %s", err.Error()) } } diff --git a/pkg/scheduler/plugins/drf/hdrf_test.go b/pkg/scheduler/plugins/drf/hdrf_test.go index 7979ad5d3f..172b05361e 100644 --- a/pkg/scheduler/plugins/drf/hdrf_test.go +++ b/pkg/scheduler/plugins/drf/hdrf_test.go @@ -3,8 +3,11 @@ package drf import ( "flag" "fmt" + "reflect" "testing" + "github.com/agiledragon/gomonkey/v2" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/record" @@ -48,6 +51,13 @@ func TestHDRF(t *testing.T) { klog.InitFlags(nil) flag.Set("v", "4") flag.Set("alsologtostderr", "true") + var tmp *cache.SchedulerCache + patches := gomonkey.ApplyMethod(reflect.TypeOf(tmp), "AddBindTask", func(scCache *cache.SchedulerCache, task *api.TaskInfo) error { + 
scCache.Binder.Bind(nil, []*api.TaskInfo{task}) + return nil + }) + defer patches.Reset() + s := options.NewServerOption() s.MinNodesToFind = 100 s.PercentageOfNodesToFind = 100 diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 7268974a30..54d3904515 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -84,6 +84,7 @@ func (pc *Scheduler) Run(stopCh <-chan struct{}) { // Start cache for policy. go pc.cache.Run(stopCh) pc.cache.WaitForCacheSync(stopCh) + klog.V(2).Infof("scheduler completes initialization and starts to run") go wait.Until(pc.runOnce, pc.schedulePeriod, stopCh) } diff --git a/pkg/scheduler/util/test_utils.go b/pkg/scheduler/util/test_utils.go index 319dd37730..099841f2ff 100644 --- a/pkg/scheduler/util/test_utils.go +++ b/pkg/scheduler/util/test_utils.go @@ -17,16 +17,20 @@ limitations under the License. package util import ( + "context" "fmt" - - "sync" + storagev1 "k8s.io/api/storage/v1" + "k8s.io/client-go/informers" + "k8s.io/client-go/tools/cache" + "time" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" - + "k8s.io/client-go/kubernetes" volumescheduling "k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding" + "sync" schedulingv2 "volcano.sh/apis/pkg/apis/scheduling/v1beta1" "volcano.sh/volcano/pkg/scheduler/api" ) @@ -92,24 +96,116 @@ func BuildPod(namespace, name, nodename string, p v1.PodPhase, req v1.ResourceLi } } +// BuildPodWithPVC builds a Pod object with a PVC volume +func BuildPodWithPVC(namespace, name, nodename string, p v1.PodPhase, req v1.ResourceList, pvc *v1.PersistentVolumeClaim, groupName string, labels map[string]string, selector map[string]string) *v1.Pod { + return &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + UID: types.UID(fmt.Sprintf("%v-%v", namespace, name)), + Name: name, + Namespace: namespace, + Labels: labels, + Annotations: map[string]string{ + schedulingv2.KubeGroupNameAnnotationKey: groupName, + }, + }, + Status: v1.PodStatus{ + Phase: p, + }, + Spec: v1.PodSpec{ + NodeName: nodename, + NodeSelector: selector, + Containers: []v1.Container{ + { + Resources: v1.ResourceRequirements{ + Requests: req, + }, + VolumeMounts: []v1.VolumeMount{ + { + Name: pvc.Name, + MountPath: "/data", + }, + }, + }, + }, + Volumes: []v1.Volume{ + { + Name: pvc.Name, + VolumeSource: v1.VolumeSource{ + PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ + ClaimName: pvc.Name, + }, + }, + }, + }, + }, + } +} + +// BuildDynamicPVC creates PVC, PV and storage class objects +func BuildDynamicPVC(namespace, name string, req v1.ResourceList) (*v1.PersistentVolumeClaim, *v1.PersistentVolume, *storagev1.StorageClass) { + tmp := v1.PersistentVolumeReclaimDelete + tmp2 := storagev1.VolumeBindingWaitForFirstConsumer + sc := &storagev1.StorageClass{ + ObjectMeta: metav1.ObjectMeta{ + UID: types.UID(fmt.Sprintf("%v-%v", namespace, name)), + ResourceVersion: "1", + Name: name, + }, + Provisioner: name, + ReclaimPolicy: &tmp, + VolumeBindingMode: &tmp2, + } + tmp3 := v1.PersistentVolumeFilesystem + pvc := &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + UID: types.UID(fmt.Sprintf("%v-%v", namespace, name)), + ResourceVersion: "1", + Namespace: namespace, + Name: name, + }, + Spec: v1.PersistentVolumeClaimSpec{ + Resources: v1.ResourceRequirements{ + Requests: req, + }, + StorageClassName: &sc.Name, + VolumeMode: &tmp3, + }, + } + pv := &v1.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{ + UID:
types.UID(fmt.Sprintf("%v-%v", namespace, name)), + ResourceVersion: "1", + Name: name, + }, + Spec: v1.PersistentVolumeSpec{ + StorageClassName: sc.Name, + Capacity: req, + VolumeMode: &tmp3, + AccessModes: []v1.PersistentVolumeAccessMode{ + v1.ReadWriteOnce, + }, + }, + Status: v1.PersistentVolumeStatus{ + Phase: v1.VolumeAvailable, + }, + } + return pvc, pv, sc +} + // FakeBinder is used as fake binder type FakeBinder struct { - sync.Mutex Binds map[string]string Channel chan string } // Bind used by fake binder struct to bind pods -func (fb *FakeBinder) Bind(p *v1.Pod, hostname string) error { - fb.Lock() - defer fb.Unlock() - - key := fmt.Sprintf("%v/%v", p.Namespace, p.Name) - fb.Binds[key] = hostname - - fb.Channel <- key +func (fb *FakeBinder) Bind(kubeClient *kubernetes.Clientset, tasks []*api.TaskInfo) (error, []*api.TaskInfo) { + for _, p := range tasks { + key := fmt.Sprintf("%v/%v", p.Namespace, p.Name) + fb.Binds[key] = p.NodeName + } - return nil + return nil, nil } // FakeEvictor is used as fake evictor @@ -158,19 +254,109 @@ func (ftsu *FakeStatusUpdater) UpdatePodGroup(pg *api.PodGroup) (*api.PodGroup, // FakeVolumeBinder is used as fake volume binder type FakeVolumeBinder struct { + volumeBinder volumescheduling.SchedulerVolumeBinder + Actions map[string][]string +} + +// NewFakeVolumeBinder creates a fake volume binder with the given kube client +func NewFakeVolumeBinder(kubeClient kubernetes.Interface) *FakeVolumeBinder { + informerFactory := informers.NewSharedInformerFactory(kubeClient, 0) + podInformer := informerFactory.Core().V1().Pods() + pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims() + pvInformer := informerFactory.Core().V1().PersistentVolumes() + scInformer := informerFactory.Storage().V1().StorageClasses() + nodeInformer := informerFactory.Core().V1().Nodes() + csiNodeInformer := informerFactory.Storage().V1().CSINodes() + + go podInformer.Informer().Run(context.TODO().Done()) + go pvcInformer.Informer().Run(context.TODO().Done()) + go pvInformer.Informer().Run(context.TODO().Done()) + go scInformer.Informer().Run(context.TODO().Done()) + go nodeInformer.Informer().Run(context.TODO().Done()) + go csiNodeInformer.Informer().Run(context.TODO().Done()) + + cache.WaitForCacheSync(context.TODO().Done(), podInformer.Informer().HasSynced, + pvcInformer.Informer().HasSynced, + pvInformer.Informer().HasSynced, + scInformer.Informer().HasSynced, + nodeInformer.Informer().HasSynced, + csiNodeInformer.Informer().HasSynced) + return &FakeVolumeBinder{ + volumeBinder: volumescheduling.NewVolumeBinder( + kubeClient, + podInformer, + nodeInformer, + csiNodeInformer, + pvcInformer, + pvInformer, + scInformer, + nil, + 30*time.Second, + ), + Actions: make(map[string][]string), + } } -// AllocateVolumes is a empty function +// AllocateVolumes assumes volumes for the task via the fake binder and records the action func (fvb *FakeVolumeBinder) AllocateVolumes(task *api.TaskInfo, hostname string, podVolumes *volumescheduling.PodVolumes) error { - return nil + if fvb.volumeBinder == nil { + return nil + } + _, err := fvb.volumeBinder.AssumePodVolumes(task.Pod, hostname, podVolumes) + + key := fmt.Sprintf("%s/%s", task.Namespace, task.Name) + fvb.Actions[key] = append(fvb.Actions[key], "AllocateVolumes") + return err } -// BindVolumes is a empty function +// BindVolumes records which volume binding actions the task triggers func (fvb *FakeVolumeBinder) BindVolumes(task *api.TaskInfo, podVolumes *volumescheduling.PodVolumes) error { + if fvb.volumeBinder == nil { + return nil + } + + key := fmt.Sprintf("%s/%s", task.Namespace, task.Name) + if len(podVolumes.DynamicProvisions) > 0 { + fvb.Actions[key] = append(fvb.Actions[key],
"DynamicProvisions") + } + if len(podVolumes.StaticBindings) > 0 { + fvb.Actions[key] = append(fvb.Actions[key], "StaticBindings") + } return nil } // GetPodVolumes is a empty function func (fvb *FakeVolumeBinder) GetPodVolumes(task *api.TaskInfo, node *v1.Node) (*volumescheduling.PodVolumes, error) { - return nil, nil + if fvb.volumeBinder == nil { + return nil, nil + } + key := fmt.Sprintf("%s/%s", task.Namespace, task.Name) + fvb.Actions[key] = []string{"GetPodVolumes"} + boundClaims, claimsToBind, unboundClaimsImmediate, err := fvb.volumeBinder.GetPodVolumes(task.Pod) + if err != nil { + return nil, err + } + if len(unboundClaimsImmediate) > 0 { + return nil, fmt.Errorf("pod has unbound immediate PersistentVolumeClaims") + } + + podVolumes, reasons, err := fvb.volumeBinder.FindPodVolumes(task.Pod, boundClaims, claimsToBind, node) + if err != nil { + return nil, err + } else if len(reasons) > 0 { + return nil, fmt.Errorf("%v", reasons[0]) + } + return podVolumes, err +} + +// RevertVolumes is a empty function +func (fvb *FakeVolumeBinder) RevertVolumes(task *api.TaskInfo, podVolumes *volumescheduling.PodVolumes) { + if fvb.volumeBinder == nil { + return + } + key := fmt.Sprintf("%s/%s", task.Namespace, task.Name) + fvb.Actions[key] = append(fvb.Actions[key], "RevertVolumes") + if podVolumes != nil { + fvb.volumeBinder.RevertAssumedPodVolumes(podVolumes) + } } diff --git a/vendor/github.com/agiledragon/gomonkey/v2/LICENSE b/vendor/github.com/agiledragon/gomonkey/v2/LICENSE new file mode 100644 index 0000000000..d75dc90e65 --- /dev/null +++ b/vendor/github.com/agiledragon/gomonkey/v2/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2018 Zhang Xiaolong + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/vendor/github.com/agiledragon/gomonkey/v2/README.md b/vendor/github.com/agiledragon/gomonkey/v2/README.md new file mode 100644 index 0000000000..b3321ba07e --- /dev/null +++ b/vendor/github.com/agiledragon/gomonkey/v2/README.md @@ -0,0 +1,53 @@ +# gomonkey + +gomonkey is a library to make monkey patching in unit tests easy, and the core idea of monkey patching comes from [Bouke](https://github.com/bouk), you can read [this blogpost](https://bou.ke/blog/monkey-patching-in-go/) for an explanation on how it works. 
+ +## Features + ++ support a patch for a function ++ support a patch for a public member method ++ support a patch for a private member method ++ support a patch for an interface ++ support a patch for a function variable ++ support a patch for a global variable ++ support patches of a specified sequence for a function ++ support patches of a specified sequence for a member method ++ support patches of a specified sequence for an interface ++ support patches of a specified sequence for a function variable + +## Notes ++ gomonkey fails to patch a function or a member method if inlining is enabled, so run your tests with inlining disabled by adding the command-line argument `-gcflags=-l` (below go1.10) or `-gcflags=all=-l` (go1.10 and above). ++ A panic may happen when a goroutine is patching a function or a member method that is visited by another goroutine at the same time. That is to say, gomonkey is not thread-safe. + +## Supported Platforms + +- ARCH + - amd64 + - arm64 + - 386 + +- OS + - Linux + - MAC OS X + - Windows + +## Installation +- below v2.1.0, for example v2.0.2 +```go +$ go get github.com/agiledragon/gomonkey@v2.0.2 +``` +- v2.1.0 and above, for example v2.2.0 +```go +$ go get github.com/agiledragon/gomonkey/v2@v2.2.0 +``` + +## Test Method +```go +$ cd test +$ go test -gcflags=all=-l +``` + +## Using gomonkey + +Please refer to the test cases for idiomatic usage; they are complete and detailed. + diff --git a/vendor/github.com/agiledragon/gomonkey/v2/creflect/ae1.17.go b/vendor/github.com/agiledragon/gomonkey/v2/creflect/ae1.17.go new file mode 100644 index 0000000000..68a7e750e9 --- /dev/null +++ b/vendor/github.com/agiledragon/gomonkey/v2/creflect/ae1.17.go @@ -0,0 +1,39 @@ +//go:build go1.17 +// +build go1.17 + +package creflect + +import ( + "unsafe" +) + +// name is an encoded type name with optional extra data. +type name struct { + bytes *byte +} + +func (n name) data(off int, whySafe string) *byte { + return (*byte)(add(unsafe.Pointer(n.bytes), uintptr(off), whySafe)) +} + +func (n name) readVarint(off int) (int, int) { + v := 0 + for i := 0; ; i++ { + x := *n.data(off+i, "read varint") + v += int(x&0x7f) << (7 * i) + if x&0x80 == 0 { + return i + 1, v + } + } +} + +func (n name) name() (s string) { + if n.bytes == nil { + return + } + i, l := n.readVarint(1) + hdr := (*String)(unsafe.Pointer(&s)) + hdr.Data = unsafe.Pointer(n.data(1+i, "non-empty string")) + hdr.Len = l + return +} diff --git a/vendor/github.com/agiledragon/gomonkey/v2/creflect/be1.16.go b/vendor/github.com/agiledragon/gomonkey/v2/creflect/be1.16.go new file mode 100644 index 0000000000..1fb6988afc --- /dev/null +++ b/vendor/github.com/agiledragon/gomonkey/v2/creflect/be1.16.go @@ -0,0 +1,25 @@ +//go:build !go1.17 +// +build !go1.17 + +package creflect + +import ( + "unsafe" +) + +// name is an encoded type name with optional extra data.
+type name struct { + bytes *byte +} + +func (n name) name() (s string) { + if n.bytes == nil { + return + } + b := (*[4]byte)(unsafe.Pointer(n.bytes)) + + hdr := (*String)(unsafe.Pointer(&s)) + hdr.Data = unsafe.Pointer(&b[3]) + hdr.Len = int(b[1])<<8 | int(b[2]) + return s +} diff --git a/vendor/github.com/agiledragon/gomonkey/v2/creflect/type.go b/vendor/github.com/agiledragon/gomonkey/v2/creflect/type.go new file mode 100644 index 0000000000..7df1f09207 --- /dev/null +++ b/vendor/github.com/agiledragon/gomonkey/v2/creflect/type.go @@ -0,0 +1,179 @@ +// Customized reflect package for gomonkey; most code is copied from go/src/reflect/type.go + +package creflect + +import ( + "reflect" + "unsafe" +) + +// rtype is the common implementation of most values. +// rtype must be kept in sync with ../runtime/type.go:/^type._type. +type rtype struct { + size uintptr + ptrdata uintptr // number of bytes in the type that can contain pointers + hash uint32 // hash of type; avoids computation in hash tables + tflag tflag // extra type information flags + align uint8 // alignment of variable with this type + fieldAlign uint8 // alignment of struct field with this type + kind uint8 // enumeration for C + // function for comparing objects of this type + // (ptr to object A, ptr to object B) -> ==? + equal func(unsafe.Pointer, unsafe.Pointer) bool + gcdata *byte // garbage collection data + str nameOff // string form + ptrToThis typeOff // type for pointer to this type, may be zero +} + +func Create(t reflect.Type) *rtype { + i := *(*funcValue)(unsafe.Pointer(&t)) + r := (*rtype)(i.p) + return r +} + +type funcValue struct { + _ uintptr + p unsafe.Pointer +} + +func funcPointer(v reflect.Method, ok bool) (unsafe.Pointer, bool) { + return (*funcValue)(unsafe.Pointer(&v.Func)).p, ok +} +func MethodByName(r reflect.Type, name string) (fn unsafe.Pointer, ok bool) { + t := Create(r) + if r.Kind() == reflect.Interface { + return funcPointer(r.MethodByName(name)) + } + ut := t.uncommon(r) + if ut == nil { + return nil, false + } + + for _, p := range ut.methods() { + if t.nameOff(p.name).name() == name { + return t.Method(p), true + } + } + return nil, false +} + +func (t *rtype) Method(p method) (fn unsafe.Pointer) { + tfn := t.textOff(p.tfn) + fn = unsafe.Pointer(&tfn) + return +} + +type tflag uint8 +type nameOff int32 // offset to a name +type typeOff int32 // offset to an *rtype +type textOff int32 // offset from top of text section + +//go:linkname resolveTextOff reflect.resolveTextOff +func resolveTextOff(rtype unsafe.Pointer, off int32) unsafe.Pointer + +func (t *rtype) textOff(off textOff) unsafe.Pointer { + return resolveTextOff(unsafe.Pointer(t), int32(off)) +} + +//go:linkname resolveNameOff reflect.resolveNameOff +func resolveNameOff(ptrInModule unsafe.Pointer, off int32) unsafe.Pointer + +func (t *rtype) nameOff(off nameOff) name { + return name{(*byte)(resolveNameOff(unsafe.Pointer(t), int32(off)))} +} + +const ( + tflagUncommon tflag = 1 << 0 +) + +// uncommonType is present only for defined types or types with methods +type uncommonType struct { + pkgPath nameOff // import path; empty for built-in types like int, string + mcount uint16 // number of methods + xcount uint16 // number of exported methods + moff uint32 // offset from this uncommontype to [mcount]method + _ uint32 // unused +} + +// ptrType represents a pointer type. +type ptrType struct { + rtype + elem *rtype // pointer element (pointed at) type +} + +// funcType represents a function type.
+type funcType struct { + rtype + inCount uint16 + outCount uint16 // top bit is set if last input parameter is ... +} + +func add(p unsafe.Pointer, x uintptr, whySafe string) unsafe.Pointer { + return unsafe.Pointer(uintptr(p) + x) +} + +// interfaceType represents an interface type. +type interfaceType struct { + rtype + pkgPath name // import path + methods []imethod // sorted by hash +} + +type imethod struct { + name nameOff // name of method + typ typeOff // .(*FuncType) underneath +} + +type String struct { + Data unsafe.Pointer + Len int +} + +func (t *rtype) uncommon(r reflect.Type) *uncommonType { + if t.tflag&tflagUncommon == 0 { + return nil + } + switch r.Kind() { + case reflect.Ptr: + type u struct { + ptrType + u uncommonType + } + return &(*u)(unsafe.Pointer(t)).u + case reflect.Func: + type u struct { + funcType + u uncommonType + } + return &(*u)(unsafe.Pointer(t)).u + case reflect.Interface: + type u struct { + interfaceType + u uncommonType + } + return &(*u)(unsafe.Pointer(t)).u + case reflect.Struct: + type u struct { + interfaceType + u uncommonType + } + return &(*u)(unsafe.Pointer(t)).u + default: + return nil + } +} + +// Method on non-interface type +type method struct { + name nameOff // name of method + mtyp typeOff // method type (without receiver) + ifn textOff // fn used in interface call (one-word receiver) + tfn textOff // fn used for normal method call +} + +func (t *uncommonType) methods() []method { + if t.mcount == 0 { + return nil + } + return (*[1 << 16]method)(add(unsafe.Pointer(t), uintptr(t.moff), "t.mcount > 0"))[:t.mcount:t.mcount] +} diff --git a/vendor/github.com/agiledragon/gomonkey/v2/jmp_386.go b/vendor/github.com/agiledragon/gomonkey/v2/jmp_386.go new file mode 100644 index 0000000000..27c8d4f210 --- /dev/null +++ b/vendor/github.com/agiledragon/gomonkey/v2/jmp_386.go @@ -0,0 +1,13 @@ +package gomonkey + +func buildJmpDirective(double uintptr) []byte { + d0 := byte(double) + d1 := byte(double >> 8) + d2 := byte(double >> 16) + d3 := byte(double >> 24) + + return []byte{ + 0xBA, d0, d1, d2, d3, // MOV edx, double + 0xFF, 0x22, // JMP [edx] + } +} diff --git a/vendor/github.com/agiledragon/gomonkey/v2/jmp_amd64.go b/vendor/github.com/agiledragon/gomonkey/v2/jmp_amd64.go new file mode 100644 index 0000000000..02c1c42c70 --- /dev/null +++ b/vendor/github.com/agiledragon/gomonkey/v2/jmp_amd64.go @@ -0,0 +1,18 @@ +package gomonkey + +func buildJmpDirective(double uintptr) []byte { + d0 := byte(double) + d1 := byte(double >> 8) + d2 := byte(double >> 16) + d3 := byte(double >> 24) + d4 := byte(double >> 32) + d5 := byte(double >> 40) + d6 := byte(double >> 48) + d7 := byte(double >> 56) + + return []byte{ + 0x48, 0xBA, d0, d1, d2, d3, d4, d5, d6, d7, // MOV rdx, double + 0xFF, 0x22, // JMP [rdx] + } +} + diff --git a/vendor/github.com/agiledragon/gomonkey/v2/jmp_arm64.go b/vendor/github.com/agiledragon/gomonkey/v2/jmp_arm64.go new file mode 100644 index 0000000000..add3fd999f --- /dev/null +++ b/vendor/github.com/agiledragon/gomonkey/v2/jmp_arm64.go @@ -0,0 +1,34 @@ +package gomonkey + +import "unsafe" + +func buildJmpDirective(double uintptr) []byte { + res := make([]byte, 0, 24) + d0d1 := double & 0xFFFF + d2d3 := double >> 16 & 0xFFFF + d4d5 := double >> 32 & 0xFFFF + d6d7 := double >> 48 & 0xFFFF + + res = append(res, movImm(0B10, 0, d0d1)...) // MOVZ x26, double[16:0] + res = append(res, movImm(0B11, 1, d2d3)...) // MOVK x26, double[32:16] + res = append(res, movImm(0B11, 2, d4d5)...) 
// MOVK x26, double[48:32] + res = append(res, movImm(0B11, 3, d6d7)...) // MOVK x26, double[64:48] + res = append(res, []byte{0x4A, 0x03, 0x40, 0xF9}...) // LDR x10, [x26] + res = append(res, []byte{0x40, 0x01, 0x1F, 0xD6}...) // BR x10 + + return res +} + +func movImm(opc, shift int, val uintptr) []byte { + var m uint32 = 26 // rd + m |= uint32(val) << 5 // imm16 + m |= uint32(shift&3) << 21 // hw + m |= 0b100101 << 23 // const + m |= uint32(opc&0x3) << 29 // opc + m |= 0b1 << 31 // sf + + res := make([]byte, 4) + *(*uint32)(unsafe.Pointer(&res[0])) = m + + return res +} diff --git a/vendor/github.com/agiledragon/gomonkey/v2/modify_binary_darwin.go b/vendor/github.com/agiledragon/gomonkey/v2/modify_binary_darwin.go new file mode 100644 index 0000000000..6c169888ea --- /dev/null +++ b/vendor/github.com/agiledragon/gomonkey/v2/modify_binary_darwin.go @@ -0,0 +1,27 @@ +package gomonkey + +import "syscall" + +func modifyBinary(target uintptr, bytes []byte) { + function := entryAddress(target, len(bytes)) + err := mprotectCrossPage(target, len(bytes), syscall.PROT_READ|syscall.PROT_WRITE|syscall.PROT_EXEC) + if err != nil { + panic(err) + } + copy(function, bytes) + err = mprotectCrossPage(target, len(bytes), syscall.PROT_READ|syscall.PROT_EXEC) + if err != nil { + panic(err) + } +} + +func mprotectCrossPage(addr uintptr, length int, prot int) error { + pageSize := syscall.Getpagesize() + for p := pageStart(addr); p < addr+uintptr(length); p += uintptr(pageSize) { + page := entryAddress(p, pageSize) + if err := syscall.Mprotect(page, prot); err != nil { + return err + } + } + return nil +} diff --git a/vendor/github.com/agiledragon/gomonkey/v2/modify_binary_linux.go b/vendor/github.com/agiledragon/gomonkey/v2/modify_binary_linux.go new file mode 100644 index 0000000000..6c169888ea --- /dev/null +++ b/vendor/github.com/agiledragon/gomonkey/v2/modify_binary_linux.go @@ -0,0 +1,27 @@ +package gomonkey + +import "syscall" + +func modifyBinary(target uintptr, bytes []byte) { + function := entryAddress(target, len(bytes)) + err := mprotectCrossPage(target, len(bytes), syscall.PROT_READ|syscall.PROT_WRITE|syscall.PROT_EXEC) + if err != nil { + panic(err) + } + copy(function, bytes) + err = mprotectCrossPage(target, len(bytes), syscall.PROT_READ|syscall.PROT_EXEC) + if err != nil { + panic(err) + } +} + +func mprotectCrossPage(addr uintptr, length int, prot int) error { + pageSize := syscall.Getpagesize() + for p := pageStart(addr); p < addr+uintptr(length); p += uintptr(pageSize) { + page := entryAddress(p, pageSize) + if err := syscall.Mprotect(page, prot); err != nil { + return err + } + } + return nil +} diff --git a/vendor/github.com/agiledragon/gomonkey/v2/modify_binary_windows.go b/vendor/github.com/agiledragon/gomonkey/v2/modify_binary_windows.go new file mode 100644 index 0000000000..ef0dbc756a --- /dev/null +++ b/vendor/github.com/agiledragon/gomonkey/v2/modify_binary_windows.go @@ -0,0 +1,25 @@ +package gomonkey + +import ( + "syscall" + "unsafe" +) + +func modifyBinary(target uintptr, bytes []byte) { + function := entryAddress(target, len(bytes)) + + proc := syscall.NewLazyDLL("kernel32.dll").NewProc("VirtualProtect") + const PROT_READ_WRITE = 0x40 + var old uint32 + result, _, _ := proc.Call(target, uintptr(len(bytes)), uintptr(PROT_READ_WRITE), uintptr(unsafe.Pointer(&old))) + if result == 0 { + panic(result) + } + copy(function, bytes) + + var ignore uint32 + result, _, _ = proc.Call(target, uintptr(len(bytes)), uintptr(old), uintptr(unsafe.Pointer(&ignore))) + if result == 0 { + 
panic(result) + } +} \ No newline at end of file diff --git a/vendor/github.com/agiledragon/gomonkey/v2/patch.go b/vendor/github.com/agiledragon/gomonkey/v2/patch.go new file mode 100644 index 0000000000..468ae6708f --- /dev/null +++ b/vendor/github.com/agiledragon/gomonkey/v2/patch.go @@ -0,0 +1,365 @@ +package gomonkey + +import ( + "fmt" + "reflect" + "syscall" + "unsafe" + + "github.com/agiledragon/gomonkey/v2/creflect" +) + +type Patches struct { + originals map[uintptr][]byte + values map[reflect.Value]reflect.Value + valueHolders map[reflect.Value]reflect.Value +} + +type Params []interface{} +type OutputCell struct { + Values Params + Times int +} + +func ApplyFunc(target, double interface{}) *Patches { + return create().ApplyFunc(target, double) +} + +func ApplyMethod(target interface{}, methodName string, double interface{}) *Patches { + return create().ApplyMethod(target, methodName, double) +} + +func ApplyMethodFunc(target interface{}, methodName string, doubleFunc interface{}) *Patches { + return create().ApplyMethodFunc(target, methodName, doubleFunc) +} + +func ApplyPrivateMethod(target interface{}, methodName string, double interface{}) *Patches { + return create().ApplyPrivateMethod(target, methodName, double) +} + +func ApplyGlobalVar(target, double interface{}) *Patches { + return create().ApplyGlobalVar(target, double) +} + +func ApplyFuncVar(target, double interface{}) *Patches { + return create().ApplyFuncVar(target, double) +} + +func ApplyFuncSeq(target interface{}, outputs []OutputCell) *Patches { + return create().ApplyFuncSeq(target, outputs) +} + +func ApplyMethodSeq(target interface{}, methodName string, outputs []OutputCell) *Patches { + return create().ApplyMethodSeq(target, methodName, outputs) +} + +func ApplyFuncVarSeq(target interface{}, outputs []OutputCell) *Patches { + return create().ApplyFuncVarSeq(target, outputs) +} + +func ApplyFuncReturn(target interface{}, output ...interface{}) *Patches { + return create().ApplyFuncReturn(target, output...) +} + +func ApplyMethodReturn(target interface{}, methodName string, output ...interface{}) *Patches { + return create().ApplyMethodReturn(target, methodName, output...) +} + +func ApplyFuncVarReturn(target interface{}, output ...interface{}) *Patches { + return create().ApplyFuncVarReturn(target, output...) 
+} + +func create() *Patches { + return &Patches{originals: make(map[uintptr][]byte), values: make(map[reflect.Value]reflect.Value), valueHolders: make(map[reflect.Value]reflect.Value)} +} + +func NewPatches() *Patches { + return create() +} + +func (this *Patches) ApplyFunc(target, double interface{}) *Patches { + t := reflect.ValueOf(target) + d := reflect.ValueOf(double) + return this.ApplyCore(t, d) +} + +func (this *Patches) ApplyMethod(target interface{}, methodName string, double interface{}) *Patches { + m, ok := castRType(target).MethodByName(methodName) + if !ok { + panic("retrieve method by name failed") + } + d := reflect.ValueOf(double) + return this.ApplyCore(m.Func, d) +} + +func (this *Patches) ApplyMethodFunc(target interface{}, methodName string, doubleFunc interface{}) *Patches { + m, ok := castRType(target).MethodByName(methodName) + if !ok { + panic("retrieve method by name failed") + } + d := funcToMethod(m.Type, doubleFunc) + return this.ApplyCore(m.Func, d) +} + +func (this *Patches) ApplyPrivateMethod(target interface{}, methodName string, double interface{}) *Patches { + m, ok := creflect.MethodByName(castRType(target), methodName) + if !ok { + panic("retrieve method by name failed") + } + d := reflect.ValueOf(double) + return this.ApplyCoreOnlyForPrivateMethod(m, d) +} + +func (this *Patches) ApplyGlobalVar(target, double interface{}) *Patches { + t := reflect.ValueOf(target) + if t.Type().Kind() != reflect.Ptr { + panic("target is not a pointer") + } + + this.values[t] = reflect.ValueOf(t.Elem().Interface()) + d := reflect.ValueOf(double) + t.Elem().Set(d) + return this +} + +func (this *Patches) ApplyFuncVar(target, double interface{}) *Patches { + t := reflect.ValueOf(target) + d := reflect.ValueOf(double) + if t.Type().Kind() != reflect.Ptr { + panic("target is not a pointer") + } + this.check(t.Elem(), d) + return this.ApplyGlobalVar(target, double) +} + +func (this *Patches) ApplyFuncSeq(target interface{}, outputs []OutputCell) *Patches { + funcType := reflect.TypeOf(target) + t := reflect.ValueOf(target) + d := getDoubleFunc(funcType, outputs) + return this.ApplyCore(t, d) +} + +func (this *Patches) ApplyMethodSeq(target interface{}, methodName string, outputs []OutputCell) *Patches { + m, ok := castRType(target).MethodByName(methodName) + if !ok { + panic("retrieve method by name failed") + } + d := getDoubleFunc(m.Type, outputs) + return this.ApplyCore(m.Func, d) +} + +func (this *Patches) ApplyFuncVarSeq(target interface{}, outputs []OutputCell) *Patches { + t := reflect.ValueOf(target) + if t.Type().Kind() != reflect.Ptr { + panic("target is not a pointer") + } + if t.Elem().Kind() != reflect.Func { + panic("target is not a func") + } + + funcType := reflect.TypeOf(target).Elem() + double := getDoubleFunc(funcType, outputs).Interface() + return this.ApplyGlobalVar(target, double) +} + +func (this *Patches) ApplyFuncReturn(target interface{}, returns ...interface{}) *Patches { + funcType := reflect.TypeOf(target) + t := reflect.ValueOf(target) + outputs := []OutputCell{{Values: returns, Times: -1}} + d := getDoubleFunc(funcType, outputs) + return this.ApplyCore(t, d) +} + +func (this *Patches) ApplyMethodReturn(target interface{}, methodName string, returns ...interface{}) *Patches { + m, ok := reflect.TypeOf(target).MethodByName(methodName) + if !ok { + panic("retrieve method by name failed") + } + + outputs := []OutputCell{{Values: returns, Times: -1}} + d := getDoubleFunc(m.Type, outputs) + return this.ApplyCore(m.Func, d) +} + +func (this *Patches) 
+func (this *Patches) ApplyFuncVarReturn(target interface{}, returns ...interface{}) *Patches {
+	t := reflect.ValueOf(target)
+	if t.Type().Kind() != reflect.Ptr {
+		panic("target is not a pointer")
+	}
+	if t.Elem().Kind() != reflect.Func {
+		panic("target is not a func")
+	}
+
+	funcType := reflect.TypeOf(target).Elem()
+	outputs := []OutputCell{{Values: returns, Times: -1}}
+	double := getDoubleFunc(funcType, outputs).Interface()
+	return this.ApplyGlobalVar(target, double)
+}
+
+func (this *Patches) Reset() {
+	for target, bytes := range this.originals {
+		modifyBinary(target, bytes)
+		delete(this.originals, target)
+	}
+
+	for target, variable := range this.values {
+		target.Elem().Set(variable)
+	}
+}
+
+func (this *Patches) ApplyCore(target, double reflect.Value) *Patches {
+	this.check(target, double)
+	assTarget := *(*uintptr)(getPointer(target))
+	original := replace(assTarget, uintptr(getPointer(double)))
+	if _, ok := this.originals[assTarget]; !ok {
+		this.originals[assTarget] = original
+	}
+	this.valueHolders[double] = double
+	return this
+}
+
+func (this *Patches) ApplyCoreOnlyForPrivateMethod(target unsafe.Pointer, double reflect.Value) *Patches {
+	if double.Kind() != reflect.Func {
+		panic("double is not a func")
+	}
+	assTarget := *(*uintptr)(target)
+	original := replace(assTarget, uintptr(getPointer(double)))
+	if _, ok := this.originals[assTarget]; !ok {
+		this.originals[assTarget] = original
+	}
+	this.valueHolders[double] = double
+	return this
+}
+
+func (this *Patches) check(target, double reflect.Value) {
+	if target.Kind() != reflect.Func {
+		panic("target is not a func")
+	}
+
+	if double.Kind() != reflect.Func {
+		panic("double is not a func")
+	}
+
+	targetType := target.Type()
+	doubleType := double.Type()
+
+	if targetType.NumIn() < doubleType.NumIn() ||
+		targetType.NumOut() != doubleType.NumOut() ||
+		(targetType.NumIn() == doubleType.NumIn() && targetType.IsVariadic() != doubleType.IsVariadic()) {
+		panic(fmt.Sprintf("target type(%s) and double type(%s) are different", target.Type(), double.Type()))
+	}
+
+	for i, size := 0, doubleType.NumIn(); i < size; i++ {
+		targetIn := targetType.In(i)
+		doubleIn := doubleType.In(i)
+
+		if targetIn.AssignableTo(doubleIn) {
+			continue
+		}
+
+		panic(fmt.Sprintf("target type(%s) and double type(%s) are different", target.Type(), double.Type()))
+	}
+}
+
+func replace(target, double uintptr) []byte {
+	code := buildJmpDirective(double)
+	bytes := entryAddress(target, len(code))
+	original := make([]byte, len(bytes))
+	copy(original, bytes)
+	modifyBinary(target, code)
+	return original
+}
+
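ApplyCore and replace are the heart of the library: replace copies the original entry bytes of the target function, overwrites them with a jump to the double, and Reset later writes the saved bytes back. A minimal sketch of that round trip, assuming a hypothetical function now and, again, a build with inlining disabled:

package patch_test

import (
	"testing"

	"github.com/agiledragon/gomonkey/v2"
)

// now is a hypothetical dependency; in a real test it would be the
// function whose behaviour needs to be controlled.
func now() string { return "real" }

func TestResetRestoresOriginal(t *testing.T) {
	// ApplyFunc rewrites now's entry point to jump to the double.
	patches := gomonkey.ApplyFunc(now, func() string { return "fake" })
	if now() != "fake" {
		t.Fatal("expected patched result while the patch is active")
	}
	// Reset writes the saved original bytes back over the jump.
	patches.Reset()
	if now() != "real" {
		t.Fatal("expected original result after Reset")
	}
}

This is also why the allocate tests in this PR defer patches.Reset(): the rewrite is process-global, not scoped to a goroutine or test.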
+func getDoubleFunc(funcType reflect.Type, outputs []OutputCell) reflect.Value {
+	if funcType.NumOut() != len(outputs[0].Values) {
+		panic(fmt.Sprintf("func type has %v return values, but only %v values provided as double",
+			funcType.NumOut(), len(outputs[0].Values)))
+	}
+
+	needReturn := false
+	slice := make([]Params, 0)
+	for _, output := range outputs {
+		if output.Times == -1 {
+			needReturn = true
+			slice = []Params{output.Values}
+			break
+		}
+		t := 0
+		if output.Times <= 1 {
+			t = 1
+		} else {
+			t = output.Times
+		}
+		for j := 0; j < t; j++ {
+			slice = append(slice, output.Values)
+		}
+	}
+
+	i := 0
+	lenOutputs := len(slice)
+	return reflect.MakeFunc(funcType, func(_ []reflect.Value) []reflect.Value {
+		if needReturn {
+			return GetResultValues(funcType, slice[0]...)
+		}
+		if i < lenOutputs {
+			i++
+			return GetResultValues(funcType, slice[i-1]...)
+		}
+		panic("double seq is less than call seq")
+	})
+}
+
+func GetResultValues(funcType reflect.Type, results ...interface{}) []reflect.Value {
+	var resultValues []reflect.Value
+	for i, r := range results {
+		var resultValue reflect.Value
+		if r == nil {
+			resultValue = reflect.Zero(funcType.Out(i))
+		} else {
+			v := reflect.New(funcType.Out(i))
+			v.Elem().Set(reflect.ValueOf(r))
+			resultValue = v.Elem()
+		}
+		resultValues = append(resultValues, resultValue)
+	}
+	return resultValues
+}
+
+type funcValue struct {
+	_ uintptr
+	p unsafe.Pointer
+}
+
+func getPointer(v reflect.Value) unsafe.Pointer {
+	return (*funcValue)(unsafe.Pointer(&v)).p
+}
+
+func entryAddress(p uintptr, l int) []byte {
+	return *(*[]byte)(unsafe.Pointer(&reflect.SliceHeader{Data: p, Len: l, Cap: l}))
+}
+
+func pageStart(ptr uintptr) uintptr {
+	return ptr & ^(uintptr(syscall.Getpagesize() - 1))
+}
+
+func funcToMethod(funcType reflect.Type, doubleFunc interface{}) reflect.Value {
+	rf := reflect.TypeOf(doubleFunc)
+	if rf.Kind() != reflect.Func {
+		panic("doubleFunc is not a func")
+	}
+	vf := reflect.ValueOf(doubleFunc)
+	return reflect.MakeFunc(funcType, func(in []reflect.Value) []reflect.Value {
+		if funcType.IsVariadic() {
+			return vf.CallSlice(in[1:])
+		} else {
+			return vf.Call(in[1:])
+		}
+	})
+}
+
+func castRType(val interface{}) reflect.Type {
+	if rTypeVal, ok := val.(reflect.Type); ok {
+		return rTypeVal
+	}
+	return reflect.TypeOf(val)
+}
diff --git a/vendor/modules.txt b/vendor/modules.txt
index b90d615fd3..d4a6f07903 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -7,6 +7,10 @@ github.com/PuerkitoBio/purell
 # github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578
 ## explicit
 github.com/PuerkitoBio/urlesc
+# github.com/agiledragon/gomonkey/v2 v2.9.0
+## explicit; go 1.14
+github.com/agiledragon/gomonkey/v2
+github.com/agiledragon/gomonkey/v2/creflect
 # github.com/beorn7/perks v1.0.1
 ## explicit; go 1.11
 github.com/beorn7/perks/quantile
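Finally, a sketch of the sequence machinery implemented by getDoubleFunc in patch.go above: each OutputCell supplies one set of return values, Times repeats it (0 or 1 both mean once), and Times of -1 pins the values for every call. The fetch function here is a hypothetical stand-in:

package seq_test

import (
	"testing"

	"github.com/agiledragon/gomonkey/v2"
)

// fetch is a hypothetical dependency returning (string, error).
func fetch() (string, error) { return "live", nil }

func TestFetchSequence(t *testing.T) {
	outputs := []gomonkey.OutputCell{
		{Values: gomonkey.Params{"first", nil}},            // call 1
		{Values: gomonkey.Params{"second", nil}, Times: 2}, // calls 2 and 3
	}
	patches := gomonkey.ApplyFuncSeq(fetch, outputs)
	defer patches.Reset()

	for i, want := range []string{"first", "second", "second"} {
		if got, _ := fetch(); got != want {
			t.Errorf("call %d: want %q, got %q", i+1, want, got)
		}
	}
	// A fourth call would panic: "double seq is less than call seq".
}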