diff --git a/src/fritz_test.py b/src/fritz_test.py new file mode 100644 index 0000000..7a80f96 --- /dev/null +++ b/src/fritz_test.py @@ -0,0 +1,28 @@ +from mash.bridges.fritzbox import FritzBoxBridge, FritzDeviceState +from time import sleep +import logging + +# logging.getLogger().setLevel(logging.DEBUG) + + +bridge = FritzBoxBridge(id="fritzbox", ip="192.168.178.1", refresh_delay_sec=3) + + +def device_change(mac_address: str, device_state: FritzDeviceState) -> None: + print( + f"{device_state.host_name} - {'Connected' if device_state.active else 'Disconnected'}" + ) + print(str(device_state)) + + +bridge.subscribe_device(device_change, mac_address="FA:01:EC:90:50:49") + +bridge.connect() + +while True: + try: + sleep(3) + except KeyboardInterrupt: + break + +bridge.disconnect() diff --git a/src/mash/bridges/fritzbox.py b/src/mash/bridges/fritzbox.py index e69de29..89a10d4 100644 --- a/src/mash/bridges/fritzbox.py +++ b/src/mash/bridges/fritzbox.py @@ -0,0 +1,184 @@ +from datetime import datetime +import logging +from typing import Coroutine, Optional +from mash.bridges.bridge import Bridge +from fritzconnection import FritzConnection +import asyncio + + +EXCEPTION_RECONNECT_TIMEOUT_SEC = 60 + + +class FritzDeviceState: + def __init__(self, mac_address: str, raw_state: dict) -> None: + logging.debug( + f"Fritz raw device state to mac [{mac_address}]: {raw_state}", + extra=raw_state, + ) + self.mac_address = mac_address + self.active: bool = raw_state["NewActive"] + self.ip_address: str = raw_state["NewIPAddress"] + self.address_source: str = raw_state["NewAddressSource"] + self.lease_time_remaining = raw_state["NewLeaseTimeRemaining"] + self.interface_type: str = raw_state["NewInterfaceType"] + self.host_name: str = raw_state["NewHostName"] + + def __eq__(self, value: object) -> bool: + return ( + type(value) is FritzDeviceState + and self.mac_address == value.mac_address + and self.ip_address == value.ip_address + and self.address_source == value.address_source + and self.lease_time_remaining == value.lease_time_remaining + and self.interface_type == value.interface_type + and self.host_name == value.host_name + and self.active == value.active + ) + + def __str__(self) -> str: + return f"[{self.mac_address} | {self.host_name}] {'Active' if self.active else 'Inactive'} - {self.ip_address} - {self.address_source} - {self.interface_type} - {self.lease_time_remaining}" + + +class FritzBoxBridge(Bridge): + def __init__( + self, + *, + id: str, + ip: str, + port: Optional[int] = None, + refresh_delay_sec: int = 60, + ) -> None: + super().__init__(id=id, type="fritzbox") + self._ip = ip + self._port = port + self._refresh_delay_sec = refresh_delay_sec + self._device_callbacks: dict[str, list[callable]] = {} + self._device_states: dict[str, FritzDeviceState] = {} + self._background_service: Optional[asyncio.Task] = None + self._fritz_api: FritzConnection = None + + def connect(self) -> None: + self.disconnect() + self._background_service = asyncio.run(self.__start_service_worker__()) + + async def __start_service_worker__(self) -> Coroutine: + return await self.__service_worker__() + + def disconnect(self) -> None: + if self._background_service is None: + return + + self._background_service.cancel() + + async def __service_worker__(self) -> None: + while True: + try: + start_time = datetime.now() + + self.__establish_connection_if_required__() + self.__update_active_devices__() + + await self.__sleep_until_next_refresh__(start_time) + + except asyncio.CancelledError: + break + except KeyboardInterrupt: + break + except Exception as ex: + await self.__handle_service_exception__(ex) + + self._fritz_api = None + + async def __sleep_until_next_refresh__(self, start_time: datetime): + refresh_duration_sec = (datetime.now() - start_time).total_seconds() + remaining_delay = self._refresh_delay_sec - refresh_duration_sec + + if remaining_delay > 0: + logging.debug( + f"Fritz service worker sleeping for {remaining_delay} seconds." + ) + await asyncio.sleep(remaining_delay) + + def __update_active_devices__(self) -> None: + for mac_address in self._device_callbacks.keys(): + raw_state = self.__get_specific_device__(mac_address) + + new_device_state = FritzDeviceState( + mac_address=mac_address, raw_state=raw_state + ) + if self._device_states[mac_address] == new_device_state: + continue # No state change + + self._device_states[mac_address] = new_device_state + + # Trigger every callback for device change + for cb in self._device_callbacks[mac_address]: + cb(mac_address, new_device_state) + + async def __handle_service_exception__(self, exception: Exception) -> None: + logging.exception( + f"Exception in service worker of {self.type} caught: {exception}", + exc_info=exception, + ) + logging.debug( + "Exception occurred, sleeping for {EXCEPTION_RECONNECT_TIMEOUT_SEC} seconds and reconnecting again." + ) + + self._fritz_api = None # Trigger reconnect + + await asyncio.sleep(EXCEPTION_RECONNECT_TIMEOUT_SEC) + + def __establish_connection_if_required__(self) -> None: + if self._fritz_api: + return + + self._fritz_api = FritzConnection(address=self._ip, port=self._port) + logging.info("Connected") + + def subscribe_device(self, callback, mac_address: str) -> None: + """Register device and receive callbacks on state changes. + + Args: + callback (function): Function to call on state change. + mac_address (str): Mac address of device. + """ + self.register_device(mac_address) + self._device_callbacks[mac_address].append(callback) + + def register_device(self, mac_address: str) -> None: + """Make specified device known to bridge, to track device state. + + Args: + mac_address (str): Mac address of device. + """ + if mac_address not in self._device_callbacks.keys(): + self._device_callbacks[mac_address] = [] + self._device_states[mac_address] = None + + def __get_known_devices__(self) -> list[dict]: + numberOfDevices = self._fritz_api.call_action( + "Hosts", "GetHostNumberOfEntries" + )["NewHostNumberOfEntries"] + devices = [] + for i in range(numberOfDevices): + devices.append( + self._fritz_api.call_action("Hosts", "GetGenericHostEntry", NewIndex=i) + ) + return devices + + def __get_specific_device__(self, mac_address: str) -> dict: + return self._fritz_api.call_action( + "Hosts", "GetSpecificHostEntry", NewMACAddress=mac_address + ) + + def get_device_info(self, mac_address: str) -> FritzDeviceState | None: + """Return latest device state or None if not registered. + Does not request new state, uses last known, cached state. + + Args: + mac_address (str): Mac address of device. + + Returns: + FritzDeviceState | None: Latest device state or None. + """ + return self._device_states[mac_address]