Newer
Older
import data.fodo_data_base as dat
import PyTango
from bs4 import BeautifulSoup
import urllib.request
class tangoIO:
    '''
    Class - provides an interface with TANGO (read/write magnets etc.)
    '''

    def __init__(self):
        '''
        Create the empty containers and the device-name fragments used when
        building cavity timing / phase addresses.
        '''
        # Per-element lookup tables, all empty at construction time.
        self.element_dict = dict()
        self.quad_dict = dict()
        self.undulator_dict = dict()

        # Indexed by create_tango_address() and refresh_optics(); it has to
        # be assigned from outside before those lookups can succeed.
        self.lattice_config = None

        # Filled by create_tango_address() for 'cavity' elements, keyed by
        # the upper-case element id.
        self.cavity_timing = dict()
        self.cavity_phase = dict()

        # The cavity area name is appended to the prefix / prepended to the
        # suffix to form the timing and phase device names.
        self.cavtimprefix = 'f/timing/rttrigger_mod_'
        self.cavphasuffix = '/mod/rfamp'
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
def init_beam_status(self):
    '''
    Open the TANGO DeviceProxy used for the beam status items and build
    the list of attribute names that will be read from it.
    '''
    status_device = 'f/misc/beam_status_f'
    self.tango_beam_status = PyTango.DeviceProxy(status_device)
    # Fills self.beam_status with the attribute names to poll.
    self.get_beam_status_keys()
def get_element_type(self, element):
    '''
    Deprecated (?)

    :param element: object exposing a string ``type`` attribute
    :return: str, the element type in lower case
    '''
    type_name = element.type
    return type_name.lower()
def create_tango_address(self, element, elem_type):
    '''
    Create TANGO address based on element type and name (i.e. making them
    lower case, including server location).

    Working values are stored on the instance as a side effect:
    ``self.id``, ``self.area`` and ``self.tango_address``, plus
    ``self.idstr`` for undulators and ``self.cavtimprox`` /
    ``self.cavphaprox`` for cavities. For cavities the timing and phase
    DeviceProxy objects are also registered in ``self.cavity_timing`` and
    ``self.cavity_phase`` under the upper-case element id.

    :param element: the OCELOT element from the lattice (only its ``id``
        attribute is read)
    :param elem_type: the type of element: 'quadrupole', 'undulator',
        'insertion_device' or 'cavity'
    :return: str, full TANGO address; None for any other ``elem_type``
    :raises ValueError: if an undulator id does not contain 'fel01', or
        'fel02' together with 's01' or 's02'
    '''
    self.id = element.id.lower()
    # Area = second '_'-separated token of the text before the first '.',
    # e.g. 'q_lh.01' -> 'lh'.
    self.area = self.id.split('.')[0].split('_')[1].lower()

    if 'mbd' in self.area:
        if 'fel01' in self.id:
            self.area = 'mbd_fel01'
        elif 'fel02' in self.id:
            self.area = 'mbd_fel02'
        else:
            self.area = 'mbd'
        # Every mbd id gets its third-from-last character replaced by '.'
        # (e.g. '..._01' -> '....01').
        self.id = self.id[:-3] + '.' + self.id[-2:]

    if elem_type == 'quadrupole':
        self.tango_address = '/'.join([self.area, 'magnet', self.id])
        return self.tango_address

    if elem_type == 'undulator':
        # Decide the device family from scratch on every call so a value
        # left over from a previous element can never be reused.
        idstr = None
        if 'fel01' in self.id:
            self.area = 'fel01'
            idstr = 'id'
        elif 'fel02' in self.id:
            self.area = 'fel02'
            if 's01' in self.id:
                idstr = 'id_s01'
            elif 's02' in self.id:
                idstr = 'id_s02'
        if idstr is None:
            raise ValueError(
                'cannot build an undulator TANGO address for element id '
                '{!r}'.format(element.id))
        self.idstr = idstr
        self.tango_address = '/'.join([self.area, self.idstr, self.id])
        return self.tango_address

    if elem_type == 'insertion_device':
        if 'fel01' in self.id:
            self.area = 'fel01'
        elif 'fel02' in self.id:
            self.area = 'fel02'
        # The device name comes from the lattice configuration, keyed by
        # the upper-case element id.
        device = self.lattice_config[self.id.upper()].lower()
        self.tango_address = '/'.join(
            [self.area, 'insertion_device', device])
        return self.tango_address

    if elem_type == 'cavity':
        self.area = self.lattice_config[self.id.upper()].lower()
        self.tango_address = (self.area + '/' + 'mod' + '/' +
                              'llrf_' + self.area + '.01')
        self.cavtimprox = PyTango.DeviceProxy(
            self.cavtimprefix + self.area + '.01')
        try:
            self.cavphaprox = PyTango.DeviceProxy(
                self.area + self.cavphasuffix)
        except PyTango.DevFailed:
            # Short device name could not be opened: retry with the longer
            # '<area>/mod/rfamp_<area>.01' form.
            self.cavphaprox = PyTango.DeviceProxy(
                self.area + self.cavphasuffix + '_' + self.area + '.01')
        cavity_key = self.id.upper()
        self.cavity_timing[cavity_key] = self.cavtimprox
        self.cavity_phase[cavity_key] = self.cavphaprox
        return self.tango_address

    # Unrecognised element type: no address is built.
    return None
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
def create_tango_quad_list(self, quads):
    '''
    Open a TANGO DeviceProxy for every quadrupole of the OCELOT lattice.

    Each proxy is stored in the 'quad_dict' dictionary of the main data
    base under the upper-case quad id, and the 'Strength' attribute read
    from the proxy at that moment is stored in 'quads_prev' under the
    same key. ``self.address`` and ``self.id`` are left holding the values
    of the last quad processed.

    :param quads: the list of quads from OCELOT
    '''
    for quad in quads:
        self.address = self.create_tango_address(quad,
                                                 elem_type='quadrupole')
        # create_tango_address() leaves a lower-case id behind; the data
        # base is keyed by the upper-case one.
        self.id = quad.id.upper()
        proxy = PyTango.DeviceProxy(self.address)
        values = dat.fodo_data_base.values
        values['quad_dict'][self.id] = proxy
        previous = values['quads_prev']
        previous[self.id] = proxy.Strength
def create_tango_undulator_list(self, undulators):
    '''
    Open the TANGO DeviceProxy objects for every undulator of the OCELOT
    lattice.

    Two proxies are created per undulator: one at its 'undulator' address,
    stored in the 'undulator_dict' dictionary of the main data base, and
    one at its 'insertion_device' address, stored in
    'insertion_device_dict'. Both are keyed by the upper-case undulator id.

    :param undulators: the list of undulators from OCELOT
    '''
    # (element type passed to create_tango_address, target dictionary)
    targets = (
        ('undulator', 'undulator_dict'),
        ('insertion_device', 'insertion_device_dict'),
    )
    for und in undulators:
        for kind, store_name in targets:
            self.address = self.create_tango_address(und, elem_type=kind)
            self.id = und.id.upper()
            proxy = PyTango.DeviceProxy(self.address)
            dat.fodo_data_base.values[store_name].update({self.id: proxy})
def create_tango_cavity_list(self, cavs):
    '''
    Open a TANGO DeviceProxy for the cavities of the OCELOT lattice and
    store each one in the 'cavity_dict' dictionary of the main data base
    under the upper-case cavity id.

    Cavities whose id contains 'WD' are skipped.

    :param cavs: the list of cavities from OCELOT
    '''
    for cav in cavs:
        if 'WD' in cav.id:
            continue
        self.address = self.create_tango_address(cav, elem_type='cavity')
        self.id = cav.id.upper()
        proxy = PyTango.DeviceProxy(self.address)
        cavity_store = dat.fodo_data_base.values['cavity_dict']
        cavity_store[self.id] = proxy
def get_cavity_timing_dict(self):
    '''
    :return: dict, the cavity timing DeviceProxy objects keyed by
        upper-case cavity id (the instance's own dictionary, not a copy)
    '''
    timing_proxies = self.cavity_timing
    return timing_proxies
def get_cavity_phase_dict(self):
    '''
    :return: dict, the cavity phase DeviceProxy objects keyed by
        upper-case cavity id (the instance's own dictionary, not a copy)
    '''
    phase_proxies = self.cavity_phase
    return phase_proxies
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
def get_tango_beam_status(self):
    '''
    Read every attribute named in ``self.beam_status`` from the beam
    status DeviceProxy and store the result in the 'beam_status'
    dictionary of the main data base under the attribute name.
    '''
    for attr_name in self.beam_status:
        status_store = dat.fodo_data_base.values['beam_status']
        status_store[attr_name] = self.tango_beam_status.read_attribute(
            attr_name)
def get_tango_beam_sizes(self):
    '''
    Read every attribute named in ``self.beam_sizes`` from
    ``self.tango_beam_sizes`` and store the result in the 'beam_sizes'
    dictionary of the main data base.

    The dictionary key is the attribute name with 'ccd' removed and
    converted to upper case; it is also left in ``self.si``.
    '''
    # NOTE(review): self.beam_sizes and self.tango_beam_sizes are not set
    # in the visible part of this file - presumably initialised elsewhere.
    for attr_name in self.beam_sizes:
        self.si = attr_name.replace('ccd', '').upper()
        size_store = dat.fodo_data_base.values['beam_sizes']
        size_store[self.si] = self.tango_beam_sizes.read_attribute(attr_name)
def get_beam_status_keys(self):
    '''
    Get the names of the Twiss measurements at all locations, the measured
    beam energies, the charge target and the per-cavity settings.

    Each name appears exactly once, so every attribute is read only once
    per refresh. As a side effect the list is stored in
    ``self.beam_status`` and the building blocks in ``self.twi``,
    ``self.cavities`` and ``self.cavity_keys``.

    :return: list, keys for reading beam status
    '''
    self.twi = ['beta_X', 'beta_Y', 'alpha_X', 'alpha_Y']
    locations = ['OM_fel1', 'OM_fel2', 'Q_LH.04', 'Q_BC01.07', 'Q_TLS.02',
                 'Q_SFEL01.11', 'Q_SFEL02.08']

    keys = ['DBD_Energy']
    for loc in locations:
        for twiss_name in self.twi:
            keys.append(loc + '_' + twiss_name)
            # Only the quadrupole locations also have a '_FELINO' variant.
            if 'Q_' in loc:
                keys.append(loc + '_' + twiss_name + '_FELINO')

    # 'DBD_Energy' is already the first entry and is not repeated here.
    keys.extend(['LH_Energy', 'BC01_Energy', 'BC02_Energy',
                 'FB_Charge_target'])

    self.cavities = ['L1_1', 'L1_2', 'L1_X', 'L1_3', 'L1_4',
                     'L2_1', 'L2_2', 'L2_3', 'L2_4',
                     'L3_1', 'L3_2',
                     'L4_1', 'L4_2', 'L4_3', 'L4_4', 'L4_5', 'L4_6']
    self.cavity_keys = ['RFamp', 'phase', 'weight']
    keys.extend(cav + '_' + key
                for cav in self.cavities
                for key in self.cavity_keys)

    self.beam_status = keys
    return self.beam_status
def refresh_optics(self):
    '''
    Refresh measured/simulated Twiss parameters at the measurement point,
    then update these values in the main data base dictionary.

    The beam status is re-read from TANGO first. The start measurement
    point is 'OM_fel1' / 'OM_fel2' when the Twiss source is 'OM', otherwise
    it is looked up in ``self.lattice_config`` from the line start; the end
    measurement point is always looked up from the line end. The
    'betax/betay/alphax/alphay' '_meas' and '_meas_end' entries and
    'beam_energy' of the main data base are then overwritten.

    The '_FELINO' key suffix is applied to the start-point readings only
    when the Twiss source is 'FELINO'; it is recomputed on every call.
    '''
    self.get_tango_beam_status()
    values = dat.fodo_data_base.values

    self.source = values['twiss_source']
    if self.source == 'OM':
        if 'FEL02' in values['line']:
            self.meas_point = 'OM_fel2'
        else:
            self.meas_point = 'OM_fel1'
    else:
        self.linest = values['line_start']
        self.meas_point = self.lattice_config[self.linest + '_MEAS']

    # Recomputed every time so that a suffix from an earlier 'FELINO'
    # selection cannot leak into a later, different source.
    self.suffix = '_FELINO' if self.source == 'FELINO' else ''

    self.lineen = values['line_end']
    self.meas_point_end = self.lattice_config[self.lineen + '_MEAS']

    # NOTE(review): self.beamstatus is not assigned anywhere in the visible
    # part of this file. When it is absent, fall back to the 'beam_status'
    # dictionary that get_tango_beam_status() has just filled - presumably
    # the intended source; confirm against the rest of the class.
    readings = getattr(self, 'beamstatus', None)
    if readings is None:
        readings = values['beam_status']

    # (data base key stem, attribute name fragment)
    twiss = (('betax', '_beta_X'), ('betay', '_beta_Y'),
             ('alphax', '_alpha_X'), ('alphay', '_alpha_Y'))
    for stem, tag in twiss:
        values[stem + '_meas'] = readings[
            self.meas_point + tag + self.suffix].value
    for stem, tag in twiss:
        values[stem + '_meas_end'] = readings[
            self.meas_point_end + tag].value

    # Pick the energy reading that matches the start measurement point.
    if 'LH' in self.meas_point:
        energy_key = 'LH_Energy'
    elif 'BC1' in self.meas_point or 'BC01' in self.meas_point:
        energy_key = 'BC01_Energy'
    else:
        energy_key = 'DBD_Energy'
    values['beam_energy'] = readings[energy_key].value