Compare commits

..

No commits in common. "master" and "raspberrypi-v1.0" have entirely different histories.

15 changed files with 72 additions and 209 deletions

6
.gitignore vendored
View file

@ -161,8 +161,4 @@ cython_debug/
log.txt
.env
.vscode/c_cpp_properties.json
.vscode/extensions.json
.vscode/launch.json
.vscode/settings.json
.env

View file

@ -47,11 +47,11 @@ In the photos below you can see my current setup. The piece of cardboard is supp
The piece of cardboard is on the side of my room, which might help you to orient the sensor properly.
<p align="left">
<img height="200px" src="https://code.giller.dev/m.giller/mash-sensor-tof-pc/raw/branch/master/images/door.png" />
<img height="200px" src="/images/door.png" />
<br/>
<img height="200px" src="https://code.giller.dev/m.giller/mash-sensor-tof-pc/raw/branch/master/images/controller.jpeg" />
<img height="200px" src="/images/controller.jpeg" />
<br/>
<img height="200px" src="https://code.giller.dev/m.giller/mash-sensor-tof-pc/raw/branch/master/images/sensor.jpeg" />
<img height="200px" src="/images/sensor.jpeg" />
</p>
Sadly, due to my limited time, there is so far no proper, easy way of changing the direction of the sensor in software. It is on my todo list tho (see below) and please don't hesistate to open issue if you require a certain feature! I am more than happy to help and expand this project.

View file

@ -5,4 +5,4 @@ vl53l1x
phue
# For statistics
# matplotlib
matplotlib

View file

@ -1,10 +0,0 @@
#!/bin/bash
# Set the working directory
cd /home/pi/mash-sensor-tof-pc
# Activate the Python virtual environment
source /home/pi/mash-sensor-tof-pc/.env/bin/activate
# Run the Python script
python /home/pi/mash-sensor-tof-pc/src/simple_hue_counter.py

View file

@ -1,9 +1,12 @@
from sensors.people_counter import PeopleCounter
from sensors.vl53l1x_sensor import VL53L1XSensor
import logging
from setup import counter
counter = PeopleCounter(VL53L1XSensor())
peopleCount = 0
logging.getLogger().setLevel(logging.INFO)
def countChange(change: int) -> None:
global peopleCount

View file

@ -1,26 +0,0 @@
from sensors import Directions
from time import sleep
from setup import sensor
import logging
DELAY_SECONDS = 1
sensor.open()
try:
while True:
sensor.setDirection(Directions.INSIDE)
distance_inside = sensor.getDistance()
sensor.setDirection(Directions.OUTSIDE)
distance_outside = sensor.getDistance()
logging.info("----------")
logging.info(f"Inside: {distance_inside} cm")
logging.info(f"Outside: {distance_outside} cm")
sleep(DELAY_SECONDS)
finally:
sensor.close()

View file

@ -1,4 +1,3 @@
from sensors.vl53l1x_sensor import VL53L1XSensor
from sensors.vl53l3cx_sensor import VL53L3CXSensor
from sensors.people_counter import PeopleCounter
from sensors.tof_sensor import Directions

View file

@ -6,7 +6,6 @@ import threading
COUNTING_CB = "counting"
TRIGGER_CB = "trigger"
CHANGE_CB = "changes"
MEASUREMENT_CB = "measurement"
START_TIME = "start_time"
END_TIME = "end_time"
TRIGGER_DISTANCES = "trigger_distances"
@ -14,16 +13,10 @@ END_DISTANCE = "end_distance"
class PeopleCounter:
def __init__(self, sensor: ToFSensor, maxTriggerDistanceInCm: int = 90) -> None:
def __init__(self, sensor: ToFSensor) -> None:
self.sensor = sensor
self.callbacks = {COUNTING_CB: [], TRIGGER_CB: [], CHANGE_CB: [], MEASUREMENT_CB: []}
self.maxTriggerDistance = maxTriggerDistanceInCm
def hookMeasurement(self, cb) -> None:
self.callbacks[MEASUREMENT_CB].append(cb)
def unhookMeasurement(self, cb) -> None:
self.callbacks[MEASUREMENT_CB].remove(cb)
self.callbacks = {COUNTING_CB: [], TRIGGER_CB: [], CHANGE_CB: []}
self.maxTriggerDistance = 120 # In cm
def hookCounting(self, cb) -> None:
self.callbacks[COUNTING_CB].append(cb)
@ -61,9 +54,6 @@ class PeopleCounter:
distance: float = self.sensor.getDistance()
changed: bool = self.updateState(direction, distance)
th = threading.Thread(target=self.handleMeasurementCallbacks, args=(direction, distance))
th.start()
if changed:
countChange: int = self.getCountChange(self.directionState)
@ -143,10 +133,6 @@ class PeopleCounter:
#! TODO: Should be based on the distance from the ground, not from the sensor
return distance <= self.maxTriggerDistance
def handleMeasurementCallbacks(self, direction: Directions, distance: float) -> None:
for cb in self.callbacks[MEASUREMENT_CB]:
cb(direction, distance)
def handleCallbacks(self, countChange: int):
self.handleChangeCallbacks(countChange)
self.handleCountingCallbacks(countChange)

View file

@ -22,10 +22,9 @@ class VL53L1XSensor(ToFSensor):
def __init__(self) -> None:
super().__init__()
def open(self, ranging_mode: int = 3) -> None:
def open(self) -> None:
self.sensor = VL53L1X.VL53L1X(i2c_bus=1, i2c_address=0x29)
self.sensor.open()
self.roi = VL53L1X.VL53L1xUserRoi()
# Optionally set an explicit timing budget
# These values are measurement time in microseconds,
@ -33,7 +32,7 @@ class VL53L1XSensor(ToFSensor):
# If you uncomment the line below to set a budget you
# should use `tof.start_ranging(0)`
# tof.set_timing(66000, 70)
self.ranging = ranging_mode
self.ranging = 2
# 0 = Unchanged
# 1 = Short Range
# 2 = Medium Range
@ -46,17 +45,18 @@ class VL53L1XSensor(ToFSensor):
Directions.OUTSIDE: VL53L1X.VL53L1xUserRoi(6, 15, 9, 12),
}
self.roi = direction_roi[direction]
roi = direction_roi[direction]
self.sensor.stop_ranging()
self.sensor.set_user_roi(roi)
self.sensor.start_ranging(self.ranging)
def getDistance(self) -> float:
"""Returns new distance in cm."""
self.sensor.set_user_roi(self.roi)
self.sensor.start_ranging(self.ranging)
distance = self.sensor.get_distance()
self.sensor.stop_ranging()
return distance / 10
def close(self) -> None:
self.sensor.stop_ranging()
self.sensor.close()

View file

@ -1,61 +0,0 @@
from sensors.tof_sensor import Directions, ToFSensor
import VL53L1X
# Reference: https://github.com/pimoroni/vl53l1x-python
#
# Left, right, top and bottom are relative to the SPAD matrix coordinates,
# which will be mirrored in real scene coordinates.
# (or even rotated, depending on the VM53L1X element alignment on the board and on the board position)
#
# ROI in SPAD matrix coords:
#
# 15 top-left
# | X____
# | | |
# | |____X
# | bottom-right
# 0__________15
#
class VL53L3CXSensor(ToFSensor):
def __init__(self) -> None:
super().__init__()
def open(self, ranging_mode: int = 3) -> None:
self.sensor = VL53L1X.VL53L1X(i2c_bus=1, i2c_address=0x52)
self.sensor.open()
self.roi = VL53L1X.VL53L1xUserRoi()
# Optionally set an explicit timing budget
# These values are measurement time in microseconds,
# and inter-measurement time in milliseconds.
# If you uncomment the line below to set a budget you
# should use `tof.start_ranging(0)`
# tof.set_timing(66000, 70)
self.ranging = ranging_mode
# 0 = Unchanged
# 1 = Short Range
# 2 = Medium Range
# 3 = Long Range
def setDirection(self, direction: Directions) -> None:
"""Configure sensor to pick up the distance in a specific direction."""
direction_roi = {
Directions.INSIDE: VL53L1X.VL53L1xUserRoi(6, 3, 9, 0),
Directions.OUTSIDE: VL53L1X.VL53L1xUserRoi(6, 15, 9, 12),
}
self.roi = direction_roi[direction]
def getDistance(self) -> float:
"""Returns new distance in cm."""
self.sensor.set_user_roi(self.roi)
self.sensor.start_ranging(self.ranging)
distance = self.sensor.get_distance()
self.sensor.stop_ranging()
return distance / 10
def close(self) -> None:
self.sensor.close()

View file

@ -1,37 +0,0 @@
from sensors import VL53L1XSensor, VL53L3CXSensor, PeopleCounter
from statistics.debug_logging import register_debug_logger
from datetime import time
import logging
LOG_FILE_PATH = "log.txt" # Path for logs
logging.getLogger().setLevel(logging.DEBUG)
# If the distance (in cm) is lower or equal to this value, the people counter will trigger
MAX_TRIGGER_DISTANCE = 130
# Should lights already turn on where there is any kind of motion in the sensor
ENABLE_MOTION_TRIGGERED_LIGHT = True
# Should lights change when a certain time in the schedule is reached
ENABLE_SCHEDULE_TRIGGERS = (
False # Not working correctly at the moment, so turned off by default
)
# Schedule (Key is time after scene should be used. Value is scene name to be used.)
# Needs to be sorted chronologically
SCHEDULE: dict[time, str] = {}
# Philips Hue configuration
hue_conf = {
"bridge_ip": "192.168.178.85",
"transition_time": 10, # seconds
"light_group": "Max Zimmer",
# If file exists, application is considered 'registered' at the bridge
"registered_file": "smart_light_registered.bridge",
} # Custom configuration for philips hue
sensor = VL53L1XSensor()
counter: PeopleCounter = PeopleCounter(sensor, MAX_TRIGGER_DISTANCE) # Sensor object
register_debug_logger(counter)

View file

@ -1,14 +1,28 @@
from datetime import datetime
from services.philips_hue import PhilipsHue
from sensors.people_counter import PeopleCounter
from sensors.vl53l1x_sensor import VL53L1XSensor
import logging
import json
from setup import hue_conf, LOG_FILE_PATH, counter
LOG_FILE_PATH = "log.txt" # Path for logs
hue_conf = {
"bridge_ip": "",
"transition_time": 10, # seconds
# Light group to control
"light_group": "",
# If file exists, application is considered 'registered' at the bridge
"registered_file": "smart_light_registered.bridge",
} # Custom configuration for philips hue
hue: PhilipsHue = PhilipsHue(hue_conf) # Light interface
counter: PeopleCounter = PeopleCounter(VL53L1XSensor()) # Sensor object
peopleCount: int = 0 # Global count of people on the inside
logging.getLogger().setLevel(logging.INFO)
def change_cb(countChange: int, directionState: dict):
"""Handles basic logging of event data for later analysis.
@ -49,16 +63,16 @@ def count_change(change: int) -> None:
if peopleCount <= 0 and previous_lights_state:
# Count was 0, but lights were on => people count was not actually 0
peopleCount = 1
logging.info(f"People count corrected to {peopleCount}")
logging.debug(f"People count corrected to {peopleCount}")
elif peopleCount > 0 and not previous_lights_state:
# Count was >0, but lights were off => people count was actually 0
peopleCount = 0
logging.info(f"People count corrected to {peopleCount}")
logging.debug(f"People count corrected to {peopleCount}")
peopleCount += change
if peopleCount < 0:
peopleCount = 0
logging.info(f"People count changed by {change}")
logging.debug(f"People count changed by {change}")
# Handle light
target_light_state = peopleCount > 0
@ -86,7 +100,7 @@ def set_light_state(target_light_state: bool) -> bool:
# Adjust light as necessary
hue.set_group(hue_conf["light_group"], {"on": target_light_state})
logging.info(f"Light state changed to {target_light_state}")
logging.debug(f"Light state changed to {target_light_state}")
return previous_lights_state

View file

@ -1,14 +1,36 @@
from datetime import datetime, time, timedelta
from typing import Optional
from services.philips_hue import PhilipsHue
from sensors import Directions
from sensors import PeopleCounter, Directions, VL53L1XSensor
import logging
import json
from timeloop import Timeloop
from setup import hue_conf, LOG_FILE_PATH, SCHEDULE, ENABLE_SCHEDULE_TRIGGERS, counter
# Should lights already turn on where there is any kind of motion in the sensor
ENABLE_MOTION_TRIGGERED_LIGHT = True
# Should lights change when a certain time in the schedule is reached
ENABLE_SCHEDULE_TRIGGERS = (
False # Not working correctly at the moment, so turned off by default
)
# Schedule (Key is time after scene should be used. Value is scene name to be used.)
# Needs to be sorted chronologically
SCHEDULE: dict[time, str] = {}
LOG_FILE_PATH = "log.txt" # Path for logs
hue_conf = {
"bridge_ip": "",
"transition_time": 10, # seconds
"light_group": "",
# If file exists, application is considered 'registered' at the bridge
"registered_file": "smart_light_registered.bridge",
} # Custom configuration for philips hue
hue: PhilipsHue = PhilipsHue(hue_conf) # Light interface
counter: PeopleCounter = PeopleCounter(VL53L1XSensor()) # Sensor object
peopleCount: int = 0 # Global count of people on the inside
motion_triggered_lights = False # Is light on because of any detected motion
timeloop: Timeloop = Timeloop() # Used for time triggered schedule
@ -33,7 +55,7 @@ def time_minus_time(time_a: time, time_b: time) -> timedelta:
return dt_a - dt_b
def get_scene_for_time(time: time) -> Optional[str]:
def get_scene_for_time(time: time) -> str:
"""Determines the correct scene to activate for a given time.
Args:
@ -105,16 +127,16 @@ def count_change(change: int) -> None:
if peopleCount <= 0 and previous_lights_state and not motion_triggered_lights:
# Count was 0, but lights were on (not because of motion triggers) => people count was not actually 0
peopleCount = 1
logging.info(f"People count corrected to {peopleCount}")
logging.debug(f"People count corrected to {peopleCount}")
elif peopleCount > 0 and not previous_lights_state:
# Count was >0, but lights were off => people count was actually 0
peopleCount = 0
logging.info(f"People count corrected to {peopleCount}")
logging.debug(f"People count corrected to {peopleCount}")
peopleCount += change
if peopleCount < 0:
peopleCount = 0
logging.info(f"People count changed by {change}")
logging.debug(f"People count changed by {change}")
# Handle light
target_light_state = peopleCount > 0
@ -156,14 +178,13 @@ def trigger_change(triggerState: dict):
if target_light_state == motion_triggered_lights:
return
logging.info(f"Motion trigger set to {target_light_state}")
set_light_state(target_light_state)
# Save state
motion_triggered_lights = target_light_state
def set_light_scene(target_scene: str) -> None:
def set_light_scene(target_scene: str) -> bool:
"""Sets the lights to the given scene, but only, if lights are already on. Does not correct count if lights are in an unexpected state.
Args:
@ -180,7 +201,7 @@ def set_light_scene(target_scene: str) -> None:
# Set lights to scene
hue.set_group_scene(hue_conf["light_group"], target_scene)
logging.info(f"Light scene set to {target_scene}")
logging.debug(f"Light scene set to {target_scene}")
def set_light_state(target_light_state: bool) -> bool:
@ -202,12 +223,12 @@ def set_light_state(target_light_state: bool) -> bool:
if target_light_state and target_scene:
# Set to specific scene if exists
hue.set_group_scene(hue_conf["light_group"], target_scene)
logging.info(
logging.debug(
f"Light state changed to {target_light_state} with scene {target_scene}"
)
else:
hue.set_group(hue_conf["light_group"], {"on": target_light_state})
logging.info(f"Light state changed to {target_light_state}")
logging.debug(f"Light state changed to {target_light_state}")
return previous_lights_state
@ -228,7 +249,7 @@ def update_scene():
return
set_light_scene(scene)
logging.info(f"Updated scene at {datetime.now().time()} to {scene}.")
logging.debug(f"Updated scene at {datetime.now().time()} to {scene}.")
def register_time_triggers():

View file

@ -1,22 +0,0 @@
from sensors import Directions, PeopleCounter
import logging
import json
def _debug_log_change(countChange: int, directionState: dict) -> None:
json_state = json.dumps(directionState, default=str)
logging.debug(f"CHANGE;Count Change;{str(countChange)};Direction State;{json_state};")
def _debug_log_trigger(triggerState: dict) -> None:
logging.debug(f"TRIGGER;Inside Triggered;{str(triggerState[Directions.INSIDE])};Outside Triggered;{str(triggerState[Directions.OUTSIDE])};")
def _debug_log_counting(countChange: int) -> None:
logging.debug(f"COUNTING;Count Change;{str(countChange)};")
def _debug_log_measurement(direction: Directions, distance: float) -> None:
logging.debug(f"MEASUREMENT;Direction;{str(direction)};Distance;{str(distance)};")
def register_debug_logger(counter: PeopleCounter) -> None:
counter.hookChange(_debug_log_change)
counter.hookCounting(_debug_log_counting)
counter.hookTrigger(_debug_log_trigger)
counter.hookMeasurement(_debug_log_measurement)