forked from gijzelaerr/python-snap7
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdatatypes.py
More file actions
312 lines (256 loc) · 10.9 KB
/
datatypes.py
File metadata and controls
312 lines (256 loc) · 10.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
"""
S7 data types and conversion utilities.
Handles S7-specific data types, endianness conversion, and address encoding.
"""
import struct
from enum import IntEnum
from typing import List, NoReturn, Sequence, Tuple, Union
def _assert_never(value: NoReturn) -> NoReturn:
    """Abort when a branch chain that should be exhaustive is escaped.

    Stand-in for ``typing.assert_never`` on Python versions before 3.11.

    Raises:
        AssertionError: always; the message contains ``repr(value)``.
    """
    message = f"Unhandled value: {value!r}"
    raise AssertionError(message)
def _validate_bit_addr(bit_addr: int) -> None:
    """Check that ``bit_addr`` names one of the eight bits of a byte.

    Raises:
        ValueError: if ``bit_addr`` lies outside the inclusive range 0-7.
    """
    in_range = 0 <= bit_addr and bit_addr <= 7
    if in_range:
        return
    raise ValueError(f"Bit address must be 0-7, got {bit_addr}")
class S7Area(IntEnum):
    """Numeric codes identifying the S7 memory areas.

    Members are ``int`` subclasses, so they can be handed straight to
    ``struct.pack`` when building an address specification.
    """

    # Process image / peripheral areas
    PE = 0x81  # process input (peripheral input)
    PA = 0x82  # process output (peripheral output)

    # Internal memory
    MK = 0x83  # memory / Merkers (flags)
    DB = 0x84  # data blocks

    # Counter and timer areas
    CT = 0x1C  # counters
    TM = 0x1D  # timers
class S7WordLen(IntEnum):
    """Numeric codes identifying the S7 word lengths (transport sizes).

    Members are ``int`` subclasses, so they can be handed straight to
    ``struct.pack`` and used as dictionary keys interchangeably with ints.
    """

    # Single-bit and 8-bit items
    BIT = 0x01  # single bit
    BYTE = 0x02  # 8-bit byte
    CHAR = 0x03  # 8-bit character

    # 16-bit items
    WORD = 0x04  # 16-bit word
    INT = 0x05  # 16-bit signed integer

    # 32-bit items
    DWORD = 0x06  # 32-bit double word
    DINT = 0x07  # 32-bit signed integer
    REAL = 0x08  # 32-bit IEEE float

    # Counter and timer values
    COUNTER = 0x1C  # counter value
    TIMER = 0x1D  # timer value
class S7DataTypes:
    """Static helpers for converting between Python values and S7 wire data.

    The class is used purely as a namespace: every method is a
    ``staticmethod`` and no instance state is kept. Multi-byte values are
    always packed and unpacked big-endian.
    """

    # Number of payload bytes taken by ONE item of each word length.
    WORD_LEN_SIZE = {
        S7WordLen.BIT: 1,  # a bit still occupies a whole byte
        S7WordLen.BYTE: 1,
        S7WordLen.CHAR: 1,
        S7WordLen.WORD: 2,
        S7WordLen.INT: 2,
        S7WordLen.DWORD: 4,
        S7WordLen.DINT: 4,
        S7WordLen.REAL: 4,
        S7WordLen.COUNTER: 2,
        S7WordLen.TIMER: 2,
    }

    # Single-letter prefixes of the non-DB areas understood by parse_address.
    _AREA_PREFIXES = (
        ("M", S7Area.MK),
        ("I", S7Area.PE),
        ("Q", S7Area.PA),
    )

    # Largest value that fits the 3-byte address field of an address spec.
    _MAX_ADDRESS = 0xFFFFFF

    @staticmethod
    def get_size_bytes(word_len: S7WordLen, count: int = 1) -> int:
        """Return the number of bytes occupied by ``count`` items of ``word_len``.

        Raises:
            KeyError: if ``word_len`` has no entry in ``WORD_LEN_SIZE``.
        """
        return S7DataTypes.WORD_LEN_SIZE[word_len] * count

    @staticmethod
    def encode_address(area: S7Area, db_number: int, start: int, word_len: S7WordLen, count: int) -> bytes:
        """Encode an S7 address into the 12-byte read/write parameter item.

        Layout of the returned bytes:

        * byte 0      specification type (0x12)
        * byte 1      length of the following address specification (0x0A)
        * byte 2      syntax ID (0x10 = S7-Any)
        * byte 3      transport size (``word_len``)
        * bytes 4-5   item count
        * bytes 6-7   DB number (0 unless ``area`` is ``S7Area.DB``)
        * byte 8      area code
        * bytes 9-11  start address in byte.bit form (byte number << 3 | bit)

        Args:
            area: Memory area to address.
            db_number: Data block number; ignored unless ``area`` is DB.
            start: Bit offset when ``word_len`` is BIT, otherwise byte offset.
            word_len: Transport size of the items.
            count: Number of items.

        Raises:
            ValueError: if ``start`` is negative, or if the resulting address
                does not fit the 3-byte address field.
            struct.error: if ``count`` or ``db_number`` does not fit its field.
        """
        if start < 0:
            raise ValueError(f"Start address must be non-negative, got {start}")

        if word_len == S7WordLen.BIT:
            # ``start`` is an absolute bit offset: split it into byte and bit
            # and recombine them in the byte.bit wire layout.
            byte_addr, bit_addr = divmod(start, 8)
            address = (byte_addr << 3) | bit_addr
        else:
            # ``start`` is a byte offset; the wire format wants a bit address.
            # NOTE(review): the reference Snap7 client appears to send timer
            # and counter start addresses unshifted - confirm that the
            # multiplication by 8 is intended for COUNTER/TIMER here.
            address = start * 8

        # Only the low three bytes are transmitted. Without this check a
        # larger address would silently lose its top byte and address the
        # wrong location.
        if address > S7DataTypes._MAX_ADDRESS:
            raise ValueError(
                f"Start address {start} does not fit the 24-bit S7 address field"
            )
        address_bytes = struct.pack(">I", address)[1:]

        return struct.pack(
            ">BBBBHHB3s",
            0x12,  # specification type
            0x0A,  # length of address specification
            0x10,  # syntax ID (S7-Any)
            word_len,  # transport size
            count,  # item count
            db_number if area == S7Area.DB else 0,  # DB number
            area,  # area code
            address_bytes,  # 3-byte big-endian address
        )

    @staticmethod
    def decode_s7_data(data: bytes, word_len: S7WordLen, count: int) -> List[Union[bool, int, float]]:
        """Decode ``count`` consecutive big-endian items from ``data``.

        BIT items yield ``bool`` (any non-zero byte is ``True``), REAL items
        yield ``float`` and every other word length yields ``int``. Bytes of
        ``data`` beyond the requested items are ignored.

        Args:
            data: Raw payload bytes.
            word_len: Word length of every item in ``data``.
            count: Number of items to decode; zero or less yields ``[]``.

        Raises:
            IndexError: if ``data`` is too short for BIT/BYTE/CHAR items.
            struct.error: if ``data`` is too short for wider items.
            AssertionError: if ``word_len`` is not a handled word length and
                ``count`` is positive.
        """
        if count <= 0:
            return []

        # The word length is the same for every item, so pick the decoding
        # once. The if/elif chain is kept (rather than a lookup table) so the
        # final ``_assert_never`` still acts as an exhaustiveness check.
        if word_len == S7WordLen.BIT:
            return [bool(data[index]) for index in range(count)]
        if word_len == S7WordLen.BYTE or word_len == S7WordLen.CHAR:
            return [data[index] for index in range(count)]

        code: str
        if word_len == S7WordLen.WORD or word_len == S7WordLen.COUNTER or word_len == S7WordLen.TIMER:
            code = "H"  # 16-bit unsigned
        elif word_len == S7WordLen.INT:
            code = "h"  # 16-bit signed
        elif word_len == S7WordLen.DWORD:
            code = "I"  # 32-bit unsigned
        elif word_len == S7WordLen.DINT:
            code = "i"  # 32-bit signed
        elif word_len == S7WordLen.REAL:
            code = "f"  # 32-bit IEEE float
        else:
            _assert_never(word_len)

        # One struct call with a repeat count decodes all items at once.
        return list(struct.unpack_from(f">{count}{code}", data))

    @staticmethod
    def encode_s7_data(values: Sequence[Union[bool, int, float]], word_len: S7WordLen) -> bytes:
        """Encode Python values as consecutive big-endian S7 items.

        BIT items become one byte each (0x01 for truthy, 0x00 otherwise).
        Unsigned word lengths (BYTE, CHAR, WORD, COUNTER, TIMER, DWORD) are
        wrapped to their width; signed ones (INT, DINT) must be in range.

        Args:
            values: Values to encode; an empty input yields ``b""``.
            word_len: Word length used for every value.

        Raises:
            struct.error: if an INT or DINT value is out of range.
            AssertionError: if ``word_len`` is not a handled word length and
                ``values`` is not empty.
        """
        items = list(values)
        if not items:
            return b""

        if word_len == S7WordLen.BIT:
            return bytes(0x01 if item else 0x00 for item in items)
        if word_len == S7WordLen.BYTE or word_len == S7WordLen.CHAR:
            return bytes(int(item) & 0xFF for item in items)

        code: str
        numbers: List[Union[int, float]]
        if word_len == S7WordLen.WORD or word_len == S7WordLen.COUNTER or word_len == S7WordLen.TIMER:
            code, numbers = "H", [int(item) & 0xFFFF for item in items]
        elif word_len == S7WordLen.INT:
            code, numbers = "h", [int(item) for item in items]
        elif word_len == S7WordLen.DWORD:
            code, numbers = "I", [int(item) & 0xFFFFFFFF for item in items]
        elif word_len == S7WordLen.DINT:
            code, numbers = "i", [int(item) for item in items]
        elif word_len == S7WordLen.REAL:
            code, numbers = "f", [float(item) for item in items]
        else:
            _assert_never(word_len)

        return struct.pack(f">{len(numbers)}{code}", *numbers)

    @staticmethod
    def _parse_int(text: str, address_str: str) -> int:
        """Convert one numeric field of an address, naming the address on failure."""
        try:
            return int(text)
        except ValueError:
            raise ValueError(f"Invalid number {text!r} in address: {address_str}") from None

    @staticmethod
    def _parse_bit_offset(text: str, address_str: str) -> int:
        """Convert a ``byte.bit`` field into an absolute bit offset (byte * 8 + bit)."""
        byte_text, _, bit_text = text.partition(".")
        bit = S7DataTypes._parse_int(bit_text, address_str)
        _validate_bit_addr(bit)
        return S7DataTypes._parse_int(byte_text, address_str) * 8 + bit

    @staticmethod
    def parse_address(address_str: str) -> Tuple[S7Area, int, int]:
        """Parse an S7 address string into ``(area, db_number, offset)``.

        The input is case-insensitive and surrounding whitespace is ignored.
        ``offset`` is a bit offset (byte * 8 + bit) for bit addresses,
        including ``DBX`` addresses written without a bit part, and a byte
        offset for byte, word and double-word addresses. ``db_number`` is 0
        for every area other than DB.

        Examples:
            - "DB1.DBX0.0" -> (DB, 1, 0)
            - "DB1.DBW10"  -> (DB, 1, 10)
            - "M10.5"      -> (MK, 0, 85)   # bit 5 of byte 10
            - "MB10"       -> (MK, 0, 10)
            - "IW20"       -> (PE, 0, 20)
            - "QD4"        -> (PA, 0, 4)

        Raises:
            ValueError: if the string is not a supported address, contains a
                non-numeric field, or names a bit outside 0-7.
        """
        address_str = address_str.upper().strip()

        # Data block addresses: DB1.DBX0.0, DB1.DBB10, DB1.DBW10, DB1.DBD10
        if address_str.startswith("DB"):
            db_part, dot, addr_part = address_str.partition(".")
            if not dot:
                raise ValueError(f"Invalid DB address format: {address_str}")
            db_number = S7DataTypes._parse_int(db_part[2:], address_str)

            kind, number = addr_part[:3], addr_part[3:]
            if kind == "DBX":
                if "." in number:
                    offset = S7DataTypes._parse_bit_offset(number, address_str)
                else:
                    # DBX without a bit part means bit 0 of that byte.
                    offset = S7DataTypes._parse_int(number, address_str) * 8
            elif kind in ("DBB", "DBW", "DBD"):
                offset = S7DataTypes._parse_int(number, address_str)
            else:
                raise ValueError(f"Invalid DB address format: {address_str}")
            return S7Area.DB, db_number, offset

        # Memory, input and output addresses share one grammar:
        #   <P><byte>.<bit>   bit address          e.g. M10.5
        #   <P>B|W|D<byte>    byte/word/dword      e.g. MB10, IW20, QD4
        #   <P><byte>         byte address         e.g. M10
        for prefix, area in S7DataTypes._AREA_PREFIXES:
            if not address_str.startswith(prefix):
                continue
            rest = address_str[1:]
            if "." in rest:
                offset = S7DataTypes._parse_bit_offset(rest, address_str)
            elif rest.startswith(("B", "W", "D")):
                offset = S7DataTypes._parse_int(rest[1:], address_str)
            else:
                offset = S7DataTypes._parse_int(rest, address_str)
            return area, 0, offset

        raise ValueError(f"Unsupported address format: {address_str}")