Skip to content
Snippets Groups Projects
tango_io.py 10.1 KiB
Newer Older
adb-xkc85723's avatar
adb-xkc85723 committed
import data.fodo_data_base as dat
import PyTango
from bs4 import BeautifulSoup
import urllib.request

class tangoIO():
    '''
    Class - provides an interface with TANGO (read/write magnets etc.)

    The class builds TANGO device addresses from OCELOT lattice element ids,
    opens PyTango.DeviceProxy objects for them, and copies values read from
    the beam-status device into the shared dictionary
    dat.fodo_data_base.values.

    Many methods store intermediate results on the instance (self.id,
    self.area, self.address, self.meas_point, ...). These assignments are kept
    so that any external code reading them keeps working.
    '''

    # Maps the prefix of a key in the main data base to the suffix of the
    # corresponding beam-status attribute name (used by refresh_optics).
    _TWISS_KEYS = (
        ('betax', '_beta_X'),
        ('betay', '_beta_Y'),
        ('alphax', '_alpha_X'),
        ('alphay', '_alpha_Y'),
    )

    def __init__(self):
        self.element_dict = {}
        self.quad_dict = {}
        self.undulator_dict = {}
        # Mapping used for look-ups such as '<ELEMENT ID>' -> device name and
        # '<LINE>_MEAS' -> measurement point; must be assigned by the caller
        # before the insertion-device / cavity / refresh_optics code runs.
        self.lattice_config = None
        # Element id (upper case) -> DeviceProxy, filled for cavities
        self.cavity_timing = {}
        self.cavity_phase = {}
        self.cavtimprefix = 'f/timing/rttrigger_mod_'
        self.cavphasuffix = '/mod/rfamp'

    def init_beam_status(self):
        '''
        Link to TANGO DeviceProxy objects for various beam status items
        and build the list of attribute names to read from it
        '''
        self.tango_beam_status = PyTango.DeviceProxy('f/misc/beam_status_f')
        self.get_beam_status_keys()

    def get_element_type(self, element):
        '''
        Deprecated (?)

        :param element: object with a string attribute \'type\'
        :return: str, the element type in lower case
        '''
        return element.type.lower()

    def create_tango_address(self, element, elem_type):
        '''
        Create TANGO address based on element type and name (i.e. making them
        lower case, including server location)

        For elem_type \'cavity\' this also opens the timing and phase
        DeviceProxy objects and stores them in \'cavity_timing\' and
        \'cavity_phase\' under the upper-case element id.

        :param element: the OCELOT element from the lattice
        :param elem_type: the type of element; one of \'quadrupole\',
            \'undulator\', \'insertion_device\' or \'cavity\'
        :return: str, full TANGO address (None for any other elem_type)
        :raises ValueError: for an undulator whose id does not identify the
            FEL line (and, for fel02, the stage s01/s02)
        '''
        dev_id = element.id.lower()
        # The area is the second '_'-separated token of the part of the id
        # before the first '.'
        area = dev_id.split('.')[0].split('_')[1]
        if 'mbd' in area:
            if 'fel01' in dev_id:
                area = 'mbd_fel01'
            elif 'fel02' in dev_id:
                area = 'mbd_fel02'
            else:
                area = 'mbd'
            # NOTE(review): the previous code also called
            # id.replace('mbd', area) here but discarded the result (str is
            # immutable), so it never had any effect. The effective behaviour
            # is kept; confirm whether the prefix was meant to be applied.
            # Replace the third-from-last character by a '.'
            dev_id = dev_id[:-3] + '.' + dev_id[-2:]
        self.id = dev_id
        self.area = area

        if elem_type == 'quadrupole':
            self.tango_address = area + '/' + 'magnet' + '/' + dev_id
            return self.tango_address

        if elem_type == 'undulator':
            # Resolve the device family locally: relying on a value left on
            # the instance by an earlier call would silently yield a wrong
            # address for ids that match none of the cases below.
            idstr = None
            if 'fel01' in dev_id:
                area = 'fel01'
                idstr = 'id'
            elif 'fel02' in dev_id:
                area = 'fel02'
                if 's01' in dev_id:
                    idstr = 'id_s01'
                elif 's02' in dev_id:
                    idstr = 'id_s02'
            self.area = area
            if idstr is None:
                raise ValueError(
                    'Cannot build undulator address for element '
                    + repr(element.id))
            self.idstr = idstr
            self.tango_address = area + '/' + idstr + '/' + dev_id
            return self.tango_address

        if elem_type == 'insertion_device':
            if 'fel01' in dev_id:
                area = 'fel01'
            elif 'fel02' in dev_id:
                area = 'fel02'
            self.area = area
            device = self.lattice_config[dev_id.upper()].lower()
            self.tango_address = area + '/' + 'insertion_device' + '/' + device
            return self.tango_address

        if elem_type == 'cavity':
            # For cavities the area comes from the lattice configuration
            area = self.lattice_config[dev_id.upper()].lower()
            self.area = area
            self.tango_address = area + '/' + 'mod' + '/' + \
                'llrf_' + area + '.01'
            self.cavtimprox = PyTango.DeviceProxy(
                self.cavtimprefix + area + '.01')
            try:
                self.cavphaprox = PyTango.DeviceProxy(
                    area + self.cavphasuffix)
            except Exception:
                # Fall back to the alternative device name when the short one
                # cannot be opened. KeyboardInterrupt/SystemExit are no
                # longer swallowed here.
                self.cavphaprox = PyTango.DeviceProxy(
                    area + self.cavphasuffix + '_' + area + '.01')
            self.cavity_timing[dev_id.upper()] = self.cavtimprox
            self.cavity_phase[dev_id.upper()] = self.cavphaprox
            return self.tango_address

        # Unknown element type: no address is built
        return None

    def create_tango_quad_list(self, quads):
        '''
        Create list of TANGO DeviceProxy objects for the quads in the OCELOT
        lattice. Updates the \'quad_dict\' dictionary in the main data base
        and stores the current \'Strength\' of each quad in \'quads_prev\'

        :param quads: the list of quads from OCELOT
        '''
        values = dat.fodo_data_base.values
        for quad in quads:
            self.address = self.create_tango_address(
                    quad, elem_type='quadrupole')
            self.id = quad.id.upper()
            proxy = PyTango.DeviceProxy(self.address)
            values['quad_dict'][self.id] = proxy
            values['quads_prev'][self.id] = proxy.Strength

    def create_tango_undulator_list(self, undulators):
        '''
        Create list of TANGO DeviceProxy objects for the undulators in the
        OCELOT lattice. Updates the \'undulator_dict\' dictionary and the
        \'insertion_device_dict\' in the main data base

        :param undulators: the list of undulators from OCELOT
        '''
        values = dat.fodo_data_base.values
        for und in undulators:
            # Each undulator is registered twice: once under its undulator
            # address and once under its insertion-device address.
            for elem_type, target in (
                    ('undulator', 'undulator_dict'),
                    ('insertion_device', 'insertion_device_dict')):
                self.address = self.create_tango_address(
                        und, elem_type=elem_type)
                self.id = und.id.upper()
                values[target][self.id] = PyTango.DeviceProxy(self.address)

    def create_tango_cavity_list(self, cavs):
        '''
        Create list of TANGO DeviceProxy objects for the cavities in the OCELOT
        lattice. Updates the \'cavity_dict\' dictionary in the main data base.
        Cavities whose id contains \'WD\' are skipped

        :param cavs: the list of cavities from OCELOT
        '''
        for cav in cavs:
            if 'WD' in cav.id:
                continue
            self.address = self.create_tango_address(cav, elem_type='cavity')
            self.id = cav.id.upper()
            dat.fodo_data_base.values['cavity_dict'][self.id] = \
                PyTango.DeviceProxy(self.address)

    def get_cavity_timing_dict(self):
        '''
        :return: dict, element id (upper case) -> timing DeviceProxy
        '''
        return self.cavity_timing

    def get_cavity_phase_dict(self):
        '''
        :return: dict, element id (upper case) -> phase DeviceProxy
        '''
        return self.cavity_phase

    def get_tango_beam_status(self):
        '''
        Update the beam status items from the TANGO server

        Requires init_beam_status() to have been called first.
        '''
        status = dat.fodo_data_base.values['beam_status']
        for name in self.beam_status:
            status[name] = self.tango_beam_status.read_attribute(name)

    def get_tango_beam_sizes(self):
        '''
        Update the measured beam sizes from the TANGO server

        NOTE(review): \'beam_sizes\' and \'tango_beam_sizes\' are not assigned
        anywhere in this class; presumably the caller sets them - confirm.
        '''
        sizes = dat.fodo_data_base.values['beam_sizes']
        for size in self.beam_sizes:
            # Data base key: attribute name without 'ccd', in upper case
            self.si = size.replace('ccd', '').upper()
            sizes[self.si] = self.tango_beam_sizes.read_attribute(size)

    def get_beam_status_keys(self):
        '''
        Get the names of the Twiss measurements at all locations, the
        measured beam energies, the charge target and the cavity settings

        Each name appears only once in the returned list.

        :return: list, keys for reading beam status
        '''
        self.twi = ['beta_X', 'beta_Y', 'alpha_X', 'alpha_Y']
        self.cavities = ['L1_1', 'L1_2', 'L1_X', 'L1_3', 'L1_4',
                         'L2_1', 'L2_2', 'L2_3', 'L2_4',
                         'L3_1', 'L3_2',
                         'L4_1', 'L4_2', 'L4_3', 'L4_4', 'L4_5', 'L4_6']
        self.cavity_keys = ['RFamp', 'phase', 'weight']
        locations = ['OM_fel1', 'OM_fel2', 'Q_LH.04', 'Q_BC01.07', 'Q_TLS.02',
                     'Q_SFEL01.11', 'Q_SFEL02.08']

        keys = ['DBD_Energy']
        for loc in locations:
            for twiss in self.twi:
                keys.append(loc + '_' + twiss)
                # Only the quadrupole locations also get a FELINO entry
                if 'Q_' in loc:
                    keys.append(loc + '_' + twiss + '_FELINO')
        # 'DBD_Energy' is already first in the list; it used to be appended
        # a second time here, which caused a redundant read on every refresh.
        keys.extend(['LH_Energy', 'BC01_Energy', 'BC02_Energy',
                     'FB_Charge_target'])
        keys.extend(cav + '_' + key
                    for cav in self.cavities
                    for key in self.cavity_keys)
        self.beam_status = keys
        return self.beam_status

    def refresh_optics(self):
        '''
        Refresh measured/simulated Twiss parameters at the measurement point.
        Then update these values in the main data base dictionary

        Writes \'betax_meas\', \'betay_meas\', \'alphax_meas\',
        \'alphay_meas\', the matching \'*_meas_end\' entries and
        \'beam_energy\'.
        '''
        self.get_tango_beam_status()
        values = dat.fodo_data_base.values
        self.source = values['twiss_source']
        if self.source == 'OM':
            if 'FEL02' in values['line']:
                self.meas_point = 'OM_fel2'
            else:
                self.meas_point = 'OM_fel1'
        else:
            self.linest = values['line_start']
            self.meas_point = self.lattice_config[self.linest + '_MEAS']
        # Only the FELINO source reads the '_FELINO' variant of the attributes
        self.suffix = '_FELINO' if self.source == 'FELINO' else ''
        self.lineen = values['line_end']
        self.meas_point_end = self.lattice_config[self.lineen + '_MEAS']
        self.beamstatus = values['beam_status']

        # Start-of-line values first, then end-of-line values (the end point
        # never uses the source suffix).
        for key, attr in self._TWISS_KEYS:
            values[key + '_meas'] = self.beamstatus[
                self.meas_point + attr + self.suffix].value
        for key, attr in self._TWISS_KEYS:
            values[key + '_meas_end'] = self.beamstatus[
                self.meas_point_end + attr].value

        # Pick the energy reading that matches the measurement point
        if 'LH' in self.meas_point:
            energy_key = 'LH_Energy'
        elif ('BC1' in self.meas_point) or ('BC01' in self.meas_point):
            energy_key = 'BC01_Energy'
        else:
            energy_key = 'DBD_Energy'
        values['beam_energy'] = self.beamstatus[energy_key].value