"""Agilent waveform generator and HP Infiniium oscilloscope example.

Snippet dated Jan 30, 2017.

The script configures the waveform generator for single triggered pulses,
then repeatedly arms the oscilloscope, fires the generator and stores every
captured waveform as a CSV-like text file inside ``test.zip``.
"""

__author__ = 'Frederik Lauber'

import zipfile

try:
    import visa
except ImportError:
    # Newer PyVISA releases expose the same API under the name ``pyvisa``.
    try:
        import pyvisa as visa
    except ImportError:
        # Without PyVISA the wrappers below still work with a connection
        # object supplied by the caller; only resource discovery needs it.
        visa = None


def _resource_manager():
    """Return a new VISA resource manager or raise ImportError without PyVISA."""
    if visa is None:
        raise ImportError("PyVISA is required to open or list instruments")
    return visa.ResourceManager()


def _open_resource(manager, identifier):
    """Open *identifier*, preferring ``open_resource`` over ``get_instrument``."""
    opener = getattr(manager, "open_resource", None)
    if opener is None:
        opener = manager.get_instrument
    return opener(identifier)


def _query_method(connection):
    """Return the write-then-read callable of *connection* (``query`` or ``ask``)."""
    query = getattr(connection, "query", None)
    if query is None:
        query = connection.ask
    return query


def _stripped(reply):
    """Return *reply* without surrounding whitespace."""
    return reply.strip()


def _scpi_property(command, parse, doc):
    """Build a property that reads ``command?`` and writes ``command <value>``."""
    def getter(self):
        return parse(self.ask(command + "?"))

    def setter(self, value):
        self.write(command + " " + str(value))

    return property(getter, setter, doc=doc)


def _scpi_switch(command, doc):
    """Build a boolean property written as ON/OFF and read back as ``!= "0"``."""
    def getter(self):
        return self.ask(command + "?").strip() != "0"

    def setter(self, value):
        self.write(command + (" ON" if value else " OFF"))

    return property(getter, setter, doc=doc)


class _GpibInstrument(object):
    """Connection handling shared by the instrument wrappers of this module.

    Instances expose ``write`` and ``ask`` as shortcuts to the underlying
    connection; every command of the subclasses goes through these two.
    """

    def __init__(self, gpib_connection, timeout):
        """Wrap *gpib_connection* and apply *timeout* to it.

        gpib_connection -- object offering ``write`` plus ``query`` or ``ask``
        timeout         -- value assigned to the connection's ``timeout``
                           attribute; ``None`` deletes that attribute
        """
        self._gpib_connection = gpib_connection
        self._timeout = None  # backing field of the ``timeout`` property
        self.timeout = timeout
        self.write = gpib_connection.write
        self.ask = _query_method(gpib_connection)

    @classmethod
    def from_gpig_name(cls, gpib_identifier_string, timeout):
        """Open the resource named *gpib_identifier_string* and wrap it."""
        manager = _resource_manager()
        connection = _open_resource(manager, gpib_identifier_string)
        return cls(connection, timeout)

    @property
    def timeout(self):
        """Timeout last applied to the connection."""
        return self._timeout

    @timeout.setter
    def timeout(self, new_timeout):
        self._timeout = new_timeout
        if new_timeout is not None:
            self._gpib_connection.timeout = new_timeout
        else:
            # NOTE(review): PyVISA documents deleting the attribute as
            # "no timeout" -- confirm for the installed version.
            del self._gpib_connection.timeout

    @property
    def idn(self):
        """Raw reply of the instrument to ``*IDN?``."""
        return self.ask("*IDN?")


class hp_infinium(_GpibInstrument):
    """Wrapper around an HP Infiniium oscilloscope reachable through VISA."""

    # Header names in the order used by make_csv_like_string.
    known_types = ("Type:",
                   "Points:",
                   "Count:",
                   "XInc:",
                   "XOrg:",
                   "XRef:",
                   "YData range:",
                   "YData center:",
                   "Coupling:",
                   "XRange:",
                   "XOffset:",
                   "YRange:",
                   "YOffset:",
                   "Date:",
                   "Time:",
                   "Frame:",
                   "Acq mode:",
                   "Completion:",
                   "X Units:",
                   "Y Units:",
                   "Max bandwidth:",
                   "Min bandwidth:")
    known_set = set(known_types)

    # not checked yet, only from spec
    type_spec = {"1": "raw",
                 "2": "average",
                 "3": "vhistogram",
                 "4": "hhistogramm",
                 "5": "versus",
                 "6": "interpolation",
                 "8": "cgrade",
                 "10": "pdetect"}

    # only number 3 was not tested
    coupling_spec = {"0": "AC",
                     "1": "DC",
                     "2": "50 Ohms",
                     "3": "LFREJECT"}

    # not checked yet, only from spec
    acq_spec = {"0": "real time",
                "1": "ETIMe",
                "3": "PDETect"}

    # not checked yet, only from spec
    xy_units_spec = {"0": "unknown",
                     "1": "Volt",
                     "2": "second",
                     "3": "constant",
                     "4": "amp",
                     "5": "decibel"}

    def digitize(self):
        """Send ``:DIGitize``: deletes the data on the scope and arms it again."""
        self.write(":DIGitize")

    def arm_trigger(self):
        """Send ``*TRG;*OPC?`` and return True when the reply is ``1``."""
        # The reply may carry a line terminator, so compare the stripped text.
        return self.ask("*TRG;*OPC?").strip() == "1"

    def trigger_armed(self):
        """Return the stripped reply to ``AER?`` (the arm register)."""
        return self.ask("AER?").strip()

    def preample(self):
        """Query ``:WAVeform:PREamble?`` and return it as a header dictionary.

        The keys are the header names found in the CSV files written by the
        oscilloscope itself; where no sample value was available the name
        from the spec is used.  Numeric fields stay strings, coded fields are
        translated through the ``*_spec`` tables, and the additional key
        ``"GPIB:"`` marks the header as produced by this program.

        Raises IndexError for a reply with too few fields and KeyError for a
        code missing from the spec tables.
        """
        reply = self.ask(":WAVeform:PREamble?")
        # Strip first so padded fields still match the spec table keys.
        fields = [field.strip() for field in reply.split(",")]
        return {
            "Type:": self.type_spec[fields[1]],
            "Points:": fields[2],
            "Count:": fields[3],
            "XInc:": fields[4],
            "XOrg:": fields[5],
            "XRef:": fields[6],
            "YData range:": fields[7],
            "YData center:": fields[8],
            # fields[9] not used, spec says always 0
            "Coupling:": self.coupling_spec[fields[10]],
            "XRange:": fields[11],
            "XOffset:": fields[12],
            "YRange:": fields[13],
            "YOffset:": fields[14],
            "Date:": fields[15].strip('"'),
            "Time:": fields[16].strip('"'),
            "Frame:": fields[17].strip('"'),
            "Acq mode:": self.acq_spec[fields[19]],
            "Completion:": fields[20],
            "X Units:": self.xy_units_spec[fields[21]],
            "Y Units:": self.xy_units_spec[fields[22]],
            "Max bandwidth:": fields[23],
            "Min bandwidth:": fields[24],
            # added so it is always clear the data was taken by this program
            "GPIB:": "YES",
        }

    def data(self, header):
        """Query ``:WAVeform:DATA?`` and return ``(x_data, y_data)`` tuples.

        header -- dictionary as returned by ``preample``; ``"XOrg:"`` and
                  ``"XInc:"`` give the first x value and the x step.

        An empty reply yields two empty tuples.
        """
        # Drop the terminator and a possible trailing separator so that no
        # empty field reaches float().
        reply = self.ask(":WAVeform:DATA?").strip().rstrip(",")
        samples = reply.split(",") if reply else []
        x_origin = float(header["XOrg:"])
        x_step = float(header["XInc:"])
        x_data = tuple(x_origin + index * x_step
                       for index in range(len(samples)))
        y_data = tuple(float(sample) for sample in samples)
        return x_data, y_data

    def make_csv_like_string(self, preample, data):
        """Format a header dictionary and a waveform like the scope's CSV files.

        preample -- mapping of header name to value
        data     -- pair ``(x_data, y_data)`` of equally long sequences

        Each header line is the name padded with spaces to 15 characters
        followed by the value.  Known names come first in ``known_types``
        order (missing ones are skipped), all other names follow in sorted
        order, then ``Data:`` and one ``x,y`` line per sample.
        """
        def header_line(key):
            return key.ljust(15) + str(preample[key])

        lines = [header_line(key)
                 for key in self.known_types if key in preample]
        # Sorted so the output does not depend on set iteration order.
        extra_keys = sorted(set(preample).difference(self.known_set))
        lines.extend(header_line(key) for key in extra_keys)
        lines.append("Data:")
        lines.extend("%E,%E" % pair for pair in zip(data[0], data[1]))
        return "\n".join(lines)


class Agilent_Waveform_Generator(_GpibInstrument):
    """Wrapper around an Agilent waveform generator reachable through VISA.

    Every setting is a property: reading sends ``<command>?`` and converts
    the reply, assigning sends ``<command> <value>``.
    """

    trigger_source = _scpi_property(
        "TRIG:SOUR", _stripped, "Trigger source (TRIG:SOUR).")
    burst_mode = _scpi_property(
        "BURS:MODE", _stripped, "Burst mode; allowed: TRIG, GAT.")
    function = _scpi_property(
        "FUNC", _stripped,
        "Output function; allowed: SIN, SQU, RAMP, PULS, NOIS, DC, USER.")
    frequency = _scpi_property(
        "FREQ", float, "Frequency; allowed: MIN, MAX or a number.")
    voltage = _scpi_property(
        "VOLT", float, "Voltage (VOLT); allowed: MIN, MAX or a number.")
    voltage_high = _scpi_property(
        "VOLT:HIGH", float, "High voltage level (VOLT:HIGH).")
    voltage_low = _scpi_property(
        "VOLT:LOW", float, "Low voltage level (VOLT:LOW).")
    pulse_width = _scpi_property(
        "PULS:WIDT", float,
        "Pulse width; allowed: MIN, MAX or a number in seconds.")
    pulse_period = _scpi_property(
        "PULS:PER", float,
        "Pulse period; allowed: MIN, MAX or a number in seconds.")
    pulse_transition = _scpi_property(
        "PULS:TRAN", float,
        "Pulse transition; allowed: MIN, MAX or a number in seconds.")
    burst_count = _scpi_property(
        "BURS:NCYC", float,
        "Burst cycle count (BURS:NCYC); allowed: MIN, MAX or a number.")
    burst_phase = _scpi_property(
        "BURS:PHAS", float,
        "Burst phase (BURS:PHAS); allowed: MIN, MAX or a number.")
    output = _scpi_switch(
        "OUTP", "Output state; assign True or False.")
    output_load = _scpi_property(
        "OUTP:LOAD", float,
        "Output load; allowed: INF, MIN, MAX or a number in ohms.")
    output_trigger = _scpi_switch(
        "OUTP:TRIG", "Trigger output state; assign True or False.")
    output_trigger_slope = _scpi_property(
        "OUTP:TRIG:SLOP", _stripped,
        "Slope of the trigger output; allowed: POS, NEG.")

    def trigger(self):
        """Send ``*TRG``."""
        self.write("*TRG")

    def trigger_wait(self):
        """Send ``*TRG;*WAI``."""
        self.write("*TRG;*WAI")

    def enable_burst_mode(self):
        """Send ``BURS:STAT ON``."""
        self.write("BURS:STAT ON")

    def clear_buffer(self):
        """Send ``CLEAR 710``."""
        self.write("CLEAR 710")

    def clear_registers(self):
        """Send ``*CLS``."""
        self.write("*CLS")


def list_all_devices():
    """Open every listed VISA resource, print its ``*IDN?`` reply, return them.

    Returns a tuple of opened resources in the order reported by the
    resource manager.
    """
    manager = _resource_manager()
    instrument_list = tuple(_open_resource(manager, name)
                            for name in manager.list_resources())
    for instrument in instrument_list:
        print(_query_method(instrument)("*IDN?"))
    return instrument_list


def setup_WFG(WFG):
    """Always set the waveform generator to the same settings.

    The output is switched off, all pulse, burst and trigger settings are
    written and part of them is read back.  Any mismatch is printed and the
    program exits with status 1; otherwise the output is switched on and
    burst mode enabled.
    """
    WFG.output = False
    WFG.output_load = 50
    WFG.output_trigger = True
    WFG.output_trigger_slope = "POS"
    WFG.function = "PULS"
    WFG.pulse_width = 1E-07
    WFG.pulse_period = 2E-07
    WFG.pulse_transition = 5E-09
    WFG.voltage_high = 2.5
    WFG.voltage_low = 0
    WFG.burst_mode = "TRIG"
    WFG.burst_count = 1
    WFG.burst_phase = 0
    WFG.trigger_source = "BUS"

    # Each setting is read back exactly once: (name, actual, expected).
    readback = (
        ("function", WFG.function, "PULS"),
        ("output_load", WFG.output_load, 50),
        ("output_trigger", WFG.output_trigger, True),
        ("output_trigger_slope", WFG.output_trigger_slope, "POS"),
        ("voltage_high", WFG.voltage_high, 2.5),
        ("voltage_low", WFG.voltage_low, 0),
        ("pulse_width", WFG.pulse_width, 1E-07),
        ("burst_mode", WFG.burst_mode, "TRIG"),
        ("trigger_source", WFG.trigger_source, "BUS"),
        ("burst_count", WFG.burst_count, 1),
    )
    if any(actual != expected for _, actual, expected in readback):
        print("Setting for WFG failed failed")
        for name, actual, expected in readback:
            print("%s: expected %r, got %r (%s)"
                  % (name, expected, actual, actual == expected))
        # ``exit`` is only injected by the ``site`` module; SystemExit is
        # always available.
        raise SystemExit(1)

    WFG.output = True
    WFG.enable_burst_mode()
    print("WFG setup complete")


def main():
    """Capture 6666 triggered waveforms and store them in ``test.zip``.

    The first listed resource is used as oscilloscope, the second as
    waveform generator.  The preamble is read once before the loop and
    reused for every capture.
    """
    instrument_list = list_all_devices()
    # NOTE(review): recent PyVISA versions interpret the timeout in
    # milliseconds -- confirm that 10 is the intended value.
    OSZI = hp_infinium(instrument_list[0], 10)
    WFG = Agilent_Waveform_Generator(instrument_list[1], 10)
    setup_WFG(WFG)

    # clear the oscilloscope; the two register reads presumably reset the
    # event flags polled below -- verify against the scope manual
    OSZI.write("DCL")
    OSZI.ask("ADER?")
    OSZI.ask("AER?")
    preample = OSZI.preample()

    with zipfile.ZipFile("test.zip", mode='w',
                         compression=zipfile.ZIP_BZIP2) as file_zip:
        for counter in range(6666):
            print(counter)
            OSZI.arm_trigger()
            # wait for the arm register to be marked as armed
            while not int(OSZI.trigger_armed()):
                pass
            WFG.trigger_wait()
            # make sure we really get new data
            while not int(OSZI.ask("ADER?").strip()):
                pass
            data = OSZI.data(preample)
            csv_string = OSZI.make_csv_like_string(preample, data)
            file_zip.writestr(str(counter).zfill(5) + ".csv", csv_string)


if __name__ == "__main__":
    main()