[DECO-79][DECO-165] Incremental sync with support for multiple profiles #82

Merged: 19 commits, Oct 19, 2022
114 changes: 91 additions & 23 deletions cmd/sync/snapshot.go
@@ -1,6 +1,7 @@
package sync

import (
"context"
"encoding/json"
"fmt"
"io"
@@ -9,30 +10,93 @@ import (
"strings"
"time"

"crypto/md5"
"encoding/hex"

"github.com/databricks/bricks/git"
"github.com/databricks/bricks/project"
)

type snapshot map[string]time.Time
// A snapshot is a persistent store of the knowledge the bricks CLI has about the
// state of files in the remote repo. We use the last modified times (mtime) of
// files to determine whether a file needs to be updated in the remote repo.
//
// 1. Any stale files in the remote repo are updated. That is, a file is updated if
// the last modified time recorded in the snapshot is older than the actual last
// modified time of the file.
//
// 2. Any files present in the snapshot but absent locally are deleted from the remote path.
//
// Changing either the Databricks workspace (i.e. Host) or the remote path (i.e. RemotePath)
// that local files are being synced to makes the bricks CLI switch to a different
// snapshot for persisting/loading sync state.
type Snapshot struct {
// hostname of the workspace this snapshot is for
Host string `json:"host"`
// Path in workspace for project repo
RemotePath string `json:"remote_path"`
// Map of all files present in the remote repo, with:
// key: relative file path from project root
// value: last time the remote instance of this file was updated
LastUpdatedTimes map[string]time.Time `json:"last_modified_times"`
}

type diff struct {
put []string
delete []string
}

const SyncSnapshotFile = "repo_snapshot.json"
const BricksDir = ".bricks"
const syncSnapshotDirName = "sync-snapshots"

func (s *snapshot) storeSnapshot(root string) error {
// create snapshot file
configDir := filepath.Join(root, BricksDir)
if _, err := os.Stat(configDir); os.IsNotExist(err) {
err = os.Mkdir(configDir, os.ModeDir|os.ModePerm)
// Compute path of the snapshot file on the local machine
// The file name is unique for a tuple of (host, remotePath);
// precisely, it's the first 16 characters of md5(concat(host, remotePath)).
func (s *Snapshot) getPath(ctx context.Context) (string, error) {
prj := project.Get(ctx)
cacheDir, err := prj.CacheDir()
if err != nil {
return "", err
}
snapshotDir := filepath.Join(cacheDir, syncSnapshotDirName)
if _, err := os.Stat(snapshotDir); os.IsNotExist(err) {
err = os.Mkdir(snapshotDir, os.ModeDir|os.ModePerm)
if err != nil {
return fmt.Errorf("failed to create config directory: %s", err)
return "", fmt.Errorf("failed to create config directory: %s", err)
}
}
persistedSnapshotPath := filepath.Join(configDir, SyncSnapshotFile)
f, err := os.OpenFile(persistedSnapshotPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0755)
hash := md5.Sum([]byte(s.Host + s.RemotePath))
hashString := hex.EncodeToString(hash[:])
return filepath.Join(snapshotDir, hashString[:16]+".json"), nil
}

func newSnapshot(ctx context.Context, remotePath string) (*Snapshot, error) {
prj := project.Get(ctx)

// Get host this snapshot is for
wsc := prj.WorkspacesClient()
if wsc == nil {
return nil, fmt.Errorf("failed to resolve workspaces client for project")
}

// TODO: The host may be late-initialized in certain Azure setups where we
// specify the workspace by its resource ID. Tracked in: https://databricks.atlassian.net/browse/DECO-194
host := wsc.Config.Host
if host == "" {
return nil, fmt.Errorf("failed to resolve host for snapshot")
}

return &Snapshot{
Host: host,
RemotePath: remotePath,
LastUpdatedTimes: make(map[string]time.Time),
}, nil
}

func (s *Snapshot) storeSnapshot(ctx context.Context) error {
snapshotPath, err := s.getPath(ctx)
if err != nil {
return err
}
f, err := os.OpenFile(snapshotPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
if err != nil {
return fmt.Errorf("failed to create/open persisted sync snapshot file: %s", err)
}
@@ -50,24 +114,27 @@ func (s *snapshot) storeSnapshot(root string) error {
return nil
}

func (s *snapshot) loadSnapshot(root string) error {
persistedSnapshotPath := filepath.Join(root, BricksDir, SyncSnapshotFile)
if _, err := os.Stat(persistedSnapshotPath); os.IsNotExist(err) {
func (s *Snapshot) loadSnapshot(ctx context.Context) error {
snapshotPath, err := s.getPath(ctx)
Contributor (review comment):

If I'm not mistaken this will create the path even if you run with --persist-snapshot=false. It's counterintuitive that this would have a side effect. Can you do a follow-up PR to only run mkdirs when we actually want to save the snapshot?

Contributor Author:

That wouldn't happen, because loadSnapshot / storeSnapshot are only called when persist-snapshot is true. When this flag is false we don't load or store snapshots.

Contributor:

Understood. Please keep it in mind for future changes though; getters generally shouldn't have side effects.

Contributor Author:

Created a ticket to track this; will send a quick follow-up to refactor out this side effect:

https://databricks.atlassian.net/browse/DECO-198

if err != nil {
return err
}
// Snapshot file not found. We do not load anything
if _, err := os.Stat(snapshotPath); os.IsNotExist(err) {
return nil
}

f, err := os.Open(persistedSnapshotPath)
f, err := os.Open(snapshotPath)
if err != nil {
return fmt.Errorf("failed to open persisted sync snapshot file: %s", err)
}
defer f.Close()

bytes, err := io.ReadAll(f)
if err != nil {
// clean up these error messages a bit
return fmt.Errorf("failed to read sync snapshot from disk: %s", err)
}
err = json.Unmarshal(bytes, s)
err = json.Unmarshal(bytes, &s)
if err != nil {
return fmt.Errorf("failed to json unmarshal persisted snapshot: %s", err)
}
@@ -92,34 +159,35 @@ func (d diff) String() string {
return strings.Join(changes, ", ")
}

func (s snapshot) diff(all []git.File) (change diff) {
func (s Snapshot) diff(all []git.File) (change diff) {
currentFilenames := map[string]bool{}
lastModifiedTimes := s.LastUpdatedTimes
for _, f := range all {
// create set of current files to figure out if removals are needed
currentFilenames[f.Relative] = true
// get current modified timestamp
modified := f.Modified()
lastSeenModified, seen := s[f.Relative]
lastSeenModified, seen := lastModifiedTimes[f.Relative]

if !seen || modified.After(lastSeenModified) {
change.put = append(change.put, f.Relative)
s[f.Relative] = modified
lastModifiedTimes[f.Relative] = modified
}
}
// figure out files in the snapshot, but not on local filesystem
for relative := range s {
for relative := range lastModifiedTimes {
_, exists := currentFilenames[relative]
if exists {
continue
}
// add them to a delete batch
change.delete = append(change.delete, relative)
// remove the file from snapshot
delete(s, relative)
delete(lastModifiedTimes, relative)
}
// and remove them from the snapshot
for _, v := range change.delete {
delete(s, v)
delete(lastModifiedTimes, v)
}
return
}
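
For reference, here is a minimal, self-contained sketch (not part of this PR) of the file-naming scheme used by getPath above: the snapshot file name is the first 16 hex characters of md5(host + remotePath), plus a .json extension. The hostnames and remote path in the example are hypothetical.

package main

import (
	"crypto/md5"
	"encoding/hex"
	"fmt"
)

// snapshotFileName mirrors the hashing in getPath: the first 16 hex
// characters of md5(host + remotePath), with a .json extension.
func snapshotFileName(host, remotePath string) string {
	hash := md5.Sum([]byte(host + remotePath))
	return hex.EncodeToString(hash[:])[:16] + ".json"
}

func main() {
	// Two profiles that differ in host (or remote path) get distinct
	// snapshot files, so their sync state never collides.
	fmt.Println(snapshotFileName("https://adb-111.azuredatabricks.net", "/Repos/me/project"))
	fmt.Println(snapshotFileName("https://adb-222.azuredatabricks.net", "/Repos/me/project"))
}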
4 changes: 3 additions & 1 deletion cmd/sync/snapshot_test.go
@@ -25,7 +25,9 @@ func TestDiff(t *testing.T) {
fileSet := git.NewFileSet(projectDir)
files, err := fileSet.All()
assert.NoError(t, err)
state := snapshot{}
state := Snapshot{
LastUpdatedTimes: make(map[string]time.Time),
}
change := state.diff(files)

// New files are added to put
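To make the behaviour exercised by TestDiff concrete, here is a small standalone sketch (not part of this PR) of the classification diff() performs: files that are new or whose mtime is newer than the recorded one go into the put list, and files recorded in the snapshot but missing locally go into the delete list. The file names and times are hypothetical.

package main

import (
	"fmt"
	"time"
)

func main() {
	now := time.Now()
	// Last modified times recorded in the snapshot from a previous sync.
	lastUpdatedTimes := map[string]time.Time{
		"notebooks/etl.py": now.Add(-time.Hour),
		"README.md":        now.Add(-time.Hour),
	}
	// Files currently present locally, with their mtimes.
	local := map[string]time.Time{
		"notebooks/etl.py": now, // modified since last sync -> put
		"notebooks/new.py": now, // never synced before -> put
		// README.md is gone locally -> delete
	}

	var put, del []string
	for rel, mtime := range local {
		if last, seen := lastUpdatedTimes[rel]; !seen || mtime.After(last) {
			put = append(put, rel)
			lastUpdatedTimes[rel] = mtime
		}
	}
	for rel := range lastUpdatedTimes {
		if _, exists := local[rel]; !exists {
			del = append(del, rel)
			delete(lastUpdatedTimes, rel)
		}
	}
	fmt.Println("put:", put)
	fmt.Println("delete:", del)
}
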
6 changes: 1 addition & 5 deletions cmd/sync/sync.go
@@ -44,12 +44,8 @@ var syncCmd = &cobra.Command{
}

root := prj.Root()
fileSet := git.NewFileSet(root)
if err != nil {
return err
}
syncCallback := getRemoteSyncCallback(ctx, root, *remotePath, wsc)
err = spawnSyncRoutine(ctx, fileSet, *interval, syncCallback)
err = spawnSyncRoutine(ctx, *interval, syncCallback, *remotePath)
return err
},
}
30 changes: 16 additions & 14 deletions cmd/sync/watchdog.go
@@ -12,16 +12,15 @@ import (
"sync"
"time"

"github.com/databricks/bricks/git"
"github.com/databricks/bricks/project"
"github.com/databricks/databricks-sdk-go/databricks/client"
"github.com/databricks/databricks-sdk-go/service/workspace"
"github.com/databricks/databricks-sdk-go/workspaces"
"golang.org/x/sync/errgroup"
)

// TODO: add .databricks to .gitignore on bricks init
type watchdog struct {
files git.FileSet
ticker *time.Ticker
wg sync.WaitGroup
failure error // data race? make channel?
@@ -104,28 +103,30 @@ func getRemoteSyncCallback(ctx context.Context, root, remoteDir string, wsc *wor
}

func spawnSyncRoutine(ctx context.Context,
files git.FileSet,
interval time.Duration,
applyDiff func(diff) error) error {
applyDiff func(diff) error,
remotePath string) error {
w := &watchdog{
files: files,
ticker: time.NewTicker(interval),
}
w.wg.Add(1)
go w.main(ctx, applyDiff)
go w.main(ctx, applyDiff, remotePath)
w.wg.Wait()
return w.failure
}

// tradeoff: doing portable monitoring only due to macOS max descriptor manual ulimit setting requirement
// https://github.com/gorakhargosh/watchdog/blob/master/src/watchdog/observers/kqueue.py#L394-L418
func (w *watchdog) main(ctx context.Context, applyDiff func(diff) error) {
func (w *watchdog) main(ctx context.Context, applyDiff func(diff) error, remotePath string) {
defer w.wg.Done()
// load from json or sync it every time there's an action
state := snapshot{}
root := w.files.Root()
snapshot, err := newSnapshot(ctx, remotePath)
if err != nil {
log.Printf("[ERROR] cannot create snapshot: %s", err)
w.failure = err
return
}
if *persistSnapshot {
err := state.loadSnapshot(root)
err := snapshot.loadSnapshot(ctx)
if err != nil {
log.Printf("[ERROR] cannot load snapshot: %s", err)
w.failure = err
@@ -137,13 +138,14 @@ func (w *watchdog) main(ctx context.Context, applyDiff func(diff) error) {
case <-ctx.Done():
return
case <-w.ticker.C:
all, err := w.files.All()
prj := project.Get(ctx)
all, err := prj.GetFileSet().All()
if err != nil {
log.Printf("[ERROR] cannot list files: %s", err)
w.failure = err
return
}
change := state.diff(all)
change := snapshot.diff(all)
if change.IsEmpty() {
continue
}
Expand All @@ -154,7 +156,7 @@ func (w *watchdog) main(ctx context.Context, applyDiff func(diff) error) {
return
}
if *persistSnapshot {
err = state.storeSnapshot(root)
err = snapshot.storeSnapshot(ctx)
if err != nil {
log.Printf("[ERROR] cannot store snapshot: %s", err)
w.failure = err
4 changes: 4 additions & 0 deletions git/fileset.go
@@ -71,6 +71,10 @@ func (w *FileSet) All() ([]File, error) {
return w.RecursiveListFiles(w.root)
}

func (w *FileSet) IsGitIgnored(pattern string) bool {
return w.ignore.MatchesPath(pattern)
}

// Recursively traverses dir in a depth-first manner and returns a list of all files
// that are being tracked in the FileSet (i.e. not being ignored for matching one of the
// patterns in w.ignore)
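
A hypothetical usage sketch (not from this PR) for the new IsGitIgnored helper: checking whether the local sync-state directory is covered by the repository's .gitignore, along the lines of the TODO in watchdog.go. The project path and directory name below are assumptions.

package main

import (
	"fmt"

	"github.com/databricks/bricks/git"
)

func main() {
	// Hypothetical project root; inside the CLI this would come from the
	// project (e.g. prj.Root()).
	fileSet := git.NewFileSet("/path/to/project")
	if !fileSet.IsGitIgnored(".databricks") {
		fmt.Println("hint: add .databricks to .gitignore so local sync state is not committed")
	}
}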