Skip to content

Commit

Permalink
First attempt at shared MQTT subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
oscgonfer authored and timcowlishaw committed Oct 22, 2024
1 parent 8ca2cb4 commit c0f9898
Show file tree
Hide file tree
Showing 8 changed files with 59 additions and 23 deletions.
6 changes: 5 additions & 1 deletion compose.override.local.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@ services:
restart: "no"
sidekiq:
restart: "no"
mqtt-task:
mqtt-task-main-1:
restart: "no"
mqtt-task-main-2:
restart: "no"
mqtt-task-secondary:
restart: "no"
telnet-task:
restart: "no"
Expand Down
4 changes: 3 additions & 1 deletion compose/app.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ services:
- auth
- redis
- sidekiq
- mqtt-task
- mqtt-task-main-1
- mqtt-task-main-2
- mqtt-task-secondary
- telnet-task
#- mqtt
restart: always
Expand Down
1 change: 1 addition & 0 deletions compose/db.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
services:
db:
image: postgres:10
command: -c max_connections=200
volumes:
- sck-postgres:/var/lib/postgresql/data
env_file: ../.env
Expand Down
14 changes: 14 additions & 0 deletions compose/mqtt-task-common.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
services:
mqtt-task:
build: ../
env_file: ../.env
command: bundle exec rake mqtt:sub
restart: always
volumes:
- "../log:/app/log"
logging:
driver: "json-file"
options:
max-size: "100m"
environment:
db_pool_size: 2
39 changes: 26 additions & 13 deletions compose/mqtt-task.yml
Original file line number Diff line number Diff line change
@@ -1,14 +1,27 @@
services:
mqtt-task:
build: ../
env_file: ../.env
command: bundle exec rake mqtt:sub
restart: always
volumes:
- "../log:/app/log"
logging:
driver: "json-file"
options:
max-size: "100m"
environment:
db_pool_size: 5
mqtt-task-main-1:
extends:
file: mqtt-task-common.yml
service: mqtt-task
environment:
MQTT_CLIENT_ID: smartcitizen-api-staging-main-1
MQTT_CLEAN_SESSION: false
mqtt-task-main-2:
extends:
file: mqtt-task-common.yml
service: mqtt-task
environment:
MQTT_CLIENT_ID: smartcitizen-api-staging-main-2
MQTT_CLEAN_SESSION: false
mqtt-task-secondary:
extends:
file: mqtt-task-common.yml
service: mqtt-task
environment:
MQTT_CLIENT_ID: "smartcitizen-api-staging-secondary-${HOSTNAME}"
MQTT_CLEAN_SESSION: true
deploy:
mode: replicated
replicas: 2


2 changes: 1 addition & 1 deletion compose/telnet-task.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ services:
command: bundle exec rake telnet:push
restart: always
environment:
db_pool_size: 5
db_pool_size: 2
1 change: 1 addition & 0 deletions env.example
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ REDIS_STORE=redis://redis:6379/3

# MQTT Settings
MQTT_HOST=mqtt
#MQTT_SHARED_SUBSCRIPTION_GROUP="group1"
#MQTT_CLEAN_SESSION=true
#MQTT_CLIENT_ID=some_id
#MQTT_PORT=port
Expand Down
15 changes: 8 additions & 7 deletions lib/tasks/mqtt_subscriber.rake
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@ namespace :mqtt do
task sub: :environment do
pid_file = Rails.root.join('tmp/pids/mqtt_subscriber.pid')
File.open(pid_file, 'w') { |f| f.puts Process.pid }
mqtt_log = Logger.new('log/mqtt.log', 5, 100.megabytes)

mqtt_clean_session = ENV.has_key?('MQTT_CLEAN_SESSION') ? ENV['MQTT_CLEAN_SESSION'] == "true" : true
mqtt_client_id = ENV.has_key?('MQTT_CLIENT_ID') ? ENV['MQTT_CLIENT_ID'] : nil
mqtt_host = ENV.has_key?('MQTT_HOST') ? ENV['MQTT_HOST'] : 'mqtt'
mqtt_port = ENV.has_key?('MQTT_PORT') ? ENV['MQTT_PORT'] : 1883
mqtt_ssl = ENV.has_key?('MQTT_SSL') ? ENV['MQTT_SSL'] : false
mqtt_shared_subscription_group = ENV.fetch("MQTT_SHARED_SUBSCRIPTION_GROUP", nil)
mqtt_topics_string = ENV.fetch('MQTT_TOPICS', '')
mqtt_topics = mqtt_topics_string.include?(",") ? mqtt_topics_string.split(",") : [ mqtt_topics_string ]
mqtt_log = Logger.new("log/mqtt-#{mqtt_client_id}.log", 5, 100.megabytes)
mqtt_log.info('MQTT TASK STARTING')
mqtt_log.info("clean_session: #{mqtt_clean_session}")
mqtt_log.info("client_id: #{mqtt_client_id}")
Expand All @@ -32,15 +33,15 @@ namespace :mqtt do
mqtt_log.info "Using clean_session setting: #{client.clean_session}"

message_handler = MqttMessagesHandler.new

prefix = mqtt_shared_subscription_group ? "$share/#{mqtt_shared_subscription_group}" : "$queue"
client.subscribe(*mqtt_topics.flat_map { |topic|
topic = topic == "" ? topic : topic + "/"
[
"$queue/#{topic}device/sck/+/readings" => 2,
"$queue/#{topic}device/sck/+/readings/raw" => 2,
"$queue/#{topic}device/sck/+/hello" => 2,
"$queue/#{topic}device/sck/+/info" => 2,
"$queue/#{topic}device/inventory" => 2
"#{prefix}/#{topic}device/sck/+/readings" => 2,
"#{prefix}/#{topic}device/sck/+/readings/raw" => 2,
"#{prefix}/#{topic}device/sck/+/hello" => 2,
"#{prefix}/#{topic}device/sck/+/info" => 2,
"#{prefix}/#{topic}device/inventory" => 2
]
})

Expand Down

0 comments on commit c0f9898

Please sign in to comment.