Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add deduplication ratio to 'ipfs dag stat' #9787

Merged
merged 27 commits into from
Jun 6, 2023
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
f1604da
checkpoint1
arthurgavazza Mar 31, 2023
ac3e6b0
feat: achieve correct minimal output
arthurgavazza Mar 31, 2023
ce81fd0
feat: start adding json encoding
arthurgavazza Mar 31, 2023
6dffb58
fix: fmt
arthurgavazza Mar 31, 2023
946fdd6
feat: add custom json encoder and format text encoder
arthurgavazza Apr 1, 2023
63d271b
fix: remove test json files
arthurgavazza Apr 1, 2023
fbc1c9c
Update core/commands/dag/dag.go
arthurgavazza May 3, 2023
eac10df
fix: check error on cdv write
arthurgavazza May 3, 2023
27547d3
fix: tsv title format
arthurgavazza May 3, 2023
7f38e7e
fix: print line with Fprintln
arthurgavazza May 3, 2023
76e5f20
fix: use boxo traverse
arthurgavazza May 3, 2023
b48dcb0
refactor: calculate summary with only one method
arthurgavazza May 3, 2023
9bbef7f
refactor: make simplifications
arthurgavazza May 3, 2023
620f73c
fix: add support to progressive information
arthurgavazza May 12, 2023
9146d95
fix: fix ci
arthurgavazza May 12, 2023
ee82e97
test: add a winner test using a fixture
arthurgavazza May 16, 2023
6641fb3
Merge branch 'master' into feat/add-deduplication-ratio-stat
arthurgavazza May 16, 2023
8d79bcb
Update test/cli/dag_test.go
arthurgavazza May 16, 2023
f1840ca
fix: deduplication calculations and tests
arthurgavazza May 21, 2023
864e91b
fix: lint
arthurgavazza May 21, 2023
b689c10
fix: CI
arthurgavazza May 21, 2023
220856f
Merge branch 'master' into feat/add-deduplication-ratio-stat
arthurgavazza May 21, 2023
f2051db
test: add text output tests
arthurgavazza Jun 4, 2023
0df029e
Merge branch 'master' into feat/add-deduplication-ratio-stat
arthurgavazza Jun 4, 2023
08f837c
Update core/commands/dag/dag.go
Jorropo Jun 5, 2023
538f5bb
Update test/cli/dag_test.go
Jorropo Jun 5, 2023
336a98c
FIXUP
Jorropo Jun 5, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 104 additions & 6 deletions core/commands/dag/dag.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package dagcmd

import (
"encoding/csv"
"encoding/json"
"fmt"
"io"

Expand Down Expand Up @@ -275,13 +277,83 @@ CAR file follows the CARv1 format: https://ipld.io/specs/transport/car/carv1/
}

// DagStat is a dag stat command response

Jorropo marked this conversation as resolved.
Show resolved Hide resolved
// NOTE(review): this span is a rendered diff, so Size and NumBlocks each
// appear twice. The untagged pair looks like the pre-change lines and the
// tagged fields like their replacement — confirm against the real file.
type DagStat struct {
Size uint64
NumBlocks int64
// Cid is the root this entry describes. MarshalJSON/UnmarshalJSON on this
// type encode it as a plain string rather than through cid.Cid's own JSON form.
Cid cid.Cid `json:",omitempty"`
// Size is the sum of the raw byte lengths of the blocks visited while
// traversing this root.
Size uint64 `json:",omitempty"`
// NumBlocks is the number of blocks visited while traversing this root.
NumBlocks int64 `json:",omitempty"`
}

// String renders the stat as a single human-readable line.
//
// NOTE(review): this span is a rendered diff and carries two return
// statements. The first looks like the pre-change line and the second like
// its replacement; read as plain Go, the second return is unreachable.
//
// NOTE(review): s.Cid.String()[:20] panics with a slice-bounds error when the
// CID string is shorter than 20 bytes — presumably worth guarding the
// truncation; confirm which CIDs can reach this method.
func (s *DagStat) String() string {
return fmt.Sprintf("Size: %d, NumBlocks: %d", s.Size, s.NumBlocks)
return fmt.Sprintf("%s %d %d", s.Cid.String()[:20], s.Size, s.NumBlocks)
}

// MarshalJSON encodes the stat with its CID rendered as a plain string.
//
// cid.Cid.MarshalJSON cannot be used here because it produces the
// {"/": "..."} form. To keep the output consistent with the Kubo API
// patterns, the CID is written via Cid.String instead.
func (s *DagStat) MarshalJSON() ([]byte, error) {
	// Alias has DagStat's fields but none of its methods, so marshalling it
	// does not re-enter this method.
	type Alias DagStat

	// The outer string Cid field sits at a shallower depth than the embedded
	// Alias.Cid, so it is the one encoding/json emits under the "Cid" key.
	type wire struct {
		Cid string `json:"Cid"`
		*Alias
	}

	payload := wire{
		Cid:   s.Cid.String(),
		Alias: (*Alias)(s),
	}
	return json.Marshal(payload)
}

// UnmarshalJSON decodes a stat whose CID is stored as a plain string.
//
// cid.Cid.UnmarshalJSON cannot be used here because it expects the
// {"/": "..."} form. To stay consistent with the Kubo API patterns, the
// string is read first and then converted with cid.Parse.
func (s *DagStat) UnmarshalJSON(data []byte) error {
	// Alias has DagStat's fields but none of its methods, so decoding into it
	// does not re-enter this method.
	type Alias DagStat

	// Embedding a pointer to s lets the remaining fields decode straight into
	// the receiver; the outer string Cid captures the textual CID.
	var aux struct {
		Cid string `json:"Cid"`
		*Alias
	}
	aux.Alias = (*Alias)(s)

	if err := json.Unmarshal(data, &aux); err != nil {
		return err
	}

	parsed, err := cid.Parse(aux.Cid)
	if err != nil {
		return err
	}
	s.Cid = parsed
	return nil
}

// DagStatSummary aggregates the statistics of every DAG root passed to the
// stat command, together with figures describing how much data the roots share.
type DagStatSummary struct {
// redundantSize is the byte total over every visited block, counting a block
// again each time another root reaches it. It is kept out of the JSON output.
redundantSize uint64 `json:"-"`
// UniqueBlocks is the number of distinct CIDs seen across all roots.
UniqueBlocks int `json:",omitempty"`
// TotalSize is the byte total over blocks, each counted only the first time
// its CID is seen.
TotalSize uint64 `json:",omitempty"`
// SharedSize is redundantSize minus TotalSize; set by calculateSummary.
SharedSize uint64 `json:",omitempty"`
// Ratio is redundantSize divided by TotalSize; set by calculateSummary.
Ratio float32 `json:",omitempty"`
// DagStatsArray holds the per-root entries; serialized under the "DagStats" key.
DagStatsArray []*DagStat `json:"DagStats,omitempty"`
}

// String renders the aggregate figures as four "Label: value" lines, with no
// trailing newline.
func (s *DagStatSummary) String() string {
	const layout = "Total Size: %d\nUnique Blocks: %d\nShared Size: %d\nRatio: %f"
	return fmt.Sprintf(layout, s.TotalSize, s.UniqueBlocks, s.SharedSize, s.Ratio)
}

// incrementTotalSize adds size to the running TotalSize.
func (s *DagStatSummary) incrementTotalSize(size uint64) {
	s.TotalSize = s.TotalSize + size
}
// incrementRedundantSize adds size to the running redundantSize.
func (s *DagStatSummary) incrementRedundantSize(size uint64) {
	s.redundantSize = s.redundantSize + size
}
// appendStats records stats as the next per-root entry of the summary.
func (s *DagStatSummary) appendStats(stats *DagStat) {
	entries := append(s.DagStatsArray, stats)
	s.DagStatsArray = entries
}

// calculateSummary derives SharedSize and Ratio from the accumulated sizes.
//
// SharedSize is redundantSize - TotalSize and Ratio is
// redundantSize / TotalSize. When TotalSize is zero the division would yield
// NaN (or +Inf), which encoding/json refuses to encode, so Ratio is left at
// zero in that case.
func (s *DagStatSummary) calculateSummary() {
	s.SharedSize = s.redundantSize - s.TotalSize
	if s.TotalSize == 0 {
		// Nothing was counted: avoid 0/0 so the summary stays JSON-encodable.
		s.Ratio = 0
		return
	}
	s.Ratio = float32(s.redundantSize) / float32(s.TotalSize)
}

// DagStatCmd is a command for getting size information about an ipfs-stored dag
Expand All @@ -296,24 +368,50 @@ Note: This command skips duplicate blocks in reporting both size and the number
`,
},
Arguments: []cmds.Argument{
cmds.StringArg("root", true, false, "CID of a DAG root to get statistics for").EnableStdin(),
cmds.StringArg("root", true, true, "CID of a DAG root to get statistics for").EnableStdin(),
},
Options: []cmds.Option{
cmds.BoolOption(progressOptionName, "p", "Return progressive data while reading through the DAG").WithDefault(true),
},
Run: dagStat,
Type: DagStat{},
Type: DagStatSummary{},
PostRun: cmds.PostRunMap{
cmds.CLI: finishCLIStat,
},
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, event *DagStat) error {
// Text output: a tab-separated table with one row per root (CID, block
// count, size), followed by a "Summary" section holding the aggregate figures.
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, event *DagStatSummary) error {
	if _, err := fmt.Fprintln(w); err != nil {
		return err
	}

	csvWriter := csv.NewWriter(w)
	csvWriter.Comma = '\t'

	// Pad the header to the width of the first CID so the columns line up.
	// With no entries there is nothing to align against, so fall back to the
	// bare label instead of indexing an empty slice.
	cidSpacing := len("CID")
	if len(event.DagStatsArray) > 0 {
		cidSpacing = len(event.DagStatsArray[0].Cid.String())
	}
	header := []string{fmt.Sprintf("%-*s", cidSpacing, "CID"), fmt.Sprintf("%-15s", "Blocks"), "Size"}
	if err := csvWriter.Write(header); err != nil {
		return err
	}

	for _, dagStat := range event.DagStatsArray {
		numBlocksStr := fmt.Sprint(dagStat.NumBlocks)
		err := csvWriter.Write([]string{
			dagStat.Cid.String(),
			fmt.Sprintf("%-15s", numBlocksStr),
			fmt.Sprint(dagStat.Size),
		})
		if err != nil {
			return err
		}
	}

	// Flush does not return an error itself; a failed write to w only
	// surfaces through Error.
	csvWriter.Flush()
	if err := csvWriter.Error(); err != nil {
		return err
	}

	if _, err := fmt.Fprint(w, "\nSummary\n"); err != nil {
		return err
	}
	if _, err := fmt.Fprintf(w, "%v\n", event); err != nil {
		return err
	}
	_, err := fmt.Fprint(w, "\n\n")
	return err
}),
cmds.JSON: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, event *DagStatSummary) error {
return json.NewEncoder(w).Encode(event)
},
),
},
}
113 changes: 66 additions & 47 deletions core/commands/dag/stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,70 +6,82 @@ import (
"os"

"github.com/ipfs/boxo/coreiface/path"
mdag "github.com/ipfs/boxo/ipld/merkledag"
"github.com/ipfs/boxo/ipld/merkledag/traverse"
cid "github.com/ipfs/go-cid"
cmds "github.com/ipfs/go-ipfs-cmds"
"github.com/ipfs/kubo/core/commands/cmdenv"
"github.com/ipfs/kubo/core/commands/e"

mdag "github.com/ipfs/boxo/ipld/merkledag"
cmds "github.com/ipfs/go-ipfs-cmds"
)

// TODO: cache the result of every CID traversal in a DP (memoization) cache.
// If a CID is already in the cache, don't traverse it again; use the cached
// result to compute the new state instead.

func dagStat(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
progressive := req.Options[progressOptionName].(bool)

api, err := cmdenv.GetApi(env, req)
if err != nil {
return err
}

rp, err := api.ResolvePath(req.Context, path.New(req.Arguments[0]))
if err != nil {
return err
}

if len(rp.Remainder()) > 0 {
return fmt.Errorf("cannot return size for anything other than a DAG with a root CID")
}

nodeGetter := mdag.NewSession(req.Context, api.Dag())
obj, err := nodeGetter.Get(req.Context, rp.Cid())
if err != nil {
return err
}

dagstats := &DagStat{}
err = traverse.Traverse(obj, traverse.Options{
DAG: nodeGetter,
Order: traverse.DFSPre,
Func: func(current traverse.State) error {
dagstats.Size += uint64(len(current.Node.RawData()))
dagstats.NumBlocks++

if progressive {
if err := res.Emit(dagstats); err != nil {
return err
}
}
return nil
},
ErrFunc: nil,
SkipDuplicates: true,
})
if err != nil {
return fmt.Errorf("error traversing DAG: %w", err)
}
cidSet := cid.NewSet()
dagStatSummary := &DagStatSummary{DagStatsArray: []*DagStat{}}
for _, a := range req.Arguments {
rp, err := api.ResolvePath(req.Context, path.New(a))
if err != nil {
return err
}
if len(rp.Remainder()) > 0 {
return fmt.Errorf("cannot return size for anything other than a DAG with a root CID")
}

if !progressive {
if err := res.Emit(dagstats); err != nil {
arthurgavazza marked this conversation as resolved.
Show resolved Hide resolved
obj, err := nodeGetter.Get(req.Context, rp.Cid())
if err != nil {
return err
}
dagstats := &DagStat{Cid: rp.Cid()}
dagStatSummary.appendStats(dagstats)
err = traverse.Traverse(obj, traverse.Options{
DAG: nodeGetter,
Order: traverse.DFSPre,
// Called for each node the traversal visits: updates this root's own
// counters and the cross-root summary, then optionally emits progress.
Func: func(current traverse.State) error {
	currentNodeSize := uint64(len(current.Node.RawData()))
	dagstats.Size += currentNodeSize
	dagstats.NumBlocks++

	// Visit adds the CID and reports true only the first time it is seen,
	// so TotalSize counts each block once across all roots.
	if cidSet.Visit(current.Node.Cid()) {
		dagStatSummary.incrementTotalSize(currentNodeSize)
	}
	// redundantSize counts every visit, including blocks already seen
	// under a previous root.
	dagStatSummary.incrementRedundantSize(currentNodeSize)

	if progressive {
		if err := res.Emit(dagStatSummary); err != nil {
			return err
		}
	}
	return nil
},
ErrFunc: nil,
SkipDuplicates: true,
})
if err != nil {
return fmt.Errorf("error traversing DAG: %w", err)
}
}

dagStatSummary.UniqueBlocks = cidSet.Len()
dagStatSummary.calculateSummary()

if err := res.Emit(dagStatSummary); err != nil {
return err
}
return nil
}

func finishCLIStat(res cmds.Response, re cmds.ResponseEmitter) error {
var dagStats *DagStat
var dagStats *DagStatSummary
for {
v, err := res.Next()
if err != nil {
Expand All @@ -78,13 +90,20 @@ func finishCLIStat(res cmds.Response, re cmds.ResponseEmitter) error {
}
return err
}

out, ok := v.(*DagStat)
if !ok {
switch out := v.(type) {
case *DagStatSummary:
dagStats = out
if dagStats.Ratio == 0 {
length := len(dagStats.DagStatsArray)
if length > 0 {
currentStat := dagStats.DagStatsArray[length-1]
fmt.Fprintf(os.Stderr, "CID: %s, Size: %d, NumBlocks: %d\n", currentStat.Cid, currentStat.Size, currentStat.NumBlocks)
}
}
default:
return e.TypeErr(out, v)

}
dagStats = out
fmt.Fprintf(os.Stderr, "%v\r", out)
}
return re.Emit(dagStats)
}
Loading