Skip to content

Commit

Permalink
Added a warning when Python wheel wrapper needs to be used (#807)
Browse files Browse the repository at this point in the history
## Changes
Added a warning when Python wheel wrapper needs to be used

## Tests
Added unit tests + manual run with different bundle configurations
  • Loading branch information
andrewnester authored Sep 27, 2023
1 parent 99cc01c commit 3ee89c4
Show file tree
Hide file tree
Showing 3 changed files with 266 additions and 0 deletions.
2 changes: 2 additions & 0 deletions bundle/phases/initialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/databricks/cli/bundle/config/mutator"
"github.com/databricks/cli/bundle/config/variable"
"github.com/databricks/cli/bundle/deploy/terraform"
"github.com/databricks/cli/bundle/python"
"github.com/databricks/cli/bundle/scripts"
)

Expand All @@ -31,6 +32,7 @@ func Initialize() bundle.Mutator {
mutator.OverrideCompute(),
mutator.ProcessTargetMode(),
mutator.TranslatePaths(),
python.WrapperWarning(),
terraform.Initialize(),
scripts.Execute(config.ScriptPostInit),
},
Expand Down
65 changes: 65 additions & 0 deletions bundle/python/warning.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package python

import (
"context"
"strings"

"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/libraries"
"github.com/databricks/cli/libs/cmdio"
"golang.org/x/mod/semver"
)

type wrapperWarning struct {
}

func WrapperWarning() bundle.Mutator {
return &wrapperWarning{}
}

func (m *wrapperWarning) Apply(ctx context.Context, b *bundle.Bundle) error {
if hasIncompatibleWheelTasks(ctx, b) {
cmdio.LogString(ctx, "Python wheel tasks with local libraries require compute with DBR 13.1+. Please change your cluster configuration or set experimental 'python_wheel_wrapper' setting to 'true'")
}
return nil
}

func hasIncompatibleWheelTasks(ctx context.Context, b *bundle.Bundle) bool {
tasks := libraries.FindAllWheelTasksWithLocalLibraries(b)
for _, task := range tasks {
if task.NewCluster != nil {
if lowerThanExpectedVersion(ctx, task.NewCluster.SparkVersion) {
return true
}
}

if task.JobClusterKey != "" {
for _, job := range b.Config.Resources.Jobs {
for _, cluster := range job.JobClusters {
if task.JobClusterKey == cluster.JobClusterKey && cluster.NewCluster != nil {
if lowerThanExpectedVersion(ctx, cluster.NewCluster.SparkVersion) {
return true
}
}
}
}
}
}

return false
}

func lowerThanExpectedVersion(ctx context.Context, sparkVersion string) bool {
parts := strings.Split(sparkVersion, ".")
if len(parts) < 2 {
return false
}

v := "v" + parts[0] + "." + parts[1]
return semver.Compare(v, "v13.1") < 0
}

// Name implements bundle.Mutator.
func (m *wrapperWarning) Name() string {
return "PythonWrapperWarning"
}
199 changes: 199 additions & 0 deletions bundle/python/warning_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
package python

import (
"context"
"testing"

"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/config/resources"
"github.com/databricks/databricks-sdk-go/service/compute"
"github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/stretchr/testify/require"
)

func TestIncompatibleWheelTasksWithNewCluster(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"job1": {
JobSettings: &jobs.JobSettings{
Tasks: []jobs.Task{
{
TaskKey: "key1",
PythonWheelTask: &jobs.PythonWheelTask{},
NewCluster: &compute.ClusterSpec{
SparkVersion: "12.2.x-scala2.12",
},
Libraries: []compute.Library{
{Whl: "./dist/test.whl"},
},
},
{
TaskKey: "key2",
PythonWheelTask: &jobs.PythonWheelTask{},
NewCluster: &compute.ClusterSpec{
SparkVersion: "13.1.x-scala2.12",
},
Libraries: []compute.Library{
{Whl: "./dist/test.whl"},
},
},
},
},
},
},
},
},
}

require.True(t, hasIncompatibleWheelTasks(context.Background(), b))
}

func TestIncompatibleWheelTasksWithJobClusterKey(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"job1": {
JobSettings: &jobs.JobSettings{
JobClusters: []jobs.JobCluster{
{
JobClusterKey: "cluster1",
NewCluster: &compute.ClusterSpec{
SparkVersion: "12.2.x-scala2.12",
},
},
{
JobClusterKey: "cluster2",
NewCluster: &compute.ClusterSpec{
SparkVersion: "13.1.x-scala2.12",
},
},
},
Tasks: []jobs.Task{
{
TaskKey: "key1",
PythonWheelTask: &jobs.PythonWheelTask{},
JobClusterKey: "cluster1",
Libraries: []compute.Library{
{Whl: "./dist/test.whl"},
},
},
{
TaskKey: "key2",
PythonWheelTask: &jobs.PythonWheelTask{},
JobClusterKey: "cluster2",
Libraries: []compute.Library{
{Whl: "./dist/test.whl"},
},
},
},
},
},
},
},
},
}

require.True(t, hasIncompatibleWheelTasks(context.Background(), b))
}

func TestNoIncompatibleWheelTasks(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"job1": {
JobSettings: &jobs.JobSettings{
JobClusters: []jobs.JobCluster{
{
JobClusterKey: "cluster1",
NewCluster: &compute.ClusterSpec{
SparkVersion: "12.2.x-scala2.12",
},
},
{
JobClusterKey: "cluster2",
NewCluster: &compute.ClusterSpec{
SparkVersion: "13.1.x-scala2.12",
},
},
},
Tasks: []jobs.Task{
{
TaskKey: "key1",
PythonWheelTask: &jobs.PythonWheelTask{},
NewCluster: &compute.ClusterSpec{
SparkVersion: "12.2.x-scala2.12",
},
Libraries: []compute.Library{
{Whl: "/Workspace/Users/me@me.com/dist/test.whl"},
},
},
{
TaskKey: "key2",
PythonWheelTask: &jobs.PythonWheelTask{},
NewCluster: &compute.ClusterSpec{
SparkVersion: "13.3.x-scala2.12",
},
Libraries: []compute.Library{
{Whl: "./dist/test.whl"},
},
},
{
TaskKey: "key3",
PythonWheelTask: &jobs.PythonWheelTask{},
NewCluster: &compute.ClusterSpec{
SparkVersion: "12.2.x-scala2.12",
},
Libraries: []compute.Library{
{Whl: "dbfs:/dist/test.whl"},
},
},
{
TaskKey: "key4",
PythonWheelTask: &jobs.PythonWheelTask{},
JobClusterKey: "cluster1",
Libraries: []compute.Library{
{Whl: "/Workspace/Users/me@me.com/dist/test.whl"},
},
},
{
TaskKey: "key5",
PythonWheelTask: &jobs.PythonWheelTask{},
JobClusterKey: "cluster2",
Libraries: []compute.Library{
{Whl: "./dist/test.whl"},
},
},
},
},
},
},
},
},
}

require.False(t, hasIncompatibleWheelTasks(context.Background(), b))
}

func TestSparkVersionLowerThanExpected(t *testing.T) {
testCases := map[string]bool{
"13.1.x-scala2.12": false,
"13.2.x-scala2.12": false,
"13.3.x-scala2.12": false,
"14.0.x-scala2.12": false,
"14.1.x-scala2.12": false,
"10.4.x-aarch64-photon-scala2.12": true,
"10.4.x-scala2.12": true,
"13.0.x-scala2.12": true,
"5.0.x-rc-gpu-ml-scala2.11": true,
}

for k, v := range testCases {
result := lowerThanExpectedVersion(context.Background(), k)
require.Equal(t, v, result, k)
}
}

0 comments on commit 3ee89c4

Please sign in to comment.