Skip to content

Commit

Permalink
Executor Service #1 (#253) (#4010)
Browse files Browse the repository at this point in the history
* Move PulsarConfig into common/config (#217) (#3907)

* ARMADA-2848 Move PulsarConfig into commonconfig

* Update test name TestValidateHasJobSetID->Id

* Revert unintended changes to yarn.lock file

* fix import order



(cherry picked from commit 35cb59f)


* Adding ControlPlaneEventsTopic to pulsar config

* Evolving ControlPlaneEvents message structure

We've decided on a parent/wrapper message for the ControlPlaneEvents to avoid passing around ambiguous proto.Message slices in the Publisher and Ingester.

* Setting maxAllowedMessageSize to correct value in relevant tests

* Removing reason for uncordon requests to the executor service

* Moving event creation time to parent Control Plane Event, modifying executor service rpcs to reflect the events being published, changed pulsar message keys to hard coded strings rather than proto name

* Renaming UpdateExecutorSettings rpc to UpsertExecutorSettings

* Removing message keys from ControlPlaneEvent messages, reverting method name changes

* Renaming LimitEventSequencesByteSize

* Adding executor cordoning functionality to armadactl

* Renaming ControlPlaneEvent to Event

* Simplifying executor cordoning code

* More sane checks on UpsertExecutorSettings rpc, better error messages

* Typo

* Updated command descriptions for executor cordoning and uncordoning

* Separating executor service args from controlplaneevents

* Executor Service #2 (#254)

* Generalising common ingestion pipeline

* Removing unused config

* Amending comments and variable names in common ingestion pipeline to be more event agnostic

* Returning to original metric name, denoting ingested event type via label rather than metric name

* Import ordering

* Generalising pulsar publisher

* Executor Service #3 (#255)

* Modifying SchedulerIngester to ingest control plane events, creating executor settings table and associated plumbing

* Simplifying dbops merge for controlplanevents

* Moving DBOperation scoping into schedulerdb

* Adding GetOperation method to DBOperation, determining locking using this

* Executor Service #4 (#257)

* Implementing cluster cordoning in scheduler

* Filter executors from previous filter result

* Adding default value for queue label when publishing controlplaneevent metrics

---------

Signed-off-by: mustaily891 <mustafa.ilyas@gresearch.co.uk>
Co-authored-by: Mustafa Ilyas <Mustafa.Ilyas@gresearch.co.uk>
Co-authored-by: Eleanor Pratt <Eleanor.Pratt@gresearch.co.uk>
  • Loading branch information
3 people authored Oct 17, 2024
1 parent 6271a58 commit 421dc6d
Show file tree
Hide file tree
Showing 66 changed files with 4,145 additions and 470 deletions.
51 changes: 49 additions & 2 deletions cmd/armadactl/cmd/cordon.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ func cordon() *cobra.Command {
cmd := &cobra.Command{
Use: "cordon",
Short: "Pause scheduling by resource",
Long: "Pause scheduling by resource. Supported: queue, queues",
Long: "Pause scheduling by resource. Supported: queue, queues, executor",
}
cmd.AddCommand(cordonQueues(a))
cmd.AddCommand(cordonExecutor(a))
return cmd
}

Expand All @@ -25,9 +26,10 @@ func uncordon() *cobra.Command {
cmd := &cobra.Command{
Use: "uncordon",
Short: "Resume scheduling by resource",
Long: "Resume scheduling by resource. Supported: queue, queues",
Long: "Resume scheduling by resource. Supported: queue, queues, executor",
}
cmd.AddCommand(uncordonQueues(a))
cmd.AddCommand(uncordonExecutor(a))
return cmd
}

Expand Down Expand Up @@ -132,3 +134,48 @@ func uncordonQueues(a *armadactl.App) *cobra.Command {

return cmd
}

// cordonExecutor builds the "executor" subcommand of cordon. The command takes
// exactly two positional arguments, an executor name and a cordon reason, and
// hands them to App.CordonExecutor once both are known to be non-empty.
func cordonExecutor(a *armadactl.App) *cobra.Command {
	run := func(cmd *cobra.Command, args []string) error {
		name, reason := args[0], args[1]
		// The name is validated before the reason, so an empty name wins
		// when both arguments are empty.
		switch {
		case name == "":
			return fmt.Errorf("provided executor name is invalid: %s", name)
		case reason == "":
			return fmt.Errorf("provided cordon reason is invalid: %s", reason)
		}
		return a.CordonExecutor(name, reason)
	}

	return &cobra.Command{
		Use:   "executor <executor_name> <cordon_reason>",
		Short: "Pause scheduling on an executor",
		Long:  "Pause scheduling on an executor",
		Args:  cobra.ExactArgs(2),
		PreRunE: func(cmd *cobra.Command, args []string) error {
			return initParams(cmd, a.Params)
		},
		RunE: run,
	}
}

// uncordonExecutor builds the "executor" subcommand of uncordon. The command
// takes exactly one positional argument, the executor name, and passes it to
// App.UncordonExecutor after rejecting an empty name.
func uncordonExecutor(a *armadactl.App) *cobra.Command {
	run := func(cmd *cobra.Command, args []string) error {
		name := args[0]
		if name != "" {
			return a.UncordonExecutor(name)
		}
		return fmt.Errorf("provided executor name is invalid: %s", name)
	}

	return &cobra.Command{
		Use:   "executor <executor_name>",
		Short: "Resume scheduling on an executor",
		Long:  "Resume scheduling on an executor",
		Args:  cobra.ExactArgs(1),
		PreRunE: func(cmd *cobra.Command, args []string) error {
			return initParams(cmd, a.Params)
		},
		RunE: run,
	}
}
4 changes: 4 additions & 0 deletions cmd/armadactl/cmd/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/armadaproject/armada/internal/armadactl"
"github.com/armadaproject/armada/pkg/client"
ce "github.com/armadaproject/armada/pkg/client/executor"
cq "github.com/armadaproject/armada/pkg/client/queue"
)

Expand All @@ -29,5 +30,8 @@ func initParams(cmd *cobra.Command, params *armadactl.Params) error {
params.QueueAPI.Cordon = cq.Cordon(client.ExtractCommandlineArmadaApiConnectionDetails)
params.QueueAPI.Uncordon = cq.Uncordon(client.ExtractCommandlineArmadaApiConnectionDetails)

params.ExecutorAPI.Cordon = ce.CordonExecutor(client.ExtractCommandlineArmadaApiConnectionDetails)
params.ExecutorAPI.Uncordon = ce.UncordonExecutor(client.ExtractCommandlineArmadaApiConnectionDetails)

return nil
}
1 change: 1 addition & 0 deletions config/scheduleringester/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ metrics:
pulsar:
URL: "pulsar://localhost:6650"
jobsetEventsTopic: "events"
controlPlaneEventsTopic: "control-plane"
backoffTime: 1s
receiverQueueSize: 100
subscriptionName: "scheduler-ingester"
Expand Down
1 change: 1 addition & 0 deletions config/server/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ submission:
pulsar:
URL: "pulsar://pulsar:6650"
jobsetEventsTopic: "events"
controlPlaneEventsTopic: "control-plane"
maxConnectionsPerBroker: 1
compressionType: zlib
compressionLevel: faster
Expand Down
1 change: 1 addition & 0 deletions developer/config/insecure-armada.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ auth:
reprioritize_any_jobs: ["everyone"]
watch_all_events: ["everyone"]
execute_jobs: ["everyone"]
update_executor_settings: ["everyone"]
8 changes: 8 additions & 0 deletions internal/armadactl/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"os"

"github.com/armadaproject/armada/pkg/client"
"github.com/armadaproject/armada/pkg/client/executor"
"github.com/armadaproject/armada/pkg/client/queue"
)

Expand All @@ -39,6 +40,7 @@ type App struct {
type Params struct {
ApiConnectionDetails *client.ApiConnectionDetails
QueueAPI *QueueAPI
ExecutorAPI *ExecutorAPI
}

// QueueAPI struct holds pointers to functions that are called by armadactl.
Expand All @@ -56,6 +58,11 @@ type QueueAPI struct {
Uncordon queue.UncordonAPI
}

// ExecutorAPI struct holds the functions that are called by armadactl to
// cordon and uncordon executors.
type ExecutorAPI struct {
Cordon executor.CordonAPI
Uncordon executor.UncordonAPI
}

// New instantiates an App with default parameters, including standard output
// and cryptographically secure random source.
func New() *App {
Expand All @@ -65,5 +72,6 @@ func New() *App {
Random: rand.Reader,
}
app.Params.QueueAPI = &QueueAPI{}
app.Params.ExecutorAPI = &ExecutorAPI{}
return app
}
20 changes: 20 additions & 0 deletions internal/armadactl/cordon.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,23 @@ func (a *App) UncordonQueues(queryArgs *QueueQueryArgs, dryRun bool) error {

return nil
}

// UncordonExecutor calls the configured ExecutorAPI.Uncordon function for the
// named executor and prints progress to standard output.
//
// It returns nil on success. On failure it returns an error naming the
// executor; the underlying error is wrapped with %w so callers can still
// inspect it with errors.Is or errors.As.
func (a *App) UncordonExecutor(executor string) error {
	fmt.Println("Uncordoning the following executors:")
	if err := a.Params.ExecutorAPI.Uncordon(executor); err != nil {
		return fmt.Errorf("error uncordoning executor %s: %w", executor, err)
	}
	fmt.Printf("%s uncordoned\n", executor)
	return nil
}

// CordonExecutor calls the configured ExecutorAPI.Cordon function for the
// named executor, passing cordonReason along, and prints progress to standard
// output.
//
// It returns nil on success. On failure it returns an error naming the
// executor; the underlying error is wrapped with %w so callers can still
// inspect it with errors.Is or errors.As.
func (a *App) CordonExecutor(executor string, cordonReason string) error {
	fmt.Println("Cordoning the following executors:")
	if err := a.Params.ExecutorAPI.Cordon(executor, cordonReason); err != nil {
		return fmt.Errorf("error cordoning executor %s: %w", executor, err)
	}
	fmt.Printf("%s cordoned\n", executor)
	return nil
}
2 changes: 2 additions & 0 deletions internal/common/config/pulsar.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ type PulsarConfig struct {
JwtTokenPath string
// The pulsar topic that Jobset Events will be published to
JobsetEventsTopic string
// The pulsar topic that Control Plane Events will be published to
ControlPlaneEventsTopic string
// Compression to use. Valid values are "None", "LZ4", "Zlib", "Zstd". Default is "None"
CompressionType pulsar.CompressionType
// Compression Level to use. Valid values are "Default", "Better", "Faster". Default is "Default"
Expand Down
26 changes: 24 additions & 2 deletions internal/common/eventutil/eventutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,17 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"

"github.com/armadaproject/armada/internal/common/armadacontext"
"github.com/armadaproject/armada/internal/common/armadaerrors"
protoutil "github.com/armadaproject/armada/internal/common/proto"
"github.com/armadaproject/armada/internal/common/slices"
"github.com/armadaproject/armada/pkg/api"
"github.com/armadaproject/armada/pkg/armadaevents"
"github.com/armadaproject/armada/pkg/controlplaneevents"
)

// UnmarshalEventSequence returns an EventSequence object contained in a byte buffer
// after validating that the resulting EventSequence is valid.
func UnmarshalEventSequence(ctx *armadacontext.Context, payload []byte) (*armadaevents.EventSequence, error) {
func UnmarshalEventSequence(payload []byte) (*armadaevents.EventSequence, error) {
sequence := &armadaevents.EventSequence{}
err := proto.Unmarshal(payload, sequence)
if err != nil {
Expand Down Expand Up @@ -66,6 +66,28 @@ func UnmarshalEventSequence(ctx *armadacontext.Context, payload []byte) (*armada
return sequence, nil
}

// UnmarshalControlPlaneEvent decodes payload into a controlplaneevents.Event.
// It returns an error wrapped with errors.WithStack if the bytes cannot be
// unmarshalled, or if the decoded message carries no inner event.
func UnmarshalControlPlaneEvent(payload []byte) (*controlplaneevents.Event, error) {
	decoded := new(controlplaneevents.Event)
	if unmarshalErr := proto.Unmarshal(payload, decoded); unmarshalErr != nil {
		return nil, errors.WithStack(unmarshalErr)
	}

	// A wrapper with its inner event set is the only valid outcome.
	if decoded.Event != nil {
		return decoded, nil
	}

	return nil, errors.WithStack(&armadaerrors.ErrInvalidArgument{
		Name:    "Event",
		Value:   nil,
		Message: "no event in controlplane event",
	})
}

// ShortSequenceString returns a short string representation of an events sequence.
// To be used for logging, for example.
func ShortSequenceString(sequence *armadaevents.EventSequence) string {
Expand Down
10 changes: 5 additions & 5 deletions internal/common/ingest/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,18 @@ type Batcher[T any] struct {
// Such as if you are batching objects A, but want to limit on the number of A.[]B objects seen
// In which case this function should return len(A.[]B)
itemCountFunc func(T) int
callback func([]T)
publish chan []T
buffer []T
mutex sync.Mutex
}

func NewBatcher[T any](input <-chan T, maxItems int, maxTimeout time.Duration, itemCountFunc func(T) int, callback func([]T)) *Batcher[T] {
func NewBatcher[T any](input <-chan T, maxItems int, maxTimeout time.Duration, itemCountFunc func(T) int, publish chan []T) *Batcher[T] {
return &Batcher[T]{
input: input,
maxItems: maxItems,
maxTimeout: maxTimeout,
itemCountFunc: itemCountFunc,
callback: callback,
publish: publish,
clock: clock.RealClock{},
mutex: sync.Mutex{},
}
Expand All @@ -59,14 +59,14 @@ func (b *Batcher[T]) Run(ctx *armadacontext.Context) {
b.buffer = append(b.buffer, value)
totalNumberOfItems += b.itemCountFunc(value)
if totalNumberOfItems >= b.maxItems {
b.callback(b.buffer)
b.publish <- b.buffer
appendToBatch = false
}
b.mutex.Unlock()
case <-expire:
b.mutex.Lock()
if len(b.buffer) > 0 {
b.callback(b.buffer)
b.publish <- b.buffer
}
appendToBatch = false
b.mutex.Unlock()
Expand Down
40 changes: 36 additions & 4 deletions internal/common/ingest/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,18 @@ type resultHolder struct {
mutex sync.Mutex
}

// addToResult drains publishChan, adding every received batch to result, and
// returns once publishChan has been closed.
func addToResult(result *resultHolder, publishChan chan []int) {
	// Ranging over the channel replaces the single-case select with a manual
	// "ok" check: the loop ends exactly when the channel is closed and empty.
	for batch := range publishChan {
		result.add(batch)
	}
}

func newResultHolder() *resultHolder {
return &resultHolder{
result: make([][]int, 0),
Expand All @@ -47,12 +59,17 @@ func TestBatch_MaxItems(t *testing.T) {
ctx, cancel := armadacontext.WithTimeout(armadacontext.Background(), 5*time.Second)
testClock := clock.NewFakeClock(time.Now())
inputChan := make(chan int)
publishChan := make(chan []int)
result := newResultHolder()
batcher := NewBatcher[int](inputChan, defaultMaxItems, defaultMaxTimeOut, defaultItemCounterFunc, result.add)
batcher := NewBatcher[int](inputChan, defaultMaxItems, defaultMaxTimeOut, defaultItemCounterFunc, publishChan)
batcher.clock = testClock

go addToResult(result, publishChan)

go func() {
batcher.Run(ctx)
close(inputChan)
close(publishChan)
}()

// Post 6 items on the input channel without advancing the clock
Expand All @@ -72,14 +89,19 @@ func TestBatch_MaxItems_CustomItemCountFunction(t *testing.T) {
ctx, cancel := armadacontext.WithTimeout(armadacontext.Background(), 5*time.Second)
testClock := clock.NewFakeClock(time.Now())
inputChan := make(chan int)
publishChan := make(chan []int)
result := newResultHolder()
// This function will mean each item on the input channel will count as 2 items
doubleItemCounterFunc := func(i int) int { return 2 }
batcher := NewBatcher[int](inputChan, defaultMaxItems, defaultMaxTimeOut, doubleItemCounterFunc, result.add)
batcher := NewBatcher[int](inputChan, defaultMaxItems, defaultMaxTimeOut, doubleItemCounterFunc, publishChan)
batcher.clock = testClock

go addToResult(result, publishChan)

go func() {
batcher.Run(ctx)
close(inputChan)
close(publishChan)
}()

// Post 6 items on the input channel without advancing the clock
Expand All @@ -99,12 +121,17 @@ func TestBatch_Time(t *testing.T) {
ctx, cancel := armadacontext.WithTimeout(armadacontext.Background(), 5*time.Second)
testClock := clock.NewFakeClock(time.Now())
inputChan := make(chan int)
publishChan := make(chan []int)
result := newResultHolder()
batcher := NewBatcher[int](inputChan, defaultMaxItems, defaultMaxTimeOut, defaultItemCounterFunc, result.add)
batcher := NewBatcher[int](inputChan, defaultMaxItems, defaultMaxTimeOut, defaultItemCounterFunc, publishChan)
batcher.clock = testClock

go addToResult(result, publishChan)

go func() {
batcher.Run(ctx)
close(inputChan)
close(publishChan)
}()

inputChan <- 1
Expand All @@ -121,12 +148,17 @@ func TestBatch_Time_WithIntialQuiet(t *testing.T) {
ctx, cancel := armadacontext.WithTimeout(armadacontext.Background(), 5*time.Second)
testClock := clock.NewFakeClock(time.Now())
inputChan := make(chan int)
publishChan := make(chan []int)
result := newResultHolder()
batcher := NewBatcher[int](inputChan, defaultMaxItems, defaultMaxTimeOut, defaultItemCounterFunc, result.add)
batcher := NewBatcher[int](inputChan, defaultMaxItems, defaultMaxTimeOut, defaultItemCounterFunc, publishChan)
batcher.clock = testClock

go addToResult(result, publishChan)

go func() {
batcher.Run(ctx)
close(inputChan)
close(publishChan)
}()

// initial quiet period
Expand Down
Loading

0 comments on commit 421dc6d

Please sign in to comment.