diff --git a/.github/workflows/flake8.yaml b/.github/workflows/flake8.yaml index cb396611..aa09a19c 100644 --- a/.github/workflows/flake8.yaml +++ b/.github/workflows/flake8.yaml @@ -2,20 +2,20 @@ name: Python flake8 on: push: - branches: [ main, master, dev, development ] + branches: [ master, dev ] pull_request: - branches: [ main, master, dev, development ] + branches: [ master, dev ] jobs: test: - runs-on: ubuntu-latest + runs-on: ubuntu-20.04 strategy: matrix: python-version: [3.6, 3.7, 3.8, 3.9] steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v2 + uses: actions/setup-python@v4 with: python-version: ${{ matrix.python-version }} - name: Install dependencies diff --git a/README.md b/README.md index 009315d1..81c6de5b 100644 --- a/README.md +++ b/README.md @@ -2,16 +2,17 @@ A Python module and CLI for controlling Broadlink devices locally. The following devices are supported: -- **Universal remotes**: RM home, RM mini 3, RM plus, RM pro, RM pro+, RM4 mini, RM4 pro, RM4C mini, RM4S +- **Universal remotes**: RM home, RM mini 3, RM plus, RM pro, RM pro+, RM4 mini, RM4 pro, RM4C mini, RM4S, RM4 TV mate - **Smart plugs**: SP mini, SP mini 3, SP mini+, SP1, SP2, SP2-BR, SP2-CL, SP2-IN, SP2-UK, SP3, SP3-EU, SP3S-EU, SP3S-US, SP4L-AU, SP4L-EU, SP4L-UK, SP4M, SP4M-US, Ankuoo NEO, Ankuoo NEO PRO, Efergy Ego, BG AHC/U-01 - **Switches**: MCB1, SC1, SCB1E, SCB2 - **Outlets**: BG 800, BG 900 - **Power strips**: MP1-1K3S2U, MP1-1K4S, MP2 - **Environment sensors**: A1 - **Alarm kits**: S1C, S2KIT -- **Light bulbs**: LB1, LB2, SB800TD +- **Light bulbs**: LB1, LB26 R1, LB27 R1, SB800TD - **Curtain motors**: Dooya DT360E-45/20 - **Thermostats**: Hysen HY02B05H +- **Hubs**: S3 ## Installation @@ -49,6 +50,13 @@ broadlink.setup('myssid', 'mynetworkpass', 3) Security mode options are (0 = none, 1 = WEP, 2 = WPA1, 3 = WPA2, 4 = WPA1/2) +#### Advanced options + +You may need to specify a broadcast address if setup is not working. +```python3 +broadlink.setup('myssid', 'mynetworkpass', 3, ip_address='192.168.0.255') +``` + ### Discovery Use this function to discover devices: @@ -60,17 +68,19 @@ devices = broadlink.discover() #### Advanced options You may need to specify `local_ip_address` or `discover_ip_address` if discovery does not return any devices. +Using the IP address of your local machine: ```python3 -devices = broadlink.discover(local_ip_address='192.168.0.100') # IP address of your local machine. +devices = broadlink.discover(local_ip_address='192.168.0.100') ``` +Using the broadcast address of your subnet: ```python3 -devices = broadlink.discover(discover_ip_address='192.168.0.255') # Broadcast address of your subnet. +devices = broadlink.discover(discover_ip_address='192.168.0.255') ``` If the device is locked, it may not be discoverable with broadcast. In such cases, you can use the unicast version `broadlink.hello()` for direct discovery: ```python3 -device = broadlink.hello('192.168.0.16') # IP address of your Broadlink device. +device = broadlink.hello('192.168.0.16') ``` If you are a perfomance freak, use `broadlink.xdiscover()` to create devices instantly: @@ -105,23 +115,33 @@ packet = device.check_data() ### Learning RF codes -Learning IR codes takes place in five steps. +Learning RF codes takes place in six steps. 1. Sweep the frequency: ```python3 device.sweep_frequency() ``` 2. When the LED blinks, point the remote at the Broadlink device for the first time and long press the button you want to learn. -3. Enter learning mode: +3. Check if the frequency was successfully identified: +```python3 +ok = device.check_frequency() +if ok: + print('Frequency found!') +``` +4. Enter learning mode: ```python3 device.find_rf_packet() ``` -4. When the LED blinks, point the remote at the Broadlink device for the second time and short press the button you want to learn. -5. Get the RF packet: +5. When the LED blinks, point the remote at the Broadlink device for the second time and short press the button you want to learn. +6. Get the RF packet: ```python3 packet = device.check_data() ``` +#### Notes + +Universal remotes with product id 0x2712 use the same method for learning IR and RF codes. They don't need to sweep frequency. Just call `device.enter_learning()` and `device.check_data()`. + ### Canceling learning You can exit the learning mode in the middle of the process by calling this method: @@ -194,4 +214,34 @@ devices[0].set_state(bulb_colormode=1) ### Fetching sensor data ```python3 data = device.check_sensors() -``` \ No newline at end of file +``` + +## Hubs + +### Discovering subdevices +```python3 +device.get_subdevices() +``` + +### Fetching data +Use the DID obtained from get_subdevices() for the input parameter to query specific sub-device. + +```python3 +device.get_state(did="00000000000000000000a043b0d06963") +``` + +### Setting state attributes +The parameters depend on the type of subdevice that is being controlled. In this example, we are controlling LC-1 switches: + +#### Turn on +```python3 +device.set_state(did="00000000000000000000a043b0d0783a", pwr=1) +device.set_state(did="00000000000000000000a043b0d0783a", pwr1=1) +device.set_state(did="00000000000000000000a043b0d0783a", pwr2=1) +``` +#### Turn off +```python3 +device.set_state(did="00000000000000000000a043b0d0783a", pwr=0) +device.set_state(did="00000000000000000000a043b0d0783a", pwr1=0) +device.set_state(did="00000000000000000000a043b0d0783a", pwr2=0) +``` diff --git a/broadlink/__init__.py b/broadlink/__init__.py index fc64e844..d3135501 100644 --- a/broadlink/__init__.py +++ b/broadlink/__init__.py @@ -1,18 +1,19 @@ #!/usr/bin/env python3 """The python-broadlink library.""" import socket -import typing as t +from typing import Generator, List, Optional, Tuple, Union from . import exceptions as e from .const import DEFAULT_BCAST_ADDR, DEFAULT_PORT, DEFAULT_TIMEOUT from .alarm import S1C -from .climate import hysen -from .cover import dooya +from .climate import hvac, hysen +from .cover import dooya, dooya2, wser from .device import Device, ping, scan +from .hub import s3 from .light import lb1, lb2 from .remote import rm, rm4, rm4mini, rm4pro, rmmini, rmminib, rmpro -from .sensor import a1 -from .switch import bg1, mp1, sp1, sp2, sp2s, sp3, sp3s, sp4, sp4b +from .sensor import a1, a2 +from .switch import bg1, ehc31, mp1, mp1s, sp1, sp2, sp2s, sp3, sp3s, sp4, sp4b SUPPORTED_TYPES = { sp1: { @@ -32,6 +33,7 @@ 0x7544: ("SP2-CL", "Broadlink"), 0x7546: ("SP2-UK/BR/IN", "Broadlink (OEM)"), 0x7547: ("SC1", "Broadlink"), + 0x7549: ("SP mini 3", "Broadlink (OEM)"), 0x7918: ("SP2", "Broadlink (OEM)"), 0x7919: ("SP2-compatible", "Honeywell"), 0x791A: ("SP2-compatible", "Honeywell"), @@ -53,6 +55,7 @@ }, sp4: { 0x7568: ("SP4L-CN", "Broadlink"), + 0x756B: ("SP4M-JP", "Broadlink"), 0x756C: ("SP4M", "Broadlink"), 0x756F: ("MCB1", "Broadlink"), 0x7579: ("SP4L-EU", "Broadlink"), @@ -60,11 +63,15 @@ 0x7583: ("SP mini 3", "Broadlink"), 0x7587: ("SP4L-UK", "Broadlink"), 0x7D11: ("SP mini 3", "Broadlink"), + 0xA4F9: ("WS4", "Broadlink (OEM)"), + 0xA569: ("SP4L-UK", "Broadlink"), 0xA56A: ("MCB1", "Broadlink"), 0xA56B: ("SCB1E", "Broadlink"), 0xA56C: ("SP4L-EU", "Broadlink"), + 0xA576: ("SP4L-AU", "Broadlink"), 0xA589: ("SP4L-UK", "Broadlink"), 0xA5D3: ("SP4L-EU", "Broadlink"), + 0xA6F4: ("SP4D-US", "Broadlink"), }, sp4b: { 0x5115: ("SCB1E", "Broadlink"), @@ -74,11 +81,13 @@ 0x618B: ("SP4L-EU", "Broadlink"), 0x6489: ("SP4L-AU", "Broadlink"), 0x648B: ("SP4M-US", "Broadlink"), + 0x648C: ("SP4L-US", "Broadlink"), 0x6494: ("SCB2", "Broadlink"), }, rmmini: { 0x2737: ("RM mini 3", "Broadlink"), 0x278F: ("RM mini", "Broadlink"), + 0x27B7: ("RM mini 3", "Broadlink"), 0x27C2: ("RM mini 3", "Broadlink"), 0x27C7: ("RM mini 3", "Broadlink"), 0x27CC: ("RM mini 3", "Broadlink"), @@ -111,6 +120,13 @@ }, rm4mini: { 0x51DA: ("RM4 mini", "Broadlink"), + 0x5209: ("RM4 TV mate", "Broadlink"), + 0x520C: ("RM4 mini", "Broadlink"), + 0x520D: ("RM4C mini", "Broadlink"), + 0x5211: ("RM4C mate", "Broadlink"), + 0x5212: ("RM4 TV mate", "Broadlink"), + 0x5216: ("RM4 mini", "Broadlink"), + 0x521C: ("RM4 mini", "Broadlink"), 0x6070: ("RM4C mini", "Broadlink"), 0x610E: ("RM4 mini", "Broadlink"), 0x610F: ("RM4C mini", "Broadlink"), @@ -122,6 +138,9 @@ 0x653A: ("RM4 mini", "Broadlink"), }, rm4pro: { + 0x520B: ("RM4 pro", "Broadlink"), + 0x5213: ("RM4 pro", "Broadlink"), + 0x5218: ("RM4C pro", "Broadlink"), 0x6026: ("RM4 pro", "Broadlink"), 0x6184: ("RM4C pro", "Broadlink"), 0x61A2: ("RM4 pro", "Broadlink"), @@ -129,44 +148,72 @@ 0x653C: ("RM4 pro", "Broadlink"), }, a1: { - 0x2714: ("e-Sensor", "Broadlink"), + 0x2714: ("A1", "Broadlink"), + }, + a2: { + 0x4F60: ("A2", "Broadlink"), }, mp1: { 0x4EB5: ("MP1-1K4S", "Broadlink"), - 0x4EF7: ("MP1-1K4S", "Broadlink (OEM)"), 0x4F1B: ("MP1-1K3S2U", "Broadlink (OEM)"), 0x4F65: ("MP1-1K3S2U", "Broadlink"), }, + mp1s: { + 0x4EF7: ("MP1-1K4S", "Broadlink (OEM)"), + }, lb1: { 0x5043: ("SB800TD", "Broadlink (OEM)"), 0x504E: ("LB1", "Broadlink"), + 0x606D: ("SLA22RGB9W81/SLA27RGB9W81", "Luceco"), 0x606E: ("SB500TD", "Broadlink (OEM)"), 0x60C7: ("LB1", "Broadlink"), 0x60C8: ("LB1", "Broadlink"), 0x6112: ("LB1", "Broadlink"), + 0x644B: ("LB1", "Broadlink"), + 0x644C: ("LB27 R1", "Broadlink"), + 0x644E: ("LB26 R1", "Broadlink"), + 0x6488: ("LB27 C1", "Broadlink"), }, lb2: { 0xA4F4: ("LB27 R1", "Broadlink"), + 0xA5F7: ("LB27 R1", "Broadlink"), + 0xA6EF: ("EFCF60WSMT", "Luceco"), }, S1C: { 0x2722: ("S2KIT", "Broadlink"), }, + s3: { + 0xA59C: ("S3", "Broadlink"), + 0xA64D: ("S3", "Broadlink"), + }, + hvac: { + 0x4E2A: ("HVAC", "Licensed manufacturer"), + }, hysen: { 0x4EAD: ("HY02/HY03", "Hysen"), }, dooya: { 0x4E4D: ("DT360E-45/20", "Dooya"), }, + dooya2: { + 0x4F6E: ("DT360E-45/20", "Dooya"), + }, + wser: { + 0x4F6C: ("WSER", "Wistar"), + }, bg1: { 0x51E3: ("BG800/BG900", "BG Electrical"), }, + ehc31: { + 0x6480: ("EHC31", "BG Electrical"), + }, } def gendevice( dev_type: int, - host: t.Tuple[str, int], - mac: t.Union[bytes, str], + host: Tuple[str, int], + mac: Union[bytes, str], name: str = "", is_locked: bool = False, ) -> Device: @@ -192,7 +239,7 @@ def gendevice( def hello( - host: str, + ip_address: str, port: int = DEFAULT_PORT, timeout: int = DEFAULT_TIMEOUT, ) -> Device: @@ -202,7 +249,11 @@ def hello( """ try: return next( - xdiscover(timeout=timeout, discover_ip_address=host, discover_ip_port=port) + xdiscover( + timeout=timeout, + discover_ip_address=ip_address, + discover_ip_port=port, + ) ) except StopIteration as err: raise e.NetworkTimeoutError( @@ -214,33 +265,42 @@ def hello( def discover( timeout: int = DEFAULT_TIMEOUT, - local_ip_address: str = None, + local_ip_address: Optional[str] = None, discover_ip_address: str = DEFAULT_BCAST_ADDR, discover_ip_port: int = DEFAULT_PORT, -) -> t.List[Device]: +) -> List[Device]: """Discover devices connected to the local network.""" - responses = scan(timeout, local_ip_address, discover_ip_address, discover_ip_port) + responses = scan( + timeout, local_ip_address, discover_ip_address, discover_ip_port + ) return [gendevice(*resp) for resp in responses] def xdiscover( timeout: int = DEFAULT_TIMEOUT, - local_ip_address: str = None, + local_ip_address: Optional[str] = None, discover_ip_address: str = DEFAULT_BCAST_ADDR, discover_ip_port: int = DEFAULT_PORT, -) -> t.Generator[Device, None, None]: +) -> Generator[Device, None, None]: """Discover devices connected to the local network. This function returns a generator that yields devices instantly. """ - responses = scan(timeout, local_ip_address, discover_ip_address, discover_ip_port) + responses = scan( + timeout, local_ip_address, discover_ip_address, discover_ip_port + ) for resp in responses: yield gendevice(*resp) # Setup a new Broadlink device via AP Mode. Review the README to see how to enter AP Mode. # Only tested with Broadlink RM3 Mini (Blackbean) -def setup(ssid: str, password: str, security_mode: int) -> None: +def setup( + ssid: str, + password: str, + security_mode: int, + ip_address: str = DEFAULT_BCAST_ADDR, +) -> None: """Set up a new Broadlink device via AP mode.""" # Security mode options are (0 - none, 1 = WEP, 2 = WPA1, 3 = WPA2, 4 = WPA1/2) payload = bytearray(0x88) @@ -269,5 +329,5 @@ def setup(ssid: str, password: str, security_mode: int) -> None: sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Internet # UDP sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) - sock.sendto(payload, (DEFAULT_BCAST_ADDR, DEFAULT_PORT)) + sock.sendto(payload, (ip_address, DEFAULT_PORT)) sock.close() diff --git a/broadlink/climate.py b/broadlink/climate.py old mode 100644 new mode 100755 index eee5f119..1a0c6006 --- a/broadlink/climate.py +++ b/broadlink/climate.py @@ -1,5 +1,7 @@ -"""Support for HVAC units.""" -import typing as t +"""Support for climate control.""" +import enum +import struct +from typing import List, Sequence from . import exceptions as e from .device import Device @@ -19,7 +21,7 @@ class hysen(Device): TYPE = "HYS" - def send_request(self, request: t.Sequence[int]) -> bytes: + def send_request(self, request: Sequence[int]) -> bytes: """Send a request to the device.""" packet = bytearray() packet.extend((len(request) + 2).to_bytes(2, "little")) @@ -31,27 +33,34 @@ def send_request(self, request: t.Sequence[int]) -> bytes: payload = self.decrypt(response[0x38:]) p_len = int.from_bytes(payload[:0x02], "little") - if p_len + 2 > len(payload): - raise ValueError( - "hysen_response_error", "first byte of response is not length" - ) - - nom_crc = int.from_bytes(payload[p_len : p_len + 2], "little") + nom_crc = int.from_bytes(payload[p_len:p_len+2], "little") real_crc = CRC16.calculate(payload[0x02:p_len]) + if nom_crc != real_crc: - raise ValueError("hysen_response_error", "CRC check on response failed") + raise e.DataValidationError( + -4008, + "Received data packet check error", + f"Expected a checksum of {nom_crc} and received {real_crc}", + ) return payload[0x02:p_len] + def _decode_temp(self, payload, base_index): + base_temp = payload[base_index] / 2.0 + add_offset = (payload[4] >> 3) & 1 # should offset be added? + offset_raw_value = (payload[17] >> 4) & 3 # offset value + offset = (offset_raw_value + 1) / 10 if add_offset else 0.0 + return base_temp + offset + def get_temp(self) -> float: """Return the room temperature in degrees celsius.""" payload = self.send_request([0x01, 0x03, 0x00, 0x00, 0x00, 0x08]) - return payload[0x05] / 2.0 + return self._decode_temp(payload, 5) def get_external_temp(self) -> float: """Return the external temperature in degrees celsius.""" payload = self.send_request([0x01, 0x03, 0x00, 0x00, 0x00, 0x08]) - return payload[18] / 2.0 + return self._decode_temp(payload, 18) def get_full_status(self) -> dict: """Return the state of the device. @@ -64,9 +73,10 @@ def get_full_status(self) -> dict: data["power"] = payload[4] & 1 data["active"] = (payload[4] >> 4) & 1 data["temp_manual"] = (payload[4] >> 6) & 1 - data["room_temp"] = payload[5] / 2.0 + data["heating_cooling"] = (payload[4] >> 7) & 1 + data["room_temp"] = self._decode_temp(payload, 5) data["thermostat_temp"] = payload[6] / 2.0 - data["auto_mode"] = payload[7] & 0xF + data["auto_mode"] = payload[7] & 0x0F data["loop_mode"] = payload[7] >> 4 data["sensor"] = payload[8] data["osv"] = payload[9] @@ -79,7 +89,7 @@ def get_full_status(self) -> dict: data["fre"] = payload[15] data["poweron"] = payload[16] data["unknown"] = payload[17] - data["external_temp"] = payload[18] / 2.0 + data["external_temp"] = self._decode_temp(payload, 18) data["hour"] = payload[19] data["min"] = payload[20] data["sec"] = payload[21] @@ -117,7 +127,9 @@ def get_full_status(self) -> dict: # E.g. loop_mode = 0 ("12345,67") means Saturday and Sunday (weekend schedule) # loop_mode = 2 ("1234567") means every day, including Saturday and Sunday (weekday schedule) # The sensor command is currently experimental - def set_mode(self, auto_mode: int, loop_mode: int, sensor: int = 0) -> None: + def set_mode( + self, auto_mode: int, loop_mode: int, sensor: int = 0 + ) -> None: """Set the mode of the device.""" mode_byte = ((loop_mode + 1) << 4) + auto_mode self.send_request([0x01, 0x06, 0x00, 0x02, mode_byte, sensor]) @@ -185,16 +197,32 @@ def set_temp(self, temp: float) -> None: # Set device on(1) or off(0), does not deactivate Wifi connectivity. # Remote lock disables control by buttons on thermostat. - def set_power(self, power: int = 1, remote_lock: int = 0) -> None: + # heating_cooling: heating(0) cooling(1) + def set_power( + self, power: int = 1, remote_lock: int = 0, heating_cooling: int = 0 + ) -> None: """Set the power state of the device.""" - self.send_request([0x01, 0x06, 0x00, 0x00, remote_lock, power]) + state = (heating_cooling << 7) + power + self.send_request([0x01, 0x06, 0x00, 0x00, remote_lock, state]) # set time on device # n.b. day=1 is Monday, ..., day=7 is Sunday def set_time(self, hour: int, minute: int, second: int, day: int) -> None: """Set the time.""" self.send_request( - [0x01, 0x10, 0x00, 0x08, 0x00, 0x02, 0x04, hour, minute, second, day] + [ + 0x01, + 0x10, + 0x00, + 0x08, + 0x00, + 0x02, + 0x04, + hour, + minute, + second, + day + ] ) # Set timer schedule @@ -203,7 +231,7 @@ def set_time(self, hour: int, minute: int, second: int, day: int) -> None: # {'start_hour':17, 'start_minute':30, 'temp': 22 } # Each one specifies the thermostat temp that will become effective at start_hour:start_minute # weekend is similar but only has 2 (e.g. switch on in morning and off in afternoon) - def set_schedule(self, weekday: t.List[dict], weekend: t.List[dict]) -> None: + def set_schedule(self, weekday: List[dict], weekend: List[dict]) -> None: """Set timer schedule.""" request = [0x01, 0x10, 0x00, 0x0A, 0x00, 0x0C, 0x18] @@ -226,3 +254,221 @@ def set_schedule(self, weekday: t.List[dict], weekend: t.List[dict]) -> None: request.append(int(weekend[i]["temp"] * 2)) self.send_request(request) + + +class hvac(Device): + """Controls a HVAC. + + Supported models: + - Tornado SMART X SQ series + - Aux ASW-H12U3/JIR1DI-US + - Aux ASW-H36U2/LFR1DI-US + """ + + TYPE = "HVAC" + + @enum.unique + class Mode(enum.IntEnum): + """Enumerates modes.""" + + AUTO = 0 + COOL = 1 + DRY = 2 + HEAT = 3 + FAN = 4 + + @enum.unique + class Speed(enum.IntEnum): + """Enumerates fan speed.""" + + HIGH = 1 + MID = 2 + LOW = 3 + AUTO = 5 + + @enum.unique + class Preset(enum.IntEnum): + """Enumerates presets.""" + + NORMAL = 0 + TURBO = 1 + MUTE = 2 + + @enum.unique + class SwHoriz(enum.IntEnum): + """Enumerates horizontal swing.""" + + ON = 0 + OFF = 7 + + @enum.unique + class SwVert(enum.IntEnum): + """Enumerates vertical swing.""" + + ON = 0 + POS1 = 1 + POS2 = 2 + POS3 = 3 + POS4 = 4 + POS5 = 5 + OFF = 7 + + def _encode(self, data: bytes) -> bytes: + """Encode data for transport.""" + packet = bytearray(10) + p_len = 10 + len(data) + struct.pack_into( + " bytes: + """Decode data from transport.""" + # payload[0x2:0x8] == bytes([0xbb, 0x00, 0x07, 0x00, 0x00, 0x00]) + payload = self.decrypt(response[0x38:]) + p_len = int.from_bytes(payload[:0x02], "little") + nom_crc = int.from_bytes(payload[p_len:p_len+2], "little") + real_crc = CRC16.calculate(payload[0x02:p_len], polynomial=0x9BE4) + + if nom_crc != real_crc: + raise e.DataValidationError( + -4008, + "Received data packet check error", + f"Expected a checksum of {nom_crc} and received {real_crc}", + ) + + d_len = int.from_bytes(payload[0x08:0x0A], "little") + return payload[0x0A:0x0A+d_len] + + def _send(self, command: int, data: bytes = b"") -> bytes: + """Send a command to the unit.""" + prefix = bytes([((command << 4) | 1), 1]) + packet = self._encode(prefix + data) + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + return self._decode(response)[0x02:] + + def _parse_state(self, data: bytes) -> dict: + """Parse state.""" + state = {} + state["power"] = bool(data[0x08] & 1 << 5) + state["target_temp"] = 8 + (data[0x00] >> 3) + (data[0x04] >> 7) * 0.5 + state["swing_v"] = self.SwVert(data[0x00] & 0b111) + state["swing_h"] = self.SwHoriz(data[0x01] >> 5) + state["mode"] = self.Mode(data[0x05] >> 5) + state["speed"] = self.Speed(data[0x03] >> 5) + state["preset"] = self.Preset(data[0x04] >> 6) + state["sleep"] = bool(data[0x05] & 1 << 2) + state["ifeel"] = bool(data[0x05] & 1 << 3) + state["health"] = bool(data[0x08] & 1 << 1) + state["clean"] = bool(data[0x08] & 1 << 2) + state["display"] = bool(data[0x0A] & 1 << 4) + state["mildew"] = bool(data[0x0A] & 1 << 3) + return state + + def set_state( + self, + power: bool, + target_temp: float, # 16<=target_temp<=32 + mode: Mode, + speed: Speed, + preset: Preset, + swing_h: SwHoriz, + swing_v: SwVert, + sleep: bool, + ifeel: bool, + display: bool, + health: bool, + clean: bool, + mildew: bool, + ) -> dict: + """Set the state of the device.""" + # TODO: decode unknown bits + UNK0 = 0b100 + UNK1 = 0b1101 + UNK2 = 0b101 + + target_temp = round(target_temp * 2) / 2 + + if preset == self.Preset.MUTE: + if mode != self.Mode.FAN: + raise ValueError("mute is only available in fan mode") + speed = self.Speed.LOW + + elif preset == self.Preset.TURBO: + if mode not in {self.Mode.COOL, self.Mode.HEAT}: + raise ValueError("turbo is only available in cooling/heating") + speed = self.Speed.HIGH + + data = bytearray(0x0D) + data[0x00] = (int(target_temp) - 8 << 3) | swing_v + data[0x01] = (swing_h << 5) | UNK0 + data[0x02] = ((target_temp % 1 == 0.5) << 7) | UNK1 + data[0x03] = speed << 5 + data[0x04] = preset << 6 + data[0x05] = mode << 5 | sleep << 2 | ifeel << 3 + data[0x08] = power << 5 | clean << 2 | (health and 0b11) + data[0x0A] = display << 4 | mildew << 3 + data[0x0C] = UNK2 + + resp = self._send(0, data) + return self._parse_state(resp) + + def get_state(self) -> dict: + """Returns a dictionary with the unit's parameters. + + Returns: + dict: + power (bool): + target_temp (float): temperature set point 16 dict: + """Returns dictionary with AC info. + + Returns: + dict: + power (bool): power + ambient_temp (float): ambient temperature + """ + resp = self._send(2) + + if len(resp) < 22: + raise e.DataValidationError( + -4007, + "Received data packet length error", + f"Expected at least 24 bytes and received {len(resp) + 2}", + ) + + ac_info = {} + ac_info["power"] = resp[0x1] & 1 + + ambient_temp = resp[0x05] & 0b11111, resp[0x15] & 0b11111 + if any(ambient_temp): + ac_info["ambient_temp"] = ambient_temp[0] + ambient_temp[1] / 10.0 + + return ac_info diff --git a/broadlink/cover.py b/broadlink/cover.py index c0f08abb..75317943 100644 --- a/broadlink/cover.py +++ b/broadlink/cover.py @@ -1,5 +1,6 @@ """Support for covers.""" import time +from typing import Sequence from . import exceptions as e from .device import Device @@ -8,33 +9,34 @@ class dooya(Device): """Controls a Dooya curtain motor.""" - TYPE = "Dooya DT360E" + TYPE = "DT360E" - def _send(self, magic1: int, magic2: int) -> int: + def _send(self, command: int, attribute: int = 0) -> int: """Send a packet to the device.""" packet = bytearray(16) - packet[0] = 0x09 - packet[2] = 0xBB - packet[3] = magic1 - packet[4] = magic2 - packet[9] = 0xFA - packet[10] = 0x44 - response = self.send_packet(0x6A, packet) - e.check_error(response[0x22:0x24]) - payload = self.decrypt(response[0x38:]) + packet[0x00] = 0x09 + packet[0x02] = 0xBB + packet[0x03] = command + packet[0x04] = attribute + packet[0x09] = 0xFA + packet[0x0A] = 0x44 + + resp = self.send_packet(0x6A, packet) + e.check_error(resp[0x22:0x24]) + payload = self.decrypt(resp[0x38:]) return payload[4] def open(self) -> int: """Open the curtain.""" - return self._send(0x01, 0x00) + return self._send(0x01) def close(self) -> int: """Close the curtain.""" - return self._send(0x02, 0x00) + return self._send(0x02) def stop(self) -> int: """Stop the curtain.""" - return self._send(0x03, 0x00) + return self._send(0x03) def get_percentage(self) -> int: """Return the position of the curtain.""" @@ -55,3 +57,126 @@ def set_percentage_and_wait(self, new_percentage: int) -> None: time.sleep(0.2) current = self.get_percentage() self.stop() + + +class dooya2(Device): + """Controls a Dooya curtain motor (version 2).""" + + TYPE = "DT360E-2" + + def _send(self, operation: int, data: Sequence = b""): + """Send a command to the device.""" + packet = bytearray(12) + packet[0x02] = 0xA5 + packet[0x03] = 0xA5 + packet[0x04] = 0x5A + packet[0x05] = 0x5A + packet[0x08] = operation + packet[0x09] = 0x0B + + if data: + data_len = len(data) + packet[0x0A] = data_len & 0xFF + packet[0x0B] = data_len >> 8 + packet += bytes(2) + packet.extend(data) + + checksum = sum(packet, 0xBEAF) & 0xFFFF + packet[0x06] = checksum & 0xFF + packet[0x07] = checksum >> 8 + + packet_len = len(packet) - 2 + packet[0x00] = packet_len & 0xFF + packet[0x01] = packet_len >> 8 + + resp = self.send_packet(0x6A, packet) + e.check_error(resp[0x22:0x24]) + payload = self.decrypt(resp[0x38:]) + return payload + + def open(self) -> None: + """Open the curtain.""" + self._send(2, [0x00, 0x01, 0x00]) + + def close(self) -> None: + """Close the curtain.""" + self._send(2, [0x00, 0x02, 0x00]) + + def stop(self) -> None: + """Stop the curtain.""" + self._send(2, [0x00, 0x03, 0x00]) + + def get_percentage(self) -> int: + """Return the position of the curtain.""" + resp = self._send(1, [0x00, 0x06, 0x00]) + return resp[0x11] + + def set_percentage(self, new_percentage: int) -> None: + """Set the position of the curtain.""" + self._send(2, [0x00, 0x09, new_percentage]) + + +class wser(Device): + """Controls a Wistar curtain motor""" + + TYPE = "WSER" + + def _send(self, operation: int, data: Sequence = b""): + """Send a command to the device.""" + packet = bytearray(12) + packet[0x02] = 0xA5 + packet[0x03] = 0xA5 + packet[0x04] = 0x5A + packet[0x05] = 0x5A + packet[0x08] = operation + packet[0x09] = 0x0B + + if data: + data_len = len(data) + packet[0x0A] = data_len & 0xFF + packet[0x0B] = data_len >> 8 + packet += bytes(2) + packet.extend(data) + + checksum = sum(packet, 0xBEAF) & 0xFFFF + packet[0x06] = checksum & 0xFF + packet[0x07] = checksum >> 8 + + packet_len = len(packet) - 2 + packet[0x00] = packet_len & 0xFF + packet[0x01] = packet_len >> 8 + + resp = self.send_packet(0x6A, packet) + e.check_error(resp[0x22:0x24]) + payload = self.decrypt(resp[0x38:]) + return payload + + def get_position(self) -> int: + """Return the position of the curtain.""" + resp = self._send(1, []) + position = resp[0x0E] + return position + + def open(self) -> int: + """Open the curtain.""" + resp = self._send(2, [0x4A, 0x31, 0xA0]) + position = resp[0x0E] + return position + + def close(self) -> int: + """Close the curtain.""" + resp = self._send(2, [0x61, 0x32, 0xA0]) + position = resp[0x0E] + return position + + def stop(self) -> int: + """Stop the curtain.""" + resp = self._send(2, [0x4C, 0x73, 0xA0]) + position = resp[0x0E] + return position + + def set_position(self, position: int) -> int: + """Set the position of the curtain.""" + resp = self._send(2, [position, 0x70, 0xA0]) + position = resp[0x0E] + return position diff --git a/broadlink/device.py b/broadlink/device.py index 74e916f4..5a10bc01 100644 --- a/broadlink/device.py +++ b/broadlink/device.py @@ -3,7 +3,7 @@ import threading import random import time -import typing as t +from typing import Generator, Optional, Tuple, Union from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes @@ -17,15 +17,15 @@ ) from .protocol import Datetime -HelloResponse = t.Tuple[int, t.Tuple[str, int], str, str, bool] +HelloResponse = Tuple[int, Tuple[str, int], str, str, bool] def scan( timeout: int = DEFAULT_TIMEOUT, - local_ip_address: str = None, + local_ip_address: Optional[str] = None, discover_ip_address: str = DEFAULT_BCAST_ADDR, discover_ip_port: int = DEFAULT_PORT, -) -> t.Generator[HelloResponse, None, None]: +) -> Generator[HelloResponse, None, None]: """Broadcast a hello message and yield responses.""" conn = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) conn.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) @@ -76,7 +76,7 @@ def scan( conn.close() -def ping(address: str, port: int = DEFAULT_PORT) -> None: +def ping(ip_address: str, port: int = DEFAULT_PORT) -> None: """Send a ping packet to an address. This packet feeds the watchdog timer of firmwares >= v53. @@ -87,7 +87,7 @@ def ping(address: str, port: int = DEFAULT_PORT) -> None: conn.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) packet = bytearray(0x30) packet[0x26] = 1 - conn.sendto(packet, (address, port)) + conn.sendto(packet, (ip_address, port)) class Device: @@ -100,8 +100,8 @@ class Device: def __init__( self, - host: t.Tuple[str, int], - mac: t.Union[bytes, str], + host: Tuple[str, int], + mac: Union[bytes, str], devtype: int, timeout: int = DEFAULT_TIMEOUT, name: str = "", diff --git a/broadlink/helpers.py b/broadlink/helpers.py index 6ee54991..e7b3d4c9 100644 --- a/broadlink/helpers.py +++ b/broadlink/helpers.py @@ -1,5 +1,5 @@ """Helper functions and classes.""" -import typing as t +from typing import Dict, List, Sequence class CRC16: @@ -8,10 +8,10 @@ class CRC16: CRC tables are cached for performance. """ - _cache: t.Dict[int, t.List[int]] = {} + _cache: Dict[int, List[int]] = {} @classmethod - def get_table(cls, polynomial: int) -> t.List[int]: + def get_table(cls, polynomial: int) -> List[int]: """Return the CRC-16 table for a polynomial.""" try: crc_table = cls._cache[polynomial] @@ -31,7 +31,7 @@ def get_table(cls, polynomial: int) -> t.List[int]: @classmethod def calculate( cls, - sequence: t.Sequence[int], + sequence: Sequence[int], polynomial: int = 0xA001, # CRC-16-ANSI. init_value: int = 0xFFFF, ) -> int: diff --git a/broadlink/hub.py b/broadlink/hub.py new file mode 100644 index 00000000..0fd4ae53 --- /dev/null +++ b/broadlink/hub.py @@ -0,0 +1,98 @@ +"""Support for hubs.""" +import struct +import json +from typing import Optional + +from . import exceptions as e +from .device import Device + + +class s3(Device): + """Controls a Broadlink S3.""" + + TYPE = "S3" + MAX_SUBDEVICES = 8 + + def get_subdevices(self, step: int = 5) -> list: + """Return a list of sub devices.""" + total = self.MAX_SUBDEVICES + sub_devices = [] + seen = set() + index = 0 + + while index < total: + state = {"count": step, "index": index} + packet = self._encode(14, state) + resp = self.send_packet(0x6A, packet) + e.check_error(resp[0x22:0x24]) + resp = self._decode(resp) + + for device in resp["list"]: + did = device["did"] + if did in seen: + continue + + seen.add(did) + sub_devices.append(device) + + total = resp["total"] + if len(seen) >= total: + break + + index += step + + return sub_devices + + def get_state(self, did: Optional[str] = None) -> dict: + """Return the power state of the device.""" + state = {} + if did is not None: + state["did"] = did + + packet = self._encode(1, state) + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + return self._decode(response) + + def set_state( + self, + did: Optional[str] = None, + pwr1: Optional[bool] = None, + pwr2: Optional[bool] = None, + pwr3: Optional[bool] = None, + ) -> dict: + """Set the power state of the device.""" + state = {} + if did is not None: + state["did"] = did + if pwr1 is not None: + state["pwr1"] = int(bool(pwr1)) + if pwr2 is not None: + state["pwr2"] = int(bool(pwr2)) + if pwr3 is not None: + state["pwr3"] = int(bool(pwr3)) + + packet = self._encode(2, state) + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + return self._decode(response) + + def _encode(self, flag: int, state: dict) -> bytes: + """Encode a JSON packet.""" + # flag: 1 for reading, 2 for writing. + packet = bytearray(12) + data = json.dumps(state, separators=(",", ":")).encode() + struct.pack_into( + " dict: + """Decode a JSON packet.""" + payload = self.decrypt(response[0x38:]) + js_len = struct.unpack_from(" dict: def set_state( self, - pwr: bool = None, - red: int = None, - blue: int = None, - green: int = None, - brightness: int = None, - colortemp: int = None, - hue: int = None, - saturation: int = None, - transitionduration: int = None, - maxworktime: int = None, - bulb_colormode: int = None, - bulb_scenes: str = None, - bulb_scene: str = None, - bulb_sceneidx: int = None, + pwr: Optional[bool] = None, + red: Optional[int] = None, + blue: Optional[int] = None, + green: Optional[int] = None, + brightness: Optional[int] = None, + colortemp: Optional[int] = None, + hue: Optional[int] = None, + saturation: Optional[int] = None, + transitionduration: Optional[int] = None, + maxworktime: Optional[int] = None, + bulb_colormode: Optional[int] = None, + bulb_scenes: Optional[str] = None, + bulb_scene: Optional[str] = None, + bulb_sceneidx: Optional[int] = None, ) -> dict: """Set the power state of the device.""" state = {} @@ -101,7 +102,7 @@ def _decode(self, response: bytes) -> dict: """Decode a JSON packet.""" payload = self.decrypt(response[0x38:]) js_len = struct.unpack_from(" dict: def set_state( self, - pwr: bool = None, - red: int = None, - blue: int = None, - green: int = None, - brightness: int = None, - colortemp: int = None, - hue: int = None, - saturation: int = None, - transitionduration: int = None, - maxworktime: int = None, - bulb_colormode: int = None, - bulb_scenes: str = None, - bulb_scene: str = None, + pwr: Optional[bool] = None, + red: Optional[int] = None, + blue: Optional[int] = None, + green: Optional[int] = None, + brightness: Optional[int] = None, + colortemp: Optional[int] = None, + hue: Optional[int] = None, + saturation: Optional[int] = None, + transitionduration: Optional[int] = None, + maxworktime: Optional[int] = None, + bulb_colormode: Optional[int] = None, + bulb_scenes: Optional[str] = None, + bulb_scene: Optional[str] = None, ) -> dict: """Set the power state of the device.""" state = {} @@ -183,7 +184,9 @@ def _encode(self, flag: int, state: dict) -> bytes: # flag: 1 for reading, 2 for writing. packet = bytearray(12) data = json.dumps(state, separators=(",", ":")).encode() - struct.pack_into(" dict: """Decode a JSON packet.""" payload = self.decrypt(response[0x38:]) js_len = struct.unpack_from(" bytes: + """Convert a microsecond duration sequence into a Broadlink IR packet.""" + result = bytearray(4) + result[0x00] = 0x26 + + for pulse in pulses: + div, mod = divmod(int(pulse // tick), 256) + if div: + result.append(0) + result.append(div) + result.append(mod) + + data_len = len(result) - 4 + result[0x02] = data_len & 0xFF + result[0x03] = data_len >> 8 + + return result + + +def data_to_pulses(data: bytes, tick: float = 32.84) -> List[int]: + """Parse a Broadlink packet into a microsecond duration sequence.""" + result = [] + index = 4 + end = min(256 * data[0x03] + data[0x02] + 4, len(data)) + + while index < end: + chunk = data[index] + index += 1 + + if chunk == 0: + try: + chunk = 256 * data[index] + data[index + 1] + except IndexError as err: + raise ValueError("Malformed data.") from err + index += 2 + + result.append(int(chunk * tick)) + + return result + + class rmmini(Device): """Controls a Broadlink RM mini 3.""" @@ -46,14 +88,19 @@ def sweep_frequency(self) -> None: """Sweep frequency.""" self._send(0x19) - def check_frequency(self) -> bool: + def check_frequency(self) -> Tuple[bool, float]: """Return True if the frequency was identified successfully.""" resp = self._send(0x1A) - return resp[0] == 1 + is_found = bool(resp[0]) + frequency = struct.unpack(" None: + def find_rf_packet(self, frequency: Optional[float] = None) -> None: """Enter radiofrequency learning mode.""" - self._send(0x1B) + payload = bytearray() + if frequency: + payload += struct.pack(" None: """Cancel sweep frequency.""" @@ -82,7 +129,7 @@ def _send(self, command: int, data: bytes = b"") -> bytes: e.check_error(resp[0x22:0x24]) payload = self.decrypt(resp[0x38:]) p_len = struct.unpack(" dict: def check_sensors_raw(self) -> dict: """Return the state of the sensors in raw format.""" packet = bytearray([0x1]) - response = self.send_packet(0x6A, packet) - e.check_error(response[0x22:0x24]) - payload = self.decrypt(response[0x38:]) - data = payload[0x4:] + resp = self.send_packet(0x6A, packet) + e.check_error(resp[0x22:0x24]) + data = self.decrypt(resp[0x38:]) - temperature = struct.unpack("> 8 + packet += bytes(2) + packet.extend(data) + + checksum = sum(packet, 0xBEAF) & 0xFFFF + packet[0x06] = checksum & 0xFF + packet[0x07] = checksum >> 8 + + packet_len = len(packet) - 2 + packet[0x00] = packet_len & 0xFF + packet[0x01] = packet_len >> 8 + + resp = self.send_packet(0x6A, packet) + e.check_error(resp[0x22:0x24]) + payload = self.decrypt(resp[0x38:]) + return payload + + def check_sensors_raw(self) -> dict: + """Return the state of the sensors in raw format.""" + data = self._send(1) return { - "temperature": temperature, - "humidity": humidity, - "light": data[0x4], - "air_quality": data[0x6], - "noise": data[0x8], + "temperature": data[0x13] * 256 + data[0x14], + "humidity": data[0x15] * 256 + data[0x16], + "pm10": data[0x0D] * 256 + data[0x0E], + "pm2_5": data[0x0F] * 256 + data[0x10], + "pm1": data[0x11] * 256 + data[0x12], } diff --git a/broadlink/switch.py b/broadlink/switch.py index 1079cde0..8393f6b1 100644 --- a/broadlink/switch.py +++ b/broadlink/switch.py @@ -1,6 +1,7 @@ """Support for switches.""" import json import struct +from typing import Optional from . import exceptions as e from .device import Device @@ -127,12 +128,12 @@ def set_nightlight(self, ntlight: bool) -> None: def set_state( self, - pwr: bool = None, - ntlight: bool = None, - indicator: bool = None, - ntlbrightness: int = None, - maxworktime: int = None, - childlock: bool = None, + pwr: Optional[bool] = None, + ntlight: Optional[bool] = None, + indicator: Optional[bool] = None, + ntlbrightness: Optional[int] = None, + maxworktime: Optional[int] = None, + childlock: Optional[bool] = None, ) -> dict: """Set state of device.""" state = {} @@ -186,7 +187,7 @@ def _decode(self, response: bytes) -> dict: e.check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) js_len = struct.unpack_from(" dict: e.check_error(response[0x22:0x24]) payload = self.decrypt(response[0x38:]) js_len = struct.unpack_from(" dict: def set_state( self, - pwr: bool = None, - pwr1: bool = None, - pwr2: bool = None, - maxworktime: int = None, - maxworktime1: int = None, - maxworktime2: int = None, - idcbrightness: int = None, + pwr: Optional[bool] = None, + pwr1: Optional[bool] = None, + pwr2: Optional[bool] = None, + maxworktime: Optional[int] = None, + maxworktime1: Optional[int] = None, + maxworktime2: Optional[int] = None, + idcbrightness: Optional[int] = None, ) -> dict: """Set the power state of the device.""" state = {} @@ -291,7 +292,16 @@ def _encode(self, flag: int, state: dict) -> bytes: data = json.dumps(state).encode() length = 12 + len(data) struct.pack_into( - " dict: """Decode a message.""" payload = self.decrypt(response[0x38:]) js_len = struct.unpack_from(" dict: + """Set the power state of the device.""" + state = {} + if pwr is not None: + state["pwr"] = int(bool(pwr)) + if pwr1 is not None: + state["pwr1"] = int(bool(pwr1)) + if pwr2 is not None: + state["pwr2"] = int(bool(pwr2)) + if pwr3 is not None: + state["pwr3"] = int(bool(pwr3)) + if maxworktime1 is not None: + state["maxworktime1"] = maxworktime1 + if maxworktime2 is not None: + state["maxworktime2"] = maxworktime2 + if maxworktime3 is not None: + state["maxworktime3"] = maxworktime3 + if idcbrightness is not None: + state["idcbrightness"] = idcbrightness + if childlock is not None: + state["childlock"] = int(bool(childlock)) + if childlock1 is not None: + state["childlock1"] = int(bool(childlock1)) + if childlock2 is not None: + state["childlock2"] = int(bool(childlock2)) + if childlock3 is not None: + state["childlock3"] = int(bool(childlock3)) + if childlock4 is not None: + state["childlock4"] = int(bool(childlock4)) + + packet = self._encode(2, state) + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + return self._decode(response) + + class mp1(Device): """Controls a Broadlink MP1.""" @@ -360,3 +426,47 @@ def check_power(self) -> dict: "s3": bool(data & 4), "s4": bool(data & 8), } + + +class mp1s(mp1): + """Controls a Broadlink MP1S.""" + + TYPE = "MP1S" + + def get_state(self) -> dict: + """Return the power state of the device. + + voltage in V. + current in A. + power in W. + power consumption in kW·h. + """ + packet = bytearray(16) + packet[0x00] = 0x0E + packet[0x02] = 0xA5 + packet[0x03] = 0xA5 + packet[0x04] = 0x5A + packet[0x05] = 0x5A + packet[0x06] = 0xB2 + packet[0x07] = 0xC0 + packet[0x08] = 0x01 + packet[0x0A] = 0x04 + + response = self.send_packet(0x6A, packet) + e.check_error(response[0x22:0x24]) + payload = self.decrypt(response[0x38:]) + payload_str = payload.hex()[4:-6] + + def get_value(start, end, factors): + value = sum( + int(payload_str[i-2:i]) * factor + for i, factor in zip(range(start, end, -2), factors) + ) + return value + + return { + "volt": get_value(34, 30, [10, 0.1]), + "current": get_value(40, 34, [1, 0.01, 0.0001]), + "power": get_value(46, 40, [100, 1, 0.01]), + "totalconsum": get_value(54, 46, [10000, 100, 1, 0.01]), + } diff --git a/cli/README.md b/cli/README.md index 7b40b4cf..b7e48dc9 100644 --- a/cli/README.md +++ b/cli/README.md @@ -97,7 +97,7 @@ broadlink_cli --device @BEDROOM.device --temperature #### Check humidity ``` -broadlink_cli --device @BEDROOM.device --temperature +broadlink_cli --device @BEDROOM.device --humidity ``` ### Smart plugs diff --git a/cli/broadlink_cli b/cli/broadlink_cli index f7a24ade..7913e332 100755 --- a/cli/broadlink_cli +++ b/cli/broadlink_cli @@ -1,68 +1,32 @@ #!/usr/bin/env python3 import argparse import base64 -import codecs import time +from typing import List import broadlink from broadlink.const import DEFAULT_PORT from broadlink.exceptions import ReadError, StorageError +from broadlink.remote import data_to_pulses, pulses_to_data -TICK = 32.84 TIMEOUT = 30 -IR_TOKEN = 0x26 def auto_int(x): return int(x, 0) -def to_microseconds(bytes): - result = [] - # print bytes[0] # 0x26 = 38for IR - index = 4 - while index < len(bytes): - chunk = bytes[index] - index += 1 - if chunk == 0: - chunk = bytes[index] - chunk = 256 * chunk + bytes[index + 1] - index += 2 - result.append(int(round(chunk * TICK))) - if chunk == 0x0d05: - break - return result - - -def durations_to_broadlink(durations): - result = bytearray() - result.append(IR_TOKEN) - result.append(0) - result.append(len(durations) % 256) - result.append(len(durations) / 256) - for dur in durations: - num = int(round(dur / TICK)) - if num > 255: - result.append(0) - result.append(num / 256) - result.append(num % 256) - return result - - -def format_durations(data): - result = '' - for i in range(0, len(data)): - if len(result) > 0: - result += ' ' - result += ('+' if i % 2 == 0 else '-') + str(data[i]) - return result +def format_pulses(pulses: List[int]) -> str: + """Format pulses.""" + return " ".join( + f"+{pulse}" if i % 2 == 0 else f"-{pulse}" + for i, pulse in enumerate(pulses) + ) -def parse_durations(str): - result = [] - for s in str.split(): - result.append(abs(int(s))) - return result +def parse_pulses(data: List[str]) -> List[int]: + """Parse pulses.""" + return [abs(int(s)) for s in data] parser = argparse.ArgumentParser(fromfile_prefix_chars='@') @@ -83,7 +47,8 @@ parser.add_argument("--switch", action="store_true", help="switch state from on parser.add_argument("--send", action="store_true", help="send command") parser.add_argument("--sensors", action="store_true", help="check all sensors") parser.add_argument("--learn", action="store_true", help="learn command") -parser.add_argument("--rfscanlearn", action="store_true", help="rf scan learning") +parser.add_argument("--rflearn", action="store_true", help="rf scan learning") +parser.add_argument("--frequency", type=float, help="specify radiofrequency for learning") parser.add_argument("--learnfile", help="save learned command to a specified file") parser.add_argument("--durations", action="store_true", help="use durations in micro seconds instead of the Broadlink format") @@ -111,8 +76,8 @@ if args.joinwifi: if args.convert: data = bytearray.fromhex(''.join(args.data)) - durations = to_microseconds(data) - print(format_durations(durations)) + pulses = data_to_pulses(data) + print(format_pulses(pulses)) if args.temperature: print(dev.check_temperature()) if args.humidity: @@ -124,10 +89,13 @@ if args.sensors: for key in data: print("{} {}".format(key, data[key])) if args.send: - data = durations_to_broadlink(parse_durations(' '.join(args.data))) \ - if args.durations else bytearray.fromhex(''.join(args.data)) + data = ( + pulses_to_data(parse_pulses(args.data)) + if args.durations + else bytes.fromhex(''.join(args.data)) + ) dev.send_data(data) -if args.learn or (args.learnfile and not args.rfscanlearn): +if args.learn or (args.learnfile and not args.rflearn): dev.enter_learning() print("Learning...") start = time.time() @@ -143,17 +111,19 @@ if args.learn or (args.learnfile and not args.rfscanlearn): print("No data received...") exit(1) - learned = format_durations(to_microseconds(bytearray(data))) \ - if args.durations \ - else ''.join(format(x, '02x') for x in bytearray(data)) - if args.learn: - print(learned) - decode_hex = codecs.getdecoder("hex_codec") - print("Base64: " + str(base64.b64encode(decode_hex(learned)[0]))) + print("Packet found!") + raw_fmt = data.hex() + base64_fmt = base64.b64encode(data).decode('ascii') + pulse_fmt = format_pulses(data_to_pulses(data)) + + print("Raw:", raw_fmt) + print("Base64:", base64_fmt) + print("Pulses:", pulse_fmt) + if args.learnfile: print("Saving to {}".format(args.learnfile)) with open(args.learnfile, "w") as text_file: - text_file.write(learned) + text_file.write(pulse_fmt if args.durations else raw_fmt) if args.check: if dev.check_power(): print('* ON *') @@ -195,28 +165,33 @@ if args.switch: else: dev.set_power(True) print('* Switch to ON *') -if args.rfscanlearn: - dev.sweep_frequency() - print("Learning RF Frequency, press and hold the button to learn...") - - start = time.time() - while time.time() - start < TIMEOUT: - time.sleep(1) - if dev.check_frequency(): - break +if args.rflearn: + if args.frequency: + frequency = args.frequency + print("Press the button you want to learn, a short press...") else: - print("RF Frequency not found") - dev.cancel_sweep_frequency() - exit(1) + dev.sweep_frequency() + print("Detecting radiofrequency, press and hold the button to learn...") + + start = time.time() + while time.time() - start < TIMEOUT: + time.sleep(1) + locked, frequency = dev.check_frequency() + if locked: + break + else: + print("Radiofrequency not found") + dev.cancel_sweep_frequency() + exit(1) - print("Found RF Frequency - 1 of 2!") - print("You can now let go of the button") + print("Radiofrequency detected: {}MHz".format(frequency)) + print("You can now let go of the button") - input("Press enter to continue...") + input("Press enter to continue...") - print("To complete learning, single press the button you want to learn") + print("Press the button again, now a short press.") - dev.find_rf_packet() + dev.find_rf_packet(frequency) start = time.time() while time.time() - start < TIMEOUT: @@ -231,15 +206,16 @@ if args.rfscanlearn: print("No data received...") exit(1) - print("Found RF Frequency - 2 of 2!") - learned = format_durations(to_microseconds(bytearray(data))) \ - if args.durations \ - else ''.join(format(x, '02x') for x in bytearray(data)) - if args.learnfile is None: - print(learned) - decode_hex = codecs.getdecoder("hex_codec") - print("Base64: {}".format(str(base64.b64encode(decode_hex(learned)[0])))) - if args.learnfile is not None: + print("Packet found!") + raw_fmt = data.hex() + base64_fmt = base64.b64encode(data).decode('ascii') + pulse_fmt = format_pulses(data_to_pulses(data)) + + print("Raw:", raw_fmt) + print("Base64:", base64_fmt) + print("Pulses:", pulse_fmt) + + if args.learnfile: print("Saving to {}".format(args.learnfile)) with open(args.learnfile, "w") as text_file: - text_file.write(learned) + text_file.write(pulse_fmt if args.durations else raw_fmt) diff --git a/setup.py b/setup.py index 505386b9..0426f148 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ from setuptools import setup, find_packages -version = '0.18.0' +version = '0.19.0' setup( name="broadlink",