Skip to content

Commit

Permalink
fix: standardize on fully qualified snapshot_id and decouple protobuf…
Browse files Browse the repository at this point in the history
…s from restic package
  • Loading branch information
garethgeorge committed Nov 26, 2023
1 parent 0221a23 commit e6031bf
Show file tree
Hide file tree
Showing 16 changed files with 379 additions and 162 deletions.
51 changes: 25 additions & 26 deletions .github/workflows/build-and-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,36 +5,35 @@ name: Build and Test

on:
push:
branches: [ "main" ]
branches: ["main"]
pull_request:
branches: [ "main" ]
branches: ["main"]
workflow_dispatch:

jobs:

build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3

- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: '1.21'
- name: Setup NodeJS
uses: actions/setup-node@v4
with:
node-version: '20'
- name: Install Restic
run: sudo apt install -y restic
- name: Build WebUI
run: cd webui && npm install && npm run build

- name: Build
run: go build -v ./...

- name: Test
run: go test ./...
- uses: actions/checkout@v3

- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: "1.21"

- name: Setup NodeJS
uses: actions/setup-node@v4
with:
node-version: "20"

- name: Install Restic
run: sudo apt install -y restic && restic self-update --output ./restic

- name: Build WebUI
run: cd webui && npm install && npm run build

- name: Build
run: go build -v ./...

- name: Test
run: PATH=$(pwd):$PATH go test ./...
3 changes: 1 addition & 2 deletions internal/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ func loggingFunc(l *zap.Logger) logging.Logger {
})
}


func serveGRPC(ctx context.Context, socket string, server *Server) error {
lis, err := net.Listen("unix", socket)
if err != nil {
Expand Down Expand Up @@ -110,4 +109,4 @@ func ServeAPI(ctx context.Context, server *Server, mux *http.ServeMux) error {
apiMux := runtime.NewServeMux()
mux.Handle("/api/", http.StripPrefix("/api", apiMux))
return serveHTTPHandlers(ctx, server, apiMux)
}
}
4 changes: 2 additions & 2 deletions internal/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/garethgeorge/resticui/internal/oplog"
"github.com/garethgeorge/resticui/internal/oplog/indexutil"
"github.com/garethgeorge/resticui/internal/orchestrator"
"github.com/garethgeorge/resticui/internal/protoutil"
"github.com/garethgeorge/resticui/pkg/restic"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -110,7 +111,6 @@ func (s *Server) ListSnapshots(ctx context.Context, query *v1.ListSnapshotsReque
if err != nil {
return nil, fmt.Errorf("failed to get plan %q: %w", query.PlanId, err)
}

snapshots, err = repo.SnapshotsForPlan(ctx, plan)
} else {
snapshots, err = repo.Snapshots(ctx)
Expand All @@ -123,7 +123,7 @@ func (s *Server) ListSnapshots(ctx context.Context, query *v1.ListSnapshotsReque
// Transform the snapshots and return them.
var rs []*v1.ResticSnapshot
for _, snapshot := range snapshots {
rs = append(rs, snapshot.ToProto())
rs = append(rs, protoutil.SnapshotToProto(snapshot))
}

return &v1.ResticSnapshotList{
Expand Down
25 changes: 14 additions & 11 deletions internal/oplog/oplog.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
v1 "github.com/garethgeorge/resticui/gen/go/v1"
"github.com/garethgeorge/resticui/internal/oplog/indexutil"
"github.com/garethgeorge/resticui/internal/oplog/serializationutil"
"github.com/garethgeorge/resticui/internal/protoutil"
"github.com/garethgeorge/resticui/pkg/restic"
bolt "go.etcd.io/bbolt"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
Expand All @@ -35,6 +37,7 @@ var (
RepoIndexBucket = []byte("oplog.repo_idx") // repo_index tracks IDs of operations affecting a given repo
PlanIndexBucket = []byte("oplog.plan_idx") // plan_index tracks IDs of operations affecting a given plan
SnapshotIndexBucket = []byte("oplog.snapshot_idx") // snapshot_index tracks IDs of operations affecting a given snapshot
indexBuckets = [][]byte{RepoIndexBucket, PlanIndexBucket, SnapshotIndexBucket}
)

// OpLog represents a log of operations performed.
Expand Down Expand Up @@ -63,6 +66,11 @@ func NewOpLog(databasePath string) (*OpLog, error) {
o.nextId.Store(1)

if err := db.Update(func(tx *bolt.Tx) error {
sysBucket, err := tx.CreateBucketIfNotExists(SystemBucket)
if err != nil {
return fmt.Errorf("creating system bucket: %s", err)
}

// Create the buckets if they don't exist
for _, bucket := range [][]byte{
SystemBucket, OpLogBucket, RepoIndexBucket, PlanIndexBucket, SnapshotIndexBucket,
Expand All @@ -72,8 +80,6 @@ func NewOpLog(databasePath string) (*OpLog, error) {
}
}

sysBucket := tx.Bucket(SystemBucket)

// Validate the operation log on startup.
opLogBucket := tx.Bucket(OpLogBucket)
c := opLogBucket.Cursor()
Expand Down Expand Up @@ -216,7 +222,9 @@ func (o *OpLog) addOperationHelper(tx *bolt.Tx, op *v1.Operation) error {
}
}

op.SnapshotId = NormalizeSnapshotId(op.SnapshotId)
if err := protoutil.ValidateOperation(op); err != nil {
return fmt.Errorf("validating operation: %w", err)
}

bytes, err := proto.Marshal(op)
if err != nil {
Expand Down Expand Up @@ -315,7 +323,9 @@ func (o *OpLog) GetByPlan(planId string, collector indexutil.Collector) ([]*v1.O
}

func (o *OpLog) GetBySnapshotId(snapshotId string, collector indexutil.Collector) ([]*v1.Operation, error) {
snapshotId = NormalizeSnapshotId(snapshotId)
if err := restic.ValidateSnapshotId(snapshotId); err != nil {
return nil, err
}
var err error
var ops []*v1.Operation
o.db.View(func(tx *bolt.Tx) error {
Expand Down Expand Up @@ -374,10 +384,3 @@ func (o *OpLog) Unsubscribe(callback *func(EventType, *v1.Operation)) {
}
}
}

func NormalizeSnapshotId(id string) string {
if len(id) < 8 {
return id
}
return id[:8]
}
35 changes: 24 additions & 11 deletions internal/oplog/oplog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ import (
"github.com/garethgeorge/resticui/internal/oplog/indexutil"
)

const (
snapshotId = "1234567890123456789012345678901234567890123456789012345678901234"
snapshotId2 = "abcdefgh01234567890123456789012345678901234567890123456789012345"
)

func TestCreate(t *testing.T) {
// t.Parallel()
log, err := NewOpLog(t.TempDir() + "/test.boltdb")
Expand All @@ -21,7 +26,6 @@ func TestCreate(t *testing.T) {
}

func TestAddOperation(t *testing.T) {
// t.Parallel()
log, err := NewOpLog(t.TempDir() + "/test.boltdb")
if err != nil {
t.Fatalf("error creating oplog: %s", err)
Expand All @@ -38,12 +42,14 @@ func TestAddOperation(t *testing.T) {
op: &v1.Operation{
UnixTimeStartMs: 1234,
},
wantErr: false,
wantErr: true,
},
{
name: "basic backup operation",
op: &v1.Operation{
UnixTimeStartMs: 1234,
RepoId: "testrepo",
PlanId: "testplan",
Op: &v1.Operation_OperationBackup{},
},
wantErr: false,
Expand All @@ -52,6 +58,8 @@ func TestAddOperation(t *testing.T) {
name: "basic snapshot operation",
op: &v1.Operation{
UnixTimeStartMs: 1234,
RepoId: "testrepo",
PlanId: "testplan",
Op: &v1.Operation_OperationIndexSnapshot{
OperationIndexSnapshot: &v1.OperationIndexSnapshot{
Snapshot: &v1.ResticSnapshot{
Expand All @@ -66,31 +74,36 @@ func TestAddOperation(t *testing.T) {
name: "operation with ID",
op: &v1.Operation{
Id: 1,
RepoId: "testrepo",
PlanId: "testplan",
UnixTimeStartMs: 1234,
Op: &v1.Operation_OperationBackup{},
},
wantErr: true,
},
{
name: "operation with repo",
name: "operation with repo only",
op: &v1.Operation{
UnixTimeStartMs: 1234,
RepoId: "testrepo",
Op: &v1.Operation_OperationBackup{},
},
wantErr: true,
},
{
name: "operation with plan",
name: "operation with plan only",
op: &v1.Operation{
UnixTimeStartMs: 1234,
PlanId: "testplan",
Op: &v1.Operation_OperationBackup{},
},
wantErr: true,
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
if err := log.Add(tc.op); (err != nil) != tc.wantErr {
t.Errorf("Add() error = %v, wantErr %v", err, tc.wantErr)
}
Expand Down Expand Up @@ -250,14 +263,14 @@ func TestIndexSnapshot(t *testing.T) {
UnixTimeStartMs: 1234,
PlanId: "plan1",
RepoId: "repo1",
SnapshotId: "abcdefgh",
SnapshotId: snapshotId,
Op: &v1.Operation_OperationIndexSnapshot{},
}
if err := log.Add(op); err != nil {
t.Fatalf("error adding operation: %s", err)
}

ops, err := log.GetBySnapshotId("abcdefgh", indexutil.CollectAll())
ops, err := log.GetBySnapshotId(snapshotId, indexutil.CollectAll())
if err != nil {
t.Fatalf("error checking for snapshot: %s", err)
}
Expand All @@ -282,7 +295,7 @@ func TestUpdateOperation(t *testing.T) {
UnixTimeStartMs: 1234,
PlanId: "oldplan",
RepoId: "oldrepo",
SnapshotId: "12345678",
SnapshotId: snapshotId,
}
if err := log.Add(op); err != nil {
t.Fatalf("error adding operation: %s", err)
Expand All @@ -300,14 +313,14 @@ func TestUpdateOperation(t *testing.T) {
} else if len(ops) != 1 {
t.Fatalf("want 1 operation, got %d", len(ops))
}
if ops, err := log.GetBySnapshotId("12345678", indexutil.CollectAll()); err != nil {
if ops, err := log.GetBySnapshotId(snapshotId, indexutil.CollectAll()); err != nil {
t.Fatalf("error checking for snapshot: %s", err)
} else if len(ops) != 1 {
t.Fatalf("want 1 operation, got %d", len(ops))
}

// Update indexed values
op.SnapshotId = "abcdefgh"
op.SnapshotId = snapshotId2
op.PlanId = "myplan"
op.RepoId = "myrepo"
if err := log.Update(op); err != nil {
Expand All @@ -318,7 +331,7 @@ func TestUpdateOperation(t *testing.T) {
if opId != op.Id {
t.Errorf("want operation ID %d, got %d", opId, op.Id)
}
if ops, err := log.GetBySnapshotId("abcdefgh", indexutil.CollectAll()); err != nil {
if ops, err := log.GetBySnapshotId(snapshotId2, indexutil.CollectAll()); err != nil {
t.Fatalf("error checking for snapshot: %s", err)
} else if len(ops) != 1 {
t.Fatalf("want 1 operation, got %d", len(ops))
Expand Down Expand Up @@ -347,7 +360,7 @@ func TestUpdateOperation(t *testing.T) {
} else if len(ops) != 0 {
t.Fatalf("want 0 operations, got %d", len(ops))
}
if ops, err := log.GetBySnapshotId("12345678", indexutil.CollectAll()); err != nil {
if ops, err := log.GetBySnapshotId(snapshotId, indexutil.CollectAll()); err != nil {
t.Fatalf("error checking for snapshot: %s", err)
} else if len(ops) != 0 {
t.Fatalf("want 0 operations, got %d", len(ops))
Expand Down
2 changes: 1 addition & 1 deletion internal/orchestrator/repo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func TestSnapshotParenting(t *testing.T) {
prev := snapshots[i-1]
curr := snapshots[i]

if prev.ToProto().UnixTimeMs >= curr.ToProto().UnixTimeMs {
if prev.UnixTimeMs() >= curr.UnixTimeMs() {
t.Errorf("snapshots are out of order")
}

Expand Down
10 changes: 7 additions & 3 deletions internal/orchestrator/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
v1 "github.com/garethgeorge/resticui/gen/go/v1"
"github.com/garethgeorge/resticui/internal/oplog"
"github.com/garethgeorge/resticui/internal/oplog/indexutil"
"github.com/garethgeorge/resticui/internal/protoutil"
"github.com/garethgeorge/resticui/pkg/restic"
"github.com/gitploy-io/cronexpr"
"github.com/hashicorp/go-multierror"
Expand Down Expand Up @@ -114,7 +115,7 @@ func backupHelper(ctx context.Context, orchestrator *Orchestrator, plan *v1.Plan
}
lastSent = time.Now()

backupOp.OperationBackup.LastStatus = entry.ToProto()
backupOp.OperationBackup.LastStatus = protoutil.BackupProgressEntryToProto(entry)
if err := orchestrator.OpLog.Update(op); err != nil {
zap.S().Errorf("failed to update oplog with progress for backup: %v", err)
}
Expand All @@ -125,7 +126,10 @@ func backupHelper(ctx context.Context, orchestrator *Orchestrator, plan *v1.Plan
}

op.SnapshotId = summary.SnapshotId
backupOp.OperationBackup.LastStatus = summary.ToProto()
backupOp.OperationBackup.LastStatus = protoutil.BackupProgressEntryToProto(summary)
if backupOp.OperationBackup.LastStatus == nil {
return fmt.Errorf("expected a final backup progress entry, got nil")
}

zap.L().Info("backup complete", zap.String("plan", plan.Id), zap.Duration("duration", time.Since(startTime)))
return nil
Expand Down Expand Up @@ -167,7 +171,7 @@ func indexSnapshotsHelper(ctx context.Context, orchestrator *Orchestrator, plan
continue
}

snapshotProto := snapshot.ToProto()
snapshotProto := protoutil.SnapshotToProto(snapshot)
indexOps = append(indexOps, &v1.Operation{
RepoId: plan.Repo,
PlanId: plan.Id,
Expand Down
Loading

0 comments on commit e6031bf

Please sign in to comment.