Implemented fritz box bridge

This commit is contained in:
Maximilian Giller 2024-07-17 21:02:05 +02:00
parent fd414c2cd1
commit ff7b23b716
2 changed files with 212 additions and 0 deletions

28
src/fritz_test.py Normal file
View file

@ -0,0 +1,28 @@
from mash.bridges.fritzbox import FritzBoxBridge, FritzDeviceState
from time import sleep
import logging
# logging.getLogger().setLevel(logging.DEBUG)
bridge = FritzBoxBridge(id="fritzbox", ip="192.168.178.1", refresh_delay_sec=3)
def device_change(mac_address: str, device_state: FritzDeviceState) -> None:
print(
f"{device_state.host_name} - {'Connected' if device_state.active else 'Disconnected'}"
)
print(str(device_state))
bridge.subscribe_device(device_change, mac_address="FA:01:EC:90:50:49")
bridge.connect()
while True:
try:
sleep(3)
except KeyboardInterrupt:
break
bridge.disconnect()

View file

@ -0,0 +1,184 @@
from datetime import datetime
import logging
from typing import Coroutine, Optional
from mash.bridges.bridge import Bridge
from fritzconnection import FritzConnection
import asyncio
EXCEPTION_RECONNECT_TIMEOUT_SEC = 60
class FritzDeviceState:
def __init__(self, mac_address: str, raw_state: dict) -> None:
logging.debug(
f"Fritz raw device state to mac [{mac_address}]: {raw_state}",
extra=raw_state,
)
self.mac_address = mac_address
self.active: bool = raw_state["NewActive"]
self.ip_address: str = raw_state["NewIPAddress"]
self.address_source: str = raw_state["NewAddressSource"]
self.lease_time_remaining = raw_state["NewLeaseTimeRemaining"]
self.interface_type: str = raw_state["NewInterfaceType"]
self.host_name: str = raw_state["NewHostName"]
def __eq__(self, value: object) -> bool:
return (
type(value) is FritzDeviceState
and self.mac_address == value.mac_address
and self.ip_address == value.ip_address
and self.address_source == value.address_source
and self.lease_time_remaining == value.lease_time_remaining
and self.interface_type == value.interface_type
and self.host_name == value.host_name
and self.active == value.active
)
def __str__(self) -> str:
return f"[{self.mac_address} | {self.host_name}] {'Active' if self.active else 'Inactive'} - {self.ip_address} - {self.address_source} - {self.interface_type} - {self.lease_time_remaining}"
class FritzBoxBridge(Bridge):
def __init__(
self,
*,
id: str,
ip: str,
port: Optional[int] = None,
refresh_delay_sec: int = 60,
) -> None:
super().__init__(id=id, type="fritzbox")
self._ip = ip
self._port = port
self._refresh_delay_sec = refresh_delay_sec
self._device_callbacks: dict[str, list[callable]] = {}
self._device_states: dict[str, FritzDeviceState] = {}
self._background_service: Optional[asyncio.Task] = None
self._fritz_api: FritzConnection = None
def connect(self) -> None:
self.disconnect()
self._background_service = asyncio.run(self.__start_service_worker__())
async def __start_service_worker__(self) -> Coroutine:
return await self.__service_worker__()
def disconnect(self) -> None:
if self._background_service is None:
return
self._background_service.cancel()
async def __service_worker__(self) -> None:
while True:
try:
start_time = datetime.now()
self.__establish_connection_if_required__()
self.__update_active_devices__()
await self.__sleep_until_next_refresh__(start_time)
except asyncio.CancelledError:
break
except KeyboardInterrupt:
break
except Exception as ex:
await self.__handle_service_exception__(ex)
self._fritz_api = None
async def __sleep_until_next_refresh__(self, start_time: datetime):
refresh_duration_sec = (datetime.now() - start_time).total_seconds()
remaining_delay = self._refresh_delay_sec - refresh_duration_sec
if remaining_delay > 0:
logging.debug(
f"Fritz service worker sleeping for {remaining_delay} seconds."
)
await asyncio.sleep(remaining_delay)
def __update_active_devices__(self) -> None:
for mac_address in self._device_callbacks.keys():
raw_state = self.__get_specific_device__(mac_address)
new_device_state = FritzDeviceState(
mac_address=mac_address, raw_state=raw_state
)
if self._device_states[mac_address] == new_device_state:
continue # No state change
self._device_states[mac_address] = new_device_state
# Trigger every callback for device change
for cb in self._device_callbacks[mac_address]:
cb(mac_address, new_device_state)
async def __handle_service_exception__(self, exception: Exception) -> None:
logging.exception(
f"Exception in service worker of {self.type} caught: {exception}",
exc_info=exception,
)
logging.debug(
"Exception occurred, sleeping for {EXCEPTION_RECONNECT_TIMEOUT_SEC} seconds and reconnecting again."
)
self._fritz_api = None # Trigger reconnect
await asyncio.sleep(EXCEPTION_RECONNECT_TIMEOUT_SEC)
def __establish_connection_if_required__(self) -> None:
if self._fritz_api:
return
self._fritz_api = FritzConnection(address=self._ip, port=self._port)
logging.info("Connected")
def subscribe_device(self, callback, mac_address: str) -> None:
"""Register device and receive callbacks on state changes.
Args:
callback (function): Function to call on state change.
mac_address (str): Mac address of device.
"""
self.register_device(mac_address)
self._device_callbacks[mac_address].append(callback)
def register_device(self, mac_address: str) -> None:
"""Make specified device known to bridge, to track device state.
Args:
mac_address (str): Mac address of device.
"""
if mac_address not in self._device_callbacks.keys():
self._device_callbacks[mac_address] = []
self._device_states[mac_address] = None
def __get_known_devices__(self) -> list[dict]:
numberOfDevices = self._fritz_api.call_action(
"Hosts", "GetHostNumberOfEntries"
)["NewHostNumberOfEntries"]
devices = []
for i in range(numberOfDevices):
devices.append(
self._fritz_api.call_action("Hosts", "GetGenericHostEntry", NewIndex=i)
)
return devices
def __get_specific_device__(self, mac_address: str) -> dict:
return self._fritz_api.call_action(
"Hosts", "GetSpecificHostEntry", NewMACAddress=mac_address
)
def get_device_info(self, mac_address: str) -> FritzDeviceState | None:
"""Return latest device state or None if not registered.
Does not request new state, uses last known, cached state.
Args:
mac_address (str): Mac address of device.
Returns:
FritzDeviceState | None: Latest device state or None.
"""
return self._device_states[mac_address]