Skip to content

Commit

Permalink
Avoid multiple file tree traversals on bundle deploy (#1493)
Browse files Browse the repository at this point in the history
## Changes
To run bundle deploy from DBR we use an abstraction over the workspace
import / export APIs to create a `filer.Filer` and abstract the file
system. Walking the file tree in such a filer is expensive and requires
multiple API calls. This PR remove the two duplicate file tree walks
that happen by caching the result.
  • Loading branch information
shreyas-goenka authored Jun 17, 2024
1 parent a5e89fd commit 44e3928
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 81 deletions.
4 changes: 4 additions & 0 deletions bundle/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/env"
"github.com/databricks/cli/bundle/metadata"
"github.com/databricks/cli/libs/fileset"
"github.com/databricks/cli/libs/folders"
"github.com/databricks/cli/libs/git"
"github.com/databricks/cli/libs/locker"
Expand Down Expand Up @@ -50,6 +51,9 @@ type Bundle struct {
clientOnce sync.Once
client *databricks.WorkspaceClient

// Files that are synced to the workspace.file_path
Files []fileset.File

// Stores an initialized copy of this bundle's Terraform wrapper.
Terraform *tfexec.Terraform

Expand Down
2 changes: 1 addition & 1 deletion bundle/deploy/files/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func (m *upload) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
return diag.FromErr(err)
}

err = sync.RunOnce(ctx)
b.Files, err = sync.RunOnce(ctx)
if err != nil {
return diag.FromErr(err)
}
Expand Down
16 changes: 2 additions & 14 deletions bundle/deploy/state_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"time"

"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/deploy/files"
"github.com/databricks/cli/internal/build"
"github.com/databricks/cli/libs/diag"
"github.com/databricks/cli/libs/log"
Expand Down Expand Up @@ -40,19 +39,8 @@ func (s *stateUpdate) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnost
state.CliVersion = build.GetInfo().Version
state.Version = DeploymentStateVersion

// Get the current file list.
sync, err := files.GetSync(ctx, bundle.ReadOnly(b))
if err != nil {
return diag.FromErr(err)
}

files, err := sync.GetFileList(ctx)
if err != nil {
return diag.FromErr(err)
}

// Update the state with the current file list.
fl, err := FromSlice(files)
// Update the state with the current list of synced files.
fl, err := FromSlice(b.Files)
if err != nil {
return diag.FromErr(err)
}
Expand Down
101 changes: 45 additions & 56 deletions bundle/deploy/state_update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,23 @@ import (
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/internal/build"
"github.com/databricks/cli/internal/testutil"
databrickscfg "github.com/databricks/databricks-sdk-go/config"
"github.com/databricks/databricks-sdk-go/experimental/mocks"
"github.com/databricks/cli/libs/fileset"
"github.com/databricks/cli/libs/vfs"
"github.com/databricks/databricks-sdk-go/service/iam"
"github.com/databricks/databricks-sdk-go/service/workspace"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)

func TestStateUpdate(t *testing.T) {
s := &stateUpdate{}
func setupBundleForStateUpdate(t *testing.T) *bundle.Bundle {
tmpDir := t.TempDir()

testutil.Touch(t, tmpDir, "test1.py")
testutil.TouchNotebook(t, tmpDir, "test2.py")

files, err := fileset.New(vfs.MustNew(tmpDir)).All()
require.NoError(t, err)

b := &bundle.Bundle{
RootPath: t.TempDir(),
return &bundle.Bundle{
RootPath: tmpDir,
Config: config.Root{
Bundle: config.Bundle{
Target: "default",
Expand All @@ -37,22 +41,14 @@ func TestStateUpdate(t *testing.T) {
},
},
},
Files: files,
}
}

testutil.Touch(t, b.RootPath, "test1.py")
testutil.Touch(t, b.RootPath, "test2.py")

m := mocks.NewMockWorkspaceClient(t)
m.WorkspaceClient.Config = &databrickscfg.Config{
Host: "https://test.com",
}
b.SetWorkpaceClient(m.WorkspaceClient)

wsApi := m.GetMockWorkspaceAPI()
wsApi.EXPECT().GetStatusByPath(mock.Anything, "/files").Return(&workspace.ObjectInfo{
ObjectType: "DIRECTORY",
}, nil)
func TestStateUpdate(t *testing.T) {
s := &stateUpdate{}

b := setupBundleForStateUpdate(t)
ctx := context.Background()

diags := bundle.Apply(ctx, b, s)
Expand All @@ -63,7 +59,15 @@ func TestStateUpdate(t *testing.T) {
require.NoError(t, err)

require.Equal(t, int64(1), state.Seq)
require.Len(t, state.Files, 3)
require.Equal(t, state.Files, Filelist{
{
LocalPath: "test1.py",
},
{
LocalPath: "test2.py",
IsNotebook: true,
},
})
require.Equal(t, build.GetInfo().Version, state.CliVersion)

diags = bundle.Apply(ctx, b, s)
Expand All @@ -74,45 +78,22 @@ func TestStateUpdate(t *testing.T) {
require.NoError(t, err)

require.Equal(t, int64(2), state.Seq)
require.Len(t, state.Files, 3)
require.Equal(t, state.Files, Filelist{
{
LocalPath: "test1.py",
},
{
LocalPath: "test2.py",
IsNotebook: true,
},
})
require.Equal(t, build.GetInfo().Version, state.CliVersion)
}

func TestStateUpdateWithExistingState(t *testing.T) {
s := &stateUpdate{}

b := &bundle.Bundle{
RootPath: t.TempDir(),
Config: config.Root{
Bundle: config.Bundle{
Target: "default",
},
Workspace: config.Workspace{
StatePath: "/state",
FilePath: "/files",
CurrentUser: &config.User{
User: &iam.User{
UserName: "test-user",
},
},
},
},
}

testutil.Touch(t, b.RootPath, "test1.py")
testutil.Touch(t, b.RootPath, "test2.py")

m := mocks.NewMockWorkspaceClient(t)
m.WorkspaceClient.Config = &databrickscfg.Config{
Host: "https://test.com",
}
b.SetWorkpaceClient(m.WorkspaceClient)

wsApi := m.GetMockWorkspaceAPI()
wsApi.EXPECT().GetStatusByPath(mock.Anything, "/files").Return(&workspace.ObjectInfo{
ObjectType: "DIRECTORY",
}, nil)

b := setupBundleForStateUpdate(t)
ctx := context.Background()

// Create an existing state file.
Expand Down Expand Up @@ -144,6 +125,14 @@ func TestStateUpdateWithExistingState(t *testing.T) {
require.NoError(t, err)

require.Equal(t, int64(11), state.Seq)
require.Len(t, state.Files, 3)
require.Equal(t, state.Files, Filelist{
{
LocalPath: "test1.py",
},
{
LocalPath: "test2.py",
IsNotebook: true,
},
})
require.Equal(t, build.GetInfo().Version, state.CliVersion)
}
3 changes: 2 additions & 1 deletion cmd/bundle/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ func newSyncCommand() *cobra.Command {
return s.RunContinuous(ctx)
}

return s.RunOnce(ctx)
_, err = s.RunOnce(ctx)
return err
}

return cmd
Expand Down
2 changes: 1 addition & 1 deletion cmd/sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func New() *cobra.Command {
if f.watch {
err = s.RunContinuous(ctx)
} else {
err = s.RunOnce(ctx)
_, err = s.RunOnce(ctx)
}

s.Close()
Expand Down
21 changes: 13 additions & 8 deletions libs/sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,36 +152,41 @@ func (s *Sync) notifyComplete(ctx context.Context, d diff) {
s.seq++
}

func (s *Sync) RunOnce(ctx context.Context) error {
// Upload all files in the file tree rooted at the local path configured in the
// SyncOptions to the remote path configured in the SyncOptions.
//
// Returns the list of files tracked (and synchronized) by the syncer during the run,
// and an error if any occurred.
func (s *Sync) RunOnce(ctx context.Context) ([]fileset.File, error) {
files, err := s.GetFileList(ctx)
if err != nil {
return err
return files, err
}

change, err := s.snapshot.diff(ctx, files)
if err != nil {
return err
return files, err
}

s.notifyStart(ctx, change)
if change.IsEmpty() {
s.notifyComplete(ctx, change)
return nil
return files, nil
}

err = s.applyDiff(ctx, change)
if err != nil {
return err
return files, err
}

err = s.snapshot.Save(ctx)
if err != nil {
log.Errorf(ctx, "cannot store snapshot: %s", err)
return err
return files, err
}

s.notifyComplete(ctx, change)
return nil
return files, nil
}

func (s *Sync) GetFileList(ctx context.Context) ([]fileset.File, error) {
Expand Down Expand Up @@ -231,7 +236,7 @@ func (s *Sync) RunContinuous(ctx context.Context) error {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
err := s.RunOnce(ctx)
_, err := s.RunOnce(ctx)
if err != nil {
return err
}
Expand Down

0 comments on commit 44e3928

Please sign in to comment.