Add "passive" mode with --passive flag.
Every listmonk instance periodically scans the DB for running
campaigns to process. This made running multiple instances of
listmonk impractical, as they would all pick up the same running
campaigns and process them, resulting in duplicate e-mails.

This commit adds a `--passive` flag to the binary that runs listmonk
in a "passive" mode where campaign processing is disabled. This
allows multiple instances of listmonk to be run to handle different
kinds of requests if there is such a requirement (scale/traffic?).
It is important to note that only one non-passive instance should be
running at any given time. If distributed campaign processing is
ever considered, this will change.
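To illustrate the pattern this commit describes, below is a minimal, self-contained Go sketch of a periodic scanner gated behind a --passive flag. It is not listmonk's actual code: it uses only the standard library instead of koanf and listmonk's manager package, and the Config/Manager names merely mirror the diff that follows.

// Toy sketch (not listmonk's actual code): a --passive flag gates the
// periodic campaign scan, so extra instances can run without sending
// duplicate e-mails.
package main

import (
	"flag"
	"log"
	"time"
)

// Config mirrors the two fields added in this commit.
type Config struct {
	ScanInterval  time.Duration
	ScanCampaigns bool
}

type Manager struct{ cfg Config }

// Run starts the periodic DB scan only when ScanCampaigns is true.
func (m *Manager) Run() {
	if m.cfg.ScanCampaigns {
		go func() {
			t := time.NewTicker(m.cfg.ScanInterval)
			defer t.Stop()
			for range t.C {
				log.Println("scanning DB for running campaigns ...")
			}
		}()
	} else {
		log.Println("running in passive mode. won't process campaigns.")
	}
	select {} // stand-in for the message worker loop
}

func main() {
	passive := flag.Bool("passive", false, "run in passive mode where campaigns are not processed")
	flag.Parse()

	m := &Manager{cfg: Config{
		ScanInterval:  5 * time.Second,
		ScanCampaigns: !*passive, // passive instances never scan the DB
	}}
	m.Run()
}

Operationally, one instance would be started normally (it scans and sends) and any number of others with --passive; as the commit message notes, only one non-passive instance should run at a time.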
knadh committed Oct 29, 2021
1 parent 9dd8592 commit 1101039
Showing 3 changed files with 26 additions and 8 deletions.
13 changes: 10 additions & 3 deletions cmd/init.go
@@ -99,11 +99,12 @@ func initFlags() {
f.Bool("install", false, "setup database (first time)")
f.Bool("idempotent", false, "make --install run only if the databse isn't already setup")
f.Bool("upgrade", false, "upgrade database to the current version")
f.Bool("version", false, "current version of the build")
f.Bool("version", false, "show current version of the build")
f.Bool("new-config", false, "generate sample config file")
f.String("static-dir", "", "(optional) path to directory with static files")
f.String("i18n-dir", "", "(optional) path to directory with i18n language files")
f.Bool("yes", false, "assume 'yes' to prompts during --install/upgrade")
f.Bool("passive", false, "run in passive mode where campaigns are not processed")
if err := f.Parse(os.Args[1:]); err != nil {
lo.Fatalf("error loading flags: %v", err)
}
@@ -194,7 +195,7 @@ func initFS(appDir, frontendDir, staticDir, i18nDir string) stuffbin.FileSystem
// Default dir in cwd.
i18nDir = "i18n"
}
lo.Printf("will load i18n files from: %v", i18nDir)
lo.Printf("loading i18n files from: %v", i18nDir)
files = append(files, joinFSPaths(i18nDir, i18nFiles)...)
}

@@ -203,7 +204,7 @@ func initFS(appDir, frontendDir, staticDir, i18nDir string) stuffbin.FileSystem
// Default dir in cwd.
staticDir = "static"
}
lo.Printf("will load static files from: %v", staticDir)
lo.Printf("loading static files from: %v", staticDir)
files = append(files, joinFSPaths(staticDir, staticFiles)...)
}

@@ -352,6 +353,10 @@ func initCampaignManager(q *Queries, cs *constants, app *App) *manager.Manager {
lo.Fatal("app.message_rate should be at least 1")
}

if ko.Bool("passive") {
lo.Println("running in passive mode. won't process campaigns.")
}

return manager.New(manager.Config{
BatchSize: ko.Int("app.batch_size"),
Concurrency: ko.Int("app.concurrency"),
@@ -368,6 +373,8 @@ func initCampaignManager(q *Queries, cs *constants, app *App) *manager.Manager {
SlidingWindow: ko.Bool("app.message_sliding_window"),
SlidingWindowDuration: ko.Duration("app.message_sliding_window_duration"),
SlidingWindowRate: ko.Int("app.message_sliding_window_rate"),
+ScanInterval: time.Second * 5,
+ScanCampaigns: !ko.Bool("passive"),
}, newManagerStore(q), campNotifCB, app.i18n, lo)
}

2 changes: 1 addition & 1 deletion cmd/main.go
@@ -188,7 +188,7 @@ func main() {

// Start the campaign workers. The campaign batches (fetch from DB, push out
// messages) get processed at the specified interval.
-go app.manager.Run(time.Second * 5)
+go app.manager.Run()

// Start the app server.
srv := initHTTPServer(app)
19 changes: 15 additions & 4 deletions internal/manager/manager.go
@@ -98,8 +98,7 @@ type Message struct {
// Config has parameters for configuring the manager.
type Config struct {
// Number of subscribers to pull from the DB in a single iteration.
-BatchSize int
-
+BatchSize int
Concurrency int
MessageRate int
MaxSendErrors int
@@ -115,6 +114,16 @@ type Config struct {
MessageURL string
ViewTrackURL string
UnsubHeader bool

+// Interval to scan the DB for active campaign checkpoints.
+ScanInterval time.Duration
+
+// ScanCampaigns indicates whether this instance of manager will scan the DB
+// for active campaigns and process them.
+// This can be used to run multiple instances of listmonk
+// (exposed to the internet, private etc.) where only one does campaign
+// processing while the others handle other kinds of traffic.
+ScanCampaigns bool
}

type msgError struct {
@@ -234,8 +243,10 @@ func (m *Manager) HasRunningCampaigns() bool {
// subscribers and pushes messages to them for each queued campaign
// until all subscribers are exhausted, at which point, a campaign is marked
// as "finished".
-func (m *Manager) Run(tick time.Duration) {
-go m.scanCampaigns(tick)
+func (m *Manager) Run() {
+if m.cfg.ScanCampaigns {
+go m.scanCampaigns(m.cfg.ScanInterval)
+}

// Spawn N message workers.
for i := 0; i < m.cfg.Concurrency; i++ {

0 comments on commit 1101039

Please sign in to comment.