Skip to content

Commit

Permalink
Feature/conluz 83 (#84)
Browse files Browse the repository at this point in the history
* [conluz-83] Added a new Telegraf service in docker compose file

* [conluz-83] Created processor job to read from MQTT messages InfluxDB measurement, process data, transform data and then store data in another InfluxDB measurement

* [conluz-83] Removed references to scheduler to get MQTT messages periodically to persist on InfluxDB

* [conluz-83] Fixed some namings
  • Loading branch information
viktorKhan authored Oct 23, 2024
1 parent 301953d commit 302a458
Show file tree
Hide file tree
Showing 18 changed files with 366 additions and 318 deletions.
2 changes: 0 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ dependencies {
implementation 'org.springframework.boot:spring-boot-starter-quartz'
implementation 'org.springframework.boot:spring-boot-starter-actuator'

implementation 'org.springframework.integration:spring-integration-mqtt:6.2.3'

implementation 'org.liquibase:liquibase-core:4.28.0'
implementation 'org.influxdb:influxdb-java:2.23'
runtimeOnly 'org.postgresql:postgresql'
Expand Down
17 changes: 17 additions & 0 deletions deploy/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,20 @@ services:
- --path.sysfs=/host/sys
- --path.rootfs=/rootfs
restart: always
telegraf:
image: telegraf:latest
container_name: telegraf
environment:
- SPRING_INFLUXDB_URL
- SPRING_INFLUXDB_USERNAME
- SPRING_INFLUXDB_PASSWORD
- SPRING_INFLUXDB_DATABASE
- CONLUZ_MQTT_SERVER_TOPIC_PREFIX
- CONLUZ_MQTT_SERVER_URI
- CONLUZ_MQTT_SERVER_USERNAME
- CONLUZ_MQTT_SERVER_PASSWORD
depends_on:
- influxdb
volumes:
- ./telegraf.conf:/etc/telegraf/telegraf.conf
restart: unless-stopped
19 changes: 19 additions & 0 deletions deploy/telegraf.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
[[inputs.mqtt_consumer]]
servers = ["${CONLUZ_MQTT_SERVER_URI}"]
username = "${CONLUZ_MQTT_SERVER_USERNAME}"
password = "${CONLUZ_MQTT_SERVER_PASSWORD}"
qos = 1
connection_timeout = "30s"
topics = [
"${CONLUZ_MQTT_SERVER_TOPIC_PREFIX}/+/emeter/+/power",
]
name_override = "shelly_mqtt_power_messages"
data_format = "value"
data_type = "float"

[[outputs.influxdb]]
urls = ["${SPRING_INFLUXDB_URL}"]
username = "${SPRING_INFLUXDB_USERNAME}"
password = "${SPRING_INFLUXDB_PASSWORD}"
database = "${SPRING_INFLUXDB_DATABASE}" # required
timeout = "5s"
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,7 @@ List<ShellyInstantConsumption> getHourlyConsumptionsByRangeOfDatesAndSupply(@Not
List<ShellyInstantConsumption> getAllInstantConsumptions();

List<ShellyConsumption> getAllConsumptions();

List<ShellyInstantConsumption> getShellyMqttPowerMessagesByRangeOfDates(OffsetDateTime startDate,
OffsetDateTime endDate);
}

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package org.lucoenergia.conluz.infrastructure.consumption.shelly;

import org.influxdb.annotation.Column;
import org.influxdb.annotation.Measurement;

import java.time.Instant;

@Measurement(name = ShellyMqttPowerMessagePoint.MEASUREMENT)
public class ShellyMqttPowerMessagePoint {

public static final String MEASUREMENT = "shelly_mqtt_power_messages";
public static final String TOPIC = "topic";
public static final String VALUE = "value";

@Column(name = "time")
private Instant time;
@Column(name = TOPIC)
private String topic;
@Column(name = VALUE)
private Double value;

public Instant getTime() {
return time;
}

public String getTopic() {
return topic;
}

public Double getValue() {
return value;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,16 @@
import org.lucoenergia.conluz.domain.consumption.shelly.get.GetShellyConsumptionRepository;
import org.lucoenergia.conluz.infrastructure.consumption.shelly.ShellyConsumptionPoint;
import org.lucoenergia.conluz.infrastructure.consumption.shelly.ShellyInstantConsumptionPoint;
import org.lucoenergia.conluz.infrastructure.consumption.shelly.ShellyMqttPowerMessagePoint;
import org.lucoenergia.conluz.infrastructure.shared.db.influxdb.InfluxDbConnectionManager;
import org.lucoenergia.conluz.infrastructure.shared.time.DateConverter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Repository;

import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

@Repository
@Qualifier("getShellyConsumptionRepositoryInflux")
Expand Down Expand Up @@ -82,6 +85,27 @@ public List<ShellyConsumption> getAllConsumptions() {
}
}

@Override
public List<ShellyInstantConsumption> getShellyMqttPowerMessagesByRangeOfDates(OffsetDateTime startDate, OffsetDateTime endDate) {
String startDateAsString = dateConverter.convertToString(startDate);
String endDateAsString = dateConverter.convertToString(endDate);

try (InfluxDB connection = influxDbConnectionManager.getConnection()) {

// Get Shelly MQTT power messages
String query = String.format("SELECT time, %s, %s FROM \"%s\" WHERE time >= '%s' AND time <= '%s'",
ShellyMqttPowerMessagePoint.TOPIC, ShellyMqttPowerMessagePoint.VALUE,
ShellyMqttPowerMessagePoint.MEASUREMENT, startDateAsString, endDateAsString);
QueryResult queryResult = connection.query(new Query(query));

InfluxDBResultMapper resultMapper = new InfluxDBResultMapper();
List<ShellyMqttPowerMessagePoint> consumptionPoints = resultMapper.toPOJO(queryResult, ShellyMqttPowerMessagePoint.class);

// Parse results into instant consumptions
return mapShellyMqttPowerMessagesToInstantConsumption(consumptionPoints);
}
}

private List<ShellyInstantConsumption> mapToInstantConsumption(List<ShellyInstantConsumptionPoint> consumptionPoints) {
return consumptionPoints.stream()
.map(consumptionPoint -> new ShellyInstantConsumption.Builder()
Expand All @@ -102,4 +126,48 @@ private List<ShellyConsumption> mapToConsumption(List<ShellyConsumptionPoint> co
.build())
.toList();
}

private List<ShellyInstantConsumption> mapShellyMqttPowerMessagesToInstantConsumption(List<ShellyMqttPowerMessagePoint> consumptionPoints) {

List<ShellyInstantConsumption> parsedConsumptions = new ArrayList<>();

for (ShellyMqttPowerMessagePoint messagePoint : consumptionPoints) {
Optional<ShellyInstantConsumption> parsedConsumption = processMessage(messagePoint);
parsedConsumption.ifPresent(parsedConsumptions::add);
}

return parsedConsumptions;
}

private Optional<ShellyInstantConsumption> processMessage(ShellyMqttPowerMessagePoint message) {
if (getConsumptionInKw(message) > 0) {
return Optional.of(new ShellyInstantConsumption.Builder()
.withConsumptionKW(getConsumptionInKw(message))
.withTimestamp(message.getTime())
.withPrefix(getPrefix(message))
.withChannel(getChannel(message))
.build());
}
return Optional.empty();
}

private Double getConsumptionInKw(ShellyMqttPowerMessagePoint message) {
return convertFromWToKW(message.getValue());
}

private Double convertFromWToKW(Double energyInW) {
return energyInW / 1000;
}

private String getPrefix(ShellyMqttPowerMessagePoint message) {
String topic = message.getTopic();
String[] slices = topic.split("/");
return slices[1] + "/" + slices[2];
}

private String getChannel(ShellyMqttPowerMessagePoint message) {
String topic = message.getTopic();
String[] slices = topic.split("/");
return slices[4];
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package org.lucoenergia.conluz.infrastructure.consumption.shelly.parse;

import org.lucoenergia.conluz.domain.consumption.shelly.ShellyInstantConsumption;
import org.lucoenergia.conluz.domain.consumption.shelly.get.GetShellyConsumptionRepository;
import org.lucoenergia.conluz.domain.consumption.shelly.persist.PersistShellyConsumptionRepository;
import org.springframework.stereotype.Component;

import java.time.OffsetDateTime;
import java.util.List;

@Component
public class ShellyMqttPowerMessagesToInstantConsumptionsProcessor {

private final GetShellyConsumptionRepository getShellyConsumptionRepository;
private final PersistShellyConsumptionRepository persistShellyConsumptionRepository;

public ShellyMqttPowerMessagesToInstantConsumptionsProcessor(
GetShellyConsumptionRepository getShellyConsumptionRepository,
PersistShellyConsumptionRepository persistShellyConsumptionRepository) {
this.getShellyConsumptionRepository = getShellyConsumptionRepository;
this.persistShellyConsumptionRepository = persistShellyConsumptionRepository;
}

public void process(OffsetDateTime startDate, OffsetDateTime endDate) {

List<ShellyInstantConsumption> instantConsumptions = getShellyConsumptionRepository.getShellyMqttPowerMessagesByRangeOfDates(
startDate, endDate);

persistShellyConsumptionRepository.persistInstantConsumptions(instantConsumptions);
}
}
Loading

0 comments on commit 302a458

Please sign in to comment.