FlexEye testing with graphs in subprocess

This Python example demonstrates using FlexEye independent acquisition with a child process that creates a tkinter graph and displays the graph in a window. The example works with FlexDCA offline and installs simulated modules. No hardware is required. For this example, two programs are used. One program is used to launch another program (child process) that draws a graph of eye width over time. To work, both programs must be located in the same folder. From the sidebar shown in this topic, you must save these two files:

  • FlexEye-graph.py, and
  • FlexEye-graph-child.py (child process)

The child process, FlexEye-graph-child.py, is run on six simulated FlexEye channels using Python's standard subprocess library. For each channel, the following graph of eye width over time is displayed. The graph data is acquired over one minute so the program can appear to be locked up. Therefore, be sure to give the program enough time to complete. The child process uses matplotlib to draw the graph which is then placed in a tkinter canvas widget to be displayed in a Tkinter window. Both matplotlib and tkinter are provided with Anaconda's distribution of Python. The child process will not end until you close the Tkinter window.

Normally, the parent process would complete long before the child processes have even started to gather the plot data. But in this example, the parent tests for the completion of all child processes allowing the parent program to report the time that it took to display all of the graphs. As you can see in the graph below, the Tkinter window includes toolbar that allows you to save the graph to a bitmap or vector graphics file.

Parent Process (FlexEye-graph.py)

In the parent process, the run_child_processes() function first launches all of the child processes. Then, the function checks the child processes one-by-one for a stdout message indicating that the graph has been displayed. Once all graphs are displayed, the overall time spent creating the graphs is calculated and displayed. The function cannot wait for the child processes to be completed as in the FlexEye stdout example because the child process does not end until you manually close all of the graph windows.

Child Process (FlexEye-graph-child.py)

In the child process, the monitor_eye_width() function records the data and the create_time_data() function scales the data for the correct time units: seconds, minutes, or hours. The draw_graph() function shown below uses matplotlib combined with tkinter to display the data in a graph. At the end of the function, a print statement is used to return the message to the parent process that the graph is displayed. In the main body of the child script, the test_time and time_sep variables determine the total time span of the graph, in this case one minute, and the time separation between measurement points.

Example Scripts

Copy

FlexEye-graph.py

# -*- coding: utf-8 -*-
""" Launches subprocess: FlexEye_graph_child.py
Demonstrates FlexEye independent eye acquisition using FlexDCA offline
with a simulated module.
- The "sessions" dictionary defines the FlexEye sessions to be created. Dictionary keys represent
FlexEye session numbers and the dictionary values represents valid channels to measure for the session.
- Each session will run as independent oscilloscope.
- Six Eye Width measurements are monitored over a period of 60
seconds on each channel. A Tkinter graph is shown for each measurement. """

import subprocess
import sys
import time
import pyvisa as visa  # import VISA library

IOTIMEOUT = 60000
ADDRESS = 'TCPIP0::localhost::hislip0,4880::INSTR'
SLOT5 = '5'
SLOT6 = '6'

# FlexEye channel assignments for sessions
# For this program, only one channel should be assigned per session.
sessions = {1: '5A', 2: '5B', 3: '', 4: '6C', 5: '6D',
            6: '', 7: '', 8: '', 9: '', 10: '', 11: '', 12: '',
            13: '', 14: '', 15: ''}


def open_flexdca_connection(address):
    """ Opens visa connection to FlexFlexDCA. """
    print('Connecting to Flexdca ...')
    try:
        rm = visa.ResourceManager()
        connection = rm.open_resource(address)
        connection.timeout = 20000  # Set connection timeout to 20s
        connection.read_termination = '\n'
        connection.write_termination = '\n'
        inst_id = connection.query('*IDN?')
        print('\nFlexDCA connection established to:\n' + inst_id, flush=True)
    except (visa.VisaIOError, visa.InvalidSession):
        print('\nVISA ERROR: Cannot open instrument address.\n', flush=True)
        return None
    except Exception as other:
        print('\nVISA ERROR: Cannot connect to instrument:', other, flush=True)
        print('\n')
        return None
    return connection


def initialize_FlexDCA(FlexDCA):
    """ Place FlexDCA in a know starting state.
    Checks to see if FlexEye is already running. If so exit FlexEye. Perform
    a default setup.
    """
    flexeye_state = FlexDCA.query(':FEYE:STATe?')
    if 'RUN' in flexeye_state:
        FlexDCA.query(':FEYE:STATe PAUSe;*OPC?')
        FlexDCA.query(':FEYE:STATe STOP;*OPC?')
    elif 'PAUSe' in flexeye_state:
        FlexDCA.query(':FEYE:STATe STOP;*OPC?')
    FlexDCA.write('*CLS')
    FlexDCA.query(':SYSTem:DEFault;*OPC?')
    FlexDCA.query(':SYSTem:MODE EYE;*OPC?')


def install_simulated_module(FlexDCA, slot):
    """ Installs a simulated module in the first available slot.
    Second argument selects type of module based on arguments to
    :EMODules:SLOT:SELection command. If argument is missing, a
    quad-electrical module is installed. Returns a slot number (string). '0'
    if no available slots.
    """
    FlexDCA.write(':EMODules:SLOT'+slot+':SELection QEM')
    for c in ['A', 'B', 'C', 'D']:
        channel = slot + c
        FlexDCA.write(':SOURce'+channel+':FORMat NRZ')
        FlexDCA.write(':SOURce'+channel+':DRATe 9.95328e9')
        FlexDCA.write(':SOURce'+channel+':WTYPe DATA')
        FlexDCA.write(':SOURce'+channel+':PLENgth 127')
        # 90 mV amplitude
        FlexDCA.write(':SOURce'+channel+':AMPLitude 90E-3')
        # Add 3 uV random noise
        FlexDCA.write(':SOURce'+channel+':NOISe:RN 3.0E-6')
        # Add 4 ps random jitter
        FlexDCA.write(':SOURce'+channel+':JITTer:RJ 4.0E-12')
        print('Simulated module installed in slot ' + slot, flush=True)
        return slot


def all_channels_off(FlexDCA):
    """ Turns all available channels off. """
    for slot in '12345678':
        for letter in 'ABCD':
            channel = slot + letter
            FlexDCA.write(':CHANnel' + channel + ':DISPlay OFF')


def configure_FlexEye(FlexDCA, sessions):
    """ Configures each FlexEye session to the state defined in the
    sessions dictionary. Then, turns FlexEye on. This populates FlexDCA's
    FlexEye Streaming Setup dialog.
    """
    print('Configuring FlexEye. Please wait...', flush=True)
    for session in sessions:  # iterate through 15 session keys
        # create iterable list of channels from comma separated string
        channels = sessions[session].split(',')
        if channels[0]:  # if channels list has channels
            for ch in channels:  # run through channel tupple
                if ch[0] in 'D':  # diff channel, eg, D1A
                    channel = ch[1:3]   # eg, 1A
                    FlexDCA.write(':SOURce'+channel+':DIFFerential ON')
                    FlexDCA.query(':DIFF'+channel+':DMODe ON;*OPC?')
                    FlexDCA.write(':DIFF'+channel+':DISPlay ON')
                    FlexDCA.write(':FEYE:CHANnel' + channel +
                                  ':SID SESSion' + str(session))
                    FlexDCA.write(':FEYE:DIFF' + channel + ':ENABled ON')
                else:  # configure channel, eg, 1A
                    FlexDCA.write(':CHANnel' + ch + ':DISPlay ON')
                    FlexDCA.write(':FEYE:CHANnel' + ch + ':SID SESSion' +
                                  str(session))
                    FlexDCA.write(':FEYE:CHANnel' + ch + ':ENABled ON')
    FlexDCA.query(':FEYE:STATe RUN;*OPC?')
    print('Sessions are configured.', flush=True)
    sys.stdout.flush()  # Flush print message in stdout


def run_child_processes(primary_address, sessions):
    """ Starts a FlexEye child process for each channel. Read's child
    processes stdout to determine when graph is displayed. Each child
    processes is ended by user when manually closing graph window.
    """
    print('Starting child processes.')
    sys.stdout.flush()
    child_processes = []  # list of child processes
    command = ['python', 'FlexEye_graph_child.py', '', '']
    for session in sessions:  # iterate through 15 session keys
        channels = sessions[session]  # string of channels
        if channels:  # if valid session with channels
            command[2] = primary_address.replace('hislip0',
                                                 'hislip' + str(session))
            command[3] = channels  # string of comma separated channels
            p = subprocess.Popen(command,
                                 stdout=subprocess.PIPE,
                                 universal_newlines=True)
            child_processes.append(p)
    return child_processes


def wait_for_child_processes(child_processes):
    """ Pauses parent execution waiting for "graph displayed"
    message from all child processes.
    """
    print('Making measurements for graphs. Wait about 60s...')
    delay = 55  # 55 seconds
    for n in range(0, delay):
        time.sleep(1)
        print(' .', end='')  # prints dots across screen
    print('\n')
    for n in range(len(child_processes)):
        while True:
            line = child_processes[n].stdout.readline()
            if line:
                print(line)  # print graph displayed message from child
                sys.stdout.flush()
                break


def time_elapsed(start_time):
    """ Reports time for child processes to complete. """
    process_time = int(time.time() - start_time)
    m, s = divmod(process_time, 60)
    print('A child process takes 1 minute to make measurement.\n' +
          'Time required for 4 parallel child processes:\n' +
          '  '+str(m)+' minute, '+str(s)+' seconds.\n', flush=True)


FlexDCA = open_flexdca_connection(ADDRESS)
FlexDCA.timeout = IOTIMEOUT
initialize_FlexDCA(FlexDCA)
install_simulated_module(FlexDCA, SLOT5)
install_simulated_module(FlexDCA, SLOT6)
all_channels_off(FlexDCA)
configure_FlexEye(FlexDCA, sessions)
start_time = time.time()
child_processes = run_child_processes(ADDRESS, sessions)
wait_for_child_processes(child_processes)
time_elapsed(start_time)
FlexDCA.query(':FEYE:STATe PAUSE;*OPC?')
FlexDCA.query(':FEYE:STATe STOP;*OPC?')
FlexDCA.write(':DISPlay:WINDow:TIME1:DMODe STILed')
FlexDCA.write(':SYSTem:GTLocal')  # Close Visa Connection
FlexDCA.close()
print('Program Finished.')
Copy

FlexEye-graph-child.py

# -*- coding: utf-8 -*-
""" Subprocess launched by FlexEye_graph.py
This script is an example of FlexEye independent acquisitions. It displays
a tkinter graph of the change in Eye Width over time.
If more than one channel is passed as an argument to this child program,
only the first channel will be used. All others are ignored. """

from decimal import Decimal
import sys
import time
import pyvisa as visa  # import VISA library


def monitor_eye_width(channel, minutes, seconds):
    """ Monitors the temperature for required minutes by performing a
    measurement every second. channel is a string, minutes (float) and
    seconds (int).
    """
    width_data = []  # list of Decimals
    start = time.time()
    elapsed_time = 0
    FlexDCA.write(':MEASure:EYE:LIST:CLEar')
    FlexDCA.write(':MEASure:EYE:EWIDth:SOURce ' + channel)
    FlexDCA.write(':MEASure:EYE:EWIDth')
    while elapsed_time < (minutes * 60):
        reading = float(FlexDCA.query(':MEASure:EYE:EWIDth?'))
        width = Decimal(reading / 1E-12)
        width_data.append(width.quantize(Decimal('0.01')))
        elapsed_time = time.time() - start
        time.sleep(seconds)
    return width_data


def create_time_data(number_data_points, minutes, time_sep):
    """
    Creates list of time data and scales it for seconds, minutes, or hours
    units. The units (xunits) is returned. The argument, number_of_data_points,
    is an int. The minutes argument is the total test time (float). The
    time_sep argument is the seconds (int) between data points.
    """
    xunits = ''
    time_data = []  # list of times for each data point
    if 0 < minutes < 5:  # 5 minutes
        time_data = [(i*time_sep) for i in range(0, number_data_points)]
        xunits = 'seconds'
    elif 5 <= minutes < 180:  # 5 to 180 minutes (3 hours)
        time_data = [(i*time_sep / 60) for i in range(0, number_data_points)]
        xunits = 'minutes'
    else:  # >= 3 hours
        time_data = [(i*time_sep / 3600) for i in range(0, number_data_points)]
        xunits = 'hours'
    return time_data, xunits


def draw_graph(y_data, x_data, xunits, channel):
    """
    y_data argument is a list of Decimals of temperature readings.
    x_data argument is corresponding time positions.
    xunits is 'seconds', 'minutes', or 'hours'.
    channel argument is a string. eg, '1A'.
    """
    import tkinter

    from matplotlib.backends.backend_tkagg import (
        FigureCanvasTkAgg, NavigationToolbar2Tk)
    from matplotlib.figure import Figure

    root = tkinter.Tk()
    root.wm_title("Graph of Eye Width")
    fig = Figure(figsize=(8, 8), dpi=72)
    ax = fig.add_subplot(111)
    ax.grid(b=True, which='major', axis='both')
    title = 'Channel ' + channel + ' Eye Width over Time'
    title = title + '\n (Test time: ' + str(round(x_data[-1], 1)) + ' ' + xunits + ')'
    ax.set_title(title)
    ax.set_xlabel(xunits.capitalize())
    ax.set_ylabel('Eye Width (Time in ps)')
    max_y = float(max(y_data)) * 1.1
    min_y = float(min(y_data)) * 0.9
    ax.set_ylim(min_y, max_y)
    if len(y_data) < 20:
        ax.plot(x_data, y_data, 'g.-')  # data points shown
    else:
        ax.plot(x_data, y_data, 'g-')  # data points hidden. line only
    canvas = FigureCanvasTkAgg(fig, master=root)
    canvas.draw()
    toolbar = NavigationToolbar2Tk(canvas, root)
    toolbar.update()
    button = tkinter.Button(master=root, text="Quit", command=root.quit)
    button.pack(side=tkinter.BOTTOM)
    toolbar.pack(side=tkinter.BOTTOM, fill=tkinter.X)
    canvas.get_tk_widget().pack(side=tkinter.TOP, fill=tkinter.BOTH, expand=1)
    print('Channel ' + channel + ' graph is displayed in separate window.')
    sys.stdout.flush()  # Flush print message in stdout
    tkinter.mainloop()


address = sys.argv[1]  # VISA address command-line argument
s = sys.argv[2]  # comma separated string of scope channels
channels = s.split(',')  # convert comma separated string to iterable list
TEST_TIME = 1.0  # minutes
MEAS_SEPARATION = 1  # seconds
rm = visa.ResourceManager()
FlexDCA = rm.open_resource(address)
FlexDCA.timeout = 20000  # Set connection timeout to 20s.
FlexDCA.read_termination = '\n'
FlexDCA.query(':SYSTem:DEFault;*OPC?')
FlexDCA.write(':CHANnel' + channels[0] + ':DISPlay ON')
FlexDCA.query(':SYSTem:AUToscale;*OPC?')
FlexDCA.write(':ACQuire:STOP')
FlexDCA.write(':ACQuire:CDISplay')
# A limit acquisition test ensures that enough waveform data is available
# to start measuring the eye.
FlexDCA.write(':LTESt:ACQuire:CTYPe:WAVeforms 10')
FlexDCA.write(':LTESt:ACQuire:STATe ON')
FlexDCA.query(':ACQuire:RUN;*OPC?')
FlexDCA.write(':LTESt:ACQuire:STATe OFF')
FlexDCA.query(':ACQuire:RUN;*OPC?')
test_time = 1.0  # minutes
time_sep = 1  # seconds
y_data = monitor_eye_width(channels[0], TEST_TIME, MEAS_SEPARATION)
FlexDCA.write(':SYSTem:GTLocal')
FlexDCA.close()  # Close VISA session
x_data, xunits = create_time_data(len(y_data), TEST_TIME, MEAS_SEPARATION)
draw_graph(y_data, x_data, xunits, channels[0])