Skip to content

Commit

Permalink
Update sobek to fix interrupt with async code
Browse files Browse the repository at this point in the history
Also fix it through the codebase, although experimental/streams and fs
likely will need more work.
  • Loading branch information
mstoykov committed Oct 25, 2024
1 parent 461c779 commit 192a49e
Show file tree
Hide file tree
Showing 17 changed files with 140 additions and 90 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ require (
github.com/go-logr/stdr v1.2.2 // indirect
github.com/google/pprof v0.0.0-20230728192033-2ba5b33183c6 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/grafana/sobek v0.0.0-20241023145759-2dc9daf5bfa2
github.com/grafana/sobek v0.0.0-20241024150027-d91f02b05e9b
github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/grafana/sobek v0.0.0-20241023145759-2dc9daf5bfa2 h1:qgthy9RbAIxinOmXaB9nSaT/w00VTqeEQ7JI0a+ScUU=
github.com/grafana/sobek v0.0.0-20241023145759-2dc9daf5bfa2/go.mod h1:FmcutBFPLiGgroH42I4/HBahv7GxVjODcVWFTw1ISes=
github.com/grafana/sobek v0.0.0-20241024150027-d91f02b05e9b h1:hzfIt1lf19Zx1jIYdeHvuWS266W+jL+7dxbpvH2PZMQ=
github.com/grafana/sobek v0.0.0-20241024150027-d91f02b05e9b/go.mod h1:FmcutBFPLiGgroH42I4/HBahv7GxVjODcVWFTw1ISes=
github.com/grafana/xk6-browser v1.8.5 h1:dNAG8dhcaEx/HOELEnGzAw8ShCvkpukfyTGUhebZsj0=
github.com/grafana/xk6-browser v1.8.5/go.mod h1:yCtZ4G8U/imVBikBO4HJoMyNoejmECcJk4CK5XGSxis=
github.com/grafana/xk6-dashboard v0.7.5 h1:TcILyffT/Ea/XD7xG1jMA5lwtusOPRbEQsQDHmO30Mk=
Expand Down
4 changes: 3 additions & 1 deletion js/modules/cjsmodule.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@ func (cmi *cjsModuleInstance) HasTLA() bool { return false }

func (cmi *cjsModuleInstance) RequestedModules() []string { return cmi.w.RequestedModules() }

func (cmi *cjsModuleInstance) ExecuteModule(rt *sobek.Runtime, _, _ func(any)) (sobek.CyclicModuleInstance, error) {
func (cmi *cjsModuleInstance) ExecuteModule(
rt *sobek.Runtime, _, _ func(any) error,
) (sobek.CyclicModuleInstance, error) {
v, err := rt.RunProgram(cmi.w.prg)
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion js/modules/gomodule.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type goModuleInstance struct {
defaultExport sobek.Value
}

func (gmi *goModuleInstance) ExecuteModule(_ *sobek.Runtime, _, _ func(any)) (sobek.CyclicModuleInstance, error) {
func (gmi *goModuleInstance) ExecuteModule(_ *sobek.Runtime, _, _ func(any) error) (sobek.CyclicModuleInstance, error) {
return gmi, nil
}
func (gmi *goModuleInstance) HasTLA() bool { return false }
Expand Down
4 changes: 3 additions & 1 deletion js/modules/gomodule_basic.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ func (bgmi *basicGoModuleInstance) GetBindingValue(n string) sobek.Value {

func (bgmi *basicGoModuleInstance) HasTLA() bool { return false }

func (bgmi *basicGoModuleInstance) ExecuteModule(_ *sobek.Runtime, _, _ func(any)) (sobek.CyclicModuleInstance, error) {
func (bgmi *basicGoModuleInstance) ExecuteModule(
_ *sobek.Runtime, _, _ func(any) error,
) (sobek.CyclicModuleInstance, error) {
return bgmi, nil
}
52 changes: 23 additions & 29 deletions js/modules/k6/experimental/fs/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"go.k6.io/k6/lib/fsext"

"github.com/grafana/sobek"

"go.k6.io/k6/js/common"
"go.k6.io/k6/js/modules"
"go.k6.io/k6/js/promises"
Expand Down Expand Up @@ -218,25 +219,25 @@ func (f *File) Stat() *sobek.Promise {
//
// It is possible for a read to successfully return with 0 bytes.
// This does not indicate EOF.
func (f *File) Read(into sobek.Value) *sobek.Promise {
func (f *File) Read(into sobek.Value) (*sobek.Promise, error) {
promise, resolve, reject := f.vu.Runtime().NewPromise()

if common.IsNullish(into) {
reject(newFsError(TypeError, "read() failed; reason: into argument cannot be null or undefined"))
return promise
err := reject(newFsError(TypeError, "read() failed; reason: into argument cannot be null or undefined"))
return promise, err
}

intoObj := into.ToObject(f.vu.Runtime())
if !isUint8Array(f.vu.Runtime(), intoObj) {
reject(newFsError(TypeError, "read() failed; reason: into argument must be a Uint8Array"))
return promise
err := reject(newFsError(TypeError, "read() failed; reason: into argument must be a Uint8Array"))
return promise, err
}

// Obtain the underlying ArrayBuffer from the Uint8Array
ab, ok := intoObj.Get("buffer").Export().(sobek.ArrayBuffer)
if !ok {
reject(newFsError(TypeError, "read() failed; reason: into argument must be a Uint8Array"))
return promise
err := reject(newFsError(TypeError, "read() failed; reason: into argument must be a Uint8Array"))
return promise, err
}

// To avoid concurrency linked to modifying the runtime's `into` buffer from multiple
Expand All @@ -257,8 +258,7 @@ func (f *File) Read(into sobek.Value) *sobek.Promise {
// Read was successful, resolve early with the number of
// bytes read.
if readErr == nil {
resolve(n)
return nil
return resolve(n)
}

// If the read operation failed, we need to check if it was an io.EOF error
Expand All @@ -270,67 +270,61 @@ func (f *File) Read(into sobek.Value) *sobek.Promise {
var fsErr *fsError
isFSErr := errors.As(readErr, &fsErr)
if !isFSErr {
reject(readErr)
return nil
return reject(readErr)
}

if fsErr.kind == EOFError && n == 0 {
resolve(sobek.Null())
} else {
resolve(n)
return resolve(sobek.Null())
}

return nil
return resolve(n)
})
}()

return promise
return promise, nil
}

// Seek seeks to the given `offset` in the file, under the given `whence` mode.
//
// The returned promise resolves to the new `offset` (position) within the file, which
// is expressed in bytes from the selected start, current, or end position depending
// the provided `whence`.
func (f *File) Seek(offset sobek.Value, whence sobek.Value) *sobek.Promise {
func (f *File) Seek(offset sobek.Value, whence sobek.Value) (*sobek.Promise, error) {
promise, resolve, reject := f.vu.Runtime().NewPromise()

intOffset, err := exportInt(offset)
if err != nil {
reject(newFsError(TypeError, "seek() failed; reason: the offset argument "+err.Error()))
return promise
err := reject(newFsError(TypeError, "seek() failed; reason: the offset argument "+err.Error()))
return promise, err
}

intWhence, err := exportInt(whence)
if err != nil {
reject(newFsError(TypeError, "seek() failed; reason: the whence argument "+err.Error()))
return promise
err := reject(newFsError(TypeError, "seek() failed; reason: the whence argument "+err.Error()))
return promise, err
}

seekMode := SeekMode(intWhence)
switch seekMode {
case SeekModeStart, SeekModeCurrent, SeekModeEnd:
// Valid modes, do nothing.
default:
reject(newFsError(TypeError, "seek() failed; reason: the whence argument must be a SeekMode"))
return promise
err := reject(newFsError(TypeError, "seek() failed; reason: the whence argument must be a SeekMode"))
return promise, err
}

callback := f.vu.RegisterCallback()
go func() {
newOffset, err := f.ReadSeekStater.Seek(intOffset, seekMode)
callback(func() error {
if err != nil {
reject(err)
return err
return reject(err)
}

resolve(newOffset)
return nil
return resolve(newOffset)
})
}()

return promise
return promise, nil
}

func isUint8Array(rt *sobek.Runtime, o *sobek.Object) bool {
Expand Down
12 changes: 10 additions & 2 deletions js/modules/k6/experimental/streams/goja.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,29 @@ import (
"reflect"

"github.com/grafana/sobek"

"go.k6.io/k6/js/common"
"go.k6.io/k6/js/modules"
)

// newResolvedPromise instantiates a new resolved promise.
func newResolvedPromise(vu modules.VU, with sobek.Value) *sobek.Promise {
promise, resolve, _ := vu.Runtime().NewPromise()
resolve(with)
err := resolve(with)
if err != nil { // TODO(@mstoykov): likely better to actually call Promise.resolve directly
panic(err)
}

return promise
}

// newRejectedPromise instantiates a new rejected promise.
func newRejectedPromise(vu modules.VU, with any) *sobek.Promise {
promise, _, reject := vu.Runtime().NewPromise()
reject(with)
err := reject(with)
if err != nil {
panic(err)
}
return promise
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package streams

import (
"github.com/grafana/sobek"

"go.k6.io/k6/js/common"
)

Expand Down Expand Up @@ -63,15 +64,25 @@ func (reader *ReadableStreamDefaultReader) Read() *sobek.Promise {
readRequest := ReadRequest{
chunkSteps: func(chunk any) {
// Resolve promise with «[ "value" → chunk, "done" → false ]».
resolve(map[string]any{"value": chunk, "done": false})
// TODO(@mstoykov): propagate as error?
err := resolve(map[string]any{"value": chunk, "done": false})
if err != nil {
panic(err)
}
},
closeSteps: func() {
// Resolve promise with «[ "value" → undefined, "done" → true ]».
resolve(map[string]any{"value": sobek.Undefined(), "done": true})
err := resolve(map[string]any{"value": sobek.Undefined(), "done": true})
if err != nil {
panic(err)
}
},
errorSteps: func(e any) {
// Reject promise with e.
reject(e)
err := reject(e)
if err != nil {
panic(err)
}
},
}

Expand Down
33 changes: 23 additions & 10 deletions js/modules/k6/experimental/streams/readable_stream_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package streams

import (
"github.com/grafana/sobek"

"go.k6.io/k6/js/common"
"go.k6.io/k6/js/modules"
)
Expand Down Expand Up @@ -35,10 +36,10 @@ type ReadableStreamGenericReader interface {
SetStream(stream *ReadableStream)

// GetClosed returns a [sobek.Promise] that resolves when the stream is closed.
GetClosed() (p *sobek.Promise, resolve func(any), reject func(any))
GetClosed() (p *sobek.Promise, resolve, reject func(any) error)

// SetClosed sets the [sobek.Promise] that resolves when the stream is closed.
SetClosed(p *sobek.Promise, resolve func(any), reject func(any))
SetClosed(p *sobek.Promise, resolve, reject func(any) error)

// Cancel returns a [sobek.Promise] that resolves when the stream is canceled.
Cancel(reason sobek.Value) *sobek.Promise
Expand All @@ -47,8 +48,8 @@ type ReadableStreamGenericReader interface {
// BaseReadableStreamReader is a base implement
type BaseReadableStreamReader struct {
closedPromise *sobek.Promise
closedPromiseResolveFunc func(resolve any)
closedPromiseRejectFunc func(reason any)
closedPromiseResolveFunc func(resolve any) error
closedPromiseRejectFunc func(reason any) error

// stream is a [ReadableStream] instance that owns this reader
stream *ReadableStream
Expand All @@ -73,12 +74,12 @@ func (reader *BaseReadableStreamReader) SetStream(stream *ReadableStream) {
}

// GetClosed returns the reader's closed promise as well as its resolve and reject functions.
func (reader *BaseReadableStreamReader) GetClosed() (p *sobek.Promise, resolve func(any), reject func(any)) {
func (reader *BaseReadableStreamReader) GetClosed() (p *sobek.Promise, resolve, reject func(any) error) {
return reader.closedPromise, reader.closedPromiseResolveFunc, reader.closedPromiseRejectFunc
}

// SetClosed sets the reader's closed promise as well as its resolve and reject functions.
func (reader *BaseReadableStreamReader) SetClosed(p *sobek.Promise, resolve func(any), reject func(any)) {
func (reader *BaseReadableStreamReader) SetClosed(p *sobek.Promise, resolve, reject func(any) error) {
reader.closedPromise = p
reader.closedPromiseResolveFunc = resolve
reader.closedPromiseRejectFunc = reject
Expand Down Expand Up @@ -133,7 +134,10 @@ func (reader *BaseReadableStreamReader) release() {

// 4. If stream.[[state]] is "readable", reject reader.[[closedPromise]] with a TypeError exception.
if stream.state == ReadableStreamStateReadable {
reader.closedPromiseRejectFunc(newTypeError(reader.runtime, "stream is readable").Err())
err := reader.closedPromiseRejectFunc(newTypeError(reader.runtime, "stream is readable").Err())
if err != nil {
panic(err)
}
} else { // 5. Otherwise, set reader.[[closedPromise]] to a promise rejected with a TypeError exception.
reader.closedPromise = newRejectedPromise(stream.vu, newTypeError(reader.runtime, "stream is not readable").Err())
}
Expand Down Expand Up @@ -196,7 +200,10 @@ func ReadableStreamReaderGenericInitialize(reader ReadableStreamGenericReader, s
// 4. Otherwise, if stream.[[state]] is "closed",
case ReadableStreamStateClosed:
// 4.1 Set reader.[[closedPromise]] to a promise resolved with undefined.
resolve(sobek.Undefined())
err := resolve(sobek.Undefined())
if err != nil {
panic(err) // TODO(@mstoykov): probably better to move them out as errors
}
// 5. Otherwise,
default:
// 5.1 Assert: stream.[[state]] is "errored".
Expand All @@ -206,9 +213,15 @@ func ReadableStreamReaderGenericInitialize(reader ReadableStreamGenericReader, s

// 5.2 Set reader.[[closedPromise]] to a promise rejected with stream.[[storedError]].
if jsErr, ok := stream.storedError.(*jsError); ok {
reject(jsErr.Err())
err := reject(jsErr.Err())
if err != nil {
panic(err)
}
} else {
reject(errToObj(stream.runtime, stream.storedError))
err := reject(errToObj(stream.runtime, stream.storedError))
if err != nil {
panic(err)
}
}

// 5.3 Set reader.[[closedPromise]].[[PromiseIsHandled]] to true.
Expand Down
19 changes: 12 additions & 7 deletions js/modules/k6/experimental/streams/readable_streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"

"github.com/grafana/sobek"

"go.k6.io/k6/js/common"
"go.k6.io/k6/js/modules"
"go.k6.io/k6/js/promises"
Expand Down Expand Up @@ -456,7 +457,10 @@ func (stream *ReadableStream) close() {
}

_, resolveFunc, _ := genericReader.GetClosed()
resolveFunc(sobek.Undefined())
err := resolveFunc(sobek.Undefined())
if err != nil {
panic(err) // TODO(@mstoykov): propagate as error instead
}

// 6. If reader implements ReadableStreamDefaultReader,
defaultReader, ok := reader.(*ReadableStreamDefaultReader)
Expand Down Expand Up @@ -503,19 +507,20 @@ func (stream *ReadableStream) error(e any) {
}

// 6. Reject reader.[[closedPromise]] with e.
var err error
promise, _, rejectFunc := genericReader.GetClosed()
if jsErr, ok := e.(*jsError); ok {
rejectFunc(jsErr.Err())
err = rejectFunc(jsErr.Err())
} else {
rejectFunc(e)
err = rejectFunc(e)
}
if err != nil {
panic(err) // TODO(@mstoykov): propagate as error instead
}

// 7. Set reader.[[closedPromise]].[[PromiseIsHandled]] to true.
// See https://github.com/dop251/goja/issues/565
var (
err error
doNothing = func(sobek.Value) {}
)
doNothing := func(sobek.Value) {}
_, err = promiseThen(stream.vu.Runtime(), promise, doNothing, doNothing)
if err != nil {
common.Throw(stream.vu.Runtime(), newError(RuntimeError, err.Error()))
Expand Down
Loading

0 comments on commit 192a49e

Please sign in to comment.