Compare commits
28 commits
bettwaage-
...
master
Author | SHA1 | Date | |
---|---|---|---|
7164d6e2fd | |||
1d46b570f6 | |||
5e8ad4f400 | |||
668e8e78b1 | |||
c3ec52005e | |||
c78546ffcf | |||
db5e826aea | |||
7b9ab32db1 | |||
e131f58849 | |||
08c4372c4e | |||
5defaba45f | |||
9aba0279d7 | |||
04e44849ee | |||
8a2773a97c | |||
f5b4a6c30f | |||
142439b7c3 | |||
f9fdffbdbd | |||
32853a0e7c | |||
51e446710b | |||
003284ccba | |||
0beaab9549 | |||
dac9d81777 | |||
72f206c8cd | |||
d90463069a | |||
7fdbdb161e | |||
df4957d618 | |||
e668fc1eaa | |||
62709b6698 |
36 changed files with 2933 additions and 201 deletions
1
.gitignore
vendored
1
.gitignore
vendored
|
@ -162,4 +162,5 @@ cython_debug/
|
|||
.vscode/launch.json
|
||||
|
||||
# Custom
|
||||
hue_bridge_registered.txt
|
||||
history.json
|
||||
|
|
15
README.md
15
README.md
|
@ -1,10 +1,10 @@
|
|||
# Max's Smart Home - MaSH
|
||||
# Max' Smart Home - MaSH
|
||||
|
||||
Should be a very simple **server** implementation of what is required in Max's smart home. Trying not to overcomplicate things and thereby ruin motivation to work on this.
|
||||
Should be a (very simple?) **server** implementation of what is required in Max's smart home. Trying not to overcomplicate things and thereby ruin motivation to work on this.
|
||||
|
||||
## Sensors
|
||||
|
||||
- [ToF People Counter](https://github.com/mgfcf/mash-sensor-tof-pc)
|
||||
- [ToF People Counter](https://github.com/mgfcf/mash-sensor-tof-pc)
|
||||
|
||||
## ToDo
|
||||
|
||||
|
@ -12,3 +12,12 @@ Should be a very simple **server** implementation of what is required in Max's s
|
|||
- Daylight Adjustment (E.g. No ceiling lights during daytime)
|
||||
- Save scene when turning off, to reapply same scene when turning on
|
||||
- Detect fast flickering of light state, indicating an issue, and disable the system for a few minutes
|
||||
|
||||
## Structure
|
||||
|
||||
- `src/` - All code
|
||||
- `main.py` - Entry point for execution
|
||||
- `new_syntax_example.py` - Not in use, just noting some ideas about a potential syntax
|
||||
- `core/` - Contains more abstract framework definitions
|
||||
- `bridges/` - Contains latest code that manages connections to other services/devices
|
||||
- `endpoints/` - Contains API routes and older handlers/bridges
|
||||
|
|
36
example.conf.yaml
Normal file
36
example.conf.yaml
Normal file
|
@ -0,0 +1,36 @@
|
|||
features:
|
||||
hue:
|
||||
hue-bridge:
|
||||
ip: 192.168.178.23
|
||||
matrixclock:
|
||||
clock:
|
||||
ip: 192.168.178.23
|
||||
|
||||
home:
|
||||
latitude: 52.51860
|
||||
longitude: 13.37565
|
||||
|
||||
beds:
|
||||
- id: max-bed
|
||||
name: Bettwaage
|
||||
room: *max
|
||||
|
||||
rooms:
|
||||
- id : &hw hallway
|
||||
name: Flur
|
||||
doors:
|
||||
- to: ~
|
||||
- to: *bath
|
||||
- to: *kit
|
||||
- to: *living
|
||||
- id : &bath bath
|
||||
name: Badezimmer
|
||||
- id : &kit kitchen
|
||||
name: Küche
|
||||
- id : &living living
|
||||
name: Wohnzimmer
|
||||
doors:
|
||||
- to: *max
|
||||
- id : &max max
|
||||
name: Max' Zimmer
|
||||
|
350
poetry.lock
generated
Normal file
350
poetry.lock
generated
Normal file
|
@ -0,0 +1,350 @@
|
|||
[[package]]
|
||||
name = "annotated-types"
|
||||
version = "0.7.0"
|
||||
description = "Reusable constraint types to use with typing.Annotated"
|
||||
category = "main"
|
||||
optional = false
|
||||
python-versions = ">=3.8"
|
||||
|
||||
[[package]]
|
||||
name = "anyio"
|
||||
version = "4.8.0"
|
||||
description = "High level compatibility layer for multiple asynchronous event loop implementations"
|
||||
category = "main"
|
||||
optional = false
|
||||
python-versions = ">=3.9"
|
||||
|
||||
[package.dependencies]
|
||||
exceptiongroup = {version = ">=1.0.2", markers = "python_version < \"3.11\""}
|
||||
idna = ">=2.8"
|
||||
sniffio = ">=1.1"
|
||||
typing_extensions = {version = ">=4.5", markers = "python_version < \"3.13\""}
|
||||
|
||||
[package.extras]
|
||||
trio = ["trio (>=0.26.1)"]
|
||||
test = ["anyio", "coverage[toml] (>=7)", "exceptiongroup (>=1.2.0)", "hypothesis (>=4.0)", "psutil (>=5.9)", "pytest (>=7.0)", "trustme", "truststore (>=0.9.1)", "uvloop (>=0.21)"]
|
||||
doc = ["packaging", "Sphinx (>=7.4,<8.0)", "sphinx-rtd-theme", "sphinx-autodoc-typehints (>=1.2.0)"]
|
||||
|
||||
[[package]]
|
||||
name = "certifi"
|
||||
version = "2024.12.14"
|
||||
description = "Python package for providing Mozilla's CA Bundle."
|
||||
category = "main"
|
||||
optional = false
|
||||
python-versions = ">=3.6"
|
||||
|
||||
[[package]]
|
||||
name = "charset-normalizer"
|
||||
version = "3.4.1"
|
||||
description = "The Real First Universal Charset Detector. Open, modern and actively maintained alternative to Chardet."
|
||||
category = "main"
|
||||
optional = false
|
||||
python-versions = ">=3.7"
|
||||
|
||||
[[package]]
|
||||
name = "click"
|
||||
version = "8.1.8"
|
||||
description = "Composable command line interface toolkit"
|
||||
category = "main"
|
||||
optional = false
|
||||
python-versions = ">=3.7"
|
||||
|
||||
[package.dependencies]
|
||||
colorama = {version = "*", markers = "platform_system == \"Windows\""}
|
||||
|
||||
[[package]]
|
||||
name = "colorama"
|
||||
version = "0.4.6"
|
||||
description = "Cross-platform colored terminal text."
|
||||
category = "main"
|
||||
optional = false
|
||||
python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,!=3.6.*,>=2.7"
|
||||
|
||||
[[package]]
|
||||
name = "exceptiongroup"
|
||||
version = "1.2.2"
|
||||
description = "Backport of PEP 654 (exception groups)"
|
||||
category = "main"
|
||||
optional = false
|
||||
python-versions = ">=3.7"
|
||||
|
||||
[package.extras]
|
||||
test = ["pytest (>=6)"]
|
||||
|
||||
[[package]]
|
||||
name = "fastapi"
|
||||
version = "0.115.6"
|
||||
description = "FastAPI framework, high performance, easy to learn, fast to code, ready for production"
|
||||
category = "main"
|
||||
optional = false
|
||||
python-versions = ">=3.8"
|
||||
|
||||
[package.dependencies]
|
||||
pydantic = ">=1.7.4,<1.8 || >1.8,<1.8.1 || >1.8.1,<2.0.0 || >2.0.0,<2.0.1 || >2.0.1,<2.1.0 || >2.1.0,<3.0.0"
|
||||
starlette = ">=0.40.0,<0.42.0"
|
||||
typing-extensions = ">=4.8.0"
|
||||
|
||||
[package.extras]
|
||||
standard = ["fastapi-cli[standard] (>=0.0.5)", "httpx (>=0.23.0)", "jinja2 (>=2.11.2)", "python-multipart (>=0.0.7)", "email-validator (>=2.0.0)", "uvicorn[standard] (>=0.12.0)"]
|
||||
all = ["fastapi-cli[standard] (>=0.0.5)", "httpx (>=0.23.0)", "jinja2 (>=2.11.2)", "python-multipart (>=0.0.7)", "itsdangerous (>=1.1.0)", "pyyaml (>=5.3.1)", "ujson (>=4.0.1,!=4.0.2,!=4.1.0,!=4.2.0,!=4.3.0,!=5.0.0,!=5.1.0)", "orjson (>=3.2.1)", "email-validator (>=2.0.0)", "uvicorn[standard] (>=0.12.0)", "pydantic-settings (>=2.0.0)", "pydantic-extra-types (>=2.0.0)"]
|
||||
|
||||
[[package]]
|
||||
name = "fritzconnection"
|
||||
version = "1.14.0"
|
||||
description = "Communicate with the AVM FRITZ!Box"
|
||||
category = "main"
|
||||
optional = false
|
||||
python-versions = ">=3.7"
|
||||
|
||||
[package.dependencies]
|
||||
requests = ">=2.22.0"
|
||||
|
||||
[package.extras]
|
||||
qr = ["segno (>=1.4.1)"]
|
||||
|
||||
[[package]]
|
||||
name = "h11"
|
||||
version = "0.14.0"
|
||||
description = "A pure-Python, bring-your-own-I/O implementation of HTTP/1.1"
|
||||
category = "main"
|
||||
optional = false
|
||||
python-versions = ">=3.7"
|
||||
|
||||
[[package]]
|
||||
name = "httptools"
|
||||
version = "0.6.4"
|
||||
description = "A collection of framework independent HTTP protocol utils."
|
||||
category = "main"
|
||||
optional = false
|
||||
python-versions = ">=3.8.0"
|
||||
|
||||
[package.extras]
|
||||
test = ["Cython (>=0.29.24)"]
|
||||
|
||||
[[package]]
|
||||
name = "idna"
|
||||
version = "3.10"
|
||||
description = "Internationalized Domain Names in Applications (IDNA)"
|
||||
category = "main"
|
||||
optional = false
|
||||
python-versions = ">=3.6"
|
||||
|
||||
[package.extras]
|
||||
all = ["ruff (>=0.6.2)", "mypy (>=1.11.2)", "pytest (>=8.3.2)", "flake8 (>=7.1.1)"]
|
||||
|
||||
[[package]]
|
||||
name = "paho-mqtt"
|
||||
version = "2.1.0"
|
||||
description = "MQTT version 5.0/3.1.1 client class"
|
||||
category = "main"
|
||||
optional = false
|
||||
python-versions = ">=3.7"
|
||||
|
||||
[package.extras]
|
||||
proxy = ["pysocks"]
|
||||
|
||||
[[package]]
|
||||
name = "phue"
|
||||
version = "1.1"
|
||||
description = "A Philips Hue Python library"
|
||||
category = "main"
|
||||
optional = false
|
||||
python-versions = "*"
|
||||
|
||||
[[package]]
|
||||
name = "pydantic"
|
||||
version = "2.10.5"
|
||||
description = "Data validation using Python type hints"
|
||||
category = "main"
|
||||
optional = false
|
||||
python-versions = ">=3.8"
|
||||
|
||||
[package.dependencies]
|
||||
annotated-types = ">=0.6.0"
|
||||
pydantic-core = "2.27.2"
|
||||
typing-extensions = ">=4.12.2"
|
||||
|
||||
[package.extras]
|
||||
email = ["email-validator (>=2.0.0)"]
|
||||
timezone = ["tzdata"]
|
||||
|
||||
[[package]]
|
||||
name = "pydantic-core"
|
||||
version = "2.27.2"
|
||||
description = "Core functionality for Pydantic validation and serialization"
|
||||
category = "main"
|
||||
optional = false
|
||||
python-versions = ">=3.8"
|
||||
|
||||
[package.dependencies]
|
||||
typing-extensions = ">=4.6.0,<4.7.0 || >4.7.0"
|
||||
|
||||
[[package]]
|
||||
name = "python-dotenv"
|
||||
version = "1.0.1"
|
||||
description = "Read key-value pairs from a .env file and set them as environment variables"
|
||||
category = "main"
|
||||
optional = false
|
||||
python-versions = ">=3.8"
|
||||
|
||||
[package.extras]
|
||||
cli = ["click (>=5.0)"]
|
||||
|
||||
[[package]]
|
||||
name = "pyyaml"
|
||||
version = "6.0.2"
|
||||
description = "YAML parser and emitter for Python"
|
||||
category = "main"
|
||||
optional = false
|
||||
python-versions = ">=3.8"
|
||||
|
||||
[[package]]
|
||||
name = "requests"
|
||||
version = "2.32.3"
|
||||
description = "Python HTTP for Humans."
|
||||
category = "main"
|
||||
optional = false
|
||||
python-versions = ">=3.8"
|
||||
|
||||
[package.dependencies]
|
||||
certifi = ">=2017.4.17"
|
||||
charset-normalizer = ">=2,<4"
|
||||
idna = ">=2.5,<4"
|
||||
urllib3 = ">=1.21.1,<3"
|
||||
|
||||
[package.extras]
|
||||
socks = ["PySocks (>=1.5.6,!=1.5.7)"]
|
||||
use-chardet-on-py3 = ["chardet (>=3.0.2,<6)"]
|
||||
|
||||
[[package]]
|
||||
name = "sniffio"
|
||||
version = "1.3.1"
|
||||
description = "Sniff out which async library your code is running under"
|
||||
category = "main"
|
||||
optional = false
|
||||
python-versions = ">=3.7"
|
||||
|
||||
[[package]]
|
||||
name = "starlette"
|
||||
version = "0.41.3"
|
||||
description = "The little ASGI library that shines."
|
||||
category = "main"
|
||||
optional = false
|
||||
python-versions = ">=3.8"
|
||||
|
||||
[package.dependencies]
|
||||
anyio = ">=3.4.0,<5"
|
||||
|
||||
[package.extras]
|
||||
full = ["httpx (>=0.22.0)", "itsdangerous", "jinja2", "python-multipart (>=0.0.7)", "pyyaml"]
|
||||
|
||||
[[package]]
|
||||
name = "typing-extensions"
|
||||
version = "4.12.2"
|
||||
description = "Backported and Experimental Type Hints for Python 3.8+"
|
||||
category = "main"
|
||||
optional = false
|
||||
python-versions = ">=3.8"
|
||||
|
||||
[[package]]
|
||||
name = "urllib3"
|
||||
version = "2.3.0"
|
||||
description = "HTTP library with thread-safe connection pooling, file post, and more."
|
||||
category = "main"
|
||||
optional = false
|
||||
python-versions = ">=3.9"
|
||||
|
||||
[package.extras]
|
||||
brotli = ["brotli (>=1.0.9)", "brotlicffi (>=0.8.0)"]
|
||||
h2 = ["h2 (>=4,<5)"]
|
||||
socks = ["pysocks (>=1.5.6,!=1.5.7,<2.0)"]
|
||||
zstd = ["zstandard (>=0.18.0)"]
|
||||
|
||||
[[package]]
|
||||
name = "uvicorn"
|
||||
version = "0.34.0"
|
||||
description = "The lightning-fast ASGI server."
|
||||
category = "main"
|
||||
optional = false
|
||||
python-versions = ">=3.9"
|
||||
|
||||
[package.dependencies]
|
||||
click = ">=7.0"
|
||||
colorama = {version = ">=0.4", optional = true, markers = "sys_platform == \"win32\" and extra == \"standard\""}
|
||||
h11 = ">=0.8"
|
||||
httptools = {version = ">=0.6.3", optional = true, markers = "extra == \"standard\""}
|
||||
python-dotenv = {version = ">=0.13", optional = true, markers = "extra == \"standard\""}
|
||||
pyyaml = {version = ">=5.1", optional = true, markers = "extra == \"standard\""}
|
||||
typing-extensions = {version = ">=4.0", markers = "python_version < \"3.11\""}
|
||||
uvloop = {version = ">=0.14.0,<0.15.0 || >0.15.0,<0.15.1 || >0.15.1", optional = true, markers = "sys_platform != \"win32\" and sys_platform != \"cygwin\" and platform_python_implementation != \"PyPy\" and extra == \"standard\""}
|
||||
watchfiles = {version = ">=0.13", optional = true, markers = "extra == \"standard\""}
|
||||
websockets = {version = ">=10.4", optional = true, markers = "extra == \"standard\""}
|
||||
|
||||
[package.extras]
|
||||
standard = ["colorama (>=0.4)", "httptools (>=0.6.3)", "python-dotenv (>=0.13)", "pyyaml (>=5.1)", "uvloop (>=0.14.0,!=0.15.0,!=0.15.1)", "watchfiles (>=0.13)", "websockets (>=10.4)"]
|
||||
|
||||
[[package]]
|
||||
name = "uvloop"
|
||||
version = "0.21.0"
|
||||
description = "Fast implementation of asyncio event loop on top of libuv"
|
||||
category = "main"
|
||||
optional = false
|
||||
python-versions = ">=3.8.0"
|
||||
|
||||
[package.extras]
|
||||
dev = ["setuptools (>=60)", "Cython (>=3.0,<4.0)"]
|
||||
docs = ["Sphinx (>=4.1.2,<4.2.0)", "sphinxcontrib-asyncio (>=0.3.0,<0.4.0)", "sphinx-rtd-theme (>=0.5.2,<0.6.0)"]
|
||||
test = ["aiohttp (>=3.10.5)", "flake8 (>=5.0,<6.0)", "psutil", "pycodestyle (>=2.9.0,<2.10.0)", "pyOpenSSL (>=23.0.0,<23.1.0)", "mypy (>=0.800)"]
|
||||
|
||||
[[package]]
|
||||
name = "watchfiles"
|
||||
version = "1.0.3"
|
||||
description = "Simple, modern and high performance file watching and code reload in python."
|
||||
category = "main"
|
||||
optional = false
|
||||
python-versions = ">=3.9"
|
||||
|
||||
[package.dependencies]
|
||||
anyio = ">=3.0.0"
|
||||
|
||||
[[package]]
|
||||
name = "websockets"
|
||||
version = "14.1"
|
||||
description = "An implementation of the WebSocket Protocol (RFC 6455 & 7692)"
|
||||
category = "main"
|
||||
optional = false
|
||||
python-versions = ">=3.9"
|
||||
|
||||
[metadata]
|
||||
lock-version = "1.1"
|
||||
python-versions = "^3.10"
|
||||
content-hash = "470a473cadc51beaadf9c37d81c5ddbde315022964b11d596949a69d3905c583"
|
||||
|
||||
[metadata.files]
|
||||
annotated-types = []
|
||||
anyio = []
|
||||
certifi = []
|
||||
charset-normalizer = []
|
||||
click = []
|
||||
colorama = []
|
||||
exceptiongroup = []
|
||||
fastapi = []
|
||||
fritzconnection = []
|
||||
h11 = []
|
||||
httptools = []
|
||||
idna = []
|
||||
paho-mqtt = []
|
||||
phue = []
|
||||
pydantic = []
|
||||
pydantic-core = []
|
||||
python-dotenv = []
|
||||
pyyaml = []
|
||||
requests = []
|
||||
sniffio = []
|
||||
starlette = []
|
||||
typing-extensions = []
|
||||
urllib3 = []
|
||||
uvicorn = []
|
||||
uvloop = []
|
||||
watchfiles = []
|
||||
websockets = []
|
24
pyproject.toml
Normal file
24
pyproject.toml
Normal file
|
@ -0,0 +1,24 @@
|
|||
[tool.poetry]
|
||||
name = "mash-server"
|
||||
version = "0.1.0"
|
||||
description = "Max' Smart Home"
|
||||
authors = ["Max <m.giller.dev@gmail.com>"]
|
||||
package-mode = false
|
||||
|
||||
[tool.poetry.scripts]
|
||||
start = "main:app"
|
||||
|
||||
[tool.poetry.dependencies]
|
||||
python = "^3.10"
|
||||
phue = "^1.1"
|
||||
fritzconnection = "^1.14.0"
|
||||
fastapi = "^0.115.6"
|
||||
requests = "^2.32.3"
|
||||
paho-mqtt = "^2.1.0"
|
||||
uvicorn = {extras = ["standard"], version = "^0.34.0"}
|
||||
|
||||
[tool.poetry.dev-dependencies]
|
||||
|
||||
[build-system]
|
||||
requires = ["poetry-core>=1.0.0"]
|
||||
build-backend = "poetry.core.masonry.api"
|
|
@ -1,10 +0,0 @@
|
|||
# For Philips Hue
|
||||
phue
|
||||
|
||||
# For Fritz.Box API
|
||||
fritzconnection
|
||||
|
||||
# API
|
||||
requests
|
||||
fastapi
|
||||
uvicorn[standard]
|
1
src/bridges/bedscale/__init__.py
Normal file
1
src/bridges/bedscale/__init__.py
Normal file
|
@ -0,0 +1 @@
|
|||
from .bedscale_entity import BedscaleEntity
|
67
src/bridges/bedscale/bedscale_entity.py
Normal file
67
src/bridges/bedscale/bedscale_entity.py
Normal file
|
@ -0,0 +1,67 @@
|
|||
import requests
|
||||
import asyncio
|
||||
from core.entity import Entity
|
||||
|
||||
|
||||
class BedscaleWeightResult:
|
||||
def __init__(self, *, tr: float, tl: float, br: float, bl: float):
|
||||
self.top_right = tr
|
||||
self.top_left = tl
|
||||
self.bottom_right = br
|
||||
self.bottom_left = bl
|
||||
|
||||
# Calculate total if all values available
|
||||
self.total: float | None = None
|
||||
values = [bl, br, tl, tr]
|
||||
if None not in values:
|
||||
self.total = sum(values)
|
||||
|
||||
|
||||
class BedscaleEntity(Entity):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
ip_address: str,
|
||||
id: str,
|
||||
name: str,
|
||||
room: str,
|
||||
history_length: int = 60 * 5,
|
||||
):
|
||||
super().__init__(id=id, name=name, room=room, device_type="bedscale")
|
||||
self._ip_address = ip_address
|
||||
self._history: list[BedscaleWeightResult] = []
|
||||
self._history_length = history_length
|
||||
self.latest_weight: BedscaleWeightResult = None
|
||||
|
||||
async def update(self):
|
||||
loop = asyncio.get_event_loop()
|
||||
|
||||
tr_request = loop.run_in_executor(None, self.__poll_scale__, "tr")
|
||||
tl_request = loop.run_in_executor(None, self.__poll_scale__, "tl")
|
||||
br_request = loop.run_in_executor(None, self.__poll_scale__, "br")
|
||||
bl_request = loop.run_in_executor(None, self.__poll_scale__, "bl")
|
||||
|
||||
new_result = BedscaleWeightResult(
|
||||
tr=await tr_request,
|
||||
tl=await tl_request,
|
||||
br=await br_request,
|
||||
bl=await bl_request,
|
||||
)
|
||||
|
||||
# TODO: Sanity checks
|
||||
|
||||
# TODO: Keep track of empty-bed weight
|
||||
|
||||
self._history.append(new_result)
|
||||
if len(self._history) > self._history_length:
|
||||
self._history = self._history[-self._history_length :]
|
||||
|
||||
def __poll_scale__(self, leg: str) -> float | None:
|
||||
try:
|
||||
return requests.get(f"{self._ip_address}/sensor/{leg}/").json()["value"]
|
||||
except:
|
||||
return None
|
||||
|
||||
def get_history(self) -> list[BedscaleWeightResult]:
|
||||
return self._history
|
|
@ -5,7 +5,7 @@ import os
|
|||
from statistics import median
|
||||
from typing import Optional
|
||||
import requests as r
|
||||
from ..hue import hue
|
||||
from ...endpoints.hue import hue_bridge
|
||||
import logging
|
||||
|
||||
file_path: str = "bettwaage.csv"
|
||||
|
@ -126,9 +126,9 @@ def check_for_change():
|
|||
# Make room sexy
|
||||
if sexy_mode_detection:
|
||||
if number_of_people >= 2 and weight_increased:
|
||||
hue.in_room_activate_scene("Max Zimmer", "Sexy")
|
||||
hue_bridge.in_room_activate_scene("Max Zimmer", "Sexy")
|
||||
elif number_of_people == 1 and not weight_increased:
|
||||
hue.in_room_activate_scene("Max Zimmer", "Tageslicht")
|
||||
hue_bridge.in_room_activate_scene("Max Zimmer", "Tageslicht")
|
||||
|
||||
|
||||
def add_line_to_bed_history(line: str) -> None:
|
||||
|
@ -166,6 +166,6 @@ async def log_bed_weights():
|
|||
add_weights_to_log(tl, tr, bl, br)
|
||||
check_for_change()
|
||||
except Exception as ex:
|
||||
logging.exception(ex)
|
||||
pass
|
||||
finally:
|
||||
await asyncio.sleep(1)
|
1
src/bridges/fritzbox/__init__.py
Normal file
1
src/bridges/fritzbox/__init__.py
Normal file
|
@ -0,0 +1 @@
|
|||
from .fritzbox_bridge import FritzBoxBridge
|
99
src/bridges/fritzbox/fritzbox_bridge.py
Normal file
99
src/bridges/fritzbox/fritzbox_bridge.py
Normal file
|
@ -0,0 +1,99 @@
|
|||
import logging
|
||||
from core.bridge import Bridge
|
||||
from fritzconnection import FritzConnection
|
||||
|
||||
|
||||
EXCEPTION_RECONNECT_TIMEOUT_SEC = 60
|
||||
|
||||
|
||||
class FritzDeviceState:
|
||||
"""Holds the state of a fritzbox device and implements comparisons."""
|
||||
|
||||
def __init__(self, mac_address: str, raw_state: dict) -> None:
|
||||
"""
|
||||
Args:
|
||||
mac_address (str): Mac Address of fritzbox device.
|
||||
raw_state (dict): Raw device state containing NewActive, NewIPAddress, NewAddressSource, NewLeaseTimeRemaining, NewInterfaceType & NewHostName.
|
||||
"""
|
||||
logging.debug(
|
||||
f"Fritz raw device state to mac [{mac_address}]: {raw_state}",
|
||||
extra=raw_state,
|
||||
)
|
||||
self.mac_address = mac_address
|
||||
self.active: bool = raw_state["NewActive"]
|
||||
self.ip_address: str = raw_state["NewIPAddress"]
|
||||
self.address_source: str = raw_state["NewAddressSource"]
|
||||
self.lease_time_remaining = raw_state["NewLeaseTimeRemaining"]
|
||||
self.interface_type: str = raw_state["NewInterfaceType"]
|
||||
self.host_name: str = raw_state["NewHostName"]
|
||||
|
||||
def __eq__(self, value: object) -> bool:
|
||||
return (
|
||||
type(value) is FritzDeviceState
|
||||
and self.mac_address == value.mac_address
|
||||
and self.ip_address == value.ip_address
|
||||
and self.address_source == value.address_source
|
||||
and self.lease_time_remaining == value.lease_time_remaining
|
||||
and self.interface_type == value.interface_type
|
||||
and self.host_name == value.host_name
|
||||
and self.active == value.active
|
||||
)
|
||||
|
||||
def __str__(self) -> str:
|
||||
return f"[{self.mac_address} | {self.host_name}] {'Active' if self.active else 'Inactive'} - {self.ip_address} - {self.address_source} - {self.interface_type} - {self.lease_time_remaining}"
|
||||
|
||||
|
||||
class FritzBoxBridge(Bridge):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
id: str,
|
||||
ip: str,
|
||||
port: int | None = None,
|
||||
) -> None:
|
||||
"""
|
||||
Args:
|
||||
id (str): Id of fritzbox bridge.
|
||||
ip (str): IP Address of fritzbox bridge in network to connect to.
|
||||
port (int, optional): Port of fritzbox bridge in network to connect to. Defaults to None.
|
||||
"""
|
||||
self._ip = ip
|
||||
self._port = port
|
||||
self._fritz_api: FritzConnection = None
|
||||
|
||||
def connect(self) -> None:
|
||||
if self._fritz_api:
|
||||
self.disconnect()
|
||||
|
||||
self._fritz_api = FritzConnection(address=self._ip, port=self._port)
|
||||
logging.info("Connected")
|
||||
|
||||
def disconnect(self) -> None:
|
||||
logging.info("Disconnected")
|
||||
self._fritz_api = None
|
||||
|
||||
@property
|
||||
def is_connected(self) -> bool | None:
|
||||
return self._fritz_api is not None
|
||||
|
||||
@Bridge.requires_connection
|
||||
def get_known_devices(self) -> list[dict]:
|
||||
numberOfDevices = self._fritz_api.call_action(
|
||||
"Hosts", "GetHostNumberOfEntries"
|
||||
)["NewHostNumberOfEntries"]
|
||||
devices = []
|
||||
for i in range(numberOfDevices):
|
||||
devices.append(
|
||||
self._fritz_api.call_action("Hosts", "GetGenericHostEntry", NewIndex=i)
|
||||
)
|
||||
return devices
|
||||
|
||||
@Bridge.requires_connection
|
||||
def get_device_state(self, mac_address: str) -> FritzDeviceState:
|
||||
return FritzDeviceState(
|
||||
mac_address=mac_address,
|
||||
raw_state=self._fritz_api.call_action(
|
||||
"Hosts", "GetSpecificHostEntry", NewMACAddress=mac_address
|
||||
),
|
||||
)
|
2
src/bridges/hue/__init__.py
Normal file
2
src/bridges/hue/__init__.py
Normal file
|
@ -0,0 +1,2 @@
|
|||
from .hue_bridge import HueBridge
|
||||
from .hue_light import HueLight
|
124
src/bridges/hue/hue_bridge.py
Normal file
124
src/bridges/hue/hue_bridge.py
Normal file
|
@ -0,0 +1,124 @@
|
|||
import logging
|
||||
from .hue_light import HueLight
|
||||
from core import Bridge, Group
|
||||
from phue import Bridge as phueBridge
|
||||
from time import sleep
|
||||
|
||||
|
||||
class HueBridge(Bridge):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
ip_address: str,
|
||||
id: str,
|
||||
retry_limit: int = 10,
|
||||
retry_timeout_seconds: int = 5,
|
||||
) -> None:
|
||||
super().__init__(id=id, type="hue")
|
||||
self._retry_limit = retry_limit
|
||||
self._retry_timeout_seconds = retry_timeout_seconds
|
||||
self._hue: phueBridge = phueBridge(ip_address)
|
||||
|
||||
def disconnect(self) -> None:
|
||||
self._hue = None
|
||||
logging.info(f"Disconnected from Hue Bridge [{self.id}].")
|
||||
|
||||
def connect(self) -> None:
|
||||
for _ in range(self._retry_limit):
|
||||
try:
|
||||
self._hue.connect()
|
||||
logging.info(f"Connected to Hue Bridge [{self.id}].")
|
||||
return
|
||||
except Exception as e:
|
||||
logging.exception(
|
||||
f"Failed to connect to Hue bridge [ip {self._hue.ip}]. Retrying in [{self._retry_timeout_seconds}] seconds.",
|
||||
exc_info=e,
|
||||
)
|
||||
sleep(self._retry_timeout_seconds)
|
||||
|
||||
logging.error(
|
||||
f"Unable to connect to Hue bridge [{self.id}]. Retry count exceeded [{self._retry_limit}]."
|
||||
)
|
||||
|
||||
@property
|
||||
def is_connected(self) -> bool | None:
|
||||
return self._hue is not None
|
||||
|
||||
def list_api(self) -> dict:
|
||||
return self._hue.get_api()
|
||||
|
||||
def get_all_lights(self) -> Group:
|
||||
rooms = {
|
||||
r["name"]: [int(l) for l in r["lights"]]
|
||||
for r in self._hue.get_api()["groups"].values()
|
||||
if r["type"] == "Room"
|
||||
}
|
||||
|
||||
lights = []
|
||||
for l in self._hue.get_light_objects():
|
||||
if l.colormode != "xy":
|
||||
# TODO: Implement light to handle all color modes
|
||||
continue
|
||||
|
||||
room = "Unknown"
|
||||
for room_name, room_lights in rooms.items():
|
||||
if l.light_id in room_lights:
|
||||
room = room_name
|
||||
break
|
||||
|
||||
lights.append(
|
||||
HueLight(
|
||||
hue_bridge=self,
|
||||
hue_light=l,
|
||||
id=f"hue-light-{l.light_id}",
|
||||
name=l.name,
|
||||
room=room,
|
||||
)
|
||||
)
|
||||
|
||||
return Group(entities=lights, id="all-hue-lights", name="All Hue Lights")
|
||||
|
||||
def list_scenes(self) -> dict:
|
||||
return self._hue.get_scene()
|
||||
|
||||
def get_scene_by_name(self, name) -> dict | None:
|
||||
for key, scene in self.list_scenes().items():
|
||||
if scene["name"] == name:
|
||||
scene["id"] = key
|
||||
return scene
|
||||
return None
|
||||
|
||||
def set_light(self, lights: int | list[int], command):
|
||||
return self._hue.set_light(lights, command)
|
||||
|
||||
def get_light(self, id, command=None):
|
||||
return self._hue.get_light(id, command)
|
||||
|
||||
def set_group(self, groups, command):
|
||||
return self._hue.set_group(groups, command)
|
||||
|
||||
def get_group(self, id, command=None):
|
||||
return self._hue.get_group(id, command)
|
||||
|
||||
def in_room_activate_scene(self, room_name: str, scene_name: str):
|
||||
"""Activate a scene in a room.
|
||||
|
||||
Args:
|
||||
scene (str): The name of the scene to activate.
|
||||
room (str): The name of the room to activate the scene in.
|
||||
"""
|
||||
scene_id = self.get_scene_by_name(scene_name)["id"]
|
||||
if scene_id is None:
|
||||
raise "Scene not found."
|
||||
|
||||
self._hue.set_group(room_name, {"scene": scene_id})
|
||||
|
||||
def in_room_set_lights_active(self, room_name: str, active: bool):
|
||||
"""Deactivate all lights in a room.
|
||||
|
||||
Args:
|
||||
room_name (str): The name of the room to deactivate the lights in.
|
||||
active (bool): True, to turn all lights on, or False to turn them off.
|
||||
"""
|
||||
self._hue.set_group(room_name, {"on": active})
|
129
src/bridges/hue/hue_light.py
Normal file
129
src/bridges/hue/hue_light.py
Normal file
|
@ -0,0 +1,129 @@
|
|||
import asyncio
|
||||
from core import Entity, Color
|
||||
from phue import Light
|
||||
|
||||
|
||||
class HueLight(Entity):
|
||||
MAX_HUE_RANGE: int = 65535
|
||||
MAX_SAT_RANGE: int = 254
|
||||
MAX_BRI_RANGE: int = 254
|
||||
TRANSITION_TIME_SCALE: float = 0.1
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
hue_bridge: "HueBridge",
|
||||
hue_light: Light,
|
||||
id: str,
|
||||
name: str,
|
||||
room: str,
|
||||
groups: list[str] = [],
|
||||
) -> None:
|
||||
super().__init__(
|
||||
id=id, name=name, room=room, groups=groups, device_type="light"
|
||||
)
|
||||
self._bridge = hue_bridge
|
||||
self._hue_light = hue_light
|
||||
|
||||
self._on = False
|
||||
self._color: Color = Color()
|
||||
self._transition_duration_sec = 0
|
||||
|
||||
asyncio.run(self.update())
|
||||
|
||||
async def __commit_to_device__(self):
|
||||
self._hue_light.transitiontime = (
|
||||
self._transition_duration_sec * HueLight.TRANSITION_TIME_SCALE
|
||||
)
|
||||
|
||||
self._hue_light.on = self._on
|
||||
|
||||
self._hue_light.hue = self._color.hue * HueLight.MAX_HUE_RANGE
|
||||
self._hue_light.saturation = self._color.saturation * HueLight.MAX_SAT_RANGE
|
||||
self._hue_light.brightness = self._color.brightness * HueLight.MAX_BRI_RANGE
|
||||
|
||||
async def update(self):
|
||||
def __internal_update__():
|
||||
self._on = self._hue_light.on
|
||||
|
||||
# TODO: Update Color instead of overwriting to better track change
|
||||
self._color = Color(
|
||||
hue=self._hue_light.hue / HueLight.MAX_HUE_RANGE,
|
||||
saturation=self._hue_light.saturation / HueLight.MAX_SAT_RANGE,
|
||||
brightness=self._hue_light.brightness / HueLight.MAX_BRI_RANGE,
|
||||
)
|
||||
|
||||
await asyncio.get_running_loop().run_in_executor(None, __internal_update__)
|
||||
|
||||
@property
|
||||
def color(self) -> Color:
|
||||
return self._color
|
||||
|
||||
@property
|
||||
def on(self) -> bool:
|
||||
return self._on
|
||||
|
||||
@property
|
||||
def transition_time(self) -> float:
|
||||
return self._transition_duration_sec
|
||||
|
||||
async def set_brightness(self, brightness: float):
|
||||
if self._color.brightness == brightness:
|
||||
return
|
||||
|
||||
self._color.brightness = brightness
|
||||
|
||||
if self._on:
|
||||
await self.__commit_to_device__()
|
||||
|
||||
async def set_hue(self, hue: float):
|
||||
if self._color.hue == hue:
|
||||
return
|
||||
|
||||
self._color.hue = hue
|
||||
|
||||
if self._on:
|
||||
await self.__commit_to_device__()
|
||||
|
||||
async def set_saturation(self, saturation: float):
|
||||
if self._color.saturation == saturation:
|
||||
return
|
||||
|
||||
self._color.saturation = saturation
|
||||
|
||||
if self._on:
|
||||
await self.__commit_to_device__()
|
||||
|
||||
async def set_color(self, color: Color):
|
||||
if self._color == color:
|
||||
return
|
||||
|
||||
# TODO: Avoid overwriting to better track change
|
||||
self._color = color
|
||||
|
||||
if self._on:
|
||||
await self.__commit_to_device__()
|
||||
|
||||
async def set_transition_duration(self, seconds: float):
|
||||
if seconds < 0:
|
||||
raise ValueError(
|
||||
f"Given transition duration in seconds must be greater 0. Instead [{str(seconds)}] was provided."
|
||||
)
|
||||
|
||||
self._transition_duration_sec = seconds
|
||||
|
||||
async def turn_on(self):
|
||||
if self._on:
|
||||
return
|
||||
|
||||
self._on = True
|
||||
|
||||
await self.__commit_to_device__()
|
||||
|
||||
async def turn_off(self):
|
||||
if not self._on:
|
||||
return
|
||||
|
||||
self._on = False
|
||||
|
||||
await self.__commit_to_device__()
|
1
src/bridges/matrixclock/__init__.py
Normal file
1
src/bridges/matrixclock/__init__.py
Normal file
|
@ -0,0 +1 @@
|
|||
from .matrixclock_entity import MatrixClockEntity
|
48
src/bridges/matrixclock/matrixclock_entity.py
Normal file
48
src/bridges/matrixclock/matrixclock_entity.py
Normal file
|
@ -0,0 +1,48 @@
|
|||
from core import Entity
|
||||
import requests as r
|
||||
|
||||
|
||||
class MatrixClockEntity(Entity):
|
||||
|
||||
def __init__(self, *, id: str, name: str, room: str, ip: str, port: int) -> None:
|
||||
super().__init__(id=id, name=name, room=room, device_type="matrixclock")
|
||||
self._ip = ip
|
||||
self._port = port
|
||||
self._base_path = ""
|
||||
|
||||
def __post_request__(self, endpoint: str, content: dict = {}) -> r.Response:
|
||||
url: str = f"http://{self._ip}:{self._port}{self._base_path}/{endpoint}"
|
||||
return r.post(url=url, json=content)
|
||||
|
||||
def display_off(self):
|
||||
self.__post_request__("off")
|
||||
|
||||
def display_time(self):
|
||||
self.__post_request__("time")
|
||||
|
||||
def display_full(self):
|
||||
self.__post_request__("full")
|
||||
|
||||
def display_pattern(self, *, pattern: str = "01", step_ms: int = 500):
|
||||
self.__post_request__(f"pattern?pattern={pattern}&step_ms={step_ms}")
|
||||
|
||||
def get_climate_and_show_temperature(self) -> dict[str, float]:
|
||||
return self.__post_request__("temperature").json()
|
||||
|
||||
def get_climate_and_show_humidity(self) -> dict[str, float]:
|
||||
return self.__post_request__("humidity").json()
|
||||
|
||||
def flash_display(self, *, count: int = 1, contrast: int = None):
|
||||
path = f"flash?count={count}"
|
||||
if contrast:
|
||||
path += "&contrast={contrast}"
|
||||
self.__post_request__(path)
|
||||
|
||||
def set_contrast(self, *, contrast: int):
|
||||
self.__post_request__(f"contrast?contrast={contrast}")
|
||||
|
||||
def get_contrast(self) -> int:
|
||||
return self.__post_request__("contrast").json()["contrast"]
|
||||
|
||||
def show_message(self, *, message: str):
|
||||
self.__post_request__("message", {"message": message})
|
2
src/bridges/z2m/__init__.py
Normal file
2
src/bridges/z2m/__init__.py
Normal file
|
@ -0,0 +1,2 @@
|
|||
from .zigbee2mqtt_bridge import Z2mBridge
|
||||
from .contact_sensor_z2m import ContactSensorZ2M
|
5
src/bridges/z2m/contact_sensor_z2m.py
Normal file
5
src/bridges/z2m/contact_sensor_z2m.py
Normal file
|
@ -0,0 +1,5 @@
|
|||
from core import Entity
|
||||
|
||||
|
||||
class ContactSensorZ2M(Entity):
|
||||
pass
|
90
src/bridges/z2m/zigbee2mqtt_bridge.py
Normal file
90
src/bridges/z2m/zigbee2mqtt_bridge.py
Normal file
|
@ -0,0 +1,90 @@
|
|||
import logging
|
||||
from typing import Optional
|
||||
from core import Bridge
|
||||
import paho.mqtt.client as mqtt
|
||||
import json
|
||||
|
||||
|
||||
class Z2mBridge(Bridge):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
id: str,
|
||||
ip: str,
|
||||
port: int = 1883,
|
||||
keepalive: int = 60,
|
||||
topic: str = "zigbee2mqtt",
|
||||
) -> None:
|
||||
"""
|
||||
Args:
|
||||
id (str): Unique identifier of this bridge instance.
|
||||
ip (str): IP-Address of MQTT broker.
|
||||
port (int, optional): Port of MQTT broker. Defaults to 1883.
|
||||
keepalive (int, optional): MQTT keepalive delay in seconds. Defaults to 60.
|
||||
topic (str, optional): Base topic for Zigbee2MQTT interface. Defaults to "zigbee2mqtt".
|
||||
"""
|
||||
super().__init__(id=id, type="zigbee2mqtt")
|
||||
self._ip = ip
|
||||
self._port = port
|
||||
self._keepalive = keepalive
|
||||
self._device_callbacks: dict[str, list] = {}
|
||||
self._topic = topic.strip("/")
|
||||
|
||||
self._client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
|
||||
|
||||
self._client.on_connect = lambda client, userdata, flags, reason_code, properties: self.__on_connect__(
|
||||
client, userdata, flags, reason_code, properties
|
||||
)
|
||||
self._client.on_message = lambda client, userdata, msg: self.__on_message__(
|
||||
client, userdata, msg
|
||||
)
|
||||
|
||||
def disconnect(self) -> None:
|
||||
self._client.loop_stop()
|
||||
self._client.disconnect()
|
||||
logging.info(f"Disconnected from Zigbee2MQTT broker [{self.id}].")
|
||||
|
||||
def connect(self) -> None:
|
||||
self._client.connect(self._ip, self._port, self._keepalive)
|
||||
self._client.loop_start()
|
||||
logging.info(f"Connect to Zigbee2MQTT broker [{self.id}].")
|
||||
|
||||
@property
|
||||
def is_connected(self) -> bool | None:
|
||||
return self._client.is_connected()
|
||||
|
||||
def __on_connect__(self, client, userdata, flags, reason_code, properties):
|
||||
self._client.subscribe(f"{self._topic}/#")
|
||||
|
||||
def __on_message__(self, client, userdata, msg: any):
|
||||
device_name = msg.topic.split(self._topic + "/", 2)[-1].split("/", 2)[0]
|
||||
|
||||
if device_name not in self._device_callbacks.keys():
|
||||
return
|
||||
|
||||
for callback in self._device_callbacks[device_name]:
|
||||
callback(device_name, json.loads(msg.payload))
|
||||
|
||||
@Bridge.requires_connection
|
||||
def set_device(self, ieee_address: str, *, content: dict = {}) -> None:
|
||||
self._client.publish(f"{self._topic}/{ieee_address}/set", json.dumps(content))
|
||||
|
||||
@Bridge.requires_connection
|
||||
def get_device(self, ieee_address: str) -> None:
|
||||
self._client.publish(
|
||||
f"{self._topic}/{ieee_address}/get", json.dumps({"state": ""})
|
||||
)
|
||||
|
||||
def subscribe_device(
|
||||
self,
|
||||
callback,
|
||||
*,
|
||||
ieee_address: Optional[str] = None,
|
||||
friendly_name: Optional[str] = None,
|
||||
) -> None:
|
||||
for id in [ieee_address, friendly_name]:
|
||||
if id not in self._device_callbacks.keys():
|
||||
self._device_callbacks[id] = []
|
||||
|
||||
self._device_callbacks[id].append(callback)
|
5
src/core/__init__.py
Normal file
5
src/core/__init__.py
Normal file
|
@ -0,0 +1,5 @@
|
|||
from .bridge import Bridge, BridgeException
|
||||
from .entity import Entity
|
||||
from .group import Group
|
||||
from .room import Room
|
||||
from .color import Color
|
46
src/core/bridge.py
Normal file
46
src/core/bridge.py
Normal file
|
@ -0,0 +1,46 @@
|
|||
class BridgeException(Exception):
|
||||
def __init__(self, id: str, type: str, message: str) -> None:
|
||||
super().__init__(f"Bridge [{type} | {id}] has thrown an exception: {message}")
|
||||
|
||||
|
||||
class Bridge:
|
||||
def __init__(self, *, id: str, type: str) -> None:
|
||||
self._id = id
|
||||
self._type = type
|
||||
|
||||
@property
|
||||
def id(self) -> str:
|
||||
return self._id
|
||||
|
||||
@property
|
||||
def type(self) -> str:
|
||||
return self._type
|
||||
|
||||
@property
|
||||
def is_connected(self) -> bool | None:
|
||||
return None
|
||||
|
||||
def __del__(self) -> None:
|
||||
self.disconnect()
|
||||
|
||||
def connect(self) -> None:
|
||||
pass
|
||||
|
||||
def disconnect(self) -> None:
|
||||
pass
|
||||
|
||||
def requires_connection(func, *, auto_connect=True):
|
||||
def inner(self: Bridge, *args, **kwargs):
|
||||
if self.is_connected is False: # Neither True, nor None
|
||||
if not auto_connect:
|
||||
raise BridgeException(
|
||||
self.id,
|
||||
self.type,
|
||||
f"Bridge must be manually connected before executing method [{func.__name__}].",
|
||||
)
|
||||
|
||||
self.connect()
|
||||
|
||||
return func(*args, **kwargs)
|
||||
|
||||
return inner
|
88
src/core/color.py
Normal file
88
src/core/color.py
Normal file
|
@ -0,0 +1,88 @@
|
|||
class Color:
|
||||
|
||||
def __init__(self, *, hue: float = 0, saturation: float = 0, brightness: float = 0):
|
||||
self.hue = hue
|
||||
self.saturation = saturation
|
||||
self.brightness = brightness
|
||||
|
||||
@property
|
||||
def hue(self) -> float:
|
||||
return self._hue
|
||||
|
||||
@hue.setter
|
||||
def hue(self, value: float):
|
||||
if 0 <= value <= 1:
|
||||
self._hue = value
|
||||
else:
|
||||
raise ValueError("Hue must be between 0 and 1")
|
||||
|
||||
@property
|
||||
def saturation(self) -> float:
|
||||
return self._saturation
|
||||
|
||||
@saturation.setter
|
||||
def saturation(self, value: float):
|
||||
if 0 <= value <= 1:
|
||||
self._saturation = value
|
||||
else:
|
||||
raise ValueError("Saturation must be between 0 and 1")
|
||||
|
||||
@property
|
||||
def brightness(self) -> float:
|
||||
return self._brightness
|
||||
|
||||
@brightness.setter
|
||||
def brightness(self, value: float):
|
||||
if 0 <= value <= 1:
|
||||
self._brightness = value
|
||||
else:
|
||||
raise ValueError("Brightness must be between 0 and 1")
|
||||
|
||||
def __eq__(self, other):
|
||||
if isinstance(other, Color):
|
||||
return (
|
||||
self.hue == other.hue
|
||||
and self.saturation == other.saturation
|
||||
and self.brightness == other.brightness
|
||||
)
|
||||
raise ValueError(
|
||||
"Can only compare Color instance to other Color instance. Instead, instance of another class was provided."
|
||||
)
|
||||
|
||||
def __str__(self):
|
||||
return self.__repr__()
|
||||
|
||||
def __repr__(self):
|
||||
# Convert HSB to RGB
|
||||
rgb = self.to_rgb()
|
||||
# Convert RGB to HEX
|
||||
return f"#{int(rgb[0] * 255):02X}{int(rgb[1] * 255):02X}{int(rgb[2] * 255):02X}"
|
||||
|
||||
def to_rgb(self) -> tuple:
|
||||
"""Converts HSB to RGB as a tuple of floats in range [0, 1]."""
|
||||
h, s, b = self.hue, self.saturation, self.brightness
|
||||
|
||||
if s == 0:
|
||||
# Grayscale color (no saturation)
|
||||
return b, b, b
|
||||
|
||||
h = h * 6 # Scale hue to [0, 6]
|
||||
i = int(h) # Which color sector
|
||||
f = h - i # Fractional part of hue
|
||||
p = b * (1 - s)
|
||||
q = b * (1 - s * f)
|
||||
t = b * (1 - s * (1 - f))
|
||||
|
||||
i %= 6
|
||||
if i == 0:
|
||||
return b, t, p
|
||||
elif i == 1:
|
||||
return q, b, p
|
||||
elif i == 2:
|
||||
return p, b, t
|
||||
elif i == 3:
|
||||
return p, q, b
|
||||
elif i == 4:
|
||||
return t, p, b
|
||||
elif i == 5:
|
||||
return b, p, q
|
66
src/core/entity.py
Normal file
66
src/core/entity.py
Normal file
|
@ -0,0 +1,66 @@
|
|||
from .color import Color
|
||||
|
||||
|
||||
class EntityOpNotSupportedError(Exception):
|
||||
def __init__(self, operation: str, *args):
|
||||
super().__init__(f"Entity does not support '{operation}' operation.", *args)
|
||||
|
||||
|
||||
class Entity:
|
||||
|
||||
def __init__(self, *, id: str, name: str, category: str) -> None:
|
||||
self._id = id
|
||||
self._name = name
|
||||
self._category = category.strip().lower()
|
||||
|
||||
self._on: bool = False
|
||||
self._color: Color = Color()
|
||||
self._message_queue: list[str] = []
|
||||
self._pattern
|
||||
|
||||
def __str__(self) -> str:
|
||||
return f"{self.name} [{self.id}, {self.category}]"
|
||||
|
||||
async def update(self):
|
||||
"""Implements an entity specific update operation to get the latest state."""
|
||||
raise EntityOpNotSupportedError("update")
|
||||
|
||||
@property
|
||||
def id(self) -> str:
|
||||
return self._id
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
return self._name
|
||||
|
||||
@property
|
||||
def category(self) -> str:
|
||||
return self._category
|
||||
|
||||
@property
|
||||
def update_period(self) -> float:
|
||||
return self._update_period
|
||||
|
||||
@property
|
||||
def color(self) -> Color:
|
||||
raise EntityOpNotSupportedError("color")
|
||||
|
||||
@property
|
||||
def brightness(self) -> float:
|
||||
return self.color.brightness
|
||||
|
||||
@property
|
||||
def hue(self) -> float:
|
||||
return self.color.hue
|
||||
|
||||
@property
|
||||
def saturation(self) -> float:
|
||||
return self.color.saturation
|
||||
|
||||
@property
|
||||
def on(self) -> bool:
|
||||
raise EntityOpNotSupportedError("on")
|
||||
|
||||
@property
|
||||
def transition_time(self) -> float:
|
||||
raise EntityOpNotSupportedError("transition_time")
|
97
src/core/group.py
Normal file
97
src/core/group.py
Normal file
|
@ -0,0 +1,97 @@
|
|||
from fnmatch import fnmatch
|
||||
from .entity import Entity, EntityOpNotSupportedError
|
||||
|
||||
|
||||
class Group(Entity):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
entities: list[Entity] = ...,
|
||||
id: str = "group",
|
||||
name: str = "Empty Group",
|
||||
):
|
||||
super().__init__(id=id, name=name, room=None, device_type="group")
|
||||
self._entities: list[Entity] = entities
|
||||
|
||||
# List of method names to dynamically create
|
||||
methods_to_create = [
|
||||
"set_brightness",
|
||||
"set_hue",
|
||||
"set_saturation",
|
||||
"set_color",
|
||||
"set_transition_duration",
|
||||
"turn_on",
|
||||
"turn_off",
|
||||
]
|
||||
|
||||
for method_name in methods_to_create:
|
||||
setattr(self, method_name, self._create_group_method(method_name))
|
||||
|
||||
def __iter__(self):
|
||||
return iter(self._entities)
|
||||
|
||||
def __len__(self):
|
||||
return len(self._entities)
|
||||
|
||||
def __getitem__(self, id: str) -> "Group":
|
||||
if type(id) is int:
|
||||
raise "Numerical index not supported."
|
||||
|
||||
return self.__get_entities_with_specific_property__("id", id)
|
||||
|
||||
async def __call_method__(self, method_name: str, *args, **kwargs):
|
||||
for entity in self._entities:
|
||||
try:
|
||||
func = getattr(entity, method_name)
|
||||
await func(*args, **kwargs)
|
||||
except EntityOpNotSupportedError:
|
||||
pass
|
||||
|
||||
def _create_group_method(self, method_name: str):
|
||||
# Create a method that calls __call_method__ for the given method name
|
||||
async def group_method(*args, **kwargs):
|
||||
await self.__call_method__(method_name, *args, **kwargs)
|
||||
|
||||
return group_method
|
||||
|
||||
def __get_entities_with_specific_property__(
|
||||
self,
|
||||
property: str,
|
||||
target_pattern: str,
|
||||
) -> list[Entity]:
|
||||
"""Returns all entities for which the property getter matches the desired pattern.
|
||||
|
||||
Args:
|
||||
property_getter (callable[[Entity], str]): Takes one entity and returns the value of the filtered property.
|
||||
target_pattern (str): Pattern that is matched against.
|
||||
|
||||
Returns:
|
||||
list[Entity]: A new group of entities or an empty group, if no entity property matches the target pattern.
|
||||
"""
|
||||
return Group(
|
||||
entities=[
|
||||
e
|
||||
for e in self._entities
|
||||
if fnmatch(
|
||||
getattr(e, property),
|
||||
target_pattern,
|
||||
)
|
||||
],
|
||||
name=f"{property}={target_pattern}",
|
||||
)
|
||||
|
||||
def with_id(self, id_pattern: str) -> "Group":
|
||||
return self.__get_entities_with_specific_property__("id", id_pattern)
|
||||
|
||||
def with_name(self, name_pattern: str) -> "Group":
|
||||
return self.__get_entities_with_specific_property__("name", name_pattern)
|
||||
|
||||
def in_room(self, room_pattern: str) -> "Group":
|
||||
return self.__get_entities_with_specific_property__("room", room_pattern)
|
||||
|
||||
def of_device_type(self, type_pattern: str) -> "Group":
|
||||
return self.__get_entities_with_specific_property__("device_type", type_pattern)
|
||||
|
||||
def in_groups(self, groups_pattern: str) -> "Group":
|
||||
return self.__get_entities_with_specific_property__("groups", groups_pattern)
|
42
src/core/pattern.py
Normal file
42
src/core/pattern.py
Normal file
|
@ -0,0 +1,42 @@
|
|||
class Pattern:
|
||||
def __init__(
|
||||
self, *, sequence: str = "01", active_ms: int = 1000, inactive_ms: int = 1000
|
||||
):
|
||||
self._sequence = None
|
||||
self._active_ms = None
|
||||
self._inactive_ms = None
|
||||
self.sequence = sequence # Use the setter for validation
|
||||
self.active_ms = active_ms # Use the setter for validation
|
||||
self.inactive_ms = inactive_ms # Use the setter for validation
|
||||
|
||||
@property
|
||||
def sequence(self) -> str:
|
||||
return self._sequence
|
||||
|
||||
@sequence.setter
|
||||
def sequence(self, value: str):
|
||||
if not isinstance(value, str):
|
||||
raise ValueError("Sequence must be a string.")
|
||||
if not all(char in "01" for char in value):
|
||||
raise ValueError("Sequence must only contain '0' and '1'.")
|
||||
self._sequence = value
|
||||
|
||||
@property
|
||||
def active_ms(self) -> int:
|
||||
return self._active_ms
|
||||
|
||||
@active_ms.setter
|
||||
def active_ms(self, value: int):
|
||||
if not isinstance(value, int) or value < 0:
|
||||
raise ValueError("Active milliseconds must be a non-negative integer.")
|
||||
self._active_ms = value
|
||||
|
||||
@property
|
||||
def inactive_ms(self) -> int:
|
||||
return self._inactive_ms
|
||||
|
||||
@inactive_ms.setter
|
||||
def inactive_ms(self, value: int):
|
||||
if not isinstance(value, int) or value < 0:
|
||||
raise ValueError("Inactive milliseconds must be a non-negative integer.")
|
||||
self._inactive_ms = value
|
56
src/endpoints/bedscale.py
Normal file
56
src/endpoints/bedscale.py
Normal file
|
@ -0,0 +1,56 @@
|
|||
import asyncio
|
||||
from fastapi.responses import HTMLResponse, JSONResponse
|
||||
from fastapi import APIRouter
|
||||
|
||||
import os
|
||||
import csv
|
||||
|
||||
from bridges.bedscale import BedscaleEntity
|
||||
|
||||
|
||||
router = APIRouter()
|
||||
bedscale = BedscaleEntity(
|
||||
ip_address="http://192.168.178.110:80",
|
||||
id="bedscale",
|
||||
name="Bettwaage",
|
||||
room="Max Zimmer",
|
||||
)
|
||||
|
||||
|
||||
async def bedscale_service():
|
||||
while True:
|
||||
r = await bedscale.update()
|
||||
await asyncio.sleep(bedscale.update_period)
|
||||
|
||||
|
||||
@router.get("/latest")
|
||||
async def get_latest():
|
||||
if len(bedscale.get_history()) == 0:
|
||||
return HTMLResponse(status_code=200, content="No data given yet")
|
||||
return bedscale.get_history()[-1]
|
||||
|
||||
# @router.get("/history")
|
||||
# async def get_history(count: int = None) -> list[dict]:
|
||||
|
||||
# points = []
|
||||
# with open(file_path, "r", encoding="UTF-8") as fp:
|
||||
# reader = csv.DictReader(fp, delimiter=";")
|
||||
# for row in reader:
|
||||
# if not row:
|
||||
# continue
|
||||
|
||||
# points.append(
|
||||
# {
|
||||
# "timestamp": row["timestamp"],
|
||||
# "total": float(row["total"]),
|
||||
# "tl": float(row["tl"]),
|
||||
# "tr": float(row["tr"]),
|
||||
# "bl": float(row["bl"]),
|
||||
# "br": float(row["br"]),
|
||||
# }
|
||||
# )
|
||||
|
||||
# if count:
|
||||
# return points[-count]
|
||||
# else:
|
||||
# return points
|
|
@ -1,56 +0,0 @@
|
|||
import asyncio
|
||||
from fastapi.responses import HTMLResponse, JSONResponse
|
||||
from fastapi import APIRouter
|
||||
|
||||
import os
|
||||
import csv
|
||||
from .handlers.bett import file_path, local_history, log_bed_weights
|
||||
|
||||
|
||||
router = APIRouter()
|
||||
asyncio.create_task(log_bed_weights())
|
||||
|
||||
|
||||
@router.get("/file", tags=["file"])
|
||||
async def get_file():
|
||||
with open(file_path, "r", encoding="UTF-8") as fp:
|
||||
return HTMLResponse("\n".join(fp.readlines()))
|
||||
|
||||
|
||||
@router.get("/history")
|
||||
async def get_history(count: int = None) -> list[dict]:
|
||||
points = []
|
||||
with open(file_path, "r", encoding="UTF-8") as fp:
|
||||
reader = csv.DictReader(fp, delimiter=";")
|
||||
for row in reader:
|
||||
if not row:
|
||||
continue
|
||||
|
||||
points.append(
|
||||
{
|
||||
"timestamp": row["timestamp"],
|
||||
"total": float(row["total"]),
|
||||
"tl": float(row["tl"]),
|
||||
"tr": float(row["tr"]),
|
||||
"bl": float(row["bl"]),
|
||||
"br": float(row["br"]),
|
||||
}
|
||||
)
|
||||
|
||||
if count:
|
||||
return points[-count]
|
||||
else:
|
||||
return points
|
||||
|
||||
|
||||
@router.get("/latest")
|
||||
async def get_latest():
|
||||
if len(local_history) == 0:
|
||||
return HTMLResponse(status_code=200, content="No data given yet")
|
||||
return JSONResponse(local_history[-1])
|
||||
|
||||
|
||||
@router.delete("/delete", tags=["file"])
|
||||
async def delete_file():
|
||||
os.remove(file_path)
|
||||
return "Deleted file"
|
54
src/endpoints/fritzbox.py
Normal file
54
src/endpoints/fritzbox.py
Normal file
|
@ -0,0 +1,54 @@
|
|||
import asyncio
|
||||
import logging
|
||||
from datetime import datetime
|
||||
|
||||
from fastapi import APIRouter
|
||||
from fastapi.responses import HTMLResponse
|
||||
from bridges.fritzbox import FritzBoxBridge
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
fritzbox = FritzBoxBridge(id="fritzbox", ip="192.168.178.1")
|
||||
refresh_every_seconds = 10
|
||||
|
||||
macaddresses_to_track = ["B2:06:77:EE:A9:0F"] # Max' iPhone
|
||||
|
||||
devices_last_online: dict[str, datetime] = {}
|
||||
|
||||
|
||||
async def track_network_devices():
|
||||
global devices_last_online
|
||||
|
||||
# Initial values to avoid None
|
||||
for macaddress in macaddresses_to_track:
|
||||
devices_last_online[macaddress] = datetime(1970, 1, 1, 0, 0, 0)
|
||||
|
||||
while True:
|
||||
try:
|
||||
for macaddress in macaddresses_to_track:
|
||||
device = fritzbox.get_device_state(macaddress)
|
||||
if device.active:
|
||||
devices_last_online[macaddress] = datetime.now()
|
||||
|
||||
except Exception as ex:
|
||||
logging.exception(ex)
|
||||
finally:
|
||||
await asyncio.sleep(refresh_every_seconds)
|
||||
|
||||
|
||||
@router.get("/{mac_address}/state")
|
||||
async def get_latest(mac_address: str):
|
||||
if mac_address not in devices_last_online.keys():
|
||||
return HTMLResponse(status_code=200, content="Mac Address not being tracked.")
|
||||
|
||||
last_online_delta = datetime.now() - last_online_delta[mac_address]
|
||||
|
||||
return {
|
||||
"active": last_online_delta < refresh_every_seconds * 2,
|
||||
"last_active": last_online_delta[mac_address],
|
||||
}
|
||||
|
||||
|
||||
@router.get("/tracked")
|
||||
async def get_latest():
|
||||
return list(devices_last_online.keys())
|
|
@ -1,86 +0,0 @@
|
|||
from time import sleep
|
||||
from phue import Bridge
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
class HueHandler:
|
||||
"""Handler for Hue API calls."""
|
||||
|
||||
registered_ips_file = "hue_bridge_registered.txt"
|
||||
|
||||
def __init__(self, bridge_ip: str):
|
||||
"""Initialize the HueHandler."""
|
||||
self.bridge = None
|
||||
self.connect(bridge_ip)
|
||||
|
||||
def connect(self, bridge_ip: str):
|
||||
if bridge_ip in self.get_registered_ips():
|
||||
self.bridge = Bridge(bridge_ip)
|
||||
self.bridge.connect()
|
||||
return
|
||||
|
||||
# Connect loop
|
||||
while True:
|
||||
try:
|
||||
self.bridge = Bridge(bridge_ip)
|
||||
self.bridge.connect()
|
||||
break
|
||||
except Exception as e:
|
||||
print(f"Failed to connect to bridge: {bridge_ip}")
|
||||
print(e)
|
||||
print("Trying again in 5 seconds..")
|
||||
sleep(5)
|
||||
|
||||
self.register_bridge(bridge_ip)
|
||||
|
||||
def get_registered_ips(self) -> list:
|
||||
"""Get a list of registered bridge IPs."""
|
||||
if not Path(HueHandler.registered_ips_file).is_file():
|
||||
return []
|
||||
|
||||
with open(HueHandler.registered_ips_file, "r") as f:
|
||||
return f.readlines()
|
||||
|
||||
def register_bridge(self, bridge_ip: str):
|
||||
"""Register a bridge IP."""
|
||||
with open(HueHandler.registered_ips_file, "a") as f:
|
||||
f.write(bridge_ip + "\n")
|
||||
|
||||
def list_scenes(self) -> dict:
|
||||
return self.bridge.get_scene()
|
||||
|
||||
def get_scene_by_name(self, name):
|
||||
for key, scene in self.list_scenes().items():
|
||||
if scene["name"] == name:
|
||||
scene["id"] = key
|
||||
return scene
|
||||
return None
|
||||
|
||||
def in_room_activate_scene(self, room_name: str, scene_name: str):
|
||||
"""Activate a scene in a room.
|
||||
|
||||
Args:
|
||||
scene (str): The name of the scene to activate.
|
||||
room (str): The name of the room to activate the scene in.
|
||||
"""
|
||||
scene_id = self.get_scene_by_name(scene_name)["id"]
|
||||
if scene_id is None:
|
||||
raise "Scene not found."
|
||||
|
||||
self.bridge.set_group(room_name, {"scene": scene_id})
|
||||
|
||||
def in_room_deactivate_lights(self, room_name: str):
|
||||
"""Deactivate all lights in a room.
|
||||
|
||||
Args:
|
||||
room_name (str): The name of the room to deactivate the lights in.
|
||||
"""
|
||||
self.bridge.set_group(room_name, {"on": False})
|
||||
|
||||
def in_room_activate_lights(self, room_name: str):
|
||||
"""Activate all lights in a room.
|
||||
|
||||
Args:
|
||||
room_name (str): The name of the room to activate the lights in.
|
||||
"""
|
||||
self.bridge.set_group(room_name, {"on": True})
|
|
@ -1,44 +1,60 @@
|
|||
import asyncio
|
||||
from fastapi import FastAPI, APIRouter
|
||||
|
||||
from bridges.hue import HueBridge, HueLight
|
||||
from fastapi import APIRouter
|
||||
from fastapi.responses import HTMLResponse
|
||||
from .handlers.hue import HueHandler
|
||||
|
||||
router = APIRouter()
|
||||
hue = HueHandler("192.168.178.85")
|
||||
from core import Group
|
||||
|
||||
router = APIRouter(tags=["hue"])
|
||||
hue_bridge = HueBridge(ip_address="192.168.178.85", id="hue-bridge")
|
||||
hue_lights: Group = hue_bridge.get_all_lights()
|
||||
|
||||
update_period_seconds = 5
|
||||
|
||||
|
||||
@router.get("/scenes", tags=["scene"])
|
||||
async def hue_service():
|
||||
while True:
|
||||
try:
|
||||
await hue_lights.update()
|
||||
await asyncio.sleep(update_period_seconds)
|
||||
except:
|
||||
pass
|
||||
|
||||
|
||||
@router.get("/scenes")
|
||||
async def get_scenes():
|
||||
return hue.list_scenes()
|
||||
return hue_bridge.list_scenes()
|
||||
|
||||
|
||||
@router.post(
|
||||
"/room/{room_name}/scene/{scene_name}",
|
||||
tags=["room", "scene"],
|
||||
)
|
||||
@router.get("/lights")
|
||||
async def get_scenes():
|
||||
return {
|
||||
l.id: {"name": l.name, "room": l.room, "color": str(l.color), "on": l.on}
|
||||
for l in hue_lights
|
||||
}
|
||||
|
||||
|
||||
@router.post("/room/{room_name}/scene/{scene_name}")
|
||||
async def activate_scene(room_name: str, scene_name: str):
|
||||
try:
|
||||
hue.in_room_activate_scene(room_name, scene_name)
|
||||
hue_bridge.in_room_activate_scene(room_name, scene_name)
|
||||
except Exception as e:
|
||||
return HTMLResponse(status_code=400, content=str(e))
|
||||
|
||||
|
||||
@router.post(
|
||||
"/room/{room_name}/off",
|
||||
tags=["room"],
|
||||
)
|
||||
@router.post("/room/{room_name}/off")
|
||||
async def deactivate_room(room_name: str):
|
||||
try:
|
||||
hue.in_room_deactivate_lights(room_name)
|
||||
await hue_lights.in_room(room_name).turn_off()
|
||||
except Exception as e:
|
||||
return HTMLResponse(status_code=400, content=str(e))
|
||||
|
||||
|
||||
@router.post(
|
||||
"/room/{room_name}/on",
|
||||
tags=["room"],
|
||||
)
|
||||
@router.post("/room/{room_name}/on")
|
||||
async def activate_room(room_name: str):
|
||||
try:
|
||||
hue.in_room_activate_lights(room_name)
|
||||
await hue_lights.in_room(room_name).turn_on()
|
||||
except Exception as e:
|
||||
return HTMLResponse(status_code=400, content=str(e))
|
||||
|
|
1333
src/hue_api.json
Normal file
1333
src/hue_api.json
Normal file
File diff suppressed because it is too large
Load diff
|
@ -1,15 +0,0 @@
|
|||
192.168.178.85
|
||||
192.168.178.85
|
||||
192.168.178.85
|
||||
192.168.178.85
|
||||
192.168.178.85
|
||||
192.168.178.85
|
||||
192.168.178.85
|
||||
192.168.178.85
|
||||
192.168.178.85
|
||||
192.168.178.85
|
||||
192.168.178.85
|
||||
192.168.178.85
|
||||
192.168.178.85
|
||||
192.168.178.85
|
||||
192.168.178.85
|
47
src/main.py
47
src/main.py
|
@ -1,14 +1,47 @@
|
|||
import asyncio
|
||||
from contextlib import asynccontextmanager
|
||||
from fastapi import FastAPI
|
||||
from endpoints.hue import router as hue_router
|
||||
from endpoints.bettwaage import router as bettwaage_router
|
||||
from endpoints.handlers.fritz import track_network_devices
|
||||
import uvicorn
|
||||
from endpoints.hue import hue_service, router as hue_router
|
||||
from endpoints.bedscale import bedscale_service, router as bettwaage_router
|
||||
from endpoints.fritzbox import track_network_devices, router as fritzbox_router
|
||||
|
||||
app = FastAPI()
|
||||
asyncio.create_task(track_network_devices())
|
||||
# Background task references
|
||||
background_tasks = []
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def lifespan(app: FastAPI):
|
||||
"""Start background services."""
|
||||
# fritz_task = asyncio.create_task(track_network_devices(), name="Fritz!Box Tracker")
|
||||
bedscale_task = asyncio.create_task(bedscale_service(), name="Polling bed-scale")
|
||||
# hue_task = asyncio.create_task(hue_service(), name="Polling Hue Bridge")
|
||||
|
||||
# TODO: Fix background task execution. ^ these calls are blocking
|
||||
|
||||
# Store references to the tasks
|
||||
# background_tasks.extend([fritz_task, bedscale_task, hue_task])
|
||||
background_tasks.extend([bedscale_task])
|
||||
|
||||
yield
|
||||
|
||||
"""Stop background services."""
|
||||
for task in background_tasks:
|
||||
task.cancel()
|
||||
try:
|
||||
await task
|
||||
except asyncio.CancelledError:
|
||||
pass # Expected when cancelling tasks
|
||||
|
||||
|
||||
app = FastAPI(lifespan=lifespan)
|
||||
|
||||
# API Routes
|
||||
app.include_router(hue_router, prefix="/hue", tags=["hue"])
|
||||
app.include_router(bettwaage_router, prefix="/bettwaage", tags=["bett"])
|
||||
app.include_router(bettwaage_router, prefix="/bettwaage", tags=["bed"])
|
||||
app.include_router(fritzbox_router, prefix="/fritzbox", tags=["fritzbox"])
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
app.run()
|
||||
# Run API server
|
||||
uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)
|
||||
|
|
74
src/new_syntax_example.py
Normal file
74
src/new_syntax_example.py
Normal file
|
@ -0,0 +1,74 @@
|
|||
class Device:
|
||||
def __init__(self, device_id: str, home: "Home") -> None:
|
||||
self.device_id: str = device_id
|
||||
self.home: Home = home
|
||||
|
||||
def trigger_change(self):
|
||||
self.home.trigger_device_change(self.device_id)
|
||||
|
||||
|
||||
class Home:
|
||||
automations: list["Automation"]
|
||||
|
||||
def _get_device_behaviours_(self, device_id: str) -> list["Automation"]:
|
||||
return [b for a in self.automations]
|
||||
|
||||
def trigger_device_change(self, device_id: str):
|
||||
pass
|
||||
|
||||
|
||||
class Behaviour:
|
||||
def __init__(self) -> None:
|
||||
self.action: function = None
|
||||
self.rule: function = None
|
||||
self.devices: list[str] = []
|
||||
|
||||
|
||||
class Automation:
|
||||
def __init__(self) -> None:
|
||||
self.behaviours: list[Behaviour] = []
|
||||
|
||||
def device(self, device_id: str) -> dict:
|
||||
return {"contrast": 3}
|
||||
|
||||
def trigger(self):
|
||||
def decorator(func):
|
||||
|
||||
return func
|
||||
|
||||
return decorator
|
||||
|
||||
|
||||
class PeopleCountEngineV1(Automation):
|
||||
@Automation.trigger(
|
||||
devices=["matrixclock"], rule=lambda h: h.device("matrixclock").contrast == 6
|
||||
)
|
||||
def turn_light_on_sometimes(self, home: Home):
|
||||
home.room("max").lights().on = True
|
||||
|
||||
@Automation.trigger(people=["max"], rule=lambda h: h.person("max").athome())
|
||||
def turn_light_on_sometimes(self, home: Home):
|
||||
home.room("max").lights().on = h.person("max").athome()
|
||||
|
||||
|
||||
@Automation.state(h.room("Max").lights())
|
||||
def max_room_light():
|
||||
if max.ishome():
|
||||
return "off"
|
||||
|
||||
scene = "Daylight scene"
|
||||
|
||||
if nighttime:
|
||||
scene = "nighttime"
|
||||
|
||||
if max.working:
|
||||
scene.dim(0.5)
|
||||
|
||||
return scene
|
||||
|
||||
|
||||
from mash.mash import MaSH
|
||||
|
||||
mash = MaSH()
|
||||
|
||||
mash.add_automation(PeopleCountEngineV1())
|
Loading…
Reference in a new issue