Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New Sensor Module: Frequency Counter #378

Merged
merged 4 commits into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,10 @@ Hardware support is provided by specific GPIO, Sensor and Stream modules. It's e
- PMS5003 particulate sensor (`pms5003`)
- SHT40/SHT41/SHT45 temperature and humidity sensors (`sht4x`)
- YF-S201 flow rate sensor (`yfs201`)
- FREQUENCYCOUNTER Counts pulses from GPIOs and return the frequency in Hz (frequencycounter)
- FLOWSENSOR generic flow rate sensor like YF-S201 or YF-DN50 (`flowsensor`)


### Streams

- Serial port (`serial`)
Expand Down
93 changes: 93 additions & 0 deletions mqtt_io/modules/sensor/frequencycounter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
"""

Frequencycounter: Generic Frequency Counter

Example configuration:

sensor_modules:
- name: frequency
module: frequencycounter

sensor_inputs:
- name: flow_rate1
module: frequency
pin: 0
digits: 0
interval: 10

"""

from typing import Dict
from ...types import CerberusSchemaType, ConfigType, SensorValueType
from . import GenericSensor

REQUIREMENTS = ("gpiozero",)


class FREQUENCYCOUNTER:
"""
Frequency Counter class
Multiple instances support multiple sensors on different pins
"""

def __init__(self, gpiozero, name: str, pin: int) -> None: # type: ignore[no-untyped-def]
self.name = name
self.pin = gpiozero.DigitalInputDevice(pin)
self.pin.when_activated = self.count_pulse
self.count = 0

def count_pulse(self) -> None:
"""Increment pulse count."""
self.count += 1

def reset_count(self) -> None:
"""Reset pulse count."""
self.count = 0

def frequency(self, sample_window: int) -> float:
"""
sample_window is in seconds, so hz is pulse_count / sample_window
"""
hertz = self.count / sample_window
return hertz

def get_value(self, interval: int) -> float:
"""Return frequency over interval seconds and reset count."""
frequency = self.frequency(interval)
self.reset_count()
return frequency


class Sensor(GenericSensor):
"""
Frequency Counter
"""

SENSOR_SCHEMA: CerberusSchemaType = {
"pin": {
"type": 'integer',
"required": True,
"empty": False,
},
"interval": {
"type": 'integer',
"required": True,
"empty": False,
}
}

def setup_module(self) -> None:
# pylint: disable=import-outside-toplevel,import-error
import gpiozero # type: ignore

self.gpiozero = gpiozero
self.sensors: Dict[str, FREQUENCYCOUNTER] = {}

def setup_sensor(self, sens_conf: ConfigType) -> None:
sensor = FREQUENCYCOUNTER(
gpiozero=self.gpiozero, name=sens_conf["name"], pin=sens_conf["pin"]
)
self.sensors[sensor.name] = sensor

def get_value(self, sens_conf: ConfigType) -> SensorValueType:
return self.sensors[sens_conf["name"]].get_value(sens_conf["interval"])
Loading