feat(bcachefs): initial implementation of bcachefs. #2375

Open · wants to merge 2 commits into main
33 changes: 31 additions & 2 deletions pkg/storage/disk.go
@@ -14,6 +14,7 @@ import (
log "github.com/rs/zerolog/log"
"github.com/threefoldtech/zos/pkg"
"github.com/threefoldtech/zos/pkg/gridtypes"
"golang.org/x/sync/errgroup"
)

const (
@@ -145,10 +146,38 @@ func (s *Module) DiskWrite(name string, image string) error {
return fmt.Errorf("image size is bigger than disk")
}

var (
g = new(errgroup.Group)
concurrentNum = 5
imgSize = imgStat.Size()
chunkSize = imgSize / int64(concurrentNum)
)

log.Info().Int("concurrentNum", concurrentNUm).Msg("writing image concurrently")

for i := 0; i < concurrentNum; i++ {
index := i
g.Go(func() error {
start := chunkSize * int64(index)
length := chunkSize
if index == concurrentNum-1 {
// the last chunk also covers the remainder left by the integer division
length = imgSize - start
}
wr := io.NewOffsetWriter(file, start)
rd := io.NewSectionReader(source, start, length)
// use a goroutine-local err so workers don't race on a shared variable
_, err := io.Copy(wr, rd)
return err
})
}
}
if err := g.Wait(); err != nil {
return errors.Wrap(err, "failed to write disk image")
}
log.Info().Msg("writing image finished")

/*_, err = io.Copy(file, source)
if err != nil {
return errors.Wrap(err, "failed to write disk image")
}*/

return nil
}
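
For reviewers who want to try the offset/section pattern in isolation, below is a minimal, runnable sketch of the same chunked-copy approach used in DiskWrite above. The sample data, worker count, and temp-file name are illustrative assumptions and not part of this PR; io.NewOffsetWriter requires Go 1.20+.

package main

import (
    "bytes"
    "io"
    "log"
    "os"

    "golang.org/x/sync/errgroup"
)

func main() {
    // illustrative in-memory source; DiskWrite reads from an image file instead
    src := bytes.Repeat([]byte("x"), 10<<20) // 10 MiB of sample data

    dst, err := os.CreateTemp("", "chunked-copy-*")
    if err != nil {
        log.Fatal(err)
    }
    defer os.Remove(dst.Name())
    defer dst.Close()

    var (
        g       errgroup.Group
        workers = 5
        size    = int64(len(src))
        chunk   = size / int64(workers)
    )

    for i := 0; i < workers; i++ {
        index := i
        g.Go(func() error {
            start := chunk * int64(index)
            length := chunk
            if index == workers-1 {
                length = size - start // last chunk picks up the remainder
            }
            // each goroutine copies its own byte range at a fixed offset
            rd := io.NewSectionReader(bytes.NewReader(src), start, length)
            wr := io.NewOffsetWriter(dst, start)
            _, err := io.Copy(wr, rd)
            return err
        })
    }
    if err := g.Wait(); err != nil {
        log.Fatal(err)
    }
    log.Printf("copied %d bytes in %d chunks", size, workers)
}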
330 changes: 330 additions & 0 deletions pkg/storage/filesystem/bcachefs.go
@@ -0,0 +1,330 @@
package filesystem

import (
"context"
"fmt"
"os"
"os/exec"
"path/filepath"
"syscall"

"github.com/google/uuid"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
"github.com/threefoldtech/zos/pkg/gridtypes/zos"
)

// NewBcachefsPool creates a bcachefs pool associated with device.
// If the device does not have a filesystem, one is created.
func NewBcachefsPool(device DeviceInfo) (Pool, error) {
return newBcachefsPool(device, executerFunc(run))
}

func newBcachefsPool(device DeviceInfo, exe executer) (Pool, error) {
pool := &bcachefsPool{
device: device,
utils: newBcachefsCmd(exe),
name: device.Label,
}

return pool, pool.prepare()
}

var (
errNotImplemented = errors.New("not implemented")
)

type bcachefsPool struct {
device DeviceInfo
utils bcachefsUtils
name string
}

func (p *bcachefsPool) prepare() error {
//p.name = uuid.New().String() //p.device.Label TODO
// check if the device already has a filesystem
if p.device.Used() {
return nil
}
ctx := context.TODO()

// otherwise format
if err := p.format(ctx); err != nil {
return err
}
// make sure kernel knows about this
return Partprobe(ctx)
}

func (p *bcachefsPool) format(ctx context.Context) error {
name := uuid.New().String()
p.name = name

args := []string{
"-L", name,
p.device.Path,
}
log.Info().Str("device", p.device.Path).Msg("formatting device")

if _, err := p.utils.run(ctx, "mkfs.bcachefs", args...); err != nil {
return errors.Wrapf(err, "failed to format device '%s'", p.device.Path)
}

return nil
}

// Volume ID
func (b *bcachefsPool) ID() int {
return 0
}

// Path of the volume
func (b *bcachefsPool) Path() string {
return filepath.Join("/mnt", b.name)
}

// Usage returns the pool usage
func (b *bcachefsPool) Usage() (usage Usage, err error) {
mnt, err := b.Mounted()
if err != nil {
return usage, err
}

volumes, err := b.Volumes()

if err != nil {
return usage, errors.Wrapf(err, "failed to list pool '%s' volumes", mnt)
}

usage.Size = b.device.Size

for _, volume := range volumes {
vol, err := volume.Usage()
if err != nil {
return Usage{}, errors.Wrapf(err, "failed to calculate volume '%s' usage", volume.Path())
}

usage.Used += vol.Used
usage.Excl += vol.Excl
}

return
}

// Limit on a pool is not supported yet
func (b *bcachefsPool) Limit(size uint64) error {
return errNotImplemented // btrfs also doesn't support this
}

// Name of the volume
func (b *bcachefsPool) Name() string {
return b.name
}

// FsType of the volume
func (b *bcachefsPool) FsType() string {
return "bcachefs"
}

// Mounted returns whether the pool is mounted or not. If it is mounted,
// the mountpoint is returned.
// It doesn't check the default mount location of the pool,
// but instead checks whether any of the pool's devices is mounted
// at any location.
func (p *bcachefsPool) Mounted() (string, error) {
ctx := context.TODO()
mnt, err := p.device.Mountpoint(ctx)
if err != nil {
return "", err
}

if len(mnt) != 0 {
return mnt, nil
}

return "", ErrDeviceNotMounted
}

// Mount the pool, the mountpoint is returned
func (p *bcachefsPool) Mount() (string, error) {
mnt, err := p.Mounted()
if err == nil {
return mnt, nil
} else if !errors.Is(err, ErrDeviceNotMounted) {
return "", errors.Wrap(err, "failed to check device mount status")
}

// device is not mounted
mnt = p.Path()
if err := os.MkdirAll(mnt, 0755); err != nil {
return "", err
}

if err := syscall.Mount(p.device.Path, mnt, "bcachefs", 0, ""); err != nil {
return "", err
}

// TODO: check
//if err := p.utils.QGroupEnable(ctx, mnt); err != nil {
// return "", fmt.Errorf("failed to enable qgroup: %w", err)
//}

return mnt, p.maintenance()
}

func (p *bcachefsPool) maintenance() error {
// TODO
return nil
}

// UnMount the pool
func (p *bcachefsPool) UnMount() error {
mnt, err := p.Mounted()
if err != nil {
if errors.Is(err, ErrDeviceNotMounted) {
return nil
}
return err
}

return syscall.Unmount(mnt, syscall.MNT_DETACH)
}

// Volumes returns all subvolumes of this pool
// TODO: bcachefs doesn't have this feature
func (p *bcachefsPool) Volumes() ([]Volume, error) {
mnt, err := p.Mounted()
if err != nil {
return nil, err
}

var volumes []Volume

subs, err := p.utils.SubvolumeList(context.Background(), mnt)
if err != nil {
return nil, fmt.Errorf("subvolumelist failed: %v", err)
}

for _, sub := range subs {
volumes = append(volumes, sub.ToStorageVolume(mnt))
}
return volumes, nil
}

// AddVolume adds a new subvolume with the given name
func (p *bcachefsPool) AddVolume(name string) (Volume, error) {
mnt, err := p.Mounted()
if err != nil {
return nil, err
}

root := filepath.Join(mnt, name)
return p.addVolume(root)
}

func (p *bcachefsPool) addVolume(root string) (Volume, error) {
ctx := context.Background()
vol, err := p.utils.SubvolumeAdd(ctx, root)
if err != nil {
return nil, err
}

//volume, err := p.utils.SubvolumeInfo(ctx, root)
//if err != nil {
// return nil, err
//}
return &vol, nil
}

// RemoveVolume removes a subvolume with the given name
func (b *bcachefsPool) RemoveVolume(name string) error {
mnt, err := b.Mounted()
if err != nil {
return err
}

root := filepath.Join(mnt, name)
return b.removeVolume(root)
}

func (p *bcachefsPool) removeVolume(root string) error {
ctx := context.Background()

//info, err := p.utils.SubvolumeInfo(ctx, root)
//if err != nil {
// return err
//}

if err := p.utils.SubvolumeRemove(ctx, root); err != nil {
return err
}

/*qgroupID := fmt.Sprintf("0/%d", info.ID)
if err := p.utils.QGroupDestroy(ctx, qgroupID, p.Path()); err != nil {
// we log here and not return an error because
// - qgroup deletion can fail because it is still used by the system
// even if the volume is gone
// - failure to delete a qgroup is not a fatal error
log.Warn().Err(err).Str("group-id", qgroupID).Msg("failed to delete qgroup")
return nil
}*/

return nil
}

// Shutdown spins down the device where the pool is mounted
func (b *bcachefsPool) Shutdown() error {
cmd := exec.Command("hdparm", "-y", b.device.Path)
if err := cmd.Run(); err != nil {
return errors.Wrapf(err, "failed to shutdown device '%s'", b.device.Path)
}

return nil
}

// Device returns the device associated with the pool
func (b *bcachefsPool) Device() DeviceInfo {
return b.device
}

// SetType sets a device type on the pool. This makes
// sure that the detected device type is reported
// correctly by calling the Type() method.
// TODO: merge the code with btrfs
func (b *bcachefsPool) SetType(typ zos.DeviceType) error {
path, err := b.Mounted()
if err != nil {
return err
}
diskTypePath := filepath.Join(path, ".seektime")
if err := os.WriteFile(diskTypePath, []byte(typ), 0644); err != nil {
return errors.Wrapf(err, "failed to store device type for '%s' in '%s'", b.Name(), diskTypePath)
}

return nil
}

// Type returns the device type set by a previous call
// to SetType.
// TODO: merge the code with btrfs
func (b *bcachefsPool) Type() (zos.DeviceType, bool, error) {
path, err := b.Mounted()
if err != nil {
return "", false, err
}
diskTypePath := filepath.Join(path, ".seektime")
diskType, err := os.ReadFile(diskTypePath)
if os.IsNotExist(err) {
return "", false, nil
}

if err != nil {
return "", false, err
}

if len(diskType) == 0 {
return "", false, nil
}

return zos.DeviceType(diskType), true, nil
}
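
Since the new pool type is only exercised indirectly by the storage module, here is a hedged usage sketch of the API added in this file. It assumes the Pool and Volume interfaces expose the methods implemented above, that a real DeviceInfo would come from the package's device manager (the literal below is only a placeholder), and it uses zos.DeviceType("ssd") in place of whatever constant a caller would normally pass.

package main

import (
    "log"

    "github.com/threefoldtech/zos/pkg/gridtypes/zos"
    "github.com/threefoldtech/zos/pkg/storage/filesystem"
)

func main() {
    // placeholder device; in zos this value is produced by the device manager
    device := filesystem.DeviceInfo{Path: "/dev/sdb", Label: "pool-example"}

    // formats the device with mkfs.bcachefs if it has no filesystem yet
    pool, err := filesystem.NewBcachefsPool(device)
    if err != nil {
        log.Fatalf("create pool: %v", err)
    }

    mnt, err := pool.Mount()
    if err != nil {
        log.Fatalf("mount pool: %v", err)
    }
    log.Printf("pool mounted at %s", mnt)

    // records the device type in the .seektime marker file
    if err := pool.SetType(zos.DeviceType("ssd")); err != nil {
        log.Fatalf("set type: %v", err)
    }

    vol, err := pool.AddVolume("vm-data")
    if err != nil {
        log.Fatalf("add volume: %v", err)
    }
    log.Printf("volume created at %s", vol.Path())

    if err := pool.UnMount(); err != nil {
        log.Fatalf("unmount pool: %v", err)
    }
}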