Skip to content

Commit

Permalink
query, update queue usage to etcd
Browse files Browse the repository at this point in the history
  • Loading branch information
chengjoey committed Oct 12, 2021
1 parent e74dcd7 commit 0751721
Show file tree
Hide file tree
Showing 10 changed files with 196 additions and 5 deletions.
3 changes: 3 additions & 0 deletions modules/pipeline/pipengine/reconciler/before_listen.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,8 @@ func (r *Reconciler) beforeListen(ctx context.Context) error {
if err := retry.DoWithInterval(func() error { return r.loadThrottler(ctx) }, 3, time.Second*10); err != nil {
return err
}
go func() {
r.continueBackupQueueUsage(ctx)
}()
return nil
}
45 changes: 44 additions & 1 deletion modules/pipeline/pipengine/reconciler/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,57 @@ package reconciler

import (
"context"
"encoding/json"
"time"

"github.com/sirupsen/logrus"

"github.com/erda-project/erda/modules/pipeline/pipengine/reconciler/queuemanage/manager"
)

const logPrefixContinueBackupQueueUsage = "[queue usage backup]"

// loadQueueManger
func (r *Reconciler) loadQueueManger(ctx context.Context) error {
// init queue manager
r.QueueManager = manager.New(ctx, manager.WithDBClient(r.dbClient))
r.QueueManager = manager.New(ctx, manager.WithDBClient(r.dbClient), manager.WithJSClient(r.js))

return nil
}

func (r *Reconciler) continueBackupQueueUsage(ctx context.Context) {
done := make(chan struct{})
errDone := make(chan error)

var costTime time.Duration
for {
go func() {
begin := time.Now()
backup := r.QueueManager.Export()
end := time.Now()
costTime = end.Sub(begin)
queueSnapshot := manager.SnapshotObj{}
if err := json.Unmarshal(backup, &queueSnapshot); err != nil {
errDone <- err
}
for qID, qMsg := range queueSnapshot.QueueUsageByID {
if err := r.js.Put(ctx, manager.MakeQueueUsageBackupKey(qID), qMsg); err != nil {
errDone <- err
return
}
}
done <- struct{}{}
}()

select {
case <-done:
logrus.Debugf("%s: sleep 30s for next backup (cost %s this time)", logPrefixContinueBackupQueueUsage, costTime)
time.Sleep(time.Second * 30)
case err := <-errDone:
logrus.Errorf("%s: failed to load, wait 10s for next loading, err: %v", logPrefixContinueBackupQueueUsage, err)
time.Sleep(time.Second * 10)
case <-ctx.Done():
return
}
}
}
57 changes: 57 additions & 0 deletions modules/pipeline/pipengine/reconciler/queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright (c) 2021 Terminus, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package reconciler

import (
"context"
"reflect"
"testing"
"time"

"bou.ke/monkey"

"github.com/erda-project/erda/modules/pipeline/pipengine/reconciler/queuemanage/manager"
"github.com/erda-project/erda/pkg/jsonstore"
)

func TestContinueBackupQueueUsage(t *testing.T) {
js := &jsonstore.JsonStoreImpl{}
pm := monkey.PatchInstanceMethod(reflect.TypeOf(js), "Put", func(j *jsonstore.JsonStoreImpl, ctx context.Context, key string, object interface{}) error {
return nil
})
defer pm.Unpatch()
q := manager.New(context.Background(), manager.WithJSClient(js))
//pm1 := monkey.PatchInstanceMethod(reflect.TypeOf(q), "Export", func(mgr *types.QueueManager) json.RawMessage {
// u := &pb.QueueUsage{}
// bu, _ := json.Marshal(u)
// sna := manager.SnapshotObj{
// QueueUsageByID: map[string]json.RawMessage{
// "1": bu,
// },
// }
// snaByte, _ := json.Marshal(&sna)
// return snaByte
//})
//defer pm1.Unpatch()
r := &Reconciler{
QueueManager: q,
}
t.Run("continueBackupQueueUsage", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
go r.continueBackupQueueUsage(ctx)
time.Sleep(2 * time.Second)
cancel()
})
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/erda-project/erda/modules/pipeline/dbclient"
"github.com/erda-project/erda/modules/pipeline/pipengine/reconciler/queuemanage/types"
"github.com/erda-project/erda/pkg/jsonstore"
)

// defaultManager is the default manager.
Expand All @@ -35,6 +36,7 @@ type defaultManager struct {
//pCacheLock sync.RWMutex

dbClient *dbclient.Client
js jsonstore.JsonStore
}

// New return a new queue manager.
Expand Down Expand Up @@ -72,3 +74,9 @@ func WithDBClient(dbClient *dbclient.Client) Option {
mgr.dbClient = dbClient
}
}

func WithJSClient(js jsonstore.JsonStore) Option {
return func(mgr *defaultManager) {
mgr.js = js
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright (c) 2021 Terminus, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package manager

import "encoding/json"

type SnapshotObj struct {
QueueUsageByID map[string]json.RawMessage `json:"queueUsageByID"`
}

func (mgr *defaultManager) Export() json.RawMessage {
mgr.qLock.Lock()
defer mgr.qLock.Unlock()
obj := SnapshotObj{
QueueUsageByID: make(map[string]json.RawMessage),
}
for qID, queue := range mgr.queueByID {
u := queue.Usage()
uByte, _ := json.Marshal(u)
obj.QueueUsageByID[qID] = uByte
}
b, _ := json.Marshal(&obj)
return b
}

// Import default queue manager execute in memory, don't need import
func (mgr *defaultManager) Import(rawMsg json.RawMessage) error {
return nil
}
17 changes: 13 additions & 4 deletions modules/pipeline/pipengine/reconciler/queuemanage/manager/usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,28 @@
package manager

import (
"context"
"fmt"

"github.com/sirupsen/logrus"

"github.com/erda-project/erda-proto-go/pipeline/pb"
"github.com/erda-project/erda/apistructs"
"github.com/erda-project/erda/modules/pipeline/pipengine/reconciler/queuemanage/queue"
)

func MakeQueueUsageBackupKey(qID string) string {
return fmt.Sprintf("/devops/pipeline/queue/reconciler/usage/%s", qID)
}

func (mgr *defaultManager) QueryQueueUsage(pq *apistructs.PipelineQueue) *pb.QueueUsage {
mgr.qLock.RLock()
defer mgr.qLock.RUnlock()
q, ok := mgr.queueByID[queue.New(pq).ID()]
if !ok {
usage := pb.QueueUsage{}
err := mgr.js.Get(context.Background(), MakeQueueUsageBackupKey(queue.New(pq).ID()), &usage)
if err != nil {
logrus.Errorf("failed to query queue usage, err: %v", err)
return nil
}

usage := q.Usage()
return &usage
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright (c) 2021 Terminus, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package queue

import "encoding/json"

func (q *defaultQueue) Export() json.RawMessage {
return q.eq.Export()
}

func (q *defaultQueue) Import(rawMsg json.RawMessage) error {
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package types
import (
"github.com/erda-project/erda-proto-go/pipeline/pb"
"github.com/erda-project/erda/apistructs"
"github.com/erda-project/erda/modules/pipeline/pipengine/queue/snapshot"
)

// QueueManager manage all queues and related pipelines.
Expand All @@ -27,4 +28,5 @@ type QueueManager interface {
PopOutPipelineFromQueue(pipelineID uint64)
BatchUpdatePipelinePriorityInQueue(pq *apistructs.PipelineQueue, pipelineIDs []uint64) error
Stop()
snapshot.Snapshot
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package types
import (
"github.com/erda-project/erda-proto-go/pipeline/pb"
"github.com/erda-project/erda/apistructs"
"github.com/erda-project/erda/modules/pipeline/pipengine/queue/snapshot"
"github.com/erda-project/erda/modules/pipeline/spec"
)

Expand All @@ -31,4 +32,5 @@ type Queue interface {
AddPipelineIntoQueue(p *spec.Pipeline, doneCh chan struct{})
PopOutPipeline(p *spec.Pipeline)
BatchUpdatePipelinePriorityInQueue(pipelines []*spec.Pipeline) error
snapshot.Snapshot
}
1 change: 1 addition & 0 deletions modules/pipeline/pipengine/reconciler/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func (r *Reconciler) loadThrottler(ctx context.Context) error {
logrus.Warnf("reconciler: failed to load throttler, ignore, import err: %v", err)
// load from database

// continueBackupThrottler after 1.0 iteration, throttler is not necessary
//r.continueBackupThrottler(ctx)
return nil
}
Expand Down

0 comments on commit 0751721

Please sign in to comment.