78 lines
2.3 KiB
Python
78 lines
2.3 KiB
Python
import asyncio
|
|
import logging
|
|
from fritzconnection import FritzConnection
|
|
from datetime import datetime
|
|
from ..hue import hue
|
|
|
|
|
|
refresh_every_seconds: int = 60 # Every x seconds devices are polled again
|
|
trigger_away_after_seconds: int = (
|
|
3 * 60
|
|
) # After all away-devices are gone for x seconds
|
|
away_triggered = False
|
|
away_devices = ["B2:06:77:EE:A9:0F"] # Max' iPhone
|
|
macaddresses_to_track = ["B2:06:77:EE:A9:0F"] # Max' iPhone
|
|
|
|
fritz_api = FritzConnection(address="192.168.178.1")
|
|
|
|
# Referenced documentation: https://avm.de/fileadmin/user_upload/Global/Service/Schnittstellen/hostsSCPD.pdf
|
|
|
|
|
|
devices_last_online = {}
|
|
|
|
|
|
def get_all_devices() -> list:
|
|
numberOfDevices = fritz_api.call_action("Hosts", "GetHostNumberOfEntries")[
|
|
"NewHostNumberOfEntries"
|
|
]
|
|
devices = []
|
|
for i in range(numberOfDevices):
|
|
devices.append(
|
|
fritz_api.call_action("Hosts", "GetGenericHostEntry", NewIndex=i)
|
|
)
|
|
return devices
|
|
|
|
|
|
def get_specific_device(mac_address: str) -> dict:
|
|
return fritz_api.call_action(
|
|
"Hosts", "GetSpecificHostEntry", NewMACAddress=mac_address
|
|
)
|
|
|
|
|
|
def check_for_change():
|
|
# Check if devices are away for away-mode
|
|
all_away = True
|
|
for device in away_devices:
|
|
last_online = devices_last_online[device]
|
|
if (datetime.now() - last_online).total_seconds() < trigger_away_after_seconds:
|
|
all_away = False
|
|
break
|
|
|
|
# Execute away mode
|
|
global away_triggered
|
|
if all_away:
|
|
if not away_triggered:
|
|
away_triggered = True
|
|
hue.in_room_deactivate_lights("Max Zimmer")
|
|
else:
|
|
away_triggered = False
|
|
|
|
async def track_network_devices():
|
|
global devices_last_online
|
|
|
|
# Initial values to avoid None
|
|
for macaddress in macaddresses_to_track:
|
|
devices_last_online[macaddress] = datetime(1970, 1, 1, 0, 0, 0)
|
|
|
|
while True:
|
|
try:
|
|
for macaddress in macaddresses_to_track:
|
|
is_online = get_specific_device(macaddress)["NewActive"]
|
|
if is_online:
|
|
devices_last_online[macaddress] = datetime.now()
|
|
|
|
check_for_change()
|
|
except Exception as ex:
|
|
logging.exception(ex)
|
|
finally:
|
|
await asyncio.sleep(refresh_every_seconds)
|