# cam_ctrl.py
# Forked from NSLS-II-CHX/eiger_tools.
import sys
sys.path.insert(0,"/usr/local/dectris/python")
import dectris.albula
from eigerclient import DEigerClient
import requests
class Eiger(object):
    """Convenience wrapper around ``DEigerClient`` for one EIGER detector.

    The class configures the detector (photon energy, threshold), takes
    exposures, and downloads the resulting HDF5 files over HTTP.

    Energies are given in eV (status messages divide by 1000 to print keV)
    and exposure times in seconds.
    """

    def __init__(self, ipaddr="164.54.124.191"):
        """Store the detector address and default settings.

        No connection is made here; ``initialize()`` creates the client.

        ipaddr -- host name / IP address of the detector control unit.
        """
        self.ipaddr = ipaddr
        self.photon_energy = 9000
        self.threshold = None
        # NOTE: flatfield, num_img and count_time are only stored here; no
        # method in this class reads them back.
        self.flatfield = 1
        self.num_img = 1
        self.count_time = 1
        # sequence id returned by the most recent "arm" command
        self.seq_id = 0
        self.initialized = False

    def _initialize_once(self, photon_energy, threshold):
        """Send one "initialize" command, then set photon energy and threshold."""
        self.eigerclient.sendDetectorCommand("initialize")
        self.eigerclient.setDetectorConfig("photon_energy", photon_energy)
        self.photon_energy = photon_energy
        self.eigerclient.setDetectorConfig("threshold_energy", threshold)
        self.threshold = threshold

    def initialize(self, photon_energy=9000, threshold=9000/2.0):
        """Connect to the detector and initialize it twice.

        Each pass sends "initialize", configures photon energy and
        threshold, and takes one 1 ms exposure.

        photon_energy -- photon energy in eV.
        threshold     -- threshold energy in eV.
        """
        self.eigerclient = DEigerClient(host=self.ipaddr)
        # NOTE: initialize twice to get rid of the 'zebras'
        print("Initialize the EIGER 1st time")
        self._initialize_once(photon_energy, threshold)
        # take short initial exposure to prep flatfield calibration.
        # NOTE: assumes (and recommends !!!) shutter is closed
        self.expose(0.001, 1)
        print("Initialize the EIGER 2nd time")
        self._initialize_once(photon_energy, threshold)
        self.initialized = True
        self.expose(0.001, 1)
        print("EIGER successfully Initialized for "
              + str(photon_energy/1000.0)
              + "keV, using threshold: "
              + str(threshold/1000.0) + "keV")
        print("No Zebras expected! :-)")

    def is_initialized(self):
        """Return True once ``initialize()`` has reached its second pass."""
        return self.initialized

    def set_photon_energy(self, energy):
        """Set the detector photon energy (eV) and remember it on the instance."""
        self.eigerclient.setDetectorConfig("photon_energy", energy)
        self.photon_energy = energy

    def set_threshold(self, threshold):
        """Set the detector threshold energy (eV) and remember it on the instance."""
        self.eigerclient.setDetectorConfig("threshold_energy", threshold)
        self.threshold = threshold

    # method assumes set_photon_energy() has been called previously
    # method assumes that set_threshold() has been called previously
    def expose(self, exposure_time, num_img=1, threshold=None, flatfield=1):
        """Configure, arm, trigger and disarm the detector for one series.

        exposure_time -- seconds; used as count time for a single image and
                         as frame time for a series.
        num_img       -- number of images in the series.
        threshold     -- threshold energy in eV for this exposure; when None
                         the stored ``self.threshold`` is sent instead.  The
                         stored value is not modified by this argument.
        flatfield     -- value written to "flatfield_correction_applied".

        The sequence id returned by "arm" is kept in ``self.seq_id`` so that
        ``download_data()`` can locate the files of this exposure.
        """
        client = self.eigerclient
        if threshold is None:
            threshold = self.threshold
        client.setDetectorConfig("threshold_energy", threshold)
        client.setDetectorConfig("flatfield_correction_applied", flatfield)
        client.setDetectorConfig("nimages", num_img)
        if num_img == 1:
            client.setDetectorConfig("count_time", exposure_time)
        else:
            client.setDetectorConfig("frame_time", exposure_time)
            # count time is kept 20 microseconds shorter than the frame time
            # (presumably to leave room for readout -- confirm against the
            # detector documentation)
            client.setDetectorConfig("count_time", exposure_time - 0.000020)
        self.seq_id = client.sendDetectorCommand("arm")['sequence id']
        print("Detector triggered " + str(num_img) + " image(s) of "
              + str(exposure_time) + "s")
        client.sendDetectorCommand("trigger")
        client.sendDetectorCommand("disarm")
        print("data recorded")
        # read back what the detector actually used
        ct = client.detectorConfig("count_time")
        ft = client.detectorConfig("frame_time")
        thresh = client.detectorConfig("threshold_energy")
        print("used frame time: " + str(ft['value']) + "s")
        print("used count time: " + str(ct['value']) + "s")
        print("used threshold : " + str(thresh['value']/1000.0) + "keV")

    def download_data(self, storage_path, show_image=1):
        """Download the files of the last exposure and remove them remotely.

        The file base name is the file writer's "name_pattern" with "$id"
        replaced by ``self.seq_id``.  The master file is fetched first, then
        data files numbered 000000, 000001, ... until a request for the next
        one is not successful.  Each file is deleted from the detector after
        it has been written to ``storage_path``.

        storage_path -- existing local directory to write the files into.
        show_image   -- when equal to 1, try to open the master file in
                        ALBULA afterwards.

        Raises ``requests.HTTPError`` if the master file cannot be fetched,
        instead of saving the error response as an HDF5 file.
        """
        pattern = self.eigerclient.fileWriterConfig("name_pattern")['value']
        file_name = pattern.replace("$id", str(self.seq_id))

        master_url = "http://{0}/data/{1}_master.h5".format(self.ipaddr, file_name)
        master_path = "{0}/{1}_master.h5".format(storage_path, file_name)
        fm = requests.get(master_url)
        # fail loudly rather than writing an error page to disk and then
        # deleting the remote file
        fm.raise_for_status()
        with open(master_path, "wb") as out:
            out.write(fm.content)
        requests.delete(master_url)
        print("saved master file: " + master_path)

        file_index = 0
        while True:
            data_url = "http://{0}/data/{1}_data_{2:0>6}.h5".format(
                self.ipaddr, file_name, file_index)
            fd = requests.get(data_url)
            if not fd.ok:
                # first missing index marks the end of the series
                break
            data_path = "{0}/{1}_data_{2:0>6}.h5".format(
                storage_path, file_name, file_index)
            with open(data_path, "wb") as out:
                out.write(fd.content)
            requests.delete(data_url)
            print("saved data " + data_path)
            file_index += 1

        if show_image == 1:
            try:
                # opens albula
                m = dectris.albula.openMainFrame()
                s = m.openSubFrame()
                s.loadFile(master_path)
            except Exception:
                # viewing is best-effort; the download itself already succeeded
                print("albula got closed")
# Shared module-level detector handle, created on import.  Constructing it
# only stores the address; nothing is sent until initialize() is called.
eiger = Eiger("164.54.124.191")