Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions examples/server_datamodel.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,11 @@ def define_datamodel():
5,
[block_def, block5],
) == SimDevice(
id=5, type_check=False, registers=[block_def, block5]
id=5, type_check=False, simdata=[block_def, block5]
)

# SimDevice can define either a shared or a non-shared register model
SimDevice(id=1, type_check=False, registers=[block_def, block5])
SimDevice(id=1, type_check=False, simdata=[block_def, block5])
#SimDevice(2, False,
# block_coil=[block1],
# block_discrete=[block1],
Expand Down
10 changes: 5 additions & 5 deletions pymodbus/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@
"pymodbus_apply_logging_config"
]

from pymodbus.exceptions import ModbusException
from pymodbus.framer import FramerType
from pymodbus.logging import pymodbus_apply_logging_config
from pymodbus.pdu import ExceptionResponse
from pymodbus.pdu.device import ModbusDeviceIdentification
from .exceptions import ModbusException
from .framer import FramerType
from .logging import pymodbus_apply_logging_config
from .pdu import ExceptionResponse
from .pdu.device import ModbusDeviceIdentification


__version__ = "4.0.0dev7"
Expand Down
38 changes: 26 additions & 12 deletions pymodbus/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,16 @@ class MoreData(enum.IntEnum):
NOTHING = 0x00
KEEP_READING = 0xFF

class RuntimeFlags(enum.IntEnum):
"""Represents flags used by server runtime."""

REG_SIZE_1 = 1 # datatypes with 1 register e.g. INT16, STRING
REG_SIZE_2 = 2 # datatypes with 2 registers e.g. INT32
REG_SIZE_4 = 4 # datatypes with 4 registers e.g. INT64
REG_SIZES = 2**3 -1 # Isolate number of registers
REG_NEXT = 2**3 # No access with typecheck
READONLY = 2**4 # only read is allowed
INVALID = 2**5 # neither read nor write is allowed

class DataType(enum.IntEnum):
"""Register types, used to define of a group of registers.
Expand Down Expand Up @@ -162,16 +172,20 @@ class DataType(enum.IntEnum):
#: Registers == 2 bytes (identical to UINT16)
REGISTERS = enum.auto()

DATATYPE_STRUCT: dict[DataType, tuple[type | tuple[type, type], int]] = { # pylint: disable=consider-using-namedtuple-or-dataclass
DataType.INT16: (int, 1),
DataType.UINT16: (int, 1),
DataType.INT32: (int, 2),
DataType.UINT32: (int, 2),
DataType.INT64: (int, 4),
DataType.UINT64: (int, 4),
DataType.FLOAT32: (float, 2),
DataType.FLOAT64: (float, 4),
DataType.STRING: (str, -1),
DataType.BITS: ((list, int), -2),
DataType.REGISTERS: (int, 1),
#: 1 register
INVALID = enum.auto()

DATATYPE_STRUCT: dict[DataType, tuple[type, str, int]] = { # pylint: disable=consider-using-namedtuple-or-dataclass
DataType.REGISTERS: (int, "h", 1),
DataType.INT16: (int, "h", 1),
DataType.UINT16: (int, "H", 1),
DataType.INT32: (int, "i", 2),
DataType.UINT32: (int, "I", 2),
DataType.INT64: (int, "q", 4),
DataType.UINT64: (int, "Q", 4),
DataType.FLOAT32: (float, "f", 2),
DataType.FLOAT64: (float, "d", 4),
DataType.STRING: (str, "s", 0),
DataType.BITS: (bool, "bits", 0),
DataType.INVALID: (int, "h", 1)
}
9 changes: 2 additions & 7 deletions pymodbus/simulator/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,9 @@
"SimCore",
"SimData",
"SimDevice",
"SimDevices",
"SimValueType",
]

from .simcore import SimCore
from .simdata import (
SimAction,
SimData,
SimValueType,
)
from .simdevice import SimDevice, SimDevices
from .simdata import SimData, SimValueType
from .simdevice import SimAction, SimDevice
175 changes: 108 additions & 67 deletions pymodbus/simulator/simdata.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,18 @@
"""Simulator data model classes.

**REMARK** This code is experimental and not integrated into production.
"""
"""Simulator data model classes."""
from __future__ import annotations

import inspect
from collections.abc import Awaitable, Callable
import struct
from dataclasses import dataclass
from typing import TypeAlias, cast

from ..constants import DATATYPE_STRUCT, DataType
from ..pdu import ExceptionResponse
from ..pdu.pdu import pack_bitstring


SimValueTypeSimple: TypeAlias = int | float | str | bytes
SimValueType: TypeAlias = SimValueTypeSimple | list[SimValueTypeSimple | bool]
SimAction: TypeAlias = Callable[[int, int, list[int]], Awaitable[list[int] | ExceptionResponse]]

@dataclass(order=True, frozen=True)
@dataclass
class SimData:
"""Configure a group of continuous identical values/registers.

Expand All @@ -39,6 +34,29 @@ class SimData:

Each SimData defines 5 INT32 in total 10 registers (address 100-109)


.. code-block:: python

SimData(
address=0,
count=1000,
values=0x1234
datatype=DataType.REGISTERS
)

Defines a range of registers (addresses) 0..999 each with the value 0x1234.


.. code-block:: python

SimData(
address=0,
count=1000,
datatype=DataType.INVALID
)

Defines a range of registers (addresses) 0..999 each marked as invalid.

.. code-block:: python

SimData(
Expand All @@ -54,31 +72,19 @@ class SimData:
)
SimData(
address=100,
values=0xffff
values=0xffff,
datatype=DataType.BITS
)
SimData(
address=100,
values=[0xffff]
values=[0xffff],
datatype=DataType.BITS
)

Each SimData defines 16 BITS (coils), with value True.

Value are stored in registers (16bit is 1 register), the address refers to the register.

**Remark** when using offsets, only bit 0 of each register is used!

.. code-block:: python

SimData(
address=0,
count=1000,
values=0x1234
datatype=DataType.REGISTERS
)

Defines a range of addresses 0..999 each with the value 0x1234.
Value are stored in registers (16bit is 1 register), the address refers to the register, unless
in non-shared mode where the address refers to the coil.
"""

#: Address of first register, starting with 0 (identical to the requests)
Expand All @@ -91,67 +97,102 @@ class SimData:
#: - count=1 datatype=DataType.STRING, values="ABCD" is 2 registers
#: - count=2 datatype=DataType.STRING, values="ABCD" is 4 registers
#:
#: Count cannot be used if values= is a list
#: if values= is a list, count will be applied to the whole list, e.g.
#:
#: - count=3 datatype=DataType.REGISTERS values=[3,2] is 6 registers.
#: - count=3 datatype=DataType.INT32 values=[3,2] is 12 registers.
#: - count=2 datatype=DataType.STRING, values=["ABCD", 'EFGH'] is 8 registers
count: int = 1

#: Value/Values of datatype,
#: will automatically be converted to registers, according to datatype.
values: SimValueType = 0

#: Used to check access and convert value to/from registers.
datatype: DataType = DataType.REGISTERS

#: Optional function to call when registers are being read/written.
#:
#: **Example function:**
#:
#: .. code-block:: python
#:
#: async def my_action(
#: function_code: int,
#: address: int,
#: registers: list[int]) -> list[int] | ExceptionResponse:
#:
# return registers
#:
#: .. tip:: use functools.partial to add extra parameters if needed.
action: SimAction | None = None
#: Used to check access and convert value to/from registers or mark as invalid.
datatype: DataType = DataType.INVALID

#: Mark register(s) as readonly.
readonly: bool = False

#: Mark register(s) as invalid.
#: **remark** only to be used with address= and count=
invalid: bool = False

def __check_simple(self):
"""Check simple parameters."""
if not isinstance(self.address, int) or not 0 <= self.address <= 65535:
raise TypeError("0 <= address < 65535")
if not isinstance(self.count, int) or not 1 <= self.count <= 65536:
raise TypeError("1 <= count < 65536")
if not 1 <= self.address + self.count <= 65536:
raise TypeError("1 <= address + count < 65536")
if self.address + self.count -1 > 65535:
raise TypeError("address= + count= outside address range")
if not isinstance(self.datatype, DataType):
raise TypeError("datatype= must by an DataType")
if self.action and not (callable(self.action) and inspect.iscoroutinefunction(self.action)):
raise TypeError("action= not a async function")
raise TypeError("datatype= must by a DataType")
if self.values and self.datatype == DataType.INVALID:
raise TypeError("values= cannot be used with invalid=True")
if isinstance(self.values, list) and not self.values:
raise TypeError("values= list cannot be empty")

def __check_parameters(self):
"""Check all parameters."""
self.__check_simple()
x_values = self.values if isinstance(self.values, list) else [self.values]
x_datatype, _x_struct, _x_len = DATATYPE_STRUCT[self.datatype]
if self.datatype == DataType.BITS:
x_datatype = int if isinstance(x_values[0], int) else bool
if x_datatype is bool and len(x_values) % 16:
raise TypeError("values= must be a multiple of 16")
for x_value in x_values:
if not isinstance(x_value, x_datatype):
raise TypeError(f"values= {x_value} is not {x_datatype!s}")
if x_datatype is str and not x_value:
raise TypeError("values= cannot contain empty string")

def __post_init__(self):
"""Define a group of registers."""
self.__check_simple()
if self.datatype == DataType.STRING:
if not isinstance(self.values, str):
raise TypeError("datatype=DataType.STRING only allows values=\"string\"")
x_datatype, x_len = str, int((len(self.values) +1) / 2)
self.__check_parameters()

def __convert_bytes_registers(self, byte_list: bytearray, word_order: str, byte_order: bool, data_type_len: int) -> list[int]:
"""Convert bytearray to registers."""
if byte_order:
regs = [
int.from_bytes(byte_list[x : x + 2], "big")
for x in range(0, len(byte_list), 2)
]
else:
regs = [
int.from_bytes([byte_list[x+1], byte_list[x]], "big")
for x in range(0, len(byte_list), 2)
]
if word_order == "big":
return regs
reversed_regs: list[int] = []
for x in range(0, len(regs), data_type_len):
single_value_regs = regs[x: x + data_type_len]
single_value_regs.reverse()
reversed_regs = reversed_regs + single_value_regs
return reversed_regs

def build_registers(self, endian: tuple[bool, bool], string_encoding: str) -> list[list[int]]:
"""Convert values= to registers."""
self.__check_parameters()
x_values = self.values if isinstance(self.values, list) else [self.values]
_x_datatype, x_struct, x_len = DATATYPE_STRUCT[self.datatype]
blocks_regs: list[list[int]] = []
word_order = "big" if endian[0] else "little"
if self.datatype == DataType.BITS:
if isinstance(x_values[0], bool):
bytes_bits = bytearray(pack_bitstring(cast(list[bool], x_values)))
else:
bytes_bits = bytearray()
for v in x_values:
bytes_bits.extend(struct.pack(">H", v))
blocks_regs.append(self.__convert_bytes_registers(bytes_bits, word_order, endian[1], x_len))
elif self.datatype == DataType.STRING:
for value in x_values:
bytes_string = cast(str, value).encode(string_encoding)
if len(bytes_string) % 2:
bytes_string += b"\x00"
blocks_regs.append(self.__convert_bytes_registers(bytearray(bytes_string), word_order, endian[1], x_len))
else:
x = DATATYPE_STRUCT[self.datatype]
x_len = x[1]
x_datatype = cast(type[str], x[0])
if not isinstance(self.values, list):
super().__setattr__("values", [self.values])
for x_value in cast(list, self.values):
if not isinstance(x_value, x_datatype):
raise TypeError(f"value= can only contain {x_datatype!s}")
super().__setattr__("register_count", self.count * x_len)
super().__setattr__("type_size", x_len)
for v in x_values:
byte_list = struct.pack(f">{x_struct}", v)
blocks_regs.append(self.__convert_bytes_registers(bytearray(byte_list), word_order, endian[1], x_len))
return blocks_regs * self.count
Loading