This commit is contained in:
Maximilian Giller 2023-10-08 01:58:47 +02:00
commit 4c535a58f1
29 changed files with 585 additions and 17 deletions

161
.gitignore vendored Normal file
View file

@ -0,0 +1,161 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
.vscode/settings.json

19
mash.yaml Normal file
View file

@ -0,0 +1,19 @@
# An example config file for MaSH
services:
philips-hue:
ip: 192.168.178.42
lights:
- name: Hallway Ceiling
philips-hue: Hallway Ceiling
- name: Desk
philips-hue: Desk Plug
rooms:
- name: Office
philips-hue: Work Room
- name: Hallway
philips-hue: Hallways
adjacent:
- Office

View file

@ -1,6 +1,12 @@
smbus2 smbus2
vl53l1x vl53l1x
# To parse the config file
pyyaml
# To hot reload the config file
watchdog
# For Philips Hue Counter # For Philips Hue Counter
phue phue

View file

@ -0,0 +1,36 @@
import logging
from config.reloadtrigger import ReloadTrigger
try:
from watchdog.observers import Observer
from watchdog.events import (
FileSystemEventHandler,
FileCreatedEvent,
FileModifiedEvent,
)
class HotDogTrigger(ReloadTrigger, FileSystemEventHandler):
"""Trigger for config files. Might be unbound, if watchdog is not installed. Check if equal to None before use."""
def __init__(self, path: str):
super().__init__(path)
self.observer = Observer()
self.observer.schedule(self, path, recursive=True)
self.observer.start()
def __del__(self):
self.observer.stop()
self.observer.join()
def on_modified(self, event: FileModifiedEvent):
self.__trigger_reload()
def on_created(self, event: FileCreatedEvent):
self.__trigger_reload()
logging.debug("Watchdog imported successfully. HotDogTrigger available.")
except ImportError:
logging.info("Watchdog is not installed. HotDogTrigger unavailable.")

49
src/config/parser.py Normal file
View file

@ -0,0 +1,49 @@
import abc
from typing import Any
from models.home import Home
class ConfigParser:
def get_config(self) -> Home: # TODO: Check type
"""Load and parse the config."""
return self.__parse_config(self.__load_config())
@abc.abstractmethod
def __load_config(self) -> Any:
"""Load the config."""
raise NotImplementedError()
@abc.abstractmethod
def __parse_config(self, config) -> Home:
"""Parse the config."""
raise NotImplementedError()
class FileConfigParser(ConfigParser):
"""Scaffoling for parsers based on files."""
path: str
"""Path to the config file."""
def __init__(self, path: str):
self.path = path
class YAMLConfig(FileConfigParser):
"""Loads and parses config from a YAML file."""
def __init__(self, path: str):
super().__init__(path)
def __load_config(self) -> dict:
"""Loads config from a YAML file."""
import yaml
with open(self.path, "r") as file:
return yaml.safe_load(file)
def __parse_config(self, config: dict) -> Home:
"""Parses config in dict format. Usually based on JSON or YAML."""
# TODO: Implement
raise NotImplementedError()

View file

@ -0,0 +1,26 @@
class ReloadTrigger(object):
"""Abstract interface for config hot reloading."""
path: str
"""Path to the config file."""
callbacks: list
"""List of functions to callback on reload of config."""
def __init__(self, path: str):
"""Initialize the trigger.
Args:
path (str): Path to the config file.
"""
self.path = path
self.callbacks = []
def on_reload(self, callback):
"""Register a callback to be called when the config is reloaded."""
self.callbacks.append(callback)
def __trigger_reload(self):
"""Trigger a reload of the config."""
for callback in self.callbacks:
callback()

48
src/mash.py Normal file
View file

@ -0,0 +1,48 @@
from config.parser import ConfigParser
from config.reloadtrigger import ReloadTrigger
from models.home import Home
class MaSH:
"""Max' Smart Home. Smart home automation framework."""
config_parser: ConfigParser
"""Parser for the config file."""
config_trigger: list[ReloadTrigger]
"""Active triggers for reloading the config."""
home: Home
"""Home to control and make smart."""
def __init__(self, config_parser: ConfigParser) -> None:
"""Initialize the framework.
Args:
config_parser (ConfigParser): Parser for the config file.
"""
self.config_parser = config_parser
self.config_trigger = []
self.reload_config()
def reload_config(self) -> None:
"""Reload the config."""
self.home = self.config_parser.get_config() # TODO: Check type
def register_config_trigger(self, trigger: ReloadTrigger) -> bool:
"""Register a trigger for hot-reloading the config.
Args:
trigger (ReloadTrigger): Trigger for hot-reloading the config based on ReloadTrigger.
Returns:
bool: True if the trigger was registered, False if not.
"""
if trigger is None or trigger in self.config_trigger:
return False
trigger.on_reload(self.reload_config)
self.config_trigger.append(trigger)
return True

23
src/models/colors.py Normal file
View file

@ -0,0 +1,23 @@
class RGBColor:
"""Represents a color in the RGB color space."""
r: int
"""Red value of the color."""
g: int
"""Green value of the color."""
b: int
"""Blue value of the color."""
def __init__(self, r: int, g: int, b: int):
"""Initialize the color.
Args:
r (int): Red value of the color.
g (int): Green value of the color.
b (int): Blue value of the color.
"""
self.r = r
self.g = g
self.b = b

View file

@ -0,0 +1,9 @@
from models.devices.colordevices import *
from models.devices.switchdevices import *
from models.devices.genericdevices import *
class AllDevice(SetSwitchDevice, SetColorDevice):
"""Inherits all intefaces from all devices."""
pass

View file

@ -0,0 +1,25 @@
from models.colors import RGBColor
from models.devices.genericdevices import GenericDevice
class ColorDevice(GenericDevice):
"""Abstract device that has a color."""
_color: RGBColor
"""Color of device."""
class GetColorDevice(ColorDevice):
"""Implement color getter for a color device."""
@property
def color(self) -> RGBColor:
return self._color
class SetColorDevice(GetColorDevice):
"""Implements color setter for a color device."""
@GetColorDevice.color.setter
def color(self, color: RGBColor):
self._color = color

View file

@ -0,0 +1,5 @@
class GenericDevice:
"""A generic device."""
def __init__(self, name: str):
self.name = name

View file

@ -0,0 +1,43 @@
from models.devices.genericdevices import GenericDevice
class SwitchDevice(GenericDevice):
"""Abstract device that can be turned on and off."""
_is_on: bool
"""Current state of the device."""
class ToggleSwitchDevice(SwitchDevice):
"""Implements toggle functionality for a switch device."""
def toggle(self):
self._is_on = not self._is_on
class TurnOffSwitchDevice(SwitchDevice):
"""Implements turn off functionality for a switch device."""
def turn_off(self):
self._is_on = False
class TurnOnSwitchDevice(SwitchDevice):
"""Implements turn on functionality for a switch device."""
def turn_on(self):
self._is_on = True
class GetSwitchDevice(SwitchDevice):
"""Implements is_on state getter for a switch device."""
@property
def is_on(self) -> bool:
return self._is_on
class SetSwitchDevice(TurnOffSwitchDevice, TurnOnSwitchDevice, ToggleSwitchDevice):
@GetSwitchDevice.is_on.setter
def is_on(self, set_on: bool):
self._is_on = set_on

2
src/models/exceptions.py Normal file
View file

@ -0,0 +1,2 @@
class NoDeviceFoundError(Exception):
pass

71
src/models/groups.py Normal file
View file

@ -0,0 +1,71 @@
from models.helper import filter_devices
from models.devices import AllDevice
from models.colors import RGBColor
class DeviceGroup(AllDevice):
"""A group of devices that allows group operations. Inherits from a complex device to offer group operations for all devices."""
def __init__(self, devices: list[GenericDevice]):
self._devices = devices
@property
def devices(self) -> list[GenericDevice]:
return self._devices
@property
def switches(self) -> list[SwitchDevice]:
return filter_devices(self.devices, SwitchDevice)
@property
def lights(self) -> list[LightDevice]:
return filter_devices(self.devices, LightDevice)
@property
def is_on(self) -> bool:
"""Returns true if any device is on."""
return any(device.is_on for device in self.switches)
@is_on.setter
def is_on(self, is_on: bool):
"""Sets all devices to the same state."""
for device in self.switches:
device.is_on = is_on
@property
def color(self) -> LightColor:
"""Returns the color of the first light in the group."""
return self.lights[0].color
@color.setter
def color(self, color: LightColor):
"""Sets all lights in the group to the same color."""
for device in self.lights:
device.color = color
@property
def scene(self) -> LightScene:
"""Returns the current scene of the group."""
return LightScene(self.name, {device: device.color for device in self.lights})
class Room(DeviceGroup):
"""A group of devices that has additional properties a can be seen as a room."""
def __init__(self, name: str, devices: list[GenericDevice], people_count: int = 0):
"""
Args:
name (str): Name of the room.
devices (list[GenericDevice]): Devices in this room.
people_count (int, optional): Number of people in this room. Defaults to 0.
"""
super().__init__(devices)
self.name = name
self._people_count = people_count
@property
def people_count(self) -> int:
return self._people_count
def __str__(self):
return self.name

28
src/models/helper.py Normal file
View file

@ -0,0 +1,28 @@
from typing import TypeVar
from models.devices.genericdevices import GenericDevice, LightDevice, SwitchDevice
from models.exceptions import NoDeviceFoundError
from models.groups import DeviceGroup, Room
DEVICE_TYPE = TypeVar(
"DEVICE_TYPE",
type(GenericDevice),
type(SwitchDevice),
type(LightDevice),
type(Room),
type(DeviceGroup),
)
def filter_devices(
devices: list[GenericDevice], type: DEVICE_TYPE
) -> list[DEVICE_TYPE]:
"""Filters out devices that are not of a specific type."""
filtered_devices: list[DEVICE_TYPE] = [
device for device in devices if isinstance(device, type)
]
if len(filtered_devices) == 0:
raise NoDeviceFoundError(f"No devices of type {type} found.")
return filtered_devices

19
src/models/home.py Normal file
View file

@ -0,0 +1,19 @@
from models.groups import DeviceGroup, Room
from models.helper import filter_devices
class Home(Room):
"""Combines all elements of a smart home."""
location: str
"""Physical location of the home, useful for sunset and sunrise times."""
@property
def rooms(self) -> list[Room]:
"""Returns all rooms in the home."""
return filter_devices(self.devices, Room)
@property
def groups(self) -> list[DeviceGroup]:
"""Returns all groups in the home."""
return filter_devices(self.devices, DeviceGroup)

View file

@ -1,6 +1,6 @@
from datetime import datetime, time, timedelta from datetime import datetime, time, timedelta
from typing import Dict from typing import Dict
from services.philips_hue import PhilipsHue from old.philips_hue import PhilipsHue
from sensors import PeopleCounter, Directions, VL53L1XSensor from sensors import PeopleCounter, Directions, VL53L1XSensor
import logging import logging
import json import json

View file

@ -22,7 +22,8 @@ def parse_log_entry(entry: Dict) -> Dict | bool:
return False return False
entry["dateTime"] = datetime.strptime( entry["dateTime"] = datetime.strptime(
str(entry["dateTime"])[:19], "%Y-%m-%d %H:%M:%S") str(entry["dateTime"])[:19], "%Y-%m-%d %H:%M:%S"
)
if entry["dateTime"] < AFTER_DATE: if entry["dateTime"] < AFTER_DATE:
return False return False
@ -32,15 +33,15 @@ def parse_log_entry(entry: Dict) -> Dict | bool:
def is_last_in_sequence(entry: Dict) -> Boolean: def is_last_in_sequence(entry: Dict) -> Boolean:
indoor = entry["directionState"]["indoor"] indoor = entry["directionState"]["indoor"]
outdoor = entry["directionState"]["outdoor"] outdoor = entry["directionState"]["outdoor"]
if len(indoor) <= 0 or len(outdoor) <= 0: if len(indoor) <= 0 or len(outdoor) <= 0:
return False return False
end_key = "end_distance" end_key = "end_distance"
# Check version # Check version
if end_key not in indoor[-1]: if end_key not in indoor[-1]:
end_key = "end" end_key = "end"
if indoor[-1][end_key] is None or outdoor[-1][end_key] is None: if indoor[-1][end_key] is None or outdoor[-1][end_key] is None:
return False return False
@ -48,21 +49,21 @@ def is_last_in_sequence(entry: Dict) -> Boolean:
# Collect # Collect
log = [json.loads(line.strip("\x00")) for line in content] log: list[Dict] = [json.loads(line.strip("\x00")) for line in content]
print("Number of total entries:", len(log)) print("Number of total entries:", len(log))
# Parse & Filter # Parse & Filter
log = [parse_log_entry(entry) for entry in log if parse_log_entry(entry)] log = [parse_log_entry(entry) for entry in log if parse_log_entry(entry)] # type: ignore
print("Number of filtered entries:", len(log)) print("Number of filtered entries:", len(log))
# Render # Render
fig, ax = plt.subplots() # Create a figure containing a single axes. fig, ax = plt.subplots() # Create a figure containing a single axes.
times: list[datetime] = [entry["dateTime"] for entry in log] times: list[datetime] = [entry["dateTime"] for entry in log]
counts: list[int] = [entry["previousPeopleCount"] for entry in log] counts: list[int] = [entry["previousPeopleCount"] for entry in log]
ax.step(times, counts, where="pre") ax.step(times, counts, where="pre") # type: ignore
print("-"*20) print("-"*20)
plt.show() plt.show()
print("-"*20) print("-" * 20)
# Print stats # Print stats
@ -72,13 +73,12 @@ walk_unders = [entry for entry in log if entry["countChange"] == 0]
print("Number of walk-ins:", len(walk_ins)) print("Number of walk-ins:", len(walk_ins))
print("Number of walk-outs:", len(walk_outs)) print("Number of walk-outs:", len(walk_outs))
print("Number of walk-unders:", len(walk_unders)) print("Number of walk-unders:", len(walk_unders))
print("-"*20) print("-" * 20)
# Calculate faults # Calculate faults
for c, n in zip(list(range(len(log))), list(range(len(log)))[1:]): for c, n in zip(list(range(len(log))), list(range(len(log)))[1:]):
estimated_count: int = log[c]["previousPeopleCount"] + \ estimated_count: int = log[c]["previousPeopleCount"] + log[c]["countChange"]
log[c]["countChange"]
faulty: bool = estimated_count != log[n]["previousPeopleCount"] faulty: bool = estimated_count != log[n]["previousPeopleCount"]
log[c]["faulty"] = faulty log[c]["faulty"] = faulty
log[c]["faultyCount"] = log[c]["previousPeopleCount"] if faulty else None log[c]["faultyCount"] = log[c]["previousPeopleCount"] if faulty else None
@ -89,11 +89,9 @@ fault_percentage = fault_count / len(log)
print("Number of faults:", fault_count) print("Number of faults:", fault_count)
print("Percentage of faults:", fault_percentage * 100, "%") print("Percentage of faults:", fault_percentage * 100, "%")
print("-"*20) print("-" * 20)
faulty_off = [entry for entry in log if entry["faulty"] faulty_off = [entry for entry in log if entry["faulty"] and entry["faultyCount"] == 0]
and entry["faultyCount"] == 0] faulty_on = [entry for entry in log if entry["faulty"] and entry["faultyCount"] != 0]
faulty_on = [entry for entry in log if entry["faulty"]
and entry["faultyCount"] != 0]
print("Number of false-0:", len(faulty_off)) print("Number of false-0:", len(faulty_off))
print("Number of false-1:", len(faulty_on)) print("Number of false-1:", len(faulty_on))
print("Percentage of false-0:", len(faulty_off) / fault_count * 100, "%") print("Percentage of false-0:", len(faulty_off) / fault_count * 100, "%")