2025-01-12 17:39:28 +01:00
|
|
|
import requests
|
|
|
|
import asyncio
|
|
|
|
from core.entity import Entity
|
|
|
|
|
|
|
|
|
|
|
|
class BedscaleWeightResult:
|
|
|
|
def __init__(self, *, tr: float, tl: float, br: float, bl: float):
|
|
|
|
self.top_right = tr
|
|
|
|
self.top_left = tl
|
|
|
|
self.bottom_right = br
|
|
|
|
self.bottom_left = bl
|
|
|
|
|
|
|
|
# Calculate total if all values available
|
|
|
|
self.total: float | None = None
|
|
|
|
if None not in [bl, br, tl, tr]:
|
|
|
|
self.total = sum([bl, br, tl, tr])
|
|
|
|
|
|
|
|
|
|
|
|
class BedscaleEntity(Entity):
|
2025-01-16 19:14:34 +01:00
|
|
|
|
|
|
|
def __init__(self, *, ip_address: str, id: str, name: str, room: str):
|
2025-01-12 17:39:28 +01:00
|
|
|
super().__init__(id=id, name=name, room=room, device_type="bedscale")
|
|
|
|
self._ip_address = ip_address
|
|
|
|
|
|
|
|
async def poll_weights(self) -> BedscaleWeightResult:
|
2025-01-16 19:14:34 +01:00
|
|
|
loop = asyncio.get_event_loop()
|
|
|
|
|
|
|
|
tr_request = loop.run_in_executor(None, self.__poll_scale__, "tr")
|
|
|
|
tl_request = loop.run_in_executor(None, self.__poll_scale__, "tl")
|
|
|
|
br_request = loop.run_in_executor(None, self.__poll_scale__, "br")
|
|
|
|
bl_request = loop.run_in_executor(None, self.__poll_scale__, "bl")
|
|
|
|
|
2025-01-12 17:39:28 +01:00
|
|
|
results = BedscaleWeightResult(
|
2025-01-16 19:14:34 +01:00
|
|
|
tr=await tr_request,
|
|
|
|
tl=await tl_request,
|
|
|
|
br=await br_request,
|
|
|
|
bl=await bl_request,
|
2025-01-12 17:39:28 +01:00
|
|
|
)
|
|
|
|
|
|
|
|
# TODO: Sanity checks
|
|
|
|
|
2025-01-16 19:14:34 +01:00
|
|
|
# TODO: Keep track of empty-bed weight
|
|
|
|
|
2025-01-12 17:39:28 +01:00
|
|
|
return results
|
|
|
|
|
|
|
|
def __poll_scale__(self, leg: str) -> float | None:
|
|
|
|
try:
|
2025-01-16 20:49:27 +01:00
|
|
|
return requests.get(f"{self._ip_address}/sensor/{leg}/").json()["value"]
|
2025-01-12 17:39:28 +01:00
|
|
|
except:
|
|
|
|
return None
|