From b9cb12a2d1b311d5da73545dee959fe46d0f6937 Mon Sep 17 00:00:00 2001 From: Maximilian Giller Date: Sun, 10 Dec 2023 01:57:23 +0100 Subject: [PATCH] Heavily reduced complexity to start of new --- mash.yaml | 19 -- requirements.txt | 12 -- src/config/hotdogtrigger.py | 36 ---- src/config/parser.py | 49 ----- src/config/reloadtrigger.py | 26 --- src/main.py | 0 src/mash.py | 48 ----- src/models/colors.py | 23 --- src/models/devices/__init__.py | 9 - src/models/devices/colordevices.py | 25 --- src/models/devices/genericdevices.py | 5 - src/models/devices/switchdevices.py | 43 ---- src/models/exceptions.py | 2 - src/models/groups.py | 71 ------- src/models/helper.py | 28 --- src/models/home.py | 19 -- src/old/philips_hue_counter.py | 283 --------------------------- src/old/sensors/__init__.py | 3 - src/old/sensors/people_counter.py | 195 ------------------ src/old/sensors/tof_sensor.py | 31 --- src/old/sensors/vl53l1x_sensor.py | 62 ------ src/old/statistics/statistics.py | 112 ----------- src/old/timeloop/LICENSE | 21 -- src/old/timeloop/__init__.py | 1 - src/old/timeloop/app.py | 72 ------- src/old/timeloop/exceptions.py | 6 - src/old/timeloop/helpers.py | 5 - src/old/timeloop/job.py | 25 --- src/{old => }/philips_hue.py | 0 29 files changed, 1231 deletions(-) delete mode 100644 mash.yaml delete mode 100644 src/config/hotdogtrigger.py delete mode 100644 src/config/parser.py delete mode 100644 src/config/reloadtrigger.py create mode 100644 src/main.py delete mode 100644 src/mash.py delete mode 100644 src/models/colors.py delete mode 100644 src/models/devices/__init__.py delete mode 100644 src/models/devices/colordevices.py delete mode 100644 src/models/devices/genericdevices.py delete mode 100644 src/models/devices/switchdevices.py delete mode 100644 src/models/exceptions.py delete mode 100644 src/models/groups.py delete mode 100644 src/models/helper.py delete mode 100644 src/models/home.py delete mode 100644 src/old/philips_hue_counter.py delete mode 100644 src/old/sensors/__init__.py delete mode 100644 src/old/sensors/people_counter.py delete mode 100644 src/old/sensors/tof_sensor.py delete mode 100644 src/old/sensors/vl53l1x_sensor.py delete mode 100644 src/old/statistics/statistics.py delete mode 100644 src/old/timeloop/LICENSE delete mode 100644 src/old/timeloop/__init__.py delete mode 100644 src/old/timeloop/app.py delete mode 100644 src/old/timeloop/exceptions.py delete mode 100644 src/old/timeloop/helpers.py delete mode 100644 src/old/timeloop/job.py rename src/{old => }/philips_hue.py (100%) diff --git a/mash.yaml b/mash.yaml deleted file mode 100644 index fb2c2c2..0000000 --- a/mash.yaml +++ /dev/null @@ -1,19 +0,0 @@ -# An example config file for MaSH - -services: - philips-hue: - ip: 192.168.178.42 - -lights: - - name: Hallway Ceiling - philips-hue: Hallway Ceiling - - name: Desk - philips-hue: Desk Plug - -rooms: - - name: Office - philips-hue: Work Room - - name: Hallway - philips-hue: Hallways - adjacent: - - Office diff --git a/requirements.txt b/requirements.txt index 01ecf54..420ffe5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,14 +1,2 @@ -smbus2 -vl53l1x - -# To parse the config file -pyyaml - -# To hot reload the config file -watchdog - # For Philips Hue Counter phue - -# For statistics -matplotlib diff --git a/src/config/hotdogtrigger.py b/src/config/hotdogtrigger.py deleted file mode 100644 index 86f8c80..0000000 --- a/src/config/hotdogtrigger.py +++ /dev/null @@ -1,36 +0,0 @@ -import logging - -from config.reloadtrigger import ReloadTrigger - -try: - from watchdog.observers import Observer - from watchdog.events import ( - FileSystemEventHandler, - FileCreatedEvent, - FileModifiedEvent, - ) - - class HotDogTrigger(ReloadTrigger, FileSystemEventHandler): - """Trigger for config files. Might be unbound, if watchdog is not installed. Check if equal to None before use.""" - - def __init__(self, path: str): - super().__init__(path) - self.observer = Observer() - self.observer.schedule(self, path, recursive=True) - self.observer.start() - - def __del__(self): - self.observer.stop() - self.observer.join() - - def on_modified(self, event: FileModifiedEvent): - self.__trigger_reload() - - def on_created(self, event: FileCreatedEvent): - self.__trigger_reload() - - logging.debug("Watchdog imported successfully. HotDogTrigger available.") - - -except ImportError: - logging.info("Watchdog is not installed. HotDogTrigger unavailable.") diff --git a/src/config/parser.py b/src/config/parser.py deleted file mode 100644 index 5bef0ea..0000000 --- a/src/config/parser.py +++ /dev/null @@ -1,49 +0,0 @@ -import abc -from typing import Any - -from models.home import Home - - -class ConfigParser: - def get_config(self) -> Home: # TODO: Check type - """Load and parse the config.""" - return self.__parse_config(self.__load_config()) - - @abc.abstractmethod - def __load_config(self) -> Any: - """Load the config.""" - raise NotImplementedError() - - @abc.abstractmethod - def __parse_config(self, config) -> Home: - """Parse the config.""" - raise NotImplementedError() - - -class FileConfigParser(ConfigParser): - """Scaffoling for parsers based on files.""" - - path: str - """Path to the config file.""" - - def __init__(self, path: str): - self.path = path - - -class YAMLConfig(FileConfigParser): - """Loads and parses config from a YAML file.""" - - def __init__(self, path: str): - super().__init__(path) - - def __load_config(self) -> dict: - """Loads config from a YAML file.""" - import yaml - - with open(self.path, "r") as file: - return yaml.safe_load(file) - - def __parse_config(self, config: dict) -> Home: - """Parses config in dict format. Usually based on JSON or YAML.""" - # TODO: Implement - raise NotImplementedError() diff --git a/src/config/reloadtrigger.py b/src/config/reloadtrigger.py deleted file mode 100644 index c0129a0..0000000 --- a/src/config/reloadtrigger.py +++ /dev/null @@ -1,26 +0,0 @@ -class ReloadTrigger(object): - """Abstract interface for config hot reloading.""" - - path: str - """Path to the config file.""" - - callbacks: list - """List of functions to callback on reload of config.""" - - def __init__(self, path: str): - """Initialize the trigger. - - Args: - path (str): Path to the config file. - """ - self.path = path - self.callbacks = [] - - def on_reload(self, callback): - """Register a callback to be called when the config is reloaded.""" - self.callbacks.append(callback) - - def __trigger_reload(self): - """Trigger a reload of the config.""" - for callback in self.callbacks: - callback() diff --git a/src/main.py b/src/main.py new file mode 100644 index 0000000..e69de29 diff --git a/src/mash.py b/src/mash.py deleted file mode 100644 index 1a7f995..0000000 --- a/src/mash.py +++ /dev/null @@ -1,48 +0,0 @@ -from config.parser import ConfigParser -from config.reloadtrigger import ReloadTrigger -from models.home import Home - - -class MaSH: - """Max' Smart Home. Smart home automation framework.""" - - config_parser: ConfigParser - """Parser for the config file.""" - - config_trigger: list[ReloadTrigger] - """Active triggers for reloading the config.""" - - home: Home - """Home to control and make smart.""" - - def __init__(self, config_parser: ConfigParser) -> None: - """Initialize the framework. - - Args: - config_parser (ConfigParser): Parser for the config file. - """ - self.config_parser = config_parser - self.config_trigger = [] - - self.reload_config() - - def reload_config(self) -> None: - """Reload the config.""" - self.home = self.config_parser.get_config() # TODO: Check type - - def register_config_trigger(self, trigger: ReloadTrigger) -> bool: - """Register a trigger for hot-reloading the config. - - Args: - trigger (ReloadTrigger): Trigger for hot-reloading the config based on ReloadTrigger. - - Returns: - bool: True if the trigger was registered, False if not. - """ - if trigger is None or trigger in self.config_trigger: - return False - - trigger.on_reload(self.reload_config) - self.config_trigger.append(trigger) - - return True diff --git a/src/models/colors.py b/src/models/colors.py deleted file mode 100644 index 34a7d8a..0000000 --- a/src/models/colors.py +++ /dev/null @@ -1,23 +0,0 @@ -class RGBColor: - """Represents a color in the RGB color space.""" - - r: int - """Red value of the color.""" - - g: int - """Green value of the color.""" - - b: int - """Blue value of the color.""" - - def __init__(self, r: int, g: int, b: int): - """Initialize the color. - - Args: - r (int): Red value of the color. - g (int): Green value of the color. - b (int): Blue value of the color. - """ - self.r = r - self.g = g - self.b = b diff --git a/src/models/devices/__init__.py b/src/models/devices/__init__.py deleted file mode 100644 index 2ab87ac..0000000 --- a/src/models/devices/__init__.py +++ /dev/null @@ -1,9 +0,0 @@ -from models.devices.colordevices import * -from models.devices.switchdevices import * -from models.devices.genericdevices import * - - -class AllDevice(SetSwitchDevice, SetColorDevice): - """Inherits all intefaces from all devices.""" - - pass diff --git a/src/models/devices/colordevices.py b/src/models/devices/colordevices.py deleted file mode 100644 index dd79539..0000000 --- a/src/models/devices/colordevices.py +++ /dev/null @@ -1,25 +0,0 @@ -from models.colors import RGBColor -from models.devices.genericdevices import GenericDevice - - -class ColorDevice(GenericDevice): - """Abstract device that has a color.""" - - _color: RGBColor - """Color of device.""" - - -class GetColorDevice(ColorDevice): - """Implement color getter for a color device.""" - - @property - def color(self) -> RGBColor: - return self._color - - -class SetColorDevice(GetColorDevice): - """Implements color setter for a color device.""" - - @GetColorDevice.color.setter - def color(self, color: RGBColor): - self._color = color diff --git a/src/models/devices/genericdevices.py b/src/models/devices/genericdevices.py deleted file mode 100644 index a8eca85..0000000 --- a/src/models/devices/genericdevices.py +++ /dev/null @@ -1,5 +0,0 @@ -class GenericDevice: - """A generic device.""" - - def __init__(self, name: str): - self.name = name diff --git a/src/models/devices/switchdevices.py b/src/models/devices/switchdevices.py deleted file mode 100644 index 73b25df..0000000 --- a/src/models/devices/switchdevices.py +++ /dev/null @@ -1,43 +0,0 @@ -from models.devices.genericdevices import GenericDevice - - -class SwitchDevice(GenericDevice): - """Abstract device that can be turned on and off.""" - - _is_on: bool - """Current state of the device.""" - - -class ToggleSwitchDevice(SwitchDevice): - """Implements toggle functionality for a switch device.""" - - def toggle(self): - self._is_on = not self._is_on - - -class TurnOffSwitchDevice(SwitchDevice): - """Implements turn off functionality for a switch device.""" - - def turn_off(self): - self._is_on = False - - -class TurnOnSwitchDevice(SwitchDevice): - """Implements turn on functionality for a switch device.""" - - def turn_on(self): - self._is_on = True - - -class GetSwitchDevice(SwitchDevice): - """Implements is_on state getter for a switch device.""" - - @property - def is_on(self) -> bool: - return self._is_on - - -class SetSwitchDevice(TurnOffSwitchDevice, TurnOnSwitchDevice, ToggleSwitchDevice): - @GetSwitchDevice.is_on.setter - def is_on(self, set_on: bool): - self._is_on = set_on diff --git a/src/models/exceptions.py b/src/models/exceptions.py deleted file mode 100644 index afc2cbd..0000000 --- a/src/models/exceptions.py +++ /dev/null @@ -1,2 +0,0 @@ -class NoDeviceFoundError(Exception): - pass diff --git a/src/models/groups.py b/src/models/groups.py deleted file mode 100644 index 3439793..0000000 --- a/src/models/groups.py +++ /dev/null @@ -1,71 +0,0 @@ -from models.helper import filter_devices -from models.devices import AllDevice -from models.colors import RGBColor - - -class DeviceGroup(AllDevice): - """A group of devices that allows group operations. Inherits from a complex device to offer group operations for all devices.""" - - def __init__(self, devices: list[GenericDevice]): - self._devices = devices - - @property - def devices(self) -> list[GenericDevice]: - return self._devices - - @property - def switches(self) -> list[SwitchDevice]: - return filter_devices(self.devices, SwitchDevice) - - @property - def lights(self) -> list[LightDevice]: - return filter_devices(self.devices, LightDevice) - - @property - def is_on(self) -> bool: - """Returns true if any device is on.""" - return any(device.is_on for device in self.switches) - - @is_on.setter - def is_on(self, is_on: bool): - """Sets all devices to the same state.""" - for device in self.switches: - device.is_on = is_on - - @property - def color(self) -> LightColor: - """Returns the color of the first light in the group.""" - return self.lights[0].color - - @color.setter - def color(self, color: LightColor): - """Sets all lights in the group to the same color.""" - for device in self.lights: - device.color = color - - @property - def scene(self) -> LightScene: - """Returns the current scene of the group.""" - return LightScene(self.name, {device: device.color for device in self.lights}) - - -class Room(DeviceGroup): - """A group of devices that has additional properties a can be seen as a room.""" - - def __init__(self, name: str, devices: list[GenericDevice], people_count: int = 0): - """ - Args: - name (str): Name of the room. - devices (list[GenericDevice]): Devices in this room. - people_count (int, optional): Number of people in this room. Defaults to 0. - """ - super().__init__(devices) - self.name = name - self._people_count = people_count - - @property - def people_count(self) -> int: - return self._people_count - - def __str__(self): - return self.name diff --git a/src/models/helper.py b/src/models/helper.py deleted file mode 100644 index 9ccbafe..0000000 --- a/src/models/helper.py +++ /dev/null @@ -1,28 +0,0 @@ -from typing import TypeVar -from models.devices.genericdevices import GenericDevice, LightDevice, SwitchDevice -from models.exceptions import NoDeviceFoundError -from models.groups import DeviceGroup, Room - - -DEVICE_TYPE = TypeVar( - "DEVICE_TYPE", - type(GenericDevice), - type(SwitchDevice), - type(LightDevice), - type(Room), - type(DeviceGroup), -) - - -def filter_devices( - devices: list[GenericDevice], type: DEVICE_TYPE -) -> list[DEVICE_TYPE]: - """Filters out devices that are not of a specific type.""" - filtered_devices: list[DEVICE_TYPE] = [ - device for device in devices if isinstance(device, type) - ] - - if len(filtered_devices) == 0: - raise NoDeviceFoundError(f"No devices of type {type} found.") - - return filtered_devices diff --git a/src/models/home.py b/src/models/home.py deleted file mode 100644 index 9eb92a6..0000000 --- a/src/models/home.py +++ /dev/null @@ -1,19 +0,0 @@ -from models.groups import DeviceGroup, Room -from models.helper import filter_devices - - -class Home(Room): - """Combines all elements of a smart home.""" - - location: str - """Physical location of the home, useful for sunset and sunrise times.""" - - @property - def rooms(self) -> list[Room]: - """Returns all rooms in the home.""" - return filter_devices(self.devices, Room) - - @property - def groups(self) -> list[DeviceGroup]: - """Returns all groups in the home.""" - return filter_devices(self.devices, DeviceGroup) diff --git a/src/old/philips_hue_counter.py b/src/old/philips_hue_counter.py deleted file mode 100644 index db30d0e..0000000 --- a/src/old/philips_hue_counter.py +++ /dev/null @@ -1,283 +0,0 @@ -from datetime import datetime, time, timedelta -from typing import Dict -from old.philips_hue import PhilipsHue -from sensors import PeopleCounter, Directions, VL53L1XSensor -import logging -import json -from timeloop import Timeloop - - -# Should lights already turn on where there is any kind of motion in the sensor -ENABLE_MOTION_TRIGGERED_LIGHT = True - -# Should lights change when a certain time in the schedule is reached -ENABLE_SCHEDULE_TRIGGERS = ( - False # Not working correctly at the moment, so turned off by default -) - -# Schedule (Key is time after scene should be used. Value is scene name to be used.) -# Needs to be sorted chronologically -SCHEDULE = {} - - -LOG_FILE_PATH = "log.txt" # Path for logs -hue_conf = { - "bridge_ip": "", - "transition_time": 10, # seconds - "light_group": "", - # If file exists, application is considered 'registered' at the bridge - "registered_file": "smart_light_registered.bridge", -} # Custom configuration for philips hue - - -hue: PhilipsHue = PhilipsHue(hue_conf) # Light interface -counter: PeopleCounter = PeopleCounter(VL53L1XSensor()) # Sensor object -peopleCount: int = 0 # Global count of people on the inside -motion_triggered_lights = False # Is light on because of any detected motion -timeloop: Timeloop = Timeloop() # Used for time triggered schedule - -logging.getLogger().setLevel(logging.INFO) - - -def time_minus_time(time_a: time, time_b: time) -> timedelta: - """Implementes a basic timedelta function for time objects. - - Args: - time_a (time): Time to subtract from. - time_b (time): Time to be subtracted. - - Returns: - timedelta: Delta between the two time objects. - """ - today = datetime.today() - dt_a = datetime.combine(today, time_a) - dt_b = datetime.combine(today, time_b) - - return dt_a - dt_b - - -def get_scene_for_time(time: time) -> str: - """Determines the correct scene to activate for a given time. - - Args: - time (time): Time to find scene for. - - Returns: - string: Scene name that should be active. None, if schedule is empty. - """ - global SCHEDULE - - if SCHEDULE is None or len(SCHEDULE) <= 0: - return None - - previous_scene = None - for start_time, scene in SCHEDULE.items(): - # If current time is still after schedule time, just keep going - if start_time <= time: - previous_scene = scene - continue - - # Schedule timef is now after current time, which is too late - # So if exists, take previous scene, since it was the last before the current time - if previous_scene: - return previous_scene - else: - break - - # Only breaks if it could not find a valid scene, so use lates scene as fallback - return list(SCHEDULE.values())[-1] - - -def change_cb(countChange: int, directionState: Dict): - """Handles basic logging of event data for later analysis. - - Args: - countChange (int): The change in the number of people. Usually on of [-1, 0, 1]. - directionState (Dict): Object describing the internal state of the sensor. - """ - data = { - "version": "v0.0", - "previousPeopleCount": peopleCount, - "countChange": countChange, - "directionState": directionState, - "dateTime": datetime.now(), - "motionTriggeredLights": motion_triggered_lights, - } - - try: - with open(LOG_FILE_PATH, "a") as f: - f.write(json.dumps(data, default=str) + "\n") - except Exception as ex: - logging.exception(f"Unable to write log file. {ex}") - - -def count_change(change: int) -> None: - """Handles light state when people count changes - - Args: - change (int): The change in the number of people. Usually on of [-1, 0, 1]. - """ - global hue - global peopleCount - global motion_triggered_lights - - # Are lights on at the moment? - previous_lights_state = get_light_state() - - # Apply correction - if peopleCount <= 0 and previous_lights_state and not motion_triggered_lights: - # Count was 0, but lights were on (not because of motion triggers) => people count was not actually 0 - peopleCount = 1 - logging.debug(f"People count corrected to {peopleCount}") - elif peopleCount > 0 and not previous_lights_state: - # Count was >0, but lights were off => people count was actually 0 - peopleCount = 0 - logging.debug(f"People count corrected to {peopleCount}") - - peopleCount += change - if peopleCount < 0: - peopleCount = 0 - logging.debug(f"People count changed by {change}") - - # Handle light - target_light_state = peopleCount > 0 - - # Return, if there is no change - if previous_lights_state == target_light_state: - if previous_lights_state: - # Signaling that the people count is taking control over the light now - motion_triggered_lights = False - return - - set_light_state(target_light_state) - - -def trigger_change(triggerState: Dict): - """Handles motion triggered light state. - - Args: - triggerState (Dict): Describing in what directions the sensor is triggerd. - """ - global hue - global motion_triggered_lights - - target_light_state = None - - # Is someone walking close to the door? - motion_detected = ( - triggerState[Directions.INSIDE] or triggerState[Directions.OUTSIDE] - ) - target_light_state = motion_detected - - # Does motion triggered light need to do anything? - if peopleCount > 0: - # State is successfully handled by the count - motion_triggered_lights = False - return - - # Only look at changing situations - if target_light_state == motion_triggered_lights: - return - - set_light_state(target_light_state) - - # Save state - motion_triggered_lights = target_light_state - - -def set_light_scene(target_scene: str) -> bool: - """Sets the lights to the given scene, but only, if lights are already on. Does not correct count if lights are in an unexpected state. - - Args: - target_scene (string): Name of the scene to activate. - """ - # Is valid scene? - if target_scene is None: - return - - # Are lights on at the moment? Only based on people count for simplicity - if peopleCount <= 0: - # Lights are probably off, not doing anything - return - - # Set lights to scene - hue.set_group_scene(hue_conf["light_group"], target_scene) - logging.debug(f"Light scene set to {target_scene}") - - -def set_light_state(target_light_state: bool) -> bool: - """Sets the lights to the given state. - - Args: - target_light_state (bool): Should lights on the inside be on or off. - - Returns: - bool: Previous light state. - """ - # Are lights on at the moment? - previous_lights_state = get_light_state() - if target_light_state == previous_lights_state: - return previous_lights_state - - # Adjust light as necessary - target_scene = get_scene_for_time(datetime.now().time()) - if target_light_state and target_scene: - # Set to specific scene if exists - hue.set_group_scene(hue_conf["light_group"], target_scene) - logging.debug( - f"Light state changed to {target_light_state} with scene {target_scene}" - ) - else: - hue.set_group(hue_conf["light_group"], {"on": target_light_state}) - logging.debug(f"Light state changed to {target_light_state}") - - return previous_lights_state - - -def get_light_state() -> bool: - """ - Returns: - bool: Current light state. - """ - return hue.get_group(hue_conf["light_group"])["state"]["any_on"] - - -def update_scene(): - """Called by time trigger to update light scene if lights are on.""" - scene = get_scene_for_time(datetime.now().time()) - - if scene is None: - return - - set_light_scene(scene) - logging.debug(f"Updated scene at {datetime.now().time()} to {scene}.") - - -def register_time_triggers(): - """Registeres time triggered callbacks based on the schedule, to adjust the current scene, if lights are on.""" - global SCHEDULE - if SCHEDULE is None or len(SCHEDULE) <= 0: - return - - for time in SCHEDULE.keys(): - delta = time_minus_time(time, datetime.now().time()) - if delta < timedelta(0): - delta += timedelta(1) - - timeloop._add_job(update_scene, interval=timedelta(1), offset=delta) - - timeloop.start(block=False) - - logging.info("Registered time triggers.") - - -if __name__ == "__main__": - if ENABLE_SCHEDULE_TRIGGERS: - register_time_triggers() - - # Represents callback trigger order - counter.hookChange(change_cb) - counter.hookCounting(count_change) - counter.hookTrigger(trigger_change) - - counter.run() diff --git a/src/old/sensors/__init__.py b/src/old/sensors/__init__.py deleted file mode 100644 index b581f29..0000000 --- a/src/old/sensors/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -from sensors.vl53l1x_sensor import VL53L1XSensor -from sensors.people_counter import PeopleCounter -from sensors.tof_sensor import Directions diff --git a/src/old/sensors/people_counter.py b/src/old/sensors/people_counter.py deleted file mode 100644 index 502b436..0000000 --- a/src/old/sensors/people_counter.py +++ /dev/null @@ -1,195 +0,0 @@ -from sensors.tof_sensor import ToFSensor, Directions -from datetime import datetime -import threading - - -COUNTING_CB = "counting" -TRIGGER_CB = "trigger" -CHANGE_CB = "changes" -START_TIME = "start_time" -END_TIME = "end_time" -TRIGGER_DISTANCES = "trigger_distances" -END_DISTANCE = "end_distance" - - -class PeopleCounter: - def __init__(self, sensor: ToFSensor) -> None: - self.sensor = sensor - self.callbacks = {COUNTING_CB: [], TRIGGER_CB: [], CHANGE_CB: []} - self.maxTriggerDistance = 120 # In cm - - def hookCounting(self, cb) -> None: - self.callbacks[COUNTING_CB].append(cb) - - def unhookCounting(self, cb) -> None: - self.callbacks[COUNTING_CB].remove(cb) - - def hookTrigger(self, cb) -> None: - self.callbacks[TRIGGER_CB].append(cb) - - def unhookTrigger(self, cb) -> None: - self.callbacks[TRIGGER_CB].remove(cb) - - def hookChange(self, cb) -> None: - self.callbacks[CHANGE_CB].append(cb) - - def unhookChange(self, cb) -> None: - self.callbacks[CHANGE_CB].remove(cb) - - def getInitialDirectionState(self) -> dict[Directions, list]: - return {Directions.INSIDE: [], Directions.OUTSIDE: []} - - def run(self) -> None: - self.keepRunning = True - direction = Directions.INSIDE - self.directionState = self.getInitialDirectionState() - - self.sensor.open() - while self.keepRunning: - # Switch to other direction - direction: Directions = Directions.other(direction) - - self.sensor.setDirection(direction) - - distance: float = self.sensor.getDistance() - changed: bool = self.updateState(direction, distance) - - if changed: - countChange: int = self.getCountChange(self.directionState) - - # Hooks - th = threading.Thread(target=self.handleCallbacks, args=(countChange,)) - th.start() - - # Reset state if state is finalised - if not self.isDirectionTriggered( - Directions.INSIDE - ) and not self.isDirectionTriggered(Directions.OUTSIDE): - self.directionState = self.getInitialDirectionState() - - self.sensor.close() - - def getCountChange(self, directionState) -> int: - # Is valid? - for direction in Directions: - # Is there at least one record for every direction? - if len(directionState[direction]) <= 0: - return 0 - - # Did every record start and end? - if ( - directionState[direction][0][START_TIME] is None - or directionState[direction][-1][END_TIME] is None - ): - return 0 # Return no change if not valid - - # Get times into variables - insideStart = directionState[Directions.INSIDE][0][START_TIME] - insideEnd = directionState[Directions.INSIDE][-1][END_TIME] - outsideStart = directionState[Directions.OUTSIDE][0][START_TIME] - outsideEnd = directionState[Directions.OUTSIDE][-1][END_TIME] - - # In what direction is the doorframe entered and left? - # Entering doorframe in the inside direction - enteringInside: bool = outsideStart < insideStart - # Leaving dooframe in the inside direction - leavingInside: bool = outsideEnd < insideEnd - - # They have to be the same, otherwise they switch directions in between - if enteringInside != leavingInside: - # Someone did not go all the way - # Either - # Inside -######- - # Outside ---##--- - # or - # Inside ---##--- - # Outside -######- - return 0 - - # Are those times overlapping or disjunct? - if insideEnd < outsideStart or outsideEnd < insideStart: - # They are disjunct - # Either - # Inside -##----- - # Outside -----##- - # or - # Inside -----##- - # Outside -##----- - return 0 - - # What direction is the person taking? - if enteringInside: - # Entering the inside - # Inside ---####- - # Outside -####--- - return 1 - else: - # Leaving the inside - # Inside -####--- - # Outside ---####- - return -1 - - def isTriggerDistance(self, distance: float) -> bool: - #! TODO: Should be based on the distance from the ground, not from the sensor - return distance <= self.maxTriggerDistance - - def handleCallbacks(self, countChange: int): - self.handleChangeCallbacks(countChange) - self.handleCountingCallbacks(countChange) - self.handleTriggerCallbacks() - - def handleCountingCallbacks(self, countChange: int) -> None: - # Only notify counting on actual count change - if countChange == 0: - return - - for cb in self.callbacks[COUNTING_CB]: - cb(countChange) - - def handleTriggerCallbacks(self) -> None: - triggerState = { - Directions.INSIDE: self.isDirectionTriggered(Directions.INSIDE), - Directions.OUTSIDE: self.isDirectionTriggered(Directions.OUTSIDE), - } - - for cb in self.callbacks[TRIGGER_CB]: - cb(triggerState) - - def handleChangeCallbacks(self, countChange: int) -> None: - for cb in self.callbacks[CHANGE_CB]: - cb(countChange, self.directionState) - - def isDirectionTriggered(self, direction: Directions) -> bool: - return ( - len(self.directionState[direction]) > 0 - and self.directionState[direction][-1][END_TIME] is None - ) - - def updateState(self, direction: Directions, distance: float) -> bool: - triggered: bool = self.isTriggerDistance(distance) - - previouslyTriggered = False - if len(self.directionState[direction]) > 0: - previouslyTriggered = self.directionState[direction][-1][END_TIME] is None - - if triggered and not previouslyTriggered: - # Set as new beginning for this direction - self.directionState[direction].append( - { - START_TIME: datetime.now(), - END_TIME: None, - TRIGGER_DISTANCES: [distance], - END_DISTANCE: None, - } - ) - return True - elif not triggered and previouslyTriggered: - # Set as end for this direction - self.directionState[direction][-1][END_TIME] = datetime.now() - self.directionState[direction][-1][END_DISTANCE] = distance - return True - elif previouslyTriggered: - # Add distance at least - self.directionState[direction][-1][TRIGGER_DISTANCES].append(distance) - - return False diff --git a/src/old/sensors/tof_sensor.py b/src/old/sensors/tof_sensor.py deleted file mode 100644 index f16389b..0000000 --- a/src/old/sensors/tof_sensor.py +++ /dev/null @@ -1,31 +0,0 @@ -from enum import Enum - - -class Directions(str, Enum): - INSIDE = "indoor" - OUTSIDE = "outdoor" - - def other(direction: "Directions") -> "Directions": - if direction is Directions.INSIDE: - return Directions.OUTSIDE - else: - return Directions.INSIDE - - def __iter__(): - return [Directions.INSIDE, Directions.OUTSIDE] - - -class ToFSensor: - def open(self) -> None: - raise NotImplementedError() - - def setDirection(self, direction: Directions) -> None: - """Configure sensor to pick up the distance in a specific direction.""" - raise NotImplementedError() - - def getDistance(self) -> float: - """Returns new distance in cm.""" - raise NotImplementedError() - - def close(self) -> None: - raise NotImplementedError() diff --git a/src/old/sensors/vl53l1x_sensor.py b/src/old/sensors/vl53l1x_sensor.py deleted file mode 100644 index 487d724..0000000 --- a/src/old/sensors/vl53l1x_sensor.py +++ /dev/null @@ -1,62 +0,0 @@ -from sensors.tof_sensor import Directions, ToFSensor -import VL53L1X - -# Reference: https://github.com/pimoroni/vl53l1x-python -# -# Left, right, top and bottom are relative to the SPAD matrix coordinates, -# which will be mirrored in real scene coordinates. -# (or even rotated, depending on the VM53L1X element alignment on the board and on the board position) -# -# ROI in SPAD matrix coords: -# -# 15 top-left -# | X____ -# | | | -# | |____X -# | bottom-right -# 0__________15 -# - - -class VL53L1XSensor(ToFSensor): - def __init__(self) -> None: - super().__init__() - - def open(self) -> None: - self.sensor = VL53L1X.VL53L1X(i2c_bus=1, i2c_address=0x29) - self.sensor.open() - - # Optionally set an explicit timing budget - # These values are measurement time in microseconds, - # and inter-measurement time in milliseconds. - # If you uncomment the line below to set a budget you - # should use `tof.start_ranging(0)` - # tof.set_timing(66000, 70) - self.ranging = 2 - # 0 = Unchanged - # 1 = Short Range - # 2 = Medium Range - # 3 = Long Range - - def setDirection(self, direction: Directions) -> None: - """Configure sensor to pick up the distance in a specific direction.""" - direction_roi = { - Directions.INSIDE: VL53L1X.VL53L1xUserRoi(6, 3, 9, 0), - Directions.OUTSIDE: VL53L1X.VL53L1xUserRoi(6, 15, 9, 12), - } - - roi = direction_roi[direction] - - self.sensor.stop_ranging() - self.sensor.set_user_roi(roi) - self.sensor.start_ranging(self.ranging) - - def getDistance(self) -> float: - """Returns new distance in cm.""" - distance = self.sensor.get_distance() - - return distance / 10 - - def close(self) -> None: - self.sensor.stop_ranging() - self.sensor.close() diff --git a/src/old/statistics/statistics.py b/src/old/statistics/statistics.py deleted file mode 100644 index c9e63de..0000000 --- a/src/old/statistics/statistics.py +++ /dev/null @@ -1,112 +0,0 @@ -from datetime import datetime -import json -from typing import Dict -from xmlrpc.client import Boolean - -import matplotlib.pyplot as plt - -# Config -FILE_PATH = "log.txt" -AFTER_DATE = datetime(2022, 1, 1) # Only keeps log entries after the specified date. Filter applied in parse_log_entry(..) - - -# Read file -content = None -with open(FILE_PATH, "r") as file: - content = file.readlines() - - -def parse_log_entry(entry: Dict) -> Dict | bool: - # Only keep last record of a sequence - if not is_last_in_sequence(entry): - return False - - entry["dateTime"] = datetime.strptime( - str(entry["dateTime"])[:19], "%Y-%m-%d %H:%M:%S" - ) - if entry["dateTime"] < AFTER_DATE: - return False - - return entry - - -def is_last_in_sequence(entry: Dict) -> Boolean: - indoor = entry["directionState"]["indoor"] - outdoor = entry["directionState"]["outdoor"] - - if len(indoor) <= 0 or len(outdoor) <= 0: - return False - - end_key = "end_distance" - # Check version - if end_key not in indoor[-1]: - end_key = "end" - - if indoor[-1][end_key] is None or outdoor[-1][end_key] is None: - return False - - return True - - -# Collect -log: list[Dict] = [json.loads(line.strip("\x00")) for line in content] -print("Number of total entries:", len(log)) - -# Parse & Filter -log = [parse_log_entry(entry) for entry in log if parse_log_entry(entry)] # type: ignore -print("Number of filtered entries:", len(log)) - -# Render -fig, ax = plt.subplots() # Create a figure containing a single axes. -times: list[datetime] = [entry["dateTime"] for entry in log] -counts: list[int] = [entry["previousPeopleCount"] for entry in log] -ax.step(times, counts, where="pre") # type: ignore -print("-"*20) -plt.show() -print("-" * 20) - - -# Print stats -walk_ins = [entry for entry in log if entry["countChange"] > 0] -walk_outs = [entry for entry in log if entry["countChange"] < 0] -walk_unders = [entry for entry in log if entry["countChange"] == 0] -print("Number of walk-ins:", len(walk_ins)) -print("Number of walk-outs:", len(walk_outs)) -print("Number of walk-unders:", len(walk_unders)) -print("-" * 20) - -# Calculate faults - -for c, n in zip(list(range(len(log))), list(range(len(log)))[1:]): - estimated_count: int = log[c]["previousPeopleCount"] + log[c]["countChange"] - faulty: bool = estimated_count != log[n]["previousPeopleCount"] - log[c]["faulty"] = faulty - log[c]["faultyCount"] = log[c]["previousPeopleCount"] if faulty else None - -log = log[:-1] -fault_count = sum(1 for entry in log if entry["faulty"]) -fault_percentage = fault_count / len(log) -print("Number of faults:", fault_count) -print("Percentage of faults:", fault_percentage * 100, "%") - -print("-" * 20) -faulty_off = [entry for entry in log if entry["faulty"] and entry["faultyCount"] == 0] -faulty_on = [entry for entry in log if entry["faulty"] and entry["faultyCount"] != 0] -print("Number of false-0:", len(faulty_off)) -print("Number of false-1:", len(faulty_on)) -print("Percentage of false-0:", len(faulty_off) / fault_count * 100, "%") -print("Percentage of false-1:", len(faulty_on) / fault_count * 100, "%") - -# Number of dates -unique_dates = set() -for entry in log: - date = entry["dateTime"].strftime("%Y-%m-%d") - unique_dates.add(date) -total_days = len(unique_dates) - -print("-"*20) -print("Number of days:", total_days) -print("Changes per day:", len(log) / total_days) - -corrections_per_day = len(log) / total_days * fault_percentage -print("Corrections per day:", corrections_per_day) diff --git a/src/old/timeloop/LICENSE b/src/old/timeloop/LICENSE deleted file mode 100644 index 30302f4..0000000 --- a/src/old/timeloop/LICENSE +++ /dev/null @@ -1,21 +0,0 @@ -MIT License - -Copyright (c) 2018 sankalpjonn - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. diff --git a/src/old/timeloop/__init__.py b/src/old/timeloop/__init__.py deleted file mode 100644 index 8d95c4d..0000000 --- a/src/old/timeloop/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from timeloop.app import Timeloop diff --git a/src/old/timeloop/app.py b/src/old/timeloop/app.py deleted file mode 100644 index 4575d69..0000000 --- a/src/old/timeloop/app.py +++ /dev/null @@ -1,72 +0,0 @@ -from datetime import datetime, timedelta -import logging -import sys -import signal -import time - -from timeloop.exceptions import ServiceExit -from timeloop.job import Job -from timeloop.helpers import service_shutdown - - -class Timeloop(): - def __init__(self): - self.jobs = [] - logger = logging.getLogger('timeloop') - ch = logging.StreamHandler(sys.stdout) - ch.setLevel(logging.INFO) - formatter = logging.Formatter('[%(asctime)s] [%(name)s] [%(levelname)s] %(message)s') - ch.setFormatter(formatter) - logger.addHandler(ch) - logger.setLevel(logging.INFO) - self.logger = logger - - def _add_job(self, func, interval: timedelta, offset: timedelta=None, *args, **kwargs): - j = Job(interval, func, offset=offset, *args, **kwargs) - self.jobs.append(j) - - def _block_main_thread(self): - signal.signal(signal.SIGTERM, service_shutdown) - signal.signal(signal.SIGINT, service_shutdown) - - while True: - try: - time.sleep(1) - except ServiceExit: - self.stop() - break - - def _start_jobs(self, block): - for j in self.jobs: - j.daemon = not block - j.start() - self.logger.info("Registered job {}".format(j.execute)) - - def _stop_jobs(self): - for j in self.jobs: - self.logger.info("Stopping job {}".format(j.execute)) - j.stop() - - def job(self, interval: timedelta, offset: timedelta=None): - """Decorator to define a timeloop for the decorated function. - - Args: - interval (timedelta): How long to wait after every execution until the next one. - offset (timedelta, optional): Positive offset until the first execution of the function. If None, will wait with first execution until the first interval passed. If timedelta with length 0 (or smaller) will execute immediately. Defaults to None. - """ - def decorator(f): - self._add_job(f, interval, offset=offset) - return f - return decorator - - def stop(self): - self._stop_jobs() - self.logger.info("Timeloop exited.") - - def start(self, block=False): - self.logger.info("Starting Timeloop..") - self._start_jobs(block=block) - - self.logger.info("Timeloop now started. Jobs will run based on the interval set") - if block: - self._block_main_thread() diff --git a/src/old/timeloop/exceptions.py b/src/old/timeloop/exceptions.py deleted file mode 100644 index 4585852..0000000 --- a/src/old/timeloop/exceptions.py +++ /dev/null @@ -1,6 +0,0 @@ -class ServiceExit(Exception): - """ - Custom exception which is used to trigger the clean exit - of all running threads and the main program. - """ - pass diff --git a/src/old/timeloop/helpers.py b/src/old/timeloop/helpers.py deleted file mode 100644 index 0a1c2fe..0000000 --- a/src/old/timeloop/helpers.py +++ /dev/null @@ -1,5 +0,0 @@ -from timeloop.exceptions import ServiceExit - - -def service_shutdown(signum, frame): - raise ServiceExit diff --git a/src/old/timeloop/job.py b/src/old/timeloop/job.py deleted file mode 100644 index 5c32fdd..0000000 --- a/src/old/timeloop/job.py +++ /dev/null @@ -1,25 +0,0 @@ -from threading import Thread, Event -from datetime import timedelta -from time import sleep - -class Job(Thread): - def __init__(self, interval: timedelta, execute, offset: timedelta=None, *args, **kwargs): - Thread.__init__(self) - self.stopped = Event() - self.interval: timedelta = interval - self.execute = execute - self.offset: timedelta = offset - self.args = args - self.kwargs = kwargs - - def stop(self): - self.stopped.set() - self.join() - - def run(self): - if self.offset: - sleep(self.offset.total_seconds()) - self.execute(*self.args, **self.kwargs) - - while not self.stopped.wait(self.interval.total_seconds()): - self.execute(*self.args, **self.kwargs) diff --git a/src/old/philips_hue.py b/src/philips_hue.py similarity index 100% rename from src/old/philips_hue.py rename to src/philips_hue.py