*** Wartungsfenster jeden ersten Mittwoch vormittag im Monat ***

Skip to content
Snippets Groups Projects
labscript_devices_ADwin_modules.py 25.7 KiB
Newer Older
#####################################################################
#                                                                   #
# ADwinProII/labscript_devices_ADwin_modules.py                     #
#                                                                   #
# Copyright 2022, TU Vienna                                         #
#                                                                   #
# Implementation of the ADwin-Pro II for the labscript-suite,       #
# used in the Léonard lab for Experimental Quantum Information.     #
#                                                                   #
#####################################################################

import numpy as np

from labscript import \
    Device, IntermediateDevice, AnalogOut, DigitalOut, AnalogIn, \
    LabscriptError, bitfield, set_passed_properties, fastflatten
from labscript_utils import dedent

from . import PROCESSDELAY_TiCo, TiCo_start_cycles, module_start_index, MAX_TICO_EVENTS, PIDNO
from .ADwin_utils import ADC
    
class _ADwinCard(IntermediateDevice):
    """Base class with the functionality shared by all ADwin modules."""

    def __init__(self, name, parent_device, module_address, TiCo=False, PROCESSDELAY_TiCo=None, **kwargs):
        """Create the ADwin module and, if requested, its TiCo pseudoclock.

        Parameters
        ----------
        name : str
            Python variable name to assign to device.
        parent_device : ADwinProII
            The ADwin-Pro II device the module belongs to. The module is
            attached to one of its clocklines, not to the device itself.
        module_address : int
            Number of the module, set in the ADwin configuration.
        TiCo : bool, optional
            Set if the module is timed by its own TiCo.
        PROCESSDELAY_TiCo : int, optional
            If TiCo=True, set the process cycle time.
        **kwargs : dict, optional
            Passed to IntermediateDevice.__init__

        Raises
        ------
        LabscriptError
            If the class of `parent_device` is not named "ADwinProII".
        """
        # The parent is identified by its class name, not with isinstance().
        # NOTE(review): presumably done to avoid importing the parent class
        # into this module - confirm.
        if type(parent_device).__name__ != "ADwinProII":
            raise LabscriptError(
                f"The ADwin module {name} needs to be created with ADwin-pro II device as parent, not {parent_device}!"
                )
        self.module_address = module_address
        self.TiCo = TiCo
        # Fall back to the parent's process delay when none (or 0) is given.
        self.PROCESSDELAY = PROCESSDELAY_TiCo or parent_device.PROCESSDELAY
        self.adwin_name = parent_device.name
        if TiCo:
            # Modules with their own TiCo are clocked by that TiCo's clockline.
            self._TiCo = parent_device.add_TiCo(name,module_address,PROCESSDELAY_TiCo)
            clockline = parent_device.TiCos[name].clockline
        else:
            clockline = parent_device._clockline_T12
        super().__init__(name, clockline, **kwargs)

    def do_checks(self):
        """Hook for checks common to all modules; currently does nothing."""
        pass # TODO: Are any universal checks necessary?

    def add_device(self, device, num_channels):
        """Calls IntermediateDevice.add_device and checks if channel is allowed.

        Parameters
        ----------
        device : `labscript.Device`
            Child device to add, e.g. Output
        num_channels : int
            Number of channels at the module, indexed [1,num_channels]

        Raises
        ------
        LabscriptError
            If the connection of `device` cannot be converted to an integer,
            or if it is outside [1,num_channels].
        """
        # Only the conversion is guarded, so the range error below can never
        # be mistaken for a conversion failure. TypeError covers connections
        # that are not strings/numbers at all (e.g. None).
        try:
            channel = int(device.connection)
        except (TypeError, ValueError) as err:
            raise LabscriptError(
                f"Connection of {device.name} must be a number (of type string!), not {device.connection}."
                ) from err
        if not 1 <= channel <= num_channels:
            raise LabscriptError(f"Connection of {device.name} must be between in [1,{num_channels}].")
        IntermediateDevice.add_device(self,device)
class ADwinAnalogOut(AnalogOut):
    description = "Analog Output of ADwin-Pro II AOut-8/16"

    @set_passed_properties(property_names={"connection_table_properties": ["limits"]})
    def __init__(
            self, name, parent_device, connection, limits=(-10,10), 
            unit_conversion_class=None, unit_conversion_parameters=None, default_value=0, **kwargs
        ):
        """Create an analog output channel of an ADwin analog output module.

        All arguments are forwarded unchanged to ``AnalogOut.__init__``;
        `limits` is additionally stored as a connection table property by
        the decorator.

        Parameters
        ----------
        name : str
            Python variable name to assign to the output.
        parent_device : Device
            Device the output is attached to.
        connection : str
            Channel number on the parent module.
        limits : tuple of float, optional
            Allowed (min, max) output values, defaults to (-10,10).
        unit_conversion_class, unit_conversion_parameters, default_value
            Passed to ``AnalogOut.__init__``.
        **kwargs : dict, optional
            Passed to ``AnalogOut.__init__``.
        """
        AnalogOut.__init__(
                self, name, parent_device, connection, limits, 
                unit_conversion_class, unit_conversion_parameters, default_value, **kwargs
        )
        # Maps a time to the PID setting applied at that time (see set_PID).
        self.PID = {}

    def init_PID(self,pid_no,P=0,I=0,D=0,limits=None):
        """Store the PID parameters of this output; allowed once per channel.

        Parameters
        ----------
        pid_no : int or `AnalogIn`
            Channel of the analog input used as error signal of the PID
            feedback. An `AnalogIn` whose parent is an ADwin module is
            converted to its connection number plus the module's
            `start_index`; any other value is stored as given.
        P : float
            Proportional parameter
        I : float
            Integration parameter
        D : float
            Differential parameter
        limits : tuple of float, optional
            Limits for output voltage, defaults to output limits

        Raises
        ------
        NotImplementedError
            If PID parameters were already set for this channel.
        LabscriptError
            If `limits` exceed the limits of the output channel.
        """
        if hasattr(self, "pid_no"):
            raise NotImplementedError("Only one set of PID parameters per channel is implemented.")

        # Without explicit PID limits the limits of the channel apply.
        pid_limits = self.limits if limits is None else limits
        if pid_limits[0] < self.limits[0] or pid_limits[1] > self.limits[1]:
            raise LabscriptError(f"Limits of {self.name} with PID must not be larger than channel limits {self.limits}!")

        # Translate an input object on an ADwin module to its channel number.
        channel = pid_no
        if isinstance(channel, AnalogIn) and isinstance(channel.parent_device, _ADwinCard):
            card = channel.parent_device
            channel = int(channel.connection) + card.start_index

        self.pid_no = channel
        self.P = P
        self.I = I
        self.D = D
        self.PID_min = pid_limits[0]
        self.PID_max = pid_limits[1]


    def set_PID(self,t,pid_no,set_output=0):
        """(De-)activate PID for analog output, or change settings

        Parameters
        ----------
        t : float
            Time when to apply the PID settings
        pid_no : int or `AnalogIn` or None
            Channel of analog input for error signal of PID feedback.
            If `None` PID is deactivated.
        set_output : float or "last"
            When the PID is turned on, 'set_output' is the initially chosen output value,
            'last' means that the I value from the previous PID is taken.
            When the PID is turned off, 'set_output' is programmed as the new output/target value,
            'last' means that the output from the PID loop is kept as 'set_target'.

        Raises
        ------
        LabscriptError
            If `set_output` is outside the PID limits, or if `pid_no` is
            neither None, an integer, nor an `AnalogIn` on an ADwin module.
        """
        # Error check of bounds for set_output
        if set_output!="last":
            if set_output<self.PID_min or set_output>self.PID_max:
                raise LabscriptError(
                    f"{self.name}: PID 'set_output={set_output}' must be within ({self.PID_min},{self.PID_max})"
                )
        # NOTE(review): the branch headers and the "PID_channel" entries below
        # were restored from the way self.PID is read elsewhere in this file
        # (channel 0 is treated as "PID off") - confirm against the repository.
        if pid_no is None:
            self.PID[t] = {
                "PID_channel": 0,
                "start": set_output,
            }
            # If we don't keep the PID output, set the output to the set_value
            if set_output!="last":
                self.constant(t,set_output)
        elif (isinstance(pid_no,AnalogIn) and isinstance(pid_no.parent_device,_ADwinCard)) \
                or isinstance(pid_no,int):
            self.PID[t] = {
                "PID_channel": pid_no,
                "start": set_output,
            }
            # TODO: Do we need scale factors for setting a PID with integer?
        else:
            msg = f"""Setting a PID is only possible with an AnalogIn object that is
                      child of an ADwin module, or the integer number of the PID."""
            raise LabscriptError(dedent(msg))
Daniel Dux's avatar
Daniel Dux committed

        """Shortcut to generate raw output from ramps without collecting all change times from pseudoclock."""
        
        # Call function to create self.times and self.instructions. We also make
        # sure the output at the end of the shot is stored in the output table.
        times = self.times.copy() 
        if self.pseudoclock_device.stop_time not in times:
            times.append(self.pseudoclock_device.stop_time)
        self.make_timeseries(times)
        # We first have to collect all times for the ramps, then we can expand the output values.
        all_times = []
        flat_all_times_len = 0
        for i,t in enumerate(times):
            if isinstance(self.timeseries[i],dict): # Ramps are stored in dictionarys
                start = np.round(self.timeseries[i]['initial time'],9)
                end = np.round(self.timeseries[i]['end time'],9)
                    start,end,
                    int(np.round(self.timeseries[i]['clock rate'] * (end-start))),
                if ramp_times.size == 0:
                    raise LabscriptError(f"The ramp of {self.name} has no change times, increase the ramp duration or samplerate.")
                all_times.append(ramp_times)
                flat_all_times_len += ramp_times.size
            else: 
                all_times.append(t)
                flat_all_times_len += 1
        self.expand_timeseries(all_times,flat_all_times_len)
        # For the output table, we need all times flattened.
        self.all_times = fastflatten(all_times,float)

    def add_instruction(self,time,instruction,units=None):
        """Add an instruction, relaxing the limit check while a PID is configured.

        Once PID parameters have been set (attribute `pid_no` exists), the
        value can be off-limits because it is the target value of the PID.
        The limits are therefore widened to (-10,10) for the duration of the
        parent call and restored afterwards, even if that call raises.

        Parameters
        ----------
        time : float
            Time of the instruction.
        instruction
            Instruction passed on to the parent class.
        units : optional
            Units passed on to the parent class.
        """
        limits_temp = self.limits
        if hasattr(self,"pid_no"):
            self.limits = (-10,10)
        try:
            super().add_instruction(time,instruction,units)
        finally:
            # Always restore, so a rejected instruction cannot leave the
            # channel with the widened limits.
            self.limits = limits_temp
    def expand_timeseries(self,all_times,flat_all_times_len):
        """Expand the timeseries, relaxing the limit check while a PID is configured.

        Once PID parameters have been set (attribute `pid_no` exists), the
        values can be off-limits because they are target values of the PID.
        The limits are therefore widened to (-10,10) for the duration of the
        parent call and restored afterwards, even if that call raises.

        Parameters
        ----------
        all_times : list
            Change times, passed on to the parent class.
        flat_all_times_len : int
            Total number of times in `all_times`, passed on to the parent class.
        """
        limits_temp = self.limits
        if hasattr(self,"pid_no"):
            self.limits = (-10,10)
        try:
            super().expand_timeseries(all_times,flat_all_times_len)
        finally:
            # Always restore, so a failed expansion cannot leave the channel
            # with the widened limits.
            self.limits = limits_temp

class ADwinAnalogIn(AnalogIn):
    """Analog Input for use with ADwin Pro II Input modules."""
    description = 'ADwin Analog Input'
    
   
    def acquire(self,label,start_time,end_time,wait_label='',scale_factor=None,units=None,storage_rate=None):
        """Command an acquisition for this input.

        Args:
            label (str): Unique label for the acquisition. Used to identify the saved trace.
            start_time (float): Time, in seconds, when the acquisition should start.
            end_time (float): Time, in seconds, when the acquisition should end.
            wait_label (str, optional): Not described in the original; see the review note in the body.
            scale_factor (float): Factor to scale the saved values by.
            units: Units of the input, consistent with the unit conversion class.
            storage_rate (int): Rate of data stored in the hdf5 file during shot, defaults to previously set value or ADwin processdelay.

        Returns:
            float: Duration of the acquisition, equivalent to `end_time - start_time`.
        """
        # NOTE(review): the body of the following `if` is missing in this copy
        # of the file, so the handling of `storage_rate` cannot be seen here -
        # restore it from the repository.
        if storage_rate is not None:
        # NOTE(review): the parent call receives the literal defaults
        # wait_label='', scale_factor=None and units=None instead of the values
        # passed by the caller - confirm whether this is intended.
        return super().acquire(label,start_time,end_time,wait_label='',scale_factor=None,units=None)
# There are 32 digital outputs on a card (DIO-32-TiCo)
class ADwinDIO32(_ADwinCard):
    description = 'ADWin-Pro II DIO32-TiCo'
    digital_dtype = np.uint32
    n_digitals = 32
    allowed_children = [DigitalOut] # DigitalIn ?

    @set_passed_properties(
        property_names={
            "connection_table_properties": ["module_address", "PROCESSDELAY_TiCo","num_DO", "num_DI", "DO_ports", "DI_ports"],
            "device_properties": [],
        }
    )
    def __init__(
            self, name, parent_device, module_address, 
            TiCo=True, PROCESSDELAY_TiCo=PROCESSDELAY_TiCo, 
            DO_ports=[], DI_ports=[], **kwargs):
        """Create a DIO32-TiCo module.

        Parameters
        ----------
        name : str
            Python variable name to assign to device.
        parent_device : ADwinProII
            Parent ADwin device, see `_ADwinCard.__init__`.
        module_address : int
            Number of the module, set in the ADwin configuration.
        TiCo : bool, optional
            Set if the module is timed by its own TiCo, defaults to True.
        PROCESSDELAY_TiCo : int, optional
            Process cycle time of the TiCo.
        DO_ports : list, optional
            Ports used as digital outputs.
        DI_ports : list, optional
            Ports used as digital inputs. If empty, all ports are used as
            outputs and `DO_ports` is replaced by "1".."32".
        **kwargs : dict, optional
            Passed to `_ADwinCard.__init__`
        """
        _ADwinCard.__init__(self, name, parent_device, module_address, TiCo, PROCESSDELAY_TiCo, **kwargs)

        # Store copies: the default arguments are lists shared by all calls,
        # and a list passed by the caller must not be aliased either.
        self.DO_ports = list(DO_ports)
        self.DI_ports = list(DI_ports)
        if not self.DI_ports:
            # Without inputs every port of the module is an output.
            # NOTE(review): this also overrides an explicitly passed DO_ports -
            # confirm that this is intended.
            self.DO_ports = [str(i) for i in range(1, self.n_digitals + 1)]
        self.num_DO = len(self.DO_ports)
        self.num_DI = len(self.DI_ports)
    def add_device(self, device):
        """Add a digital output; its connection must be in [1,num_DO]."""
        _ADwinCard.add_device(self, device, self.num_DO) # TODO: DigitalIn ?

    def do_checks(self):
        """Validate the digital output table created by `generate_code`.

        Raises
        ------
        LabscriptError
            If an output changes after t=0 but within the first
            `TiCo_start_cycles` cycles, or if there are more output events
            than `MAX_TICO_EVENTS`.
        """
        _ADwinCard.do_checks(self)
        # Raise an error if any DigitalOut is changed right after start, 
        # because the TiCo is only started after some cycles of the ADwin.
        times = self.digital_data["n_cycles"]
        if np.any((times>0) & (times<=TiCo_start_cycles)):
            raise LabscriptError(
                f"Digital Outputs of {self.name} cannnot change between 0 and {TiCo_start_cycles*self._TiCo.clock_resolution*1e6:.0f}µs."
            )
        # Check length of digital output array
        TiCo_events = times.size
        if  TiCo_events > MAX_TICO_EVENTS:
            raise LabscriptError(
                f"The number of outputs of {self.name} ({TiCo_events}) is larger than the array size in the TiCo ({MAX_TICO_EVENTS})!"
            )

    def generate_code(self, hdf5_file):
        """Build the table of digital output events in `self.digital_data`.

        The table has one row per change time of the TiCo, with the time in
        TiCo cycles ("n_cycles") and the state of all channels packed into one
        integer ("bitfield"). Nothing is written to `hdf5_file` here apart
        from what `Device.generate_code` does.

        Parameters
        ----------
        hdf5_file
            Shot file, passed on to `Device.generate_code`.
        """
        Device.generate_code(self, hdf5_file)
        outputs, outputs_by_clockline = self._TiCo.get_outputs_by_clockline()
        self._TiCo.parent_device.offset_instructions_from_trigger(outputs)
        all_change_times,_ = self._TiCo.collect_change_times(outputs,outputs_by_clockline)
        n_events = len(all_change_times)

        # Channels without a connected output stay zero for the whole shot.
        outputarray = [np.zeros(n_events,dtype=self.digital_dtype) for _ in range(self.n_digitals)]
        for output in outputs:
            output.make_timeseries(all_change_times)
            channel = int(output.connection) - 1
            outputarray[channel] = np.array(output.timeseries,dtype=self.digital_dtype)

        bits = bitfield(outputarray, dtype=self.digital_dtype)

        digital_dtypes = [("n_cycles",np.int32), ("bitfield",np.uint32)]
        self.digital_data = np.empty(n_events, dtype=digital_dtypes)
        # Round before the implicit cast to int32: the float quotient can end
        # up just below the exact cycle number and would then be truncated to
        # the previous cycle.
        self.digital_data["n_cycles"] = np.round(np.array(all_change_times) / self._TiCo.clock_resolution)
        self.digital_data["bitfield"] = bits



# Voltages are specified with a 16 bit unsigned integer, mapping the range [-10,10) volts.
# There are 8 analog outputs on a card (AOut-8/16)    
Schabbauer, Johannes's avatar
Schabbauer, Johannes committed
    device_dtype = np.float64
    resolution_bits=16
    min_V = -10
    max_V = 10
    step_size = (max_V-min_V)/2**resolution_bits
Schabbauer, Johannes's avatar
Schabbauer, Johannes committed

    @set_passed_properties(
        property_names={
            "connection_table_properties": ["module_address","adwin_name","num_AO", "resolution_bits", "min_V", "max_V", "step_size", "start_index"],
        }
    )
    def __init__(self, name, parent_device, module_address, num_AO=8, **kwargs):
        """Create an analog output module.

        Parameters
        ----------
        name : str
            Python variable name to assign to device.
        parent_device : Device
            Parent device, passed to the parent class.
        module_address : int
            Number of the module; also selects the module's `start_index`
            from `module_start_index`.
        num_AO : int, optional
            Number of analog outputs of the module, defaults to 8.
        **kwargs : dict, optional
            Passed to the parent class.
        """
        # Needed by add_device() to validate the channel of each output.
        self.num_AO = num_AO
        self.start_index = module_start_index[int(module_address)]
        super().__init__(name, parent_device, module_address, **kwargs)
Schabbauer, Johannes's avatar
Schabbauer, Johannes committed

    def add_device(self, device):
        """Add an analog output; its connection must be in [1,num_AO]."""
        _ADwinCard.add_device(self, device, num_channels=self.num_AO)

    def do_checks(self):
        """Validate ramp rates, output limits and PID channels of all outputs.

        Reads `all_times`, `raw_output`, `instructions` and `PID` of every
        child output as well as `self.PID_table`.

        Raises
        ------
        LabscriptError
            If a ramp is sampled faster than the clock limit of the parent,
            if an output is outside its limits while no PID is active, if a
            PID is turned off with 'last' in a way that cannot be programmed,
            or if a PID channel is outside [0,PIDNO].
        """
        _ADwinCard.do_checks(self)
        # Check if the sample_rate in ramp instructions is below the ADwin clockrate
        # This is necessary here, because we don't call the pseudoclock's expand_change_time()
        for output in self.child_devices:
            for instr in output.instructions.values():
                if isinstance(instr,dict) and instr["clock rate"]>self.parent_device.clock_limit:
                    rate_kHz = instr['clock rate']/1e3
                    ADwin_rate_kHz = self.parent_device.clock_limit/1e3
                    raise LabscriptError(
                        f"The ramp sample rate ({rate_kHz:.0f}kHz) of {output.name} must not be faster than ADwin ({ADwin_rate_kHz:.0f}kHz)."
                    )

            # Check limits of output, but only when PID is NOT enabled (because then the target can be out of limits)
            # Get all times when PID is not enabled
            PID = output.PID.copy()
            if 0 not in PID:
                # Because 'np.digitize' determines the bins, we also have to make sure t=0 is included.
                # If output.PID does not have the key t=0, then the PID is disabled in the beginning.
                PID[0] = {"PID_channel":0,"start":0}
            PID_times = np.array(list(PID.keys()))
            PID_times.sort()
            PID_off_times = []
            # For each output value, digitize gets the next highest time in  PID_times.
            # Using '-1' to get next lowest time.
            for i_out,i_PID in enumerate(np.digitize(np.round(output.all_times,6), np.round(PID_times,6))-1):
                t = PID_times[i_PID]
                if PID[t]["PID_channel"]==0:
                    # When we turn the PID off but keep the last output, we make sure that
                    #  - at least once in the end the output is (re)set, otherwise the get_final_values() in the Worker is wrong,
                    #  - don't try to also set the target value at the same time, as this would overwrite the target in the ADwin with the PID output.
                    if PID[t]["start"]=="last":
                        if i_out+1==len(output.all_times) and output.raw_output[i_out]==100_000:
                            raise LabscriptError(f"{output.name}: You must set the output at the end after turning off PID with 'last'.")
                        elif output.all_times[i_out] == t:
                            raise LabscriptError(f"{output.name}: Don't turn off PID with persitent value ('last') and also set new value.")
                    PID_off_times.append(i_out)
            PID_off_outputs = output.raw_output[PID_off_times]
            too_low = PID_off_outputs < output.limits[0]
            too_high = PID_off_outputs > output.limits[1]
            if np.any(too_low) or np.any(too_high):
                # Report the times of violations of either limit.
                error_times = output.all_times[PID_off_times][too_low | too_high]
                raise LabscriptError(
                    f"Limits of {output.name} (when PID is off) must be in {output.limits}, " +
                    f"you try to set ({PID_off_outputs.min()},{PID_off_outputs.max()}) " +
                    f"or turning off the PID with target value beyond limits at times {error_times}.")
        # Check if the PID channel is allowed
        if np.any(self.PID_table["PID_channel"] > PIDNO):
            raise LabscriptError(f"ADwin: Setting the PID channel to more than {PIDNO} is not possible!")
        if np.any(self.PID_table["PID_channel"] < 0):
            raise LabscriptError("ADwin: Setting the PID channel to less than 0 is not possible!")
Schabbauer, Johannes's avatar
Schabbauer, Johannes committed
    def generate_code(self,hdf5_file):
        Device.generate_code(self, hdf5_file)
        clockline = self.parent_device
        pseudoclock = clockline.parent_device
Daniel Dux's avatar
Daniel Dux committed

        output_dtypes = [("n_cycles",np.int32),("channel",np.int32),("value",np.int32)]
        PID_config_dtypes = [
            ("PID_channel",np.int32),("PID_P",np.float64),("PID_I",np.float64),("PID_D",np.float64),("PID_min",np.int32),("PID_max",np.int32)
        ]
        PID_table_dtypes = [
            ("n_cycles",np.int32),("AOUT_channel",np.int32),("PID_channel",np.int32),("PID_start",np.int32)
        for output in sorted(self.child_devices, key=lambda dev:int(dev.connection)):
Daniel Dux's avatar
Daniel Dux committed

            # Get input channels for PID, collect changed for time table and store bare channels as dataset
            if output.PID:
                # Get PID parameters
                if not hasattr(output,"pid_no"):
                    raise LabscriptError(f"For {self.name} you try to use a PID, but never set the parameters via {self.name}.init_PID().")
                PID_config. append(
                    np.array([
                        (output.pid_no,output.P,output.I,output.D,ADC(output.PID_min,self.resolution_bits,self.min_V,self.max_V),ADC(output.PID_max,self.resolution_bits,self.min_V,self.max_V))
                        ], dtype=PID_config_dtypes)
                )
                PID_array = np.zeros(len(output.PID),dtype=PID_table_dtypes)
                PID_times = np.array(list(output.PID.keys()))
                PID_array["n_cycles"] = np.round(PID_times * pseudoclock.clock_limit)
                # PID_array["PID_channel"] = list(output.PID_channel.values())
                PID_array["AOUT_channel"] = int(output.connection) + self.start_index
                # If a PID is enabled, the set values are not the actual voltage values of the 
                # Output, but those measured at the input. If the input has a gain enabled, the
                # set values have the be scaled too, to have the PID stabilized to the right values.
                indices = np.digitize(output.all_times, PID_times)
                for i,t in enumerate(PID_times):
                    if isinstance(output.PID[t]['PID_channel'],AnalogIn):
                        PID_array["PID_channel"][i] = int(output.PID[t]['PID_channel'].connection) + output.PID[t]['PID_channel'].parent_device.start_index
                        # Sacle output by gain of PID AIn channel
                        output.raw_output[indices==i+1] *= output.PID[t]['PID_channel'].scale_factor
                    elif isinstance(output.PID[t]['PID_channel'],int):
                        PID_array["PID_channel"][i] = output.PID[t]['PID_channel']
                    if output.PID[t]["start"]=="last":
                        # When we want to use the previous value during the shot,
                        # we use a value that's out of the 16 bit ADC range to identify.
                        PID_array["PID_start"][i] = 100_000
                    else:
                        PID_array["PID_start"][i] = ADC(output.PID[t]['start'],self.resolution_bits,self.min_V,self.max_V)
            # The ADwin has 16 bit output resolution, so we quantize the Voltage to the right values
            quantized_output = ADC(output.raw_output,self.resolution_bits,self.min_V,self.max_V)
            out = np.empty(quantized_output.size,dtype=output_dtypes)
Daniel Dux's avatar
Daniel Dux committed
            out["n_cycles"] = np.round(output.all_times * pseudoclock.clock_limit)
            out["value"] = quantized_output
            out["channel"] = int(output.connection) + self.start_index
Daniel Dux's avatar
Daniel Dux committed

        self.outputs = np.concatenate(outputs) if outputs else np.array([],dtype=output_dtypes)
        self.PID_table = np.concatenate(PID_table) if PID_table else np.array([],dtype=PID_table_dtypes)
        self.PID_config = np.concatenate(PID_config) if PID_config else np.array([],dtype=PID_config_dtypes)



# Voltages are specified with a 16 bit unsigned integer, mapping the range [-10,10) volts.
# There are 8 analog inputs on a card (AIn-F-8/16)
class ADwinAI8(_ADwinCard):
    description = "ADwin-Pro II-AIn-F-8/16 module"
    allowed_children = [AnalogIn]
    resolution_bits=16
    min_V = -10
    max_V = 10
    gain_modes = {1:0,2:1,4:2,8:3} # Scale factor 1 = Mode 0, Scale factor 2 = Mode 1, etc. 
    @set_passed_properties(
        property_names={
            "connection_table_properties": ["module_address","adwin_name","num_AI", "resolution_bits", "min_V", "max_V", "start_index"],
        }
    )
    def __init__(self, name, parent_device, module_address, num_AI = 8, **kwargs):
        """Create an analog input module, which is never timed by a TiCo.

        Parameters
        ----------
        name : str
            Python variable name to assign to device.
        parent_device : ADwinProII
            Parent ADwin device, see `_ADwinCard.__init__`.
        module_address : int
            Number of the module; also selects the module's `start_index`
            from `module_start_index`.
        num_AI : int, optional
            Number of analog inputs of the module, defaults to 8.
        **kwargs : dict, optional
            Passed to `_ADwinCard.__init__`
        """
        self.num_AI = num_AI
        self.start_index = module_start_index[int(module_address)]
        _ADwinCard.__init__(self, name, parent_device, module_address, TiCo=False, **kwargs)

    def add_device(self, device):
        """Add an analog input; its connection must be in [1,num_AI]."""
        _ADwinCard.add_device(self, device, num_channels=self.num_AI)

    def do_checks(self):
        """Check that no acquisition in `self.AIN_times` starts after it stops.

        Raises
        ------
        LabscriptError
            If any channel has a start time later than its stop time.
        """
        _ADwinCard.do_checks(self)
        # TODO implement further checks if required
        start_cycles = self.AIN_times["start_time"]
        stop_cycles = self.AIN_times["stop_time"]
        starts_too_late = start_cycles > stop_cycles
        if np.any(starts_too_late):
            raise LabscriptError(f"Start time for acquisition with Adwin {self.name} must not be later than stop time.")


    def generate_code(self,hdf5_file):
        Device.generate_code(self, hdf5_file)
        AI_group = hdf5_file.require_group(f"/devices/{self.adwin_name}/ANALOG_IN")

        clockline = self.parent_device
        pseudoclock = clockline.parent_device

        self.AIN_times = np.zeros(self.num_AI,dtype = [("start_time",np.int32),("stop_time",np.int32), ("gain_mode",np.int32)])

        # loop through all connected analog in channels and get start and end times
        for analogIn in self.child_devices:
            channel = int(analogIn.connection)
            try:
                self.AIN_times["gain_mode"][channel-1] = self.gain_modes[int(analogIn.scale_factor)]
                analogIn.set_property("scale_factor", int(analogIn.scale_factor), "connection_table_properties")
            except KeyError:
                raise LabscriptError(f"Scale factor for AnalogIn {analogIn.name} must be in {list(self.gain_modes.keys())}, not {analogIn.scale_factor}.")
                if len(analogIn.acquisitions)>1:
                    print(f"Warning: For channel {analogIn.name} more than one aquisition was defined, the data is also measured for all times in between!")
                # only a single acquisition is handled by adwin
                start_time = min([aqu["start_time"] for aqu in analogIn.acquisitions])
                stop_time = max([aqu["end_time"] for aqu in analogIn.acquisitions])
                # If the stop_time is later than the ADwin's, stop
                stop_time = min(stop_time,self.pseudoclock_device.stop_time)
                # convert to time steps
                start_time = np.round(start_time * pseudoclock.clock_limit)
                stop_time = np.round(stop_time * pseudoclock.clock_limit)
                self.AIN_times["start_time"][channel-1]=start_time
                self.AIN_times["stop_time"][channel-1]=stop_time
                label = ", ".join([aqu["label"] for aqu in analogIn.acquisitions])
                if not label:
                    label = analogIn.name
                AI_group.attrs[str(self.start_index+channel)] = label