diff --git a/api/api_worker.go b/api/api_worker.go index d9f6cd0b..1fbb94b8 100644 --- a/api/api_worker.go +++ b/api/api_worker.go @@ -6,7 +6,6 @@ import ( "github.com/google/uuid" "github.com/filecoin-project/go-state-types/abi" - "github.com/filecoin-project/specs-storage/storage" "github.com/filecoin-project/venus-sealer/constants" "github.com/filecoin-project/venus-sealer/sector-storage/stores" @@ -24,8 +23,6 @@ type WorkerAPI interface { TaskNumbers(context.Context) (string, error) - SectorExists(context.Context, types.TaskType, storage.SectorRef) (bool, error) - storiface.WorkerCalls TaskDisable(ctx context.Context, tt types.TaskType) error diff --git a/api/worker.go b/api/worker.go index 4e65f973..eea5f700 100644 --- a/api/worker.go +++ b/api/worker.go @@ -42,7 +42,6 @@ type WorkerStruct struct { TaskEnable func(ctx context.Context, tt types.TaskType) error `perm:"admin"` TaskNumbers func(context.Context) (string, error) `perm:"admin"` - SectorExists func(context.Context, types.TaskType, storage.SectorRef) (bool, error) `perm:"admin"` Remove func(ctx context.Context, sector abi.SectorID) error `perm:"admin"` StorageAddLocal func(ctx context.Context, path string) error `perm:"admin"` @@ -131,10 +130,6 @@ func (w *WorkerStruct) TaskNumbers(ctx context.Context) (string, error) { return w.Internal.TaskNumbers(ctx) } -func (w *WorkerStruct) SectorExists(ctx context.Context, task types.TaskType, sector storage.SectorRef) (bool, error) { - return w.Internal.SectorExists(ctx, task, sector) -} - func (w *WorkerStruct) Remove(ctx context.Context, sector abi.SectorID) error { return w.Internal.Remove(ctx, sector) } diff --git a/app/venus-sealer/storage.go b/app/venus-sealer/storage.go index 5077977c..4914203b 100644 --- a/app/venus-sealer/storage.go +++ b/app/venus-sealer/storage.go @@ -87,6 +87,14 @@ over time Name: "store", Usage: "(for init) use path for long-term storage", }, + &cli.StringSliceFlag{ + Name: "groups", + Usage: "path group names", + }, + &cli.StringSliceFlag{ + Name: "allow-to", + Usage: "path groups allowed to pull data from this path (allow all if not specified)", + }, }, Action: func(cctx *cli.Context) error { storageAPI, closer, err := api.GetStorageMinerAPI(cctx) @@ -125,6 +133,8 @@ over time Weight: cctx.Uint64("weight"), CanSeal: cctx.Bool("seal"), CanStore: cctx.Bool("store"), + Groups: cctx.StringSlice("groups"), + AllowTo: cctx.StringSlice("allow-to"), } if !(cfg.CanStore || cfg.CanSeal) { @@ -259,10 +269,17 @@ var storageListCmd = &cli.Command{ if si.CanStore { fmt.Print(color.CyanString("Store")) } - fmt.Println("") } else { fmt.Print(color.HiYellowString("Use: ReadOnly")) } + fmt.Println("") + + if len(si.Groups) > 0 { + fmt.Printf("\tGroups: %s\n", strings.Join(si.Groups, ", ")) + } + if len(si.AllowTo) > 0 { + fmt.Printf("\tAllowTo: %s\n", strings.Join(si.AllowTo, ", ")) + } if localPath, ok := local[s.ID]; ok { fmt.Printf("\tLocal: %s\n", color.GreenString(localPath)) diff --git a/app/venus-worker/main.go b/app/venus-worker/main.go index c3c19ab3..05ac25d6 100644 --- a/app/venus-worker/main.go +++ b/app/venus-worker/main.go @@ -166,11 +166,6 @@ var runCmd = &cli.Command{ Usage: "total number of task", Value: 100, }, - &cli.BoolFlag{ - Name: "bindP1P2", - Usage: "P1 and P2 phase tasks are bound to the same machine", - Value: false, - }, }, Action: func(cctx *cli.Context) error { log.Info("Starting venus worker") @@ -418,7 +413,6 @@ var runCmd = &cli.Command{ TaskTypes: taskTypes, NoSwap: cctx.Bool("no-swap"), TaskTotal: cctx.Int64("task-total"), - IsBindP1P2: cctx.Bool("bindP1P2"), }, remote, localStore, nodeApi, nodeApi, wsts), localStore: localStore, ls: localStorage, diff --git a/app/venus-worker/storage.go b/app/venus-worker/storage.go index 95f2ca99..2c4f3f96 100644 --- a/app/venus-worker/storage.go +++ b/app/venus-worker/storage.go @@ -45,6 +45,14 @@ var storageAttachCmd = &cli.Command{ Name: "store", Usage: "(for init) use path for long-term storage", }, + &cli.StringSliceFlag{ + Name: "groups", + Usage: "path group names", + }, + &cli.StringSliceFlag{ + Name: "allow-to", + Usage: "path groups allowed to pull data from this path (allow all if not specified)", + }, }, Action: func(cctx *cli.Context) error { workerApi, closer, err := api.GetWorkerAPI(cctx) @@ -83,6 +91,8 @@ var storageAttachCmd = &cli.Command{ Weight: cctx.Uint64("weight"), CanSeal: cctx.Bool("seal"), CanStore: cctx.Bool("store"), + Groups: cctx.StringSlice("groups"), + AllowTo: cctx.StringSlice("allow-to"), } if !(cfg.CanStore || cfg.CanSeal) { diff --git a/sector-storage/manager.go b/sector-storage/manager.go index 786c63b4..40f7c7a6 100644 --- a/sector-storage/manager.go +++ b/sector-storage/manager.go @@ -38,8 +38,6 @@ type Worker interface { TaskNumbers(context.Context) (string, error) - SectorExists(context.Context, types.TaskType, storage.SectorRef) (bool, error) - // Returns paths accessible to the worker Paths(context.Context) ([]stores.StoragePath, error) diff --git a/sector-storage/sched_test.go b/sector-storage/sched_test.go index d5c7529e..c86b663b 100644 --- a/sector-storage/sched_test.go +++ b/sector-storage/sched_test.go @@ -124,10 +124,6 @@ func (t *schedTestWorker) TaskNumbers(ctx context.Context) (string, error) { return "0-0", nil } -func (s *schedTestWorker) SectorExists(context.Context, types.TaskType, storage.SectorRef) (bool, error) { - return true, nil -} - func (s *schedTestWorker) Paths(ctx context.Context) ([]stores.StoragePath, error) { return s.paths, nil } diff --git a/sector-storage/selector_alloc.go b/sector-storage/selector_alloc.go index 1f97aa75..26b7db6c 100644 --- a/sector-storage/selector_alloc.go +++ b/sector-storage/selector_alloc.go @@ -38,15 +38,6 @@ func (s *allocSelector) Ok(ctx context.Context, task types.TaskType, spt abi.Reg return false, nil } - // whether the task of the previous stage is on this machine - bExist, err := whnd.workerRpc.SectorExists(ctx, task, sector) - if err != nil { - return false, xerrors.Errorf("getting supported worker for same machine: %w", err) - } - if !bExist { - return false, nil - } - // Check the number of tasks taskNum, err := whnd.workerRpc.TaskNumbers(ctx) if err != nil { diff --git a/sector-storage/selector_existing.go b/sector-storage/selector_existing.go index 6e1f3c51..50d677eb 100644 --- a/sector-storage/selector_existing.go +++ b/sector-storage/selector_existing.go @@ -40,15 +40,6 @@ func (s *existingSelector) Ok(ctx context.Context, task types.TaskType, spt abi. return false, nil } - // whether the task of the previous stage is on this machine - bExist, err := whnd.workerRpc.SectorExists(ctx, task, sector) - if err != nil { - return false, xerrors.Errorf("getting supported worker for same machine: %w", err) - } - if !bExist { - return false, nil - } - // Check the number of tasks taskNum, err := whnd.workerRpc.TaskNumbers(ctx) if err != nil { diff --git a/sector-storage/selector_task.go b/sector-storage/selector_task.go index eb927158..a53c3241 100644 --- a/sector-storage/selector_task.go +++ b/sector-storage/selector_task.go @@ -29,15 +29,6 @@ func (s *taskSelector) Ok(ctx context.Context, task types.TaskType, spt abi.Regi } _, supported := tasks[task] - // whether the task of the previous stage is on this machine - bExist, err := whnd.workerRpc.SectorExists(ctx, task, sector) - if err != nil { - return false, xerrors.Errorf("getting supported worker for same machine: %w", err) - } - if !bExist { - return false, nil - } - // Check the number of tasks taskNum, err := whnd.workerRpc.TaskNumbers(ctx) if err != nil { diff --git a/sector-storage/stores/index.go b/sector-storage/stores/index.go index 84cf8b02..d25d5557 100644 --- a/sector-storage/stores/index.go +++ b/sector-storage/stores/index.go @@ -26,6 +26,8 @@ var SkippedHeartbeatThresh = HeartbeatInterval * 5 // filesystem, local or networked / shared by multiple machines type ID string +type Group = string + type StorageInfo struct { ID ID URLs []string // TODO: Support non-http transports @@ -34,6 +36,9 @@ type StorageInfo struct { CanSeal bool CanStore bool + + Groups []Group + AllowTo []Group } type HealthReport struct { @@ -163,6 +168,8 @@ func (i *Index) StorageAttach(ctx context.Context, si StorageInfo, st fsutil.FsS i.stores[si.ID].info.MaxStorage = si.MaxStorage i.stores[si.ID].info.CanSeal = si.CanSeal i.stores[si.ID].info.CanStore = si.CanStore + i.stores[si.ID].info.Groups = si.Groups + i.stores[si.ID].info.AllowTo = si.AllowTo return nil } @@ -268,6 +275,8 @@ func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft storif storageIDs := map[ID]uint64{} isprimary := map[ID]bool{} + allowTo := map[Group]struct{}{} + for _, pathType := range storiface.PathTypes { if ft&pathType == 0 { continue @@ -299,6 +308,14 @@ func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft storif urls[k] = rl.String() } + if allowTo != nil && len(st.info.AllowTo) > 0 { + for _, group := range st.info.AllowTo { + allowTo[group] = struct{}{} + } + } else { + allowTo = nil // allow to any + } + out = append(out, SectorStorageInfo{ ID: id, URLs: urls, @@ -341,6 +358,22 @@ func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft storif continue } + if allowTo != nil { + allow := false + for _, group := range st.info.Groups { + if _, found := allowTo[group]; found { + log.Debugf("path %s in allowed group %s", st.info.ID, group) + allow = true + break + } + } + + if !allow { + log.Debugf("not selecting on %s, not in allowed group, allow %+v; path has %+v", st.info.ID, allowTo, st.info.Groups) + continue + } + } + urls := make([]string, len(st.info.URLs)) for k, u := range st.info.URLs { rl, err := url.Parse(u) diff --git a/sector-storage/stores/index_test.go b/sector-storage/stores/index_test.go new file mode 100644 index 00000000..7b471063 --- /dev/null +++ b/sector-storage/stores/index_test.go @@ -0,0 +1,154 @@ +package stores + +import ( + "context" + "testing" + + "github.com/google/uuid" + logging "github.com/ipfs/go-log/v2" + "github.com/stretchr/testify/require" + + "github.com/filecoin-project/go-state-types/abi" + + "github.com/filecoin-project/venus-sealer/sector-storage/fsutil" + "github.com/filecoin-project/venus-sealer/sector-storage/storiface" +) + +func init() { + logging.SetLogLevel("stores", "DEBUG") +} + +func newTestStorage() StorageInfo { + return StorageInfo{ + ID: ID(uuid.New().String()), + CanSeal: true, + CanStore: true, + Groups: nil, + AllowTo: nil, + } +} + +var bigFsStat = fsutil.FsStat{ + Capacity: 1 << 40, + Available: 1 << 40, + FSAvailable: 1 << 40, + Reserved: 0, + Max: 0, + Used: 0, +} + +const s32g = 32 << 30 + +func TestFindSimple(t *testing.T) { + ctx := context.Background() + + i := NewIndex() + stor1 := newTestStorage() + stor2 := newTestStorage() + + require.NoError(t, i.StorageAttach(ctx, stor1, bigFsStat)) + require.NoError(t, i.StorageAttach(ctx, stor2, bigFsStat)) + + s1 := abi.SectorID{ + Miner: 12, + Number: 34, + } + + { + si, err := i.StorageFindSector(ctx, s1, storiface.FTSealed, s32g, true) + require.NoError(t, err) + require.Len(t, si, 0) + } + + require.NoError(t, i.StorageDeclareSector(ctx, stor1.ID, s1, storiface.FTSealed, true)) + + { + si, err := i.StorageFindSector(ctx, s1, storiface.FTSealed, s32g, false) + require.NoError(t, err) + require.Len(t, si, 1) + require.Equal(t, stor1.ID, si[0].ID) + } + + { + si, err := i.StorageFindSector(ctx, s1, storiface.FTSealed, s32g, true) + require.NoError(t, err) + require.Len(t, si, 2) + } +} + +func TestFindNoAllow(t *testing.T) { + ctx := context.Background() + + i := NewIndex() + stor1 := newTestStorage() + stor1.AllowTo = []Group{"grp1"} + stor2 := newTestStorage() + + require.NoError(t, i.StorageAttach(ctx, stor1, bigFsStat)) + require.NoError(t, i.StorageAttach(ctx, stor2, bigFsStat)) + + s1 := abi.SectorID{ + Miner: 12, + Number: 34, + } + require.NoError(t, i.StorageDeclareSector(ctx, stor1.ID, s1, storiface.FTSealed, true)) + + { + si, err := i.StorageFindSector(ctx, s1, storiface.FTSealed, s32g, false) + require.NoError(t, err) + require.Len(t, si, 1) + require.Equal(t, stor1.ID, si[0].ID) + } + + { + si, err := i.StorageFindSector(ctx, s1, storiface.FTSealed, s32g, true) + require.NoError(t, err) + require.Len(t, si, 1) + require.Equal(t, stor1.ID, si[0].ID) + } +} + +func TestFindAllow(t *testing.T) { + ctx := context.Background() + + i := NewIndex() + + stor1 := newTestStorage() + stor1.AllowTo = []Group{"grp1"} + + stor2 := newTestStorage() + stor2.Groups = []Group{"grp1"} + + stor3 := newTestStorage() + stor3.Groups = []Group{"grp2"} + + require.NoError(t, i.StorageAttach(ctx, stor1, bigFsStat)) + require.NoError(t, i.StorageAttach(ctx, stor2, bigFsStat)) + require.NoError(t, i.StorageAttach(ctx, stor3, bigFsStat)) + + s1 := abi.SectorID{ + Miner: 12, + Number: 34, + } + require.NoError(t, i.StorageDeclareSector(ctx, stor1.ID, s1, storiface.FTSealed, true)) + + { + si, err := i.StorageFindSector(ctx, s1, storiface.FTSealed, s32g, false) + require.NoError(t, err) + require.Len(t, si, 1) + require.Equal(t, stor1.ID, si[0].ID) + } + + { + si, err := i.StorageFindSector(ctx, s1, storiface.FTSealed, s32g, true) + require.NoError(t, err) + require.Len(t, si, 2) + if si[0].ID == stor1.ID { + require.Equal(t, stor1.ID, si[0].ID) + require.Equal(t, stor2.ID, si[1].ID) + } else { + require.Equal(t, stor1.ID, si[1].ID) + require.Equal(t, stor2.ID, si[0].ID) + } + } +} diff --git a/sector-storage/stores/local.go b/sector-storage/stores/local.go index 7dbdb201..5e73cab5 100644 --- a/sector-storage/stores/local.go +++ b/sector-storage/stores/local.go @@ -46,6 +46,13 @@ type LocalStorageMeta struct { // MaxStorage specifies the maximum number of bytes to use for sector storage // (0 = unlimited) MaxStorage uint64 + + // List of storage groups this path belongs to + Groups []string + + // List of storage groups to which data from this path can be moved. If none + // are specified, allow to all + AllowTo []string } // StorageConfig .lotusstorage/storage.json @@ -210,6 +217,8 @@ func (st *Local) OpenPath(ctx context.Context, p string) error { MaxStorage: meta.MaxStorage, CanSeal: meta.CanSeal, CanStore: meta.CanStore, + Groups: meta.Groups, + AllowTo: meta.AllowTo, }, fst) if err != nil { return xerrors.Errorf("declaring storage in index: %w", err) @@ -274,6 +283,8 @@ func (st *Local) Redeclare(ctx context.Context) error { MaxStorage: meta.MaxStorage, CanSeal: meta.CanSeal, CanStore: meta.CanStore, + Groups: meta.Groups, + AllowTo: meta.AllowTo, }, fst) if err != nil { return xerrors.Errorf("redeclaring storage in index: %w", err) diff --git a/sector-storage/testworker_test.go b/sector-storage/testworker_test.go index 512368a0..da026d28 100644 --- a/sector-storage/testworker_test.go +++ b/sector-storage/testworker_test.go @@ -102,10 +102,6 @@ func (t *testWorker) TaskNumbers(ctx context.Context) (string, error) { return "0-0", nil } -func (t *testWorker) SectorExists(context.Context, types.TaskType, storage.SectorRef) (bool, error) { - return true, nil -} - func (t *testWorker) Paths(ctx context.Context) ([]stores.StoragePath, error) { return t.lstor.Local(ctx) } diff --git a/sector-storage/worker_local.go b/sector-storage/worker_local.go index 1d25e880..ef285845 100644 --- a/sector-storage/worker_local.go +++ b/sector-storage/worker_local.go @@ -41,7 +41,6 @@ type WorkerConfig struct { IgnoreResourceFiltering bool TaskTotal int64 - IsBindP1P2 bool } // used do provide custom proofs impl (mostly used in testing) @@ -65,8 +64,6 @@ type LocalWorker struct { taskNumber int64 taskTotal int64 - isBindP1P2 bool - session uuid.UUID testDisable int64 closing chan struct{} @@ -94,7 +91,6 @@ func newLocalWorker(executor ExecutorFunc, wcfg WorkerConfig, store stores.Store ignoreResources: wcfg.IgnoreResourceFiltering, session: uuid.New(), closing: make(chan struct{}), - isBindP1P2: wcfg.IsBindP1P2, } if w.executor == nil { @@ -493,29 +489,6 @@ func (l *LocalWorker) TaskNumbers(context.Context) (string, error) { return str, nil } -func (l *LocalWorker) SectorExists(ctx context.Context, task types.TaskType, sector storage.SectorRef) (bool, error) { - if l.isBindP1P2 && task == types.TTPreCommit2 { - paths, _, err := l.storage.AcquireSector(ctx, sector, 0, storiface.FTSealed, storiface.PathSealing, storiface.AcquireMode("")) - if err != nil { - log.Errorf("try to find sector paths err: %s", err.Error()) - return true, nil - } - - log.Infof("find acquire sector paths: %v", paths) - bExist, err := storiface.FileExists(paths.Sealed) - if err != nil { - log.Errorf("check %s exist err: %s", paths.Sealed) - return true, nil - } - - if !bExist { - return false, nil - } - } - - return true, nil -} - func (l *LocalWorker) Info(context.Context) (storiface.WorkerInfo, error) { hostname, err := os.Hostname() // TODO: allow overriding from config if err != nil {