Skip to content

Commit

Permalink
fix(archive): Only delete offloaded data we do not need. Fixes #2170
Browse files Browse the repository at this point in the history
…and #2156 (#2172)
  • Loading branch information
alexec authored Feb 6, 2020
1 parent 73cb541 commit b9c828a
Show file tree
Hide file tree
Showing 9 changed files with 49 additions and 46 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ up:
.PHONY: env
env:
export ARGO_SERVER=localhost:2746
export ARGO_TOKEN=$(ARGO_TOKEN)
export ARGO_TOKEN="Bearer $(ARGO_TOKEN)"

.PHONY: pf
pf:
Expand Down
5 changes: 2 additions & 3 deletions docs/workflow-controller-configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,9 @@ data:
connectionPool:
maxIdleConns: 100
maxOpenConns: 0
# save the entire workflow into etcd and DB
# if true node status is only saved to the persistence DB to avoid the 1MB limit in etcd
nodeStatusOffLoad: false
# save completed workloads to the archived, even if disabled, you'll be able to
# read from the archive
# save completed workloads to the workflow archive
archive: false
# Optional name of the cluster I'm running in. This must be unique for your cluster.
clusterName: default
Expand Down
4 changes: 2 additions & 2 deletions persist/sqldb/explosive_offload_node_status_repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ func (n *explosiveOffloadNodeStatusRepo) List(string) (map[UUIDVersion]wfv1.Node
return nil, notSupportedError
}

func (n *explosiveOffloadNodeStatusRepo) Delete(string) error {
func (n *explosiveOffloadNodeStatusRepo) Delete(string, string) error {
return notSupportedError
}

func (n *explosiveOffloadNodeStatusRepo) ListOldUIDs(namespace string) ([]string, error) {
func (n *explosiveOffloadNodeStatusRepo) ListOldOffloads(string) ([]UUIDVersion, error) {
return nil, notSupportedError
}
1 change: 1 addition & 0 deletions persist/sqldb/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ func (m migrate) Exec(ctx context.Context) error {
ansiSQLChange(`alter table argo_archived_workflows alter column name set not null`),
),
// clustername(not null) | uid(not null) | | name (not null) | phase(not null) | namespace(not null) | workflow(not null) | startedat(not null) | finishedat(not null)
ansiSQLChange(`create index ` + m.tableName + `_i2 on ` + m.tableName + ` (clustername,namespace,updatedat)`),
} {
err := m.applyChange(ctx, changeSchemaVersion, change)
if err != nil {
Expand Down
20 changes: 10 additions & 10 deletions persist/sqldb/mocks/OffloadNodeStatusRepo.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

31 changes: 14 additions & 17 deletions persist/sqldb/offload_node_status_repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ type OffloadNodeStatusRepo interface {
Save(uid, namespace string, nodes wfv1.Nodes) (string, error)
Get(uid, version string) (wfv1.Nodes, error)
List(namespace string) (map[UUIDVersion]wfv1.Nodes, error)
ListOldUIDs(namespace string) ([]string, error)
Delete(uid string) error
ListOldOffloads(namespace string) ([]UUIDVersion, error)
Delete(uid, version string) error
IsEnabled() bool
}

Expand Down Expand Up @@ -166,32 +166,29 @@ func (wdc *nodeOffloadRepo) List(namespace string) (map[UUIDVersion]wfv1.Nodes,
return res, nil
}

func (wdc *nodeOffloadRepo) ListOldUIDs(namespace string) ([]string, error) {
func (wdc *nodeOffloadRepo) ListOldOffloads(namespace string) ([]UUIDVersion, error) {
log.WithFields(log.Fields{"namespace": namespace}).Debug("Listing old offloaded nodes")
row, err := wdc.session.
Query("select distinct uid from "+wdc.tableName+" where clustername = ? and namespace = ? and updatedat < current_timestamp - interval '5' minute", wdc.clusterName, namespace)
var records []UUIDVersion
err := wdc.session.
SelectFrom(wdc.tableName).
Where(db.Cond{"clustername": wdc.clusterName}).
And(namespaceEqual(namespace)).
And("updatedat < current_timestamp - interval '5' minute").
All(&records)
if err != nil {
return nil, err
}
var uids []string
for row.Next() {
uid := ""
err := row.Scan(&uid)
if err != nil {
return nil, err
}
uids = append(uids, uid)
}
return uids, nil
return records, nil
}

func (wdc *nodeOffloadRepo) Delete(uid string) error {
logCtx := log.WithFields(log.Fields{"uid": uid})
func (wdc *nodeOffloadRepo) Delete(uid, version string) error {
logCtx := log.WithFields(log.Fields{"uid": uid, "version": version})
logCtx.Debug("Deleting offloaded nodes")
rs, err := wdc.session.
DeleteFrom(wdc.tableName).
Where(db.Cond{"clustername": wdc.clusterName}).
And(db.Cond{"uid": uid}).
And(db.Cond{"version": version}).
Exec()
if err != nil {
return err
Expand Down
3 changes: 2 additions & 1 deletion test/e2e/fixtures/e2e_suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ func (s *E2ESuite) DeleteResources(label string) {
for _, wf := range list.Items {
isTestWf[wf.Name] = false
if s.Persistence.IsEnabled() {
err := s.Persistence.offloadNodeStatusRepo.Delete(string(wf.UID))
// TODO - may make tests flakey
err := s.Persistence.offloadNodeStatusRepo.Delete(string(wf.UID), wf.Status.OffloadNodeStatusVersion)
if err != nil {
panic(err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,8 @@ export class WorkflowDetails extends React.Component<RouteComponentProps<any>, {
private async loadWorkflow(namespace: string, name: string) {
try {
this.ensureUnsubscribed();
this.changesSubscription = services.workflows.watch({name, namespace})
this.changesSubscription = services.workflows
.watch({name, namespace})
.map(changeEvent => changeEvent.object)
.catch((error, caught) => {
return caught;
Expand Down
26 changes: 15 additions & 11 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,27 +265,31 @@ func (wfc *WorkflowController) periodicWorkflowGarbageCollector(stopCh <-chan st
case <-ticker.C:
if wfc.offloadNodeStatusRepo.IsEnabled() {
log.Info("Performing periodic workflow GC")
oldUIDs, err := wfc.offloadNodeStatusRepo.ListOldUIDs(wfc.GetManagedNamespace())
oldRecords, err := wfc.offloadNodeStatusRepo.ListOldOffloads(wfc.GetManagedNamespace())
if err != nil {
log.WithField("err", err).Error("Failed to list offloaded nodes")
log.WithField("err", err).Error("Failed to list old offloaded nodes")
}
if len(oldUIDs) == 0 {
log.Info("Zero old UIDs, nothing to do")
if len(oldRecords) == 0 {
log.Info("Zero old records, nothing to do")
return
}
// get every lives workflow (1000s) into a map
liveOffloadNodeStatusVersions := make(map[types.UID]string)
list, err := util.NewWorkflowLister(wfc.wfInformer).List()
if err != nil {
log.WithField("err", err).Error("Failed to list workflows")
return
}
wfs := make(map[types.UID]bool)
for _, wf := range list {
wfs[wf.UID] = true
// this could be the empty string - as it is no longer offloaded
liveOffloadNodeStatusVersions[wf.UID] = wf.Status.OffloadNodeStatusVersion
}
log.WithFields(log.Fields{"len_wfs": len(wfs), "len_old_uids": len(oldUIDs)}).Info("Deleting old UIDs that are not live")
for _, uid := range oldUIDs {
_, ok := wfs[types.UID(uid)]
if !ok {
err := wfc.offloadNodeStatusRepo.Delete(uid)
log.WithFields(log.Fields{"len_wfs": len(liveOffloadNodeStatusVersions), "len_old_records": len(oldRecords)}).Info("Deleting old UIDs that are not live")
for _, record := range oldRecords {
// this could be empty string
nodeStatusVersion, ok := liveOffloadNodeStatusVersions[types.UID(record.UID)]
if !ok || nodeStatusVersion != record.Version {
err := wfc.offloadNodeStatusRepo.Delete(record.UID, record.Version)
if err != nil {
log.WithField("err", err).Error("Failed to delete offloaded nodes")
}
Expand Down

0 comments on commit b9c828a

Please sign in to comment.