156 lines
4.8 KiB
Python
156 lines
4.8 KiB
Python
import asyncio
|
|
from datetime import datetime
|
|
import math
|
|
import os
|
|
from statistics import median
|
|
from typing import Optional
|
|
import requests as r
|
|
from ..hue import hue
|
|
import logging
|
|
|
|
file_path: str = "bettwaage.csv"
|
|
header: str = "timestamp;tl;tr;bl;br;total;\n"
|
|
|
|
bett_ip: str = "http://192.168.178.110:80"
|
|
matrix_clock_api: str = "http://192.168.178.84:8000"
|
|
|
|
empty_weight: Optional[float] = None
|
|
local_history = []
|
|
history_max_length: int = 24 * 60 * 60 # 24 hours
|
|
min_noticable_difference: float = 25 # In kg
|
|
show_scale_countdown: int = 0 # Number of updates for the scale, until return to clock
|
|
|
|
average_person_weight: float = 75
|
|
|
|
is_warning_active: int = 0
|
|
leg_capacity_limit_patterns = [
|
|
{"limit": 80, "pattern": 110, "duration": 1000},
|
|
{"limit": 90, "pattern": 110, "duration": 250},
|
|
{"limit": 100, "pattern": 10, "duration": 50},
|
|
]
|
|
|
|
|
|
def get_clusters(data: list[float], min_delta: float) -> dict:
|
|
clusters = {}
|
|
for point in data:
|
|
for known in clusters.keys():
|
|
if math.abs(point - known) < min_delta:
|
|
clusters[known].append(point)
|
|
continue
|
|
clusters[point] = [point]
|
|
return clusters
|
|
|
|
|
|
def show_time():
|
|
r.post(f"{matrix_clock_api}/time")
|
|
|
|
|
|
def show_scale(weight: float):
|
|
r.post(f"{matrix_clock_api}/message", json={"message": f"{weight:3.1f}kg"})
|
|
|
|
|
|
def is_capacity_reached() -> bool:
|
|
latest = local_history[-1]
|
|
highest_limit = None
|
|
for value in [latest["tl"], latest["tr"], latest["br"], latest["bl"]]:
|
|
for limit in leg_capacity_limit_patterns:
|
|
if value >= limit["limit"] and (
|
|
highest_limit is None or limit["limit"] > highest_limit["limit"]
|
|
):
|
|
highest_limit = limit
|
|
|
|
global is_warning_active
|
|
if highest_limit is None:
|
|
if is_warning_active:
|
|
is_warning_active = 0
|
|
show_time()
|
|
return False
|
|
|
|
if is_warning_active != highest_limit["limit"]:
|
|
is_warning_active = highest_limit["limit"]
|
|
r.post(
|
|
f"{matrix_clock_api}/pattern?pattern={highest_limit['pattern']}&step_ms={highest_limit['duration']}"
|
|
)
|
|
|
|
return True
|
|
|
|
|
|
def check_for_change():
|
|
# Check for capicity limits
|
|
if is_capacity_reached():
|
|
logging.info(f"Capacity reached")
|
|
return
|
|
|
|
global show_scale_countdown
|
|
latest = local_history[-1]
|
|
if show_scale_countdown > 0 and show_scale_countdown % 3 == 0:
|
|
show_scale(latest["total"])
|
|
show_scale_countdown -= 1
|
|
|
|
# Is triggered?
|
|
delta = latest["total"] - local_history[-2]["total"]
|
|
if math.abs(delta) < min_noticable_difference:
|
|
logging.info(f"Delta: {delta}")
|
|
return
|
|
|
|
# Changed weight up or down?
|
|
logging.info(f"Delta: {delta}")
|
|
weight_increased = delta > 0
|
|
|
|
# Make sure there is a bed_weight
|
|
global empty_weight
|
|
if empty_weight is None:
|
|
clusters = get_clusters([d["total"] for d in local_history])
|
|
empty_weight = min([median(cluster) for cluster in clusters.values()])
|
|
logging.info(f"Empty weight: {empty_weight}")
|
|
|
|
# Determine number of people
|
|
number_of_people = round((latest["total"] - empty_weight) / average_person_weight)
|
|
logging.info(f"Number of people: {number_of_people}")
|
|
|
|
if number_of_people == 1 and weight_increased:
|
|
show_scale_countdown = 60 # Should be a multiple of 3
|
|
elif number_of_people >= 2 and weight_increased:
|
|
show_scale_countdown = 0
|
|
show_time()
|
|
hue.in_room_activate_scene("Max Zimmer", "Sexy")
|
|
elif number_of_people == 1 and not weight_increased:
|
|
hue.in_room_activate_scene("Max Zimmer", "Tageslicht")
|
|
else:
|
|
show_scale_countdown = 0
|
|
show_time()
|
|
|
|
|
|
def add_line_to_bed_history(line: str) -> None:
|
|
exists = os.path.exists(file_path)
|
|
with open(file_path, "a") as fp:
|
|
if not exists:
|
|
fp.write(header)
|
|
fp.write(line + "\n")
|
|
|
|
|
|
def add_weights_to_log(tl: float, tr: float, bl: float, br: float):
|
|
total = tl + tr + bl + br
|
|
timestamp = datetime.now()
|
|
|
|
global local_history
|
|
local_history.append({"tl": tl, "tr": tr, "bl": bl, "br": br, "total": total})
|
|
if len(local_history) > history_max_length:
|
|
local_history = local_history[len(local_history) - history_max_length :]
|
|
|
|
add_line_to_bed_history(f"{str(timestamp)};{tl};{tr};{bl};{br};{total};")
|
|
|
|
|
|
async def log_bed_weights():
|
|
while True:
|
|
try:
|
|
tl = r.get(f"{bett_ip}/sensor/tl/").json()["value"]
|
|
tr = r.get(f"{bett_ip}/sensor/tr/").json()["value"]
|
|
bl = r.get(f"{bett_ip}/sensor/bl/").json()["value"]
|
|
br = r.get(f"{bett_ip}/sensor/br/").json()["value"]
|
|
|
|
add_weights_to_log(tl, tr, bl, br)
|
|
check_for_change()
|
|
except Exception as ex:
|
|
logging.exception(ex)
|
|
await asyncio.sleep(1)
|