diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md new file mode 100644 index 00000000..de98d3df --- /dev/null +++ b/.github/PULL_REQUEST_TEMPLATE.md @@ -0,0 +1,55 @@ + +## Context + + + +## Proposed change + + + +## Type of change + + +- [ ] Dependency upgrade +- [ ] Bugfix (non-breaking change which fixes an issue) +- [ ] New device +- [ ] New product id (the device is already supported with a different id) +- [ ] New feature (which adds functionality to an existing device) +- [ ] Breaking change (fix/feature causing existing functionality to break) +- [ ] Code quality improvements to existing code or addition of tests +- [ ] Documentation + +## Additional information + + +- This PR fixes issue: fixes # +- This PR is related to: +- Link to documentation pull request: + +## Checklist + + +- [ ] The code change is tested and works locally. +- [ ] The code has been formatted using Black. +- [ ] The code follows the [Zen of Python](https://www.python.org/dev/peps/pep-0020/). +- [ ] I am creating the Pull Request against the correct branch. +- [ ] Documentation added/updated. diff --git a/.github/workflows/flake8.yaml b/.github/workflows/flake8.yaml index 2fd67945..aa09a19c 100644 --- a/.github/workflows/flake8.yaml +++ b/.github/workflows/flake8.yaml @@ -2,26 +2,27 @@ name: Python flake8 on: push: - branches: [ main, master, dev, development ] + branches: [ master, dev ] pull_request: - branches: [ main, master, dev, development ] + branches: [ master, dev ] jobs: test: - runs-on: ubuntu-latest + runs-on: ubuntu-20.04 strategy: matrix: python-version: [3.6, 3.7, 3.8, 3.9] steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v2 + uses: actions/setup-python@v4 with: python-version: ${{ matrix.python-version }} - name: Install dependencies run: | python -m pip install --upgrade pip - pip install flake8 wemake-python-styleguide + pip install wheel + pip install flake8 flake8-quotes if [ -f requirements.txt ]; then pip install -r requirements.txt; fi - name: Lint with flake8 run: | diff --git a/README.md b/README.md index 8d00fc93..81c6de5b 100644 --- a/README.md +++ b/README.md @@ -1,131 +1,247 @@ -Python control for Broadlink RM2, RM3 and RM4 series controllers -=============================================== +# python-broadlink -A simple Python API for controlling IR/RF controllers from [Broadlink](http://www.ibroadlink.com/rm/). At present, the following devices are currently supported: +A Python module and CLI for controlling Broadlink devices locally. The following devices are supported: -* RM Pro (referred to as RM2 in the codebase) -* A1 sensor platform devices are supported -* RM3 mini IR blaster -* RM4 and RM4C mini blasters -- SP2/SP3/SP4 smart plugs +- **Universal remotes**: RM home, RM mini 3, RM plus, RM pro, RM pro+, RM4 mini, RM4 pro, RM4C mini, RM4S, RM4 TV mate +- **Smart plugs**: SP mini, SP mini 3, SP mini+, SP1, SP2, SP2-BR, SP2-CL, SP2-IN, SP2-UK, SP3, SP3-EU, SP3S-EU, SP3S-US, SP4L-AU, SP4L-EU, SP4L-UK, SP4M, SP4M-US, Ankuoo NEO, Ankuoo NEO PRO, Efergy Ego, BG AHC/U-01 +- **Switches**: MCB1, SC1, SCB1E, SCB2 +- **Outlets**: BG 800, BG 900 +- **Power strips**: MP1-1K3S2U, MP1-1K4S, MP2 +- **Environment sensors**: A1 +- **Alarm kits**: S1C, S2KIT +- **Light bulbs**: LB1, LB26 R1, LB27 R1, SB800TD +- **Curtain motors**: Dooya DT360E-45/20 +- **Thermostats**: Hysen HY02B05H +- **Hubs**: S3 -There is currently no support for the cloud API. +## Installation -Example use ------------ +Use pip3 to install the latest version of this module. -Setup a new device on your local wireless network: - -1. Put the device into AP Mode - 1. Long press the reset button until the blue LED is blinking quickly. - 2. Long press again until blue LED is blinking slowly. - 3. Manually connect to the WiFi SSID named BroadlinkProv. -2. Run setup() and provide your ssid, network password (if secured), and set the security mode - 1. Security mode options are (0 = none, 1 = WEP, 2 = WPA1, 3 = WPA2, 4 = WPA1/2) ``` -import broadlink - -broadlink.setup('myssid', 'mynetworkpass', 3) +pip3 install broadlink ``` -Discover available devices on the local network: +## Basic functions + +First, open Python 3 and import this module. + ``` +python3 +``` +```python3 import broadlink - -devices = broadlink.discover(timeout=5) ``` -You may need to specify `local_ip_address` or `discover_ip_address` if discovery does not return any devices. +Now let's try some functions... +### Setup -Using your machine's IP address with `local_ip_address` +In order to control the device, you need to connect it to your local network. If you have already configured the device with the Broadlink app, this step is not necessary. + +1. Put the device into AP Mode. + - Long press the reset button until the blue LED is blinking quickly. + - Long press again until blue LED is blinking slowly. + - Manually connect to the WiFi SSID named BroadlinkProv. +2. Connect the device to your local network with the setup function. +```python3 +broadlink.setup('myssid', 'mynetworkpass', 3) ``` -import broadlink -devices = broadlink.discover(timeout=5, local_ip_address=192.168.0.100) +Security mode options are (0 = none, 1 = WEP, 2 = WPA1, 3 = WPA2, 4 = WPA1/2) + +#### Advanced options + +You may need to specify a broadcast address if setup is not working. +```python3 +broadlink.setup('myssid', 'mynetworkpass', 3, ip_address='192.168.0.255') ``` -Using your subnet's broadcast address with `discover_ip_address` +### Discovery + +Use this function to discover devices: +```python3 +devices = broadlink.discover() ``` -import broadlink -devices = broadlink.discover(timeout=5, discover_ip_address=192.168.0.255) +#### Advanced options +You may need to specify `local_ip_address` or `discover_ip_address` if discovery does not return any devices. + +Using the IP address of your local machine: +```python3 +devices = broadlink.discover(local_ip_address='192.168.0.100') ``` -Obtain the authentication key required for further communication: +Using the broadcast address of your subnet: +```python3 +devices = broadlink.discover(discover_ip_address='192.168.0.255') ``` -devices[0].auth() + +If the device is locked, it may not be discoverable with broadcast. In such cases, you can use the unicast version `broadlink.hello()` for direct discovery: +```python3 +device = broadlink.hello('192.168.0.16') ``` -Enter learning mode: +If you are a perfomance freak, use `broadlink.xdiscover()` to create devices instantly: +```python3 +for device in broadlink.xdiscover(): + print(device) # Example action. Do whatever you want here. ``` -devices[0].enter_learning() + +### Authentication +After discovering the device, call the `auth()` method to obtain the authentication key required for further communication: +```python3 +device.auth() ``` -Sweep RF frequencies: +The next steps depend on the type of device you want to control. + +## Universal remotes + +### Learning IR codes + +Learning IR codes takes place in three steps. + +1. Enter learning mode: +```python3 +device.enter_learning() ``` -devices[0].sweep_frequency() +2. When the LED blinks, point the remote at the Broadlink device and press the button you want to learn. +3. Get the IR packet. +```python3 +packet = device.check_data() ``` -Cancel sweep RF frequencies: +### Learning RF codes + +Learning RF codes takes place in six steps. + +1. Sweep the frequency: +```python3 +device.sweep_frequency() ``` -devices[0].cancel_sweep_frequency() +2. When the LED blinks, point the remote at the Broadlink device for the first time and long press the button you want to learn. +3. Check if the frequency was successfully identified: +```python3 +ok = device.check_frequency() +if ok: + print('Frequency found!') ``` -Check whether a frequency has been found: +4. Enter learning mode: +```python3 +device.find_rf_packet() ``` -found = devices[0].check_frequency() +5. When the LED blinks, point the remote at the Broadlink device for the second time and short press the button you want to learn. +6. Get the RF packet: +```python3 +packet = device.check_data() ``` -(This will return True if the RM has locked onto a frequency, False otherwise) -Attempt to learn an RF packet: -``` -found = devices[0].find_rf_packet() -``` -(This will return True if a packet has been found, False otherwise) +#### Notes -Obtain an IR or RF packet while in learning mode: -``` -ir_packet = devices[0].check_data() -``` -(This will return None if the device does not have a packet to return) +Universal remotes with product id 0x2712 use the same method for learning IR and RF codes. They don't need to sweep frequency. Just call `device.enter_learning()` and `device.check_data()`. -Send an IR or RF packet: -``` -devices[0].send_data(ir_packet) +### Canceling learning + +You can exit the learning mode in the middle of the process by calling this method: +```python3 +device.cancel_sweep_frequency() ``` -Obtain temperature data from an RM2: +### Sending IR/RF packets +```python3 +device.send_data(packet) ``` -devices[0].check_temperature() + +### Fetching sensor data +```python3 +data = device.check_sensors() ``` -Obtain sensor data from an A1: +## Switches + +### Setting power state +```python3 +device.set_power(True) +device.set_power(False) ``` -data = devices[0].check_sensors() + +### Checking power state +```python3 +state = device.check_power() ``` -Set power state on a SmartPlug SP2/SP3/SP4: +### Checking energy consumption +```python3 +state = device.get_energy() ``` -devices[0].set_power(True) + +## Power strips + +### Setting power state +```python3 +device.set_power(1, True) # Example socket. It could be 2 or 3. +device.set_power(1, False) ``` -Check power state on a SmartPlug: +### Checking power state +```python3 +state = device.check_power() ``` -state = devices[0].check_power() + +## Light bulbs + +### Fetching data +```python3 +state = device.get_state() ``` -Check energy consumption on a SmartPlug: +### Setting state attributes +```python3 +devices[0].set_state(pwr=0) +devices[0].set_state(pwr=1) +devices[0].set_state(brightness=75) +devices[0].set_state(bulb_colormode=0) +devices[0].set_state(blue=255) +devices[0].set_state(red=0) +devices[0].set_state(green=128) +devices[0].set_state(bulb_colormode=1) ``` -state = devices[0].get_energy() + +## Environment sensors + +### Fetching sensor data +```python3 +data = device.check_sensors() ``` -Set power state for S1 on a SmartPowerStrip MP1: +## Hubs + +### Discovering subdevices +```python3 +device.get_subdevices() ``` -devices[0].set_power(1, True) + +### Fetching data +Use the DID obtained from get_subdevices() for the input parameter to query specific sub-device. + +```python3 +device.get_state(did="00000000000000000000a043b0d06963") ``` -Check power state on a SmartPowerStrip: +### Setting state attributes +The parameters depend on the type of subdevice that is being controlled. In this example, we are controlling LC-1 switches: + +#### Turn on +```python3 +device.set_state(did="00000000000000000000a043b0d0783a", pwr=1) +device.set_state(did="00000000000000000000a043b0d0783a", pwr1=1) +device.set_state(did="00000000000000000000a043b0d0783a", pwr2=1) ``` -state = devices[0].check_power() +#### Turn off +```python3 +device.set_state(did="00000000000000000000a043b0d0783a", pwr=0) +device.set_state(did="00000000000000000000a043b0d0783a", pwr1=0) +device.set_state(did="00000000000000000000a043b0d0783a", pwr2=0) ``` diff --git a/TROUBLESHOOTING.md b/TROUBLESHOOTING.md new file mode 100644 index 00000000..a20765b2 --- /dev/null +++ b/TROUBLESHOOTING.md @@ -0,0 +1,9 @@ +# Troubleshooting + +## Firmware issues + +### AP setup fails with non-alphanumeric passwords + +Some devices ship with firmware that cannot connect to WLANs with non-alphanumeric passwords. To fix this, update the firmware to the latest version. You can also change the password to one with just letters and numbers or create a separate guest network with a simpler password. + +_First seen on Broadlink RM4 pro 0x6026. Already fixed in firmware v52079._ diff --git a/broadlink/__init__.py b/broadlink/__init__.py index 11172514..d3135501 100644 --- a/broadlink/__init__.py +++ b/broadlink/__init__.py @@ -1,113 +1,212 @@ #!/usr/bin/env python3 """The python-broadlink library.""" import socket -from typing import Generator, List, Union, Tuple +from typing import Generator, List, Optional, Tuple, Union +from . import exceptions as e +from .const import DEFAULT_BCAST_ADDR, DEFAULT_PORT, DEFAULT_TIMEOUT from .alarm import S1C -from .climate import hysen -from .cover import dooya -from .device import device, scan -from .exceptions import exception -from .light import lb1 -from .remote import rm, rm4 -from .sensor import a1 -from .switch import bg1, mp1, sp1, sp2, sp4, sp4b - +from .climate import hvac, hysen +from .cover import dooya, dooya2, wser +from .device import Device, ping, scan +from .hub import s3 +from .light import lb1, lb2 +from .remote import rm, rm4, rm4mini, rm4pro, rmmini, rmminib, rmpro +from .sensor import a1, a2 +from .switch import bg1, ehc31, mp1, mp1s, sp1, sp2, sp2s, sp3, sp3s, sp4, sp4b SUPPORTED_TYPES = { - 0x0000: (sp1, "SP1", "Broadlink"), - 0x2711: (sp2, "SP2", "Broadlink"), - 0x2716: (sp2, "NEO PRO", "Ankuoo"), - 0x2717: (sp2, "NEO", "Ankuoo"), - 0x2719: (sp2, "SP2-compatible", "Honeywell"), - 0x271A: (sp2, "SP2-compatible", "Honeywell"), - 0x271D: (sp2, "Ego", "Efergy"), - 0x2720: (sp2, "SP mini", "Broadlink"), - 0x2728: (sp2, "SP2-compatible", "URANT"), - 0x2733: (sp2, "SP3", "Broadlink"), - 0x2736: (sp2, "SP mini+", "Broadlink"), - 0x273E: (sp2, "SP mini", "Broadlink"), - 0x7530: (sp2, "SP2", "Broadlink (OEM)"), - 0x7539: (sp2, "SP2-IL", "Broadlink (OEM)"), - 0x753E: (sp2, "SP mini 3", "Broadlink"), - 0x7540: (sp2, "MP2", "Broadlink"), - 0x7544: (sp2, "SP2-CL", "Broadlink"), - 0x7546: (sp2, "SP2-UK/BR/IN", "Broadlink (OEM)"), - 0x7547: (sp2, "SC1", "Broadlink"), - 0x7918: (sp2, "SP2", "Broadlink (OEM)"), - 0x7919: (sp2, "SP2-compatible", "Honeywell"), - 0x791A: (sp2, "SP2-compatible", "Honeywell"), - 0x7D00: (sp2, "SP3-EU", "Broadlink (OEM)"), - 0x7D0D: (sp2, "SP mini 3", "Broadlink (OEM)"), - 0x9479: (sp2, "SP3S-US", "Broadlink"), - 0x947A: (sp2, "SP3S-EU", "Broadlink"), - 0x756C: (sp4, "SP4M", "Broadlink"), - 0x756F: (sp4, "MCB1", "Broadlink"), - 0x7579: (sp4, "SP4L-EU", "Broadlink"), - 0x7583: (sp4, "SP mini 3", "Broadlink"), - 0x7D11: (sp4, "SP mini 3", "Broadlink"), - 0xA56A: (sp4, "MCB1", "Broadlink"), - 0xA589: (sp4, "SP4L-UK", "Broadlink"), - 0x5115: (sp4b, "SCB1E", "Broadlink"), - 0x51E2: (sp4b, "AHC/U-01", "BG Electrical"), - 0x6111: (sp4b, "MCB1", "Broadlink"), - 0x6113: (sp4b, "SCB1E", "Broadlink"), - 0x618B: (sp4b, "SP4L-EU", "Broadlink"), - 0x6489: (sp4b, "SP4L-AU", "Broadlink"), - 0x648B: (sp4b, "SP4M-US", "Broadlink"), - 0x2712: (rm, "RM pro/pro+", "Broadlink"), - 0x272A: (rm, "RM pro", "Broadlink"), - 0x2737: (rm, "RM mini 3", "Broadlink"), - 0x273D: (rm, "RM pro", "Broadlink"), - 0x277C: (rm, "RM home", "Broadlink"), - 0x2783: (rm, "RM home", "Broadlink"), - 0x2787: (rm, "RM pro", "Broadlink"), - 0x278B: (rm, "RM plus", "Broadlink"), - 0x278F: (rm, "RM mini", "Broadlink"), - 0x2797: (rm, "RM pro+", "Broadlink"), - 0x279D: (rm, "RM pro+", "Broadlink"), - 0x27A1: (rm, "RM plus", "Broadlink"), - 0x27A6: (rm, "RM plus", "Broadlink"), - 0x27A9: (rm, "RM pro+", "Broadlink"), - 0x27C2: (rm, "RM mini 3", "Broadlink"), - 0x27C3: (rm, "RM pro+", "Broadlink"), - 0x27C7: (rm, "RM mini 3", "Broadlink"), - 0x27CC: (rm, "RM mini 3", "Broadlink"), - 0x27CD: (rm, "RM mini 3", "Broadlink"), - 0x27D0: (rm, "RM mini 3", "Broadlink"), - 0x27D1: (rm, "RM mini 3", "Broadlink"), - 0x27D3: (rm, "RM mini 3", "Broadlink"), - 0x27DE: (rm, "RM mini 3", "Broadlink"), - 0x51DA: (rm4, "RM4 mini", "Broadlink"), - 0x5F36: (rm4, "RM mini 3", "Broadlink"), - 0x6026: (rm4, "RM4 pro", "Broadlink"), - 0x6070: (rm4, "RM4C mini", "Broadlink"), - 0x610E: (rm4, "RM4 mini", "Broadlink"), - 0x610F: (rm4, "RM4C mini", "Broadlink"), - 0x61A2: (rm4, "RM4 pro", "Broadlink"), - 0x62BC: (rm4, "RM4 mini", "Broadlink"), - 0x62BE: (rm4, "RM4C mini", "Broadlink"), - 0x6364: (rm4, "RM4S", "Broadlink"), - 0x648D: (rm4, "RM4 mini", "Broadlink"), - 0x649B: (rm4, "RM4 pro", "Broadlink"), - 0x6508: (rm4, "RM mini 3", "Broadlink"), - 0x6539: (rm4, "RM4C mini", "Broadlink"), - 0x653A: (rm4, "RM4 mini", "Broadlink"), - 0x653C: (rm4, "RM4 pro", "Broadlink"), - 0x2714: (a1, "e-Sensor", "Broadlink"), - 0x4EB5: (mp1, "MP1-1K4S", "Broadlink"), - 0x4EF7: (mp1, "MP1-1K4S", "Broadlink (OEM)"), - 0x4F1B: (mp1, "MP1-1K3S2U", "Broadlink (OEM)"), - 0x4F65: (mp1, "MP1-1K3S2U", "Broadlink"), - 0x5043: (lb1, "SB800TD", "Broadlink (OEM)"), - 0x504E: (lb1, "LB1", "Broadlink"), - 0x60C7: (lb1, "LB1", "Broadlink"), - 0x60C8: (lb1, "LB1", "Broadlink"), - 0x6112: (lb1, "LB1", "Broadlink"), - 0x2722: (S1C, "S2KIT", "Broadlink"), - 0x4EAD: (hysen, "HY02B05H", "Hysen"), - 0x4E4D: (dooya, "DT360E-45/20", "Dooya"), - 0x51E3: (bg1, "BG800/BG900", "BG Electrical"), + sp1: { + 0x0000: ("SP1", "Broadlink"), + }, + sp2: { + 0x2717: ("NEO", "Ankuoo"), + 0x2719: ("SP2-compatible", "Honeywell"), + 0x271A: ("SP2-compatible", "Honeywell"), + 0x2720: ("SP mini", "Broadlink"), + 0x2728: ("SP2-compatible", "URANT"), + 0x273E: ("SP mini", "Broadlink"), + 0x7530: ("SP2", "Broadlink (OEM)"), + 0x7539: ("SP2-IL", "Broadlink (OEM)"), + 0x753E: ("SP mini 3", "Broadlink"), + 0x7540: ("MP2", "Broadlink"), + 0x7544: ("SP2-CL", "Broadlink"), + 0x7546: ("SP2-UK/BR/IN", "Broadlink (OEM)"), + 0x7547: ("SC1", "Broadlink"), + 0x7549: ("SP mini 3", "Broadlink (OEM)"), + 0x7918: ("SP2", "Broadlink (OEM)"), + 0x7919: ("SP2-compatible", "Honeywell"), + 0x791A: ("SP2-compatible", "Honeywell"), + 0x7D0D: ("SP mini 3", "Broadlink (OEM)"), + }, + sp2s: { + 0x2711: ("SP2", "Broadlink"), + 0x2716: ("NEO PRO", "Ankuoo"), + 0x271D: ("Ego", "Efergy"), + 0x2736: ("SP mini+", "Broadlink"), + }, + sp3: { + 0x2733: ("SP3", "Broadlink"), + 0x7D00: ("SP3-EU", "Broadlink (OEM)"), + }, + sp3s: { + 0x9479: ("SP3S-US", "Broadlink"), + 0x947A: ("SP3S-EU", "Broadlink"), + }, + sp4: { + 0x7568: ("SP4L-CN", "Broadlink"), + 0x756B: ("SP4M-JP", "Broadlink"), + 0x756C: ("SP4M", "Broadlink"), + 0x756F: ("MCB1", "Broadlink"), + 0x7579: ("SP4L-EU", "Broadlink"), + 0x757B: ("SP4L-AU", "Broadlink"), + 0x7583: ("SP mini 3", "Broadlink"), + 0x7587: ("SP4L-UK", "Broadlink"), + 0x7D11: ("SP mini 3", "Broadlink"), + 0xA4F9: ("WS4", "Broadlink (OEM)"), + 0xA569: ("SP4L-UK", "Broadlink"), + 0xA56A: ("MCB1", "Broadlink"), + 0xA56B: ("SCB1E", "Broadlink"), + 0xA56C: ("SP4L-EU", "Broadlink"), + 0xA576: ("SP4L-AU", "Broadlink"), + 0xA589: ("SP4L-UK", "Broadlink"), + 0xA5D3: ("SP4L-EU", "Broadlink"), + 0xA6F4: ("SP4D-US", "Broadlink"), + }, + sp4b: { + 0x5115: ("SCB1E", "Broadlink"), + 0x51E2: ("AHC/U-01", "BG Electrical"), + 0x6111: ("MCB1", "Broadlink"), + 0x6113: ("SCB1E", "Broadlink"), + 0x618B: ("SP4L-EU", "Broadlink"), + 0x6489: ("SP4L-AU", "Broadlink"), + 0x648B: ("SP4M-US", "Broadlink"), + 0x648C: ("SP4L-US", "Broadlink"), + 0x6494: ("SCB2", "Broadlink"), + }, + rmmini: { + 0x2737: ("RM mini 3", "Broadlink"), + 0x278F: ("RM mini", "Broadlink"), + 0x27B7: ("RM mini 3", "Broadlink"), + 0x27C2: ("RM mini 3", "Broadlink"), + 0x27C7: ("RM mini 3", "Broadlink"), + 0x27CC: ("RM mini 3", "Broadlink"), + 0x27CD: ("RM mini 3", "Broadlink"), + 0x27D0: ("RM mini 3", "Broadlink"), + 0x27D1: ("RM mini 3", "Broadlink"), + 0x27D3: ("RM mini 3", "Broadlink"), + 0x27DC: ("RM mini 3", "Broadlink"), + 0x27DE: ("RM mini 3", "Broadlink"), + }, + rmpro: { + 0x2712: ("RM pro/pro+", "Broadlink"), + 0x272A: ("RM pro", "Broadlink"), + 0x273D: ("RM pro", "Broadlink"), + 0x277C: ("RM home", "Broadlink"), + 0x2783: ("RM home", "Broadlink"), + 0x2787: ("RM pro", "Broadlink"), + 0x278B: ("RM plus", "Broadlink"), + 0x2797: ("RM pro+", "Broadlink"), + 0x279D: ("RM pro+", "Broadlink"), + 0x27A1: ("RM plus", "Broadlink"), + 0x27A6: ("RM plus", "Broadlink"), + 0x27A9: ("RM pro+", "Broadlink"), + 0x27C3: ("RM pro+", "Broadlink"), + }, + rmminib: { + 0x5F36: ("RM mini 3", "Broadlink"), + 0x6507: ("RM mini 3", "Broadlink"), + 0x6508: ("RM mini 3", "Broadlink"), + }, + rm4mini: { + 0x51DA: ("RM4 mini", "Broadlink"), + 0x5209: ("RM4 TV mate", "Broadlink"), + 0x520C: ("RM4 mini", "Broadlink"), + 0x520D: ("RM4C mini", "Broadlink"), + 0x5211: ("RM4C mate", "Broadlink"), + 0x5212: ("RM4 TV mate", "Broadlink"), + 0x5216: ("RM4 mini", "Broadlink"), + 0x521C: ("RM4 mini", "Broadlink"), + 0x6070: ("RM4C mini", "Broadlink"), + 0x610E: ("RM4 mini", "Broadlink"), + 0x610F: ("RM4C mini", "Broadlink"), + 0x62BC: ("RM4 mini", "Broadlink"), + 0x62BE: ("RM4C mini", "Broadlink"), + 0x6364: ("RM4S", "Broadlink"), + 0x648D: ("RM4 mini", "Broadlink"), + 0x6539: ("RM4C mini", "Broadlink"), + 0x653A: ("RM4 mini", "Broadlink"), + }, + rm4pro: { + 0x520B: ("RM4 pro", "Broadlink"), + 0x5213: ("RM4 pro", "Broadlink"), + 0x5218: ("RM4C pro", "Broadlink"), + 0x6026: ("RM4 pro", "Broadlink"), + 0x6184: ("RM4C pro", "Broadlink"), + 0x61A2: ("RM4 pro", "Broadlink"), + 0x649B: ("RM4 pro", "Broadlink"), + 0x653C: ("RM4 pro", "Broadlink"), + }, + a1: { + 0x2714: ("A1", "Broadlink"), + }, + a2: { + 0x4F60: ("A2", "Broadlink"), + }, + mp1: { + 0x4EB5: ("MP1-1K4S", "Broadlink"), + 0x4F1B: ("MP1-1K3S2U", "Broadlink (OEM)"), + 0x4F65: ("MP1-1K3S2U", "Broadlink"), + }, + mp1s: { + 0x4EF7: ("MP1-1K4S", "Broadlink (OEM)"), + }, + lb1: { + 0x5043: ("SB800TD", "Broadlink (OEM)"), + 0x504E: ("LB1", "Broadlink"), + 0x606D: ("SLA22RGB9W81/SLA27RGB9W81", "Luceco"), + 0x606E: ("SB500TD", "Broadlink (OEM)"), + 0x60C7: ("LB1", "Broadlink"), + 0x60C8: ("LB1", "Broadlink"), + 0x6112: ("LB1", "Broadlink"), + 0x644B: ("LB1", "Broadlink"), + 0x644C: ("LB27 R1", "Broadlink"), + 0x644E: ("LB26 R1", "Broadlink"), + 0x6488: ("LB27 C1", "Broadlink"), + }, + lb2: { + 0xA4F4: ("LB27 R1", "Broadlink"), + 0xA5F7: ("LB27 R1", "Broadlink"), + 0xA6EF: ("EFCF60WSMT", "Luceco"), + }, + S1C: { + 0x2722: ("S2KIT", "Broadlink"), + }, + s3: { + 0xA59C: ("S3", "Broadlink"), + 0xA64D: ("S3", "Broadlink"), + }, + hvac: { + 0x4E2A: ("HVAC", "Licensed manufacturer"), + }, + hysen: { + 0x4EAD: ("HY02/HY03", "Hysen"), + }, + dooya: { + 0x4E4D: ("DT360E-45/20", "Dooya"), + }, + dooya2: { + 0x4F6E: ("DT360E-45/20", "Dooya"), + }, + wser: { + 0x4F6C: ("WSER", "Wistar"), + }, + bg1: { + 0x51E3: ("BG800/BG900", "BG Electrical"), + }, + ehc31: { + 0x6480: ("EHC31", "BG Electrical"), + }, } @@ -115,72 +214,93 @@ def gendevice( dev_type: int, host: Tuple[str, int], mac: Union[bytes, str], - name: str = None, - is_locked: bool = None, -) -> device: + name: str = "", + is_locked: bool = False, +) -> Device: """Generate a device.""" - try: - dev_class, model, manufacturer = SUPPORTED_TYPES[dev_type] - - except KeyError: - return device(host, mac, dev_type, name=name, is_locked=is_locked) - - return dev_class( - host, - mac, - dev_type, - name=name, - model=model, - manufacturer=manufacturer, - is_locked=is_locked, - ) + for dev_cls, products in SUPPORTED_TYPES.items(): + try: + model, manufacturer = products[dev_type] + + except KeyError: + continue + + return dev_cls( + host, + mac, + dev_type, + name=name, + model=model, + manufacturer=manufacturer, + is_locked=is_locked, + ) + + return Device(host, mac, dev_type, name=name, is_locked=is_locked) def hello( - host: str, - port: int = 80, - timeout: int = 10, - local_ip_address: str = None, -) -> device: + ip_address: str, + port: int = DEFAULT_PORT, + timeout: int = DEFAULT_TIMEOUT, +) -> Device: """Direct device discovery. Useful if the device is locked. """ try: - return next(xdiscover(timeout, local_ip_address, host, port)) - except StopIteration: - raise exception(-4000) # Network timeout. + return next( + xdiscover( + timeout=timeout, + discover_ip_address=ip_address, + discover_ip_port=port, + ) + ) + except StopIteration as err: + raise e.NetworkTimeoutError( + -4000, + "Network timeout", + f"No response received within {timeout}s", + ) from err def discover( - timeout: int = 10, - local_ip_address: str = None, - discover_ip_address: str = "255.255.255.255", - discover_ip_port: int = 80, -) -> List[device]: + timeout: int = DEFAULT_TIMEOUT, + local_ip_address: Optional[str] = None, + discover_ip_address: str = DEFAULT_BCAST_ADDR, + discover_ip_port: int = DEFAULT_PORT, +) -> List[Device]: """Discover devices connected to the local network.""" - responses = scan(timeout, local_ip_address, discover_ip_address, discover_ip_port) + responses = scan( + timeout, local_ip_address, discover_ip_address, discover_ip_port + ) return [gendevice(*resp) for resp in responses] def xdiscover( - timeout: int = 10, - local_ip_address: str = None, - discover_ip_address: str = "255.255.255.255", - discover_ip_port: int = 80, -) -> Generator[device, None, None]: + timeout: int = DEFAULT_TIMEOUT, + local_ip_address: Optional[str] = None, + discover_ip_address: str = DEFAULT_BCAST_ADDR, + discover_ip_port: int = DEFAULT_PORT, +) -> Generator[Device, None, None]: """Discover devices connected to the local network. This function returns a generator that yields devices instantly. """ - responses = scan(timeout, local_ip_address, discover_ip_address, discover_ip_port) + responses = scan( + timeout, local_ip_address, discover_ip_address, discover_ip_port + ) for resp in responses: yield gendevice(*resp) # Setup a new Broadlink device via AP Mode. Review the README to see how to enter AP Mode. # Only tested with Broadlink RM3 Mini (Blackbean) -def setup(ssid: str, password: str, security_mode: int) -> None: +def setup( + ssid: str, + password: str, + security_mode: int, + ip_address: str = DEFAULT_BCAST_ADDR, +) -> None: """Set up a new Broadlink device via AP mode.""" # Security mode options are (0 - none, 1 = WEP, 2 = WPA1, 3 = WPA2, 4 = WPA1/2) payload = bytearray(0x88) @@ -209,5 +329,5 @@ def setup(ssid: str, password: str, security_mode: int) -> None: sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Internet # UDP sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) - sock.sendto(payload, ("255.255.255.255", 80)) + sock.sendto(payload, (ip_address, DEFAULT_PORT)) sock.close() diff --git a/broadlink/alarm.py b/broadlink/alarm.py index e73b8fad..a9b5e879 100644 --- a/broadlink/alarm.py +++ b/broadlink/alarm.py @@ -1,31 +1,26 @@ """Support for alarm kits.""" -from .device import device -from .exceptions import check_error +from . import exceptions as e +from .device import Device -class S1C(device): +class S1C(Device): """Controls a Broadlink S1C.""" + TYPE = "S1C" + _SENSORS_TYPES = { - 0x31: "Door Sensor", # 49 as hex - 0x91: "Key Fob", # 145 as hex, as serial on fob corpse - 0x21: "Motion Sensor", # 33 as hex + 0x31: "Door Sensor", + 0x91: "Key Fob", + 0x21: "Motion Sensor", } - def __init__(self, *args, **kwargs) -> None: - """Initialize the controller.""" - device.__init__(self, *args, **kwargs) - self.type = "S1C" - def get_sensors_status(self) -> dict: """Return the state of the sensors.""" packet = bytearray(16) - packet[0] = 0x06 # 0x06 - get sensors info, 0x07 - probably add sensors + packet[0] = 0x06 response = self.send_packet(0x6A, packet) - check_error(response[0x22:0x24]) + e.check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) - if not payload: - return None count = payload[0x4] sensor_data = payload[0x6:] sensors = [ diff --git a/broadlink/climate.py b/broadlink/climate.py old mode 100644 new mode 100755 index 036cc9a8..1a0c6006 --- a/broadlink/climate.py +++ b/broadlink/climate.py @@ -1,92 +1,95 @@ """Support for climate control.""" -from typing import List +import enum +import struct +from typing import List, Sequence -from .device import device -from .exceptions import check_error -from .helpers import calculate_crc16 +from . import exceptions as e +from .device import Device +from .helpers import CRC16 -class hysen(device): - """Controls a Hysen HVAC.""" +class hysen(Device): + """Controls a Hysen heating thermostat. - def __init__(self, *args, **kwargs) -> None: - """Initialize the controller.""" - device.__init__(self, *args, **kwargs) - self.type = "Hysen heating controller" + This device is manufactured by Hysen and sold under different + brands, including Floureon, Beca Energy, Beok and Decdeal. - # Send a request - # input_payload should be a bytearray, usually 6 bytes, e.g. bytearray([0x01,0x06,0x00,0x02,0x10,0x00]) - # Returns decrypted payload - # New behaviour: raises a ValueError if the device response indicates an error or CRC check fails - # The function prepends length (2 bytes) and appends CRC + Supported models: + - HY02B05H + - HY03WE + """ - def send_request(self, input_payload: bytes) -> bytes: + TYPE = "HYS" + + def send_request(self, request: Sequence[int]) -> bytes: """Send a request to the device.""" - crc = calculate_crc16(input_payload) - - # first byte is length, +2 for CRC16 - request_payload = bytearray([len(input_payload) + 2, 0x00]) - request_payload.extend(input_payload) - - # append CRC - request_payload.append(crc & 0xFF) - request_payload.append((crc >> 8) & 0xFF) - - # send to device - response = self.send_packet(0x6A, request_payload) - check_error(response[0x22:0x24]) - response_payload = self.decrypt(response[0x38:]) - - # experimental check on CRC in response (first 2 bytes are len, and trailing bytes are crc) - response_payload_len = response_payload[0] - if response_payload_len + 2 > len(response_payload): - raise ValueError( - "hysen_response_error", "first byte of response is not length" + packet = bytearray() + packet.extend((len(request) + 2).to_bytes(2, "little")) + packet.extend(request) + packet.extend(CRC16.calculate(request).to_bytes(2, "little")) + + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + payload = self.decrypt(response[0x38:]) + + p_len = int.from_bytes(payload[:0x02], "little") + nom_crc = int.from_bytes(payload[p_len:p_len+2], "little") + real_crc = CRC16.calculate(payload[0x02:p_len]) + + if nom_crc != real_crc: + raise e.DataValidationError( + -4008, + "Received data packet check error", + f"Expected a checksum of {nom_crc} and received {real_crc}", ) - crc = calculate_crc16(response_payload[2:response_payload_len]) - if (response_payload[response_payload_len] == crc & 0xFF) and ( - response_payload[response_payload_len + 1] == (crc >> 8) & 0xFF - ): - return response_payload[2:response_payload_len] - raise ValueError("hysen_response_error", "CRC check on response failed") - - def get_temp(self) -> int: + + return payload[0x02:p_len] + + def _decode_temp(self, payload, base_index): + base_temp = payload[base_index] / 2.0 + add_offset = (payload[4] >> 3) & 1 # should offset be added? + offset_raw_value = (payload[17] >> 4) & 3 # offset value + offset = (offset_raw_value + 1) / 10 if add_offset else 0.0 + return base_temp + offset + + def get_temp(self) -> float: """Return the room temperature in degrees celsius.""" - payload = self.send_request(bytearray([0x01, 0x03, 0x00, 0x00, 0x00, 0x08])) - return payload[0x05] / 2.0 + payload = self.send_request([0x01, 0x03, 0x00, 0x00, 0x00, 0x08]) + return self._decode_temp(payload, 5) - def get_external_temp(self) -> int: + def get_external_temp(self) -> float: """Return the external temperature in degrees celsius.""" - payload = self.send_request(bytearray([0x01, 0x03, 0x00, 0x00, 0x00, 0x08])) - return payload[18] / 2.0 + payload = self.send_request([0x01, 0x03, 0x00, 0x00, 0x00, 0x08]) + return self._decode_temp(payload, 18) def get_full_status(self) -> dict: """Return the state of the device. Timer schedule included. """ - payload = self.send_request(bytearray([0x01, 0x03, 0x00, 0x00, 0x00, 0x16])) + payload = self.send_request([0x01, 0x03, 0x00, 0x00, 0x00, 0x16]) data = {} data["remote_lock"] = payload[3] & 1 data["power"] = payload[4] & 1 data["active"] = (payload[4] >> 4) & 1 data["temp_manual"] = (payload[4] >> 6) & 1 - data["room_temp"] = (payload[5] & 255) / 2.0 - data["thermostat_temp"] = (payload[6] & 255) / 2.0 - data["auto_mode"] = payload[7] & 15 - data["loop_mode"] = (payload[7] >> 4) & 15 + data["heating_cooling"] = (payload[4] >> 7) & 1 + data["room_temp"] = self._decode_temp(payload, 5) + data["thermostat_temp"] = payload[6] / 2.0 + data["auto_mode"] = payload[7] & 0x0F + data["loop_mode"] = payload[7] >> 4 data["sensor"] = payload[8] data["osv"] = payload[9] data["dif"] = payload[10] data["svh"] = payload[11] data["svl"] = payload[12] - data["room_temp_adj"] = ((payload[13] << 8) + payload[14]) / 2.0 - if data["room_temp_adj"] > 32767: - data["room_temp_adj"] = 32767 - data["room_temp_adj"] + data["room_temp_adj"] = ( + int.from_bytes(payload[13:15], "big", signed=True) / 10.0 + ) data["fre"] = payload[15] data["poweron"] = payload[16] data["unknown"] = payload[17] - data["external_temp"] = (payload[18] & 255) / 2.0 + data["external_temp"] = self._decode_temp(payload, 18) data["hour"] = payload[19] data["min"] = payload[20] data["sec"] = payload[21] @@ -121,13 +124,15 @@ def get_full_status(self) -> dict: # Manual mode will activate last used temperature. # In typical usage call set_temp to activate manual control and set temp. # loop_mode refers to index in [ "12345,67", "123456,7", "1234567" ] - # E.g. loop_mode = 0 ("12345,67") means Saturday and Sunday follow the "weekend" schedule - # loop_mode = 2 ("1234567") means every day (including Saturday and Sunday) follows the "weekday" schedule + # E.g. loop_mode = 0 ("12345,67") means Saturday and Sunday (weekend schedule) + # loop_mode = 2 ("1234567") means every day, including Saturday and Sunday (weekday schedule) # The sensor command is currently experimental - def set_mode(self, auto_mode: int, loop_mode: int, sensor: int = 0) -> None: + def set_mode( + self, auto_mode: int, loop_mode: int, sensor: int = 0 + ) -> None: """Set the mode of the device.""" mode_byte = ((loop_mode + 1) << 4) + auto_mode - self.send_request(bytearray([0x01, 0x06, 0x00, 0x02, mode_byte, sensor])) + self.send_request([0x01, 0x06, 0x00, 0x02, mode_byte, sensor]) # Advanced settings # Sensor mode (SEN) sensor = 0 for internal sensor, 1 for external sensor, @@ -136,10 +141,10 @@ def set_mode(self, auto_mode: int, loop_mode: int, sensor: int = 0) -> None: # Deadzone for floor temprature (dIF) dif = 1..9. Factory default: 2C # Upper temperature limit for internal sensor (SVH) svh = 5..99. Factory default: 35C # Lower temperature limit for internal sensor (SVL) svl = 5..99. Factory default: 5C - # Actual temperature calibration (AdJ) adj = -0.5. Prescision 0.1C + # Actual temperature calibration (AdJ) adj = -0.5. Precision 0.1C # Anti-freezing function (FrE) fre = 0 for anti-freezing function shut down, # 1 for anti-freezing function open. Factory default: 0 - # Power on memory (POn) poweron = 0 for power on memory off, 1 for power on memory on. Factory default: 0 + # Power on memory (POn) poweron = 0 for off, 1 for on. Default: 0 def set_advanced( self, loop_mode: int, @@ -153,7 +158,7 @@ def set_advanced( poweron: int, ) -> None: """Set advanced options.""" - input_payload = bytearray( + self.send_request( [ 0x01, 0x10, @@ -168,13 +173,12 @@ def set_advanced( dif, svh, svl, - (int(adj * 2) >> 8 & 0xFF), - (int(adj * 2) & 0xFF), + int(adj * 10) >> 8 & 0xFF, + int(adj * 10) & 0xFF, fre, poweron, ] ) - self.send_request(input_payload) # For backwards compatibility only. Prefer calling set_mode directly. # Note this function invokes loop_mode=0 and sensor=0. @@ -189,22 +193,36 @@ def switch_to_manual(self) -> None: # Set temperature for manual mode (also activates manual mode if currently in automatic) def set_temp(self, temp: float) -> None: """Set the target temperature.""" - self.send_request(bytearray([0x01, 0x06, 0x00, 0x01, 0x00, int(temp * 2)])) + self.send_request([0x01, 0x06, 0x00, 0x01, 0x00, int(temp * 2)]) # Set device on(1) or off(0), does not deactivate Wifi connectivity. # Remote lock disables control by buttons on thermostat. - def set_power(self, power: int = 1, remote_lock: int = 0) -> None: + # heating_cooling: heating(0) cooling(1) + def set_power( + self, power: int = 1, remote_lock: int = 0, heating_cooling: int = 0 + ) -> None: """Set the power state of the device.""" - self.send_request(bytearray([0x01, 0x06, 0x00, 0x00, remote_lock, power])) + state = (heating_cooling << 7) + power + self.send_request([0x01, 0x06, 0x00, 0x00, remote_lock, state]) # set time on device # n.b. day=1 is Monday, ..., day=7 is Sunday def set_time(self, hour: int, minute: int, second: int, day: int) -> None: """Set the time.""" self.send_request( - bytearray( - [0x01, 0x10, 0x00, 0x08, 0x00, 0x02, 0x04, hour, minute, second, day] - ) + [ + 0x01, + 0x10, + 0x00, + 0x08, + 0x00, + 0x02, + 0x04, + hour, + minute, + second, + day + ] ) # Set timer schedule @@ -215,26 +233,242 @@ def set_time(self, hour: int, minute: int, second: int, day: int) -> None: # weekend is similar but only has 2 (e.g. switch on in morning and off in afternoon) def set_schedule(self, weekday: List[dict], weekend: List[dict]) -> None: """Set timer schedule.""" - # Begin with some magic values ... - input_payload = bytearray([0x01, 0x10, 0x00, 0x0A, 0x00, 0x0C, 0x18]) + request = [0x01, 0x10, 0x00, 0x0A, 0x00, 0x0C, 0x18] - # Now simply append times/temps # weekday times for i in range(0, 6): - input_payload.append(weekday[i]["start_hour"]) - input_payload.append(weekday[i]["start_minute"]) + request.append(weekday[i]["start_hour"]) + request.append(weekday[i]["start_minute"]) # weekend times for i in range(0, 2): - input_payload.append(weekend[i]["start_hour"]) - input_payload.append(weekend[i]["start_minute"]) + request.append(weekend[i]["start_hour"]) + request.append(weekend[i]["start_minute"]) # weekday temperatures for i in range(0, 6): - input_payload.append(int(weekday[i]["temp"] * 2)) + request.append(int(weekday[i]["temp"] * 2)) # weekend temperatures for i in range(0, 2): - input_payload.append(int(weekend[i]["temp"] * 2)) + request.append(int(weekend[i]["temp"] * 2)) + + self.send_request(request) + + +class hvac(Device): + """Controls a HVAC. + + Supported models: + - Tornado SMART X SQ series + - Aux ASW-H12U3/JIR1DI-US + - Aux ASW-H36U2/LFR1DI-US + """ + + TYPE = "HVAC" + + @enum.unique + class Mode(enum.IntEnum): + """Enumerates modes.""" + + AUTO = 0 + COOL = 1 + DRY = 2 + HEAT = 3 + FAN = 4 + + @enum.unique + class Speed(enum.IntEnum): + """Enumerates fan speed.""" + + HIGH = 1 + MID = 2 + LOW = 3 + AUTO = 5 + + @enum.unique + class Preset(enum.IntEnum): + """Enumerates presets.""" + + NORMAL = 0 + TURBO = 1 + MUTE = 2 + + @enum.unique + class SwHoriz(enum.IntEnum): + """Enumerates horizontal swing.""" + + ON = 0 + OFF = 7 + + @enum.unique + class SwVert(enum.IntEnum): + """Enumerates vertical swing.""" + + ON = 0 + POS1 = 1 + POS2 = 2 + POS3 = 3 + POS4 = 4 + POS5 = 5 + OFF = 7 + + def _encode(self, data: bytes) -> bytes: + """Encode data for transport.""" + packet = bytearray(10) + p_len = 10 + len(data) + struct.pack_into( + " bytes: + """Decode data from transport.""" + # payload[0x2:0x8] == bytes([0xbb, 0x00, 0x07, 0x00, 0x00, 0x00]) + payload = self.decrypt(response[0x38:]) + p_len = int.from_bytes(payload[:0x02], "little") + nom_crc = int.from_bytes(payload[p_len:p_len+2], "little") + real_crc = CRC16.calculate(payload[0x02:p_len], polynomial=0x9BE4) + + if nom_crc != real_crc: + raise e.DataValidationError( + -4008, + "Received data packet check error", + f"Expected a checksum of {nom_crc} and received {real_crc}", + ) + + d_len = int.from_bytes(payload[0x08:0x0A], "little") + return payload[0x0A:0x0A+d_len] + + def _send(self, command: int, data: bytes = b"") -> bytes: + """Send a command to the unit.""" + prefix = bytes([((command << 4) | 1), 1]) + packet = self._encode(prefix + data) + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + return self._decode(response)[0x02:] + + def _parse_state(self, data: bytes) -> dict: + """Parse state.""" + state = {} + state["power"] = bool(data[0x08] & 1 << 5) + state["target_temp"] = 8 + (data[0x00] >> 3) + (data[0x04] >> 7) * 0.5 + state["swing_v"] = self.SwVert(data[0x00] & 0b111) + state["swing_h"] = self.SwHoriz(data[0x01] >> 5) + state["mode"] = self.Mode(data[0x05] >> 5) + state["speed"] = self.Speed(data[0x03] >> 5) + state["preset"] = self.Preset(data[0x04] >> 6) + state["sleep"] = bool(data[0x05] & 1 << 2) + state["ifeel"] = bool(data[0x05] & 1 << 3) + state["health"] = bool(data[0x08] & 1 << 1) + state["clean"] = bool(data[0x08] & 1 << 2) + state["display"] = bool(data[0x0A] & 1 << 4) + state["mildew"] = bool(data[0x0A] & 1 << 3) + return state + + def set_state( + self, + power: bool, + target_temp: float, # 16<=target_temp<=32 + mode: Mode, + speed: Speed, + preset: Preset, + swing_h: SwHoriz, + swing_v: SwVert, + sleep: bool, + ifeel: bool, + display: bool, + health: bool, + clean: bool, + mildew: bool, + ) -> dict: + """Set the state of the device.""" + # TODO: decode unknown bits + UNK0 = 0b100 + UNK1 = 0b1101 + UNK2 = 0b101 + + target_temp = round(target_temp * 2) / 2 + + if preset == self.Preset.MUTE: + if mode != self.Mode.FAN: + raise ValueError("mute is only available in fan mode") + speed = self.Speed.LOW + + elif preset == self.Preset.TURBO: + if mode not in {self.Mode.COOL, self.Mode.HEAT}: + raise ValueError("turbo is only available in cooling/heating") + speed = self.Speed.HIGH + + data = bytearray(0x0D) + data[0x00] = (int(target_temp) - 8 << 3) | swing_v + data[0x01] = (swing_h << 5) | UNK0 + data[0x02] = ((target_temp % 1 == 0.5) << 7) | UNK1 + data[0x03] = speed << 5 + data[0x04] = preset << 6 + data[0x05] = mode << 5 | sleep << 2 | ifeel << 3 + data[0x08] = power << 5 | clean << 2 | (health and 0b11) + data[0x0A] = display << 4 | mildew << 3 + data[0x0C] = UNK2 + + resp = self._send(0, data) + return self._parse_state(resp) + + def get_state(self) -> dict: + """Returns a dictionary with the unit's parameters. + + Returns: + dict: + power (bool): + target_temp (float): temperature set point 16 dict: + """Returns dictionary with AC info. + + Returns: + dict: + power (bool): power + ambient_temp (float): ambient temperature + """ + resp = self._send(2) + + if len(resp) < 22: + raise e.DataValidationError( + -4007, + "Received data packet length error", + f"Expected at least 24 bytes and received {len(resp) + 2}", + ) + + ac_info = {} + ac_info["power"] = resp[0x1] & 1 + + ambient_temp = resp[0x05] & 0b11111, resp[0x15] & 0b11111 + if any(ambient_temp): + ac_info["ambient_temp"] = ambient_temp[0] + ambient_temp[1] / 10.0 - self.send_request(input_payload) + return ac_info diff --git a/broadlink/const.py b/broadlink/const.py new file mode 100644 index 00000000..19c37f52 --- /dev/null +++ b/broadlink/const.py @@ -0,0 +1,5 @@ +"""Constants.""" +DEFAULT_BCAST_ADDR = "255.255.255.255" +DEFAULT_PORT = 80 +DEFAULT_RETRY_INTVL = 1 +DEFAULT_TIMEOUT = 10 diff --git a/broadlink/cover.py b/broadlink/cover.py index 2691fe97..75317943 100644 --- a/broadlink/cover.py +++ b/broadlink/cover.py @@ -1,43 +1,42 @@ """Support for covers.""" import time +from typing import Sequence -from .device import device -from .exceptions import check_error +from . import exceptions as e +from .device import Device -class dooya(device): +class dooya(Device): """Controls a Dooya curtain motor.""" - def __init__(self, *args, **kwargs) -> None: - """Initialize the controller.""" - device.__init__(self, *args, **kwargs) - self.type = "Dooya DT360E" + TYPE = "DT360E" - def _send(self, magic1: int, magic2: int) -> int: + def _send(self, command: int, attribute: int = 0) -> int: """Send a packet to the device.""" packet = bytearray(16) - packet[0] = 0x09 - packet[2] = 0xBB - packet[3] = magic1 - packet[4] = magic2 - packet[9] = 0xFA - packet[10] = 0x44 - response = self.send_packet(0x6A, packet) - check_error(response[0x22:0x24]) - payload = self.decrypt(response[0x38:]) + packet[0x00] = 0x09 + packet[0x02] = 0xBB + packet[0x03] = command + packet[0x04] = attribute + packet[0x09] = 0xFA + packet[0x0A] = 0x44 + + resp = self.send_packet(0x6A, packet) + e.check_error(resp[0x22:0x24]) + payload = self.decrypt(resp[0x38:]) return payload[4] def open(self) -> int: """Open the curtain.""" - return self._send(0x01, 0x00) + return self._send(0x01) def close(self) -> int: """Close the curtain.""" - return self._send(0x02, 0x00) + return self._send(0x02) def stop(self) -> int: """Stop the curtain.""" - return self._send(0x03, 0x00) + return self._send(0x03) def get_percentage(self) -> int: """Return the position of the curtain.""" @@ -58,3 +57,126 @@ def set_percentage_and_wait(self, new_percentage: int) -> None: time.sleep(0.2) current = self.get_percentage() self.stop() + + +class dooya2(Device): + """Controls a Dooya curtain motor (version 2).""" + + TYPE = "DT360E-2" + + def _send(self, operation: int, data: Sequence = b""): + """Send a command to the device.""" + packet = bytearray(12) + packet[0x02] = 0xA5 + packet[0x03] = 0xA5 + packet[0x04] = 0x5A + packet[0x05] = 0x5A + packet[0x08] = operation + packet[0x09] = 0x0B + + if data: + data_len = len(data) + packet[0x0A] = data_len & 0xFF + packet[0x0B] = data_len >> 8 + packet += bytes(2) + packet.extend(data) + + checksum = sum(packet, 0xBEAF) & 0xFFFF + packet[0x06] = checksum & 0xFF + packet[0x07] = checksum >> 8 + + packet_len = len(packet) - 2 + packet[0x00] = packet_len & 0xFF + packet[0x01] = packet_len >> 8 + + resp = self.send_packet(0x6A, packet) + e.check_error(resp[0x22:0x24]) + payload = self.decrypt(resp[0x38:]) + return payload + + def open(self) -> None: + """Open the curtain.""" + self._send(2, [0x00, 0x01, 0x00]) + + def close(self) -> None: + """Close the curtain.""" + self._send(2, [0x00, 0x02, 0x00]) + + def stop(self) -> None: + """Stop the curtain.""" + self._send(2, [0x00, 0x03, 0x00]) + + def get_percentage(self) -> int: + """Return the position of the curtain.""" + resp = self._send(1, [0x00, 0x06, 0x00]) + return resp[0x11] + + def set_percentage(self, new_percentage: int) -> None: + """Set the position of the curtain.""" + self._send(2, [0x00, 0x09, new_percentage]) + + +class wser(Device): + """Controls a Wistar curtain motor""" + + TYPE = "WSER" + + def _send(self, operation: int, data: Sequence = b""): + """Send a command to the device.""" + packet = bytearray(12) + packet[0x02] = 0xA5 + packet[0x03] = 0xA5 + packet[0x04] = 0x5A + packet[0x05] = 0x5A + packet[0x08] = operation + packet[0x09] = 0x0B + + if data: + data_len = len(data) + packet[0x0A] = data_len & 0xFF + packet[0x0B] = data_len >> 8 + packet += bytes(2) + packet.extend(data) + + checksum = sum(packet, 0xBEAF) & 0xFFFF + packet[0x06] = checksum & 0xFF + packet[0x07] = checksum >> 8 + + packet_len = len(packet) - 2 + packet[0x00] = packet_len & 0xFF + packet[0x01] = packet_len >> 8 + + resp = self.send_packet(0x6A, packet) + e.check_error(resp[0x22:0x24]) + payload = self.decrypt(resp[0x38:]) + return payload + + def get_position(self) -> int: + """Return the position of the curtain.""" + resp = self._send(1, []) + position = resp[0x0E] + return position + + def open(self) -> int: + """Open the curtain.""" + resp = self._send(2, [0x4A, 0x31, 0xA0]) + position = resp[0x0E] + return position + + def close(self) -> int: + """Close the curtain.""" + resp = self._send(2, [0x61, 0x32, 0xA0]) + position = resp[0x0E] + return position + + def stop(self) -> int: + """Stop the curtain.""" + resp = self._send(2, [0x4C, 0x73, 0xA0]) + position = resp[0x0E] + return position + + def set_position(self, position: int) -> int: + """Set the position of the curtain.""" + resp = self._send(2, [position, 0x70, 0xA0]) + position = resp[0x0E] + return position diff --git a/broadlink/device.py b/broadlink/device.py index e6753774..5a10bc01 100644 --- a/broadlink/device.py +++ b/broadlink/device.py @@ -3,22 +3,28 @@ import threading import random import time -from datetime import datetime -from typing import Generator, Tuple, Union +from typing import Generator, Optional, Tuple, Union from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes -from .exceptions import check_error, exception +from . import exceptions as e +from .const import ( + DEFAULT_BCAST_ADDR, + DEFAULT_PORT, + DEFAULT_RETRY_INTVL, + DEFAULT_TIMEOUT, +) +from .protocol import Datetime HelloResponse = Tuple[int, Tuple[str, int], str, str, bool] def scan( - timeout: int = 10, - local_ip_address: str = None, - discover_ip_address: str = "255.255.255.255", - discover_ip_port: int = 80, + timeout: int = DEFAULT_TIMEOUT, + local_ip_address: Optional[str] = None, + discover_ip_address: str = DEFAULT_BCAST_ADDR, + discover_ip_port: int = DEFAULT_PORT, ) -> Generator[HelloResponse, None, None]: """Broadcast a hello message and yield responses.""" conn = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) @@ -33,42 +39,13 @@ def scan( port = 0 packet = bytearray(0x30) - - timezone = int(time.timezone / -3600) - if timezone < 0: - packet[0x08] = 0xFF + timezone - 1 - packet[0x09] = 0xFF - packet[0x0A] = 0xFF - packet[0x0B] = 0xFF - else: - packet[0x08] = timezone - packet[0x09] = 0 - packet[0x0A] = 0 - packet[0x0B] = 0 - - year = datetime.now().year - packet[0x0C] = year & 0xFF - packet[0x0D] = year >> 8 - packet[0x0E] = datetime.now().minute - packet[0x0F] = datetime.now().hour - subyear = str(year)[2:] - packet[0x10] = int(subyear) - packet[0x11] = datetime.now().isoweekday() - packet[0x12] = datetime.now().day - packet[0x13] = datetime.now().month - - address = local_ip_address.split(".") - packet[0x18] = int(address[3]) - packet[0x19] = int(address[2]) - packet[0x1A] = int(address[1]) - packet[0x1B] = int(address[0]) - packet[0x1C] = port & 0xFF - packet[0x1D] = port >> 8 + packet[0x08:0x14] = Datetime.pack(Datetime.now()) + packet[0x18:0x1C] = socket.inet_aton(local_ip_address)[::-1] + packet[0x1C:0x1E] = port.to_bytes(2, "little") packet[0x26] = 6 checksum = sum(packet, 0xBEAF) & 0xFFFF - packet[0x20] = checksum & 0xFF - packet[0x21] = checksum >> 8 + packet[0x20:0x22] = checksum.to_bytes(2, "little") start_time = time.time() discovered = [] @@ -76,79 +53,105 @@ def scan( try: while (time.time() - start_time) < timeout: time_left = timeout - (time.time() - start_time) - conn.settimeout(min(1, time_left)) + conn.settimeout(min(DEFAULT_RETRY_INTVL, time_left)) conn.sendto(packet, (discover_ip_address, discover_ip_port)) while True: try: - response, host = conn.recvfrom(1024) + resp, host = conn.recvfrom(1024) except socket.timeout: break - devtype = response[0x34] | response[0x35] << 8 - mac = bytes(reversed(response[0x3A:0x40])) + devtype = resp[0x34] | resp[0x35] << 8 + mac = resp[0x3A:0x40][::-1] + if (host, mac, devtype) in discovered: continue discovered.append((host, mac, devtype)) - name = response[0x40:].split(b"\x00")[0].decode("utf-8") - is_locked = bool(response[-1]) + name = resp[0x40:].split(b"\x00")[0].decode() + is_locked = bool(resp[0x7F]) yield devtype, host, mac, name, is_locked finally: conn.close() -class device: +def ping(ip_address: str, port: int = DEFAULT_PORT) -> None: + """Send a ping packet to an address. + + This packet feeds the watchdog timer of firmwares >= v53. + Useful to prevent reboots when the cloud cannot be reached. + It must be sent every 2 minutes in such cases. + """ + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as conn: + conn.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + packet = bytearray(0x30) + packet[0x26] = 1 + conn.sendto(packet, (ip_address, port)) + + +class Device: """Controls a Broadlink device.""" + TYPE = "Unknown" + + __INIT_KEY = "097628343fe99e23765c1513accf8b02" + __INIT_VECT = "562e17996d093d28ddb3ba695a2e6f58" + def __init__( self, host: Tuple[str, int], mac: Union[bytes, str], devtype: int, - timeout: int = 10, - name: str = None, - model: str = None, - manufacturer: str = None, - is_locked: bool = None, + timeout: int = DEFAULT_TIMEOUT, + name: str = "", + model: str = "", + manufacturer: str = "", + is_locked: bool = False, ) -> None: """Initialize the controller.""" self.host = host self.mac = bytes.fromhex(mac) if isinstance(mac, str) else mac - self.devtype = devtype if devtype is not None else 0x272A + self.devtype = devtype self.timeout = timeout self.name = name self.model = model self.manufacturer = manufacturer self.is_locked = is_locked self.count = random.randint(0x8000, 0xFFFF) - self.iv = bytes.fromhex("562e17996d093d28ddb3ba695a2e6f58") - self.id = bytes(4) - self.type = "Unknown" + self.iv = bytes.fromhex(self.__INIT_VECT) + self.id = 0 + self.type = self.TYPE # For backwards compatibility. self.lock = threading.Lock() self.aes = None - key = bytes.fromhex("097628343fe99e23765c1513accf8b02") - self.update_aes(key) - - def __repr__(self): - return "<%s: %s %s (%s) at %s:%s | %s | %s | %s>" % ( - type(self).__name__, - self.manufacturer, - self.model, - hex(self.devtype), - self.host[0], - self.host[1], - ":".join(format(x, "02x") for x in self.mac), + self.update_aes(bytes.fromhex(self.__INIT_KEY)) + + def __repr__(self) -> str: + """Return a formal representation of the device.""" + return ( + "%s.%s(%s, mac=%r, devtype=%r, timeout=%r, name=%r, " + "model=%r, manufacturer=%r, is_locked=%r)" + ) % ( + self.__class__.__module__, + self.__class__.__qualname__, + self.host, + self.mac, + self.devtype, + self.timeout, self.name, - "Locked" if self.is_locked else "Unlocked", + self.model, + self.manufacturer, + self.is_locked, ) - def __str__(self): - return "%s (%s at %s)" % ( - self.name, - self.model or hex(self.devtype), - self.host[0], + def __str__(self) -> str: + """Return a readable representation of the device.""" + return "%s (%s / %s:%s / %s)" % ( + self.name or "Unknown", + " ".join(filter(None, [self.manufacturer, self.model, hex(self.devtype)])), + *self.host, + ":".join(format(x, "02X") for x in self.mac), ) def update_aes(self, key: bytes) -> None: @@ -169,42 +172,21 @@ def decrypt(self, payload: bytes) -> bytes: def auth(self) -> bool: """Authenticate to the device.""" - payload = bytearray(0x50) - payload[0x04] = 0x31 - payload[0x05] = 0x31 - payload[0x06] = 0x31 - payload[0x07] = 0x31 - payload[0x08] = 0x31 - payload[0x09] = 0x31 - payload[0x0A] = 0x31 - payload[0x0B] = 0x31 - payload[0x0C] = 0x31 - payload[0x0D] = 0x31 - payload[0x0E] = 0x31 - payload[0x0F] = 0x31 - payload[0x10] = 0x31 - payload[0x11] = 0x31 - payload[0x12] = 0x31 - payload[0x1E] = 0x01 - payload[0x2D] = 0x01 - payload[0x30] = ord("T") - payload[0x31] = ord("e") - payload[0x32] = ord("s") - payload[0x33] = ord("t") - payload[0x34] = ord(" ") - payload[0x35] = ord(" ") - payload[0x36] = ord("1") - - response = self.send_packet(0x65, payload) - check_error(response[0x22:0x24]) - payload = self.decrypt(response[0x38:]) + self.id = 0 + self.update_aes(bytes.fromhex(self.__INIT_KEY)) - key = payload[0x04:0x14] - if len(key) % 16 != 0: - return False + packet = bytearray(0x50) + packet[0x04:0x14] = [0x31] * 16 + packet[0x1E] = 0x01 + packet[0x2D] = 0x01 + packet[0x30:0x36] = "Test 1".encode() - self.id = payload[0x03::-1] - self.update_aes(key) + response = self.send_packet(0x65, packet) + e.check_error(response[0x22:0x24]) + payload = self.decrypt(response[0x38:]) + + self.id = int.from_bytes(payload[:0x4], "little") + self.update_aes(payload[0x04:0x14]) return True def hello(self, local_ip_address=None) -> bool: @@ -219,22 +201,49 @@ def hello(self, local_ip_address=None) -> bool: discover_ip_port=self.host[1], ) try: - devtype, host, mac, name, is_locked = next(responses) - except StopIteration: - raise exception(-4000) # Network timeout. - - if (devtype, host, mac) != (self.devtype, self.host, self.mac): - raise exception(-2040) # Device information is not intact. + devtype, _, mac, name, is_locked = next(responses) + + except StopIteration as err: + raise e.NetworkTimeoutError( + -4000, + "Network timeout", + f"No response received within {self.timeout}s", + ) from err + + if mac != self.mac: + raise e.DataValidationError( + -2040, + "Device information is not intact", + "The MAC address is different", + f"Expected {self.mac} and received {mac}", + ) + + if devtype != self.devtype: + raise e.DataValidationError( + -2040, + "Device information is not intact", + "The product ID is different", + f"Expected {self.devtype} and received {devtype}", + ) self.name = name self.is_locked = is_locked return True + def ping(self) -> None: + """Ping the device. + + This packet feeds the watchdog timer of firmwares >= v53. + Useful to prevent reboots when the cloud cannot be reached. + It must be sent every 2 minutes in such cases. + """ + ping(self.host[0], port=self.host[1]) + def get_fwversion(self) -> int: """Get firmware version.""" packet = bytearray([0x68]) response = self.send_packet(0x6A, packet) - check_error(response[0x22:0x24]) + e.check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) return payload[0x4] | payload[0x5] << 8 @@ -243,9 +252,9 @@ def set_name(self, name: str) -> None: packet = bytearray(4) packet += name.encode("utf-8") packet += bytearray(0x50 - len(packet)) - packet[0x43] = bool(self.is_locked) + packet[0x43] = self.is_locked response = self.send_packet(0x6A, packet) - check_error(response[0x22:0x24]) + e.check_error(response[0x22:0x24]) self.name = name def set_lock(self, state: bool) -> None: @@ -255,81 +264,69 @@ def set_lock(self, state: bool) -> None: packet += bytearray(0x50 - len(packet)) packet[0x43] = bool(state) response = self.send_packet(0x6A, packet) - check_error(response[0x22:0x24]) + e.check_error(response[0x22:0x24]) self.is_locked = bool(state) def get_type(self) -> str: """Return device type.""" return self.type - def send_packet(self, command: int, payload: bytes) -> bytes: + def send_packet(self, packet_type: int, payload: bytes) -> bytes: """Send a packet to the device.""" self.count = ((self.count + 1) | 0x8000) & 0xFFFF packet = bytearray(0x38) - packet[0x00] = 0x5A - packet[0x01] = 0xA5 - packet[0x02] = 0xAA - packet[0x03] = 0x55 - packet[0x04] = 0x5A - packet[0x05] = 0xA5 - packet[0x06] = 0xAA - packet[0x07] = 0x55 - packet[0x24] = self.devtype & 0xFF - packet[0x25] = self.devtype >> 8 - packet[0x26] = command - packet[0x28] = self.count & 0xFF - packet[0x29] = self.count >> 8 - packet[0x2A] = self.mac[5] - packet[0x2B] = self.mac[4] - packet[0x2C] = self.mac[3] - packet[0x2D] = self.mac[2] - packet[0x2E] = self.mac[1] - packet[0x2F] = self.mac[0] - packet[0x30] = self.id[3] - packet[0x31] = self.id[2] - packet[0x32] = self.id[1] - packet[0x33] = self.id[0] - - # pad the payload for AES encryption - padding = (16 - len(payload)) % 16 - if padding: - payload = bytearray(payload) - payload += bytearray(padding) + packet[0x00:0x08] = bytes.fromhex("5aa5aa555aa5aa55") + packet[0x24:0x26] = self.devtype.to_bytes(2, "little") + packet[0x26:0x28] = packet_type.to_bytes(2, "little") + packet[0x28:0x2A] = self.count.to_bytes(2, "little") + packet[0x2A:0x30] = self.mac[::-1] + packet[0x30:0x34] = self.id.to_bytes(4, "little") - checksum = sum(payload, 0xBEAF) & 0xFFFF - packet[0x34] = checksum & 0xFF - packet[0x35] = checksum >> 8 + p_checksum = sum(payload, 0xBEAF) & 0xFFFF + packet[0x34:0x36] = p_checksum.to_bytes(2, "little") - payload = self.encrypt(payload) - for i in range(len(payload)): - packet.append(payload[i]) + padding = (16 - len(payload)) % 16 + payload = self.encrypt(payload + bytes(padding)) + packet.extend(payload) checksum = sum(packet, 0xBEAF) & 0xFFFF - packet[0x20] = checksum & 0xFF - packet[0x21] = checksum >> 8 - - with self.lock: - with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as conn: - timeout = self.timeout - start_time = time.time() - - while True: - time_left = timeout - (time.time() - start_time) - conn.settimeout(min(1, time_left)) - conn.sendto(packet, self.host) - - try: - resp = conn.recvfrom(2048)[0] - break - except socket.timeout: - if (time.time() - start_time) > timeout: - raise exception(-4000) # Network timeout. + packet[0x20:0x22] = checksum.to_bytes(2, "little") - if len(resp) < 0x30: - raise exception(-4007) # Length error. + with self.lock and socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as conn: + timeout = self.timeout + start_time = time.time() + + while True: + time_left = timeout - (time.time() - start_time) + conn.settimeout(min(DEFAULT_RETRY_INTVL, time_left)) + conn.sendto(packet, self.host) - checksum = resp[0x20] | (resp[0x21] << 8) - if sum(resp, 0xBEAF) - sum(resp[0x20:0x22]) & 0xFFFF != checksum: - raise exception(-4008) # Checksum error. + try: + resp = conn.recvfrom(2048)[0] + break + except socket.timeout as err: + if (time.time() - start_time) > timeout: + raise e.NetworkTimeoutError( + -4000, + "Network timeout", + f"No response received within {timeout}s", + ) from err + + if len(resp) < 0x30: + raise e.DataValidationError( + -4007, + "Received data packet length error", + f"Expected at least 48 bytes and received {len(resp)}", + ) + + nom_checksum = int.from_bytes(resp[0x20:0x22], "little") + real_checksum = sum(resp, 0xBEAF) - sum(resp[0x20:0x22]) & 0xFFFF + + if nom_checksum != real_checksum: + raise e.DataValidationError( + -4008, + "Received data packet check error", + f"Expected a checksum of {nom_checksum} and received {real_checksum}", + ) return resp diff --git a/broadlink/exceptions.py b/broadlink/exceptions.py index d0e5bc0f..2343ad6e 100644 --- a/broadlink/exceptions.py +++ b/broadlink/exceptions.py @@ -1,19 +1,17 @@ """Exceptions for Broadlink devices.""" +import collections import struct class BroadlinkException(Exception): - """Common base class for all Broadlink exceptions.""" + """Base class common to all Broadlink exceptions.""" def __init__(self, *args, **kwargs): """Initialize the exception.""" super().__init__(*args, **kwargs) - if len(args) >= 3: + if len(args) >= 2: self.errno = args[0] - self.strerror = "%s: %s" % (args[1], args[2]) - elif len(args) == 2: - self.errno = args[0] - self.strerror = str(args[1]) + self.strerror = ": ".join(str(arg) for arg in args[1:]) elif len(args) == 1: self.errno = None self.strerror = str(args[0]) @@ -22,78 +20,91 @@ def __init__(self, *args, **kwargs): self.strerror = "" def __str__(self): - """Return the error message.""" + """Return str(self).""" if self.errno is not None: return "[Errno %s] %s" % (self.errno, self.strerror) return self.strerror + def __eq__(self, other): + """Return self==value.""" + # pylint: disable=unidiomatic-typecheck + return type(self) == type(other) and self.args == other.args + + def __hash__(self): + """Return hash(self).""" + return hash((type(self), self.args)) + + +class MultipleErrors(BroadlinkException): + """Multiple errors.""" + + def __init__(self, *args, **kwargs): + """Initialize the exception.""" + errors = args[0][:] if args else [] + counter = collections.Counter(errors) + strerror = "Multiple errors occurred: %s" % counter + super().__init__(strerror, **kwargs) + self.errors = errors + + def __repr__(self): + """Return repr(self).""" + return "MultipleErrors(%r)" % self.errors -class FirmwareException(BroadlinkException): - """Common base class for all firmware exceptions.""" + def __str__(self): + """Return str(self).""" + return self.strerror -class AuthenticationError(FirmwareException): +class AuthenticationError(BroadlinkException): """Authentication error.""" -class AuthorizationError(FirmwareException): +class AuthorizationError(BroadlinkException): """Authorization error.""" -class CommandNotSupportedError(FirmwareException): +class CommandNotSupportedError(BroadlinkException): """Command not supported error.""" -class ConnectionClosedError(FirmwareException): +class ConnectionClosedError(BroadlinkException): """Connection closed error.""" -class DataValidationError(FirmwareException): - """Data validation error.""" +class StructureAbnormalError(BroadlinkException): + """Structure abnormal error.""" -class DeviceOfflineError(FirmwareException): +class DeviceOfflineError(BroadlinkException): """Device offline error.""" -class ReadError(FirmwareException): +class ReadError(BroadlinkException): """Read error.""" -class SendError(FirmwareException): +class SendError(BroadlinkException): """Send error.""" -class SSIDNotFoundError(FirmwareException): +class SSIDNotFoundError(BroadlinkException): """SSID not found error.""" -class StorageError(FirmwareException): +class StorageError(BroadlinkException): """Storage error.""" -class WriteError(FirmwareException): +class WriteError(BroadlinkException): """Write error.""" -class SDKException(BroadlinkException): - """Common base class for all SDK exceptions.""" - - -class DeviceInformationError(SDKException): - """Device information is not intact.""" - - -class ChecksumError(SDKException): - """Received data packet check error.""" - - -class LengthError(SDKException): - """Received data packet length error.""" +class NetworkTimeoutError(BroadlinkException): + """Network timeout error.""" -class NetworkTimeoutError(SDKException): - """Network timeout error.""" +class DataValidationError(BroadlinkException): + """Data validation error.""" class UnknownError(BroadlinkException): @@ -107,30 +118,34 @@ class UnknownError(BroadlinkException): -3: (DeviceOfflineError, "The device is offline"), -4: (CommandNotSupportedError, "Command not supported"), -5: (StorageError, "The device storage is full"), - -6: (DataValidationError, "Structure is abnormal"), + -6: (StructureAbnormalError, "Structure is abnormal"), -7: (AuthorizationError, "Control key is expired"), -8: (SendError, "Send error"), -9: (WriteError, "Write error"), -10: (ReadError, "Read error"), -11: (SSIDNotFoundError, "SSID could not be found in AP configuration"), # SDK related errors are generated by this module. - -2040: (DeviceInformationError, "Device information is not intact"), + -2040: (DataValidationError, "Device information is not intact"), -4000: (NetworkTimeoutError, "Network timeout"), - -4007: (LengthError, "Received data packet length error"), - -4008: (ChecksumError, "Received data packet check error"), + -4007: (DataValidationError, "Received data packet length error"), + -4008: (DataValidationError, "Received data packet check error"), + -4009: (DataValidationError, "Received data packet information type error"), + -4010: (DataValidationError, "Received encrypted data packet length error"), + -4011: (DataValidationError, "Received encrypted data packet check error"), + -4012: (AuthorizationError, "Device control ID error"), } -def exception(error_code): +def exception(err_code: int) -> BroadlinkException: """Return exception corresponding to an error code.""" try: - exc, msg = BROADLINK_EXCEPTIONS[error_code] - return exc(error_code, msg) + exc, msg = BROADLINK_EXCEPTIONS[err_code] + return exc(err_code, msg) except KeyError: - return UnknownError(error_code, "Unknown error") + return UnknownError(err_code, "Unknown error") -def check_error(error): +def check_error(error: bytes) -> None: """Raise exception if an error occurred.""" error_code = struct.unpack("h", error)[0] if error_code: diff --git a/broadlink/helpers.py b/broadlink/helpers.py index 404feadd..e7b3d4c9 100644 --- a/broadlink/helpers.py +++ b/broadlink/helpers.py @@ -1,26 +1,43 @@ -"""Helper functions.""" -from ctypes import c_ushort +"""Helper functions and classes.""" +from typing import Dict, List, Sequence -def calculate_crc16(input_data: bytes) -> int: - """Calculate the CRC-16 of a byte string.""" - crc16_tab = [] - crc16_constant = 0xA001 +class CRC16: + """Helps with CRC-16 calculation. - for i in range(0, 256): - crc = c_ushort(i).value - for j in range(0, 8): - if crc & 0x0001: - crc = c_ushort(crc >> 1).value ^ crc16_constant - else: - crc = c_ushort(crc >> 1).value - crc16_tab.append(hex(crc)) + CRC tables are cached for performance. + """ - crcValue = 0xFFFF + _cache: Dict[int, List[int]] = {} - for c in input_data: - tmp = crcValue ^ c - rotated = c_ushort(crcValue >> 8).value - crcValue = rotated ^ int(crc16_tab[(tmp & 0x00FF)], 0) + @classmethod + def get_table(cls, polynomial: int) -> List[int]: + """Return the CRC-16 table for a polynomial.""" + try: + crc_table = cls._cache[polynomial] + except KeyError: + crc_table = [] + for dividend in range(0, 256): + remainder = dividend + for _ in range(0, 8): + if remainder & 1: + remainder = remainder >> 1 ^ polynomial + else: + remainder = remainder >> 1 + crc_table.append(remainder) + cls._cache[polynomial] = crc_table + return crc_table - return crcValue + @classmethod + def calculate( + cls, + sequence: Sequence[int], + polynomial: int = 0xA001, # CRC-16-ANSI. + init_value: int = 0xFFFF, + ) -> int: + """Calculate the CRC-16 of a sequence of integers.""" + crc_table = cls.get_table(polynomial) + crc = init_value + for item in sequence: + crc = crc >> 8 ^ crc_table[(crc ^ item) & 0xFF] + return crc diff --git a/broadlink/hub.py b/broadlink/hub.py new file mode 100644 index 00000000..0fd4ae53 --- /dev/null +++ b/broadlink/hub.py @@ -0,0 +1,98 @@ +"""Support for hubs.""" +import struct +import json +from typing import Optional + +from . import exceptions as e +from .device import Device + + +class s3(Device): + """Controls a Broadlink S3.""" + + TYPE = "S3" + MAX_SUBDEVICES = 8 + + def get_subdevices(self, step: int = 5) -> list: + """Return a list of sub devices.""" + total = self.MAX_SUBDEVICES + sub_devices = [] + seen = set() + index = 0 + + while index < total: + state = {"count": step, "index": index} + packet = self._encode(14, state) + resp = self.send_packet(0x6A, packet) + e.check_error(resp[0x22:0x24]) + resp = self._decode(resp) + + for device in resp["list"]: + did = device["did"] + if did in seen: + continue + + seen.add(did) + sub_devices.append(device) + + total = resp["total"] + if len(seen) >= total: + break + + index += step + + return sub_devices + + def get_state(self, did: Optional[str] = None) -> dict: + """Return the power state of the device.""" + state = {} + if did is not None: + state["did"] = did + + packet = self._encode(1, state) + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + return self._decode(response) + + def set_state( + self, + did: Optional[str] = None, + pwr1: Optional[bool] = None, + pwr2: Optional[bool] = None, + pwr3: Optional[bool] = None, + ) -> dict: + """Set the power state of the device.""" + state = {} + if did is not None: + state["did"] = did + if pwr1 is not None: + state["pwr1"] = int(bool(pwr1)) + if pwr2 is not None: + state["pwr2"] = int(bool(pwr2)) + if pwr3 is not None: + state["pwr3"] = int(bool(pwr3)) + + packet = self._encode(2, state) + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + return self._decode(response) + + def _encode(self, flag: int, state: dict) -> bytes: + """Encode a JSON packet.""" + # flag: 1 for reading, 2 for writing. + packet = bytearray(12) + data = json.dumps(state, separators=(",", ":")).encode() + struct.pack_into( + " dict: + """Decode a JSON packet.""" + payload = self.decrypt(response[0x38:]) + js_len = struct.unpack_from(" None: - """Initialize the controller.""" - device.__init__(self, *args, **kwargs) - self.type = "SmartBulb" - - def send_command(self, command: str, type: str = "set") -> None: - """Send a command to the device.""" - packet = bytearray(16 + (int(len(command) / 16) + 1) * 16) - packet[0x00] = 0x0C + len(command) & 0xFF - packet[0x02] = 0xA5 - packet[0x03] = 0xA5 - packet[0x04] = 0x5A - packet[0x05] = 0x5A - packet[0x08] = 0x02 if type == "set" else 0x01 # 0x01 => query, # 0x02 => set - packet[0x09] = 0x0B - packet[0x0A] = len(command) - packet[0x0E:] = map(ord, command) + TYPE = "LB1" - checksum = sum(packet, 0xBEAF) & 0xFFFF - packet[0x06] = checksum & 0xFF # Checksum 1 position - packet[0x07] = checksum >> 8 # Checksum 2 position + @enum.unique + class ColorMode(enum.IntEnum): + """Enumerates color modes.""" + + RGB = 0 + WHITE = 1 + SCENE = 2 + + def get_state(self) -> dict: + """Return the power state of the device. + + Example: `{'red': 128, 'blue': 255, 'green': 128, 'pwr': 1, 'brightness': 75, 'colortemp': 2700, 'hue': 240, 'saturation': 50, 'transitionduration': 1500, 'maxworktime': 0, 'bulb_colormode': 1, 'bulb_scenes': '["@01686464,0,0,0", "#ffffff,10,0,#000000,190,0,0", "2700+100,0,0,0", "#ff0000,500,2500,#00FF00,500,2500,#0000FF,500,2500,0", "@01686464,100,2400,@01686401,100,2400,0", "@01686464,100,2400,@01686401,100,2400,@005a6464,100,2400,@005a6401,100,2400,0", "@01686464,10,0,@00000000,190,0,0", "@01686464,200,0,@005a6464,200,0,0"]', 'bulb_scene': '', 'bulb_sceneidx': 255}` + """ + packet = self._encode(1, {}) + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + return self._decode(response) + + def set_state( + self, + pwr: Optional[bool] = None, + red: Optional[int] = None, + blue: Optional[int] = None, + green: Optional[int] = None, + brightness: Optional[int] = None, + colortemp: Optional[int] = None, + hue: Optional[int] = None, + saturation: Optional[int] = None, + transitionduration: Optional[int] = None, + maxworktime: Optional[int] = None, + bulb_colormode: Optional[int] = None, + bulb_scenes: Optional[str] = None, + bulb_scene: Optional[str] = None, + bulb_sceneidx: Optional[int] = None, + ) -> dict: + """Set the power state of the device.""" + state = {} + if pwr is not None: + state["pwr"] = int(bool(pwr)) + if red is not None: + state["red"] = int(red) + if blue is not None: + state["blue"] = int(blue) + if green is not None: + state["green"] = int(green) + if brightness is not None: + state["brightness"] = int(brightness) + if colortemp is not None: + state["colortemp"] = int(colortemp) + if hue is not None: + state["hue"] = int(hue) + if saturation is not None: + state["saturation"] = int(saturation) + if transitionduration is not None: + state["transitionduration"] = int(transitionduration) + if maxworktime is not None: + state["maxworktime"] = int(maxworktime) + if bulb_colormode is not None: + state["bulb_colormode"] = int(bulb_colormode) + if bulb_scenes is not None: + state["bulb_scenes"] = str(bulb_scenes) + if bulb_scene is not None: + state["bulb_scene"] = str(bulb_scene) + if bulb_sceneidx is not None: + state["bulb_sceneidx"] = int(bulb_sceneidx) + packet = self._encode(2, state) response = self.send_packet(0x6A, packet) - check_error(response[0x36:0x38]) + e.check_error(response[0x22:0x24]) + return self._decode(response) + + def _encode(self, flag: int, state: dict) -> bytes: + """Encode a JSON packet.""" + # flag: 1 for reading, 2 for writing. + packet = bytearray(14) + data = json.dumps(state, separators=(",", ":")).encode() + p_len = 12 + len(data) + struct.pack_into( + " dict: + """Decode a JSON packet.""" payload = self.decrypt(response[0x38:]) + js_len = struct.unpack_from(" 0: - self.state_dict = json.loads(payload[0x0E : 0x0E + responseLength]) +class lb2(Device): + """Controls a Broadlink LB26/LB27.""" - def set_json(self, jsonstr: str) -> str: - """Send a command to the device and return state.""" - reconvert = json.loads(jsonstr) - if "bulb_sceneidx" in reconvert.keys(): - reconvert["bulb_sceneidx"] = self.effect_map_dict.get( - reconvert["bulb_sceneidx"], 255 - ) + TYPE = "LB2" - self.send_command(json.dumps(reconvert)) - return json.dumps(self.state_dict) + @enum.unique + class ColorMode(enum.IntEnum): + """Enumerates color modes.""" - def set_state(self, state: Union[str, int]) -> None: - """Set the state of the device.""" - cmd = '{"pwr":%d}' % (1 if state == "ON" or state == 1 else 0) - self.send_command(cmd) + RGB = 0 + WHITE = 1 + SCENE = 2 def get_state(self) -> dict: - """Return the state of the device.""" - cmd = "{}" - self.send_command(cmd) - return self.state_dict + """Return the power state of the device. + + Example: `{'red': 128, 'blue': 255, 'green': 128, 'pwr': 1, 'brightness': 75, 'colortemp': 2700, 'hue': 240, 'saturation': 50, 'transitionduration': 1500, 'maxworktime': 0, 'bulb_colormode': 1, 'bulb_scenes': '["@01686464,0,0,0", "#ffffff,10,0,#000000,190,0,0", "2700+100,0,0,0", "#ff0000,500,2500,#00FF00,500,2500,#0000FF,500,2500,0", "@01686464,100,2400,@01686401,100,2400,0", "@01686464,100,2400,@01686401,100,2400,@005a6464,100,2400,@005a6401,100,2400,0", "@01686464,10,0,@00000000,190,0,0", "@01686464,200,0,@005a6464,200,0,0"]', 'bulb_scene': ''}` + """ + packet = self._encode(1, {}) + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + return self._decode(response) + + def set_state( + self, + pwr: Optional[bool] = None, + red: Optional[int] = None, + blue: Optional[int] = None, + green: Optional[int] = None, + brightness: Optional[int] = None, + colortemp: Optional[int] = None, + hue: Optional[int] = None, + saturation: Optional[int] = None, + transitionduration: Optional[int] = None, + maxworktime: Optional[int] = None, + bulb_colormode: Optional[int] = None, + bulb_scenes: Optional[str] = None, + bulb_scene: Optional[str] = None, + ) -> dict: + """Set the power state of the device.""" + state = {} + if pwr is not None: + state["pwr"] = int(bool(pwr)) + if red is not None: + state["red"] = int(red) + if blue is not None: + state["blue"] = int(blue) + if green is not None: + state["green"] = int(green) + if brightness is not None: + state["brightness"] = int(brightness) + if colortemp is not None: + state["colortemp"] = int(colortemp) + if hue is not None: + state["hue"] = int(hue) + if saturation is not None: + state["saturation"] = int(saturation) + if transitionduration is not None: + state["transitionduration"] = int(transitionduration) + if maxworktime is not None: + state["maxworktime"] = int(maxworktime) + if bulb_colormode is not None: + state["bulb_colormode"] = int(bulb_colormode) + if bulb_scenes is not None: + state["bulb_scenes"] = str(bulb_scenes) + if bulb_scene is not None: + state["bulb_scene"] = str(bulb_scene) + + packet = self._encode(2, state) + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + return self._decode(response) + + def _encode(self, flag: int, state: dict) -> bytes: + """Encode a JSON packet.""" + # flag: 1 for reading, 2 for writing. + packet = bytearray(12) + data = json.dumps(state, separators=(",", ":")).encode() + struct.pack_into( + " dict: + """Decode a JSON packet.""" + payload = self.decrypt(response[0x38:]) + js_len = struct.unpack_from(" bytes: + """Pack the timestamp to be sent over the Broadlink protocol.""" + data = bytearray(12) + utcoffset = int(datetime.utcoffset().total_seconds() / 3600) + data[:0x04] = utcoffset.to_bytes(4, "little", signed=True) + data[0x04:0x06] = datetime.year.to_bytes(2, "little") + data[0x06] = datetime.minute + data[0x07] = datetime.hour + data[0x08] = int(datetime.strftime("%y")) + data[0x09] = datetime.isoweekday() + data[0x0A] = datetime.day + data[0x0B] = datetime.month + return data + + @staticmethod + def unpack(data: bytes) -> dt.datetime: + """Unpack a timestamp received over the Broadlink protocol.""" + utcoffset = int.from_bytes(data[0x00:0x04], "little", signed=True) + year = int.from_bytes(data[0x04:0x06], "little") + minute = data[0x06] + hour = data[0x07] + subyear = data[0x08] + isoweekday = data[0x09] + day = data[0x0A] + month = data[0x0B] + + tz_info = dt.timezone(dt.timedelta(hours=utcoffset)) + datetime = dt.datetime(year, month, day, hour, minute, 0, 0, tz_info) + + if datetime.isoweekday() != isoweekday: + raise ValueError("isoweekday does not match") + if int(datetime.strftime("%y")) != subyear: + raise ValueError("subyear does not match") + + return datetime + + @staticmethod + def now() -> dt.datetime: + """Return the current date and time with timezone info.""" + tz_info = dt.timezone(dt.timedelta(seconds=-time.timezone)) + return dt.datetime.now(tz_info) diff --git a/broadlink/remote.py b/broadlink/remote.py index 6cf32f27..60c54ce2 100644 --- a/broadlink/remote.py +++ b/broadlink/remote.py @@ -1,29 +1,70 @@ """Support for universal remotes.""" import struct +from typing import List, Optional, Tuple -from .device import device -from .exceptions import check_error +from . import exceptions as e +from .device import Device -class rm(device): - """Controls a Broadlink RM.""" +def pulses_to_data(pulses: List[int], tick: float = 32.84) -> bytes: + """Convert a microsecond duration sequence into a Broadlink IR packet.""" + result = bytearray(4) + result[0x00] = 0x26 - def __init__(self, *args, **kwargs) -> None: - """Initialize the controller.""" - device.__init__(self, *args, **kwargs) - self.type = "RM2" + for pulse in pulses: + div, mod = divmod(int(pulse // tick), 256) + if div: + result.append(0) + result.append(div) + result.append(mod) - def _send(self, command: int, data: bytes = b'') -> bytes: + data_len = len(result) - 4 + result[0x02] = data_len & 0xFF + result[0x03] = data_len >> 8 + + return result + + +def data_to_pulses(data: bytes, tick: float = 32.84) -> List[int]: + """Parse a Broadlink packet into a microsecond duration sequence.""" + result = [] + index = 4 + end = min(256 * data[0x03] + data[0x02] + 4, len(data)) + + while index < end: + chunk = data[index] + index += 1 + + if chunk == 0: + try: + chunk = 256 * data[index] + data[index + 1] + except IndexError as err: + raise ValueError("Malformed data.") from err + index += 2 + + result.append(int(chunk * tick)) + + return result + + +class rmmini(Device): + """Controls a Broadlink RM mini 3.""" + + TYPE = "RMMINI" + + def _send(self, command: int, data: bytes = b"") -> bytes: """Send a packet to the device.""" packet = struct.pack(" bytes: - """Return the last captured code.""" - return self._send(0x4) + def update(self) -> None: + """Update device name and lock status.""" + resp = self._send(0x1) + self.name = resp[0x48:].split(b"\x00")[0].decode() + self.is_locked = bool(resp[0x87]) def send_data(self, data: bytes) -> None: """Send a code to the device.""" @@ -33,66 +74,100 @@ def enter_learning(self) -> None: """Enter infrared learning mode.""" self._send(0x3) + def check_data(self) -> bytes: + """Return the last captured code.""" + return self._send(0x4) + + +class rmpro(rmmini): + """Controls a Broadlink RM pro.""" + + TYPE = "RMPRO" + def sweep_frequency(self) -> None: """Sweep frequency.""" self._send(0x19) - def cancel_sweep_frequency(self) -> None: - """Cancel sweep frequency.""" - self._send(0x1E) - - def check_frequency(self) -> bool: + def check_frequency(self) -> Tuple[bool, float]: """Return True if the frequency was identified successfully.""" resp = self._send(0x1A) - return resp[0] == 1 + is_found = bool(resp[0]) + frequency = struct.unpack(" bool: + def find_rf_packet(self, frequency: Optional[float] = None) -> None: """Enter radiofrequency learning mode.""" - resp = self._send(0x1B) - return resp[0] == 1 + payload = bytearray() + if frequency: + payload += struct.pack(" float: - """Return the temperature.""" - return self.check_sensors()["temperature"] + def cancel_sweep_frequency(self) -> None: + """Cancel sweep frequency.""" + self._send(0x1E) def check_sensors(self) -> dict: """Return the state of the sensors.""" resp = self._send(0x1) - temperature = struct.unpack(" float: + """Return the temperature.""" + return self.check_sensors()["temperature"] -class rm4(rm): - """Controls a Broadlink RM4.""" +class rmminib(rmmini): + """Controls a Broadlink RM mini 3 (new firmware).""" - def __init__(self, *args, **kwargs) -> None: - """Initialize the controller.""" - device.__init__(self, *args, **kwargs) - self.type = "RM4" + TYPE = "RMMINIB" - def _send(self, command: int, data: bytes = b'') -> bytes: + def _send(self, command: int, data: bytes = b"") -> bytes: """Send a packet to the device.""" packet = struct.pack(" bool: - """Enter radiofrequency learning mode.""" - self._send(0x1B) - return True - def check_humidity(self) -> float: - """Return the humidity.""" - return self.check_sensors()["humidity"] +class rm4mini(rmminib): + """Controls a Broadlink RM4 mini.""" + + TYPE = "RM4MINI" def check_sensors(self) -> dict: """Return the state of the sensors.""" resp = self._send(0x24) - temperature = struct.unpack(" float: + """Return the temperature.""" + return self.check_sensors()["temperature"] + + def check_humidity(self) -> float: + """Return the humidity.""" + return self.check_sensors()["humidity"] + + +class rm4pro(rm4mini, rmpro): + """Controls a Broadlink RM4 pro.""" + + TYPE = "RM4PRO" + + +class rm(rmpro): + """For backwards compatibility.""" + + TYPE = "RM2" + + +class rm4(rm4pro): + """For backwards compatibility.""" + + TYPE = "RM4" diff --git a/broadlink/sensor.py b/broadlink/sensor.py index 3a257b12..284576fa 100644 --- a/broadlink/sensor.py +++ b/broadlink/sensor.py @@ -1,24 +1,21 @@ """Support for sensors.""" -import struct +from typing import Sequence -from .device import device -from .exceptions import check_error +from . import exceptions as e +from .device import Device -class a1(device): +class a1(Device): """Controls a Broadlink A1.""" + TYPE = "A1" + _SENSORS_AND_LEVELS = ( ("light", ("dark", "dim", "normal", "bright")), ("air_quality", ("excellent", "good", "normal", "bad")), ("noise", ("quiet", "normal", "noisy")), ) - def __init__(self, *args, **kwargs) -> None: - """Initialize the controller.""" - device.__init__(self, *args, **kwargs) - self.type = "A1" - def check_sensors(self) -> dict: """Return the state of the sensors.""" data = self.check_sensors_raw() @@ -32,19 +29,62 @@ def check_sensors(self) -> dict: def check_sensors_raw(self) -> dict: """Return the state of the sensors in raw format.""" packet = bytearray([0x1]) - response = self.send_packet(0x6A, packet) - check_error(response[0x22:0x24]) - payload = self.decrypt(response[0x38:]) - data = payload[0x4:] + resp = self.send_packet(0x6A, packet) + e.check_error(resp[0x22:0x24]) + data = self.decrypt(resp[0x38:]) + + return { + "temperature": data[0x04] + data[0x05] / 10.0, + "humidity": data[0x06] + data[0x07] / 10.0, + "light": data[0x08], + "air_quality": data[0x0A], + "noise": data[0x0C], + } + + +class a2(Device): + """Controls a Broadlink A2.""" + + TYPE = "A2" - temperature = struct.unpack("> 8 + packet += bytes(2) + packet.extend(data) + + checksum = sum(packet, 0xBEAF) & 0xFFFF + packet[0x06] = checksum & 0xFF + packet[0x07] = checksum >> 8 + + packet_len = len(packet) - 2 + packet[0x00] = packet_len & 0xFF + packet[0x01] = packet_len >> 8 + + resp = self.send_packet(0x6A, packet) + e.check_error(resp[0x22:0x24]) + payload = self.decrypt(resp[0x38:]) + return payload + + def check_sensors_raw(self) -> dict: + """Return the state of the sensors in raw format.""" + data = self._send(1) return { - "temperature": temperature, - "humidity": humidity, - "light": data[0x4], - "air_quality": data[0x6], - "noise": data[0x8], + "temperature": data[0x13] * 256 + data[0x14], + "humidity": data[0x15] * 256 + data[0x16], + "pm10": data[0x0D] * 256 + data[0x0E], + "pm2_5": data[0x0F] * 256 + data[0x10], + "pm1": data[0x11] * 256 + data[0x12], } diff --git a/broadlink/switch.py b/broadlink/switch.py index f7f23084..8393f6b1 100644 --- a/broadlink/switch.py +++ b/broadlink/switch.py @@ -1,280 +1,168 @@ """Support for switches.""" import json import struct +from typing import Optional -from .device import device -from .exceptions import check_error +from . import exceptions as e +from .device import Device -class mp1(device): - """Controls a Broadlink MP1.""" +class sp1(Device): + """Controls a Broadlink SP1.""" - def __init__(self, *args, **kwargs) -> None: - """Initialize the controller.""" - device.__init__(self, *args, **kwargs) - self.type = "MP1" + TYPE = "SP1" - def set_power_mask(self, sid_mask: int, state: bool) -> None: + def set_power(self, pwr: bool) -> None: """Set the power state of the device.""" - packet = bytearray(16) - packet[0x00] = 0x0D - packet[0x02] = 0xA5 - packet[0x03] = 0xA5 - packet[0x04] = 0x5A - packet[0x05] = 0x5A - packet[0x06] = 0xB2 + ((sid_mask << 1) if state else sid_mask) - packet[0x07] = 0xC0 - packet[0x08] = 0x02 - packet[0x0A] = 0x03 - packet[0x0D] = sid_mask - packet[0x0E] = sid_mask if state else 0 + packet = bytearray(4) + packet[0] = bool(pwr) + response = self.send_packet(0x66, packet) + e.check_error(response[0x22:0x24]) - response = self.send_packet(0x6A, packet) - check_error(response[0x22:0x24]) - def set_power(self, sid: int, state: bool) -> None: - """Set the power state of the device.""" - sid_mask = 0x01 << (sid - 1) - self.set_power_mask(sid_mask, state) +class sp2(Device): + """Controls a Broadlink SP2.""" - def check_power_raw(self) -> int: - """Return the power state of the device in raw format.""" - packet = bytearray(16) - packet[0x00] = 0x0A - packet[0x02] = 0xA5 - packet[0x03] = 0xA5 - packet[0x04] = 0x5A - packet[0x05] = 0x5A - packet[0x06] = 0xAE - packet[0x07] = 0xC0 - packet[0x08] = 0x01 + TYPE = "SP2" + def set_power(self, pwr: bool) -> None: + """Set the power state of the device.""" + packet = bytearray(16) + packet[0] = 2 + packet[4] = bool(pwr) response = self.send_packet(0x6A, packet) - check_error(response[0x22:0x24]) - payload = self.decrypt(response[0x38:]) - return payload[0x0E] + e.check_error(response[0x22:0x24]) - def check_power(self) -> dict: + def check_power(self) -> bool: """Return the power state of the device.""" - state = self.check_power_raw() - if state is None: - return {"s1": None, "s2": None, "s3": None, "s4": None} - data = {} - data["s1"] = bool(state & 0x01) - data["s2"] = bool(state & 0x02) - data["s3"] = bool(state & 0x04) - data["s4"] = bool(state & 0x08) - return data - - -class bg1(device): - """Controls a BG Electrical smart outlet.""" - - def __init__(self, *args, **kwargs) -> None: - """Initialize the controller.""" - device.__init__(self, *args, **kwargs) - self.type = "BG1" - - def get_state(self) -> dict: - """Return the power state of the device. - - Example: `{"pwr":1,"pwr1":1,"pwr2":0,"maxworktime":60,"maxworktime1":60,"maxworktime2":0,"idcbrightness":50}` - """ - packet = self._encode(1, b"{}") - response = self.send_packet(0x6A, packet) - check_error(response[0x22:0x24]) - return self._decode(response) - - def set_state( - self, - pwr: bool = None, - pwr1: bool = None, - pwr2: bool = None, - maxworktime: int = None, - maxworktime1: int = None, - maxworktime2: int = None, - idcbrightness: int = None, - ) -> dict: - """Set the power state of the device.""" - data = {} - if pwr is not None: - data["pwr"] = int(bool(pwr)) - if pwr1 is not None: - data["pwr1"] = int(bool(pwr1)) - if pwr2 is not None: - data["pwr2"] = int(bool(pwr2)) - if maxworktime is not None: - data["maxworktime"] = maxworktime - if maxworktime1 is not None: - data["maxworktime1"] = maxworktime1 - if maxworktime2 is not None: - data["maxworktime2"] = maxworktime2 - if idcbrightness is not None: - data["idcbrightness"] = idcbrightness - js = json.dumps(data).encode("utf8") - packet = self._encode(2, js) + packet = bytearray(16) + packet[0] = 1 response = self.send_packet(0x6A, packet) - check_error(response[0x22:0x24]) - return self._decode(response) - - def _encode(self, flag: int, js: str) -> bytes: - """Encode a message.""" - # The packet format is: - # 0x00-0x01 length - # 0x02-0x05 header - # 0x06-0x07 00 - # 0x08 flag (1 for read or 2 write?) - # 0x09 unknown (0xb) - # 0x0a-0x0d length of json - # 0x0e- json data - packet = bytearray(14) - length = 4 + 2 + 2 + 4 + len(js) - struct.pack_into( - "> 8 - return packet - - def _decode(self, response: bytes) -> dict: - """Decode a message.""" + e.check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) - js_len = struct.unpack_from(" None: - """Initialize the device.""" - device.__init__(self, *args, **kwargs) - self.type = "SP1" + TYPE = "SP2S" - def set_power(self, state: bool) -> None: - """Set the power state of the device.""" - packet = bytearray(4) - packet[0] = state - response = self.send_packet(0x66, packet) - check_error(response[0x22:0x24]) + def get_energy(self) -> float: + """Return the power consumption in W.""" + packet = bytearray(16) + packet[0] = 4 + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + payload = self.decrypt(response[0x38:]) + return int.from_bytes(payload[0x4:0x7], "little") / 1000 -class sp2(device): - """Controls a Broadlink SP2.""" +class sp3(Device): + """Controls a Broadlink SP3.""" - def __init__(self, *args, **kwargs) -> None: - """Initialize the controller.""" - device.__init__(self, *args, **kwargs) - self.type = "SP2" + TYPE = "SP3" - def set_power(self, state: bool) -> None: + def set_power(self, pwr: bool) -> None: """Set the power state of the device.""" packet = bytearray(16) packet[0] = 2 - if self.check_nightlight(): - packet[4] = 3 if state else 2 - else: - packet[4] = 1 if state else 0 + packet[4] = self.check_nightlight() << 1 | bool(pwr) response = self.send_packet(0x6A, packet) - check_error(response[0x22:0x24]) + e.check_error(response[0x22:0x24]) - def set_nightlight(self, state: bool) -> None: + def set_nightlight(self, ntlight: bool) -> None: """Set the night light state of the device.""" packet = bytearray(16) packet[0] = 2 - if self.check_power(): - packet[4] = 3 if state else 1 - else: - packet[4] = 2 if state else 0 + packet[4] = bool(ntlight) << 1 | self.check_power() response = self.send_packet(0x6A, packet) - check_error(response[0x22:0x24]) + e.check_error(response[0x22:0x24]) def check_power(self) -> bool: """Return the power state of the device.""" packet = bytearray(16) packet[0] = 1 response = self.send_packet(0x6A, packet) - check_error(response[0x22:0x24]) + e.check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) - return bool(payload[0x4] == 1 or payload[0x4] == 3 or payload[0x4] == 0xFD) + return bool(payload[0x4] & 1) def check_nightlight(self) -> bool: """Return the state of the night light.""" packet = bytearray(16) packet[0] = 1 response = self.send_packet(0x6A, packet) - check_error(response[0x22:0x24]) + e.check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) - return bool(payload[0x4] == 2 or payload[0x4] == 3 or payload[0x4] == 0xFF) + return bool(payload[0x4] & 2) + + +class sp3s(sp2): + """Controls a Broadlink SP3S.""" + + TYPE = "SP3S" def get_energy(self) -> float: """Return the power consumption in W.""" packet = bytearray([8, 0, 254, 1, 5, 1, 0, 0, 0, 45]) response = self.send_packet(0x6A, packet) - check_error(response[0x22:0x24]) + e.check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) energy = payload[0x7:0x4:-1].hex() return int(energy) / 100 -class sp4(device): +class sp4(Device): """Controls a Broadlink SP4.""" - def __init__(self, *args, **kwargs) -> None: - """Initialize the controller.""" - device.__init__(self, *args, **kwargs) - self.type = "SP4" + TYPE = "SP4" - def set_power(self, state: bool) -> None: + def set_power(self, pwr: bool) -> None: """Set the power state of the device.""" - self.set_state(pwr=state) + self.set_state(pwr=pwr) - def set_nightlight(self, state: bool) -> None: + def set_nightlight(self, ntlight: bool) -> None: """Set the night light state of the device.""" - self.set_state(ntlight=state) + self.set_state(ntlight=ntlight) def set_state( self, - pwr: bool = None, - ntlight: bool = None, - indicator: bool = None, - ntlbrightness: int = None, - maxworktime: int = None, - childlock: bool = None, + pwr: Optional[bool] = None, + ntlight: Optional[bool] = None, + indicator: Optional[bool] = None, + ntlbrightness: Optional[int] = None, + maxworktime: Optional[int] = None, + childlock: Optional[bool] = None, ) -> dict: """Set state of device.""" - data = {} + state = {} if pwr is not None: - data["pwr"] = int(bool(pwr)) + state["pwr"] = int(bool(pwr)) if ntlight is not None: - data["ntlight"] = int(bool(ntlight)) + state["ntlight"] = int(bool(ntlight)) if indicator is not None: - data["indicator"] = int(bool(indicator)) + state["indicator"] = int(bool(indicator)) if ntlbrightness is not None: - data["ntlbrightness"] = ntlbrightness + state["ntlbrightness"] = ntlbrightness if maxworktime is not None: - data["maxworktime"] = maxworktime + state["maxworktime"] = maxworktime if childlock is not None: - data["childlock"] = int(bool(childlock)) + state["childlock"] = int(bool(childlock)) - packet = self._encode(2, data) + packet = self._encode(2, state) response = self.send_packet(0x6A, packet) return self._decode(response) def check_power(self) -> bool: """Return the power state of the device.""" state = self.get_state() - return state["pwr"] + return bool(state["pwr"]) def check_nightlight(self) -> bool: """Return the state of the night light.""" state = self.get_state() - return state["ntlight"] + return bool(state["ntlight"]) def get_state(self) -> dict: """Get full state of device.""" @@ -284,33 +172,29 @@ def get_state(self) -> dict: def _encode(self, flag: int, state: dict) -> bytes: """Encode a message.""" - payload = json.dumps(state, separators=(",", ":")).encode() packet = bytearray(12) + data = json.dumps(state, separators=(",", ":")).encode() struct.pack_into( - "> 8 + packet[0x04:0x06] = checksum.to_bytes(2, "little") return packet def _decode(self, response: bytes) -> dict: """Decode a message.""" - check_error(response[0x22:0x24]) + e.check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) js_len = struct.unpack_from(" None: - """Initialize the controller.""" - device.__init__(self, *args, **kwargs) - self.type = "SP4B" + TYPE = "SP4B" def get_state(self) -> dict: """Get full state of device.""" @@ -326,9 +210,9 @@ def get_state(self) -> dict: def _encode(self, flag: int, state: dict) -> bytes: """Encode a message.""" - payload = json.dumps(state, separators=(",", ":")).encode() packet = bytearray(14) - length = 4 + 2 + 2 + 4 + len(payload) + data = json.dumps(state, separators=(",", ":")).encode() + length = 12 + len(data) struct.pack_into( " bytes: 0x0000, flag, 0x0B, - len(payload), + len(data), ) - packet.extend(payload) - checksum = sum(packet[0x8:], 0xC0AD) & 0xFFFF - packet[0x06] = checksum & 0xFF - packet[0x07] = checksum >> 8 + packet.extend(data) + checksum = sum(packet[0x02:], 0xBEAF) & 0xFFFF + packet[0x06:0x08] = checksum.to_bytes(2, "little") return packet def _decode(self, response: bytes) -> dict: """Decode a message.""" - check_error(response[0x22:0x24]) + e.check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) js_len = struct.unpack_from(" dict: + """Return the power state of the device. + + Example: `{"pwr":1,"pwr1":1,"pwr2":0,"maxworktime":60,"maxworktime1":60,"maxworktime2":0,"idcbrightness":50}` + """ + packet = self._encode(1, {}) + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + return self._decode(response) + + def set_state( + self, + pwr: Optional[bool] = None, + pwr1: Optional[bool] = None, + pwr2: Optional[bool] = None, + maxworktime: Optional[int] = None, + maxworktime1: Optional[int] = None, + maxworktime2: Optional[int] = None, + idcbrightness: Optional[int] = None, + ) -> dict: + """Set the power state of the device.""" + state = {} + if pwr is not None: + state["pwr"] = int(bool(pwr)) + if pwr1 is not None: + state["pwr1"] = int(bool(pwr1)) + if pwr2 is not None: + state["pwr2"] = int(bool(pwr2)) + if maxworktime is not None: + state["maxworktime"] = maxworktime + if maxworktime1 is not None: + state["maxworktime1"] = maxworktime1 + if maxworktime2 is not None: + state["maxworktime2"] = maxworktime2 + if idcbrightness is not None: + state["idcbrightness"] = idcbrightness + + packet = self._encode(2, state) + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + return self._decode(response) + + def _encode(self, flag: int, state: dict) -> bytes: + """Encode a message.""" + packet = bytearray(14) + data = json.dumps(state).encode() + length = 12 + len(data) + struct.pack_into( + " dict: + """Decode a message.""" + payload = self.decrypt(response[0x38:]) + js_len = struct.unpack_from(" dict: + """Set the power state of the device.""" + state = {} + if pwr is not None: + state["pwr"] = int(bool(pwr)) + if pwr1 is not None: + state["pwr1"] = int(bool(pwr1)) + if pwr2 is not None: + state["pwr2"] = int(bool(pwr2)) + if pwr3 is not None: + state["pwr3"] = int(bool(pwr3)) + if maxworktime1 is not None: + state["maxworktime1"] = maxworktime1 + if maxworktime2 is not None: + state["maxworktime2"] = maxworktime2 + if maxworktime3 is not None: + state["maxworktime3"] = maxworktime3 + if idcbrightness is not None: + state["idcbrightness"] = idcbrightness + if childlock is not None: + state["childlock"] = int(bool(childlock)) + if childlock1 is not None: + state["childlock1"] = int(bool(childlock1)) + if childlock2 is not None: + state["childlock2"] = int(bool(childlock2)) + if childlock3 is not None: + state["childlock3"] = int(bool(childlock3)) + if childlock4 is not None: + state["childlock4"] = int(bool(childlock4)) + + packet = self._encode(2, state) + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + return self._decode(response) + + +class mp1(Device): + """Controls a Broadlink MP1.""" + + TYPE = "MP1" + + def set_power_mask(self, sid_mask: int, pwr: bool) -> None: + """Set the power state of the device.""" + packet = bytearray(16) + packet[0x00] = 0x0D + packet[0x02] = 0xA5 + packet[0x03] = 0xA5 + packet[0x04] = 0x5A + packet[0x05] = 0x5A + packet[0x06] = 0xB2 + ((sid_mask << 1) if pwr else sid_mask) + packet[0x07] = 0xC0 + packet[0x08] = 0x02 + packet[0x0A] = 0x03 + packet[0x0D] = sid_mask + packet[0x0E] = sid_mask if pwr else 0 + + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + + def set_power(self, sid: int, pwr: bool) -> None: + """Set the power state of the device.""" + sid_mask = 0x01 << (sid - 1) + self.set_power_mask(sid_mask, pwr) + + def check_power_raw(self) -> int: + """Return the power state of the device in raw format.""" + packet = bytearray(16) + packet[0x00] = 0x0A + packet[0x02] = 0xA5 + packet[0x03] = 0xA5 + packet[0x04] = 0x5A + packet[0x05] = 0x5A + packet[0x06] = 0xAE + packet[0x07] = 0xC0 + packet[0x08] = 0x01 + + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + payload = self.decrypt(response[0x38:]) + return payload[0x0E] + + def check_power(self) -> dict: + """Return the power state of the device.""" + data = self.check_power_raw() + return { + "s1": bool(data & 1), + "s2": bool(data & 2), + "s3": bool(data & 4), + "s4": bool(data & 8), + } + + +class mp1s(mp1): + """Controls a Broadlink MP1S.""" + + TYPE = "MP1S" + + def get_state(self) -> dict: + """Return the power state of the device. + + voltage in V. + current in A. + power in W. + power consumption in kW·h. + """ + packet = bytearray(16) + packet[0x00] = 0x0E + packet[0x02] = 0xA5 + packet[0x03] = 0xA5 + packet[0x04] = 0x5A + packet[0x05] = 0x5A + packet[0x06] = 0xB2 + packet[0x07] = 0xC0 + packet[0x08] = 0x01 + packet[0x0A] = 0x04 + + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + payload = self.decrypt(response[0x38:]) + payload_str = payload.hex()[4:-6] + + def get_value(start, end, factors): + value = sum( + int(payload_str[i-2:i]) * factor + for i, factor in zip(range(start, end, -2), factors) + ) + return value + + return { + "volt": get_value(34, 30, [10, 0.1]), + "current": get_value(40, 34, [1, 0.01, 0.0001]), + "power": get_value(46, 40, [100, 1, 0.01]), + "totalconsum": get_value(54, 46, [10000, 100, 1, 0.01]), + } diff --git a/cli/README.md b/cli/README.md index 7e229e3e..b7e48dc9 100644 --- a/cli/README.md +++ b/cli/README.md @@ -1,37 +1,29 @@ Command line interface for python-broadlink =========================================== -This is a command line interface for broadlink python library - -Tested with BroadLink RMPRO / RM2 +This is a command line interface for the python-broadlink API. Requirements ------------ -You should have the broadlink python installed, this can be made in many linux distributions using : +You need to install the module first: ``` -sudo pip install broadlink +pip3 install broadlink ``` Installation ----------- -Just copy this files +Download "broadlink_cli" and "broadlink_discovery". Programs -------- +* broadlink_discovery: Discover Broadlink devices connected to the local network. - -* broadlink_discovery -used to run the discovery in the network -this program withh show the command line parameters to be used with -broadlink_cli to select broadlink device - -* broadlink_cli -used to send commands and query the broadlink device +* broadlink_cli: Send commands and query the Broadlink device. -device specification formats +Device specification formats ---------------------------- Using separate parameters for each information: @@ -48,38 +40,99 @@ Using file with parameters: ``` broadlink_cli --device @BEDROOM.device --temp ``` -This is prefered as the configuration is stored in file and you can change -just a file to point to a different hardware +This is prefered as the configuration is stored in a file and you can change +it later to point to a different device. -Sample usage ------------- +Example usage +------------- + +### Common commands -Learn commands : +#### Join device to the Wi-Fi network +``` +broadlink_cli --joinwifi SSID PASSWORD +``` + +#### Discover devices connected to the local network +``` +broadlink_discovery +``` + +### Universal remotes + +#### Learn IR code and show at console ``` -# Learn and save to file -broadlink_cli --device @BEDROOM.device --learnfile LG-TV.power -# LEard and show at console broadlink_cli --device @BEDROOM.device --learn ``` +#### Learn RF code and show at console +``` +broadlink_cli --device @BEDROOM.device --rfscanlearn +``` + +#### Learn IR code and save to file +``` +broadlink_cli --device @BEDROOM.device --learnfile LG-TV.power +``` + +#### Learn RF code and save to file +``` +broadlink_cli --device @BEDROOM.device --rfscanlearn --learnfile LG-TV.power +``` + +#### Send code +``` +broadlink_cli --device @BEDROOM.device --send DATA +``` -Send command : +#### Send code from file ``` broadlink_cli --device @BEDROOM.device --send @LG-TV.power -broadlink_cli --device @BEDROOM.device --send ....datafromlearncommand... ``` -Get Temperature : +#### Check temperature ``` broadlink_cli --device @BEDROOM.device --temperature ``` -Get Energy Consumption (For a SmartPlug) : +#### Check humidity ``` -broadlink_cli --device @BEDROOM.device --energy +broadlink_cli --device @BEDROOM.device --humidity +``` + +### Smart plugs + +#### Turn on +``` +broadlink_cli --device @BEDROOM.device --turnon +``` + +#### Turn off +``` +broadlink_cli --device @BEDROOM.device --turnoff ``` -Once joined to the Broadlink provisioning Wi-Fi, configure it with your Wi-Fi details: +#### Turn on nightlight +``` +broadlink_cli --device @BEDROOM.device --turnnlon ``` -broadlink_cli --joinwifi MySSID MyWifiPassword + +#### Turn off nightlight +``` +broadlink_cli --device @BEDROOM.device --turnnloff +``` + +#### Check power state +``` +broadlink_cli --device @BEDROOM.device --check +``` + +#### Check nightlight state +``` +broadlink_cli --device @BEDROOM.device --checknl +``` + +#### Check power consumption +``` +broadlink_cli --device @BEDROOM.device --energy ``` diff --git a/cli/broadlink_cli b/cli/broadlink_cli index 31bbb2c6..7913e332 100755 --- a/cli/broadlink_cli +++ b/cli/broadlink_cli @@ -1,67 +1,32 @@ #!/usr/bin/env python3 import argparse import base64 -import codecs import time +from typing import List import broadlink +from broadlink.const import DEFAULT_PORT from broadlink.exceptions import ReadError, StorageError +from broadlink.remote import data_to_pulses, pulses_to_data -TICK = 32.84 TIMEOUT = 30 -IR_TOKEN = 0x26 def auto_int(x): return int(x, 0) -def to_microseconds(bytes): - result = [] - # print bytes[0] # 0x26 = 38for IR - index = 4 - while index < len(bytes): - chunk = bytes[index] - index += 1 - if chunk == 0: - chunk = bytes[index] - chunk = 256 * chunk + bytes[index + 1] - index += 2 - result.append(int(round(chunk * TICK))) - if chunk == 0x0d05: - break - return result - - -def durations_to_broadlink(durations): - result = bytearray() - result.append(IR_TOKEN) - result.append(0) - result.append(len(durations) % 256) - result.append(len(durations) / 256) - for dur in durations: - num = int(round(dur / TICK)) - if num > 255: - result.append(0) - result.append(num / 256) - result.append(num % 256) - return result - - -def format_durations(data): - result = '' - for i in range(0, len(data)): - if len(result) > 0: - result += ' ' - result += ('+' if i % 2 == 0 else '-') + str(data[i]) - return result +def format_pulses(pulses: List[int]) -> str: + """Format pulses.""" + return " ".join( + f"+{pulse}" if i % 2 == 0 else f"-{pulse}" + for i, pulse in enumerate(pulses) + ) -def parse_durations(str): - result = [] - for s in str.split(): - result.append(abs(int(s))) - return result +def parse_pulses(data: List[str]) -> List[int]: + """Parse pulses.""" + return [abs(int(s)) for s in data] parser = argparse.ArgumentParser(fromfile_prefix_chars='@') @@ -70,6 +35,7 @@ parser.add_argument("--type", type=auto_int, default=0x2712, help="type of devic parser.add_argument("--host", help="host address") parser.add_argument("--mac", help="mac address (hex reverse), as used by python-broadlink library") parser.add_argument("--temperature", action="store_true", help="request temperature from device") +parser.add_argument("--humidity", action="store_true", help="request humidity from device") parser.add_argument("--energy", action="store_true", help="request energy consumption from device") parser.add_argument("--check", action="store_true", help="check current power state") parser.add_argument("--checknl", action="store_true", help="check current nightlight state") @@ -81,7 +47,8 @@ parser.add_argument("--switch", action="store_true", help="switch state from on parser.add_argument("--send", action="store_true", help="send command") parser.add_argument("--sensors", action="store_true", help="check all sensors") parser.add_argument("--learn", action="store_true", help="learn command") -parser.add_argument("--rfscanlearn", action="store_true", help="rf scan learning") +parser.add_argument("--rflearn", action="store_true", help="rf scan learning") +parser.add_argument("--frequency", type=float, help="specify radiofrequency for learning") parser.add_argument("--learnfile", help="save learned command to a specified file") parser.add_argument("--durations", action="store_true", help="use durations in micro seconds instead of the Broadlink format") @@ -92,16 +59,16 @@ args = parser.parse_args() if args.device: values = args.device.split() - type = int(values[0], 0) + devtype = int(values[0], 0) host = values[1] mac = bytearray.fromhex(values[2]) elif args.mac: - type = args.type + devtype = args.type host = args.host mac = bytearray.fromhex(args.mac) if args.host or args.device: - dev = broadlink.gendevice(type, (host, 80), mac) + dev = broadlink.gendevice(devtype, (host, DEFAULT_PORT), mac) dev.auth() if args.joinwifi: @@ -109,25 +76,26 @@ if args.joinwifi: if args.convert: data = bytearray.fromhex(''.join(args.data)) - durations = to_microseconds(data) - print(format_durations(durations)) + pulses = data_to_pulses(data) + print(format_pulses(pulses)) if args.temperature: print(dev.check_temperature()) +if args.humidity: + print(dev.check_humidity()) if args.energy: print(dev.get_energy()) if args.sensors: - try: - data = dev.check_sensors() - except: - data = {} - data['temperature'] = dev.check_temperature() + data = dev.check_sensors() for key in data: print("{} {}".format(key, data[key])) if args.send: - data = durations_to_broadlink(parse_durations(' '.join(args.data))) \ - if args.durations else bytearray.fromhex(''.join(args.data)) + data = ( + pulses_to_data(parse_pulses(args.data)) + if args.durations + else bytes.fromhex(''.join(args.data)) + ) dev.send_data(data) -if args.learn or (args.learnfile and not args.rfscanlearn): +if args.learn or (args.learnfile and not args.rflearn): dev.enter_learning() print("Learning...") start = time.time() @@ -143,17 +111,19 @@ if args.learn or (args.learnfile and not args.rfscanlearn): print("No data received...") exit(1) - learned = format_durations(to_microseconds(bytearray(data))) \ - if args.durations \ - else ''.join(format(x, '02x') for x in bytearray(data)) - if args.learn: - print(learned) - decode_hex = codecs.getdecoder("hex_codec") - print("Base64: " + str(base64.b64encode(decode_hex(learned)[0]))) + print("Packet found!") + raw_fmt = data.hex() + base64_fmt = base64.b64encode(data).decode('ascii') + pulse_fmt = format_pulses(data_to_pulses(data)) + + print("Raw:", raw_fmt) + print("Base64:", base64_fmt) + print("Pulses:", pulse_fmt) + if args.learnfile: print("Saving to {}".format(args.learnfile)) with open(args.learnfile, "w") as text_file: - text_file.write(learned) + text_file.write(pulse_fmt if args.durations else raw_fmt) if args.check: if dev.check_power(): print('* ON *') @@ -195,28 +165,33 @@ if args.switch: else: dev.set_power(True) print('* Switch to ON *') -if args.rfscanlearn: - dev.sweep_frequency() - print("Learning RF Frequency, press and hold the button to learn...") - - start = time.time() - while time.time() - start < TIMEOUT: - time.sleep(1) - if dev.check_frequency(): - break +if args.rflearn: + if args.frequency: + frequency = args.frequency + print("Press the button you want to learn, a short press...") else: - print("RF Frequency not found") - dev.cancel_sweep_frequency() - exit(1) + dev.sweep_frequency() + print("Detecting radiofrequency, press and hold the button to learn...") + + start = time.time() + while time.time() - start < TIMEOUT: + time.sleep(1) + locked, frequency = dev.check_frequency() + if locked: + break + else: + print("Radiofrequency not found") + dev.cancel_sweep_frequency() + exit(1) - print("Found RF Frequency - 1 of 2!") - print("You can now let go of the button") + print("Radiofrequency detected: {}MHz".format(frequency)) + print("You can now let go of the button") - input("Press enter to continue...") + input("Press enter to continue...") - print("To complete learning, single press the button you want to learn") + print("Press the button again, now a short press.") - dev.find_rf_packet() + dev.find_rf_packet(frequency) start = time.time() while time.time() - start < TIMEOUT: @@ -231,15 +206,16 @@ if args.rfscanlearn: print("No data received...") exit(1) - print("Found RF Frequency - 2 of 2!") - learned = format_durations(to_microseconds(bytearray(data))) \ - if args.durations \ - else ''.join(format(x, '02x') for x in bytearray(data)) - if args.learnfile is None: - print(learned) - decode_hex = codecs.getdecoder("hex_codec") - print("Base64: {}".format(str(base64.b64encode(decode_hex(learned)[0])))) - if args.learnfile is not None: + print("Packet found!") + raw_fmt = data.hex() + base64_fmt = base64.b64encode(data).decode('ascii') + pulse_fmt = format_pulses(data_to_pulses(data)) + + print("Raw:", raw_fmt) + print("Base64:", base64_fmt) + print("Pulses:", pulse_fmt) + + if args.learnfile: print("Saving to {}".format(args.learnfile)) with open(args.learnfile, "w") as text_file: - text_file.write(learned) + text_file.write(pulse_fmt if args.durations else raw_fmt) diff --git a/cli/broadlink_discovery b/cli/broadlink_discovery index c098c91e..477e1bd7 100755 --- a/cli/broadlink_discovery +++ b/cli/broadlink_discovery @@ -2,12 +2,13 @@ import argparse import broadlink +from broadlink.const import DEFAULT_BCAST_ADDR, DEFAULT_TIMEOUT from broadlink.exceptions import StorageError parser = argparse.ArgumentParser(fromfile_prefix_chars='@') -parser.add_argument("--timeout", type=int, default=5, help="timeout to wait for receiving discovery responses") +parser.add_argument("--timeout", type=int, default=DEFAULT_TIMEOUT, help="timeout to wait for receiving discovery responses") parser.add_argument("--ip", default=None, help="ip address to use in the discovery") -parser.add_argument("--dst-ip", default="255.255.255.255", help="destination ip address to use in the discovery") +parser.add_argument("--dst-ip", default=DEFAULT_BCAST_ADDR, help="destination ip address to use in the discovery") args = parser.parse_args() print("Discovering...") diff --git a/requirements.txt b/requirements.txt index 09f445bf..2c6c996c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1 @@ -cryptography==2.6.1 +cryptography==3.2 diff --git a/setup.py b/setup.py index f662cc64..0426f148 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ from setuptools import setup, find_packages -version = '0.16.0' +version = '0.19.0' setup( name="broadlink", @@ -15,8 +15,8 @@ url="http://github.com/mjg59/python-broadlink", packages=find_packages(), scripts=[], - install_requires=["cryptography>=2.1.1"], - description="Python API for controlling Broadlink IR controllers", + install_requires=["cryptography>=3.2"], + description="Python API for controlling Broadlink devices", classifiers=[ "Development Status :: 4 - Beta", "Intended Audience :: Developers",