import asyncio import logging from fritzconnection import FritzConnection from datetime import datetime from ..hue import hue refresh_every_seconds: int = 30 # Every x seconds devices are polled again trigger_away_after_seconds: int = ( 2 * 60 ) # After all away-devices are gone for x seconds, light is turned off away_devices = ["B2:06:77:EE:A9:0F"] # Max' iPhone macaddresses_to_track = ["B2:06:77:EE:A9:0F"] # Max' iPhone fritz_api = FritzConnection(address="192.168.178.1") # Referenced documentation: https://avm.de/fileadmin/user_upload/Global/Service/Schnittstellen/hostsSCPD.pdf devices_last_online = {} def get_all_devices() -> list: numberOfDevices = fritz_api.call_action("Hosts", "GetHostNumberOfEntries")[ "NewHostNumberOfEntries" ] devices = [] for i in range(numberOfDevices): devices.append( fritz_api.call_action("Hosts", "GetGenericHostEntry", NewIndex=i) ) return devices def get_specific_device(mac_adress: str) -> dict: return fritz_api.call_action( "Hosts", "GetSpecificHostEntry", NewMACAddress=mac_adress ) def check_for_change(): # Check if devices are away for away-mode all_away = True for device in away_devices: last_online = devices_last_online[device] if (datetime.now() - last_online).total_seconds() < trigger_away_after_seconds: all_away = False break # Execute away mode if all_away: hue.in_room_deactivate_lights("Max Zimmer") async def track_network_devices(): global devices_last_online # Initial values to avoid None for macaddress in macaddresses_to_track: devices_last_online[macaddress] = datetime(1970, 1, 1, 0, 0, 0) while True: try: for macaddress in macaddresses_to_track: is_online = get_specific_device(macaddress)["NewActive"] if is_online: devices_last_online[macaddress] = datetime.now() check_for_change() except Exception as ex: logging.exception(ex) finally: await asyncio.sleep(refresh_every_seconds)