from labscript import Device, LabscriptError, config, set_passed_properties, MHz

import numpy as np


class EttusSDR(Device):
    """Labscript device describing an Ettus N210 software defined radio.

    The device collects a list of tones (frequency offset, amplitude, phase)
    and writes them, together with the TX centre frequency, the sampling rate
    and an optional stream duration, into the shot file in `generate_code`.
    """

    description = "Software Defined Radio N210 Ettus"

    @set_passed_properties(
        property_names={
            "connection_table_properties": ["tx_frequency", "sampling_rate"]
        }
    )
    def __init__(self, name, tx_frequency, sampling_rate=50*MHz, **kwargs):
        """Create the device.

        Parameters
        ----------
        name : str
            Name of the device; also used as the name of its HDF5 group.
        tx_frequency : float
            TX centre frequency. Tones are stored as offsets from this value.
            Presumably in Hz -- confirm against the BLACS worker.
        sampling_rate : float (opt.)
            Sampling rate, defaults to ``50*MHz``.
        **kwargs
            Forwarded unchanged to ``Device.__init__``.
        """
        # The device has no parent; the connection is the literal string "None".
        super().__init__(name, parent_device=None, connection="None", **kwargs)
        self.BLACS_connection = "USRP"

        self.tx_frequency = tx_frequency
        self.sampling_rate = sampling_rate

        # Per-tone data; index i of each list describes the same tone.
        self.frequency_offsets = []
        self.amplitudes = []
        self.phases = []

    def set_tx_frequency(self, frequency):
        """Set the TX frequency of the SDR.

        Only the stored centre frequency is replaced. Offsets of tones that
        were already added via `add_frequency` are NOT recomputed, so change
        the centre frequency before adding tones.
        """
        self.tx_frequency = frequency

    def set_stream_duration(self, t):
        """Store the stream duration `t`; it is written to the shot file by
        `generate_code` only if this method has been called."""
        self.duration = t

    def add_frequency(self, frequency, amplitude, phase=0):
        """Adds a tone to stream from the SDR.

        Parameters
        ----------
        frequency: float
            Frequency of the tone, saved as offset from tx frequency.
        amplitude: float (Real Positive)
            Amplitude of the tone.
        phase: float
            Phase of the tone.

        Raises
        ------
        LabscriptError
            If no TX centre frequency is set (attribute missing or None).
        """
        # `hasattr` alone can never fail after __init__, so also reject None
        # instead of letting the subtraction below raise a bare TypeError.
        if getattr(self, "tx_frequency", None) is None:
            raise LabscriptError(f"{self.name}: Set the tx center frequency first!")

        self.frequency_offsets.append(frequency - self.tx_frequency)
        self.amplitudes.append(amplitude)
        self.phases.append(phase)

    def add_frequencies(self, frequencies, amplitudes, phases=None):
        """Adds multiple tones to be streamed.

        Length of the arrays must be equal.

        Parameters
        ----------
        frequencies : iterable
        amplitudes : iterable
        phases : iterable (opt.)
            Defaults to a phase of zero for every tone.

        Raises
        ------
        LabscriptError
            If the inputs do not all have the same length. Nothing is added
            in that case.
        """
        # Materialise first so that generators are supported and lengths can
        # be compared before any tone is stored.
        frequencies = list(frequencies)
        amplitudes = list(amplitudes)
        phases = [0] * len(frequencies) if phases is None else list(phases)

        # A plain zip() would silently drop the surplus entries.
        if not (len(frequencies) == len(amplitudes) == len(phases)):
            raise LabscriptError(
                f"{self.name}: frequencies, amplitudes and phases must have "
                f"equal length (got {len(frequencies)}, {len(amplitudes)} "
                f"and {len(phases)})."
            )

        for f, a, p in zip(frequencies, amplitudes, phases):
            self.add_frequency(f, a, p)

    def generate_code(self, hdf5_file):
        """Write the tone table and stream settings to the shot file.

        Creates the group ``devices/<name>`` containing a dataset ``data``
        with the float64 fields ``frequencies`` (offsets from the TX
        frequency), ``amplitudes`` and ``phases``. The group attributes
        ``tx_frequency`` and ``sampling_rate`` are always written;
        ``duration`` only if `set_stream_duration` was called.

        Parameters
        ----------
        hdf5_file : h5py.File
            Open, writable shot file.
        """
        group = hdf5_file.create_group(f"devices/{self.name}")

        # Use local arrays: rebinding the instance lists to ndarrays would
        # break any later add_frequency() call (ndarray has no append).
        offsets = np.array(self.frequency_offsets)
        amplitudes = np.array(self.amplitudes)

        # Save data in h5 file
        dtypes = [
            ("frequencies", np.float64),
            ("amplitudes", np.float64),
            ("phases", np.float64),
        ]
        tones = np.rec.fromarrays([offsets, amplitudes, self.phases], dtype=dtypes)
        group.create_dataset("data", compression=config.compression, data=tones)

        group.attrs["tx_frequency"] = self.tx_frequency
        group.attrs["sampling_rate"] = self.sampling_rate
        if hasattr(self, "duration"):
            group.attrs["duration"] = self.duration