# Source code for modpypes.client

#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
Client
======

This executable module is a console application for generating
read and write MODBUS PDUs.
"""

import math

from bacpypes.debugging import class_debugging, ModuleLogger
from bacpypes.consolecmd import ConsoleCmd
from bacpypes.consolelogging import ArgumentParser

from bacpypes.comm import Client, bind
from bacpypes.core import run

from pdu import ExceptionResponse, \
    ReadCoilsRequest, ReadCoilsResponse, \
    ReadDiscreteInputsRequest, ReadDiscreteInputsResponse, \
    ReadMultipleRegistersRequest, ReadMultipleRegistersResponse, \
    ModbusStruct
from app import ModbusClient, ModbusException

# some debugging
_debug = 0
_log = ModuleLogger(globals())


#
#   ConsoleClient
#


@class_debugging
class ConsoleClient(ConsoleCmd, Client):

    """
    Console Client

    Interactive console that turns ``read`` commands into MODBUS read
    requests, passes them downstream via :meth:`request`, and prints the
    responses it receives in :meth:`confirmation`.
    """

    def __init__(self):
        if _debug: ConsoleClient._debug("__init__")
        # NOTE(review): only ConsoleCmd.__init__ is called here, Client.__init__
        # is not -- confirm that bind() provides everything Client needs.
        ConsoleCmd.__init__(self)

        # no current request
        self.req = None

    def do_read(self, args):
        """read <addr> <unitID> <register> [ <count> ]

        :param addr: IP address of the MODBUS/TCP device or gateway
        :param unitID: unit identifier
        :param register: register in 5-digit or 6-digit format
        :param count: number of registers to read, defaults to one

        This command generates a :class:`ReadCoilsRequest`,
        :class:`ReadDiscreteInputsRequest` or
        :class:`ReadMultipleRegistersRequest` depending on the address
        prefix; 0 (or no prefix, i.e. four digits or fewer), 1, or 4.

        Invalid input is reported on the console and no request is sent.
        """
        args = args.split()
        if _debug: ConsoleClient._debug("do_read %r", args)

        if (len(args) < 3):
            print("address, unit and register required")
            return

        # get the address, unit, register and count; a typo at the prompt
        # should produce a message rather than an uncaught ValueError
        addr, unitID, register = args[:3]
        try:
            unitID = int(unitID)
            register = int(register)
            if len(args) == 4:
                rcount = int(args[3])
            else:
                rcount = 1
        except ValueError:
            print("unit, register and count must be integers")
            return
        if _debug: ConsoleClient._debug(" - addr, unitID: %r, %r", addr, unitID)
        if _debug: ConsoleClient._debug(" - register, rcount: %r, %r", register, rcount)

        # zero and negative numbers have no meaningful digit count
        if register < 1:
            print("register must be a positive number")
            return

        # decode the register into a type; leading zeros are lost by int(),
        # so anything with four digits or fewer has an implied prefix of 0
        digits = len(str(register))
        if digits <= 4:
            # must be a coil
            registerType = 0
        elif digits == 5:
            registerType = register // 10000
            register = register % 10000
        elif digits == 6:
            registerType = register // 100000
            register = register % 100000
        else:
            print("5 or 6 digit addresses please")
            return
        if _debug: ConsoleClient._debug(" - registerType, register: %r, %r", registerType, register)

        # the request carries (register - 1), so x0000 would become -1
        if register < 1:
            print("register number must be at least 1")
            return

        # build a request
        if registerType == 0:
            # coil
            req = ReadCoilsRequest(register - 1, rcount)
        elif registerType == 1:
            # input discrete, honoring the requested count like the others
            req = ReadDiscreteInputsRequest(register - 1, rcount)
        elif registerType == 4:
            # holding register
            req = ReadMultipleRegistersRequest(register - 1, rcount)
        else:
            print("unsupported register type")
            return

        # set the destination
        req.pduDestination = (addr, 502)
        req.mpduUnitID = unitID
        if _debug: ConsoleClient._debug(" - req: %r", req)

        # save the request
        self.req = req

        # send it along
        self.request(req)

    def confirmation(self, pdu):
        """Prints out the contents of the response from the device.

        :param pdu: the response that was received
        :raises TypeError: if the response is not one of the handled types
        """
        if _debug: ConsoleClient._debug("confirmation %r", pdu)

        if isinstance(pdu, ExceptionResponse):
            print(ModbusException(pdu.exceptionCode))

        elif isinstance(pdu, ReadCoilsResponse):
            print(" ::= %s" % (pdu.bits,))

        elif isinstance(pdu, ReadDiscreteInputsResponse):
            print(" ::= %s" % (pdu.bits,))

        elif isinstance(pdu, ReadMultipleRegistersResponse):
            print(" ::= %s" % (pdu.registers,))

            # best effort: try every codec in ModbusStruct, a codec that
            # cannot interpret these registers is skipped (and only
            # mentioned when debugging is enabled)
            for dtype, codec in ModbusStruct.items():
                try:
                    value = codec.unpack(pdu.registers)
                    print("  %s ::= %s" % (dtype, value))
                except Exception as err:
                    if _debug: ConsoleClient._debug("unpack exception %r: %r", codec, err)
        else:
            raise TypeError("unsupported response")


#
#   main
#

def main():
    """Parse the command line, bind a console client to a MODBUS client
    and call ``run()``.

    Any exception raised along the way is logged rather than propagated.
    """
    try:
        # parse the command line arguments
        parser = ArgumentParser(description=__doc__)

        # now parse the arguments
        args = parser.parse_args()

        if _debug: _log.debug("initialization")
        if _debug: _log.debug(" - args: %r", args)

        # local IO functions
        bind(ConsoleClient(), ModbusClient())

        _log.debug("running")

        run()

    except Exception as e:
        _log.exception("an error has occurred: %s", e)
    finally:
        _log.debug("finally")


if __name__ == "__main__":
    main()