This repository has been archived by the owner on Dec 29, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
139 lines (108 loc) · 4.05 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import RPi.GPIO as GPIO
import subprocess
import time
import logging
import sys
import constant
# Module-level logger shared by Sensor and Game; its level and stdout handler
# are only configured when this file is run as a script (see the __main__ guard).
LOG = logging.getLogger(__name__)
class Sensor:
    """
    Interface with an electronic sensor connected to the board.

    This class is used to check if a sensor is active or not, and send the
    associated request to Houdini. check_run() latches ``activated`` to True
    the first time the sensor reads active; nothing in this class resets it,
    so check_run() sends its request at most once per instance.

    int pin: GPIO number where the positive wire of the sensor is connected
    str name_get: trailing part of the URL where an HTTP request is sent when
                  the sensor is activated
    bool reverse: indicate if activated sensor makes GPIO.input() True
                  (default) or False (reverse = True)
    """

    def __init__(self, pin, name_get, reverse=False):
        self.pin = pin
        self.name_get = name_get
        self.reverse = reverse
        # Set to True by check_run() once the sensor has fired.
        self.activated = False
        # Configure the pin as an input with the pull-up option requested.
        GPIO.setup(self.pin, GPIO.IN, pull_up_down=GPIO.PUD_UP)

    def read(self):
        """Return the status of the sensor as True or False."""
        # Flip the raw level if the sensor is reversed. Both sides are
        # normalised to bool so the result is always a real bool, even when
        # ``reverse`` was given as 0/1 instead of False/True.
        return bool(GPIO.input(self.pin)) != bool(self.reverse)

    def get_request(self):
        """
        Send a signal to Houdini for this sensor.

        The request is issued through the ``curl`` executable against
        ``constant.URL_DST + name_get``. A non-zero curl exit status (for
        example a timeout or a refused connection) is logged as a warning
        instead of being silently discarded. Returns None.
        """
        LOG.debug("get request send: %s\n", self.name_get)
        url = "{}{}".format(constant.URL_DST, self.name_get)
        # "-m 1" limits the whole transfer to one second so an unresponsive
        # server cannot stall the polling loop for long.
        status = subprocess.call(["curl", "-m", "1", "-qX", "GET", url])
        if status != 0:
            LOG.warning(
                "Request to %s failed (curl exit code %s).\n", url, status)

    def check_run(self):
        """
        Check if the sensor is active, and send a request to Houdini.

        This method combines self.read and self.get_request in a simple
        one-shot method. Once the sensor has been activated, further calls
        return immediately without polling the pin again.
        """
        # Already fired: nothing left to do, so skip the GPIO read entirely.
        if self.activated:
            return
        if self.read():
            LOG.debug("Activate %s sensor.\n", self.name_get)
            # Latch before sending so the request is never repeated, even if
            # the request itself fails.
            self.activated = True
            self.get_request()
class Game:
    """
    A single play-through of the game.

    Creating an instance selects the BCM GPIO numbering mode and registers
    the four sensors ('start', 'maya', 'console', 'usine'). Call start() to
    block until the start button is pressed and then poll the remaining
    sensors until the game is complete.
    """

    def __init__(self):
        GPIO.setmode(GPIO.BCM)
        # Sensors are created in the same order as they appear in the mapping:
        # the start button is not reversed, the three puzzle sensors are.
        start_button = Sensor(constant.START_GPIO, "start", reverse=False)
        maya = Sensor(constant.MAYA_GPIO, "maya", reverse=True)
        console = Sensor(constant.CONSOLE_GPIO, "console", reverse=True)
        usine = Sensor(constant.USINE_GPIO, "usine", reverse=True)
        self.sensors = {
            'start': start_button,
            'maya': maya,
            'console': console,
            'usine': usine,
        }

    def wait_start(self):
        """
        Block until the start button reads active, then notify Houdini.

        Sends the "start" request, waits five seconds, then sends the
        "intro" request.
        """
        button = self.sensors['start']
        # Poll the button every 100 ms until it reads active.
        while True:
            if button.read():
                break
            time.sleep(0.1)
        button.activated = True
        LOG.info("Start button pressed.\n")

        curl_get = ["curl", "-qX", "GET"]
        subprocess.call(curl_get + ["{}start".format(constant.URL_DST)])
        time.sleep(5)
        subprocess.call(curl_get + ["{}intro".format(constant.URL_DST)])

    def run(self):
        """Poll the puzzle sensors every 100 ms until the game is complete."""
        # (debug message, sensor key) pairs, checked in this order each pass.
        checks = (
            ("Check maya.\n", 'maya'),
            ("Check console.\n", 'console'),
            ("Check usine.\n", 'usine'),
        )
        while not self.is_complete():
            for message, key in checks:
                LOG.debug(message)
                self.sensors[key].check_run()
            time.sleep(0.1)

    def start(self):
        """
        Play one full game.

        Waits for the start signal, then polls the sensors until the game is
        complete. GPIO.cleanup() is always called on the way out, including
        when an exception interrupts the game.
        """
        LOG.info("Start service.\n")
        try:
            LOG.debug("Wait for game start.\n")
            self.wait_start()
            LOG.debug("Game started.\n")
            self.run()
        finally:
            LOG.info("Stop service.\n")
            GPIO.cleanup()

    def is_complete(self):
        """
        Tell whether this game has nothing left to do.

        Return True once the 'usine' sensor has been activated, False
        otherwise.
        """
        final_sensor = self.sensors['usine']
        return final_sensor.activated
if __name__ == "__main__":
    # Route log records to stdout and show INFO and above.
    STDOUT_HANDLER = logging.StreamHandler(sys.stdout)
    LOG.addHandler(STDOUT_HANDLER)
    LOG.setLevel(logging.INFO)

    # Run games back to back: a fresh Game is created after each one ends.
    while True:
        game = Game()
        game.start()