Source code for pymodbus.datastore.context
from pymodbus.exceptions import ParameterException, NoSuchSlaveException
from pymodbus.interfaces import IModbusSlaveContext
from pymodbus.datastore.store import ModbusSequentialDataBlock
from pymodbus.constants import Defaults
from pymodbus.compat import iteritems, itervalues
#---------------------------------------------------------------------------#
# Logging
#---------------------------------------------------------------------------#
import logging
_logger = logging.getLogger(__name__)
#---------------------------------------------------------------------------#
# Slave Contexts
#---------------------------------------------------------------------------#
[docs]class ModbusSlaveContext(IModbusSlaveContext):
'''
This creates a modbus data model with each data access
stored in its own personal block
'''
[docs] def __init__(self, *args, **kwargs):
''' Initializes the datastores, defaults to fully populated
sequential data blocks if none are passed in.
:param kwargs: Each element is a ModbusDataBlock
'di' - Discrete Inputs initializer
'co' - Coils initializer
'hr' - Holding Register initializer
'ir' - Input Registers iniatializer
'''
self.store = {}
self.store['d'] = kwargs.get('di', ModbusSequentialDataBlock.create())
self.store['c'] = kwargs.get('co', ModbusSequentialDataBlock.create())
self.store['i'] = kwargs.get('ir', ModbusSequentialDataBlock.create())
self.store['h'] = kwargs.get('hr', ModbusSequentialDataBlock.create())
self.zero_mode = kwargs.get('zero_mode', Defaults.ZeroMode)
[docs] def __str__(self):
''' Returns a string representation of the context
:returns: A string representation of the context
'''
return "Modbus Slave Context"
[docs] def reset(self):
''' Resets all the datastores to their default values '''
for datastore in itervalues(self.store):
datastore.reset()
[docs] def validate(self, fx, address, count=1):
''' Validates the request to make sure it is in range
:param fx: The function we are working with
:param address: The starting address
:param count: The number of values to test
:returns: True if the request in within range, False otherwise
'''
if not self.zero_mode: address = address + 1
_logger.debug("validate[%d] %d:%d" % (fx, address, count))
return self.store[self.decode(fx)].validate(address, count)
[docs] def getValues(self, fx, address, count=1):
''' Validates the request to make sure it is in range
:param fx: The function we are working with
:param address: The starting address
:param count: The number of values to retrieve
:returns: The requested values from a:a+c
'''
if not self.zero_mode: address = address + 1
_logger.debug("getValues[%d] %d:%d" % (fx, address, count))
return self.store[self.decode(fx)].getValues(address, count)
[docs] def setValues(self, fx, address, values):
''' Sets the datastore with the supplied values
:param fx: The function we are working with
:param address: The starting address
:param values: The new values to be set
'''
if not self.zero_mode: address = address + 1
_logger.debug("setValues[%d] %d:%d" % (fx, address, len(values)))
self.store[self.decode(fx)].setValues(address, values)
[docs]class ModbusServerContext(object):
''' This represents a master collection of slave contexts.
If single is set to true, it will be treated as a single
context so every unit-id returns the same context. If single
is set to false, it will be interpreted as a collection of
slave contexts.
'''
[docs] def __init__(self, slaves=None, single=True):
''' Initializes a new instance of a modbus server context.
:param slaves: A dictionary of client contexts
:param single: Set to true to treat this as a single context
'''
self.single = single
self.__slaves = slaves or {}
if self.single:
self.__slaves = {Defaults.UnitId: self.__slaves}
[docs] def __iter__(self):
''' Iterater over the current collection of slave
contexts.
:returns: An iterator over the slave contexts
'''
return iteritems(self.__slaves)
[docs] def __contains__(self, slave):
''' Check if the given slave is in this list
:param slave: slave The slave to check for existance
:returns: True if the slave exists, False otherwise
'''
return slave in self.__slaves
[docs] def __setitem__(self, slave, context):
''' Used to set a new slave context
:param slave: The slave context to set
:param context: The new context to set for this slave
'''
if self.single: slave = Defaults.UnitId
if 0xf7 >= slave >= 0x00:
self.__slaves[slave] = context
else:
raise NoSuchSlaveException('slave index :{} out of range'.format(slave))
[docs] def __delitem__(self, slave):
''' Wrapper used to access the slave context
:param slave: The slave context to remove
'''
if not self.single and (0xf7 >= slave >= 0x00):
del self.__slaves[slave]
else:
raise NoSuchSlaveException('slave index: {} out of range'.format(slave))
[docs] def __getitem__(self, slave):
''' Used to get access to a slave context
:param slave: The slave context to get
:returns: The requested slave context
'''
if self.single: slave = Defaults.UnitId
if slave in self.__slaves:
return self.__slaves.get(slave)
else:
raise NoSuchSlaveException("slave - {} does not exist, or is out of range".format(slave))