Source code for pymodbus.mei_message

'''
Encapsulated Interface (MEI) Transport Messages
-----------------------------------------------

'''
import struct
from pymodbus.constants import DeviceInformation, MoreData
from pymodbus.pdu import ModbusRequest
from pymodbus.pdu import ModbusResponse
from pymodbus.device import ModbusControlBlock
from pymodbus.device import DeviceInformationFactory
from pymodbus.pdu import ModbusExceptions as merror
from pymodbus.compat import iteritems, byte2int, IS_PYTHON3

_MCB = ModbusControlBlock()


#---------------------------------------------------------------------------#
# Read Device Information
#---------------------------------------------------------------------------#
[docs]class ReadDeviceInformationRequest(ModbusRequest): ''' This function code allows reading the identification and additional information relative to the physical and functional description of a remote device, only. The Read Device Identification interface is modeled as an address space composed of a set of addressable data elements. The data elements are called objects and an object Id identifies them. ''' function_code = 0x2b sub_function_code = 0x0e _rtu_frame_size = 7
[docs] def __init__(self, read_code=None, object_id=0x00, **kwargs): ''' Initializes a new instance :param read_code: The device information read code :param object_id: The object to read from ''' ModbusRequest.__init__(self, **kwargs) self.read_code = read_code or DeviceInformation.Basic self.object_id = object_id
[docs] def encode(self): ''' Encodes the request packet :returns: The byte encoded packet ''' packet = struct.pack('>BBB', self.sub_function_code, self.read_code, self.object_id) return packet
[docs] def decode(self, data): ''' Decodes data part of the message. :param data: The incoming data ''' params = struct.unpack('>BBB', data) self.sub_function_code, self.read_code, self.object_id = params
[docs] def execute(self, context): ''' Run a read exeception status request against the store :param context: The datastore to request from :returns: The populated response ''' if not (0x00 <= self.object_id <= 0xff): return self.doException(merror.IllegalValue) if not (0x00 <= self.read_code <= 0x04): return self.doException(merror.IllegalValue) information = DeviceInformationFactory.get(_MCB, self.read_code, self.object_id) return ReadDeviceInformationResponse(self.read_code, information)
[docs] def __str__(self): ''' Builds a representation of the request :returns: The string representation of the request ''' params = (self.read_code, self.object_id) return "ReadDeviceInformationRequest(%d,%d)" % params
[docs]class ReadDeviceInformationResponse(ModbusResponse): ''' ''' function_code = 0x2b sub_function_code = 0x0e
[docs] @classmethod def calculateRtuFrameSize(cls, buffer): ''' Calculates the size of the message :param buffer: A buffer containing the data that have been received. :returns: The number of bytes in the response. ''' size = 8 # skip the header information count = byte2int(buffer[7]) while count > 0: _, object_length = struct.unpack('>BB', buffer[size:size+2]) size += object_length + 2 count -= 1 return size + 2
[docs] def __init__(self, read_code=None, information=None, **kwargs): ''' Initializes a new instance :param read_code: The device information read code :param information: The requested information request ''' ModbusResponse.__init__(self, **kwargs) self.read_code = read_code or DeviceInformation.Basic self.information = information or {} self.number_of_objects = len(self.information) self.conformity = 0x83 # I support everything right now # TODO calculate self.next_object_id = 0x00 # self.information[-1](0) self.more_follows = MoreData.Nothing
[docs] def encode(self): ''' Encodes the response :returns: The byte encoded message ''' packet = struct.pack('>BBBBBB', self.sub_function_code, self.read_code, self.conformity, self.more_follows, self.next_object_id, self.number_of_objects) for (object_id, data) in iteritems(self.information): packet += struct.pack('>BB', object_id, len(data)) if IS_PYTHON3: if isinstance(data, bytes): packet += data else: packet += data.encode() else: packet += data.encode() return packet
[docs] def decode(self, data): ''' Decodes a the response :param data: The packet data to decode ''' params = struct.unpack('>BBBBBB', data[0:6]) self.sub_function_code, self.read_code = params[0:2] self.conformity, self.more_follows = params[2:4] self.next_object_id, self.number_of_objects = params[4:6] self.information, count = {}, 6 # skip the header information while count < len(data): object_id, object_length = struct.unpack('>BB', data[count:count+2]) count += object_length + 2 self.information[object_id] = data[count-object_length:count]
[docs] def __str__(self): ''' Builds a representation of the response :returns: The string representation of the response ''' return "ReadDeviceInformationResponse(%d)" % self.read_code
#---------------------------------------------------------------------------# # Exported symbols #---------------------------------------------------------------------------# __all__ = [ "ReadDeviceInformationRequest", "ReadDeviceInformationResponse", ]