Removed and restructured old unused snippets

This commit is contained in:
Maximilian Giller 2024-08-07 03:30:20 +02:00
parent 1a3bcb88f0
commit 69465c8edf
19 changed files with 0 additions and 361 deletions

View file

@ -1,27 +0,0 @@
import paho.mqtt.client as mqtt
# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, reason_code, properties):
print(f"Connected with result code {reason_code}")
# Subscribing in on_connect() means that if we lose the connection and
# reconnect then subscriptions will be renewed.
client.subscribe("zigbee2mqtt/#")
# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
print(msg.topic + " " + str(msg.payload))
mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
mqttc.on_connect = on_connect
mqttc.on_message = on_message
mqttc.connect("raspberrypi", 1883, 60)
# Blocking call that processes network traffic, dispatches callbacks and
# handles reconnecting.
# Other loop*() functions are available that give a threaded interface and a
# manual interface.
mqttc.loop_forever()

View file

@ -1 +0,0 @@
paho-mqtt

View file

@ -1,58 +0,0 @@
import asyncio
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi import APIRouter
import os
import csv
from .handlers.bett import file_path, local_history, log_bed_weights
router = APIRouter()
asyncio.create_task(log_bed_weights())
@router.get("/file", tags=["file"])
async def get_file():
with open(file_path, "r", encoding="UTF-8") as fp:
return HTMLResponse("\n".join(fp.readlines()))
@router.get("/history")
async def get_history(count: int = None) -> list[dict]:
points = []
with open(file_path, "r", encoding="UTF-8") as fp:
reader = csv.DictReader(fp, delimiter=";")
for row in reader:
if not row:
continue
points.append(
{
"timestamp": row["timestamp"],
"total": float(row["total"]),
"tl": float(row["tl"]),
"tr": float(row["tr"]),
"bl": float(row["bl"]),
"br": float(row["br"]),
}
)
if count:
return points[-count]
else:
return points
@router.get("/latest")
async def get_latest():
if len(local_history) == 0:
return HTMLResponse(status_code=200, content="No data given yet")
return JSONResponse(local_history[-1])
@router.delete("/delete", tags=["file"])
async def delete_file():
os.remove(file_path)
return "Deleted file"

View file

@ -1,78 +0,0 @@
import asyncio
import logging
from fritzconnection import FritzConnection
from datetime import datetime
from ..hue import hue
refresh_every_seconds: int = 60 # Every x seconds devices are polled again
trigger_away_after_seconds: int = (
3 * 60
) # After all away-devices are gone for x seconds
away_triggered = False
away_devices = ["B2:06:77:EE:A9:0F"] # Max' iPhone
macaddresses_to_track = ["B2:06:77:EE:A9:0F"] # Max' iPhone
fritz_api = FritzConnection(address="192.168.178.1")
# Referenced documentation: https://avm.de/fileadmin/user_upload/Global/Service/Schnittstellen/hostsSCPD.pdf
devices_last_online = {}
def get_all_devices() -> list:
numberOfDevices = fritz_api.call_action("Hosts", "GetHostNumberOfEntries")[
"NewHostNumberOfEntries"
]
devices = []
for i in range(numberOfDevices):
devices.append(
fritz_api.call_action("Hosts", "GetGenericHostEntry", NewIndex=i)
)
return devices
def get_specific_device(mac_address: str) -> dict:
return fritz_api.call_action(
"Hosts", "GetSpecificHostEntry", NewMACAddress=mac_address
)
def check_for_change():
# Check if devices are away for away-mode
all_away = True
for device in away_devices:
last_online = devices_last_online[device]
if (datetime.now() - last_online).total_seconds() < trigger_away_after_seconds:
all_away = False
break
# Execute away mode
global away_triggered
if all_away:
if not away_triggered:
away_triggered = True
hue.in_room_deactivate_lights("Max Zimmer")
else:
away_triggered = False
async def track_network_devices():
global devices_last_online
# Initial values to avoid None
for macaddress in macaddresses_to_track:
devices_last_online[macaddress] = datetime(1970, 1, 1, 0, 0, 0)
while True:
try:
for macaddress in macaddresses_to_track:
is_online = get_specific_device(macaddress)["NewActive"]
if is_online:
devices_last_online[macaddress] = datetime.now()
check_for_change()
except Exception as ex:
logging.exception(ex)
finally:
await asyncio.sleep(refresh_every_seconds)

View file

@ -1,86 +0,0 @@
from time import sleep
from phue import Bridge
from pathlib import Path
class HueAdapter:
"""Handler for Hue API calls."""
registered_ips_file = "hue_bridge_registered.txt"
def __init__(self, bridge_ip: str):
"""Initialize the HueHandler."""
self.bridge = None
self.connect(bridge_ip)
def connect(self, bridge_ip: str):
if bridge_ip in self.get_registered_ips():
self.bridge = Bridge(bridge_ip)
self.bridge.connect()
return
# Connect loop
while True:
try:
self.bridge = Bridge(bridge_ip)
self.bridge.connect()
break
except Exception as e:
print(f"Failed to connect to bridge: {bridge_ip}")
print(e)
print("Trying again in 5 seconds..")
sleep(5)
self.register_bridge(bridge_ip)
def get_registered_ips(self) -> list:
"""Get a list of registered bridge IPs."""
if not Path(HueAdapter.registered_ips_file).is_file():
return []
with open(HueAdapter.registered_ips_file, "r") as f:
return [ad.strip() for ad in f.readlines()]
def register_bridge(self, bridge_ip: str):
"""Register a bridge IP."""
with open(HueAdapter.registered_ips_file, "a") as f:
f.write(bridge_ip + "\n")
def list_scenes(self) -> dict:
return self.bridge.get_scene()
def get_scene_by_name(self, name):
for key, scene in self.list_scenes().items():
if scene["name"] == name:
scene["id"] = key
return scene
return None
def in_room_activate_scene(self, room_name: str, scene_name: str):
"""Activate a scene in a room.
Args:
scene (str): The name of the scene to activate.
room (str): The name of the room to activate the scene in.
"""
scene_id = self.get_scene_by_name(scene_name)["id"]
if scene_id is None:
raise "Scene not found."
self.bridge.set_group(room_name, {"scene": scene_id})
def in_room_deactivate_lights(self, room_name: str):
"""Deactivate all lights in a room.
Args:
room_name (str): The name of the room to deactivate the lights in.
"""
self.bridge.set_group(room_name, {"on": False})
def in_room_activate_lights(self, room_name: str):
"""Activate all lights in a room.
Args:
room_name (str): The name of the room to activate the lights in.
"""
self.bridge.set_group(room_name, {"on": True})

View file

@ -1,60 +0,0 @@
from fastapi import FastAPI, APIRouter
from hue.hue_adapter import HueAdapter
from ..mash.core.feature import Feature
from fastapi import APIRouter
from fastapi.responses import HTMLResponse
router = APIRouter(tags=["hue"])
hue = HueAdapter("192.168.178.85")
########## Integration ##########
class HueIntegration(Feature):
def __init__(self) -> None:
super().__init__("hue")
def add_routes(self, server: FastAPI) -> None:
server.include_router(router, prefix="/hue")
########## Routes ##########
@router.get("/scenes", tags=["scene"])
async def get_scenes():
return hue.list_scenes()
@router.post(
"/room/{room_name}/scene/{scene_name}",
tags=["room", "scene"],
)
async def activate_scene(room_name: str, scene_name: str):
try:
hue.in_room_activate_scene(room_name, scene_name)
except Exception as e:
return HTMLResponse(status_code=400, content=str(e))
@router.post(
"/room/{room_name}/off",
tags=["room"],
)
async def deactivate_room(room_name: str):
try:
hue.in_room_deactivate_lights(room_name)
except Exception as e:
return HTMLResponse(status_code=400, content=str(e))
@router.post(
"/room/{room_name}/on",
tags=["room"],
)
async def activate_room(room_name: str):
try:
hue.in_room_activate_lights(room_name)
except Exception as e:
return HTMLResponse(status_code=400, content=str(e))

View file

@ -1,28 +0,0 @@
from mash.bridges.fritzbox.fritzbox_bridge import FritzBoxBridge, FritzDeviceState
from time import sleep
import logging
# logging.getLogger().setLevel(logging.DEBUG)
bridge = FritzBoxBridge(id="fritzbox", ip="192.168.178.1", refresh_delay_sec=3)
def device_change(mac_address: str, device_state: FritzDeviceState) -> None:
print(
f"{device_state.host_name} - {'Connected' if device_state.active else 'Disconnected'}"
)
print(str(device_state))
bridge.subscribe_device(device_change, mac_address="FA:01:EC:90:50:49")
bridge.connect()
while True:
try:
sleep(3)
except KeyboardInterrupt:
break
bridge.disconnect()

View file

@ -1,23 +0,0 @@
from time import sleep
from mash.bridges.zigbee2mqtt.zigbee2mqtt_bridge import Z2mBridge
z2m = Z2mBridge(id="z2m", ip="192.168.178.115")
def wardrobe_cb(device_name: str, payload: dict) -> None:
print(f"{device_name} - {'closed' if payload['contact'] else 'open'}")
z2m.subscribe_device(wardrobe_cb, friendly_name="max-wardrobe-door")
z2m.subscribe_device(wardrobe_cb, friendly_name="max-window-contact")
z2m.connect()
while True:
try:
sleep(3)
except KeyboardInterrupt:
break
z2m.disconnect()