diff --git a/internal/database/indexutil/indexutil.go b/internal/database/indexutil/indexutil.go
index 5549aadf..fffc1f59 100644
--- a/internal/database/indexutil/indexutil.go
+++ b/internal/database/indexutil/indexutil.go
@@ -7,20 +7,18 @@ import (
 	bolt "go.etcd.io/bbolt"
 )
 
+// IndexByteValue indexes a (value, recordId) pair, maintaining a multimap from each value to the list of recordIds associated with it.
 func IndexByteValue(b *bolt.Bucket, value []byte, recordId int64) error {
 	key := serializationutil.BytesToKey(value)
 	key = append(key, serializationutil.Itob(recordId)...)
 	return b.Put(key, []byte{})
 }
 
+// IndexSearchByteValue searches the index given a value and returns an iterator over the associated recordIds.
 func IndexSearchByteValue(b *bolt.Bucket, value []byte) *IndexSearchIterator {
 	return newSearchIterator(b, serializationutil.BytesToKey(value))
 }
 
-func IndexSearchIntValue(b *bolt.Bucket, value int64) *IndexSearchIterator {
-	return newSearchIterator(b, serializationutil.Itob(value))
-}
-
 type IndexSearchIterator struct {
 	c *bolt.Cursor
 	k []byte
diff --git a/internal/database/indexutil/indexutil_test.go b/internal/database/indexutil/indexutil_test.go
new file mode 100644
index 00000000..d04eb072
--- /dev/null
+++ b/internal/database/indexutil/indexutil_test.go
@@ -0,0 +1,45 @@
+package indexutil
+
+import (
+	"fmt"
+	"testing"
+
+	"go.etcd.io/bbolt"
+)
+
+func TestIndexing(t *testing.T) {
+	db, err := bbolt.Open(t.TempDir()+"/test.boltdb", 0600, nil)
+	if err != nil {
+		t.Fatalf("error opening database: %s", err)
+	}
+
+	if err := db.Update(func(tx *bbolt.Tx) error {
+		b, err := tx.CreateBucket([]byte("test"))
+		if err != nil {
+			return fmt.Errorf("error creating bucket: %s", err)
+		}
+		for id := 0; id < 100; id += 1 {
+			if err := IndexByteValue(b, []byte("document"), int64(id)); err != nil {
+				return err
+			}
+		}
+		return nil
+	}); err != nil {
+		t.Fatalf("db.Update error: %v", err)
+	}
+
+	if err := db.View(func(tx *bbolt.Tx) error {
+		b := tx.Bucket([]byte("test"))
+		ids := IndexSearchByteValue(b, []byte("document")).ToSlice()
+		if len(ids) != 100 {
+			t.Errorf("want 100 ids, got %d", len(ids))
+		}
+		ids = IndexSearchByteValue(b, []byte("other")).ToSlice()
+		if len(ids) != 0 {
+			t.Errorf("want 0 ids, got %d", len(ids))
+		}
+		return nil
+	}); err != nil {
+		t.Fatalf("db.View error: %v", err)
+	}
+}
diff --git a/internal/database/oplog/oplog.go b/internal/database/oplog/oplog.go
index 8e5f31b4..0cfaab2b 100644
--- a/internal/database/oplog/oplog.go
+++ b/internal/database/oplog/oplog.go
@@ -1,7 +1,6 @@
 package oplog
 
 import (
-	"bytes"
 	"errors"
 	"fmt"
 	"os"
@@ -30,6 +29,7 @@ var (
 	OpLogBucket = []byte("oplog.log") // oplog stores the operations themselves
 	RepoIndexBucket = []byte("oplog.repo_idx") // repo_index tracks IDs of operations affecting a given repo
 	PlanIndexBucket = []byte("oplog.plan_idx") // plan_index tracks IDs of operations affecting a given plan
+	IndexedSnapshotsSetBucket = []byte("oplog.indexed_snapshots") // indexed_snapshots is a set of snapshot IDs that have been indexed
 )
 
 // OpLog represents a log of operations performed.
@@ -53,7 +53,9 @@ func NewOpLog(databasePath string) (*OpLog, error) {
 
 	// Create the buckets if they don't exist
 	if err := db.Update(func(tx *bolt.Tx) error {
-		for _, bucket := range [][]byte{SystemBucket, OpLogBucket, RepoIndexBucket, PlanIndexBucket} {
+		for _, bucket := range [][]byte{
+			SystemBucket, OpLogBucket, RepoIndexBucket, PlanIndexBucket, IndexedSnapshotsSetBucket,
+		} {
 			if _, err := tx.CreateBucketIfNotExists(bucket); err != nil {
 				return fmt.Errorf("error creating bucket %s: %s", string(bucket), err)
 			}
@@ -70,48 +72,104 @@ func (o *OpLog) Close() error {
 	return o.db.Close()
 }
 
+// Add adds a generic operation to the operation log.
 func (o *OpLog) Add(op *v1.Operation) error {
 	if op.Id != 0 {
 		return errors.New("operation already has an ID, OpLog.Add is expected to set the ID")
 	}
 
 	err := o.db.Update(func(tx *bolt.Tx) error {
-		b := tx.Bucket(OpLogBucket)
-
-		id, err := b.NextSequence()
-		if err != nil {
-			return fmt.Errorf("error getting next sequence: %w", err)
-		}
-
-		op.Id = int64(id)
-
-		bytes, err := proto.Marshal(op)
-		if err != nil {
-			return fmt.Errorf("error marshalling operation: %w", err)
-		}
-
-		if err := b.Put(serializationutil.Itob(op.Id), bytes); err != nil {
-			return fmt.Errorf("error putting operation into bucket: %w", err)
-		}
-
-		if op.RepoId != "" {
-			if err := indexutil.IndexByteValue(tx.Bucket(RepoIndexBucket), []byte(op.RepoId), op.Id); err != nil {
-				return fmt.Errorf("error adding operation to repo index: %w", err)
-			}
-		}
-		if op.PlanId != "" {
-			if err := indexutil.IndexByteValue(tx.Bucket(PlanIndexBucket), []byte(op.PlanId), op.Id); err != nil {
-				return fmt.Errorf("error adding operation to repo index: %w", err)
-			}
-		}
-
-		return nil
+		return o.addOperationHelper(tx, op)
 	})
 	if err == nil {
 		o.notifyHelper(EventTypeOpCreated, op)
 	}
 
 	return err
 }
 
+// BulkAdd adds multiple operations to the log in a single transaction.
+func (o *OpLog) BulkAdd(ops []*v1.Operation) error {
+	return o.db.Update(func(tx *bolt.Tx) error {
+		for _, op := range ops {
+			if err := o.addOperationHelper(tx, op); err != nil {
+				return err
+			}
+		}
+		return nil
+	})
+}
+
+func (o *OpLog) addOperationHelper(tx *bolt.Tx, op *v1.Operation) error {
+	b := tx.Bucket(OpLogBucket)
+
+	id, err := b.NextSequence()
+	if err != nil {
+		return fmt.Errorf("error getting next sequence: %w", err)
+	}
+
+	op.Id = int64(id)
+
+	bytes, err := proto.Marshal(op)
+	if err != nil {
+		return fmt.Errorf("error marshalling operation: %w", err)
+	}
+
+	if err := b.Put(serializationutil.Itob(op.Id), bytes); err != nil {
+		return fmt.Errorf("error putting operation into bucket: %w", err)
+	}
+
+	// Update universal indices that apply to all operation types.
+	if op.RepoId != "" {
+		if err := indexutil.IndexByteValue(tx.Bucket(RepoIndexBucket), []byte(op.RepoId), op.Id); err != nil {
+			return fmt.Errorf("error adding operation to repo index: %w", err)
+		}
+	}
+	if op.PlanId != "" {
+		if err := indexutil.IndexByteValue(tx.Bucket(PlanIndexBucket), []byte(op.PlanId), op.Id); err != nil {
+			return fmt.Errorf("error adding operation to plan index: %w", err)
+		}
+	}
+
+	// Update operation type dependent indices.
+	switch wrappedOp := op.Op.(type) {
+	case *v1.Operation_OperationBackup:
+		// Nothing extra to be done.
+	case *v1.Operation_OperationIndexSnapshot:
+		if wrappedOp.OperationIndexSnapshot == nil || wrappedOp.OperationIndexSnapshot.Snapshot == nil {
+			return errors.New("op.OperationIndexSnapshot or op.OperationIndexSnapshot.Snapshot is nil")
+		}
+		snapshotId := serializationutil.NormalizeSnapshotId(wrappedOp.OperationIndexSnapshot.Snapshot.Id)
+		key := serializationutil.BytesToKey([]byte(snapshotId))
+		if err := tx.Bucket(IndexedSnapshotsSetBucket).Put(key, serializationutil.Itob(op.Id)); err != nil {
+			return fmt.Errorf("error adding OperationIndexSnapshot to indexed snapshots set: %w", err)
+		}
+	default:
+		return fmt.Errorf("unknown operation type: %T", wrappedOp)
+	}
+	return nil
+}
+
+// HasIndexedSnapshot returns the ID of the operation that indexed the given snapshot, or -1 if the snapshot has not been indexed.
+func (o *OpLog) HasIndexedSnapshot(snapshotId string) (int64, error) {
+	var id int64
+	if err := o.db.View(func(tx *bolt.Tx) error {
+		// Normalize the ID so the lookup matches the keys written by addOperationHelper.
+		key := serializationutil.BytesToKey([]byte(serializationutil.NormalizeSnapshotId(snapshotId)))
+		idBytes := tx.Bucket(IndexedSnapshotsSetBucket).Get(key)
+		if idBytes == nil {
+			id = -1
+		} else {
+			id = serializationutil.Btoi(idBytes)
+		}
+		return nil
+	}); err != nil {
+		return 0, err
+	}
+	return id, nil
+}
+
 func (o *OpLog) Update(op *v1.Operation) error {
@@ -237,33 +295,6 @@ func (o *OpLog) Unsubscribe(callback *func(EventType, *v1.Operation)) {
 	}
 }
 
-// addOpToIndexBucket adds the given operation ID to the given index bucket for the given key ID.
-func (o *OpLog) addOpToIndexBucket(tx *bolt.Tx, bucket []byte, indexId string, opId int64) error {
-	b := tx.Bucket(bucket)
-
-	var key []byte
-	key = append(key, serializationutil.Stob(indexId)...)
-	key = append(key, serializationutil.Itob(opId)...)
-	if err := b.Put(key, []byte{}); err != nil {
-		return fmt.Errorf("error adding operation to repo index: %w", err)
-	}
-	return nil
-}
-
-// readOpsFromIndexBucket reads all operations from the given index bucket for the given key ID.
-func (o *OpLog) readOpsFromIndexBucket(tx *bolt.Tx, bucket []byte, indexId string) ([]int64, error) {
-	b := tx.Bucket(bucket)
-
-	var ops []int64
-	c := b.Cursor()
-	prefix := serializationutil.Stob(indexId)
-	for k, _ := c.Seek(prefix); k != nil && bytes.HasPrefix(k, prefix); k, _ = c.Next() {
-		ops = append(ops, serializationutil.Btoi(k[len(prefix):]))
-	}
-
-	return ops, nil
-}
-
 type Filter func([]int64) []int64
 
 func FilterKeepAll() Filter {
diff --git a/internal/database/oplog/oplog_test.go b/internal/database/oplog/oplog_test.go
index f5780024..f9d2993a 100644
--- a/internal/database/oplog/oplog_test.go
+++ b/internal/database/oplog/oplog_test.go
@@ -197,6 +197,45 @@ func TestBigIO(t *testing.T) {
 	}
 }
 
+func TestIndexSnapshot(t *testing.T) {
+	t.Parallel()
+	log, err := NewOpLog(t.TempDir() + "/test.boltdb")
+	if err != nil {
+		t.Fatalf("error creating oplog: %s", err)
+	}
+
+	op := &v1.Operation{
+		PlanId: "plan1",
+		RepoId: "repo1",
+		Op: &v1.Operation_OperationIndexSnapshot{
+			OperationIndexSnapshot: &v1.OperationIndexSnapshot{
+				Snapshot: &v1.ResticSnapshot{
+					Id: "abcdefghijklmnop",
+				},
+			},
+		},
+	}
+	if err := log.Add(op); err != nil {
+		t.Fatalf("error adding operation: %s", err)
+	}
+
+	id, err := log.HasIndexedSnapshot("abcdefgh")
+	if err != nil {
+		t.Fatalf("error checking for snapshot: %s", err)
+	}
+	if id != op.Id {
+		t.Fatalf("want id %d, got %d", op.Id, id)
+	}
+
+	id, err = log.HasIndexedSnapshot("notfound")
+	if err != nil {
+		t.Fatalf("error checking for snapshot: %s", err)
+	}
+	if id != -1 {
+		t.Fatalf("want id -1, got %d", id)
+	}
+}
+
 func collectMessages(ops []*v1.Operation) []string {
 	var messages []string
 	for _, op := range ops {
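
A minimal usage sketch (illustrative, not part of the patch above) of how a snapshot indexer might combine the new HasIndexedSnapshot with BulkAdd so that snapshots already recorded in oplog.indexed_snapshots are skipped. The indexSnapshots function, its snaps/repoID/planID parameters, and the omitted package/import clauses are assumptions made for this example; only OpLog.Add, OpLog.BulkAdd, OpLog.HasIndexedSnapshot, and the v1 proto types come from the change itself.

	// indexSnapshots is a hypothetical caller of the new oplog API.
	func indexSnapshots(log *oplog.OpLog, snaps []*v1.ResticSnapshot, repoID, planID string) error {
		var newOps []*v1.Operation
		for _, snap := range snaps {
			// HasIndexedSnapshot returns the indexing operation's ID, or -1 if the snapshot is not indexed.
			opID, err := log.HasIndexedSnapshot(snap.Id)
			if err != nil {
				return fmt.Errorf("checking indexed snapshots for %q: %w", snap.Id, err)
			}
			if opID != -1 {
				continue // already indexed, nothing to do
			}
			newOps = append(newOps, &v1.Operation{
				RepoId: repoID,
				PlanId: planID,
				Op: &v1.Operation_OperationIndexSnapshot{
					OperationIndexSnapshot: &v1.OperationIndexSnapshot{Snapshot: snap},
				},
			})
		}
		// One bolt transaction covers all new index operations; per-snapshot Add calls
		// would also work but would open a transaction per snapshot.
		return log.BulkAdd(newOps)
	}

Note that BulkAdd, unlike Add, does not call notifyHelper, so subscribers are not notified of operations created this way.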