FlexPLL with 81160A

This script can control DCA-X as the JTF receiver. DCA-M modules connected to the PC cannot be seen.

Example Script

Copy

FlexPLL-with-81160A.py

""" Script and FlexPLL on a remote PC controls JFT Receiver (DCA-X and 86108A)
and JTF Source (86160A).
"""

import pyvisa as visa  # import VISA library
import time

# VISA resource string handed to open_flexpll_connection(); it points at a
# HiSLIP session on the local machine.
ADDRESS = 'TCPIP0::localhost::hislip1,4880::INSTR'  # FlexPLL on PC
# Host names/addresses appended verbatim to the ':RSOurce:CONNect:HOST' and
# ':RDCA:CONNect:HOST' commands. The inner double quotes are part of the
# value so that the host is transmitted as a quoted string argument.
JTF_SOURCE_HOST = '"141.121.83.53"'
JTF_RECEIVER_HOST = '"K-N1000A-00003"'


def open_flexpll_connection(address):
    """ Open and configure a VISA session to FlexPLL.

    The session gets a 60 s timeout and newline read/write terminations,
    and is verified with an '*IDN?' query whose reply is printed.

    Args:
        address: VISA resource string of the FlexPLL application.

    Returns:
        The opened resource, or None when the session could not be opened
        or verified (an error message is printed in that case).
    """
    print('Connecting to FlexPLL ...')
    connection = None
    try:
        rm = visa.ResourceManager()
        connection = rm.open_resource(address)
        connection.timeout = 60000  # Set connection timeout to 60s
        connection.read_termination = '\n'
        connection.write_termination = '\n'
        inst_id = connection.query('*IDN?')
        print('\nFlexPLL connection established to:\n' + inst_id, flush=True)
        return connection
    except (visa.VisaIOError, visa.InvalidSession):
        print('\nVISA ERROR: Cannot open instrument address.\n', flush=True)
    except Exception as other:
        print('\nVISA ERROR: Cannot connect to instrument:', other, flush=True)
        print('\n')

    # Failure path: the session may have been opened before the error
    # occurred (e.g. '*IDN?' timed out). Release it instead of leaking it.
    if connection is not None:
        try:
            connection.close()
        except Exception:
            # Best-effort cleanup only; the original error was already
            # reported above and the caller just needs to see None.
            pass
    return None


def connect_jtf_source(flexpll):
    """ Connect FlexPLL on PC to the remote 81160A (JTF Source).

    Configures a LAN/sockets connection on port 5025 to JTF_SOURCE_HOST,
    triggers the connection and checks the reported connection state.

    Args:
        flexpll: open FlexPLL session offering write() and query().

    Returns:
        True when FlexPLL reports the source as connected, else False.
    """
    flexpll.write(':RSOurce:CONNect:METhod LAN')
    flexpll.write(':RSOurce:CONNect:LSERver SOCKets')
    flexpll.write(':RSOurce:CONNect:SOCKets:PORT 5025')
    flexpll.write(':RSOurce:CONNect:HOST ' + JTF_SOURCE_HOST)
    flexpll.query(':RSOurce:CONNect;*OPC?')  # wait for the attempt to finish

    # Compare on the normalized prefix rather than with a substring test:
    # "'CONN' in reply" would also be true for a long-form 'DISCONNECTED'.
    state = flexpll.query(':RSOurce:CONN:STATe?').strip().strip('"').upper()
    if not state.startswith('CONN'):
        print('\nJTF Source not connected!')
        return False

    host = flexpll.query(':RSOurce:CONN:HOST?')
    print('\nConnected to JTF Source: ' + host)
    return True


def connect_jtf_receiver(flexpll):  # HiSLIP
    """ Connect FlexPLL on PC to the remote DCA-X with installed 86108A.

    Configures a LAN/HiSLIP connection (index 0, port 4880) to
    JTF_RECEIVER_HOST, triggers the connection and checks the reported
    connection state.

    Args:
        flexpll: open FlexPLL session offering write() and query().

    Returns:
        True when FlexPLL reports the receiver as connected, else False.
    """
    flexpll.write(':RDCA:CONNect:METhod LAN')
    flexpll.write(':RDCA:CONNect:LSERver HLAN')  # HiSLIP
    flexpll.write(':RDCA:CONNect:HLAN:INDEX 0')
    flexpll.write(':RDCA:CONNect:HLAN:PORT 4880')
    flexpll.write(':RDCA:CONNect:HOST ' + JTF_RECEIVER_HOST)
    flexpll.query(':RDCA:CONNect;*OPC?')  # wait for the attempt to finish

    # Compare on the normalized prefix rather than with a substring test:
    # "'CONN' in reply" would also be true for a long-form 'DISCONNECTED'.
    state = flexpll.query(':RDCA:CONN:STATe?').strip().strip('"').upper()
    if not state.startswith('CONN'):
        print('\nJTF Receiver not connected!')
        return False

    host = flexpll.query(':RDCA:CONN:HOST?')
    print('\nConnected to JTF Receiver: ' + host)
    return True


def setup_jtf_source(flexpll):
    """ Send the JTF Source settings to FlexPLL.

    Selects channel 1, sets the rate, amplitude, offset and JPPeak value,
    and leaves the source output disabled.
    """
    source_settings = (
        ':JSOurce:CHANnel CHANnel1',
        ':JSOurce:SRATe 125E+6',  # in Hz
        ':JSOurce:AMPLitude 500E-3',
        ':JSOurce:OFFSet 0.0',
        ':JSOurce:JPPeak 50E-12',
        ':JSOurce:OENable OFF',
    )
    for command in source_settings:
        flexpll.write(command)


def setup_jtf_receiver(flexpll):
    """ Receiver settings used during calibration (DUT not in test setup).

    Configures clock recovery (electrical source, rate, loop bandwidth,
    automatic lock selection) and also switches the source output off.
    """
    calibration_settings = (
        ':CRECovery:SOURce ELECtrical',
        ':CRECovery:SRATe 250E+6',  # in Bd
        ':CRECovery:CLBandwidth 30E+3',
        ':CRECovery:LSELect:AUTomatic ON',
        ':JSOurce:OENable OFF',
    )
    for command in calibration_settings:
        flexpll.write(command)


def change_receiver_for_DUT(flexpll):
    """ Receiver setting used during measurements (DUT in test setup).

    Changes the clock-recovery rate from the calibration value to the one
    used with the DUT.
    """
    dut_rate_command = ':CRECovery:SRATe 1E+9'  # in Bd
    flexpll.write(dut_rate_command)


def configure_acquisition(flexpll):
    """ Configure the graph X range and the acquisition settings.

    Sets the graph X start/stop values, the points per decade, and enables
    averaging with a count of 4.
    """
    acquisition_settings = (
        ':DISPlay:GRAPh:X:STARt 15.0E+3',
        ':DISPlay:GRAPh:X:STOP 50.0E+6',
        ':ACQuire:FREQuency:PPDecade 15',
        ':ACQuire:AVERaging ON',
        ':ACQuire:AVERaging:COUNt 4',
    )
    for command in acquisition_settings:
        flexpll.write(command)


def module_cal(flexpll):
    """ Run the module calibration unless it is already valid.

    When the status query already reports 'CALIBRATED' nothing is sent.
    Otherwise the operator is prompted to disconnect the receiver inputs,
    the calibration is started, and the status is queried again.

    Returns:
        True when the module calibration is valid, False when the status
        reads 'UNCALIBRATED' after the run (the calibration is cancelled).
    """
    status_query = ':CAL:MOD:STATus?'
    already_valid = flexpll.query(status_query) == 'CALIBRATED'
    if already_valid:
        print('\nModule calibration is valid.')
        return True

    flexpll.write(':CAL:MOD:ENABled ENABled')
    for line in ('\nModule Calibration',
                 'Disconnect all inputs from Receiver.'):
        print(line)
    input('Press any key to continue...')
    print('Module cal is running. Please wait...')

    flexpll.write(':CAL:MOD:STARt')
    flexpll.query(':CALibrate:SDONe?')  # mod cal complete?
    flexpll.query(':CALibrate:CONTinue;*OPC?')

    cal_failed = flexpll.query(status_query) == 'UNCALIBRATED'
    if cal_failed:
        print('\nModule calibration failed')
        flexpll.write(':CALibrate:CANCel')
        return False

    print('\nModule calibration is valid.')
    flexpll.write(':CALibrate:CONTinue')
    return True


def response_cal(flexpll):
    """ Run the response calibration unless it is already valid.

    When the calibration-data status already reads 'CORR' nothing is sent.
    Otherwise the source output is enabled, the operator is prompted to
    connect the source directly to the receiver, the calibration is run
    and, on success, the response calibration is enabled.

    Returns:
        True when the response calibration is valid, False when the status
        is not 'CORR' after the run.
    """
    if flexpll.query(':CAL:RESP:CDATa:STATus?') == 'CORR':
        print('Response calibration is valid.')
        return True

    flexpll.write(':JSOurce:OENable ON')
    print('Connect the Source directly to the Receiver input.')
    input('Press any key to continue...')
    print('\nResponse calibration is running...')
    flexpll.write(':CAL:RESPonse:STARt')
    flexpll.query(':CAL:RESPonse:SDONe?')  # dismiss cal instruction dialog
    flexpll.write(':CAL:RESPonse:CONTinue')
    flexpll.query(':CAL:RESPonse:SDONe?;*OPC?')  # wait for cal to complete
    # '*OPC?' produces a reply, so this must be a query(): sending it with
    # write() leaves that reply unread ahead of the status query below.
    flexpll.query(':CAL:RESP:CONT;*OPC?')  # Dismiss cal completed dialog

    if flexpll.query(':CAL:RESPonse:CDATa:STATus?') != 'CORR':
        print('Response calibration failed')
        return False

    flexpll.write(':CAL:RESPonse:ENABle ON')
    print('\nResponse Calibration completed.')
    print('\nConnect your DUT to start making JTF measurements.')
    input('Press any key to continue...')
    return True


class ScalarMeasQuestionableException(Exception):
    """ A scalar measurement result is questionable. """


class ScalarMeasTimeOutException(Exception):
    """ A scalar measurement has timed out. """


def return_scalar(flexpll, query, poll_interval=0.2):
    """ Poll a scalar measurement's status and return its result.

    The status is read from '<query without trailing ?>:STATus?' once per
    poll_interval seconds until it contains 'CORR' or 'QUES', or until the
    session timeout has elapsed.

    Args:
        flexpll: open FlexPLL session offering query() and a timeout
            attribute in milliseconds.
        query: measurement query; its last character (the '?') is dropped
            to build the status and reason queries.
        poll_interval: seconds to sleep before each status poll.

    Returns:
        The reply to query, read once the status contains 'CORR'.

    Raises:
        ScalarMeasQuestionableException: the status contains 'QUES'.
        ScalarMeasTimeOutException: no 'CORR'/'QUES' status was seen within
            flexpll.timeout.
        Both carry the reply to the ':STATus:REASon?' query in the message.
    """
    base = query[:-1]  # measurement header without the trailing '?'
    status_query = base + ':STATus?'
    reason_query = base + ':STATus:REASon?'
    timeout = flexpll.timeout / 1000  # convert ms to s
    # Monotonic clock: unaffected by wall-clock adjustments during the wait.
    start_time = time.monotonic()
    while True:
        time.sleep(poll_interval)
        status = flexpll.query(status_query)
        if 'CORR' in status:  # valid measurement result
            return flexpll.query(query)
        if 'QUES' in status:
            reason = flexpll.query(reason_query)
            raise ScalarMeasQuestionableException(base + ':REASon: ' + reason)
        if time.monotonic() - start_time > timeout:
            reason = flexpll.query(reason_query)
            raise ScalarMeasTimeOutException(base + ':REASon: ' + reason)


def make_response_measurement(flexpll):
    """ Clear the display and make one measurement sweep.

    Stops acquisition, clears the display, runs a single acquisition and
    waits for it to complete, then reads the response bandwidth and prints
    it.

    Returns:
        The response bandwidth as reported by the instrument, as a string
        with surrounding double quotes removed.
    """
    flexpll.query(':ACQuire:STOP;CDISplay;SINGle;*OPC?')
    bandwidth = return_scalar(flexpll, ':MEAS:RESP:BANDwidth?').strip('"')
    print('\nResponse Bandwidth: ' + bandwidth + ' Hz')
    # The value was previously only printed although the docstring promised
    # to return it.
    return bandwidth


# Script body: connect, configure, calibrate, then measure the DUT response.
FlexPLL = open_flexpll_connection(ADDRESS)
# open_flexpll_connection() returns None (after printing the error) when the
# session cannot be opened; there is nothing to do or clean up in that case.
if FlexPLL is not None:
    try:
        FlexPLL.query(':SYSTem:DEFault;*OPC?')
        if connect_jtf_source(FlexPLL) and connect_jtf_receiver(FlexPLL):
            setup_jtf_source(FlexPLL)
            setup_jtf_receiver(FlexPLL)
            configure_acquisition(FlexPLL)
            FlexPLL.timeout = 600000  # 10 minutes in ms
            if module_cal(FlexPLL) and response_cal(FlexPLL):
                change_receiver_for_DUT(FlexPLL)
                print("Locking receiver's clock recovery. Please wait...")
                FlexPLL.query(':CRECovery:RELock;*OPC?')
                make_response_measurement(FlexPLL)
        else:
            print('\nConnection to Source or Receiver failed.')
    finally:
        # Always hand control back to the front panel and release the
        # session, even when a step above raised.
        try:
            FlexPLL.write(':SYSTem:GTLocal')
        finally:
            FlexPLL.close()