This commit is contained in:
Maximilian Giller 2022-08-06 00:51:03 +02:00
parent 087978d1ac
commit dffa83870d
4 changed files with 39 additions and 100 deletions

View file

@ -1,18 +0,0 @@
from sensor.people_counter import PeopleCounter
from sensor.vl53l1x_sensor import VL53L1XSensor
import logging
counter = PeopleCounter(VL53L1XSensor())
peopleCount = 0
logging.getLogger().setLevel(logging.INFO)
def countChange(change: int) -> None:
global peopleCount
peopleCount += change
logging.info(f'People count change to: {peopleCount}')
counter.hookCounting(countChange)
counter.run()

View file

@ -1,44 +0,0 @@
from sensor.people_counter import PeopleCounter
from sensor.vl53l1x_sensor import VL53L1XSensor
import paho.mqtt.client as mqtt
from HaMqtt.MQTTSensor import MQTTSensor
from HaMqtt.MQTTUtil import HaDeviceClass
import logging
HA_URL = ""
HA_PORT = 1883
HA_SENSOR_NAME = ""
HA_SENSOR_ID = ""
HA_SENSOR_DEVICE_CLASS = HaDeviceClass.NONE
SENSOR_UNIT = ""
# Setup connection to HA
mqttClient = mqtt.Client()
mqttClient.connect(HA_URL, HA_PORT)
mqttClient.loop_start() # Keep conneciton alive
# Setup mqtt binding
sensor = MQTTSensor(HA_SENSOR_NAME, HA_SENSOR_ID, mqttClient, SENSOR_UNIT, HA_SENSOR_DEVICE_CLASS)
logging.debug(f'Connected to topic {sensor.state_topic}')
def countChange(change: int) -> None:
"""Called when people count change is detected.
Sends update to the initialized HA instance.
Args:
change (int): Number of people leaving (<0) or entering (>0) a room.
"""
# Send update to HA
global sensor
sensor.publish_state(change)
logging.debug(f'People count changed by {change}')
# Setup people count sensor
counter = PeopleCounter(VL53L1XSensor())
counter.hookCounting(countChange)
counter.run()

View file

@ -1,9 +1,7 @@
from datetime import datetime, time, timedelta
from typing import Dict
from interface.philips_hue import PhilipsHue
from sensor.people_counter import PeopleCounter
from sensor.tof_sensor import Directions
from sensor.vl53l1x_sensor import VL53L1XSensor
from services.philips_hue import PhilipsHue
from sensors import PeopleCounter, Directions, VL53L1XSensor
import logging
import json
from timeloop import Timeloop
@ -13,7 +11,9 @@ from timeloop import Timeloop
ENABLE_MOTION_TRIGGERED_LIGHT = True
# Should lights change when a certain time in the schedule is reached
ENABLE_SCHEDULE_TRIGGERS = False # Not working correctly at the moment, so turned off by default
ENABLE_SCHEDULE_TRIGGERS = (
False # Not working correctly at the moment, so turned off by default
)
# Schedule (Key is time after scene should be used. Value is scene name to be used.)
# Needs to be sorted chronologically
@ -22,11 +22,11 @@ SCHEDULE = {}
LOG_FILE_PATH = "log.txt" # Path for logs
hue_conf = {
'bridge_ip': '',
'transition_time': 10, # seconds
'light_group': '',
"bridge_ip": "",
"transition_time": 10, # seconds
"light_group": "",
# If file exists, application is considered 'registered' at the bridge
'registered_file': 'smart_light_registered.bridge'
"registered_file": "smart_light_registered.bridge",
} # Custom configuration for philips hue
@ -96,19 +96,19 @@ def change_cb(countChange: int, directionState: Dict):
directionState (Dict): Object describing the internal state of the sensor.
"""
data = {
'version': 'v0.0',
'previousPeopleCount': peopleCount,
'countChange': countChange,
'directionState': directionState,
'dateTime': datetime.now(),
'motionTriggeredLights': motion_triggered_lights
"version": "v0.0",
"previousPeopleCount": peopleCount,
"countChange": countChange,
"directionState": directionState,
"dateTime": datetime.now(),
"motionTriggeredLights": motion_triggered_lights,
}
try:
with open(LOG_FILE_PATH, 'a') as f:
with open(LOG_FILE_PATH, "a") as f:
f.write(json.dumps(data, default=str) + "\n")
except Exception as ex:
logging.exception(f'Unable to write log file. {ex}')
logging.exception(f"Unable to write log file. {ex}")
def count_change(change: int) -> None:
@ -128,16 +128,16 @@ def count_change(change: int) -> None:
if peopleCount <= 0 and previous_lights_state and not motion_triggered_lights:
# Count was 0, but lights were on (not because of motion triggers) => people count was not actually 0
peopleCount = 1
logging.debug(f'People count corrected to {peopleCount}')
logging.debug(f"People count corrected to {peopleCount}")
elif peopleCount > 0 and not previous_lights_state:
# Count was >0, but lights were off => people count was actually 0
peopleCount = 0
logging.debug(f'People count corrected to {peopleCount}')
logging.debug(f"People count corrected to {peopleCount}")
peopleCount += change
if peopleCount < 0:
peopleCount = 0
logging.debug(f'People count changed by {change}')
logging.debug(f"People count changed by {change}")
# Handle light
target_light_state = peopleCount > 0
@ -164,7 +164,9 @@ def trigger_change(triggerState: Dict):
target_light_state = None
# Is someone walking close to the door?
motion_detected = triggerState[Directions.INSIDE] or triggerState[Directions.OUTSIDE]
motion_detected = (
triggerState[Directions.INSIDE] or triggerState[Directions.OUTSIDE]
)
target_light_state = motion_detected
# Does motion triggered light need to do anything?
@ -199,9 +201,8 @@ def set_light_scene(target_scene: str) -> bool:
return
# Set lights to scene
hue.set_group_scene(hue_conf['light_group'], target_scene)
logging.debug(
f'Light scene set to {target_scene}')
hue.set_group_scene(hue_conf["light_group"], target_scene)
logging.debug(f"Light scene set to {target_scene}")
def set_light_state(target_light_state: bool) -> bool:
@ -222,12 +223,13 @@ def set_light_state(target_light_state: bool) -> bool:
target_scene = get_scene_for_time(datetime.now().time())
if target_light_state and target_scene:
# Set to specific scene if exists
hue.set_group_scene(hue_conf['light_group'], target_scene)
hue.set_group_scene(hue_conf["light_group"], target_scene)
logging.debug(
f'Light state changed to {target_light_state} with scene {target_scene}')
f"Light state changed to {target_light_state} with scene {target_scene}"
)
else:
hue.set_group(hue_conf['light_group'], {'on': target_light_state})
logging.debug(f'Light state changed to {target_light_state}')
hue.set_group(hue_conf["light_group"], {"on": target_light_state})
logging.debug(f"Light state changed to {target_light_state}")
return previous_lights_state
@ -237,24 +239,22 @@ def get_light_state() -> bool:
Returns:
bool: Current light state.
"""
return hue.get_group(hue_conf['light_group'])['state']['any_on']
return hue.get_group(hue_conf["light_group"])["state"]["any_on"]
def update_scene():
"""Called by time trigger to update light scene if lights are on.
"""
"""Called by time trigger to update light scene if lights are on."""
scene = get_scene_for_time(datetime.now().time())
if scene is None:
return
set_light_scene(scene)
logging.debug(f'Updated scene at {datetime.now().time()} to {scene}.')
logging.debug(f"Updated scene at {datetime.now().time()} to {scene}.")
def register_time_triggers():
"""Registeres time triggered callbacks based on the schedule, to adjust the current scene, if lights are on.
"""
"""Registeres time triggered callbacks based on the schedule, to adjust the current scene, if lights are on."""
global SCHEDULE
if SCHEDULE is None or len(SCHEDULE) <= 0:
return

View file

@ -1,2 +1,3 @@
from sensors.vl53l1x_sensor import VL53L1XSensor
from sensors.people_counter import PeopleCounter
from sensors.tof_sensor import Directions