-
Notifications
You must be signed in to change notification settings - Fork 10
/
control1.py
75 lines (59 loc) · 2.03 KB
/
control1.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import pickle
import paho.mqtt.client as mqtt
from threading import Thread
import time
broker_ip = input('Enter Broker ip: ')
broker_dict = {'user': 'mec', 'pw': 'password', 'sub_topic': 'control', 'ip': broker_ip}
class BrokerCom:
def __init__(self, user, pw, ip, sub_topic):
self.user = user
self.pw = pw
self.ip = ip
self.port = 1883
self.topic = sub_topic
self.client = mqtt.Client()
self.stopped = []
self.run = 1
def on_connect(self, connect_client, userdata, flags, rc):
print("Connected with Code :" + str(rc))
# Subscribe Topic from here
connect_client.subscribe(self.topic)
def on_message(self, message_client, userdata, msg):
data = pickle.loads(msg.payload) # ['start', []], ['stop': id]
if (data[0] == 'stop') and (data[1] not in self.stopped):
self.stopped.append(data[1])
print(f'{data[1]} has stopped!')
def publish(self, topic, data):
self.client.publish(topic, data)
def broker_loop(self):
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
self.client.username_pw_set(self.user, self.pw)
self.client.connect(self.ip, self.port, 60)
self.client.loop_start()
while True:
if self.run == 0:
self.client.loop_stop()
self.client.disconnect()
break
def __del__(self):
print('Broker Communication Object Deleted!')
def publish():
input('start> ')
messenger.publish(topic=broker_dict['sub_topic'], data=pickle.dumps(['start', {1, 2, 3}]))
print('published')
def exp_control():
global messenger
messenger = BrokerCom(**broker_dict)
h1 = Thread(target=messenger.broker_loop)
h1.start()
time.sleep(1)
while True:
try:
publish()
except KeyboardInterrupt:
print('terminated')
messenger.run = 0
break
if __name__ == '__main__':
exp_control()