mash-server/src/bridges/bedscale/bedscale_entity.py

68 lines
2 KiB
Python
Raw Normal View History

2025-01-12 17:39:28 +01:00
import requests
import asyncio
from core.entity import Entity
class BedscaleWeightResult:
def __init__(self, *, tr: float, tl: float, br: float, bl: float):
self.top_right = tr
self.top_left = tl
self.bottom_right = br
self.bottom_left = bl
# Calculate total if all values available
self.total: float | None = None
2025-01-20 06:41:18 +01:00
values = [bl, br, tl, tr]
if None not in values:
self.total = sum(values)
2025-01-12 17:39:28 +01:00
class BedscaleEntity(Entity):
2025-01-21 06:45:21 +01:00
def __init__(
self,
*,
ip_address: str,
id: str,
name: str,
room: str,
history_length: int = 60 * 5,
):
2025-01-12 17:39:28 +01:00
super().__init__(id=id, name=name, room=room, device_type="bedscale")
self._ip_address = ip_address
2025-01-21 06:45:21 +01:00
self._history: list[BedscaleWeightResult] = []
self._history_length = history_length
self.latest_weight: BedscaleWeightResult = None
2025-01-12 17:39:28 +01:00
2025-01-21 06:45:21 +01:00
async def update(self):
loop = asyncio.get_event_loop()
tr_request = loop.run_in_executor(None, self.__poll_scale__, "tr")
tl_request = loop.run_in_executor(None, self.__poll_scale__, "tl")
br_request = loop.run_in_executor(None, self.__poll_scale__, "br")
bl_request = loop.run_in_executor(None, self.__poll_scale__, "bl")
2025-01-21 06:45:21 +01:00
new_result = BedscaleWeightResult(
tr=await tr_request,
tl=await tl_request,
br=await br_request,
bl=await bl_request,
2025-01-12 17:39:28 +01:00
)
# TODO: Sanity checks
# TODO: Keep track of empty-bed weight
2025-01-21 06:45:21 +01:00
self._history.append(new_result)
if len(self._history) > self._history_length:
self._history = self._history[-self._history_length :]
2025-01-12 17:39:28 +01:00
def __poll_scale__(self, leg: str) -> float | None:
try:
2025-01-16 20:49:27 +01:00
return requests.get(f"{self._ip_address}/sensor/{leg}/").json()["value"]
2025-01-12 17:39:28 +01:00
except:
return None
2025-01-21 06:45:21 +01:00
def get_history(self) -> list[BedscaleWeightResult]:
return self._history