Skip to content

Commit

Permalink
Feature/pipeline queue manager (#2334) (#2420)
Browse files Browse the repository at this point in the history
* annotate backup throttler func

* query, update queue usage to etcd

* use protobuf marshal unmarshal queue usage

* watch etcd to update queue instead of in memory

Co-authored-by: chengjoey <30427474+chengjoey@users.noreply.github.com>
  • Loading branch information
erda-bot and chengjoey authored Oct 19, 2021
1 parent 793d65d commit bee0684
Show file tree
Hide file tree
Showing 16 changed files with 423 additions and 14 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ require (
github.com/golang-collections/collections v0.0.0-20130729185459-604e922904d3
github.com/golang-jwt/jwt v3.2.1+incompatible
github.com/golang/mock v1.5.0
github.com/golang/protobuf v1.5.2
github.com/google/uuid v1.2.0
github.com/googlecloudplatform/flink-operator v0.0.0-00010101000000-000000000000
github.com/gorilla/mux v1.8.0
Expand Down
6 changes: 2 additions & 4 deletions modules/pipeline/endpoints/queue_management.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (e *Endpoints) updatePipelineQueue(ctx context.Context, r *http.Request, va
}

// update queue in manager
e.reconciler.QueueManager.IdempotentAddQueue(queue)
e.reconciler.QueueManager.SendQueueToEtcd(queue.ID)

return httpserver.OkResp(queue)
}
Expand Down Expand Up @@ -204,9 +204,7 @@ func (e *Endpoints) batchUpgradePipelinePriority(ctx context.Context, r *http.Re
return errorresp.ErrResp(err)
}

if err = e.reconciler.QueueManager.BatchUpdatePipelinePriorityInQueue(queue, req.PipelineIDsOrderByPriorityFromHighToLow); err != nil {
return apierrors.ErrUpgradePipelinePriority.InternalError(err).ToResp(), nil
}
e.reconciler.QueueManager.SendUpdatePriorityPipelineIDsToEtcd(queue.ID, req.PipelineIDsOrderByPriorityFromHighToLow)

return httpserver.OkResp(nil)
}
3 changes: 3 additions & 0 deletions modules/pipeline/initialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,9 @@ func (p *provider) do() error {
if err != nil {
return fmt.Errorf("failed to init reconciler, err: %v", err)
}
if err := r.LoadQueueManger(context.Background()); err != nil {
return fmt.Errorf("failed to load reconciler queue manager, err: %v", err)
}
if err := engine.OnceDo(r); err != nil {
return err
}
Expand Down
13 changes: 11 additions & 2 deletions modules/pipeline/pipengine/reconciler/before_listen.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,21 @@ import (
)

func (r *Reconciler) beforeListen(ctx context.Context) error {
// init before listen
if err := retry.DoWithInterval(func() error { return r.loadQueueManger(ctx) }, 3, time.Second*10); err != nil {
// init before listen, use new queue manager cover the initialize context
if err := retry.DoWithInterval(func() error { return r.LoadQueueManger(ctx) }, 3, time.Second*10); err != nil {
return err
}
if err := retry.DoWithInterval(func() error { return r.loadThrottler(ctx) }, 3, time.Second*10); err != nil {
return err
}
go func() {
r.continueBackupQueueUsage(ctx)
}()
go func() {
r.QueueManager.ListenInputQueueFromEtcd(ctx)
}()
go func() {
r.QueueManager.ListenUpdatePriorityPipelineIDsFromEtcd(ctx)
}()
return nil
}
2 changes: 1 addition & 1 deletion modules/pipeline/pipengine/reconciler/define.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type Reconciler struct {
dbClient *dbclient.Client

QueueManager types.QueueManager
TaskThrottler throttler.Throttler
TaskThrottler throttler.Throttler // TODO remove the throttler.Throttler, after release/1.3 iteration throttler is not necessary

// processingTasks store task id which is in processing
processingTasks sync.Map
Expand Down
56 changes: 54 additions & 2 deletions modules/pipeline/pipengine/reconciler/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,66 @@ package reconciler

import (
"context"
"encoding/json"
"fmt"
"strings"
"time"

"github.com/sirupsen/logrus"

"github.com/erda-project/erda/modules/pipeline/pipengine/reconciler/queuemanage/manager"
)

const logPrefixContinueBackupQueueUsage = "[queue usage backup]"

// loadQueueManger
func (r *Reconciler) loadQueueManger(ctx context.Context) error {
func (r *Reconciler) LoadQueueManger(ctx context.Context) error {
// init queue manager
r.QueueManager = manager.New(ctx, manager.WithDBClient(r.dbClient))
r.QueueManager = manager.New(ctx, manager.WithDBClient(r.dbClient), manager.WithEtcdClient(r.etcd), manager.WithJsClient(r.js))

return nil
}

func (r *Reconciler) continueBackupQueueUsage(ctx context.Context) {
done := make(chan struct{})
errDone := make(chan error)

var costTime time.Duration
for {
go func() {
begin := time.Now()
backup := r.QueueManager.Export()
end := time.Now()
costTime = end.Sub(begin)
queueSnapshot := manager.SnapshotObj{}
if err := json.Unmarshal(backup, &queueSnapshot); err != nil {
errDone <- err
return
}
errs := []string{}
for qID, qMsg := range queueSnapshot.QueueUsageByID {
if err := r.etcd.Put(ctx, manager.MakeQueueUsageBackupKey(qID), string(qMsg)); err != nil {
errs = append(errs, fmt.Sprintf("%v", err))
continue
}
}
if len(errs) > 0 {
usageErr := fmt.Errorf(strings.Join(errs, ","))
errDone <- usageErr
return
}
done <- struct{}{}
}()

select {
case <-done:
logrus.Debugf("%s: sleep 30s for next backup (cost %s this time)", logPrefixContinueBackupQueueUsage, costTime)
time.Sleep(time.Second * 30)
case err := <-errDone:
logrus.Errorf("%s: failed to load, wait 10s for next loading, err: %v", logPrefixContinueBackupQueueUsage, err)
time.Sleep(time.Second * 10)
case <-ctx.Done():
return
}
}
}
59 changes: 59 additions & 0 deletions modules/pipeline/pipengine/reconciler/queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright (c) 2021 Terminus, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package reconciler

import (
"context"
"fmt"
"reflect"
"testing"
"time"

"bou.ke/monkey"

"github.com/erda-project/erda/modules/pipeline/pipengine/reconciler/queuemanage/manager"
"github.com/erda-project/erda/pkg/jsonstore/etcd"
)

func TestContinueBackupQueueUsage(t *testing.T) {
etcdClient := &etcd.Store{}
pm := monkey.PatchInstanceMethod(reflect.TypeOf(etcdClient), "Put", func(j *etcd.Store, ctx context.Context, key string, value string) error {
return fmt.Errorf("failed to put msg")
})
defer pm.Unpatch()
q := manager.New(context.Background(), manager.WithEtcdClient(etcdClient))
//pm1 := monkey.PatchInstanceMethod(reflect.TypeOf(q), "Export", func(mgr *types.QueueManager) json.RawMessage {
// u := &pb.QueueUsage{}
// bu, _ := json.Marshal(u)
// sna := manager.SnapshotObj{
// QueueUsageByID: map[string]json.RawMessage{
// "1": bu,
// },
// }
// snaByte, _ := json.Marshal(&sna)
// return snaByte
//})
//defer pm1.Unpatch()
r := &Reconciler{
QueueManager: q,
}
t.Run("continueBackupQueueUsage", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
go r.continueBackupQueueUsage(ctx)
time.Sleep(2 * time.Second)
cancel()
})

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (

"github.com/erda-project/erda/modules/pipeline/dbclient"
"github.com/erda-project/erda/modules/pipeline/pipengine/reconciler/queuemanage/types"
"github.com/erda-project/erda/pkg/jsonstore"
"github.com/erda-project/erda/pkg/jsonstore/etcd"
)

// defaultManager is the default manager.
Expand All @@ -35,6 +37,8 @@ type defaultManager struct {
//pCacheLock sync.RWMutex

dbClient *dbclient.Client
etcd *etcd.Store
js jsonstore.JsonStore
}

// New return a new queue manager.
Expand Down Expand Up @@ -72,3 +76,15 @@ func WithDBClient(dbClient *dbclient.Client) Option {
mgr.dbClient = dbClient
}
}

func WithEtcdClient(etcdClient *etcd.Store) Option {
return func(mgr *defaultManager) {
mgr.etcd = etcdClient
}
}

func WithJsClient(js jsonstore.JsonStore) Option {
return func(mgr *defaultManager) {
mgr.js = js
}
}
125 changes: 125 additions & 0 deletions modules/pipeline/pipengine/reconciler/queuemanage/manager/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,28 @@
package manager

import (
"context"
"fmt"
"strconv"
"time"

"github.com/sirupsen/logrus"

"github.com/erda-project/erda/apistructs"
"github.com/erda-project/erda/modules/pipeline/pipengine/reconciler/queuemanage/queue"
"github.com/erda-project/erda/modules/pipeline/pipengine/reconciler/queuemanage/types"
"github.com/erda-project/erda/pkg/jsonstore/storetypes"
"github.com/erda-project/erda/pkg/loop"
"github.com/erda-project/erda/pkg/strutil"
)

const (
etcdQueueWatchPrefix = "/devops/pipeline/queue_manager/actions/update/"
etcdQueuePipelineWatchPrefix = "/devops/pipeline/queue_manager/actions/batch-update/"
)

var (
defaultQueueManagerLogPrefix = "[default queue manager]"
)

// IdempotentAddQueue add to to manager idempotent.
Expand All @@ -43,3 +62,109 @@ func (mgr *defaultManager) IdempotentAddQueue(pq *apistructs.PipelineQueue) type

return newQueue
}

func (mgr *defaultManager) SendQueueToEtcd(queueID uint64) {
_ = loop.New(loop.WithDeclineRatio(2), loop.WithDeclineLimit(time.Second*60)).Do(func() (abort bool, err error) {
err = mgr.js.Put(context.Background(), fmt.Sprintf("%s%d", etcdQueueWatchPrefix, queueID), nil)
if err != nil {
logrus.Errorf("%s: send to queue failed, err: %v", defaultQueueManagerLogPrefix, err)
return false, err
}
logrus.Infof("%s: queue id: %d add to queue success", defaultQueueManagerLogPrefix, queueID)
return true, nil
})
}

func (mgr *defaultManager) SendUpdatePriorityPipelineIDsToEtcd(queueID uint64, pipelineIDS []uint64) {
_ = loop.New(loop.WithDeclineRatio(2), loop.WithDeclineLimit(time.Second*60)).Do(func() (abort bool, err error) {
err = mgr.js.Put(context.Background(), fmt.Sprintf("%s%d", etcdQueuePipelineWatchPrefix, queueID), pipelineIDS)
if err != nil {
logrus.Errorf("%s: send to queue pipelines failed, err: %v", defaultQueueManagerLogPrefix, err)
return false, err
}
logrus.Infof("%s: queue id: %d add to queue pipelines success", defaultQueueManagerLogPrefix, queueID)
return true, nil
})
}

// Listen leader node should listen etcd, watch queue information
func (mgr *defaultManager) ListenInputQueueFromEtcd(ctx context.Context) {
logrus.Infof("%s: start listen", defaultQueueManagerLogPrefix)
for {
select {
case <-ctx.Done():
return
default:
_ = mgr.js.IncludeWatch().Watch(ctx, etcdQueueWatchPrefix, true, true, true, nil,
func(key string, _ interface{}, t storetypes.ChangeType) (_ error) {
go func() {
logrus.Infof("%s: watched a key change: %s, changeType", key, t.String())
queueID, err := parseQueueIDFromWatchedKey(key, etcdQueueWatchPrefix)
if err != nil {
logrus.Errorf("%s: failed to parse queueID from watched key, key: %s, err: %v", defaultQueueManagerLogPrefix, key, err)
return
}
pq, exist, err := mgr.dbClient.GetPipelineQueue(queueID)
if err != nil {
logrus.Errorf("%s: failed to get queue, id: %d, err: %v", defaultQueueManagerLogPrefix, queueID, err)
return
}
if !exist {
logrus.Errorf("%s: queue not existed, id: %d, err: %v", defaultQueueManagerLogPrefix, queueID, err)
return
}

_ = mgr.IdempotentAddQueue(pq)
}()

return nil
})
}
}
}

func (mgr *defaultManager) ListenUpdatePriorityPipelineIDsFromEtcd(ctx context.Context) {
logrus.Infof("%s: start listen pipeline ids", defaultQueueManagerLogPrefix)
for {
select {
case <-ctx.Done():
return
default:
_ = mgr.js.IncludeWatch().Watch(ctx, etcdQueuePipelineWatchPrefix, true, true, false, []uint64{},
func(key string, value interface{}, t storetypes.ChangeType) (_ error) {
logrus.Infof("%s: watched a key change: %s, value: %v, changeType", key, value, t.String())
queueID, err := parseQueueIDFromWatchedKey(key, etcdQueuePipelineWatchPrefix)
if err != nil {
logrus.Errorf("%s: failed to parse queueID from watched key, key: %s, err: %v", defaultQueueManagerLogPrefix, key, err)
return
}
pq, exist, err := mgr.dbClient.GetPipelineQueue(queueID)
if err != nil {
logrus.Errorf("%s: failed to get queue, id: %d, err: %v", defaultQueueManagerLogPrefix, queueID, err)
return
}
if !exist {
logrus.Errorf("%s: queue not existed, id: %d, err: %v", defaultQueueManagerLogPrefix, queueID, err)
return
}

pipelineIDS, ok := value.(*[]uint64)
if !ok {
logrus.Errorf("%s: failed convert value: %v to pipeline ids", defaultQueueManagerLogPrefix, value)
return
}
if err := mgr.BatchUpdatePipelinePriorityInQueue(pq, *pipelineIDS); err != nil {
logrus.Errorf("%s: failed to batch update pipeline priority in queue, err: %v", defaultQueueManagerLogPrefix, err)
return
}

return nil
})
}
}
}

func parseQueueIDFromWatchedKey(key string, prefixKey string) (uint64, error) {
pipelineIDStr := strutil.TrimPrefixes(key, prefixKey)
return strconv.ParseUint(pipelineIDStr, 10, 64)
}
Loading

0 comments on commit bee0684

Please sign in to comment.