Implemented time triggered schedule

This commit is contained in:
Maximilian Giller 2022-04-27 22:39:27 +02:00
parent 043f405a97
commit 47197f653d
7 changed files with 163 additions and 9 deletions

View file

@ -1,4 +1,4 @@
from datetime import datetime, time from datetime import datetime, time, timedelta
import string import string
from typing import Dict from typing import Dict
from interface.philips_hue import PhilipsHue from interface.philips_hue import PhilipsHue
@ -7,6 +7,7 @@ from sensor.tof_sensor import Directions
from sensor.vl53l1x_sensor import VL53L1XSensor from sensor.vl53l1x_sensor import VL53L1XSensor
import logging import logging
import json import json
from timeloop import Timeloop
# Should lights already turn on where there is any kind of motion in the sensor # Should lights already turn on where there is any kind of motion in the sensor
@ -31,6 +32,7 @@ hue: PhilipsHue = PhilipsHue(hue_conf) # Light interface
counter: PeopleCounter = PeopleCounter(VL53L1XSensor()) # Sensor object counter: PeopleCounter = PeopleCounter(VL53L1XSensor()) # Sensor object
peopleCount: int = 0 # Global count of people on the inside peopleCount: int = 0 # Global count of people on the inside
motion_triggered_lights = False # Is light on because of any detected motion motion_triggered_lights = False # Is light on because of any detected motion
timeloop: Timeloop = Timeloop() # Used for time triggered schedule
logging.getLogger().setLevel(logging.INFO) logging.getLogger().setLevel(logging.INFO)
@ -67,13 +69,6 @@ def get_scene_for_time(time: time) -> string:
return SCHEDULE.values()[-1] return SCHEDULE.values()[-1]
def register_time_triggers():
"""Registeres time triggered callbacks based on the schedule, to adjust the current scene, if lights are on.
"""
#! TODO
logging.info("Registered time triggers.")
def change_cb(countChange: int, directionState: Dict): def change_cb(countChange: int, directionState: Dict):
"""Handles basic logging of event data for later analysis. """Handles basic logging of event data for later analysis.
@ -212,7 +207,7 @@ def set_light_state(target_light_state: bool) -> bool:
return previous_lights_state return previous_lights_state
# Adjust light as necessary # Adjust light as necessary
target_scene = get_scene_for_time(datetime.now()) target_scene = get_scene_for_time(datetime.now().time())
# Set to specific scene if exists # Set to specific scene if exists
if target_scene: if target_scene:
hue.set_group_scene(hue_conf['light_group'], target_scene) hue.set_group_scene(hue_conf['light_group'], target_scene)
@ -233,6 +228,36 @@ def get_light_state() -> bool:
return hue.get_group(hue_conf['light_group'])['state']['any_on'] return hue.get_group(hue_conf['light_group'])['state']['any_on']
def update_scene():
"""Called by time trigger to update light scene if lights are on.
"""
scene = get_scene_for_time(datetime.now().time())
if scene is None:
return
set_light_scene(scene)
logging.debug(f'Updated scene at {datetime.now().time()} to {scene}.')
def register_time_triggers():
"""Registeres time triggered callbacks based on the schedule, to adjust the current scene, if lights are on.
"""
global SCHEDULE
if SCHEDULE is None or len(SCHEDULE) <= 0:
return
for time in SCHEDULE.keys():
delta = time - datetime.now().time()
if delta < 0:
delta += timedelta(1)
timeloop._add_job(update_scene, interval=timedelta(1), offset=delta)
timeloop.start(block=False)
logging.info("Registered time triggers.")
register_time_triggers() register_time_triggers()
# Represents callback trigger order # Represents callback trigger order

21
src/timeloop/LICENSE Normal file
View file

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2018 sankalpjonn
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

1
src/timeloop/__init__.py Normal file
View file

@ -0,0 +1 @@
from timeloop.app import Timeloop

72
src/timeloop/app.py Normal file
View file

@ -0,0 +1,72 @@
from datetime import datetime, timedelta
import logging
import sys
import signal
import time
from timeloop.exceptions import ServiceExit
from timeloop.job import Job
from timeloop.helpers import service_shutdown
class Timeloop():
def __init__(self):
self.jobs = []
logger = logging.getLogger('timeloop')
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(logging.INFO)
formatter = logging.Formatter('[%(asctime)s] [%(name)s] [%(levelname)s] %(message)s')
ch.setFormatter(formatter)
logger.addHandler(ch)
logger.setLevel(logging.INFO)
self.logger = logger
def _add_job(self, func, interval: timedelta, offset: timedelta=None, *args, **kwargs):
j = Job(interval, func, offset=offset *args, **kwargs)
self.jobs.append(j)
def _block_main_thread(self):
signal.signal(signal.SIGTERM, service_shutdown)
signal.signal(signal.SIGINT, service_shutdown)
while True:
try:
time.sleep(1)
except ServiceExit:
self.stop()
break
def _start_jobs(self, block):
for j in self.jobs:
j.daemon = not block
j.start()
self.logger.info("Registered job {}".format(j.execute))
def _stop_jobs(self):
for j in self.jobs:
self.logger.info("Stopping job {}".format(j.execute))
j.stop()
def job(self, interval: timedelta, offset: timedelta=None):
"""Decorator to define a timeloop for the decorated function.
Args:
interval (timedelta): How long to wait after every execution until the next one.
offset (timedelta, optional): Positive offset until the first execution of the function. If None, will wait with first execution until the first interval passed. If timedelta with length 0 (or smaller) will execute immediately. Defaults to None.
"""
def decorator(f):
self._add_job(f, interval, offset=offset)
return f
return decorator
def stop(self):
self._stop_jobs()
self.logger.info("Timeloop exited.")
def start(self, block=False):
self.logger.info("Starting Timeloop..")
self._start_jobs(block=block)
self.logger.info("Timeloop now started. Jobs will run based on the interval set")
if block:
self._block_main_thread()

View file

@ -0,0 +1,6 @@
class ServiceExit(Exception):
"""
Custom exception which is used to trigger the clean exit
of all running threads and the main program.
"""
pass

5
src/timeloop/helpers.py Normal file
View file

@ -0,0 +1,5 @@
from timeloop.exceptions import ServiceExit
def service_shutdown(signum, frame):
raise ServiceExit

24
src/timeloop/job.py Normal file
View file

@ -0,0 +1,24 @@
from threading import Thread, Event
from datetime import timedelta
from time import sleep
class Job(Thread):
def __init__(self, interval: timedelta, execute, offset: timedelta=None, *args, **kwargs):
Thread.__init__(self)
self.stopped = Event()
self.interval: timedelta = interval
self.execute = execute
self.offset: timedelta = offset
self.args = args
self.kwargs = kwargs
def stop(self):
self.stopped.set()
self.join()
def run(self):
if self.offset:
sleep(self.offset.total_seconds())
while not self.stopped.wait(self.interval.total_seconds()):
self.execute(*self.args, **self.kwargs)