Skip to content

Commit

Permalink
Merge pull request #4 from wetware/feat/filesystem-stub
Browse files Browse the repository at this point in the history
Stub: mount IPFS unixfs to wasm guest's filesystem.
  • Loading branch information
lthibault authored Aug 4, 2024
2 parents 8050012 + 922d6bc commit cdb1423
Show file tree
Hide file tree
Showing 5 changed files with 169 additions and 129 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/libp2p/go-libp2p-kad-dht v0.25.2
github.com/lmittmann/tint v1.0.4
github.com/multiformats/go-multiaddr v0.12.3
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.8.4
github.com/tetratelabs/wazero v1.7.1
github.com/thejerf/suture/v4 v4.0.5
Expand Down Expand Up @@ -113,7 +114,6 @@ require (
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect
github.com/petar/GoLLRB v0.0.0-20210522233825-ae3b015fd3e9 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/polydawn/refmt v0.89.0 // indirect
github.com/prometheus/client_golang v1.19.0 // indirect
Expand Down
65 changes: 0 additions & 65 deletions guest/guest.go

This file was deleted.

87 changes: 87 additions & 0 deletions system/fs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package system

import (
"context"
"io/fs"

"github.com/ipfs/boxo/files"
"github.com/ipfs/boxo/path"
iface "github.com/ipfs/kubo/core/coreiface"
"github.com/pkg/errors"
)

var _ fs.FS = (*FS)(nil)

// An FS provides access to a hierarchical file system.
//
// The FS interface is the minimum implementation required of the file system.
// A file system may implement additional interfaces,
// such as [ReadFileFS], to provide additional or optimized functionality.
//
// [testing/fstest.TestFS] may be used to test implementations of an FS for
// correctness.
type FS struct {
API iface.UnixfsAPI
Path path.Path
}

// Open opens the named file.
//
// When Open returns an error, it should be of type *PathError
// with the Op field set to "open", the Path field set to name,
// and the Err field describing the problem.
//
// Open should reject attempts to open names that do not satisfy
// fs.ValidPath(name), returning a *fs.PathError with Err set to
// fs.ErrInvalid or fs.ErrNotExist.
func (f FS) Open(name string) (fs.File, error) {
p, err := path.Join(f.Path, name)
if err != nil {
return nil, &fs.PathError{
Op: "path.Join",
Path: name,
Err: err,
}
}

if !fs.ValidPath(p.String()) {
return nil, &fs.PathError{
Op: "fs.ValidPath",
Path: name,
Err: errors.New("invalid path"),
}
}

node, err := f.API.Get(context.TODO(), p)
if err != nil {
return nil, err
}

switch n := node.(type) {
case files.File:
return fileNode{File: n}, nil

case files.Directory:
defer n.Close()
return nil, &fs.PathError{
Op: "FS.Open",
Path: name,
Err: errors.New("node is a directory"),
}

default:
panic(n) // unhandled type
}
}

// fileNode provides access to a single file. The fs.File interface is the minimum
// implementation required of the file. Directory files should also implement [ReadDirFile].
// A file may implement io.ReaderAt or io.Seeker as optimizations.

type fileNode struct {
files.File
}

func (n fileNode) Stat() (fs.FileInfo, error) {
return nil, errors.New("fileNode.Stat::NOT IMPLEMENTED")
}
16 changes: 16 additions & 0 deletions system/fs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package system_test

import (
"testing"
"testing/fstest"

"github.com/stretchr/testify/require"
"github.com/wetware/go/system"
)

func TestFS(t *testing.T) {
t.Parallel()

err := fstest.TestFS(system.FS{})
require.NoError(t, err)
}
128 changes: 65 additions & 63 deletions ww.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,22 @@ package ww

import (
"context"
"crypto/rand"
"errors"
"fmt"
"log/slog"
"io"
"os"
"reflect"

"capnproto.org/go/capnp/v3/rpc"
"github.com/ipfs/boxo/files"
"github.com/ipfs/boxo/path"
iface "github.com/ipfs/kubo/core/coreiface"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/routing"
"github.com/tetratelabs/wazero"
wasi "github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1"
"github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1"
"github.com/thejerf/suture/v4"
"github.com/wetware/go/guest"
"github.com/wetware/go/system"
"github.com/wetware/go/vat"
)

const Proto = "/ww/0.0.0"
Expand Down Expand Up @@ -45,81 +47,81 @@ type Cluster struct {
}

func (c Cluster) String() string {
peer := c.Host.ID()
return fmt.Sprintf("Cluster{peer=%s}", peer)
return c.Config.NS
}

// Serve the cluster's root process
func (c Cluster) Serve(ctx context.Context) error {
if c.Router == nil {
slog.WarnContext(ctx, "started with null router",
"ns", c.NS)
return nil
}

if err := c.Router.Bootstrap(ctx); err != nil {
root, err := c.IPFS.Name().Resolve(ctx, c.NS)
if err != nil {
return err
}

root, err := c.IPFS.Name().Resolve(ctx, c.NS)
node, err := c.IPFS.Unixfs().Get(ctx, root)
if err != nil {
return err
}
defer node.Close()

return c.ServeVat(ctx, root)
}

func (c Cluster) ServeVat(ctx context.Context, root path.Path) error {
r := wazero.NewRuntimeWithConfig(ctx, wazero.NewRuntimeConfig().
WithMemoryLimitPages(1024). // 64MB
WithCloseOnContextDone(true).
WithDebugInfoEnabled(c.Debug))
WithCloseOnContextDone(true))
defer r.Close(ctx)

cl, err := wasi.Instantiate(ctx, r)
if err != nil {
return err
}
defer cl.Close(ctx)
switch n := node.(type) {
case files.File:
// assume it's a WASM file; run it
body := io.LimitReader(n, 2<<32)
b, err := io.ReadAll(body)
if err != nil {
return err
}

sys, err := system.Builder{
// Host: c.Host,
// IPFS: c.IPFS,
}.Instantiate(ctx, r)
if err != nil {
return err
}
defer sys.Close(ctx)
r := wazero.NewRuntimeWithConfig(ctx, wazero.NewRuntimeConfig().
WithCloseOnContextDone(true))
defer r.Close(ctx)

mod, err := guest.Config{
IPFS: c.IPFS,
Root: root,
Sys: sys,
}.Instanatiate(ctx, r)
if err != nil {
return err
}
defer mod.Close(ctx)

// Obtain the system client. This gives us an API to our root
// process.
client := sys.Boot(ctx, mod)
defer client.Release()

net := vat.Config{
Host: c.Host,
Proto: vat.ProtoFromModule(mod),
}.Build(ctx)
defer net.Release()

for {
if conn, err := net.Accept(ctx, &rpc.Options{
BootstrapClient: client.AddRef(),
Network: net,
}); err == nil {
go net.ServeConn(ctx, conn)
} else {
cl, err := wasi_snapshot_preview1.Instantiate(ctx, r)
if err != nil {
return err
}
defer cl.Close(ctx)

cm, err := r.CompileModule(ctx, b)
if err != nil {
return err
}
defer cm.Close(ctx)

mod, err := r.InstantiateModule(ctx, cm, wazero.NewModuleConfig().
// WithArgs().
// WithEnv().
// WithNanosleep().
// WithNanotime().
// WithOsyield().
// WithSysNanosleep().
// WithSysNanotime().
// WithSysWalltime().
// WithWalltime().
WithStartFunctions().
WithFS(system.FS{API: c.IPFS.Unixfs()}).
WithRandSource(rand.Reader).
WithStdin(os.Stdin).
WithStderr(os.Stderr).
WithStdout(os.Stdout).
WithName(c.NS))
if err != nil {
return err
}
defer mod.Close(ctx)

_, err = mod.ExportedFunction("_start").Call(ctx)
return err

case files.Directory:
// TODO: look for a main.wasm and execute it
return errors.New("Cluster.Serve::TODO:implement directory handler")

default:
return fmt.Errorf("unhandled type: %s", reflect.TypeOf(n))
}
}

0 comments on commit cdb1423

Please sign in to comment.