forked from AFMHZB/AFM
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Epics_Control.py
45 lines (41 loc) · 1.58 KB
/
Epics_Control.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import epics
import time
class Epics_Control:
def __init__(self):
self.observer = []
self.limit = 0.2
self.paused = False
self.writing = False
self.cur_state = epics.caget('MLSOPCCP:curState')
state = epics.PV('MLSOPCCP:curState')
state.add_callback(self.on_change)
self.cur_value = epics.caget('CUM1ZK3RP:rdCur')
current = epics.PV('CUM1ZK3RP:rdCur')
current.add_callback(self.on_change)
def bind_to(self, callback):
self.observer.append(callback)
def on_change(self, pvname=None, value=None, char_value=None, **kws):
if pvname == 'MLSOPCCP:curState':
if self.cur_state == value:
return
if self.cur_state == 'IDLE':
self.paused = True
self.send_message('epics_stop')
elif value == 'IDLE':
time.sleep(60)
self.send_message('epics_restart')
self.paused = False
self.cur_state = value
elif pvname == 'CUM1ZK3RP:rdCur':
if not self.paused:
if abs(value) > abs(self.cur_value * (1+self.limit)) or abs(value) < abs(self.cur_value * (1-self.limit)):
self.paused = True
self.send_message(('epics_error', (cur_value, value)))
self.cur_value = value
def send_message(self, text):
while self.writing:
time.sleep(0.01)
self.writing = True
for callback in self.observer:
callback(text)
self.writing = False