Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add unit tests for workunitbase and wrap fsnotify library calls #866

Merged
merged 3 commits into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pkg/workceptor/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@
})
}()
go cmdWaiter(cmd, doneChan)
go cw.monitorLocalStatus()
go cw.MonitorLocalStatus()

Check warning on line 222 in pkg/workceptor/command.go

View check run for this annotation

Codecov / codecov/patch

pkg/workceptor/command.go#L222

Added line #L222 was not covered by tests

return nil
}
Expand Down Expand Up @@ -263,7 +263,7 @@
// Job never started - mark it failed
cw.UpdateBasicStatus(WorkStateFailed, "Pending at restart", stdoutSize(cw.UnitDir()))
}
go cw.monitorLocalStatus()
go cw.MonitorLocalStatus()

Check warning on line 266 in pkg/workceptor/command.go

View check run for this annotation

Codecov / codecov/patch

pkg/workceptor/command.go#L266

Added line #L266 was not covered by tests

return nil
}
Expand Down Expand Up @@ -331,7 +331,7 @@
baseParams: cfg.Params,
allowRuntimeParams: cfg.AllowRuntimeParams,
}
cw.BaseWorkUnit.Init(w, unitID, workType)
cw.BaseWorkUnit.Init(w, unitID, workType, FileSystem{}, nil)

Check warning on line 334 in pkg/workceptor/command.go

View check run for this annotation

Codecov / codecov/patch

pkg/workceptor/command.go#L334

Added line #L334 was not covered by tests

return cw
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/workceptor/json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func newCommandWorker(w *Workceptor, unitID string, workType string) WorkUnit {
baseParams: "foo",
allowRuntimeParams: true,
}
cw.BaseWorkUnit.Init(w, unitID, workType)
cw.BaseWorkUnit.Init(w, unitID, workType, FileSystem{}, nil)

return cw
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/workceptor/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -1272,7 +1272,7 @@
} else {
go kw.runWorkUsingLogger()
}
go kw.monitorLocalStatus()
go kw.MonitorLocalStatus()

Check warning on line 1275 in pkg/workceptor/kubernetes.go

View check run for this annotation

Codecov / codecov/patch

pkg/workceptor/kubernetes.go#L1275

Added line #L1275 was not covered by tests

return nil
}
Expand Down Expand Up @@ -1388,7 +1388,7 @@
deletePodOnRestart: cfg.DeletePodOnRestart,
namePrefix: fmt.Sprintf("%s-", strings.ToLower(cfg.WorkType)),
}
ku.BaseWorkUnit.Init(w, unitID, workType)
ku.BaseWorkUnit.Init(w, unitID, workType, FileSystem{}, nil)

Check warning on line 1391 in pkg/workceptor/kubernetes.go

View check run for this annotation

Codecov / codecov/patch

pkg/workceptor/kubernetes.go#L1391

Added line #L1391 was not covered by tests

return ku
}
Expand Down
14 changes: 14 additions & 0 deletions pkg/workceptor/mock_workceptor/stdio_utils.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

76 changes: 76 additions & 0 deletions pkg/workceptor/mock_workceptor/workunitbase.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/workceptor/python.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
function: cfg.Function,
config: cfg.Config,
}
cw.BaseWorkUnit.Init(w, unitID, workType)
cw.BaseWorkUnit.Init(w, unitID, workType, FileSystem{}, nil)

Check warning on line 66 in pkg/workceptor/python.go

View check run for this annotation

Codecov / codecov/patch

pkg/workceptor/python.go#L66

Added line #L66 was not covered by tests

return cw
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/workceptor/remote_work.go
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,7 @@

func newRemoteWorker(w *Workceptor, unitID, workType string) WorkUnit {
rw := &remoteUnit{logger: w.nc.GetLogger()}
rw.BaseWorkUnit.Init(w, unitID, workType)
rw.BaseWorkUnit.Init(w, unitID, workType, FileSystem{}, nil)

Check warning on line 683 in pkg/workceptor/remote_work.go

View check run for this annotation

Codecov / codecov/patch

pkg/workceptor/remote_work.go#L683

Added line #L683 was not covered by tests
red := &remoteExtraData{}
red.RemoteParams = make(map[string]string)
rw.status.ExtraData = red
Expand Down
6 changes: 6 additions & 0 deletions pkg/workceptor/stdio_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
OpenFile(name string, flag int, perm os.FileMode) (*os.File, error)
Stat(name string) (os.FileInfo, error)
Open(name string) (*os.File, error)
RemoveAll(path string) error
}

// FileSystem represents the real filesystem.
Expand All @@ -36,6 +37,11 @@
return os.Open(name)
}

// RemoveAll removes path and any children it contains.
func (FileSystem) RemoveAll(path string) error {
return os.RemoveAll(path)

Check warning on line 42 in pkg/workceptor/stdio_utils.go

View check run for this annotation

Codecov / codecov/patch

pkg/workceptor/stdio_utils.go#L41-L42

Added lines #L41 - L42 were not covered by tests
}

// FileWriteCloser wraps io.WriteCloser.
type FileWriteCloser interface {
io.WriteCloser
Expand Down
89 changes: 61 additions & 28 deletions pkg/workceptor/workunitbase.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,28 @@
WorkStateCanceled = 4
)

type WatcherWrapper interface {
AaronH88 marked this conversation as resolved.
Show resolved Hide resolved
Add(name string) error
Close() error
EventChannel() chan fsnotify.Event
}

type RealWatcher struct {
watcher *fsnotify.Watcher
}

func (rw *RealWatcher) Add(name string) error {
return rw.watcher.Add(name)

Check warning on line 46 in pkg/workceptor/workunitbase.go

View check run for this annotation

Codecov / codecov/patch

pkg/workceptor/workunitbase.go#L45-L46

Added lines #L45 - L46 were not covered by tests
}

func (rw *RealWatcher) Close() error {
return rw.watcher.Close()

Check warning on line 50 in pkg/workceptor/workunitbase.go

View check run for this annotation

Codecov / codecov/patch

pkg/workceptor/workunitbase.go#L49-L50

Added lines #L49 - L50 were not covered by tests
}

func (rw *RealWatcher) EventChannel() chan fsnotify.Event {
return rw.watcher.Events

Check warning on line 54 in pkg/workceptor/workunitbase.go

View check run for this annotation

Codecov / codecov/patch

pkg/workceptor/workunitbase.go#L53-L54

Added lines #L53 - L54 were not covered by tests
}

// IsComplete returns true if a given WorkState indicates the job is finished.
func IsComplete(workState int) bool {
return workState == WorkStateSucceeded || workState == WorkStateFailed
Expand Down Expand Up @@ -74,10 +96,12 @@
lastUpdateErrorLock *sync.RWMutex
ctx context.Context
cancel context.CancelFunc
fs FileSystemer
watcher WatcherWrapper
}

// Init initializes the basic work unit data, in memory only.
func (bwu *BaseWorkUnit) Init(w *Workceptor, unitID string, workType string) {
func (bwu *BaseWorkUnit) Init(w *Workceptor, unitID string, workType string, fs FileSystemer, watcher WatcherWrapper) {
AaronH88 marked this conversation as resolved.
Show resolved Hide resolved
bwu.w = w
bwu.status.State = WorkStatePending
bwu.status.Detail = "Unit Created"
Expand All @@ -90,6 +114,17 @@
bwu.statusLock = &sync.RWMutex{}
bwu.lastUpdateErrorLock = &sync.RWMutex{}
bwu.ctx, bwu.cancel = context.WithCancel(w.ctx)
bwu.fs = fs
if watcher != nil {
bwu.watcher = watcher
} else {
watcher, err := fsnotify.NewWatcher()
if err == nil {
bwu.watcher = &RealWatcher{watcher: watcher}
} else {
bwu.watcher = nil
}

Check warning on line 126 in pkg/workceptor/workunitbase.go

View check run for this annotation

Codecov / codecov/patch

pkg/workceptor/workunitbase.go#L125-L126

Added lines #L125 - L126 were not covered by tests
}
}

// Error logs message with unitID prepended.
Expand Down Expand Up @@ -340,33 +375,29 @@
return bwu.lastUpdateError
}

// monitorLocalStatus watches a unit dir and keeps the in-memory workUnit up to date with status changes.
func (bwu *BaseWorkUnit) monitorLocalStatus() {
// MonitorLocalStatus watches a unit dir and keeps the in-memory workUnit up to date with status changes.
func (bwu *BaseWorkUnit) MonitorLocalStatus() {
statusFile := path.Join(bwu.UnitDir(), "status")
watcher, err := fsnotify.NewWatcher()
if err == nil {
err = watcher.Add(statusFile)
var watcherEvents chan fsnotify.Event
watcherEvents = make(chan fsnotify.Event)

if bwu.watcher != nil {
err := bwu.watcher.Add(statusFile)
if err == nil {
defer func() {
_ = watcher.Close()
_ = bwu.watcher.Close()
}()
watcherEvents = bwu.watcher.EventChannel()
} else {
_ = watcher.Close()
watcher = nil
_ = bwu.watcher.Close()
bwu.watcher = nil
}
} else {
watcher = nil
}
fi, err := os.Stat(statusFile)
fi, err := bwu.fs.Stat(statusFile)
if err != nil {
fi = nil
}
var watcherEvents chan fsnotify.Event
if watcher == nil {
watcherEvents = make(chan fsnotify.Event)
} else {
watcherEvents = watcher.Events
}

loop:
for {
select {
Expand All @@ -380,14 +411,12 @@
}
}
case <-time.After(time.Second):
newFi, err := os.Stat(statusFile)
if err == nil {
if fi == nil || fi.ModTime() != newFi.ModTime() {
fi = newFi
err = bwu.Load()
if err != nil {
bwu.w.nc.GetLogger().Error("Error reading %s: %s", statusFile, err)
}
newFi, err := bwu.fs.Stat(statusFile)
if err == nil && (fi == nil || fi.ModTime() != newFi.ModTime()) {
fi = newFi
err = bwu.Load()
if err != nil {
bwu.w.nc.GetLogger().Error("Error reading %s: %s", statusFile, err)

Check warning on line 419 in pkg/workceptor/workunitbase.go

View check run for this annotation

Codecov / codecov/patch

pkg/workceptor/workunitbase.go#L414-L419

Added lines #L414 - L419 were not covered by tests
}
}
}
Expand Down Expand Up @@ -425,7 +454,7 @@
defer bwu.statusLock.Unlock()
attemptsLeft := 3
for {
err := os.RemoveAll(bwu.UnitDir())
err := bwu.fs.RemoveAll(bwu.UnitDir())
if force {
break
} else if err != nil {
Expand All @@ -451,11 +480,15 @@
return nil
}

func (bwu *BaseWorkUnit) CancelContext() {
bwu.cancel()
}

// =============================================================================================== //

func newUnknownWorker(w *Workceptor, unitID string, workType string) WorkUnit {
uu := &unknownUnit{}
uu.BaseWorkUnit.Init(w, unitID, workType)
uu.BaseWorkUnit.Init(w, unitID, workType, FileSystem{}, nil)

Check warning on line 491 in pkg/workceptor/workunitbase.go

View check run for this annotation

Codecov / codecov/patch

pkg/workceptor/workunitbase.go#L491

Added line #L491 was not covered by tests

return uu
}
Expand Down
Loading
Loading