diff --git a/share/eds/store.go b/share/eds/store.go index 0683df0d99..c088d0b19e 100644 --- a/share/eds/store.go +++ b/share/eds/store.go @@ -69,7 +69,10 @@ func NewStore(basepath string, ds datastore.Batching) (*Store, error) { } r := mount.NewRegistry() - err = r.Register("fs", &mount.FileMount{Path: basepath + blocksPath}) + err = r.Register("fs", &inMemoryOnceMount{Mount: &mount.FileMount{}}) + if err != nil { + return nil, fmt.Errorf("failed to register memory mount on the registry: %w", err) + } if err != nil { return nil, fmt.Errorf("failed to register FS mount on the registry: %w", err) } @@ -181,15 +184,24 @@ func (s *Store) Put(ctx context.Context, root share.DataHash, square *rsmt2d.Ext } defer f.Close() - err = WriteEDS(ctx, square, f) + // save encoded eds into buffer + mount := &inMemoryOnceMount{ + // TODO: buffer could be pre-allocated with capacity calculated based on eds size. + buf: bytes.NewBuffer(nil), + Mount: &mount.FileMount{Path: s.basepath + blocksPath + key}, + } + err = WriteEDS(ctx, square, mount) if err != nil { return fmt.Errorf("failed to write EDS to file: %w", err) } + // write whole buffered mount data in one go to optimize i/o + if _, err = mount.WriteTo(f); err != nil { + return fmt.Errorf("failed to write EDS to file: %w", err) + } + ch := make(chan dagstore.ShardResult, 1) - err = s.dgstr.RegisterShard(ctx, shard.KeyFromString(key), &mount.FileMount{ - Path: s.basepath + blocksPath + key, - }, ch, dagstore.RegisterOpts{}) + err = s.dgstr.RegisterShard(ctx, shard.KeyFromString(key), mount, ch, dagstore.RegisterOpts{}) if err != nil { return fmt.Errorf("failed to initiate shard registration: %w", err) } @@ -418,3 +430,39 @@ func setupPath(basepath string) error { } return nil } + +// inMemoryOnceMount is used to allow reading once from buffer before using main mount.Reader +type inMemoryOnceMount struct { + buf *bytes.Buffer + + readOnce atomic.Bool + mount.Mount +} + +func (m *inMemoryOnceMount) Fetch(ctx context.Context) (mount.Reader, error) { + if !m.readOnce.Swap(true) { + reader := &inMemoryReader{Reader: bytes.NewReader(m.buf.Bytes())} + // release memory for gc, otherwise buffer will stick forever + m.buf = nil + return reader, nil + } + return m.Mount.Fetch(ctx) +} + +func (m *inMemoryOnceMount) Write(b []byte) (int, error) { + return m.buf.Write(b) +} + +func (m *inMemoryOnceMount) WriteTo(w io.Writer) (int64, error) { + return io.Copy(w, bytes.NewReader(m.buf.Bytes())) +} + +// inMemoryReader extends bytes.Reader to implement mount.Reader interface +type inMemoryReader struct { + *bytes.Reader +} + +// Close allows inMemoryReader to satisfy mount.Reader interface +func (r *inMemoryReader) Close() error { + return nil +}