__author__ = 'Frederik Lauber'
import visa
import zipfile
class hp_infinium(object):
known_types = ("Type:", "Points:", "Count:", "XInc:", "XOrg:", "XRef:", "YData range:", "YData center:",
"Coupling:", "XRange:", "XOffset:", "YRange:", "YOffset:", "Date:", "Time:", "Frame:",
"Acq mode:", "Completion:", "X Units:", "Y Units:", "Max bandwidth:", "Min bandwidth:")
known_set = set(known_types)
# not checked yet, only from spec
type_spec = {"1": "raw",
"2": "average",
"3": "vhistogram",
"4": "hhistogramm",
"5": "versus",
"6": "interpolation",
"8": "cgrade",
"10": "pdetect"
}
# only number 3 was not tested, no idea how
coupling_spec = {"0": "AC",
"1": "DC",
"2": "50 Ohms",
"3": "LFREJECT"
}
# not checked yet, only from spec
acq_spec = {"0": "real time",
"1": "ETIMe",
"3": "PDETect"
}
# not checked yet, only from spec
xy_units_spec = {"0": "unknown",
"1": "Volt",
"2": "second",
"3": "constant",
"4": "amp",
"5": "decibel"
}
@classmethod
def from_gpig_name(cls, gpib_identifier_string, timeout):
re = visa.ResourceManager()
gpib_connection = re.get_instrument(gpib_identifier_string)
return cls(gpib_connection, timeout)
def __init__(self, gpib_connection, timeout):
self._gpib_connection = gpib_connection
self._timeout = None
# variable used by the .timeout setter/getter method
self.timeout = timeout
self.write = self._gpib_connection.write
self.ask = self._gpib_connection.ask
@property
def timeout(self):
return self._timeout
@timeout.setter
def timeout(self, new_timeout):
self._timeout = new_timeout
if not new_timeout is None:
self._gpib_connection.timeout = new_timeout
else:
del self._gpib_connection.timeout
@property
def idn(self):
return self.ask("*IDN?")
def digitize(self):
# deletes the data on the scope and arms it again
# self.write("Digitize")
self.write(":DIGitize")
def arm_trigger(self):
answer = self.ask("*TRG;*OPC?")
return True if answer == "1" else False
def trigger_armed(self):
return self.ask("AER?").strip()
def preample(self):
header = self.ask(":WAVeform:PREamble?").split(",")
tmp = {}
# this are the headers I found in the original csv files
# if I did not have a value I used the one from the spec
# if you find a difference to the once produced by the oszi directly
# please notify me
tmp["Type:"] = self.type_spec[header[1]].strip()
tmp["Points:"] = header[2].strip()
tmp["Count:"] = header[3].strip()
tmp["XInc:"] = header[4].strip()
tmp["XOrg:"] = header[5].strip()
tmp["XRef:"] = header[6].strip()
tmp["YData range:"] = header[7].strip()
tmp["YData center:"] = header[8].strip()
# header[9] not used, spec says always 0
tmp["Coupling:"] = self.coupling_spec[header[10]].strip()
tmp["XRange:"] = header[11].strip()
tmp["XOffset:"] = header[12].strip()
tmp["YRange:"] = header[13].strip()
tmp["YOffset:"] = header[14].strip()
tmp["Date:"] = header[15].strip('"')
tmp["Time:"] = header[16].strip('"')
tmp["Frame:"] = header[17].strip('"')
tmp["Acq mode:"] = self.acq_spec[header[19]].strip()
tmp["Completion:"] = header[20].strip()
tmp["X Units:"] = self.xy_units_spec[header[21]].strip()
tmp["Y Units:"] = self.xy_units_spec[header[22]].strip()
tmp["Max bandwidth:"] = header[23].strip()
tmp["Min bandwidth:"] = header[24].strip()
# adding header so we always know which one
# was taken by the program
tmp["GPIB:"] = "YES"
return tmp
def data(self, header):
y = self.ask(":WAVeform:DATA?").split(",")
xorig = float(header["XOrg:"])
xinc = float(header["XInc:"])
x_data = tuple(xorig + i * xinc for i in range(len(y)))
y_data = tuple(float(i) for i in y)
return x_data, y_data
def make_csv_like_string(self, preample, data):
output = []
preample_keys = set(preample.keys())
# first, add known keys
# second add unknown keys
# -> order will be the same as in the csv files generated by oszi
# known headers
for key in self.known_types:
# Schema is:
# Keyname+white space until length 15 and actual value
try:
output.append("".join([key, (15 - len(key)) * " ", str(preample[key])]))
except Exception:
pass
for key in preample_keys.difference(self.known_set):
output.append("".join([key, (15 - len(key)) * " ", str(preample[key])]))
output.append("Data:")
x_data = data[0]
y_data = data[1]
# for x, y in zip(x_data, y_data):
for i in range(len(x_data)):
output.append("%E,%E" % (x_data[i], y_data[i]))
return "\n".join(output)
class Agilent_Waveform_Generator:
def __init__(self, gpib_connection, timeout):
self._gpib_connection = gpib_connection
self._timeout = None
# variable used by the .timeout setter/getter method
self.timeout = timeout
# add some shortcuts for gpib
self.write = self._gpib_connection.write
self.ask = self._gpib_connection.ask
@classmethod
def from_gpig_name(cls, gpib_identifier_string, timeout):
re = visa.ResourceManager()
gpib_connection = re.get_instrument(gpib_identifier_string)
return cls(gpib_connection, timeout)
@property
def timeout(self):
return self._timeout
@timeout.setter
def timeout(self, new_timeout):
self._timeout = new_timeout
if not new_timeout is None:
self._gpib_connection.timeout = new_timeout
else:
del self._gpib_connection.timeout
@property
def idn(self):
return self.ask("*IDN?")
def trigger(self):
self.write("*TRG")
def trigger_wait(self):
self.write("*TRG;*WAI")
@property
def trigger_source(self):
return self.ask("TRIG:SOUR?").strip()
@trigger_source.setter
def trigger_source(self, value):
self.write("TRIG:SOUR " + str(value))
@property
def burst_mode(self):
return self.ask("BURS:MODE?").strip()
@burst_mode.setter
def burst_mode(self, value):
# allowed TRIG, GAT
self.write("BURS:MODE " + str(value))
@property
def function(self):
return self.ask("FUNC?").strip()
@function.setter
def function(self, value):
# allowed values are:
# SIN, SQU, RAMP, PULS, NOIS, DC, USER
self.write("FUNC " + str(value))
@property
def frequency(self):
return float(self.ask("FREQ?"))
@frequency.setter
def frequency(self, value):
# allowed: MIN, MAX, floating point number
self.write("FREQ " + str(value))
@property
def voltage(self):
return float(self.ask("VOLT?"))
@voltage.setter
def voltage(self, value):
# allowed: MIN, MAX, floating point number
self.write("VOLT " + str(value))
@property
def voltage_high(self):
return float(self.ask("VOLT:HIGH?"))
@voltage_high.setter
def voltage_high(self, value):
self.write("VOLT:HIGH " + str(value))
@property
def voltage_low(self):
return float(self.ask("VOLT:LOW?"))
@voltage_low.setter
def voltage_low(self, value):
self.write("VOLT:LOW " + str(value))
@property
def pulse_width(self):
return float(self.ask("PULS:WIDT?"))
@pulse_width.setter
def pulse_width(self, value):
# MIN, MAX, floating point in seconds
self.write("PULS:WIDT " + str(value))
@property
def pulse_period(self):
return float(self.ask("PULS:PER?"))
@pulse_period.setter
def pulse_period(self, value):
# MIN, MAX, floating point in seconds
self.write("PULS:PER " + str(value))
@property
def pulse_transition(self):
return float(self.ask("PULS:TRAN?"))
@pulse_transition.setter
def pulse_transition(self, value):
# MIN, MAX, floating point in seconds
self.write("PULS:TRAN " + str(value))
@property
def burst_count(self):
return float(self.ask("BURS:NCYC?"))
@burst_count.setter
def burst_count(self, value):
# MIN, MAX, floating point in seconds
self.write("BURS:NCYC " + str(value))
@property
def burst_phase(self):
return float(self.ask("BURS:PHAS?"))
@burst_phase.setter
def burst_phase(self, value):
# MIN, MAX, floating point in seconds
self.write("BURS:PHAS " + str(value))
@property
def output(self):
return False if self.ask("OUTP?").strip() == '0' else True
@output.setter
def output(self, value):
# True, False
if value:
self.write("OUTP ON")
else:
self.write("OUTP OFF")
@property
def output_load(self):
return float(self.ask("OUTP:LOAD?"))
@output_load.setter
def output_load(self, value):
# INF, MIN, MAX, flaoting point in Ohms
self.write("OUTP:LOAD " + str(value))
@property
def output_trigger(self):
return False if self.ask("OUTP:TRIG?").strip() == '0' else True
@output_trigger.setter
def output_trigger(self, value):
# True, False
if value:
self.write("OUTP:TRIG ON")
else:
self.write("OUTP:TRIG OFF")
@property
def output_trigger_slope(self):
return self.ask("OUTP:TRIG:SLOP?").strip()
@output_trigger_slope.setter
def output_trigger_slope(self, value):
# POS, NEG
self.write("OUTP:TRIG:SLOP " + str(value))
def enable_burst_mode(self):
self.write("BURS:STAT ON")
def clear_buffer(self):
self.write("CLEAR 710")
def clear_registers(self):
self.write("*CLS")
def list_all_devices():
re = visa.ResourceManager()
device_list = re.list_resources()
instrument_list = tuple(map(re.get_instrument, device_list))
for instrument in instrument_list:
idn = instrument.ask("*IDN?")
print(idn)
return instrument_list
#always set the waveform generator to the same settings
def setup_WFG(WFG):
WFG.output = False
WFG.output_load = 50
WFG.output_trigger = True
WFG.output_trigger_slope = "POS"
WFG.function = "PULS"
WFG.pulse_width = 1E-07
WFG.pulse_period = 2E-07
WFG.pulse_transition = 5E-09
WFG.voltage_high = 2.5
WFG.voltage_low = 0
WFG.burst_mode = "TRIG"
WFG.burst_count = 1
WFG.burst_phase = 0
WFG.trigger_source = "BUS"
if not WFG.function == "PULS" \
or not WFG.output_load == 50 \
or not WFG.output_trigger is True \
or not WFG.output_trigger_slope == "POS" \
or not WFG.voltage_high == 2.5 \
or not WFG.voltage_low == 0 \
or not WFG.pulse_width == 1E-07 \
or not WFG.burst_mode == "TRIG" \
or not WFG.trigger_source == "BUS" \
or not WFG.burst_count == 1:
print("Setting for WFG failed failed")
print(WFG.function == "PULS")
print(WFG.output_load == 50)
print(WFG.output_trigger is True)
print(WFG.output_trigger_slope)
print(WFG.voltage_high == 2.5)
print(WFG.voltage_low == 0)
print(WFG.pulse_width == 1E-07)
print(WFG.burst_mode == "TRIG")
print(WFG.trigger_source == "BUS")
print(WFG.burst_count == 1)
exit(1)
WFG.output = True
WFG.enable_burst_mode()
print("WFG setup complete")
def main():
instrument_list = list_all_devices()
OSZI = hp_infinium(instrument_list[0], 10)
WFG = Agilent_Waveform_Generator(instrument_list[1], 10)
setup_WFG(WFG)
#clear the the oscilloscope
OSZI.write("DCL")
OSZI.ask("ADER?")
OSZI.ask("AER?")
preample = OSZI.preample()
with zipfile.ZipFile("test.zip", mode='w', compression=zipfile.ZIP_BZIP2) as file_zip:
for counter in range(0, 6666, 1):
print(counter)
OSZI.arm_trigger()
while not int(OSZI.trigger_armed):
# wait for the arm register to be marked as armed
pass
WFG.trigger_wait()
while not int(OSZI.ask("ADER?").strip()):
# make sure we really get new data
pass
data = OSZI.data(preample)
csv_string = OSZI.make_csv_like_string(preample, data)
file_zip.writestr(str(counter).zfill(5) + ".csv", csv_string)
if __name__ == "__main__":
main()