import asyncio from datetime import datetime import math import os from statistics import median from typing import Optional import requests as r from ..hue import hue file_path: str = "bettwaage.csv" header: str = "timestamp;tl;tr;bl;br;total;" bett_ip: str = "http://192.168.178.110:80" matrix_clock_api: str = "http://192.168.178.84:8000" empty_weight: Optional[float] = None local_history = [] history_max_length: int = 24 * 60 * 60 # 24 hours min_noticable_difference: float = 25 # In kg show_scale_countdown: int = 0 # Number of updates for the scale, until return to clock average_person_weight: float = 75 is_warning_active: bool = False leg_capacity_limit_patterns = [ {"limit": 80, "pattern": 110, "duration": 1000}, {"limit": 90, "pattern": 110, "duration": 250}, {"limit": 100, "pattern": 10, "duration": 50}, ] def get_clusters(data: list[float], min_delta: float) -> dict: clusters = {} for point in data: for known in clusters.keys(): if math.abs(point - known) < min_delta: clusters[known].append(point) continue clusters[point] = [point] return clusters def show_time(): r.post(f"{matrix_clock_api}/time") def show_scale(weight: float): r.post(f"{matrix_clock_api}/message", json={"message": f"{weight:3.1f}kg"}) def is_capacity_reached() -> bool: latest = local_history[-1] highest_limit = None for value in [latest["tl"], latest["tr"], latest["br"], latest["bl"]]: for limit in leg_capacity_limit_patterns: if value >= limit["limit"] and ( highest_limit is None or limit["limit"] > highest_limit["limit"] ): highest_limit = limit global is_warning_active if highest_limit is None: if is_warning_active: is_warning_active = False show_time() return is_warning_active = True r.post( f"{matrix_clock_api}/pattern?pattern={highest_limit['pattern']}&step_ms={highest_limit['duration']}" ) def check_for_change(): # Check for capicity limits if is_capacity_reached(): return global show_scale_countdown latest = local_history[-1] if show_scale_countdown > 0 and show_scale_countdown % 3 == 0: show_scale(latest["total"]) show_scale_countdown -= 1 # Is triggered? delta = latest["total"] - local_history[-2]["total"] if math.abs(delta) < min_noticable_difference: return # Changed weight up or down? weight_increased = delta > 0 # Make sure there is a bed_weight global empty_weight if empty_weight is None: clusters = get_clusters([d["total"] for d in local_history]) empty_weight = min([median(cluster) for cluster in clusters.values()]) # Determine number of people number_of_people = round((latest["total"] - empty_weight) / average_person_weight) if number_of_people == 1 and weight_increased: show_scale_countdown = 60 # Should be a multiple of 3 elif number_of_people >= 2 and weight_increased: show_scale_countdown = 0 show_time() hue.in_room_activate_scene("Max Zimmer", "Sexy") elif number_of_people == 1 and not weight_increased: hue.in_room_activate_scene("Max Zimmer", "Tageslicht") else: show_scale_countdown = 0 show_time() def add_line_to_bed_history(line: str) -> None: if not os.path.exists(file_path): add_line_to_bed_history(header) with open(file_path, "a") as fp: fp.write(line + "\n") def add_weights_to_log(tl: float, tr: float, bl: float, br: float): total = tl + tr + bl + br timestamp = datetime.now() global local_history local_history.append({"tl": tl, "tr": tr, "bl": bl, "br": br, "total": total}) if len(local_history): local_history = local_history[len(local_history) - history_max_length :] add_line_to_bed_history(f"{str(timestamp)};{tl};{tr};{bl};{br};{total};") check_for_change() async def log_bed_weights(): while True: try: tl = r.get(f"{bett_ip}/sensor/tl/").json()["value"] tr = r.get(f"{bett_ip}/sensor/tr/").json()["value"] # bl = r.get(f"{bett_ip}/sensor/bl/").json()["value"] br = r.get(f"{bett_ip}/sensor/br/").json()["value"] # Remove later bl = br add_weights_to_log(tl, tr, bl, br) except: pass await asyncio.sleep(1)