__author__ = 'Frederik Lauber'

import visa
import zipfile


class hp_infinium(object):
    known_types = ("Type:", "Points:", "Count:", "XInc:", "XOrg:", "XRef:", "YData range:", "YData center:",
                   "Coupling:", "XRange:", "XOffset:", "YRange:", "YOffset:", "Date:", "Time:", "Frame:",
                   "Acq mode:", "Completion:", "X Units:", "Y Units:", "Max bandwidth:", "Min bandwidth:")
    known_set = set(known_types)

    # not checked yet, only from spec
    type_spec = {"1": "raw",
                 "2": "average",
                 "3": "vhistogram",
                 "4": "hhistogramm",
                 "5": "versus",
                 "6": "interpolation",
                 "8": "cgrade",
                 "10": "pdetect"
                 }

    # only number 3 was not tested, no idea how
    coupling_spec = {"0": "AC",
                     "1": "DC",
                     "2": "50 Ohms",
                     "3": "LFREJECT"
                     }

    # not checked yet, only from spec
    acq_spec = {"0": "real time",
                "1": "ETIMe",
                "3": "PDETect"
                }

    # not checked yet, only from spec
    xy_units_spec = {"0": "unknown",
                     "1": "Volt",
                     "2": "second",
                     "3": "constant",
                     "4": "amp",
                     "5": "decibel"
                     }

    @classmethod
    def from_gpig_name(cls, gpib_identifier_string, timeout):
        re = visa.ResourceManager()
        gpib_connection = re.get_instrument(gpib_identifier_string)
        return cls(gpib_connection, timeout)

    def __init__(self, gpib_connection, timeout):
        self._gpib_connection = gpib_connection
        self._timeout = None
        # variable used by the .timeout setter/getter method
        self.timeout = timeout
        self.write = self._gpib_connection.write
        self.ask = self._gpib_connection.ask

    @property
    def timeout(self):
        return self._timeout

    @timeout.setter
    def timeout(self, new_timeout):
        self._timeout = new_timeout
        if not new_timeout is None:
            self._gpib_connection.timeout = new_timeout
        else:
            del self._gpib_connection.timeout

    @property
    def idn(self):
        return self.ask("*IDN?")

    def digitize(self):
        # deletes the data on the scope and arms it again
        # self.write("Digitize")
        self.write(":DIGitize")

    def arm_trigger(self):
        answer = self.ask("*TRG;*OPC?")
        return True if answer == "1" else False

    def trigger_armed(self):
        return self.ask("AER?").strip()

    def preample(self):
        header = self.ask(":WAVeform:PREamble?").split(",")
        tmp = {}
        # this are the headers I found in the original csv files
        # if I did not have a value I used the one from the spec
        # if you  find a difference to the once produced by the oszi directly
        # please notify me
        tmp["Type:"] = self.type_spec[header[1]].strip()
        tmp["Points:"] = header[2].strip()
        tmp["Count:"] = header[3].strip()
        tmp["XInc:"] = header[4].strip()
        tmp["XOrg:"] = header[5].strip()
        tmp["XRef:"] = header[6].strip()
        tmp["YData range:"] = header[7].strip()
        tmp["YData center:"] = header[8].strip()
        # header[9] not used, spec says always 0
        tmp["Coupling:"] = self.coupling_spec[header[10]].strip()
        tmp["XRange:"] = header[11].strip()
        tmp["XOffset:"] = header[12].strip()
        tmp["YRange:"] = header[13].strip()
        tmp["YOffset:"] = header[14].strip()
        tmp["Date:"] = header[15].strip('"')
        tmp["Time:"] = header[16].strip('"')
        tmp["Frame:"] = header[17].strip('"')
        tmp["Acq mode:"] = self.acq_spec[header[19]].strip()
        tmp["Completion:"] = header[20].strip()
        tmp["X Units:"] = self.xy_units_spec[header[21]].strip()
        tmp["Y Units:"] = self.xy_units_spec[header[22]].strip()
        tmp["Max bandwidth:"] = header[23].strip()
        tmp["Min bandwidth:"] = header[24].strip()
        # adding header so we always know which one
        # was taken by the program
        tmp["GPIB:"] = "YES"
        return tmp

    def data(self, header):
        y = self.ask(":WAVeform:DATA?").split(",")
        xorig = float(header["XOrg:"])
        xinc = float(header["XInc:"])
        x_data = tuple(xorig + i * xinc for i in range(len(y)))
        y_data = tuple(float(i) for i in y)
        return x_data, y_data

    def make_csv_like_string(self, preample, data):
        output = []
        preample_keys = set(preample.keys())
        # first, add known keys
        # second add unknown keys
        # -> order will be the same as in the csv files generated by oszi
        # known headers
        for key in self.known_types:
            # Schema is:
            # Keyname+white space until length 15 and actual value
            try:
                output.append("".join([key, (15 - len(key)) * " ", str(preample[key])]))
            except Exception:
                pass
        for key in preample_keys.difference(self.known_set):
            output.append("".join([key, (15 - len(key)) * " ", str(preample[key])]))
        output.append("Data:")

        x_data = data[0]
        y_data = data[1]
        # for x, y in zip(x_data, y_data):
        for i in range(len(x_data)):
            output.append("%E,%E" % (x_data[i], y_data[i]))
        return "\n".join(output)


class Agilent_Waveform_Generator:
    def __init__(self, gpib_connection, timeout):
        self._gpib_connection = gpib_connection
        self._timeout = None
        # variable used by the .timeout setter/getter method
        self.timeout = timeout
        # add some shortcuts for gpib
        self.write = self._gpib_connection.write
        self.ask = self._gpib_connection.ask

    @classmethod
    def from_gpig_name(cls, gpib_identifier_string, timeout):
        re = visa.ResourceManager()
        gpib_connection = re.get_instrument(gpib_identifier_string)
        return cls(gpib_connection, timeout)

    @property
    def timeout(self):
        return self._timeout

    @timeout.setter
    def timeout(self, new_timeout):
        self._timeout = new_timeout
        if not new_timeout is None:
            self._gpib_connection.timeout = new_timeout
        else:
            del self._gpib_connection.timeout

    @property
    def idn(self):
        return self.ask("*IDN?")

    def trigger(self):
        self.write("*TRG")

    def trigger_wait(self):
        self.write("*TRG;*WAI")

    @property
    def trigger_source(self):
        return self.ask("TRIG:SOUR?").strip()

    @trigger_source.setter
    def trigger_source(self, value):
        self.write("TRIG:SOUR " + str(value))

    @property
    def burst_mode(self):
        return self.ask("BURS:MODE?").strip()

    @burst_mode.setter
    def burst_mode(self, value):
        # allowed TRIG, GAT
        self.write("BURS:MODE " + str(value))

    @property
    def function(self):
        return self.ask("FUNC?").strip()

    @function.setter
    def function(self, value):
        # allowed values are:
        # SIN, SQU, RAMP, PULS, NOIS, DC, USER
        self.write("FUNC " + str(value))

    @property
    def frequency(self):
        return float(self.ask("FREQ?"))

    @frequency.setter
    def frequency(self, value):
        # allowed: MIN, MAX, floating point number
        self.write("FREQ " + str(value))

    @property
    def voltage(self):
        return float(self.ask("VOLT?"))

    @voltage.setter
    def voltage(self, value):
        # allowed: MIN, MAX, floating point number
        self.write("VOLT " + str(value))

    @property
    def voltage_high(self):
        return float(self.ask("VOLT:HIGH?"))

    @voltage_high.setter
    def voltage_high(self, value):
        self.write("VOLT:HIGH " + str(value))

    @property
    def voltage_low(self):
        return float(self.ask("VOLT:LOW?"))

    @voltage_low.setter
    def voltage_low(self, value):
        self.write("VOLT:LOW " + str(value))

    @property
    def pulse_width(self):
        return float(self.ask("PULS:WIDT?"))

    @pulse_width.setter
    def pulse_width(self, value):
        # MIN, MAX, floating point in seconds
        self.write("PULS:WIDT " + str(value))

    @property
    def pulse_period(self):
        return float(self.ask("PULS:PER?"))

    @pulse_period.setter
    def pulse_period(self, value):
        # MIN, MAX, floating point in seconds
        self.write("PULS:PER " + str(value))

    @property
    def pulse_transition(self):
        return float(self.ask("PULS:TRAN?"))

    @pulse_transition.setter
    def pulse_transition(self, value):
        # MIN, MAX, floating point in seconds
        self.write("PULS:TRAN " + str(value))

    @property
    def burst_count(self):
        return float(self.ask("BURS:NCYC?"))

    @burst_count.setter
    def burst_count(self, value):
        # MIN, MAX, floating point in seconds
        self.write("BURS:NCYC " + str(value))

    @property
    def burst_phase(self):
        return float(self.ask("BURS:PHAS?"))

    @burst_phase.setter
    def burst_phase(self, value):
        # MIN, MAX, floating point in seconds
        self.write("BURS:PHAS " + str(value))

    @property
    def output(self):
        return False if self.ask("OUTP?").strip() == '0' else True

    @output.setter
    def output(self, value):
        # True, False
        if value:
            self.write("OUTP ON")
        else:
            self.write("OUTP OFF")

    @property
    def output_load(self):
        return float(self.ask("OUTP:LOAD?"))

    @output_load.setter
    def output_load(self, value):
        # INF, MIN, MAX, flaoting point in Ohms
        self.write("OUTP:LOAD " + str(value))

    @property
    def output_trigger(self):
        return False if self.ask("OUTP:TRIG?").strip() == '0' else True

    @output_trigger.setter
    def output_trigger(self, value):
        # True, False
        if value:
            self.write("OUTP:TRIG ON")
        else:
            self.write("OUTP:TRIG OFF")

    @property
    def output_trigger_slope(self):
        return self.ask("OUTP:TRIG:SLOP?").strip()

    @output_trigger_slope.setter
    def output_trigger_slope(self, value):
        # POS, NEG
        self.write("OUTP:TRIG:SLOP " + str(value))

    def enable_burst_mode(self):
        self.write("BURS:STAT ON")

    def clear_buffer(self):
        self.write("CLEAR 710")

    def clear_registers(self):
        self.write("*CLS")


def list_all_devices():
    re = visa.ResourceManager()
    device_list = re.list_resources()
    instrument_list = tuple(map(re.get_instrument, device_list))
    for instrument in instrument_list:
        idn = instrument.ask("*IDN?")
        print(idn)
    return instrument_list


#always set the waveform generator to the same settings
def setup_WFG(WFG):
    WFG.output = False
    WFG.output_load = 50
    WFG.output_trigger = True
    WFG.output_trigger_slope = "POS"
    WFG.function = "PULS"
    WFG.pulse_width = 1E-07
    WFG.pulse_period = 2E-07
    WFG.pulse_transition = 5E-09
    WFG.voltage_high = 2.5
    WFG.voltage_low = 0
    WFG.burst_mode = "TRIG"
    WFG.burst_count = 1
    WFG.burst_phase = 0
    WFG.trigger_source = "BUS"

    if not WFG.function == "PULS" \
            or not WFG.output_load == 50 \
            or not WFG.output_trigger is True \
            or not WFG.output_trigger_slope == "POS" \
            or not WFG.voltage_high == 2.5 \
            or not WFG.voltage_low == 0 \
            or not WFG.pulse_width == 1E-07 \
            or not WFG.burst_mode == "TRIG" \
            or not WFG.trigger_source == "BUS" \
            or not WFG.burst_count == 1:
        print("Setting for WFG failed failed")
        print(WFG.function == "PULS")
        print(WFG.output_load == 50)
        print(WFG.output_trigger is True)
        print(WFG.output_trigger_slope)
        print(WFG.voltage_high == 2.5)
        print(WFG.voltage_low == 0)
        print(WFG.pulse_width == 1E-07)
        print(WFG.burst_mode == "TRIG")
        print(WFG.trigger_source == "BUS")
        print(WFG.burst_count == 1)
        exit(1)
    WFG.output = True
    WFG.enable_burst_mode()
    print("WFG setup complete")


def main():
    instrument_list = list_all_devices()
    OSZI = hp_infinium(instrument_list[0], 10)
    WFG = Agilent_Waveform_Generator(instrument_list[1], 10)
    setup_WFG(WFG)
    #clear the the oscilloscope
    OSZI.write("DCL")
    OSZI.ask("ADER?")
    OSZI.ask("AER?")
    preample = OSZI.preample()
    with zipfile.ZipFile("test.zip", mode='w', compression=zipfile.ZIP_BZIP2) as file_zip:
        for counter in range(0, 6666, 1):
            print(counter)
            OSZI.arm_trigger()
            while not int(OSZI.trigger_armed):
                # wait for the arm register to be marked as armed
                pass
            WFG.trigger_wait()
            while not int(OSZI.ask("ADER?").strip()):
                # make sure we really get new data
                pass
            data = OSZI.data(preample)
            csv_string = OSZI.make_csv_like_string(preample, data)
            file_zip.writestr(str(counter).zfill(5) + ".csv", csv_string)


if __name__ == "__main__":
    main()

Published

Category

snippet

Tags