diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md new file mode 100644 index 00000000..de98d3df --- /dev/null +++ b/.github/PULL_REQUEST_TEMPLATE.md @@ -0,0 +1,55 @@ + +## Context + + + +## Proposed change + + + +## Type of change + + +- [ ] Dependency upgrade +- [ ] Bugfix (non-breaking change which fixes an issue) +- [ ] New device +- [ ] New product id (the device is already supported with a different id) +- [ ] New feature (which adds functionality to an existing device) +- [ ] Breaking change (fix/feature causing existing functionality to break) +- [ ] Code quality improvements to existing code or addition of tests +- [ ] Documentation + +## Additional information + + +- This PR fixes issue: fixes # +- This PR is related to: +- Link to documentation pull request: + +## Checklist + + +- [ ] The code change is tested and works locally. +- [ ] The code has been formatted using Black. +- [ ] The code follows the [Zen of Python](https://www.python.org/dev/peps/pep-0020/). +- [ ] I am creating the Pull Request against the correct branch. +- [ ] Documentation added/updated. diff --git a/.github/workflows/flake8.yaml b/.github/workflows/flake8.yaml new file mode 100644 index 00000000..aa09a19c --- /dev/null +++ b/.github/workflows/flake8.yaml @@ -0,0 +1,33 @@ +name: Python flake8 + +on: + push: + branches: [ master, dev ] + pull_request: + branches: [ master, dev ] + +jobs: + test: + runs-on: ubuntu-20.04 + strategy: + matrix: + python-version: [3.6, 3.7, 3.8, 3.9] + steps: + - uses: actions/checkout@v3 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python-version }} + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install wheel + pip install flake8 flake8-quotes + if [ -f requirements.txt ]; then pip install -r requirements.txt; fi + - name: Lint with flake8 + run: | + # stop the build if there are Python syntax errors or undefined names + flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics + # exit-zero treats all errors as warnings. ignore magic numbers and use double quotes and ignore numbers with zeroes before them. + # and ignore lowercase hex numbers and ignore isort incorrect imports + flake8 . --count --exit-zero --max-complexity=10 --max-line-length=90 --ignore=WPS432,WPS339,WPS341,I --inline-quotes double --statistics diff --git a/README.md b/README.md index 74473b8e..81c6de5b 100644 --- a/README.md +++ b/README.md @@ -1,90 +1,247 @@ -Python control for Broadlink RM2 IR controllers -=============================================== +# python-broadlink -A simple Python API for controlling IR controllers from [Broadlink](http://www.ibroadlink.com/rm/). At present, the following devices are currently supported: +A Python module and CLI for controlling Broadlink devices locally. The following devices are supported: -* RM Pro (referred to as RM2 in the codebase) -* A1 sensor platform devices are supported -* RM3 mini IR blaster +- **Universal remotes**: RM home, RM mini 3, RM plus, RM pro, RM pro+, RM4 mini, RM4 pro, RM4C mini, RM4S, RM4 TV mate +- **Smart plugs**: SP mini, SP mini 3, SP mini+, SP1, SP2, SP2-BR, SP2-CL, SP2-IN, SP2-UK, SP3, SP3-EU, SP3S-EU, SP3S-US, SP4L-AU, SP4L-EU, SP4L-UK, SP4M, SP4M-US, Ankuoo NEO, Ankuoo NEO PRO, Efergy Ego, BG AHC/U-01 +- **Switches**: MCB1, SC1, SCB1E, SCB2 +- **Outlets**: BG 800, BG 900 +- **Power strips**: MP1-1K3S2U, MP1-1K4S, MP2 +- **Environment sensors**: A1 +- **Alarm kits**: S1C, S2KIT +- **Light bulbs**: LB1, LB26 R1, LB27 R1, SB800TD +- **Curtain motors**: Dooya DT360E-45/20 +- **Thermostats**: Hysen HY02B05H +- **Hubs**: S3 -There is currently no support for the cloud API. +## Installation -Example use ------------ +Use pip3 to install the latest version of this module. -Setup a new device on your local wireless network: +``` +pip3 install broadlink +``` + +## Basic functions -1. Put the device into AP Mode - 1. Long press the reset button until the blue LED is blinking quickly. - 2. Long press again until blue LED is blinking slowly. - 3. Manually connect to the WiFi SSID named BroadlinkProv. -2. Run setup() and provide your ssid, network password (if secured), and set the security mode - 1. Security mode options are (0 = none, 1 = WEP, 2 = WPA1, 3 = WPA2, 4 = WPA1/2) +First, open Python 3 and import this module. + +``` +python3 ``` +```python3 import broadlink +``` +Now let's try some functions... + +### Setup + +In order to control the device, you need to connect it to your local network. If you have already configured the device with the Broadlink app, this step is not necessary. + +1. Put the device into AP Mode. + - Long press the reset button until the blue LED is blinking quickly. + - Long press again until blue LED is blinking slowly. + - Manually connect to the WiFi SSID named BroadlinkProv. +2. Connect the device to your local network with the setup function. +```python3 broadlink.setup('myssid', 'mynetworkpass', 3) ``` -Discover available devices on the local network: +Security mode options are (0 = none, 1 = WEP, 2 = WPA1, 3 = WPA2, 4 = WPA1/2) + +#### Advanced options + +You may need to specify a broadcast address if setup is not working. +```python3 +broadlink.setup('myssid', 'mynetworkpass', 3, ip_address='192.168.0.255') ``` -import broadlink -devices = broadlink.discover(timeout=5) +### Discovery + +Use this function to discover devices: + +```python3 +devices = broadlink.discover() ``` -Obtain the authentication key required for further communication: +#### Advanced options +You may need to specify `local_ip_address` or `discover_ip_address` if discovery does not return any devices. + +Using the IP address of your local machine: +```python3 +devices = broadlink.discover(local_ip_address='192.168.0.100') ``` -devices[0].auth() + +Using the broadcast address of your subnet: +```python3 +devices = broadlink.discover(discover_ip_address='192.168.0.255') ``` -Enter learning mode: +If the device is locked, it may not be discoverable with broadcast. In such cases, you can use the unicast version `broadlink.hello()` for direct discovery: +```python3 +device = broadlink.hello('192.168.0.16') ``` -devices[0].enter_learning() + +If you are a perfomance freak, use `broadlink.xdiscover()` to create devices instantly: +```python3 +for device in broadlink.xdiscover(): + print(device) # Example action. Do whatever you want here. ``` -Obtain an IR or RF packet while in learning mode: +### Authentication +After discovering the device, call the `auth()` method to obtain the authentication key required for further communication: +```python3 +device.auth() +``` + +The next steps depend on the type of device you want to control. + +## Universal remotes + +### Learning IR codes + +Learning IR codes takes place in three steps. + +1. Enter learning mode: +```python3 +device.enter_learning() ``` -ir_packet = devices[0].check_data() +2. When the LED blinks, point the remote at the Broadlink device and press the button you want to learn. +3. Get the IR packet. +```python3 +packet = device.check_data() ``` -(This will return None if the device does not have a packet to return) -Send an IR or RF packet: +### Learning RF codes + +Learning RF codes takes place in six steps. + +1. Sweep the frequency: +```python3 +device.sweep_frequency() +``` +2. When the LED blinks, point the remote at the Broadlink device for the first time and long press the button you want to learn. +3. Check if the frequency was successfully identified: +```python3 +ok = device.check_frequency() +if ok: + print('Frequency found!') ``` -devices[0].send_data(ir_packet) +4. Enter learning mode: +```python3 +device.find_rf_packet() ``` +5. When the LED blinks, point the remote at the Broadlink device for the second time and short press the button you want to learn. +6. Get the RF packet: +```python3 +packet = device.check_data() +``` + +#### Notes + +Universal remotes with product id 0x2712 use the same method for learning IR and RF codes. They don't need to sweep frequency. Just call `device.enter_learning()` and `device.check_data()`. -Obtain temperature data from an RM2: +### Canceling learning + +You can exit the learning mode in the middle of the process by calling this method: +```python3 +device.cancel_sweep_frequency() ``` -devices[0].check_temperature() + +### Sending IR/RF packets +```python3 +device.send_data(packet) ``` -Obtain sensor data from an A1: +### Fetching sensor data +```python3 +data = device.check_sensors() +``` + +## Switches + +### Setting power state +```python3 +device.set_power(True) +device.set_power(False) ``` -data = devices[0].check_sensors() + +### Checking power state +```python3 +state = device.check_power() ``` -Set power state on a SmartPlug SP2/SP3: +### Checking energy consumption +```python3 +state = device.get_energy() ``` -devices[0].set_power(True) + +## Power strips + +### Setting power state +```python3 +device.set_power(1, True) # Example socket. It could be 2 or 3. +device.set_power(1, False) ``` -Check power state on a SmartPlug: +### Checking power state +```python3 +state = device.check_power() ``` -state = devices[0].check_power() + +## Light bulbs + +### Fetching data +```python3 +state = device.get_state() ``` -Check energy consumption on a SmartPlug: +### Setting state attributes +```python3 +devices[0].set_state(pwr=0) +devices[0].set_state(pwr=1) +devices[0].set_state(brightness=75) +devices[0].set_state(bulb_colormode=0) +devices[0].set_state(blue=255) +devices[0].set_state(red=0) +devices[0].set_state(green=128) +devices[0].set_state(bulb_colormode=1) ``` -state = devices[0].get_energy() + +## Environment sensors + +### Fetching sensor data +```python3 +data = device.check_sensors() ``` -Set power state for S1 on a SmartPowerStrip MP1: +## Hubs + +### Discovering subdevices +```python3 +device.get_subdevices() ``` -devices[0].set_power(1, True) + +### Fetching data +Use the DID obtained from get_subdevices() for the input parameter to query specific sub-device. + +```python3 +device.get_state(did="00000000000000000000a043b0d06963") ``` -Check power state on a SmartPowerStrip: +### Setting state attributes +The parameters depend on the type of subdevice that is being controlled. In this example, we are controlling LC-1 switches: + +#### Turn on +```python3 +device.set_state(did="00000000000000000000a043b0d0783a", pwr=1) +device.set_state(did="00000000000000000000a043b0d0783a", pwr1=1) +device.set_state(did="00000000000000000000a043b0d0783a", pwr2=1) ``` -state = devices[0].check_power() +#### Turn off +```python3 +device.set_state(did="00000000000000000000a043b0d0783a", pwr=0) +device.set_state(did="00000000000000000000a043b0d0783a", pwr1=0) +device.set_state(did="00000000000000000000a043b0d0783a", pwr2=0) ``` diff --git a/TROUBLESHOOTING.md b/TROUBLESHOOTING.md new file mode 100644 index 00000000..a20765b2 --- /dev/null +++ b/TROUBLESHOOTING.md @@ -0,0 +1,9 @@ +# Troubleshooting + +## Firmware issues + +### AP setup fails with non-alphanumeric passwords + +Some devices ship with firmware that cannot connect to WLANs with non-alphanumeric passwords. To fix this, update the firmware to the latest version. You can also change the password to one with just letters and numbers or create a separate guest network with a simpler password. + +_First seen on Broadlink RM4 pro 0x6026. Already fixed in firmware v52079._ diff --git a/broadlink/__init__.py b/broadlink/__init__.py index 14c1a3bf..d3135501 100644 --- a/broadlink/__init__.py +++ b/broadlink/__init__.py @@ -1,884 +1,333 @@ -#!/usr/bin/python - -from datetime import datetime -try: - from Crypto.Cipher import AES -except ImportError as e: - import pyaes - -import time -import random +#!/usr/bin/env python3 +"""The python-broadlink library.""" import socket -import sys -import threading -import codecs - - -def gendevice(devtype, host, mac): - devices = { - sp1: [0], - sp2: [0x2711, # SP2 - 0x2719, 0x7919, 0x271a, 0x791a, # Honeywell SP2 - 0x2720, # SPMini - 0x753e, # SP3 - 0x7D00, # OEM branded SP3 - 0x947a, 0x9479, # SP3S - 0x2728, # SPMini2 - 0x2733, 0x273e, # OEM branded SPMini - 0x7530, 0x7918, # OEM branded SPMini2 - 0x2736 # SPMiniPlus - ], - rm: [0x2712, # RM2 - 0x2737, # RM Mini - 0x273d, # RM Pro Phicomm - 0x2783, # RM2 Home Plus - 0x277c, # RM2 Home Plus GDT - 0x272a, # RM2 Pro Plus - 0x2787, # RM2 Pro Plus2 - 0x279d, # RM2 Pro Plus3 - 0x27a9, # RM2 Pro Plus_300 - 0x278b, # RM2 Pro Plus BL - 0x2797, # RM2 Pro Plus HYC - 0x27a1, # RM2 Pro Plus R1 - 0x27a6, # RM2 Pro PP - 0x278f, # RM Mini Shate - 0x27c2 # RM Mini 3 - ], - a1: [0x2714], # A1 - mp1: [0x4EB5, # MP1 - 0x4EF7 # Honyar oem mp1 - ], - hysen: [0x4EAD], # Hysen controller - S1C: [0x2722], # S1 (SmartOne Alarm Kit) - dooya: [0x4E4D] # Dooya DT360E (DOOYA_CURTAIN_V2) - } - - # Look for the class associated to devtype in devices - [deviceClass] = [dev for dev in devices if devtype in devices[dev]] or [None] - if deviceClass is None: - return device(host=host, mac=mac, devtype=devtype) - return deviceClass(host=host, mac=mac, devtype=devtype) - -def discover(timeout=None, local_ip_address=None): - if local_ip_address is None: - s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - s.connect(('8.8.8.8', 53)) # connecting to a UDP address doesn't send packets - local_ip_address = s.getsockname()[0] - address = local_ip_address.split('.') - cs = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - cs.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - cs.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) - cs.bind((local_ip_address,0)) - port = cs.getsockname()[1] - starttime = time.time() - - devices = [] - - timezone = int(time.timezone/-3600) - packet = bytearray(0x30) - - year = datetime.now().year - - if timezone < 0: - packet[0x08] = 0xff + timezone - 1 - packet[0x09] = 0xff - packet[0x0a] = 0xff - packet[0x0b] = 0xff - else: - packet[0x08] = timezone - packet[0x09] = 0 - packet[0x0a] = 0 - packet[0x0b] = 0 - packet[0x0c] = year & 0xff - packet[0x0d] = year >> 8 - packet[0x0e] = datetime.now().minute - packet[0x0f] = datetime.now().hour - subyear = str(year)[2:] - packet[0x10] = int(subyear) - packet[0x11] = datetime.now().isoweekday() - packet[0x12] = datetime.now().day - packet[0x13] = datetime.now().month - packet[0x18] = int(address[0]) - packet[0x19] = int(address[1]) - packet[0x1a] = int(address[2]) - packet[0x1b] = int(address[3]) - packet[0x1c] = port & 0xff - packet[0x1d] = port >> 8 - packet[0x26] = 6 - checksum = 0xbeaf - - for i in range(len(packet)): - checksum += packet[i] - checksum = checksum & 0xffff - packet[0x20] = checksum & 0xff - packet[0x21] = checksum >> 8 - - cs.sendto(packet, ('255.255.255.255', 80)) - if timeout is None: - response = cs.recvfrom(1024) - responsepacket = bytearray(response[0]) - host = response[1] - mac = responsepacket[0x3a:0x40] - devtype = responsepacket[0x34] | responsepacket[0x35] << 8 - - - return gendevice(devtype, host, mac) - else: - while (time.time() - starttime) < timeout: - cs.settimeout(timeout - (time.time() - starttime)) - try: - response = cs.recvfrom(1024) - except socket.timeout: - return devices - responsepacket = bytearray(response[0]) - host = response[1] - devtype = responsepacket[0x34] | responsepacket[0x35] << 8 - mac = responsepacket[0x3a:0x40] - dev = gendevice(devtype, host, mac) - devices.append(dev) - return devices - - - -class device: - def __init__(self, host, mac, devtype, timeout=10): - self.host = host - self.mac = mac - self.devtype = devtype - self.timeout = timeout - self.count = random.randrange(0xffff) - self.key = bytearray([0x09, 0x76, 0x28, 0x34, 0x3f, 0xe9, 0x9e, 0x23, 0x76, 0x5c, 0x15, 0x13, 0xac, 0xcf, 0x8b, 0x02]) - self.iv = bytearray([0x56, 0x2e, 0x17, 0x99, 0x6d, 0x09, 0x3d, 0x28, 0xdd, 0xb3, 0xba, 0x69, 0x5a, 0x2e, 0x6f, 0x58]) - self.id = bytearray([0, 0, 0, 0]) - self.cs = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - self.cs.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - self.cs.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) - self.cs.bind(('',0)) - self.type = "Unknown" - self.lock = threading.Lock() - - if 'pyaes' in globals(): - self.encrypt = self.encrypt_pyaes - self.decrypt = self.decrypt_pyaes - else: - self.encrypt = self.encrypt_pycrypto - self.decrypt = self.decrypt_pycrypto - - def encrypt_pyaes(self, payload): - aes = pyaes.AESModeOfOperationCBC(self.key, iv = bytes(self.iv)) - return b"".join([aes.encrypt(bytes(payload[i:i+16])) for i in range(0, len(payload), 16)]) - - def decrypt_pyaes(self, payload): - aes = pyaes.AESModeOfOperationCBC(self.key, iv = bytes(self.iv)) - return b"".join([aes.decrypt(bytes(payload[i:i+16])) for i in range(0, len(payload), 16)]) - - def encrypt_pycrypto(self, payload): - aes = AES.new(bytes(self.key), AES.MODE_CBC, bytes(self.iv)) - return aes.encrypt(bytes(payload)) - - def decrypt_pycrypto(self, payload): - aes = AES.new(bytes(self.key), AES.MODE_CBC, bytes(self.iv)) - return aes.decrypt(bytes(payload)) - - def auth(self): - payload = bytearray(0x50) - payload[0x04] = 0x31 - payload[0x05] = 0x31 - payload[0x06] = 0x31 - payload[0x07] = 0x31 - payload[0x08] = 0x31 - payload[0x09] = 0x31 - payload[0x0a] = 0x31 - payload[0x0b] = 0x31 - payload[0x0c] = 0x31 - payload[0x0d] = 0x31 - payload[0x0e] = 0x31 - payload[0x0f] = 0x31 - payload[0x10] = 0x31 - payload[0x11] = 0x31 - payload[0x12] = 0x31 - payload[0x1e] = 0x01 - payload[0x2d] = 0x01 - payload[0x30] = ord('T') - payload[0x31] = ord('e') - payload[0x32] = ord('s') - payload[0x33] = ord('t') - payload[0x34] = ord(' ') - payload[0x35] = ord(' ') - payload[0x36] = ord('1') - - response = self.send_packet(0x65, payload) - - payload = self.decrypt(response[0x38:]) - - if not payload: - return False - - key = payload[0x04:0x14] - if len(key) % 16 != 0: - return False - - self.id = payload[0x00:0x04] - self.key = key - - return True - - def get_type(self): - return self.type - - def send_packet(self, command, payload): - self.count = (self.count + 1) & 0xffff - packet = bytearray(0x38) - packet[0x00] = 0x5a - packet[0x01] = 0xa5 - packet[0x02] = 0xaa - packet[0x03] = 0x55 - packet[0x04] = 0x5a - packet[0x05] = 0xa5 - packet[0x06] = 0xaa - packet[0x07] = 0x55 - packet[0x24] = 0x2a - packet[0x25] = 0x27 - packet[0x26] = command - packet[0x28] = self.count & 0xff - packet[0x29] = self.count >> 8 - packet[0x2a] = self.mac[0] - packet[0x2b] = self.mac[1] - packet[0x2c] = self.mac[2] - packet[0x2d] = self.mac[3] - packet[0x2e] = self.mac[4] - packet[0x2f] = self.mac[5] - packet[0x30] = self.id[0] - packet[0x31] = self.id[1] - packet[0x32] = self.id[2] - packet[0x33] = self.id[3] - - # pad the payload for AES encryption - if len(payload)>0: - numpad=(len(payload)//16+1)*16 - payload=payload.ljust(numpad, b"\x00") - - checksum = 0xbeaf - for i in range(len(payload)): - checksum += payload[i] - checksum = checksum & 0xffff - - payload = self.encrypt(payload) - - packet[0x34] = checksum & 0xff - packet[0x35] = checksum >> 8 - - for i in range(len(payload)): - packet.append(payload[i]) - - checksum = 0xbeaf - for i in range(len(packet)): - checksum += packet[i] - checksum = checksum & 0xffff - packet[0x20] = checksum & 0xff - packet[0x21] = checksum >> 8 - - starttime = time.time() - with self.lock: - while True: - try: - self.cs.sendto(packet, self.host) - self.cs.settimeout(1) - response = self.cs.recvfrom(2048) - break - except socket.timeout: - if (time.time() - starttime) > self.timeout: - raise - return bytearray(response[0]) - - -class mp1(device): - def __init__ (self, host, mac, devtype): - device.__init__(self, host, mac, devtype) - self.type = "MP1" - - def set_power_mask(self, sid_mask, state): - """Sets the power state of the smart power strip.""" - - packet = bytearray(16) - packet[0x00] = 0x0d - packet[0x02] = 0xa5 - packet[0x03] = 0xa5 - packet[0x04] = 0x5a - packet[0x05] = 0x5a - packet[0x06] = 0xb2 + ((sid_mask<<1) if state else sid_mask) - packet[0x07] = 0xc0 - packet[0x08] = 0x02 - packet[0x0a] = 0x03 - packet[0x0d] = sid_mask - packet[0x0e] = sid_mask if state else 0 - - response = self.send_packet(0x6a, packet) - - err = response[0x22] | (response[0x23] << 8) - - def set_power(self, sid, state): - """Sets the power state of the smart power strip.""" - sid_mask = 0x01 << (sid - 1) - return self.set_power_mask(sid_mask, state) - - def check_power_raw(self): - """Returns the power state of the smart power strip in raw format.""" - packet = bytearray(16) - packet[0x00] = 0x0a - packet[0x02] = 0xa5 - packet[0x03] = 0xa5 - packet[0x04] = 0x5a - packet[0x05] = 0x5a - packet[0x06] = 0xae - packet[0x07] = 0xc0 - packet[0x08] = 0x01 - - response = self.send_packet(0x6a, packet) - err = response[0x22] | (response[0x23] << 8) - if err == 0: - payload = self.decrypt(bytes(response[0x38:])) - if type(payload[0x4]) == int: - state = payload[0x0e] - else: - state = ord(payload[0x0e]) - return state - - def check_power(self): - """Returns the power state of the smart power strip.""" - state = self.check_power_raw() - data = {} - data['s1'] = bool(state & 0x01) - data['s2'] = bool(state & 0x02) - data['s3'] = bool(state & 0x04) - data['s4'] = bool(state & 0x08) - return data - - -class sp1(device): - def __init__ (self, host, mac, devtype): - device.__init__(self, host, mac, devtype) - self.type = "SP1" - - def set_power(self, state): - packet = bytearray(4) - packet[0] = state - self.send_packet(0x66, packet) - - -class sp2(device): - def __init__ (self, host, mac, devtype): - device.__init__(self, host, mac, devtype) - self.type = "SP2" - - def set_power(self, state): - """Sets the power state of the smart plug.""" - packet = bytearray(16) - packet[0] = 2 - if self.check_nightlight(): - packet[4] = 3 if state else 2 - else: - packet[4] = 1 if state else 0 - self.send_packet(0x6a, packet) - - def set_nightlight(self, state): - """Sets the night light state of the smart plug""" - packet = bytearray(16) - packet[0] = 2 - if self.check_power(): - packet[4] = 3 if state else 1 - else: - packet[4] = 2 if state else 0 - self.send_packet(0x6a, packet) - - def check_power(self): - """Returns the power state of the smart plug.""" - packet = bytearray(16) - packet[0] = 1 - response = self.send_packet(0x6a, packet) - err = response[0x22] | (response[0x23] << 8) - if err == 0: - payload = self.decrypt(bytes(response[0x38:])) - if type(payload[0x4]) == int: - if payload[0x4] == 1 or payload[0x4] == 3 or payload[0x4] == 0xFD: - state = True - else: - state = False - else: - if ord(payload[0x4]) == 1 or ord(payload[0x4]) == 3 or ord(payload[0x4]) == 0xFD: - state = True - else: - state = False - return state - - def check_nightlight(self): - """Returns the power state of the smart plug.""" - packet = bytearray(16) - packet[0] = 1 - response = self.send_packet(0x6a, packet) - err = response[0x22] | (response[0x23] << 8) - if err == 0: - payload = self.decrypt(bytes(response[0x38:])) - if type(payload[0x4]) == int: - if payload[0x4] == 2 or payload[0x4] == 3 or payload[0x4] == 0xFF: - state = True - else: - state = False - else: - if ord(payload[0x4]) == 2 or ord(payload[0x4]) == 3 or ord(payload[0x4]) == 0xFF: - state = True - else: - state = False - return state - - def get_energy(self): - packet = bytearray([8, 0, 254, 1, 5, 1, 0, 0, 0, 45]) - response = self.send_packet(0x6a, packet) - err = response[0x22] | (response[0x23] << 8) - if err == 0: - payload = self.decrypt(bytes(response[0x38:])) - if type(payload[0x07]) == int: - energy = int(hex(payload[0x07] * 256 + payload[0x06])[2:]) + int(hex(payload[0x05])[2:])/100.0 - else: - energy = int(hex(ord(payload[0x07]) * 256 + ord(payload[0x06]))[2:]) + int(hex(ord(payload[0x05]))[2:])/100.0 - return energy - - -class a1(device): - def __init__ (self, host, mac, devtype): - device.__init__(self, host, mac, devtype) - self.type = "A1" - - def check_sensors(self): - packet = bytearray(16) - packet[0] = 1 - response = self.send_packet(0x6a, packet) - err = response[0x22] | (response[0x23] << 8) - if err == 0: - data = {} - payload = self.decrypt(bytes(response[0x38:])) - if type(payload[0x4]) == int: - data['temperature'] = (payload[0x4] * 10 + payload[0x5]) / 10.0 - data['humidity'] = (payload[0x6] * 10 + payload[0x7]) / 10.0 - light = payload[0x8] - air_quality = payload[0x0a] - noise = payload[0xc] - else: - data['temperature'] = (ord(payload[0x4]) * 10 + ord(payload[0x5])) / 10.0 - data['humidity'] = (ord(payload[0x6]) * 10 + ord(payload[0x7])) / 10.0 - light = ord(payload[0x8]) - air_quality = ord(payload[0x0a]) - noise = ord(payload[0xc]) - if light == 0: - data['light'] = 'dark' - elif light == 1: - data['light'] = 'dim' - elif light == 2: - data['light'] = 'normal' - elif light == 3: - data['light'] = 'bright' - else: - data['light'] = 'unknown' - if air_quality == 0: - data['air_quality'] = 'excellent' - elif air_quality == 1: - data['air_quality'] = 'good' - elif air_quality == 2: - data['air_quality'] = 'normal' - elif air_quality == 3: - data['air_quality'] = 'bad' - else: - data['air_quality'] = 'unknown' - if noise == 0: - data['noise'] = 'quiet' - elif noise == 1: - data['noise'] = 'normal' - elif noise == 2: - data['noise'] = 'noisy' - else: - data['noise'] = 'unknown' - return data - - def check_sensors_raw(self): - packet = bytearray(16) - packet[0] = 1 - response = self.send_packet(0x6a, packet) - err = response[0x22] | (response[0x23] << 8) - if err == 0: - data = {} - payload = self.decrypt(bytes(response[0x38:])) - if type(payload[0x4]) == int: - data['temperature'] = (payload[0x4] * 10 + payload[0x5]) / 10.0 - data['humidity'] = (payload[0x6] * 10 + payload[0x7]) / 10.0 - data['light'] = payload[0x8] - data['air_quality'] = payload[0x0a] - data['noise'] = payload[0xc] - else: - data['temperature'] = (ord(payload[0x4]) * 10 + ord(payload[0x5])) / 10.0 - data['humidity'] = (ord(payload[0x6]) * 10 + ord(payload[0x7])) / 10.0 - data['light'] = ord(payload[0x8]) - data['air_quality'] = ord(payload[0x0a]) - data['noise'] = ord(payload[0xc]) - return data - - -class rm(device): - def __init__ (self, host, mac, devtype): - device.__init__(self, host, mac, devtype) - self.type = "RM2" - - def check_data(self): - packet = bytearray(16) - packet[0] = 4 - response = self.send_packet(0x6a, packet) - err = response[0x22] | (response[0x23] << 8) - if err == 0: - payload = self.decrypt(bytes(response[0x38:])) - return payload[0x04:] - - def send_data(self, data): - packet = bytearray([0x02, 0x00, 0x00, 0x00]) - packet += data - self.send_packet(0x6a, packet) - - def enter_learning(self): - packet = bytearray(16) - packet[0] = 3 - self.send_packet(0x6a, packet) - - def check_temperature(self): - packet = bytearray(16) - packet[0] = 1 - response = self.send_packet(0x6a, packet) - err = response[0x22] | (response[0x23] << 8) - if err == 0: - payload = self.decrypt(bytes(response[0x38:])) - if type(payload[0x4]) == int: - temp = (payload[0x4] * 10 + payload[0x5]) / 10.0 - else: - temp = (ord(payload[0x4]) * 10 + ord(payload[0x5])) / 10.0 - return temp - - -# For legacy compatibility - don't use this -class rm2(rm): - def __init__ (self): - device.__init__(self, None, None, None) - - def discover(self): - dev = discover() - self.host = dev.host - self.mac = dev.mac - - -class hysen(device): - def __init__ (self, host, mac, devtype): - device.__init__(self, host, mac, devtype) - self.type = "Hysen heating controller" - - # Send a request - # input_payload should be a bytearray, usually 6 bytes, e.g. bytearray([0x01,0x06,0x00,0x02,0x10,0x00]) - # Returns decrypted payload - # New behaviour: raises a ValueError if the device response indicates an error or CRC check fails - # The function prepends length (2 bytes) and appends CRC - def send_request(self,input_payload): - - from PyCRC.CRC16 import CRC16 - crc = CRC16(modbus_flag=True).calculate(bytes(input_payload)) - - # first byte is length, +2 for CRC16 - request_payload = bytearray([len(input_payload) + 2,0x00]) - request_payload.extend(input_payload) - - # append CRC - request_payload.append(crc & 0xFF) - request_payload.append((crc >> 8) & 0xFF) - - # send to device - response = self.send_packet(0x6a, request_payload) - - # check for error - err = response[0x22] | (response[0x23] << 8) - if err: - raise ValueError('broadlink_response_error',err) - - response_payload = bytearray(self.decrypt(bytes(response[0x38:]))) - - # experimental check on CRC in response (first 2 bytes are len, and trailing bytes are crc) - response_payload_len = response_payload[0] - if response_payload_len + 2 > len(response_payload): - raise ValueError('hysen_response_error','first byte of response is not length') - crc = CRC16(modbus_flag=True).calculate(bytes(response_payload[2:response_payload_len])) - if (response_payload[response_payload_len] == crc & 0xFF) and (response_payload[response_payload_len+1] == (crc >> 8) & 0xFF): - return response_payload[2:response_payload_len] - else: - raise ValueError('hysen_response_error','CRC check on response failed') - - - # Get current room temperature in degrees celsius - def get_temp(self): - payload = self.send_request(bytearray([0x01,0x03,0x00,0x00,0x00,0x08])) - return payload[0x05] / 2.0 - - # Get current external temperature in degrees celsius - def get_external_temp(self): - payload = self.send_request(bytearray([0x01,0x03,0x00,0x00,0x00,0x08])) - return payload[18] / 2.0 - - # Get full status (including timer schedule) - def get_full_status(self): - payload = self.send_request(bytearray([0x01,0x03,0x00,0x00,0x00,0x16])) - data = {} - data['remote_lock'] = payload[3] & 1 - data['power'] = payload[4] & 1 - data['active'] = (payload[4] >> 4) & 1 - data['temp_manual'] = (payload[4] >> 6) & 1 - data['room_temp'] = (payload[5] & 255)/2.0 - data['thermostat_temp'] = (payload[6] & 255)/2.0 - data['auto_mode'] = payload[7] & 15 - data['loop_mode'] = (payload[7] >> 4) & 15 - data['sensor'] = payload[8] - data['osv'] = payload[9] - data['dif'] = payload[10] - data['svh'] = payload[11] - data['svl'] = payload[12] - data['room_temp_adj'] = ((payload[13] << 8) + payload[14])/2.0 - if data['room_temp_adj'] > 32767: - data['room_temp_adj'] = 32767 - data['room_temp_adj'] - data['fre'] = payload[15] - data['poweron'] = payload[16] - data['unknown'] = payload[17] - data['external_temp'] = (payload[18] & 255)/2.0 - data['hour'] = payload[19] - data['min'] = payload[20] - data['sec'] = payload[21] - data['dayofweek'] = payload[22] - - weekday = [] - for i in range(0, 6): - weekday.append({'start_hour':payload[2*i + 23], 'start_minute':payload[2*i + 24],'temp':payload[i + 39]/2.0}) - - data['weekday'] = weekday - weekend = [] - for i in range(6, 8): - weekend.append({'start_hour':payload[2*i + 23], 'start_minute':payload[2*i + 24],'temp':payload[i + 39]/2.0}) - - data['weekend'] = weekend - return data - - # Change controller mode - # auto_mode = 1 for auto (scheduled/timed) mode, 0 for manual mode. - # Manual mode will activate last used temperature. In typical usage call set_temp to activate manual control and set temp. - # loop_mode refers to index in [ "12345,67", "123456,7", "1234567" ] - # E.g. loop_mode = 0 ("12345,67") means Saturday and Sunday follow the "weekend" schedule - # loop_mode = 2 ("1234567") means every day (including Saturday and Sunday) follows the "weekday" schedule - # The sensor command is currently experimental - def set_mode(self, auto_mode, loop_mode,sensor=0): - mode_byte = ( (loop_mode + 1) << 4) + auto_mode - # print 'Mode byte: 0x'+ format(mode_byte, '02x') - self.send_request(bytearray([0x01,0x06,0x00,0x02,mode_byte,sensor])) - - # Advanced settings - # Sensor mode (SEN) sensor = 0 for internal sensor, 1 for external sensor, 2 for internal control temperature, external limit temperature. Factory default: 0. - # Set temperature range for external sensor (OSV) osv = 5..99. Factory default: 42C - # Deadzone for floor temprature (dIF) dif = 1..9. Factory default: 2C - # Upper temperature limit for internal sensor (SVH) svh = 5..99. Factory default: 35C - # Lower temperature limit for internal sensor (SVL) svl = 5..99. Factory default: 5C - # Actual temperature calibration (AdJ) adj = -0.5. Prescision 0.1C - # Anti-freezing function (FrE) fre = 0 for anti-freezing function shut down, 1 for anti-freezing function open. Factory default: 0 - # Power on memory (POn) poweron = 0 for power on memory off, 1 for power on memory on. Factory default: 0 - def set_advanced(self, loop_mode, sensor, osv, dif, svh, svl, adj, fre, poweron): - input_payload = bytearray([0x01,0x10,0x00,0x02,0x00,0x05,0x0a, loop_mode, sensor, osv, dif, svh, svl, (int(adj*2)>>8 & 0xff), (int(adj*2) & 0xff), fre, poweron]) - self.send_request(input_payload) - - # For backwards compatibility only. Prefer calling set_mode directly. Note this function invokes loop_mode=0 and sensor=0. - def switch_to_auto(self): - self.set_mode(auto_mode=1, loop_mode=0) - - def switch_to_manual(self): - self.set_mode(auto_mode=0, loop_mode=0) - - # Set temperature for manual mode (also activates manual mode if currently in automatic) - def set_temp(self, temp): - self.send_request(bytearray([0x01,0x06,0x00,0x01,0x00,int(temp * 2)]) ) - - # Set device on(1) or off(0), does not deactivate Wifi connectivity. Remote lock disables control by buttons on thermostat. - def set_power(self, power=1, remote_lock=0): - self.send_request(bytearray([0x01,0x06,0x00,0x00,remote_lock,power]) ) - - # set time on device - # n.b. day=1 is Monday, ..., day=7 is Sunday - def set_time(self, hour, minute, second, day): - self.send_request(bytearray([0x01,0x10,0x00,0x08,0x00,0x02,0x04, hour, minute, second, day ])) - - # Set timer schedule - # Format is the same as you get from get_full_status. - # weekday is a list (ordered) of 6 dicts like: - # {'start_hour':17, 'start_minute':30, 'temp': 22 } - # Each one specifies the thermostat temp that will become effective at start_hour:start_minute - # weekend is similar but only has 2 (e.g. switch on in morning and off in afternoon) - def set_schedule(self,weekday,weekend): - # Begin with some magic values ... - input_payload = bytearray([0x01,0x10,0x00,0x0a,0x00,0x0c,0x18]) - - # Now simply append times/temps - # weekday times - for i in range(0, 6): - input_payload.append( weekday[i]['start_hour'] ) - input_payload.append( weekday[i]['start_minute'] ) - - # weekend times - for i in range(0, 2): - input_payload.append( weekend[i]['start_hour'] ) - input_payload.append( weekend[i]['start_minute'] ) - - # weekday temperatures - for i in range(0, 6): - input_payload.append( int(weekday[i]['temp'] * 2) ) - - # weekend temperatures - for i in range(0, 2): - input_payload.append( int(weekend[i]['temp'] * 2) ) - - self.send_request(input_payload) - - -S1C_SENSORS_TYPES = { - 0x31: 'Door Sensor', # 49 as hex - 0x91: 'Key Fob', # 145 as hex, as serial on fob corpse - 0x21: 'Motion Sensor' # 33 as hex +from typing import Generator, List, Optional, Tuple, Union + +from . import exceptions as e +from .const import DEFAULT_BCAST_ADDR, DEFAULT_PORT, DEFAULT_TIMEOUT +from .alarm import S1C +from .climate import hvac, hysen +from .cover import dooya, dooya2, wser +from .device import Device, ping, scan +from .hub import s3 +from .light import lb1, lb2 +from .remote import rm, rm4, rm4mini, rm4pro, rmmini, rmminib, rmpro +from .sensor import a1, a2 +from .switch import bg1, ehc31, mp1, mp1s, sp1, sp2, sp2s, sp3, sp3s, sp4, sp4b + +SUPPORTED_TYPES = { + sp1: { + 0x0000: ("SP1", "Broadlink"), + }, + sp2: { + 0x2717: ("NEO", "Ankuoo"), + 0x2719: ("SP2-compatible", "Honeywell"), + 0x271A: ("SP2-compatible", "Honeywell"), + 0x2720: ("SP mini", "Broadlink"), + 0x2728: ("SP2-compatible", "URANT"), + 0x273E: ("SP mini", "Broadlink"), + 0x7530: ("SP2", "Broadlink (OEM)"), + 0x7539: ("SP2-IL", "Broadlink (OEM)"), + 0x753E: ("SP mini 3", "Broadlink"), + 0x7540: ("MP2", "Broadlink"), + 0x7544: ("SP2-CL", "Broadlink"), + 0x7546: ("SP2-UK/BR/IN", "Broadlink (OEM)"), + 0x7547: ("SC1", "Broadlink"), + 0x7549: ("SP mini 3", "Broadlink (OEM)"), + 0x7918: ("SP2", "Broadlink (OEM)"), + 0x7919: ("SP2-compatible", "Honeywell"), + 0x791A: ("SP2-compatible", "Honeywell"), + 0x7D0D: ("SP mini 3", "Broadlink (OEM)"), + }, + sp2s: { + 0x2711: ("SP2", "Broadlink"), + 0x2716: ("NEO PRO", "Ankuoo"), + 0x271D: ("Ego", "Efergy"), + 0x2736: ("SP mini+", "Broadlink"), + }, + sp3: { + 0x2733: ("SP3", "Broadlink"), + 0x7D00: ("SP3-EU", "Broadlink (OEM)"), + }, + sp3s: { + 0x9479: ("SP3S-US", "Broadlink"), + 0x947A: ("SP3S-EU", "Broadlink"), + }, + sp4: { + 0x7568: ("SP4L-CN", "Broadlink"), + 0x756B: ("SP4M-JP", "Broadlink"), + 0x756C: ("SP4M", "Broadlink"), + 0x756F: ("MCB1", "Broadlink"), + 0x7579: ("SP4L-EU", "Broadlink"), + 0x757B: ("SP4L-AU", "Broadlink"), + 0x7583: ("SP mini 3", "Broadlink"), + 0x7587: ("SP4L-UK", "Broadlink"), + 0x7D11: ("SP mini 3", "Broadlink"), + 0xA4F9: ("WS4", "Broadlink (OEM)"), + 0xA569: ("SP4L-UK", "Broadlink"), + 0xA56A: ("MCB1", "Broadlink"), + 0xA56B: ("SCB1E", "Broadlink"), + 0xA56C: ("SP4L-EU", "Broadlink"), + 0xA576: ("SP4L-AU", "Broadlink"), + 0xA589: ("SP4L-UK", "Broadlink"), + 0xA5D3: ("SP4L-EU", "Broadlink"), + 0xA6F4: ("SP4D-US", "Broadlink"), + }, + sp4b: { + 0x5115: ("SCB1E", "Broadlink"), + 0x51E2: ("AHC/U-01", "BG Electrical"), + 0x6111: ("MCB1", "Broadlink"), + 0x6113: ("SCB1E", "Broadlink"), + 0x618B: ("SP4L-EU", "Broadlink"), + 0x6489: ("SP4L-AU", "Broadlink"), + 0x648B: ("SP4M-US", "Broadlink"), + 0x648C: ("SP4L-US", "Broadlink"), + 0x6494: ("SCB2", "Broadlink"), + }, + rmmini: { + 0x2737: ("RM mini 3", "Broadlink"), + 0x278F: ("RM mini", "Broadlink"), + 0x27B7: ("RM mini 3", "Broadlink"), + 0x27C2: ("RM mini 3", "Broadlink"), + 0x27C7: ("RM mini 3", "Broadlink"), + 0x27CC: ("RM mini 3", "Broadlink"), + 0x27CD: ("RM mini 3", "Broadlink"), + 0x27D0: ("RM mini 3", "Broadlink"), + 0x27D1: ("RM mini 3", "Broadlink"), + 0x27D3: ("RM mini 3", "Broadlink"), + 0x27DC: ("RM mini 3", "Broadlink"), + 0x27DE: ("RM mini 3", "Broadlink"), + }, + rmpro: { + 0x2712: ("RM pro/pro+", "Broadlink"), + 0x272A: ("RM pro", "Broadlink"), + 0x273D: ("RM pro", "Broadlink"), + 0x277C: ("RM home", "Broadlink"), + 0x2783: ("RM home", "Broadlink"), + 0x2787: ("RM pro", "Broadlink"), + 0x278B: ("RM plus", "Broadlink"), + 0x2797: ("RM pro+", "Broadlink"), + 0x279D: ("RM pro+", "Broadlink"), + 0x27A1: ("RM plus", "Broadlink"), + 0x27A6: ("RM plus", "Broadlink"), + 0x27A9: ("RM pro+", "Broadlink"), + 0x27C3: ("RM pro+", "Broadlink"), + }, + rmminib: { + 0x5F36: ("RM mini 3", "Broadlink"), + 0x6507: ("RM mini 3", "Broadlink"), + 0x6508: ("RM mini 3", "Broadlink"), + }, + rm4mini: { + 0x51DA: ("RM4 mini", "Broadlink"), + 0x5209: ("RM4 TV mate", "Broadlink"), + 0x520C: ("RM4 mini", "Broadlink"), + 0x520D: ("RM4C mini", "Broadlink"), + 0x5211: ("RM4C mate", "Broadlink"), + 0x5212: ("RM4 TV mate", "Broadlink"), + 0x5216: ("RM4 mini", "Broadlink"), + 0x521C: ("RM4 mini", "Broadlink"), + 0x6070: ("RM4C mini", "Broadlink"), + 0x610E: ("RM4 mini", "Broadlink"), + 0x610F: ("RM4C mini", "Broadlink"), + 0x62BC: ("RM4 mini", "Broadlink"), + 0x62BE: ("RM4C mini", "Broadlink"), + 0x6364: ("RM4S", "Broadlink"), + 0x648D: ("RM4 mini", "Broadlink"), + 0x6539: ("RM4C mini", "Broadlink"), + 0x653A: ("RM4 mini", "Broadlink"), + }, + rm4pro: { + 0x520B: ("RM4 pro", "Broadlink"), + 0x5213: ("RM4 pro", "Broadlink"), + 0x5218: ("RM4C pro", "Broadlink"), + 0x6026: ("RM4 pro", "Broadlink"), + 0x6184: ("RM4C pro", "Broadlink"), + 0x61A2: ("RM4 pro", "Broadlink"), + 0x649B: ("RM4 pro", "Broadlink"), + 0x653C: ("RM4 pro", "Broadlink"), + }, + a1: { + 0x2714: ("A1", "Broadlink"), + }, + a2: { + 0x4F60: ("A2", "Broadlink"), + }, + mp1: { + 0x4EB5: ("MP1-1K4S", "Broadlink"), + 0x4F1B: ("MP1-1K3S2U", "Broadlink (OEM)"), + 0x4F65: ("MP1-1K3S2U", "Broadlink"), + }, + mp1s: { + 0x4EF7: ("MP1-1K4S", "Broadlink (OEM)"), + }, + lb1: { + 0x5043: ("SB800TD", "Broadlink (OEM)"), + 0x504E: ("LB1", "Broadlink"), + 0x606D: ("SLA22RGB9W81/SLA27RGB9W81", "Luceco"), + 0x606E: ("SB500TD", "Broadlink (OEM)"), + 0x60C7: ("LB1", "Broadlink"), + 0x60C8: ("LB1", "Broadlink"), + 0x6112: ("LB1", "Broadlink"), + 0x644B: ("LB1", "Broadlink"), + 0x644C: ("LB27 R1", "Broadlink"), + 0x644E: ("LB26 R1", "Broadlink"), + 0x6488: ("LB27 C1", "Broadlink"), + }, + lb2: { + 0xA4F4: ("LB27 R1", "Broadlink"), + 0xA5F7: ("LB27 R1", "Broadlink"), + 0xA6EF: ("EFCF60WSMT", "Luceco"), + }, + S1C: { + 0x2722: ("S2KIT", "Broadlink"), + }, + s3: { + 0xA59C: ("S3", "Broadlink"), + 0xA64D: ("S3", "Broadlink"), + }, + hvac: { + 0x4E2A: ("HVAC", "Licensed manufacturer"), + }, + hysen: { + 0x4EAD: ("HY02/HY03", "Hysen"), + }, + dooya: { + 0x4E4D: ("DT360E-45/20", "Dooya"), + }, + dooya2: { + 0x4F6E: ("DT360E-45/20", "Dooya"), + }, + wser: { + 0x4F6C: ("WSER", "Wistar"), + }, + bg1: { + 0x51E3: ("BG800/BG900", "BG Electrical"), + }, + ehc31: { + 0x6480: ("EHC31", "BG Electrical"), + }, } -class S1C(device): - """ - Its VERY VERY VERY DIRTY IMPLEMENTATION of S1C - """ - def __init__(self, *a, **kw): - device.__init__(self, *a, **kw) - self.type = 'S1C' - - def get_sensors_status(self): - packet = bytearray(16) - packet[0] = 0x06 # 0x06 - get sensors info, 0x07 - probably add sensors - response = self.send_packet(0x6a, packet) - err = response[0x22] | (response[0x23] << 8) - if err == 0: - aes = AES.new(bytes(self.key), AES.MODE_CBC, bytes(self.iv)) - - payload = aes.decrypt(bytes(response[0x38:])) - if payload: - head = payload[:4] - count = payload[0x4] #need to fix for python 2.x - sensors = payload[0x6:] - sensors_a = [bytearray(sensors[i * 83:(i + 1) * 83]) for i in range(len(sensors) // 83)] - - sens_res = [] - for sens in sensors_a: - status = ord(chr(sens[0])) - _name = str(bytes(sens[4:26]).decode()) - _order = ord(chr(sens[1])) - _type = ord(chr(sens[3])) - _serial = bytes(codecs.encode(sens[26:30],"hex")).decode() - - type_str = S1C_SENSORS_TYPES.get(_type, 'Unknown') - - r = { - 'status': status, - 'name': _name.strip('\x00'), - 'type': type_str, - 'order': _order, - 'serial': _serial, - } - if r['serial'] != '00000000': - sens_res.append(r) - result = { - 'count': count, - 'sensors': sens_res - } - return result - - -class dooya(device): - def __init__ (self, host, mac, devtype): - device.__init__(self, host, mac, devtype) - self.type = "Dooya DT360E" - - def _send(self, magic1, magic2): - packet = bytearray(16) - packet[0] = 0x09 - packet[2] = 0xbb - packet[3] = magic1 - packet[4] = magic2 - packet[9] = 0xfa - packet[10] = 0x44 - response = self.send_packet(0x6a, packet) - err = response[0x22] | (response[0x23] << 8) - if err == 0: - payload = self.decrypt(bytes(response[0x38:])) - return ord(payload[4]) - - def open(self): - return self._send(0x01, 0x00) - - def close(self): - return self._send(0x02, 0x00) - - def stop(self): - return self._send(0x03, 0x00) - - def get_percentage(self): - return self._send(0x06, 0x5d) - - def set_percentage_and_wait(self, new_percentage): - current = self.get_percentage() - if current > new_percentage: - self.close() - while current is not None and current > new_percentage: - time.sleep(0.2) - current = self.get_percentage() - - elif current < new_percentage: - self.open() - while current is not None and current < new_percentage: - time.sleep(0.2) - current = self.get_percentage() - self.stop() +def gendevice( + dev_type: int, + host: Tuple[str, int], + mac: Union[bytes, str], + name: str = "", + is_locked: bool = False, +) -> Device: + """Generate a device.""" + for dev_cls, products in SUPPORTED_TYPES.items(): + try: + model, manufacturer = products[dev_type] + + except KeyError: + continue + + return dev_cls( + host, + mac, + dev_type, + name=name, + model=model, + manufacturer=manufacturer, + is_locked=is_locked, + ) + + return Device(host, mac, dev_type, name=name, is_locked=is_locked) + + +def hello( + ip_address: str, + port: int = DEFAULT_PORT, + timeout: int = DEFAULT_TIMEOUT, +) -> Device: + """Direct device discovery. + + Useful if the device is locked. + """ + try: + return next( + xdiscover( + timeout=timeout, + discover_ip_address=ip_address, + discover_ip_port=port, + ) + ) + except StopIteration as err: + raise e.NetworkTimeoutError( + -4000, + "Network timeout", + f"No response received within {timeout}s", + ) from err + + +def discover( + timeout: int = DEFAULT_TIMEOUT, + local_ip_address: Optional[str] = None, + discover_ip_address: str = DEFAULT_BCAST_ADDR, + discover_ip_port: int = DEFAULT_PORT, +) -> List[Device]: + """Discover devices connected to the local network.""" + responses = scan( + timeout, local_ip_address, discover_ip_address, discover_ip_port + ) + return [gendevice(*resp) for resp in responses] + + +def xdiscover( + timeout: int = DEFAULT_TIMEOUT, + local_ip_address: Optional[str] = None, + discover_ip_address: str = DEFAULT_BCAST_ADDR, + discover_ip_port: int = DEFAULT_PORT, +) -> Generator[Device, None, None]: + """Discover devices connected to the local network. + + This function returns a generator that yields devices instantly. + """ + responses = scan( + timeout, local_ip_address, discover_ip_address, discover_ip_port + ) + for resp in responses: + yield gendevice(*resp) # Setup a new Broadlink device via AP Mode. Review the README to see how to enter AP Mode. # Only tested with Broadlink RM3 Mini (Blackbean) -def setup(ssid, password, security_mode): - # Security mode options are (0 - none, 1 = WEP, 2 = WPA1, 3 = WPA2, 4 = WPA1/2) - payload = bytearray(0x88) - payload[0x26] = 0x14 # This seems to always be set to 14 - # Add the SSID to the payload - ssid_start = 68 - ssid_length = 0 - for letter in ssid: - payload[(ssid_start + ssid_length)] = ord(letter) - ssid_length += 1 - # Add the WiFi password to the payload - pass_start = 100 - pass_length = 0 - for letter in password: - payload[(pass_start + pass_length)] = ord(letter) - pass_length += 1 - - payload[0x84] = ssid_length # Character length of SSID - payload[0x85] = pass_length # Character length of password - payload[0x86] = security_mode # Type of encryption (00 - none, 01 = WEP, 02 = WPA1, 03 = WPA2, 04 = WPA1/2) - - checksum = 0xbeaf - for i in range(len(payload)): - checksum += payload[i] - checksum = checksum & 0xffff - - payload[0x20] = checksum & 0xff # Checksum 1 position - payload[0x21] = checksum >> 8 # Checksum 2 position - - sock = socket.socket(socket.AF_INET, # Internet - socket.SOCK_DGRAM) # UDP - sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) - sock.sendto(payload, ('255.255.255.255', 80)) +def setup( + ssid: str, + password: str, + security_mode: int, + ip_address: str = DEFAULT_BCAST_ADDR, +) -> None: + """Set up a new Broadlink device via AP mode.""" + # Security mode options are (0 - none, 1 = WEP, 2 = WPA1, 3 = WPA2, 4 = WPA1/2) + payload = bytearray(0x88) + payload[0x26] = 0x14 # This seems to always be set to 14 + # Add the SSID to the payload + ssid_start = 68 + ssid_length = 0 + for letter in ssid: + payload[(ssid_start + ssid_length)] = ord(letter) + ssid_length += 1 + # Add the WiFi password to the payload + pass_start = 100 + pass_length = 0 + for letter in password: + payload[(pass_start + pass_length)] = ord(letter) + pass_length += 1 + + payload[0x84] = ssid_length # Character length of SSID + payload[0x85] = pass_length # Character length of password + payload[0x86] = security_mode # Type of encryption + + checksum = sum(payload, 0xBEAF) & 0xFFFF + payload[0x20] = checksum & 0xFF # Checksum 1 position + payload[0x21] = checksum >> 8 # Checksum 2 position + + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Internet # UDP + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + sock.sendto(payload, (ip_address, DEFAULT_PORT)) + sock.close() diff --git a/broadlink/alarm.py b/broadlink/alarm.py new file mode 100644 index 00000000..a9b5e879 --- /dev/null +++ b/broadlink/alarm.py @@ -0,0 +1,43 @@ +"""Support for alarm kits.""" +from . import exceptions as e +from .device import Device + + +class S1C(Device): + """Controls a Broadlink S1C.""" + + TYPE = "S1C" + + _SENSORS_TYPES = { + 0x31: "Door Sensor", + 0x91: "Key Fob", + 0x21: "Motion Sensor", + } + + def get_sensors_status(self) -> dict: + """Return the state of the sensors.""" + packet = bytearray(16) + packet[0] = 0x06 + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + payload = self.decrypt(response[0x38:]) + count = payload[0x4] + sensor_data = payload[0x6:] + sensors = [ + bytearray(sensor_data[i * 83 : (i + 1) * 83]) + for i in range(len(sensor_data) // 83) + ] + return { + "count": count, + "sensors": [ + { + "status": sensor[0], + "name": sensor[4:26].decode().strip("\x00"), + "type": self._SENSORS_TYPES.get(sensor[3], "Unknown"), + "order": sensor[1], + "serial": sensor[26:30].hex(), + } + for sensor in sensors + if any(sensor[26:30]) + ], + } diff --git a/broadlink/climate.py b/broadlink/climate.py new file mode 100755 index 00000000..1a0c6006 --- /dev/null +++ b/broadlink/climate.py @@ -0,0 +1,474 @@ +"""Support for climate control.""" +import enum +import struct +from typing import List, Sequence + +from . import exceptions as e +from .device import Device +from .helpers import CRC16 + + +class hysen(Device): + """Controls a Hysen heating thermostat. + + This device is manufactured by Hysen and sold under different + brands, including Floureon, Beca Energy, Beok and Decdeal. + + Supported models: + - HY02B05H + - HY03WE + """ + + TYPE = "HYS" + + def send_request(self, request: Sequence[int]) -> bytes: + """Send a request to the device.""" + packet = bytearray() + packet.extend((len(request) + 2).to_bytes(2, "little")) + packet.extend(request) + packet.extend(CRC16.calculate(request).to_bytes(2, "little")) + + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + payload = self.decrypt(response[0x38:]) + + p_len = int.from_bytes(payload[:0x02], "little") + nom_crc = int.from_bytes(payload[p_len:p_len+2], "little") + real_crc = CRC16.calculate(payload[0x02:p_len]) + + if nom_crc != real_crc: + raise e.DataValidationError( + -4008, + "Received data packet check error", + f"Expected a checksum of {nom_crc} and received {real_crc}", + ) + + return payload[0x02:p_len] + + def _decode_temp(self, payload, base_index): + base_temp = payload[base_index] / 2.0 + add_offset = (payload[4] >> 3) & 1 # should offset be added? + offset_raw_value = (payload[17] >> 4) & 3 # offset value + offset = (offset_raw_value + 1) / 10 if add_offset else 0.0 + return base_temp + offset + + def get_temp(self) -> float: + """Return the room temperature in degrees celsius.""" + payload = self.send_request([0x01, 0x03, 0x00, 0x00, 0x00, 0x08]) + return self._decode_temp(payload, 5) + + def get_external_temp(self) -> float: + """Return the external temperature in degrees celsius.""" + payload = self.send_request([0x01, 0x03, 0x00, 0x00, 0x00, 0x08]) + return self._decode_temp(payload, 18) + + def get_full_status(self) -> dict: + """Return the state of the device. + + Timer schedule included. + """ + payload = self.send_request([0x01, 0x03, 0x00, 0x00, 0x00, 0x16]) + data = {} + data["remote_lock"] = payload[3] & 1 + data["power"] = payload[4] & 1 + data["active"] = (payload[4] >> 4) & 1 + data["temp_manual"] = (payload[4] >> 6) & 1 + data["heating_cooling"] = (payload[4] >> 7) & 1 + data["room_temp"] = self._decode_temp(payload, 5) + data["thermostat_temp"] = payload[6] / 2.0 + data["auto_mode"] = payload[7] & 0x0F + data["loop_mode"] = payload[7] >> 4 + data["sensor"] = payload[8] + data["osv"] = payload[9] + data["dif"] = payload[10] + data["svh"] = payload[11] + data["svl"] = payload[12] + data["room_temp_adj"] = ( + int.from_bytes(payload[13:15], "big", signed=True) / 10.0 + ) + data["fre"] = payload[15] + data["poweron"] = payload[16] + data["unknown"] = payload[17] + data["external_temp"] = self._decode_temp(payload, 18) + data["hour"] = payload[19] + data["min"] = payload[20] + data["sec"] = payload[21] + data["dayofweek"] = payload[22] + + weekday = [] + for i in range(0, 6): + weekday.append( + { + "start_hour": payload[2 * i + 23], + "start_minute": payload[2 * i + 24], + "temp": payload[i + 39] / 2.0, + } + ) + + data["weekday"] = weekday + weekend = [] + for i in range(6, 8): + weekend.append( + { + "start_hour": payload[2 * i + 23], + "start_minute": payload[2 * i + 24], + "temp": payload[i + 39] / 2.0, + } + ) + + data["weekend"] = weekend + return data + + # Change controller mode + # auto_mode = 1 for auto (scheduled/timed) mode, 0 for manual mode. + # Manual mode will activate last used temperature. + # In typical usage call set_temp to activate manual control and set temp. + # loop_mode refers to index in [ "12345,67", "123456,7", "1234567" ] + # E.g. loop_mode = 0 ("12345,67") means Saturday and Sunday (weekend schedule) + # loop_mode = 2 ("1234567") means every day, including Saturday and Sunday (weekday schedule) + # The sensor command is currently experimental + def set_mode( + self, auto_mode: int, loop_mode: int, sensor: int = 0 + ) -> None: + """Set the mode of the device.""" + mode_byte = ((loop_mode + 1) << 4) + auto_mode + self.send_request([0x01, 0x06, 0x00, 0x02, mode_byte, sensor]) + + # Advanced settings + # Sensor mode (SEN) sensor = 0 for internal sensor, 1 for external sensor, + # 2 for internal control temperature, external limit temperature. Factory default: 0. + # Set temperature range for external sensor (OSV) osv = 5..99. Factory default: 42C + # Deadzone for floor temprature (dIF) dif = 1..9. Factory default: 2C + # Upper temperature limit for internal sensor (SVH) svh = 5..99. Factory default: 35C + # Lower temperature limit for internal sensor (SVL) svl = 5..99. Factory default: 5C + # Actual temperature calibration (AdJ) adj = -0.5. Precision 0.1C + # Anti-freezing function (FrE) fre = 0 for anti-freezing function shut down, + # 1 for anti-freezing function open. Factory default: 0 + # Power on memory (POn) poweron = 0 for off, 1 for on. Default: 0 + def set_advanced( + self, + loop_mode: int, + sensor: int, + osv: int, + dif: int, + svh: int, + svl: int, + adj: float, + fre: int, + poweron: int, + ) -> None: + """Set advanced options.""" + self.send_request( + [ + 0x01, + 0x10, + 0x00, + 0x02, + 0x00, + 0x05, + 0x0A, + loop_mode, + sensor, + osv, + dif, + svh, + svl, + int(adj * 10) >> 8 & 0xFF, + int(adj * 10) & 0xFF, + fre, + poweron, + ] + ) + + # For backwards compatibility only. Prefer calling set_mode directly. + # Note this function invokes loop_mode=0 and sensor=0. + def switch_to_auto(self) -> None: + """Switch mode to auto.""" + self.set_mode(auto_mode=1, loop_mode=0) + + def switch_to_manual(self) -> None: + """Switch mode to manual.""" + self.set_mode(auto_mode=0, loop_mode=0) + + # Set temperature for manual mode (also activates manual mode if currently in automatic) + def set_temp(self, temp: float) -> None: + """Set the target temperature.""" + self.send_request([0x01, 0x06, 0x00, 0x01, 0x00, int(temp * 2)]) + + # Set device on(1) or off(0), does not deactivate Wifi connectivity. + # Remote lock disables control by buttons on thermostat. + # heating_cooling: heating(0) cooling(1) + def set_power( + self, power: int = 1, remote_lock: int = 0, heating_cooling: int = 0 + ) -> None: + """Set the power state of the device.""" + state = (heating_cooling << 7) + power + self.send_request([0x01, 0x06, 0x00, 0x00, remote_lock, state]) + + # set time on device + # n.b. day=1 is Monday, ..., day=7 is Sunday + def set_time(self, hour: int, minute: int, second: int, day: int) -> None: + """Set the time.""" + self.send_request( + [ + 0x01, + 0x10, + 0x00, + 0x08, + 0x00, + 0x02, + 0x04, + hour, + minute, + second, + day + ] + ) + + # Set timer schedule + # Format is the same as you get from get_full_status. + # weekday is a list (ordered) of 6 dicts like: + # {'start_hour':17, 'start_minute':30, 'temp': 22 } + # Each one specifies the thermostat temp that will become effective at start_hour:start_minute + # weekend is similar but only has 2 (e.g. switch on in morning and off in afternoon) + def set_schedule(self, weekday: List[dict], weekend: List[dict]) -> None: + """Set timer schedule.""" + request = [0x01, 0x10, 0x00, 0x0A, 0x00, 0x0C, 0x18] + + # weekday times + for i in range(0, 6): + request.append(weekday[i]["start_hour"]) + request.append(weekday[i]["start_minute"]) + + # weekend times + for i in range(0, 2): + request.append(weekend[i]["start_hour"]) + request.append(weekend[i]["start_minute"]) + + # weekday temperatures + for i in range(0, 6): + request.append(int(weekday[i]["temp"] * 2)) + + # weekend temperatures + for i in range(0, 2): + request.append(int(weekend[i]["temp"] * 2)) + + self.send_request(request) + + +class hvac(Device): + """Controls a HVAC. + + Supported models: + - Tornado SMART X SQ series + - Aux ASW-H12U3/JIR1DI-US + - Aux ASW-H36U2/LFR1DI-US + """ + + TYPE = "HVAC" + + @enum.unique + class Mode(enum.IntEnum): + """Enumerates modes.""" + + AUTO = 0 + COOL = 1 + DRY = 2 + HEAT = 3 + FAN = 4 + + @enum.unique + class Speed(enum.IntEnum): + """Enumerates fan speed.""" + + HIGH = 1 + MID = 2 + LOW = 3 + AUTO = 5 + + @enum.unique + class Preset(enum.IntEnum): + """Enumerates presets.""" + + NORMAL = 0 + TURBO = 1 + MUTE = 2 + + @enum.unique + class SwHoriz(enum.IntEnum): + """Enumerates horizontal swing.""" + + ON = 0 + OFF = 7 + + @enum.unique + class SwVert(enum.IntEnum): + """Enumerates vertical swing.""" + + ON = 0 + POS1 = 1 + POS2 = 2 + POS3 = 3 + POS4 = 4 + POS5 = 5 + OFF = 7 + + def _encode(self, data: bytes) -> bytes: + """Encode data for transport.""" + packet = bytearray(10) + p_len = 10 + len(data) + struct.pack_into( + " bytes: + """Decode data from transport.""" + # payload[0x2:0x8] == bytes([0xbb, 0x00, 0x07, 0x00, 0x00, 0x00]) + payload = self.decrypt(response[0x38:]) + p_len = int.from_bytes(payload[:0x02], "little") + nom_crc = int.from_bytes(payload[p_len:p_len+2], "little") + real_crc = CRC16.calculate(payload[0x02:p_len], polynomial=0x9BE4) + + if nom_crc != real_crc: + raise e.DataValidationError( + -4008, + "Received data packet check error", + f"Expected a checksum of {nom_crc} and received {real_crc}", + ) + + d_len = int.from_bytes(payload[0x08:0x0A], "little") + return payload[0x0A:0x0A+d_len] + + def _send(self, command: int, data: bytes = b"") -> bytes: + """Send a command to the unit.""" + prefix = bytes([((command << 4) | 1), 1]) + packet = self._encode(prefix + data) + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + return self._decode(response)[0x02:] + + def _parse_state(self, data: bytes) -> dict: + """Parse state.""" + state = {} + state["power"] = bool(data[0x08] & 1 << 5) + state["target_temp"] = 8 + (data[0x00] >> 3) + (data[0x04] >> 7) * 0.5 + state["swing_v"] = self.SwVert(data[0x00] & 0b111) + state["swing_h"] = self.SwHoriz(data[0x01] >> 5) + state["mode"] = self.Mode(data[0x05] >> 5) + state["speed"] = self.Speed(data[0x03] >> 5) + state["preset"] = self.Preset(data[0x04] >> 6) + state["sleep"] = bool(data[0x05] & 1 << 2) + state["ifeel"] = bool(data[0x05] & 1 << 3) + state["health"] = bool(data[0x08] & 1 << 1) + state["clean"] = bool(data[0x08] & 1 << 2) + state["display"] = bool(data[0x0A] & 1 << 4) + state["mildew"] = bool(data[0x0A] & 1 << 3) + return state + + def set_state( + self, + power: bool, + target_temp: float, # 16<=target_temp<=32 + mode: Mode, + speed: Speed, + preset: Preset, + swing_h: SwHoriz, + swing_v: SwVert, + sleep: bool, + ifeel: bool, + display: bool, + health: bool, + clean: bool, + mildew: bool, + ) -> dict: + """Set the state of the device.""" + # TODO: decode unknown bits + UNK0 = 0b100 + UNK1 = 0b1101 + UNK2 = 0b101 + + target_temp = round(target_temp * 2) / 2 + + if preset == self.Preset.MUTE: + if mode != self.Mode.FAN: + raise ValueError("mute is only available in fan mode") + speed = self.Speed.LOW + + elif preset == self.Preset.TURBO: + if mode not in {self.Mode.COOL, self.Mode.HEAT}: + raise ValueError("turbo is only available in cooling/heating") + speed = self.Speed.HIGH + + data = bytearray(0x0D) + data[0x00] = (int(target_temp) - 8 << 3) | swing_v + data[0x01] = (swing_h << 5) | UNK0 + data[0x02] = ((target_temp % 1 == 0.5) << 7) | UNK1 + data[0x03] = speed << 5 + data[0x04] = preset << 6 + data[0x05] = mode << 5 | sleep << 2 | ifeel << 3 + data[0x08] = power << 5 | clean << 2 | (health and 0b11) + data[0x0A] = display << 4 | mildew << 3 + data[0x0C] = UNK2 + + resp = self._send(0, data) + return self._parse_state(resp) + + def get_state(self) -> dict: + """Returns a dictionary with the unit's parameters. + + Returns: + dict: + power (bool): + target_temp (float): temperature set point 16 dict: + """Returns dictionary with AC info. + + Returns: + dict: + power (bool): power + ambient_temp (float): ambient temperature + """ + resp = self._send(2) + + if len(resp) < 22: + raise e.DataValidationError( + -4007, + "Received data packet length error", + f"Expected at least 24 bytes and received {len(resp) + 2}", + ) + + ac_info = {} + ac_info["power"] = resp[0x1] & 1 + + ambient_temp = resp[0x05] & 0b11111, resp[0x15] & 0b11111 + if any(ambient_temp): + ac_info["ambient_temp"] = ambient_temp[0] + ambient_temp[1] / 10.0 + + return ac_info diff --git a/broadlink/const.py b/broadlink/const.py new file mode 100644 index 00000000..19c37f52 --- /dev/null +++ b/broadlink/const.py @@ -0,0 +1,5 @@ +"""Constants.""" +DEFAULT_BCAST_ADDR = "255.255.255.255" +DEFAULT_PORT = 80 +DEFAULT_RETRY_INTVL = 1 +DEFAULT_TIMEOUT = 10 diff --git a/broadlink/cover.py b/broadlink/cover.py new file mode 100644 index 00000000..75317943 --- /dev/null +++ b/broadlink/cover.py @@ -0,0 +1,182 @@ +"""Support for covers.""" +import time +from typing import Sequence + +from . import exceptions as e +from .device import Device + + +class dooya(Device): + """Controls a Dooya curtain motor.""" + + TYPE = "DT360E" + + def _send(self, command: int, attribute: int = 0) -> int: + """Send a packet to the device.""" + packet = bytearray(16) + packet[0x00] = 0x09 + packet[0x02] = 0xBB + packet[0x03] = command + packet[0x04] = attribute + packet[0x09] = 0xFA + packet[0x0A] = 0x44 + + resp = self.send_packet(0x6A, packet) + e.check_error(resp[0x22:0x24]) + payload = self.decrypt(resp[0x38:]) + return payload[4] + + def open(self) -> int: + """Open the curtain.""" + return self._send(0x01) + + def close(self) -> int: + """Close the curtain.""" + return self._send(0x02) + + def stop(self) -> int: + """Stop the curtain.""" + return self._send(0x03) + + def get_percentage(self) -> int: + """Return the position of the curtain.""" + return self._send(0x06, 0x5D) + + def set_percentage_and_wait(self, new_percentage: int) -> None: + """Set the position of the curtain.""" + current = self.get_percentage() + if current > new_percentage: + self.close() + while current is not None and current > new_percentage: + time.sleep(0.2) + current = self.get_percentage() + + elif current < new_percentage: + self.open() + while current is not None and current < new_percentage: + time.sleep(0.2) + current = self.get_percentage() + self.stop() + + +class dooya2(Device): + """Controls a Dooya curtain motor (version 2).""" + + TYPE = "DT360E-2" + + def _send(self, operation: int, data: Sequence = b""): + """Send a command to the device.""" + packet = bytearray(12) + packet[0x02] = 0xA5 + packet[0x03] = 0xA5 + packet[0x04] = 0x5A + packet[0x05] = 0x5A + packet[0x08] = operation + packet[0x09] = 0x0B + + if data: + data_len = len(data) + packet[0x0A] = data_len & 0xFF + packet[0x0B] = data_len >> 8 + packet += bytes(2) + packet.extend(data) + + checksum = sum(packet, 0xBEAF) & 0xFFFF + packet[0x06] = checksum & 0xFF + packet[0x07] = checksum >> 8 + + packet_len = len(packet) - 2 + packet[0x00] = packet_len & 0xFF + packet[0x01] = packet_len >> 8 + + resp = self.send_packet(0x6A, packet) + e.check_error(resp[0x22:0x24]) + payload = self.decrypt(resp[0x38:]) + return payload + + def open(self) -> None: + """Open the curtain.""" + self._send(2, [0x00, 0x01, 0x00]) + + def close(self) -> None: + """Close the curtain.""" + self._send(2, [0x00, 0x02, 0x00]) + + def stop(self) -> None: + """Stop the curtain.""" + self._send(2, [0x00, 0x03, 0x00]) + + def get_percentage(self) -> int: + """Return the position of the curtain.""" + resp = self._send(1, [0x00, 0x06, 0x00]) + return resp[0x11] + + def set_percentage(self, new_percentage: int) -> None: + """Set the position of the curtain.""" + self._send(2, [0x00, 0x09, new_percentage]) + + +class wser(Device): + """Controls a Wistar curtain motor""" + + TYPE = "WSER" + + def _send(self, operation: int, data: Sequence = b""): + """Send a command to the device.""" + packet = bytearray(12) + packet[0x02] = 0xA5 + packet[0x03] = 0xA5 + packet[0x04] = 0x5A + packet[0x05] = 0x5A + packet[0x08] = operation + packet[0x09] = 0x0B + + if data: + data_len = len(data) + packet[0x0A] = data_len & 0xFF + packet[0x0B] = data_len >> 8 + packet += bytes(2) + packet.extend(data) + + checksum = sum(packet, 0xBEAF) & 0xFFFF + packet[0x06] = checksum & 0xFF + packet[0x07] = checksum >> 8 + + packet_len = len(packet) - 2 + packet[0x00] = packet_len & 0xFF + packet[0x01] = packet_len >> 8 + + resp = self.send_packet(0x6A, packet) + e.check_error(resp[0x22:0x24]) + payload = self.decrypt(resp[0x38:]) + return payload + + def get_position(self) -> int: + """Return the position of the curtain.""" + resp = self._send(1, []) + position = resp[0x0E] + return position + + def open(self) -> int: + """Open the curtain.""" + resp = self._send(2, [0x4A, 0x31, 0xA0]) + position = resp[0x0E] + return position + + def close(self) -> int: + """Close the curtain.""" + resp = self._send(2, [0x61, 0x32, 0xA0]) + position = resp[0x0E] + return position + + def stop(self) -> int: + """Stop the curtain.""" + resp = self._send(2, [0x4C, 0x73, 0xA0]) + position = resp[0x0E] + return position + + def set_position(self, position: int) -> int: + """Set the position of the curtain.""" + resp = self._send(2, [position, 0x70, 0xA0]) + position = resp[0x0E] + return position diff --git a/broadlink/device.py b/broadlink/device.py new file mode 100644 index 00000000..5a10bc01 --- /dev/null +++ b/broadlink/device.py @@ -0,0 +1,332 @@ +"""Support for Broadlink devices.""" +import socket +import threading +import random +import time +from typing import Generator, Optional, Tuple, Union + +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes + +from . import exceptions as e +from .const import ( + DEFAULT_BCAST_ADDR, + DEFAULT_PORT, + DEFAULT_RETRY_INTVL, + DEFAULT_TIMEOUT, +) +from .protocol import Datetime + +HelloResponse = Tuple[int, Tuple[str, int], str, str, bool] + + +def scan( + timeout: int = DEFAULT_TIMEOUT, + local_ip_address: Optional[str] = None, + discover_ip_address: str = DEFAULT_BCAST_ADDR, + discover_ip_port: int = DEFAULT_PORT, +) -> Generator[HelloResponse, None, None]: + """Broadcast a hello message and yield responses.""" + conn = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + conn.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + conn.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + + if local_ip_address: + conn.bind((local_ip_address, 0)) + port = conn.getsockname()[1] + else: + local_ip_address = "0.0.0.0" + port = 0 + + packet = bytearray(0x30) + packet[0x08:0x14] = Datetime.pack(Datetime.now()) + packet[0x18:0x1C] = socket.inet_aton(local_ip_address)[::-1] + packet[0x1C:0x1E] = port.to_bytes(2, "little") + packet[0x26] = 6 + + checksum = sum(packet, 0xBEAF) & 0xFFFF + packet[0x20:0x22] = checksum.to_bytes(2, "little") + + start_time = time.time() + discovered = [] + + try: + while (time.time() - start_time) < timeout: + time_left = timeout - (time.time() - start_time) + conn.settimeout(min(DEFAULT_RETRY_INTVL, time_left)) + conn.sendto(packet, (discover_ip_address, discover_ip_port)) + + while True: + try: + resp, host = conn.recvfrom(1024) + except socket.timeout: + break + + devtype = resp[0x34] | resp[0x35] << 8 + mac = resp[0x3A:0x40][::-1] + + if (host, mac, devtype) in discovered: + continue + discovered.append((host, mac, devtype)) + + name = resp[0x40:].split(b"\x00")[0].decode() + is_locked = bool(resp[0x7F]) + yield devtype, host, mac, name, is_locked + finally: + conn.close() + + +def ping(ip_address: str, port: int = DEFAULT_PORT) -> None: + """Send a ping packet to an address. + + This packet feeds the watchdog timer of firmwares >= v53. + Useful to prevent reboots when the cloud cannot be reached. + It must be sent every 2 minutes in such cases. + """ + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as conn: + conn.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + packet = bytearray(0x30) + packet[0x26] = 1 + conn.sendto(packet, (ip_address, port)) + + +class Device: + """Controls a Broadlink device.""" + + TYPE = "Unknown" + + __INIT_KEY = "097628343fe99e23765c1513accf8b02" + __INIT_VECT = "562e17996d093d28ddb3ba695a2e6f58" + + def __init__( + self, + host: Tuple[str, int], + mac: Union[bytes, str], + devtype: int, + timeout: int = DEFAULT_TIMEOUT, + name: str = "", + model: str = "", + manufacturer: str = "", + is_locked: bool = False, + ) -> None: + """Initialize the controller.""" + self.host = host + self.mac = bytes.fromhex(mac) if isinstance(mac, str) else mac + self.devtype = devtype + self.timeout = timeout + self.name = name + self.model = model + self.manufacturer = manufacturer + self.is_locked = is_locked + self.count = random.randint(0x8000, 0xFFFF) + self.iv = bytes.fromhex(self.__INIT_VECT) + self.id = 0 + self.type = self.TYPE # For backwards compatibility. + self.lock = threading.Lock() + + self.aes = None + self.update_aes(bytes.fromhex(self.__INIT_KEY)) + + def __repr__(self) -> str: + """Return a formal representation of the device.""" + return ( + "%s.%s(%s, mac=%r, devtype=%r, timeout=%r, name=%r, " + "model=%r, manufacturer=%r, is_locked=%r)" + ) % ( + self.__class__.__module__, + self.__class__.__qualname__, + self.host, + self.mac, + self.devtype, + self.timeout, + self.name, + self.model, + self.manufacturer, + self.is_locked, + ) + + def __str__(self) -> str: + """Return a readable representation of the device.""" + return "%s (%s / %s:%s / %s)" % ( + self.name or "Unknown", + " ".join(filter(None, [self.manufacturer, self.model, hex(self.devtype)])), + *self.host, + ":".join(format(x, "02X") for x in self.mac), + ) + + def update_aes(self, key: bytes) -> None: + """Update AES.""" + self.aes = Cipher( + algorithms.AES(bytes(key)), modes.CBC(self.iv), backend=default_backend() + ) + + def encrypt(self, payload: bytes) -> bytes: + """Encrypt the payload.""" + encryptor = self.aes.encryptor() + return encryptor.update(bytes(payload)) + encryptor.finalize() + + def decrypt(self, payload: bytes) -> bytes: + """Decrypt the payload.""" + decryptor = self.aes.decryptor() + return decryptor.update(bytes(payload)) + decryptor.finalize() + + def auth(self) -> bool: + """Authenticate to the device.""" + self.id = 0 + self.update_aes(bytes.fromhex(self.__INIT_KEY)) + + packet = bytearray(0x50) + packet[0x04:0x14] = [0x31] * 16 + packet[0x1E] = 0x01 + packet[0x2D] = 0x01 + packet[0x30:0x36] = "Test 1".encode() + + response = self.send_packet(0x65, packet) + e.check_error(response[0x22:0x24]) + payload = self.decrypt(response[0x38:]) + + self.id = int.from_bytes(payload[:0x4], "little") + self.update_aes(payload[0x04:0x14]) + return True + + def hello(self, local_ip_address=None) -> bool: + """Send a hello message to the device. + + Device information is checked before updating name and lock status. + """ + responses = scan( + timeout=self.timeout, + local_ip_address=local_ip_address, + discover_ip_address=self.host[0], + discover_ip_port=self.host[1], + ) + try: + devtype, _, mac, name, is_locked = next(responses) + + except StopIteration as err: + raise e.NetworkTimeoutError( + -4000, + "Network timeout", + f"No response received within {self.timeout}s", + ) from err + + if mac != self.mac: + raise e.DataValidationError( + -2040, + "Device information is not intact", + "The MAC address is different", + f"Expected {self.mac} and received {mac}", + ) + + if devtype != self.devtype: + raise e.DataValidationError( + -2040, + "Device information is not intact", + "The product ID is different", + f"Expected {self.devtype} and received {devtype}", + ) + + self.name = name + self.is_locked = is_locked + return True + + def ping(self) -> None: + """Ping the device. + + This packet feeds the watchdog timer of firmwares >= v53. + Useful to prevent reboots when the cloud cannot be reached. + It must be sent every 2 minutes in such cases. + """ + ping(self.host[0], port=self.host[1]) + + def get_fwversion(self) -> int: + """Get firmware version.""" + packet = bytearray([0x68]) + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + payload = self.decrypt(response[0x38:]) + return payload[0x4] | payload[0x5] << 8 + + def set_name(self, name: str) -> None: + """Set device name.""" + packet = bytearray(4) + packet += name.encode("utf-8") + packet += bytearray(0x50 - len(packet)) + packet[0x43] = self.is_locked + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + self.name = name + + def set_lock(self, state: bool) -> None: + """Lock/unlock the device.""" + packet = bytearray(4) + packet += self.name.encode("utf-8") + packet += bytearray(0x50 - len(packet)) + packet[0x43] = bool(state) + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + self.is_locked = bool(state) + + def get_type(self) -> str: + """Return device type.""" + return self.type + + def send_packet(self, packet_type: int, payload: bytes) -> bytes: + """Send a packet to the device.""" + self.count = ((self.count + 1) | 0x8000) & 0xFFFF + packet = bytearray(0x38) + packet[0x00:0x08] = bytes.fromhex("5aa5aa555aa5aa55") + packet[0x24:0x26] = self.devtype.to_bytes(2, "little") + packet[0x26:0x28] = packet_type.to_bytes(2, "little") + packet[0x28:0x2A] = self.count.to_bytes(2, "little") + packet[0x2A:0x30] = self.mac[::-1] + packet[0x30:0x34] = self.id.to_bytes(4, "little") + + p_checksum = sum(payload, 0xBEAF) & 0xFFFF + packet[0x34:0x36] = p_checksum.to_bytes(2, "little") + + padding = (16 - len(payload)) % 16 + payload = self.encrypt(payload + bytes(padding)) + packet.extend(payload) + + checksum = sum(packet, 0xBEAF) & 0xFFFF + packet[0x20:0x22] = checksum.to_bytes(2, "little") + + with self.lock and socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as conn: + timeout = self.timeout + start_time = time.time() + + while True: + time_left = timeout - (time.time() - start_time) + conn.settimeout(min(DEFAULT_RETRY_INTVL, time_left)) + conn.sendto(packet, self.host) + + try: + resp = conn.recvfrom(2048)[0] + break + except socket.timeout as err: + if (time.time() - start_time) > timeout: + raise e.NetworkTimeoutError( + -4000, + "Network timeout", + f"No response received within {timeout}s", + ) from err + + if len(resp) < 0x30: + raise e.DataValidationError( + -4007, + "Received data packet length error", + f"Expected at least 48 bytes and received {len(resp)}", + ) + + nom_checksum = int.from_bytes(resp[0x20:0x22], "little") + real_checksum = sum(resp, 0xBEAF) - sum(resp[0x20:0x22]) & 0xFFFF + + if nom_checksum != real_checksum: + raise e.DataValidationError( + -4008, + "Received data packet check error", + f"Expected a checksum of {nom_checksum} and received {real_checksum}", + ) + + return resp diff --git a/broadlink/exceptions.py b/broadlink/exceptions.py new file mode 100644 index 00000000..2343ad6e --- /dev/null +++ b/broadlink/exceptions.py @@ -0,0 +1,152 @@ +"""Exceptions for Broadlink devices.""" +import collections +import struct + + +class BroadlinkException(Exception): + """Base class common to all Broadlink exceptions.""" + + def __init__(self, *args, **kwargs): + """Initialize the exception.""" + super().__init__(*args, **kwargs) + if len(args) >= 2: + self.errno = args[0] + self.strerror = ": ".join(str(arg) for arg in args[1:]) + elif len(args) == 1: + self.errno = None + self.strerror = str(args[0]) + else: + self.errno = None + self.strerror = "" + + def __str__(self): + """Return str(self).""" + if self.errno is not None: + return "[Errno %s] %s" % (self.errno, self.strerror) + return self.strerror + + def __eq__(self, other): + """Return self==value.""" + # pylint: disable=unidiomatic-typecheck + return type(self) == type(other) and self.args == other.args + + def __hash__(self): + """Return hash(self).""" + return hash((type(self), self.args)) + + +class MultipleErrors(BroadlinkException): + """Multiple errors.""" + + def __init__(self, *args, **kwargs): + """Initialize the exception.""" + errors = args[0][:] if args else [] + counter = collections.Counter(errors) + strerror = "Multiple errors occurred: %s" % counter + super().__init__(strerror, **kwargs) + self.errors = errors + + def __repr__(self): + """Return repr(self).""" + return "MultipleErrors(%r)" % self.errors + + def __str__(self): + """Return str(self).""" + return self.strerror + + +class AuthenticationError(BroadlinkException): + """Authentication error.""" + + +class AuthorizationError(BroadlinkException): + """Authorization error.""" + + +class CommandNotSupportedError(BroadlinkException): + """Command not supported error.""" + + +class ConnectionClosedError(BroadlinkException): + """Connection closed error.""" + + +class StructureAbnormalError(BroadlinkException): + """Structure abnormal error.""" + + +class DeviceOfflineError(BroadlinkException): + """Device offline error.""" + + +class ReadError(BroadlinkException): + """Read error.""" + + +class SendError(BroadlinkException): + """Send error.""" + + +class SSIDNotFoundError(BroadlinkException): + """SSID not found error.""" + + +class StorageError(BroadlinkException): + """Storage error.""" + + +class WriteError(BroadlinkException): + """Write error.""" + + +class NetworkTimeoutError(BroadlinkException): + """Network timeout error.""" + + +class DataValidationError(BroadlinkException): + """Data validation error.""" + + +class UnknownError(BroadlinkException): + """Unknown error.""" + + +BROADLINK_EXCEPTIONS = { + # Firmware-related errors are generated by the device. + -1: (AuthenticationError, "Authentication failed"), + -2: (ConnectionClosedError, "You have been logged out"), + -3: (DeviceOfflineError, "The device is offline"), + -4: (CommandNotSupportedError, "Command not supported"), + -5: (StorageError, "The device storage is full"), + -6: (StructureAbnormalError, "Structure is abnormal"), + -7: (AuthorizationError, "Control key is expired"), + -8: (SendError, "Send error"), + -9: (WriteError, "Write error"), + -10: (ReadError, "Read error"), + -11: (SSIDNotFoundError, "SSID could not be found in AP configuration"), + # SDK related errors are generated by this module. + -2040: (DataValidationError, "Device information is not intact"), + -4000: (NetworkTimeoutError, "Network timeout"), + -4007: (DataValidationError, "Received data packet length error"), + -4008: (DataValidationError, "Received data packet check error"), + -4009: (DataValidationError, "Received data packet information type error"), + -4010: (DataValidationError, "Received encrypted data packet length error"), + -4011: (DataValidationError, "Received encrypted data packet check error"), + -4012: (AuthorizationError, "Device control ID error"), +} + + +def exception(err_code: int) -> BroadlinkException: + """Return exception corresponding to an error code.""" + try: + exc, msg = BROADLINK_EXCEPTIONS[err_code] + return exc(err_code, msg) + except KeyError: + return UnknownError(err_code, "Unknown error") + + +def check_error(error: bytes) -> None: + """Raise exception if an error occurred.""" + error_code = struct.unpack("h", error)[0] + if error_code: + raise exception(error_code) diff --git a/broadlink/helpers.py b/broadlink/helpers.py new file mode 100644 index 00000000..e7b3d4c9 --- /dev/null +++ b/broadlink/helpers.py @@ -0,0 +1,43 @@ +"""Helper functions and classes.""" +from typing import Dict, List, Sequence + + +class CRC16: + """Helps with CRC-16 calculation. + + CRC tables are cached for performance. + """ + + _cache: Dict[int, List[int]] = {} + + @classmethod + def get_table(cls, polynomial: int) -> List[int]: + """Return the CRC-16 table for a polynomial.""" + try: + crc_table = cls._cache[polynomial] + except KeyError: + crc_table = [] + for dividend in range(0, 256): + remainder = dividend + for _ in range(0, 8): + if remainder & 1: + remainder = remainder >> 1 ^ polynomial + else: + remainder = remainder >> 1 + crc_table.append(remainder) + cls._cache[polynomial] = crc_table + return crc_table + + @classmethod + def calculate( + cls, + sequence: Sequence[int], + polynomial: int = 0xA001, # CRC-16-ANSI. + init_value: int = 0xFFFF, + ) -> int: + """Calculate the CRC-16 of a sequence of integers.""" + crc_table = cls.get_table(polynomial) + crc = init_value + for item in sequence: + crc = crc >> 8 ^ crc_table[(crc ^ item) & 0xFF] + return crc diff --git a/broadlink/hub.py b/broadlink/hub.py new file mode 100644 index 00000000..0fd4ae53 --- /dev/null +++ b/broadlink/hub.py @@ -0,0 +1,98 @@ +"""Support for hubs.""" +import struct +import json +from typing import Optional + +from . import exceptions as e +from .device import Device + + +class s3(Device): + """Controls a Broadlink S3.""" + + TYPE = "S3" + MAX_SUBDEVICES = 8 + + def get_subdevices(self, step: int = 5) -> list: + """Return a list of sub devices.""" + total = self.MAX_SUBDEVICES + sub_devices = [] + seen = set() + index = 0 + + while index < total: + state = {"count": step, "index": index} + packet = self._encode(14, state) + resp = self.send_packet(0x6A, packet) + e.check_error(resp[0x22:0x24]) + resp = self._decode(resp) + + for device in resp["list"]: + did = device["did"] + if did in seen: + continue + + seen.add(did) + sub_devices.append(device) + + total = resp["total"] + if len(seen) >= total: + break + + index += step + + return sub_devices + + def get_state(self, did: Optional[str] = None) -> dict: + """Return the power state of the device.""" + state = {} + if did is not None: + state["did"] = did + + packet = self._encode(1, state) + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + return self._decode(response) + + def set_state( + self, + did: Optional[str] = None, + pwr1: Optional[bool] = None, + pwr2: Optional[bool] = None, + pwr3: Optional[bool] = None, + ) -> dict: + """Set the power state of the device.""" + state = {} + if did is not None: + state["did"] = did + if pwr1 is not None: + state["pwr1"] = int(bool(pwr1)) + if pwr2 is not None: + state["pwr2"] = int(bool(pwr2)) + if pwr3 is not None: + state["pwr3"] = int(bool(pwr3)) + + packet = self._encode(2, state) + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + return self._decode(response) + + def _encode(self, flag: int, state: dict) -> bytes: + """Encode a JSON packet.""" + # flag: 1 for reading, 2 for writing. + packet = bytearray(12) + data = json.dumps(state, separators=(",", ":")).encode() + struct.pack_into( + " dict: + """Decode a JSON packet.""" + payload = self.decrypt(response[0x38:]) + js_len = struct.unpack_from(" dict: + """Return the power state of the device. + + Example: `{'red': 128, 'blue': 255, 'green': 128, 'pwr': 1, 'brightness': 75, 'colortemp': 2700, 'hue': 240, 'saturation': 50, 'transitionduration': 1500, 'maxworktime': 0, 'bulb_colormode': 1, 'bulb_scenes': '["@01686464,0,0,0", "#ffffff,10,0,#000000,190,0,0", "2700+100,0,0,0", "#ff0000,500,2500,#00FF00,500,2500,#0000FF,500,2500,0", "@01686464,100,2400,@01686401,100,2400,0", "@01686464,100,2400,@01686401,100,2400,@005a6464,100,2400,@005a6401,100,2400,0", "@01686464,10,0,@00000000,190,0,0", "@01686464,200,0,@005a6464,200,0,0"]', 'bulb_scene': '', 'bulb_sceneidx': 255}` + """ + packet = self._encode(1, {}) + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + return self._decode(response) + + def set_state( + self, + pwr: Optional[bool] = None, + red: Optional[int] = None, + blue: Optional[int] = None, + green: Optional[int] = None, + brightness: Optional[int] = None, + colortemp: Optional[int] = None, + hue: Optional[int] = None, + saturation: Optional[int] = None, + transitionduration: Optional[int] = None, + maxworktime: Optional[int] = None, + bulb_colormode: Optional[int] = None, + bulb_scenes: Optional[str] = None, + bulb_scene: Optional[str] = None, + bulb_sceneidx: Optional[int] = None, + ) -> dict: + """Set the power state of the device.""" + state = {} + if pwr is not None: + state["pwr"] = int(bool(pwr)) + if red is not None: + state["red"] = int(red) + if blue is not None: + state["blue"] = int(blue) + if green is not None: + state["green"] = int(green) + if brightness is not None: + state["brightness"] = int(brightness) + if colortemp is not None: + state["colortemp"] = int(colortemp) + if hue is not None: + state["hue"] = int(hue) + if saturation is not None: + state["saturation"] = int(saturation) + if transitionduration is not None: + state["transitionduration"] = int(transitionduration) + if maxworktime is not None: + state["maxworktime"] = int(maxworktime) + if bulb_colormode is not None: + state["bulb_colormode"] = int(bulb_colormode) + if bulb_scenes is not None: + state["bulb_scenes"] = str(bulb_scenes) + if bulb_scene is not None: + state["bulb_scene"] = str(bulb_scene) + if bulb_sceneidx is not None: + state["bulb_sceneidx"] = int(bulb_sceneidx) + + packet = self._encode(2, state) + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + return self._decode(response) + + def _encode(self, flag: int, state: dict) -> bytes: + """Encode a JSON packet.""" + # flag: 1 for reading, 2 for writing. + packet = bytearray(14) + data = json.dumps(state, separators=(",", ":")).encode() + p_len = 12 + len(data) + struct.pack_into( + " dict: + """Decode a JSON packet.""" + payload = self.decrypt(response[0x38:]) + js_len = struct.unpack_from(" dict: + """Return the power state of the device. + + Example: `{'red': 128, 'blue': 255, 'green': 128, 'pwr': 1, 'brightness': 75, 'colortemp': 2700, 'hue': 240, 'saturation': 50, 'transitionduration': 1500, 'maxworktime': 0, 'bulb_colormode': 1, 'bulb_scenes': '["@01686464,0,0,0", "#ffffff,10,0,#000000,190,0,0", "2700+100,0,0,0", "#ff0000,500,2500,#00FF00,500,2500,#0000FF,500,2500,0", "@01686464,100,2400,@01686401,100,2400,0", "@01686464,100,2400,@01686401,100,2400,@005a6464,100,2400,@005a6401,100,2400,0", "@01686464,10,0,@00000000,190,0,0", "@01686464,200,0,@005a6464,200,0,0"]', 'bulb_scene': ''}` + """ + packet = self._encode(1, {}) + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + return self._decode(response) + + def set_state( + self, + pwr: Optional[bool] = None, + red: Optional[int] = None, + blue: Optional[int] = None, + green: Optional[int] = None, + brightness: Optional[int] = None, + colortemp: Optional[int] = None, + hue: Optional[int] = None, + saturation: Optional[int] = None, + transitionduration: Optional[int] = None, + maxworktime: Optional[int] = None, + bulb_colormode: Optional[int] = None, + bulb_scenes: Optional[str] = None, + bulb_scene: Optional[str] = None, + ) -> dict: + """Set the power state of the device.""" + state = {} + if pwr is not None: + state["pwr"] = int(bool(pwr)) + if red is not None: + state["red"] = int(red) + if blue is not None: + state["blue"] = int(blue) + if green is not None: + state["green"] = int(green) + if brightness is not None: + state["brightness"] = int(brightness) + if colortemp is not None: + state["colortemp"] = int(colortemp) + if hue is not None: + state["hue"] = int(hue) + if saturation is not None: + state["saturation"] = int(saturation) + if transitionduration is not None: + state["transitionduration"] = int(transitionduration) + if maxworktime is not None: + state["maxworktime"] = int(maxworktime) + if bulb_colormode is not None: + state["bulb_colormode"] = int(bulb_colormode) + if bulb_scenes is not None: + state["bulb_scenes"] = str(bulb_scenes) + if bulb_scene is not None: + state["bulb_scene"] = str(bulb_scene) + + packet = self._encode(2, state) + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + return self._decode(response) + + def _encode(self, flag: int, state: dict) -> bytes: + """Encode a JSON packet.""" + # flag: 1 for reading, 2 for writing. + packet = bytearray(12) + data = json.dumps(state, separators=(",", ":")).encode() + struct.pack_into( + " dict: + """Decode a JSON packet.""" + payload = self.decrypt(response[0x38:]) + js_len = struct.unpack_from(" bytes: + """Pack the timestamp to be sent over the Broadlink protocol.""" + data = bytearray(12) + utcoffset = int(datetime.utcoffset().total_seconds() / 3600) + data[:0x04] = utcoffset.to_bytes(4, "little", signed=True) + data[0x04:0x06] = datetime.year.to_bytes(2, "little") + data[0x06] = datetime.minute + data[0x07] = datetime.hour + data[0x08] = int(datetime.strftime("%y")) + data[0x09] = datetime.isoweekday() + data[0x0A] = datetime.day + data[0x0B] = datetime.month + return data + + @staticmethod + def unpack(data: bytes) -> dt.datetime: + """Unpack a timestamp received over the Broadlink protocol.""" + utcoffset = int.from_bytes(data[0x00:0x04], "little", signed=True) + year = int.from_bytes(data[0x04:0x06], "little") + minute = data[0x06] + hour = data[0x07] + subyear = data[0x08] + isoweekday = data[0x09] + day = data[0x0A] + month = data[0x0B] + + tz_info = dt.timezone(dt.timedelta(hours=utcoffset)) + datetime = dt.datetime(year, month, day, hour, minute, 0, 0, tz_info) + + if datetime.isoweekday() != isoweekday: + raise ValueError("isoweekday does not match") + if int(datetime.strftime("%y")) != subyear: + raise ValueError("subyear does not match") + + return datetime + + @staticmethod + def now() -> dt.datetime: + """Return the current date and time with timezone info.""" + tz_info = dt.timezone(dt.timedelta(seconds=-time.timezone)) + return dt.datetime.now(tz_info) diff --git a/broadlink/remote.py b/broadlink/remote.py new file mode 100644 index 00000000..60c54ce2 --- /dev/null +++ b/broadlink/remote.py @@ -0,0 +1,173 @@ +"""Support for universal remotes.""" +import struct +from typing import List, Optional, Tuple + +from . import exceptions as e +from .device import Device + + +def pulses_to_data(pulses: List[int], tick: float = 32.84) -> bytes: + """Convert a microsecond duration sequence into a Broadlink IR packet.""" + result = bytearray(4) + result[0x00] = 0x26 + + for pulse in pulses: + div, mod = divmod(int(pulse // tick), 256) + if div: + result.append(0) + result.append(div) + result.append(mod) + + data_len = len(result) - 4 + result[0x02] = data_len & 0xFF + result[0x03] = data_len >> 8 + + return result + + +def data_to_pulses(data: bytes, tick: float = 32.84) -> List[int]: + """Parse a Broadlink packet into a microsecond duration sequence.""" + result = [] + index = 4 + end = min(256 * data[0x03] + data[0x02] + 4, len(data)) + + while index < end: + chunk = data[index] + index += 1 + + if chunk == 0: + try: + chunk = 256 * data[index] + data[index + 1] + except IndexError as err: + raise ValueError("Malformed data.") from err + index += 2 + + result.append(int(chunk * tick)) + + return result + + +class rmmini(Device): + """Controls a Broadlink RM mini 3.""" + + TYPE = "RMMINI" + + def _send(self, command: int, data: bytes = b"") -> bytes: + """Send a packet to the device.""" + packet = struct.pack(" None: + """Update device name and lock status.""" + resp = self._send(0x1) + self.name = resp[0x48:].split(b"\x00")[0].decode() + self.is_locked = bool(resp[0x87]) + + def send_data(self, data: bytes) -> None: + """Send a code to the device.""" + self._send(0x2, data) + + def enter_learning(self) -> None: + """Enter infrared learning mode.""" + self._send(0x3) + + def check_data(self) -> bytes: + """Return the last captured code.""" + return self._send(0x4) + + +class rmpro(rmmini): + """Controls a Broadlink RM pro.""" + + TYPE = "RMPRO" + + def sweep_frequency(self) -> None: + """Sweep frequency.""" + self._send(0x19) + + def check_frequency(self) -> Tuple[bool, float]: + """Return True if the frequency was identified successfully.""" + resp = self._send(0x1A) + is_found = bool(resp[0]) + frequency = struct.unpack(" None: + """Enter radiofrequency learning mode.""" + payload = bytearray() + if frequency: + payload += struct.pack(" None: + """Cancel sweep frequency.""" + self._send(0x1E) + + def check_sensors(self) -> dict: + """Return the state of the sensors.""" + resp = self._send(0x1) + temp = struct.unpack(" float: + """Return the temperature.""" + return self.check_sensors()["temperature"] + + +class rmminib(rmmini): + """Controls a Broadlink RM mini 3 (new firmware).""" + + TYPE = "RMMINIB" + + def _send(self, command: int, data: bytes = b"") -> bytes: + """Send a packet to the device.""" + packet = struct.pack(" dict: + """Return the state of the sensors.""" + resp = self._send(0x24) + temp = struct.unpack(" float: + """Return the temperature.""" + return self.check_sensors()["temperature"] + + def check_humidity(self) -> float: + """Return the humidity.""" + return self.check_sensors()["humidity"] + + +class rm4pro(rm4mini, rmpro): + """Controls a Broadlink RM4 pro.""" + + TYPE = "RM4PRO" + + +class rm(rmpro): + """For backwards compatibility.""" + + TYPE = "RM2" + + +class rm4(rm4pro): + """For backwards compatibility.""" + + TYPE = "RM4" diff --git a/broadlink/sensor.py b/broadlink/sensor.py new file mode 100644 index 00000000..284576fa --- /dev/null +++ b/broadlink/sensor.py @@ -0,0 +1,90 @@ +"""Support for sensors.""" +from typing import Sequence + +from . import exceptions as e +from .device import Device + + +class a1(Device): + """Controls a Broadlink A1.""" + + TYPE = "A1" + + _SENSORS_AND_LEVELS = ( + ("light", ("dark", "dim", "normal", "bright")), + ("air_quality", ("excellent", "good", "normal", "bad")), + ("noise", ("quiet", "normal", "noisy")), + ) + + def check_sensors(self) -> dict: + """Return the state of the sensors.""" + data = self.check_sensors_raw() + for sensor, levels in self._SENSORS_AND_LEVELS: + try: + data[sensor] = levels[data[sensor]] + except IndexError: + data[sensor] = "unknown" + return data + + def check_sensors_raw(self) -> dict: + """Return the state of the sensors in raw format.""" + packet = bytearray([0x1]) + resp = self.send_packet(0x6A, packet) + e.check_error(resp[0x22:0x24]) + data = self.decrypt(resp[0x38:]) + + return { + "temperature": data[0x04] + data[0x05] / 10.0, + "humidity": data[0x06] + data[0x07] / 10.0, + "light": data[0x08], + "air_quality": data[0x0A], + "noise": data[0x0C], + } + + +class a2(Device): + """Controls a Broadlink A2.""" + + TYPE = "A2" + + def _send(self, operation: int, data: Sequence = b""): + """Send a command to the device.""" + packet = bytearray(12) + packet[0x02] = 0xA5 + packet[0x03] = 0xA5 + packet[0x04] = 0x5A + packet[0x05] = 0x5A + packet[0x08] = operation + packet[0x09] = 0x0B + + if data: + data_len = len(data) + packet[0x0A] = data_len & 0xFF + packet[0x0B] = data_len >> 8 + packet += bytes(2) + packet.extend(data) + + checksum = sum(packet, 0xBEAF) & 0xFFFF + packet[0x06] = checksum & 0xFF + packet[0x07] = checksum >> 8 + + packet_len = len(packet) - 2 + packet[0x00] = packet_len & 0xFF + packet[0x01] = packet_len >> 8 + + resp = self.send_packet(0x6A, packet) + e.check_error(resp[0x22:0x24]) + payload = self.decrypt(resp[0x38:]) + return payload + + def check_sensors_raw(self) -> dict: + """Return the state of the sensors in raw format.""" + data = self._send(1) + + return { + "temperature": data[0x13] * 256 + data[0x14], + "humidity": data[0x15] * 256 + data[0x16], + "pm10": data[0x0D] * 256 + data[0x0E], + "pm2_5": data[0x0F] * 256 + data[0x10], + "pm1": data[0x11] * 256 + data[0x12], + } diff --git a/broadlink/switch.py b/broadlink/switch.py new file mode 100644 index 00000000..8393f6b1 --- /dev/null +++ b/broadlink/switch.py @@ -0,0 +1,472 @@ +"""Support for switches.""" +import json +import struct +from typing import Optional + +from . import exceptions as e +from .device import Device + + +class sp1(Device): + """Controls a Broadlink SP1.""" + + TYPE = "SP1" + + def set_power(self, pwr: bool) -> None: + """Set the power state of the device.""" + packet = bytearray(4) + packet[0] = bool(pwr) + response = self.send_packet(0x66, packet) + e.check_error(response[0x22:0x24]) + + +class sp2(Device): + """Controls a Broadlink SP2.""" + + TYPE = "SP2" + + def set_power(self, pwr: bool) -> None: + """Set the power state of the device.""" + packet = bytearray(16) + packet[0] = 2 + packet[4] = bool(pwr) + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + + def check_power(self) -> bool: + """Return the power state of the device.""" + packet = bytearray(16) + packet[0] = 1 + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + payload = self.decrypt(response[0x38:]) + return bool(payload[0x4]) + + +class sp2s(sp2): + """Controls a Broadlink SP2S.""" + + TYPE = "SP2S" + + def get_energy(self) -> float: + """Return the power consumption in W.""" + packet = bytearray(16) + packet[0] = 4 + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + payload = self.decrypt(response[0x38:]) + return int.from_bytes(payload[0x4:0x7], "little") / 1000 + + +class sp3(Device): + """Controls a Broadlink SP3.""" + + TYPE = "SP3" + + def set_power(self, pwr: bool) -> None: + """Set the power state of the device.""" + packet = bytearray(16) + packet[0] = 2 + packet[4] = self.check_nightlight() << 1 | bool(pwr) + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + + def set_nightlight(self, ntlight: bool) -> None: + """Set the night light state of the device.""" + packet = bytearray(16) + packet[0] = 2 + packet[4] = bool(ntlight) << 1 | self.check_power() + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + + def check_power(self) -> bool: + """Return the power state of the device.""" + packet = bytearray(16) + packet[0] = 1 + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + payload = self.decrypt(response[0x38:]) + return bool(payload[0x4] & 1) + + def check_nightlight(self) -> bool: + """Return the state of the night light.""" + packet = bytearray(16) + packet[0] = 1 + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + payload = self.decrypt(response[0x38:]) + return bool(payload[0x4] & 2) + + +class sp3s(sp2): + """Controls a Broadlink SP3S.""" + + TYPE = "SP3S" + + def get_energy(self) -> float: + """Return the power consumption in W.""" + packet = bytearray([8, 0, 254, 1, 5, 1, 0, 0, 0, 45]) + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + payload = self.decrypt(response[0x38:]) + energy = payload[0x7:0x4:-1].hex() + return int(energy) / 100 + + +class sp4(Device): + """Controls a Broadlink SP4.""" + + TYPE = "SP4" + + def set_power(self, pwr: bool) -> None: + """Set the power state of the device.""" + self.set_state(pwr=pwr) + + def set_nightlight(self, ntlight: bool) -> None: + """Set the night light state of the device.""" + self.set_state(ntlight=ntlight) + + def set_state( + self, + pwr: Optional[bool] = None, + ntlight: Optional[bool] = None, + indicator: Optional[bool] = None, + ntlbrightness: Optional[int] = None, + maxworktime: Optional[int] = None, + childlock: Optional[bool] = None, + ) -> dict: + """Set state of device.""" + state = {} + if pwr is not None: + state["pwr"] = int(bool(pwr)) + if ntlight is not None: + state["ntlight"] = int(bool(ntlight)) + if indicator is not None: + state["indicator"] = int(bool(indicator)) + if ntlbrightness is not None: + state["ntlbrightness"] = ntlbrightness + if maxworktime is not None: + state["maxworktime"] = maxworktime + if childlock is not None: + state["childlock"] = int(bool(childlock)) + + packet = self._encode(2, state) + response = self.send_packet(0x6A, packet) + return self._decode(response) + + def check_power(self) -> bool: + """Return the power state of the device.""" + state = self.get_state() + return bool(state["pwr"]) + + def check_nightlight(self) -> bool: + """Return the state of the night light.""" + state = self.get_state() + return bool(state["ntlight"]) + + def get_state(self) -> dict: + """Get full state of device.""" + packet = self._encode(1, {}) + response = self.send_packet(0x6A, packet) + return self._decode(response) + + def _encode(self, flag: int, state: dict) -> bytes: + """Encode a message.""" + packet = bytearray(12) + data = json.dumps(state, separators=(",", ":")).encode() + struct.pack_into( + " dict: + """Decode a message.""" + e.check_error(response[0x22:0x24]) + payload = self.decrypt(response[0x38:]) + js_len = struct.unpack_from(" dict: + """Get full state of device.""" + state = super().get_state() + + # Convert sensor data to float. Remove keys if sensors are not supported. + sensor_attrs = ["current", "volt", "power", "totalconsum", "overload"] + for attr in sensor_attrs: + value = state.pop(attr, -1) + if value != -1: + state[attr] = value / 1000 + return state + + def _encode(self, flag: int, state: dict) -> bytes: + """Encode a message.""" + packet = bytearray(14) + data = json.dumps(state, separators=(",", ":")).encode() + length = 12 + len(data) + struct.pack_into( + " dict: + """Decode a message.""" + e.check_error(response[0x22:0x24]) + payload = self.decrypt(response[0x38:]) + js_len = struct.unpack_from(" dict: + """Return the power state of the device. + + Example: `{"pwr":1,"pwr1":1,"pwr2":0,"maxworktime":60,"maxworktime1":60,"maxworktime2":0,"idcbrightness":50}` + """ + packet = self._encode(1, {}) + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + return self._decode(response) + + def set_state( + self, + pwr: Optional[bool] = None, + pwr1: Optional[bool] = None, + pwr2: Optional[bool] = None, + maxworktime: Optional[int] = None, + maxworktime1: Optional[int] = None, + maxworktime2: Optional[int] = None, + idcbrightness: Optional[int] = None, + ) -> dict: + """Set the power state of the device.""" + state = {} + if pwr is not None: + state["pwr"] = int(bool(pwr)) + if pwr1 is not None: + state["pwr1"] = int(bool(pwr1)) + if pwr2 is not None: + state["pwr2"] = int(bool(pwr2)) + if maxworktime is not None: + state["maxworktime"] = maxworktime + if maxworktime1 is not None: + state["maxworktime1"] = maxworktime1 + if maxworktime2 is not None: + state["maxworktime2"] = maxworktime2 + if idcbrightness is not None: + state["idcbrightness"] = idcbrightness + + packet = self._encode(2, state) + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + return self._decode(response) + + def _encode(self, flag: int, state: dict) -> bytes: + """Encode a message.""" + packet = bytearray(14) + data = json.dumps(state).encode() + length = 12 + len(data) + struct.pack_into( + " dict: + """Decode a message.""" + payload = self.decrypt(response[0x38:]) + js_len = struct.unpack_from(" dict: + """Set the power state of the device.""" + state = {} + if pwr is not None: + state["pwr"] = int(bool(pwr)) + if pwr1 is not None: + state["pwr1"] = int(bool(pwr1)) + if pwr2 is not None: + state["pwr2"] = int(bool(pwr2)) + if pwr3 is not None: + state["pwr3"] = int(bool(pwr3)) + if maxworktime1 is not None: + state["maxworktime1"] = maxworktime1 + if maxworktime2 is not None: + state["maxworktime2"] = maxworktime2 + if maxworktime3 is not None: + state["maxworktime3"] = maxworktime3 + if idcbrightness is not None: + state["idcbrightness"] = idcbrightness + if childlock is not None: + state["childlock"] = int(bool(childlock)) + if childlock1 is not None: + state["childlock1"] = int(bool(childlock1)) + if childlock2 is not None: + state["childlock2"] = int(bool(childlock2)) + if childlock3 is not None: + state["childlock3"] = int(bool(childlock3)) + if childlock4 is not None: + state["childlock4"] = int(bool(childlock4)) + + packet = self._encode(2, state) + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + return self._decode(response) + + +class mp1(Device): + """Controls a Broadlink MP1.""" + + TYPE = "MP1" + + def set_power_mask(self, sid_mask: int, pwr: bool) -> None: + """Set the power state of the device.""" + packet = bytearray(16) + packet[0x00] = 0x0D + packet[0x02] = 0xA5 + packet[0x03] = 0xA5 + packet[0x04] = 0x5A + packet[0x05] = 0x5A + packet[0x06] = 0xB2 + ((sid_mask << 1) if pwr else sid_mask) + packet[0x07] = 0xC0 + packet[0x08] = 0x02 + packet[0x0A] = 0x03 + packet[0x0D] = sid_mask + packet[0x0E] = sid_mask if pwr else 0 + + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + + def set_power(self, sid: int, pwr: bool) -> None: + """Set the power state of the device.""" + sid_mask = 0x01 << (sid - 1) + self.set_power_mask(sid_mask, pwr) + + def check_power_raw(self) -> int: + """Return the power state of the device in raw format.""" + packet = bytearray(16) + packet[0x00] = 0x0A + packet[0x02] = 0xA5 + packet[0x03] = 0xA5 + packet[0x04] = 0x5A + packet[0x05] = 0x5A + packet[0x06] = 0xAE + packet[0x07] = 0xC0 + packet[0x08] = 0x01 + + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + payload = self.decrypt(response[0x38:]) + return payload[0x0E] + + def check_power(self) -> dict: + """Return the power state of the device.""" + data = self.check_power_raw() + return { + "s1": bool(data & 1), + "s2": bool(data & 2), + "s3": bool(data & 4), + "s4": bool(data & 8), + } + + +class mp1s(mp1): + """Controls a Broadlink MP1S.""" + + TYPE = "MP1S" + + def get_state(self) -> dict: + """Return the power state of the device. + + voltage in V. + current in A. + power in W. + power consumption in kW·h. + """ + packet = bytearray(16) + packet[0x00] = 0x0E + packet[0x02] = 0xA5 + packet[0x03] = 0xA5 + packet[0x04] = 0x5A + packet[0x05] = 0x5A + packet[0x06] = 0xB2 + packet[0x07] = 0xC0 + packet[0x08] = 0x01 + packet[0x0A] = 0x04 + + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + payload = self.decrypt(response[0x38:]) + payload_str = payload.hex()[4:-6] + + def get_value(start, end, factors): + value = sum( + int(payload_str[i-2:i]) * factor + for i, factor in zip(range(start, end, -2), factors) + ) + return value + + return { + "volt": get_value(34, 30, [10, 0.1]), + "current": get_value(40, 34, [1, 0.01, 0.0001]), + "power": get_value(46, 40, [100, 1, 0.01]), + "totalconsum": get_value(54, 46, [10000, 100, 1, 0.01]), + } diff --git a/cli/README.md b/cli/README.md index 5d7b3be1..b7e48dc9 100644 --- a/cli/README.md +++ b/cli/README.md @@ -1,37 +1,29 @@ Command line interface for python-broadlink =========================================== -This is a command line interface for broadlink python library - -Tested with BroadLink RMPRO / RM2 +This is a command line interface for the python-broadlink API. Requirements ------------ -You should have the broadlink python installed, this can be made in many linux distributions using : +You need to install the module first: ``` -sudo pip install broadlink +pip3 install broadlink ``` Installation ----------- -Just copy this files +Download "broadlink_cli" and "broadlink_discovery". Programs -------- +* broadlink_discovery: Discover Broadlink devices connected to the local network. - -* broadlink_discovery -used to run the discovery in the network -this program withh show the command line parameters to be used with -broadlink_cli to select broadlink device - -* broadlink_cli -used to send commands and query the broadlink device +* broadlink_cli: Send commands and query the Broadlink device. -device specification formats +Device specification formats ---------------------------- Using separate parameters for each information: @@ -48,33 +40,99 @@ Using file with parameters: ``` broadlink_cli --device @BEDROOM.device --temp ``` -This is prefered as the configuration is stored in file and you can change -just a file to point to a different hardware +This is prefered as the configuration is stored in a file and you can change +it later to point to a different device. -Sample usage ------------- +Example usage +------------- + +### Common commands -Learn commands : +#### Join device to the Wi-Fi network +``` +broadlink_cli --joinwifi SSID PASSWORD +``` + +#### Discover devices connected to the local network +``` +broadlink_discovery +``` + +### Universal remotes + +#### Learn IR code and show at console ``` -# Learn and save to file -broadlink_cli --device @BEDROOM.device --learnfile LG-TV.power -# LEard and show at console broadlink_cli --device @BEDROOM.device --learn ``` +#### Learn RF code and show at console +``` +broadlink_cli --device @BEDROOM.device --rfscanlearn +``` + +#### Learn IR code and save to file +``` +broadlink_cli --device @BEDROOM.device --learnfile LG-TV.power +``` + +#### Learn RF code and save to file +``` +broadlink_cli --device @BEDROOM.device --rfscanlearn --learnfile LG-TV.power +``` -Send command : +#### Send code +``` +broadlink_cli --device @BEDROOM.device --send DATA +``` + +#### Send code from file ``` broadlink_cli --device @BEDROOM.device --send @LG-TV.power -broadlink_cli --device @BEDROOM.device --send ....datafromlearncommand... ``` -Get Temperature : +#### Check temperature ``` broadlink_cli --device @BEDROOM.device --temperature ``` -Get Energy Consumption (For a SmartPlug) : +#### Check humidity +``` +broadlink_cli --device @BEDROOM.device --humidity +``` + +### Smart plugs + +#### Turn on +``` +broadlink_cli --device @BEDROOM.device --turnon +``` + +#### Turn off +``` +broadlink_cli --device @BEDROOM.device --turnoff +``` + +#### Turn on nightlight +``` +broadlink_cli --device @BEDROOM.device --turnnlon +``` + +#### Turn off nightlight +``` +broadlink_cli --device @BEDROOM.device --turnnloff +``` + +#### Check power state +``` +broadlink_cli --device @BEDROOM.device --check +``` + +#### Check nightlight state +``` +broadlink_cli --device @BEDROOM.device --checknl +``` + +#### Check power consumption ``` broadlink_cli --device @BEDROOM.device --energy ``` diff --git a/cli/broadlink_cli b/cli/broadlink_cli index 54f02a07..7913e332 100755 --- a/cli/broadlink_cli +++ b/cli/broadlink_cli @@ -1,65 +1,32 @@ -#!/usr/bin/env python - -import broadlink -import sys +#!/usr/bin/env python3 import argparse +import base64 import time +from typing import List + +import broadlink +from broadlink.const import DEFAULT_PORT +from broadlink.exceptions import ReadError, StorageError +from broadlink.remote import data_to_pulses, pulses_to_data -TICK = 32.84 -IR_TOKEN = 0x26 +TIMEOUT = 30 def auto_int(x): return int(x, 0) -def to_microseconds(bytes): - result = [] - # print bytes[0] # 0x26 = 38for IR - length = bytes[2] + 256 * bytes[3] # presently ignored - index = 4 - while index < len(bytes): - chunk = bytes[index] - index += 1 - if chunk == 0: - chunk = bytes[index] - chunk = 256 * chunk + bytes[index + 1] - index += 2 - result.append(int(round(chunk*TICK))) - if chunk == 0x0d05: - break - return result - - -def durations_to_broadlink(durations): - result = bytearray() - result.append(IR_TOKEN) - result.append(0) - result.append(len(durations) % 256) - result.append(len(durations) / 256) - for dur in durations: - num = int(round(dur/TICK)) - if num > 255: - result.append(0) - result.append(num / 256) - result.append(num % 256) - return result +def format_pulses(pulses: List[int]) -> str: + """Format pulses.""" + return " ".join( + f"+{pulse}" if i % 2 == 0 else f"-{pulse}" + for i, pulse in enumerate(pulses) + ) -def format_durations(data): - result = '' - for i in range(0, len(data)): - if len(result) > 0: - result += ' ' - result += ('+' if i % 2 == 0 else '-') + str(data[i]) - return result - - -def parse_durations(str): - result = [] - for s in str.split(): - result.append(abs(int(s))) - return result +def parse_pulses(data: List[str]) -> List[int]: + """Parse pulses.""" + return [abs(int(s)) for s in data] parser = argparse.ArgumentParser(fromfile_prefix_chars='@') @@ -67,8 +34,9 @@ parser.add_argument("--device", help="device definition as 'type host mac'") parser.add_argument("--type", type=auto_int, default=0x2712, help="type of device") parser.add_argument("--host", help="host address") parser.add_argument("--mac", help="mac address (hex reverse), as used by python-broadlink library") -parser.add_argument("--temperature",action="store_true", help="request temperature from device") -parser.add_argument("--energy",action="store_true", help="request energy consumption from device") +parser.add_argument("--temperature", action="store_true", help="request temperature from device") +parser.add_argument("--humidity", action="store_true", help="request humidity from device") +parser.add_argument("--energy", action="store_true", help="request energy consumption from device") parser.add_argument("--check", action="store_true", help="check current power state") parser.add_argument("--checknl", action="store_true", help="check current nightlight state") parser.add_argument("--turnon", action="store_true", help="turn on device") @@ -79,105 +47,175 @@ parser.add_argument("--switch", action="store_true", help="switch state from on parser.add_argument("--send", action="store_true", help="send command") parser.add_argument("--sensors", action="store_true", help="check all sensors") parser.add_argument("--learn", action="store_true", help="learn command") -parser.add_argument("--learnfile", help="learn command and save to specified file") -parser.add_argument("--durations", action="store_true", help="use durations in micro seconds instead of the Broadlink format") +parser.add_argument("--rflearn", action="store_true", help="rf scan learning") +parser.add_argument("--frequency", type=float, help="specify radiofrequency for learning") +parser.add_argument("--learnfile", help="save learned command to a specified file") +parser.add_argument("--durations", action="store_true", + help="use durations in micro seconds instead of the Broadlink format") parser.add_argument("--convert", action="store_true", help="convert input data to durations") +parser.add_argument("--joinwifi", nargs=2, help="Args are SSID PASSPHRASE to configure Broadlink device with") parser.add_argument("data", nargs='*', help="Data to send or convert") args = parser.parse_args() if args.device: values = args.device.split() - type = int(values[0],0) + devtype = int(values[0], 0) host = values[1] mac = bytearray.fromhex(values[2]) elif args.mac: - type = args.type + devtype = args.type host = args.host mac = bytearray.fromhex(args.mac) if args.host or args.device: - dev = broadlink.gendevice(type, (host, 80), mac) + dev = broadlink.gendevice(devtype, (host, DEFAULT_PORT), mac) dev.auth() +if args.joinwifi: + broadlink.setup(args.joinwifi[0], args.joinwifi[1], 4) + if args.convert: data = bytearray.fromhex(''.join(args.data)) - durations = to_microseconds(data) - print format_durations(durations) + pulses = data_to_pulses(data) + print(format_pulses(pulses)) if args.temperature: - print dev.check_temperature() + print(dev.check_temperature()) +if args.humidity: + print(dev.check_humidity()) if args.energy: - print dev.get_energy() + print(dev.get_energy()) if args.sensors: - try: - data = dev.check_sensors() - except: - data = {} - data['temperature'] = dev.check_temperature() + data = dev.check_sensors() for key in data: - print "{} {}".format(key, data[key]) + print("{} {}".format(key, data[key])) if args.send: - data = durations_to_broadlink(parse_durations(' '.join(args.data))) \ - if args.durations else bytearray.fromhex(''.join(args.data)) + data = ( + pulses_to_data(parse_pulses(args.data)) + if args.durations + else bytes.fromhex(''.join(args.data)) + ) dev.send_data(data) -if args.learn or args.learnfile: +if args.learn or (args.learnfile and not args.rflearn): dev.enter_learning() - data = None - print "Learning..." - timeout = 30 - while (data is None) and (timeout > 0): - time.sleep(2) - timeout -= 2 - data = dev.check_data() - if data: - learned = format_durations(to_microseconds(bytearray(data))) \ - if args.durations \ - else ''.join(format(x, '02x') for x in bytearray(data)) - if args.learn: - print learned - if args.learnfile: - print "Saving to {}".format(args.learnfile) - with open(args.learnfile, "w") as text_file: - text_file.write(learned) + print("Learning...") + start = time.time() + while time.time() - start < TIMEOUT: + time.sleep(1) + try: + data = dev.check_data() + except (ReadError, StorageError): + continue + else: + break else: - print "No data received..." + print("No data received...") + exit(1) + + print("Packet found!") + raw_fmt = data.hex() + base64_fmt = base64.b64encode(data).decode('ascii') + pulse_fmt = format_pulses(data_to_pulses(data)) + + print("Raw:", raw_fmt) + print("Base64:", base64_fmt) + print("Pulses:", pulse_fmt) + + if args.learnfile: + print("Saving to {}".format(args.learnfile)) + with open(args.learnfile, "w") as text_file: + text_file.write(pulse_fmt if args.durations else raw_fmt) if args.check: if dev.check_power(): - print '* ON *' + print('* ON *') else: - print '* OFF *' + print('* OFF *') if args.checknl: if dev.check_nightlight(): - print '* ON *' + print('* ON *') else: - print '* OFF *' + print('* OFF *') if args.turnon: dev.set_power(True) if dev.check_power(): - print '== Turned * ON * ==' + print('== Turned * ON * ==') else: - print '!! Still OFF !!' + print('!! Still OFF !!') if args.turnoff: dev.set_power(False) if dev.check_power(): - print '!! Still ON !!' + print('!! Still ON !!') else: - print '== Turned * OFF * ==' + print('== Turned * OFF * ==') if args.turnnlon: dev.set_nightlight(True) if dev.check_nightlight(): - print '== Turned * ON * ==' + print('== Turned * ON * ==') else: - print '!! Still OFF !!' + print('!! Still OFF !!') if args.turnnloff: dev.set_nightlight(False) if dev.check_nightlight(): - print '!! Still ON !!' + print('!! Still ON !!') else: - print '== Turned * OFF * ==' + print('== Turned * OFF * ==') if args.switch: if dev.check_power(): dev.set_power(False) - print '* Switch to OFF *' + print('* Switch to OFF *') else: dev.set_power(True) - print '* Switch to ON *' + print('* Switch to ON *') +if args.rflearn: + if args.frequency: + frequency = args.frequency + print("Press the button you want to learn, a short press...") + else: + dev.sweep_frequency() + print("Detecting radiofrequency, press and hold the button to learn...") + + start = time.time() + while time.time() - start < TIMEOUT: + time.sleep(1) + locked, frequency = dev.check_frequency() + if locked: + break + else: + print("Radiofrequency not found") + dev.cancel_sweep_frequency() + exit(1) + + print("Radiofrequency detected: {}MHz".format(frequency)) + print("You can now let go of the button") + + input("Press enter to continue...") + + print("Press the button again, now a short press.") + + dev.find_rf_packet(frequency) + + start = time.time() + while time.time() - start < TIMEOUT: + time.sleep(1) + try: + data = dev.check_data() + except (ReadError, StorageError): + continue + else: + break + else: + print("No data received...") + exit(1) + + print("Packet found!") + raw_fmt = data.hex() + base64_fmt = base64.b64encode(data).decode('ascii') + pulse_fmt = format_pulses(data_to_pulses(data)) + + print("Raw:", raw_fmt) + print("Base64:", base64_fmt) + print("Pulses:", pulse_fmt) + + if args.learnfile: + print("Saving to {}".format(args.learnfile)) + with open(args.learnfile, "w") as text_file: + text_file.write(pulse_fmt if args.durations else raw_fmt) diff --git a/cli/broadlink_discovery b/cli/broadlink_discovery index 385f1932..477e1bd7 100755 --- a/cli/broadlink_discovery +++ b/cli/broadlink_discovery @@ -1,24 +1,30 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 +import argparse import broadlink -import time -import argparse +from broadlink.const import DEFAULT_BCAST_ADDR, DEFAULT_TIMEOUT +from broadlink.exceptions import StorageError -parser = argparse.ArgumentParser(fromfile_prefix_chars='@'); -parser.add_argument("--timeout", type=int, default=5, help="timeout to wait for receiving discovery responses") +parser = argparse.ArgumentParser(fromfile_prefix_chars='@') +parser.add_argument("--timeout", type=int, default=DEFAULT_TIMEOUT, help="timeout to wait for receiving discovery responses") +parser.add_argument("--ip", default=None, help="ip address to use in the discovery") +parser.add_argument("--dst-ip", default=DEFAULT_BCAST_ADDR, help="destination ip address to use in the discovery") args = parser.parse_args() -print "Discovering..." -devices = broadlink.discover(timeout=args.timeout) +print("Discovering...") +devices = broadlink.discover(timeout=args.timeout, local_ip_address=args.ip, discover_ip_address=args.dst_ip) for device in devices: if device.auth(): - print "###########################################" - print device.type - print "# broadlink_cli --type {} --host {} --mac {}".format(hex(device.devtype), device.host[0], ''.join(format(x, '02x') for x in device.mac)) - print "Device file data (to be used with --device @filename in broadlink_cli) : " - print "{} {} {}".format(hex(device.devtype), device.host[0], ''.join(format(x, '02x') for x in device.mac)) - if hasattr(device, 'check_temperature'): - print "temperature = {}".format(device.check_temperature()) - print "" + print("###########################################") + print(device.type) + print("# broadlink_cli --type {} --host {} --mac {}".format(hex(device.devtype), device.host[0], + ''.join(format(x, '02x') for x in device.mac))) + print("Device file data (to be used with --device @filename in broadlink_cli) : ") + print("{} {} {}".format(hex(device.devtype), device.host[0], ''.join(format(x, '02x') for x in device.mac))) + try: + print("temperature = {}".format(device.check_temperature())) + except (AttributeError, StorageError): + pass + print("") else: - print "Error authenticating with device : {}".format(device.host) + print("Error authenticating with device : {}".format(device.host)) diff --git a/protocol.md b/protocol.md index 2e388d74..e2825640 100644 --- a/protocol.md +++ b/protocol.md @@ -181,11 +181,13 @@ Send the following payload with a command byte of 0x006a |0x05|repeat count, (0 = no repeat, 1 send twice, .....)| |0x06-0x07|Length of the following data in little endian| |0x08 ....|Pulse lengths in 2^-15 s units (µs * 269 / 8192 works very well)| -|....|0x0d 0x05 at the end for IR only| +|....|For IR codes, the pulse lengths should be paired as ON, OFF| Each value is represented by one byte. If the length exceeds one byte then it is stored big endian with a leading 0. +Captures of IR codes from the device will always end with a constant OFF value of `0x00 0x0d 0x05` but the trailing silence can be anything on transmit. The likely reason for this value is a capped timeout value on detection. The value is about 102 milliseconds. + Example: The header for my Optoma projector is 8920 4450 8920 * 269 / 8192 = 0x124 4450 * 269 / 8192 = 0x92 diff --git a/requirements.txt b/requirements.txt index 9b20c33e..2c6c996c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1 @@ -pycryptodome==3.6.6 +cryptography==3.2 diff --git a/setup.py b/setup.py index f4482eff..0426f148 100644 --- a/setup.py +++ b/setup.py @@ -1,38 +1,28 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- -import re -from setuptools import setup, find_packages -import sys -import warnings -try: - import pyaes - dynamic_requires = ["pyaes==1.6.0"] -except ImportError as e: - dynamic_requires = ['pycryptodome==3.6.6'] +from setuptools import setup, find_packages -# For Hysen thermostatic heating controller -dynamic_requires.append('PyCRC') -version = 0.9 +version = '0.19.0' setup( - name='broadlink', - version=0.9, - author='Matthew Garrett', - author_email='mjg59@srcf.ucam.org', - url='http://github.com/mjg59/python-broadlink', + name="broadlink", + version=version, + author="Matthew Garrett", + author_email="mjg59@srcf.ucam.org", + url="http://github.com/mjg59/python-broadlink", packages=find_packages(), scripts=[], - install_requires=dynamic_requires, - description='Python API for controlling Broadlink IR controllers', + install_requires=["cryptography>=3.2"], + description="Python API for controlling Broadlink devices", classifiers=[ - 'Development Status :: 4 - Beta', - 'Intended Audience :: Developers', - 'License :: OSI Approved :: MIT License', - 'Operating System :: OS Independent', - 'Programming Language :: Python', + "Development Status :: 4 - Beta", + "Intended Audience :: Developers", + "License :: OSI Approved :: MIT License", + "Operating System :: OS Independent", + "Programming Language :: Python", ], include_package_data=True, zip_safe=False,