*** Wartungsfenster jeden ersten Mittwoch vormittag im Monat ***

Skip to content
Snippets Groups Projects
Commit dce960d5 authored by Leolab Cavity CAD PC's avatar Leolab Cavity CAD PC
Browse files
parents 630d4b50 adfcbfb4
No related branches found
No related tags found
No related merge requests found
This diff is collapsed.
from blacs.device_base_class import DeviceTab, MODE_MANUAL, MODE_BUFFERED, define_state
from qtutils.qt import QtGui
from qtutils import UiLoader
import os
class SpectrumAWGTab(DeviceTab):
def initialise_GUI(self):
connection = self.settings['connection_table'].find_by_name(self.device_name)
props = connection.properties
self.create_worker(
'main_worker',
'user_devices.SpectrumAWG.blacs_workers.SpectrumAWGWorker',
props,
)
self.primary_worker = 'main_worker'
# Set the capabilities of this device
self.supports_remote_value_check(False)
self.supports_smart_programming(True)
# Create UI
self.ui = UiLoader().load(os.path.join(os.path.dirname(os.path.realpath(__file__)),"./blacs_widget.ui"))
self.start_icon = QtGui.QIcon(':/qtutils/fugue/control')
self.stop_icon = QtGui.QIcon(':/qtutils/fugue/control-stop-square')
self.ui.pushButton_SingleTone.setIcon(self.start_icon)
self.ui.pushButton_MemoryReplay.setIcon(self.start_icon)
self.ui.pushButton_SingleTone.clicked.connect(lambda: self.manual_single_tone())
self.ui.pushButton_MemoryReplay.clicked.connect(lambda: self.manual_memory_replay())
self.ui.pushButton_refresh.setIcon(QtGui.QIcon(':/qtutils/fugue/arrow-circle-double'))
self.ui.pushButton_refresh.clicked.connect(lambda: self.refresh_dropdown_menu())
self.manual_active = False
# Memory
self.ui.used_memory.setMaximum(props["memory_segments"])
self.auto_place_widgets(("AWG",{"AWG":self.ui}))
def get_front_panel_values(self):
return self._final_values
@define_state(MODE_BUFFERED,False)
def transition_to_manual(self,notify_queue,program=False):
super().transition_to_manual(notify_queue,program=False)
#self.ui.used_memory.setValue(len(self._final_values))
self.ui.used_memory.setValue(100)
@define_state(MODE_MANUAL,False)
def manual_single_tone(self):
if not self.manual_active:
frequency = self.ui.doubleSpinBox_frequency.value()
self.ui.pushButton_SingleTone.setIcon(self.stop_icon)
self.ui.pushButton_MemoryReplay.setEnabled(False)
self.manual_active = True
yield(self.queue_work(self._primary_worker,'program_manual',frequency))
else:
self.ui.pushButton_SingleTone.setIcon(self.start_icon)
self.ui.pushButton_MemoryReplay.setEnabled(True)
self.manual_active = False
yield(self.queue_work(self._primary_worker,'program_manual',None))
@define_state(MODE_MANUAL,False)
def manual_memory_replay(self):
if not self.manual_active:
index = self.ui.comboBox_memory.currentIndex()
if index!=-1:
self.ui.pushButton_MemoryReplay.setIcon(self.stop_icon)
self.ui.pushButton_SingleTone.setEnabled(False)
self.manual_active = True
yield(self.queue_work(self._primary_worker,'program_manual',index))
else:
self.ui.pushButton_MemoryReplay.setIcon(self.start_icon)
self.ui.pushButton_SingleTone.setEnabled(True)
self.manual_active = False
yield(self.queue_work(self._primary_worker,'program_manual',None))
@define_state(MODE_MANUAL,False)
def refresh_dropdown_menu(self):
self.ui.comboBox_memory.clear()
for memory_index,instruction in self._final_values.items():
self.ui.comboBox_memory.addItem(f"{memory_index} {instruction}")
self.ui.comboBox_memory.view().setMinimumWidth(self.ui.comboBox_memory.view().sizeHintForColumn(0)+30)
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<ui version="4.0">
<class>Form</class>
<widget class="QWidget" name="Form">
<property name="geometry">
<rect>
<x>0</x>
<y>0</y>
<width>822</width>
<height>350</height>
</rect>
</property>
<property name="minimumSize">
<size>
<width>400</width>
<height>0</height>
</size>
</property>
<property name="windowTitle">
<string>Form</string>
</property>
<property name="toolTip">
<string/>
</property>
<layout class="QVBoxLayout" name="verticalLayout_2">
<item>
<layout class="QHBoxLayout" name="horizontalLayout">
<item>
<spacer name="horizontalSpacer_2">
<property name="orientation">
<enum>Qt::Horizontal</enum>
</property>
<property name="sizeHint" stdset="0">
<size>
<width>40</width>
<height>20</height>
</size>
</property>
</spacer>
</item>
<item>
<layout class="QVBoxLayout" name="verticalLayout">
<item>
<layout class="QGridLayout" name="gridLayout">
<item row="1" column="1">
<widget class="QComboBox" name="comboBox_memory"/>
</item>
<item row="0" column="0">
<widget class="QLabel" name="label_2">
<property name="text">
<string>Single Tone</string>
</property>
</widget>
</item>
<item row="1" column="3">
<widget class="QPushButton" name="pushButton_MemoryReplay">
<property name="toolTip">
<string>Start/Stop</string>
</property>
<property name="text">
<string/>
</property>
</widget>
</item>
<item row="0" column="2">
<widget class="QLabel" name="label_5">
<property name="text">
<string>MHz</string>
</property>
</widget>
</item>
<item row="0" column="3">
<widget class="QPushButton" name="pushButton_SingleTone">
<property name="toolTip">
<string>Start/Stop</string>
</property>
<property name="text">
<string/>
</property>
</widget>
</item>
<item row="1" column="0">
<widget class="QLabel" name="label_3">
<property name="text">
<string>Memory Replay</string>
</property>
</widget>
</item>
<item row="0" column="1">
<widget class="QDoubleSpinBox" name="doubleSpinBox_frequency">
<property name="decimals">
<number>3</number>
</property>
<property name="maximum">
<number>1250</number>
</property>
</widget>
</item>
<item row="1" column="2">
<widget class="QPushButton" name="pushButton_refresh">
<property name="toolTip">
<string>Refresh dropdown menu</string>
</property>
<property name="text">
<string/>
</property>
</widget>
</item>
</layout>
</item>
<item>
<layout class="QHBoxLayout" name="horizontalLayout_2">
<item>
<widget class="QLabel" name="label">
<property name="text">
<string>Memory in use</string>
</property>
</widget>
</item>
<item>
<widget class="QProgressBar" name="used_memory">
<property name="enabled">
<bool>true</bool>
</property>
<property name="tabletTracking">
<bool>false</bool>
</property>
<property name="contextMenuPolicy">
<enum>Qt::DefaultContextMenu</enum>
</property>
<property name="value">
<number>0</number>
</property>
<property name="orientation">
<enum>Qt::Horizontal</enum>
</property>
<property name="textDirection">
<enum>QProgressBar::TopToBottom</enum>
</property>
<property name="format">
<string>%p%</string>
</property>
</widget>
</item>
</layout>
</item>
</layout>
</item>
<item>
<spacer name="horizontalSpacer">
<property name="orientation">
<enum>Qt::Horizontal</enum>
</property>
<property name="sizeHint" stdset="0">
<size>
<width>40</width>
<height>20</height>
</size>
</property>
</spacer>
</item>
</layout>
</item>
</layout>
</widget>
<resources/>
<connections/>
</ui>
import labscript_utils.h5_lock
import h5py
from blacs.tab_base_classes import Worker
from . import SpectrumCard
import numpy as np
class SpectrumAWGWorker(Worker):
def init(self):
print("### INITIALIZE ###\n")
self.AWG = SpectrumCard.SpectrumCard(self.device_path,timeout=self.timeout)
self.AWG.open()
if self.external_clock_rate is None:
self.AWG.set_clock('internal')
else:
self.AWG.set_clock('external',int(self.external_clock_rate))
self.AWG.set_sample_rate(int(self.sample_rate))
self.channels = []
for ch in range(2):
if hasattr(self,f"channel_amplitude_{ch}"):
self.channels.append(str(ch))
self.AWG.set_channel_status(ch,True)
self.AWG.set_channel_enable(ch,True)
self.AWG.set_channel_amplitude(ch,getattr(self,f"channel_amplitude_{ch}"))
self.AWG.set_channel_filter(ch,False)
self.AWG.set_channel_mode(ch,None)
self.AWG.set_ext_trigger_mode('ext0','pos',rearm=True)
self.AWG.set_ext_trigger_level('ext0',2000,800) # 2V tirgger, 0.8V rearm
self.AWG.set_trigger_or_mask(['ext0'])
self.AWG.set_generation_mode(mode='sequence')
self.AWG.seq_set_memory_segments(self.memory_segments)
print("\n### INITIALIZATION DONE ###\n")
# Initialize memory for smart programming
# Keys: hash of instructions, Values: position in memory
self.smart_cache = {}
def program_manual(self, values):
if values is None:
self.AWG.card_stop()
return{}
elif type(values) is float:
# Stream single frequency
data = SpectrumCard.generate_single_tone(values*1e6,4096,self.sample_rate) # TODO: Set num_samples dynamically
self.AWG.transfer_sequence_replay_samples(len(self.smart_cache),data) # Write in next free memory
self.AWG.seq_set_sequence_step(0,len(self.smart_cache),0,1,'on_trigger',last_step=False)
elif type(values) is int:
if values == -1:
return {} # not memory index selected
# Stream sample from memory
self.AWG.seq_set_sequence_step(0,values,0,1,'on_trigger',last_step=False)
else:
return{}
self.AWG.card_write_setup()
self.AWG.card_start()
self.AWG.card_force_trigger() # Start replay without a hardware trigger
return {}
def transition_to_buffered(self, device_name, h5_file, initial_values, fresh):
self.AWG.card_stop() # If card was still running, e.g. from manual mode
with h5py.File(h5_file,'r') as f:
group = f[f"devices/{device_name}"]
for ch in self.channels:
if fresh or len(self.smart_cache)+len(group[ch].attrs) > self.memory_segments:
# Reset smart programming and start writing memory from the beginning
# TODO: What is we want to always keep specific instruction in the memory?
self.smart_cache = {}
last_index = len(group[ch].attrs)-1
for index in range(len(group[ch].attrs)):
index_h5 = str(index) # The index in the h5 file is a str
### LOOP TROUGH STREAMING STEPS ###
instruction = group[ch].attrs[index_h5]
instruction_hash = hash(instruction.tobytes())
if instruction_hash in self.smart_cache:
memory_index = self.smart_cache[instruction_hash]
else:
memory_index = len(self.smart_cache)
self.smart_cache[instruction_hash] = memory_index
### CALCULATE DATA ###
num_samples = int(instruction[0])
if len(instruction)==2:
# SINGLE TONE
data = SpectrumCard.generate_single_tone(instruction[1],num_samples,self.sample_rate)
initial_values[memory_index] = f"{instruction[0]}Hz"
elif (len(instruction)-1)%3 == 0:
# MULTI TONE
num_tones = (len(instruction)-1)//3
freq = instruction[1:num_tones+1]
ampl = instruction[num_tones+1:2*num_tones+1]
phase= instruction[2*num_tones+1:]
data = SpectrumCard.generate_multi_tone(freq,ampl,phase,num_samples,self.sample_rate)
initial_values[memory_index] = f"f:{freq}, a:{ampl}, p:{phase}"
else:
raise RuntimeError("Instruction length does not match, what happened??")
if index_h5 in group[ch]["labels"].attrs:
initial_values[memory_index] = group[ch]["labels"].attrs[index_h5]
self.AWG.transfer_sequence_replay_samples(memory_index,data)
if index!=last_index:
self.AWG.seq_set_sequence_step(index,memory_index,index+1,1,'on_trigger',last_step=False)
else:
self.AWG.seq_set_sequence_step(index,memory_index,index,1,'on_trigger',last_step=False)
# If there is a trigger at the stop time, repeat one last sequence
# and then stop (is not card_stop() was called already)
self.AWG.seq_set_sequence_step(index+1,memory_index,0,1,'always',last_step=True)
# ONLY IMPLEMENTED FOR ONE CHANNEL, IF TWO CHANNELS ARE NEEDED WE ALREADY GET AN ERROR IN LABSCRIPT
# FOR IMPLEMENTATION ONE HASE TO INTERWEAVE THE DATA FOR BOTH CHANNELS
break
self.AWG.card_write_setup() # TODO: Do we have to call that every shot or just once after the initialization?
self.AWG.card_start()
self.AWG.card_enable_trigger()
return initial_values
def transition_to_manual(self):
self.AWG.card_stop()
return True
def shutdown(self):
self.AWG.card_stop()
self.AWG.card_close()
def abort_buffered(self):
return self.transition_to_manual()
def abort_transition_to_buffered(self):
return True
from labscript import Device, Output, Trigger, LabscriptError, config, set_passed_properties
import numpy as np
class AWGOutput(Output):
description = 'Arbitray Waveform Output'
def __init__(self, name, parent_device, connection, trigger_device, trigger_connection, channel_amplitude, **kwargs):
""" Create Output channel for AWG.
Parameters
----------
name : str
Name for device.
parent_device :
Instance of AWG
connection : str
Number of channel.
trigger_device :
Instance of device (InternediateDevice with digital output) that triggers the channel.
trigger_connection : str
Channel of Trigger in trigger_device
channel_amplitude : int
Maximal (positive and negative) amplitude of channel output in mV.
"""
parent_device.set_property(f"channel_amplitude_{connection}", channel_amplitude, "connection_table_properties")
super().__init__(name, parent_device, connection, limits=None, unit_conversion_class=None, unit_conversion_parameters=None, default_value=None, **kwargs)
if isinstance(trigger_device, Trigger):
if "rising" != trigger_device.trigger_edge_type:
raise LabscriptError('Trigger edge type for %s is \'%s\', ' % (name, self.trigger_edge_type) +
'but existing Trigger object %s ' % trigger_device.name +
'has edge type \'%s\'' % trigger_device.trigger_edge_type)
self.trigger_device = trigger_device
elif trigger_device is not None:
# Instantiate a trigger object to be our parent:
self.trigger_device = Trigger(name+"_trigger", trigger_device, trigger_connection, trigger_edge_type="rising")
def add_instruction(self, time, instruction, units=None):
if time in self.instructions:
raise LabscriptError(f"Spectrum AWG aleady has another instruction programmed at t={time}!")
self.trigger_device.trigger(time,duration=50e-6)
return super().add_instruction(time, instruction, units)
def calculate_num_samples(self, sample_duration):
# The sample size has to be multiples of 4096, so we have to quantize to that value.
# With the (maximal) sample rate of 1.25GHz, this means the sample duration has minimal
# steps of 3.2769µs.
num_samples = np.round(sample_duration*self.parent_device.sample_rate/4096)*4096
return num_samples
def generate_single_tone(self, t, sample_duration, frequency, label=None):
"""
Parameters
----------
t : float
Time when stream starts, in seconds.
sample_duration : float
Duration of sample (that gets repeated), in seconds.
frequency : float
in Hz.
label : str, optional
Description of the instruction
"""
num_samples = self.calculate_num_samples(sample_duration)
# Check frequency resolution
fundamental_frequency = np.round(1/sample_duration)
if frequency<fundamental_frequency:
raise LabscriptError(f"Freqeuncy of '{self.name}' at t={t} is smaller than the resolution ({fundamental_frequency:i}), change frequency or sample size/duration.")
self.add_instruction(t,(num_samples,frequency,label))
def generate_multiple_tones(self, t, sample_duration, frequencies, amplitudes=None, phases=None, label=None):
"""
Parameters
----------
t : float
Time.
frequencies :
Iterable of frequencies for each tone
amplitude : (optional)
Relative amplitude of each tone. If None, all have the same amplitude.
phase : (optional)
Phase for each tone. If None, all tones of zero (the same) phase.
label : str, optional
Description of the instruction
"""
num_samples = self.calculate_num_samples(sample_duration)
frequencies = np.array(frequencies)
if amplitudes is None:
amplitudes = np.ones(len(frequencies))
if phases is None:
phases = np.zeros(len(frequencies))
# TODO: for equally spaced frequencies using
# phases = np.pi*np.arange(len(frequencies))**2/len(frequencies)
# are good values.
# Check frequency resolution
freq_sorted = np.sort(frequencies)
fundamental_frequency = np.round(1/sample_duration)
if np.any(frequencies<fundamental_frequency) or np.any((freq_sorted[1:]-freq_sorted[:-1])<fundamental_frequency):
raise LabscriptError(f"Freqeuncy or difference of '{self.name}' at t={t} is smaller than the resolution ({fundamental_frequency:.0f}Hz), change frequency or sample size/duration.")
if not len(frequencies)==len(amplitudes)==len(phases):
raise LabscriptError(f"Instruction for '{self.name}' at t={t} must have a frequency, amplitude and phase for all tones or none.")
self.add_instruction(t, (num_samples,*frequencies,*amplitudes, *phases, label))
def do_checks(self):
for t,instruction in self.instructions.items():
if instruction[0]>self.parent_device.max_sample_size:
raise LabscriptError(f"Instruction for '{self.name}' at t={t} has too many samples (num_samples={instruction[0]:.0f}).")
class SpectrumAWG(Device):
description = "Spectrum Instrumentation Arbitray Waveform Generator"
allowed_children = [AWGOutput]
@set_passed_properties(
property_names={"connection_table_properties": ["device_path","timeout","external_clock_rate","sample_rate","memory_segments"],
"device_properties": []
}
)
def __init__(self, name, device_path, sample_rate, external_clock_rate=None, timeout=5000, channel_mode="seq", memory_segments=2**16, **kwargs):
""" Create SpectrumAWG instance.
Parameters
----------
name : str
Name for device.
device_path : str
Path to find the hardware for the spcm driver (e.g. '/dev/spcm0').
timeout : int
Timeout for opterations in BLACS worker, in milliseconds.
external_clock_rate : int
Frequency of the external clock (ClkIn) in Hz. If None, the card uses the internal clock.
channel_mode : str
Sets the output mode of the AWG channel between sequence ('seq') and streaming ('fifo').
TODO: implemet fifo mode
memory_segments : int
In sequence mode how many different segments can be stored in memory.
"""
super().__init__(name, parent_device=None, connection="None", **kwargs)
self.BLACS_connection = device_path
self.channel_mode = channel_mode
self.sample_rate = sample_rate
self.memory_segments = memory_segments
# Calculate maximal sample size
internal_memory = 2**32 # 4GB
self.max_sample_size = internal_memory//2//memory_segments # TODO: according to the messages in the worker we don't need the factor 2 here
def do_checks(self):
if len(self.child_devices)>1:
raise NotImplementedError("This code can just handle 1 output channel for now.")
def generate_code(self, hdf5_file):
group = hdf5_file.require_group(f"devices/{self.name}")
self.do_checks()
for output in self.child_devices:
output.do_checks()
change_times = output.get_change_times()
group.require_group(output.connection)
group[output.connection].require_group("labels")
for i,t in enumerate(np.sort(change_times)):
group[output.connection].attrs[str(i)] = output.instructions[t][:-1]
if output.instructions[t][-1] is not None:
group[output.connection]["labels"].attrs[str(i)] = output.instructions[t][-1]
if __name__=="__main__":
from labscript import start
import h5py
# Create hdf5 file for testing
with h5py.File("user_devices/SpectrumAWG/testing/labscript_devices.h5","w") as hdf5_file:
hdf5_file.require_group("devices")
SpectrumAWG("TestAWG","/dev/spcm0",timeout=5000,sample_rate=1250e6)
AWGOutput("Tweezers", TestAWG, "0", None, None, 100)
start()
Tweezers.generate_single_tone(1,1e-3,10e6)
Tweezers.generate_single_tone(0,1e-3,20e6)
Tweezers.generate_multiple_tones(10,1e-3,[1e6,2e6,3e6])
TestAWG.generate_code(hdf5_file)
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
from labscript_devices import register_classes
register_classes(
'SpectrumAWG',
BLACS_tab='user_devices.SpectrumAWG.blacs_tabs.SpectrumAWGTab',
runviewer_parser=None,
)
from ctypes import *
# load registers for easier access
from .py_header.regs import *
#
# **************************************************************************
# pvAllocMemPageAligned: creates a buffer for DMA that's page-aligned
# **************************************************************************
#
def pvAllocMemPageAligned (qwBytes):
dwAlignment = 4096
dwMask = dwAlignment - 1
# allocate non-aligned, slightly larger buffer
qwRequiredNonAlignedBytes = qwBytes * sizeof (c_char) + dwMask
pvNonAlignedBuf = (c_char * qwRequiredNonAlignedBytes)()
# get offset of next aligned address in non-aligned buffer
misalignment = addressof (pvNonAlignedBuf) & dwMask
if misalignment:
dwOffset = dwAlignment - misalignment
else:
dwOffset = 0
return(c_char * qwBytes).from_buffer(pvNonAlignedBuf, dwOffset)
This diff is collapsed.
File added
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment