forked from gijzelaerr/python-snap7
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_multipacket.py
More file actions
628 lines (530 loc) · 23.5 KB
/
test_multipacket.py
File metadata and controls
628 lines (530 loc) · 23.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
"""Tests for S7 protocol behavior.
Tests USERDATA response parameter parsing, follow-up request building,
fragment-aware SZL parsing, multi-packet accumulation, protocol error codes,
and TPDU size configuration.
"""
import struct
from typing import Any, Dict
import pytest
from snap7.s7protocol import S7Protocol, S7PDUType, S7Function, S7UserDataGroup, S7UserDataSubfunction
from snap7.datatypes import S7Area, S7WordLen
from snap7.error import (
S7_PROTOCOL_ERROR_CODES,
S7ProtocolError,
S7StalePacketError,
S7PacketLostError,
get_protocol_error_message,
)
from snap7.connection import TPDUSize, ISOTCPConnection
@pytest.mark.client
class TestUserdataResponseParamParsing:
"""Test _parse_userdata_response_params via _parse_parameters."""
def setup_method(self) -> None:
self.protocol = S7Protocol()
def test_last_packet(self) -> None:
"""Parse USERDATA response params indicating last data unit."""
# 12-byte param section: last_data_unit=0x00 (done)
param_data = bytes(
[
0x00, # Reserved
0x01, # Param count
0x12, # Type header
0x08, # Length
0x12, # Method (response)
0x84, # Type(8) | Group(4=SZL)
0x01, # Subfunction (READ_SZL)
0x01, # Sequence number
0x00, # Data unit reference
0x00, # Last data unit (done)
0x00,
0x00, # Error code
]
)
result = self.protocol._parse_parameters(param_data)
assert result["group"] == S7UserDataGroup.SZL
assert result["subfunction"] == S7UserDataSubfunction.READ_SZL
assert result["sequence_number"] == 0x01
assert result["last_data_unit"] == 0x00
assert result["error_code"] == 0x0000
def test_more_data(self) -> None:
"""Parse USERDATA response params indicating more data coming."""
# last_data_unit=0x01 means more data
param_data = bytes(
[
0x00,
0x01,
0x12,
0x08,
0x12,
0x84, # Group 4 = SZL
0x01, # Subfunction
0x02, # Sequence number
0xD5, # Data unit reference
0x01, # Last data unit (MORE)
0x00,
0x00,
]
)
result = self.protocol._parse_parameters(param_data)
assert result["group"] == S7UserDataGroup.SZL
assert result["sequence_number"] == 0x02
assert result["last_data_unit"] == 0x01
def test_block_info_group(self) -> None:
"""Parse USERDATA response with block info group."""
param_data = bytes(
[
0x00,
0x01,
0x12,
0x08,
0x12,
0x83, # Type(8) | Group(3=BlockInfo)
0x02, # Subfunction (LIST_BLOCKS_OF_TYPE)
0x03, # Sequence number
0x00, # Data unit ref
0x01, # More data
0x00,
0x00,
]
)
result = self.protocol._parse_parameters(param_data)
assert result["group"] == S7UserDataGroup.BLOCK_INFO
assert result["subfunction"] == S7UserDataSubfunction.LIST_BLOCKS_OF_TYPE
assert result["sequence_number"] == 0x03
assert result["last_data_unit"] == 0x01
def test_non_userdata_still_works(self) -> None:
"""Non-USERDATA params still dispatch to existing parsers."""
# READ_AREA response
param_data = bytes([0x04, 0x01])
result = self.protocol._parse_parameters(param_data)
assert result["function_code"] == 0x04
assert result["item_count"] == 1
@pytest.mark.client
class TestFollowupRequestBuilder:
"""Test build_userdata_followup_request byte format."""
def setup_method(self) -> None:
self.protocol = S7Protocol()
def test_szl_followup(self) -> None:
"""Follow-up request for SZL has correct structure."""
pdu = self.protocol.build_userdata_followup_request(
group=S7UserDataGroup.SZL,
subfunction=S7UserDataSubfunction.READ_SZL,
sequence_number=0x02,
)
# S7 header: 10 bytes for USERDATA
assert pdu[0] == 0x32 # Protocol ID
assert pdu[1] == 0x07 # USERDATA
# Extract param_len and data_len from header
param_len = struct.unpack(">H", pdu[6:8])[0]
data_len = struct.unpack(">H", pdu[8:10])[0]
assert param_len == 8
assert data_len == 4
# Parameter section starts at offset 10
params = pdu[10 : 10 + param_len]
assert params[0] == 0x00 # Reserved
assert params[1] == 0x01 # Param count
assert params[2] == 0x12 # Type header
assert params[3] == 0x04 # Length
assert params[4] == 0x11 # Method (request)
assert params[5] == 0x44 # Type(4) | Group(4=SZL)
assert params[6] == 0x01 # Subfunction
assert params[7] == 0x02 # DataRef = sequence_number
# Data section
data = pdu[10 + param_len :]
assert data == bytes([0x0A, 0x00, 0x00, 0x00])
def test_block_info_followup(self) -> None:
"""Follow-up request for block info group."""
pdu = self.protocol.build_userdata_followup_request(
group=S7UserDataGroup.BLOCK_INFO,
subfunction=S7UserDataSubfunction.LIST_BLOCKS_OF_TYPE,
sequence_number=0x05,
)
params = pdu[10:18]
assert params[5] == 0x43 # Type(4) | Group(3=BlockInfo)
assert params[6] == 0x02 # LIST_BLOCKS_OF_TYPE
assert params[7] == 0x05 # DataRef
@pytest.mark.client
class TestSzlFragmentParsing:
"""Test parse_read_szl_response with first_fragment flag."""
def setup_method(self) -> None:
self.protocol = S7Protocol()
def test_first_fragment_parses_header(self) -> None:
"""First fragment extracts SZL ID and Index from data."""
response: Dict[str, Any] = {
"data": {
"data": b"\x00\x1c\x00\x00" + b"\xaa\xbb\xcc",
}
}
result = self.protocol.parse_read_szl_response(response, first_fragment=True)
assert result["szl_id"] == 0x001C
assert result["szl_index"] == 0x0000
assert result["data"] == b"\xaa\xbb\xcc"
def test_followup_fragment_raw_data(self) -> None:
"""Follow-up fragment returns all data as raw payload."""
payload = b"\xdd\xee\xff\x01\x02\x03"
response: Dict[str, Any] = {
"data": {
"data": payload,
}
}
result = self.protocol.parse_read_szl_response(response, first_fragment=False)
assert result["szl_id"] == 0
assert result["szl_index"] == 0
assert result["data"] == payload
def test_default_is_first_fragment(self) -> None:
"""Default behavior (no flag) treats as first fragment."""
response: Dict[str, Any] = {
"data": {
"data": b"\x00\x11\x00\x22" + b"\x33",
}
}
result = self.protocol.parse_read_szl_response(response)
assert result["szl_id"] == 0x0011
assert result["szl_index"] == 0x0022
assert result["data"] == b"\x33"
@pytest.mark.client
class TestMultiPacketSzlIntegration:
"""Integration test: mock connection returning 2-packet SZL response.
Uses real packet data from nikteliy's pcap captures.
"""
def _build_full_pdu(self, param_bytes: bytes, data_bytes: bytes) -> bytes:
"""Helper to build a complete S7 USERDATA PDU."""
header = struct.pack(
">BBHHHH",
0x32, # Protocol ID
0x07, # USERDATA
0x0000, # Reserved
0x0001, # Sequence
len(param_bytes),
len(data_bytes),
)
return header + param_bytes + data_bytes
def test_two_packet_szl_response(self) -> None:
"""Simulate a 2-packet SZL response using nikteliy's test data."""
# First response: last_data_unit=0x01 (more data), seq=0x02
# Real packet data from pcap
response1 = (
b"\x32\x07\x00\x00\x02\x00\x00\x0c\x00\xda"
b"\x00\x01\x12\x08\x12\x84\x01\x02\xd5\x01\x00\x00"
b"\xff\x09\x00\xd6"
b"\x00\x1c\x00\x00"
b"\x00\x22\x00\x0a"
b"\x00\x01"
b"\x53\x37\x33\x30\x30\x2f\x45\x54\x32\x30\x30\x4d"
b"\x20\x73\x74\x61\x74\x69\x6f\x6e\x5f\x31\x00\x00\x00\x00\x00\x00"
b"\x00\x00\x00\x00"
b"\x00\x02"
b"\x50\x4c\x43\x5f\x31\x00\x00\x00\x00\x00"
b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
b"\x00\x00\x00\x00"
b"\x00\x00\x00\x03"
b"\x00\x00\x00\x00\x00\x00\x00\x00"
b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
b"\x00\x00\x00\x00\x00\x00\x00\x00"
b"\x00\x04"
b"\x4f\x72\x69\x67\x69\x6e\x61\x6c"
b"\x20\x53\x69\x65\x6d\x65\x6e\x73\x20\x45\x71\x75\x69\x70"
b"\x6d\x65\x6e\x74\x00\x00\x00\x00\x00\x00"
b"\x00\x05"
b"\x53\x20\x43\x2d\x42\x31\x55\x33\x39\x33\x31\x34\x32\x30\x31\x31"
b"\x00\x00\x00\x00"
b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
b"\x00\x07"
b"\x43\x50\x55\x20\x33\x31\x35\x2d\x32\x20\x50\x4e\x2f\x44\x50"
b"\x00\x00\x00"
b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x08"
)
# Second response: last_data_unit=0x00 (done), seq=0x02
response2 = (
b"\x32\x07\x00\x00\x03\x00\x00\x0c\x00\x8a"
b"\x00\x01\x12\x08\x12\x84\x01\x02\xd5\x00\x00\x00"
b"\xff\x09\x00\x86"
b"\x4d\x4d\x43\x20\x34\x41\x31\x41\x43\x30\x31\x39"
b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
b"\x00\x09"
b"\x00\x2a\xf6\x00"
b"\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
b"\x00\x0a"
b"\x00\x00"
b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
b"\x00\x0b"
b"\x00\x00"
b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
)
protocol = S7Protocol()
# Parse first response
parsed1 = protocol.parse_response(response1)
params1 = parsed1["parameters"]
assert params1["last_data_unit"] == 0x01 # More data
assert params1["sequence_number"] == 0x02
# Parse SZL header from first fragment
szl1 = protocol.parse_read_szl_response(parsed1, first_fragment=True)
assert szl1["szl_id"] == 0x001C
first_data = szl1["data"]
# Parse second response
parsed2 = protocol.parse_response(response2)
params2 = parsed2["parameters"]
assert params2["last_data_unit"] == 0x00 # Done
# Parse follow-up fragment (no SZL header)
szl2 = protocol.parse_read_szl_response(parsed2, first_fragment=False)
second_data = szl2["data"]
# Combined data should be larger than either fragment alone
combined = first_data + second_data
assert len(combined) > len(first_data)
assert len(combined) > len(second_data)
assert len(combined) == len(first_data) + len(second_data)
def test_single_packet_no_loop(self) -> None:
"""Single-packet response (last_data_unit=0x00) skips follow-up."""
protocol = S7Protocol()
# Build a single-packet SZL response with last_data_unit=0x00
param_bytes = bytes(
[
0x00,
0x01,
0x12,
0x08,
0x12,
0x84,
0x01,
0x01,
0x00,
0x00,
0x00,
0x00,
]
)
data_bytes = (
b"\xff\x09\x00\x08" # return_code, transport_size, length
b"\x00\x1c\x00\x00" # SZL ID=0x001C, Index=0
b"\xaa\xbb\xcc\xdd" # payload
)
pdu = self._build_full_pdu(param_bytes, data_bytes)
parsed = protocol.parse_response(pdu)
params = parsed["parameters"]
assert params["last_data_unit"] == 0x00
szl = protocol.parse_read_szl_response(parsed, first_fragment=True)
assert szl["szl_id"] == 0x001C
assert szl["data"] == b"\xaa\xbb\xcc\xdd"
@pytest.mark.client
class TestProtocolErrorCodes:
"""Test S7 protocol error code dictionary and exception hierarchy."""
def test_error_codes_count(self) -> None:
"""S7_PROTOCOL_ERROR_CODES should have ~210 entries."""
assert len(S7_PROTOCOL_ERROR_CODES) > 200
def test_known_code_lookup(self) -> None:
assert get_protocol_error_message(0x0000) == "No error"
assert get_protocol_error_message(0x8104) == (
"This service is not implemented on the module or a frame error was reported"
)
assert "Illegal job number" in get_protocol_error_message(0xD001)
def test_unknown_code_lookup(self) -> None:
msg = get_protocol_error_message(0xFFFF)
assert "Unknown protocol error" in msg
assert "0xffff" in msg
def test_code_ranges_present(self) -> None:
"""Verify codes from key ranges are included."""
assert 0x0110 in S7_PROTOCOL_ERROR_CODES # block-related
assert 0x8100 in S7_PROTOCOL_ERROR_CODES # service/protocol
assert 0xD001 in S7_PROTOCOL_ERROR_CODES # USERDATA parameter
assert 0xD601 in S7_PROTOCOL_ERROR_CODES # USERDATA parameter
assert 0xE201 in S7_PROTOCOL_ERROR_CODES # sync
def test_exception_hierarchy(self) -> None:
"""Stale/Lost exceptions inherit from S7ProtocolError."""
assert issubclass(S7StalePacketError, S7ProtocolError)
assert issubclass(S7PacketLostError, S7ProtocolError)
def test_parse_response_raises_on_error_class(self) -> None:
"""parse_response should raise S7ProtocolError when error_class != 0."""
proto = S7Protocol()
pdu = struct.pack(
">BBHHHHBB",
0x32, # protocol ID
0x03, # ACK_DATA
0x0000, # reserved
0x0001, # sequence
0x0000, # param length
0x0000, # data length
0x81, # error class
0x04, # error code
)
with pytest.raises(S7ProtocolError, match="protocol error"):
proto.parse_response(pdu)
@pytest.mark.client
class TestTPDUSize:
"""Test TPDUSize enum and COTP negotiation."""
def test_enum_values(self) -> None:
assert int(TPDUSize.S_128) == 0x07
assert int(TPDUSize.S_256) == 0x08
assert int(TPDUSize.S_512) == 0x09
assert int(TPDUSize.S_1024) == 0x0A
assert int(TPDUSize.S_2048) == 0x0B
assert int(TPDUSize.S_4096) == 0x0C
assert int(TPDUSize.S_8192) == 0x0D
def test_actual_sizes(self) -> None:
"""Verify 2^code gives correct byte sizes."""
assert 1 << TPDUSize.S_128 == 128
assert 1 << TPDUSize.S_1024 == 1024
assert 1 << TPDUSize.S_8192 == 8192
def test_default_tpdu_size(self) -> None:
"""ISOTCPConnection should default to S_1024."""
conn = ISOTCPConnection("127.0.0.1")
assert conn.tpdu_size == TPDUSize.S_1024
def test_custom_tpdu_size(self) -> None:
"""ISOTCPConnection should accept custom TPDU size."""
conn = ISOTCPConnection("127.0.0.1", tpdu_size=TPDUSize.S_4096)
assert conn.tpdu_size == TPDUSize.S_4096
def test_tpdu_size_in_cotp_cr(self) -> None:
"""COTP CR PDU should contain the configured TPDU size."""
conn = ISOTCPConnection("127.0.0.1", tpdu_size=TPDUSize.S_2048)
cr_pdu = conn._build_cotp_cr()
assert cr_pdu[-3:] == bytes([0xC0, 0x01, TPDUSize.S_2048])
# -----------------------------------------------------------------------
# Multi-item read request/response tests
# -----------------------------------------------------------------------
@pytest.mark.client
class TestBuildMultiReadRequest:
"""Test build_multi_read_request PDU construction."""
def setup_method(self) -> None:
self.protocol = S7Protocol()
def test_single_item(self) -> None:
"""Single-item multi-read should produce valid PDU."""
pdu = self.protocol.build_multi_read_request([(S7Area.DB, 1, 0, 4)])
# Request header: BBHHHH = 10 bytes
assert pdu[0] == 0x32 # Protocol ID
assert pdu[1] == S7PDUType.REQUEST
# Param length = 2 + 1*12 = 14 (at offset 6)
param_len = struct.unpack(">H", pdu[6:8])[0]
assert param_len == 14
# Data length = 0 (at offset 8)
data_len = struct.unpack(">H", pdu[8:10])[0]
assert data_len == 0
# Function code and item count (at offset 10)
assert pdu[10] == S7Function.READ_AREA
assert pdu[11] == 1 # item count
def test_multiple_items(self) -> None:
"""Multi-item request should pack N address specs."""
items = [
(S7Area.DB, 1, 0, 4),
(S7Area.DB, 1, 10, 8),
(S7Area.MK, 0, 0, 2),
]
pdu = self.protocol.build_multi_read_request(items)
param_len = struct.unpack(">H", pdu[6:8])[0]
assert param_len == 2 + 3 * 12 # 38
assert pdu[11] == 3 # item count
# Total PDU length: 10 (header) + 38 (params)
assert len(pdu) == 10 + 38
def test_address_spec_format(self) -> None:
"""Address specs should use BYTE word length and correct encoding."""
pdu = self.protocol.build_multi_read_request([(S7Area.DB, 5, 10, 20)])
# Address spec starts at byte 12 (header=10, func+count=2)
addr = pdu[12:]
assert addr[0] == 0x12 # Spec type
assert addr[1] == 0x0A # Length
assert addr[2] == 0x10 # Syntax ID
assert addr[3] == S7WordLen.BYTE # Word length
# Count = 20
count = struct.unpack(">H", addr[4:6])[0]
assert count == 20
# DB number = 5
db_num = struct.unpack(">H", addr[6:8])[0]
assert db_num == 5
# Area = DB
assert addr[8] == S7Area.DB
def test_sequence_increments(self) -> None:
"""Each call should increment the sequence number."""
pdu1 = self.protocol.build_multi_read_request([(S7Area.DB, 1, 0, 4)])
pdu2 = self.protocol.build_multi_read_request([(S7Area.DB, 1, 0, 4)])
seq1 = struct.unpack(">H", pdu1[4:6])[0]
seq2 = struct.unpack(">H", pdu2[4:6])[0]
assert seq2 == seq1 + 1
@pytest.mark.client
class TestExtractMultiReadData:
"""Test extract_multi_read_data response parsing."""
def setup_method(self) -> None:
self.protocol = S7Protocol()
def _build_item(self, data: bytes, return_code: int = 0xFF, transport_size: int = 0x04) -> bytes:
"""Build a single response data item."""
bit_length = len(data) * 8
return struct.pack(">BBH", return_code, transport_size, bit_length) + data
def test_single_item(self) -> None:
data_section = self._build_item(b"\x01\x02\x03\x04")
response: Dict[str, Any] = {"raw_data_section": data_section}
result = self.protocol.extract_multi_read_data(response, 1)
assert len(result) == 1
assert result[0] == bytearray(b"\x01\x02\x03\x04")
def test_multiple_items_even_length(self) -> None:
"""Multiple items with even-length data — no fill bytes needed."""
item1 = self._build_item(b"\x01\x02\x03\x04") # 4 bytes (even)
item2 = self._build_item(b"\x05\x06") # 2 bytes (even)
response: Dict[str, Any] = {"raw_data_section": item1 + item2}
result = self.protocol.extract_multi_read_data(response, 2)
assert result[0] == bytearray(b"\x01\x02\x03\x04")
assert result[1] == bytearray(b"\x05\x06")
def test_fill_byte_between_items(self) -> None:
"""Odd-length item should have a fill byte before the next item."""
item1_data = b"\x01\x02\x03" # 3 bytes (odd)
item1 = self._build_item(item1_data)
fill = b"\x00" # Fill byte
item2 = self._build_item(b"\x04\x05\x06\x07")
response: Dict[str, Any] = {"raw_data_section": item1 + fill + item2}
result = self.protocol.extract_multi_read_data(response, 2)
assert result[0] == bytearray(b"\x01\x02\x03")
assert result[1] == bytearray(b"\x04\x05\x06\x07")
def test_no_fill_after_last_item(self) -> None:
"""Last item with odd length should NOT expect a fill byte."""
item1 = self._build_item(b"\x01\x02\x03\x04") # even
item2 = self._build_item(b"\x05\x06\x07") # odd, but last
response: Dict[str, Any] = {"raw_data_section": item1 + item2}
result = self.protocol.extract_multi_read_data(response, 2)
assert result[1] == bytearray(b"\x05\x06\x07")
def test_error_item_raises(self) -> None:
"""Non-success return code should raise S7ProtocolError."""
data_section = self._build_item(b"", return_code=0x05) # Invalid address
response: Dict[str, Any] = {"raw_data_section": data_section}
with pytest.raises(S7ProtocolError, match="Multi-read item 0 failed"):
self.protocol.extract_multi_read_data(response, 1)
def test_missing_raw_data_section(self) -> None:
with pytest.raises(S7ProtocolError, match="No raw data section"):
self.protocol.extract_multi_read_data({}, 1)
def test_three_items_mixed_fill(self) -> None:
"""Three items: even, odd (fill), even."""
item1 = self._build_item(b"\x01\x02") # 2 bytes, even
item2 = self._build_item(b"\x03\x04\x05") # 3 bytes, odd
fill = b"\x00"
item3 = self._build_item(b"\x06\x07\x08\x09") # 4 bytes, even
response: Dict[str, Any] = {"raw_data_section": item1 + item2 + fill + item3}
result = self.protocol.extract_multi_read_data(response, 3)
assert result[0] == bytearray(b"\x01\x02")
assert result[1] == bytearray(b"\x03\x04\x05")
assert result[2] == bytearray(b"\x06\x07\x08\x09")
@pytest.mark.client
class TestParseResponseRawDataSection:
"""Test that parse_response preserves raw_data_section."""
def setup_method(self) -> None:
self.protocol = S7Protocol()
def test_raw_data_section_preserved(self) -> None:
"""parse_response should store raw_data_section for multi-read use."""
# Build a minimal ACK_DATA response with data
data_section = struct.pack(">BBH", 0xFF, 0x04, 32) + b"\x01\x02\x03\x04"
# We need to set the sequence so it matches
self.protocol.sequence = 1
pdu = struct.pack(
">BBHHHHBB",
0x32, # Protocol ID
S7PDUType.ACK_DATA,
0x0000, # Reserved
1, # Sequence
0x0002, # Param length
len(data_section), # Data length
0x00, # Error class
0x00, # Error code
)
pdu += struct.pack(">BB", S7Function.READ_AREA, 0x01) # Parameters
pdu += data_section
response = self.protocol.parse_response(pdu)
assert "raw_data_section" in response
assert response["raw_data_section"] == data_section