Refresh for public use

This commit is contained in:
Maximilian Giller 2022-09-14 00:15:37 +02:00
parent 8dcf095c60
commit 9c935ba22d
15 changed files with 356 additions and 170 deletions

119
README.md
View file

@ -1,7 +1,114 @@
# tof-people-count # ToF People Count
# ToDo Time-of-Flight based people count sensor, with the goal to work in a smart home environment. Currently setup to directly communicate with a Philips Hue Bridge and control a single group of lights.
- Energy-saving/Off mode (Only one light slighty on to deal with the state) (How should power plugs be handled?)
- Daylight Adjustment (E.g. No ceiling lights during daytime) This is very much a work in progress, so every issue and/or pull request is welcome.
- Save scene when turning off, to reapply same scene when turning on
- Detect fast flickering of light state, indicating an issue, and disable the system for a few minutes Feel free to join and talk on my little server: https://discord.gg/ftwwSdY9e5
## Initial setup and use
### Required hardware
- Some kind of device that can run python and connect to the required sensor (developed for **Raspberry Pi Zero W**)
- **VL53L1X** sensor, I got: https://www.berrybase.de/vl53l1x-time-of-flight-tof-bewegungs-sensor-breakout
- Philips Hue Bridge recommended, or anything else you want to control with this sensor
### Hardware setup
Most importantly, the sensor should be in the door frame, pointing down. The orientation of the sensor is also important, since you want it to increase the count for people walking into your room, and decrease the count for people walking out.
In the photos below you can see my current setup. The piece of cardboard is supposed to block sunlight from interfering with the sensor. Before the cardboard piece was attached, I had more issues with the sensors, especially in the morning, when sun was shining through a nearby window.
The piece of cardboard is on the side of my room, which might help you to orient the sensor properly.
<p align="left">
<img height="200px" src="/images/door.png" />
<br/>
<img height="200px" src="/images/controller.jpeg" />
<br/>
<img height="200px" src="/images/sensor.jpeg" />
</p>
Sadly, due to my limited time, there is so far no proper, easy way of changing the direction of the sensor in software. It is on my todo list tho (see below) and please don't hesistate to open issue if you require a certain feature! I am more than happy to help and expand this project.
### Software setup
This will go through the required setup for `simple_hue_counter.py` and `smart_hue_counter.py`.
1. Clone the repository to your controller
2. Make sure to install everything required from the `requirements.txt`
3. Open your desired script (`simple_hue_counter.py` or `smart_hue_counter.py`) and edit the `hue_conf`
- `bridge_ip`: Insert the local ip-address of your philips hue bridge here
- `transition_time`: Supposed to be the duration of the light turning on and off, doesn't seem to work as expected tho. Feel free to play around with it.
- `light_group`: Enter the name of the light group/room you want to switch on and off. Just use the human readable name you gave your group in the philips hue app.
- `registered_file`: No need to change this.
- If you are using the `smart_*`-script:
- `ENABLE_MOTION_TRIGGERED_LIGHT`: Already turns on lights if someone is _in_ the door. If this is off, the light will only switch when you completely walked through the sensors field of view. Having it on thereby makes it switch on faster, which is especially desirable during the night.
- `ENABLE_SCHEDULE_TRIGGERS`: When turned on, will setup time triggers for the `SCHEDULE` defined below, so that (when someone is in the room) the light will automatically switch to the current scene when the corresponding time is reached.
- `SCHEDULE`: If left empty, lights will simply be turned on and off, reusing the previous color and brightness when they were turned off. If you want specific scenes to be turned on during different times of the day, you can define a Schedule here.
```python
SCHEDULE: dict[time, str] = {
time(8, 0): "Good Morning",
time(18, 0): "Evening",
time(22, 0): "Nightlight",
}
```
This schedule will use the scene "Good Morning" from 8am til 6pm, "Evening" from 6pm til 10pm and "Nightlight" from 10pm til 8am.
The scene name is also just the human readable name from the philips hue app. If you renamed a scene, you might need to use the previous name.
4. Start your desired script for the first time. You need to press the pairing button on the philips hue bridge ideally in the first 30 seconds (the file referenced in `registered_file` should be created if successful)
5. Setup your script to automatically run when your controller powers on (I like to use supervisor to manage my autostart processes)
It should be working now, hopefully!
### Use
The initial count is 0.
If you walk in, it will increase the counter. If the counter is >0, the lights will turn (or stay) on.
If you walk out, it will decrease the counter. The counter does not go below 0. If the counter reaches 0, the lights will turn off, otherwise will stay on.
Everytime someone triggers an event, the current state of the lights is checked. If the count should be 0, but the lights were on, it will be adjusted to 1, before applying the corresponding count (-1 for walking out, +1 for walking in).
If the count should be >0, but the lights were off, it will be adjusted to 0, before applying the corresponding count.
This way you can correct the counter by simply turning the lights on and off via the app. I personally recommended to setup voice commands for ease of use with the personal assistant of your choosing.
A log is written with every event. Great for analysing wrong counts and fun statistics!
## Outlook
### ToDo
- Implement support for multiple sensor (server/clients) to control more than one group/room
- Energy-saving/Off mode (Only one light slighty turned on to keep track of current state) (How should power plugs be handled?)
- Daylight Adjustment (E.g. No ceiling lights during daytime)
- Save scene when turning off, to re-apply same scene when turning on (Because just turning lights on turns ALL lights on of that group)
- Detect fast flickering of light state, indicating an issue, and disable the system for a few minutes
- Simple software switch to change direction of sensor
- Better project structure and objects
- Packaging it up for ease of use in custom projects
- Create implementation that can be used on energy-efficient microcontrollers like an esp
- Try out other sensors like the VL53L3CX
- Write proper hardware-guide
### Goal
Knowing how many people are in a room, is a really fun information. If at least one person is in a room, turn on the lights! If someone is listening to music alone in the room, and a second person enters the room, turn down the music for an easier conversation. If noone is in the room, lock the computer and put it to sleep after a while.
I personally think, this has a lot of potential for future home automation, and actually makes peoples life easier. (Not like voice controlled light switches, just making it more difficult) Simply using distance sensors reduces the privacy implications significantly, compared to using some kind of camera setup for counting people.
To actually be usable for a wider range of people, it needs to get more reliable, close to 100%. It ideally needs to be as reliable as an actual light switch.
This shall remain an open source, privacy in mind, smart home application. Usable home-wide, not just for a single room. Certainly no requirement for an external server.
Making it compatible with systems like homeassistant is definitely a good idea. Though I am actually planning to create my own little python framework for my personal needs, since homeassistant seems weirdly complicated to use to me and is missing some concepts like "room" or "person" that I would like to have.
Take what you need, go wild, and have fun.

BIN
images/controller.jpeg Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 248 KiB

BIN
images/door.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 773 KiB

BIN
images/sensor.jpeg Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 319 KiB

View file

@ -1,11 +1,7 @@
smbus2 smbus2
vl53l1x vl53l1x
# For Home Assistant MQTT Sensor # For Philips Hue-based Counter
paho-mqtt
homeassistant-mqtt-binding
# For Philips Hue Counter
phue phue
# For statistics # For statistics

View file

@ -1,5 +1,5 @@
from sensor.people_counter import PeopleCounter from sensors.people_counter import PeopleCounter
from sensor.vl53l1x_sensor import VL53L1XSensor from sensors.vl53l1x_sensor import VL53L1XSensor
import logging import logging
counter = PeopleCounter(VL53L1XSensor()) counter = PeopleCounter(VL53L1XSensor())
@ -11,7 +11,7 @@ logging.getLogger().setLevel(logging.INFO)
def countChange(change: int) -> None: def countChange(change: int) -> None:
global peopleCount global peopleCount
peopleCount += change peopleCount += change
logging.info(f'People count change to: {peopleCount}') logging.info(f"People count change to: {peopleCount}")
counter.hookCounting(countChange) counter.hookCounting(countChange)

View file

@ -1,44 +0,0 @@
from sensor.people_counter import PeopleCounter
from sensor.vl53l1x_sensor import VL53L1XSensor
import paho.mqtt.client as mqtt
from HaMqtt.MQTTSensor import MQTTSensor
from HaMqtt.MQTTUtil import HaDeviceClass
import logging
HA_URL = ""
HA_PORT = 1883
HA_SENSOR_NAME = ""
HA_SENSOR_ID = ""
HA_SENSOR_DEVICE_CLASS = HaDeviceClass.NONE
SENSOR_UNIT = ""
# Setup connection to HA
mqttClient = mqtt.Client()
mqttClient.connect(HA_URL, HA_PORT)
mqttClient.loop_start() # Keep conneciton alive
# Setup mqtt binding
sensor = MQTTSensor(HA_SENSOR_NAME, HA_SENSOR_ID, mqttClient, SENSOR_UNIT, HA_SENSOR_DEVICE_CLASS)
logging.debug(f'Connected to topic {sensor.state_topic}')
def countChange(change: int) -> None:
"""Called when people count change is detected.
Sends update to the initialized HA instance.
Args:
change (int): Number of people leaving (<0) or entering (>0) a room.
"""
# Send update to HA
global sensor
sensor.publish_state(change)
logging.debug(f'People count changed by {change}')
# Setup people count sensor
counter = PeopleCounter(VL53L1XSensor())
counter.hookCounting(countChange)
counter.run()

3
src/sensors/__init__.py Normal file
View file

@ -0,0 +1,3 @@
from sensors.vl53l1x_sensor import VL53L1XSensor
from sensors.people_counter import PeopleCounter
from sensors.tof_sensor import Directions

View file

@ -1,5 +1,4 @@
from typing import Dict from sensors.tof_sensor import ToFSensor, Directions
from sensor.tof_sensor import ToFSensor, Directions
from datetime import datetime from datetime import datetime
import threading import threading
@ -13,11 +12,11 @@ TRIGGER_DISTANCES = "trigger_distances"
END_DISTANCE = "end_distance" END_DISTANCE = "end_distance"
class PeopleCounter (): class PeopleCounter:
def __init__(self, sensor: ToFSensor) -> None: def __init__(self, sensor: ToFSensor) -> None:
self.sensor = sensor self.sensor = sensor
self.callbacks = {COUNTING_CB: [], TRIGGER_CB: [], CHANGE_CB: []} self.callbacks = {COUNTING_CB: [], TRIGGER_CB: [], CHANGE_CB: []}
self.maxTriggerDistance = 120 # In cm self.maxTriggerDistance = 120 # In cm
def hookCounting(self, cb) -> None: def hookCounting(self, cb) -> None:
self.callbacks[COUNTING_CB].append(cb) self.callbacks[COUNTING_CB].append(cb)
@ -37,11 +36,8 @@ class PeopleCounter ():
def unhookChange(self, cb) -> None: def unhookChange(self, cb) -> None:
self.callbacks[CHANGE_CB].remove(cb) self.callbacks[CHANGE_CB].remove(cb)
def getInitialDirectionState(self) -> Dict: def getInitialDirectionState(self) -> dict[Directions, list]:
return { return {Directions.INSIDE: [], Directions.OUTSIDE: []}
Directions.INSIDE: [],
Directions.OUTSIDE: []
}
def run(self) -> None: def run(self) -> None:
self.keepRunning = True self.keepRunning = True
@ -66,7 +62,9 @@ class PeopleCounter ():
th.start() th.start()
# Reset state if state is finalised # Reset state if state is finalised
if not self.isDirectionTriggered(Directions.INSIDE) and not self.isDirectionTriggered(Directions.OUTSIDE): if not self.isDirectionTriggered(
Directions.INSIDE
) and not self.isDirectionTriggered(Directions.OUTSIDE):
self.directionState = self.getInitialDirectionState() self.directionState = self.getInitialDirectionState()
self.sensor.close() self.sensor.close()
@ -79,8 +77,11 @@ class PeopleCounter ():
return 0 return 0
# Did every record start and end? # Did every record start and end?
if directionState[direction][0][START_TIME] is None or directionState[direction][-1][END_TIME] is None: if (
return 0 # Return no change if not valid directionState[direction][0][START_TIME] is None
or directionState[direction][-1][END_TIME] is None
):
return 0 # Return no change if not valid
# Get times into variables # Get times into variables
insideStart = directionState[Directions.INSIDE][0][START_TIME] insideStart = directionState[Directions.INSIDE][0][START_TIME]
@ -148,7 +149,7 @@ class PeopleCounter ():
def handleTriggerCallbacks(self) -> None: def handleTriggerCallbacks(self) -> None:
triggerState = { triggerState = {
Directions.INSIDE: self.isDirectionTriggered(Directions.INSIDE), Directions.INSIDE: self.isDirectionTriggered(Directions.INSIDE),
Directions.OUTSIDE: self.isDirectionTriggered(Directions.OUTSIDE) Directions.OUTSIDE: self.isDirectionTriggered(Directions.OUTSIDE),
} }
for cb in self.callbacks[TRIGGER_CB]: for cb in self.callbacks[TRIGGER_CB]:
@ -159,7 +160,10 @@ class PeopleCounter ():
cb(countChange, self.directionState) cb(countChange, self.directionState)
def isDirectionTriggered(self, direction: Directions) -> bool: def isDirectionTriggered(self, direction: Directions) -> bool:
return len(self.directionState[direction]) > 0 and self.directionState[direction][-1][END_TIME] is None return (
len(self.directionState[direction]) > 0
and self.directionState[direction][-1][END_TIME] is None
)
def updateState(self, direction: Directions, distance: float) -> bool: def updateState(self, direction: Directions, distance: float) -> bool:
triggered: bool = self.isTriggerDistance(distance) triggered: bool = self.isTriggerDistance(distance)
@ -170,12 +174,14 @@ class PeopleCounter ():
if triggered and not previouslyTriggered: if triggered and not previouslyTriggered:
# Set as new beginning for this direction # Set as new beginning for this direction
self.directionState[direction].append({ self.directionState[direction].append(
START_TIME: datetime.now(), {
END_TIME: None, START_TIME: datetime.now(),
TRIGGER_DISTANCES: [distance], END_TIME: None,
END_DISTANCE: None TRIGGER_DISTANCES: [distance],
}) END_DISTANCE: None,
}
)
return True return True
elif not triggered and previouslyTriggered: elif not triggered and previouslyTriggered:
# Set as end for this direction # Set as end for this direction

View file

@ -5,7 +5,7 @@ class Directions(str, Enum):
INSIDE = "indoor" INSIDE = "indoor"
OUTSIDE = "outdoor" OUTSIDE = "outdoor"
def other(direction: 'Direction') -> 'Direction': def other(direction: "Directions") -> "Directions":
if direction is Directions.INSIDE: if direction is Directions.INSIDE:
return Directions.OUTSIDE return Directions.OUTSIDE
else: else:
@ -20,13 +20,11 @@ class ToFSensor:
raise NotImplementedError() raise NotImplementedError()
def setDirection(self, direction: Directions) -> None: def setDirection(self, direction: Directions) -> None:
"""Configure sensor to pick up the distance in a specific direction. """Configure sensor to pick up the distance in a specific direction."""
"""
raise NotImplementedError() raise NotImplementedError()
def getDistance(self) -> float: def getDistance(self) -> float:
"""Returns new distance in cm. """Returns new distance in cm."""
"""
raise NotImplementedError() raise NotImplementedError()
def close(self) -> None: def close(self) -> None:

View file

@ -1,4 +1,4 @@
from sensor.tof_sensor import Directions, ToFSensor from sensors.tof_sensor import Directions, ToFSensor
import VL53L1X import VL53L1X
# Reference: https://github.com/pimoroni/vl53l1x-python # Reference: https://github.com/pimoroni/vl53l1x-python
@ -18,7 +18,7 @@ import VL53L1X
# #
class VL53L1XSensor (ToFSensor): class VL53L1XSensor(ToFSensor):
def __init__(self) -> None: def __init__(self) -> None:
super().__init__() super().__init__()
@ -39,11 +39,10 @@ class VL53L1XSensor (ToFSensor):
# 3 = Long Range # 3 = Long Range
def setDirection(self, direction: Directions) -> None: def setDirection(self, direction: Directions) -> None:
"""Configure sensor to pick up the distance in a specific direction. """Configure sensor to pick up the distance in a specific direction."""
"""
direction_roi = { direction_roi = {
Directions.INSIDE: VL53L1X.VL53L1xUserRoi(6, 3, 9, 0), Directions.INSIDE: VL53L1X.VL53L1xUserRoi(6, 3, 9, 0),
Directions.OUTSIDE: VL53L1X.VL53L1xUserRoi(6, 15, 9, 12) Directions.OUTSIDE: VL53L1X.VL53L1xUserRoi(6, 15, 9, 12),
} }
roi = direction_roi[direction] roi = direction_roi[direction]
@ -53,8 +52,7 @@ class VL53L1XSensor (ToFSensor):
self.sensor.start_ranging(self.ranging) self.sensor.start_ranging(self.ranging)
def getDistance(self) -> float: def getDistance(self) -> float:
"""Returns new distance in cm. """Returns new distance in cm."""
"""
distance = self.sensor.get_distance() distance = self.sensor.get_distance()
return distance / 10 return distance / 10

View file

@ -5,18 +5,18 @@ import logging
import socket import socket
class PhilipsHue (): class PhilipsHue:
def __init__(self, config): def __init__(self, config):
self.config = config self.config = config
self.connect() self.connect()
def connect(self): def connect(self):
registered = Path(self.config['registered_file']).is_file() registered = Path(self.config["registered_file"]).is_file()
success = False success = False
while success == False: while success == False:
try: try:
logging.info("Connecting to hue bridge") logging.info("Connecting to hue bridge")
self.bridge = Bridge(self.config['bridge_ip']) self.bridge = Bridge(self.config["bridge_ip"])
self.bridge.connect() self.bridge.connect()
success = True success = True
except Exception as e: except Exception as e:
@ -32,7 +32,7 @@ class PhilipsHue ():
if registered == False: if registered == False:
# register # register
logging.info("Saving registration") logging.info("Saving registration")
Path(self.config['registered_file']).touch() Path(self.config["registered_file"]).touch()
def get_state(self): def get_state(self):
return self.__execute__(lambda: self.bridge.get_api()) return self.__execute__(lambda: self.bridge.get_api())
@ -42,8 +42,8 @@ class PhilipsHue ():
def get_scene_by_name(self, name): def get_scene_by_name(self, name):
for key, scene in self.get_scenes().items(): for key, scene in self.get_scenes().items():
if scene['name'] == name: if scene["name"] == name:
scene['id'] = key scene["id"] = key
return scene return scene
return None return None
@ -60,12 +60,14 @@ class PhilipsHue ():
return self.__execute__(lambda: self.bridge.get_group(id, command)) return self.__execute__(lambda: self.bridge.get_group(id, command))
def set_group_scene(self, group_name, scene_name): def set_group_scene(self, group_name, scene_name):
scene_id = self.get_scene_by_name(scene_name)['id'] scene_id = self.get_scene_by_name(scene_name)["id"]
return self.__execute__(lambda: self.set_group(group_name, self.create_conf({'scene': scene_id}))) return self.__execute__(
lambda: self.set_group(group_name, self.create_conf({"scene": scene_id}))
)
def create_conf(self, conf): def create_conf(self, conf):
if 'transitiontime' not in conf.keys(): if "transitiontime" not in conf.keys():
conf['transitiontime'] = self.config['transition_time'] conf["transitiontime"] = self.config["transition_time"]
return conf return conf
def __execute__(self, function): def __execute__(self, function):
@ -74,13 +76,13 @@ class PhilipsHue ():
except socket.timeout as e: except socket.timeout as e:
# Try to reconnect # Try to reconnect
logging.exception( logging.exception(
"Could not execute function. Trying to reconnect to bridge") "Could not execute function. Trying to reconnect to bridge"
)
logging.exception(str(e)) logging.exception(str(e))
try: try:
self.connect() self.connect()
except Exception as e: except Exception as e:
logging.exception( logging.exception("Reconnect did not succeed, skipping execution")
"Reconnect did not succeed, skipping execution")
logging.exception(str(e)) logging.exception(str(e))
return return
# Now try again # Now try again

121
src/simple_hue_counter.py Normal file
View file

@ -0,0 +1,121 @@
from datetime import datetime
from services.philips_hue import PhilipsHue
from sensors.people_counter import PeopleCounter
from sensors.vl53l1x_sensor import VL53L1XSensor
import logging
import json
LOG_FILE_PATH = "log.txt" # Path for logs
hue_conf = {
"bridge_ip": "",
"transition_time": 10, # seconds
# Light group to control
"light_group": "",
# If file exists, application is considered 'registered' at the bridge
"registered_file": "smart_light_registered.bridge",
} # Custom configuration for philips hue
hue: PhilipsHue = PhilipsHue(hue_conf) # Light interface
counter: PeopleCounter = PeopleCounter(VL53L1XSensor()) # Sensor object
peopleCount: int = 0 # Global count of people on the inside
logging.getLogger().setLevel(logging.INFO)
def change_cb(countChange: int, directionState: dict):
"""Handles basic logging of event data for later analysis.
Args:
countChange (int): The change in the number of people. Usually on of [-1, 0, 1].
directionState (dict): Object describing the internal state of the sensor.
"""
data = {
"version": "v0.0",
"previousPeopleCount": peopleCount,
"countChange": countChange,
"directionState": directionState,
"dateTime": datetime.now(),
"motionTriggeredLights": False,
}
try:
with open(LOG_FILE_PATH, "a") as f:
f.write(json.dumps(data, default=str) + "\n")
except Exception as ex:
logging.exception(f"Unable to write log file. {ex}")
def count_change(change: int) -> None:
"""Handles light state when people count changes
Args:
change (int): The change in the number of people. Usually on of [-1, 0, 1].
"""
global hue
global peopleCount
# Are lights on at the moment?
previous_lights_state = get_light_state()
# Apply correction
if peopleCount <= 0 and previous_lights_state:
# Count was 0, but lights were on => people count was not actually 0
peopleCount = 1
logging.debug(f"People count corrected to {peopleCount}")
elif peopleCount > 0 and not previous_lights_state:
# Count was >0, but lights were off => people count was actually 0
peopleCount = 0
logging.debug(f"People count corrected to {peopleCount}")
peopleCount += change
if peopleCount < 0:
peopleCount = 0
logging.debug(f"People count changed by {change}")
# Handle light
target_light_state = peopleCount > 0
# Return, if there is no change
if previous_lights_state == target_light_state:
return
set_light_state(target_light_state)
def set_light_state(target_light_state: bool) -> bool:
"""Sets the lights to the given state.
Args:
target_light_state (bool): Should lights on the inside be on or off.
Returns:
bool: Previous light state.
"""
# Are lights on at the moment?
previous_lights_state = get_light_state()
if target_light_state == previous_lights_state:
return previous_lights_state
# Adjust light as necessary
hue.set_group(hue_conf["light_group"], {"on": target_light_state})
logging.debug(f"Light state changed to {target_light_state}")
return previous_lights_state
def get_light_state() -> bool:
"""
Returns:
bool: Current light state.
"""
return hue.get_group(hue_conf["light_group"])["state"]["any_on"]
if __name__ == "__main__":
# Represents callback trigger order
counter.hookChange(change_cb)
counter.hookCounting(count_change)
counter.run()

View file

@ -1,9 +1,6 @@
from datetime import datetime, time, timedelta from datetime import datetime, time, timedelta
from typing import Dict from services.philips_hue import PhilipsHue
from interface.philips_hue import PhilipsHue from sensors import PeopleCounter, Directions, VL53L1XSensor
from sensor.people_counter import PeopleCounter
from sensor.tof_sensor import Directions
from sensor.vl53l1x_sensor import VL53L1XSensor
import logging import logging
import json import json
from timeloop import Timeloop from timeloop import Timeloop
@ -13,27 +10,33 @@ from timeloop import Timeloop
ENABLE_MOTION_TRIGGERED_LIGHT = True ENABLE_MOTION_TRIGGERED_LIGHT = True
# Should lights change when a certain time in the schedule is reached # Should lights change when a certain time in the schedule is reached
ENABLE_SCHEDULE_TRIGGERS = False # Not working correctly at the moment, so turned off by default ENABLE_SCHEDULE_TRIGGERS = (
False # Not working correctly at the moment, so turned off by default
)
# Schedule (Key is time after scene should be used. Value is scene name to be used.) # Schedule (Key is time after scene should be used. Value is scene name to be used.)
# Needs to be sorted chronologically # Needs to be sorted chronologically
SCHEDULE = {} SCHEDULE: dict[time, str] = {
time(8, 0): "Good Morning",
time(18, 0): "Evening",
time(22, 0): "Nightlight",
}
LOG_FILE_PATH = "log.txt" # Path for logs LOG_FILE_PATH = "log.txt" # Path for logs
hue_conf = { hue_conf = {
'bridge_ip': '', "bridge_ip": "",
'transition_time': 10, # seconds "transition_time": 10, # seconds
'light_group': '', "light_group": "",
# If file exists, application is considered 'registered' at the bridge # If file exists, application is considered 'registered' at the bridge
'registered_file': 'smart_light_registered.bridge' "registered_file": "smart_light_registered.bridge",
} # Custom configuration for philips hue } # Custom configuration for philips hue
hue: PhilipsHue = PhilipsHue(hue_conf) # Light interface hue: PhilipsHue = PhilipsHue(hue_conf) # Light interface
counter: PeopleCounter = PeopleCounter(VL53L1XSensor()) # Sensor object counter: PeopleCounter = PeopleCounter(VL53L1XSensor()) # Sensor object
peopleCount: int = 0 # Global count of people on the inside peopleCount: int = 0 # Global count of people on the inside
motion_triggered_lights = False # Is light on because of any detected motion motion_triggered_lights = False # Is light on because of any detected motion
timeloop: Timeloop = Timeloop() # Used for time triggered schedule timeloop: Timeloop = Timeloop() # Used for time triggered schedule
logging.getLogger().setLevel(logging.INFO) logging.getLogger().setLevel(logging.INFO)
@ -88,27 +91,27 @@ def get_scene_for_time(time: time) -> str:
return list(SCHEDULE.values())[-1] return list(SCHEDULE.values())[-1]
def change_cb(countChange: int, directionState: Dict): def change_cb(countChange: int, directionState: dict):
"""Handles basic logging of event data for later analysis. """Handles basic logging of event data for later analysis.
Args: Args:
countChange (int): The change in the number of people. Usually on of [-1, 0, 1]. countChange (int): The change in the number of people. Usually on of [-1, 0, 1].
directionState (Dict): Object describing the internal state of the sensor. directionState (dict): Object describing the internal state of the sensor.
""" """
data = { data = {
'version': 'v0.0', "version": "v0.0",
'previousPeopleCount': peopleCount, "previousPeopleCount": peopleCount,
'countChange': countChange, "countChange": countChange,
'directionState': directionState, "directionState": directionState,
'dateTime': datetime.now(), "dateTime": datetime.now(),
'motionTriggeredLights': motion_triggered_lights "motionTriggeredLights": motion_triggered_lights,
} }
try: try:
with open(LOG_FILE_PATH, 'a') as f: with open(LOG_FILE_PATH, "a") as f:
f.write(json.dumps(data, default=str) + "\n") f.write(json.dumps(data, default=str) + "\n")
except Exception as ex: except Exception as ex:
logging.exception(f'Unable to write log file. {ex}') logging.exception(f"Unable to write log file. {ex}")
def count_change(change: int) -> None: def count_change(change: int) -> None:
@ -128,16 +131,16 @@ def count_change(change: int) -> None:
if peopleCount <= 0 and previous_lights_state and not motion_triggered_lights: if peopleCount <= 0 and previous_lights_state and not motion_triggered_lights:
# Count was 0, but lights were on (not because of motion triggers) => people count was not actually 0 # Count was 0, but lights were on (not because of motion triggers) => people count was not actually 0
peopleCount = 1 peopleCount = 1
logging.debug(f'People count corrected to {peopleCount}') logging.debug(f"People count corrected to {peopleCount}")
elif peopleCount > 0 and not previous_lights_state: elif peopleCount > 0 and not previous_lights_state:
# Count was >0, but lights were off => people count was actually 0 # Count was >0, but lights were off => people count was actually 0
peopleCount = 0 peopleCount = 0
logging.debug(f'People count corrected to {peopleCount}') logging.debug(f"People count corrected to {peopleCount}")
peopleCount += change peopleCount += change
if peopleCount < 0: if peopleCount < 0:
peopleCount = 0 peopleCount = 0
logging.debug(f'People count changed by {change}') logging.debug(f"People count changed by {change}")
# Handle light # Handle light
target_light_state = peopleCount > 0 target_light_state = peopleCount > 0
@ -152,11 +155,11 @@ def count_change(change: int) -> None:
set_light_state(target_light_state) set_light_state(target_light_state)
def trigger_change(triggerState: Dict): def trigger_change(triggerState: dict):
"""Handles motion triggered light state. """Handles motion triggered light state.
Args: Args:
triggerState (Dict): Describing in what directions the sensor is triggerd. triggerState (dict): Describing in what directions the sensor is triggerd.
""" """
global hue global hue
global motion_triggered_lights global motion_triggered_lights
@ -164,7 +167,9 @@ def trigger_change(triggerState: Dict):
target_light_state = None target_light_state = None
# Is someone walking close to the door? # Is someone walking close to the door?
motion_detected = triggerState[Directions.INSIDE] or triggerState[Directions.OUTSIDE] motion_detected = (
triggerState[Directions.INSIDE] or triggerState[Directions.OUTSIDE]
)
target_light_state = motion_detected target_light_state = motion_detected
# Does motion triggered light need to do anything? # Does motion triggered light need to do anything?
@ -199,9 +204,8 @@ def set_light_scene(target_scene: str) -> bool:
return return
# Set lights to scene # Set lights to scene
hue.set_group_scene(hue_conf['light_group'], target_scene) hue.set_group_scene(hue_conf["light_group"], target_scene)
logging.debug( logging.debug(f"Light scene set to {target_scene}")
f'Light scene set to {target_scene}')
def set_light_state(target_light_state: bool) -> bool: def set_light_state(target_light_state: bool) -> bool:
@ -218,16 +222,17 @@ def set_light_state(target_light_state: bool) -> bool:
if target_light_state == previous_lights_state: if target_light_state == previous_lights_state:
return previous_lights_state return previous_lights_state
# Adjust light as necessary # Adjust light if necessary
target_scene = get_scene_for_time(datetime.now().time()) target_scene = get_scene_for_time(datetime.now().time())
if target_light_state and target_scene: if target_light_state and target_scene:
# Set to specific scene if exists # Set to specific scene if exists
hue.set_group_scene(hue_conf['light_group'], target_scene) hue.set_group_scene(hue_conf["light_group"], target_scene)
logging.debug( logging.debug(
f'Light state changed to {target_light_state} with scene {target_scene}') f"Light state changed to {target_light_state} with scene {target_scene}"
)
else: else:
hue.set_group(hue_conf['light_group'], {'on': target_light_state}) hue.set_group(hue_conf["light_group"], {"on": target_light_state})
logging.debug(f'Light state changed to {target_light_state}') logging.debug(f"Light state changed to {target_light_state}")
return previous_lights_state return previous_lights_state
@ -237,24 +242,22 @@ def get_light_state() -> bool:
Returns: Returns:
bool: Current light state. bool: Current light state.
""" """
return hue.get_group(hue_conf['light_group'])['state']['any_on'] return hue.get_group(hue_conf["light_group"])["state"]["any_on"]
def update_scene(): def update_scene():
"""Called by time trigger to update light scene if lights are on. """Called by time trigger to update light scene if lights are on."""
"""
scene = get_scene_for_time(datetime.now().time()) scene = get_scene_for_time(datetime.now().time())
if scene is None: if scene is None:
return return
set_light_scene(scene) set_light_scene(scene)
logging.debug(f'Updated scene at {datetime.now().time()} to {scene}.') logging.debug(f"Updated scene at {datetime.now().time()} to {scene}.")
def register_time_triggers(): def register_time_triggers():
"""Registeres time triggered callbacks based on the schedule, to adjust the current scene, if lights are on. """Registeres time triggered callbacks based on the schedule, to adjust the current scene, if lights are on."""
"""
global SCHEDULE global SCHEDULE
if SCHEDULE is None or len(SCHEDULE) <= 0: if SCHEDULE is None or len(SCHEDULE) <= 0:
return return

View file

@ -1,7 +1,5 @@
from datetime import datetime from datetime import datetime
import json import json
from typing import Dict
from xmlrpc.client import Boolean
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
@ -15,20 +13,21 @@ with open(FILE_PATH, "r") as file:
content = file.readlines() content = file.readlines()
def parse_log_entry(entry: Dict) -> Dict: def parse_log_entry(entry: dict) -> dict:
# Only keep last record of a sequence # Only keep last record of a sequence
if not is_last_in_sequence(entry): if not is_last_in_sequence(entry):
return False return False
entry["dateTime"] = datetime.strptime( entry["dateTime"] = datetime.strptime(
str(entry["dateTime"])[:19], "%Y-%m-%d %H:%M:%S") str(entry["dateTime"])[:19], "%Y-%m-%d %H:%M:%S"
)
if entry["dateTime"] < datetime(2022, 1, 1): if entry["dateTime"] < datetime(2022, 1, 1):
return False return False
return entry return entry
def is_last_in_sequence(entry: Dict) -> Boolean: def is_last_in_sequence(entry: dict) -> bool:
indoor = entry["directionState"]["indoor"] indoor = entry["directionState"]["indoor"]
outdoor = entry["directionState"]["outdoor"] outdoor = entry["directionState"]["outdoor"]
@ -60,7 +59,7 @@ times: list[datetime] = [entry["dateTime"] for entry in log]
counts: list[int] = [entry["previousPeopleCount"] for entry in log] counts: list[int] = [entry["previousPeopleCount"] for entry in log]
ax.step(times, counts, where="pre") ax.step(times, counts, where="pre")
plt.show() plt.show()
print("-"*20) print("-" * 20)
# Print stats # Print stats
@ -70,12 +69,11 @@ walk_unders = [entry for entry in log if entry["countChange"] == 0]
print("Number of walk-ins:", len(walk_ins)) print("Number of walk-ins:", len(walk_ins))
print("Number of walk-outs:", len(walk_outs)) print("Number of walk-outs:", len(walk_outs))
print("Number of walk-unders:", len(walk_unders)) print("Number of walk-unders:", len(walk_unders))
print("-"*20) print("-" * 20)
# Calculate faults # Calculate faults
for c, n in zip(list(range(len(log))), list(range(len(log)))[1:]): for c, n in zip(list(range(len(log))), list(range(len(log)))[1:]):
estimated_count: int = log[c]["previousPeopleCount"] + \ estimated_count: int = log[c]["previousPeopleCount"] + log[c]["countChange"]
log[c]["countChange"]
faulty: bool = estimated_count != log[n]["previousPeopleCount"] faulty: bool = estimated_count != log[n]["previousPeopleCount"]
log[c]["faulty"] = faulty log[c]["faulty"] = faulty
log[c]["faultyCount"] = log[c]["previousPeopleCount"] if faulty else None log[c]["faultyCount"] = log[c]["previousPeopleCount"] if faulty else None
@ -85,11 +83,9 @@ fault_count = sum(1 for entry in log if entry["faulty"])
print("Number of faults:", fault_count) print("Number of faults:", fault_count)
print("Percentage of faults:", fault_count / len(log) * 100, "%") print("Percentage of faults:", fault_count / len(log) * 100, "%")
print("-"*20) print("-" * 20)
faulty_off = [entry for entry in log if entry["faulty"] faulty_off = [entry for entry in log if entry["faulty"] and entry["faultyCount"] == 0]
and entry["faultyCount"] == 0] faulty_on = [entry for entry in log if entry["faulty"] and entry["faultyCount"] != 0]
faulty_on = [entry for entry in log if entry["faulty"]
and entry["faultyCount"] != 0]
print("Number of false-0:", len(faulty_off)) print("Number of false-0:", len(faulty_off))
print("Number of false-1:", len(faulty_on)) print("Number of false-1:", len(faulty_on))
print("Percentage of false-0:", len(faulty_off) / fault_count * 100, "%") print("Percentage of false-0:", len(faulty_off) / fault_count * 100, "%")