Skip to content

Commit

Permalink
Replace vfs.Path with extension-aware filer when running on DBR (#1556)
Browse files Browse the repository at this point in the history

## Changes

The FUSE mount of the workspace file system on DBR doesn't include file
extensions for notebooks. When these notebooks are checked into a
repository, they do have an extension. PR #1457 added a filer type that
is aware of this disparity and makes these notebooks show up as if they
do have these extensions.

This change swaps out the native `vfs.Path` with one that uses this
filer when running on DBR.

Follow-up: consolidate the interfaces exported by `filer.Filer` and
`vfs.Path`.

## Tests

* Unit tests pass
* (Manually ran a snapshot build on DBR against a bundle with notebooks)

---------

Co-authored-by: Andrew Nester <andrew.nester@databricks.com>
  • Loading branch information
pietern and andrewnester authored Jul 3, 2024
1 parent b3c044c commit f14dded
Show file tree
Hide file tree
Showing 4 changed files with 199 additions and 0 deletions.
50 changes: 50 additions & 0 deletions bundle/config/mutator/configure_wsfs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package mutator

import (
"context"
"strings"

"github.com/databricks/cli/bundle"
"github.com/databricks/cli/libs/diag"
"github.com/databricks/cli/libs/env"
"github.com/databricks/cli/libs/filer"
"github.com/databricks/cli/libs/vfs"
)

// envDatabricksRuntimeVersion is the name of the environment variable that
// Apply looks up to decide whether the executable is running on DBR. Only the
// presence of the variable is checked; its value is not inspected.
const envDatabricksRuntimeVersion = "DATABRICKS_RUNTIME_VERSION"

// configureWSFS is a stateless mutator that replaces the bundle root with an
// extension-aware, filer-backed path when the bundle root is located under
// /Workspace/ and the DATABRICKS_RUNTIME_VERSION environment variable is set.
type configureWSFS struct{}

// ConfigureWSFS returns a new instance of the mutator that switches the bundle
// root to an extension-aware workspace filer when the conditions checked in
// its Apply method hold.
func ConfigureWSFS() bundle.Mutator {
	return new(configureWSFS)
}

// Name returns the fixed identifier of this mutator.
func (m *configureWSFS) Name() string {
	const name = "ConfigureWSFS"
	return name
}

// Apply replaces b.BundleRoot with a filer-backed vfs.Path built on the
// extension-aware workspace files client.
//
// It is a no-op (returns nil and leaves the bundle untouched) unless both of
// the following hold:
//   - the native bundle root path starts with "/Workspace/", and
//   - the DATABRICKS_RUNTIME_VERSION environment variable can be looked up.
//
// If constructing the filer-backed path fails, the error is returned as
// diagnostics and the bundle root is left unchanged.
func (m *configureWSFS) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
	bundleRoot := b.BundleRoot.Native()

	// Nothing to do when the bundle is not located in /Workspace/.
	inWorkspace := strings.HasPrefix(bundleRoot, "/Workspace/")
	if !inWorkspace {
		return nil
	}

	// Nothing to do when the executable is not running on DBR.
	_, onDBR := env.Lookup(ctx, envDatabricksRuntimeVersion)
	if !onDBR {
		return nil
	}

	// Factory for the extension-aware filer; it is handed to the vfs layer so
	// that it can build filers for other directories (e.g. parents) as well.
	newFiler := func(path string) (filer.Filer, error) {
		return filer.NewWorkspaceFilesExtensionsClient(b.WorkspaceClient(), path)
	}

	wsfsRoot, err := vfs.NewFilerPath(ctx, bundleRoot, newFiler)
	if err != nil {
		return diag.FromErr(err)
	}

	// From here on, reads through the bundle root go through the filer.
	b.BundleRoot = wsfsRoot
	return nil
}
4 changes: 4 additions & 0 deletions bundle/phases/initialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ func Initialize() bundle.Mutator {
mutator.ProcessTargetMode(),
mutator.DefaultQueueing(),
mutator.ExpandPipelineGlobPaths(),

// Configure use of WSFS for reads if the CLI is running on Databricks.
mutator.ConfigureWSFS(),

mutator.TranslatePaths(),
python.WrapperWarning(),
permissions.ApplyBundlePermissions(),
Expand Down
66 changes: 66 additions & 0 deletions libs/vfs/filer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package vfs

import (
"context"
"io/fs"
"path/filepath"

"github.com/databricks/cli/libs/filer"
)

// filerPath is a Path implementation whose file system operations are served
// by a filer.Filer rather than by the native file system.
type filerPath struct {
	// ctx is the context given to NewFilerPath. It is passed to filer.NewFS
	// and reused when Parent constructs a new filerPath.
	ctx context.Context

	// path is the location this instance is rooted at; it is what Native
	// returns and what was passed to construct.
	path string

	// fs is the file system view over the filer created for path. All of
	// Open, Stat, ReadDir and ReadFile delegate to it.
	fs FS

	// construct builds a filer rooted at the given path. It is retained so
	// that Parent can create a filer for the parent directory.
	construct func(path string) (filer.Filer, error)
}

// NewFilerPath returns a Path rooted at path whose operations are backed by
// the filer that construct produces for that path.
//
// construct is called once immediately and is kept for later use by Parent.
// Any error it returns is passed back to the caller unchanged.
func NewFilerPath(ctx context.Context, path string, construct func(path string) (filer.Filer, error)) (Path, error) {
	backend, err := construct(path)
	if err != nil {
		return nil, err
	}

	// The assertion is unchecked: it panics if the value returned by
	// filer.NewFS does not implement FS.
	fsys := filer.NewFS(ctx, backend).(FS)

	p := &filerPath{
		ctx:       ctx,
		path:      path,
		fs:        fsys,
		construct: construct,
	}
	return p, nil
}

// Open opens the named file by delegating to the filer-backed file system.
func (f filerPath) Open(name string) (fs.File, error) {
	file, err := f.fs.Open(name)
	return file, err
}

// Stat returns file information for the named file by delegating to the
// filer-backed file system.
func (f filerPath) Stat(name string) (fs.FileInfo, error) {
	info, err := f.fs.Stat(name)
	return info, err
}

// ReadDir lists the entries of the named directory by delegating to the
// filer-backed file system.
func (f filerPath) ReadDir(name string) ([]fs.DirEntry, error) {
	entries, err := f.fs.ReadDir(name)
	return entries, err
}

// ReadFile returns the contents of the named file by delegating to the
// filer-backed file system.
func (f filerPath) ReadFile(name string) ([]byte, error) {
	contents, err := f.fs.ReadFile(name)
	return contents, err
}

// Parent returns a filer-backed Path for the directory containing this path,
// or nil if this path has no parent.
//
// The new Path is built with the same context and filer constructor that this
// instance was created with.
//
// Parent panics if the filer for the parent directory cannot be constructed,
// because the Parent signature has no way to return an error.
func (f filerPath) Parent() Path {
	dir := filepath.Dir(f.path)

	// filepath.Dir returns its argument unchanged once there is nothing left
	// to strip ("/", ".", or a volume root on Windows). Comparing against the
	// current path detects all of these, where a literal "/" check would keep
	// returning a "parent" for the others and never terminate an upward walk.
	if dir == f.path {
		return nil
	}

	nf, err := NewFilerPath(f.ctx, dir, f.construct)
	if err != nil {
		panic(err)
	}

	return nf
}

// Native returns the path this instance was constructed with, unmodified.
func (f filerPath) Native() string {
	native := f.path
	return native
}
79 changes: 79 additions & 0 deletions libs/vfs/filer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package vfs

import (
"context"
"errors"
"io/fs"
"os"
"path/filepath"
"strings"
"testing"

"github.com/databricks/cli/libs/filer"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestFilerPath(t *testing.T) {
ctx := context.Background()
wd, err := os.Getwd()
require.NoError(t, err)

// Create a new filer-backed path.
p, err := NewFilerPath(ctx, filepath.FromSlash(wd), filer.NewLocalClient)
require.NoError(t, err)

// Open self.
f, err := p.Open("filer_test.go")
require.NoError(t, err)
defer f.Close()

// Run stat on self.
s, err := f.Stat()
require.NoError(t, err)
assert.Equal(t, "filer_test.go", s.Name())
assert.GreaterOrEqual(t, int(s.Size()), 128)

// Read some bytes.
buf := make([]byte, 1024)
_, err = f.Read(buf)
require.NoError(t, err)
assert.True(t, strings.HasPrefix(string(buf), "package vfs"))

// Open non-existent file.
_, err = p.Open("doesntexist_test.go")
assert.True(t, errors.Is(err, fs.ErrNotExist))

// Stat self.
s, err = p.Stat("filer_test.go")
require.NoError(t, err)
assert.Equal(t, "filer_test.go", s.Name())
assert.GreaterOrEqual(t, int(s.Size()), 128)

// Stat non-existent file.
_, err = p.Stat("doesntexist_test.go")
assert.True(t, errors.Is(err, fs.ErrNotExist))

// ReadDir self.
entries, err := p.ReadDir(".")
require.NoError(t, err)
assert.GreaterOrEqual(t, len(entries), 1)

// ReadDir non-existent directory.
_, err = p.ReadDir("doesntexist")
assert.True(t, errors.Is(err, fs.ErrNotExist))

// ReadFile self.
buf, err = p.ReadFile("filer_test.go")
require.NoError(t, err)
assert.True(t, strings.HasPrefix(string(buf), "package vfs"))

// ReadFile non-existent file.
_, err = p.ReadFile("doesntexist_test.go")
assert.True(t, errors.Is(err, fs.ErrNotExist))

// Parent self.
pp := p.Parent()
require.NotNil(t, pp)
assert.Equal(t, filepath.Join(pp.Native(), "vfs"), p.Native())
}

0 comments on commit f14dded

Please sign in to comment.