Removed background jobs from bridges

This commit is contained in:
Maximilian Giller 2024-07-31 04:13:35 +02:00
parent 91552bafb6
commit c1a5b61e30
3 changed files with 57 additions and 115 deletions

View file

@ -1,3 +1,8 @@
class BridgeException(Exception):
def __init__(self, id: str, type: str, message: str) -> None:
super().__init__(f"Bridge [{type} | {id}] has thrown an exception: {message}")
class Bridge:
def __init__(self, *, id: str, type: str) -> None:
self._id = id
@ -10,3 +15,32 @@ class Bridge:
@property
def type(self) -> str:
return self._type
@property
def is_connected(self) -> bool | None:
return None
def __del__(self) -> None:
self.disconnect()
def connect(self) -> None:
pass
def disconnect(self) -> None:
pass
def requires_connection(func, *, auto_connect=True):
def inner(self: Bridge, *args, **kwargs):
if self.is_connected is False: # Neither True, nor None
if not auto_connect:
raise BridgeException(
self.id,
self.type,
f"Bridge must be manually connected before executing method [{func.__name__}].",
)
self.connect()
return func(*args, **kwargs)
return inner

View file

@ -47,129 +47,42 @@ class FritzDeviceState:
class FritzBoxBridge(Bridge):
def __init__(
self,
*,
id: str,
ip: str,
port: Optional[int] = None,
refresh_delay_sec: int = 60,
) -> None:
"""
Args:
id (str): Id of fritzbox bridge.
ip (str): IP Address of fritzbox bridge in network to connect to.
port (Optional[int], optional): Port of fritzbox bridge in network to connect to. Defaults to None.
refresh_delay_sec (int, optional): Delay between pull cycles to get recent states from fritzbox in seconds. Defaults to 60.
"""
super().__init__(id=id, type="fritzbox")
self._ip = ip
self._port = port
self._refresh_delay_sec = refresh_delay_sec
self._device_callbacks: dict[str, list[callable]] = {}
self._device_states: dict[str, FritzDeviceState] = {}
self._background_service: Optional[asyncio.Task] = None
self._fritz_api: FritzConnection = None
def connect(self) -> None:
self.disconnect()
self._background_service = asyncio.run(self.__start_service_worker__())
async def __start_service_worker__(self) -> Coroutine:
return await self.__service_worker__()
def disconnect(self) -> None:
if self._background_service is None:
return
self._background_service.cancel()
async def __service_worker__(self) -> None:
while True:
try:
start_time = datetime.now()
self.__establish_connection_if_required__()
self.__update_active_devices__()
await self.__sleep_until_next_refresh__(start_time)
except asyncio.CancelledError:
break
except KeyboardInterrupt:
break
except Exception as ex:
await self.__handle_service_exception__(ex)
self._fritz_api = None
async def __sleep_until_next_refresh__(self, start_time: datetime):
refresh_duration_sec = (datetime.now() - start_time).total_seconds()
remaining_delay = self._refresh_delay_sec - refresh_duration_sec
if remaining_delay > 0:
logging.debug(
f"Fritz service worker sleeping for {remaining_delay} seconds."
)
await asyncio.sleep(remaining_delay)
def __update_active_devices__(self) -> None:
for mac_address in self._device_callbacks.keys():
raw_state = self.__get_specific_device__(mac_address)
new_device_state = FritzDeviceState(
mac_address=mac_address, raw_state=raw_state
)
if self._device_states[mac_address] == new_device_state:
continue # No state change
self._device_states[mac_address] = new_device_state
# Trigger every callback for device change
for cb in self._device_callbacks[mac_address]:
cb(mac_address, new_device_state)
async def __handle_service_exception__(self, exception: Exception) -> None:
logging.exception(
f"Exception in service worker of {self.type} caught: {exception}",
exc_info=exception,
)
logging.debug(
"Exception occurred, sleeping for {EXCEPTION_RECONNECT_TIMEOUT_SEC} seconds and reconnecting again."
)
self._fritz_api = None # Trigger reconnect
await asyncio.sleep(EXCEPTION_RECONNECT_TIMEOUT_SEC)
def __establish_connection_if_required__(self) -> None:
if self._fritz_api:
return
self.disconnect()
self._fritz_api = FritzConnection(address=self._ip, port=self._port)
logging.info("Connected")
def subscribe_device(self, callback, mac_address: str) -> None:
"""Register device and receive callbacks on state changes.
def disconnect(self) -> None:
logging.info("Disconnected")
self._fritz_api = None
Args:
callback (function): Function to call on state change.
mac_address (str): Mac address of device.
"""
self.register_device(mac_address)
self._device_callbacks[mac_address].append(callback)
@property
def is_connected(self) -> bool | None:
return self._fritz_api is not None
def register_device(self, mac_address: str) -> None:
"""Make specified device known to bridge, to track device state.
Args:
mac_address (str): Mac address of device.
"""
if mac_address not in self._device_callbacks.keys():
self._device_callbacks[mac_address] = []
self._device_states[mac_address] = None
def __get_known_devices__(self) -> list[dict]:
@Bridge.requires_connection
def get_known_devices(self) -> list[dict]:
numberOfDevices = self._fritz_api.call_action(
"Hosts", "GetHostNumberOfEntries"
)["NewHostNumberOfEntries"]
@ -180,19 +93,11 @@ class FritzBoxBridge(Bridge):
)
return devices
def __get_specific_device__(self, mac_address: str) -> dict:
return self._fritz_api.call_action(
"Hosts", "GetSpecificHostEntry", NewMACAddress=mac_address
@Bridge.requires_connection
def get_device_state(self, mac_address: str) -> FritzDeviceState:
return FritzDeviceState(
mac_address=mac_address,
raw_state=self._fritz_api.call_action(
"Hosts", "GetSpecificHostEntry", NewMACAddress=mac_address
),
)
def get_device_info(self, mac_address: str) -> FritzDeviceState | None:
"""Return latest device state or None if not registered.
Does not request new state, uses last known, cached state.
Args:
mac_address (str): Mac address of device.
Returns:
FritzDeviceState | None: Latest device state or None.
"""
return self._device_states[mac_address]

View file

@ -40,9 +40,6 @@ class Z2mBridge(Bridge):
client, userdata, msg
)
def __del__(self) -> None:
self.disconnect()
def disconnect(self) -> None:
self._client.loop_stop()
self._client.disconnect()
@ -53,6 +50,10 @@ class Z2mBridge(Bridge):
self._client.loop_start()
logging.info(f"Connect to Zigbee2MQTT broker [{self.id}].")
@property
def is_connected(self) -> bool | None:
return self._client.is_connected()
def __on_connect__(self, client, userdata, flags, reason_code, properties):
self._client.subscribe(f"{self._topic}/#")
@ -65,9 +66,11 @@ class Z2mBridge(Bridge):
for callback in self._device_callbacks[device_name]:
callback(device_name, json.loads(msg.payload))
@Bridge.requires_connection
def set_device(self, ieee_address: str, *, content: dict = {}) -> None:
self._client.publish(f"{self._topic}/{ieee_address}/set", json.dumps(content))
@Bridge.requires_connection
def get_device(self, ieee_address: str) -> None:
self._client.publish(
f"{self._topic}/{ieee_address}/get", json.dumps({"state": ""})