Compare commits

...

21 commits

Author SHA1 Message Date
Max
7164d6e2fd Slimming stuff down 2025-01-21 18:52:53 +01:00
1d46b570f6 Hmmmm, more or less working hue stuff 2025-01-21 06:45:21 +01:00
5e8ad4f400 Expanded color datatype 2025-01-21 05:03:47 +01:00
668e8e78b1 Removed default implementation for "on" property 2025-01-21 05:03:39 +01:00
c3ec52005e Small improvement 2025-01-20 06:41:18 +01:00
Max
c78546ffcf More hue implementation 2025-01-20 01:10:30 +01:00
Max
db5e826aea Some unfinished hue stuff 2025-01-19 17:38:25 +01:00
Max
7b9ab32db1 Implemented new fritzbox 2025-01-17 16:41:16 +01:00
Max
e131f58849 Reimplemented group entities 2025-01-16 20:52:55 +01:00
Max
08c4372c4e Fixed wrong url 2025-01-16 20:49:27 +01:00
Max
5defaba45f Background services running properly; New Bedscale Entity implemented 2025-01-16 19:14:34 +01:00
9aba0279d7 bescale api 2025-01-12 17:39:28 +01:00
04e44849ee Normalizing device type a bit 2025-01-12 17:18:02 +01:00
8a2773a97c Momentary handling for background services 2025-01-12 17:15:12 +01:00
f5b4a6c30f Added missing parameter to poetry conf 2025-01-12 17:13:58 +01:00
Max
142439b7c3 Sync commit 2025-01-11 01:17:12 +01:00
Max
f9fdffbdbd Refactor + documentation 2025-01-10 11:51:29 +01:00
Max
32853a0e7c Added some comments 2025-01-10 11:46:49 +01:00
Max
51e446710b Fixing task cancellation in main 2025-01-09 21:49:43 +01:00
Max
003284ccba Merging new concepts with current version 2025-01-09 19:06:11 +01:00
Max
0beaab9549 Switched to Poetry for dependency management 2025-01-09 16:43:13 +01:00
42 changed files with 2822 additions and 529 deletions

View file

@ -1,10 +1,10 @@
# Max' Smart Home - MaSH
Should be a very simple **server** implementation of what is required in Max's smart home. Trying not to overcomplicate things and thereby ruin motivation to work on this.
Should be a (very simple?) **server** implementation of what is required in Max's smart home. Trying not to overcomplicate things and thereby ruin motivation to work on this.
## Sensors
- [ToF People Counter](https://github.com/mgfcf/mash-sensor-tof-pc)
- [ToF People Counter](https://github.com/mgfcf/mash-sensor-tof-pc)
## ToDo
@ -12,3 +12,12 @@ Should be a very simple **server** implementation of what is required in Max's s
- Daylight Adjustment (E.g. No ceiling lights during daytime)
- Save scene when turning off, to reapply same scene when turning on
- Detect fast flickering of light state, indicating an issue, and disable the system for a few minutes
## Structure
- `src/` - All code
- `main.py` - Entry point for execution
- `new_syntax_example.py` - Not in use, just noting some ideas about a potential syntax
- `core/` - Contains more abstract framework definitions
- `bridges/` - Contains latest code that manages connections to other services/devices
- `endpoints/` - Contains API routes and older handlers/bridges

350
poetry.lock generated Normal file
View file

@ -0,0 +1,350 @@
[[package]]
name = "annotated-types"
version = "0.7.0"
description = "Reusable constraint types to use with typing.Annotated"
category = "main"
optional = false
python-versions = ">=3.8"
[[package]]
name = "anyio"
version = "4.8.0"
description = "High level compatibility layer for multiple asynchronous event loop implementations"
category = "main"
optional = false
python-versions = ">=3.9"
[package.dependencies]
exceptiongroup = {version = ">=1.0.2", markers = "python_version < \"3.11\""}
idna = ">=2.8"
sniffio = ">=1.1"
typing_extensions = {version = ">=4.5", markers = "python_version < \"3.13\""}
[package.extras]
trio = ["trio (>=0.26.1)"]
test = ["anyio", "coverage[toml] (>=7)", "exceptiongroup (>=1.2.0)", "hypothesis (>=4.0)", "psutil (>=5.9)", "pytest (>=7.0)", "trustme", "truststore (>=0.9.1)", "uvloop (>=0.21)"]
doc = ["packaging", "Sphinx (>=7.4,<8.0)", "sphinx-rtd-theme", "sphinx-autodoc-typehints (>=1.2.0)"]
[[package]]
name = "certifi"
version = "2024.12.14"
description = "Python package for providing Mozilla's CA Bundle."
category = "main"
optional = false
python-versions = ">=3.6"
[[package]]
name = "charset-normalizer"
version = "3.4.1"
description = "The Real First Universal Charset Detector. Open, modern and actively maintained alternative to Chardet."
category = "main"
optional = false
python-versions = ">=3.7"
[[package]]
name = "click"
version = "8.1.8"
description = "Composable command line interface toolkit"
category = "main"
optional = false
python-versions = ">=3.7"
[package.dependencies]
colorama = {version = "*", markers = "platform_system == \"Windows\""}
[[package]]
name = "colorama"
version = "0.4.6"
description = "Cross-platform colored terminal text."
category = "main"
optional = false
python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,!=3.6.*,>=2.7"
[[package]]
name = "exceptiongroup"
version = "1.2.2"
description = "Backport of PEP 654 (exception groups)"
category = "main"
optional = false
python-versions = ">=3.7"
[package.extras]
test = ["pytest (>=6)"]
[[package]]
name = "fastapi"
version = "0.115.6"
description = "FastAPI framework, high performance, easy to learn, fast to code, ready for production"
category = "main"
optional = false
python-versions = ">=3.8"
[package.dependencies]
pydantic = ">=1.7.4,<1.8 || >1.8,<1.8.1 || >1.8.1,<2.0.0 || >2.0.0,<2.0.1 || >2.0.1,<2.1.0 || >2.1.0,<3.0.0"
starlette = ">=0.40.0,<0.42.0"
typing-extensions = ">=4.8.0"
[package.extras]
standard = ["fastapi-cli[standard] (>=0.0.5)", "httpx (>=0.23.0)", "jinja2 (>=2.11.2)", "python-multipart (>=0.0.7)", "email-validator (>=2.0.0)", "uvicorn[standard] (>=0.12.0)"]
all = ["fastapi-cli[standard] (>=0.0.5)", "httpx (>=0.23.0)", "jinja2 (>=2.11.2)", "python-multipart (>=0.0.7)", "itsdangerous (>=1.1.0)", "pyyaml (>=5.3.1)", "ujson (>=4.0.1,!=4.0.2,!=4.1.0,!=4.2.0,!=4.3.0,!=5.0.0,!=5.1.0)", "orjson (>=3.2.1)", "email-validator (>=2.0.0)", "uvicorn[standard] (>=0.12.0)", "pydantic-settings (>=2.0.0)", "pydantic-extra-types (>=2.0.0)"]
[[package]]
name = "fritzconnection"
version = "1.14.0"
description = "Communicate with the AVM FRITZ!Box"
category = "main"
optional = false
python-versions = ">=3.7"
[package.dependencies]
requests = ">=2.22.0"
[package.extras]
qr = ["segno (>=1.4.1)"]
[[package]]
name = "h11"
version = "0.14.0"
description = "A pure-Python, bring-your-own-I/O implementation of HTTP/1.1"
category = "main"
optional = false
python-versions = ">=3.7"
[[package]]
name = "httptools"
version = "0.6.4"
description = "A collection of framework independent HTTP protocol utils."
category = "main"
optional = false
python-versions = ">=3.8.0"
[package.extras]
test = ["Cython (>=0.29.24)"]
[[package]]
name = "idna"
version = "3.10"
description = "Internationalized Domain Names in Applications (IDNA)"
category = "main"
optional = false
python-versions = ">=3.6"
[package.extras]
all = ["ruff (>=0.6.2)", "mypy (>=1.11.2)", "pytest (>=8.3.2)", "flake8 (>=7.1.1)"]
[[package]]
name = "paho-mqtt"
version = "2.1.0"
description = "MQTT version 5.0/3.1.1 client class"
category = "main"
optional = false
python-versions = ">=3.7"
[package.extras]
proxy = ["pysocks"]
[[package]]
name = "phue"
version = "1.1"
description = "A Philips Hue Python library"
category = "main"
optional = false
python-versions = "*"
[[package]]
name = "pydantic"
version = "2.10.5"
description = "Data validation using Python type hints"
category = "main"
optional = false
python-versions = ">=3.8"
[package.dependencies]
annotated-types = ">=0.6.0"
pydantic-core = "2.27.2"
typing-extensions = ">=4.12.2"
[package.extras]
email = ["email-validator (>=2.0.0)"]
timezone = ["tzdata"]
[[package]]
name = "pydantic-core"
version = "2.27.2"
description = "Core functionality for Pydantic validation and serialization"
category = "main"
optional = false
python-versions = ">=3.8"
[package.dependencies]
typing-extensions = ">=4.6.0,<4.7.0 || >4.7.0"
[[package]]
name = "python-dotenv"
version = "1.0.1"
description = "Read key-value pairs from a .env file and set them as environment variables"
category = "main"
optional = false
python-versions = ">=3.8"
[package.extras]
cli = ["click (>=5.0)"]
[[package]]
name = "pyyaml"
version = "6.0.2"
description = "YAML parser and emitter for Python"
category = "main"
optional = false
python-versions = ">=3.8"
[[package]]
name = "requests"
version = "2.32.3"
description = "Python HTTP for Humans."
category = "main"
optional = false
python-versions = ">=3.8"
[package.dependencies]
certifi = ">=2017.4.17"
charset-normalizer = ">=2,<4"
idna = ">=2.5,<4"
urllib3 = ">=1.21.1,<3"
[package.extras]
socks = ["PySocks (>=1.5.6,!=1.5.7)"]
use-chardet-on-py3 = ["chardet (>=3.0.2,<6)"]
[[package]]
name = "sniffio"
version = "1.3.1"
description = "Sniff out which async library your code is running under"
category = "main"
optional = false
python-versions = ">=3.7"
[[package]]
name = "starlette"
version = "0.41.3"
description = "The little ASGI library that shines."
category = "main"
optional = false
python-versions = ">=3.8"
[package.dependencies]
anyio = ">=3.4.0,<5"
[package.extras]
full = ["httpx (>=0.22.0)", "itsdangerous", "jinja2", "python-multipart (>=0.0.7)", "pyyaml"]
[[package]]
name = "typing-extensions"
version = "4.12.2"
description = "Backported and Experimental Type Hints for Python 3.8+"
category = "main"
optional = false
python-versions = ">=3.8"
[[package]]
name = "urllib3"
version = "2.3.0"
description = "HTTP library with thread-safe connection pooling, file post, and more."
category = "main"
optional = false
python-versions = ">=3.9"
[package.extras]
brotli = ["brotli (>=1.0.9)", "brotlicffi (>=0.8.0)"]
h2 = ["h2 (>=4,<5)"]
socks = ["pysocks (>=1.5.6,!=1.5.7,<2.0)"]
zstd = ["zstandard (>=0.18.0)"]
[[package]]
name = "uvicorn"
version = "0.34.0"
description = "The lightning-fast ASGI server."
category = "main"
optional = false
python-versions = ">=3.9"
[package.dependencies]
click = ">=7.0"
colorama = {version = ">=0.4", optional = true, markers = "sys_platform == \"win32\" and extra == \"standard\""}
h11 = ">=0.8"
httptools = {version = ">=0.6.3", optional = true, markers = "extra == \"standard\""}
python-dotenv = {version = ">=0.13", optional = true, markers = "extra == \"standard\""}
pyyaml = {version = ">=5.1", optional = true, markers = "extra == \"standard\""}
typing-extensions = {version = ">=4.0", markers = "python_version < \"3.11\""}
uvloop = {version = ">=0.14.0,<0.15.0 || >0.15.0,<0.15.1 || >0.15.1", optional = true, markers = "sys_platform != \"win32\" and sys_platform != \"cygwin\" and platform_python_implementation != \"PyPy\" and extra == \"standard\""}
watchfiles = {version = ">=0.13", optional = true, markers = "extra == \"standard\""}
websockets = {version = ">=10.4", optional = true, markers = "extra == \"standard\""}
[package.extras]
standard = ["colorama (>=0.4)", "httptools (>=0.6.3)", "python-dotenv (>=0.13)", "pyyaml (>=5.1)", "uvloop (>=0.14.0,!=0.15.0,!=0.15.1)", "watchfiles (>=0.13)", "websockets (>=10.4)"]
[[package]]
name = "uvloop"
version = "0.21.0"
description = "Fast implementation of asyncio event loop on top of libuv"
category = "main"
optional = false
python-versions = ">=3.8.0"
[package.extras]
dev = ["setuptools (>=60)", "Cython (>=3.0,<4.0)"]
docs = ["Sphinx (>=4.1.2,<4.2.0)", "sphinxcontrib-asyncio (>=0.3.0,<0.4.0)", "sphinx-rtd-theme (>=0.5.2,<0.6.0)"]
test = ["aiohttp (>=3.10.5)", "flake8 (>=5.0,<6.0)", "psutil", "pycodestyle (>=2.9.0,<2.10.0)", "pyOpenSSL (>=23.0.0,<23.1.0)", "mypy (>=0.800)"]
[[package]]
name = "watchfiles"
version = "1.0.3"
description = "Simple, modern and high performance file watching and code reload in python."
category = "main"
optional = false
python-versions = ">=3.9"
[package.dependencies]
anyio = ">=3.0.0"
[[package]]
name = "websockets"
version = "14.1"
description = "An implementation of the WebSocket Protocol (RFC 6455 & 7692)"
category = "main"
optional = false
python-versions = ">=3.9"
[metadata]
lock-version = "1.1"
python-versions = "^3.10"
content-hash = "470a473cadc51beaadf9c37d81c5ddbde315022964b11d596949a69d3905c583"
[metadata.files]
annotated-types = []
anyio = []
certifi = []
charset-normalizer = []
click = []
colorama = []
exceptiongroup = []
fastapi = []
fritzconnection = []
h11 = []
httptools = []
idna = []
paho-mqtt = []
phue = []
pydantic = []
pydantic-core = []
python-dotenv = []
pyyaml = []
requests = []
sniffio = []
starlette = []
typing-extensions = []
urllib3 = []
uvicorn = []
uvloop = []
watchfiles = []
websockets = []

24
pyproject.toml Normal file
View file

@ -0,0 +1,24 @@
[tool.poetry]
name = "mash-server"
version = "0.1.0"
description = "Max' Smart Home"
authors = ["Max <m.giller.dev@gmail.com>"]
package-mode = false
[tool.poetry.scripts]
start = "main:app"
[tool.poetry.dependencies]
python = "^3.10"
phue = "^1.1"
fritzconnection = "^1.14.0"
fastapi = "^0.115.6"
requests = "^2.32.3"
paho-mqtt = "^2.1.0"
uvicorn = {extras = ["standard"], version = "^0.34.0"}
[tool.poetry.dev-dependencies]
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"

View file

@ -1,16 +0,0 @@
# For Philips Hue
phue
# For Fritz.Box API
fritzconnection
# API
requests
fastapi
uvicorn[standard]
# Clients
requests
# Config file
pyyaml

View file

@ -0,0 +1 @@
from .bedscale_entity import BedscaleEntity

View file

@ -0,0 +1,67 @@
import requests
import asyncio
from core.entity import Entity
class BedscaleWeightResult:
def __init__(self, *, tr: float, tl: float, br: float, bl: float):
self.top_right = tr
self.top_left = tl
self.bottom_right = br
self.bottom_left = bl
# Calculate total if all values available
self.total: float | None = None
values = [bl, br, tl, tr]
if None not in values:
self.total = sum(values)
class BedscaleEntity(Entity):
def __init__(
self,
*,
ip_address: str,
id: str,
name: str,
room: str,
history_length: int = 60 * 5,
):
super().__init__(id=id, name=name, room=room, device_type="bedscale")
self._ip_address = ip_address
self._history: list[BedscaleWeightResult] = []
self._history_length = history_length
self.latest_weight: BedscaleWeightResult = None
async def update(self):
loop = asyncio.get_event_loop()
tr_request = loop.run_in_executor(None, self.__poll_scale__, "tr")
tl_request = loop.run_in_executor(None, self.__poll_scale__, "tl")
br_request = loop.run_in_executor(None, self.__poll_scale__, "br")
bl_request = loop.run_in_executor(None, self.__poll_scale__, "bl")
new_result = BedscaleWeightResult(
tr=await tr_request,
tl=await tl_request,
br=await br_request,
bl=await bl_request,
)
# TODO: Sanity checks
# TODO: Keep track of empty-bed weight
self._history.append(new_result)
if len(self._history) > self._history_length:
self._history = self._history[-self._history_length :]
def __poll_scale__(self, leg: str) -> float | None:
try:
return requests.get(f"{self._ip_address}/sensor/{leg}/").json()["value"]
except:
return None
def get_history(self) -> list[BedscaleWeightResult]:
return self._history

View file

@ -5,7 +5,7 @@ import os
from statistics import median
from typing import Optional
import requests as r
from ..hue import hue
from ...endpoints.hue import hue_bridge
import logging
file_path: str = "bettwaage.csv"
@ -126,9 +126,9 @@ def check_for_change():
# Make room sexy
if sexy_mode_detection:
if number_of_people >= 2 and weight_increased:
hue.in_room_activate_scene("Max Zimmer", "Sexy")
hue_bridge.in_room_activate_scene("Max Zimmer", "Sexy")
elif number_of_people == 1 and not weight_increased:
hue.in_room_activate_scene("Max Zimmer", "Tageslicht")
hue_bridge.in_room_activate_scene("Max Zimmer", "Tageslicht")
def add_line_to_bed_history(line: str) -> None:
@ -166,6 +166,6 @@ async def log_bed_weights():
add_weights_to_log(tl, tr, bl, br)
check_for_change()
except Exception as ex:
logging.exception(ex)
pass
finally:
await asyncio.sleep(1)

View file

@ -0,0 +1 @@
from .fritzbox_bridge import FritzBoxBridge

View file

@ -0,0 +1,99 @@
import logging
from core.bridge import Bridge
from fritzconnection import FritzConnection
EXCEPTION_RECONNECT_TIMEOUT_SEC = 60
class FritzDeviceState:
"""Holds the state of a fritzbox device and implements comparisons."""
def __init__(self, mac_address: str, raw_state: dict) -> None:
"""
Args:
mac_address (str): Mac Address of fritzbox device.
raw_state (dict): Raw device state containing NewActive, NewIPAddress, NewAddressSource, NewLeaseTimeRemaining, NewInterfaceType & NewHostName.
"""
logging.debug(
f"Fritz raw device state to mac [{mac_address}]: {raw_state}",
extra=raw_state,
)
self.mac_address = mac_address
self.active: bool = raw_state["NewActive"]
self.ip_address: str = raw_state["NewIPAddress"]
self.address_source: str = raw_state["NewAddressSource"]
self.lease_time_remaining = raw_state["NewLeaseTimeRemaining"]
self.interface_type: str = raw_state["NewInterfaceType"]
self.host_name: str = raw_state["NewHostName"]
def __eq__(self, value: object) -> bool:
return (
type(value) is FritzDeviceState
and self.mac_address == value.mac_address
and self.ip_address == value.ip_address
and self.address_source == value.address_source
and self.lease_time_remaining == value.lease_time_remaining
and self.interface_type == value.interface_type
and self.host_name == value.host_name
and self.active == value.active
)
def __str__(self) -> str:
return f"[{self.mac_address} | {self.host_name}] {'Active' if self.active else 'Inactive'} - {self.ip_address} - {self.address_source} - {self.interface_type} - {self.lease_time_remaining}"
class FritzBoxBridge(Bridge):
def __init__(
self,
*,
id: str,
ip: str,
port: int | None = None,
) -> None:
"""
Args:
id (str): Id of fritzbox bridge.
ip (str): IP Address of fritzbox bridge in network to connect to.
port (int, optional): Port of fritzbox bridge in network to connect to. Defaults to None.
"""
self._ip = ip
self._port = port
self._fritz_api: FritzConnection = None
def connect(self) -> None:
if self._fritz_api:
self.disconnect()
self._fritz_api = FritzConnection(address=self._ip, port=self._port)
logging.info("Connected")
def disconnect(self) -> None:
logging.info("Disconnected")
self._fritz_api = None
@property
def is_connected(self) -> bool | None:
return self._fritz_api is not None
@Bridge.requires_connection
def get_known_devices(self) -> list[dict]:
numberOfDevices = self._fritz_api.call_action(
"Hosts", "GetHostNumberOfEntries"
)["NewHostNumberOfEntries"]
devices = []
for i in range(numberOfDevices):
devices.append(
self._fritz_api.call_action("Hosts", "GetGenericHostEntry", NewIndex=i)
)
return devices
@Bridge.requires_connection
def get_device_state(self, mac_address: str) -> FritzDeviceState:
return FritzDeviceState(
mac_address=mac_address,
raw_state=self._fritz_api.call_action(
"Hosts", "GetSpecificHostEntry", NewMACAddress=mac_address
),
)

View file

@ -0,0 +1,2 @@
from .hue_bridge import HueBridge
from .hue_light import HueLight

View file

@ -0,0 +1,124 @@
import logging
from .hue_light import HueLight
from core import Bridge, Group
from phue import Bridge as phueBridge
from time import sleep
class HueBridge(Bridge):
def __init__(
self,
*,
ip_address: str,
id: str,
retry_limit: int = 10,
retry_timeout_seconds: int = 5,
) -> None:
super().__init__(id=id, type="hue")
self._retry_limit = retry_limit
self._retry_timeout_seconds = retry_timeout_seconds
self._hue: phueBridge = phueBridge(ip_address)
def disconnect(self) -> None:
self._hue = None
logging.info(f"Disconnected from Hue Bridge [{self.id}].")
def connect(self) -> None:
for _ in range(self._retry_limit):
try:
self._hue.connect()
logging.info(f"Connected to Hue Bridge [{self.id}].")
return
except Exception as e:
logging.exception(
f"Failed to connect to Hue bridge [ip {self._hue.ip}]. Retrying in [{self._retry_timeout_seconds}] seconds.",
exc_info=e,
)
sleep(self._retry_timeout_seconds)
logging.error(
f"Unable to connect to Hue bridge [{self.id}]. Retry count exceeded [{self._retry_limit}]."
)
@property
def is_connected(self) -> bool | None:
return self._hue is not None
def list_api(self) -> dict:
return self._hue.get_api()
def get_all_lights(self) -> Group:
rooms = {
r["name"]: [int(l) for l in r["lights"]]
for r in self._hue.get_api()["groups"].values()
if r["type"] == "Room"
}
lights = []
for l in self._hue.get_light_objects():
if l.colormode != "xy":
# TODO: Implement light to handle all color modes
continue
room = "Unknown"
for room_name, room_lights in rooms.items():
if l.light_id in room_lights:
room = room_name
break
lights.append(
HueLight(
hue_bridge=self,
hue_light=l,
id=f"hue-light-{l.light_id}",
name=l.name,
room=room,
)
)
return Group(entities=lights, id="all-hue-lights", name="All Hue Lights")
def list_scenes(self) -> dict:
return self._hue.get_scene()
def get_scene_by_name(self, name) -> dict | None:
for key, scene in self.list_scenes().items():
if scene["name"] == name:
scene["id"] = key
return scene
return None
def set_light(self, lights: int | list[int], command):
return self._hue.set_light(lights, command)
def get_light(self, id, command=None):
return self._hue.get_light(id, command)
def set_group(self, groups, command):
return self._hue.set_group(groups, command)
def get_group(self, id, command=None):
return self._hue.get_group(id, command)
def in_room_activate_scene(self, room_name: str, scene_name: str):
"""Activate a scene in a room.
Args:
scene (str): The name of the scene to activate.
room (str): The name of the room to activate the scene in.
"""
scene_id = self.get_scene_by_name(scene_name)["id"]
if scene_id is None:
raise "Scene not found."
self._hue.set_group(room_name, {"scene": scene_id})
def in_room_set_lights_active(self, room_name: str, active: bool):
"""Deactivate all lights in a room.
Args:
room_name (str): The name of the room to deactivate the lights in.
active (bool): True, to turn all lights on, or False to turn them off.
"""
self._hue.set_group(room_name, {"on": active})

View file

@ -0,0 +1,129 @@
import asyncio
from core import Entity, Color
from phue import Light
class HueLight(Entity):
MAX_HUE_RANGE: int = 65535
MAX_SAT_RANGE: int = 254
MAX_BRI_RANGE: int = 254
TRANSITION_TIME_SCALE: float = 0.1
def __init__(
self,
*,
hue_bridge: "HueBridge",
hue_light: Light,
id: str,
name: str,
room: str,
groups: list[str] = [],
) -> None:
super().__init__(
id=id, name=name, room=room, groups=groups, device_type="light"
)
self._bridge = hue_bridge
self._hue_light = hue_light
self._on = False
self._color: Color = Color()
self._transition_duration_sec = 0
asyncio.run(self.update())
async def __commit_to_device__(self):
self._hue_light.transitiontime = (
self._transition_duration_sec * HueLight.TRANSITION_TIME_SCALE
)
self._hue_light.on = self._on
self._hue_light.hue = self._color.hue * HueLight.MAX_HUE_RANGE
self._hue_light.saturation = self._color.saturation * HueLight.MAX_SAT_RANGE
self._hue_light.brightness = self._color.brightness * HueLight.MAX_BRI_RANGE
async def update(self):
def __internal_update__():
self._on = self._hue_light.on
# TODO: Update Color instead of overwriting to better track change
self._color = Color(
hue=self._hue_light.hue / HueLight.MAX_HUE_RANGE,
saturation=self._hue_light.saturation / HueLight.MAX_SAT_RANGE,
brightness=self._hue_light.brightness / HueLight.MAX_BRI_RANGE,
)
await asyncio.get_running_loop().run_in_executor(None, __internal_update__)
@property
def color(self) -> Color:
return self._color
@property
def on(self) -> bool:
return self._on
@property
def transition_time(self) -> float:
return self._transition_duration_sec
async def set_brightness(self, brightness: float):
if self._color.brightness == brightness:
return
self._color.brightness = brightness
if self._on:
await self.__commit_to_device__()
async def set_hue(self, hue: float):
if self._color.hue == hue:
return
self._color.hue = hue
if self._on:
await self.__commit_to_device__()
async def set_saturation(self, saturation: float):
if self._color.saturation == saturation:
return
self._color.saturation = saturation
if self._on:
await self.__commit_to_device__()
async def set_color(self, color: Color):
if self._color == color:
return
# TODO: Avoid overwriting to better track change
self._color = color
if self._on:
await self.__commit_to_device__()
async def set_transition_duration(self, seconds: float):
if seconds < 0:
raise ValueError(
f"Given transition duration in seconds must be greater 0. Instead [{str(seconds)}] was provided."
)
self._transition_duration_sec = seconds
async def turn_on(self):
if self._on:
return
self._on = True
await self.__commit_to_device__()
async def turn_off(self):
if not self._on:
return
self._on = False
await self.__commit_to_device__()

View file

@ -0,0 +1 @@
from .matrixclock_entity import MatrixClockEntity

View file

@ -0,0 +1,48 @@
from core import Entity
import requests as r
class MatrixClockEntity(Entity):
def __init__(self, *, id: str, name: str, room: str, ip: str, port: int) -> None:
super().__init__(id=id, name=name, room=room, device_type="matrixclock")
self._ip = ip
self._port = port
self._base_path = ""
def __post_request__(self, endpoint: str, content: dict = {}) -> r.Response:
url: str = f"http://{self._ip}:{self._port}{self._base_path}/{endpoint}"
return r.post(url=url, json=content)
def display_off(self):
self.__post_request__("off")
def display_time(self):
self.__post_request__("time")
def display_full(self):
self.__post_request__("full")
def display_pattern(self, *, pattern: str = "01", step_ms: int = 500):
self.__post_request__(f"pattern?pattern={pattern}&step_ms={step_ms}")
def get_climate_and_show_temperature(self) -> dict[str, float]:
return self.__post_request__("temperature").json()
def get_climate_and_show_humidity(self) -> dict[str, float]:
return self.__post_request__("humidity").json()
def flash_display(self, *, count: int = 1, contrast: int = None):
path = f"flash?count={count}"
if contrast:
path += "&contrast={contrast}"
self.__post_request__(path)
def set_contrast(self, *, contrast: int):
self.__post_request__(f"contrast?contrast={contrast}")
def get_contrast(self) -> int:
return self.__post_request__("contrast").json()["contrast"]
def show_message(self, *, message: str):
self.__post_request__("message", {"message": message})

View file

@ -0,0 +1,2 @@
from .zigbee2mqtt_bridge import Z2mBridge
from .contact_sensor_z2m import ContactSensorZ2M

View file

@ -0,0 +1,5 @@
from core import Entity
class ContactSensorZ2M(Entity):
pass

View file

@ -0,0 +1,90 @@
import logging
from typing import Optional
from core import Bridge
import paho.mqtt.client as mqtt
import json
class Z2mBridge(Bridge):
def __init__(
self,
*,
id: str,
ip: str,
port: int = 1883,
keepalive: int = 60,
topic: str = "zigbee2mqtt",
) -> None:
"""
Args:
id (str): Unique identifier of this bridge instance.
ip (str): IP-Address of MQTT broker.
port (int, optional): Port of MQTT broker. Defaults to 1883.
keepalive (int, optional): MQTT keepalive delay in seconds. Defaults to 60.
topic (str, optional): Base topic for Zigbee2MQTT interface. Defaults to "zigbee2mqtt".
"""
super().__init__(id=id, type="zigbee2mqtt")
self._ip = ip
self._port = port
self._keepalive = keepalive
self._device_callbacks: dict[str, list] = {}
self._topic = topic.strip("/")
self._client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
self._client.on_connect = lambda client, userdata, flags, reason_code, properties: self.__on_connect__(
client, userdata, flags, reason_code, properties
)
self._client.on_message = lambda client, userdata, msg: self.__on_message__(
client, userdata, msg
)
def disconnect(self) -> None:
self._client.loop_stop()
self._client.disconnect()
logging.info(f"Disconnected from Zigbee2MQTT broker [{self.id}].")
def connect(self) -> None:
self._client.connect(self._ip, self._port, self._keepalive)
self._client.loop_start()
logging.info(f"Connect to Zigbee2MQTT broker [{self.id}].")
@property
def is_connected(self) -> bool | None:
return self._client.is_connected()
def __on_connect__(self, client, userdata, flags, reason_code, properties):
self._client.subscribe(f"{self._topic}/#")
def __on_message__(self, client, userdata, msg: any):
device_name = msg.topic.split(self._topic + "/", 2)[-1].split("/", 2)[0]
if device_name not in self._device_callbacks.keys():
return
for callback in self._device_callbacks[device_name]:
callback(device_name, json.loads(msg.payload))
@Bridge.requires_connection
def set_device(self, ieee_address: str, *, content: dict = {}) -> None:
self._client.publish(f"{self._topic}/{ieee_address}/set", json.dumps(content))
@Bridge.requires_connection
def get_device(self, ieee_address: str) -> None:
self._client.publish(
f"{self._topic}/{ieee_address}/get", json.dumps({"state": ""})
)
def subscribe_device(
self,
callback,
*,
ieee_address: Optional[str] = None,
friendly_name: Optional[str] = None,
) -> None:
for id in [ieee_address, friendly_name]:
if id not in self._device_callbacks.keys():
self._device_callbacks[id] = []
self._device_callbacks[id].append(callback)

5
src/core/__init__.py Normal file
View file

@ -0,0 +1,5 @@
from .bridge import Bridge, BridgeException
from .entity import Entity
from .group import Group
from .room import Room
from .color import Color

46
src/core/bridge.py Normal file
View file

@ -0,0 +1,46 @@
class BridgeException(Exception):
def __init__(self, id: str, type: str, message: str) -> None:
super().__init__(f"Bridge [{type} | {id}] has thrown an exception: {message}")
class Bridge:
def __init__(self, *, id: str, type: str) -> None:
self._id = id
self._type = type
@property
def id(self) -> str:
return self._id
@property
def type(self) -> str:
return self._type
@property
def is_connected(self) -> bool | None:
return None
def __del__(self) -> None:
self.disconnect()
def connect(self) -> None:
pass
def disconnect(self) -> None:
pass
def requires_connection(func, *, auto_connect=True):
def inner(self: Bridge, *args, **kwargs):
if self.is_connected is False: # Neither True, nor None
if not auto_connect:
raise BridgeException(
self.id,
self.type,
f"Bridge must be manually connected before executing method [{func.__name__}].",
)
self.connect()
return func(*args, **kwargs)
return inner

88
src/core/color.py Normal file
View file

@ -0,0 +1,88 @@
class Color:
def __init__(self, *, hue: float = 0, saturation: float = 0, brightness: float = 0):
self.hue = hue
self.saturation = saturation
self.brightness = brightness
@property
def hue(self) -> float:
return self._hue
@hue.setter
def hue(self, value: float):
if 0 <= value <= 1:
self._hue = value
else:
raise ValueError("Hue must be between 0 and 1")
@property
def saturation(self) -> float:
return self._saturation
@saturation.setter
def saturation(self, value: float):
if 0 <= value <= 1:
self._saturation = value
else:
raise ValueError("Saturation must be between 0 and 1")
@property
def brightness(self) -> float:
return self._brightness
@brightness.setter
def brightness(self, value: float):
if 0 <= value <= 1:
self._brightness = value
else:
raise ValueError("Brightness must be between 0 and 1")
def __eq__(self, other):
if isinstance(other, Color):
return (
self.hue == other.hue
and self.saturation == other.saturation
and self.brightness == other.brightness
)
raise ValueError(
"Can only compare Color instance to other Color instance. Instead, instance of another class was provided."
)
def __str__(self):
return self.__repr__()
def __repr__(self):
# Convert HSB to RGB
rgb = self.to_rgb()
# Convert RGB to HEX
return f"#{int(rgb[0] * 255):02X}{int(rgb[1] * 255):02X}{int(rgb[2] * 255):02X}"
def to_rgb(self) -> tuple:
"""Converts HSB to RGB as a tuple of floats in range [0, 1]."""
h, s, b = self.hue, self.saturation, self.brightness
if s == 0:
# Grayscale color (no saturation)
return b, b, b
h = h * 6 # Scale hue to [0, 6]
i = int(h) # Which color sector
f = h - i # Fractional part of hue
p = b * (1 - s)
q = b * (1 - s * f)
t = b * (1 - s * (1 - f))
i %= 6
if i == 0:
return b, t, p
elif i == 1:
return q, b, p
elif i == 2:
return p, b, t
elif i == 3:
return p, q, b
elif i == 4:
return t, p, b
elif i == 5:
return b, p, q

66
src/core/entity.py Normal file
View file

@ -0,0 +1,66 @@
from .color import Color
class EntityOpNotSupportedError(Exception):
def __init__(self, operation: str, *args):
super().__init__(f"Entity does not support '{operation}' operation.", *args)
class Entity:
def __init__(self, *, id: str, name: str, category: str) -> None:
self._id = id
self._name = name
self._category = category.strip().lower()
self._on: bool = False
self._color: Color = Color()
self._message_queue: list[str] = []
self._pattern
def __str__(self) -> str:
return f"{self.name} [{self.id}, {self.category}]"
async def update(self):
"""Implements an entity specific update operation to get the latest state."""
raise EntityOpNotSupportedError("update")
@property
def id(self) -> str:
return self._id
@property
def name(self) -> str:
return self._name
@property
def category(self) -> str:
return self._category
@property
def update_period(self) -> float:
return self._update_period
@property
def color(self) -> Color:
raise EntityOpNotSupportedError("color")
@property
def brightness(self) -> float:
return self.color.brightness
@property
def hue(self) -> float:
return self.color.hue
@property
def saturation(self) -> float:
return self.color.saturation
@property
def on(self) -> bool:
raise EntityOpNotSupportedError("on")
@property
def transition_time(self) -> float:
raise EntityOpNotSupportedError("transition_time")

97
src/core/group.py Normal file
View file

@ -0,0 +1,97 @@
from fnmatch import fnmatch
from .entity import Entity, EntityOpNotSupportedError
class Group(Entity):
def __init__(
self,
*,
entities: list[Entity] = ...,
id: str = "group",
name: str = "Empty Group",
):
super().__init__(id=id, name=name, room=None, device_type="group")
self._entities: list[Entity] = entities
# List of method names to dynamically create
methods_to_create = [
"set_brightness",
"set_hue",
"set_saturation",
"set_color",
"set_transition_duration",
"turn_on",
"turn_off",
]
for method_name in methods_to_create:
setattr(self, method_name, self._create_group_method(method_name))
def __iter__(self):
return iter(self._entities)
def __len__(self):
return len(self._entities)
def __getitem__(self, id: str) -> "Group":
if type(id) is int:
raise "Numerical index not supported."
return self.__get_entities_with_specific_property__("id", id)
async def __call_method__(self, method_name: str, *args, **kwargs):
for entity in self._entities:
try:
func = getattr(entity, method_name)
await func(*args, **kwargs)
except EntityOpNotSupportedError:
pass
def _create_group_method(self, method_name: str):
# Create a method that calls __call_method__ for the given method name
async def group_method(*args, **kwargs):
await self.__call_method__(method_name, *args, **kwargs)
return group_method
def __get_entities_with_specific_property__(
self,
property: str,
target_pattern: str,
) -> list[Entity]:
"""Returns all entities for which the property getter matches the desired pattern.
Args:
property_getter (callable[[Entity], str]): Takes one entity and returns the value of the filtered property.
target_pattern (str): Pattern that is matched against.
Returns:
list[Entity]: A new group of entities or an empty group, if no entity property matches the target pattern.
"""
return Group(
entities=[
e
for e in self._entities
if fnmatch(
getattr(e, property),
target_pattern,
)
],
name=f"{property}={target_pattern}",
)
def with_id(self, id_pattern: str) -> "Group":
return self.__get_entities_with_specific_property__("id", id_pattern)
def with_name(self, name_pattern: str) -> "Group":
return self.__get_entities_with_specific_property__("name", name_pattern)
def in_room(self, room_pattern: str) -> "Group":
return self.__get_entities_with_specific_property__("room", room_pattern)
def of_device_type(self, type_pattern: str) -> "Group":
return self.__get_entities_with_specific_property__("device_type", type_pattern)
def in_groups(self, groups_pattern: str) -> "Group":
return self.__get_entities_with_specific_property__("groups", groups_pattern)

42
src/core/pattern.py Normal file
View file

@ -0,0 +1,42 @@
class Pattern:
def __init__(
self, *, sequence: str = "01", active_ms: int = 1000, inactive_ms: int = 1000
):
self._sequence = None
self._active_ms = None
self._inactive_ms = None
self.sequence = sequence # Use the setter for validation
self.active_ms = active_ms # Use the setter for validation
self.inactive_ms = inactive_ms # Use the setter for validation
@property
def sequence(self) -> str:
return self._sequence
@sequence.setter
def sequence(self, value: str):
if not isinstance(value, str):
raise ValueError("Sequence must be a string.")
if not all(char in "01" for char in value):
raise ValueError("Sequence must only contain '0' and '1'.")
self._sequence = value
@property
def active_ms(self) -> int:
return self._active_ms
@active_ms.setter
def active_ms(self, value: int):
if not isinstance(value, int) or value < 0:
raise ValueError("Active milliseconds must be a non-negative integer.")
self._active_ms = value
@property
def inactive_ms(self) -> int:
return self._inactive_ms
@inactive_ms.setter
def inactive_ms(self, value: int):
if not isinstance(value, int) or value < 0:
raise ValueError("Inactive milliseconds must be a non-negative integer.")
self._inactive_ms = value

56
src/endpoints/bedscale.py Normal file
View file

@ -0,0 +1,56 @@
import asyncio
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi import APIRouter
import os
import csv
from bridges.bedscale import BedscaleEntity
router = APIRouter()
bedscale = BedscaleEntity(
ip_address="http://192.168.178.110:80",
id="bedscale",
name="Bettwaage",
room="Max Zimmer",
)
async def bedscale_service():
while True:
r = await bedscale.update()
await asyncio.sleep(bedscale.update_period)
@router.get("/latest")
async def get_latest():
if len(bedscale.get_history()) == 0:
return HTMLResponse(status_code=200, content="No data given yet")
return bedscale.get_history()[-1]
# @router.get("/history")
# async def get_history(count: int = None) -> list[dict]:
# points = []
# with open(file_path, "r", encoding="UTF-8") as fp:
# reader = csv.DictReader(fp, delimiter=";")
# for row in reader:
# if not row:
# continue
# points.append(
# {
# "timestamp": row["timestamp"],
# "total": float(row["total"]),
# "tl": float(row["tl"]),
# "tr": float(row["tr"]),
# "bl": float(row["bl"]),
# "br": float(row["br"]),
# }
# )
# if count:
# return points[-count]
# else:
# return points

View file

@ -1,58 +0,0 @@
import asyncio
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi import APIRouter
import os
import csv
from .handlers.bett import file_path, local_history, log_bed_weights
router = APIRouter()
asyncio.create_task(log_bed_weights())
@router.get("/file", tags=["file"])
async def get_file():
with open(file_path, "r", encoding="UTF-8") as fp:
return HTMLResponse("\n".join(fp.readlines()))
@router.get("/history")
async def get_history(count: int = None) -> list[dict]:
points = []
with open(file_path, "r", encoding="UTF-8") as fp:
reader = csv.DictReader(fp, delimiter=";")
for row in reader:
if not row:
continue
points.append(
{
"timestamp": row["timestamp"],
"total": float(row["total"]),
"tl": float(row["tl"]),
"tr": float(row["tr"]),
"bl": float(row["bl"]),
"br": float(row["br"]),
}
)
if count:
return points[-count]
else:
return points
@router.get("/latest")
async def get_latest():
if len(local_history) == 0:
return HTMLResponse(status_code=200, content="No data given yet")
return JSONResponse(local_history[-1])
@router.delete("/delete", tags=["file"])
async def delete_file():
os.remove(file_path)
return "Deleted file"

54
src/endpoints/fritzbox.py Normal file
View file

@ -0,0 +1,54 @@
import asyncio
import logging
from datetime import datetime
from fastapi import APIRouter
from fastapi.responses import HTMLResponse
from bridges.fritzbox import FritzBoxBridge
router = APIRouter()
fritzbox = FritzBoxBridge(id="fritzbox", ip="192.168.178.1")
refresh_every_seconds = 10
macaddresses_to_track = ["B2:06:77:EE:A9:0F"] # Max' iPhone
devices_last_online: dict[str, datetime] = {}
async def track_network_devices():
global devices_last_online
# Initial values to avoid None
for macaddress in macaddresses_to_track:
devices_last_online[macaddress] = datetime(1970, 1, 1, 0, 0, 0)
while True:
try:
for macaddress in macaddresses_to_track:
device = fritzbox.get_device_state(macaddress)
if device.active:
devices_last_online[macaddress] = datetime.now()
except Exception as ex:
logging.exception(ex)
finally:
await asyncio.sleep(refresh_every_seconds)
@router.get("/{mac_address}/state")
async def get_latest(mac_address: str):
if mac_address not in devices_last_online.keys():
return HTMLResponse(status_code=200, content="Mac Address not being tracked.")
last_online_delta = datetime.now() - last_online_delta[mac_address]
return {
"active": last_online_delta < refresh_every_seconds * 2,
"last_active": last_online_delta[mac_address],
}
@router.get("/tracked")
async def get_latest():
return list(devices_last_online.keys())

View file

@ -1,86 +0,0 @@
from time import sleep
from phue import Bridge
from pathlib import Path
class HueAdapter:
"""Handler for Hue API calls."""
registered_ips_file = "hue_bridge_registered.txt"
def __init__(self, bridge_ip: str):
"""Initialize the HueHandler."""
self.bridge = None
self.connect(bridge_ip)
def connect(self, bridge_ip: str):
if bridge_ip in self.get_registered_ips():
self.bridge = Bridge(bridge_ip)
self.bridge.connect()
return
# Connect loop
while True:
try:
self.bridge = Bridge(bridge_ip)
self.bridge.connect()
break
except Exception as e:
print(f"Failed to connect to bridge: {bridge_ip}")
print(e)
print("Trying again in 5 seconds..")
sleep(5)
self.register_bridge(bridge_ip)
def get_registered_ips(self) -> list:
"""Get a list of registered bridge IPs."""
if not Path(HueAdapter.registered_ips_file).is_file():
return []
with open(HueAdapter.registered_ips_file, "r") as f:
return [ad.strip() for ad in f.readlines()]
def register_bridge(self, bridge_ip: str):
"""Register a bridge IP."""
with open(HueAdapter.registered_ips_file, "a") as f:
f.write(bridge_ip + "\n")
def list_scenes(self) -> dict:
return self.bridge.get_scene()
def get_scene_by_name(self, name):
for key, scene in self.list_scenes().items():
if scene["name"] == name:
scene["id"] = key
return scene
return None
def in_room_activate_scene(self, room_name: str, scene_name: str):
"""Activate a scene in a room.
Args:
scene (str): The name of the scene to activate.
room (str): The name of the room to activate the scene in.
"""
scene_id = self.get_scene_by_name(scene_name)["id"]
if scene_id is None:
raise "Scene not found."
self.bridge.set_group(room_name, {"scene": scene_id})
def in_room_deactivate_lights(self, room_name: str):
"""Deactivate all lights in a room.
Args:
room_name (str): The name of the room to deactivate the lights in.
"""
self.bridge.set_group(room_name, {"on": False})
def in_room_activate_lights(self, room_name: str):
"""Activate all lights in a room.
Args:
room_name (str): The name of the room to activate the lights in.
"""
self.bridge.set_group(room_name, {"on": True})

View file

@ -1,60 +1,60 @@
import asyncio
from fastapi import FastAPI, APIRouter
from hue.hue_adapter import HueAdapter
from ..mash.feature import Feature
from bridges.hue import HueBridge, HueLight
from fastapi import APIRouter
from fastapi.responses import HTMLResponse
from core import Group
router = APIRouter(tags=["hue"])
hue = HueAdapter("192.168.178.85")
hue_bridge = HueBridge(ip_address="192.168.178.85", id="hue-bridge")
hue_lights: Group = hue_bridge.get_all_lights()
########## Integration ##########
update_period_seconds = 5
class HueIntegration(Feature):
def __init__(self) -> None:
super().__init__("hue")
def add_routes(self, server: FastAPI) -> None:
server.include_router(router, prefix="/hue")
async def hue_service():
while True:
try:
await hue_lights.update()
await asyncio.sleep(update_period_seconds)
except:
pass
########## Routes ##########
@router.get("/scenes", tags=["scene"])
@router.get("/scenes")
async def get_scenes():
return hue.list_scenes()
return hue_bridge.list_scenes()
@router.post(
"/room/{room_name}/scene/{scene_name}",
tags=["room", "scene"],
)
@router.get("/lights")
async def get_scenes():
return {
l.id: {"name": l.name, "room": l.room, "color": str(l.color), "on": l.on}
for l in hue_lights
}
@router.post("/room/{room_name}/scene/{scene_name}")
async def activate_scene(room_name: str, scene_name: str):
try:
hue.in_room_activate_scene(room_name, scene_name)
hue_bridge.in_room_activate_scene(room_name, scene_name)
except Exception as e:
return HTMLResponse(status_code=400, content=str(e))
@router.post(
"/room/{room_name}/off",
tags=["room"],
)
@router.post("/room/{room_name}/off")
async def deactivate_room(room_name: str):
try:
hue.in_room_deactivate_lights(room_name)
await hue_lights.in_room(room_name).turn_off()
except Exception as e:
return HTMLResponse(status_code=400, content=str(e))
@router.post(
"/room/{room_name}/on",
tags=["room"],
)
@router.post("/room/{room_name}/on")
async def activate_room(room_name: str):
try:
hue.in_room_activate_lights(room_name)
await hue_lights.in_room(room_name).turn_on()
except Exception as e:
return HTMLResponse(status_code=400, content=str(e))

View file

@ -1,86 +0,0 @@
from time import sleep
from phue import Bridge
from pathlib import Path
class HueAdapter:
"""Handler for Hue API calls."""
registered_ips_file = "hue_bridge_registered.txt"
def __init__(self, bridge_ip: str):
"""Initialize the HueHandler."""
self.bridge = None
self.connect(bridge_ip)
def connect(self, bridge_ip: str):
if bridge_ip in self.get_registered_ips():
self.bridge = Bridge(bridge_ip)
self.bridge.connect()
return
# Connect loop
while True:
try:
self.bridge = Bridge(bridge_ip)
self.bridge.connect()
break
except Exception as e:
print(f"Failed to connect to bridge: {bridge_ip}")
print(e)
print("Trying again in 5 seconds..")
sleep(5)
self.register_bridge(bridge_ip)
def get_registered_ips(self) -> list:
"""Get a list of registered bridge IPs."""
if not Path(HueAdapter.registered_ips_file).is_file():
return []
with open(HueAdapter.registered_ips_file, "r") as f:
return [ad.strip() for ad in f.readlines()]
def register_bridge(self, bridge_ip: str):
"""Register a bridge IP."""
with open(HueAdapter.registered_ips_file, "a") as f:
f.write(bridge_ip + "\n")
def list_scenes(self) -> dict:
return self.bridge.get_scene()
def get_scene_by_name(self, name):
for key, scene in self.list_scenes().items():
if scene["name"] == name:
scene["id"] = key
return scene
return None
def in_room_activate_scene(self, room_name: str, scene_name: str):
"""Activate a scene in a room.
Args:
scene (str): The name of the scene to activate.
room (str): The name of the room to activate the scene in.
"""
scene_id = self.get_scene_by_name(scene_name)["id"]
if scene_id is None:
raise "Scene not found."
self.bridge.set_group(room_name, {"scene": scene_id})
def in_room_deactivate_lights(self, room_name: str):
"""Deactivate all lights in a room.
Args:
room_name (str): The name of the room to deactivate the lights in.
"""
self.bridge.set_group(room_name, {"on": False})
def in_room_activate_lights(self, room_name: str):
"""Activate all lights in a room.
Args:
room_name (str): The name of the room to activate the lights in.
"""
self.bridge.set_group(room_name, {"on": True})

View file

@ -1,60 +0,0 @@
from fastapi import FastAPI, APIRouter
from hue.hue_adapter import HueAdapter
from ..mash.feature import Feature
from fastapi import APIRouter
from fastapi.responses import HTMLResponse
router = APIRouter(tags=["hue"])
hue = HueAdapter("192.168.178.85")
########## Integration ##########
class HueIntegration(Feature):
def __init__(self) -> None:
super().__init__("hue")
def add_routes(self, server: FastAPI) -> None:
server.include_router(router, prefix="/hue")
########## Routes ##########
@router.get("/scenes", tags=["scene"])
async def get_scenes():
return hue.list_scenes()
@router.post(
"/room/{room_name}/scene/{scene_name}",
tags=["room", "scene"],
)
async def activate_scene(room_name: str, scene_name: str):
try:
hue.in_room_activate_scene(room_name, scene_name)
except Exception as e:
return HTMLResponse(status_code=400, content=str(e))
@router.post(
"/room/{room_name}/off",
tags=["room"],
)
async def deactivate_room(room_name: str):
try:
hue.in_room_deactivate_lights(room_name)
except Exception as e:
return HTMLResponse(status_code=400, content=str(e))
@router.post(
"/room/{room_name}/on",
tags=["room"],
)
async def activate_room(room_name: str):
try:
hue.in_room_activate_lights(room_name)
except Exception as e:
return HTMLResponse(status_code=400, content=str(e))

1333
src/hue_api.json Normal file

File diff suppressed because it is too large Load diff

View file

@ -1,14 +1,47 @@
import asyncio
from contextlib import asynccontextmanager
from fastapi import FastAPI
from endpoints.hue import router as hue_router
from endpoints.bettwaage import router as bettwaage_router
from endpoints.handlers.fritz import track_network_devices
import uvicorn
from endpoints.hue import hue_service, router as hue_router
from endpoints.bedscale import bedscale_service, router as bettwaage_router
from endpoints.fritzbox import track_network_devices, router as fritzbox_router
app = FastAPI()
asyncio.create_task(track_network_devices())
# Background task references
background_tasks = []
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Start background services."""
# fritz_task = asyncio.create_task(track_network_devices(), name="Fritz!Box Tracker")
bedscale_task = asyncio.create_task(bedscale_service(), name="Polling bed-scale")
# hue_task = asyncio.create_task(hue_service(), name="Polling Hue Bridge")
# TODO: Fix background task execution. ^ these calls are blocking
# Store references to the tasks
# background_tasks.extend([fritz_task, bedscale_task, hue_task])
background_tasks.extend([bedscale_task])
yield
"""Stop background services."""
for task in background_tasks:
task.cancel()
try:
await task
except asyncio.CancelledError:
pass # Expected when cancelling tasks
app = FastAPI(lifespan=lifespan)
# API Routes
app.include_router(hue_router, prefix="/hue", tags=["hue"])
app.include_router(bettwaage_router, prefix="/bettwaage", tags=["bett"])
app.include_router(bettwaage_router, prefix="/bettwaage", tags=["bed"])
app.include_router(fritzbox_router, prefix="/fritzbox", tags=["fritzbox"])
if __name__ == "__main__":
app.run()
# Run API server
uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)

View file

@ -1,32 +0,0 @@
class Entity:
def __init__(
self, *, id: str, name: str, room: str, device_type: str, groups: list[str] = []
) -> None:
self._id = id
self._name = name
self._room = room
self._device_type = device_type
self._groups = set(groups)
@property
def id(self) -> str:
return self._id
@property
def name(self) -> str:
return self._name
@property
def room(self) -> str:
return self._room
@property
def device_type(self) -> str:
return self._device_type
@property
def groups(self) -> set[str]:
return self._groups
def __str__(self) -> str:
return f"{self.name} [{self.id}, type {self.device_type}, room {self.room}, in {len(self.groups)} groups]"

View file

@ -1,66 +0,0 @@
from mash.entities.entity import Entity
from fnmatch import fnmatch
class Group:
def __init__(self, *, entities: list[Entity] = []) -> None:
self.entities: list[Entity] = entities
def __len__(self):
return len(self.entities)
def __getitem__(self, id: str) -> "Group":
if type(id) is int:
raise "Numerical index not supported."
return self.id(id)
def __get_entities_with_specific_property__(
entities: list[Entity],
property_getter: callable[[Entity], str],
target_pattern: str,
) -> list[Entity]:
"""Returns all entities for which the property getter matches the desired pattern.
Args:
entities (list[Entity]): List of entities.
property_getter (callable[[Entity], str]): Takes one entity and returns the value of the filtered property.
target_pattern (str): Pattern that is matched against.
Returns:
list[Entity]: A new group of entities or an empty group, if no entity property matches the target pattern.
"""
return Group(
entities=[
e for e in entities if fnmatch(property_getter(e), target_pattern)
]
)
def device_type(self, device_type: str) -> "Group":
return Group.__get_entities_with_specific_property__(
self.entities, lambda e: e.device_type, device_type
)
def id(self, id: str) -> "Group":
return Group.__get_entities_with_specific_property__(
self.entities, lambda e: e.id, id
)
def room(self, room: str) -> "Group":
return Group.__get_entities_with_specific_property__(
self.entities, lambda e: e.room, room
)
def name(self, name: str) -> "Group":
return Group.__get_entities_with_specific_property__(
self.entities, lambda e: e.name, name
)
def lights(self) -> "Group":
return self.device_type("light")
def beds(self) -> "Group":
return self.device_type("bed")
def max(self) -> "Group":
return self.room("max")

View file

@ -1,9 +0,0 @@
from fastapi import FastAPI
class Feature:
def __init__(self, feature_id: str) -> None:
self.integration_id = feature_id
def add_routes(self, server: FastAPI) -> None:
pass

View file

@ -1,7 +0,0 @@
from mash.entities.entity import Entity
from mash.entities.group import Group
class Home(Group):
def __init__(self, *, entities: list[Entity] = []) -> None:
super().__init__(entities=entities)

View file

@ -1,26 +0,0 @@
import yaml
from fastapi import FastAPI
from mash.feature import Feature
class MaSH:
def __init__(self, config_path: str) -> None:
self.server: FastAPI = FastAPI()
self.config: dict = None
self._load_config_(config_path)
def _load_config_(self, config_path: str) -> None:
try:
with open(config_path, "r", encoding="UTF-8") as fp:
self.config = yaml.safe_load(fp)
except FileNotFoundError:
raise f"Config file for MaSH server could not be opened at [{config_path}]."
def add_integration(self, feature: Feature) -> None:
feature.add_routes(self.server)
self.server.include_router(feature.get_router())
def run(self):
self.server.run()

View file

@ -1,13 +0,0 @@
import asyncio
import requests as r
class MatrixClockAdapter:
def __init__(self, ip_address: str) -> None:
self.ip_address = ip_address.strip("/")
if not self.ip_address.startswith("http"):
self.ip_address = f"http://{self.ip_address}"
async def turn_off(self):
await asyncio.run(r.post(f"{self.ip_address}/off"))

View file

@ -1,6 +0,0 @@
from mash.feature import Feature
class MatrixClockIntegration(Feature):
def __init__(self) -> None:
super().__init__("matrixclock")

View file

@ -35,50 +35,36 @@ class Automation:
def decorator(func):
return func
return decorator
class PeopleCountEngineV1(Automation):
@Automation.trigger(
devices=["matrixclock"],
rule=lambda h: h.device("matrixclock").contrast == 6
devices=["matrixclock"], rule=lambda h: h.device("matrixclock").contrast == 6
)
def turn_light_on_sometimes(self, home: Home):
home.room("max").lights().on = True
@Automation.trigger(
people=["max"],
rule=lambda h: h.person("max").athome()
)
@Automation.trigger(people=["max"], rule=lambda h: h.person("max").athome())
def turn_light_on_sometimes(self, home: Home):
home.room("max").lights().on = h.person("max").athome()
@Automation.state(h.room("Max").lights())
def max_room_light():
if max.ishome():
return "off"
scene = "Daylight scene"
if nighttime:
scene = "nighttime"
if max.working:
scene.dim(0.5)
return scene
return scene
from mash.mash import MaSH