Skip to content

Commit f6fd46c

Browse files
jimmodpgeorge
authored andcommitted
examples/bluetooth: Add bonding/passkey demo.
Signed-off-by: Jim Mussared <jim.mussared@gmail.com>
1 parent e4f27cb commit f6fd46c

File tree

1 file changed

+194
-0
lines changed

1 file changed

+194
-0
lines changed
Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
# This example demonstrates a simple temperature sensor peripheral.
2+
#
3+
# The sensor's local value updates every second, and it will notify
4+
# any connected central every 10 seconds.
5+
#
6+
# Work-in-progress demo of implementing bonding and passkey auth.
7+
8+
import bluetooth
9+
import random
10+
import struct
11+
import time
12+
import json
13+
import binascii
14+
from ble_advertising import advertising_payload
15+
16+
from micropython import const
17+
18+
_IRQ_CENTRAL_CONNECT = const(1)
19+
_IRQ_CENTRAL_DISCONNECT = const(2)
20+
_IRQ_GATTS_INDICATE_DONE = const(20)
21+
22+
_IRQ_ENCRYPTION_UPDATE = const(28)
23+
_IRQ_PASSKEY_ACTION = const(31)
24+
25+
_IRQ_GET_SECRET = const(29)
26+
_IRQ_SET_SECRET = const(30)
27+
28+
_FLAG_READ = const(0x0002)
29+
_FLAG_NOTIFY = const(0x0010)
30+
_FLAG_INDICATE = const(0x0020)
31+
32+
_FLAG_READ_ENCRYPTED = const(0x0200)
33+
34+
# org.bluetooth.service.environmental_sensing
35+
_ENV_SENSE_UUID = bluetooth.UUID(0x181A)
36+
# org.bluetooth.characteristic.temperature
37+
_TEMP_CHAR = (
38+
bluetooth.UUID(0x2A6E),
39+
_FLAG_READ | _FLAG_NOTIFY | _FLAG_INDICATE | _FLAG_READ_ENCRYPTED,
40+
)
41+
_ENV_SENSE_SERVICE = (
42+
_ENV_SENSE_UUID,
43+
(_TEMP_CHAR,),
44+
)
45+
46+
# org.bluetooth.characteristic.gap.appearance.xml
47+
_ADV_APPEARANCE_GENERIC_THERMOMETER = const(768)
48+
49+
_IO_CAPABILITY_DISPLAY_ONLY = const(0)
50+
_IO_CAPABILITY_DISPLAY_YESNO = const(1)
51+
_IO_CAPABILITY_KEYBOARD_ONLY = const(2)
52+
_IO_CAPABILITY_NO_INPUT_OUTPUT = const(3)
53+
_IO_CAPABILITY_KEYBOARD_DISPLAY = const(4)
54+
55+
_PASSKEY_ACTION_INPUT = const(2)
56+
_PASSKEY_ACTION_DISP = const(3)
57+
_PASSKEY_ACTION_NUMCMP = const(4)
58+
59+
60+
class BLETemperature:
61+
def __init__(self, ble, name="mpy-temp"):
62+
self._ble = ble
63+
self._load_secrets()
64+
self._ble.irq(self._irq)
65+
self._ble.config(bond=True)
66+
self._ble.config(le_secure=True)
67+
self._ble.config(mitm=True)
68+
self._ble.config(io=_IO_CAPABILITY_DISPLAY_YESNO)
69+
self._ble.active(True)
70+
self._ble.config(addr_mode=2)
71+
((self._handle,),) = self._ble.gatts_register_services((_ENV_SENSE_SERVICE,))
72+
self._connections = set()
73+
self._payload = advertising_payload(
74+
name=name, services=[_ENV_SENSE_UUID], appearance=_ADV_APPEARANCE_GENERIC_THERMOMETER
75+
)
76+
self._advertise()
77+
78+
def _irq(self, event, data):
79+
# Track connections so we can send notifications.
80+
if event == _IRQ_CENTRAL_CONNECT:
81+
conn_handle, _, _ = data
82+
self._connections.add(conn_handle)
83+
elif event == _IRQ_CENTRAL_DISCONNECT:
84+
conn_handle, _, _ = data
85+
self._connections.remove(conn_handle)
86+
self._save_secrets()
87+
# Start advertising again to allow a new connection.
88+
self._advertise()
89+
elif event == _IRQ_ENCRYPTION_UPDATE:
90+
conn_handle, encrypted, authenticated, bonded, key_size = data
91+
print("encryption update", conn_handle, encrypted, authenticated, bonded, key_size)
92+
elif event == _IRQ_PASSKEY_ACTION:
93+
conn_handle, action, passkey = data
94+
print("passkey action", conn_handle, action, passkey)
95+
if action == _PASSKEY_ACTION_NUMCMP:
96+
accept = int(input("accept? "))
97+
self._ble.gap_passkey(conn_handle, action, accept)
98+
elif action == _PASSKEY_ACTION_DISP:
99+
print("displaying 123456")
100+
self._ble.gap_passkey(conn_handle, action, 123456)
101+
elif action == _PASSKEY_ACTION_INPUT:
102+
print("prompting for passkey")
103+
passkey = int(input("passkey? "))
104+
self._ble.gap_passkey(conn_handle, action, passkey)
105+
else:
106+
print("unknown action")
107+
elif event == _IRQ_GATTS_INDICATE_DONE:
108+
conn_handle, value_handle, status = data
109+
elif event == _IRQ_SET_SECRET:
110+
sec_type, key, value = data
111+
key = sec_type, bytes(key)
112+
value = bytes(value) if value else None
113+
print("set secret:", key, value)
114+
if value is None:
115+
if key in self._secrets:
116+
del self._secrets[key]
117+
return True
118+
else:
119+
return False
120+
else:
121+
self._secrets[key] = value
122+
return True
123+
elif event == _IRQ_GET_SECRET:
124+
sec_type, index, key = data
125+
print("get secret:", sec_type, index, bytes(key) if key else None)
126+
if key is None:
127+
i = 0
128+
for (t, _key), value in self._secrets.items():
129+
if t == sec_type:
130+
if i == index:
131+
return value
132+
i += 1
133+
return None
134+
else:
135+
key = sec_type, bytes(key)
136+
return self._secrets.get(key, None)
137+
138+
def set_temperature(self, temp_deg_c, notify=False, indicate=False):
139+
# Data is sint16 in degrees Celsius with a resolution of 0.01 degrees Celsius.
140+
# Write the local value, ready for a central to read.
141+
self._ble.gatts_write(self._handle, struct.pack("<h", int(temp_deg_c * 100)))
142+
if notify or indicate:
143+
for conn_handle in self._connections:
144+
if notify:
145+
# Notify connected centrals.
146+
self._ble.gatts_notify(conn_handle, self._handle)
147+
if indicate:
148+
# Indicate connected centrals.
149+
self._ble.gatts_indicate(conn_handle, self._handle)
150+
151+
def _advertise(self, interval_us=500000):
152+
self._ble.config(addr_mode=2)
153+
self._ble.gap_advertise(interval_us, adv_data=self._payload)
154+
155+
def _load_secrets(self):
156+
self._secrets = {}
157+
try:
158+
with open("secrets.json", "r") as f:
159+
entries = json.load(f)
160+
for sec_type, key, value in entries:
161+
self._secrets[sec_type, binascii.a2b_base64(key)] = binascii.a2b_base64(value)
162+
except:
163+
print("no secrets available")
164+
165+
def _save_secrets(self):
166+
try:
167+
with open("secrets.json", "w") as f:
168+
json_secrets = [
169+
(sec_type, binascii.b2a_base64(key), binascii.b2a_base64(value))
170+
for (sec_type, key), value in self._secrets.items()
171+
]
172+
json.dump(json_secrets, f)
173+
except:
174+
print("failed to save secrets")
175+
176+
177+
def demo():
178+
ble = bluetooth.BLE()
179+
temp = BLETemperature(ble)
180+
181+
t = 25
182+
i = 0
183+
184+
while True:
185+
# Write every second, notify every 10 seconds.
186+
i = (i + 1) % 10
187+
temp.set_temperature(t, notify=i == 0, indicate=False)
188+
# Random walk the temperature.
189+
t += random.uniform(-0.5, 0.5)
190+
time.sleep_ms(1000)
191+
192+
193+
if __name__ == "__main__":
194+
demo()

0 commit comments

Comments
 (0)