Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

opt: support generate direct deal index #540

Merged
merged 3 commits into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions tools/index/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,26 @@

主要有两个功能,一个是给未生成索引的 active 订单生成索引,另一个是迁移 top index 到 MongoDB,迁移 shard 到 MySQL。

### 编译
## 编译

```
```bash
make index
```

### 生成索引
## 生成索引

先去 droplet 获取订单状态是 active 的订单,然后去遍历 car 文件,如果被 active 订单使用且未生成索引,则为其生成索引。

* --car-dir:存储 car 文件的目录。
* --index-dir:存储索引文件的目录,`droplet` 默认在 `~/.droplet/dagstore/index`。
* --car-dir:存储 car 文件的目录,需要用绝对路径
* --index-dir:存储索引文件的目录,需要用绝对路径,`droplet` 默认在 `~/.droplet/dagstore/index`。
* --mongo-url:MongoDB 的连接地址,用于存储 top index,数据库是 `market_index`,collection 是 `top_index`。
* --mysql-url:MySQL 的连接地址,用于存储 shard 状态,要和 `droplet` 使用同一个数据库,表名是 `shards`。
* --droplet-url:droplet 服务的 RPC 地址。
* --droplet-token:droplet 服务的 token。
* --start:订单创建时间需大于设置的值。
* --end:订单创建时间需小于设置的值。
* --concurrency:生成索引的并发数,默认是 1。
* --miner-addr:指定 miner 生成索引,未设置则给所有 miner 生成索引。

```bash
./index-tool gen-index \
Expand All @@ -34,7 +35,7 @@ make index

> 成功生成索引会输出类似日志:`generate index success: xxxxxx`

### 迁移索引
## 迁移索引

目前 top index 和 shard 都是存储在 badger,这样多个 droplet 时不能共享,所有需要把 top index 存储到 MongoDB,shard 存储到 MySQL,方便共享数据。

Expand Down
90 changes: 76 additions & 14 deletions tools/index/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/filecoin-project/dagstore/index"
"github.com/filecoin-project/dagstore/shard"
"github.com/filecoin-project/dagstore/throttle"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-jsonrpc"
marketapi "github.com/filecoin-project/venus/venus-shared/api/market/v1"
Expand Down Expand Up @@ -54,9 +55,9 @@ var (
Required: true,
}
dropletURLFlag = &cli.StringFlag{
Name: "droplet-url",
Usage: "droplet url",
Required: true,
Name: "droplet-url",
Usage: "droplet url",
Value: "/ip4/127.0.0.1/tcp/41264",
}
dropletTokenFlag = &cli.StringFlag{
Name: "droplet-token",
Expand All @@ -76,6 +77,10 @@ var (
Usage: "Concurrent number of indexes generated",
Value: 1,
}
minersAddrFlag = &cli.StringFlag{
Name: "miner-addr",
Usage: "miner address, eg --miner-addr t010001 or --miner-addr t010001,t010002",
}
)

func main() {
Expand Down Expand Up @@ -106,11 +111,12 @@ var generateIndexCmd = &cli.Command{
dropletURLFlag,
startFlag,
endFlag,
minersAddrFlag,
concurrencyFlag,
},
Action: func(cctx *cli.Context) error {
ctx := cctx.Context
carDir := cctx.String("car-dir")
carDir := cctx.String(carDirFlag.Name)
indexDir := cctx.String(indexDirFlag.Name)
p, err := paramsFromContext(cctx)
if err != nil {
Expand Down Expand Up @@ -149,10 +155,33 @@ func paramsFromContext(cctx *cli.Context) (*params, error) {
mysqlURL := cctx.String(mysqlURLFlag.Name)
url := cctx.String(dropletURLFlag.Name)
token := cctx.String(dropletTokenFlag.Name)

minerAddrStr := cctx.String(minersAddrFlag.Name)
fmt.Println("mongo url:", mongoURL)
fmt.Println("mysql url:", mysqlURL)
fmt.Println("droplet url:", url, "token:", token)
fmt.Println("miner addr:", minerAddrStr)

minerAddrs := make(map[address.Address]struct{})
for _, addr := range strings.Split(minerAddrStr, ",") {
if len(addr) == 0 {
continue
}
addr, err := address.NewFromString(addr)
if err != nil {
return nil, err
}
minerAddrs[addr] = struct{}{}
}

filter := func(addr address.Address) bool {
if len(minerAddrs) == 0 {
return false
}

_, ok := minerAddrs[addr]

return !ok
}

api, close, err := marketapi.DialIMarketRPC(ctx, url, token, nil)
if err != nil {
Expand Down Expand Up @@ -182,13 +211,39 @@ func paramsFromContext(cctx *cli.Context) (*params, error) {
if end != nil && end.Before(deal.CreationTime.Time()) {
continue
}
if filter(deal.Proposal.Provider) {
continue
}
p := deal.Proposal.PieceCID.String()
if _, ok := pieces[p]; !ok {
pieces[p] = struct{}{}
pieceInfos = append(pieceInfos, &pieceInfo{piece: p, payloadSize: deal.PayloadSize, pieceSize: uint64(deal.Proposal.PieceSize)})
}
}
fmt.Printf("active deals: %d, valid deals: %d, pieces: %d\n", len(deals), len(pieceInfos), len(pieces))

activeDirectDeal := market.DealActive
directDeals, err := api.ListDirectDeals(ctx, market.DirectDealQueryParams{State: &activeDirectDeal})
if err != nil {
return nil, fmt.Errorf("list direct deal failed: %v", err)
}
for _, deal := range directDeals {
if start != nil && start.After(time.Unix(int64(deal.CreatedAt), 0)) {
continue
}
if end != nil && end.Before(time.Unix(int64(deal.CreatedAt), 0)) {
continue
}
if filter(deal.Provider) {
continue
}
p := deal.PieceCID.String()
if _, ok := pieces[p]; !ok {
pieces[p] = struct{}{}
pieceInfos = append(pieceInfos, &pieceInfo{piece: p, payloadSize: deal.PayloadSize, pieceSize: uint64(deal.PieceSize)})
}
}

fmt.Printf("active deals: %d, valid deals: %d\n", len(deals)+len(directDeals), len(pieceInfos))

var topIndexRepo *dagstore.MongoTopIndex
if len(mongoURL) != 0 {
Expand Down Expand Up @@ -238,7 +293,7 @@ func getStartEndTime(cctx *cli.Context) (*time.Time, *time.Time, error) {
func generateIndex(ctx context.Context, carDir string, indexDir string, p *params) error {
doGenIndex := func(pi *pieceInfo) error {
piece := pi.piece
f, err := os.Open(filepath.Join(carDir, piece))
f, err := openCar(carDir, piece)
if err != nil {
return err
}
Expand Down Expand Up @@ -272,12 +327,12 @@ func generateIndex(ctx context.Context, carDir string, indexDir string, p *param
var globalErr error
for _, pi := range p.pieceInfos {
pi := pi
has, err := hasIndex(ctx, pi.piece, indexDir)
has, err := hasIndex(pi.piece, indexDir)
if err != nil {
return err
}
if has {
fmt.Println("already had index:", pi.piece)
// fmt.Println("already had index:", pi.piece)
continue
}
if globalErr != nil {
Expand Down Expand Up @@ -305,7 +360,17 @@ func generateIndex(ctx context.Context, carDir string, indexDir string, p *param
return globalErr
}

func hasIndex(ctx context.Context, piece string, indexDir string) (bool, error) {
func openCar(carDir, pieceCID string) (*os.File, error) {
carPath := filepath.Join(carDir, pieceCID+".car")
f, err := os.Open(carPath)
if err == nil {
return f, nil
}

return os.Open(filepath.Join(carDir, pieceCID))
}

func hasIndex(piece string, indexDir string) (bool, error) {
_, err := os.Stat(filepath.Join(indexDir, piece+indexSuffix))
if err != nil {
if os.IsNotExist(err) {
Expand Down Expand Up @@ -370,8 +435,6 @@ var migrateIndexCmd = &cli.Command{
return err
}

fmt.Println("index dir:", indexDir)

return migrateIndex(ctx, indexDir, p)
},
}
Expand All @@ -396,7 +459,7 @@ func migrateIndex(ctx context.Context, indexDir string, p *params) error {
return err
}
if has {
fmt.Println("already had shard:", piece)
// fmt.Println("already had shard:", piece)
return nil
}

Expand Down Expand Up @@ -449,7 +512,6 @@ var indexInfoCmd = &cli.Command{
}

indexFile := cctx.Args().First()
fmt.Println("index file: ", indexFile)
f, err := os.Open(indexFile)
if err != nil {
return err
Expand Down
Loading