Skip to content

Commit

Permalink
Add accurate realtime message rate counter.
Browse files Browse the repository at this point in the history
The `rate` field `/api/campaigns/running/stats` returned was computed
based on the total time spent from the start of the campaign to the
current time. This meant that for large campaigns, if there were
pauses or slowdowns in between, the rate would be skewed heavily
making it useless to figure out the current send rate.

This commit introduces a realtime running rate counter in the campaign
manager that returns accurate (running) send rates for the last minute.

The `rate` field in the API now shows the live running rate and a
new `net_rate` field shows the rate from the beginning of the campaign.
  • Loading branch information
knadh committed Feb 6, 2022
1 parent 1b163d1 commit 0f6a037
Show file tree
Hide file tree
Showing 20 changed files with 74 additions and 18 deletions.
27 changes: 16 additions & 11 deletions cmd/campaigns.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ type campaignStats struct {
Sent int `db:"sent" json:"sent"`
Started null.Time `db:"started_at" json:"started_at"`
UpdatedAt null.Time `db:"updated_at" json:"updated_at"`
Rate float64 `json:"rate"`
Rate int `json:"rate"`
NetRate int `json:"net_rate"`
}

type campsWrap struct {
Expand Down Expand Up @@ -522,17 +523,21 @@ func handleGetRunningCampaignStats(c echo.Context) error {
// Compute rate.
for i, c := range out {
if c.Started.Valid && c.UpdatedAt.Valid {
diff := c.UpdatedAt.Time.Sub(c.Started.Time).Minutes()
if diff > 0 {
var (
sent = float64(c.Sent)
rate = sent / diff
)
if rate > sent || rate > float64(c.ToSend) {
rate = sent
}
out[i].Rate = rate
diff := int(c.UpdatedAt.Time.Sub(c.Started.Time).Minutes())
if diff < 1 {
diff = 1
}

rate := c.Sent / diff
if rate > c.Sent || rate > c.ToSend {
rate = c.Sent
}

// Rate since the starting of the campaign.
out[i].NetRate = rate

// Realtime running rate over the last minute.
out[i].Rate = app.manager.GetCampaignStats(c.ID).SendRate
}
}

Expand Down
11 changes: 8 additions & 3 deletions frontend/src/views/Campaigns.vue
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@
<span>{{ $utils.niceDate(stats.updatedAt, true) }}</span>
</p>
<p v-if="stats.startedAt && stats.updatedAt"
class="is-capitalized" title="Duration">
class="is-capitalized">
<label><b-icon icon="alarm" size="is-small" /></label>
<span>{{ $utils.duration(stats.startedAt, stats.updatedAt) }}</span>
</p>
Expand Down Expand Up @@ -142,10 +142,15 @@
</router-link>
</span>
</p>
<p title="Speed" v-if="stats.rate">
<p v-if="stats.rate">
<label><b-icon icon="speedometer" size="is-small"></b-icon></label>
<span class="send-rate">
{{ stats.rate.toFixed(0) }} / min
<b-tooltip
:label="`${stats.netRate} / ${$t('campaigns.rateMinuteShort')} @
${$utils.duration(stats.startedAt, stats.updatedAt)}`"
type="is-dark">
{{ stats.rate.toFixed(0) }} / {{ $t('campaigns.rateMinuteShort') }}
</b-tooltip>
</span>
</p>
<p v-if="isRunning(props.row.id)">
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ require (
github.com/mattn/go-colorable v0.1.12 // indirect
github.com/mitchellh/mapstructure v1.4.2 // indirect
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
github.com/paulbellamy/ratecounter v0.2.0 // indirect
github.com/pelletier/go-toml v1.9.4 // indirect
github.com/rhnvrm/simples3 v0.8.1
github.com/spf13/cast v1.4.1 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@ github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWb
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA=
github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/paulbellamy/ratecounter v0.2.0 h1:2L/RhJq+HA8gBQImDXtLPrDXK5qAj6ozWVK/zFXVJGs=
github.com/paulbellamy/ratecounter v0.2.0/go.mod h1:Hfx1hDpSGoqxkVVpBi/IlYD7kChlfo5C6hzIHwPqfFE=
github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAvS1LBMMhTE=
github.com/pelletier/go-toml v1.9.4 h1:tjENF6MfZAg8e4ZmZTeWaWiT2vXtsoO6+iuOjFhECwM=
github.com/pelletier/go-toml v1.9.4/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c=
Expand All @@ -142,8 +144,6 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI=
github.com/rhnvrm/simples3 v0.6.1/go.mod h1:Y+3vYm2V7Y4VijFoJHHTrja6OgPrJ2cBti8dPGkC3sA=
github.com/rhnvrm/simples3 v0.8.0 h1:SAjJtsqObltKkejIGl3WgyySq2xdrfwZWXi6njFluuA=
github.com/rhnvrm/simples3 v0.8.0/go.mod h1:Y+3vYm2V7Y4VijFoJHHTrja6OgPrJ2cBti8dPGkC3sA=
github.com/rhnvrm/simples3 v0.8.1 h1:jL2yCi9P0pA8hFYkyVWZ4cs5RX3AMgcVsXTOqnCj0/w=
github.com/rhnvrm/simples3 v0.8.1/go.mod h1:Y+3vYm2V7Y4VijFoJHHTrja6OgPrJ2cBti8dPGkC3sA=
github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
Expand Down
1 change: 1 addition & 0 deletions i18n/cs-cz.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
"campaigns.preview": "Náhled",
"campaigns.progress": "Průběh",
"campaigns.queryPlaceholder": "Jméno nebo předmět",
"campaigns.rateMinuteShort": "min",
"campaigns.rawHTML": "Prvotní HTML",
"campaigns.removeAltText": "Odebrat alternativní zprávu ve formátu prostého textu",
"campaigns.richText": "Formátovaný text",
Expand Down
1 change: 1 addition & 0 deletions i18n/de.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
"campaigns.preview": "Vorschau",
"campaigns.progress": "Fortschritt",
"campaigns.queryPlaceholder": "Name oder Betreff",
"campaigns.rateMinuteShort": "min",
"campaigns.rawHTML": "HTML Code",
"campaigns.removeAltText": "Lösche den alternativen unformatierten Text",
"campaigns.richText": "Rich-Text",
Expand Down
1 change: 1 addition & 0 deletions i18n/en.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
"campaigns.preview": "Preview",
"campaigns.progress": "Progress",
"campaigns.queryPlaceholder": "Name or subject",
"campaigns.rateMinuteShort": "min",
"campaigns.rawHTML": "Raw HTML",
"campaigns.removeAltText": "Remove alternate plain text message",
"campaigns.richText": "Rich text",
Expand Down
1 change: 1 addition & 0 deletions i18n/es.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
"campaigns.preview": "Vista previa",
"campaigns.progress": "Progreso",
"campaigns.queryPlaceholder": "Nombre o asunto",
"campaigns.rateMinuteShort": "min",
"campaigns.rawHTML": "HTML crudo",
"campaigns.removeAltText": "Eliminar mensaje en texto plano alternativo",
"campaigns.richText": "Texto enriquecido",
Expand Down
1 change: 1 addition & 0 deletions i18n/fr.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
"campaigns.preview": "Aperçu",
"campaigns.progress": "Avancement",
"campaigns.queryPlaceholder": "Nom ou objet",
"campaigns.rateMinuteShort": "min",
"campaigns.rawHTML": "HTML brut",
"campaigns.removeAltText": "Supprimer le message alternatif en texte brut",
"campaigns.richText": "Texte riche",
Expand Down
1 change: 1 addition & 0 deletions i18n/hu.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
"campaigns.preview": "Előnézet",
"campaigns.progress": "Folyamatban",
"campaigns.queryPlaceholder": "Név vagy tárgy",
"campaigns.rateMinuteShort": "min",
"campaigns.rawHTML": "Nyers (Raw) HTML",
"campaigns.removeAltText": "Alternatív egyszerű szöveges üzenet eltávolítása",
"campaigns.richText": "Rich text",
Expand Down
1 change: 1 addition & 0 deletions i18n/it.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
"campaigns.preview": "Anteprima",
"campaigns.progress": "Avanzamento",
"campaigns.queryPlaceholder": "Nome o oggetto",
"campaigns.rateMinuteShort": "min",
"campaigns.rawHTML": "HTML semplice",
"campaigns.removeAltText": "Cancellare il messaggio sostitutivo in testo semplice",
"campaigns.richText": "Testo formattato",
Expand Down
1 change: 1 addition & 0 deletions i18n/ml.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
"campaigns.preview": "പ്രിവ്യൂ",
"campaigns.progress": "പുരോഗതി",
"campaigns.queryPlaceholder": "പേരോ വിഷയമോ",
"campaigns.rateMinuteShort": "min",
"campaigns.rawHTML": "അസംസ്കൃത എച്. ടി. എം. എൽ",
"campaigns.removeAltText": "Remove alternate plain text message",
"campaigns.richText": "റിച്ച് ടെക്സ്റ്റ്",
Expand Down
1 change: 1 addition & 0 deletions i18n/nl.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
"campaigns.preview": "Voorbeeld",
"campaigns.progress": "Voortgang",
"campaigns.queryPlaceholder": "Naam of onderwerp",
"campaigns.rateMinuteShort": "min",
"campaigns.rawHTML": "HTML code",
"campaigns.removeAltText": "Verwijder plain text bericht",
"campaigns.richText": "Rich text",
Expand Down
1 change: 1 addition & 0 deletions i18n/pl.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
"campaigns.preview": "Podgląd",
"campaigns.progress": "Postęp",
"campaigns.queryPlaceholder": "Nazwa lub temat",
"campaigns.rateMinuteShort": "min",
"campaigns.rawHTML": "Raw HTML",
"campaigns.removeAltText": "Usuń alternatywną treść typu plain text",
"campaigns.richText": "Wzbogacony format tekstowy (Rich text)",
Expand Down
1 change: 1 addition & 0 deletions i18n/pt-BR.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
"campaigns.preview": "Pré-visualizar",
"campaigns.progress": "Progresso",
"campaigns.queryPlaceholder": "Nome ou assunto",
"campaigns.rateMinuteShort": "min",
"campaigns.rawHTML": "Código HTML",
"campaigns.removeAltText": "Remover mensagem alternativa em texto simples",
"campaigns.richText": "Texto com formatação",
Expand Down
1 change: 1 addition & 0 deletions i18n/pt.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
"campaigns.preview": "Pré-visualizar",
"campaigns.progress": "Progresso",
"campaigns.queryPlaceholder": "Nome ou assunto",
"campaigns.rateMinuteShort": "min",
"campaigns.rawHTML": "HTML simples",
"campaigns.removeAltText": "Remover mensagem alternativa em texto simples",
"campaigns.richText": "Texto rico",
Expand Down
1 change: 1 addition & 0 deletions i18n/ro.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
"campaigns.preview": "Previzualizare",
"campaigns.progress": "Progres",
"campaigns.queryPlaceholder": "Numele sau subiectul",
"campaigns.rateMinuteShort": "min",
"campaigns.rawHTML": "HTML brut",
"campaigns.removeAltText": "Eliminați un mesaj text alternativ",
"campaigns.richText": "Text îmbogățit",
Expand Down
1 change: 1 addition & 0 deletions i18n/ru.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
"campaigns.preview": "Предпросмотр",
"campaigns.progress": "Прогресс",
"campaigns.queryPlaceholder": "Имя темы",
"campaigns.rateMinuteShort": "min",
"campaigns.rawHTML": "Необработанный HTML",
"campaigns.removeAltText": "Удалить альтернативное простое текстовое сообщение",
"campaigns.richText": "Форматированный текст",
Expand Down
1 change: 1 addition & 0 deletions i18n/tr.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
"campaigns.preview": "Önizleme",
"campaigns.progress": "İlerleme durumu",
"campaigns.queryPlaceholder": "İsim veya konu",
"campaigns.rateMinuteShort": "min",
"campaigns.rawHTML": "Ham HTML",
"campaigns.removeAltText": "Alternatif düz yazıyı kaldır",
"campaigns.richText": "Zengin metin",
Expand Down
34 changes: 32 additions & 2 deletions internal/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/knadh/listmonk/internal/i18n"
"github.com/knadh/listmonk/internal/messenger"
"github.com/knadh/listmonk/models"
"github.com/paulbellamy/ratecounter"
)

const (
Expand All @@ -39,6 +40,11 @@ type Store interface {
DeleteSubscriber(id int64) error
}

// CampStats contains campaign stats like per minute send rate.
type CampStats struct {
SendRate int
}

// Manager handles the scheduling, processing, and queuing of campaigns
// and message pushes.
type Manager struct {
Expand All @@ -50,8 +56,9 @@ type Manager struct {
logger *log.Logger

// Campaigns that are currently running.
camps map[int]*models.Campaign
campsMut sync.RWMutex
camps map[int]*models.Campaign
campRates map[int]*ratecounter.RateCounter
campsMut sync.RWMutex

// Links generated using Track() are cached here so as to not query
// the database for the link UUID for every message sent. This has to
Expand Down Expand Up @@ -153,6 +160,7 @@ func New(cfg Config, store Store, notifCB models.AdminNotifCallback, i *i18n.I18
logger: l,
messengers: make(map[string]messenger.Messenger),
camps: make(map[int]*models.Campaign),
campRates: make(map[int]*ratecounter.RateCounter),
links: make(map[string]string),
subFetchQueue: make(chan *models.Campaign, cfg.Concurrency),
campMsgQueue: make(chan CampaignMessage, cfg.Concurrency*2),
Expand Down Expand Up @@ -237,6 +245,19 @@ func (m *Manager) HasRunningCampaigns() bool {
return len(m.camps) > 0
}

// GetCampaignStats returns campaign statistics.
func (m *Manager) GetCampaignStats(id int) CampStats {
n := 0

m.campsMut.Lock()
if r, ok := m.campRates[id]; ok {
n = int(r.Rate())
}
m.campsMut.Unlock()

return CampStats{SendRate: n}
}

// Run is a blocking function (that should be invoked as a goroutine)
// that scans the data source at regular intervals for pending campaigns,
// and queues them for processing. The process queue fetches batches of
Expand Down Expand Up @@ -337,9 +358,16 @@ func (m *Manager) worker() {
select {
case m.campMsgErrorQueue <- msgError{camp: msg.Campaign, err: err}:
default:
continue
}
}

m.campsMut.Lock()
if r, ok := m.campRates[msg.Campaign.ID]; ok {
r.Incr(1)
}
m.campsMut.Unlock()

// Arbitrary message.
case msg, ok := <-m.msgQueue:
if !ok {
Expand Down Expand Up @@ -497,6 +525,7 @@ func (m *Manager) addCampaign(c *models.Campaign) error {
// Add the campaign to the active map.
m.campsMut.Lock()
m.camps[c.ID] = c
m.campRates[c.ID] = ratecounter.NewRateCounter(time.Minute)
m.campsMut.Unlock()
return nil
}
Expand Down Expand Up @@ -589,6 +618,7 @@ func (m *Manager) isCampaignProcessing(id int) bool {
func (m *Manager) exhaustCampaign(c *models.Campaign, status string) (*models.Campaign, error) {
m.campsMut.Lock()
delete(m.camps, c.ID)
delete(m.campRates, c.ID)
m.campsMut.Unlock()

// A status has been passed. Change the campaign's status
Expand Down

0 comments on commit 0f6a037

Please sign in to comment.