Skip to content

Commit

Permalink
storage/memstore: update to match the new interfaces.
Browse files Browse the repository at this point in the history
Most usages change from a one-liner to another one-liner.

Some tests get a bit more involved, because they were actually
using the callback structure to hook introspections up.
  • Loading branch information
warpfork committed Oct 14, 2021
1 parent 55a4896 commit 273362c
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 45 deletions.
4 changes: 2 additions & 2 deletions adl/rot13adl/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ func ExampleReify_loadingToADL() {
}}
linkSystem := cidlink.DefaultLinkSystem()
storage := &memstore.Store{}
linkSystem.StorageReadOpener = storage.OpenRead
linkSystem.StorageWriteOpener = storage.OpenWrite
linkSystem.SetReadStorage(storage)
linkSystem.SetWriteStorage(storage)
linkSystem.NodeReifier = func(_ linking.LinkContext, nd datamodel.Node, _ *linking.LinkSystem) (datamodel.Node, error) {
return rot13adl.Reify(nd)
}
Expand Down
8 changes: 4 additions & 4 deletions linking/linkingExamples_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ func ExampleLinkSystem_Store() {
// We want to store the serialized data somewhere.
// We'll use an in-memory store for this. (It's a package scoped variable.)
// You can use any kind of storage system here;
// you just need a function that conforms to the datamodel.BlockWriteOpener interface.
lsys.StorageWriteOpener = (&store).OpenWrite
// or if you need even more control, you could also write a function that conforms to the linking.BlockWriteOpener interface.
lsys.SetWriteStorage(&store)

// To create any links, first we need a LinkPrototype.
// This gathers together any parameters that might be needed when making a link.
Expand Down Expand Up @@ -103,8 +103,8 @@ func ExampleLinkSystem_Load() {
// We'll use an in-memory store for this. (It's a package scoped variable.)
// (This particular memory store was filled with the data we'll load earlier, during ExampleLinkSystem_Store.)
// You can use any kind of storage system here;
// you just need a function that conforms to the datamodel.BlockReadOpener interface.
lsys.StorageReadOpener = (&store).OpenRead
// or if you need even more control, you could also write a function that conforms to the linking.BlockReadOpener interface.
lsys.SetReadStorage(&store)

// We'll need to decide what in-memory implementation of datamodel.Node we want to use.
// Here, we'll use the "basicnode" implementation. This is a good getting-started choice.
Expand Down
4 changes: 2 additions & 2 deletions node/tests/schemaLinks.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func encode(n datamodel.Node) (datamodel.Node, datamodel.Link) {
MhLength: 4,
}}
lsys := cidlink.DefaultLinkSystem()
lsys.StorageWriteOpener = (&store).OpenWrite
lsys.SetWriteStorage(&store)

lnk, err := lsys.Store(linking.LinkContext{}, lp, n)
if err != nil {
Expand Down Expand Up @@ -96,7 +96,7 @@ func SchemaTestLinks(t *testing.T, engine Engine) {

var order int
lsys := cidlink.DefaultLinkSystem()
lsys.StorageReadOpener = (&store).OpenRead
lsys.SetReadStorage(&store)
err = traversal.Progress{
Cfg: &traversal.Config{
LinkSystem: lsys,
Expand Down
79 changes: 58 additions & 21 deletions storage/memstore/memstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,54 +2,91 @@ package memstore

import (
"bytes"
"context"
"fmt"
"io"

"github.com/ipld/go-ipld-prime/datamodel"
"github.com/ipld/go-ipld-prime/linking"
)

// TODO: this is implementing the linking APIs, and it should be updated to just implement the storage APIs, and let LinkSystem construction figure out the rest of the connections.

// Store is a simple in-memory storage.
// (It's little more than a map -- in fact, the map is exported,
// and you can poke it directly.)
//
// The OpenRead method conforms to linking.BlockReadOpener,
// and the OpenWrite method conforms to linking.BlockWriteOpener.
// Therefore it's easy to use in a LinkSystem like this:
// Store conforms to the storage.ReadableStorage and storage.WritableStorage APIs.
// Additionally, it supports storage.PeekableStorage and storage.StreamingReadableStorage,
// because it can do so while provoking fewer copies.
//
// If you want to use this store with streaming APIs,
// you can still do so by using the functions in the storage package,
// such as storage.GetStream and storage.PutStream, which will synthesize the correct behavior.
//
// store := memstore.Store{}
// lsys.StorageReadOpener = (&store).OpenRead
// lsys.StorageWriteOpener = (&store).OpenWrite
// You can use this storage with a linking.LinkSystem easily,
// by using the LinkSystem.SetReadStorage and/or LinkSystem.SetWriteStorage methods.
//
// There are no construction parameters for sharding functions nor escaping functions.
// Any keys are acceptable.
//
// This storage is mostly expected to be used for testing and demos,
// and as an example of how you can implement and integrate your own storage systems.
// It does not provide persistence beyond memory.
type Store struct {
Bag map[datamodel.Link][]byte
Bag map[string][]byte
}

func (store *Store) beInitialized() {
if store.Bag != nil {
return
}
store.Bag = make(map[datamodel.Link][]byte)
store.Bag = make(map[string][]byte)
}

func (store *Store) OpenRead(lnkCtx linking.LinkContext, lnk datamodel.Link) (io.Reader, error) {
// Get implements go-ipld-prime/storage.ReadableStorage.Get.
//
// Note that this internally performs a defensive copy;
// use Peek for higher performance if you are certain you won't mutate the returned slice.
func (store *Store) Get(ctx context.Context, key string) ([]byte, error) {
store.beInitialized()
data, exists := store.Bag[lnk]
content, exists := store.Bag[key]
if !exists {
return nil, fmt.Errorf("404") // FIXME this needs a standard error type
}
return bytes.NewReader(data), nil
cpy := make([]byte, len(content))
copy(cpy, content)
return cpy, nil
}

func (store *Store) OpenWrite(lnkCtx linking.LinkContext) (io.Writer, linking.BlockWriteCommitter, error) {
// Put implements go-ipld-prime/storage.WritableStorage.Put.
func (store *Store) Put(ctx context.Context, key string, content []byte) error {
store.beInitialized()
buf := bytes.Buffer{}
return &buf, func(lnk datamodel.Link) error {
store.Bag[lnk] = buf.Bytes()
if _, exists := store.Bag[key]; exists {
return nil
}, nil
}
cpy := make([]byte, len(content))
copy(cpy, content)
store.Bag[key] = cpy
return nil
}

// GetStream implements go-ipld-prime/storage.StreamingReadableStorage.GetStream.
//
// It's useful for this storage implementation to explicitly support this,
// because returning a reader gives us room to avoid needing a defensive copy.
func (store *Store) GetStream(ctx context.Context, key string) (io.Reader, error) {
content, exists := store.Bag[key]
if !exists {
return nil, fmt.Errorf("404") // FIXME this needs a standard error type
}
return bytes.NewReader(content), nil
}

// Peek implements go-ipld-prime/storage.PeekableStorage.Peek.
func (store *Store) Peek(ctx context.Context, key string) ([]byte, io.Closer, error) {
content, exists := store.Bag[key]
if !exists {
return nil, nil, fmt.Errorf("404") // FIXME this needs a standard error type
}
return content, noopCloser{}, nil
}

type noopCloser struct{}

func (noopCloser) Close() error { return nil }
10 changes: 5 additions & 5 deletions traversal/focus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func encode(n datamodel.Node) (datamodel.Node, datamodel.Link) {
MhLength: 4,
}}
lsys := cidlink.DefaultLinkSystem()
lsys.StorageWriteOpener = (&store).OpenWrite
lsys.SetWriteStorage(&store)

lnk, err := lsys.Store(linking.LinkContext{}, lp, n)
if err != nil {
Expand Down Expand Up @@ -144,7 +144,7 @@ func TestFocusWithLinkLoading(t *testing.T) {
})
t.Run("link traversal with loader should work", func(t *testing.T) {
lsys := cidlink.DefaultLinkSystem()
lsys.StorageReadOpener = (&store).OpenRead
lsys.SetReadStorage(&store)
err := traversal.Progress{
Cfg: &traversal.Config{
LinkSystem: lsys,
Expand Down Expand Up @@ -174,7 +174,7 @@ func TestGetWithLinkLoading(t *testing.T) {
})
t.Run("link traversal with loader should work", func(t *testing.T) {
lsys := cidlink.DefaultLinkSystem()
lsys.StorageReadOpener = (&store).OpenRead
lsys.SetReadStorage(&store)
n, err := traversal.Progress{
Cfg: &traversal.Config{
LinkSystem: lsys,
Expand Down Expand Up @@ -316,8 +316,8 @@ func TestFocusedTransform(t *testing.T) {
func TestFocusedTransformWithLinks(t *testing.T) {
var store2 = memstore.Store{}
lsys := cidlink.DefaultLinkSystem()
lsys.StorageReadOpener = (&store).OpenRead
lsys.StorageWriteOpener = (&store2).OpenWrite
lsys.SetReadStorage(&store)
lsys.SetWriteStorage(&store2)
cfg := traversal.Config{
LinkSystem: lsys,
LinkTargetNodePrototypeChooser: basicnode.Chooser,
Expand Down
27 changes: 18 additions & 9 deletions traversal/walk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/ipld/go-ipld-prime/linking"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/node/basicnode"
"github.com/ipld/go-ipld-prime/storage"
"github.com/ipld/go-ipld-prime/traversal"
"github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
Expand Down Expand Up @@ -130,7 +131,7 @@ func TestWalkMatching(t *testing.T) {
s, err := ss.Selector()
var order int
lsys := cidlink.DefaultLinkSystem()
lsys.StorageReadOpener = (&store).OpenRead
lsys.SetReadStorage(&store)
err = traversal.Progress{
Cfg: &traversal.Config{
LinkSystem: lsys,
Expand Down Expand Up @@ -174,7 +175,7 @@ func TestWalkMatching(t *testing.T) {
s, err := ss.Selector()
var order int
lsys := cidlink.DefaultLinkSystem()
lsys.StorageReadOpener = (&store).OpenRead
lsys.SetReadStorage(&store)
err = traversal.Progress{
Cfg: &traversal.Config{
LinkSystem: lsys,
Expand Down Expand Up @@ -217,7 +218,7 @@ func TestWalkMatching(t *testing.T) {
s, err := ss.Selector()
var order int
lsys := cidlink.DefaultLinkSystem()
lsys.StorageReadOpener = (&store).OpenRead
lsys.SetReadStorage(&store)
err = traversal.Progress{
Cfg: &traversal.Config{
LinkSystem: lsys,
Expand Down Expand Up @@ -302,7 +303,7 @@ func TestWalkBudgets(t *testing.T) {
qt.Assert(t, err, qt.Equals, nil)
var order int
lsys := cidlink.DefaultLinkSystem()
lsys.StorageReadOpener = (&store).OpenRead
lsys.SetReadStorage(&store)
err = traversal.Progress{
Cfg: &traversal.Config{
LinkSystem: lsys,
Expand Down Expand Up @@ -416,12 +417,16 @@ func TestWalkBlockLoadOrder(t *testing.T) {

t.Run("CommonSelector_MatchAllRecursively", func(t *testing.T) {
s := selectorparse.CommonSelector_MatchAllRecursively
verifySelectorLoads(t, expectedAllBlocks, s, false, (&store).OpenRead)
verifySelectorLoads(t, expectedAllBlocks, s, false, func(lctx linking.LinkContext, lnk datamodel.Link) (io.Reader, error) {
return storage.GetStream(lctx.Ctx, &store, lnk.Binary())
})
})

t.Run("CommonSelector_ExploreAllRecursively", func(t *testing.T) {
s := selectorparse.CommonSelector_ExploreAllRecursively
verifySelectorLoads(t, expectedAllBlocks, s, false, (&store).OpenRead)
verifySelectorLoads(t, expectedAllBlocks, s, false, func(lctx linking.LinkContext, lnk datamodel.Link) (io.Reader, error) {
return storage.GetStream(lctx.Ctx, &store, lnk.Binary())
})
})

t.Run("constructed explore-all selector", func(t *testing.T) {
Expand All @@ -430,7 +435,9 @@ func TestWalkBlockLoadOrder(t *testing.T) {
s := ssb.ExploreRecursive(selector.RecursionLimitNone(),
ssb.ExploreAll(ssb.ExploreRecursiveEdge())).
Node()
verifySelectorLoads(t, expectedAllBlocks, s, false, (&store).OpenRead)
verifySelectorLoads(t, expectedAllBlocks, s, false, func(lctx linking.LinkContext, lnk datamodel.Link) (io.Reader, error) {
return storage.GetStream(lctx.Ctx, &store, lnk.Binary())
})
})

t.Run("explore-all with duplicate load skips via SkipMe", func(t *testing.T) {
Expand Down Expand Up @@ -463,7 +470,7 @@ func TestWalkBlockLoadOrder(t *testing.T) {
return nil, traversal.SkipMe{}
}
visited[l] = true
return (&store).OpenRead(lc, l)
return storage.GetStream(lc.Ctx, &store, l.Binary())
})
})

Expand All @@ -479,6 +486,8 @@ func TestWalkBlockLoadOrder(t *testing.T) {
middleMapNodeLnk,
}
s := selectorparse.CommonSelector_ExploreAllRecursively
verifySelectorLoads(t, expectedLinkRevisitBlocks, s, true, (&store).OpenRead)
verifySelectorLoads(t, expectedLinkRevisitBlocks, s, true, func(lctx linking.LinkContext, lnk datamodel.Link) (io.Reader, error) {
return storage.GetStream(lctx.Ctx, &store, lnk.Binary())
})
})
}
4 changes: 2 additions & 2 deletions traversal/walk_with_stop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func TestStopAtLink(t *testing.T) {
}
var order int
lsys := cidlink.DefaultLinkSystem()
lsys.StorageReadOpener = (&store).OpenRead
lsys.SetReadStorage(&store)
err = traversal.Progress{
Cfg: &traversal.Config{
LinkSystem: lsys,
Expand Down Expand Up @@ -219,7 +219,7 @@ func stopAtInChainTest(t *testing.T, chainNode datamodel.Node, stopLnk datamodel

var order int
lsys := cidlink.DefaultLinkSystem()
lsys.StorageReadOpener = (&store).OpenRead
lsys.SetReadStorage(&store)
err = traversal.Progress{
Cfg: &traversal.Config{
LinkSystem: lsys,
Expand Down

0 comments on commit 273362c

Please sign in to comment.