first commit

This commit is contained in:
Maximilian Giller 2022-08-05 23:45:25 +02:00
commit 4886983f96
18 changed files with 964 additions and 0 deletions

10
README.md Normal file
View file

@ -0,0 +1,10 @@
# Max's Smart Home - MaSH
Should be a very simple implementation of what is required in Max's smart home. Trying not to overcomplicate things and thereby ruin motivation to work on this.
## ToDo
- Energy-saving/Off mode (Only one light slighty on to deal with the state) (How should power plugs be handled?)
- Daylight Adjustment (E.g. No ceiling lights during daytime)
- Save scene when turning off, to reapply same scene when turning on
- Detect fast flickering of light state, indicating an issue, and disable the system for a few minutes

8
requirements.txt Normal file
View file

@ -0,0 +1,8 @@
smbus2
vl53l1x
# For Philips Hue Counter
phue
# For statistics
matplotlib

18
src/console_counter.py Normal file
View file

@ -0,0 +1,18 @@
from sensor.people_counter import PeopleCounter
from sensor.vl53l1x_sensor import VL53L1XSensor
import logging
counter = PeopleCounter(VL53L1XSensor())
peopleCount = 0
logging.getLogger().setLevel(logging.INFO)
def countChange(change: int) -> None:
global peopleCount
peopleCount += change
logging.info(f'People count change to: {peopleCount}')
counter.hookCounting(countChange)
counter.run()

View file

@ -0,0 +1,44 @@
from sensor.people_counter import PeopleCounter
from sensor.vl53l1x_sensor import VL53L1XSensor
import paho.mqtt.client as mqtt
from HaMqtt.MQTTSensor import MQTTSensor
from HaMqtt.MQTTUtil import HaDeviceClass
import logging
HA_URL = ""
HA_PORT = 1883
HA_SENSOR_NAME = ""
HA_SENSOR_ID = ""
HA_SENSOR_DEVICE_CLASS = HaDeviceClass.NONE
SENSOR_UNIT = ""
# Setup connection to HA
mqttClient = mqtt.Client()
mqttClient.connect(HA_URL, HA_PORT)
mqttClient.loop_start() # Keep conneciton alive
# Setup mqtt binding
sensor = MQTTSensor(HA_SENSOR_NAME, HA_SENSOR_ID, mqttClient, SENSOR_UNIT, HA_SENSOR_DEVICE_CLASS)
logging.debug(f'Connected to topic {sensor.state_topic}')
def countChange(change: int) -> None:
"""Called when people count change is detected.
Sends update to the initialized HA instance.
Args:
change (int): Number of people leaving (<0) or entering (>0) a room.
"""
# Send update to HA
global sensor
sensor.publish_state(change)
logging.debug(f'People count changed by {change}')
# Setup people count sensor
counter = PeopleCounter(VL53L1XSensor())
counter.hookCounting(countChange)
counter.run()

283
src/philips_hue_counter.py Normal file
View file

@ -0,0 +1,283 @@
from datetime import datetime, time, timedelta
from typing import Dict
from interface.philips_hue import PhilipsHue
from sensor.people_counter import PeopleCounter
from sensor.tof_sensor import Directions
from sensor.vl53l1x_sensor import VL53L1XSensor
import logging
import json
from timeloop import Timeloop
# Should lights already turn on where there is any kind of motion in the sensor
ENABLE_MOTION_TRIGGERED_LIGHT = True
# Should lights change when a certain time in the schedule is reached
ENABLE_SCHEDULE_TRIGGERS = False # Not working correctly at the moment, so turned off by default
# Schedule (Key is time after scene should be used. Value is scene name to be used.)
# Needs to be sorted chronologically
SCHEDULE = {}
LOG_FILE_PATH = "log.txt" # Path for logs
hue_conf = {
'bridge_ip': '',
'transition_time': 10, # seconds
'light_group': '',
# If file exists, application is considered 'registered' at the bridge
'registered_file': 'smart_light_registered.bridge'
} # Custom configuration for philips hue
hue: PhilipsHue = PhilipsHue(hue_conf) # Light interface
counter: PeopleCounter = PeopleCounter(VL53L1XSensor()) # Sensor object
peopleCount: int = 0 # Global count of people on the inside
motion_triggered_lights = False # Is light on because of any detected motion
timeloop: Timeloop = Timeloop() # Used for time triggered schedule
logging.getLogger().setLevel(logging.INFO)
def time_minus_time(time_a: time, time_b: time) -> timedelta:
"""Implementes a basic timedelta function for time objects.
Args:
time_a (time): Time to subtract from.
time_b (time): Time to be subtracted.
Returns:
timedelta: Delta between the two time objects.
"""
today = datetime.today()
dt_a = datetime.combine(today, time_a)
dt_b = datetime.combine(today, time_b)
return dt_a - dt_b
def get_scene_for_time(time: time) -> str:
"""Determines the correct scene to activate for a given time.
Args:
time (time): Time to find scene for.
Returns:
string: Scene name that should be active. None, if schedule is empty.
"""
global SCHEDULE
if SCHEDULE is None or len(SCHEDULE) <= 0:
return None
previous_scene = None
for start_time, scene in SCHEDULE.items():
# If current time is still after schedule time, just keep going
if start_time <= time:
previous_scene = scene
continue
# Schedule timef is now after current time, which is too late
# So if exists, take previous scene, since it was the last before the current time
if previous_scene:
return previous_scene
else:
break
# Only breaks if it could not find a valid scene, so use lates scene as fallback
return list(SCHEDULE.values())[-1]
def change_cb(countChange: int, directionState: Dict):
"""Handles basic logging of event data for later analysis.
Args:
countChange (int): The change in the number of people. Usually on of [-1, 0, 1].
directionState (Dict): Object describing the internal state of the sensor.
"""
data = {
'version': 'v0.0',
'previousPeopleCount': peopleCount,
'countChange': countChange,
'directionState': directionState,
'dateTime': datetime.now(),
'motionTriggeredLights': motion_triggered_lights
}
try:
with open(LOG_FILE_PATH, 'a') as f:
f.write(json.dumps(data, default=str) + "\n")
except Exception as ex:
logging.exception(f'Unable to write log file. {ex}')
def count_change(change: int) -> None:
"""Handles light state when people count changes
Args:
change (int): The change in the number of people. Usually on of [-1, 0, 1].
"""
global hue
global peopleCount
global motion_triggered_lights
# Are lights on at the moment?
previous_lights_state = get_light_state()
# Apply correction
if peopleCount <= 0 and previous_lights_state and not motion_triggered_lights:
# Count was 0, but lights were on (not because of motion triggers) => people count was not actually 0
peopleCount = 1
logging.debug(f'People count corrected to {peopleCount}')
elif peopleCount > 0 and not previous_lights_state:
# Count was >0, but lights were off => people count was actually 0
peopleCount = 0
logging.debug(f'People count corrected to {peopleCount}')
peopleCount += change
if peopleCount < 0:
peopleCount = 0
logging.debug(f'People count changed by {change}')
# Handle light
target_light_state = peopleCount > 0
# Return, if there is no change
if previous_lights_state == target_light_state:
if previous_lights_state:
# Signaling that the people count is taking control over the light now
motion_triggered_lights = False
return
set_light_state(target_light_state)
def trigger_change(triggerState: Dict):
"""Handles motion triggered light state.
Args:
triggerState (Dict): Describing in what directions the sensor is triggerd.
"""
global hue
global motion_triggered_lights
target_light_state = None
# Is someone walking close to the door?
motion_detected = triggerState[Directions.INSIDE] or triggerState[Directions.OUTSIDE]
target_light_state = motion_detected
# Does motion triggered light need to do anything?
if peopleCount > 0:
# State is successfully handled by the count
motion_triggered_lights = False
return
# Only look at changing situations
if target_light_state == motion_triggered_lights:
return
set_light_state(target_light_state)
# Save state
motion_triggered_lights = target_light_state
def set_light_scene(target_scene: str) -> bool:
"""Sets the lights to the given scene, but only, if lights are already on. Does not correct count if lights are in an unexpected state.
Args:
target_scene (string): Name of the scene to activate.
"""
# Is valid scene?
if target_scene is None:
return
# Are lights on at the moment? Only based on people count for simplicity
if peopleCount <= 0:
# Lights are probably off, not doing anything
return
# Set lights to scene
hue.set_group_scene(hue_conf['light_group'], target_scene)
logging.debug(
f'Light scene set to {target_scene}')
def set_light_state(target_light_state: bool) -> bool:
"""Sets the lights to the given state.
Args:
target_light_state (bool): Should lights on the inside be on or off.
Returns:
bool: Previous light state.
"""
# Are lights on at the moment?
previous_lights_state = get_light_state()
if target_light_state == previous_lights_state:
return previous_lights_state
# Adjust light as necessary
target_scene = get_scene_for_time(datetime.now().time())
if target_light_state and target_scene:
# Set to specific scene if exists
hue.set_group_scene(hue_conf['light_group'], target_scene)
logging.debug(
f'Light state changed to {target_light_state} with scene {target_scene}')
else:
hue.set_group(hue_conf['light_group'], {'on': target_light_state})
logging.debug(f'Light state changed to {target_light_state}')
return previous_lights_state
def get_light_state() -> bool:
"""
Returns:
bool: Current light state.
"""
return hue.get_group(hue_conf['light_group'])['state']['any_on']
def update_scene():
"""Called by time trigger to update light scene if lights are on.
"""
scene = get_scene_for_time(datetime.now().time())
if scene is None:
return
set_light_scene(scene)
logging.debug(f'Updated scene at {datetime.now().time()} to {scene}.')
def register_time_triggers():
"""Registeres time triggered callbacks based on the schedule, to adjust the current scene, if lights are on.
"""
global SCHEDULE
if SCHEDULE is None or len(SCHEDULE) <= 0:
return
for time in SCHEDULE.keys():
delta = time_minus_time(time, datetime.now().time())
if delta < timedelta(0):
delta += timedelta(1)
timeloop._add_job(update_scene, interval=timedelta(1), offset=delta)
timeloop.start(block=False)
logging.info("Registered time triggers.")
if __name__ == "__main__":
if ENABLE_SCHEDULE_TRIGGERS:
register_time_triggers()
# Represents callback trigger order
counter.hookChange(change_cb)
counter.hookCounting(count_change)
counter.hookTrigger(trigger_change)
counter.run()

2
src/sensors/__init__.py Normal file
View file

@ -0,0 +1,2 @@
from sensors.tof_sensor import ToFSensor, Directions
from sensors.people_counter import PeopleCounter

View file

@ -0,0 +1,189 @@
from typing import Dict
from sensors import ToFSensor, Directions
from datetime import datetime
import threading
COUNTING_CB = "counting"
TRIGGER_CB = "trigger"
CHANGE_CB = "changes"
START_TIME = "start_time"
END_TIME = "end_time"
TRIGGER_DISTANCES = "trigger_distances"
END_DISTANCE = "end_distance"
class PeopleCounter ():
def __init__(self, sensor: ToFSensor) -> None:
self.sensor = sensor
self.callbacks = {COUNTING_CB: [], TRIGGER_CB: [], CHANGE_CB: []}
self.maxTriggerDistance = 120 # In cm
def hookCounting(self, cb) -> None:
self.callbacks[COUNTING_CB].append(cb)
def unhookCounting(self, cb) -> None:
self.callbacks[COUNTING_CB].remove(cb)
def hookTrigger(self, cb) -> None:
self.callbacks[TRIGGER_CB].append(cb)
def unhookTrigger(self, cb) -> None:
self.callbacks[TRIGGER_CB].remove(cb)
def hookChange(self, cb) -> None:
self.callbacks[CHANGE_CB].append(cb)
def unhookChange(self, cb) -> None:
self.callbacks[CHANGE_CB].remove(cb)
def getInitialDirectionState(self) -> Dict:
return {
Directions.INSIDE: [],
Directions.OUTSIDE: []
}
def run(self) -> None:
self.keepRunning = True
direction = Directions.INSIDE
self.directionState = self.getInitialDirectionState()
self.sensor.open()
while self.keepRunning:
# Switch to other direction
direction: Directions = Directions.other(direction)
self.sensor.setDirection(direction)
distance: float = self.sensor.getDistance()
changed: bool = self.updateState(direction, distance)
if changed:
countChange: int = self.getCountChange(self.directionState)
# Hooks
th = threading.Thread(target=self.handleCallbacks, args=(countChange,))
th.start()
# Reset state if state is finalised
if not self.isDirectionTriggered(Directions.INSIDE) and not self.isDirectionTriggered(Directions.OUTSIDE):
self.directionState = self.getInitialDirectionState()
self.sensor.close()
def getCountChange(self, directionState) -> int:
# Is valid?
for direction in Directions:
# Is there at least one record for every direction?
if len(directionState[direction]) <= 0:
return 0
# Did every record start and end?
if directionState[direction][0][START_TIME] is None or directionState[direction][-1][END_TIME] is None:
return 0 # Return no change if not valid
# Get times into variables
insideStart = directionState[Directions.INSIDE][0][START_TIME]
insideEnd = directionState[Directions.INSIDE][-1][END_TIME]
outsideStart = directionState[Directions.OUTSIDE][0][START_TIME]
outsideEnd = directionState[Directions.OUTSIDE][-1][END_TIME]
# In what direction is the doorframe entered and left?
# Entering doorframe in the inside direction
enteringInside: bool = outsideStart < insideStart
# Leaving dooframe in the inside direction
leavingInside: bool = outsideEnd < insideEnd
# They have to be the same, otherwise they switch directions in between
if enteringInside != leavingInside:
# Someone did not go all the way
# Either
# Inside -######-
# Outside ---##---
# or
# Inside ---##---
# Outside -######-
return 0
# Are those times overlapping or disjunct?
if insideEnd < outsideStart or outsideEnd < insideStart:
# They are disjunct
# Either
# Inside -##-----
# Outside -----##-
# or
# Inside -----##-
# Outside -##-----
return 0
# What direction is the person taking?
if enteringInside:
# Entering the inside
# Inside ---####-
# Outside -####---
return 1
else:
# Leaving the inside
# Inside -####---
# Outside ---####-
return -1
def isTriggerDistance(self, distance: float) -> bool:
#! TODO: Should be based on the distance from the ground, not from the sensor
return distance <= self.maxTriggerDistance
def handleCallbacks(self, countChange: int):
self.handleChangeCallbacks(countChange)
self.handleCountingCallbacks(countChange)
self.handleTriggerCallbacks()
def handleCountingCallbacks(self, countChange: int) -> None:
# Only notify counting on actual count change
if countChange == 0:
return
for cb in self.callbacks[COUNTING_CB]:
cb(countChange)
def handleTriggerCallbacks(self) -> None:
triggerState = {
Directions.INSIDE: self.isDirectionTriggered(Directions.INSIDE),
Directions.OUTSIDE: self.isDirectionTriggered(Directions.OUTSIDE)
}
for cb in self.callbacks[TRIGGER_CB]:
cb(triggerState)
def handleChangeCallbacks(self, countChange: int) -> None:
for cb in self.callbacks[CHANGE_CB]:
cb(countChange, self.directionState)
def isDirectionTriggered(self, direction: Directions) -> bool:
return len(self.directionState[direction]) > 0 and self.directionState[direction][-1][END_TIME] is None
def updateState(self, direction: Directions, distance: float) -> bool:
triggered: bool = self.isTriggerDistance(distance)
previouslyTriggered = False
if len(self.directionState[direction]) > 0:
previouslyTriggered = self.directionState[direction][-1][END_TIME] is None
if triggered and not previouslyTriggered:
# Set as new beginning for this direction
self.directionState[direction].append({
START_TIME: datetime.now(),
END_TIME: None,
TRIGGER_DISTANCES: [distance],
END_DISTANCE: None
})
return True
elif not triggered and previouslyTriggered:
# Set as end for this direction
self.directionState[direction][-1][END_TIME] = datetime.now()
self.directionState[direction][-1][END_DISTANCE] = distance
return True
elif previouslyTriggered:
# Add distance at least
self.directionState[direction][-1][TRIGGER_DISTANCES].append(distance)
return False

33
src/sensors/tof_sensor.py Normal file
View file

@ -0,0 +1,33 @@
from enum import Enum
class Directions(str, Enum):
INSIDE = "indoor"
OUTSIDE = "outdoor"
def other(direction: 'Direction') -> 'Direction':
if direction is Directions.INSIDE:
return Directions.OUTSIDE
else:
return Directions.INSIDE
def __iter__():
return [Directions.INSIDE, Directions.OUTSIDE]
class ToFSensor:
def open(self) -> None:
raise NotImplementedError()
def setDirection(self, direction: Directions) -> None:
"""Configure sensor to pick up the distance in a specific direction.
"""
raise NotImplementedError()
def getDistance(self) -> float:
"""Returns new distance in cm.
"""
raise NotImplementedError()
def close(self) -> None:
raise NotImplementedError()

View file

@ -0,0 +1,64 @@
from sensor.tof_sensor import Directions, ToFSensor
import VL53L1X
# Reference: https://github.com/pimoroni/vl53l1x-python
#
# Left, right, top and bottom are relative to the SPAD matrix coordinates,
# which will be mirrored in real scene coordinates.
# (or even rotated, depending on the VM53L1X element alignment on the board and on the board position)
#
# ROI in SPAD matrix coords:
#
# 15 top-left
# | X____
# | | |
# | |____X
# | bottom-right
# 0__________15
#
class VL53L1XSensor (ToFSensor):
def __init__(self) -> None:
super().__init__()
def open(self) -> None:
self.sensor = VL53L1X.VL53L1X(i2c_bus=1, i2c_address=0x29)
self.sensor.open()
# Optionally set an explicit timing budget
# These values are measurement time in microseconds,
# and inter-measurement time in milliseconds.
# If you uncomment the line below to set a budget you
# should use `tof.start_ranging(0)`
# tof.set_timing(66000, 70)
self.ranging = 2
# 0 = Unchanged
# 1 = Short Range
# 2 = Medium Range
# 3 = Long Range
def setDirection(self, direction: Directions) -> None:
"""Configure sensor to pick up the distance in a specific direction.
"""
direction_roi = {
Directions.INSIDE: VL53L1X.VL53L1xUserRoi(6, 3, 9, 0),
Directions.OUTSIDE: VL53L1X.VL53L1xUserRoi(6, 15, 9, 12)
}
roi = direction_roi[direction]
self.sensor.stop_ranging()
self.sensor.set_user_roi(roi)
self.sensor.start_ranging(self.ranging)
def getDistance(self) -> float:
"""Returns new distance in cm.
"""
distance = self.sensor.get_distance()
return distance / 10
def close(self) -> None:
self.sensor.stop_ranging()
self.sensor.close()

Binary file not shown.

View file

@ -0,0 +1,87 @@
from phue import Bridge
from time import sleep
from pathlib import Path
import logging
import socket
class PhilipsHue ():
def __init__(self, config):
self.config = config
self.connect()
def connect(self):
registered = Path(self.config['registered_file']).is_file()
success = False
while success == False:
try:
logging.info("Connecting to hue bridge")
self.bridge = Bridge(self.config['bridge_ip'])
self.bridge.connect()
success = True
except Exception as e:
logging.info("Failed to connect to bridge")
success = False
if registered == False:
logging.info("Trying again in 5 seconds..")
sleep(5)
else:
raise e
logging.info("Connected to hue bridge")
if registered == False:
# register
logging.info("Saving registration")
Path(self.config['registered_file']).touch()
def get_state(self):
return self.__execute__(lambda: self.bridge.get_api())
def get_scenes(self):
return self.__execute__(lambda: self.bridge.get_scene())
def get_scene_by_name(self, name):
for key, scene in self.get_scenes().items():
if scene['name'] == name:
scene['id'] = key
return scene
return None
def set_light(self, lights, command):
return self.__execute__(lambda: self.bridge.set_light(lights, command))
def get_light(self, id, command=None):
return self.__execute__(lambda: self.bridge.get_light(id, command))
def set_group(self, groups, command):
return self.__execute__(lambda: self.bridge.set_group(groups, command))
def get_group(self, id, command=None):
return self.__execute__(lambda: self.bridge.get_group(id, command))
def set_group_scene(self, group_name, scene_name):
scene_id = self.get_scene_by_name(scene_name)['id']
return self.__execute__(lambda: self.set_group(group_name, self.create_conf({'scene': scene_id})))
def create_conf(self, conf):
if 'transitiontime' not in conf.keys():
conf['transitiontime'] = self.config['transition_time']
return conf
def __execute__(self, function):
try:
return function()
except socket.timeout as e:
# Try to reconnect
logging.exception(
"Could not execute function. Trying to reconnect to bridge")
logging.exception(str(e))
try:
self.connect()
except Exception as e:
logging.exception(
"Reconnect did not succeed, skipping execution")
logging.exception(str(e))
return
# Now try again
return function()

View file

@ -0,0 +1,96 @@
from datetime import datetime
import json
from typing import Dict
from xmlrpc.client import Boolean
import matplotlib.pyplot as plt
# Config
FILE_PATH = "log.txt"
# Read file
content = None
with open(FILE_PATH, "r") as file:
content = file.readlines()
def parse_log_entry(entry: Dict) -> Dict:
# Only keep last record of a sequence
if not is_last_in_sequence(entry):
return False
entry["dateTime"] = datetime.strptime(
str(entry["dateTime"])[:19], "%Y-%m-%d %H:%M:%S")
if entry["dateTime"] < datetime(2022, 1, 1):
return False
return entry
def is_last_in_sequence(entry: Dict) -> Boolean:
indoor = entry["directionState"]["indoor"]
outdoor = entry["directionState"]["outdoor"]
if len(indoor) <= 0 or len(outdoor) <= 0:
return False
end_key = "end_distance"
# Check version
if end_key not in indoor[-1]:
end_key = "end"
if indoor[-1][end_key] is None or outdoor[-1][end_key] is None:
return False
return True
# Collect
log = [json.loads(line.strip("\x00")) for line in content]
print("Number of total entries:", len(log))
# Parse & Filter
log = [parse_log_entry(entry) for entry in log if parse_log_entry(entry)]
print("Number of filtered entries:", len(log))
# Render
fig, ax = plt.subplots() # Create a figure containing a single axes.
times: list[datetime] = [entry["dateTime"] for entry in log]
counts: list[int] = [entry["previousPeopleCount"] for entry in log]
ax.step(times, counts, where="pre")
plt.show()
print("-"*20)
# Print stats
walk_ins = [entry for entry in log if entry["countChange"] > 0]
walk_outs = [entry for entry in log if entry["countChange"] < 0]
walk_unders = [entry for entry in log if entry["countChange"] == 0]
print("Number of walk-ins:", len(walk_ins))
print("Number of walk-outs:", len(walk_outs))
print("Number of walk-unders:", len(walk_unders))
print("-"*20)
# Calculate faults
for c, n in zip(list(range(len(log))), list(range(len(log)))[1:]):
estimated_count: int = log[c]["previousPeopleCount"] + \
log[c]["countChange"]
faulty: bool = estimated_count != log[n]["previousPeopleCount"]
log[c]["faulty"] = faulty
log[c]["faultyCount"] = log[c]["previousPeopleCount"] if faulty else None
log = log[:-1]
fault_count = sum(1 for entry in log if entry["faulty"])
print("Number of faults:", fault_count)
print("Percentage of faults:", fault_count / len(log) * 100, "%")
print("-"*20)
faulty_off = [entry for entry in log if entry["faulty"]
and entry["faultyCount"] == 0]
faulty_on = [entry for entry in log if entry["faulty"]
and entry["faultyCount"] != 0]
print("Number of false-0:", len(faulty_off))
print("Number of false-1:", len(faulty_on))
print("Percentage of false-0:", len(faulty_off) / fault_count * 100, "%")
print("Percentage of false-1:", len(faulty_on) / fault_count * 100, "%")

21
src/timeloop/LICENSE Normal file
View file

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2018 sankalpjonn
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

1
src/timeloop/__init__.py Normal file
View file

@ -0,0 +1 @@
from timeloop.app import Timeloop

72
src/timeloop/app.py Normal file
View file

@ -0,0 +1,72 @@
from datetime import datetime, timedelta
import logging
import sys
import signal
import time
from timeloop.exceptions import ServiceExit
from timeloop.job import Job
from timeloop.helpers import service_shutdown
class Timeloop():
def __init__(self):
self.jobs = []
logger = logging.getLogger('timeloop')
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(logging.INFO)
formatter = logging.Formatter('[%(asctime)s] [%(name)s] [%(levelname)s] %(message)s')
ch.setFormatter(formatter)
logger.addHandler(ch)
logger.setLevel(logging.INFO)
self.logger = logger
def _add_job(self, func, interval: timedelta, offset: timedelta=None, *args, **kwargs):
j = Job(interval, func, offset=offset, *args, **kwargs)
self.jobs.append(j)
def _block_main_thread(self):
signal.signal(signal.SIGTERM, service_shutdown)
signal.signal(signal.SIGINT, service_shutdown)
while True:
try:
time.sleep(1)
except ServiceExit:
self.stop()
break
def _start_jobs(self, block):
for j in self.jobs:
j.daemon = not block
j.start()
self.logger.info("Registered job {}".format(j.execute))
def _stop_jobs(self):
for j in self.jobs:
self.logger.info("Stopping job {}".format(j.execute))
j.stop()
def job(self, interval: timedelta, offset: timedelta=None):
"""Decorator to define a timeloop for the decorated function.
Args:
interval (timedelta): How long to wait after every execution until the next one.
offset (timedelta, optional): Positive offset until the first execution of the function. If None, will wait with first execution until the first interval passed. If timedelta with length 0 (or smaller) will execute immediately. Defaults to None.
"""
def decorator(f):
self._add_job(f, interval, offset=offset)
return f
return decorator
def stop(self):
self._stop_jobs()
self.logger.info("Timeloop exited.")
def start(self, block=False):
self.logger.info("Starting Timeloop..")
self._start_jobs(block=block)
self.logger.info("Timeloop now started. Jobs will run based on the interval set")
if block:
self._block_main_thread()

View file

@ -0,0 +1,6 @@
class ServiceExit(Exception):
"""
Custom exception which is used to trigger the clean exit
of all running threads and the main program.
"""
pass

5
src/timeloop/helpers.py Normal file
View file

@ -0,0 +1,5 @@
from timeloop.exceptions import ServiceExit
def service_shutdown(signum, frame):
raise ServiceExit

25
src/timeloop/job.py Normal file
View file

@ -0,0 +1,25 @@
from threading import Thread, Event
from datetime import timedelta
from time import sleep
class Job(Thread):
def __init__(self, interval: timedelta, execute, offset: timedelta=None, *args, **kwargs):
Thread.__init__(self)
self.stopped = Event()
self.interval: timedelta = interval
self.execute = execute
self.offset: timedelta = offset
self.args = args
self.kwargs = kwargs
def stop(self):
self.stopped.set()
self.join()
def run(self):
if self.offset:
sleep(self.offset.total_seconds())
self.execute(*self.args, **self.kwargs)
while not self.stopped.wait(self.interval.total_seconds()):
self.execute(*self.args, **self.kwargs)