Skip to content

Commit

Permalink
Rework rpc messages
Browse files Browse the repository at this point in the history
  • Loading branch information
bep committed Aug 7, 2024
1 parent 32d4a94 commit 7eef0f3
Show file tree
Hide file tree
Showing 10 changed files with 158 additions and 108 deletions.
4 changes: 2 additions & 2 deletions internal/warpc/js/greet.bundle.js

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion internal/warpc/js/greet.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { readInput, writeOutput } from './common';

const greet = function (input) {
writeOutput({ header: input.header, greeting: 'Hello ' + input.name + '!' });
writeOutput({ header: input.header, data: { greeting: 'Hello ' + input.data.name + '!' } });
};

readInput(greet);
2 changes: 1 addition & 1 deletion internal/warpc/js/renderkatex.bundle.js

Large diffs are not rendered by default.

15 changes: 6 additions & 9 deletions internal/warpc/js/renderkatex.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,11 @@ import { readInput, writeOutput } from './common';
import katex from 'katex';

const render = function (input) {
const expression = input.expression;
const id = input.id;
delete input.expression;
delete input.id;
input.throwOnError = false;
writeOutput({ id: id, output: katex.renderToString(expression, input) });
const data = input.data;
const expression = data.expression;
const options = data.options;
options.throwOnError = true;
writeOutput({ header: input.header, data: { output: katex.renderToString(expression, options) } });
};

(() => {
readInput(render);
})();
readInput(render);
16 changes: 5 additions & 11 deletions internal/warpc/katex.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,15 @@ func StartKatex() (Dispatcher[KatexInput, KatexOutput], error) {

// See https://katex.org/docs/options.html
type KatexInput struct {
ID uint32 `json:"id"`
Expression string `json:"expression"`
Expression string `json:"expression"`
Options KatexOptions `json:"options"`
}

type KatexOptions struct {
Output string `json:"output"` // html, mathml, htmlAndMathml (default)
DisplayMode bool `json:"displayMode"`
}

type KatexOutput struct {
ID uint32 `json:"id"`
Output string `json:"output"`
}

func (k KatexOutput) GetID() uint32 {
return k.ID
}

func (k KatexInput) GetID() uint32 {
return k.ID
}
41 changes: 27 additions & 14 deletions internal/warpc/warpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"io"
"os"
"strings"
"sync"
"sync/atomic"
Expand All @@ -32,12 +33,22 @@ type Header struct {
ID uint32 `json:"id"`
}

type Message[T any] struct {
Header Header `json:"header"`
Data T `json:"data"`
}

func (m Message[T]) GetID() uint32 {
return m.Header.ID
}

// TODO1 remove me.
type IDGetter interface {
GetID() uint32
}

type Dispatcher[Q, R IDGetter] interface {
Execute(ctx context.Context, q Q) (R, error)
type Dispatcher[Q, R any] interface {
Execute(ctx context.Context, q Message[Q]) (Message[R], error)
Close() error
}

Expand All @@ -50,8 +61,8 @@ func (p *dispatcherPool[Q, R]) Close() error {
return p.close()
}

type dispatcher[Q, R IDGetter] struct {
zero R
type dispatcher[Q, R any] struct {
zero Message[R]

mu sync.RWMutex
encMu sync.Mutex
Expand Down Expand Up @@ -96,7 +107,7 @@ func putTimer(t *time.Timer) {
}

// Execute sends a request to the dispatcher and waits for the response.
func (p *dispatcherPool[Q, R]) Execute(ctx context.Context, q Q) (R, error) {
func (p *dispatcherPool[Q, R]) Execute(ctx context.Context, q Message[Q]) (Message[R], error) {
d := p.getDispatcher()
if q.GetID() == 0 {
return d.zero, errors.New("ID must not be 0 (note that this must be unique within the current request set time window)")
Expand Down Expand Up @@ -127,7 +138,7 @@ func (p *dispatcherPool[Q, R]) Execute(ctx context.Context, q Q) (R, error) {
return call.Response, nil
}

func (d *dispatcher[Q, R]) newCall(q Q) (*call[Q, R], error) {
func (d *dispatcher[Q, R]) newCall(q Message[Q]) (*call[Q, R], error) {
call := &call[Q, R]{
Done: make(chan *call[Q, R], 1),
Request: q,
Expand Down Expand Up @@ -167,7 +178,7 @@ func (d *dispatcher[Q, R]) input() {
var inputErr error

for d.inOut.dec.More() {
var r R
var r Message[R]
if err := d.inOut.dec.Decode(&r); err != nil {
inputErr = err
break
Expand Down Expand Up @@ -207,8 +218,8 @@ func (d *dispatcher[Q, R]) input() {
}

type call[Q, R any] struct {
Request Q
Response R
Request Message[Q]
Response Message[R]
Error error
Done chan *call[Q, R]
}
Expand Down Expand Up @@ -262,7 +273,8 @@ type CompiledModule struct {
Module wazero.CompiledModule
}

func Start[Q, R IDGetter](opts Options) (Dispatcher[Q, R], error) {
// TODO1 do a test release build to verify that this compiles for all OSes.c
func Start[Q, R any](opts Options) (Dispatcher[Q, R], error) {
if opts.Main.Data == nil {
return nil, errors.New("Main.Data must be set")
}
Expand All @@ -281,13 +293,13 @@ func Start[Q, R IDGetter](opts Options) (Dispatcher[Q, R], error) {
return newDispatcher[Q, R](opts)
}

type dispatcherPool[Q, R IDGetter] struct {
type dispatcherPool[Q, R any] struct {
counter atomic.Uint32
dispatchers []*dispatcher[Q, R]
close func() error
}

func newDispatcher[Q, R IDGetter](opts Options) (*dispatcherPool[Q, R], error) {
func newDispatcher[Q, R any](opts Options) (*dispatcherPool[Q, R], error) {
if opts.Ctx == nil {
opts.Ctx = context.Background()
}
Expand Down Expand Up @@ -350,7 +362,8 @@ func newDispatcher[Q, R IDGetter](opts Options) (*dispatcherPool[Q, R], error) {
for _, c := range inOuts {
c := c
g.Go(func() error {
configBase := wazero.NewModuleConfig().WithStdout(c.stdout).WithStdin(c.stdin).WithStartFunctions()
ctx := context.WithoutCancel(ctx)
configBase := wazero.NewModuleConfig().WithStderr(os.Stderr).WithStdout(c.stdout).WithStdin(c.stdin).WithStartFunctions()
if opts.Runtime.Data != nil {
// This needs to be anonymous, it will be resolved in the import resolver below.
runtimeInstance, err := r.InstantiateModule(ctx, runtimeModule, configBase.WithName(""))
Expand All @@ -367,7 +380,7 @@ func newDispatcher[Q, R IDGetter](opts Options) (*dispatcherPool[Q, R], error) {
)
}

mainInstance, err := r.InstantiateModule(ctx, mainModule, configBase.WithName("").WithStdout(c.stdout).WithStdin(c.stdin))
mainInstance, err := r.InstantiateModule(ctx, mainModule, configBase.WithName(""))
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 7eef0f3

Please sign in to comment.