Skip to content

Commit

Permalink
fix(message-dequeuer): ability to disable batching
Browse files Browse the repository at this point in the history
  • Loading branch information
adamcooke committed Mar 16, 2024
1 parent 45dd8aa commit 4fcb9e9
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 7 deletions.
4 changes: 3 additions & 1 deletion app/lib/message_dequeuer/initial_processor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def process
# Process the original message and then all of those
# found for batching.
process_message(@queued_message)
@other_messages.each { |message| process_message(message) }
@other_messages&.each { |message| process_message(message) }
end
ensure
@state.finished
Expand Down Expand Up @@ -45,6 +45,8 @@ def check_message_is_ready
end

def find_other_messages_for_batch
return unless Postal::Config.postal.batch_queued_messages?

@other_messages = @queued_message.batchable_messages(100)
log "found #{@other_messages.size} associated messages to process at the same time", batch_key: @queued_message.batch_key
rescue StandardError
Expand Down
1 change: 1 addition & 0 deletions doc/config/environment-variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ This document contains all the environment variables which are available for thi
| `POSTAL_SMTP_RELAYS` | Array of strings | An array of SMTP relays in the format of smtp://host:port | [] |
| `POSTAL_TRUSTED_PROXIES` | Array of strings | An array of IP addresses to trust for proxying requests to Postal (in addition to localhost addresses) | [] |
| `POSTAL_QUEUED_MESSAGE_LOCK_STALE_DAYS` | Integer | The number of days after which to consider a lock as stale. Messages with stale locks will be removed and not retried. | 1 |
| `POSTAL_BATCH_QUEUED_MESSAGES` | Boolean | When enabled queued messages will be de-queued in batches based on their destination | true |
| `WEB_SERVER_DEFAULT_PORT` | Integer | The default port the web server should listen on unless overriden by the PORT environment variable | 5000 |
| `WEB_SERVER_DEFAULT_BIND_ADDRESS` | String | The default bind address the web server should listen on unless overriden by the BIND_ADDRESS environment variable | 127.0.0.1 |
| `WEB_SERVER_MAX_THREADS` | Integer | The maximum number of threads which can be used by the web server | 5 |
Expand Down
2 changes: 2 additions & 0 deletions doc/config/yaml.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ postal:
trusted_proxies: []
# The number of days after which to consider a lock as stale. Messages with stale locks will be removed and not retried.
queued_message_lock_stale_days: 1
# When enabled queued messages will be de-queued in batches based on their destination
batch_queued_messages: true

web_server:
# The default port the web server should listen on unless overriden by the PORT environment variable
Expand Down
5 changes: 5 additions & 0 deletions lib/postal/config_schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ module Postal
description "The number of days after which to consider a lock as stale. Messages with stale locks will be removed and not retried."
default 1
end

boolean :batch_queued_messages do
description "When enabled queued messages will be de-queued in batches based on their destination"
default true
end
end

group :web_server do
Expand Down
27 changes: 21 additions & 6 deletions spec/lib/message_dequeuer/initial_message_processor_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,28 @@ module MessageDequeuer
@queued_message3 = create(:queued_message, message: @message3)
end

it "calls the single message process for the initial message and all batchable messages" do
[queued_message, @queued_message2, @queued_message3].each do |msg|
expect(SingleMessageProcessor).to receive(:process).with(msg,
logger: logger,
state: processor.state)
context "when postal.batch_queued_messages is enabled" do
it "calls the single message process for the initial message and all batchable messages" do
[queued_message, @queued_message2, @queued_message3].each do |msg|
expect(SingleMessageProcessor).to receive(:process).with(msg,
logger: logger,
state: processor.state)
end
processor.process
end
end

context "when postal.batch_queued_messages is disabled" do
before do
allow(Postal::Config.postal).to receive(:batch_queued_messages?) { false }
end

it "does not call the single message process more than once" do
expect(SingleMessageProcessor).to receive(:process).once.with(queued_message,
logger: logger,
state: processor.state)
processor.process
end
processor.process
end
end

Expand Down

0 comments on commit 4fcb9e9

Please sign in to comment.