Merging new concepts with current version

This commit is contained in:
Maximilian Giller 2025-01-09 19:06:11 +01:00
parent 0beaab9549
commit 003284ccba
29 changed files with 767 additions and 314 deletions

350
poetry.lock generated Normal file
View file

@ -0,0 +1,350 @@
[[package]]
name = "annotated-types"
version = "0.7.0"
description = "Reusable constraint types to use with typing.Annotated"
category = "main"
optional = false
python-versions = ">=3.8"
[[package]]
name = "anyio"
version = "4.8.0"
description = "High level compatibility layer for multiple asynchronous event loop implementations"
category = "main"
optional = false
python-versions = ">=3.9"
[package.dependencies]
exceptiongroup = {version = ">=1.0.2", markers = "python_version < \"3.11\""}
idna = ">=2.8"
sniffio = ">=1.1"
typing_extensions = {version = ">=4.5", markers = "python_version < \"3.13\""}
[package.extras]
trio = ["trio (>=0.26.1)"]
test = ["anyio", "coverage[toml] (>=7)", "exceptiongroup (>=1.2.0)", "hypothesis (>=4.0)", "psutil (>=5.9)", "pytest (>=7.0)", "trustme", "truststore (>=0.9.1)", "uvloop (>=0.21)"]
doc = ["packaging", "Sphinx (>=7.4,<8.0)", "sphinx-rtd-theme", "sphinx-autodoc-typehints (>=1.2.0)"]
[[package]]
name = "certifi"
version = "2024.12.14"
description = "Python package for providing Mozilla's CA Bundle."
category = "main"
optional = false
python-versions = ">=3.6"
[[package]]
name = "charset-normalizer"
version = "3.4.1"
description = "The Real First Universal Charset Detector. Open, modern and actively maintained alternative to Chardet."
category = "main"
optional = false
python-versions = ">=3.7"
[[package]]
name = "click"
version = "8.1.8"
description = "Composable command line interface toolkit"
category = "main"
optional = false
python-versions = ">=3.7"
[package.dependencies]
colorama = {version = "*", markers = "platform_system == \"Windows\""}
[[package]]
name = "colorama"
version = "0.4.6"
description = "Cross-platform colored terminal text."
category = "main"
optional = false
python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,!=3.6.*,>=2.7"
[[package]]
name = "exceptiongroup"
version = "1.2.2"
description = "Backport of PEP 654 (exception groups)"
category = "main"
optional = false
python-versions = ">=3.7"
[package.extras]
test = ["pytest (>=6)"]
[[package]]
name = "fastapi"
version = "0.115.6"
description = "FastAPI framework, high performance, easy to learn, fast to code, ready for production"
category = "main"
optional = false
python-versions = ">=3.8"
[package.dependencies]
pydantic = ">=1.7.4,<1.8 || >1.8,<1.8.1 || >1.8.1,<2.0.0 || >2.0.0,<2.0.1 || >2.0.1,<2.1.0 || >2.1.0,<3.0.0"
starlette = ">=0.40.0,<0.42.0"
typing-extensions = ">=4.8.0"
[package.extras]
standard = ["fastapi-cli[standard] (>=0.0.5)", "httpx (>=0.23.0)", "jinja2 (>=2.11.2)", "python-multipart (>=0.0.7)", "email-validator (>=2.0.0)", "uvicorn[standard] (>=0.12.0)"]
all = ["fastapi-cli[standard] (>=0.0.5)", "httpx (>=0.23.0)", "jinja2 (>=2.11.2)", "python-multipart (>=0.0.7)", "itsdangerous (>=1.1.0)", "pyyaml (>=5.3.1)", "ujson (>=4.0.1,!=4.0.2,!=4.1.0,!=4.2.0,!=4.3.0,!=5.0.0,!=5.1.0)", "orjson (>=3.2.1)", "email-validator (>=2.0.0)", "uvicorn[standard] (>=0.12.0)", "pydantic-settings (>=2.0.0)", "pydantic-extra-types (>=2.0.0)"]
[[package]]
name = "fritzconnection"
version = "1.14.0"
description = "Communicate with the AVM FRITZ!Box"
category = "main"
optional = false
python-versions = ">=3.7"
[package.dependencies]
requests = ">=2.22.0"
[package.extras]
qr = ["segno (>=1.4.1)"]
[[package]]
name = "h11"
version = "0.14.0"
description = "A pure-Python, bring-your-own-I/O implementation of HTTP/1.1"
category = "main"
optional = false
python-versions = ">=3.7"
[[package]]
name = "httptools"
version = "0.6.4"
description = "A collection of framework independent HTTP protocol utils."
category = "main"
optional = false
python-versions = ">=3.8.0"
[package.extras]
test = ["Cython (>=0.29.24)"]
[[package]]
name = "idna"
version = "3.10"
description = "Internationalized Domain Names in Applications (IDNA)"
category = "main"
optional = false
python-versions = ">=3.6"
[package.extras]
all = ["ruff (>=0.6.2)", "mypy (>=1.11.2)", "pytest (>=8.3.2)", "flake8 (>=7.1.1)"]
[[package]]
name = "paho-mqtt"
version = "2.1.0"
description = "MQTT version 5.0/3.1.1 client class"
category = "main"
optional = false
python-versions = ">=3.7"
[package.extras]
proxy = ["pysocks"]
[[package]]
name = "phue"
version = "1.1"
description = "A Philips Hue Python library"
category = "main"
optional = false
python-versions = "*"
[[package]]
name = "pydantic"
version = "2.10.5"
description = "Data validation using Python type hints"
category = "main"
optional = false
python-versions = ">=3.8"
[package.dependencies]
annotated-types = ">=0.6.0"
pydantic-core = "2.27.2"
typing-extensions = ">=4.12.2"
[package.extras]
email = ["email-validator (>=2.0.0)"]
timezone = ["tzdata"]
[[package]]
name = "pydantic-core"
version = "2.27.2"
description = "Core functionality for Pydantic validation and serialization"
category = "main"
optional = false
python-versions = ">=3.8"
[package.dependencies]
typing-extensions = ">=4.6.0,<4.7.0 || >4.7.0"
[[package]]
name = "python-dotenv"
version = "1.0.1"
description = "Read key-value pairs from a .env file and set them as environment variables"
category = "main"
optional = false
python-versions = ">=3.8"
[package.extras]
cli = ["click (>=5.0)"]
[[package]]
name = "pyyaml"
version = "6.0.2"
description = "YAML parser and emitter for Python"
category = "main"
optional = false
python-versions = ">=3.8"
[[package]]
name = "requests"
version = "2.32.3"
description = "Python HTTP for Humans."
category = "main"
optional = false
python-versions = ">=3.8"
[package.dependencies]
certifi = ">=2017.4.17"
charset-normalizer = ">=2,<4"
idna = ">=2.5,<4"
urllib3 = ">=1.21.1,<3"
[package.extras]
socks = ["PySocks (>=1.5.6,!=1.5.7)"]
use-chardet-on-py3 = ["chardet (>=3.0.2,<6)"]
[[package]]
name = "sniffio"
version = "1.3.1"
description = "Sniff out which async library your code is running under"
category = "main"
optional = false
python-versions = ">=3.7"
[[package]]
name = "starlette"
version = "0.41.3"
description = "The little ASGI library that shines."
category = "main"
optional = false
python-versions = ">=3.8"
[package.dependencies]
anyio = ">=3.4.0,<5"
[package.extras]
full = ["httpx (>=0.22.0)", "itsdangerous", "jinja2", "python-multipart (>=0.0.7)", "pyyaml"]
[[package]]
name = "typing-extensions"
version = "4.12.2"
description = "Backported and Experimental Type Hints for Python 3.8+"
category = "main"
optional = false
python-versions = ">=3.8"
[[package]]
name = "urllib3"
version = "2.3.0"
description = "HTTP library with thread-safe connection pooling, file post, and more."
category = "main"
optional = false
python-versions = ">=3.9"
[package.extras]
brotli = ["brotli (>=1.0.9)", "brotlicffi (>=0.8.0)"]
h2 = ["h2 (>=4,<5)"]
socks = ["pysocks (>=1.5.6,!=1.5.7,<2.0)"]
zstd = ["zstandard (>=0.18.0)"]
[[package]]
name = "uvicorn"
version = "0.34.0"
description = "The lightning-fast ASGI server."
category = "main"
optional = false
python-versions = ">=3.9"
[package.dependencies]
click = ">=7.0"
colorama = {version = ">=0.4", optional = true, markers = "sys_platform == \"win32\" and extra == \"standard\""}
h11 = ">=0.8"
httptools = {version = ">=0.6.3", optional = true, markers = "extra == \"standard\""}
python-dotenv = {version = ">=0.13", optional = true, markers = "extra == \"standard\""}
pyyaml = {version = ">=5.1", optional = true, markers = "extra == \"standard\""}
typing-extensions = {version = ">=4.0", markers = "python_version < \"3.11\""}
uvloop = {version = ">=0.14.0,<0.15.0 || >0.15.0,<0.15.1 || >0.15.1", optional = true, markers = "sys_platform != \"win32\" and sys_platform != \"cygwin\" and platform_python_implementation != \"PyPy\" and extra == \"standard\""}
watchfiles = {version = ">=0.13", optional = true, markers = "extra == \"standard\""}
websockets = {version = ">=10.4", optional = true, markers = "extra == \"standard\""}
[package.extras]
standard = ["colorama (>=0.4)", "httptools (>=0.6.3)", "python-dotenv (>=0.13)", "pyyaml (>=5.1)", "uvloop (>=0.14.0,!=0.15.0,!=0.15.1)", "watchfiles (>=0.13)", "websockets (>=10.4)"]
[[package]]
name = "uvloop"
version = "0.21.0"
description = "Fast implementation of asyncio event loop on top of libuv"
category = "main"
optional = false
python-versions = ">=3.8.0"
[package.extras]
dev = ["setuptools (>=60)", "Cython (>=3.0,<4.0)"]
docs = ["Sphinx (>=4.1.2,<4.2.0)", "sphinxcontrib-asyncio (>=0.3.0,<0.4.0)", "sphinx-rtd-theme (>=0.5.2,<0.6.0)"]
test = ["aiohttp (>=3.10.5)", "flake8 (>=5.0,<6.0)", "psutil", "pycodestyle (>=2.9.0,<2.10.0)", "pyOpenSSL (>=23.0.0,<23.1.0)", "mypy (>=0.800)"]
[[package]]
name = "watchfiles"
version = "1.0.3"
description = "Simple, modern and high performance file watching and code reload in python."
category = "main"
optional = false
python-versions = ">=3.9"
[package.dependencies]
anyio = ">=3.0.0"
[[package]]
name = "websockets"
version = "14.1"
description = "An implementation of the WebSocket Protocol (RFC 6455 & 7692)"
category = "main"
optional = false
python-versions = ">=3.9"
[metadata]
lock-version = "1.1"
python-versions = "^3.10"
content-hash = "470a473cadc51beaadf9c37d81c5ddbde315022964b11d596949a69d3905c583"
[metadata.files]
annotated-types = []
anyio = []
certifi = []
charset-normalizer = []
click = []
colorama = []
exceptiongroup = []
fastapi = []
fritzconnection = []
h11 = []
httptools = []
idna = []
paho-mqtt = []
phue = []
pydantic = []
pydantic-core = []
python-dotenv = []
pyyaml = []
requests = []
sniffio = []
starlette = []
typing-extensions = []
urllib3 = []
uvicorn = []
uvloop = []
watchfiles = []
websockets = []

View file

@ -4,6 +4,9 @@ version = "0.1.0"
description = "Max' Smart Home"
authors = ["Max <m.giller.dev@gmail.com>"]
[tool.poetry.scripts]
start = "main:app"
[tool.poetry.dependencies]
python = "^3.10"
phue = "^1.1"

View file

@ -0,0 +1 @@
from .hue_bridge import HueBridge

View file

@ -0,0 +1,92 @@
import logging
from mash.core.bridge import Bridge
from phue import Bridge as phueBridge
from time import sleep
class HueBridge(Bridge):
def __init__(
self,
*,
id: str,
ip: str,
retry_limit: int = 10,
retry_timeout_seconds: int = 5,
) -> None:
super().__init__(id=id, type="hue")
self._retry_limit = retry_limit
self._retry_timeout_seconds = retry_timeout_seconds
self._hue: phueBridge = phueBridge(ip)
def disconnect(self) -> None:
self._hue = None
logging.info(f"Disconnected from Hue Bridge [{self.id}].")
def connect(self) -> None:
for _ in range(self._retry_limit):
try:
self._hue.connect()
logging.info(f"Connected to Hue Bridge [{self.id}].")
return
except Exception as e:
logging.exception(
f"Failed to connect to Hue bridge [ip {self._hue.ip}]. Retrying in [{self._retry_timeout_seconds}] seconds.",
exc_info=e,
)
sleep(self._retry_timeout_seconds)
logging.error(
f"Unable to connect to Hue bridge [{self.id}]. Retry count exceeded [{self._retry_limit}]."
)
@property
def is_connected(self) -> bool | None:
return self._hue is not None
def list_api(self) -> dict:
return self._hue.get_api()
def list_scenes(self) -> dict:
return self._hue.get_scene()
def get_scene_by_name(self, name) -> dict | None:
for key, scene in self.list_scenes().items():
if scene["name"] == name:
scene["id"] = key
return scene
return None
def set_light(self, lights, command):
return self._hue.set_light(lights, command)
def get_light(self, id, command=None):
return self._hue.get_light(id, command)
def set_group(self, groups, command):
return self._hue.set_group(groups, command)
def get_group(self, id, command=None):
return self._hue.get_group(id, command)
def in_room_activate_scene(self, room_name: str, scene_name: str):
"""Activate a scene in a room.
Args:
scene (str): The name of the scene to activate.
room (str): The name of the room to activate the scene in.
"""
scene_id = self.get_scene_by_name(scene_name)["id"]
if scene_id is None:
raise "Scene not found."
self._hue.set_group(room_name, {"scene": scene_id})
def in_room_set_lights_active(self, room_name: str, active: bool):
"""Deactivate all lights in a room.
Args:
room_name (str): The name of the room to deactivate the lights in.
active (bool): True, to turn all lights on, or False to turn them off.
"""
self._hue.set_group(room_name, {"on": active})

View file

@ -0,0 +1,13 @@
from mash.core.entities.light import Light
from mash.core.utilities.glow import Glow
class HueLight(Light):
def __init__(
self, *, id: str, name: str, room: str, groups: list[str] = ...
) -> None:
super().__init__(id=id, name=name, room=room, groups=groups)
def __on_change__(self, current_on: bool, current_glow: Glow):
pass
# TODO: Requires reference to bridge

2
src/core/__init__.py Normal file
View file

@ -0,0 +1,2 @@
from .bridge import Bridge, BridgeException
from .entity import Entity

46
src/core/bridge.py Normal file
View file

@ -0,0 +1,46 @@
class BridgeException(Exception):
def __init__(self, id: str, type: str, message: str) -> None:
super().__init__(f"Bridge [{type} | {id}] has thrown an exception: {message}")
class Bridge:
def __init__(self, *, id: str, type: str) -> None:
self._id = id
self._type = type
@property
def id(self) -> str:
return self._id
@property
def type(self) -> str:
return self._type
@property
def is_connected(self) -> bool | None:
return None
def __del__(self) -> None:
self.disconnect()
def connect(self) -> None:
pass
def disconnect(self) -> None:
pass
def requires_connection(func, *, auto_connect=True):
def inner(self: Bridge, *args, **kwargs):
if self.is_connected is False: # Neither True, nor None
if not auto_connect:
raise BridgeException(
self.id,
self.type,
f"Bridge must be manually connected before executing method [{func.__name__}].",
)
self.connect()
return func(*args, **kwargs)
return inner

View file

@ -8,7 +8,6 @@ from .handlers.bett import file_path, local_history, log_bed_weights
router = APIRouter()
asyncio.create_task(log_bed_weights())
@router.get("/file", tags=["file"])
@ -51,7 +50,6 @@ async def get_latest():
return JSONResponse(local_history[-1])
@router.delete("/delete", tags=["file"])
async def delete_file():
os.remove(file_path)

View file

@ -166,6 +166,6 @@ async def log_bed_weights():
add_weights_to_log(tl, tr, bl, br)
check_for_change()
except Exception as ex:
logging.exception(ex)
pass
finally:
await asyncio.sleep(1)

View file

@ -1,27 +1,12 @@
from fastapi import FastAPI, APIRouter
from hue.hue_adapter import HueAdapter
from ..mash.feature import Feature
from .handlers.hue import HueAdapter
from fastapi import APIRouter
from fastapi.responses import HTMLResponse
router = APIRouter(tags=["hue"])
hue = HueAdapter("192.168.178.85")
########## Integration ##########
class HueIntegration(Feature):
def __init__(self) -> None:
super().__init__("hue")
def add_routes(self, server: FastAPI) -> None:
server.include_router(router, prefix="/hue")
########## Routes ##########
@router.get("/scenes", tags=["scene"])
async def get_scenes():
return hue.list_scenes()

1
src/fritzbox/__init__.py Normal file
View file

@ -0,0 +1 @@
from .fritzbox_bridge import FritzBoxBridge

View file

@ -0,0 +1,100 @@
import logging
from typing import Optional
from core.bridge import Bridge
from fritzconnection import FritzConnection
EXCEPTION_RECONNECT_TIMEOUT_SEC = 60
class FritzDeviceState:
"""Holds the state of a fritzbox device and implements comparisons."""
def __init__(self, mac_address: str, raw_state: dict) -> None:
"""
Args:
mac_address (str): Mac Address of fritzbox device.
raw_state (dict): Raw device state containing NewActive, NewIPAddress, NewAddressSource, NewLeaseTimeRemaining, NewInterfaceType & NewHostName.
"""
logging.debug(
f"Fritz raw device state to mac [{mac_address}]: {raw_state}",
extra=raw_state,
)
self.mac_address = mac_address
self.active: bool = raw_state["NewActive"]
self.ip_address: str = raw_state["NewIPAddress"]
self.address_source: str = raw_state["NewAddressSource"]
self.lease_time_remaining = raw_state["NewLeaseTimeRemaining"]
self.interface_type: str = raw_state["NewInterfaceType"]
self.host_name: str = raw_state["NewHostName"]
def __eq__(self, value: object) -> bool:
return (
type(value) is FritzDeviceState
and self.mac_address == value.mac_address
and self.ip_address == value.ip_address
and self.address_source == value.address_source
and self.lease_time_remaining == value.lease_time_remaining
and self.interface_type == value.interface_type
and self.host_name == value.host_name
and self.active == value.active
)
def __str__(self) -> str:
return f"[{self.mac_address} | {self.host_name}] {'Active' if self.active else 'Inactive'} - {self.ip_address} - {self.address_source} - {self.interface_type} - {self.lease_time_remaining}"
class FritzBoxBridge(Bridge):
def __init__(
self,
*,
id: str,
ip: str,
port: Optional[int] = None,
) -> None:
"""
Args:
id (str): Id of fritzbox bridge.
ip (str): IP Address of fritzbox bridge in network to connect to.
port (Optional[int], optional): Port of fritzbox bridge in network to connect to. Defaults to None.
"""
self._ip = ip
self._port = port
self._fritz_api: FritzConnection = None
def connect(self) -> None:
if self._fritz_api:
self.disconnect()
self._fritz_api = FritzConnection(address=self._ip, port=self._port)
logging.info("Connected")
def disconnect(self) -> None:
logging.info("Disconnected")
self._fritz_api = None
@property
def is_connected(self) -> bool | None:
return self._fritz_api is not None
@Bridge.requires_connection
def get_known_devices(self) -> list[dict]:
numberOfDevices = self._fritz_api.call_action(
"Hosts", "GetHostNumberOfEntries"
)["NewHostNumberOfEntries"]
devices = []
for i in range(numberOfDevices):
devices.append(
self._fritz_api.call_action("Hosts", "GetGenericHostEntry", NewIndex=i)
)
return devices
@Bridge.requires_connection
def get_device_state(self, mac_address: str) -> FritzDeviceState:
return FritzDeviceState(
mac_address=mac_address,
raw_state=self._fritz_api.call_action(
"Hosts", "GetSpecificHostEntry", NewMACAddress=mac_address
),
)

View file

@ -1,86 +0,0 @@
from time import sleep
from phue import Bridge
from pathlib import Path
class HueAdapter:
"""Handler for Hue API calls."""
registered_ips_file = "hue_bridge_registered.txt"
def __init__(self, bridge_ip: str):
"""Initialize the HueHandler."""
self.bridge = None
self.connect(bridge_ip)
def connect(self, bridge_ip: str):
if bridge_ip in self.get_registered_ips():
self.bridge = Bridge(bridge_ip)
self.bridge.connect()
return
# Connect loop
while True:
try:
self.bridge = Bridge(bridge_ip)
self.bridge.connect()
break
except Exception as e:
print(f"Failed to connect to bridge: {bridge_ip}")
print(e)
print("Trying again in 5 seconds..")
sleep(5)
self.register_bridge(bridge_ip)
def get_registered_ips(self) -> list:
"""Get a list of registered bridge IPs."""
if not Path(HueAdapter.registered_ips_file).is_file():
return []
with open(HueAdapter.registered_ips_file, "r") as f:
return [ad.strip() for ad in f.readlines()]
def register_bridge(self, bridge_ip: str):
"""Register a bridge IP."""
with open(HueAdapter.registered_ips_file, "a") as f:
f.write(bridge_ip + "\n")
def list_scenes(self) -> dict:
return self.bridge.get_scene()
def get_scene_by_name(self, name):
for key, scene in self.list_scenes().items():
if scene["name"] == name:
scene["id"] = key
return scene
return None
def in_room_activate_scene(self, room_name: str, scene_name: str):
"""Activate a scene in a room.
Args:
scene (str): The name of the scene to activate.
room (str): The name of the room to activate the scene in.
"""
scene_id = self.get_scene_by_name(scene_name)["id"]
if scene_id is None:
raise "Scene not found."
self.bridge.set_group(room_name, {"scene": scene_id})
def in_room_deactivate_lights(self, room_name: str):
"""Deactivate all lights in a room.
Args:
room_name (str): The name of the room to deactivate the lights in.
"""
self.bridge.set_group(room_name, {"on": False})
def in_room_activate_lights(self, room_name: str):
"""Activate all lights in a room.
Args:
room_name (str): The name of the room to activate the lights in.
"""
self.bridge.set_group(room_name, {"on": True})

View file

@ -1,60 +0,0 @@
from fastapi import FastAPI, APIRouter
from hue.hue_adapter import HueAdapter
from ..mash.feature import Feature
from fastapi import APIRouter
from fastapi.responses import HTMLResponse
router = APIRouter(tags=["hue"])
hue = HueAdapter("192.168.178.85")
########## Integration ##########
class HueIntegration(Feature):
def __init__(self) -> None:
super().__init__("hue")
def add_routes(self, server: FastAPI) -> None:
server.include_router(router, prefix="/hue")
########## Routes ##########
@router.get("/scenes", tags=["scene"])
async def get_scenes():
return hue.list_scenes()
@router.post(
"/room/{room_name}/scene/{scene_name}",
tags=["room", "scene"],
)
async def activate_scene(room_name: str, scene_name: str):
try:
hue.in_room_activate_scene(room_name, scene_name)
except Exception as e:
return HTMLResponse(status_code=400, content=str(e))
@router.post(
"/room/{room_name}/off",
tags=["room"],
)
async def deactivate_room(room_name: str):
try:
hue.in_room_deactivate_lights(room_name)
except Exception as e:
return HTMLResponse(status_code=400, content=str(e))
@router.post(
"/room/{room_name}/on",
tags=["room"],
)
async def activate_room(room_name: str):
try:
hue.in_room_activate_lights(room_name)
except Exception as e:
return HTMLResponse(status_code=400, content=str(e))

View file

@ -1,14 +1,17 @@
import asyncio
from fastapi import FastAPI
import uvicorn
from endpoints.handlers.bett import log_bed_weights
from endpoints.hue import router as hue_router
from endpoints.bettwaage import router as bettwaage_router
from endpoints.handlers.fritz import track_network_devices
app = FastAPI()
asyncio.create_task(track_network_devices())
asyncio.create_task(log_bed_weights())
app.include_router(hue_router, prefix="/hue", tags=["hue"])
app.include_router(bettwaage_router, prefix="/bettwaage", tags=["bett"])
if __name__ == "__main__":
app.run()
asyncio.run(lambda: uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True))

View file

@ -1,66 +0,0 @@
from mash.entities.entity import Entity
from fnmatch import fnmatch
class Group:
def __init__(self, *, entities: list[Entity] = []) -> None:
self.entities: list[Entity] = entities
def __len__(self):
return len(self.entities)
def __getitem__(self, id: str) -> "Group":
if type(id) is int:
raise "Numerical index not supported."
return self.id(id)
def __get_entities_with_specific_property__(
entities: list[Entity],
property_getter: callable[[Entity], str],
target_pattern: str,
) -> list[Entity]:
"""Returns all entities for which the property getter matches the desired pattern.
Args:
entities (list[Entity]): List of entities.
property_getter (callable[[Entity], str]): Takes one entity and returns the value of the filtered property.
target_pattern (str): Pattern that is matched against.
Returns:
list[Entity]: A new group of entities or an empty group, if no entity property matches the target pattern.
"""
return Group(
entities=[
e for e in entities if fnmatch(property_getter(e), target_pattern)
]
)
def device_type(self, device_type: str) -> "Group":
return Group.__get_entities_with_specific_property__(
self.entities, lambda e: e.device_type, device_type
)
def id(self, id: str) -> "Group":
return Group.__get_entities_with_specific_property__(
self.entities, lambda e: e.id, id
)
def room(self, room: str) -> "Group":
return Group.__get_entities_with_specific_property__(
self.entities, lambda e: e.room, room
)
def name(self, name: str) -> "Group":
return Group.__get_entities_with_specific_property__(
self.entities, lambda e: e.name, name
)
def lights(self) -> "Group":
return self.device_type("light")
def beds(self) -> "Group":
return self.device_type("bed")
def max(self) -> "Group":
return self.room("max")

View file

@ -1,9 +0,0 @@
from fastapi import FastAPI
class Feature:
def __init__(self, feature_id: str) -> None:
self.integration_id = feature_id
def add_routes(self, server: FastAPI) -> None:
pass

View file

@ -1,7 +0,0 @@
from mash.entities.entity import Entity
from mash.entities.group import Group
class Home(Group):
def __init__(self, *, entities: list[Entity] = []) -> None:
super().__init__(entities=entities)

View file

@ -1,26 +0,0 @@
import yaml
from fastapi import FastAPI
from mash.feature import Feature
class MaSH:
def __init__(self, config_path: str) -> None:
self.server: FastAPI = FastAPI()
self.config: dict = None
self._load_config_(config_path)
def _load_config_(self, config_path: str) -> None:
try:
with open(config_path, "r", encoding="UTF-8") as fp:
self.config = yaml.safe_load(fp)
except FileNotFoundError:
raise f"Config file for MaSH server could not be opened at [{config_path}]."
def add_integration(self, feature: Feature) -> None:
feature.add_routes(self.server)
self.server.include_router(feature.get_router())
def run(self):
self.server.run()

View file

@ -1,13 +0,0 @@
import asyncio
import requests as r
class MatrixClockAdapter:
def __init__(self, ip_address: str) -> None:
self.ip_address = ip_address.strip("/")
if not self.ip_address.startswith("http"):
self.ip_address = f"http://{self.ip_address}"
async def turn_off(self):
await asyncio.run(r.post(f"{self.ip_address}/off"))

View file

@ -1,6 +0,0 @@
from mash.feature import Feature
class MatrixClockIntegration(Feature):
def __init__(self) -> None:
super().__init__("matrixclock")

View file

@ -0,0 +1 @@
from .matrixclock_entity import MatrixClockEntity

View file

@ -0,0 +1,48 @@
from core.entity import Entity
import requests as r
class MatrixClockEntity(Entity):
def __init__(self, *, id: str, name: str, room: str, ip: str, port: int) -> None:
super().__init__(id=id, name=name, room=room, device_type="matrixclock")
self._ip = ip
self._port = port
self._base_path = ""
def __post_request__(self, endpoint: str, content: dict = {}) -> r.Response:
url: str = f"http://{self._ip}:{self._port}{self._base_path}/{endpoint}"
return r.post(url=url, json=content)
def display_off(self):
self.__post_request__("off")
def display_time(self):
self.__post_request__("time")
def display_full(self):
self.__post_request__("full")
def display_pattern(self, *, pattern: str = "01", step_ms: int = 500):
self.__post_request__(f"pattern?pattern={pattern}&step_ms={step_ms}")
def get_climate_and_show_temperature(self) -> dict[str, float]:
return self.__post_request__("temperature").json()
def get_climate_and_show_humidity(self) -> dict[str, float]:
return self.__post_request__("humidity").json()
def flash_display(self, *, count: int = 1, contrast: int = None):
path = f"flash?count={count}"
if contrast:
path += "&contrast={contrast}"
self.__post_request__(path)
def set_contrast(self, *, contrast: int):
self.__post_request__(f"contrast?contrast={contrast}")
def get_contrast(self) -> int:
return self.__post_request__("contrast").json()["contrast"]
def show_message(self, *, message: str):
self.__post_request__("message", {"message": message})

View file

@ -35,50 +35,36 @@ class Automation:
def decorator(func):
return func
return decorator
class PeopleCountEngineV1(Automation):
@Automation.trigger(
devices=["matrixclock"],
rule=lambda h: h.device("matrixclock").contrast == 6
devices=["matrixclock"], rule=lambda h: h.device("matrixclock").contrast == 6
)
def turn_light_on_sometimes(self, home: Home):
home.room("max").lights().on = True
@Automation.trigger(
people=["max"],
rule=lambda h: h.person("max").athome()
)
@Automation.trigger(people=["max"], rule=lambda h: h.person("max").athome())
def turn_light_on_sometimes(self, home: Home):
home.room("max").lights().on = h.person("max").athome()
@Automation.state(h.room("Max").lights())
def max_room_light():
if max.ishome():
return "off"
scene = "Daylight scene"
if nighttime:
scene = "nighttime"
if max.working:
scene.dim(0.5)
return scene
return scene
from mash.mash import MaSH

2
src/z2m/__init__.py Normal file
View file

@ -0,0 +1,2 @@
from .zigbee2mqtt_bridge import Z2mBridge
from .entities.contact_sensor_z2m import ContactSensorZ2M

View file

@ -0,0 +1,5 @@
from core import Entity
class ContactSensorZ2M(Entity):
pass

View file

@ -0,0 +1,90 @@
import logging
from typing import Optional
from core.bridge import Bridge
import paho.mqtt.client as mqtt
import json
class Z2mBridge(Bridge):
def __init__(
self,
*,
id: str,
ip: str,
port: int = 1883,
keepalive: int = 60,
topic: str = "zigbee2mqtt",
) -> None:
"""
Args:
id (str): Unique identifier of this bridge instance.
ip (str): IP-Address of MQTT broker.
port (int, optional): Port of MQTT broker. Defaults to 1883.
keepalive (int, optional): MQTT keepalive delay in seconds. Defaults to 60.
topic (str, optional): Base topic for Zigbee2MQTT interface. Defaults to "zigbee2mqtt".
"""
super().__init__(id=id, type="zigbee2mqtt")
self._ip = ip
self._port = port
self._keepalive = keepalive
self._device_callbacks: dict[str, list] = {}
self._topic = topic.strip("/")
self._client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
self._client.on_connect = lambda client, userdata, flags, reason_code, properties: self.__on_connect__(
client, userdata, flags, reason_code, properties
)
self._client.on_message = lambda client, userdata, msg: self.__on_message__(
client, userdata, msg
)
def disconnect(self) -> None:
self._client.loop_stop()
self._client.disconnect()
logging.info(f"Disconnected from Zigbee2MQTT broker [{self.id}].")
def connect(self) -> None:
self._client.connect(self._ip, self._port, self._keepalive)
self._client.loop_start()
logging.info(f"Connect to Zigbee2MQTT broker [{self.id}].")
@property
def is_connected(self) -> bool | None:
return self._client.is_connected()
def __on_connect__(self, client, userdata, flags, reason_code, properties):
self._client.subscribe(f"{self._topic}/#")
def __on_message__(self, client, userdata, msg: any):
device_name = msg.topic.split(self._topic + "/", 2)[-1].split("/", 2)[0]
if device_name not in self._device_callbacks.keys():
return
for callback in self._device_callbacks[device_name]:
callback(device_name, json.loads(msg.payload))
@Bridge.requires_connection
def set_device(self, ieee_address: str, *, content: dict = {}) -> None:
self._client.publish(f"{self._topic}/{ieee_address}/set", json.dumps(content))
@Bridge.requires_connection
def get_device(self, ieee_address: str) -> None:
self._client.publish(
f"{self._topic}/{ieee_address}/get", json.dumps({"state": ""})
)
def subscribe_device(
self,
callback,
*,
ieee_address: Optional[str] = None,
friendly_name: Optional[str] = None,
) -> None:
for id in [ieee_address, friendly_name]:
if id not in self._device_callbacks.keys():
self._device_callbacks[id] = []
self._device_callbacks[id].append(callback)