Skip to content

Commit

Permalink
feat(api): hardware info, get stt available, trigger wake listening
Browse files Browse the repository at this point in the history
  • Loading branch information
duhow committed Dec 12, 2024
1 parent 5468395 commit 2cc3c54
Show file tree
Hide file tree
Showing 4 changed files with 212 additions and 10 deletions.
82 changes: 82 additions & 0 deletions api/config.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import re

class ConfigManager:
def __init__(self, file_path: str):
Expand Down Expand Up @@ -78,3 +79,84 @@ def set(self, key, value):
lines.append(new_value)
with open(self.file_path, 'w') as f:
f.writelines(lines)

class ConfigUci:
def __init__(self, file_path: str):
self.file_path = file_path
if not os.path.exists(self.file_path):
raise FileNotFoundError(f"File not found: {self.file_path}")
self.main_section = None
self.data = dict()
self.read()

def __getattr__(self, key):
if self.main_section:
return self.data[self.main_section].get(key.lower())
return self.data.get(key)

def _process_value(self, value):
""" Transform value from string to type """
if value.startswith('"') and value.endswith('"'):
value = value[1:-1]
if value.lower() == 'true':
return True
if value.lower() == 'false':
return False
if value.isdigit():
return int(value)
return value

def read(self):
result = {}
current_section = None

with open(self.file_path, 'r') as f:
for line in f:
line = line.strip()
if line.startswith('#') or not line:
continue
if line.startswith('config'):
current_section = line.split()[1].strip("'")
if self.main_section is None:
self.main_section = current_section
result[current_section] = {}
elif line.startswith('option') and current_section:
match = re.match(r"option\s+'?(\w+)'?\s+'(.+)'", line)
if not match:
continue
key, value = match.groups()
key = key.lower()
value = self._process_value(value.strip("'"))
result[current_section][key] = value

self.data = result

def to_dict(self) -> dict:
if self.main_section:
return self.data[self.main_section]
return self.data

def get(self, key: str, section: str = ""):
if self.main_section and not section:
section = self.main_section
return self.data[section].get(key)

# def set(self, section, key, value):
# config_dict = self.to_dict()
# if section not in config_dict:
# config_dict[section] = {}
# config_dict[section][key] = value
# self._write_config(config_dict)
#
# def _write_config(self, config_dict):
# lines = []
# for section, options in config_dict.items():
# lines.append(f"config '{section}'\n")
# for key, value in options.items():
# if isinstance(value, bool):
# value = str(value).lower()
# if ' ' in value:
# value = f'"{value}"'
# lines.append(f"\toption {key} '{value}'\n")
# with open(self.file_path, 'w') as f:
# f.writelines(lines)
2 changes: 2 additions & 0 deletions api/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

config_listener = '/data/listener'
config_tts = '/data/tts.conf'
mico_version = '/usr/share/mico/version'
board_info = '/data/etc/binfo'

wakewords_porcupine = '/usr/share/porcupine/keywords'

Expand Down
97 changes: 88 additions & 9 deletions api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@
import re
import requests
import subprocess
import signal
import json
import base64
import time

from config import ConfigManager
from utils import get_ip_address
from config import ConfigManager, ConfigUci
from utils import get_ip_address, get_wifi_mac_address, get_bt_mac_address, get_device_id
import const

hostname = os.uname()[1]
Expand All @@ -14,14 +18,38 @@

config = ConfigManager(const.config_listener)
config_tts = ConfigManager(const.config_tts)
system_version = ConfigUci(const.mico_version)

@app.route('/')
def index():
return app.send_static_file('index.html')
def get_hass_token() -> str:
token = config.HA_TOKEN
if not token:
return ""
try:
token_parts = token.split('.')
if len(token_parts) != 3:
raise ValueError('Invalid HA token format')

payload = token_parts[1]
payload += '=' * (4 - len(payload) % 4) # Add padding if necessary
decoded_payload = json.loads(base64.urlsafe_b64decode(payload).decode('utf-8'))

if decoded_payload['exp'] < time.time():
if config.HA_REFRESH_TOKEN:
if home_assistant_refresh_token():
token = config.HA_TOKEN
else:
raise ValueError('Failed to refresh token')
else:
raise ValueError('Token expired and no refresh token available')
return token
except Exception as e:
raise ValueError(f'Failed to get HA token: {e}')

@app.route('/')
@app.route('/app.js')
def app_js():
return app.send_static_file('app.js')
def files():
file = 'index.html' if request.path == '/' else request.path.lstrip('/')
return app.send_static_file(file)

@app.get('/config')
def get_config():
Expand All @@ -43,15 +71,37 @@ def set_config():
updated = True

if request.form.get('tts_language'):
config_tts.LANGUAGE = request.form.get('tts_language')
updated = True
if config_tts.set("LANGUAGE", request.form.get('tts_language')) is not True:
updated = True

if updated:
service_path = os.path.join(const.services_dir, 'listener')
os.system(f'{service_path} reload')

return redirect('/', code=302)

@app.get('/config/stt')
def get_stt_providers():
""" Get all state entities from HA, and filter for STT ones only """
token = get_hass_token()
url = f'{config.HA_URL}/api/states'
headers = {
'Authorization': f'Bearer {token}',
'Content-Type': 'application/json',
}

req = requests.get(url, headers=headers)
req.raise_for_status()

providers = list()
data = req.json()
for entity in data:
if entity['entity_id'].startswith('stt.'):
entry = {"entity_id": entity['entity_id'], "name": entity['attributes'].get('friendly_name', entity['entity_id'])}
providers.append(entry)

return jsonify({'data': {'providers': providers, 'current': config.get('HA_STT_PROVIDER') or None}})

@app.get('/config/wakewords')
def get_wakewords():
""" Get the wakewords from Porcupine, remove the file name, and only get the wakeword name """
Expand Down Expand Up @@ -94,6 +144,21 @@ def parse_avahi_output(output):
except Exception as e:
return jsonify({'hostname': hostname, 'error': str(e)}), 500

@app.get('/device/info')
def device_info():
data = {
'hostname': hostname,
'ip': speaker_ip,
'model': system_version.HARDWARE,
'serial_number': get_device_id(),
'wifi': get_wifi_mac_address(),
'bluetooth': get_bt_mac_address(),
'version': system_version.to_dict(),
}
response = jsonify({'data': data})
response.cache_control.max_age = 3600
return response

@app.route('/mute')
@app.route('/unmute')
def manage_listener():
Expand All @@ -103,6 +168,20 @@ def manage_listener():
os.system(f'{silent} {service} {action}')
return ""

@app.route('/wake')
def trigger_wake():
process_name = '/usr/bin/porcupine'
try:
result = subprocess.run(['pgrep', '-x', process_name], capture_output=True, text=True)
if result.returncode == 0:
pid = result.stdout.strip()
os.kill(int(pid), signal.SIGINT)
return "", 200
else:
return "", 425
except Exception as e:
return jsonify({'error': str(e)}), 500

@app.post('/auth')
def home_assistant_auth():
ha_url = request.form.get('url', '').rstrip('/')
Expand Down
41 changes: 40 additions & 1 deletion api/utils.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,50 @@
import socket
import fcntl
import struct
import subprocess
import re

def get_ip_address(ifname):
def get_ip_address(ifname) -> str:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
return socket.inet_ntoa(fcntl.ioctl(
s.fileno(),
0x8915, # SIOCGIFADDR
struct.pack('256s', ifname[:15].encode('utf-8'))
)[20:24])

def get_unify_key(key: str) -> str:
try:
with open(f'/sys/class/unifykeys/name', 'w') as f:
f.write(key)
with open(f'/sys/class/unifykeys/read', 'r') as f:
return f.read().strip()
except IOError as e:
pass
return ""

def get_device_id() -> str:
did = get_unify_key('deviceid')
if did:
return did
return ""

def get_wifi_mac_address() -> str:
mac = get_unify_key('mac_wifi')
if mac:
return mac.upper()
return ""

def get_bt_mac_address() -> str:
mac = get_unify_key('mac_bt')
if mac:
return mac.upper()

try:
result = subprocess.run('hciconfig hci0'.split(' '), capture_output=True, text=True)
if result.returncode == 0:
match = re.search(r'BD Address: ([0-9A-F:]{17})', result.stdout)
if match:
return match.group(1)
except Exception as e:
pass
return ""

0 comments on commit 2cc3c54

Please sign in to comment.