mash-server/src/bridges/z2m/zigbee2mqtt_bridge.py
2025-01-11 01:17:12 +01:00

90 lines
3 KiB
Python

import logging
from typing import Optional
from core import Bridge
import paho.mqtt.client as mqtt
import json
class Z2mBridge(Bridge):
    """Bridge to Zigbee2MQTT devices via an MQTT broker.

    Messages received below the configured base topic are dispatched to the
    callbacks registered with :meth:`subscribe_device`. Commands are published
    to the per-device ``<topic>/<device>/set`` and ``<topic>/<device>/get``
    topics.
    """

    def __init__(
        self,
        *,
        id: str,
        ip: str,
        port: int = 1883,
        keepalive: int = 60,
        topic: str = "zigbee2mqtt",
    ) -> None:
        """
        Args:
            id (str): Unique identifier of this bridge instance.
            ip (str): IP-Address of MQTT broker.
            port (int, optional): Port of MQTT broker. Defaults to 1883.
            keepalive (int, optional): MQTT keepalive delay in seconds. Defaults to 60.
            topic (str, optional): Base topic for Zigbee2MQTT interface. Defaults to "zigbee2mqtt".
        """
        super().__init__(id=id, type="zigbee2mqtt")
        self._ip = ip
        self._port = port
        self._keepalive = keepalive
        # Maps a device identifier (IEEE address or friendly name) to the
        # callbacks that want to be notified about messages for that device.
        self._device_callbacks: dict[str, list] = {}
        # Leading/trailing slashes are dropped so topics can always be built
        # as f"{self._topic}/...".
        self._topic = topic.strip("/")
        self._client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
        # Bound methods already carry ``self``; no wrapper lambdas required.
        self._client.on_connect = self.__on_connect__
        self._client.on_message = self.__on_message__

    def disconnect(self) -> None:
        """Stop the MQTT network loop and disconnect from the broker."""
        self._client.loop_stop()
        self._client.disconnect()
        logging.info(f"Disconnected from Zigbee2MQTT broker [{self.id}].")

    def connect(self) -> None:
        """Connect to the configured broker and start the MQTT network loop."""
        self._client.connect(self._ip, self._port, self._keepalive)
        self._client.loop_start()
        logging.info(f"Connect to Zigbee2MQTT broker [{self.id}].")

    @property
    def is_connected(self) -> bool | None:
        """Connection state as reported by the underlying MQTT client."""
        return self._client.is_connected()

    def __on_connect__(self, client, userdata, flags, reason_code, properties):
        """MQTT ``on_connect`` callback: subscribe to everything below the base topic.

        The subscription is issued from this callback, so it is repeated every
        time the client reports a connection.
        """
        self._client.subscribe(f"{self._topic}/#")

    def __on_message__(self, client, userdata, msg: mqtt.MQTTMessage):
        """MQTT ``on_message`` callback: forward a device message to its subscribers.

        The device identifier is the first topic level after the base topic.
        Messages for devices without registered callbacks are ignored, as are
        messages whose payload is not valid JSON.

        Each callback is invoked as ``callback(device_name, payload)`` where
        ``payload`` is the decoded JSON document. The payload is decoded once
        per message, so all callbacks of a device receive the same object.
        """
        # Strip only the *leading* base topic; splitting on it could cut the
        # topic at a later occurrence of the same text.
        relative_topic = msg.topic.removeprefix(f"{self._topic}/")
        device_name = relative_topic.partition("/")[0]

        callbacks = self._device_callbacks.get(device_name)
        if not callbacks:
            return

        try:
            payload = json.loads(msg.payload)
        except ValueError:
            # Covers json.JSONDecodeError and UnicodeDecodeError (both are
            # ValueError subclasses), e.g. empty or plain-text payloads. One
            # malformed message must not raise out of the MQTT callback.
            logging.warning(
                f"Ignoring non-JSON payload on topic '{msg.topic}' [{self.id}]."
            )
            return

        for callback in callbacks:
            callback(device_name, payload)

    @Bridge.requires_connection
    def set_device(self, ieee_address: str, *, content: Optional[dict] = None) -> None:
        """Publish ``content`` as JSON to the device's ``set`` topic.

        Args:
            ieee_address (str): Device identifier used as topic level.
            content (dict, optional): Values to send. Defaults to an empty
                object (``{}``).
        """
        # ``None`` replaces the former mutable ``{}`` default; the published
        # message is unchanged.
        payload = {} if content is None else content
        self._client.publish(f"{self._topic}/{ieee_address}/set", json.dumps(payload))

    @Bridge.requires_connection
    def get_device(self, ieee_address: str) -> None:
        """Publish a state request to the device's ``get`` topic.

        Args:
            ieee_address (str): Device identifier used as topic level.
        """
        self._client.publish(
            f"{self._topic}/{ieee_address}/get", json.dumps({"state": ""})
        )

    def subscribe_device(
        self,
        callback,
        *,
        ieee_address: Optional[str] = None,
        friendly_name: Optional[str] = None,
    ) -> None:
        """Register ``callback`` for messages of a device.

        The callback is stored under every identifier that is given, because
        incoming messages are matched against the topic level following the
        base topic.

        Args:
            callback: Callable invoked as ``callback(device_name, payload)``.
            ieee_address (str, optional): IEEE address of the device.
            friendly_name (str, optional): Friendly name of the device.
        """
        # Omitted identifiers are skipped so no callback is stored under ``None``.
        identifiers = [
            name for name in (ieee_address, friendly_name) if name is not None
        ]
        if not identifiers:
            logging.warning(
                f"subscribe_device called without ieee_address or friendly_name [{self.id}]."
            )
            return

        for name in identifiers:
            self._device_callbacks.setdefault(name, []).append(callback)