diff --git a/workflow/controller/config.go b/workflow/controller/config.go index ef7761e7ed65..01713877eb72 100644 --- a/workflow/controller/config.go +++ b/workflow/controller/config.go @@ -27,6 +27,7 @@ func (wfc *WorkflowController) updateConfig() error { wfc.offloadNodeStatusRepo = sqldb.ExplosiveOffloadNodeStatusRepo wfc.wfArchive = sqldb.NullWorkflowArchive wfc.archiveLabelSelector = labels.Everything() + persistence := wfc.Config.Persistence if persistence != nil { log.Info("Persistence configuration enabled") @@ -40,14 +41,7 @@ func (wfc *WorkflowController) updateConfig() error { return err } log.Info("Persistence Session created successfully") - if !persistence.SkipMigration { - err = sqldb.NewMigrate(session, persistence.GetClusterName(), tableName).Exec(context.Background()) - if err != nil { - return err - } - } else { - log.Info("DB migration is disabled") - } + wfc.session = session } sqldb.ConfigureDBSession(wfc.session, persistence.ConnectionPool) @@ -75,6 +69,7 @@ func (wfc *WorkflowController) updateConfig() error { } else { log.Info("Persistence configuration disabled") } + wfc.hydrator = hydrator.New(wfc.offloadNodeStatusRepo) wfc.updateEstimatorFactory() wfc.rateLimiter = wfc.newRateLimiter() @@ -87,6 +82,27 @@ func (wfc *WorkflowController) updateConfig() error { return nil } +// initDB inits argo DB tables +func (wfc *WorkflowController) initDB() error { + persistence := wfc.Config.Persistence + if persistence != nil { + tableName, err := sqldb.GetTableName(persistence) + if err != nil { + return err + } + if !persistence.SkipMigration { + err = sqldb.NewMigrate(wfc.session, persistence.GetClusterName(), tableName).Exec(context.Background()) + if err != nil { + return err + } + } else { + log.Info("DB migration is disabled") + } + } + + return nil +} + func (wfc *WorkflowController) newRateLimiter() *rate.Limiter { rateLimiter := wfc.Config.GetResourceRateLimit() return rate.NewLimiter(rate.Limit(rateLimiter.Limit), rateLimiter.Burst) diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index 829cf36d37b4..1d92ed7dc90d 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -247,6 +247,11 @@ var indexers = cache.Indexers{ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWorkers, podCleanupWorkers, cronWorkflowWorkers int) { defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...) + // init DB after leader election (if enabled) + if err := wfc.initDB(); err != nil { + log.Fatalf("Failed to init db: %v", err) + } + ctx, cancel := context.WithCancel(ctx) defer cancel()