import json
import logging
from typing import Any, Callable, Optional

import paho.mqtt.client as mqtt

from core.bridge import Bridge


class Z2mBridge(Bridge):
    """Bridge that talks to Zigbee2MQTT through an MQTT broker.

    After connecting, the bridge subscribes to every topic below the
    configured base topic and forwards the JSON-decoded payload of each
    message to the callbacks registered for the device named in the topic.
    """

    def __init__(
        self,
        *,
        id: str,
        ip: str,
        port: int = 1883,
        keepalive: int = 60,
        topic: str = "zigbee2mqtt",
    ) -> None:
        """
        Args:
            id (str): Unique identifier of this bridge instance.
            ip (str): IP-Address of MQTT broker.
            port (int, optional): Port of MQTT broker. Defaults to 1883.
            keepalive (int, optional): MQTT keepalive delay in seconds. Defaults to 60.
            topic (str, optional): Base topic for Zigbee2MQTT interface. Defaults to "zigbee2mqtt".
        """
        super().__init__(id=id, type="zigbee2mqtt")

        self._ip = ip
        self._port = port
        self._keepalive = keepalive
        # Maps a device name (first topic segment below the base topic) to
        # the callbacks that want its messages.
        self._device_callbacks: dict[str, list] = {}
        # Leading/trailing slashes are dropped so topics can be built as
        # f"{self._topic}/<device>/..." without doubling separators.
        self._topic = topic.strip("/")

        self._client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
        # Bound methods already match the signatures paho calls with; no
        # pass-through lambdas are needed.
        self._client.on_connect = self.__on_connect__
        self._client.on_message = self.__on_message__

    def disconnect(self) -> None:
        """Stop the background network loop and disconnect from the broker."""
        self._client.loop_stop()
        self._client.disconnect()
        logging.info(f"Disconnected from Zigbee2MQTT broker [{self.id}].")

    def connect(self) -> None:
        """Connect to the configured broker and start the background network loop."""
        self._client.connect(self._ip, self._port, self._keepalive)
        self._client.loop_start()
        logging.info(f"Connect to Zigbee2MQTT broker [{self.id}].")

    @property
    def is_connected(self) -> bool | None:
        """Connection state as reported by the underlying MQTT client."""
        return self._client.is_connected()

    def __on_connect__(self, client, userdata, flags, reason_code, properties):
        """MQTT connect callback: subscribe to everything below the base topic."""
        self._client.subscribe(f"{self._topic}/#")

    def __on_message__(self, client, userdata, msg: Any) -> None:
        """MQTT message callback: dispatch a message to its device's callbacks.

        The device name is the first path segment after the base topic, so
        sub-topics such as ``<device>/set`` map to the same device name.
        Messages for devices without registered callbacks are ignored. A
        payload that is not valid JSON is logged and dropped instead of
        raising inside the MQTT callback.

        Args:
            client: MQTT client that received the message (unused).
            userdata: User data attached to the client (unused).
            msg: Received message; its ``topic`` and ``payload`` are read.
        """
        # Strip the base topic only where it is a prefix, so a later
        # occurrence of the same text in the topic cannot shift the result.
        device_name = msg.topic.removeprefix(f"{self._topic}/").partition("/")[0]

        callbacks = self._device_callbacks.get(device_name)
        if not callbacks:
            return

        for callback in callbacks:
            # Decode per callback so every callback gets its own object and
            # cannot affect the others by mutating it.
            try:
                payload = json.loads(msg.payload)
            except ValueError:
                # Covers json.JSONDecodeError and UnicodeDecodeError.
                logging.warning(
                    "Ignoring non-JSON payload on topic %r [%s].",
                    msg.topic,
                    self.id,
                )
                return
            callback(device_name, payload)

    @Bridge.requires_connection
    def set_device(self, ieee_address: str, *, content: Optional[dict] = None) -> None:
        """Publish *content* as JSON to the device's ``set`` topic.

        Args:
            ieee_address (str): Device identifier used in the topic path.
            content (dict, optional): Values to send. Defaults to an empty
                object when omitted.
        """
        payload = {} if content is None else content
        self._client.publish(f"{self._topic}/{ieee_address}/set", json.dumps(payload))

    @Bridge.requires_connection
    def get_device(self, ieee_address: str) -> None:
        """Publish a ``{"state": ""}`` request to the device's ``get`` topic.

        Args:
            ieee_address (str): Device identifier used in the topic path.
        """
        self._client.publish(
            f"{self._topic}/{ieee_address}/get", json.dumps({"state": ""})
        )

    def subscribe_device(
        self,
        callback: Callable[[str, Any], None],
        *,
        ieee_address: Optional[str] = None,
        friendly_name: Optional[str] = None,
    ) -> None:
        """Register *callback* for messages of a device.

        The callback is stored under each identifier that is given and is
        later called as ``callback(device_name, decoded_payload)``.

        Args:
            callback: Callable invoked for every matching message.
            ieee_address (str, optional): Device address to listen for.
            friendly_name (str, optional): Device name to listen for.
        """
        # dict.fromkeys de-duplicates while keeping order: when both
        # identifiers are the same string the callback must be registered
        # once, otherwise it would fire twice per message.
        keys = [
            key
            for key in dict.fromkeys((ieee_address, friendly_name))
            if key is not None
        ]
        if not keys:
            logging.warning(
                "subscribe_device called without ieee_address or friendly_name [%s].",
                self.id,
            )
            return

        for key in keys:
            self._device_callbacks.setdefault(key, []).append(callback)