Skip to content

Commit

Permalink
feat: proxy support
Browse files Browse the repository at this point in the history
  • Loading branch information
sloonz committed Jan 25, 2024
1 parent a797ef3 commit 98659a5
Show file tree
Hide file tree
Showing 12 changed files with 674 additions and 58 deletions.
171 changes: 171 additions & 0 deletions cmd/proxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
package cmd

import (
"github.com/sloonz/uback/destinations"
"github.com/sloonz/uback/lib"
"github.com/sloonz/uback/sources"

"io"
"net/rpc"
"os"

"github.com/hashicorp/yamux"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)

type Destination struct {
dataStream *yamux.Stream
}

func (d *Destination) ListBackups(args *destinations.ListBackupsArgs, reply *[]uback.Backup) error {
var err error

dstOpts := newOptionsBuilder(&args.Options, nil).WithDestination()
if dstOpts.Error != nil {
return dstOpts.Error
}

*reply, err = dstOpts.Destination.ListBackups()
if err != nil {
return err
}

return nil
}

func (d *Destination) RemoveBackup(args *destinations.RemoveBackupArgs, reply *struct{}) error {
dstOpts := newOptionsBuilder(&args.Options, nil).WithDestination()
if dstOpts.Error != nil {
return dstOpts.Error
}

return dstOpts.Destination.RemoveBackup(args.Backup)
}

func (d *Destination) SendBackup(args *destinations.SendBackupArgs, reply *struct{}) error {
dstOpts := newOptionsBuilder(&args.Options, nil).WithDestination()
if dstOpts.Error != nil {
return dstOpts.Error
}

if err := dstOpts.Destination.SendBackup(args.Backup, d.dataStream); err != nil {
return err
}

return d.dataStream.Close()
}

func (d *Destination) ReceiveBackup(args *destinations.ReceiveBackupArgs, reply *struct{}) error {
dstOpts := newOptionsBuilder(&args.Options, nil).WithDestination()
if dstOpts.Error != nil {
return dstOpts.Error
}

r, err := dstOpts.Destination.ReceiveBackup(args.Backup)
if err != nil {
return err
}

if _, err = io.Copy(d.dataStream, r); err != nil {
return err
}

if err = r.Close(); err != nil {
return err
}

return d.dataStream.Close()
}

type Source struct {
dataStream *yamux.Stream
backup io.ReadCloser
}

func (s *Source) ListSnapshots(args *sources.ListSnapshotsArgs, reply *[]uback.Snapshot) error {
var err error

srcOpts := newOptionsBuilder(&args.Options, nil).WithSource()
if srcOpts.Error != nil {
return srcOpts.Error
}

*reply, err = srcOpts.Source.ListSnapshots()
if err != nil {
return err
}
return nil
}

func (s *Source) RemoveSnapshot(args *sources.RemoveSnapshotArgs, reply *struct{}) error {
srcOpts := newOptionsBuilder(&args.Options, nil).WithSource()
if srcOpts.Error != nil {
return srcOpts.Error
}

return srcOpts.Source.RemoveSnapshot(args.Snapshot)
}

func (s *Source) CreateBackup(args *sources.CreateBackupArgs, reply *uback.Backup) error {
var err error

srcOpts := newOptionsBuilder(&args.Options, nil).WithSource()
if srcOpts.Error != nil {
return srcOpts.Error
}

*reply, s.backup, err = srcOpts.Source.CreateBackup(args.Snapshot)
return err
}

func (s *Source) TransmitBackup(args *struct{}, reply *struct{}) error {
if _, err := io.Copy(s.dataStream, s.backup); err != nil {
return err
}

if err := s.backup.Close(); err != nil {
return err
}

return s.dataStream.Close()
}

var (
cmdProxy = &cobra.Command{
Use: "proxy",
Hidden: true,
Run: func(cmd *cobra.Command, args []string) {
rwc := uback.ReadWriteCloser{
ReadCloser: os.Stdin,
WriteCloser: os.Stdout,
}

session, err := yamux.Server(&rwc, nil)
if err != nil {
logrus.Fatalf("Failed to start proxy server: %v", err)
}
defer session.Close()

rpcStream, err := session.AcceptStream()
if err != nil {
logrus.Fatalf("Failed to start proxy server: %v", err)
}

dataStream, err := session.AcceptStream()
if err != nil {
logrus.Fatalf("Failed to start proxy server: %v", err)
}

if err = rpc.Register(&Destination{dataStream: dataStream}); err != nil {
logrus.Fatalf("Failed to start proxy server: %v", err)
}

if err = rpc.Register(&Source{dataStream: dataStream}); err != nil {
logrus.Fatalf("Failed to start proxy server: %v", err)
}

rpc.ServeConn(rpcStream)
},
}
)
2 changes: 1 addition & 1 deletion cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func init() {
})

rootCmd.PersistentFlags().StringVarP(&presetsDir, "presets-dir", "p", "", "path to presets directory")
rootCmd.AddCommand(cmdPreset, cmdBackup, cmdKey, cmdContainer, cmdList, cmdPrune, cmdFetch, cmdRestore, cmdVersion)
rootCmd.AddCommand(cmdPreset, cmdBackup, cmdKey, cmdContainer, cmdList, cmdPrune, cmdFetch, cmdRestore, cmdVersion, cmdProxy)
}

func Execute() {
Expand Down
2 changes: 2 additions & 0 deletions destinations/new.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ func New(options *uback.Options) (uback.Destination, error) {
return newObjectStorageDestination(options)
case "command":
return newCommandDestination(options)
case "proxy":
return newProxyDestination(options)
default:
return nil, fmt.Errorf("invalid destination type %v", options.String["Type"])
}
Expand Down
155 changes: 155 additions & 0 deletions destinations/proxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package destinations

import (
"github.com/sloonz/uback/lib"

"errors"
"fmt"
"io"
"sync"

"github.com/sirupsen/logrus"
)

var (
ErrProxyCommandMissing = errors.New("proxy destination: missing command")
proxyLog = logrus.WithFields(logrus.Fields{
"destination": "proxy",
})
)

type ListBackupsArgs struct {
uback.Options
}

type RemoveBackupArgs struct {
uback.Options
uback.Backup
}

type SendBackupArgs struct {
uback.Options
uback.Backup
}

type ReceiveBackupArgs struct {
uback.Options
uback.Backup
}

type proxyDestination struct {
options *uback.Options
command []string
}

func newProxyDestination(options *uback.Options) (uback.Destination, error) {
command := options.GetCommand("Command", nil)
if len(command) == 0 {
return nil, ErrProxyCommandMissing
}

return &proxyDestination{options: options, command: command}, nil
}

func (d *proxyDestination) ListBackups() ([]uback.Backup, error) {
session, rpcClient, _, err := uback.OpenProxy(proxyLog, d.command)
if err != nil {
return nil, fmt.Errorf("Failed to open proxy session: %v", err)
}
defer uback.CloseProxy(session, rpcClient) //nolint: errcheck

var backups []uback.Backup
err = rpcClient.Call("Destination.ListBackups", &ListBackupsArgs{Options: uback.ProxiedOptions(d.options)}, &backups)
if err != nil {
return nil, err
}

err = uback.CloseProxy(session, rpcClient)
if err != nil {
return nil, err
}

return backups, nil
}

func (d *proxyDestination) RemoveBackup(backup uback.Backup) error {
session, rpcClient, _, err := uback.OpenProxy(proxyLog, d.command)
if err != nil {
return fmt.Errorf("Failed to open proxy session: %v", err)
}
defer uback.CloseProxy(session, rpcClient) //nolint: errcheck

return rpcClient.Call("Destination.RemoveBackup", &RemoveBackupArgs{Options: uback.ProxiedOptions(d.options), Backup: backup}, nil)
}

func (d *proxyDestination) SendBackup(backup uback.Backup, data io.Reader) error {
session, rpcClient, dataStream, err := uback.OpenProxy(proxyLog, d.command)
if err != nil {
return fmt.Errorf("Failed to open proxy session: %v", err)
}
defer uback.CloseProxy(session, rpcClient) //nolint: errcheck

call := rpcClient.Go("Destination.SendBackup", &SendBackupArgs{Options: uback.ProxiedOptions(d.options), Backup: backup}, nil, nil)

if _, err := io.Copy(dataStream, data); err != nil {
return err
}

if err := dataStream.Close(); err != nil {
return err
}

<-call.Done

return call.Error
}

func (d *proxyDestination) ReceiveBackup(backup uback.Backup) (io.ReadCloser, error) {
session, rpcClient, dataStream, err := uback.OpenProxy(proxyLog, d.command)
if err != nil {
return nil, fmt.Errorf("Failed to open proxy session: %v", err)
}

pr, pw := io.Pipe()
call := rpcClient.Go("Destination.ReceiveBackup", &ReceiveBackupArgs{Options: uback.ProxiedOptions(d.options), Backup: backup}, nil, nil)

ch := make(chan error, 2)
var wg sync.WaitGroup

wg.Add(1)
go func() {
defer wg.Done()
<-call.Done
ch <- call.Error
}()

wg.Add(1)
go func() {
defer wg.Done()

if _, err := io.Copy(pw, dataStream); err != nil {
ch <- err
return
}

if err := dataStream.Close(); err != nil {
ch <- err
return
}
}()

go func() {
defer uback.CloseProxy(session, rpcClient) //nolint: errcheck
wg.Wait()
close(ch)
for err := range ch {
if err != nil {
pw.CloseWithError(err)
return
}
}
pw.Close()
}()

return pr, nil
}
46 changes: 46 additions & 0 deletions doc/proxy.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Proxying

## Overview

Sometimes, you may want to produce backup data on a remote host or store
backup data to a remote host while still keeping `uback` configuration
on a third host. For example, you may want to use a `btrfs` source
on a remote host to produce the backup but store it locally on a `fs`
destination, or use a `tar` source locally and use a `fs` destination
on another host. You may combine both requirements, and on host A
get a backup from a `btrfs` source from host B and store it in a `fs`
destination on host C.

Proxying means using another `uback` process to provide the source and/or
the destination while doing a backup. The other `uback` process may run
on another user, or in a container, or a remote host.

Note that encryption and compression is done on the local process (not
the remote one). Also, restoring with proxy is not supported ; you must
use a direct source.

## Usage

For both sources and destinations :

1. Use `proxy` as a source or destination type.

2. Spawn the other `uback` instance by setting the `command` option to
`uback proxy`.

3. Specify the proxyfied `type` and/or `command` option by prefixying
it with `proxy-`.

## Examples

Proxy a custom destination using ssh :

```
type=proxy,command="ssh root@example.com uback proxy",proxy-type=command,proxy-command=uback-custom-dest
```

Proxy a `btrfs` source using sudo :

```
type=proxy,command="sudo uback proxy",proxy-type=btrfs
```
Loading

0 comments on commit 98659a5

Please sign in to comment.