Skip to content

Commit

Permalink
Merge pull request #361 from fablabbcn/mqtt-task-shared-subscription
Browse files Browse the repository at this point in the history
MQTT subscription
  • Loading branch information
timcowlishaw authored Oct 22, 2024
2 parents 8ca2cb4 + a728506 commit 9430b5f
Show file tree
Hide file tree
Showing 20 changed files with 147 additions and 57 deletions.
2 changes: 1 addition & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -620,4 +620,4 @@ RUBY VERSION
ruby 3.0.6p216

BUNDLED WITH
2.5.20
2.5.21
13 changes: 11 additions & 2 deletions app/models/component.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,17 @@ class Component < ActiveRecord::Base
belongs_to :sensor

validates_presence_of :device, :sensor
validates :sensor_id, :uniqueness => { :scope => [:device_id] }
validates :key, :uniqueness => { :scope => [:device_id] }

# IMPORTANT: Validation of sensor/device uniqueness is done at the database level,
# as this allows us to use the create_or_find_by! method to atomically upsert components
# in the mqtt-task, avoiding component duplication due to race conditions.
# For some reason, create_or_find_by! ONLY works when the database constraint is
# the ONLY uniqueness constraint on those two values, so adding a rails validation here
# causes an error. Leaving the validations here commented out by way of documentation.
# See https://stackoverflow.com/questions/74566974/create-or-find-by-not-working-as-it-should-in-rails-6
# validates :sensor_id, :uniqueness => { :scope => [:device_id] }
# validates :key, :uniqueness => { :scope => [:device_id] }


before_validation :set_key, on: :create

Expand Down
2 changes: 1 addition & 1 deletion app/models/concerns/data_parser/storer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def timestamp_parse(timestamp)
end

def sensor_reading(device, sensor)
component = device.find_or_create_component_for_sensor_reading(sensor)
component = device.create_or_find_component_for_sensor_reading(sensor)
return nil if component.nil?
value = component.normalized_value( (Float(sensor['value']) rescue sensor['value']) )
{
Expand Down
19 changes: 11 additions & 8 deletions app/models/device.rb
Original file line number Diff line number Diff line change
Expand Up @@ -110,25 +110,25 @@ def sensor_map
components.map { |c| [c.key, c.sensor.id]}.to_h
end

def find_or_create_component_by_sensor_id(sensor_id)
def create_or_find_component_by_sensor_id(sensor_id)
return nil if sensor_id.nil? || !Sensor.exists?(id: sensor_id)
components.find_or_create_by(sensor_id: sensor_id)
components.create_or_find_by!(sensor_id: sensor_id)
end

def find_or_create_component_by_sensor_key(sensor_key)
def create_or_find_component_by_sensor_key(sensor_key)
return nil if sensor_key.nil?
sensor = Sensor.find_by(default_key: sensor_key)
return nil if sensor.nil?
components.find_or_create_by(sensor_id: sensor.id)
components.create_or_find_by!(sensor_id: sensor.id)
end

def find_or_create_component_for_sensor_reading(reading)
def create_or_find_component_for_sensor_reading(reading)
key_or_id = reading["id"]
if key_or_id.is_a?(Integer) || key_or_id =~ /\d+/
# It's an integer and therefore a sensor id
find_or_create_component_by_sensor_id(key_or_id)
create_or_find_component_by_sensor_id(key_or_id)
else
find_or_create_component_by_sensor_key(key_or_id)
create_or_find_component_by_sensor_key(key_or_id)
end
end

Expand Down Expand Up @@ -251,7 +251,10 @@ def remove_mac_address_for_newly_registered_device!

def update_component_timestamps(timestamp, sensor_ids)
components.select {|c| sensor_ids.include?(c.sensor_id) }.each do |component|
component.update_column(:last_reading_at, timestamp)
component.lock! if self.class.connection.transaction_open?
if !component.reload.last_reading_at || timestamp > component.last_reading_at
component.update_column(:last_reading_at, timestamp)
end
end
end

Expand Down
2 changes: 1 addition & 1 deletion app/models/raw_storer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def store data, mac, version, ip, raise_errors=false

metric_id = device.find_sensor_id_by_key(metric)

component = device.find_or_create_component_by_sensor_id(metric_id)
component = device.create_or_find_component_by_sensor_id(metric_id)
next if component.nil?

value = component.normalized_value( (Float(value) rescue value) )
Expand Down
22 changes: 12 additions & 10 deletions app/models/storer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,20 @@ def store device, reading, do_update = true

def update_device(device, parsed_ts, sql_data)
return if parsed_ts <= Time.at(0)
device.transaction do
device.lock!
if device.reload.last_reading_at.present?
# Comparison errors if device.last_reading_at is nil (new devices).
# Devices can post multiple readings, in a non-sorted order.
# Do not update data with an older timestamp.
return if parsed_ts < device.last_reading_at
end

if device.last_reading_at.present?
# Comparison errors if device.last_reading_at is nil (new devices).
# Devices can post multiple readings, in a non-sorted order.
# Do not update data with an older timestamp.
return if parsed_ts < device.last_reading_at
sql_data = device.data.present? ? device.data.merge(sql_data) : sql_data
device.update_columns(last_reading_at: parsed_ts, data: sql_data, state: 'has_published')
sensor_ids = sql_data.select { |k, v| k.is_a?(Integer) }.keys.compact.uniq
device.update_component_timestamps(parsed_ts, sensor_ids)
end

sql_data = device.data.present? ? device.data.merge(sql_data) : sql_data
device.update_columns(last_reading_at: parsed_ts, data: sql_data, state: 'has_published')
sensor_ids = sql_data.select { |k, v| k.is_a?(Integer) }.keys.compact.uniq
device.update_component_timestamps(parsed_ts, sensor_ids)
end

def kairos_publish(reading_data)
Expand Down
6 changes: 5 additions & 1 deletion compose.override.local.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@ services:
restart: "no"
sidekiq:
restart: "no"
mqtt-task:
mqtt-task-main-1:
restart: "no"
mqtt-task-main-2:
restart: "no"
mqtt-task-secondary:
restart: "no"
telnet-task:
restart: "no"
Expand Down
4 changes: 3 additions & 1 deletion compose/app.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ services:
- auth
- redis
- sidekiq
- mqtt-task
- mqtt-task-main-1
- mqtt-task-main-2
- mqtt-task-secondary
- telnet-task
#- mqtt
restart: always
Expand Down
1 change: 1 addition & 0 deletions compose/db.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
services:
db:
image: postgres:10
command: -c max_connections=200
volumes:
- sck-postgres:/var/lib/postgresql/data
env_file: ../.env
Expand Down
14 changes: 14 additions & 0 deletions compose/mqtt-task-common.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
services:
mqtt-task:
build: ../
env_file: ../.env
command: ./mqtt_subscriber.sh
restart: always
volumes:
- "../log:/app/log"
logging:
driver: "json-file"
options:
max-size: "100m"
environment:
db_pool_size: 5
39 changes: 26 additions & 13 deletions compose/mqtt-task.yml
Original file line number Diff line number Diff line change
@@ -1,14 +1,27 @@
services:
mqtt-task:
build: ../
env_file: ../.env
command: bundle exec rake mqtt:sub
restart: always
volumes:
- "../log:/app/log"
logging:
driver: "json-file"
options:
max-size: "100m"
environment:
db_pool_size: 5
mqtt-task-main-1:
extends:
file: mqtt-task-common.yml
service: mqtt-task
environment:
MQTT_CLIENT_ID: smartcitizen-api-staging-main-1
MQTT_CLEAN_SESSION: false
mqtt-task-main-2:
extends:
file: mqtt-task-common.yml
service: mqtt-task
environment:
MQTT_CLIENT_ID: smartcitizen-api-staging-main-2
MQTT_CLEAN_SESSION: false
mqtt-task-secondary:
extends:
file: mqtt-task-common.yml
service: mqtt-task
environment:
MQTT_CLIENT_ID: "smartcitizen-api-staging-secondary"
MQTT_CLEAN_SESSION: true
deploy:
mode: replicated
replicas: 2


2 changes: 1 addition & 1 deletion compose/telnet-task.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ services:
command: bundle exec rake telnet:push
restart: always
environment:
db_pool_size: 5
db_pool_size: 2
23 changes: 23 additions & 0 deletions db/migrate/20241009174732_unique_index_on_components.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
class UniqueIndexOnComponents < ActiveRecord::Migration[6.1]
def up
remove_index :components, [:device_id, :sensor_id]
add_index :components, [:device_id, :sensor_id], unique: true
execute %{
ALTER TABLE components ADD CONSTRAINT unique_sensor_for_device UNIQUE (device_id, sensor_id)
}
execute %{
ALTER TABLE components ADD CONSTRAINT unique_key_for_device UNIQUE (device_id, key)
}
end

def down
execute %{
ALTER TABLE components DROP CONSTRAINT IF EXISTS unique_key_for_device
}
execute %{
ALTER TABLE components DROP CONSTRAINT IF EXISTS unique_sensor_for_device
}
remove_index :components, [:device_id, :sensor_id], unique: true
add_index :components, [:device_id, :sensor_id]
end
end
7 changes: 4 additions & 3 deletions db/schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@
#
# It's strongly recommended that you check this file into your version control system.

ActiveRecord::Schema.define(version: 2024_10_01_080033) do

ActiveRecord::Schema.define(version: 2024_10_09_174732) do
# These are extensions that must be enabled in order to support this database
enable_extension "adminpack"
enable_extension "hstore"
Expand Down Expand Up @@ -66,7 +65,9 @@
t.string "key"
t.integer "bus", default: 1, null: false
t.datetime "last_reading_at"
t.index ["device_id", "sensor_id"], name: "index_components_on_device_id_and_sensor_id"
t.index ["device_id", "key"], name: "unique_key_for_device", unique: true
t.index ["device_id", "sensor_id"], name: "index_components_on_device_id_and_sensor_id", unique: true
t.index ["device_id", "sensor_id"], name: "unique_sensor_for_device", unique: true
end

create_table "devices", id: :serial, force: :cascade do |t|
Expand Down
1 change: 1 addition & 0 deletions env.example
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ REDIS_STORE=redis://redis:6379/3

# MQTT Settings
MQTT_HOST=mqtt
#MQTT_SHARED_SUBSCRIPTION_GROUP="group1"
#MQTT_CLEAN_SESSION=true
#MQTT_CLIENT_ID=some_id
#MQTT_PORT=port
Expand Down
32 changes: 24 additions & 8 deletions lib/tasks/mqtt_subscriber.rake
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,23 @@ namespace :mqtt do
task sub: :environment do
pid_file = Rails.root.join('tmp/pids/mqtt_subscriber.pid')
File.open(pid_file, 'w') { |f| f.puts Process.pid }
mqtt_log = Logger.new('log/mqtt.log', 5, 100.megabytes)

mqtt_clean_session = ENV.has_key?('MQTT_CLEAN_SESSION') ? ENV['MQTT_CLEAN_SESSION'] == "true" : true
mqtt_client_id = ENV.has_key?('MQTT_CLIENT_ID') ? ENV['MQTT_CLIENT_ID'] : nil
mqtt_host = ENV.has_key?('MQTT_HOST') ? ENV['MQTT_HOST'] : 'mqtt'
mqtt_port = ENV.has_key?('MQTT_PORT') ? ENV['MQTT_PORT'] : 1883
mqtt_ssl = ENV.has_key?('MQTT_SSL') ? ENV['MQTT_SSL'] : false
mqtt_shared_subscription_group = ENV.fetch("MQTT_SHARED_SUBSCRIPTION_GROUP", nil)
mqtt_queue_length_warning_threshold = ENV.fetch("MQTT_QUEUE_LENGTH_WARNING_THRESHOLD", "30").to_i

mqtt_topics_string = ENV.fetch('MQTT_TOPICS', '')
mqtt_topics = mqtt_topics_string.include?(",") ? mqtt_topics_string.split(",") : [ mqtt_topics_string ]

if mqtt_shared_subscription_group && mqtt_clean_session
mqtt_client_id += "-#{ENV.fetch("HOSTNAME")}"
end

mqtt_log = Logger.new("log/mqtt-#{mqtt_client_id}.log", 5, 100.megabytes)
mqtt_log.info('MQTT TASK STARTING')
mqtt_log.info("clean_session: #{mqtt_clean_session}")
mqtt_log.info("client_id: #{mqtt_client_id}")
Expand All @@ -32,18 +40,18 @@ namespace :mqtt do
mqtt_log.info "Using clean_session setting: #{client.clean_session}"

message_handler = MqttMessagesHandler.new

prefix = mqtt_shared_subscription_group ? "$share/#{mqtt_shared_subscription_group}" : "$queue"
client.subscribe(*mqtt_topics.flat_map { |topic|
topic = topic == "" ? topic : topic + "/"
[
"$queue/#{topic}device/sck/+/readings" => 2,
"$queue/#{topic}device/sck/+/readings/raw" => 2,
"$queue/#{topic}device/sck/+/hello" => 2,
"$queue/#{topic}device/sck/+/info" => 2,
"$queue/#{topic}device/inventory" => 2
"#{prefix}/#{topic}device/sck/+/readings" => 2,
"#{prefix}/#{topic}device/sck/+/readings/raw" => 2,
"#{prefix}/#{topic}device/sck/+/hello" => 2,
"#{prefix}/#{topic}device/sck/+/info" => 2,
"#{prefix}/#{topic}device/inventory" => 2
]
})

threshold_passed = false
client.get do |topic, message|
Sentry.with_scope do
begin
Expand All @@ -52,6 +60,14 @@ namespace :mqtt do
end
mqtt_log.info "Processed MQTT message in #{time}"
mqtt_log.info "MQTT queue length: #{client.queue_length}"
if client.queue_length >= mqtt_queue_length_warning_threshold
if !threshold_passed
Sentry.capture_message("Warning: Internal MQTT queue length is #{client.queue_length} (>= #{mqtt_queue_length_warning_threshold} on client #{mqtt_client_id}).")
threshold_passed = true
end
else
threshold_passed = false
end
rescue Exception => e
mqtt_log.info e
Sentry.capture_exception(e)
Expand Down
2 changes: 2 additions & 0 deletions mqtt_subscriber.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
#!/bin/bash
bundle exec rake mqtt:sub
2 changes: 1 addition & 1 deletion spec/models/component_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

it "validates uniqueness of board to sensor" do
component = create(:component, device: create(:device), sensor: create(:sensor))
expect{ create(:component, device: component.device, sensor: component.sensor) }.to raise_error(ActiveRecord::RecordInvalid)
expect{ create(:component, device: component.device, sensor: component.sensor) }.to raise_error(ActiveRecord::RecordNotUnique)
end

describe "creating a unique sensor key" do
Expand Down
10 changes: 5 additions & 5 deletions spec/models/device_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -645,19 +645,19 @@
end
end

describe "#find_or_create_component_by_sensor_id" do
describe "#create_or_find_component_by_sensor_id" do
context "when the sensor exists and a component already exists for this device" do
it "returns the existing component" do
sensor = create(:sensor)
component = create(:component, sensor: sensor, device: device)
expect(device.find_or_create_component_by_sensor_id(sensor.id)).to eq(component)
expect(device.create_or_find_component_by_sensor_id(sensor.id)).to eq(component)
end
end

context "when the sensor exists and a component does not already exist for this device" do
it "returns a new valid component with the correct sensor and device" do
sensor = create(:sensor)
component = device.find_or_create_component_by_sensor_id(sensor.id)
component = device.create_or_find_component_by_sensor_id(sensor.id)
expect(component).not_to be_blank
expect(component).to be_a Component
expect(component.valid?).to be(true)
Expand All @@ -670,13 +670,13 @@
context "when no sensor exists with this id" do
it "returns nil" do
create(:sensor, id: 12345)
expect(device.find_or_create_component_by_sensor_id(54321)).to be_blank
expect(device.create_or_find_component_by_sensor_id(54321)).to be_blank
end
end

context "when the id is nil" do
it "returns nil" do
expect(device.find_or_create_component_by_sensor_id(nil)).to be_blank
expect(device.create_or_find_component_by_sensor_id(nil)).to be_blank
end
end
end
Expand Down
1 change: 0 additions & 1 deletion spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
# users commonly want.
#
# See http://rubydoc.info/gems/rspec-core/RSpec/Core/Configuration

RSpec.configure do |config|
# rspec-expectations config goes here. You can use an alternate
# assertion/expectation library such as wrong or the stdlib/minitest
Expand Down

0 comments on commit 9430b5f

Please sign in to comment.