#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Application
===========
"""
from bacpypes.debugging import class_debugging, ModuleLogger
from bacpypes.comm import PDU, Client, Server, ServiceAccessPoint, bind
from bacpypes.tcp import TCPClientDirector, TCPServerDirector, StreamToPacket
from pdu import MPDU, request_types, response_types, ExceptionResponse
# some debugging
_debug = 0
_log = ModuleLogger(globals())
#
# ModbusException
#
[docs]class ModbusException(RuntimeError):
"""Helper class for exceptions."""
_exceptionText = {
ExceptionResponse.ILLEGAL_FUNCTION: "illegal function",
ExceptionResponse.ILLEGAL_DATA_ADDRESS: "illegal data address",
ExceptionResponse.ILLEGAL_DATA_VALUE: "illegal data value",
ExceptionResponse.ILLEGAL_RESPONSE_LENGTH: "illegal response length",
ExceptionResponse.ACKNOWLEDGE: "acknowledge",
ExceptionResponse.SLAVE_DEVICE_BUSY: "slave device busy",
ExceptionResponse.NEGATIVE_ACKNOWLEDGE: "negative acknowledge",
ExceptionResponse.MEMORY_PARITY_ERROR: "memory parity error",
ExceptionResponse.GATEWAY_PATH_UNAVAILABLE:
"gateway path unavailable",
ExceptionResponse.GATEWAY_TARGET_DEVICE_FAILED_TO_RESPOND:
"gateway target device failed to respond",
}
def __init__(self, errCode, *args):
self.errCode = errCode
text = ModbusException._exceptionText.get(
errCode,
"unknown exception %d" % errCode,
)
self.args = (text,) + args
#
# stream_to_packet
#
[docs]def stream_to_packet(data):
"""
Chop a stream of data into MODBUS packets.
:param data: stream of data
:returns: a tuple of the data that is a packet with the remaining
data, or ``None``
"""
if len(data) < 6:
return None
pktlen = (ord(data[4]) << 8) + ord(data[5]) + 6
if (len(data) < pktlen):
return None
return (data[:pktlen], data[pktlen:])
#
# ModbusServiceAccessPoint
#
@class_debugging
[docs]class ModbusServiceAccessPoint(Client, Server, ServiceAccessPoint):
"""
A service access point so applications can be both a client and
a server.
"""
def __init__(self, port=502, sapID=None):
if _debug: ModbusServiceAccessPoint._debug("__init__ port=%r sapID=%r", port, sapID)
Client.__init__(self)
Server.__init__(self)
ServiceAccessPoint.__init__(self, sapID)
# create and bind the client side
self.clientDirector = TCPClientDirector()
bind(self, StreamToPacket(stream_to_packet), self.clientDirector)
# create and bind the server side
self.serverDirector = TCPServerDirector(port)
bind(self.serverDirector, StreamToPacket(stream_to_packet), self)
[docs] def indication(self, pdu):
"""Got some data from a client."""
if _debug: ModbusServiceAccessPoint._debug("indication %r", pdu)
mpdu = MPDU()
mpdu.decode(pdu)
if _debug: ModbusServiceAccessPoint._debug(" - mpdu: %r", mpdu)
# we don't know anything but MODBUS
if (mpdu.mpduProtocolID != 0):
return
# clients shouldn't be sending exceptions
if (mpdu.mpduFunctionCode >= 128):
return
# map the function code
klass = request_types.get(mpdu.mpduFunctionCode, None)
if not klass:
# create an error for now
resp = ExceptionResponse(
mpdu.mpduFunctionCode,
ExceptionResponse.ILLEGAL_FUNCTION,
)
# match the transaction information
resp.pduDestination = mpdu.pduSource
resp.mpduTransactionID = mpdu.mpduTransactionID
# return the response to the device
self.response(pdu)
req = klass()
req.decode(mpdu)
if _debug: ModbusServiceAccessPoint._debug(" - req: %r", req)
# pass it along to the service element
self.sap_request(req)
[docs] def confirmation(self, pdu):
"""Got some data from the server."""
if _debug: ModbusServiceAccessPoint._debug("confirmation %r", pdu)
mpdu = MPDU()
mpdu.decode(pdu)
if _debug: ModbusServiceAccessPoint._debug(" - mpdu: %r", mpdu)
# we don't know anything but MODBUS
if (mpdu.mpduProtocolID != 0):
return
# may be sending a problem
if (mpdu.mpduFunctionCode >= 128):
klass = ExceptionResponse
else:
klass = response_types.get(mpdu.mpduFunctionCode, None)
if not klass:
return
resp = klass()
resp.decode(mpdu)
if _debug: ModbusServiceAccessPoint._debug(" - resp: %r", resp)
# pass it along to the service element
self.sap_response(resp)
[docs] def sap_indication(self, req):
"""Got a request from the ASE."""
if _debug: ModbusServiceAccessPoint._debug("sap_indication %r", req)
mpdu = MPDU()
req.encode(mpdu)
if _debug: ModbusServiceAccessPoint._debug(" - mpdu: %r", mpdu)
pdu = PDU()
mpdu.encode(pdu)
if _debug: ModbusServiceAccessPoint._debug(" - pdu: %r", pdu)
# pass it along to the device
self.request(pdu)
[docs] def sap_confirmation(self, resp):
"""Got a response from the ASE."""
if _debug: ModbusServiceAccessPoint._debug("sap_confirmation %r", resp)
mpdu = MPDU()
resp.encode(mpdu)
if _debug: ModbusServiceAccessPoint._debug(" - mpdu: %r", mpdu)
pdu = PDU()
mpdu.encode(pdu)
if _debug: ModbusServiceAccessPoint._debug(" - pdu: %r", pdu)
# return the response to the device
self.response(pdu)
#
# ModbusClient
#
@class_debugging
[docs]class ModbusClient(Client, Server):
"""This class simplifies building MODBUS client applications. All of
the PDUs are MODBUS.
"""
def __init__(self, **kwargs):
"""Initialize a MODBUS client."""
if _debug: ModbusClient._debug("__init__ %r", kwargs)
Client.__init__(self)
Server.__init__(self)
# create and bind the client side
self.clientDirector = TCPClientDirector(**kwargs)
bind(self, StreamToPacket(stream_to_packet), self.clientDirector)
[docs] def indication(self, req):
"""Got a request from the application."""
if _debug: ModbusClient._debug("indication %r", req)
# encode it as a generic MPDU
mpdu = MPDU()
req.encode(mpdu)
if _debug: ModbusClient._debug(" - mpdu: %r", mpdu)
# encode it as a PDU
pdu = PDU()
mpdu.encode(pdu)
if _debug: ModbusClient._debug(" - pdu: %r", pdu)
# pass it along to the device
self.request(pdu)
[docs] def confirmation(self, pdu):
"""Got a response from the server."""
if _debug: ModbusClient._debug("confirmation %r", pdu)
# generic decode
mpdu = MPDU()
mpdu.decode(pdu)
if _debug: ModbusClient._debug(" - mpdu: %r", mpdu)
# we don't know anything but MODBUS
if (mpdu.mpduProtocolID != 0):
return
# may be sending a problem
if (mpdu.mpduFunctionCode >= 128):
klass = ExceptionResponse
else:
klass = response_types.get(mpdu.mpduFunctionCode, None)
if not klass:
return
resp = klass()
resp.decode(mpdu)
if _debug: ModbusClient._debug(" - resp: %r", resp)
# pass it along to the application
self.response(resp)
#
# ModbusServer
#
@class_debugging
class ModbusServer(Client, Server):
def __init__(self, port=502, **kwargs):
if _debug: ModbusServer._debug("__init__ port=%r %r", port, kwargs)
Client.__init__(self)
Server.__init__(self)
# create and bind
self.serverDirector = TCPServerDirector(('', port), **kwargs)
bind(self, StreamToPacket(stream_to_packet), self.serverDirector)
def confirmation(self, pdu):
"""This is a request from a client."""
if _debug: ModbusServer._debug("confirmation %r", pdu)
# generic decoding
mpdu = MPDU()
mpdu.decode(pdu)
if _debug: ModbusServer._debug(" - mpdu: %r", mpdu)
# we don't know anything but MODBUS
if (mpdu.mpduProtocolID != 0):
return
# clients shouldn't be sending exceptions
if (mpdu.mpduFunctionCode >= 128):
return
# map the function code
klass = request_types.get(mpdu.mpduFunctionCode, None)
if not klass:
# create an error for now
resp = ExceptionResponse(
mpdu.mpduFunctionCode,
ExceptionResponse.ILLEGAL_FUNCTION,
)
# match the transaction information
resp.pduDestination = mpdu.pduSource
resp.mpduTransactionID = mpdu.mpduTransactionID
if _debug: ModbusServer._debug(" - resp: %r", resp)
# return the response to the device
self.request(resp)
req = klass()
req.decode(mpdu)
if _debug: ModbusServer._debug(" - req: %r", req)
# pass it along to the application
self.response(req)
def indication(self, resp):
"""This is a response from the application."""
if _debug: ModbusServer._debug("indication %r", resp)
# encode as a generic MPDU
mpdu = MPDU()
resp.encode(mpdu)
if _debug: ModbusServer._debug(" - mpdu: %r", mpdu)
# encode as a generic PDU
pdu = PDU()
mpdu.encode(pdu)
if _debug: ModbusServer._debug(" - pdu: %r", pdu)
# return the response to the device
self.request(pdu)