diff --git a/examples/server_datamodel.py b/examples/server_datamodel.py index 20eea1002..84dc10f56 100755 --- a/examples/server_datamodel.py +++ b/examples/server_datamodel.py @@ -55,11 +55,11 @@ def define_datamodel(): 5, [block_def, block5], ) == SimDevice( - id=5, type_check=False, registers=[block_def, block5] + id=5, type_check=False, simdata=[block_def, block5] ) # SimDevice can define either a shared or a non-shared register model - SimDevice(id=1, type_check=False, registers=[block_def, block5]) + SimDevice(id=1, type_check=False, simdata=[block_def, block5]) #SimDevice(2, False, # block_coil=[block1], # block_discrete=[block1], diff --git a/pymodbus/__init__.py b/pymodbus/__init__.py index be1c2e6f3..e4327abdc 100644 --- a/pymodbus/__init__.py +++ b/pymodbus/__init__.py @@ -13,11 +13,11 @@ "pymodbus_apply_logging_config" ] -from pymodbus.exceptions import ModbusException -from pymodbus.framer import FramerType -from pymodbus.logging import pymodbus_apply_logging_config -from pymodbus.pdu import ExceptionResponse -from pymodbus.pdu.device import ModbusDeviceIdentification +from .exceptions import ModbusException +from .framer import FramerType +from .logging import pymodbus_apply_logging_config +from .pdu import ExceptionResponse +from .pdu.device import ModbusDeviceIdentification __version__ = "4.0.0dev7" diff --git a/pymodbus/constants.py b/pymodbus/constants.py index 9635e9dac..6f03bcc1e 100644 --- a/pymodbus/constants.py +++ b/pymodbus/constants.py @@ -120,6 +120,16 @@ class MoreData(enum.IntEnum): NOTHING = 0x00 KEEP_READING = 0xFF +class RuntimeFlags(enum.IntEnum): + """Represents flags used by server runtime.""" + + REG_SIZE_1 = 1 # datatypes with 1 register e.g. INT16, STRING + REG_SIZE_2 = 2 # datatypes with 2 registers e.g. INT32 + REG_SIZE_4 = 4 # datatypes with 4 registers e.g. INT64 + REG_SIZES = 2**3 -1 # Isolate number of registers + REG_NEXT = 2**3 # No access with typecheck + READONLY = 2**4 # only read is allowed + INVALID = 2**5 # neither read nor write is allowed class DataType(enum.IntEnum): """Register types, used to define of a group of registers. @@ -162,16 +172,20 @@ class DataType(enum.IntEnum): #: Registers == 2 bytes (identical to UINT16) REGISTERS = enum.auto() -DATATYPE_STRUCT: dict[DataType, tuple[type | tuple[type, type], int]] = { # pylint: disable=consider-using-namedtuple-or-dataclass - DataType.INT16: (int, 1), - DataType.UINT16: (int, 1), - DataType.INT32: (int, 2), - DataType.UINT32: (int, 2), - DataType.INT64: (int, 4), - DataType.UINT64: (int, 4), - DataType.FLOAT32: (float, 2), - DataType.FLOAT64: (float, 4), - DataType.STRING: (str, -1), - DataType.BITS: ((list, int), -2), - DataType.REGISTERS: (int, 1), + #: 1 register + INVALID = enum.auto() + +DATATYPE_STRUCT: dict[DataType, tuple[type, str, int]] = { # pylint: disable=consider-using-namedtuple-or-dataclass + DataType.REGISTERS: (int, "h", 1), + DataType.INT16: (int, "h", 1), + DataType.UINT16: (int, "H", 1), + DataType.INT32: (int, "i", 2), + DataType.UINT32: (int, "I", 2), + DataType.INT64: (int, "q", 4), + DataType.UINT64: (int, "Q", 4), + DataType.FLOAT32: (float, "f", 2), + DataType.FLOAT64: (float, "d", 4), + DataType.STRING: (str, "s", 0), + DataType.BITS: (bool, "bits", 0), + DataType.INVALID: (int, "h", 1) } diff --git a/pymodbus/simulator/__init__.py b/pymodbus/simulator/__init__.py index 2d118ac02..35a880b0d 100644 --- a/pymodbus/simulator/__init__.py +++ b/pymodbus/simulator/__init__.py @@ -8,14 +8,9 @@ "SimCore", "SimData", "SimDevice", - "SimDevices", "SimValueType", ] from .simcore import SimCore -from .simdata import ( - SimAction, - SimData, - SimValueType, -) -from .simdevice import SimDevice, SimDevices +from .simdata import SimData, SimValueType +from .simdevice import SimAction, SimDevice diff --git a/pymodbus/simulator/simdata.py b/pymodbus/simulator/simdata.py index 5c6663fe4..7d8b05209 100644 --- a/pymodbus/simulator/simdata.py +++ b/pymodbus/simulator/simdata.py @@ -1,23 +1,18 @@ -"""Simulator data model classes. - -**REMARK** This code is experimental and not integrated into production. -""" +"""Simulator data model classes.""" from __future__ import annotations -import inspect -from collections.abc import Awaitable, Callable +import struct from dataclasses import dataclass from typing import TypeAlias, cast from ..constants import DATATYPE_STRUCT, DataType -from ..pdu import ExceptionResponse +from ..pdu.pdu import pack_bitstring SimValueTypeSimple: TypeAlias = int | float | str | bytes SimValueType: TypeAlias = SimValueTypeSimple | list[SimValueTypeSimple | bool] -SimAction: TypeAlias = Callable[[int, int, list[int]], Awaitable[list[int] | ExceptionResponse]] -@dataclass(order=True, frozen=True) +@dataclass class SimData: """Configure a group of continuous identical values/registers. @@ -39,6 +34,29 @@ class SimData: Each SimData defines 5 INT32 in total 10 registers (address 100-109) + + .. code-block:: python + + SimData( + address=0, + count=1000, + values=0x1234 + datatype=DataType.REGISTERS + ) + + Defines a range of registers (addresses) 0..999 each with the value 0x1234. + + + .. code-block:: python + + SimData( + address=0, + count=1000, + datatype=DataType.INVALID + ) + + Defines a range of registers (addresses) 0..999 each marked as invalid. + .. code-block:: python SimData( @@ -54,31 +72,19 @@ class SimData: ) SimData( address=100, - values=0xffff + values=0xffff, datatype=DataType.BITS ) SimData( address=100, - values=[0xffff] + values=[0xffff], datatype=DataType.BITS ) Each SimData defines 16 BITS (coils), with value True. - Value are stored in registers (16bit is 1 register), the address refers to the register. - - **Remark** when using offsets, only bit 0 of each register is used! - - .. code-block:: python - - SimData( - address=0, - count=1000, - values=0x1234 - datatype=DataType.REGISTERS - ) - - Defines a range of addresses 0..999 each with the value 0x1234. + Value are stored in registers (16bit is 1 register), the address refers to the register, unless + in non-shared mode where the address refers to the coil. """ #: Address of first register, starting with 0 (identical to the requests) @@ -91,38 +97,23 @@ class SimData: #: - count=1 datatype=DataType.STRING, values="ABCD" is 2 registers #: - count=2 datatype=DataType.STRING, values="ABCD" is 4 registers #: - #: Count cannot be used if values= is a list + #: if values= is a list, count will be applied to the whole list, e.g. + #: + #: - count=3 datatype=DataType.REGISTERS values=[3,2] is 6 registers. + #: - count=3 datatype=DataType.INT32 values=[3,2] is 12 registers. + #: - count=2 datatype=DataType.STRING, values=["ABCD", 'EFGH'] is 8 registers count: int = 1 #: Value/Values of datatype, #: will automatically be converted to registers, according to datatype. values: SimValueType = 0 - #: Used to check access and convert value to/from registers. - datatype: DataType = DataType.REGISTERS - - #: Optional function to call when registers are being read/written. - #: - #: **Example function:** - #: - #: .. code-block:: python - #: - #: async def my_action( - #: function_code: int, - #: address: int, - #: registers: list[int]) -> list[int] | ExceptionResponse: - #: - # return registers - #: - #: .. tip:: use functools.partial to add extra parameters if needed. - action: SimAction | None = None + #: Used to check access and convert value to/from registers or mark as invalid. + datatype: DataType = DataType.INVALID #: Mark register(s) as readonly. readonly: bool = False - #: Mark register(s) as invalid. - #: **remark** only to be used with address= and count= - invalid: bool = False def __check_simple(self): """Check simple parameters.""" @@ -130,28 +121,78 @@ def __check_simple(self): raise TypeError("0 <= address < 65535") if not isinstance(self.count, int) or not 1 <= self.count <= 65536: raise TypeError("1 <= count < 65536") - if not 1 <= self.address + self.count <= 65536: - raise TypeError("1 <= address + count < 65536") + if self.address + self.count -1 > 65535: + raise TypeError("address= + count= outside address range") if not isinstance(self.datatype, DataType): - raise TypeError("datatype= must by an DataType") - if self.action and not (callable(self.action) and inspect.iscoroutinefunction(self.action)): - raise TypeError("action= not a async function") + raise TypeError("datatype= must by a DataType") + if self.values and self.datatype == DataType.INVALID: + raise TypeError("values= cannot be used with invalid=True") + if isinstance(self.values, list) and not self.values: + raise TypeError("values= list cannot be empty") + + def __check_parameters(self): + """Check all parameters.""" + self.__check_simple() + x_values = self.values if isinstance(self.values, list) else [self.values] + x_datatype, _x_struct, _x_len = DATATYPE_STRUCT[self.datatype] + if self.datatype == DataType.BITS: + x_datatype = int if isinstance(x_values[0], int) else bool + if x_datatype is bool and len(x_values) % 16: + raise TypeError("values= must be a multiple of 16") + for x_value in x_values: + if not isinstance(x_value, x_datatype): + raise TypeError(f"values= {x_value} is not {x_datatype!s}") + if x_datatype is str and not x_value: + raise TypeError("values= cannot contain empty string") def __post_init__(self): """Define a group of registers.""" - self.__check_simple() - if self.datatype == DataType.STRING: - if not isinstance(self.values, str): - raise TypeError("datatype=DataType.STRING only allows values=\"string\"") - x_datatype, x_len = str, int((len(self.values) +1) / 2) + self.__check_parameters() + + def __convert_bytes_registers(self, byte_list: bytearray, word_order: str, byte_order: bool, data_type_len: int) -> list[int]: + """Convert bytearray to registers.""" + if byte_order: + regs = [ + int.from_bytes(byte_list[x : x + 2], "big") + for x in range(0, len(byte_list), 2) + ] + else: + regs = [ + int.from_bytes([byte_list[x+1], byte_list[x]], "big") + for x in range(0, len(byte_list), 2) + ] + if word_order == "big": + return regs + reversed_regs: list[int] = [] + for x in range(0, len(regs), data_type_len): + single_value_regs = regs[x: x + data_type_len] + single_value_regs.reverse() + reversed_regs = reversed_regs + single_value_regs + return reversed_regs + + def build_registers(self, endian: tuple[bool, bool], string_encoding: str) -> list[list[int]]: + """Convert values= to registers.""" + self.__check_parameters() + x_values = self.values if isinstance(self.values, list) else [self.values] + _x_datatype, x_struct, x_len = DATATYPE_STRUCT[self.datatype] + blocks_regs: list[list[int]] = [] + word_order = "big" if endian[0] else "little" + if self.datatype == DataType.BITS: + if isinstance(x_values[0], bool): + bytes_bits = bytearray(pack_bitstring(cast(list[bool], x_values))) + else: + bytes_bits = bytearray() + for v in x_values: + bytes_bits.extend(struct.pack(">H", v)) + blocks_regs.append(self.__convert_bytes_registers(bytes_bits, word_order, endian[1], x_len)) + elif self.datatype == DataType.STRING: + for value in x_values: + bytes_string = cast(str, value).encode(string_encoding) + if len(bytes_string) % 2: + bytes_string += b"\x00" + blocks_regs.append(self.__convert_bytes_registers(bytearray(bytes_string), word_order, endian[1], x_len)) else: - x = DATATYPE_STRUCT[self.datatype] - x_len = x[1] - x_datatype = cast(type[str], x[0]) - if not isinstance(self.values, list): - super().__setattr__("values", [self.values]) - for x_value in cast(list, self.values): - if not isinstance(x_value, x_datatype): - raise TypeError(f"value= can only contain {x_datatype!s}") - super().__setattr__("register_count", self.count * x_len) - super().__setattr__("type_size", x_len) + for v in x_values: + byte_list = struct.pack(f">{x_struct}", v) + blocks_regs.append(self.__convert_bytes_registers(bytearray(byte_list), word_order, endian[1], x_len)) + return blocks_regs * self.count diff --git a/pymodbus/simulator/simdevice.py b/pymodbus/simulator/simdevice.py index 082e7bbf8..9e4a54c8f 100644 --- a/pymodbus/simulator/simdevice.py +++ b/pymodbus/simulator/simdevice.py @@ -1,74 +1,75 @@ -"""Simulator device model classes. - -**REMARK** This code is experimental and not integrated into production. -""" +"""Simulator device model classes.""" from __future__ import annotations +import inspect +from collections.abc import Awaitable, Callable from dataclasses import dataclass -from typing import cast - -from pymodbus.constants import DATATYPE_STRUCT, DataType +from typing import TypeAlias, cast +from ..constants import DataType, RuntimeFlags +from ..pdu import ExceptionResponse from .simdata import SimData -@dataclass(order=True, frozen=True) +SimAction: TypeAlias = Callable[[int, int, list[int], list[int] | None], Awaitable[list[int] | None | ExceptionResponse]] +SimRegs: TypeAlias = tuple[int, list[int], list[int]] +TUPLE_NAMES = ( + "coils", + "discrete inputs", + "holding registers", + "input registers" + ) + + +@dataclass class SimDevice: """Configure a device with parameters and registers. - Registers are always defined as one block. + Registers are defined as a list of SimData objects (block). - Some old devices uses 4 distinct blocks instead of 1 block, to - support these devices, define 1 large block consisting of the - 4 blocks and use the offset_address= parameter. + Some old devices uses 4 distinct blocks instead of a shared block, to + support these devices, define the 4 blocks and add them as a set. When using distinct blocks, coils and discrete inputs are addressed differently, - each register represent 1 coil/relay. + each register represent 1 coil/relay **Device with shared registers**:: SimDevice( id=1, - registers=[SimData(...)] + simdata=[SimData(...)] ) **Device with non-shared registers**:: SimDevice( id=1, - registers=[SimData(...)], - offset_address=(0, 10, 20, 30), + simdata=([SimData(...)], [SimData(...)], [SimData(...)], [SimData(...)]), ) - Meaning registers: - - - 0-9 are coils - - 10-19 are relays - - 20-29 are holding registers - - 30-.. are input registers - - A server can contain either a single :class:`SimDevice` or a list of :class:`SimDevice` + A server can be configured with either a single :class:`SimDevice` or a list of :class:`SimDevice` to simulate a multipoint line. - - .. warning:: each block is sorted by address !! """ #: Address/id of device #: - #: Default 0 means accept all devices, except those specifically defined. + #: id=0 means all devices, except those specifically defined. id: int - #: List of registers. + #: List of register blocks (shared registers) + #: or a tuple with 4 lists of register blocks (non-shared registers) #: - registers: list[SimData] - - #: Default SimData to be used for registers not defined. - default: SimData | None = None - - #: Define starting address for each of the 4 blocks. + #: The tuple is defined as: + #: (, , , ) #: - #: .. tip:: Content (coil, discrete, holding, input) in growing order. - offset_address: tuple[int, int, int, int] | None = None + #: / have addressing calculated differently: + #: address register = address / 16 + #: to find the coil at address + #: count is number of coils, so registers returned are count +15 / 16. + #: + #: ..tip:: addresses not defined are invalid and will produce an ExceptionResponse + #: ..warning:: lists are sorted on starting address. + simdata: list[SimData] | tuple[list[SimData], list[SimData], list[SimData], list[SimData]] #: Enforce type checking, if True access are controlled to be conform with datatypes. #: @@ -84,118 +85,164 @@ class SimDevice: #: Byte order is defined in the modbus standard to be big-endian, #: however it is definable to test non-standard modbus devices #: - #: ..tip:: Content (word_order, byte_order), True means big-endian + #: ..tip:: Content (word_order, byte_order), True means big-endian. endian: tuple[bool, bool] = (True, True) - #: Set device identity + #: String encoding #: - identity: str = "pymodbus simulator/server" + string_encoding: str = "utf-8" + #: Set device identity + identity: str = "pymodbus simulator/server" - def __check_block(self, block: list[SimData]) -> list[SimData]: - """Check block content.""" - if not block: - return block - for inx, entry in enumerate(block): - if not isinstance(entry, SimData): - raise TypeError(f"registers[{inx}]= is a SimData entry") - block.sort(key=lambda x: x.address) - last_address = -1 - for entry in block: - last_address = self.__check_block_entries(last_address, entry) - if self.default and block: - first_address = block[0].address - if self.default.address > first_address: - raise TypeError(f"Default address is {self.default.address} but {first_address} is defined?") - def_last_address = self.default.address + self.default.count -1 - if last_address > def_last_address: - raise TypeError(f"Default address+count is {def_last_address} but {last_address} is defined?") - return block - - def __check_block_entries(self, last_address: int, entry: SimData) -> int: - """Check block entries.""" - values = entry.values if isinstance(entry.values, list) else [entry.values] - if entry.address <= last_address: - raise TypeError(f"SimData address {entry.address} is overlapping!") - if entry.datatype == DataType.BITS: - if isinstance(values[0], bool): - reg_count = int((len(values) + 15) / 16) - else: - reg_count = len(values) - return entry.address + reg_count * entry.count -1 - if entry.datatype == DataType.STRING: - return entry.address + len(cast(str, entry.values)) * entry.count -1 - register_count = DATATYPE_STRUCT[entry.datatype][1] - return entry.address + register_count * entry.count -1 + #: Function to call when registers are being accessed. + #: + #: **Example function:** + #: + #: .. code-block:: python + #: + #: async def my_action( + #: function_code: int, + #: start_address: int, + #: current_registers: list[int], + #: new_registers: list[int] | None) -> list[int] | ExceptionResponse: + #: + #: return registers + #: or + #: return None + #: + #: action, is called with current registers and if write request also the new registers. + #: result updates registers and if read request returned to the client. + #: + #: new_registers is None for read requests. + #: + #: if return is None it indicates no change. + #: + #: .. tip:: use functools.partial to add extra parameters if needed. + action: SimAction | None = None def __check_simple(self): """Check simple parameters.""" if not isinstance(self.id, int) or not 0 <= self.id <= 255: raise TypeError("0 <= id < 255") - if not isinstance(self.registers, list): - raise TypeError("registers= not a list") - if not self.default and not self.registers: - raise TypeError("Either registers= or default= must contain SimData") if not isinstance(self.type_check, bool): raise TypeError("type_check= not a bool") - if (not self.endian - or not isinstance(self.endian, tuple) - or len(self.endian) != 2 - or not isinstance(self.endian[0], bool) - or not isinstance(self.endian[1], bool) - ): - raise TypeError("endian= must be a tuple with 2 bool") if not isinstance(self.identity, str): raise TypeError("identity= must be a string") - if not self.default: - return - if not isinstance(self.default, SimData): - raise TypeError("default= must be a SimData object") - if not self.default.datatype == DataType.REGISTERS: - raise TypeError("default= only allow datatype=DataType.REGISTERS") + if self.action and not (callable(self.action) and inspect.iscoroutinefunction(self.action)): + raise TypeError("action= must be a async function") + if not (isinstance(self.endian, tuple) + and len(self.endian) == 2 + and isinstance(self.endian[0], bool) + and isinstance(self.endian[1], bool) + ): + raise TypeError("endian= must be a tuple with 2 bool") + test_str = "test string" + try: + test_str.encode(self.string_encoding) + except (UnicodeEncodeError, LookupError) as exc: + raise TypeError("string_encoding= not valid") from exc - def __post_init__(self): - """Define a device.""" - self.__check_simple() - super().__setattr__("registers", self.__check_block(self.registers)) - if self.offset_address is not None: - if not isinstance(self.offset_address, tuple): - raise TypeError("offset_address= must be a tuple") - if len(self.offset_address) != 4: - raise TypeError("offset_address= must be a tuple with 4 addresses") - if self.default: - reg_start = self.default.address - reg_end = self.default.address + self.default.count -1 - else: - reg_start = self.registers[0].address - reg_end = self.registers[-1].address + def __check_simple2(self): + """Check simple parameters.""" + if isinstance(self.simdata, list): + for inx, entry in enumerate(self.simdata): + if not isinstance(entry, SimData): + raise TypeError(f"simdata=list[{inx}] is not a SimData entry") + else: + if not (isinstance(self.simdata, tuple) + and len(self.simdata) == 4): + raise TypeError("simdata= must list or tuple") for i in range(4): - if not (reg_start < self.offset_address[i] < reg_end): - raise TypeError(f"offset_address[{i}] outside defined range") - if i and self.offset_address[i-1] >= self.offset_address[i]: - raise TypeError("offset_address= must be ascending addresses") - -@dataclass(order=True, frozen=True) -class SimDevices: - """Define a group of devices. + sim_list = cast(tuple, self.simdata)[i] + if not isinstance(sim_list, list): + raise TypeError(f"simdata=tuple[{TUPLE_NAMES[i]}] -> must be a list") + for inx, entry in enumerate(sim_list): + if not isinstance(entry, SimData): + raise TypeError(f"simdata[{inx}]=tuple[{TUPLE_NAMES[i]}] -> list[{inx}] is not a SimData entry") + if i < 2 and entry.datatype != DataType.BITS: + raise TypeError(f"simdata[{inx}]=tuple[{TUPLE_NAMES[i]}] -> list[{inx}] not DataType.BITS, not allowed") + + def __check_block(self, block: list[SimData], name: str): + """Check block content.""" + block.sort(key=lambda x: x.address) + last_address = block[0].address -1 + for entry in block: + last_address = self.__check_block_entries(last_address, entry, name) - If wanting to use multiple devices in a single server, - each SimDevice must be grouped with SimDevices. - """ + def __check_block_entries(self, last_address: int, entry: SimData, _name: str) -> int: + """Check block entries.""" + if entry.address <= last_address: + raise TypeError(f"SimData address {entry.address} is overlapping!") + blocks_regs = entry.build_registers(self.endian, self.string_encoding) + for registers in blocks_regs: + last_address += len(registers) + return last_address - #: Add a list of SimDevice - devices: list[SimDevice] + def __check_parameters(self): + """Check all parameters.""" + self.__check_simple() + self.__check_simple2() + if isinstance(self.simdata, list): + self.__check_block(self.simdata, "list") + else: + for i in range(4): + self.__check_block(self.simdata[i], TUPLE_NAMES[i]) def __post_init__(self): - """Define a group of devices.""" - if not isinstance(self.devices, list): - raise TypeError("devices= must be a list of SimDevice") - if not self.devices: - raise TypeError("devices= must contain at least 1 SimDevice") - list_id = [] - for device in self.devices: - if not isinstance(device, SimDevice): - raise TypeError("devices= contains non SimDevice entries") - if device.id in list_id: - raise TypeError(f"device_id={device.id} is duplicated") - list_id.append(device.id) + """Define a device.""" + self.__check_parameters() + + def __build_flags(self, simdata: SimData) -> tuple[int, int]: + """Create flags from SimData.""" + flag_normal = 0 + if simdata.datatype in (DataType.INT16, DataType.UINT16, DataType.BITS, DataType.STRING, DataType.INVALID, DataType.REGISTERS): + flag_normal |= RuntimeFlags.REG_SIZE_1 + elif simdata.datatype in (DataType.INT32, DataType.UINT32, DataType.FLOAT32): + flag_normal |= RuntimeFlags.REG_SIZE_2 + else: # simdata.datatype in (DataType.INT64, DataType.UINT64, DataType.FLOAT64): + flag_normal |= RuntimeFlags.REG_SIZE_4 + if simdata.datatype == DataType.INVALID: + flag_normal |= RuntimeFlags.INVALID + if simdata.readonly: + flag_normal |= RuntimeFlags.READONLY + return flag_normal, flag_normal | RuntimeFlags.REG_NEXT + + def __create_simdata(self, simdata: SimData, flag_list: list[int], reg_list: list[int]): + """Build registers for single SimData.""" + flag_normal, flag_next = self.__build_flags(simdata) + blocks_regs = simdata.build_registers(self.endian, self.string_encoding) + for registers in blocks_regs: + first = True + for reg in registers: + if first: + flag_list.append(flag_normal) + first = False + else: + flag_list.append(flag_next) + reg_list.append(reg) + + def __create_block(self, simdata: list[SimData]) -> SimRegs: + """Create registers for device.""" + flag_list: list[int] = [] + reg_list: list[int] = [] + start_address = simdata[0].address + flag_fill = RuntimeFlags.REG_SIZE_1 | RuntimeFlags.INVALID + for entry in simdata: + next_address = start_address + len(reg_list) + while next_address < entry.address: + flag_list.append(flag_fill) + reg_list.append(0) + next_address += 1 + self.__create_simdata(entry, flag_list, reg_list) + return (start_address, reg_list, flag_list) + + def build_device(self) -> SimRegs | list[SimRegs]: + """Check simdata and built runtime structure.""" + self.__check_parameters() + if isinstance(self.simdata, list): + return self.__create_block(self.simdata) + b: list[SimRegs] = [] + for i in range(4): + b.append(self.__create_block(cast(tuple, self.simdata)[i])) + return b diff --git a/pymodbus/simulator/simruntime.py b/pymodbus/simulator/simruntime.py deleted file mode 100644 index 091244f06..000000000 --- a/pymodbus/simulator/simruntime.py +++ /dev/null @@ -1,127 +0,0 @@ -"""Simulator data model implementation. - -**REMARK** This code is experimental and not integrated into production. -""" -from __future__ import annotations - -from dataclasses import dataclass - -from pymodbus.constants import DataType - -from .simdata import SimAction, SimData -from .simdevice import SimDevices - - -class SimRuntimeRegister: - """Datastore for a single register.""" - - FLAG_REG_SIZE_1 = 0 # datatypes with 1 register e.g. INT16, STRING - FLAG_REG_SIZE_2 = 1 # datatypes with 2 registers e.g. INT32 - FLAG_REG_SIZE_4 = 2 # datatypes with 4 registers e.g. INT64 - FLAG_REGISTERS = 2**2 -1 # bits 0-1 is datatype size - FLAG_INVALID = 2**2 # bit 2, neither read nor write is allowed - FLAG_READONLY = 2**3 # bit 3, only read is allowed - FLAG_NO_DIRECT = 2**4 # bit 4, part of a Datatype e.g. INT32 - FLAG_ACTION = 2**5 # bit 5, Action defined - - def __init__(self, flags: int, register: int): - """Do setup register.""" - self.flags = flags - self.register = register - - def data_size(self) -> int: - """Get data size.""" - if not (sz := 2 * (self.flags & self.FLAG_REGISTERS)): - sz = 1 - return sz - - def is_invalid(self) -> bool: - """Check for invalid.""" - return bool(self.flags & self.FLAG_INVALID) - - def is_readonly(self) -> bool: - """Check for readonly.""" - return bool(self.flags & self.FLAG_READONLY) - - def is_no_direct(self) -> bool: - """Check for no direct register.""" - return bool(self.flags & self.FLAG_NO_DIRECT) - - def is_action(self) -> bool: - """Check for attached action.""" - return bool(self.flags & self.FLAG_ACTION) - - -@dataclass(order=True) -class SimRuntimeAction: - """Datastore for a single action.""" - - start_address: int - count: int - datatype: DataType - action: SimAction - -@dataclass(order=True) -class SimRuntimeDevice: - """Datastore for a device.""" - - device_id: int - start_address: int - end_address: int - registers: list[SimRuntimeRegister] - actions: list[SimRuntimeAction] - endian: tuple[str, str] - type_check: bool - identity: str - offset_index: tuple[int, int, int, int] | None - - -class SimSetupRuntime: - """Helper class to convert SimData/SimDevice to runtime data.""" - - def __init__(self) -> None: - """Build runtime lists.""" - - def build_simdata(self, entry: SimData, default: SimData) -> tuple[list[SimRuntimeRegister], SimRuntimeAction | None]: - """Convert single SimData.""" - _ = default - if entry.action: - action = SimRuntimeAction(entry.address, entry.count, entry.datatype, entry.action) - return ([], action) - return ([], None) - - def build_block(self, device: SimRuntimeDevice, listdata: list[SimData], default: SimData): - """Build register/action arrays.""" - registers: list[SimRuntimeRegister] = [] - actions: list[SimRuntimeAction] = [] - for entry in listdata: - regs, action = self.build_simdata(entry, default) - registers.extend(regs) - if action: - actions.append(action) - device.registers = registers - device.actions = actions - - def build_runtime(self, devices: SimDevices) -> dict[int, SimRuntimeDevice]: - """Build runtime classes.""" - if not isinstance(devices, SimDevices): - raise TypeError("Please use devices=") - runtime: dict[int, SimRuntimeDevice] = {} - for device in devices.devices: - default = device.default if device.default else SimData(0, invalid=True) - endian = ("big" if device.endian[0] else "little", - "big" if device.endian[1] else "little") - runtime_device = SimRuntimeDevice( - device_id = device.id, - start_address = default.address, - end_address = 0, - registers = [], - actions = [], - endian = endian, - type_check = device.type_check, - identity = device.identity, - offset_index = device.offset_address, - ) - self.build_block(runtime_device, device.registers, default) - runtime[device.id] = runtime_device - return runtime diff --git a/test/simulator/test_simcore.py b/test/simulator/test_simcore.py index d7ae37508..a21e06009 100644 --- a/test/simulator/test_simcore.py +++ b/test/simulator/test_simcore.py @@ -17,5 +17,5 @@ def test_build_block(self): def test_build_config(self): """Test that simdata can be objects.""" - device = SimDevice(17, registers=[SimData(0)]) + device = SimDevice(17, simdata=[SimData(0)]) SimCore.build_block([device]) diff --git a/test/simulator/test_simdata.py b/test/simulator/test_simdata.py index 85cdc7c00..3a4dedf4d 100644 --- a/test/simulator/test_simdata.py +++ b/test/simulator/test_simdata.py @@ -8,35 +8,24 @@ class TestSimData: """Test simulator data config.""" - async def async_dummy_action(self): - """Set action.""" - - def dummy_action(self): - """Set action.""" - @pytest.mark.parametrize("kwargs", [ {"address": 0}, {"address": 65535}, {"address": 65535, "count": 1}, {"address": 0, "count": 65536}, {"address": 1, "count": 65535}, - {"address": 1, "count": 10, "invalid": True}, + {"address": 1, "count": 10, "datatype": DataType.INVALID}, {"address": 3, "count": 10, "readonly": True}, {"address": 4, "datatype": DataType.INT16, "values": 17}, {"address": 5, "datatype": DataType.INT16, "values": [17, 18]}, {"address": 6, "count": 10, "datatype": DataType.INT16, "values": [17, 18]}, {"address": 7, "datatype": DataType.STRING, "values": "test"}, {"address": 8, "count": 10, "datatype": DataType.STRING, "values": "test"}, - {"address": 9, "action": async_dummy_action}, {"address": 0, "datatype": DataType.REGISTERS, "values": 17, "count": 5}, - {"address": 1, "datatype": DataType.INT16, "values": 17, "invalid": True}, {"address": 3, "datatype": DataType.INT16, "values": 17, "readonly": True}, {"address": 0, "count": 2^16 -1}, - {"address": 4, "datatype": DataType.BITS}, - {"address": 4, "datatype": DataType.BITS, "values": 117}, - {"address": 1, "datatype": DataType.BITS, "values": True}, + {"address": 4, "datatype": DataType.BITS, "values": True}, {"address": 4, "datatype": DataType.BITS, "values": [True, True]}, - {"address": 2, "values": 17}, ]) def test_simdata_instanciate(self, kwargs): """Test that simdata can be objects.""" @@ -47,7 +36,7 @@ def test_simdata_instanciate(self, kwargs): {"address": 1.0}, {"address": -1}, {"address": 70000}, - {"address": 1, "count": 65536}, + {"address": 1, "count": 65537}, {"address": 65535, "count": 2}, {"address": 1, "count": "not ok"}, {"address": 1, "count": 1.0}, @@ -56,8 +45,7 @@ def test_simdata_instanciate(self, kwargs): {"address": 1, "count": 0}, {"address": 1, "datatype": "not ok"}, {"address": 1, "datatype": 11}, - {"address": 1, "action": "not ok"}, - {"address": 1, "action": dummy_action}, + {"address": 2, "values": 17}, ]) def test_simdata_not_ok(self, kwargs): """Test that simdata can be objects.""" @@ -66,6 +54,7 @@ def test_simdata_not_ok(self, kwargs): @pytest.mark.parametrize(("value", "value_type"), [ ("ok str", DataType.STRING), + (["ok str", "ok2"], DataType.STRING), (1.0, DataType.FLOAT32), ([1.0, 2.0], DataType.FLOAT32), (1, DataType.INT32), @@ -80,9 +69,10 @@ def test_simdata_value_ok(self, value, value_type): SimData(0, values=value, datatype=value_type) @pytest.mark.parametrize(("value", "value_type"), [ - (["ok str", "ok2"], DataType.STRING), (1, DataType.STRING), + ("", DataType.STRING), (1, DataType.FLOAT32), + ([], DataType.INT16), ([1.0, 2], DataType.FLOAT32), (1.0, DataType.INT32), ([1, 2.0], DataType.INT32), @@ -96,3 +86,72 @@ def test_simdata_value_not_ok(self, value, value_type): """Test simdata value.""" with pytest.raises(TypeError): SimData(0, values=value, datatype=value_type) + + @pytest.mark.parametrize(("values"), [1, [1, 2] ]) + def test_simdata_value_invalid(self, values): + """Test invalid.""" + with pytest.raises(TypeError): + SimData(0, values=values, datatype=DataType.INVALID) + + @pytest.mark.parametrize(("value", "count", "value_type", "order", "regs"), [ + # test word order + (-27123, 1, DataType.INT16, (True, True), [[0x960D]]), + ([-27123, 27123], 1, DataType.INT16, (True, True), [[0x960D], [0x69F3]]), + ([-27123, 27123], 1, DataType.INT16, (False, True), [[0x960D], [0x69F3]]), + ([32145678, -32145678], 1, DataType.INT32, (True, True), [[0x01EA, 0x810E], [0xFE15, 0x7EF2]]), + ([32145678, -32145678], 1, DataType.INT32, (False, True), [[0x810E, 0x01EA], [0x7EF2, 0xFE15]]), + # test byte order + ([-27123, 27123], 1, DataType.REGISTERS, (True, False), [[0x0D96], [0xF369]]), + ([32145678, -32145678], 1, DataType.INT32, (True, False), [[0xEA01, 0x0E81], [0x15FE, 0xF27E]]), + # test count + ([-27123, 27123], 2, DataType.INT16, (True, True), [[0x960D], [0x69F3], [0x960D], [0x69F3]]), + # test data types + (27123, 1, DataType.REGISTERS, (True, True), [[0x69F3]]), + (-27124, 1, DataType.INT16, (True, True), [[0x960C]]), + (27123, 1, DataType.UINT16, (True, True), [[0x69F3]]), + (-32145678, 1, DataType.INT32, (True, True), [[0xFE15, 0x7EF2]]), + (32145678, 1, DataType.UINT32, (True, True), [[0x01EA, 0x810E]]), + (-1234567890123456789, 1, DataType.INT64, (True, True), [[0xEEDD, 0xEF0B, 0x8216, 0x7EEB]]), + (1234567890123456789, 1, DataType.UINT64, (True, True), [[0x1122, 0x10F4, 0x7DE9, 0x8115]]), + (27123.5, 1, DataType.FLOAT32, (True, True), [[0x46D3, 0xE700]]), + (3.141592, 1, DataType.FLOAT32, (True, True), [[0x4049, 0x0FD8]]), + (-3.141592, 1, DataType.FLOAT32, (True, True), [[0xC049, 0x0FD8]]), + (27123.5, 1, DataType.FLOAT64, (True, True), [[0x40DA, 0x7CE0, 0x0000, 0x0000]]), + (3.14159265358979, 1, DataType.FLOAT64, (True, True), [[0x4009, 0x21FB, 0x5444, 0x2D11]]), + (-3.14159265358979, 1, DataType.FLOAT64, (True, True), [[0xC009, 0x21FB, 0x5444, 0x2D11]]), + ([True] + [False] * 15, 1, DataType.BITS, (True, True), [[256]]), + (0x0100, 1, DataType.BITS, (True, True), [[256]]), + ([0x0100, 0x0001], 1, DataType.BITS, (True, True), [[256, 1]]), + + ]) + def test_simdata_build_registers(self, value, count, value_type, order, regs): + """Test simdata value.""" + sd = SimData(0, values=value, count=count, datatype=value_type) + build_regs = sd.build_registers((order[0], order[1]), "utf-8") + assert build_regs == regs + + @pytest.mark.parametrize(("value", "code", "expect"), [ + ("ABC", "utf-8", [[0x4142, 0x4300]]), + ("abcdÇ", "utf-8", [[0x6162, 0x6364, 0xc387]]), + ("abcdÇ", "cp437", [[0x6162, 0x6364, 0x8000]]), + (["ABC", "DEFG"], "utf-8", [[0x4142, 0x4300],[0x4445, 0x4647]]), + ]) + def test_simdata_build_string(self, value, code, expect): + """Test simdata value.""" + sd = SimData(0, values=value, datatype=DataType.STRING) + build_regs = sd.build_registers((True, True), code) + assert build_regs == expect + + def test_simdata_build_updated_simdata(self): + """Test simdata value.""" + sd = SimData(0, values="ABC", datatype=DataType.STRING) + build_regs = sd.build_registers((True, True), "utf-8") + assert build_regs == [[0x4142, 0x4300]] + sd.values="ABCDEF" + build_regs = sd.build_registers((True, True), "utf-8") + assert build_regs == [[0x4142, 0x4344, 0x4546]] + + sd.values=123 + with pytest.raises(TypeError): + sd.build_registers((True, True), "utf-8") + diff --git a/test/simulator/test_simdevice.py b/test/simulator/test_simdevice.py index d1877aaf2..96d4827ea 100644 --- a/test/simulator/test_simdevice.py +++ b/test/simulator/test_simdevice.py @@ -3,104 +3,93 @@ import pytest -from pymodbus.constants import DataType -from pymodbus.simulator import SimData, SimDevice, SimDevices +from pymodbus.constants import DataType, RuntimeFlags +from pymodbus.simulator import SimData, SimDevice class TestSimDevice: """Test simulator device config.""" - simdata1 = SimData(0, values=15) - simdata2 = SimData(1, values=16) - simdatadef = SimData(0, values=17, count=10) + async def my_action( + self, + _function_code, + _address, + current_registers, + _new_registers + ): + """Run action.""" + return current_registers + + def my_sync_action( + self, + _function_code, + _address, + current_registers, + _new_registers + ): + """Run action.""" + return current_registers + + simdata1 = SimData(0, datatype=DataType.INT16, values=15) + simdata2 = SimData(1, datatype=DataType.INT16, values=16) + simdata3 = SimData(1, datatype=DataType.BITS, values=16) @pytest.mark.parametrize("kwargs", [ - {"id": 0, "registers": [simdata1], "default": simdatadef}, - {"id": 0, "registers": [], "default": simdatadef}, - {"id": 0, "type_check": True, "registers": [simdata1]}, - {"id": 0, "registers": [], "default": simdatadef, "offset_address": (1, 4, 6, 8)}, - {"id": 0, "registers": [simdata1], "endian": (False, True)}, - {"id": 0, "registers": [simdata1], "endian": (True, False)}, - {"id": 0, "registers": [simdata1], "identity": "my server"}, + {"id": 0, "simdata": [SimData(2, datatype=DataType.STRING, values="test")], "string_encoding": "utf-8"}, + {"id": 0, "simdata": ([simdata3], [simdata3], [simdata1], [simdata3])}, + {"id": 0, "simdata": [simdata2, simdata1]}, + {"id": 0, "type_check": True, "simdata": [simdata1]}, + {"id": 0, "simdata": [simdata1], "endian": (False, True)}, + {"id": 0, "simdata": [simdata1], "endian": (True, False)}, + {"id": 0, "simdata": [simdata1], "identity": "my server"}, ]) def test_simdevice_instanciate(self, kwargs): """Test that simdata can be objects.""" SimDevice(**kwargs) @pytest.mark.parametrize("kwargs", [ - {"registers": [simdata1]}, {"id": 0}, - {"id": "not ok", "registers": [simdata1]}, - {"id": 1.0, "registers": [simdata1]}, - {"id": 256, "registers": [simdata1]}, - {"id": -1, "registers": [simdata1]}, - {"id": 1, "registers": []}, - {"id": 1, "registers": [simdata1], "word_order_big": "hmm"}, - {"id": 1, "registers": [simdata1], "byte_order_big": "hmm"}, - {"id": 0, "registers": [SimData(200)], "default": SimData(1, 10)}, - {"id": 0, "registers": [SimData(2)], "default": SimData(10, 10)}, - {"id": 0, "registers": [SimData(1, 2), SimData(2)]}, - {"id": 0, "registers": [simdatadef, SimData(2, 10)]}, - {"id": 0, "registers": [simdata1], "type_check": "hmm"}, - {"id": 0, "registers": [simdatadef], "offset_address": 117}, - {"id": 0, "registers": [simdatadef], "offset_address": (1, 2, 3)}, - {"id": 0, "registers": [simdatadef], "offset_address": (1, 3, 2, 4)}, - {"id": 0, "registers": [simdatadef], "offset_address": (1, 3, 2, 20)}, - {"id": 0, "registers": [SimData(1, 10)], "offset_address": (1, 3, 2, 15)}, - {"id": 0, "registers": [SimData(10, 10)], "offset_address": (1, 3, 2, 4)}, - {"id": 0, "registers": [simdatadef], "offset_address": ()}, - {"id": 0, "registers": [simdata1], "identity": 123}, - {"id": 0, "registers": [simdata1], "identity": None}, + {"id": 0, "simdata": [simdata1], "identity": 123}, + {"simdata": []}, + {"id": 0, "simdata": (simdata3, [simdata3], [simdata1], [simdata3])}, + {"id": 0, "simdata": (["not ok"], [simdata3], [simdata1], [simdata3])}, + {"id": 0, "simdata": ([simdata1], [simdata3], [simdata1], [simdata1])}, + {"id": 0, "simdata": ([simdata3], [simdata1], [simdata1], [simdata1])}, + {"id": 0, "simdata": ([simdata3], [simdata3], [simdata1], "not ok")}, + {"id": 0, "simdata": [simdata1], "string_encoding": "not ok"}, + {"id": "not ok", "simdata": [simdata1]}, + {"id": 1.0, "simdata": [simdata1]}, + {"id": 1, "simdata": [simdata1, simdata1]}, + {"id": 256, "simdata": [simdata1]}, + {"id": -1, "simdata": [simdata1]}, + {"id": 1, "simdata": [simdata1], "word_order_big": "hmm"}, + {"id": 1, "simdata": [simdata1], "byte_order_big": "hmm"}, + {"id": 0, "simdata": [simdata1], "type_check": "hmm"}, + {"id": 0, "simdata": [simdata1], "identity": None}, + {"id": 0, "simdata": ["not ok"]}, + {"id": 0, "simdata": SimData(1, datatype=DataType.INT16, values=3)}, ]) def test_simdevice_not_ok(self, kwargs): """Test that simdata can be objects.""" with pytest.raises(TypeError): SimDevice(**kwargs) - @pytest.mark.parametrize(("block", "default", "expect"), [ - ([simdata1], None, 0), - ([SimData(0, values=0xffff, datatype=DataType.BITS)], None, 0), - ([SimData(0, values=[0xffff], datatype=DataType.BITS)], None, 0), - ([SimData(0, values=[True], datatype=DataType.BITS)], None, 0), - ([SimData(0, values="hello", datatype=DataType.STRING)], None, 0), - ([], simdatadef, 1), - ([simdata1], simdatadef, 1), - ([simdata1, simdata2], simdatadef, 1), - ([simdata1, simdata1], simdatadef, 2), - (SimData(0), None, 2), - ("no valid", None, 2), - (["no valid"], None, 2), - ([simdata1], "no valid", 2), - ([simdata1], [simdata1], 2), - ([simdata1], SimData(1, 10, datatype=DataType.INT16), 2), + @pytest.mark.parametrize(("block", "expect"), [ + ([SimData(0, values=0xffff, datatype=DataType.BITS)], 0), + ([SimData(0, values=[0xffff], datatype=DataType.BITS)], 0), + ([SimData(0, values=[True], datatype=DataType.BITS)], 0), + ([SimData(0, values="hello", datatype=DataType.STRING)], 0), + (SimData(0), 2), + ("no valid", 2), + (["no valid"], 2), ]) - def test_simdevice_block(self, block, default, expect): + def test_simdevice_block(self, block, expect): """Test that simdata can be objects.""" if not expect: - SimDevice(id=0, default=default, registers=block) - elif expect == 1: - SimDevice(id=0, default=default, registers=block) + SimDevice(id=0, simdata=block) else: # expect == 2: with pytest.raises(TypeError): - SimDevice(id=0, default=default, registers=block) - - @pytest.mark.parametrize(("offset", "expect"), [ - ("not ok", 1), - (["not ok"], 1), - ([1, 2, 3, 4], 1), - ((1, 2), 1), - ((4, 2, 3, 5), 1), - ((1, 4, 3, 5), 1), - ((1, 2, 3, 11), 1), - ((1, 2, 3, 4), 0), - ]) - def test_simdevice_offset(self, offset, expect): - """Test offset.""" - if expect: - with pytest.raises(TypeError): - SimDevice(id=0, default=self.simdatadef, registers=[], offset_address=offset) - else: - SimDevice(id=0, default=self.simdatadef, registers=[], offset_address=offset) + SimDevice(id=0, simdata=block) @pytest.mark.parametrize(("endian", "expect"), [ ("not ok", 1), @@ -114,30 +103,104 @@ def test_simdevice_endian(self, endian, expect): """Test offset.""" if expect: with pytest.raises(TypeError): - SimDevice(id=0, registers=[self.simdata1], endian=endian) + SimDevice(id=0, simdata=[self.simdata1], endian=endian) else: - SimDevice(id=0, registers=[self.simdata1], endian=endian) - - @pytest.mark.parametrize("block", [ - [SimDevice(1, [simdata1])], - [SimDevice(0, [simdata1])], - [SimDevice(0, [simdata1]), SimDevice(1, [simdata1])], - [SimDevice(2, [simdata1]), SimDevice(3, [simdata1])], - ]) - def test_simdevices_instanciate(self, block): - """Test SimDevices.""" - SimDevices(block) + SimDevice(id=0, simdata=[self.simdata1], endian=endian) - @pytest.mark.parametrize("block", [ - "not ok", - ["not ok"], - [], - SimDevice(0, [simdata1]), - [SimDevice(0, [simdata1]), SimDevice(0, [simdata1])], - [SimDevice(2, [simdata1]), SimDevice(2, [simdata1])], - [SimDevice(2, [simdata1]), SimDevice(3, [simdata1]), SimDevice(3, [simdata1])], - ]) - def test_simdevices_not_ok(self, block): - """Test SimDevices.""" + async def test_simdevice_action(self): + """Test action.""" + await self.my_action(0, 0, [], None) + self.my_sync_action(0, 0, [], None) + SimDevice(1, simdata=[SimData(1)], action=self.my_action) + with pytest.raises(TypeError): + SimDevice(1, simdata=[SimData(1)], action=self.my_sync_action) with pytest.raises(TypeError): - SimDevices(block) + SimDevice(1, simdata=[SimData(1)], action="no good") + + @pytest.mark.parametrize(("block", "result"), [ + ([SimData(1, values=123, readonly=True, datatype=DataType.INT16)], + (1, [123], + [RuntimeFlags.REG_SIZE_1 | RuntimeFlags.READONLY])), + ([SimData(1, values="ABC", datatype=DataType.STRING)], + (1, [0x4142, 0x4300], + [RuntimeFlags.REG_SIZE_1, RuntimeFlags.REG_SIZE_1 | RuntimeFlags.REG_NEXT])), + ([SimData(0, values=0xffff, datatype=DataType.BITS)], (0, [65535], [RuntimeFlags.REG_SIZE_1])), + ([SimData(0, values=[0xffff, 0xffff], datatype=DataType.BITS)], + (0, [65535, 65535], [RuntimeFlags.REG_SIZE_1, RuntimeFlags.REG_SIZE_1 | RuntimeFlags.REG_NEXT])), + ([SimData(1, values=123, datatype=DataType.INT16), + SimData(3, values=456, datatype=DataType.INT16)], + (1, [123, 0, 456], + [RuntimeFlags.REG_SIZE_1, RuntimeFlags.REG_SIZE_1 | RuntimeFlags.INVALID, RuntimeFlags.REG_SIZE_1])), + ([SimData(1, values=123, datatype=DataType.REGISTERS), + SimData(3, values=456, datatype=DataType.REGISTERS)], + (1, [123, 0, 456], + [RuntimeFlags.REG_SIZE_1, RuntimeFlags.REG_SIZE_1 | RuntimeFlags.INVALID, RuntimeFlags.REG_SIZE_1])), + ([SimData(1, datatype=DataType.INVALID), + SimData(3, datatype=DataType.INVALID)], + (1, [0, 0, 0], + [RuntimeFlags.REG_SIZE_1 | RuntimeFlags.INVALID, RuntimeFlags.REG_SIZE_1 | RuntimeFlags.INVALID, RuntimeFlags.REG_SIZE_1 | RuntimeFlags.INVALID])), + ([SimData(0, values=123, datatype=DataType.INT32), + SimData(3, values=456, datatype=DataType.INT32)], + (0, [0, 123, 0, 0, 456], [RuntimeFlags.REG_SIZE_2, + RuntimeFlags.REG_SIZE_2 | RuntimeFlags.REG_NEXT, + RuntimeFlags.REG_SIZE_1 | RuntimeFlags.INVALID, + RuntimeFlags.REG_SIZE_2, + RuntimeFlags.REG_SIZE_2 | RuntimeFlags.REG_NEXT])), + ([SimData(0, values=123, datatype=DataType.UINT32), + SimData(3, values=456, datatype=DataType.UINT32)], + (0, [0, 123, 0, 0, 456], [RuntimeFlags.REG_SIZE_2, + RuntimeFlags.REG_SIZE_2 | RuntimeFlags.REG_NEXT, + RuntimeFlags.REG_SIZE_1 | RuntimeFlags.INVALID, + RuntimeFlags.REG_SIZE_2, + RuntimeFlags.REG_SIZE_2 | RuntimeFlags.REG_NEXT])), + ([SimData(0, values=27123.5, datatype=DataType.FLOAT32), + SimData(3, values=-3.141592, datatype=DataType.FLOAT32)], + (0, [0x46D3, 0xE700, 0, 0xC049, 0x0FD8], [RuntimeFlags.REG_SIZE_2, + RuntimeFlags.REG_SIZE_2 | RuntimeFlags.REG_NEXT, + RuntimeFlags.REG_SIZE_1 | RuntimeFlags.INVALID, + RuntimeFlags.REG_SIZE_2, + RuntimeFlags.REG_SIZE_2 | RuntimeFlags.REG_NEXT])), + ([SimData(0, values=-1234567890123456789, datatype=DataType.INT64), + SimData(5, values=1234567890123456789, datatype=DataType.INT64)], + (0, [0xEEDD, 0xEF0B, 0x8216, 0x7EEB, 0, 0x1122, 0x10F4, 0x7DE9, 0x8115], [RuntimeFlags.REG_SIZE_4, + RuntimeFlags.REG_SIZE_4 | RuntimeFlags.REG_NEXT, + RuntimeFlags.REG_SIZE_4 | RuntimeFlags.REG_NEXT, + RuntimeFlags.REG_SIZE_4 | RuntimeFlags.REG_NEXT, + RuntimeFlags.REG_SIZE_1 | RuntimeFlags.INVALID, + RuntimeFlags.REG_SIZE_4, + RuntimeFlags.REG_SIZE_4 | RuntimeFlags.REG_NEXT, + RuntimeFlags.REG_SIZE_4 | RuntimeFlags.REG_NEXT, + RuntimeFlags.REG_SIZE_4 | RuntimeFlags.REG_NEXT])), + ([SimData(0, values=1234567890123456789, datatype=DataType.UINT64), + SimData(5, values=1234567890123456789, datatype=DataType.UINT64)], + (0, [0x1122, 0x10F4, 0x7DE9, 0x8115, 0, 0x1122, 0x10F4, 0x7DE9, 0x8115], [RuntimeFlags.REG_SIZE_4, + RuntimeFlags.REG_SIZE_4 | RuntimeFlags.REG_NEXT, + RuntimeFlags.REG_SIZE_4 | RuntimeFlags.REG_NEXT, + RuntimeFlags.REG_SIZE_4 | RuntimeFlags.REG_NEXT, + RuntimeFlags.REG_SIZE_1 | RuntimeFlags.INVALID, + RuntimeFlags.REG_SIZE_4, + RuntimeFlags.REG_SIZE_4 | RuntimeFlags.REG_NEXT, + RuntimeFlags.REG_SIZE_4 | RuntimeFlags.REG_NEXT, + RuntimeFlags.REG_SIZE_4 | RuntimeFlags.REG_NEXT])), + ([SimData(0, values=3.14159265358979, datatype=DataType.FLOAT64), + SimData(5, values=-3.14159265358979, datatype=DataType.FLOAT64)], + (0, [0x4009, 0x21FB, 0x5444, 0x2D11, 0, 0xC009, 0x21FB, 0x5444, 0x2D11], [RuntimeFlags.REG_SIZE_4, + RuntimeFlags.REG_SIZE_4 | RuntimeFlags.REG_NEXT, + RuntimeFlags.REG_SIZE_4 | RuntimeFlags.REG_NEXT, + RuntimeFlags.REG_SIZE_4 | RuntimeFlags.REG_NEXT, + RuntimeFlags.REG_SIZE_1 | RuntimeFlags.INVALID, + RuntimeFlags.REG_SIZE_4, + RuntimeFlags.REG_SIZE_4 | RuntimeFlags.REG_NEXT, + RuntimeFlags.REG_SIZE_4 | RuntimeFlags.REG_NEXT, + RuntimeFlags.REG_SIZE_4 | RuntimeFlags.REG_NEXT])), + (([SimData(1, values=123, datatype=DataType.BITS)], [SimData(1, values=123, datatype=DataType.BITS)], + [SimData(1, values=123, datatype=DataType.INT16)], [SimData(1, values=123, datatype=DataType.INT16)]), + (((1, [123], [RuntimeFlags.REG_SIZE_1]), (1, [123], [RuntimeFlags.REG_SIZE_1]), + (1, [123], [RuntimeFlags.REG_SIZE_1]), (1, [123], [RuntimeFlags.REG_SIZE_1])))), + ]) + def test_simdevice_build(self, block, result): + """Test build_device() ok.""" + sd = SimDevice(id=1, simdata=block) + lists = sd.build_device() + assert lists[0] == result[0] + assert lists[1] == result[1] diff --git a/test/simulator/test_simruntime.py b/test/simulator/test_simruntime.py deleted file mode 100644 index 41ad0f3d9..000000000 --- a/test/simulator/test_simruntime.py +++ /dev/null @@ -1,80 +0,0 @@ -"""Test SimRuntime.""" - -import pytest - -from pymodbus.pdu import ExceptionResponse -from pymodbus.simulator import SimData, SimDevice, SimDevices -from pymodbus.simulator.simruntime import SimRuntimeRegister, SimSetupRuntime - - -class TestSimRuntime: - """Test simulator runtime generator.""" - - async def my_action( - self, - function_code: int, - address: int, - registers: list[int] - ) -> list[int] | ExceptionResponse: - """Run action.""" - return registers - - - TOTAL_FLAGS = ( - SimRuntimeRegister.FLAG_ACTION | - SimRuntimeRegister.FLAG_INVALID | - SimRuntimeRegister.FLAG_NO_DIRECT | - SimRuntimeRegister.FLAG_READONLY - ) - - @pytest.mark.parametrize("onoff", [True, False]) - @pytest.mark.parametrize(("reg_flag", "exp_size"), [ - (SimRuntimeRegister.FLAG_REG_SIZE_1, 1), - (SimRuntimeRegister.FLAG_REG_SIZE_2, 2), - (SimRuntimeRegister.FLAG_REG_SIZE_4, 4), - ]) - @pytest.mark.parametrize(("test_flag", "exp_flags"), [ - (SimRuntimeRegister.FLAG_ACTION, [False, True, True, True]), - (SimRuntimeRegister.FLAG_INVALID, [True, False, True, True]), - (SimRuntimeRegister.FLAG_NO_DIRECT, [True, True, False, True]), - (SimRuntimeRegister.FLAG_READONLY, [True, True, True, False]), - ]) - def test_simruntimeregister_instanciate(self, onoff, reg_flag, exp_size, test_flag, exp_flags): - """Test that SimRuntimeRegister can be objects.""" - flags = reg_flag - res_flags = exp_flags.copy() - if onoff: - flags += test_flag - for i in range(4): - res_flags[i] = not res_flags[i] - else: - flags += self.TOTAL_FLAGS^test_flag - reg = SimRuntimeRegister(flags, 15) - assert reg.data_size() == exp_size - assert reg.is_action() == res_flags[0] - assert reg.is_invalid() == res_flags[1] - assert reg.is_no_direct() == res_flags[2] - assert reg.is_readonly() == res_flags[3] - - def test_simsetupruntime(self): - """Test simSetupRuntime.""" - SimSetupRuntime() - - @pytest.mark.parametrize(("devices", "expect"), [ - ("not ok", 1), - (SimDevices([SimDevice(0, [SimData(0)])]), 0), - ]) - def test_simsetupruntime_build(self, devices, expect): - """Test simSetupRuntime.""" - a = SimSetupRuntime() - if expect: - with pytest.raises(TypeError): - a.build_runtime(devices) - else: - a.build_runtime(devices) - - async def test_simsetupruntime_build_action(self): - """Test simSetupRuntime.""" - await self.my_action(0, 0, []) - a = SimSetupRuntime() - a.build_runtime(SimDevices([SimDevice(0, [SimData(0, action=self.my_action)])]))