-
Notifications
You must be signed in to change notification settings - Fork 0
/
controller.py
75 lines (64 loc) · 2.32 KB
/
controller.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import socketserver
import sys
import threading
from datetime import datetime
import json
from loguru import logger
from config import HOST, PORT, TOKEN, AVAILABLE_COMMAND
# Test Connection Using `nc`: $ nc [hostname] [port]
class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):
    """Handle one client connection of the remote-control TCP service.

    A request is a JSON object carrying a "token" and a "command". The reply
    sent back to the client is the text "True" when ``send_command`` reports
    success, otherwise "False".
    """

    def handle(self):
        """Read a request from the client, check its token and run its command."""
        cur = threading.current_thread()
        logger.info(
            f"[{datetime.utcnow()}] Client connected from {self.request.getpeername()} and [{cur.name}] is handling."
        )
        while True:
            indata = self.request.recv(1024).strip()
            if not indata:
                # connection closed
                self.request.close()
                logger.info("client closed connection.")
                break

            try:
                text = indata.decode()
            except UnicodeDecodeError as err:
                # Bytes that are not valid UTF-8 are a malformed request, not a
                # reason to let the handler die without answering the client.
                logger.warning(f"[DATA EXTRACT Failed] {err}")
                extracted_data = False
            else:
                extracted_data = self.extract_receive_data(text)

            authorized = False
            command = None
            if extracted_data:
                token, command = extracted_data
                try:
                    # NOTE(review): if TOKEN is a single str this is a substring
                    # test, not an equality test -- confirm TOKEN is a collection.
                    authorized = token in TOKEN
                except TypeError:
                    # The JSON token may be a list/object, which cannot be
                    # tested for membership in a set or a str.
                    authorized = False

            if not authorized:
                logger.warning("Data or Token Error")
                self.request.sendall("False".encode())
                break

            # sendall() keeps writing until the whole reply is out; send() may
            # legally transmit only part of the buffer.
            reply = "True" if send_command(command) else "False"
            self.request.sendall(reply.encode())
            # NOTE(review): the original indentation was lost; this break is
            # placed at loop level (one command per connection) -- confirm.
            break

    @staticmethod
    def extract_receive_data(receive_data):
        """Parse a JSON request and pull out its token and command.

        Args:
            receive_data: JSON text expected to hold "token" and "command" keys.

        Returns:
            A ``(token, command)`` tuple, or ``False`` when the text is not
            valid JSON or a required key is missing.
        """
        try:
            data = json.loads(receive_data)
            return data["token"], data["command"]
        except Exception as err:
            # Deliberately broad: this parses untrusted network input and any
            # failure is logged and reported to the caller as "no data".
            logger.warning(f"[DATA EXTRACT Failed] {err}")
            return False
def send_command(command):
    """Send an allow-listed command to the air conditioner through ``irsend``.

    Args:
        command: Command name; it is executed only when it is a member of
            ``AVAILABLE_COMMAND``.

    Returns:
        True when ``os.system`` reports status 0 for the ``irsend`` call,
        otherwise False (including when the command is not allowed).
    """
    logger.info(f"[CONTROL] command: {command}")

    try:
        allowed = command in AVAILABLE_COMMAND
    except TypeError:
        # The command comes from client JSON and may be a list/object, which
        # cannot be tested for membership in a set or a str.
        allowed = False
    if not allowed:
        logger.warning(f"[CONTROL] Rejected, command not available: {command}")
        return False

    # The command is interpolated into a shell string; the allow-list check
    # above is the only thing keeping client input out of the shell.
    cmd = f"irsend SEND_ONCE aircon {command}"
    returned = os.system(cmd)
    if returned == 0:
        logger.success("[CONTROL] Success")
        return True
    logger.error(f"[CONTROL] Failed, error code: {returned}")
    return False
class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
    """TCP server that hands every incoming connection to its own thread."""

    # Set SO_REUSEADDR so the server can rebind its address after a restart.
    allow_reuse_address = True
    # Handler threads are daemonic and do not keep the process alive on exit.
    daemon_threads = True
if __name__ == "__main__":
    # Script entry point: serve until interrupted with Ctrl-C. Using the server
    # as a context manager guarantees server_close() runs, so the listening
    # socket is released even when we leave through sys.exit().
    with ThreadedTCPServer((HOST, PORT), ThreadedTCPRequestHandler) as SERVER:
        logger.info(f"server start at: {HOST}:{PORT}")
        try:
            SERVER.serve_forever()
        except KeyboardInterrupt:
            sys.exit(0)