Heavily reduced complexity to start of new

This commit is contained in:
Maximilian Giller 2023-12-10 01:57:23 +01:00
parent aec000de81
commit b9cb12a2d1
29 changed files with 0 additions and 1231 deletions

View file

@ -1,19 +0,0 @@
# An example config file for MaSH
services:
philips-hue:
ip: 192.168.178.42
lights:
- name: Hallway Ceiling
philips-hue: Hallway Ceiling
- name: Desk
philips-hue: Desk Plug
rooms:
- name: Office
philips-hue: Work Room
- name: Hallway
philips-hue: Hallways
adjacent:
- Office

View file

@ -1,14 +1,2 @@
smbus2
vl53l1x
# To parse the config file
pyyaml
# To hot reload the config file
watchdog
# For Philips Hue Counter # For Philips Hue Counter
phue phue
# For statistics
matplotlib

View file

@ -1,36 +0,0 @@
import logging
from config.reloadtrigger import ReloadTrigger
try:
from watchdog.observers import Observer
from watchdog.events import (
FileSystemEventHandler,
FileCreatedEvent,
FileModifiedEvent,
)
class HotDogTrigger(ReloadTrigger, FileSystemEventHandler):
"""Trigger for config files. Might be unbound, if watchdog is not installed. Check if equal to None before use."""
def __init__(self, path: str):
super().__init__(path)
self.observer = Observer()
self.observer.schedule(self, path, recursive=True)
self.observer.start()
def __del__(self):
self.observer.stop()
self.observer.join()
def on_modified(self, event: FileModifiedEvent):
self.__trigger_reload()
def on_created(self, event: FileCreatedEvent):
self.__trigger_reload()
logging.debug("Watchdog imported successfully. HotDogTrigger available.")
except ImportError:
logging.info("Watchdog is not installed. HotDogTrigger unavailable.")

View file

@ -1,49 +0,0 @@
import abc
from typing import Any
from models.home import Home
class ConfigParser:
def get_config(self) -> Home: # TODO: Check type
"""Load and parse the config."""
return self.__parse_config(self.__load_config())
@abc.abstractmethod
def __load_config(self) -> Any:
"""Load the config."""
raise NotImplementedError()
@abc.abstractmethod
def __parse_config(self, config) -> Home:
"""Parse the config."""
raise NotImplementedError()
class FileConfigParser(ConfigParser):
"""Scaffoling for parsers based on files."""
path: str
"""Path to the config file."""
def __init__(self, path: str):
self.path = path
class YAMLConfig(FileConfigParser):
"""Loads and parses config from a YAML file."""
def __init__(self, path: str):
super().__init__(path)
def __load_config(self) -> dict:
"""Loads config from a YAML file."""
import yaml
with open(self.path, "r") as file:
return yaml.safe_load(file)
def __parse_config(self, config: dict) -> Home:
"""Parses config in dict format. Usually based on JSON or YAML."""
# TODO: Implement
raise NotImplementedError()

View file

@ -1,26 +0,0 @@
class ReloadTrigger(object):
"""Abstract interface for config hot reloading."""
path: str
"""Path to the config file."""
callbacks: list
"""List of functions to callback on reload of config."""
def __init__(self, path: str):
"""Initialize the trigger.
Args:
path (str): Path to the config file.
"""
self.path = path
self.callbacks = []
def on_reload(self, callback):
"""Register a callback to be called when the config is reloaded."""
self.callbacks.append(callback)
def __trigger_reload(self):
"""Trigger a reload of the config."""
for callback in self.callbacks:
callback()

0
src/main.py Normal file
View file

View file

@ -1,48 +0,0 @@
from config.parser import ConfigParser
from config.reloadtrigger import ReloadTrigger
from models.home import Home
class MaSH:
"""Max' Smart Home. Smart home automation framework."""
config_parser: ConfigParser
"""Parser for the config file."""
config_trigger: list[ReloadTrigger]
"""Active triggers for reloading the config."""
home: Home
"""Home to control and make smart."""
def __init__(self, config_parser: ConfigParser) -> None:
"""Initialize the framework.
Args:
config_parser (ConfigParser): Parser for the config file.
"""
self.config_parser = config_parser
self.config_trigger = []
self.reload_config()
def reload_config(self) -> None:
"""Reload the config."""
self.home = self.config_parser.get_config() # TODO: Check type
def register_config_trigger(self, trigger: ReloadTrigger) -> bool:
"""Register a trigger for hot-reloading the config.
Args:
trigger (ReloadTrigger): Trigger for hot-reloading the config based on ReloadTrigger.
Returns:
bool: True if the trigger was registered, False if not.
"""
if trigger is None or trigger in self.config_trigger:
return False
trigger.on_reload(self.reload_config)
self.config_trigger.append(trigger)
return True

View file

@ -1,23 +0,0 @@
class RGBColor:
"""Represents a color in the RGB color space."""
r: int
"""Red value of the color."""
g: int
"""Green value of the color."""
b: int
"""Blue value of the color."""
def __init__(self, r: int, g: int, b: int):
"""Initialize the color.
Args:
r (int): Red value of the color.
g (int): Green value of the color.
b (int): Blue value of the color.
"""
self.r = r
self.g = g
self.b = b

View file

@ -1,9 +0,0 @@
from models.devices.colordevices import *
from models.devices.switchdevices import *
from models.devices.genericdevices import *
class AllDevice(SetSwitchDevice, SetColorDevice):
"""Inherits all intefaces from all devices."""
pass

View file

@ -1,25 +0,0 @@
from models.colors import RGBColor
from models.devices.genericdevices import GenericDevice
class ColorDevice(GenericDevice):
"""Abstract device that has a color."""
_color: RGBColor
"""Color of device."""
class GetColorDevice(ColorDevice):
"""Implement color getter for a color device."""
@property
def color(self) -> RGBColor:
return self._color
class SetColorDevice(GetColorDevice):
"""Implements color setter for a color device."""
@GetColorDevice.color.setter
def color(self, color: RGBColor):
self._color = color

View file

@ -1,5 +0,0 @@
class GenericDevice:
"""A generic device."""
def __init__(self, name: str):
self.name = name

View file

@ -1,43 +0,0 @@
from models.devices.genericdevices import GenericDevice
class SwitchDevice(GenericDevice):
"""Abstract device that can be turned on and off."""
_is_on: bool
"""Current state of the device."""
class ToggleSwitchDevice(SwitchDevice):
"""Implements toggle functionality for a switch device."""
def toggle(self):
self._is_on = not self._is_on
class TurnOffSwitchDevice(SwitchDevice):
"""Implements turn off functionality for a switch device."""
def turn_off(self):
self._is_on = False
class TurnOnSwitchDevice(SwitchDevice):
"""Implements turn on functionality for a switch device."""
def turn_on(self):
self._is_on = True
class GetSwitchDevice(SwitchDevice):
"""Implements is_on state getter for a switch device."""
@property
def is_on(self) -> bool:
return self._is_on
class SetSwitchDevice(TurnOffSwitchDevice, TurnOnSwitchDevice, ToggleSwitchDevice):
@GetSwitchDevice.is_on.setter
def is_on(self, set_on: bool):
self._is_on = set_on

View file

@ -1,2 +0,0 @@
class NoDeviceFoundError(Exception):
pass

View file

@ -1,71 +0,0 @@
from models.helper import filter_devices
from models.devices import AllDevice
from models.colors import RGBColor
class DeviceGroup(AllDevice):
"""A group of devices that allows group operations. Inherits from a complex device to offer group operations for all devices."""
def __init__(self, devices: list[GenericDevice]):
self._devices = devices
@property
def devices(self) -> list[GenericDevice]:
return self._devices
@property
def switches(self) -> list[SwitchDevice]:
return filter_devices(self.devices, SwitchDevice)
@property
def lights(self) -> list[LightDevice]:
return filter_devices(self.devices, LightDevice)
@property
def is_on(self) -> bool:
"""Returns true if any device is on."""
return any(device.is_on for device in self.switches)
@is_on.setter
def is_on(self, is_on: bool):
"""Sets all devices to the same state."""
for device in self.switches:
device.is_on = is_on
@property
def color(self) -> LightColor:
"""Returns the color of the first light in the group."""
return self.lights[0].color
@color.setter
def color(self, color: LightColor):
"""Sets all lights in the group to the same color."""
for device in self.lights:
device.color = color
@property
def scene(self) -> LightScene:
"""Returns the current scene of the group."""
return LightScene(self.name, {device: device.color for device in self.lights})
class Room(DeviceGroup):
"""A group of devices that has additional properties a can be seen as a room."""
def __init__(self, name: str, devices: list[GenericDevice], people_count: int = 0):
"""
Args:
name (str): Name of the room.
devices (list[GenericDevice]): Devices in this room.
people_count (int, optional): Number of people in this room. Defaults to 0.
"""
super().__init__(devices)
self.name = name
self._people_count = people_count
@property
def people_count(self) -> int:
return self._people_count
def __str__(self):
return self.name

View file

@ -1,28 +0,0 @@
from typing import TypeVar
from models.devices.genericdevices import GenericDevice, LightDevice, SwitchDevice
from models.exceptions import NoDeviceFoundError
from models.groups import DeviceGroup, Room
DEVICE_TYPE = TypeVar(
"DEVICE_TYPE",
type(GenericDevice),
type(SwitchDevice),
type(LightDevice),
type(Room),
type(DeviceGroup),
)
def filter_devices(
devices: list[GenericDevice], type: DEVICE_TYPE
) -> list[DEVICE_TYPE]:
"""Filters out devices that are not of a specific type."""
filtered_devices: list[DEVICE_TYPE] = [
device for device in devices if isinstance(device, type)
]
if len(filtered_devices) == 0:
raise NoDeviceFoundError(f"No devices of type {type} found.")
return filtered_devices

View file

@ -1,19 +0,0 @@
from models.groups import DeviceGroup, Room
from models.helper import filter_devices
class Home(Room):
"""Combines all elements of a smart home."""
location: str
"""Physical location of the home, useful for sunset and sunrise times."""
@property
def rooms(self) -> list[Room]:
"""Returns all rooms in the home."""
return filter_devices(self.devices, Room)
@property
def groups(self) -> list[DeviceGroup]:
"""Returns all groups in the home."""
return filter_devices(self.devices, DeviceGroup)

View file

@ -1,283 +0,0 @@
from datetime import datetime, time, timedelta
from typing import Dict
from old.philips_hue import PhilipsHue
from sensors import PeopleCounter, Directions, VL53L1XSensor
import logging
import json
from timeloop import Timeloop
# Should lights already turn on where there is any kind of motion in the sensor
ENABLE_MOTION_TRIGGERED_LIGHT = True
# Should lights change when a certain time in the schedule is reached
ENABLE_SCHEDULE_TRIGGERS = (
False # Not working correctly at the moment, so turned off by default
)
# Schedule (Key is time after scene should be used. Value is scene name to be used.)
# Needs to be sorted chronologically
SCHEDULE = {}
LOG_FILE_PATH = "log.txt" # Path for logs
hue_conf = {
"bridge_ip": "",
"transition_time": 10, # seconds
"light_group": "",
# If file exists, application is considered 'registered' at the bridge
"registered_file": "smart_light_registered.bridge",
} # Custom configuration for philips hue
hue: PhilipsHue = PhilipsHue(hue_conf) # Light interface
counter: PeopleCounter = PeopleCounter(VL53L1XSensor()) # Sensor object
peopleCount: int = 0 # Global count of people on the inside
motion_triggered_lights = False # Is light on because of any detected motion
timeloop: Timeloop = Timeloop() # Used for time triggered schedule
logging.getLogger().setLevel(logging.INFO)
def time_minus_time(time_a: time, time_b: time) -> timedelta:
"""Implementes a basic timedelta function for time objects.
Args:
time_a (time): Time to subtract from.
time_b (time): Time to be subtracted.
Returns:
timedelta: Delta between the two time objects.
"""
today = datetime.today()
dt_a = datetime.combine(today, time_a)
dt_b = datetime.combine(today, time_b)
return dt_a - dt_b
def get_scene_for_time(time: time) -> str:
"""Determines the correct scene to activate for a given time.
Args:
time (time): Time to find scene for.
Returns:
string: Scene name that should be active. None, if schedule is empty.
"""
global SCHEDULE
if SCHEDULE is None or len(SCHEDULE) <= 0:
return None
previous_scene = None
for start_time, scene in SCHEDULE.items():
# If current time is still after schedule time, just keep going
if start_time <= time:
previous_scene = scene
continue
# Schedule timef is now after current time, which is too late
# So if exists, take previous scene, since it was the last before the current time
if previous_scene:
return previous_scene
else:
break
# Only breaks if it could not find a valid scene, so use lates scene as fallback
return list(SCHEDULE.values())[-1]
def change_cb(countChange: int, directionState: Dict):
"""Handles basic logging of event data for later analysis.
Args:
countChange (int): The change in the number of people. Usually on of [-1, 0, 1].
directionState (Dict): Object describing the internal state of the sensor.
"""
data = {
"version": "v0.0",
"previousPeopleCount": peopleCount,
"countChange": countChange,
"directionState": directionState,
"dateTime": datetime.now(),
"motionTriggeredLights": motion_triggered_lights,
}
try:
with open(LOG_FILE_PATH, "a") as f:
f.write(json.dumps(data, default=str) + "\n")
except Exception as ex:
logging.exception(f"Unable to write log file. {ex}")
def count_change(change: int) -> None:
"""Handles light state when people count changes
Args:
change (int): The change in the number of people. Usually on of [-1, 0, 1].
"""
global hue
global peopleCount
global motion_triggered_lights
# Are lights on at the moment?
previous_lights_state = get_light_state()
# Apply correction
if peopleCount <= 0 and previous_lights_state and not motion_triggered_lights:
# Count was 0, but lights were on (not because of motion triggers) => people count was not actually 0
peopleCount = 1
logging.debug(f"People count corrected to {peopleCount}")
elif peopleCount > 0 and not previous_lights_state:
# Count was >0, but lights were off => people count was actually 0
peopleCount = 0
logging.debug(f"People count corrected to {peopleCount}")
peopleCount += change
if peopleCount < 0:
peopleCount = 0
logging.debug(f"People count changed by {change}")
# Handle light
target_light_state = peopleCount > 0
# Return, if there is no change
if previous_lights_state == target_light_state:
if previous_lights_state:
# Signaling that the people count is taking control over the light now
motion_triggered_lights = False
return
set_light_state(target_light_state)
def trigger_change(triggerState: Dict):
"""Handles motion triggered light state.
Args:
triggerState (Dict): Describing in what directions the sensor is triggerd.
"""
global hue
global motion_triggered_lights
target_light_state = None
# Is someone walking close to the door?
motion_detected = (
triggerState[Directions.INSIDE] or triggerState[Directions.OUTSIDE]
)
target_light_state = motion_detected
# Does motion triggered light need to do anything?
if peopleCount > 0:
# State is successfully handled by the count
motion_triggered_lights = False
return
# Only look at changing situations
if target_light_state == motion_triggered_lights:
return
set_light_state(target_light_state)
# Save state
motion_triggered_lights = target_light_state
def set_light_scene(target_scene: str) -> bool:
"""Sets the lights to the given scene, but only, if lights are already on. Does not correct count if lights are in an unexpected state.
Args:
target_scene (string): Name of the scene to activate.
"""
# Is valid scene?
if target_scene is None:
return
# Are lights on at the moment? Only based on people count for simplicity
if peopleCount <= 0:
# Lights are probably off, not doing anything
return
# Set lights to scene
hue.set_group_scene(hue_conf["light_group"], target_scene)
logging.debug(f"Light scene set to {target_scene}")
def set_light_state(target_light_state: bool) -> bool:
"""Sets the lights to the given state.
Args:
target_light_state (bool): Should lights on the inside be on or off.
Returns:
bool: Previous light state.
"""
# Are lights on at the moment?
previous_lights_state = get_light_state()
if target_light_state == previous_lights_state:
return previous_lights_state
# Adjust light as necessary
target_scene = get_scene_for_time(datetime.now().time())
if target_light_state and target_scene:
# Set to specific scene if exists
hue.set_group_scene(hue_conf["light_group"], target_scene)
logging.debug(
f"Light state changed to {target_light_state} with scene {target_scene}"
)
else:
hue.set_group(hue_conf["light_group"], {"on": target_light_state})
logging.debug(f"Light state changed to {target_light_state}")
return previous_lights_state
def get_light_state() -> bool:
"""
Returns:
bool: Current light state.
"""
return hue.get_group(hue_conf["light_group"])["state"]["any_on"]
def update_scene():
"""Called by time trigger to update light scene if lights are on."""
scene = get_scene_for_time(datetime.now().time())
if scene is None:
return
set_light_scene(scene)
logging.debug(f"Updated scene at {datetime.now().time()} to {scene}.")
def register_time_triggers():
"""Registeres time triggered callbacks based on the schedule, to adjust the current scene, if lights are on."""
global SCHEDULE
if SCHEDULE is None or len(SCHEDULE) <= 0:
return
for time in SCHEDULE.keys():
delta = time_minus_time(time, datetime.now().time())
if delta < timedelta(0):
delta += timedelta(1)
timeloop._add_job(update_scene, interval=timedelta(1), offset=delta)
timeloop.start(block=False)
logging.info("Registered time triggers.")
if __name__ == "__main__":
if ENABLE_SCHEDULE_TRIGGERS:
register_time_triggers()
# Represents callback trigger order
counter.hookChange(change_cb)
counter.hookCounting(count_change)
counter.hookTrigger(trigger_change)
counter.run()

View file

@ -1,3 +0,0 @@
from sensors.vl53l1x_sensor import VL53L1XSensor
from sensors.people_counter import PeopleCounter
from sensors.tof_sensor import Directions

View file

@ -1,195 +0,0 @@
from sensors.tof_sensor import ToFSensor, Directions
from datetime import datetime
import threading
COUNTING_CB = "counting"
TRIGGER_CB = "trigger"
CHANGE_CB = "changes"
START_TIME = "start_time"
END_TIME = "end_time"
TRIGGER_DISTANCES = "trigger_distances"
END_DISTANCE = "end_distance"
class PeopleCounter:
def __init__(self, sensor: ToFSensor) -> None:
self.sensor = sensor
self.callbacks = {COUNTING_CB: [], TRIGGER_CB: [], CHANGE_CB: []}
self.maxTriggerDistance = 120 # In cm
def hookCounting(self, cb) -> None:
self.callbacks[COUNTING_CB].append(cb)
def unhookCounting(self, cb) -> None:
self.callbacks[COUNTING_CB].remove(cb)
def hookTrigger(self, cb) -> None:
self.callbacks[TRIGGER_CB].append(cb)
def unhookTrigger(self, cb) -> None:
self.callbacks[TRIGGER_CB].remove(cb)
def hookChange(self, cb) -> None:
self.callbacks[CHANGE_CB].append(cb)
def unhookChange(self, cb) -> None:
self.callbacks[CHANGE_CB].remove(cb)
def getInitialDirectionState(self) -> dict[Directions, list]:
return {Directions.INSIDE: [], Directions.OUTSIDE: []}
def run(self) -> None:
self.keepRunning = True
direction = Directions.INSIDE
self.directionState = self.getInitialDirectionState()
self.sensor.open()
while self.keepRunning:
# Switch to other direction
direction: Directions = Directions.other(direction)
self.sensor.setDirection(direction)
distance: float = self.sensor.getDistance()
changed: bool = self.updateState(direction, distance)
if changed:
countChange: int = self.getCountChange(self.directionState)
# Hooks
th = threading.Thread(target=self.handleCallbacks, args=(countChange,))
th.start()
# Reset state if state is finalised
if not self.isDirectionTriggered(
Directions.INSIDE
) and not self.isDirectionTriggered(Directions.OUTSIDE):
self.directionState = self.getInitialDirectionState()
self.sensor.close()
def getCountChange(self, directionState) -> int:
# Is valid?
for direction in Directions:
# Is there at least one record for every direction?
if len(directionState[direction]) <= 0:
return 0
# Did every record start and end?
if (
directionState[direction][0][START_TIME] is None
or directionState[direction][-1][END_TIME] is None
):
return 0 # Return no change if not valid
# Get times into variables
insideStart = directionState[Directions.INSIDE][0][START_TIME]
insideEnd = directionState[Directions.INSIDE][-1][END_TIME]
outsideStart = directionState[Directions.OUTSIDE][0][START_TIME]
outsideEnd = directionState[Directions.OUTSIDE][-1][END_TIME]
# In what direction is the doorframe entered and left?
# Entering doorframe in the inside direction
enteringInside: bool = outsideStart < insideStart
# Leaving dooframe in the inside direction
leavingInside: bool = outsideEnd < insideEnd
# They have to be the same, otherwise they switch directions in between
if enteringInside != leavingInside:
# Someone did not go all the way
# Either
# Inside -######-
# Outside ---##---
# or
# Inside ---##---
# Outside -######-
return 0
# Are those times overlapping or disjunct?
if insideEnd < outsideStart or outsideEnd < insideStart:
# They are disjunct
# Either
# Inside -##-----
# Outside -----##-
# or
# Inside -----##-
# Outside -##-----
return 0
# What direction is the person taking?
if enteringInside:
# Entering the inside
# Inside ---####-
# Outside -####---
return 1
else:
# Leaving the inside
# Inside -####---
# Outside ---####-
return -1
def isTriggerDistance(self, distance: float) -> bool:
#! TODO: Should be based on the distance from the ground, not from the sensor
return distance <= self.maxTriggerDistance
def handleCallbacks(self, countChange: int):
self.handleChangeCallbacks(countChange)
self.handleCountingCallbacks(countChange)
self.handleTriggerCallbacks()
def handleCountingCallbacks(self, countChange: int) -> None:
# Only notify counting on actual count change
if countChange == 0:
return
for cb in self.callbacks[COUNTING_CB]:
cb(countChange)
def handleTriggerCallbacks(self) -> None:
triggerState = {
Directions.INSIDE: self.isDirectionTriggered(Directions.INSIDE),
Directions.OUTSIDE: self.isDirectionTriggered(Directions.OUTSIDE),
}
for cb in self.callbacks[TRIGGER_CB]:
cb(triggerState)
def handleChangeCallbacks(self, countChange: int) -> None:
for cb in self.callbacks[CHANGE_CB]:
cb(countChange, self.directionState)
def isDirectionTriggered(self, direction: Directions) -> bool:
return (
len(self.directionState[direction]) > 0
and self.directionState[direction][-1][END_TIME] is None
)
def updateState(self, direction: Directions, distance: float) -> bool:
triggered: bool = self.isTriggerDistance(distance)
previouslyTriggered = False
if len(self.directionState[direction]) > 0:
previouslyTriggered = self.directionState[direction][-1][END_TIME] is None
if triggered and not previouslyTriggered:
# Set as new beginning for this direction
self.directionState[direction].append(
{
START_TIME: datetime.now(),
END_TIME: None,
TRIGGER_DISTANCES: [distance],
END_DISTANCE: None,
}
)
return True
elif not triggered and previouslyTriggered:
# Set as end for this direction
self.directionState[direction][-1][END_TIME] = datetime.now()
self.directionState[direction][-1][END_DISTANCE] = distance
return True
elif previouslyTriggered:
# Add distance at least
self.directionState[direction][-1][TRIGGER_DISTANCES].append(distance)
return False

View file

@ -1,31 +0,0 @@
from enum import Enum
class Directions(str, Enum):
INSIDE = "indoor"
OUTSIDE = "outdoor"
def other(direction: "Directions") -> "Directions":
if direction is Directions.INSIDE:
return Directions.OUTSIDE
else:
return Directions.INSIDE
def __iter__():
return [Directions.INSIDE, Directions.OUTSIDE]
class ToFSensor:
def open(self) -> None:
raise NotImplementedError()
def setDirection(self, direction: Directions) -> None:
"""Configure sensor to pick up the distance in a specific direction."""
raise NotImplementedError()
def getDistance(self) -> float:
"""Returns new distance in cm."""
raise NotImplementedError()
def close(self) -> None:
raise NotImplementedError()

View file

@ -1,62 +0,0 @@
from sensors.tof_sensor import Directions, ToFSensor
import VL53L1X
# Reference: https://github.com/pimoroni/vl53l1x-python
#
# Left, right, top and bottom are relative to the SPAD matrix coordinates,
# which will be mirrored in real scene coordinates.
# (or even rotated, depending on the VM53L1X element alignment on the board and on the board position)
#
# ROI in SPAD matrix coords:
#
# 15 top-left
# | X____
# | | |
# | |____X
# | bottom-right
# 0__________15
#
class VL53L1XSensor(ToFSensor):
def __init__(self) -> None:
super().__init__()
def open(self) -> None:
self.sensor = VL53L1X.VL53L1X(i2c_bus=1, i2c_address=0x29)
self.sensor.open()
# Optionally set an explicit timing budget
# These values are measurement time in microseconds,
# and inter-measurement time in milliseconds.
# If you uncomment the line below to set a budget you
# should use `tof.start_ranging(0)`
# tof.set_timing(66000, 70)
self.ranging = 2
# 0 = Unchanged
# 1 = Short Range
# 2 = Medium Range
# 3 = Long Range
def setDirection(self, direction: Directions) -> None:
"""Configure sensor to pick up the distance in a specific direction."""
direction_roi = {
Directions.INSIDE: VL53L1X.VL53L1xUserRoi(6, 3, 9, 0),
Directions.OUTSIDE: VL53L1X.VL53L1xUserRoi(6, 15, 9, 12),
}
roi = direction_roi[direction]
self.sensor.stop_ranging()
self.sensor.set_user_roi(roi)
self.sensor.start_ranging(self.ranging)
def getDistance(self) -> float:
"""Returns new distance in cm."""
distance = self.sensor.get_distance()
return distance / 10
def close(self) -> None:
self.sensor.stop_ranging()
self.sensor.close()

View file

@ -1,112 +0,0 @@
from datetime import datetime
import json
from typing import Dict
from xmlrpc.client import Boolean
import matplotlib.pyplot as plt
# Config
FILE_PATH = "log.txt"
AFTER_DATE = datetime(2022, 1, 1) # Only keeps log entries after the specified date. Filter applied in parse_log_entry(..)
# Read file
content = None
with open(FILE_PATH, "r") as file:
content = file.readlines()
def parse_log_entry(entry: Dict) -> Dict | bool:
# Only keep last record of a sequence
if not is_last_in_sequence(entry):
return False
entry["dateTime"] = datetime.strptime(
str(entry["dateTime"])[:19], "%Y-%m-%d %H:%M:%S"
)
if entry["dateTime"] < AFTER_DATE:
return False
return entry
def is_last_in_sequence(entry: Dict) -> Boolean:
indoor = entry["directionState"]["indoor"]
outdoor = entry["directionState"]["outdoor"]
if len(indoor) <= 0 or len(outdoor) <= 0:
return False
end_key = "end_distance"
# Check version
if end_key not in indoor[-1]:
end_key = "end"
if indoor[-1][end_key] is None or outdoor[-1][end_key] is None:
return False
return True
# Collect
log: list[Dict] = [json.loads(line.strip("\x00")) for line in content]
print("Number of total entries:", len(log))
# Parse & Filter
log = [parse_log_entry(entry) for entry in log if parse_log_entry(entry)] # type: ignore
print("Number of filtered entries:", len(log))
# Render
fig, ax = plt.subplots() # Create a figure containing a single axes.
times: list[datetime] = [entry["dateTime"] for entry in log]
counts: list[int] = [entry["previousPeopleCount"] for entry in log]
ax.step(times, counts, where="pre") # type: ignore
print("-"*20)
plt.show()
print("-" * 20)
# Print stats
walk_ins = [entry for entry in log if entry["countChange"] > 0]
walk_outs = [entry for entry in log if entry["countChange"] < 0]
walk_unders = [entry for entry in log if entry["countChange"] == 0]
print("Number of walk-ins:", len(walk_ins))
print("Number of walk-outs:", len(walk_outs))
print("Number of walk-unders:", len(walk_unders))
print("-" * 20)
# Calculate faults
for c, n in zip(list(range(len(log))), list(range(len(log)))[1:]):
estimated_count: int = log[c]["previousPeopleCount"] + log[c]["countChange"]
faulty: bool = estimated_count != log[n]["previousPeopleCount"]
log[c]["faulty"] = faulty
log[c]["faultyCount"] = log[c]["previousPeopleCount"] if faulty else None
log = log[:-1]
fault_count = sum(1 for entry in log if entry["faulty"])
fault_percentage = fault_count / len(log)
print("Number of faults:", fault_count)
print("Percentage of faults:", fault_percentage * 100, "%")
print("-" * 20)
faulty_off = [entry for entry in log if entry["faulty"] and entry["faultyCount"] == 0]
faulty_on = [entry for entry in log if entry["faulty"] and entry["faultyCount"] != 0]
print("Number of false-0:", len(faulty_off))
print("Number of false-1:", len(faulty_on))
print("Percentage of false-0:", len(faulty_off) / fault_count * 100, "%")
print("Percentage of false-1:", len(faulty_on) / fault_count * 100, "%")
# Number of dates
unique_dates = set()
for entry in log:
date = entry["dateTime"].strftime("%Y-%m-%d")
unique_dates.add(date)
total_days = len(unique_dates)
print("-"*20)
print("Number of days:", total_days)
print("Changes per day:", len(log) / total_days)
corrections_per_day = len(log) / total_days * fault_percentage
print("Corrections per day:", corrections_per_day)

View file

@ -1,21 +0,0 @@
MIT License
Copyright (c) 2018 sankalpjonn
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View file

@ -1 +0,0 @@
from timeloop.app import Timeloop

View file

@ -1,72 +0,0 @@
from datetime import datetime, timedelta
import logging
import sys
import signal
import time
from timeloop.exceptions import ServiceExit
from timeloop.job import Job
from timeloop.helpers import service_shutdown
class Timeloop():
def __init__(self):
self.jobs = []
logger = logging.getLogger('timeloop')
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(logging.INFO)
formatter = logging.Formatter('[%(asctime)s] [%(name)s] [%(levelname)s] %(message)s')
ch.setFormatter(formatter)
logger.addHandler(ch)
logger.setLevel(logging.INFO)
self.logger = logger
def _add_job(self, func, interval: timedelta, offset: timedelta=None, *args, **kwargs):
j = Job(interval, func, offset=offset, *args, **kwargs)
self.jobs.append(j)
def _block_main_thread(self):
signal.signal(signal.SIGTERM, service_shutdown)
signal.signal(signal.SIGINT, service_shutdown)
while True:
try:
time.sleep(1)
except ServiceExit:
self.stop()
break
def _start_jobs(self, block):
for j in self.jobs:
j.daemon = not block
j.start()
self.logger.info("Registered job {}".format(j.execute))
def _stop_jobs(self):
for j in self.jobs:
self.logger.info("Stopping job {}".format(j.execute))
j.stop()
def job(self, interval: timedelta, offset: timedelta=None):
"""Decorator to define a timeloop for the decorated function.
Args:
interval (timedelta): How long to wait after every execution until the next one.
offset (timedelta, optional): Positive offset until the first execution of the function. If None, will wait with first execution until the first interval passed. If timedelta with length 0 (or smaller) will execute immediately. Defaults to None.
"""
def decorator(f):
self._add_job(f, interval, offset=offset)
return f
return decorator
def stop(self):
self._stop_jobs()
self.logger.info("Timeloop exited.")
def start(self, block=False):
self.logger.info("Starting Timeloop..")
self._start_jobs(block=block)
self.logger.info("Timeloop now started. Jobs will run based on the interval set")
if block:
self._block_main_thread()

View file

@ -1,6 +0,0 @@
class ServiceExit(Exception):
"""
Custom exception which is used to trigger the clean exit
of all running threads and the main program.
"""
pass

View file

@ -1,5 +0,0 @@
from timeloop.exceptions import ServiceExit
def service_shutdown(signum, frame):
raise ServiceExit

View file

@ -1,25 +0,0 @@
from threading import Thread, Event
from datetime import timedelta
from time import sleep
class Job(Thread):
def __init__(self, interval: timedelta, execute, offset: timedelta=None, *args, **kwargs):
Thread.__init__(self)
self.stopped = Event()
self.interval: timedelta = interval
self.execute = execute
self.offset: timedelta = offset
self.args = args
self.kwargs = kwargs
def stop(self):
self.stopped.set()
self.join()
def run(self):
if self.offset:
sleep(self.offset.total_seconds())
self.execute(*self.args, **self.kwargs)
while not self.stopped.wait(self.interval.total_seconds()):
self.execute(*self.args, **self.kwargs)