From c0f9898500e87ccf58b49d8f7f00eb106ae2b806 Mon Sep 17 00:00:00 2001 From: Oscar Gonzalez Date: Wed, 25 Sep 2024 04:34:04 +0000 Subject: [PATCH] First attempt at shared MQTT subscription --- compose.override.local.yml | 6 +++++- compose/app.yml | 4 +++- compose/db.yml | 1 + compose/mqtt-task-common.yml | 14 ++++++++++++ compose/mqtt-task.yml | 39 ++++++++++++++++++++++------------ compose/telnet-task.yml | 2 +- env.example | 1 + lib/tasks/mqtt_subscriber.rake | 15 +++++++------ 8 files changed, 59 insertions(+), 23 deletions(-) create mode 100644 compose/mqtt-task-common.yml diff --git a/compose.override.local.yml b/compose.override.local.yml index 7d37b254..696dd628 100644 --- a/compose.override.local.yml +++ b/compose.override.local.yml @@ -7,7 +7,11 @@ services: restart: "no" sidekiq: restart: "no" - mqtt-task: + mqtt-task-main-1: + restart: "no" + mqtt-task-main-2: + restart: "no" + mqtt-task-secondary: restart: "no" telnet-task: restart: "no" diff --git a/compose/app.yml b/compose/app.yml index e4b311b9..0d202996 100644 --- a/compose/app.yml +++ b/compose/app.yml @@ -15,7 +15,9 @@ services: - auth - redis - sidekiq - - mqtt-task + - mqtt-task-main-1 + - mqtt-task-main-2 + - mqtt-task-secondary - telnet-task #- mqtt restart: always diff --git a/compose/db.yml b/compose/db.yml index 1f67828a..f11b4a59 100644 --- a/compose/db.yml +++ b/compose/db.yml @@ -1,6 +1,7 @@ services: db: image: postgres:10 + command: -c max_connections=200 volumes: - sck-postgres:/var/lib/postgresql/data env_file: ../.env diff --git a/compose/mqtt-task-common.yml b/compose/mqtt-task-common.yml new file mode 100644 index 00000000..194254f3 --- /dev/null +++ b/compose/mqtt-task-common.yml @@ -0,0 +1,14 @@ +services: + mqtt-task: + build: ../ + env_file: ../.env + command: bundle exec rake mqtt:sub + restart: always + volumes: + - "../log:/app/log" + logging: + driver: "json-file" + options: + max-size: "100m" + environment: + db_pool_size: 2 diff --git a/compose/mqtt-task.yml b/compose/mqtt-task.yml index e7f94c43..03c982d0 100644 --- a/compose/mqtt-task.yml +++ b/compose/mqtt-task.yml @@ -1,14 +1,27 @@ services: - mqtt-task: - build: ../ - env_file: ../.env - command: bundle exec rake mqtt:sub - restart: always - volumes: - - "../log:/app/log" - logging: - driver: "json-file" - options: - max-size: "100m" - environment: - db_pool_size: 5 + mqtt-task-main-1: + extends: + file: mqtt-task-common.yml + service: mqtt-task + environment: + MQTT_CLIENT_ID: smartcitizen-api-staging-main-1 + MQTT_CLEAN_SESSION: false + mqtt-task-main-2: + extends: + file: mqtt-task-common.yml + service: mqtt-task + environment: + MQTT_CLIENT_ID: smartcitizen-api-staging-main-2 + MQTT_CLEAN_SESSION: false + mqtt-task-secondary: + extends: + file: mqtt-task-common.yml + service: mqtt-task + environment: + MQTT_CLIENT_ID: "smartcitizen-api-staging-secondary-${HOSTNAME}" + MQTT_CLEAN_SESSION: true + deploy: + mode: replicated + replicas: 2 + + diff --git a/compose/telnet-task.yml b/compose/telnet-task.yml index caa14fa2..272dc162 100644 --- a/compose/telnet-task.yml +++ b/compose/telnet-task.yml @@ -5,4 +5,4 @@ services: command: bundle exec rake telnet:push restart: always environment: - db_pool_size: 5 + db_pool_size: 2 diff --git a/env.example b/env.example index f018c684..fcdd837d 100644 --- a/env.example +++ b/env.example @@ -18,6 +18,7 @@ REDIS_STORE=redis://redis:6379/3 # MQTT Settings MQTT_HOST=mqtt +#MQTT_SHARED_SUBSCRIPTION_GROUP="group1" #MQTT_CLEAN_SESSION=true #MQTT_CLIENT_ID=some_id #MQTT_PORT=port diff --git a/lib/tasks/mqtt_subscriber.rake b/lib/tasks/mqtt_subscriber.rake index 41c45a15..56529a49 100644 --- a/lib/tasks/mqtt_subscriber.rake +++ b/lib/tasks/mqtt_subscriber.rake @@ -3,15 +3,16 @@ namespace :mqtt do task sub: :environment do pid_file = Rails.root.join('tmp/pids/mqtt_subscriber.pid') File.open(pid_file, 'w') { |f| f.puts Process.pid } - mqtt_log = Logger.new('log/mqtt.log', 5, 100.megabytes) mqtt_clean_session = ENV.has_key?('MQTT_CLEAN_SESSION') ? ENV['MQTT_CLEAN_SESSION'] == "true" : true mqtt_client_id = ENV.has_key?('MQTT_CLIENT_ID') ? ENV['MQTT_CLIENT_ID'] : nil mqtt_host = ENV.has_key?('MQTT_HOST') ? ENV['MQTT_HOST'] : 'mqtt' mqtt_port = ENV.has_key?('MQTT_PORT') ? ENV['MQTT_PORT'] : 1883 mqtt_ssl = ENV.has_key?('MQTT_SSL') ? ENV['MQTT_SSL'] : false + mqtt_shared_subscription_group = ENV.fetch("MQTT_SHARED_SUBSCRIPTION_GROUP", nil) mqtt_topics_string = ENV.fetch('MQTT_TOPICS', '') mqtt_topics = mqtt_topics_string.include?(",") ? mqtt_topics_string.split(",") : [ mqtt_topics_string ] + mqtt_log = Logger.new("log/mqtt-#{mqtt_client_id}.log", 5, 100.megabytes) mqtt_log.info('MQTT TASK STARTING') mqtt_log.info("clean_session: #{mqtt_clean_session}") mqtt_log.info("client_id: #{mqtt_client_id}") @@ -32,15 +33,15 @@ namespace :mqtt do mqtt_log.info "Using clean_session setting: #{client.clean_session}" message_handler = MqttMessagesHandler.new - + prefix = mqtt_shared_subscription_group ? "$share/#{mqtt_shared_subscription_group}" : "$queue" client.subscribe(*mqtt_topics.flat_map { |topic| topic = topic == "" ? topic : topic + "/" [ - "$queue/#{topic}device/sck/+/readings" => 2, - "$queue/#{topic}device/sck/+/readings/raw" => 2, - "$queue/#{topic}device/sck/+/hello" => 2, - "$queue/#{topic}device/sck/+/info" => 2, - "$queue/#{topic}device/inventory" => 2 + "#{prefix}/#{topic}device/sck/+/readings" => 2, + "#{prefix}/#{topic}device/sck/+/readings/raw" => 2, + "#{prefix}/#{topic}device/sck/+/hello" => 2, + "#{prefix}/#{topic}device/sck/+/info" => 2, + "#{prefix}/#{topic}device/inventory" => 2 ] })