diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md new file mode 100644 index 00000000..de98d3df --- /dev/null +++ b/.github/PULL_REQUEST_TEMPLATE.md @@ -0,0 +1,55 @@ + +## Context + + + +## Proposed change + + + +## Type of change + + +- [ ] Dependency upgrade +- [ ] Bugfix (non-breaking change which fixes an issue) +- [ ] New device +- [ ] New product id (the device is already supported with a different id) +- [ ] New feature (which adds functionality to an existing device) +- [ ] Breaking change (fix/feature causing existing functionality to break) +- [ ] Code quality improvements to existing code or addition of tests +- [ ] Documentation + +## Additional information + + +- This PR fixes issue: fixes # +- This PR is related to: +- Link to documentation pull request: + +## Checklist + + +- [ ] The code change is tested and works locally. +- [ ] The code has been formatted using Black. +- [ ] The code follows the [Zen of Python](https://www.python.org/dev/peps/pep-0020/). +- [ ] I am creating the Pull Request against the correct branch. +- [ ] Documentation added/updated. diff --git a/.github/workflows/flake8.yaml b/.github/workflows/flake8.yaml new file mode 100644 index 00000000..aa09a19c --- /dev/null +++ b/.github/workflows/flake8.yaml @@ -0,0 +1,33 @@ +name: Python flake8 + +on: + push: + branches: [ master, dev ] + pull_request: + branches: [ master, dev ] + +jobs: + test: + runs-on: ubuntu-20.04 + strategy: + matrix: + python-version: [3.6, 3.7, 3.8, 3.9] + steps: + - uses: actions/checkout@v3 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python-version }} + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install wheel + pip install flake8 flake8-quotes + if [ -f requirements.txt ]; then pip install -r requirements.txt; fi + - name: Lint with flake8 + run: | + # stop the build if there are Python syntax errors or undefined names + flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics + # exit-zero treats all errors as warnings. ignore magic numbers and use double quotes and ignore numbers with zeroes before them. + # and ignore lowercase hex numbers and ignore isort incorrect imports + flake8 . --count --exit-zero --max-complexity=10 --max-line-length=90 --ignore=WPS432,WPS339,WPS341,I --inline-quotes double --statistics diff --git a/README.md b/README.md index 3d7598f5..81c6de5b 100644 --- a/README.md +++ b/README.md @@ -1,65 +1,247 @@ -Python control for Broadlink RM2 IR controllers -=============================================== +# python-broadlink -A simple Python API for controlling IR controllers from [Broadlink](http://www.ibroadlink.com/rm/). At present, only RM Pro (referred to as RM2 in the codebase) and A1 sensor platform devices are supported. There is currently no support for the cloud API. +A Python module and CLI for controlling Broadlink devices locally. The following devices are supported: -Example use ------------ +- **Universal remotes**: RM home, RM mini 3, RM plus, RM pro, RM pro+, RM4 mini, RM4 pro, RM4C mini, RM4S, RM4 TV mate +- **Smart plugs**: SP mini, SP mini 3, SP mini+, SP1, SP2, SP2-BR, SP2-CL, SP2-IN, SP2-UK, SP3, SP3-EU, SP3S-EU, SP3S-US, SP4L-AU, SP4L-EU, SP4L-UK, SP4M, SP4M-US, Ankuoo NEO, Ankuoo NEO PRO, Efergy Ego, BG AHC/U-01 +- **Switches**: MCB1, SC1, SCB1E, SCB2 +- **Outlets**: BG 800, BG 900 +- **Power strips**: MP1-1K3S2U, MP1-1K4S, MP2 +- **Environment sensors**: A1 +- **Alarm kits**: S1C, S2KIT +- **Light bulbs**: LB1, LB26 R1, LB27 R1, SB800TD +- **Curtain motors**: Dooya DT360E-45/20 +- **Thermostats**: Hysen HY02B05H +- **Hubs**: S3 + +## Installation + +Use pip3 to install the latest version of this module. + +``` +pip3 install broadlink +``` + +## Basic functions + +First, open Python 3 and import this module. -Discover available devices on the local network: ``` +python3 +``` +```python3 import broadlink +``` + +Now let's try some functions... + +### Setup + +In order to control the device, you need to connect it to your local network. If you have already configured the device with the Broadlink app, this step is not necessary. + +1. Put the device into AP Mode. + - Long press the reset button until the blue LED is blinking quickly. + - Long press again until blue LED is blinking slowly. + - Manually connect to the WiFi SSID named BroadlinkProv. +2. Connect the device to your local network with the setup function. +```python3 +broadlink.setup('myssid', 'mynetworkpass', 3) +``` + +Security mode options are (0 = none, 1 = WEP, 2 = WPA1, 3 = WPA2, 4 = WPA1/2) -devices = broadlink.discover(timeout=5) +#### Advanced options + +You may need to specify a broadcast address if setup is not working. +```python3 +broadlink.setup('myssid', 'mynetworkpass', 3, ip_address='192.168.0.255') ``` -Obtain the authentication key required for further communication: +### Discovery + +Use this function to discover devices: + +```python3 +devices = broadlink.discover() ``` -devices[0].auth() + +#### Advanced options +You may need to specify `local_ip_address` or `discover_ip_address` if discovery does not return any devices. + +Using the IP address of your local machine: +```python3 +devices = broadlink.discover(local_ip_address='192.168.0.100') ``` -Enter learning mode: +Using the broadcast address of your subnet: +```python3 +devices = broadlink.discover(discover_ip_address='192.168.0.255') ``` -devices[0].enter_learning() + +If the device is locked, it may not be discoverable with broadcast. In such cases, you can use the unicast version `broadlink.hello()` for direct discovery: +```python3 +device = broadlink.hello('192.168.0.16') ``` -Obtain an IR or RF packet while in learning mode: +If you are a perfomance freak, use `broadlink.xdiscover()` to create devices instantly: +```python3 +for device in broadlink.xdiscover(): + print(device) # Example action. Do whatever you want here. ``` -ir_packet = devices[0].check_data() + +### Authentication +After discovering the device, call the `auth()` method to obtain the authentication key required for further communication: +```python3 +device.auth() ``` -(This will return None if the device does not have a packet to return) -Send an IR or RF packet: +The next steps depend on the type of device you want to control. + +## Universal remotes + +### Learning IR codes + +Learning IR codes takes place in three steps. + +1. Enter learning mode: +```python3 +device.enter_learning() ``` -devices[0].send_data(ir_packet) +2. When the LED blinks, point the remote at the Broadlink device and press the button you want to learn. +3. Get the IR packet. +```python3 +packet = device.check_data() ``` -Obtain temperature data from an RM2: +### Learning RF codes + +Learning RF codes takes place in six steps. + +1. Sweep the frequency: +```python3 +device.sweep_frequency() +``` +2. When the LED blinks, point the remote at the Broadlink device for the first time and long press the button you want to learn. +3. Check if the frequency was successfully identified: +```python3 +ok = device.check_frequency() +if ok: + print('Frequency found!') +``` +4. Enter learning mode: +```python3 +device.find_rf_packet() +``` +5. When the LED blinks, point the remote at the Broadlink device for the second time and short press the button you want to learn. +6. Get the RF packet: +```python3 +packet = device.check_data() ``` -devices[0].check_temperature() + +#### Notes + +Universal remotes with product id 0x2712 use the same method for learning IR and RF codes. They don't need to sweep frequency. Just call `device.enter_learning()` and `device.check_data()`. + +### Canceling learning + +You can exit the learning mode in the middle of the process by calling this method: +```python3 +device.cancel_sweep_frequency() ``` -Obtain sensor data from an A1: +### Sending IR/RF packets +```python3 +device.send_data(packet) ``` -data = devices[0].check_sensors() + +### Fetching sensor data +```python3 +data = device.check_sensors() ``` -Set power state on a SmartPlug SP2/SP3: +## Switches + +### Setting power state +```python3 +device.set_power(True) +device.set_power(False) ``` -devices[0].set_power(True) + +### Checking power state +```python3 +state = device.check_power() ``` -Check power state on a SmartPlug: +### Checking energy consumption +```python3 +state = device.get_energy() ``` -state = devices[0].check_power() + +## Power strips + +### Setting power state +```python3 +device.set_power(1, True) # Example socket. It could be 2 or 3. +device.set_power(1, False) ``` -Set power state for S1 on a SmartPowerStrip MP1: +### Checking power state +```python3 +state = device.check_power() ``` -devices[0].set_power(1, True) + +## Light bulbs + +### Fetching data +```python3 +state = device.get_state() ``` -Check power state on a SmartPowerStrip: +### Setting state attributes +```python3 +devices[0].set_state(pwr=0) +devices[0].set_state(pwr=1) +devices[0].set_state(brightness=75) +devices[0].set_state(bulb_colormode=0) +devices[0].set_state(blue=255) +devices[0].set_state(red=0) +devices[0].set_state(green=128) +devices[0].set_state(bulb_colormode=1) +``` + +## Environment sensors + +### Fetching sensor data +```python3 +data = device.check_sensors() +``` + +## Hubs + +### Discovering subdevices +```python3 +device.get_subdevices() +``` + +### Fetching data +Use the DID obtained from get_subdevices() for the input parameter to query specific sub-device. + +```python3 +device.get_state(did="00000000000000000000a043b0d06963") +``` + +### Setting state attributes +The parameters depend on the type of subdevice that is being controlled. In this example, we are controlling LC-1 switches: + +#### Turn on +```python3 +device.set_state(did="00000000000000000000a043b0d0783a", pwr=1) +device.set_state(did="00000000000000000000a043b0d0783a", pwr1=1) +device.set_state(did="00000000000000000000a043b0d0783a", pwr2=1) +``` +#### Turn off +```python3 +device.set_state(did="00000000000000000000a043b0d0783a", pwr=0) +device.set_state(did="00000000000000000000a043b0d0783a", pwr1=0) +device.set_state(did="00000000000000000000a043b0d0783a", pwr2=0) ``` -state = devices[0].check_power() -``` \ No newline at end of file diff --git a/TROUBLESHOOTING.md b/TROUBLESHOOTING.md new file mode 100644 index 00000000..a20765b2 --- /dev/null +++ b/TROUBLESHOOTING.md @@ -0,0 +1,9 @@ +# Troubleshooting + +## Firmware issues + +### AP setup fails with non-alphanumeric passwords + +Some devices ship with firmware that cannot connect to WLANs with non-alphanumeric passwords. To fix this, update the firmware to the latest version. You can also change the password to one with just letters and numbers or create a separate guest network with a simpler password. + +_First seen on Broadlink RM4 pro 0x6026. Already fixed in firmware v52079._ diff --git a/broadlink/__init__.py b/broadlink/__init__.py index d5cc8d28..d3135501 100644 --- a/broadlink/__init__.py +++ b/broadlink/__init__.py @@ -1,484 +1,333 @@ -#!/usr/bin/python - -from datetime import datetime -from Crypto.Cipher import AES -import time -import random +#!/usr/bin/env python3 +"""The python-broadlink library.""" import socket -import threading - -def gendevice(devtype, host, mac): - if devtype == 0: # SP1 - return sp1(host=host, mac=mac) - if devtype == 0x2711: # SP2 - return sp2(host=host, mac=mac) - if devtype == 0x2719 or devtype == 0x7919 or devtype == 0x271a or devtype == 0x791a: # Honeywell SP2 - return sp2(host=host, mac=mac) - if devtype == 0x2720: # SPMini - return sp2(host=host, mac=mac) - elif devtype == 0x753e: # SP3 - return sp2(host=host, mac=mac) - elif devtype == 0x2728: # SPMini2 - return sp2(host=host, mac=mac) - elif devtype == 0x2733 or devtype == 0x273e: # OEM branded SPMini - return sp2(host=host, mac=mac) - elif devtype >= 0x7530 and devtype <= 0x7918: # OEM branded SPMini2 - return sp2(host=host, mac=mac) - elif devtype == 0x2736: # SPMiniPlus - return sp2(host=host, mac=mac) - elif devtype == 0x2712: # RM2 - return rm(host=host, mac=mac) - elif devtype == 0x2737: # RM Mini - return rm(host=host, mac=mac) - elif devtype == 0x273d: # RM Pro Phicomm - return rm(host=host, mac=mac) - elif devtype == 0x2783: # RM2 Home Plus - return rm(host=host, mac=mac) - elif devtype == 0x277c: # RM2 Home Plus GDT - return rm(host=host, mac=mac) - elif devtype == 0x272a: # RM2 Pro Plus - return rm(host=host, mac=mac) - elif devtype == 0x2787: # RM2 Pro Plus2 - return rm(host=host, mac=mac) - elif devtype == 0x278b: # RM2 Pro Plus BL - return rm(host=host, mac=mac) - elif devtype == 0x278f: # RM Mini Shate - return rm(host=host, mac=mac) - elif devtype == 0x2714: # A1 - return a1(host=host, mac=mac) - elif devtype == 0x4EB5: # MP1 - return mp1(host=host, mac=mac) - else: - return device(host=host, mac=mac) - -def discover(timeout=None, local_ip_address=None): - if local_ip_address is None: - s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - s.connect(('8.8.8.8', 53)) # connecting to a UDP address doesn't send packets - local_ip_address = s.getsockname()[0] - address = local_ip_address.split('.') - cs = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - cs.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - cs.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) - cs.bind((local_ip_address,0)) - port = cs.getsockname()[1] - starttime = time.time() - - devices = [] - - timezone = int(time.timezone/-3600) - packet = bytearray(0x30) - - year = datetime.now().year - - if timezone < 0: - packet[0x08] = 0xff + timezone - 1 - packet[0x09] = 0xff - packet[0x0a] = 0xff - packet[0x0b] = 0xff - else: - packet[0x08] = timezone - packet[0x09] = 0 - packet[0x0a] = 0 - packet[0x0b] = 0 - packet[0x0c] = year & 0xff - packet[0x0d] = year >> 8 - packet[0x0e] = datetime.now().minute - packet[0x0f] = datetime.now().hour - subyear = str(year)[2:] - packet[0x10] = int(subyear) - packet[0x11] = datetime.now().isoweekday() - packet[0x12] = datetime.now().day - packet[0x13] = datetime.now().month - packet[0x18] = int(address[0]) - packet[0x19] = int(address[1]) - packet[0x1a] = int(address[2]) - packet[0x1b] = int(address[3]) - packet[0x1c] = port & 0xff - packet[0x1d] = port >> 8 - packet[0x26] = 6 - checksum = 0xbeaf - - for i in range(len(packet)): - checksum += packet[i] - checksum = checksum & 0xffff - packet[0x20] = checksum & 0xff - packet[0x21] = checksum >> 8 - - cs.sendto(packet, ('255.255.255.255', 80)) - if timeout is None: - response = cs.recvfrom(1024) - responsepacket = bytearray(response[0]) - host = response[1] - mac = responsepacket[0x3a:0x40] - devtype = responsepacket[0x34] | responsepacket[0x35] << 8 - return gendevice(devtype, host, mac) - else: - while (time.time() - starttime) < timeout: - cs.settimeout(timeout - (time.time() - starttime)) - try: - response = cs.recvfrom(1024) - except socket.timeout: - return devices - responsepacket = bytearray(response[0]) - host = response[1] - devtype = responsepacket[0x34] | responsepacket[0x35] << 8 - mac = responsepacket[0x3a:0x40] - dev = gendevice(devtype, host, mac) - devices.append(dev) - return devices - - -class device: - def __init__(self, host, mac, timeout=10): - self.host = host - self.mac = mac - self.timeout = timeout - self.count = random.randrange(0xffff) - self.key = bytearray([0x09, 0x76, 0x28, 0x34, 0x3f, 0xe9, 0x9e, 0x23, 0x76, 0x5c, 0x15, 0x13, 0xac, 0xcf, 0x8b, 0x02]) - self.iv = bytearray([0x56, 0x2e, 0x17, 0x99, 0x6d, 0x09, 0x3d, 0x28, 0xdd, 0xb3, 0xba, 0x69, 0x5a, 0x2e, 0x6f, 0x58]) - self.id = bytearray([0, 0, 0, 0]) - self.cs = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - self.cs.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - self.cs.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) - self.cs.bind(('',0)) - self.type = "Unknown" - self.lock = threading.Lock() - - def auth(self): - payload = bytearray(0x50) - payload[0x04] = 0x31 - payload[0x05] = 0x31 - payload[0x06] = 0x31 - payload[0x07] = 0x31 - payload[0x08] = 0x31 - payload[0x09] = 0x31 - payload[0x0a] = 0x31 - payload[0x0b] = 0x31 - payload[0x0c] = 0x31 - payload[0x0d] = 0x31 - payload[0x0e] = 0x31 - payload[0x0f] = 0x31 - payload[0x10] = 0x31 - payload[0x11] = 0x31 - payload[0x12] = 0x31 - payload[0x1e] = 0x01 - payload[0x2d] = 0x01 - payload[0x30] = ord('T') - payload[0x31] = ord('e') - payload[0x32] = ord('s') - payload[0x33] = ord('t') - payload[0x34] = ord(' ') - payload[0x35] = ord(' ') - payload[0x36] = ord('1') - - response = self.send_packet(0x65, payload) - - enc_payload = response[0x38:] - - aes = AES.new(bytes(self.key), AES.MODE_CBC, bytes(self.iv)) - payload = aes.decrypt(bytes(enc_payload)) - - if not payload: - return False - - key = payload[0x04:0x14] - if len(key) % 16 != 0: - return False - - self.id = payload[0x00:0x04] - self.key = key - return True - - def get_type(self): - return self.type - - def send_packet(self, command, payload): - self.count = (self.count + 1) & 0xffff - packet = bytearray(0x38) - packet[0x00] = 0x5a - packet[0x01] = 0xa5 - packet[0x02] = 0xaa - packet[0x03] = 0x55 - packet[0x04] = 0x5a - packet[0x05] = 0xa5 - packet[0x06] = 0xaa - packet[0x07] = 0x55 - packet[0x24] = 0x2a - packet[0x25] = 0x27 - packet[0x26] = command - packet[0x28] = self.count & 0xff - packet[0x29] = self.count >> 8 - packet[0x2a] = self.mac[0] - packet[0x2b] = self.mac[1] - packet[0x2c] = self.mac[2] - packet[0x2d] = self.mac[3] - packet[0x2e] = self.mac[4] - packet[0x2f] = self.mac[5] - packet[0x30] = self.id[0] - packet[0x31] = self.id[1] - packet[0x32] = self.id[2] - packet[0x33] = self.id[3] - - checksum = 0xbeaf - for i in range(len(payload)): - checksum += payload[i] - checksum = checksum & 0xffff - - aes = AES.new(bytes(self.key), AES.MODE_CBC, bytes(self.iv)) - payload = aes.encrypt(bytes(payload)) - - packet[0x34] = checksum & 0xff - packet[0x35] = checksum >> 8 - - for i in range(len(payload)): - packet.append(payload[i]) - - checksum = 0xbeaf - for i in range(len(packet)): - checksum += packet[i] - checksum = checksum & 0xffff - packet[0x20] = checksum & 0xff - packet[0x21] = checksum >> 8 - - starttime = time.time() - with self.lock: - while True: +from typing import Generator, List, Optional, Tuple, Union + +from . import exceptions as e +from .const import DEFAULT_BCAST_ADDR, DEFAULT_PORT, DEFAULT_TIMEOUT +from .alarm import S1C +from .climate import hvac, hysen +from .cover import dooya, dooya2, wser +from .device import Device, ping, scan +from .hub import s3 +from .light import lb1, lb2 +from .remote import rm, rm4, rm4mini, rm4pro, rmmini, rmminib, rmpro +from .sensor import a1, a2 +from .switch import bg1, ehc31, mp1, mp1s, sp1, sp2, sp2s, sp3, sp3s, sp4, sp4b + +SUPPORTED_TYPES = { + sp1: { + 0x0000: ("SP1", "Broadlink"), + }, + sp2: { + 0x2717: ("NEO", "Ankuoo"), + 0x2719: ("SP2-compatible", "Honeywell"), + 0x271A: ("SP2-compatible", "Honeywell"), + 0x2720: ("SP mini", "Broadlink"), + 0x2728: ("SP2-compatible", "URANT"), + 0x273E: ("SP mini", "Broadlink"), + 0x7530: ("SP2", "Broadlink (OEM)"), + 0x7539: ("SP2-IL", "Broadlink (OEM)"), + 0x753E: ("SP mini 3", "Broadlink"), + 0x7540: ("MP2", "Broadlink"), + 0x7544: ("SP2-CL", "Broadlink"), + 0x7546: ("SP2-UK/BR/IN", "Broadlink (OEM)"), + 0x7547: ("SC1", "Broadlink"), + 0x7549: ("SP mini 3", "Broadlink (OEM)"), + 0x7918: ("SP2", "Broadlink (OEM)"), + 0x7919: ("SP2-compatible", "Honeywell"), + 0x791A: ("SP2-compatible", "Honeywell"), + 0x7D0D: ("SP mini 3", "Broadlink (OEM)"), + }, + sp2s: { + 0x2711: ("SP2", "Broadlink"), + 0x2716: ("NEO PRO", "Ankuoo"), + 0x271D: ("Ego", "Efergy"), + 0x2736: ("SP mini+", "Broadlink"), + }, + sp3: { + 0x2733: ("SP3", "Broadlink"), + 0x7D00: ("SP3-EU", "Broadlink (OEM)"), + }, + sp3s: { + 0x9479: ("SP3S-US", "Broadlink"), + 0x947A: ("SP3S-EU", "Broadlink"), + }, + sp4: { + 0x7568: ("SP4L-CN", "Broadlink"), + 0x756B: ("SP4M-JP", "Broadlink"), + 0x756C: ("SP4M", "Broadlink"), + 0x756F: ("MCB1", "Broadlink"), + 0x7579: ("SP4L-EU", "Broadlink"), + 0x757B: ("SP4L-AU", "Broadlink"), + 0x7583: ("SP mini 3", "Broadlink"), + 0x7587: ("SP4L-UK", "Broadlink"), + 0x7D11: ("SP mini 3", "Broadlink"), + 0xA4F9: ("WS4", "Broadlink (OEM)"), + 0xA569: ("SP4L-UK", "Broadlink"), + 0xA56A: ("MCB1", "Broadlink"), + 0xA56B: ("SCB1E", "Broadlink"), + 0xA56C: ("SP4L-EU", "Broadlink"), + 0xA576: ("SP4L-AU", "Broadlink"), + 0xA589: ("SP4L-UK", "Broadlink"), + 0xA5D3: ("SP4L-EU", "Broadlink"), + 0xA6F4: ("SP4D-US", "Broadlink"), + }, + sp4b: { + 0x5115: ("SCB1E", "Broadlink"), + 0x51E2: ("AHC/U-01", "BG Electrical"), + 0x6111: ("MCB1", "Broadlink"), + 0x6113: ("SCB1E", "Broadlink"), + 0x618B: ("SP4L-EU", "Broadlink"), + 0x6489: ("SP4L-AU", "Broadlink"), + 0x648B: ("SP4M-US", "Broadlink"), + 0x648C: ("SP4L-US", "Broadlink"), + 0x6494: ("SCB2", "Broadlink"), + }, + rmmini: { + 0x2737: ("RM mini 3", "Broadlink"), + 0x278F: ("RM mini", "Broadlink"), + 0x27B7: ("RM mini 3", "Broadlink"), + 0x27C2: ("RM mini 3", "Broadlink"), + 0x27C7: ("RM mini 3", "Broadlink"), + 0x27CC: ("RM mini 3", "Broadlink"), + 0x27CD: ("RM mini 3", "Broadlink"), + 0x27D0: ("RM mini 3", "Broadlink"), + 0x27D1: ("RM mini 3", "Broadlink"), + 0x27D3: ("RM mini 3", "Broadlink"), + 0x27DC: ("RM mini 3", "Broadlink"), + 0x27DE: ("RM mini 3", "Broadlink"), + }, + rmpro: { + 0x2712: ("RM pro/pro+", "Broadlink"), + 0x272A: ("RM pro", "Broadlink"), + 0x273D: ("RM pro", "Broadlink"), + 0x277C: ("RM home", "Broadlink"), + 0x2783: ("RM home", "Broadlink"), + 0x2787: ("RM pro", "Broadlink"), + 0x278B: ("RM plus", "Broadlink"), + 0x2797: ("RM pro+", "Broadlink"), + 0x279D: ("RM pro+", "Broadlink"), + 0x27A1: ("RM plus", "Broadlink"), + 0x27A6: ("RM plus", "Broadlink"), + 0x27A9: ("RM pro+", "Broadlink"), + 0x27C3: ("RM pro+", "Broadlink"), + }, + rmminib: { + 0x5F36: ("RM mini 3", "Broadlink"), + 0x6507: ("RM mini 3", "Broadlink"), + 0x6508: ("RM mini 3", "Broadlink"), + }, + rm4mini: { + 0x51DA: ("RM4 mini", "Broadlink"), + 0x5209: ("RM4 TV mate", "Broadlink"), + 0x520C: ("RM4 mini", "Broadlink"), + 0x520D: ("RM4C mini", "Broadlink"), + 0x5211: ("RM4C mate", "Broadlink"), + 0x5212: ("RM4 TV mate", "Broadlink"), + 0x5216: ("RM4 mini", "Broadlink"), + 0x521C: ("RM4 mini", "Broadlink"), + 0x6070: ("RM4C mini", "Broadlink"), + 0x610E: ("RM4 mini", "Broadlink"), + 0x610F: ("RM4C mini", "Broadlink"), + 0x62BC: ("RM4 mini", "Broadlink"), + 0x62BE: ("RM4C mini", "Broadlink"), + 0x6364: ("RM4S", "Broadlink"), + 0x648D: ("RM4 mini", "Broadlink"), + 0x6539: ("RM4C mini", "Broadlink"), + 0x653A: ("RM4 mini", "Broadlink"), + }, + rm4pro: { + 0x520B: ("RM4 pro", "Broadlink"), + 0x5213: ("RM4 pro", "Broadlink"), + 0x5218: ("RM4C pro", "Broadlink"), + 0x6026: ("RM4 pro", "Broadlink"), + 0x6184: ("RM4C pro", "Broadlink"), + 0x61A2: ("RM4 pro", "Broadlink"), + 0x649B: ("RM4 pro", "Broadlink"), + 0x653C: ("RM4 pro", "Broadlink"), + }, + a1: { + 0x2714: ("A1", "Broadlink"), + }, + a2: { + 0x4F60: ("A2", "Broadlink"), + }, + mp1: { + 0x4EB5: ("MP1-1K4S", "Broadlink"), + 0x4F1B: ("MP1-1K3S2U", "Broadlink (OEM)"), + 0x4F65: ("MP1-1K3S2U", "Broadlink"), + }, + mp1s: { + 0x4EF7: ("MP1-1K4S", "Broadlink (OEM)"), + }, + lb1: { + 0x5043: ("SB800TD", "Broadlink (OEM)"), + 0x504E: ("LB1", "Broadlink"), + 0x606D: ("SLA22RGB9W81/SLA27RGB9W81", "Luceco"), + 0x606E: ("SB500TD", "Broadlink (OEM)"), + 0x60C7: ("LB1", "Broadlink"), + 0x60C8: ("LB1", "Broadlink"), + 0x6112: ("LB1", "Broadlink"), + 0x644B: ("LB1", "Broadlink"), + 0x644C: ("LB27 R1", "Broadlink"), + 0x644E: ("LB26 R1", "Broadlink"), + 0x6488: ("LB27 C1", "Broadlink"), + }, + lb2: { + 0xA4F4: ("LB27 R1", "Broadlink"), + 0xA5F7: ("LB27 R1", "Broadlink"), + 0xA6EF: ("EFCF60WSMT", "Luceco"), + }, + S1C: { + 0x2722: ("S2KIT", "Broadlink"), + }, + s3: { + 0xA59C: ("S3", "Broadlink"), + 0xA64D: ("S3", "Broadlink"), + }, + hvac: { + 0x4E2A: ("HVAC", "Licensed manufacturer"), + }, + hysen: { + 0x4EAD: ("HY02/HY03", "Hysen"), + }, + dooya: { + 0x4E4D: ("DT360E-45/20", "Dooya"), + }, + dooya2: { + 0x4F6E: ("DT360E-45/20", "Dooya"), + }, + wser: { + 0x4F6C: ("WSER", "Wistar"), + }, + bg1: { + 0x51E3: ("BG800/BG900", "BG Electrical"), + }, + ehc31: { + 0x6480: ("EHC31", "BG Electrical"), + }, +} + + +def gendevice( + dev_type: int, + host: Tuple[str, int], + mac: Union[bytes, str], + name: str = "", + is_locked: bool = False, +) -> Device: + """Generate a device.""" + for dev_cls, products in SUPPORTED_TYPES.items(): try: - self.cs.sendto(packet, self.host) - self.cs.settimeout(1) - response = self.cs.recvfrom(1024) - break - except socket.timeout: - if (time.time() - starttime) < self.timeout: - pass - raise - return bytearray(response[0]) - - -class mp1(device): - def __init__ (self, host, mac): - device.__init__(self, host, mac) - self.type = "MP1" - - def set_power_mask(self, sid_mask, state): - """Sets the power state of the smart power strip.""" - - packet = bytearray(16) - packet[0x00] = 0x0d - packet[0x02] = 0xa5 - packet[0x03] = 0xa5 - packet[0x04] = 0x5a - packet[0x05] = 0x5a - packet[0x06] = 0xb2 + ((sid_mask<<1) if state else sid_mask) - packet[0x07] = 0xc0 - packet[0x08] = 0x02 - packet[0x0a] = 0x03 - packet[0x0d] = sid_mask - packet[0x0e] = sid_mask if state else 0 - - response = self.send_packet(0x6a, packet) - - err = response[0x22] | (response[0x23] << 8) - - def set_power(self, sid, state): - """Sets the power state of the smart power strip.""" - sid_mask = 0x01 << (sid - 1) - return self.set_power_mask(sid_mask, state) - - def check_power_raw(self): - """Returns the power state of the smart power strip in raw format.""" - packet = bytearray(16) - packet[0x00] = 0x0a - packet[0x02] = 0xa5 - packet[0x03] = 0xa5 - packet[0x04] = 0x5a - packet[0x05] = 0x5a - packet[0x06] = 0xae - packet[0x07] = 0xc0 - packet[0x08] = 0x01 - - response = self.send_packet(0x6a, packet) - err = response[0x22] | (response[0x23] << 8) - if err == 0: - aes = AES.new(bytes(self.key), AES.MODE_CBC, bytes(self.iv)) - payload = aes.decrypt(bytes(response[0x38:])) - if type(payload[0x4]) == int: - state = payload[0x0e] - else: - state = ord(payload[0x0e]) - return state - - def check_power(self): - """Returns the power state of the smart power strip.""" - state = self.check_power_raw() - data = {} - data['s1'] = bool(state & 0x01) - data['s2'] = bool(state & 0x02) - data['s3'] = bool(state & 0x04) - data['s4'] = bool(state & 0x08) - return data - - -class sp1(device): - def __init__ (self, host, mac): - device.__init__(self, host, mac) - self.type = "SP1" - - def set_power(self, state): - packet = bytearray(4) - packet[0] = state - self.send_packet(0x66, packet) - - -class sp2(device): - def __init__ (self, host, mac): - device.__init__(self, host, mac) - self.type = "SP2" - - def set_power(self, state): - """Sets the power state of the smart plug.""" - packet = bytearray(16) - packet[0] = 2 - packet[4] = 1 if state else 0 - self.send_packet(0x6a, packet) - - def check_power(self): - """Returns the power state of the smart plug.""" - packet = bytearray(16) - packet[0] = 1 - response = self.send_packet(0x6a, packet) - err = response[0x22] | (response[0x23] << 8) - if err == 0: - aes = AES.new(bytes(self.key), AES.MODE_CBC, bytes(self.iv)) - payload = aes.decrypt(bytes(response[0x38:])) - return bool(payload[0x4]) - -class a1(device): - def __init__ (self, host, mac): - device.__init__(self, host, mac) - self.type = "A1" - - def check_sensors(self): - packet = bytearray(16) - packet[0] = 1 - response = self.send_packet(0x6a, packet) - err = response[0x22] | (response[0x23] << 8) - if err == 0: - data = {} - aes = AES.new(bytes(self.key), AES.MODE_CBC, bytes(self.iv)) - payload = aes.decrypt(bytes(response[0x38:])) - if type(payload[0x4]) == int: - data['temperature'] = (payload[0x4] * 10 + payload[0x5]) / 10.0 - data['humidity'] = (payload[0x6] * 10 + payload[0x7]) / 10.0 - light = payload[0x8] - air_quality = payload[0x0a] - noise = payload[0xc] - else: - data['temperature'] = (ord(payload[0x4]) * 10 + ord(payload[0x5])) / 10.0 - data['humidity'] = (ord(payload[0x6]) * 10 + ord(payload[0x7])) / 10.0 - light = ord(payload[0x8]) - air_quality = ord(payload[0x0a]) - noise = ord(payload[0xc]) - if light == 0: - data['light'] = 'dark' - elif light == 1: - data['light'] = 'dim' - elif light == 2: - data['light'] = 'normal' - elif light == 3: - data['light'] = 'bright' - else: - data['light'] = 'unknown' - if air_quality == 0: - data['air_quality'] = 'excellent' - elif air_quality == 1: - data['air_quality'] = 'good' - elif air_quality == 2: - data['air_quality'] = 'normal' - elif air_quality == 3: - data['air_quality'] = 'bad' - else: - data['air_quality'] = 'unknown' - if noise == 0: - data['noise'] = 'quiet' - elif noise == 1: - data['noise'] = 'normal' - elif noise == 2: - data['noise'] = 'noisy' - else: - data['noise'] = 'unknown' - return data - - def check_sensors_raw(self): - packet = bytearray(16) - packet[0] = 1 - response = self.send_packet(0x6a, packet) - err = response[0x22] | (response[0x23] << 8) - if err == 0: - data = {} - aes = AES.new(bytes(self.key), AES.MODE_CBC, bytes(self.iv)) - payload = aes.decrypt(bytes(response[0x38:])) - if type(payload[0x4]) == int: - data['temperature'] = (payload[0x4] * 10 + payload[0x5]) / 10.0 - data['humidity'] = (payload[0x6] * 10 + payload[0x7]) / 10.0 - data['light'] = payload[0x8] - data['air_quality'] = payload[0x0a] - data['noise'] = payload[0xc] - else: - data['temperature'] = (ord(payload[0x4]) * 10 + ord(payload[0x5])) / 10.0 - data['humidity'] = (ord(payload[0x6]) * 10 + ord(payload[0x7])) / 10.0 - data['light'] = ord(payload[0x8]) - data['air_quality'] = ord(payload[0x0a]) - data['noise'] = ord(payload[0xc]) - return data - - -class rm(device): - def __init__ (self, host, mac): - device.__init__(self, host, mac) - self.type = "RM2" - - def check_data(self): - packet = bytearray(16) - packet[0] = 4 - response = self.send_packet(0x6a, packet) - err = response[0x22] | (response[0x23] << 8) - if err == 0: - aes = AES.new(bytes(self.key), AES.MODE_CBC, bytes(self.iv)) - payload = aes.decrypt(bytes(response[0x38:])) - return payload[0x04:] - - def send_data(self, data): - packet = bytearray([0x02, 0x00, 0x00, 0x00]) - packet += data - self.send_packet(0x6a, packet) - - def enter_learning(self): - packet = bytearray(16) - packet[0] = 3 - self.send_packet(0x6a, packet) - - def check_temperature(self): - packet = bytearray(16) - packet[0] = 1 - response = self.send_packet(0x6a, packet) - err = response[0x22] | (response[0x23] << 8) - if err == 0: - aes = AES.new(bytes(self.key), AES.MODE_CBC, bytes(self.iv)) - payload = aes.decrypt(bytes(response[0x38:])) - if type(payload[0x4]) == int: - temp = (payload[0x4] * 10 + payload[0x5]) / 10.0 - else: - temp = (ord(payload[0x4]) * 10 + ord(payload[0x5])) / 10.0 - return temp - -# For legay compatibility - don't use this -class rm2(rm): - def __init__ (self): - device.__init__(self, None, None) - - def discover(self): - dev = discover() - self.host = dev.host - self.mac = dev.mac + model, manufacturer = products[dev_type] + + except KeyError: + continue + + return dev_cls( + host, + mac, + dev_type, + name=name, + model=model, + manufacturer=manufacturer, + is_locked=is_locked, + ) + + return Device(host, mac, dev_type, name=name, is_locked=is_locked) + + +def hello( + ip_address: str, + port: int = DEFAULT_PORT, + timeout: int = DEFAULT_TIMEOUT, +) -> Device: + """Direct device discovery. + + Useful if the device is locked. + """ + try: + return next( + xdiscover( + timeout=timeout, + discover_ip_address=ip_address, + discover_ip_port=port, + ) + ) + except StopIteration as err: + raise e.NetworkTimeoutError( + -4000, + "Network timeout", + f"No response received within {timeout}s", + ) from err + + +def discover( + timeout: int = DEFAULT_TIMEOUT, + local_ip_address: Optional[str] = None, + discover_ip_address: str = DEFAULT_BCAST_ADDR, + discover_ip_port: int = DEFAULT_PORT, +) -> List[Device]: + """Discover devices connected to the local network.""" + responses = scan( + timeout, local_ip_address, discover_ip_address, discover_ip_port + ) + return [gendevice(*resp) for resp in responses] + + +def xdiscover( + timeout: int = DEFAULT_TIMEOUT, + local_ip_address: Optional[str] = None, + discover_ip_address: str = DEFAULT_BCAST_ADDR, + discover_ip_port: int = DEFAULT_PORT, +) -> Generator[Device, None, None]: + """Discover devices connected to the local network. + + This function returns a generator that yields devices instantly. + """ + responses = scan( + timeout, local_ip_address, discover_ip_address, discover_ip_port + ) + for resp in responses: + yield gendevice(*resp) + + +# Setup a new Broadlink device via AP Mode. Review the README to see how to enter AP Mode. +# Only tested with Broadlink RM3 Mini (Blackbean) +def setup( + ssid: str, + password: str, + security_mode: int, + ip_address: str = DEFAULT_BCAST_ADDR, +) -> None: + """Set up a new Broadlink device via AP mode.""" + # Security mode options are (0 - none, 1 = WEP, 2 = WPA1, 3 = WPA2, 4 = WPA1/2) + payload = bytearray(0x88) + payload[0x26] = 0x14 # This seems to always be set to 14 + # Add the SSID to the payload + ssid_start = 68 + ssid_length = 0 + for letter in ssid: + payload[(ssid_start + ssid_length)] = ord(letter) + ssid_length += 1 + # Add the WiFi password to the payload + pass_start = 100 + pass_length = 0 + for letter in password: + payload[(pass_start + pass_length)] = ord(letter) + pass_length += 1 + + payload[0x84] = ssid_length # Character length of SSID + payload[0x85] = pass_length # Character length of password + payload[0x86] = security_mode # Type of encryption + + checksum = sum(payload, 0xBEAF) & 0xFFFF + payload[0x20] = checksum & 0xFF # Checksum 1 position + payload[0x21] = checksum >> 8 # Checksum 2 position + + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Internet # UDP + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + sock.sendto(payload, (ip_address, DEFAULT_PORT)) + sock.close() diff --git a/broadlink/alarm.py b/broadlink/alarm.py new file mode 100644 index 00000000..a9b5e879 --- /dev/null +++ b/broadlink/alarm.py @@ -0,0 +1,43 @@ +"""Support for alarm kits.""" +from . import exceptions as e +from .device import Device + + +class S1C(Device): + """Controls a Broadlink S1C.""" + + TYPE = "S1C" + + _SENSORS_TYPES = { + 0x31: "Door Sensor", + 0x91: "Key Fob", + 0x21: "Motion Sensor", + } + + def get_sensors_status(self) -> dict: + """Return the state of the sensors.""" + packet = bytearray(16) + packet[0] = 0x06 + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + payload = self.decrypt(response[0x38:]) + count = payload[0x4] + sensor_data = payload[0x6:] + sensors = [ + bytearray(sensor_data[i * 83 : (i + 1) * 83]) + for i in range(len(sensor_data) // 83) + ] + return { + "count": count, + "sensors": [ + { + "status": sensor[0], + "name": sensor[4:26].decode().strip("\x00"), + "type": self._SENSORS_TYPES.get(sensor[3], "Unknown"), + "order": sensor[1], + "serial": sensor[26:30].hex(), + } + for sensor in sensors + if any(sensor[26:30]) + ], + } diff --git a/broadlink/climate.py b/broadlink/climate.py new file mode 100755 index 00000000..1a0c6006 --- /dev/null +++ b/broadlink/climate.py @@ -0,0 +1,474 @@ +"""Support for climate control.""" +import enum +import struct +from typing import List, Sequence + +from . import exceptions as e +from .device import Device +from .helpers import CRC16 + + +class hysen(Device): + """Controls a Hysen heating thermostat. + + This device is manufactured by Hysen and sold under different + brands, including Floureon, Beca Energy, Beok and Decdeal. + + Supported models: + - HY02B05H + - HY03WE + """ + + TYPE = "HYS" + + def send_request(self, request: Sequence[int]) -> bytes: + """Send a request to the device.""" + packet = bytearray() + packet.extend((len(request) + 2).to_bytes(2, "little")) + packet.extend(request) + packet.extend(CRC16.calculate(request).to_bytes(2, "little")) + + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + payload = self.decrypt(response[0x38:]) + + p_len = int.from_bytes(payload[:0x02], "little") + nom_crc = int.from_bytes(payload[p_len:p_len+2], "little") + real_crc = CRC16.calculate(payload[0x02:p_len]) + + if nom_crc != real_crc: + raise e.DataValidationError( + -4008, + "Received data packet check error", + f"Expected a checksum of {nom_crc} and received {real_crc}", + ) + + return payload[0x02:p_len] + + def _decode_temp(self, payload, base_index): + base_temp = payload[base_index] / 2.0 + add_offset = (payload[4] >> 3) & 1 # should offset be added? + offset_raw_value = (payload[17] >> 4) & 3 # offset value + offset = (offset_raw_value + 1) / 10 if add_offset else 0.0 + return base_temp + offset + + def get_temp(self) -> float: + """Return the room temperature in degrees celsius.""" + payload = self.send_request([0x01, 0x03, 0x00, 0x00, 0x00, 0x08]) + return self._decode_temp(payload, 5) + + def get_external_temp(self) -> float: + """Return the external temperature in degrees celsius.""" + payload = self.send_request([0x01, 0x03, 0x00, 0x00, 0x00, 0x08]) + return self._decode_temp(payload, 18) + + def get_full_status(self) -> dict: + """Return the state of the device. + + Timer schedule included. + """ + payload = self.send_request([0x01, 0x03, 0x00, 0x00, 0x00, 0x16]) + data = {} + data["remote_lock"] = payload[3] & 1 + data["power"] = payload[4] & 1 + data["active"] = (payload[4] >> 4) & 1 + data["temp_manual"] = (payload[4] >> 6) & 1 + data["heating_cooling"] = (payload[4] >> 7) & 1 + data["room_temp"] = self._decode_temp(payload, 5) + data["thermostat_temp"] = payload[6] / 2.0 + data["auto_mode"] = payload[7] & 0x0F + data["loop_mode"] = payload[7] >> 4 + data["sensor"] = payload[8] + data["osv"] = payload[9] + data["dif"] = payload[10] + data["svh"] = payload[11] + data["svl"] = payload[12] + data["room_temp_adj"] = ( + int.from_bytes(payload[13:15], "big", signed=True) / 10.0 + ) + data["fre"] = payload[15] + data["poweron"] = payload[16] + data["unknown"] = payload[17] + data["external_temp"] = self._decode_temp(payload, 18) + data["hour"] = payload[19] + data["min"] = payload[20] + data["sec"] = payload[21] + data["dayofweek"] = payload[22] + + weekday = [] + for i in range(0, 6): + weekday.append( + { + "start_hour": payload[2 * i + 23], + "start_minute": payload[2 * i + 24], + "temp": payload[i + 39] / 2.0, + } + ) + + data["weekday"] = weekday + weekend = [] + for i in range(6, 8): + weekend.append( + { + "start_hour": payload[2 * i + 23], + "start_minute": payload[2 * i + 24], + "temp": payload[i + 39] / 2.0, + } + ) + + data["weekend"] = weekend + return data + + # Change controller mode + # auto_mode = 1 for auto (scheduled/timed) mode, 0 for manual mode. + # Manual mode will activate last used temperature. + # In typical usage call set_temp to activate manual control and set temp. + # loop_mode refers to index in [ "12345,67", "123456,7", "1234567" ] + # E.g. loop_mode = 0 ("12345,67") means Saturday and Sunday (weekend schedule) + # loop_mode = 2 ("1234567") means every day, including Saturday and Sunday (weekday schedule) + # The sensor command is currently experimental + def set_mode( + self, auto_mode: int, loop_mode: int, sensor: int = 0 + ) -> None: + """Set the mode of the device.""" + mode_byte = ((loop_mode + 1) << 4) + auto_mode + self.send_request([0x01, 0x06, 0x00, 0x02, mode_byte, sensor]) + + # Advanced settings + # Sensor mode (SEN) sensor = 0 for internal sensor, 1 for external sensor, + # 2 for internal control temperature, external limit temperature. Factory default: 0. + # Set temperature range for external sensor (OSV) osv = 5..99. Factory default: 42C + # Deadzone for floor temprature (dIF) dif = 1..9. Factory default: 2C + # Upper temperature limit for internal sensor (SVH) svh = 5..99. Factory default: 35C + # Lower temperature limit for internal sensor (SVL) svl = 5..99. Factory default: 5C + # Actual temperature calibration (AdJ) adj = -0.5. Precision 0.1C + # Anti-freezing function (FrE) fre = 0 for anti-freezing function shut down, + # 1 for anti-freezing function open. Factory default: 0 + # Power on memory (POn) poweron = 0 for off, 1 for on. Default: 0 + def set_advanced( + self, + loop_mode: int, + sensor: int, + osv: int, + dif: int, + svh: int, + svl: int, + adj: float, + fre: int, + poweron: int, + ) -> None: + """Set advanced options.""" + self.send_request( + [ + 0x01, + 0x10, + 0x00, + 0x02, + 0x00, + 0x05, + 0x0A, + loop_mode, + sensor, + osv, + dif, + svh, + svl, + int(adj * 10) >> 8 & 0xFF, + int(adj * 10) & 0xFF, + fre, + poweron, + ] + ) + + # For backwards compatibility only. Prefer calling set_mode directly. + # Note this function invokes loop_mode=0 and sensor=0. + def switch_to_auto(self) -> None: + """Switch mode to auto.""" + self.set_mode(auto_mode=1, loop_mode=0) + + def switch_to_manual(self) -> None: + """Switch mode to manual.""" + self.set_mode(auto_mode=0, loop_mode=0) + + # Set temperature for manual mode (also activates manual mode if currently in automatic) + def set_temp(self, temp: float) -> None: + """Set the target temperature.""" + self.send_request([0x01, 0x06, 0x00, 0x01, 0x00, int(temp * 2)]) + + # Set device on(1) or off(0), does not deactivate Wifi connectivity. + # Remote lock disables control by buttons on thermostat. + # heating_cooling: heating(0) cooling(1) + def set_power( + self, power: int = 1, remote_lock: int = 0, heating_cooling: int = 0 + ) -> None: + """Set the power state of the device.""" + state = (heating_cooling << 7) + power + self.send_request([0x01, 0x06, 0x00, 0x00, remote_lock, state]) + + # set time on device + # n.b. day=1 is Monday, ..., day=7 is Sunday + def set_time(self, hour: int, minute: int, second: int, day: int) -> None: + """Set the time.""" + self.send_request( + [ + 0x01, + 0x10, + 0x00, + 0x08, + 0x00, + 0x02, + 0x04, + hour, + minute, + second, + day + ] + ) + + # Set timer schedule + # Format is the same as you get from get_full_status. + # weekday is a list (ordered) of 6 dicts like: + # {'start_hour':17, 'start_minute':30, 'temp': 22 } + # Each one specifies the thermostat temp that will become effective at start_hour:start_minute + # weekend is similar but only has 2 (e.g. switch on in morning and off in afternoon) + def set_schedule(self, weekday: List[dict], weekend: List[dict]) -> None: + """Set timer schedule.""" + request = [0x01, 0x10, 0x00, 0x0A, 0x00, 0x0C, 0x18] + + # weekday times + for i in range(0, 6): + request.append(weekday[i]["start_hour"]) + request.append(weekday[i]["start_minute"]) + + # weekend times + for i in range(0, 2): + request.append(weekend[i]["start_hour"]) + request.append(weekend[i]["start_minute"]) + + # weekday temperatures + for i in range(0, 6): + request.append(int(weekday[i]["temp"] * 2)) + + # weekend temperatures + for i in range(0, 2): + request.append(int(weekend[i]["temp"] * 2)) + + self.send_request(request) + + +class hvac(Device): + """Controls a HVAC. + + Supported models: + - Tornado SMART X SQ series + - Aux ASW-H12U3/JIR1DI-US + - Aux ASW-H36U2/LFR1DI-US + """ + + TYPE = "HVAC" + + @enum.unique + class Mode(enum.IntEnum): + """Enumerates modes.""" + + AUTO = 0 + COOL = 1 + DRY = 2 + HEAT = 3 + FAN = 4 + + @enum.unique + class Speed(enum.IntEnum): + """Enumerates fan speed.""" + + HIGH = 1 + MID = 2 + LOW = 3 + AUTO = 5 + + @enum.unique + class Preset(enum.IntEnum): + """Enumerates presets.""" + + NORMAL = 0 + TURBO = 1 + MUTE = 2 + + @enum.unique + class SwHoriz(enum.IntEnum): + """Enumerates horizontal swing.""" + + ON = 0 + OFF = 7 + + @enum.unique + class SwVert(enum.IntEnum): + """Enumerates vertical swing.""" + + ON = 0 + POS1 = 1 + POS2 = 2 + POS3 = 3 + POS4 = 4 + POS5 = 5 + OFF = 7 + + def _encode(self, data: bytes) -> bytes: + """Encode data for transport.""" + packet = bytearray(10) + p_len = 10 + len(data) + struct.pack_into( + " bytes: + """Decode data from transport.""" + # payload[0x2:0x8] == bytes([0xbb, 0x00, 0x07, 0x00, 0x00, 0x00]) + payload = self.decrypt(response[0x38:]) + p_len = int.from_bytes(payload[:0x02], "little") + nom_crc = int.from_bytes(payload[p_len:p_len+2], "little") + real_crc = CRC16.calculate(payload[0x02:p_len], polynomial=0x9BE4) + + if nom_crc != real_crc: + raise e.DataValidationError( + -4008, + "Received data packet check error", + f"Expected a checksum of {nom_crc} and received {real_crc}", + ) + + d_len = int.from_bytes(payload[0x08:0x0A], "little") + return payload[0x0A:0x0A+d_len] + + def _send(self, command: int, data: bytes = b"") -> bytes: + """Send a command to the unit.""" + prefix = bytes([((command << 4) | 1), 1]) + packet = self._encode(prefix + data) + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + return self._decode(response)[0x02:] + + def _parse_state(self, data: bytes) -> dict: + """Parse state.""" + state = {} + state["power"] = bool(data[0x08] & 1 << 5) + state["target_temp"] = 8 + (data[0x00] >> 3) + (data[0x04] >> 7) * 0.5 + state["swing_v"] = self.SwVert(data[0x00] & 0b111) + state["swing_h"] = self.SwHoriz(data[0x01] >> 5) + state["mode"] = self.Mode(data[0x05] >> 5) + state["speed"] = self.Speed(data[0x03] >> 5) + state["preset"] = self.Preset(data[0x04] >> 6) + state["sleep"] = bool(data[0x05] & 1 << 2) + state["ifeel"] = bool(data[0x05] & 1 << 3) + state["health"] = bool(data[0x08] & 1 << 1) + state["clean"] = bool(data[0x08] & 1 << 2) + state["display"] = bool(data[0x0A] & 1 << 4) + state["mildew"] = bool(data[0x0A] & 1 << 3) + return state + + def set_state( + self, + power: bool, + target_temp: float, # 16<=target_temp<=32 + mode: Mode, + speed: Speed, + preset: Preset, + swing_h: SwHoriz, + swing_v: SwVert, + sleep: bool, + ifeel: bool, + display: bool, + health: bool, + clean: bool, + mildew: bool, + ) -> dict: + """Set the state of the device.""" + # TODO: decode unknown bits + UNK0 = 0b100 + UNK1 = 0b1101 + UNK2 = 0b101 + + target_temp = round(target_temp * 2) / 2 + + if preset == self.Preset.MUTE: + if mode != self.Mode.FAN: + raise ValueError("mute is only available in fan mode") + speed = self.Speed.LOW + + elif preset == self.Preset.TURBO: + if mode not in {self.Mode.COOL, self.Mode.HEAT}: + raise ValueError("turbo is only available in cooling/heating") + speed = self.Speed.HIGH + + data = bytearray(0x0D) + data[0x00] = (int(target_temp) - 8 << 3) | swing_v + data[0x01] = (swing_h << 5) | UNK0 + data[0x02] = ((target_temp % 1 == 0.5) << 7) | UNK1 + data[0x03] = speed << 5 + data[0x04] = preset << 6 + data[0x05] = mode << 5 | sleep << 2 | ifeel << 3 + data[0x08] = power << 5 | clean << 2 | (health and 0b11) + data[0x0A] = display << 4 | mildew << 3 + data[0x0C] = UNK2 + + resp = self._send(0, data) + return self._parse_state(resp) + + def get_state(self) -> dict: + """Returns a dictionary with the unit's parameters. + + Returns: + dict: + power (bool): + target_temp (float): temperature set point 16 dict: + """Returns dictionary with AC info. + + Returns: + dict: + power (bool): power + ambient_temp (float): ambient temperature + """ + resp = self._send(2) + + if len(resp) < 22: + raise e.DataValidationError( + -4007, + "Received data packet length error", + f"Expected at least 24 bytes and received {len(resp) + 2}", + ) + + ac_info = {} + ac_info["power"] = resp[0x1] & 1 + + ambient_temp = resp[0x05] & 0b11111, resp[0x15] & 0b11111 + if any(ambient_temp): + ac_info["ambient_temp"] = ambient_temp[0] + ambient_temp[1] / 10.0 + + return ac_info diff --git a/broadlink/const.py b/broadlink/const.py new file mode 100644 index 00000000..19c37f52 --- /dev/null +++ b/broadlink/const.py @@ -0,0 +1,5 @@ +"""Constants.""" +DEFAULT_BCAST_ADDR = "255.255.255.255" +DEFAULT_PORT = 80 +DEFAULT_RETRY_INTVL = 1 +DEFAULT_TIMEOUT = 10 diff --git a/broadlink/cover.py b/broadlink/cover.py new file mode 100644 index 00000000..75317943 --- /dev/null +++ b/broadlink/cover.py @@ -0,0 +1,182 @@ +"""Support for covers.""" +import time +from typing import Sequence + +from . import exceptions as e +from .device import Device + + +class dooya(Device): + """Controls a Dooya curtain motor.""" + + TYPE = "DT360E" + + def _send(self, command: int, attribute: int = 0) -> int: + """Send a packet to the device.""" + packet = bytearray(16) + packet[0x00] = 0x09 + packet[0x02] = 0xBB + packet[0x03] = command + packet[0x04] = attribute + packet[0x09] = 0xFA + packet[0x0A] = 0x44 + + resp = self.send_packet(0x6A, packet) + e.check_error(resp[0x22:0x24]) + payload = self.decrypt(resp[0x38:]) + return payload[4] + + def open(self) -> int: + """Open the curtain.""" + return self._send(0x01) + + def close(self) -> int: + """Close the curtain.""" + return self._send(0x02) + + def stop(self) -> int: + """Stop the curtain.""" + return self._send(0x03) + + def get_percentage(self) -> int: + """Return the position of the curtain.""" + return self._send(0x06, 0x5D) + + def set_percentage_and_wait(self, new_percentage: int) -> None: + """Set the position of the curtain.""" + current = self.get_percentage() + if current > new_percentage: + self.close() + while current is not None and current > new_percentage: + time.sleep(0.2) + current = self.get_percentage() + + elif current < new_percentage: + self.open() + while current is not None and current < new_percentage: + time.sleep(0.2) + current = self.get_percentage() + self.stop() + + +class dooya2(Device): + """Controls a Dooya curtain motor (version 2).""" + + TYPE = "DT360E-2" + + def _send(self, operation: int, data: Sequence = b""): + """Send a command to the device.""" + packet = bytearray(12) + packet[0x02] = 0xA5 + packet[0x03] = 0xA5 + packet[0x04] = 0x5A + packet[0x05] = 0x5A + packet[0x08] = operation + packet[0x09] = 0x0B + + if data: + data_len = len(data) + packet[0x0A] = data_len & 0xFF + packet[0x0B] = data_len >> 8 + packet += bytes(2) + packet.extend(data) + + checksum = sum(packet, 0xBEAF) & 0xFFFF + packet[0x06] = checksum & 0xFF + packet[0x07] = checksum >> 8 + + packet_len = len(packet) - 2 + packet[0x00] = packet_len & 0xFF + packet[0x01] = packet_len >> 8 + + resp = self.send_packet(0x6A, packet) + e.check_error(resp[0x22:0x24]) + payload = self.decrypt(resp[0x38:]) + return payload + + def open(self) -> None: + """Open the curtain.""" + self._send(2, [0x00, 0x01, 0x00]) + + def close(self) -> None: + """Close the curtain.""" + self._send(2, [0x00, 0x02, 0x00]) + + def stop(self) -> None: + """Stop the curtain.""" + self._send(2, [0x00, 0x03, 0x00]) + + def get_percentage(self) -> int: + """Return the position of the curtain.""" + resp = self._send(1, [0x00, 0x06, 0x00]) + return resp[0x11] + + def set_percentage(self, new_percentage: int) -> None: + """Set the position of the curtain.""" + self._send(2, [0x00, 0x09, new_percentage]) + + +class wser(Device): + """Controls a Wistar curtain motor""" + + TYPE = "WSER" + + def _send(self, operation: int, data: Sequence = b""): + """Send a command to the device.""" + packet = bytearray(12) + packet[0x02] = 0xA5 + packet[0x03] = 0xA5 + packet[0x04] = 0x5A + packet[0x05] = 0x5A + packet[0x08] = operation + packet[0x09] = 0x0B + + if data: + data_len = len(data) + packet[0x0A] = data_len & 0xFF + packet[0x0B] = data_len >> 8 + packet += bytes(2) + packet.extend(data) + + checksum = sum(packet, 0xBEAF) & 0xFFFF + packet[0x06] = checksum & 0xFF + packet[0x07] = checksum >> 8 + + packet_len = len(packet) - 2 + packet[0x00] = packet_len & 0xFF + packet[0x01] = packet_len >> 8 + + resp = self.send_packet(0x6A, packet) + e.check_error(resp[0x22:0x24]) + payload = self.decrypt(resp[0x38:]) + return payload + + def get_position(self) -> int: + """Return the position of the curtain.""" + resp = self._send(1, []) + position = resp[0x0E] + return position + + def open(self) -> int: + """Open the curtain.""" + resp = self._send(2, [0x4A, 0x31, 0xA0]) + position = resp[0x0E] + return position + + def close(self) -> int: + """Close the curtain.""" + resp = self._send(2, [0x61, 0x32, 0xA0]) + position = resp[0x0E] + return position + + def stop(self) -> int: + """Stop the curtain.""" + resp = self._send(2, [0x4C, 0x73, 0xA0]) + position = resp[0x0E] + return position + + def set_position(self, position: int) -> int: + """Set the position of the curtain.""" + resp = self._send(2, [position, 0x70, 0xA0]) + position = resp[0x0E] + return position diff --git a/broadlink/device.py b/broadlink/device.py new file mode 100644 index 00000000..5a10bc01 --- /dev/null +++ b/broadlink/device.py @@ -0,0 +1,332 @@ +"""Support for Broadlink devices.""" +import socket +import threading +import random +import time +from typing import Generator, Optional, Tuple, Union + +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes + +from . import exceptions as e +from .const import ( + DEFAULT_BCAST_ADDR, + DEFAULT_PORT, + DEFAULT_RETRY_INTVL, + DEFAULT_TIMEOUT, +) +from .protocol import Datetime + +HelloResponse = Tuple[int, Tuple[str, int], str, str, bool] + + +def scan( + timeout: int = DEFAULT_TIMEOUT, + local_ip_address: Optional[str] = None, + discover_ip_address: str = DEFAULT_BCAST_ADDR, + discover_ip_port: int = DEFAULT_PORT, +) -> Generator[HelloResponse, None, None]: + """Broadcast a hello message and yield responses.""" + conn = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + conn.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + conn.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + + if local_ip_address: + conn.bind((local_ip_address, 0)) + port = conn.getsockname()[1] + else: + local_ip_address = "0.0.0.0" + port = 0 + + packet = bytearray(0x30) + packet[0x08:0x14] = Datetime.pack(Datetime.now()) + packet[0x18:0x1C] = socket.inet_aton(local_ip_address)[::-1] + packet[0x1C:0x1E] = port.to_bytes(2, "little") + packet[0x26] = 6 + + checksum = sum(packet, 0xBEAF) & 0xFFFF + packet[0x20:0x22] = checksum.to_bytes(2, "little") + + start_time = time.time() + discovered = [] + + try: + while (time.time() - start_time) < timeout: + time_left = timeout - (time.time() - start_time) + conn.settimeout(min(DEFAULT_RETRY_INTVL, time_left)) + conn.sendto(packet, (discover_ip_address, discover_ip_port)) + + while True: + try: + resp, host = conn.recvfrom(1024) + except socket.timeout: + break + + devtype = resp[0x34] | resp[0x35] << 8 + mac = resp[0x3A:0x40][::-1] + + if (host, mac, devtype) in discovered: + continue + discovered.append((host, mac, devtype)) + + name = resp[0x40:].split(b"\x00")[0].decode() + is_locked = bool(resp[0x7F]) + yield devtype, host, mac, name, is_locked + finally: + conn.close() + + +def ping(ip_address: str, port: int = DEFAULT_PORT) -> None: + """Send a ping packet to an address. + + This packet feeds the watchdog timer of firmwares >= v53. + Useful to prevent reboots when the cloud cannot be reached. + It must be sent every 2 minutes in such cases. + """ + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as conn: + conn.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + packet = bytearray(0x30) + packet[0x26] = 1 + conn.sendto(packet, (ip_address, port)) + + +class Device: + """Controls a Broadlink device.""" + + TYPE = "Unknown" + + __INIT_KEY = "097628343fe99e23765c1513accf8b02" + __INIT_VECT = "562e17996d093d28ddb3ba695a2e6f58" + + def __init__( + self, + host: Tuple[str, int], + mac: Union[bytes, str], + devtype: int, + timeout: int = DEFAULT_TIMEOUT, + name: str = "", + model: str = "", + manufacturer: str = "", + is_locked: bool = False, + ) -> None: + """Initialize the controller.""" + self.host = host + self.mac = bytes.fromhex(mac) if isinstance(mac, str) else mac + self.devtype = devtype + self.timeout = timeout + self.name = name + self.model = model + self.manufacturer = manufacturer + self.is_locked = is_locked + self.count = random.randint(0x8000, 0xFFFF) + self.iv = bytes.fromhex(self.__INIT_VECT) + self.id = 0 + self.type = self.TYPE # For backwards compatibility. + self.lock = threading.Lock() + + self.aes = None + self.update_aes(bytes.fromhex(self.__INIT_KEY)) + + def __repr__(self) -> str: + """Return a formal representation of the device.""" + return ( + "%s.%s(%s, mac=%r, devtype=%r, timeout=%r, name=%r, " + "model=%r, manufacturer=%r, is_locked=%r)" + ) % ( + self.__class__.__module__, + self.__class__.__qualname__, + self.host, + self.mac, + self.devtype, + self.timeout, + self.name, + self.model, + self.manufacturer, + self.is_locked, + ) + + def __str__(self) -> str: + """Return a readable representation of the device.""" + return "%s (%s / %s:%s / %s)" % ( + self.name or "Unknown", + " ".join(filter(None, [self.manufacturer, self.model, hex(self.devtype)])), + *self.host, + ":".join(format(x, "02X") for x in self.mac), + ) + + def update_aes(self, key: bytes) -> None: + """Update AES.""" + self.aes = Cipher( + algorithms.AES(bytes(key)), modes.CBC(self.iv), backend=default_backend() + ) + + def encrypt(self, payload: bytes) -> bytes: + """Encrypt the payload.""" + encryptor = self.aes.encryptor() + return encryptor.update(bytes(payload)) + encryptor.finalize() + + def decrypt(self, payload: bytes) -> bytes: + """Decrypt the payload.""" + decryptor = self.aes.decryptor() + return decryptor.update(bytes(payload)) + decryptor.finalize() + + def auth(self) -> bool: + """Authenticate to the device.""" + self.id = 0 + self.update_aes(bytes.fromhex(self.__INIT_KEY)) + + packet = bytearray(0x50) + packet[0x04:0x14] = [0x31] * 16 + packet[0x1E] = 0x01 + packet[0x2D] = 0x01 + packet[0x30:0x36] = "Test 1".encode() + + response = self.send_packet(0x65, packet) + e.check_error(response[0x22:0x24]) + payload = self.decrypt(response[0x38:]) + + self.id = int.from_bytes(payload[:0x4], "little") + self.update_aes(payload[0x04:0x14]) + return True + + def hello(self, local_ip_address=None) -> bool: + """Send a hello message to the device. + + Device information is checked before updating name and lock status. + """ + responses = scan( + timeout=self.timeout, + local_ip_address=local_ip_address, + discover_ip_address=self.host[0], + discover_ip_port=self.host[1], + ) + try: + devtype, _, mac, name, is_locked = next(responses) + + except StopIteration as err: + raise e.NetworkTimeoutError( + -4000, + "Network timeout", + f"No response received within {self.timeout}s", + ) from err + + if mac != self.mac: + raise e.DataValidationError( + -2040, + "Device information is not intact", + "The MAC address is different", + f"Expected {self.mac} and received {mac}", + ) + + if devtype != self.devtype: + raise e.DataValidationError( + -2040, + "Device information is not intact", + "The product ID is different", + f"Expected {self.devtype} and received {devtype}", + ) + + self.name = name + self.is_locked = is_locked + return True + + def ping(self) -> None: + """Ping the device. + + This packet feeds the watchdog timer of firmwares >= v53. + Useful to prevent reboots when the cloud cannot be reached. + It must be sent every 2 minutes in such cases. + """ + ping(self.host[0], port=self.host[1]) + + def get_fwversion(self) -> int: + """Get firmware version.""" + packet = bytearray([0x68]) + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + payload = self.decrypt(response[0x38:]) + return payload[0x4] | payload[0x5] << 8 + + def set_name(self, name: str) -> None: + """Set device name.""" + packet = bytearray(4) + packet += name.encode("utf-8") + packet += bytearray(0x50 - len(packet)) + packet[0x43] = self.is_locked + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + self.name = name + + def set_lock(self, state: bool) -> None: + """Lock/unlock the device.""" + packet = bytearray(4) + packet += self.name.encode("utf-8") + packet += bytearray(0x50 - len(packet)) + packet[0x43] = bool(state) + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + self.is_locked = bool(state) + + def get_type(self) -> str: + """Return device type.""" + return self.type + + def send_packet(self, packet_type: int, payload: bytes) -> bytes: + """Send a packet to the device.""" + self.count = ((self.count + 1) | 0x8000) & 0xFFFF + packet = bytearray(0x38) + packet[0x00:0x08] = bytes.fromhex("5aa5aa555aa5aa55") + packet[0x24:0x26] = self.devtype.to_bytes(2, "little") + packet[0x26:0x28] = packet_type.to_bytes(2, "little") + packet[0x28:0x2A] = self.count.to_bytes(2, "little") + packet[0x2A:0x30] = self.mac[::-1] + packet[0x30:0x34] = self.id.to_bytes(4, "little") + + p_checksum = sum(payload, 0xBEAF) & 0xFFFF + packet[0x34:0x36] = p_checksum.to_bytes(2, "little") + + padding = (16 - len(payload)) % 16 + payload = self.encrypt(payload + bytes(padding)) + packet.extend(payload) + + checksum = sum(packet, 0xBEAF) & 0xFFFF + packet[0x20:0x22] = checksum.to_bytes(2, "little") + + with self.lock and socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as conn: + timeout = self.timeout + start_time = time.time() + + while True: + time_left = timeout - (time.time() - start_time) + conn.settimeout(min(DEFAULT_RETRY_INTVL, time_left)) + conn.sendto(packet, self.host) + + try: + resp = conn.recvfrom(2048)[0] + break + except socket.timeout as err: + if (time.time() - start_time) > timeout: + raise e.NetworkTimeoutError( + -4000, + "Network timeout", + f"No response received within {timeout}s", + ) from err + + if len(resp) < 0x30: + raise e.DataValidationError( + -4007, + "Received data packet length error", + f"Expected at least 48 bytes and received {len(resp)}", + ) + + nom_checksum = int.from_bytes(resp[0x20:0x22], "little") + real_checksum = sum(resp, 0xBEAF) - sum(resp[0x20:0x22]) & 0xFFFF + + if nom_checksum != real_checksum: + raise e.DataValidationError( + -4008, + "Received data packet check error", + f"Expected a checksum of {nom_checksum} and received {real_checksum}", + ) + + return resp diff --git a/broadlink/exceptions.py b/broadlink/exceptions.py new file mode 100644 index 00000000..2343ad6e --- /dev/null +++ b/broadlink/exceptions.py @@ -0,0 +1,152 @@ +"""Exceptions for Broadlink devices.""" +import collections +import struct + + +class BroadlinkException(Exception): + """Base class common to all Broadlink exceptions.""" + + def __init__(self, *args, **kwargs): + """Initialize the exception.""" + super().__init__(*args, **kwargs) + if len(args) >= 2: + self.errno = args[0] + self.strerror = ": ".join(str(arg) for arg in args[1:]) + elif len(args) == 1: + self.errno = None + self.strerror = str(args[0]) + else: + self.errno = None + self.strerror = "" + + def __str__(self): + """Return str(self).""" + if self.errno is not None: + return "[Errno %s] %s" % (self.errno, self.strerror) + return self.strerror + + def __eq__(self, other): + """Return self==value.""" + # pylint: disable=unidiomatic-typecheck + return type(self) == type(other) and self.args == other.args + + def __hash__(self): + """Return hash(self).""" + return hash((type(self), self.args)) + + +class MultipleErrors(BroadlinkException): + """Multiple errors.""" + + def __init__(self, *args, **kwargs): + """Initialize the exception.""" + errors = args[0][:] if args else [] + counter = collections.Counter(errors) + strerror = "Multiple errors occurred: %s" % counter + super().__init__(strerror, **kwargs) + self.errors = errors + + def __repr__(self): + """Return repr(self).""" + return "MultipleErrors(%r)" % self.errors + + def __str__(self): + """Return str(self).""" + return self.strerror + + +class AuthenticationError(BroadlinkException): + """Authentication error.""" + + +class AuthorizationError(BroadlinkException): + """Authorization error.""" + + +class CommandNotSupportedError(BroadlinkException): + """Command not supported error.""" + + +class ConnectionClosedError(BroadlinkException): + """Connection closed error.""" + + +class StructureAbnormalError(BroadlinkException): + """Structure abnormal error.""" + + +class DeviceOfflineError(BroadlinkException): + """Device offline error.""" + + +class ReadError(BroadlinkException): + """Read error.""" + + +class SendError(BroadlinkException): + """Send error.""" + + +class SSIDNotFoundError(BroadlinkException): + """SSID not found error.""" + + +class StorageError(BroadlinkException): + """Storage error.""" + + +class WriteError(BroadlinkException): + """Write error.""" + + +class NetworkTimeoutError(BroadlinkException): + """Network timeout error.""" + + +class DataValidationError(BroadlinkException): + """Data validation error.""" + + +class UnknownError(BroadlinkException): + """Unknown error.""" + + +BROADLINK_EXCEPTIONS = { + # Firmware-related errors are generated by the device. + -1: (AuthenticationError, "Authentication failed"), + -2: (ConnectionClosedError, "You have been logged out"), + -3: (DeviceOfflineError, "The device is offline"), + -4: (CommandNotSupportedError, "Command not supported"), + -5: (StorageError, "The device storage is full"), + -6: (StructureAbnormalError, "Structure is abnormal"), + -7: (AuthorizationError, "Control key is expired"), + -8: (SendError, "Send error"), + -9: (WriteError, "Write error"), + -10: (ReadError, "Read error"), + -11: (SSIDNotFoundError, "SSID could not be found in AP configuration"), + # SDK related errors are generated by this module. + -2040: (DataValidationError, "Device information is not intact"), + -4000: (NetworkTimeoutError, "Network timeout"), + -4007: (DataValidationError, "Received data packet length error"), + -4008: (DataValidationError, "Received data packet check error"), + -4009: (DataValidationError, "Received data packet information type error"), + -4010: (DataValidationError, "Received encrypted data packet length error"), + -4011: (DataValidationError, "Received encrypted data packet check error"), + -4012: (AuthorizationError, "Device control ID error"), +} + + +def exception(err_code: int) -> BroadlinkException: + """Return exception corresponding to an error code.""" + try: + exc, msg = BROADLINK_EXCEPTIONS[err_code] + return exc(err_code, msg) + except KeyError: + return UnknownError(err_code, "Unknown error") + + +def check_error(error: bytes) -> None: + """Raise exception if an error occurred.""" + error_code = struct.unpack("h", error)[0] + if error_code: + raise exception(error_code) diff --git a/broadlink/helpers.py b/broadlink/helpers.py new file mode 100644 index 00000000..e7b3d4c9 --- /dev/null +++ b/broadlink/helpers.py @@ -0,0 +1,43 @@ +"""Helper functions and classes.""" +from typing import Dict, List, Sequence + + +class CRC16: + """Helps with CRC-16 calculation. + + CRC tables are cached for performance. + """ + + _cache: Dict[int, List[int]] = {} + + @classmethod + def get_table(cls, polynomial: int) -> List[int]: + """Return the CRC-16 table for a polynomial.""" + try: + crc_table = cls._cache[polynomial] + except KeyError: + crc_table = [] + for dividend in range(0, 256): + remainder = dividend + for _ in range(0, 8): + if remainder & 1: + remainder = remainder >> 1 ^ polynomial + else: + remainder = remainder >> 1 + crc_table.append(remainder) + cls._cache[polynomial] = crc_table + return crc_table + + @classmethod + def calculate( + cls, + sequence: Sequence[int], + polynomial: int = 0xA001, # CRC-16-ANSI. + init_value: int = 0xFFFF, + ) -> int: + """Calculate the CRC-16 of a sequence of integers.""" + crc_table = cls.get_table(polynomial) + crc = init_value + for item in sequence: + crc = crc >> 8 ^ crc_table[(crc ^ item) & 0xFF] + return crc diff --git a/broadlink/hub.py b/broadlink/hub.py new file mode 100644 index 00000000..0fd4ae53 --- /dev/null +++ b/broadlink/hub.py @@ -0,0 +1,98 @@ +"""Support for hubs.""" +import struct +import json +from typing import Optional + +from . import exceptions as e +from .device import Device + + +class s3(Device): + """Controls a Broadlink S3.""" + + TYPE = "S3" + MAX_SUBDEVICES = 8 + + def get_subdevices(self, step: int = 5) -> list: + """Return a list of sub devices.""" + total = self.MAX_SUBDEVICES + sub_devices = [] + seen = set() + index = 0 + + while index < total: + state = {"count": step, "index": index} + packet = self._encode(14, state) + resp = self.send_packet(0x6A, packet) + e.check_error(resp[0x22:0x24]) + resp = self._decode(resp) + + for device in resp["list"]: + did = device["did"] + if did in seen: + continue + + seen.add(did) + sub_devices.append(device) + + total = resp["total"] + if len(seen) >= total: + break + + index += step + + return sub_devices + + def get_state(self, did: Optional[str] = None) -> dict: + """Return the power state of the device.""" + state = {} + if did is not None: + state["did"] = did + + packet = self._encode(1, state) + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + return self._decode(response) + + def set_state( + self, + did: Optional[str] = None, + pwr1: Optional[bool] = None, + pwr2: Optional[bool] = None, + pwr3: Optional[bool] = None, + ) -> dict: + """Set the power state of the device.""" + state = {} + if did is not None: + state["did"] = did + if pwr1 is not None: + state["pwr1"] = int(bool(pwr1)) + if pwr2 is not None: + state["pwr2"] = int(bool(pwr2)) + if pwr3 is not None: + state["pwr3"] = int(bool(pwr3)) + + packet = self._encode(2, state) + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + return self._decode(response) + + def _encode(self, flag: int, state: dict) -> bytes: + """Encode a JSON packet.""" + # flag: 1 for reading, 2 for writing. + packet = bytearray(12) + data = json.dumps(state, separators=(",", ":")).encode() + struct.pack_into( + " dict: + """Decode a JSON packet.""" + payload = self.decrypt(response[0x38:]) + js_len = struct.unpack_from(" dict: + """Return the power state of the device. + + Example: `{'red': 128, 'blue': 255, 'green': 128, 'pwr': 1, 'brightness': 75, 'colortemp': 2700, 'hue': 240, 'saturation': 50, 'transitionduration': 1500, 'maxworktime': 0, 'bulb_colormode': 1, 'bulb_scenes': '["@01686464,0,0,0", "#ffffff,10,0,#000000,190,0,0", "2700+100,0,0,0", "#ff0000,500,2500,#00FF00,500,2500,#0000FF,500,2500,0", "@01686464,100,2400,@01686401,100,2400,0", "@01686464,100,2400,@01686401,100,2400,@005a6464,100,2400,@005a6401,100,2400,0", "@01686464,10,0,@00000000,190,0,0", "@01686464,200,0,@005a6464,200,0,0"]', 'bulb_scene': '', 'bulb_sceneidx': 255}` + """ + packet = self._encode(1, {}) + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + return self._decode(response) + + def set_state( + self, + pwr: Optional[bool] = None, + red: Optional[int] = None, + blue: Optional[int] = None, + green: Optional[int] = None, + brightness: Optional[int] = None, + colortemp: Optional[int] = None, + hue: Optional[int] = None, + saturation: Optional[int] = None, + transitionduration: Optional[int] = None, + maxworktime: Optional[int] = None, + bulb_colormode: Optional[int] = None, + bulb_scenes: Optional[str] = None, + bulb_scene: Optional[str] = None, + bulb_sceneidx: Optional[int] = None, + ) -> dict: + """Set the power state of the device.""" + state = {} + if pwr is not None: + state["pwr"] = int(bool(pwr)) + if red is not None: + state["red"] = int(red) + if blue is not None: + state["blue"] = int(blue) + if green is not None: + state["green"] = int(green) + if brightness is not None: + state["brightness"] = int(brightness) + if colortemp is not None: + state["colortemp"] = int(colortemp) + if hue is not None: + state["hue"] = int(hue) + if saturation is not None: + state["saturation"] = int(saturation) + if transitionduration is not None: + state["transitionduration"] = int(transitionduration) + if maxworktime is not None: + state["maxworktime"] = int(maxworktime) + if bulb_colormode is not None: + state["bulb_colormode"] = int(bulb_colormode) + if bulb_scenes is not None: + state["bulb_scenes"] = str(bulb_scenes) + if bulb_scene is not None: + state["bulb_scene"] = str(bulb_scene) + if bulb_sceneidx is not None: + state["bulb_sceneidx"] = int(bulb_sceneidx) + + packet = self._encode(2, state) + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + return self._decode(response) + + def _encode(self, flag: int, state: dict) -> bytes: + """Encode a JSON packet.""" + # flag: 1 for reading, 2 for writing. + packet = bytearray(14) + data = json.dumps(state, separators=(",", ":")).encode() + p_len = 12 + len(data) + struct.pack_into( + " dict: + """Decode a JSON packet.""" + payload = self.decrypt(response[0x38:]) + js_len = struct.unpack_from(" dict: + """Return the power state of the device. + + Example: `{'red': 128, 'blue': 255, 'green': 128, 'pwr': 1, 'brightness': 75, 'colortemp': 2700, 'hue': 240, 'saturation': 50, 'transitionduration': 1500, 'maxworktime': 0, 'bulb_colormode': 1, 'bulb_scenes': '["@01686464,0,0,0", "#ffffff,10,0,#000000,190,0,0", "2700+100,0,0,0", "#ff0000,500,2500,#00FF00,500,2500,#0000FF,500,2500,0", "@01686464,100,2400,@01686401,100,2400,0", "@01686464,100,2400,@01686401,100,2400,@005a6464,100,2400,@005a6401,100,2400,0", "@01686464,10,0,@00000000,190,0,0", "@01686464,200,0,@005a6464,200,0,0"]', 'bulb_scene': ''}` + """ + packet = self._encode(1, {}) + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + return self._decode(response) + + def set_state( + self, + pwr: Optional[bool] = None, + red: Optional[int] = None, + blue: Optional[int] = None, + green: Optional[int] = None, + brightness: Optional[int] = None, + colortemp: Optional[int] = None, + hue: Optional[int] = None, + saturation: Optional[int] = None, + transitionduration: Optional[int] = None, + maxworktime: Optional[int] = None, + bulb_colormode: Optional[int] = None, + bulb_scenes: Optional[str] = None, + bulb_scene: Optional[str] = None, + ) -> dict: + """Set the power state of the device.""" + state = {} + if pwr is not None: + state["pwr"] = int(bool(pwr)) + if red is not None: + state["red"] = int(red) + if blue is not None: + state["blue"] = int(blue) + if green is not None: + state["green"] = int(green) + if brightness is not None: + state["brightness"] = int(brightness) + if colortemp is not None: + state["colortemp"] = int(colortemp) + if hue is not None: + state["hue"] = int(hue) + if saturation is not None: + state["saturation"] = int(saturation) + if transitionduration is not None: + state["transitionduration"] = int(transitionduration) + if maxworktime is not None: + state["maxworktime"] = int(maxworktime) + if bulb_colormode is not None: + state["bulb_colormode"] = int(bulb_colormode) + if bulb_scenes is not None: + state["bulb_scenes"] = str(bulb_scenes) + if bulb_scene is not None: + state["bulb_scene"] = str(bulb_scene) + + packet = self._encode(2, state) + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + return self._decode(response) + + def _encode(self, flag: int, state: dict) -> bytes: + """Encode a JSON packet.""" + # flag: 1 for reading, 2 for writing. + packet = bytearray(12) + data = json.dumps(state, separators=(",", ":")).encode() + struct.pack_into( + " dict: + """Decode a JSON packet.""" + payload = self.decrypt(response[0x38:]) + js_len = struct.unpack_from(" bytes: + """Pack the timestamp to be sent over the Broadlink protocol.""" + data = bytearray(12) + utcoffset = int(datetime.utcoffset().total_seconds() / 3600) + data[:0x04] = utcoffset.to_bytes(4, "little", signed=True) + data[0x04:0x06] = datetime.year.to_bytes(2, "little") + data[0x06] = datetime.minute + data[0x07] = datetime.hour + data[0x08] = int(datetime.strftime("%y")) + data[0x09] = datetime.isoweekday() + data[0x0A] = datetime.day + data[0x0B] = datetime.month + return data + + @staticmethod + def unpack(data: bytes) -> dt.datetime: + """Unpack a timestamp received over the Broadlink protocol.""" + utcoffset = int.from_bytes(data[0x00:0x04], "little", signed=True) + year = int.from_bytes(data[0x04:0x06], "little") + minute = data[0x06] + hour = data[0x07] + subyear = data[0x08] + isoweekday = data[0x09] + day = data[0x0A] + month = data[0x0B] + + tz_info = dt.timezone(dt.timedelta(hours=utcoffset)) + datetime = dt.datetime(year, month, day, hour, minute, 0, 0, tz_info) + + if datetime.isoweekday() != isoweekday: + raise ValueError("isoweekday does not match") + if int(datetime.strftime("%y")) != subyear: + raise ValueError("subyear does not match") + + return datetime + + @staticmethod + def now() -> dt.datetime: + """Return the current date and time with timezone info.""" + tz_info = dt.timezone(dt.timedelta(seconds=-time.timezone)) + return dt.datetime.now(tz_info) diff --git a/broadlink/remote.py b/broadlink/remote.py new file mode 100644 index 00000000..60c54ce2 --- /dev/null +++ b/broadlink/remote.py @@ -0,0 +1,173 @@ +"""Support for universal remotes.""" +import struct +from typing import List, Optional, Tuple + +from . import exceptions as e +from .device import Device + + +def pulses_to_data(pulses: List[int], tick: float = 32.84) -> bytes: + """Convert a microsecond duration sequence into a Broadlink IR packet.""" + result = bytearray(4) + result[0x00] = 0x26 + + for pulse in pulses: + div, mod = divmod(int(pulse // tick), 256) + if div: + result.append(0) + result.append(div) + result.append(mod) + + data_len = len(result) - 4 + result[0x02] = data_len & 0xFF + result[0x03] = data_len >> 8 + + return result + + +def data_to_pulses(data: bytes, tick: float = 32.84) -> List[int]: + """Parse a Broadlink packet into a microsecond duration sequence.""" + result = [] + index = 4 + end = min(256 * data[0x03] + data[0x02] + 4, len(data)) + + while index < end: + chunk = data[index] + index += 1 + + if chunk == 0: + try: + chunk = 256 * data[index] + data[index + 1] + except IndexError as err: + raise ValueError("Malformed data.") from err + index += 2 + + result.append(int(chunk * tick)) + + return result + + +class rmmini(Device): + """Controls a Broadlink RM mini 3.""" + + TYPE = "RMMINI" + + def _send(self, command: int, data: bytes = b"") -> bytes: + """Send a packet to the device.""" + packet = struct.pack(" None: + """Update device name and lock status.""" + resp = self._send(0x1) + self.name = resp[0x48:].split(b"\x00")[0].decode() + self.is_locked = bool(resp[0x87]) + + def send_data(self, data: bytes) -> None: + """Send a code to the device.""" + self._send(0x2, data) + + def enter_learning(self) -> None: + """Enter infrared learning mode.""" + self._send(0x3) + + def check_data(self) -> bytes: + """Return the last captured code.""" + return self._send(0x4) + + +class rmpro(rmmini): + """Controls a Broadlink RM pro.""" + + TYPE = "RMPRO" + + def sweep_frequency(self) -> None: + """Sweep frequency.""" + self._send(0x19) + + def check_frequency(self) -> Tuple[bool, float]: + """Return True if the frequency was identified successfully.""" + resp = self._send(0x1A) + is_found = bool(resp[0]) + frequency = struct.unpack(" None: + """Enter radiofrequency learning mode.""" + payload = bytearray() + if frequency: + payload += struct.pack(" None: + """Cancel sweep frequency.""" + self._send(0x1E) + + def check_sensors(self) -> dict: + """Return the state of the sensors.""" + resp = self._send(0x1) + temp = struct.unpack(" float: + """Return the temperature.""" + return self.check_sensors()["temperature"] + + +class rmminib(rmmini): + """Controls a Broadlink RM mini 3 (new firmware).""" + + TYPE = "RMMINIB" + + def _send(self, command: int, data: bytes = b"") -> bytes: + """Send a packet to the device.""" + packet = struct.pack(" dict: + """Return the state of the sensors.""" + resp = self._send(0x24) + temp = struct.unpack(" float: + """Return the temperature.""" + return self.check_sensors()["temperature"] + + def check_humidity(self) -> float: + """Return the humidity.""" + return self.check_sensors()["humidity"] + + +class rm4pro(rm4mini, rmpro): + """Controls a Broadlink RM4 pro.""" + + TYPE = "RM4PRO" + + +class rm(rmpro): + """For backwards compatibility.""" + + TYPE = "RM2" + + +class rm4(rm4pro): + """For backwards compatibility.""" + + TYPE = "RM4" diff --git a/broadlink/sensor.py b/broadlink/sensor.py new file mode 100644 index 00000000..284576fa --- /dev/null +++ b/broadlink/sensor.py @@ -0,0 +1,90 @@ +"""Support for sensors.""" +from typing import Sequence + +from . import exceptions as e +from .device import Device + + +class a1(Device): + """Controls a Broadlink A1.""" + + TYPE = "A1" + + _SENSORS_AND_LEVELS = ( + ("light", ("dark", "dim", "normal", "bright")), + ("air_quality", ("excellent", "good", "normal", "bad")), + ("noise", ("quiet", "normal", "noisy")), + ) + + def check_sensors(self) -> dict: + """Return the state of the sensors.""" + data = self.check_sensors_raw() + for sensor, levels in self._SENSORS_AND_LEVELS: + try: + data[sensor] = levels[data[sensor]] + except IndexError: + data[sensor] = "unknown" + return data + + def check_sensors_raw(self) -> dict: + """Return the state of the sensors in raw format.""" + packet = bytearray([0x1]) + resp = self.send_packet(0x6A, packet) + e.check_error(resp[0x22:0x24]) + data = self.decrypt(resp[0x38:]) + + return { + "temperature": data[0x04] + data[0x05] / 10.0, + "humidity": data[0x06] + data[0x07] / 10.0, + "light": data[0x08], + "air_quality": data[0x0A], + "noise": data[0x0C], + } + + +class a2(Device): + """Controls a Broadlink A2.""" + + TYPE = "A2" + + def _send(self, operation: int, data: Sequence = b""): + """Send a command to the device.""" + packet = bytearray(12) + packet[0x02] = 0xA5 + packet[0x03] = 0xA5 + packet[0x04] = 0x5A + packet[0x05] = 0x5A + packet[0x08] = operation + packet[0x09] = 0x0B + + if data: + data_len = len(data) + packet[0x0A] = data_len & 0xFF + packet[0x0B] = data_len >> 8 + packet += bytes(2) + packet.extend(data) + + checksum = sum(packet, 0xBEAF) & 0xFFFF + packet[0x06] = checksum & 0xFF + packet[0x07] = checksum >> 8 + + packet_len = len(packet) - 2 + packet[0x00] = packet_len & 0xFF + packet[0x01] = packet_len >> 8 + + resp = self.send_packet(0x6A, packet) + e.check_error(resp[0x22:0x24]) + payload = self.decrypt(resp[0x38:]) + return payload + + def check_sensors_raw(self) -> dict: + """Return the state of the sensors in raw format.""" + data = self._send(1) + + return { + "temperature": data[0x13] * 256 + data[0x14], + "humidity": data[0x15] * 256 + data[0x16], + "pm10": data[0x0D] * 256 + data[0x0E], + "pm2_5": data[0x0F] * 256 + data[0x10], + "pm1": data[0x11] * 256 + data[0x12], + } diff --git a/broadlink/switch.py b/broadlink/switch.py new file mode 100644 index 00000000..8393f6b1 --- /dev/null +++ b/broadlink/switch.py @@ -0,0 +1,472 @@ +"""Support for switches.""" +import json +import struct +from typing import Optional + +from . import exceptions as e +from .device import Device + + +class sp1(Device): + """Controls a Broadlink SP1.""" + + TYPE = "SP1" + + def set_power(self, pwr: bool) -> None: + """Set the power state of the device.""" + packet = bytearray(4) + packet[0] = bool(pwr) + response = self.send_packet(0x66, packet) + e.check_error(response[0x22:0x24]) + + +class sp2(Device): + """Controls a Broadlink SP2.""" + + TYPE = "SP2" + + def set_power(self, pwr: bool) -> None: + """Set the power state of the device.""" + packet = bytearray(16) + packet[0] = 2 + packet[4] = bool(pwr) + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + + def check_power(self) -> bool: + """Return the power state of the device.""" + packet = bytearray(16) + packet[0] = 1 + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + payload = self.decrypt(response[0x38:]) + return bool(payload[0x4]) + + +class sp2s(sp2): + """Controls a Broadlink SP2S.""" + + TYPE = "SP2S" + + def get_energy(self) -> float: + """Return the power consumption in W.""" + packet = bytearray(16) + packet[0] = 4 + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + payload = self.decrypt(response[0x38:]) + return int.from_bytes(payload[0x4:0x7], "little") / 1000 + + +class sp3(Device): + """Controls a Broadlink SP3.""" + + TYPE = "SP3" + + def set_power(self, pwr: bool) -> None: + """Set the power state of the device.""" + packet = bytearray(16) + packet[0] = 2 + packet[4] = self.check_nightlight() << 1 | bool(pwr) + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + + def set_nightlight(self, ntlight: bool) -> None: + """Set the night light state of the device.""" + packet = bytearray(16) + packet[0] = 2 + packet[4] = bool(ntlight) << 1 | self.check_power() + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + + def check_power(self) -> bool: + """Return the power state of the device.""" + packet = bytearray(16) + packet[0] = 1 + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + payload = self.decrypt(response[0x38:]) + return bool(payload[0x4] & 1) + + def check_nightlight(self) -> bool: + """Return the state of the night light.""" + packet = bytearray(16) + packet[0] = 1 + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + payload = self.decrypt(response[0x38:]) + return bool(payload[0x4] & 2) + + +class sp3s(sp2): + """Controls a Broadlink SP3S.""" + + TYPE = "SP3S" + + def get_energy(self) -> float: + """Return the power consumption in W.""" + packet = bytearray([8, 0, 254, 1, 5, 1, 0, 0, 0, 45]) + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + payload = self.decrypt(response[0x38:]) + energy = payload[0x7:0x4:-1].hex() + return int(energy) / 100 + + +class sp4(Device): + """Controls a Broadlink SP4.""" + + TYPE = "SP4" + + def set_power(self, pwr: bool) -> None: + """Set the power state of the device.""" + self.set_state(pwr=pwr) + + def set_nightlight(self, ntlight: bool) -> None: + """Set the night light state of the device.""" + self.set_state(ntlight=ntlight) + + def set_state( + self, + pwr: Optional[bool] = None, + ntlight: Optional[bool] = None, + indicator: Optional[bool] = None, + ntlbrightness: Optional[int] = None, + maxworktime: Optional[int] = None, + childlock: Optional[bool] = None, + ) -> dict: + """Set state of device.""" + state = {} + if pwr is not None: + state["pwr"] = int(bool(pwr)) + if ntlight is not None: + state["ntlight"] = int(bool(ntlight)) + if indicator is not None: + state["indicator"] = int(bool(indicator)) + if ntlbrightness is not None: + state["ntlbrightness"] = ntlbrightness + if maxworktime is not None: + state["maxworktime"] = maxworktime + if childlock is not None: + state["childlock"] = int(bool(childlock)) + + packet = self._encode(2, state) + response = self.send_packet(0x6A, packet) + return self._decode(response) + + def check_power(self) -> bool: + """Return the power state of the device.""" + state = self.get_state() + return bool(state["pwr"]) + + def check_nightlight(self) -> bool: + """Return the state of the night light.""" + state = self.get_state() + return bool(state["ntlight"]) + + def get_state(self) -> dict: + """Get full state of device.""" + packet = self._encode(1, {}) + response = self.send_packet(0x6A, packet) + return self._decode(response) + + def _encode(self, flag: int, state: dict) -> bytes: + """Encode a message.""" + packet = bytearray(12) + data = json.dumps(state, separators=(",", ":")).encode() + struct.pack_into( + " dict: + """Decode a message.""" + e.check_error(response[0x22:0x24]) + payload = self.decrypt(response[0x38:]) + js_len = struct.unpack_from(" dict: + """Get full state of device.""" + state = super().get_state() + + # Convert sensor data to float. Remove keys if sensors are not supported. + sensor_attrs = ["current", "volt", "power", "totalconsum", "overload"] + for attr in sensor_attrs: + value = state.pop(attr, -1) + if value != -1: + state[attr] = value / 1000 + return state + + def _encode(self, flag: int, state: dict) -> bytes: + """Encode a message.""" + packet = bytearray(14) + data = json.dumps(state, separators=(",", ":")).encode() + length = 12 + len(data) + struct.pack_into( + " dict: + """Decode a message.""" + e.check_error(response[0x22:0x24]) + payload = self.decrypt(response[0x38:]) + js_len = struct.unpack_from(" dict: + """Return the power state of the device. + + Example: `{"pwr":1,"pwr1":1,"pwr2":0,"maxworktime":60,"maxworktime1":60,"maxworktime2":0,"idcbrightness":50}` + """ + packet = self._encode(1, {}) + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + return self._decode(response) + + def set_state( + self, + pwr: Optional[bool] = None, + pwr1: Optional[bool] = None, + pwr2: Optional[bool] = None, + maxworktime: Optional[int] = None, + maxworktime1: Optional[int] = None, + maxworktime2: Optional[int] = None, + idcbrightness: Optional[int] = None, + ) -> dict: + """Set the power state of the device.""" + state = {} + if pwr is not None: + state["pwr"] = int(bool(pwr)) + if pwr1 is not None: + state["pwr1"] = int(bool(pwr1)) + if pwr2 is not None: + state["pwr2"] = int(bool(pwr2)) + if maxworktime is not None: + state["maxworktime"] = maxworktime + if maxworktime1 is not None: + state["maxworktime1"] = maxworktime1 + if maxworktime2 is not None: + state["maxworktime2"] = maxworktime2 + if idcbrightness is not None: + state["idcbrightness"] = idcbrightness + + packet = self._encode(2, state) + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + return self._decode(response) + + def _encode(self, flag: int, state: dict) -> bytes: + """Encode a message.""" + packet = bytearray(14) + data = json.dumps(state).encode() + length = 12 + len(data) + struct.pack_into( + " dict: + """Decode a message.""" + payload = self.decrypt(response[0x38:]) + js_len = struct.unpack_from(" dict: + """Set the power state of the device.""" + state = {} + if pwr is not None: + state["pwr"] = int(bool(pwr)) + if pwr1 is not None: + state["pwr1"] = int(bool(pwr1)) + if pwr2 is not None: + state["pwr2"] = int(bool(pwr2)) + if pwr3 is not None: + state["pwr3"] = int(bool(pwr3)) + if maxworktime1 is not None: + state["maxworktime1"] = maxworktime1 + if maxworktime2 is not None: + state["maxworktime2"] = maxworktime2 + if maxworktime3 is not None: + state["maxworktime3"] = maxworktime3 + if idcbrightness is not None: + state["idcbrightness"] = idcbrightness + if childlock is not None: + state["childlock"] = int(bool(childlock)) + if childlock1 is not None: + state["childlock1"] = int(bool(childlock1)) + if childlock2 is not None: + state["childlock2"] = int(bool(childlock2)) + if childlock3 is not None: + state["childlock3"] = int(bool(childlock3)) + if childlock4 is not None: + state["childlock4"] = int(bool(childlock4)) + + packet = self._encode(2, state) + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + return self._decode(response) + + +class mp1(Device): + """Controls a Broadlink MP1.""" + + TYPE = "MP1" + + def set_power_mask(self, sid_mask: int, pwr: bool) -> None: + """Set the power state of the device.""" + packet = bytearray(16) + packet[0x00] = 0x0D + packet[0x02] = 0xA5 + packet[0x03] = 0xA5 + packet[0x04] = 0x5A + packet[0x05] = 0x5A + packet[0x06] = 0xB2 + ((sid_mask << 1) if pwr else sid_mask) + packet[0x07] = 0xC0 + packet[0x08] = 0x02 + packet[0x0A] = 0x03 + packet[0x0D] = sid_mask + packet[0x0E] = sid_mask if pwr else 0 + + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + + def set_power(self, sid: int, pwr: bool) -> None: + """Set the power state of the device.""" + sid_mask = 0x01 << (sid - 1) + self.set_power_mask(sid_mask, pwr) + + def check_power_raw(self) -> int: + """Return the power state of the device in raw format.""" + packet = bytearray(16) + packet[0x00] = 0x0A + packet[0x02] = 0xA5 + packet[0x03] = 0xA5 + packet[0x04] = 0x5A + packet[0x05] = 0x5A + packet[0x06] = 0xAE + packet[0x07] = 0xC0 + packet[0x08] = 0x01 + + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + payload = self.decrypt(response[0x38:]) + return payload[0x0E] + + def check_power(self) -> dict: + """Return the power state of the device.""" + data = self.check_power_raw() + return { + "s1": bool(data & 1), + "s2": bool(data & 2), + "s3": bool(data & 4), + "s4": bool(data & 8), + } + + +class mp1s(mp1): + """Controls a Broadlink MP1S.""" + + TYPE = "MP1S" + + def get_state(self) -> dict: + """Return the power state of the device. + + voltage in V. + current in A. + power in W. + power consumption in kW·h. + """ + packet = bytearray(16) + packet[0x00] = 0x0E + packet[0x02] = 0xA5 + packet[0x03] = 0xA5 + packet[0x04] = 0x5A + packet[0x05] = 0x5A + packet[0x06] = 0xB2 + packet[0x07] = 0xC0 + packet[0x08] = 0x01 + packet[0x0A] = 0x04 + + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + payload = self.decrypt(response[0x38:]) + payload_str = payload.hex()[4:-6] + + def get_value(start, end, factors): + value = sum( + int(payload_str[i-2:i]) * factor + for i, factor in zip(range(start, end, -2), factors) + ) + return value + + return { + "volt": get_value(34, 30, [10, 0.1]), + "current": get_value(40, 34, [1, 0.01, 0.0001]), + "power": get_value(46, 40, [100, 1, 0.01]), + "totalconsum": get_value(54, 46, [10000, 100, 1, 0.01]), + } diff --git a/cli/README.md b/cli/README.md new file mode 100644 index 00000000..b7e48dc9 --- /dev/null +++ b/cli/README.md @@ -0,0 +1,138 @@ +Command line interface for python-broadlink +=========================================== + +This is a command line interface for the python-broadlink API. + + +Requirements +------------ +You need to install the module first: +``` +pip3 install broadlink +``` + +Installation +----------- +Download "broadlink_cli" and "broadlink_discovery". + + +Programs +-------- +* broadlink_discovery: Discover Broadlink devices connected to the local network. + +* broadlink_cli: Send commands and query the Broadlink device. + + +Device specification formats +---------------------------- + +Using separate parameters for each information: +``` +broadlink_cli --type 0x2712 --host 1.1.1.1 --mac aaaaaaaaaa --temp +``` + +Using all parameters as a single argument: +``` +broadlink_cli --device "0x2712 1.1.1.1 aaaaaaaaaa" --temp +``` + +Using file with parameters: +``` +broadlink_cli --device @BEDROOM.device --temp +``` +This is prefered as the configuration is stored in a file and you can change +it later to point to a different device. + +Example usage +------------- + +### Common commands + +#### Join device to the Wi-Fi network +``` +broadlink_cli --joinwifi SSID PASSWORD +``` + +#### Discover devices connected to the local network +``` +broadlink_discovery +``` + +### Universal remotes + +#### Learn IR code and show at console +``` +broadlink_cli --device @BEDROOM.device --learn +``` + +#### Learn RF code and show at console +``` +broadlink_cli --device @BEDROOM.device --rfscanlearn +``` + +#### Learn IR code and save to file +``` +broadlink_cli --device @BEDROOM.device --learnfile LG-TV.power +``` + +#### Learn RF code and save to file +``` +broadlink_cli --device @BEDROOM.device --rfscanlearn --learnfile LG-TV.power +``` + +#### Send code +``` +broadlink_cli --device @BEDROOM.device --send DATA +``` + +#### Send code from file +``` +broadlink_cli --device @BEDROOM.device --send @LG-TV.power +``` + +#### Check temperature +``` +broadlink_cli --device @BEDROOM.device --temperature +``` + +#### Check humidity +``` +broadlink_cli --device @BEDROOM.device --humidity +``` + +### Smart plugs + +#### Turn on +``` +broadlink_cli --device @BEDROOM.device --turnon +``` + +#### Turn off +``` +broadlink_cli --device @BEDROOM.device --turnoff +``` + +#### Turn on nightlight +``` +broadlink_cli --device @BEDROOM.device --turnnlon +``` + +#### Turn off nightlight +``` +broadlink_cli --device @BEDROOM.device --turnnloff +``` + +#### Check power state +``` +broadlink_cli --device @BEDROOM.device --check +``` + +#### Check nightlight state +``` +broadlink_cli --device @BEDROOM.device --checknl +``` + +#### Check power consumption +``` +broadlink_cli --device @BEDROOM.device --energy +``` diff --git a/cli/broadlink_cli b/cli/broadlink_cli new file mode 100755 index 00000000..7913e332 --- /dev/null +++ b/cli/broadlink_cli @@ -0,0 +1,221 @@ +#!/usr/bin/env python3 +import argparse +import base64 +import time +from typing import List + +import broadlink +from broadlink.const import DEFAULT_PORT +from broadlink.exceptions import ReadError, StorageError +from broadlink.remote import data_to_pulses, pulses_to_data + +TIMEOUT = 30 + + +def auto_int(x): + return int(x, 0) + + +def format_pulses(pulses: List[int]) -> str: + """Format pulses.""" + return " ".join( + f"+{pulse}" if i % 2 == 0 else f"-{pulse}" + for i, pulse in enumerate(pulses) + ) + + +def parse_pulses(data: List[str]) -> List[int]: + """Parse pulses.""" + return [abs(int(s)) for s in data] + + +parser = argparse.ArgumentParser(fromfile_prefix_chars='@') +parser.add_argument("--device", help="device definition as 'type host mac'") +parser.add_argument("--type", type=auto_int, default=0x2712, help="type of device") +parser.add_argument("--host", help="host address") +parser.add_argument("--mac", help="mac address (hex reverse), as used by python-broadlink library") +parser.add_argument("--temperature", action="store_true", help="request temperature from device") +parser.add_argument("--humidity", action="store_true", help="request humidity from device") +parser.add_argument("--energy", action="store_true", help="request energy consumption from device") +parser.add_argument("--check", action="store_true", help="check current power state") +parser.add_argument("--checknl", action="store_true", help="check current nightlight state") +parser.add_argument("--turnon", action="store_true", help="turn on device") +parser.add_argument("--turnoff", action="store_true", help="turn off device") +parser.add_argument("--turnnlon", action="store_true", help="turn on nightlight on the device") +parser.add_argument("--turnnloff", action="store_true", help="turn off nightlight on the device") +parser.add_argument("--switch", action="store_true", help="switch state from on to off and off to on") +parser.add_argument("--send", action="store_true", help="send command") +parser.add_argument("--sensors", action="store_true", help="check all sensors") +parser.add_argument("--learn", action="store_true", help="learn command") +parser.add_argument("--rflearn", action="store_true", help="rf scan learning") +parser.add_argument("--frequency", type=float, help="specify radiofrequency for learning") +parser.add_argument("--learnfile", help="save learned command to a specified file") +parser.add_argument("--durations", action="store_true", + help="use durations in micro seconds instead of the Broadlink format") +parser.add_argument("--convert", action="store_true", help="convert input data to durations") +parser.add_argument("--joinwifi", nargs=2, help="Args are SSID PASSPHRASE to configure Broadlink device with") +parser.add_argument("data", nargs='*', help="Data to send or convert") +args = parser.parse_args() + +if args.device: + values = args.device.split() + devtype = int(values[0], 0) + host = values[1] + mac = bytearray.fromhex(values[2]) +elif args.mac: + devtype = args.type + host = args.host + mac = bytearray.fromhex(args.mac) + +if args.host or args.device: + dev = broadlink.gendevice(devtype, (host, DEFAULT_PORT), mac) + dev.auth() + +if args.joinwifi: + broadlink.setup(args.joinwifi[0], args.joinwifi[1], 4) + +if args.convert: + data = bytearray.fromhex(''.join(args.data)) + pulses = data_to_pulses(data) + print(format_pulses(pulses)) +if args.temperature: + print(dev.check_temperature()) +if args.humidity: + print(dev.check_humidity()) +if args.energy: + print(dev.get_energy()) +if args.sensors: + data = dev.check_sensors() + for key in data: + print("{} {}".format(key, data[key])) +if args.send: + data = ( + pulses_to_data(parse_pulses(args.data)) + if args.durations + else bytes.fromhex(''.join(args.data)) + ) + dev.send_data(data) +if args.learn or (args.learnfile and not args.rflearn): + dev.enter_learning() + print("Learning...") + start = time.time() + while time.time() - start < TIMEOUT: + time.sleep(1) + try: + data = dev.check_data() + except (ReadError, StorageError): + continue + else: + break + else: + print("No data received...") + exit(1) + + print("Packet found!") + raw_fmt = data.hex() + base64_fmt = base64.b64encode(data).decode('ascii') + pulse_fmt = format_pulses(data_to_pulses(data)) + + print("Raw:", raw_fmt) + print("Base64:", base64_fmt) + print("Pulses:", pulse_fmt) + + if args.learnfile: + print("Saving to {}".format(args.learnfile)) + with open(args.learnfile, "w") as text_file: + text_file.write(pulse_fmt if args.durations else raw_fmt) +if args.check: + if dev.check_power(): + print('* ON *') + else: + print('* OFF *') +if args.checknl: + if dev.check_nightlight(): + print('* ON *') + else: + print('* OFF *') +if args.turnon: + dev.set_power(True) + if dev.check_power(): + print('== Turned * ON * ==') + else: + print('!! Still OFF !!') +if args.turnoff: + dev.set_power(False) + if dev.check_power(): + print('!! Still ON !!') + else: + print('== Turned * OFF * ==') +if args.turnnlon: + dev.set_nightlight(True) + if dev.check_nightlight(): + print('== Turned * ON * ==') + else: + print('!! Still OFF !!') +if args.turnnloff: + dev.set_nightlight(False) + if dev.check_nightlight(): + print('!! Still ON !!') + else: + print('== Turned * OFF * ==') +if args.switch: + if dev.check_power(): + dev.set_power(False) + print('* Switch to OFF *') + else: + dev.set_power(True) + print('* Switch to ON *') +if args.rflearn: + if args.frequency: + frequency = args.frequency + print("Press the button you want to learn, a short press...") + else: + dev.sweep_frequency() + print("Detecting radiofrequency, press and hold the button to learn...") + + start = time.time() + while time.time() - start < TIMEOUT: + time.sleep(1) + locked, frequency = dev.check_frequency() + if locked: + break + else: + print("Radiofrequency not found") + dev.cancel_sweep_frequency() + exit(1) + + print("Radiofrequency detected: {}MHz".format(frequency)) + print("You can now let go of the button") + + input("Press enter to continue...") + + print("Press the button again, now a short press.") + + dev.find_rf_packet(frequency) + + start = time.time() + while time.time() - start < TIMEOUT: + time.sleep(1) + try: + data = dev.check_data() + except (ReadError, StorageError): + continue + else: + break + else: + print("No data received...") + exit(1) + + print("Packet found!") + raw_fmt = data.hex() + base64_fmt = base64.b64encode(data).decode('ascii') + pulse_fmt = format_pulses(data_to_pulses(data)) + + print("Raw:", raw_fmt) + print("Base64:", base64_fmt) + print("Pulses:", pulse_fmt) + + if args.learnfile: + print("Saving to {}".format(args.learnfile)) + with open(args.learnfile, "w") as text_file: + text_file.write(pulse_fmt if args.durations else raw_fmt) diff --git a/cli/broadlink_discovery b/cli/broadlink_discovery new file mode 100755 index 00000000..477e1bd7 --- /dev/null +++ b/cli/broadlink_discovery @@ -0,0 +1,30 @@ +#!/usr/bin/env python3 +import argparse + +import broadlink +from broadlink.const import DEFAULT_BCAST_ADDR, DEFAULT_TIMEOUT +from broadlink.exceptions import StorageError + +parser = argparse.ArgumentParser(fromfile_prefix_chars='@') +parser.add_argument("--timeout", type=int, default=DEFAULT_TIMEOUT, help="timeout to wait for receiving discovery responses") +parser.add_argument("--ip", default=None, help="ip address to use in the discovery") +parser.add_argument("--dst-ip", default=DEFAULT_BCAST_ADDR, help="destination ip address to use in the discovery") +args = parser.parse_args() + +print("Discovering...") +devices = broadlink.discover(timeout=args.timeout, local_ip_address=args.ip, discover_ip_address=args.dst_ip) +for device in devices: + if device.auth(): + print("###########################################") + print(device.type) + print("# broadlink_cli --type {} --host {} --mac {}".format(hex(device.devtype), device.host[0], + ''.join(format(x, '02x') for x in device.mac))) + print("Device file data (to be used with --device @filename in broadlink_cli) : ") + print("{} {} {}".format(hex(device.devtype), device.host[0], ''.join(format(x, '02x') for x in device.mac))) + try: + print("temperature = {}".format(device.check_temperature())) + except (AttributeError, StorageError): + pass + print("") + else: + print("Error authenticating with device : {}".format(device.host)) diff --git a/protocol.md b/protocol.md index 1888a19d..e2825640 100644 --- a/protocol.md +++ b/protocol.md @@ -6,6 +6,30 @@ Encryption Packets include AES-based encryption in CBC mode. The initial key is 0x09, 0x76, 0x28, 0x34, 0x3f, 0xe9, 0x9e, 0x23, 0x76, 0x5c, 0x15, 0x13, 0xac, 0xcf, 0x8b, 0x02. The IV is 0x56, 0x2e, 0x17, 0x99, 0x6d, 0x09, 0x3d, 0x28, 0xdd, 0xb3, 0xba, 0x69, 0x5a, 0x2e, 0x6f, 0x58. +Checksum +-------- + +Construct the packet and set checksum bytes to zero. Add each byte to the starting value of 0xbeaf, wrapping after 0xffff. + +New device setup +---------------- + +To setup a new Broadlink device while in AP Mode a 136 byte packet needs to be sent to the device as follows: + +| Offset | Contents | +|---------|----------| +|0x00-0x19|00| +|0x20-0x21|Checksum as a little-endian 16 bit integer| +|0x26|14 (Always 14)| +|0x44-0x63|SSID Name (zero padding is appended)| +|0x64-0x83|Password (zero padding is appended)| +|0x84|Character length of SSID| +|0x85|Character length of password| +|0x86|Wireless security mode (00 - none, 01 = WEP, 02 = WPA1, 03 = WPA2, 04 = WPA1/2)| +|0x87-88|00| + +Send this packet as a UDP broadcast to 255.255.255.255 on port 80. + Network discovery ----------------- @@ -16,13 +40,14 @@ To discover Broadlink devices on the local network, send a 48 byte packet with t |0x00-0x07|00| |0x08-0x0b|Current offset from GMT as a little-endian 32 bit integer| |0x0c-0x0d|Current year as a little-endian 16 bit integer| -|0x0e|Current number of minutes past the hour| -|0x0f|Current number of hours past midnight| -|0x10|Current number of years past the century| -|0x11|Current day of the week (Monday = 0, Tuesday = 1, etc)| +|0x0e|Current number of seconds past the minute| +|0x0f|Current number of minutes past the hour| +|0x10|Current number of hours past midnight| +|0x11|Current day of the week (Monday = 1, Tuesday = 2, etc)| |0x12|Current day in month| |0x13|Current month| -|0x19-0x1b|Local IP address| +|0x14-0x17|00| +|0x18-0x1b|Local IP address| |0x1c-0x1d|Source port as a little-endian 16 bit integer| |0x1e-0x1f|00| |0x20-0x21|Checksum as a little-endian 16 bit integer| @@ -30,12 +55,40 @@ To discover Broadlink devices on the local network, send a 48 byte packet with t |0x26|06| |0x27-0x2f|00| -Send this packet as a UDP broadcast to 255.255.255.255 on port 80. Bytes 0x3a-0x40 of any unicast response will contain the MAC address of the target device. +Send this packet as a UDP broadcast to 255.255.255.255 on port 80. -Checksum --------- +Response (any unicast response): + +| Offset | Contents | +|---------|----------| +|0x34-0x35|Device type as a little-endian 16 bit integer (see device type mapping)| +|0x3a-0x3f|MAC address of the target device| + +Device type mapping: + +| Device type in response packet | Device type | Treat as | +|---------|----------|----------| +|0|SP1|SP1| +|0x2711|SP2|SP2| +|0x2719 or 0x7919 or 0x271a or 0x791a|Honeywell SP2|SP2| +|0x2720|SPMini|SP2| +|0x753e|SP3|SP2| +|0x2728|SPMini2|SP2 +|0x2733 or 0x273e|OEM branded SPMini|SP2| +|>= 0x7530 and <= 0x7918|OEM branded SPMini2|SP2| +|0x2736|SPMiniPlus|SP2| +|0x2712|RM2|RM| +|0x2737|RM Mini / RM3 Mini Blackbean|RM| +|0x273d|RM Pro Phicomm|RM| +|0x2783|RM2 Home Plus|RM| +|0x277c|RM2 Home Plus GDT|RM| +|0x272a|RM2 Pro Plus|RM| +|0x2787|RM2 Pro Plus2|RM| +|0x278b|RM2 Pro Plus BL|RM| +|0x278f|RM Mini Shate|RM| +|0x2714|A1|A1| +|0x4EB5|MP1|MP1| -Construct the packet and set checksum bytes to zero. Add each byte to the starting value of 0xbeaf, wrapping after 0xffff. Command packet format --------------------- @@ -55,20 +108,19 @@ The command packet header is 56 bytes long with the following format: |0x08-0x1f|00| |0x20-0x21|Checksum of full packet as a little-endian 16 bit integer| |0x22-0x23|00| -|0x24|0x2a| -|0x25|0x27| +|0x24-0x25|Device type as a little-endian 16 bit integer| |0x26-0x27|Command code as a little-endian 16 bit integer| |0x28-0x29|Packet count as a little-endian 16 bit integer| |0x2a-0x2f|Local MAC address| |0x30-0x33|Local device ID (obtained during authentication, 00 before authentication)| -|0x34-0x35|Checksum of packet header as a little-endian 16 bit integer +|0x34-0x35|Checksum of unencrypted payload as a little-endian 16 bit integer |0x36-0x37|00| -The payload is appended immediately after this. The checksum at 0x34 is calculated *before* the payload is appended, and covers only the header. The checksum at 0x20 is calculated *after* the payload is appended, and covers the entire packet (including the checksum at 0x34). Therefore: +The payload is appended immediately after this. The checksum at 0x20 is calculated *after* the payload is appended, and covers the entire packet (including the checksum at 0x34). Therefore: 1. Generate packet header with checksum values set to 0 -2. Set the checksum initialisation value to 0xbeaf and calculate the checksum of the packet header. Set 0x34-0x35 to this value. -3. Append the payload +2. Set the checksum initialisation value to 0xbeaf and calculate the checksum of the unencrypted payload. Set 0x34-0x35 to this value. +3. Encrypt and append the payload 4. Set the checksum initialisation value to 0xbeaf and calculate the checksum of the entire packet. Set 0x20-0x21 to this value. Authorisation @@ -125,11 +177,28 @@ Send the following payload with a command byte of 0x006a |------|--------| |0x00|0x02| |0x01-0x03|0x00| -|0x04-end|data| +|0x04|0x26 = IR, 0xb2 for RF 433Mhz, 0xd7 for RF 315Mhz| +|0x05|repeat count, (0 = no repeat, 1 send twice, .....)| +|0x06-0x07|Length of the following data in little endian| +|0x08 ....|Pulse lengths in 2^-15 s units (µs * 269 / 8192 works very well)| +|....|For IR codes, the pulse lengths should be paired as ON, OFF| + +Each value is represented by one byte. If the length exceeds one byte +then it is stored big endian with a leading 0. + +Captures of IR codes from the device will always end with a constant OFF value of `0x00 0x0d 0x05` but the trailing silence can be anything on transmit. The likely reason for this value is a capped timeout value on detection. The value is about 102 milliseconds. + +Example: The header for my Optoma projector is 8920 4450 +8920 * 269 / 8192 = 0x124 +4450 * 269 / 8192 = 0x92 + +So the data starts with `0x00 0x1 0x24 0x92 ....` Todo ---- * Support for other devices using the Broadlink protocol (various smart home devices) -* Figure out what the format of the data packets actually is. \ No newline at end of file +* Figure out what the format of the data packets actually is. +* Deal with the response after AP Mode WiFi network setup. + diff --git a/requirements.txt b/requirements.txt index f7d3e237..2c6c996c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1 @@ -pycrypto==2.6.1 +cryptography==3.2 diff --git a/setup.py b/setup.py index a09222a0..0426f148 100644 --- a/setup.py +++ b/setup.py @@ -1,31 +1,28 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- -import re + from setuptools import setup, find_packages -import sys -import warnings -dynamic_requires = [] -version = 0.3 +version = '0.19.0' setup( - name='broadlink', - version=0.3, - author='Matthew Garrett', - author_email='mjg59@srcf.ucam.org', - url='http://github.com/mjg59/python-broadlink', + name="broadlink", + version=version, + author="Matthew Garrett", + author_email="mjg59@srcf.ucam.org", + url="http://github.com/mjg59/python-broadlink", packages=find_packages(), scripts=[], - install_requires=['pycrypto==2.6.1'], - description='Python API for controlling Broadlink IR controllers', + install_requires=["cryptography>=3.2"], + description="Python API for controlling Broadlink devices", classifiers=[ - 'Development Status :: 4 - Beta', - 'Intended Audience :: Developers', - 'License :: OSI Approved :: MIT License', - 'Operating System :: OS Independent', - 'Programming Language :: Python', + "Development Status :: 4 - Beta", + "Intended Audience :: Developers", + "License :: OSI Approved :: MIT License", + "Operating System :: OS Independent", + "Programming Language :: Python", ], include_package_data=True, zip_safe=False,