Example of Instrument Driver
This is an example of a instrument driver written in Python. The driver connects to a Keysight 8163/4/6-series mainframe that has an 81634A optical power meter module installed. The driver connects to the instrument using the LAN port with a VISA address. The VISA address must be passed to the driver by entering the address
using the Instrument Connection Setup dialog..
For example, 'TCPIP0::MYINST::inst2::INSTR'
.
MIT License. Copyright(c) 2023 Keysight Technologies. Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import sys
import json
import pyvisa
DONE = 'DONE'
InputNames = ['In']
Model = 'UNKNOWN'
Serial = 'UNKNOWN'
# FlexOTO is listening to the standard output
def send_to_FlexOTO(message: str):
print(message)
# Check the error queue
def check_error(inst: pyvisa.Resource) -> tuple[int, str]:
error = inst.query('SYST:ERR?').strip()
sections = error.split(',', 1)
return (int(sections[0]), sections[1].strip('"'))
# Open connection to instrument
def connect(visaAddress: str) -> pyvisa.Resource:
try:
rm = pyvisa.ResourceManager()
return rm.open_resource(visaAddress)
except:
send_to_FlexOTO(f'Failed to connect to "{visaAddress}"')
return None
# Check if this is a supported instrument.
def validate(inst: pyvisa.Resource) -> bool:
if inst is None: return False
# Make sure we are connected to a supported Lightwave Mainframe.
mainframeIdn = inst.query('*IDN?')
sections = mainframeIdn.split(',')
if len(sections) < 3:
send_to_FlexOTO('Please connect to a 816x mainframe with a power meter in Slot 1.')
return False
mfManufacturer = sections[0].upper().strip()
if not (mfManufacturer.startswith('KEYSIGHT') or
mfManufacturer.startswith('AGILENT') or
mfManufacturer.startswith('HEWLETT') or
mfManufacturer.startswith('HP')):
send_to_FlexOTO('Please connect to a 816x mainframe with a power meter in Slot 1.')
return False
mfModel = sections[1].strip()
if not mfModel.startswith('816'):
send_to_FlexOTO('Please connect to a 816x mainframe with a power meter in Slot 1.')
return False
# Found an 816x mainframe (e.g. 8163B)
# Make sure a supported power meter is in Slot 1.
moduleIdn = inst.query('SLOT1:IDN?')
sections = moduleIdn.split(',')
if len(sections) < 3:
send_to_FlexOTO('Please install the power meter module in Slot 1')
return False
manufacturer = sections[0].upper().strip()
if manufacturer.startswith('KEYSIGHT'):
manufacturer = 'Keysight'
elif manufacturer.startswith('AGILENT'):
manufacturer = 'Agilent'
elif manufacturer.startswith('HEWLETT') or manufacturer.startswith('HP'):
manufacturer = 'HP'
else:
send_to_FlexOTO('Unsupported manufacturer: ' + manufacturer)
return False
model = sections[1].strip()
if not model.startswith('8163'):
send_to_FlexOTO('Unrecognized power meter model: ' + model)
return False
# Found an 8163x power meter (e.g. 81634A)
# Save the model and serial numbers for later.
global Model, Serial
Model = manufacturer + " " + model
Serial = sections[2].strip()[-5:] # Get last 5 of serial number
return True
# Do the initial setup of the instrument
def initialize(inst: pyvisa.Resource):
# Set timeout to 10 sec. This should work for all commands except zeroing.
inst.timeout = 10000
# Make sure that the reference is not used.
inst.write('SENS1:CHAN1:POW:REF:STATE 0')
# Turn auto range on.
inst.write('SENS1:CHAN1:POW:RANGE:AUTO 1')
# Change the power unit to Watt.
inst.write('SENS1:CHAN1:POW:UNIT W')
# Set the averaging time for measuring to 0.5s.
inst.write('SENS1:CHAN1:POW:ATIME 0.5')
# Turn continuous measuring off.
inst.write('INIT1:CHAN1:CONT 0')
# Blocks until a command comes in from FlexOTO,
# and then extracts the command and arguments strings.
def wait_for_input() -> tuple[str, list[str]]:
# Commands come from the standard input.
rawInput = input()
sections = rawInput.split('"')
items = []
for s in sections:
s = s.strip()
if s: items.append(s)
command = items.pop(0)
args = items
return (command, args)
# Sends the JSON description of the instrument to FlexOTO.
def get_description():
# Use the model and serial numbers determined earlier
desc = json.dumps({ 'ModelNumber': Model,
'SerialNumber': Serial,
'Inputs': InputNames,
'MeasurementTimeoutSeconds': 10 })
send_to_FlexOTO(desc)
send_to_FlexOTO(DONE)
# Measure the active inputs and send the results to FlexOTO.
def measure(inst: pyvisa.Resource, activeInputs: list[str]):
# Clear error queue
inst.write('*CLS')
measList = []
for inputName in activeInputs:
inputNum = InputNames.index(inputName) + 1
# Make an average power measurement on this channel.
avgPower = float(inst.query('READ1:CHAN{0}:POW?'.format(inputNum)))
# Start a dictionary to describe a measurement of Average Power on this input.
measurement = { 'Name': 'Average Power', 'Input': inputName }
(errorCode, errorMsg) = check_error(inst)
if errorCode == 0:
measurement['Result'] = avgPower
measurement['FormattedResult'] = '{0:.2f} \u03BCW'.format(avgPower * 1e6) # Format in uW
else:
# Report the error
measurement['Result'] = float("NaN") # NaN ("not a number") indicates an invalid result
measurement['FormattedResult'] = errorMsg
# Add the result to the measurements list.
measList.append(measurement)
measJson = json.dumps({'Measurements': measList}, allow_nan=True)
send_to_FlexOTO(measJson)
send_to_FlexOTO(DONE)
# Program begins here
if len(sys.argv) < 2:
send_to_FlexOTO("Please provide the instrument's VISA address (TCPIP0::HOSTNAME::inst0::INSTR) in the Command Line Arguments.")
else:
# Get the instrument's VISA address from the command line args.
visaAddress = sys.argv[1]
inst = connect(visaAddress)
# Check if this is a valid instrument.
if validate(inst):
# Do initial configuration of instrument setup.
initialize(inst)
# Connection and initial setup is done.
send_to_FlexOTO(DONE)
########## Main loop ##########
# Loop until FlexOTO sends us 'exit'.
exit = False
while not exit:
(command, args) = wait_for_input()
if command == 'exit':
exit = True
elif command == 'get_description':
get_description()
elif command == 'measure':
measure(inst, args)
# Exiting...