Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add version to offload nodes. Fixes #1944 and #1946 #1974

Merged
merged 508 commits into from
Jan 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
508 commits
Select commit Hold shift + click to select a range
59c0aed
search
alexec Jan 7, 2020
daa5bd2
docs + help
alexec Jan 7, 2020
e31737a
Merge branch 'master' into apiserverimpl
simster7 Jan 7, 2020
678d5ae
Fix imports
simster7 Jan 7, 2020
8962a47
Fixes:
simster7 Jan 7, 2020
4556caf
error boundary + loading
alexec Jan 7, 2020
aa73fe2
Can create workflow tempaltes from UI
simster7 Jan 7, 2020
75d2f07
removed dup delete fields
alexec Jan 7, 2020
06ffb49
lint
alexec Jan 7, 2020
39803ec
vendor
alexec Jan 7, 2020
e84719f
ui
alexec Jan 7, 2020
8f72da8
layout
alexec Jan 7, 2020
49910f5
rm yamljs
alexec Jan 7, 2020
d46ed75
Fixed NoAuth issue
sarabala1979 Jan 7, 2020
619c8aa
Merge branch 'apiserverimpl' of https://github.com/argoproj/argo into…
sarabala1979 Jan 7, 2020
850669f
reduce workflow size
alexec Jan 7, 2020
a265031
ok
alexec Jan 7, 2020
079029b
continue
alexec Jan 7, 2020
bbb8434
continue
alexec Jan 7, 2020
008d19c
Remove leading space
whynowy Jan 8, 2020
ab84060
namespace filter
alexec Jan 8, 2020
2922c89
added missing labels
alexec Jan 8, 2020
350069e
help
alexec Jan 8, 2020
cf3c842
error boundary
alexec Jan 8, 2020
f5ce107
change creation endpoints
alexec Jan 8, 2020
494ef47
lint
alexec Jan 8, 2020
99b296a
refactor db logic removing unused methods
alexec Jan 8, 2020
156bafb
tidy up
alexec Jan 8, 2020
fea29e7
Cron Workflow UI Core
simster7 Jan 8, 2020
53a83d2
Cron Workflows app layer
simster7 Jan 8, 2020
1c94ed9
Upgrade argo-ui dependency
simster7 Jan 8, 2020
737aab3
deps
alexec Jan 8, 2020
3dbf433
Merge branch 'apiserverimpl' of github.com:argoproj/argo into apiserv…
alexec Jan 8, 2020
4d598eb
fix: stack overflow error
alexec Jan 8, 2020
b902a28
Enhanced log command for workflow
sarabala1979 Jan 8, 2020
77b677b
Merge branch 'apiserverimpl' of https://github.com/argoproj/argo into…
sarabala1979 Jan 8, 2020
1714828
adds support to list cron workflows
alexec Jan 8, 2020
39c1bf9
enhance cron list
alexec Jan 8, 2020
551ec35
lint
alexec Jan 8, 2020
8bbc8ec
add getter cron workflows
alexec Jan 8, 2020
f7141c1
add delete for cron workflows
alexec Jan 8, 2020
442852c
small changes
alexec Jan 8, 2020
d310a9e
Re-instated namespace for creation URLs.
alexec Jan 8, 2020
14628e7
Add Created on Cronworkflow
sarabala1979 Jan 8, 2020
66a29f8
Merge branch 'apiserverimpl' of https://github.com/argoproj/argo into…
sarabala1979 Jan 8, 2020
92ac216
Merge branch 'apiserverimpl' of https://github.com/argoproj/argo into…
sarabala1979 Jan 8, 2020
c77bfab
fix-up
alexec Jan 9, 2020
d17e2d2
remove ResubmitArchivedWorkflom
alexec Jan 9, 2020
7cb6e49
tidy up
alexec Jan 9, 2020
b53a5e0
remove namespace from archive apis
alexec Jan 9, 2020
a5f922f
fix
alexec Jan 9, 2020
302330d
updated cron ui
alexec Jan 9, 2020
d54459a
fix phases
alexec Jan 9, 2020
399f129
clean up deps
alexec Jan 9, 2020
eacda3c
ruthless pruning of dependencies
alexec Jan 9, 2020
aba7c84
Refactored Auth Token functionality
sarabala1979 Jan 9, 2020
45646fd
Merge branch 'apiserverimpl' of https://github.com/argoproj/argo into…
sarabala1979 Jan 9, 2020
61f7f7f
bug fix
alexec Jan 9, 2020
dfafb96
Delete Untitled
sarabala1979 Jan 9, 2020
9f838e2
test permission
whynowy Jan 9, 2020
b71e630
fix
alexec Jan 9, 2020
58353ae
Update manifests to ERSION
alexec Jan 9, 2020
bc8c31d
Update manifests to ERSION
alexec Jan 9, 2020
c424743
Update manifests to apiserverimpl
alexec Jan 9, 2020
eb04bd5
release
alexec Jan 9, 2020
1a926a0
bits
alexec Jan 9, 2020
f1c53bf
1
alexec Jan 9, 2020
e0f9dd9
1
alexec Jan 9, 2020
e8a0c9e
commen
alexec Jan 9, 2020
e3a40c2
lint
alexec Jan 9, 2020
6b793af
lint
alexec Jan 9, 2020
2498147
codegen
alexec Jan 9, 2020
dd5d521
faster
alexec Jan 9, 2020
79e0659
makefile
alexec Jan 9, 2020
9b09bec
goimports
alexec Jan 9, 2020
c5e278c
VERSION
alexec Jan 9, 2020
918eaa5
typo
alexec Jan 9, 2020
c5140af
makefile
alexec Jan 9, 2020
631c848
finished missing test
alexec Jan 9, 2020
6903277
corrected flag name
alexec Jan 9, 2020
efb9ab3
add TODO
alexec Jan 9, 2020
66e180c
manifests
alexec Jan 9, 2020
ea7d0d0
refactored
sarabala1979 Jan 9, 2020
bd42919
Merge branch 'apiserverimpl' of https://github.com/argoproj/argo into…
sarabala1979 Jan 9, 2020
6f7ec53
snapshot
alexec Jan 9, 2020
aa18841
v1
alexec Jan 10, 2020
85f8e39
ubuntu-1604:201903-01
alexec Jan 10, 2020
5712a70
catch errors
alexec Jan 10, 2020
c9ecf82
upload assets
alexec Jan 10, 2020
f65cd6e
dep ensure
alexec Jan 10, 2020
87ce187
remove bug in upload-asset.sh
alexec Jan 10, 2020
2f79f24
lint
alexec Jan 10, 2020
84a68c6
re-instate v0
alexec Jan 10, 2020
4131e5d
only pull images ifNotPresent, so not being able to reach Docker Hub …
alexec Jan 10, 2020
a98a00b
try fix
alexec Jan 10, 2020
2a0d7d7
fix: sed invalid
alexec Jan 10, 2020
6f277da
debugging
alexec Jan 10, 2020
1625138
push
alexec Jan 10, 2020
139b09f
downloading
alexec Jan 10, 2020
d776cc4
fix install
alexec Jan 10, 2020
6f1209b
fix: should not delete artifacts
alexec Jan 10, 2020
1025669
add manifests
alexec Jan 10, 2020
898f974
skip tests
alexec Jan 10, 2020
2ac5bf6
latest
alexec Jan 10, 2020
22b836f
latest
alexec Jan 10, 2020
b66d299
increase timeout
alexec Jan 10, 2020
823ba8b
yamls
alexec Jan 10, 2020
96f1ec9
quick-start
alexec Jan 10, 2020
3560bb9
yamls
alexec Jan 10, 2020
defda9e
chore: remove make of clis
alexec Jan 10, 2020
1f79091
Merge tag 'apiserverimpl' of github.com:argoproj/argo into apiserverimpl
alexec Jan 10, 2020
d676c67
Merge branch 'apiserverimpl' of github.com:argoproj/argo into apiserv…
alexec Jan 10, 2020
bf12b53
install kustomize
alexec Jan 10, 2020
0307e2e
codegen
alexec Jan 10, 2020
912be93
k3
alexec Jan 10, 2020
629ce06
docs: update releasing.md
alexec Jan 10, 2020
c0905a1
chore: add missing .codecov.yaml
alexec Jan 10, 2020
257a841
chore: try to install kustomize .. again
alexec Jan 10, 2020
edd21f7
chore: add `make pre-push`
alexec Jan 10, 2020
fbac0cd
fix: chmod pre-push hook
alexec Jan 10, 2020
d79bc93
/.vendor-new
alexec Jan 10, 2020
87cb8e2
delete .vendor-new if exists
alexec Jan 10, 2020
c75a496
chore: create dist dir
alexec Jan 10, 2020
68ac298
rename file
alexec Jan 10, 2020
93500b1
run e2e test pre-push
alexec Jan 10, 2020
6ea8c5d
pf
alexec Jan 10, 2020
dbd33cc
mkdir dist for CI
alexec Jan 10, 2020
e793564
added TODO for argo-server role
alexec Jan 10, 2020
e4e4b2e
re-order CI config
alexec Jan 10, 2020
6e42c2c
fix: try fix pf
alexec Jan 10, 2020
ca48000
fix: background portforward on CI
alexec Jan 10, 2020
1575035
feat: upload swagger assets to release
alexec Jan 10, 2020
0ec6e94
updated manifests
alexec Jan 10, 2020
5f5f187
remove redundancy
alexec Jan 10, 2020
d0946e5
add pre-commit target
alexec Jan 10, 2020
d154ede
added readinessProbe
alexec Jan 11, 2020
3fc61e6
add GCP authprovider and fixed in list call
sarabala1979 Jan 11, 2020
5ac8b0c
fixed lint
sarabala1979 Jan 11, 2020
8a21c24
Merge branch 'apiserverimpl' of https://github.com/argoproj/argo into…
sarabala1979 Jan 11, 2020
4302c43
lint
alexec Jan 11, 2020
54ab1bc
chore: logging
alexec Jan 11, 2020
ac8a62f
listen on 0.0.0.0:2746
alexec Jan 11, 2020
90a7931
added v2 token
alexec Jan 11, 2020
0c81d73
feat: add argo-server service account
alexec Jan 11, 2020
c9ce631
updated roles
alexec Jan 11, 2020
d5da88d
re-order build
alexec Jan 11, 2020
640e8d1
ci: fix build
alexec Jan 11, 2020
90a9d07
update manifests
alexec Jan 11, 2020
f6e4551
revert
alexec Jan 11, 2020
41e2189
do not scale down deployment, maybe this will fix CI
alexec Jan 11, 2020
8010e69
adds --force-namespace-isolation to controller
alexec Jan 11, 2020
962065c
typo
alexec Jan 11, 2020
191f1bb
namespace
alexec Jan 11, 2020
a47759e
1m
alexec Jan 11, 2020
607cfd9
namespace: argo
alexec Jan 11, 2020
c25cbff
adds --force-namespace-isolation
alexec Jan 11, 2020
4b3e965
FORCE_NAMESPACE_ISOLATION
alexec Jan 11, 2020
5d31a06
logging
alexec Jan 11, 2020
37d5c45
bug
alexec Jan 11, 2020
145eb6d
logging
alexec Jan 11, 2020
a24db6b
fix namespace
alexec Jan 11, 2020
eba9582
add delay
alexec Jan 11, 2020
fd0dc30
run smoke tests before e2e on CI
alexec Jan 11, 2020
aa7d7c5
add debug logging
alexec Jan 11, 2020
3280038
added guard of condition
alexec Jan 11, 2020
e2b8166
simplify update
alexec Jan 11, 2020
6f49e62
fix: send offloaded node status to watch
alexec Jan 11, 2020
2b88dca
test: add wait for workflow to be deleted
alexec Jan 11, 2020
0ae6b19
test: print more diagnostics on failure
alexec Jan 11, 2020
29fb493
chore: spelling
alexec Jan 11, 2020
fafdd9b
chore: remove debug logging
alexec Jan 11, 2020
ae99a28
test: watch should return offloaded node status
alexec Jan 11, 2020
0f89ff7
chore: added comments
alexec Jan 11, 2020
50acdbc
cron isolated informer
alexec Jan 11, 2020
6a325a5
roles
alexec Jan 11, 2020
4ee7602
get nodes
alexec Jan 11, 2020
a9de24a
lint
alexec Jan 11, 2020
5334c60
stuff
alexec Jan 11, 2020
bb004b1
postgres.host=localhost
alexec Jan 11, 2020
89805df
Incorporated Jesse Initial Comments
sarabala1979 Jan 12, 2020
c1ea1bd
Merge branch 'apiserverimpl' of https://github.com/argoproj/argo into…
sarabala1979 Jan 12, 2020
96cfc44
Update workflow_server_test.go
sarabala1979 Jan 12, 2020
2b0f786
Update workflow_server_test.go
sarabala1979 Jan 12, 2020
80b230c
Update workflow_server_test.go
sarabala1979 Jan 12, 2020
9e4bba9
incorporated Jesse comments
sarabala1979 Jan 12, 2020
f2bc0c8
Merge branch 'master' into apiserverimpl
simster7 Jan 13, 2020
79ed7ff
New CLI tests
simster7 Jan 13, 2020
51dcb1c
Merge branch 'master' into apiserverimpl
simster7 Jan 13, 2020
db68dbd
Adds diagnostics to tests.
alexec Jan 13, 2020
43ac0ec
Minor test rework
simster7 Jan 13, 2020
18058f5
Merge branch 'apiserverimpl' of https://github.com/argoproj/argo into…
simster7 Jan 13, 2020
cc166ea
Removed Kubeservice
sarabala1979 Jan 13, 2020
f840729
Merge branch 'apiserverimpl' of https://github.com/argoproj/argo into…
sarabala1979 Jan 13, 2020
38d20e0
Fix CronWorkflow tests
simster7 Jan 13, 2020
bbbadcb
Merge branch 'apiserverimpl' of https://github.com/argoproj/argo into…
simster7 Jan 13, 2020
c4ae91c
disable forcing off offloading
alexec Jan 13, 2020
01c6e9e
Add workflow create verb to namespace install
simster7 Jan 13, 2020
690e05a
Skip legacy test
simster7 Jan 13, 2020
960d962
add github action
alexec Jan 14, 2020
bc1092b
fix links
alexec Jan 14, 2020
19185f7
fix for code review comment
alexec Jan 14, 2020
5aa433a
remove quick start
alexec Jan 14, 2020
adbfffe
fix version
alexec Jan 14, 2020
bc3da04
Merge branch 'apiserverimpl' of github.com:argoproj/argo into apiserv…
alexec Jan 14, 2020
b4a7e05
reinstate behaviour of Dockerfile
alexec Jan 14, 2020
4b3e827
Added WorkflowTemplate test infra and smoke test
simster7 Jan 14, 2020
f8378db
Merge branch 'apiserverimpl' of https://github.com/argoproj/argo into…
simster7 Jan 14, 2020
7d89014
logging
alexec Jan 14, 2020
6caa777
re-order more flakey task earlier
alexec Jan 14, 2020
1cd7124
Dockerfile
alexec Jan 14, 2020
64d15e4
lock yarn for speed
alexec Jan 14, 2020
08d76c3
update watch
alexec Jan 14, 2020
33f3b41
Merge branch 'master' into apiserverimpl
alexec Jan 14, 2020
a6b421c
post-merge fixes
alexec Jan 14, 2020
e2d4a7f
work!
alexec Jan 14, 2020
b61fe1a
more logging
alexec Jan 14, 2020
5c89895
feat: Enable MAX_WORKFLOW_SIZE. Closes #1946
alexec Jan 14, 2020
57c9396
feat: Add resourceversion to offload and archive tables. Fixes #1944
alexec Jan 14, 2020
e5fb64c
Update README.md
alexec Jan 14, 2020
3246a9c
add version to offloaded nodes
alexec Jan 15, 2020
77257ea
Merge branch 'nos' of github.com:alexec/argo into nos
alexec Jan 15, 2020
e219d5f
Merge branch 'master' into nos
alexec Jan 15, 2020
0b2c519
Fixes
alexec Jan 15, 2020
9df5a18
fix test
alexec Jan 15, 2020
b259813
Merge branch 'master' into nos
alexec Jan 15, 2020
d56fccc
Merge branch 'master' into nos
alexec Jan 15, 2020
d75819f
Merge remote-tracking branch 'origin/master' into nos
alexec Jan 16, 2020
644625b
trigger ci
alexec Jan 16, 2020
ba55e46
feat: add clustername to tables
alexec Jan 16, 2020
93a2fcd
Merge remote-tracking branch 'origin/master' into nos
alexec Jan 16, 2020
71109e7
changes
alexec Jan 16, 2020
3a8dcff
fix SQL syntax error
alexec Jan 16, 2020
4cf579f
fix: logic error
alexec Jan 16, 2020
c87fcb8
slightly longer timeout
alexec Jan 17, 2020
0a23c5f
pointer for slice
alexec Jan 17, 2020
698f4ae
only print diagnostics for e2e tests
alexec Jan 17, 2020
01a1c5b
correct README.md
alexec Jan 17, 2020
321189a
fix e2e tests
alexec Jan 17, 2020
89c8e6d
Merge branch 'master' into nos
alexec Jan 17, 2020
6e70609
codegen
alexec Jan 17, 2020
9ffbab4
Merge branch 'master' into nos
alexec Jan 20, 2020
9870772
fix: post-merge error
alexec Jan 20, 2020
46527bc
Merge branch 'master' into nos
alexec Jan 21, 2020
07ccbf1
Merge
alexec Jan 21, 2020
b852c54
Merge branch 'nos' of github.com:alexec/argo into nos
alexec Jan 21, 2020
e74f1e2
Merge branch 'master' into nos
alexec Jan 21, 2020
3de1820
feat(sql): post merge fixes
alexec Jan 21, 2020
8594d92
Merge branch 'master' into nos
alexec Jan 22, 2020
2bac47f
lint
alexec Jan 22, 2020
24a68d8
code review comments
alexec Jan 22, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions api/openapi-spec/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -1814,9 +1814,9 @@
"$ref": "#/definitions/io.argoproj.workflow.v1alpha1.NodeStatus"
}
},
"offloadNodeStatus": {
"description": "Whether on not node status has been offloaded to a database. If true, then Nodes and CompressedNodes will be empty.",
"type": "boolean"
"offloadNodeStatusVersion": {
"description": "Whether on not node status has been offloaded to a database. If exists, then Nodes and CompressedNodes will be empty. This will actually be populated with a hash of the offloaded data.",
"type": "string"
},
"outputs": {
"description": "Outputs captures output values and artifact locations produced by the workflow via global outputs",
Expand Down
17 changes: 9 additions & 8 deletions cmd/server/apiserver/argoserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,18 +99,19 @@ func (as *argoServer) Run(ctx context.Context, port int) {
// TODO: this currently returns an error every time
log.Errorf("Error marshalling config map: %s", err)
}
var offloadRepo sqldb.OffloadNodeStatusRepo = sqldb.ExplosiveOffloadNodeStatusRepo
var wfArchive sqldb.WorkflowArchive = sqldb.NullWorkflowArchive
if configMap != nil && configMap.Persistence != nil {
session, tableName, err := sqldb.CreateDBSession(as.kubeClientset, as.namespace, configMap.Persistence)
var offloadRepo = sqldb.ExplosiveOffloadNodeStatusRepo
var wfArchive = sqldb.NullWorkflowArchive
persistence := configMap.Persistence
if configMap != nil && persistence != nil {
session, tableName, err := sqldb.CreateDBSession(as.kubeClientset, as.namespace, persistence)
if err != nil {
log.Fatal(err)
}
log.WithField("nodeStatusOffload", configMap.Persistence.NodeStatusOffload).Info("Offload node status")
if configMap.Persistence.NodeStatusOffload {
offloadRepo = sqldb.NewOffloadNodeStatusRepo(tableName, session)
log.WithField("nodeStatusOffload", persistence.NodeStatusOffload).Info("Offload node status")
if persistence.NodeStatusOffload {
offloadRepo = sqldb.NewOffloadNodeStatusRepo(session, persistence.GetClusterName(), tableName)
}
wfArchive = sqldb.NewWorkflowArchive(session)
wfArchive = sqldb.NewWorkflowArchive(session, persistence.GetClusterName())
}
artifactServer := artifacts.NewArtifactServer(as.authenticator, offloadRepo, wfArchive)
grpcServer := as.newGRPCServer(offloadRepo, wfArchive)
Expand Down
6 changes: 3 additions & 3 deletions cmd/server/artifacts/artifact_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,12 +157,12 @@ func (a *ArtifactServer) getWorkflow(ctx context.Context, namespace string, work
if err != nil {
return nil, err
}
if wf.Status.OffloadNodeStatus {
offloadedWf, err := a.offloadNodeStatusRepo.Get(workflowName, namespace)
if wf.Status.IsOffloadNodeStatus() {
offloadedNodes, err := a.offloadNodeStatusRepo.Get(string(wf.UID), wf.GetOffloadNodeStatusVersion())
if err != nil {
return nil, err
}
wf.Status.Nodes = offloadedWf.Status.Nodes
wf.Status.Nodes = offloadedNodes
}
return wf, nil
}
Expand Down
7 changes: 3 additions & 4 deletions cmd/server/workflow/workflow.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -4358,10 +4358,9 @@
},
"description": "Nodes is a mapping between a node ID and the node's status."
},
"offloadNodeStatus": {
"type": "boolean",
"format": "boolean",
"description": "Whether on not node status has been offloaded to a database. If true, then Nodes and CompressedNodes will be empty."
"offloadNodeStatusVersion": {
"type": "string",
"description": "Whether on not node status has been offloaded to a database. If exists, then Nodes and CompressedNodes will be empty.\nThis will actually be populated with a hash of the offloaded data."
},
"storedTemplates": {
"type": "object",
Expand Down
37 changes: 17 additions & 20 deletions cmd/server/workflow/workflow_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,12 @@ func (s *workflowServer) GetWorkflow(ctx context.Context, req *WorkflowGetReques
return nil, err
}

if wf.Status.OffloadNodeStatus {
offloaded, err := s.offloadNodeStatusRepo.Get(req.Name, req.Namespace)
if wf.Status.IsOffloadNodeStatus() {
offloadedNodes, err := s.offloadNodeStatusRepo.Get(string(wf.UID), wf.GetOffloadNodeStatusVersion())
if err != nil {
return nil, err
}
wf.Status.Nodes = offloaded.Status.Nodes
wf.Status.CompressedNodes = offloaded.Status.CompressedNodes
wf.Status.Nodes = offloadedNodes
}
err = packer.DecompressWorkflow(wf)
if err != nil {
Expand All @@ -108,8 +107,17 @@ func (s *workflowServer) ListWorkflows(ctx context.Context, req *WorkflowListReq
if err != nil {
return nil, err
}
offloadedNodes, err := s.offloadNodeStatusRepo.List(req.Namespace)
if err != nil {
return nil, err
}
for i, wf := range wfList.Items {
if wf.Status.IsOffloadNodeStatus() {
wfList.Items[i].Status.Nodes = offloadedNodes[sqldb.UUIDVersion{UID: string(wf.UID), Version: wf.GetOffloadNodeStatusVersion()}]
}
}

return wfList, nil
return &v1alpha1.WorkflowList{Items: wfList.Items}, nil
}

func (s *workflowServer) WatchWorkflows(req *WatchWorkflowsRequest, ws WorkflowService_WatchWorkflowsServer) error {
Expand Down Expand Up @@ -144,12 +152,12 @@ func (s *workflowServer) WatchWorkflows(req *WatchWorkflowsRequest, ws WorkflowS
if err != nil {
return err
}
if wf.Status.OffloadNodeStatus {
offloaded, err := s.offloadNodeStatusRepo.Get(wf.Name, wf.Namespace)
if wf.Status.IsOffloadNodeStatus() {
offloadedNodes, err := s.offloadNodeStatusRepo.Get(string(wf.UID), wf.GetOffloadNodeStatusVersion())
if err != nil {
return err
}
wf.Status.Nodes = offloaded.Status.Nodes
wf.Status.Nodes = offloadedNodes
}
logCtx.Debug("Sending event")
err = ws.Send(&WorkflowWatchEvent{Type: string(next.Type), Object: wf})
Expand All @@ -165,18 +173,7 @@ func (s *workflowServer) WatchWorkflows(req *WatchWorkflowsRequest, ws WorkflowS
func (s *workflowServer) DeleteWorkflow(ctx context.Context, req *WorkflowDeleteRequest) (*WorkflowDeleteResponse, error) {
wfClient := auth.GetWfClient(ctx)

wf, err := wfClient.ArgoprojV1alpha1().Workflows(req.Namespace).Get(req.Name, metav1.GetOptions{})
if err != nil {
return nil, err
}

if wf.Status.OffloadNodeStatus {
err = s.offloadNodeStatusRepo.Delete(req.Name, req.Namespace)
if err != nil {
return nil, err
}
}
err = wfClient.ArgoprojV1alpha1().Workflows(req.Namespace).Delete(req.Name, &metav1.DeleteOptions{})
err := wfClient.ArgoprojV1alpha1().Workflows(req.Namespace).Delete(req.Name, &metav1.DeleteOptions{})
if err != nil {
return nil, err
}
Expand Down
12 changes: 7 additions & 5 deletions cmd/server/workflow/workflow_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/client-go/kubernetes/fake"
ktesting "k8s.io/client-go/testing"

"github.com/argoproj/argo/cmd/server/auth"
"github.com/argoproj/argo/persist/sqldb"
"github.com/argoproj/argo/persist/sqldb/mocks"
"github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
v1alpha "github.com/argoproj/argo/pkg/client/clientset/versioned/fake"
)
Expand Down Expand Up @@ -361,7 +364,9 @@ func getWorkflowServer() (WorkflowServiceServer, context.Context) {
_ = json.Unmarshal([]byte(wf3), &wfObj3)
_ = json.Unmarshal([]byte(wf4), &wfObj4)
_ = json.Unmarshal([]byte(wf5), &wfObj5)
server := NewWorkflowServer(nil)
offloadNodeStatusRepo := &mocks.OffloadNodeStatusRepo{}
offloadNodeStatusRepo.On("List", mock.Anything).Return(map[sqldb.UUIDVersion]v1alpha1.Nodes{}, nil)
server := NewWorkflowServer(offloadNodeStatusRepo)
kubeClientSet := fake.NewSimpleClientset()
wfClientset := v1alpha.NewSimpleClientset(&wfObj1, &wfObj2, &wfObj3, &wfObj4, &wfObj5)
wfClientset.PrependReactor("create", "workflows", generateNameReactor)
Expand Down Expand Up @@ -390,10 +395,7 @@ func getWorkflow(ctx context.Context, server WorkflowServiceServer, namespace st
}

func getWorkflowList(ctx context.Context, server WorkflowServiceServer, namespace string) (*v1alpha1.WorkflowList, error) {
req := WorkflowListRequest{
Namespace: namespace,
}
return server.ListWorkflows(ctx, &req)
return server.ListWorkflows(ctx, &WorkflowListRequest{Namespace: namespace})

}

Expand Down
12 changes: 6 additions & 6 deletions cmd/server/workflowarchive/archived_workflow_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ import (
)

type archivedWorkflowServer struct {
repo sqldb.WorkflowArchive
wfArchive sqldb.WorkflowArchive
}

func NewWorkflowArchiveServer(repo sqldb.WorkflowArchive) ArchivedWorkflowServiceServer {
return &archivedWorkflowServer{repo: repo}
func NewWorkflowArchiveServer(wfArchive sqldb.WorkflowArchive) ArchivedWorkflowServiceServer {
return &archivedWorkflowServer{wfArchive: wfArchive}
}

func (w *archivedWorkflowServer) ListArchivedWorkflows(ctx context.Context, req *ListArchivedWorkflowsRequest) (*wfv1.WorkflowList, error) {
Expand Down Expand Up @@ -52,7 +52,7 @@ func (w *archivedWorkflowServer) ListArchivedWorkflows(ctx context.Context, req
authorizer := auth.NewAuthorizer(ctx)
// keep trying until we have enough
for len(items) < limit {
moreItems, err := w.repo.ListWorkflows(namespace, limit, offset)
moreItems, err := w.wfArchive.ListWorkflows(namespace, limit, offset)
if err != nil {
return nil, err
}
Expand All @@ -79,7 +79,7 @@ func (w *archivedWorkflowServer) ListArchivedWorkflows(ctx context.Context, req
}

func (w *archivedWorkflowServer) GetArchivedWorkflow(ctx context.Context, req *GetArchivedWorkflowRequest) (*wfv1.Workflow, error) {
wf, err := w.repo.GetWorkflow(req.Uid)
wf, err := w.wfArchive.GetWorkflow(req.Uid)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -108,7 +108,7 @@ func (w *archivedWorkflowServer) DeleteArchivedWorkflow(ctx context.Context, req
if !allowed {
return nil, status.Error(codes.PermissionDenied, "permission denied")
}
err = w.repo.DeleteWorkflow(req.Uid)
err = w.wfArchive.DeleteWorkflow(req.Uid)
if err != nil {
return nil, err
}
Expand Down
7 changes: 3 additions & 4 deletions cmd/server/workflowarchive/workflow-archive.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -3688,10 +3688,9 @@
},
"description": "Nodes is a mapping between a node ID and the node's status."
},
"offloadNodeStatus": {
"type": "boolean",
"format": "boolean",
"description": "Whether on not node status has been offloaded to a database. If true, then Nodes and CompressedNodes will be empty."
"offloadNodeStatusVersion": {
"type": "string",
"description": "Whether on not node status has been offloaded to a database. If exists, then Nodes and CompressedNodes will be empty.\nThis will actually be populated with a hash of the offloaded data."
},
"storedTemplates": {
"type": "object",
Expand Down
2 changes: 2 additions & 0 deletions docs/workflow-controller-configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ data:
# save completed workloads to the archived, even if disabled, you'll be able to
# read from the archive
archive: false
# Optional name of the cluster I'm running in. This must be unique for your cluster.
clusterName: default
postgresql:
host: localhost
port: 5432
Expand Down
11 changes: 11 additions & 0 deletions persist/sqldb/ansi_sql_change.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package sqldb

import "upper.io/db.v3/lib/sqlbuilder"

// represent a straight forward change that is compatible with all database providers
type ansiSQLChange string

func (s ansiSQLChange) Apply(session sqlbuilder.Database) error {
_, err := session.Exec(string(s))
return err
}
56 changes: 56 additions & 0 deletions persist/sqldb/backfill_cluster_name.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package sqldb

import (
"fmt"

log "github.com/sirupsen/logrus"
"upper.io/db.v3"
"upper.io/db.v3/lib/sqlbuilder"
)

type backfillClusterName struct {
clusterName string
tableName string
}

func (s backfillClusterName) String() string {
return fmt.Sprintf("backfillClusterName{%s,%s}", s.clusterName, s.tableName)
}

func (s backfillClusterName) Apply(session sqlbuilder.Database) error {
log.WithField("clustername", s.clusterName).Info("Back-filling cluster name")
rs, err := session.
Select("uid").
From(s.tableName).
Where(db.Cond{"clustername": nil}).
Query()
if err != nil {
return err
}
for rs.Next() {
uid := ""
err := rs.Scan(&uid)
if err != nil {
return err
}
logCtx := log.WithFields(log.Fields{"clustername": s.clusterName, "uid": uid})
logCtx.Info("Back-filling cluster name")
res, err := session.
Update(s.tableName).
Set("clustername", s.clusterName).
Where(db.Cond{"clustername": nil}).
And(db.Cond{"uuid": uid}).
Copy link
Contributor

@markterm markterm Feb 3, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this line read the following? (as there is no uuid column)

And(db.Cond{"uid": uid}).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes - though why exactly any SQL framework would not throw some kind of exception at this clear error is beyond me 😠 😠 😠 😠

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually - maybe this code never runs

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's running for me on migrating a legacy database ...

image

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

did it migrate successfully? give that it'l likely that every record should have "default" it may be just luckily - though I ask again - who writes framework that allows such a big error thought?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no reviewing the code - this backfill is really implemented stupidly - I think I'll make a patch fix and then a re-write

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exec()
if err != nil {
return err
}
rowsAffected, err := res.RowsAffected()
if err != nil {
return err
}
if rowsAffected != 1 {
logCtx.WithField("rowsAffected", rowsAffected).Warn("Expected exactly one row affected")
}
}
return nil
}
66 changes: 66 additions & 0 deletions persist/sqldb/backfill_nodes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package sqldb

import (
"encoding/json"
"fmt"

log "github.com/sirupsen/logrus"
"upper.io/db.v3"
"upper.io/db.v3/lib/sqlbuilder"

wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
)

type backfillNodes struct {
tableName string
}

func (s backfillNodes) String() string {
return fmt.Sprintf("backfillNodes{%s}", s.tableName)
}

func (s backfillNodes) Apply(session sqlbuilder.Database) error {
log.Info("Backfill node status")
rs, err := session.SelectFrom(s.tableName).
Columns("workflow").
Where(db.Cond{"version": nil}).
Query()
if err != nil {
return err
}
for rs.Next() {
workflow := ""
err := rs.Scan(&workflow)
if err != nil {
return err
}
var wf *wfv1.Workflow
err = json.Unmarshal([]byte(workflow), &wf)
if err != nil {
return err
}
marshalled, version, err := nodeStatusVersion(wf.Status.Nodes)
if err != nil {
return err
}
logCtx := log.WithFields(log.Fields{"name": wf.Name, "namespace": wf.Namespace, "version": version})
logCtx.Info("Back-filling node status")
res, err := session.Update(archiveTableName).
Set("version", wf.ResourceVersion).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also this line - the archive table doesn't have a version column

Set("nodes", marshalled).
Where(db.Cond{"name": wf.Name}).
And(db.Cond{"namespace": wf.Namespace}).
Exec()
if err != nil {
return err
}
rowsAffected, err := res.RowsAffected()
if err != nil {
return err
}
if rowsAffected != 1 {
logCtx.WithField("rowsAffected", rowsAffected).Warn("Expected exactly one row affected")
}
}
return nil
}
Loading