Binary transfer of pattern waveform data
This Python example, binary_transfer_pattern.py, performs a binary transfer of a pattern waveform. The resulting data is graphed by the draw_graph() function and is shown in a tkinter GUI window. The main purpose of this program is to demonstrate how to return binary pattern data with the graph used as a visual confirmation that the data was correctly interpreted. The example works with FlexDCA offline and installs simulated modules. No hardware is required.
As shown in the following lines, the pyvisa library's query_binary_values() method is used to return the x waveform data. Returning the y data is similar.
message = ':WAVeform:XYFORmat:FLOat:XDATa?' x_data = FlexDCA.query_binary_values(message, datatype='f', container=list, is_big_endian=False, header_fmt='ieee')
The following picture shows an example of the displayed graph.
Example Script
Copy
binary-transfer-pattern.py
# -*- coding: utf-8 -*-
""" In Oscilloscope mode with an entire pattern viewed, returns the data
in XY-format. Uses the ':WAVeform:XYFORmat:FLOat:XDATa?' and
':YDATa?' queries.
This program installs a simulated module. Turns on pattern
lock, acquires entire pattern, and returns the binary pattern data.
The draw_graph() function shows a graph of the data in a tkinter GUI window.
"""
import pyvisa as visa # import VISA library
import sys
ADDRESS = 'TCPIP0::localhost::hislip0,4880::INSTR'
CHANNEL = '5A'
def open_flexdca_connection(address):
""" Opens visa connection to FlexFlexDCA. """
try:
rm = visa.ResourceManager()
connection = rm.open_resource(address)
connection.timeout = 20000 # Set connection timeout to 20s
connection.read_termination = '\n'
connection.write_termination = '\n'
inst_id = connection.query('*IDN?')
print('\nFlexDCA connection established to:\n' + inst_id, flush=True)
connection.write(':SYSTem:DEFault')
connection.query('*OPC?')
except (visa.VisaIOError, visa.InvalidSession):
print('\nVISA ERROR: Cannot open instrument address.\n', flush=True)
return None
except Exception as other:
print('\nVISA ERROR: Cannot connect to instrument:', other, flush=True)
print('\n')
return None
return connection
def install_simulated_module(flexdca, channel, model, signal='NRZ'):
""" Simplified installation of a simulated FlexDCA module.
channel
Simulated module's channel to use. Installation slot is
derived from channel.
model
Type of simulated module. "DEM" is a dual-electrical module.
"QEM" is a quad-electrical module. "DOM" is a dual-optical
module. "QEM" is a electrical-optical module.
signal
Format of signal. NRZ or PAM4.
"""
slot = channel[0]
flexdca.write(':EMODules:SLOT' + slot + ':SELection ' + model)
if signal in 'NRZ':
flexdca.write(':SOURce' + channel + ':FORMat NRZ')
else:
flexdca.write(':SOURce' + channel + ':FORMat PAM4')
flexdca.write(':SOURce' + channel + ':DRATe 9.95328E+9')
flexdca.write(':SOURce' + channel + ':WTYPe DATA')
flexdca.write(':SOURce' + channel + ':PLENgth 127')
flexdca.write(':SOURce' + channel + ':AMPLitude 90E-3')
flexdca.write(':SOURce' + channel + ':NOISe:RN 3.0E-6')
flexdca.write(':SOURce' + channel + ':JITTer:RJ 4.0E-12')
flexdca.write(':CHANnel' + channel + ':DISPlay ON')
def configure_FlexDCA(flexdca, channel):
""" Installs a simulated module and prepares FlexDCA for
measurements.
"""
flexdca.query(':SYSTem:DEFault;*OPC?')
install_simulated_module(flexdca, channel, 'DEM')
flexdca.write(':SOURce'+channel+':DRATe 9.95328E+9')
flexdca.write(':SOURce'+channel+':WTYPe DATA')
flexdca.write(':SOURce'+channel+':PLENgth 127')
flexdca.write(':SOURce'+channel+':AMPLitude 90E-3')
flexdca.write(':SOURce'+channel+':NOISe:RN 3.0E-6')
flexdca.write(':SOURce'+channel+':JITTer:RJ 4.0E-12')
flexdca.write(':CHAN' + channel + ':DISPlay ON')
flexdca.write(':ACQuire:RUN')
flexdca.query(':SYSTem:MODE OSCilloscope;*OPC?')
flexdca.write(':TRIGger:PLOCk ON')
flexdca.write(':ACQuire:EPATtern ON')
while True:
if flexdca.query(':WAVeform:PATTern:COMPlete?'):
break
flexdca.query(':SYSTem:AUToscale;*OPC?')
flexdca.write(':TIMebase:UNIT UINTerval')
pattern_length = flexdca.query(':TRIGger:PLENgth?')
flexdca.write(':TIMebase:UIRange ' + pattern_length)
flexdca.write(':ACQuire:STOP')
def eng_notation(numstring, resolution):
""" Converts a string in scientific notation to engineering notation.
Unit multiplier character is appended to in final return string.
numstring
Number string for conversion. For example, '12.3e-4'.
resolution
An real that indicates number of decimal places in
the result. For example, '0.01' would give '1.23 m'
Returns string representing number with multiplier character.
"""
import decimal as dc
MU = '\u03BC' # μ
if numstring in '9.91E+37':
return ''
dc.getcontext().prec = 10 # prevent limiting of decimal places
multiplier = {12: 'T', 9: 'G', 6: 'M', 3: 'k', 0: '', -3: 'm',
-6: MU, -9: 'n', -12: 'p', -15: 'f'}
numberReal = dc.Decimal(numstring)
# absolute value is not between 0 and 1 (fraction)
if abs(numberReal) >= 1:
exponentMult = 0
while int(numberReal) != 0:
numberReal = numberReal / 1000
exponentMult += 1
numberReal *= 1000
exponentMult -= 1
elif (abs(numberReal) > 0) and (abs(numberReal) < 1): # fraction
exponentMult = 0
while int(numberReal) == 0:
numberReal = numberReal * 1000
exponentMult += 1
exponentMult *= -1
elif numberReal == 0: # number must be zero
exponentMult = 0
exponentMult *= 3
if exponentMult == -15:
n = numberReal.quantize(dc.Decimal('1'))
else:
n = numberReal.quantize(dc.Decimal(str(resolution)))
return str(n) + ' ' + multiplier[exponentMult]
def get_pattern_info(flexdca, channel):
print('Get pattern scaling information.', flush=True)
values = {'p_length': '',
'p_points': '',
'xmin': '',
'xmax': '',
'ymin': '',
'ymax': '',
'xscale': '',
'yscale': ''}
flexdca.write(':WAVeform:SOURce CHANnel' + channel)
values['p_length'] = flexdca.query(':WAVeform:PATTern:NSYMbols?')
values['p_points'] = int(flexdca.query(':WAVeform:XYFORmat:POINts?'))
values['xmin'] = flexdca.query(':TIMebase:XLEFt?')
values['xmax'] = flexdca.query(':TIMebase:XRIGht?')
values['ymin'] = flexdca.query(':CHANnel' + channel + ':YBOTTom?')
values['ymax'] = flexdca.query(':CHANnel' + channel + ':YTOP?')
values['xscale'] = flexdca.query(':TIMebase:SCALe?')
values['yscale'] = flexdca.query(':CHANnel' + channel + ':YSCale?')
print('-' * 30)
print('X-scale maximum: ' + eng_notation(values['xmax'], '1.00') + 's')
print('X-scale minimum: ' + eng_notation(values['xmin'], '1.00') + 's')
print('Y-scale maximum: ' + eng_notation(values['ymax'], '1.00') + 'V')
print('Y-scale minimum: ' + eng_notation(values['ymin'], '1.00') + 'V')
print('Pattern length: ' + values['p_length'] + ' bits')
print('Data points: ' + str(values['p_points']))
print('-' * 30)
return values
def get_waveform_x_data(flexdca):
""" Reads x data as floats. Using pyvisa's read_raw() method requires
that :WAVeform:XYFORmat:FLOat:XDATa? query be sent using the write()
method followed by separate read_raw().
"""
print('Get pattern waveform X data.', flush=True)
x_data = [] # Python 3 raw byte string
endiansetting = flexdca.query(':SYSTem:BORDER?') # get current byte order
flexdca.write(':SYSTem:BORDER LENDian') # set little endian byte order
message = ':WAVeform:XYFORmat:FLOat:XDATa?'
flexdca.read_termination = ''
flexdca.write_termination = ''
x_data = flexdca.query_binary_values(message, datatype='f', container=list, is_big_endian=False, header_fmt='ieee')
flexdca.read_termination = '\n'
flexdca.write_termination = '\n'
flexdca.write(':SYSTem:BORDER ' + endiansetting)
# scale data
n = 0
while n < len(x_data):
x_data[n] *= 1E9 # data in mV
n += 1
return x_data
def get_waveform_y_data(flexdca):
""" Reads y data as floats. Using pyvisa's read_raw() method requires
that :WAVeform:XYFORmat:FLOat:XDATa? query be sent using the write()
method followed by separate read_raw().
"""
print('Get pattern waveform Y data.', flush=True)
y_data = []
endiansetting = flexdca.query(':SYSTem:BORDER?') # get current byte order
flexdca.write(':SYSTem:BORDER LENDian') # set little endian byte order
message = ':WAVeform:XYFORmat:FLOat:YDATa?'
flexdca.read_termination = ''
flexdca.write_termination = ''
y_data = flexdca.query_binary_values(message, datatype='f', container=list, is_big_endian=False, header_fmt='ieee')
flexdca.read_termination = '\n'
flexdca.write_termination = '\n'
flexdca.write(':SYSTem:BORDER ' + endiansetting)
# scale data
n = 0
while n < len(y_data):
y_data[n] *= 1E3 # data in ns
n += 1
return y_data
def draw_graph(ydata, xdata, values, channel):
""" Draw graph on tkinter window. """
import numpy as np
import tkinter
from matplotlib.backends.backend_tkagg import (
FigureCanvasTkAgg, NavigationToolbar2Tk)
from matplotlib.figure import Figure
xmin = round(float(values['xmin']) * 1E9, 1)
xmax = round(float(values['xmax']) * 1E9, 2)
ymin = round(float(values['ymin']) * 1E3, 1)
ymax = round(float(values['ymax']) * 1E3, 1)
yminimum = str(ymin)
ymaximum = str(ymax)
xminimum = str(xmin)
xmaximum = str(xmax)
y_ticks = np.linspace(ymin, ymax, num=9, endpoint=True)
x_ticks = np.linspace(xmin, xmax, num=11, endpoint=True)
s = round(ymax - (ymax-ymin)/2.0, 1)
y_axis_midpoint = str(s)
title = 'Channel ' + channel + ': ' +\
values['p_length'] + ' Bit Pattern Waveform'
print('Drawing graph.', flush=True)
root = tkinter.Tk()
root.wm_title(title)
fig = Figure(figsize=(8.0, 6.0), dpi=96, edgecolor='black') # width and height in inches
ax = fig.add_subplot(111)
ax.grid(b=True, which='major', axis='both')
ax.set_yticks(y_ticks)
ax.set_xticks(x_ticks)
ax.set_yticklabels([yminimum, '', '', '', y_axis_midpoint, '', '', '', ymaximum])
ax.set_xticklabels([xminimum, '', '', '', '', '', '', '', '', '', xmaximum])
ax.set_xlabel('Time (ns)')
ax.set_ylabel('Voltage (mV)')
ax.set_xlim(xmin, xmax)
ax.set_ylim(ymin, ymax)
ax.plot(xdata, ydata, 'g-') # data points hidden. line only
canvas = FigureCanvasTkAgg(fig, master=root)
canvas.draw()
toolbar = NavigationToolbar2Tk(canvas, root)
toolbar.update()
button = tkinter.Button(master=root, text="Quit", command=root.quit)
button.pack(side=tkinter.BOTTOM)
toolbar.pack(side=tkinter.BOTTOM, fill=tkinter.X)
canvas.get_tk_widget().pack(side=tkinter.TOP, fill=tkinter.BOTH, expand=1)
print('Graph of returned data is displayed on separate window.')
sys.stdout.flush() # Flush print message in stdout
tkinter.mainloop()
FlexDCA = open_flexdca_connection(ADDRESS)
configure_FlexDCA(FlexDCA, CHANNEL)
values = get_pattern_info(FlexDCA, CHANNEL)
x_data = get_waveform_x_data(FlexDCA)
y_data = get_waveform_y_data(FlexDCA)
draw_graph(y_data, x_data, values, CHANNEL)
FlexDCA.write(':SYSTem:GTLocal')
FlexDCA.close()