diff --git a/Gemfile.lock b/Gemfile.lock index c32c6f2f..09d294a4 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -620,4 +620,4 @@ RUBY VERSION ruby 3.0.6p216 BUNDLED WITH - 2.5.20 + 2.5.21 diff --git a/app/models/component.rb b/app/models/component.rb index fa4b9ae5..c3b2c5a9 100644 --- a/app/models/component.rb +++ b/app/models/component.rb @@ -5,8 +5,17 @@ class Component < ActiveRecord::Base belongs_to :sensor validates_presence_of :device, :sensor - validates :sensor_id, :uniqueness => { :scope => [:device_id] } - validates :key, :uniqueness => { :scope => [:device_id] } + + # IMPORTANT: Validation of sensor/device uniqueness is done at the database level, + # as this allows us to use the create_or_find_by! method to atomically upsert components + # in the mqtt-task, avoiding component duplication due to race conditions. + # For some reason, create_or_find_by! ONLY works when the database constraint is + # the ONLY uniqueness constraint on those two values, so adding a rails validation here + # causes an error. Leaving the validations here commented out by way of documentation. + # See https://stackoverflow.com/questions/74566974/create-or-find-by-not-working-as-it-should-in-rails-6 + # validates :sensor_id, :uniqueness => { :scope => [:device_id] } + # validates :key, :uniqueness => { :scope => [:device_id] } + before_validation :set_key, on: :create diff --git a/app/models/concerns/data_parser/storer.rb b/app/models/concerns/data_parser/storer.rb index 721ddd10..84a37720 100644 --- a/app/models/concerns/data_parser/storer.rb +++ b/app/models/concerns/data_parser/storer.rb @@ -54,7 +54,7 @@ def timestamp_parse(timestamp) end def sensor_reading(device, sensor) - component = device.find_or_create_component_for_sensor_reading(sensor) + component = device.create_or_find_component_for_sensor_reading(sensor) return nil if component.nil? value = component.normalized_value( (Float(sensor['value']) rescue sensor['value']) ) { diff --git a/app/models/device.rb b/app/models/device.rb index 503997c0..52c37a8e 100644 --- a/app/models/device.rb +++ b/app/models/device.rb @@ -110,25 +110,25 @@ def sensor_map components.map { |c| [c.key, c.sensor.id]}.to_h end - def find_or_create_component_by_sensor_id(sensor_id) + def create_or_find_component_by_sensor_id(sensor_id) return nil if sensor_id.nil? || !Sensor.exists?(id: sensor_id) - components.find_or_create_by(sensor_id: sensor_id) + components.create_or_find_by!(sensor_id: sensor_id) end - def find_or_create_component_by_sensor_key(sensor_key) + def create_or_find_component_by_sensor_key(sensor_key) return nil if sensor_key.nil? sensor = Sensor.find_by(default_key: sensor_key) return nil if sensor.nil? - components.find_or_create_by(sensor_id: sensor.id) + components.create_or_find_by!(sensor_id: sensor.id) end - def find_or_create_component_for_sensor_reading(reading) + def create_or_find_component_for_sensor_reading(reading) key_or_id = reading["id"] if key_or_id.is_a?(Integer) || key_or_id =~ /\d+/ # It's an integer and therefore a sensor id - find_or_create_component_by_sensor_id(key_or_id) + create_or_find_component_by_sensor_id(key_or_id) else - find_or_create_component_by_sensor_key(key_or_id) + create_or_find_component_by_sensor_key(key_or_id) end end @@ -251,7 +251,10 @@ def remove_mac_address_for_newly_registered_device! def update_component_timestamps(timestamp, sensor_ids) components.select {|c| sensor_ids.include?(c.sensor_id) }.each do |component| - component.update_column(:last_reading_at, timestamp) + component.lock! if self.class.connection.transaction_open? + if !component.reload.last_reading_at || timestamp > component.last_reading_at + component.update_column(:last_reading_at, timestamp) + end end end diff --git a/app/models/raw_storer.rb b/app/models/raw_storer.rb index 0e46402b..89e5f144 100644 --- a/app/models/raw_storer.rb +++ b/app/models/raw_storer.rb @@ -30,7 +30,7 @@ def store data, mac, version, ip, raise_errors=false metric_id = device.find_sensor_id_by_key(metric) - component = device.find_or_create_component_by_sensor_id(metric_id) + component = device.create_or_find_component_by_sensor_id(metric_id) next if component.nil? value = component.normalized_value( (Float(value) rescue value) ) diff --git a/app/models/storer.rb b/app/models/storer.rb index bd40d9cd..07fd1d6a 100644 --- a/app/models/storer.rb +++ b/app/models/storer.rb @@ -23,18 +23,20 @@ def store device, reading, do_update = true def update_device(device, parsed_ts, sql_data) return if parsed_ts <= Time.at(0) + device.transaction do + device.lock! + if device.reload.last_reading_at.present? + # Comparison errors if device.last_reading_at is nil (new devices). + # Devices can post multiple readings, in a non-sorted order. + # Do not update data with an older timestamp. + return if parsed_ts < device.last_reading_at + end - if device.last_reading_at.present? - # Comparison errors if device.last_reading_at is nil (new devices). - # Devices can post multiple readings, in a non-sorted order. - # Do not update data with an older timestamp. - return if parsed_ts < device.last_reading_at + sql_data = device.data.present? ? device.data.merge(sql_data) : sql_data + device.update_columns(last_reading_at: parsed_ts, data: sql_data, state: 'has_published') + sensor_ids = sql_data.select { |k, v| k.is_a?(Integer) }.keys.compact.uniq + device.update_component_timestamps(parsed_ts, sensor_ids) end - - sql_data = device.data.present? ? device.data.merge(sql_data) : sql_data - device.update_columns(last_reading_at: parsed_ts, data: sql_data, state: 'has_published') - sensor_ids = sql_data.select { |k, v| k.is_a?(Integer) }.keys.compact.uniq - device.update_component_timestamps(parsed_ts, sensor_ids) end def kairos_publish(reading_data) diff --git a/compose.override.local.yml b/compose.override.local.yml index 7d37b254..696dd628 100644 --- a/compose.override.local.yml +++ b/compose.override.local.yml @@ -7,7 +7,11 @@ services: restart: "no" sidekiq: restart: "no" - mqtt-task: + mqtt-task-main-1: + restart: "no" + mqtt-task-main-2: + restart: "no" + mqtt-task-secondary: restart: "no" telnet-task: restart: "no" diff --git a/compose/app.yml b/compose/app.yml index e4b311b9..0d202996 100644 --- a/compose/app.yml +++ b/compose/app.yml @@ -15,7 +15,9 @@ services: - auth - redis - sidekiq - - mqtt-task + - mqtt-task-main-1 + - mqtt-task-main-2 + - mqtt-task-secondary - telnet-task #- mqtt restart: always diff --git a/compose/db.yml b/compose/db.yml index 1f67828a..f11b4a59 100644 --- a/compose/db.yml +++ b/compose/db.yml @@ -1,6 +1,7 @@ services: db: image: postgres:10 + command: -c max_connections=200 volumes: - sck-postgres:/var/lib/postgresql/data env_file: ../.env diff --git a/compose/mqtt-task-common.yml b/compose/mqtt-task-common.yml new file mode 100644 index 00000000..12146f9c --- /dev/null +++ b/compose/mqtt-task-common.yml @@ -0,0 +1,14 @@ +services: + mqtt-task: + build: ../ + env_file: ../.env + command: ./mqtt_subscriber.sh + restart: always + volumes: + - "../log:/app/log" + logging: + driver: "json-file" + options: + max-size: "100m" + environment: + db_pool_size: 5 diff --git a/compose/mqtt-task.yml b/compose/mqtt-task.yml index e7f94c43..ab83109b 100644 --- a/compose/mqtt-task.yml +++ b/compose/mqtt-task.yml @@ -1,14 +1,27 @@ services: - mqtt-task: - build: ../ - env_file: ../.env - command: bundle exec rake mqtt:sub - restart: always - volumes: - - "../log:/app/log" - logging: - driver: "json-file" - options: - max-size: "100m" - environment: - db_pool_size: 5 + mqtt-task-main-1: + extends: + file: mqtt-task-common.yml + service: mqtt-task + environment: + MQTT_CLIENT_ID: smartcitizen-api-staging-main-1 + MQTT_CLEAN_SESSION: false + mqtt-task-main-2: + extends: + file: mqtt-task-common.yml + service: mqtt-task + environment: + MQTT_CLIENT_ID: smartcitizen-api-staging-main-2 + MQTT_CLEAN_SESSION: false + mqtt-task-secondary: + extends: + file: mqtt-task-common.yml + service: mqtt-task + environment: + MQTT_CLIENT_ID: "smartcitizen-api-staging-secondary" + MQTT_CLEAN_SESSION: true + deploy: + mode: replicated + replicas: 2 + + diff --git a/compose/telnet-task.yml b/compose/telnet-task.yml index caa14fa2..272dc162 100644 --- a/compose/telnet-task.yml +++ b/compose/telnet-task.yml @@ -5,4 +5,4 @@ services: command: bundle exec rake telnet:push restart: always environment: - db_pool_size: 5 + db_pool_size: 2 diff --git a/db/migrate/20241009174732_unique_index_on_components.rb b/db/migrate/20241009174732_unique_index_on_components.rb new file mode 100644 index 00000000..a3fc5f01 --- /dev/null +++ b/db/migrate/20241009174732_unique_index_on_components.rb @@ -0,0 +1,23 @@ +class UniqueIndexOnComponents < ActiveRecord::Migration[6.1] + def up + remove_index :components, [:device_id, :sensor_id] + add_index :components, [:device_id, :sensor_id], unique: true + execute %{ + ALTER TABLE components ADD CONSTRAINT unique_sensor_for_device UNIQUE (device_id, sensor_id) + } + execute %{ + ALTER TABLE components ADD CONSTRAINT unique_key_for_device UNIQUE (device_id, key) + } + end + + def down + execute %{ + ALTER TABLE components DROP CONSTRAINT IF EXISTS unique_key_for_device + } + execute %{ + ALTER TABLE components DROP CONSTRAINT IF EXISTS unique_sensor_for_device + } + remove_index :components, [:device_id, :sensor_id], unique: true + add_index :components, [:device_id, :sensor_id] + end +end diff --git a/db/schema.rb b/db/schema.rb index 7d9f5d01..cf2d78d4 100644 --- a/db/schema.rb +++ b/db/schema.rb @@ -10,8 +10,7 @@ # # It's strongly recommended that you check this file into your version control system. -ActiveRecord::Schema.define(version: 2024_10_01_080033) do - +ActiveRecord::Schema.define(version: 2024_10_09_174732) do # These are extensions that must be enabled in order to support this database enable_extension "adminpack" enable_extension "hstore" @@ -66,7 +65,9 @@ t.string "key" t.integer "bus", default: 1, null: false t.datetime "last_reading_at" - t.index ["device_id", "sensor_id"], name: "index_components_on_device_id_and_sensor_id" + t.index ["device_id", "key"], name: "unique_key_for_device", unique: true + t.index ["device_id", "sensor_id"], name: "index_components_on_device_id_and_sensor_id", unique: true + t.index ["device_id", "sensor_id"], name: "unique_sensor_for_device", unique: true end create_table "devices", id: :serial, force: :cascade do |t| diff --git a/env.example b/env.example index f018c684..fcdd837d 100644 --- a/env.example +++ b/env.example @@ -18,6 +18,7 @@ REDIS_STORE=redis://redis:6379/3 # MQTT Settings MQTT_HOST=mqtt +#MQTT_SHARED_SUBSCRIPTION_GROUP="group1" #MQTT_CLEAN_SESSION=true #MQTT_CLIENT_ID=some_id #MQTT_PORT=port diff --git a/lib/tasks/mqtt_subscriber.rake b/lib/tasks/mqtt_subscriber.rake index 41c45a15..1e4afe26 100644 --- a/lib/tasks/mqtt_subscriber.rake +++ b/lib/tasks/mqtt_subscriber.rake @@ -3,15 +3,23 @@ namespace :mqtt do task sub: :environment do pid_file = Rails.root.join('tmp/pids/mqtt_subscriber.pid') File.open(pid_file, 'w') { |f| f.puts Process.pid } - mqtt_log = Logger.new('log/mqtt.log', 5, 100.megabytes) mqtt_clean_session = ENV.has_key?('MQTT_CLEAN_SESSION') ? ENV['MQTT_CLEAN_SESSION'] == "true" : true mqtt_client_id = ENV.has_key?('MQTT_CLIENT_ID') ? ENV['MQTT_CLIENT_ID'] : nil mqtt_host = ENV.has_key?('MQTT_HOST') ? ENV['MQTT_HOST'] : 'mqtt' mqtt_port = ENV.has_key?('MQTT_PORT') ? ENV['MQTT_PORT'] : 1883 mqtt_ssl = ENV.has_key?('MQTT_SSL') ? ENV['MQTT_SSL'] : false + mqtt_shared_subscription_group = ENV.fetch("MQTT_SHARED_SUBSCRIPTION_GROUP", nil) + mqtt_queue_length_warning_threshold = ENV.fetch("MQTT_QUEUE_LENGTH_WARNING_THRESHOLD", "30").to_i + mqtt_topics_string = ENV.fetch('MQTT_TOPICS', '') mqtt_topics = mqtt_topics_string.include?(",") ? mqtt_topics_string.split(",") : [ mqtt_topics_string ] + + if mqtt_shared_subscription_group && mqtt_clean_session + mqtt_client_id += "-#{ENV.fetch("HOSTNAME")}" + end + + mqtt_log = Logger.new("log/mqtt-#{mqtt_client_id}.log", 5, 100.megabytes) mqtt_log.info('MQTT TASK STARTING') mqtt_log.info("clean_session: #{mqtt_clean_session}") mqtt_log.info("client_id: #{mqtt_client_id}") @@ -32,18 +40,18 @@ namespace :mqtt do mqtt_log.info "Using clean_session setting: #{client.clean_session}" message_handler = MqttMessagesHandler.new - + prefix = mqtt_shared_subscription_group ? "$share/#{mqtt_shared_subscription_group}" : "$queue" client.subscribe(*mqtt_topics.flat_map { |topic| topic = topic == "" ? topic : topic + "/" [ - "$queue/#{topic}device/sck/+/readings" => 2, - "$queue/#{topic}device/sck/+/readings/raw" => 2, - "$queue/#{topic}device/sck/+/hello" => 2, - "$queue/#{topic}device/sck/+/info" => 2, - "$queue/#{topic}device/inventory" => 2 + "#{prefix}/#{topic}device/sck/+/readings" => 2, + "#{prefix}/#{topic}device/sck/+/readings/raw" => 2, + "#{prefix}/#{topic}device/sck/+/hello" => 2, + "#{prefix}/#{topic}device/sck/+/info" => 2, + "#{prefix}/#{topic}device/inventory" => 2 ] }) - + threshold_passed = false client.get do |topic, message| Sentry.with_scope do begin @@ -52,6 +60,14 @@ namespace :mqtt do end mqtt_log.info "Processed MQTT message in #{time}" mqtt_log.info "MQTT queue length: #{client.queue_length}" + if client.queue_length >= mqtt_queue_length_warning_threshold + if !threshold_passed + Sentry.capture_message("Warning: Internal MQTT queue length is #{client.queue_length} (>= #{mqtt_queue_length_warning_threshold} on client #{mqtt_client_id}).") + threshold_passed = true + end + else + threshold_passed = false + end rescue Exception => e mqtt_log.info e Sentry.capture_exception(e) diff --git a/mqtt_subscriber.sh b/mqtt_subscriber.sh new file mode 100755 index 00000000..481d728e --- /dev/null +++ b/mqtt_subscriber.sh @@ -0,0 +1,2 @@ +#!/bin/bash +bundle exec rake mqtt:sub diff --git a/spec/models/component_spec.rb b/spec/models/component_spec.rb index e6ea21ad..dc30c5b9 100644 --- a/spec/models/component_spec.rb +++ b/spec/models/component_spec.rb @@ -8,7 +8,7 @@ it "validates uniqueness of board to sensor" do component = create(:component, device: create(:device), sensor: create(:sensor)) - expect{ create(:component, device: component.device, sensor: component.sensor) }.to raise_error(ActiveRecord::RecordInvalid) + expect{ create(:component, device: component.device, sensor: component.sensor) }.to raise_error(ActiveRecord::RecordNotUnique) end describe "creating a unique sensor key" do diff --git a/spec/models/device_spec.rb b/spec/models/device_spec.rb index c03a8c81..39aed487 100644 --- a/spec/models/device_spec.rb +++ b/spec/models/device_spec.rb @@ -645,19 +645,19 @@ end end - describe "#find_or_create_component_by_sensor_id" do + describe "#create_or_find_component_by_sensor_id" do context "when the sensor exists and a component already exists for this device" do it "returns the existing component" do sensor = create(:sensor) component = create(:component, sensor: sensor, device: device) - expect(device.find_or_create_component_by_sensor_id(sensor.id)).to eq(component) + expect(device.create_or_find_component_by_sensor_id(sensor.id)).to eq(component) end end context "when the sensor exists and a component does not already exist for this device" do it "returns a new valid component with the correct sensor and device" do sensor = create(:sensor) - component = device.find_or_create_component_by_sensor_id(sensor.id) + component = device.create_or_find_component_by_sensor_id(sensor.id) expect(component).not_to be_blank expect(component).to be_a Component expect(component.valid?).to be(true) @@ -670,13 +670,13 @@ context "when no sensor exists with this id" do it "returns nil" do create(:sensor, id: 12345) - expect(device.find_or_create_component_by_sensor_id(54321)).to be_blank + expect(device.create_or_find_component_by_sensor_id(54321)).to be_blank end end context "when the id is nil" do it "returns nil" do - expect(device.find_or_create_component_by_sensor_id(nil)).to be_blank + expect(device.create_or_find_component_by_sensor_id(nil)).to be_blank end end end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 2d5c8cb4..9de16f78 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -14,7 +14,6 @@ # users commonly want. # # See http://rubydoc.info/gems/rspec-core/RSpec/Core/Configuration - RSpec.configure do |config| # rspec-expectations config goes here. You can use an alternate # assertion/expectation library such as wrong or the stdlib/minitest