Skip to content

Commit eb517a0

Browse files
committed
examples/usb: Add a USBDevice example implementing the DFU protocol.
Signed-off-by: Damien George <damien@micropython.org>
1 parent b2df89c commit eb517a0

File tree

1 file changed

+327
-0
lines changed

1 file changed

+327
-0
lines changed

examples/usb/usb_dfu_device.py

Lines changed: 327 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,327 @@
1+
# Implementation of USB DFU device in Python.
2+
#
3+
# To run, just execute this file on a device with machine.USBDevice support. The device
4+
# will then change to DFU mode.
5+
#
6+
# For example, use `mpremote` (the `--no-follow` option starts the script running
7+
# without waiting for a response, because there won't be a response, the USB will change
8+
# to a DFU device):
9+
#
10+
# $ mpremote run --no-follow usb_dfu_device.py
11+
#
12+
# Then you can access the DFU device using the `pydfu.py` script in this repository, to
13+
# list DFU device, copy a file to the device, then exit DFU mode:
14+
#
15+
# $ ../../tools/pydfu.py -l
16+
# $ ../../tools/pydfu.py -u <file.dfu>
17+
#
18+
# After running the last command above, the USB CDC device and REPL should reappear.
19+
20+
import struct, machine
21+
22+
# USB constants for bmRequestType.
23+
USB_REQ_RECIP_INTERFACE = 0x01
24+
USB_REQ_TYPE_CLASS = 0x20
25+
USB_DIR_OUT = 0x00
26+
USB_DIR_IN = 0x80
27+
28+
# String describing the memory layout of the DFU device.
29+
MEMORY_LAYOUT = b"@Internal Flash /0x08000000/16*128Kg"
30+
31+
# VID and PID of the DFU device (these are the ST values).
32+
VID = 0x0483
33+
PID = 0xDF11
34+
35+
# Maximum transfer size for RX and TX.
36+
wTransferSize = 2048
37+
38+
# DFU device descriptor.
39+
_desc_dev = bytes(
40+
[
41+
0x12, # bLength
42+
0x01, # bDescriptorType: Device
43+
0x00,
44+
0x02, # USB version: 2.00
45+
0x00, # bDeviceClass
46+
0x00, # bDeviceSubClass
47+
0x00, # bDeviceProtocol
48+
0x40, # bMaxPacketSize
49+
VID & 0xFF,
50+
VID >> 8, # VID
51+
PID & 0xFF,
52+
PID >> 8, # PID
53+
0x00,
54+
0x01, # bcdDevice: 1.00
55+
0x11, # iManufacturer
56+
0x12, # iProduct
57+
0x13, # iSerialNumber
58+
0x01, # bNumConfigurations: 1
59+
]
60+
)
61+
62+
# DFU configuration descriptor.
63+
_desc_cfg = bytes(
64+
[
65+
# Configuration Descriptor.
66+
0x09, # bLength
67+
0x02, # bDescriptorType
68+
0x1B,
69+
0x00, # wTotalLength: 27
70+
0x01, # bNumInterfaces
71+
0x01, # bConfigurationValue
72+
0x00, # iConfiguration
73+
0x80, # bmAttributes (bus powered)
74+
0x32, # bMaxPower
75+
# Interface Descriptor.
76+
0x09, # bLength
77+
0x04, # bDescriptorType
78+
0x00, # bInterfaceNumber
79+
0x00, # bNumEndpointns
80+
0x00, # bAlternateSetting
81+
0xFE, # bInterfaceClass: application specific interface
82+
0x01, # bInterfaceSubClasse: device firmware update
83+
0x02, # bInterfaceProtocol
84+
0x14, # iInterface
85+
# Device Firmware Upgrade Interface Descriptor.
86+
0x09, # bLength
87+
0x21, # bDescriptorType
88+
0x0B, # bmAttributes (will detach, upload supported, download supported)
89+
0xFF,
90+
0x00, # wDetatchTimeout
91+
wTransferSize & 0xFF,
92+
wTransferSize >> 8, # wTransferSize
93+
0x1A,
94+
0x01, # bcdDFUVersion
95+
]
96+
)
97+
98+
# DFU strings.
99+
_desc_strs = {
100+
0x11: b"iManufacturer",
101+
0x12: b"iProduct",
102+
0x13: b"iSerialNumber",
103+
0x14: MEMORY_LAYOUT,
104+
}
105+
106+
107+
# This class handles the DFU USB device logic.
108+
class DFUOverUSB:
109+
def __init__(self, dfu):
110+
# USB buffer for transfers.
111+
self.usb_buf = bytearray(wTransferSize)
112+
# Instance of the DFU state machine.
113+
self.dfu = dfu
114+
115+
def _control_xfer_cb(self, stage, request):
116+
bmRequestType, bRequest, wValue, wIndex, wLength = struct.unpack("<BBHHH", request)
117+
if stage == 1: # SETUP
118+
if bmRequestType == USB_DIR_OUT | USB_REQ_TYPE_CLASS | USB_REQ_RECIP_INTERFACE:
119+
# Data coming from host, prepare to receive it.
120+
return memoryview(self.usb_buf)[:wLength]
121+
if bmRequestType == USB_DIR_IN | USB_REQ_TYPE_CLASS | USB_REQ_RECIP_INTERFACE:
122+
# Host requests data, prepare to send it.
123+
buf = memoryview(self.usb_buf)[:wLength]
124+
return self.dfu.handle_tx(bRequest, wValue, buf)
125+
elif stage == 3: # ACK
126+
if bmRequestType & USB_DIR_IN:
127+
# EP0 TX sent.
128+
self.dfu.process()
129+
else:
130+
# EP0 RX ready.
131+
buf = memoryview(self.usb_buf)[:wLength]
132+
self.dfu.handle_rx(bRequest, wValue, buf)
133+
return True
134+
135+
136+
# This class handles the DFU state machine.
137+
class DFU:
138+
# DFU class requests.
139+
DETACH = 0
140+
DNLOAD = 1
141+
UPLOAD = 2
142+
GETSTATUS = 3
143+
CLRSTATUS = 4
144+
GETSTATE = 5
145+
ABORT = 6
146+
147+
# DFU states.
148+
STATE_IDLE = 2
149+
STATE_BUSY = 4
150+
STATE_DNLOAD_IDLE = 5
151+
STATE_MANIFEST = 7
152+
STATE_UPLOAD_IDLE = 9
153+
STATE_ERROR = 0xA
154+
155+
# DFU commands.
156+
CMD_NONE = 0
157+
CMD_EXIT = 1
158+
CMD_UPLOAD = 7
159+
CMD_DNLOAD = 8
160+
161+
# Download sub-commands.
162+
CMD_DNLOAD_SET_ADDRESS = 0x21
163+
CMD_DNLOAD_ERASE = 0x41
164+
CMD_DNLOAD_READ_UNPROTECT = 0x92
165+
166+
# Error status flags.
167+
STATUS_OK = 0x00
168+
169+
def __init__(self):
170+
self.state = DFU.STATE_IDLE
171+
self.cmd = DFU.CMD_NONE
172+
self.status = DFU.STATUS_OK
173+
self.error = 0
174+
self.leave_dfu = False
175+
self.addr = 0
176+
self.dnload_block_num = 0
177+
self.dnload_len = 0
178+
self.dnload_buf = bytearray(wTransferSize)
179+
180+
def handle_rx(self, cmd, arg, buf):
181+
# Handle an incoming packet of data.
182+
if cmd == DFU.CLRSTATUS:
183+
self.state = DFU.STATE_IDLE
184+
self.cmd = DFU.CMD_NONE
185+
self.status = DFU.STATUS_OK
186+
self.error = 0
187+
elif cmd == DFU.ABORT:
188+
self.state = DFU.STATE_IDLE
189+
self.cmd = DFU.CMD_NONE
190+
self.status = DFU.STATUS_OK
191+
self.error = 0
192+
elif cmd == DFU.DNLOAD:
193+
if len(buf) == 0:
194+
# Exit DFU.
195+
self.cmd = DFU.CMD_EXIT
196+
else:
197+
# Download data to device.
198+
self.cmd = DFU.CMD_DNLOAD
199+
self.dnload_block_num = arg
200+
self.dnload_len = len(buf)
201+
self.dnload_buf[: len(buf)] = buf
202+
203+
def handle_tx(self, cmd, arg, buf):
204+
# Prepare data to go to the host.
205+
if cmd == DFU.UPLOAD:
206+
if arg >= 2:
207+
self.cmd = DFU.CMD_UPLOAD
208+
addr = (arg - 2) * len(buf) + self.addr
209+
self.do_read(addr, buf)
210+
return buf
211+
return None
212+
elif cmd == DFU.GETSTATUS and len(buf) == 6:
213+
if self.cmd == DFU.CMD_NONE:
214+
pass
215+
elif self.cmd == DFU.CMD_EXIT:
216+
self.state = DFU.STATE_MANIFEST
217+
elif self.cmd == DFU.CMD_UPLOAD:
218+
self.state = DFU.STATE_UPLOAD_IDLE
219+
elif self.cmd == DFU.CMD_DNLOAD:
220+
self.state = DFU.STATE_BUSY
221+
else:
222+
self.state = DFU.STATE_BUSY
223+
224+
# Populate the buffer to return to the host.
225+
buf[0] = self.status
226+
buf[1] = 0
227+
buf[2] = 0
228+
buf[3] = 0
229+
buf[4] = self.state
230+
buf[5] = self.error
231+
232+
# Clear errors now they've been sent to host.
233+
self.status = DFU.STATUS_OK
234+
self.error = 0
235+
236+
return buf
237+
else:
238+
return None
239+
240+
def process(self):
241+
# Transition the DFU state machine.
242+
if self.state == DFU.STATE_MANIFEST:
243+
self.leave_dfu = True
244+
elif self.state == DFU.STATE_BUSY:
245+
if self.cmd == DFU.CMD_DNLOAD:
246+
self.cmd = DFU.CMD_NONE
247+
self.state = self.process_dnload()
248+
249+
def process_dnload(self):
250+
ret = -1 # Assume error.
251+
if self.dnload_block_num == 0:
252+
# Download control commands.
253+
if self.dnload_len >= 1 and self.dnload_buf[0] == DFU.CMD_DNLOAD_ERASE:
254+
if self.dnload_len == 1:
255+
# Mass erase.
256+
ret = self.do_mass_erase()
257+
if ret != 0:
258+
self.cmd = DFU.CMD_NONE
259+
elif self.dnload_len == 5:
260+
# Erase page.
261+
addr = struct.unpack_from("<L", self.dnload_buf, 1)[0]
262+
ret = self.do_page_erase(addr)
263+
elif self.dnload_len >= 1 and self.dnload_buf[0] == DFU.CMD_DNLOAD_SET_ADDRESS:
264+
if self.dnload_len == 5:
265+
# Set address.
266+
self.addr = struct.unpack_from("<L", self.dnload_buf, 1)[0]
267+
ret = 0
268+
elif self.dnload_block_num > 1:
269+
# Write data to memory.
270+
addr = (self.dnload_block_num - 2) * wTransferSize + self.addr
271+
ret = self.do_write(addr, self.dnload_len, self.dnload_buf)
272+
if ret == 0:
273+
return DFU.STATE_DNLOAD_IDLE
274+
else:
275+
return DFU.STATE_ERROR
276+
277+
def do_mass_erase(self):
278+
# This function would implement a mass erase of flash memory.
279+
return 0 # indicate success
280+
281+
def do_page_erase(self, addr):
282+
# This function would implement an erase of a page in flash memory.
283+
return 0 # indicate success
284+
285+
def do_read(self, addr, buf):
286+
# This function would implement a read at the given address of flash memory.
287+
# Return some dummy bytes.
288+
for i in range(len(buf)):
289+
buf[i] = i & 0xFF
290+
return 0 # indicate success
291+
292+
def do_write(self, addr, size, buf):
293+
# This function would implement a write of the given data to flash memory.
294+
return 0 # indicate success
295+
296+
297+
# Create an instance of the DFU state machine.
298+
dfu = DFU()
299+
300+
# Create an instance of the DFU USB handler.
301+
dfu_usb = DFUOverUSB(dfu)
302+
303+
# Switch the USB device to the custom DFU driver.
304+
usbd = machine.USBDevice()
305+
usbd.active(0)
306+
usbd.builtin_driver = usbd.BUILTIN_NONE
307+
usbd.config(
308+
desc_dev=_desc_dev,
309+
desc_cfg=_desc_cfg,
310+
desc_strs=_desc_strs,
311+
control_xfer_cb=dfu_usb._control_xfer_cb,
312+
)
313+
usbd.active(1)
314+
315+
# Wait for the DFU state machine to complete.
316+
while not dfu.leave_dfu:
317+
machine.idle()
318+
319+
# Switch the USB device back to the default built-in driver.
320+
usbd.active(0)
321+
usbd.builtin_driver = usbd.BUILTIN_DEFAULT
322+
usbd.config(
323+
desc_dev=usbd.builtin_driver.desc_dev,
324+
desc_cfg=usbd.builtin_driver.desc_cfg,
325+
desc_strs=(),
326+
)
327+
usbd.active(1)

0 commit comments

Comments
 (0)