-
Notifications
You must be signed in to change notification settings - Fork 10
Expand file tree
/
Copy patha2dp.py
More file actions
executable file
·394 lines (286 loc) · 11.5 KB
/
a2dp.py
File metadata and controls
executable file
·394 lines (286 loc) · 11.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
#! /usr/bin/env python3.5
"""
Fixing bluetooth stereo headphone/headset problem in ubuntu 16.04 and also debian jessie, with bluez5.
Workaround for bug: https://bugs.launchpad.net/ubuntu/+source/indicator-sound/+bug/1577197
Run it with python3.5 or higher after pairing/connecting the bluetooth stereo headphone.
This will be only fixes the bluez5 problem mentioned above .
Licence: Freeware
See ``python3.5 a2dp.py -h``.
Shorthands:
$ alias speakers="a2dp.py 10:08:C1:44:AE:BC"
$ alias headphones="a2dp.py 00:22:37:3D:DA:50"
$ alias headset="a2dp.py 00:22:37:F8:A0:77 -p hsp"
$ speakers
Check here for the latest updates: https://gist.github.com/pylover/d68be364adac5f946887b85e6ed6e7ae
Thanks to:
* https://github.com/DominicWatson, for adding the ``-p/--profile`` argument.
* https://github.com/IzzySoft, for mentioning wait before connecting again.
* https://github.com/AmploDev, for v0.4.0
* https://github.com/Mihara, for autodetect & autorun service
* https://github.com/dabrovnijk, for systemd service
Change Log
----------
- 0.5.2
* Increasing the number of tries to 15.
- 0.5.2
* Optimizing waits.
- 0.5.1
* Increasing WAIT_TIME and TRIES
- 0.5.0
* Autodetect & autorun service
- 0.4.1
* Sorting device list
- 0.4.0
* Adding ignore_fail argument by @AmploDev.
* Sending all available streams into selected sink, after successfull connection by @AmploDev.
- 0.3.3
* Updating default sink before turning to ``off`` profile.
- 0.3.2
* Waiting a bit: ``-w/--wait`` before connecting again.
- 0.3.0
* Adding -p / --profile option for using the same script to switch between headset and A2DP audio profiles
- 0.2.5
* Mentioning [mac] argument.
- 0.2.4
* Removing duplicated devices in select device list.
- 0.2.3
* Matching ANSI escape characters. Tested on 16.10 & 16.04
- 0.2.2
* Some sort of code enhancements.
- 0.2.0
* Adding `-V/--version`, `-w/--wait` and `-t/--tries` CLI arguments.
- 0.1.1
* Supporting the `[NEW]` prefix for devices & controllers as advised by @wdullaer
* Drying the code.
"""
import sys
import re
import asyncio
import subprocess as sb
import argparse
__version__ = '0.5.2'
HEX_DIGIT_PATTERN = '[0-9A-F]'
HEX_BYTE_PATTERN = '%s{2}' % HEX_DIGIT_PATTERN
MAC_ADDRESS_PATTERN = ':'.join((HEX_BYTE_PATTERN, ) * 6)
DEVICE_PATTERN = re.compile('^(?:.*\s)?Device\s(?P<mac>%s)\s(?P<name>.*)' % MAC_ADDRESS_PATTERN)
CONTROLLER_PATTERN = re.compile('^(?:.*\s)?Controller\s(?P<mac>%s)\s(?P<name>.*)' % MAC_ADDRESS_PATTERN)
WAIT_TIME = 2.25
TRIES = 15
PROFILE = 'a2dp'
_profiles = {
'a2dp': 'a2dp_sink',
'hsp': 'headset_head_unit',
'off': 'off'
}
# CLI Arguments
parser = argparse.ArgumentParser(prog=sys.argv[0])
parser.add_argument('-e', '--echo', action='store_true', default=False,
help='If given, the subprocess stdout will be also printed on stdout.')
parser.add_argument('-w', '--wait', default=WAIT_TIME, type=float,
help='The seconds to wait for subprocess output, default is: %s' % WAIT_TIME)
parser.add_argument('-t', '--tries', default=TRIES, type=int,
help='The number of tries if subprocess is failed. default is: %s' % TRIES)
parser.add_argument('-p', '--profile', default=PROFILE,
help='The profile to switch to. available options are: hsp, a2dp. default is: %s' % PROFILE)
parser.add_argument('-V', '--version', action='store_true', help='Show the version.')
parser.add_argument('mac', nargs='?', default=None)
# Exceptions
class SubprocessError(Exception):
pass
class RetryExceededError(Exception):
pass
class BluetoothctlProtocol(asyncio.SubprocessProtocol):
def __init__(self, exit_future, echo=True):
self.exit_future = exit_future
self.transport = None
self.output = None
self.echo = echo
def listen_output(self):
self.output = ''
def not_listen_output(self):
self.output = None
def pipe_data_received(self, fd, raw):
d = raw.decode()
if self.echo:
print(d, end='')
if self.output is not None:
self.output += d
def process_exited(self):
self.exit_future.set_result(True)
def connection_made(self, transport):
self.transport = transport
print('Connection MADE')
async def send_command(self, c):
stdin_transport = self.transport.get_pipe_transport(0)
# noinspection PyProtectedMember
stdin_transport._pipe.write(('%s\n' % c).encode())
async def search_in_output(self, expression, fail_expression=None):
if self.output is None:
return None
for l in self.output.splitlines():
if fail_expression and re.search(fail_expression, l, re.IGNORECASE):
raise SubprocessError('Expression "%s" failed with fail pattern: "%s"' % (l, fail_expression))
if re.search(expression, l, re.IGNORECASE):
return True
async def send_and_wait(self, cmd, wait_expression, fail_expression='fail'):
try:
self.listen_output()
await self.send_command(cmd)
while not await self.search_in_output(wait_expression.lower(), fail_expression=fail_expression):
await wait()
finally:
self.not_listen_output()
async def disconnect(self, mac):
print('Disconnecting the device.')
await self.send_and_wait('disconnect %s' % ':'.join(mac), 'Successful disconnected')
async def connect(self, mac):
print('Connecting again.')
await self.send_and_wait('connect %s' % ':'.join(mac), 'Connection successful')
async def trust(self, mac):
await self.send_and_wait('trust %s' % ':'.join(mac), 'trust succeeded')
async def quit(self):
await self.send_command('quit')
async def get_list(self, command, pattern):
result = set()
try:
self.listen_output()
await self.send_command(command)
await wait()
for l in self.output.splitlines():
m = pattern.match(l)
if m:
result.add(m.groups())
return sorted(list(result), key=lambda i: i[1])
finally:
self.not_listen_output()
async def list_devices(self):
return await self.get_list('devices', DEVICE_PATTERN)
async def list_paired_devices(self):
return await self.get_list('paired-devices', DEVICE_PATTERN)
async def list_controllers(self):
return await self.get_list('list', CONTROLLER_PATTERN)
async def select_paired_device(self):
print('Selecting device:')
devices = await self.list_paired_devices()
count = len(devices)
if count < 1:
raise SubprocessError('There is no connected device.')
elif count == 1:
return devices[0]
for i, d in enumerate(devices):
print('%d. %s %s' % (i+1, d[0], d[1]))
print('Select device[1]:')
selected = input()
return devices[0 if not selected.strip() else (int(selected) - 1)]
async def wait(delay=None):
return await asyncio.sleep(WAIT_TIME or delay)
async def execute_command(cmd, ignore_fail=False):
p = await asyncio.create_subprocess_shell(cmd, stdout=sb.PIPE, stderr=sb.PIPE)
stdout, stderr = await p.communicate()
stdout, stderr = \
stdout.decode() if stdout is not None else '', \
stderr.decode() if stderr is not None else ''
if p.returncode != 0 or stderr.strip() != '':
message = 'Command: %s failed with status: %s\nstderr: %s' % (cmd, p.returncode, stderr)
if ignore_fail:
print('Ignoring: %s' % message)
else:
raise SubprocessError(message)
return stdout
async def execute_find(cmd, pattern, tries=0, fail_safe=False):
tries = tries or TRIES
message = 'Cannot find `%s` using `%s`.' % (pattern, cmd)
retry_message = message + ' Retrying %d more times'
while True:
stdout = await execute_command(cmd)
match = re.search(pattern, stdout)
if match:
return match.group()
elif tries > 0:
await wait()
print(retry_message % tries)
tries -= 1
continue
if fail_safe:
return None
raise RetryExceededError('Retry times exceeded: %s' % message)
async def find_dev_id(mac, **kw):
return await execute_find('pactl list cards short', 'bluez_card.%s' % '_'.join(mac), **kw)
async def find_sink(mac, **kw):
return await execute_find('pacmd list-sinks', 'bluez_sink.%s' % '_'.join(mac), **kw)
async def set_profile(device_id, profile):
print('Setting the %s profile' % profile)
try:
return await execute_command('pactl set-card-profile %s %s' % (device_id, _profiles[profile]))
except KeyError:
print('Invalid profile: %s, please select one one of a2dp or hsp.' % profile, file=sys.stderr)
raise SystemExit(1)
async def set_default_sink(sink):
print('Updating default sink to %s' % sink)
return await execute_command('pacmd set-default-sink %s' % sink)
async def move_streams_to_sink(sink):
streams = await execute_command('pacmd list-sink-inputs | grep "index:"', True)
for i in streams.split():
i = ''.join(n for n in i if n.isdigit())
if i != '':
print('Moving stream %s to sink' % i)
await execute_command('pacmd move-sink-input %s %s' % (i, sink))
return sink
async def main(args):
global WAIT_TIME, TRIES
if args.version:
print(__version__)
return 0
mac = args.mac
# Hacking, Changing the constants!
WAIT_TIME = args.wait
TRIES = args.tries
exit_future = asyncio.Future()
transport, protocol = await asyncio.get_event_loop().subprocess_exec(
lambda: BluetoothctlProtocol(exit_future, echo=args.echo), 'bluetoothctl'
)
try:
if mac is None:
mac, _ = await protocol.select_paired_device()
mac = mac.split(':' if ':' in mac else '_')
print('Device MAC: %s' % ':'.join(mac))
device_id = await find_dev_id(mac, fail_safe=True)
if device_id is None:
print('It seems device: %s is not connected yet, trying to connect.' % ':'.join(mac))
await protocol.trust(mac)
await protocol.connect(mac)
device_id = await find_dev_id(mac)
sink = await find_sink(mac, fail_safe=True)
if sink is None:
await set_profile(device_id, args.profile)
sink = await find_sink(mac)
print('Device ID: %s' % device_id)
print('Sink: %s' % sink)
await set_default_sink(sink)
await wait()
await set_profile(device_id, 'off')
if args.profile is 'a2dp':
await protocol.disconnect(mac)
await wait()
await protocol.connect(mac)
device_id = await find_dev_id(mac)
print('Device ID: %s' % device_id)
await wait(2)
await set_profile(device_id, args.profile)
await set_default_sink(sink)
await move_streams_to_sink(sink)
except (SubprocessError, RetryExceededError) as ex:
print(str(ex), file=sys.stderr)
return 1
finally:
print('Exiting bluetoothctl')
await protocol.quit()
await exit_future
# Close the stdout pipe
transport.close()
if args.profile == 'a2dp':
print('"Enjoy" the HiFi stereo music :)')
else:
print('"Enjoy" your headset audio :)')
if __name__ == '__main__':
sys.exit(asyncio.get_event_loop().run_until_complete(main(parser.parse_args())))