diff --git a/internal/api/server.go b/internal/api/server.go index ece254e9..16714310 100644 --- a/internal/api/server.go +++ b/internal/api/server.go @@ -161,21 +161,21 @@ func (s *Server) ListSnapshotFiles(ctx context.Context, query *v1.ListSnapshotFi func (s *Server) GetOperationEvents(_ *emptypb.Empty, stream v1.ResticUI_GetOperationEventsServer) error { errorChan := make(chan error) defer close(errorChan) - callback := func(eventType oplog.EventType, op *v1.Operation) { + callback := func(oldOp *v1.Operation, newOp *v1.Operation) { var eventTypeMapped v1.OperationEventType - switch eventType { - case oplog.EventTypeOpCreated: + eventType := oplog.EventTypeUnknown + if oldOp == nil && newOp != nil { eventTypeMapped = v1.OperationEventType_EVENT_CREATED - case oplog.EventTypeOpUpdated: + } else if oldOp != nil && newOp != nil { eventTypeMapped = v1.OperationEventType_EVENT_UPDATED - default: + } else { zap.L().Error("Unknown event type", zap.Int("eventType", int(eventType))) - eventTypeMapped = v1.OperationEventType_EVENT_UNKNOWN + return } event := &v1.OperationEvent{ Type: eventTypeMapped, - Operation: op, + Operation: newOp, } go func() { diff --git a/internal/oplog/oplog.go b/internal/oplog/oplog.go index 5586d286..2ca536d9 100644 --- a/internal/oplog/oplog.go +++ b/internal/oplog/oplog.go @@ -44,7 +44,7 @@ type OpLog struct { db *bolt.DB subscribersMu sync.RWMutex - subscribers []*func(EventType, *v1.Operation) + subscribers []*func(*v1.Operation, *v1.Operation) nextId atomic.Int64 } @@ -138,7 +138,7 @@ func (o *OpLog) Add(op *v1.Operation) error { return nil }) if err == nil { - o.notifyHelper(EventTypeOpCreated, op) + o.notifyHelper(nil, op) } return err } @@ -157,7 +157,7 @@ func (o *OpLog) BulkAdd(ops []*v1.Operation) error { }) if err == nil { for _, op := range ops { - o.notifyHelper(EventTypeOpCreated, op) + o.notifyHelper(nil, op) } } return err @@ -167,9 +167,11 @@ func (o *OpLog) Update(op *v1.Operation) error { if op.Id == 0 { return errors.New("operation does not have an ID, OpLog.Update expects operation with an ID") } - + var oldOp *v1.Operation err := o.db.Update(func(tx *bolt.Tx) error { - if err := o.deleteOperationHelper(tx, op.Id); err != nil { + var err error + oldOp, err = o.deleteOperationHelper(tx, op.Id) + if err != nil { return fmt.Errorf("deleting existing value prior to update: %w", err) } if err := o.addOperationHelper(tx, op); err != nil { @@ -178,7 +180,7 @@ func (o *OpLog) Update(op *v1.Operation) error { return nil }) if err == nil { - o.notifyHelper(EventTypeOpUpdated, op) + o.notifyHelper(oldOp, op) } return err } @@ -189,7 +191,7 @@ func (o *OpLog) Delete(id int64) error { if val == nil { return ErrNotExist } - if err := o.deleteOperationHelper(tx, id); err != nil { + if _, err := o.deleteOperationHelper(tx, id); err != nil { return fmt.Errorf("deleting operation %v: %w", id, err) } @@ -204,11 +206,11 @@ func (o *OpLog) Delete(id int64) error { return err } -func (o *OpLog) notifyHelper(eventType EventType, op *v1.Operation) { +func (o *OpLog) notifyHelper(old *v1.Operation, new *v1.Operation) { o.subscribersMu.RLock() defer o.subscribersMu.RUnlock() for _, sub := range o.subscribers { - (*sub)(eventType, op) + (*sub)(old, new) } } @@ -280,37 +282,37 @@ func (o *OpLog) addOperationHelper(tx *bolt.Tx, op *v1.Operation) error { return nil } -func (o *OpLog) deleteOperationHelper(tx *bolt.Tx, id int64) error { +func (o *OpLog) deleteOperationHelper(tx *bolt.Tx, id int64) (*v1.Operation, error) { b := tx.Bucket(OpLogBucket) prevValue, err := o.getOperationHelper(b, id) if err != nil { - return fmt.Errorf("getting operation %v: %w", id, err) + return nil, fmt.Errorf("getting operation %v: %w", id, err) } if prevValue.PlanId != "" { if err := indexutil.IndexRemoveByteValue(tx.Bucket(PlanIndexBucket), []byte(prevValue.PlanId), id); err != nil { - return fmt.Errorf("removing operation %v from plan index: %w", id, err) + return nil, fmt.Errorf("removing operation %v from plan index: %w", id, err) } } if prevValue.RepoId != "" { if err := indexutil.IndexRemoveByteValue(tx.Bucket(RepoIndexBucket), []byte(prevValue.RepoId), id); err != nil { - return fmt.Errorf("removing operation %v from repo index: %w", id, err) + return nil, fmt.Errorf("removing operation %v from repo index: %w", id, err) } } if prevValue.SnapshotId != "" { if err := indexutil.IndexRemoveByteValue(tx.Bucket(SnapshotIndexBucket), []byte(prevValue.SnapshotId), id); err != nil { - return fmt.Errorf("removing operation %v from snapshot index: %w", id, err) + return nil, fmt.Errorf("removing operation %v from snapshot index: %w", id, err) } } if err := b.Delete(serializationutil.Itob(id)); err != nil { - return fmt.Errorf("deleting operation %v from bucket: %w", id, err) + return nil, fmt.Errorf("deleting operation %v from bucket: %w", id, err) } - return nil + return prevValue, nil } func (o *OpLog) Get(id int64) (*v1.Operation, error) { @@ -382,13 +384,13 @@ func (o *OpLog) ForAll(do func(op *v1.Operation) error) error { return nil } -func (o *OpLog) Subscribe(callback *func(EventType, *v1.Operation)) { +func (o *OpLog) Subscribe(callback *func(*v1.Operation, *v1.Operation)) { o.subscribersMu.Lock() defer o.subscribersMu.Unlock() o.subscribers = append(o.subscribers, callback) } -func (o *OpLog) Unsubscribe(callback *func(EventType, *v1.Operation)) { +func (o *OpLog) Unsubscribe(callback *func(*v1.Operation, *v1.Operation)) { o.subscribersMu.Lock() defer o.subscribersMu.Unlock() subs := o.subscribers diff --git a/internal/oplog/oplog_test.go b/internal/oplog/oplog_test.go index 0829c341..0ceab29f 100644 --- a/internal/oplog/oplog_test.go +++ b/internal/oplog/oplog_test.go @@ -103,7 +103,6 @@ func TestAddOperation(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - t.Parallel() if err := log.Add(tc.op); (err != nil) != tc.wantErr { t.Errorf("Add() error = %v, wantErr %v", err, tc.wantErr) }