Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(share/store): store eds in memory for index builder #2409

Merged
58 changes: 53 additions & 5 deletions share/eds/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,10 @@ func NewStore(basepath string, ds datastore.Batching) (*Store, error) {
}

r := mount.NewRegistry()
err = r.Register("fs", &mount.FileMount{Path: basepath + blocksPath})
err = r.Register("fs", &inMemoryOnceMount{Mount: &mount.FileMount{}})
if err != nil {
return nil, fmt.Errorf("failed to register memory mount on the registry: %w", err)
}
if err != nil {
return nil, fmt.Errorf("failed to register FS mount on the registry: %w", err)
}
Expand Down Expand Up @@ -182,15 +185,24 @@ func (s *Store) Put(ctx context.Context, root share.DataHash, square *rsmt2d.Ext
}
defer f.Close()

err = WriteEDS(ctx, square, f)
// save encoded eds into buffer
mount := &inMemoryOnceMount{
// TODO: buffer could be pre-allocated with capacity calculated based on eds size.
buf: bytes.NewBuffer(nil),
Mount: &mount.FileMount{Path: s.basepath + blocksPath + key},
}
err = WriteEDS(ctx, square, mount)
if err != nil {
return fmt.Errorf("failed to write EDS to file: %w", err)
}

// write whole buffered mount data in one go to optimize i/o
if _, err = mount.WriteTo(f); err != nil {
return fmt.Errorf("failed to write EDS to file: %w", err)
}

ch := make(chan dagstore.ShardResult, 1)
err = s.dgstr.RegisterShard(ctx, shard.KeyFromString(key), &mount.FileMount{
Path: s.basepath + blocksPath + key,
}, ch, dagstore.RegisterOpts{})
err = s.dgstr.RegisterShard(ctx, shard.KeyFromString(key), mount, ch, dagstore.RegisterOpts{})
if err != nil {
return fmt.Errorf("failed to initiate shard registration: %w", err)
}
Expand Down Expand Up @@ -419,3 +431,39 @@ func setupPath(basepath string) error {
}
return nil
}

// inMemoryOnceMount is used to allow reading once from buffer before using main mount.Reader
type inMemoryOnceMount struct {
buf *bytes.Buffer

readOnce atomic.Bool
mount.Mount
}

func (m *inMemoryOnceMount) Fetch(ctx context.Context) (mount.Reader, error) {
if !m.readOnce.Swap(true) {
reader := &inMemoryReader{Reader: bytes.NewReader(m.buf.Bytes())}
Wondertan marked this conversation as resolved.
Show resolved Hide resolved
// release memory for gc, otherwise buffer will stick forever
m.buf = nil
return reader, nil
}
return m.Mount.Fetch(ctx)
}

func (m *inMemoryOnceMount) Write(b []byte) (int, error) {
return m.buf.Write(b)
}

func (m *inMemoryOnceMount) WriteTo(w io.Writer) (int64, error) {
return io.Copy(w, bytes.NewReader(m.buf.Bytes()))
}

// inMemoryReader extends bytes.Reader to implement mount.Reader interface
type inMemoryReader struct {
*bytes.Reader
}

// Close allows inMemoryReader to satisfy mount.Reader interface
func (r *inMemoryReader) Close() error {
return nil
}