Compare commits
24 commits
raspberryp
...
master
Author | SHA1 | Date | |
---|---|---|---|
cbf2fa18a2 | |||
e6c15732ce | |||
aefa0fa683 | |||
eee9e2e85c | |||
8693d1463d | |||
d399d9d098 | |||
aa5b6d3621 | |||
6b5985febf | |||
667edf92eb | |||
23d2add388 | |||
16ebe4a670 | |||
b85ee8ab25 | |||
63509438b0 | |||
e7d397563a | |||
83ee05d300 | |||
a8cf4068db | |||
4adcbcd9ee | |||
c87e076939 | |||
5e81fecb78 | |||
aba9014145 | |||
aaf5d10154 | |||
91a8700b8e | |||
4cb7e3f731 | |||
11814a211d |
15 changed files with 209 additions and 72 deletions
4
.gitignore
vendored
4
.gitignore
vendored
|
@ -162,3 +162,7 @@ cython_debug/
|
|||
|
||||
log.txt
|
||||
.env
|
||||
.vscode/c_cpp_properties.json
|
||||
.vscode/extensions.json
|
||||
.vscode/launch.json
|
||||
.vscode/settings.json
|
||||
|
|
|
@ -47,11 +47,11 @@ In the photos below you can see my current setup. The piece of cardboard is supp
|
|||
The piece of cardboard is on the side of my room, which might help you to orient the sensor properly.
|
||||
|
||||
<p align="left">
|
||||
<img height="200px" src="/images/door.png" />
|
||||
<img height="200px" src="https://code.giller.dev/m.giller/mash-sensor-tof-pc/raw/branch/master/images/door.png" />
|
||||
<br/>
|
||||
<img height="200px" src="/images/controller.jpeg" />
|
||||
<img height="200px" src="https://code.giller.dev/m.giller/mash-sensor-tof-pc/raw/branch/master/images/controller.jpeg" />
|
||||
<br/>
|
||||
<img height="200px" src="/images/sensor.jpeg" />
|
||||
<img height="200px" src="https://code.giller.dev/m.giller/mash-sensor-tof-pc/raw/branch/master/images/sensor.jpeg" />
|
||||
</p>
|
||||
|
||||
Sadly, due to my limited time, there is so far no proper, easy way of changing the direction of the sensor in software. It is on my todo list tho (see below) and please don't hesistate to open issue if you require a certain feature! I am more than happy to help and expand this project.
|
||||
|
|
|
@ -5,4 +5,4 @@ vl53l1x
|
|||
phue
|
||||
|
||||
# For statistics
|
||||
matplotlib
|
||||
# matplotlib
|
||||
|
|
10
run.bash
Normal file
10
run.bash
Normal file
|
@ -0,0 +1,10 @@
|
|||
#!/bin/bash
|
||||
|
||||
# Set the working directory
|
||||
cd /home/pi/mash-sensor-tof-pc
|
||||
|
||||
# Activate the Python virtual environment
|
||||
source /home/pi/mash-sensor-tof-pc/.env/bin/activate
|
||||
|
||||
# Run the Python script
|
||||
python /home/pi/mash-sensor-tof-pc/src/simple_hue_counter.py
|
|
@ -1,12 +1,9 @@
|
|||
from sensors.people_counter import PeopleCounter
|
||||
from sensors.vl53l1x_sensor import VL53L1XSensor
|
||||
import logging
|
||||
from setup import counter
|
||||
|
||||
counter = PeopleCounter(VL53L1XSensor())
|
||||
peopleCount = 0
|
||||
|
||||
logging.getLogger().setLevel(logging.INFO)
|
||||
|
||||
|
||||
def countChange(change: int) -> None:
|
||||
global peopleCount
|
||||
|
|
26
src/measurer.py
Normal file
26
src/measurer.py
Normal file
|
@ -0,0 +1,26 @@
|
|||
from sensors import Directions
|
||||
from time import sleep
|
||||
from setup import sensor
|
||||
import logging
|
||||
|
||||
|
||||
DELAY_SECONDS = 1
|
||||
|
||||
sensor.open()
|
||||
|
||||
try:
|
||||
while True:
|
||||
sensor.setDirection(Directions.INSIDE)
|
||||
distance_inside = sensor.getDistance()
|
||||
|
||||
sensor.setDirection(Directions.OUTSIDE)
|
||||
distance_outside = sensor.getDistance()
|
||||
|
||||
logging.info("----------")
|
||||
logging.info(f"Inside: {distance_inside} cm")
|
||||
logging.info(f"Outside: {distance_outside} cm")
|
||||
|
||||
sleep(DELAY_SECONDS)
|
||||
|
||||
finally:
|
||||
sensor.close()
|
|
@ -1,3 +1,4 @@
|
|||
from sensors.vl53l1x_sensor import VL53L1XSensor
|
||||
from sensors.vl53l3cx_sensor import VL53L3CXSensor
|
||||
from sensors.people_counter import PeopleCounter
|
||||
from sensors.tof_sensor import Directions
|
||||
|
|
|
@ -6,6 +6,7 @@ import threading
|
|||
COUNTING_CB = "counting"
|
||||
TRIGGER_CB = "trigger"
|
||||
CHANGE_CB = "changes"
|
||||
MEASUREMENT_CB = "measurement"
|
||||
START_TIME = "start_time"
|
||||
END_TIME = "end_time"
|
||||
TRIGGER_DISTANCES = "trigger_distances"
|
||||
|
@ -13,10 +14,16 @@ END_DISTANCE = "end_distance"
|
|||
|
||||
|
||||
class PeopleCounter:
|
||||
def __init__(self, sensor: ToFSensor) -> None:
|
||||
def __init__(self, sensor: ToFSensor, maxTriggerDistanceInCm: int = 90) -> None:
|
||||
self.sensor = sensor
|
||||
self.callbacks = {COUNTING_CB: [], TRIGGER_CB: [], CHANGE_CB: []}
|
||||
self.maxTriggerDistance = 120 # In cm
|
||||
self.callbacks = {COUNTING_CB: [], TRIGGER_CB: [], CHANGE_CB: [], MEASUREMENT_CB: []}
|
||||
self.maxTriggerDistance = maxTriggerDistanceInCm
|
||||
|
||||
def hookMeasurement(self, cb) -> None:
|
||||
self.callbacks[MEASUREMENT_CB].append(cb)
|
||||
|
||||
def unhookMeasurement(self, cb) -> None:
|
||||
self.callbacks[MEASUREMENT_CB].remove(cb)
|
||||
|
||||
def hookCounting(self, cb) -> None:
|
||||
self.callbacks[COUNTING_CB].append(cb)
|
||||
|
@ -54,6 +61,9 @@ class PeopleCounter:
|
|||
distance: float = self.sensor.getDistance()
|
||||
changed: bool = self.updateState(direction, distance)
|
||||
|
||||
th = threading.Thread(target=self.handleMeasurementCallbacks, args=(direction, distance))
|
||||
th.start()
|
||||
|
||||
if changed:
|
||||
countChange: int = self.getCountChange(self.directionState)
|
||||
|
||||
|
@ -133,6 +143,10 @@ class PeopleCounter:
|
|||
#! TODO: Should be based on the distance from the ground, not from the sensor
|
||||
return distance <= self.maxTriggerDistance
|
||||
|
||||
def handleMeasurementCallbacks(self, direction: Directions, distance: float) -> None:
|
||||
for cb in self.callbacks[MEASUREMENT_CB]:
|
||||
cb(direction, distance)
|
||||
|
||||
def handleCallbacks(self, countChange: int):
|
||||
self.handleChangeCallbacks(countChange)
|
||||
self.handleCountingCallbacks(countChange)
|
||||
|
|
|
@ -22,9 +22,10 @@ class VL53L1XSensor(ToFSensor):
|
|||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
|
||||
def open(self) -> None:
|
||||
def open(self, ranging_mode: int = 3) -> None:
|
||||
self.sensor = VL53L1X.VL53L1X(i2c_bus=1, i2c_address=0x29)
|
||||
self.sensor.open()
|
||||
self.roi = VL53L1X.VL53L1xUserRoi()
|
||||
|
||||
# Optionally set an explicit timing budget
|
||||
# These values are measurement time in microseconds,
|
||||
|
@ -32,7 +33,7 @@ class VL53L1XSensor(ToFSensor):
|
|||
# If you uncomment the line below to set a budget you
|
||||
# should use `tof.start_ranging(0)`
|
||||
# tof.set_timing(66000, 70)
|
||||
self.ranging = 2
|
||||
self.ranging = ranging_mode
|
||||
# 0 = Unchanged
|
||||
# 1 = Short Range
|
||||
# 2 = Medium Range
|
||||
|
@ -45,18 +46,17 @@ class VL53L1XSensor(ToFSensor):
|
|||
Directions.OUTSIDE: VL53L1X.VL53L1xUserRoi(6, 15, 9, 12),
|
||||
}
|
||||
|
||||
roi = direction_roi[direction]
|
||||
self.roi = direction_roi[direction]
|
||||
|
||||
self.sensor.stop_ranging()
|
||||
self.sensor.set_user_roi(roi)
|
||||
self.sensor.start_ranging(self.ranging)
|
||||
|
||||
def getDistance(self) -> float:
|
||||
"""Returns new distance in cm."""
|
||||
self.sensor.set_user_roi(self.roi)
|
||||
self.sensor.start_ranging(self.ranging)
|
||||
distance = self.sensor.get_distance()
|
||||
self.sensor.stop_ranging()
|
||||
|
||||
return distance / 10
|
||||
|
||||
def close(self) -> None:
|
||||
self.sensor.stop_ranging()
|
||||
self.sensor.close()
|
||||
|
|
61
src/sensors/vl53l3cx_sensor.py
Normal file
61
src/sensors/vl53l3cx_sensor.py
Normal file
|
@ -0,0 +1,61 @@
|
|||
from sensors.tof_sensor import Directions, ToFSensor
|
||||
import VL53L1X
|
||||
|
||||
# Reference: https://github.com/pimoroni/vl53l1x-python
|
||||
#
|
||||
# Left, right, top and bottom are relative to the SPAD matrix coordinates,
|
||||
# which will be mirrored in real scene coordinates.
|
||||
# (or even rotated, depending on the VM53L1X element alignment on the board and on the board position)
|
||||
#
|
||||
# ROI in SPAD matrix coords:
|
||||
#
|
||||
# 15 top-left
|
||||
# | X____
|
||||
# | | |
|
||||
# | |____X
|
||||
# | bottom-right
|
||||
# 0__________15
|
||||
#
|
||||
|
||||
|
||||
class VL53L3CXSensor(ToFSensor):
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
|
||||
def open(self, ranging_mode: int = 3) -> None:
|
||||
self.sensor = VL53L1X.VL53L1X(i2c_bus=1, i2c_address=0x52)
|
||||
self.sensor.open()
|
||||
self.roi = VL53L1X.VL53L1xUserRoi()
|
||||
|
||||
# Optionally set an explicit timing budget
|
||||
# These values are measurement time in microseconds,
|
||||
# and inter-measurement time in milliseconds.
|
||||
# If you uncomment the line below to set a budget you
|
||||
# should use `tof.start_ranging(0)`
|
||||
# tof.set_timing(66000, 70)
|
||||
self.ranging = ranging_mode
|
||||
# 0 = Unchanged
|
||||
# 1 = Short Range
|
||||
# 2 = Medium Range
|
||||
# 3 = Long Range
|
||||
|
||||
def setDirection(self, direction: Directions) -> None:
|
||||
"""Configure sensor to pick up the distance in a specific direction."""
|
||||
direction_roi = {
|
||||
Directions.INSIDE: VL53L1X.VL53L1xUserRoi(6, 3, 9, 0),
|
||||
Directions.OUTSIDE: VL53L1X.VL53L1xUserRoi(6, 15, 9, 12),
|
||||
}
|
||||
|
||||
self.roi = direction_roi[direction]
|
||||
|
||||
def getDistance(self) -> float:
|
||||
"""Returns new distance in cm."""
|
||||
self.sensor.set_user_roi(self.roi)
|
||||
self.sensor.start_ranging(self.ranging)
|
||||
distance = self.sensor.get_distance()
|
||||
self.sensor.stop_ranging()
|
||||
|
||||
return distance / 10
|
||||
|
||||
def close(self) -> None:
|
||||
self.sensor.close()
|
37
src/setup.py
Normal file
37
src/setup.py
Normal file
|
@ -0,0 +1,37 @@
|
|||
from sensors import VL53L1XSensor, VL53L3CXSensor, PeopleCounter
|
||||
from statistics.debug_logging import register_debug_logger
|
||||
from datetime import time
|
||||
import logging
|
||||
|
||||
LOG_FILE_PATH = "log.txt" # Path for logs
|
||||
logging.getLogger().setLevel(logging.DEBUG)
|
||||
|
||||
# If the distance (in cm) is lower or equal to this value, the people counter will trigger
|
||||
MAX_TRIGGER_DISTANCE = 130
|
||||
|
||||
# Should lights already turn on where there is any kind of motion in the sensor
|
||||
ENABLE_MOTION_TRIGGERED_LIGHT = True
|
||||
|
||||
# Should lights change when a certain time in the schedule is reached
|
||||
ENABLE_SCHEDULE_TRIGGERS = (
|
||||
False # Not working correctly at the moment, so turned off by default
|
||||
)
|
||||
|
||||
# Schedule (Key is time after scene should be used. Value is scene name to be used.)
|
||||
# Needs to be sorted chronologically
|
||||
SCHEDULE: dict[time, str] = {}
|
||||
|
||||
# Philips Hue configuration
|
||||
hue_conf = {
|
||||
"bridge_ip": "192.168.178.85",
|
||||
"transition_time": 10, # seconds
|
||||
"light_group": "Max Zimmer",
|
||||
# If file exists, application is considered 'registered' at the bridge
|
||||
"registered_file": "smart_light_registered.bridge",
|
||||
} # Custom configuration for philips hue
|
||||
|
||||
sensor = VL53L1XSensor()
|
||||
|
||||
counter: PeopleCounter = PeopleCounter(sensor, MAX_TRIGGER_DISTANCE) # Sensor object
|
||||
|
||||
register_debug_logger(counter)
|
|
@ -1,28 +1,14 @@
|
|||
from datetime import datetime
|
||||
from services.philips_hue import PhilipsHue
|
||||
from sensors.people_counter import PeopleCounter
|
||||
from sensors.vl53l1x_sensor import VL53L1XSensor
|
||||
import logging
|
||||
import json
|
||||
|
||||
|
||||
LOG_FILE_PATH = "log.txt" # Path for logs
|
||||
hue_conf = {
|
||||
"bridge_ip": "",
|
||||
"transition_time": 10, # seconds
|
||||
# Light group to control
|
||||
"light_group": "",
|
||||
# If file exists, application is considered 'registered' at the bridge
|
||||
"registered_file": "smart_light_registered.bridge",
|
||||
} # Custom configuration for philips hue
|
||||
from setup import hue_conf, LOG_FILE_PATH, counter
|
||||
|
||||
|
||||
hue: PhilipsHue = PhilipsHue(hue_conf) # Light interface
|
||||
counter: PeopleCounter = PeopleCounter(VL53L1XSensor()) # Sensor object
|
||||
peopleCount: int = 0 # Global count of people on the inside
|
||||
|
||||
logging.getLogger().setLevel(logging.INFO)
|
||||
|
||||
|
||||
def change_cb(countChange: int, directionState: dict):
|
||||
"""Handles basic logging of event data for later analysis.
|
||||
|
@ -63,16 +49,16 @@ def count_change(change: int) -> None:
|
|||
if peopleCount <= 0 and previous_lights_state:
|
||||
# Count was 0, but lights were on => people count was not actually 0
|
||||
peopleCount = 1
|
||||
logging.debug(f"People count corrected to {peopleCount}")
|
||||
logging.info(f"People count corrected to {peopleCount}")
|
||||
elif peopleCount > 0 and not previous_lights_state:
|
||||
# Count was >0, but lights were off => people count was actually 0
|
||||
peopleCount = 0
|
||||
logging.debug(f"People count corrected to {peopleCount}")
|
||||
logging.info(f"People count corrected to {peopleCount}")
|
||||
|
||||
peopleCount += change
|
||||
if peopleCount < 0:
|
||||
peopleCount = 0
|
||||
logging.debug(f"People count changed by {change}")
|
||||
logging.info(f"People count changed by {change}")
|
||||
|
||||
# Handle light
|
||||
target_light_state = peopleCount > 0
|
||||
|
@ -100,7 +86,7 @@ def set_light_state(target_light_state: bool) -> bool:
|
|||
|
||||
# Adjust light as necessary
|
||||
hue.set_group(hue_conf["light_group"], {"on": target_light_state})
|
||||
logging.debug(f"Light state changed to {target_light_state}")
|
||||
logging.info(f"Light state changed to {target_light_state}")
|
||||
|
||||
return previous_lights_state
|
||||
|
||||
|
|
|
@ -1,36 +1,14 @@
|
|||
from datetime import datetime, time, timedelta
|
||||
from typing import Optional
|
||||
from services.philips_hue import PhilipsHue
|
||||
from sensors import PeopleCounter, Directions, VL53L1XSensor
|
||||
from sensors import Directions
|
||||
import logging
|
||||
import json
|
||||
from timeloop import Timeloop
|
||||
|
||||
|
||||
# Should lights already turn on where there is any kind of motion in the sensor
|
||||
ENABLE_MOTION_TRIGGERED_LIGHT = True
|
||||
|
||||
# Should lights change when a certain time in the schedule is reached
|
||||
ENABLE_SCHEDULE_TRIGGERS = (
|
||||
False # Not working correctly at the moment, so turned off by default
|
||||
)
|
||||
|
||||
# Schedule (Key is time after scene should be used. Value is scene name to be used.)
|
||||
# Needs to be sorted chronologically
|
||||
SCHEDULE: dict[time, str] = {}
|
||||
|
||||
|
||||
LOG_FILE_PATH = "log.txt" # Path for logs
|
||||
hue_conf = {
|
||||
"bridge_ip": "",
|
||||
"transition_time": 10, # seconds
|
||||
"light_group": "",
|
||||
# If file exists, application is considered 'registered' at the bridge
|
||||
"registered_file": "smart_light_registered.bridge",
|
||||
} # Custom configuration for philips hue
|
||||
from setup import hue_conf, LOG_FILE_PATH, SCHEDULE, ENABLE_SCHEDULE_TRIGGERS, counter
|
||||
|
||||
|
||||
hue: PhilipsHue = PhilipsHue(hue_conf) # Light interface
|
||||
counter: PeopleCounter = PeopleCounter(VL53L1XSensor()) # Sensor object
|
||||
peopleCount: int = 0 # Global count of people on the inside
|
||||
motion_triggered_lights = False # Is light on because of any detected motion
|
||||
timeloop: Timeloop = Timeloop() # Used for time triggered schedule
|
||||
|
@ -55,7 +33,7 @@ def time_minus_time(time_a: time, time_b: time) -> timedelta:
|
|||
return dt_a - dt_b
|
||||
|
||||
|
||||
def get_scene_for_time(time: time) -> str:
|
||||
def get_scene_for_time(time: time) -> Optional[str]:
|
||||
"""Determines the correct scene to activate for a given time.
|
||||
|
||||
Args:
|
||||
|
@ -127,16 +105,16 @@ def count_change(change: int) -> None:
|
|||
if peopleCount <= 0 and previous_lights_state and not motion_triggered_lights:
|
||||
# Count was 0, but lights were on (not because of motion triggers) => people count was not actually 0
|
||||
peopleCount = 1
|
||||
logging.debug(f"People count corrected to {peopleCount}")
|
||||
logging.info(f"People count corrected to {peopleCount}")
|
||||
elif peopleCount > 0 and not previous_lights_state:
|
||||
# Count was >0, but lights were off => people count was actually 0
|
||||
peopleCount = 0
|
||||
logging.debug(f"People count corrected to {peopleCount}")
|
||||
logging.info(f"People count corrected to {peopleCount}")
|
||||
|
||||
peopleCount += change
|
||||
if peopleCount < 0:
|
||||
peopleCount = 0
|
||||
logging.debug(f"People count changed by {change}")
|
||||
logging.info(f"People count changed by {change}")
|
||||
|
||||
# Handle light
|
||||
target_light_state = peopleCount > 0
|
||||
|
@ -178,13 +156,14 @@ def trigger_change(triggerState: dict):
|
|||
if target_light_state == motion_triggered_lights:
|
||||
return
|
||||
|
||||
logging.info(f"Motion trigger set to {target_light_state}")
|
||||
set_light_state(target_light_state)
|
||||
|
||||
# Save state
|
||||
motion_triggered_lights = target_light_state
|
||||
|
||||
|
||||
def set_light_scene(target_scene: str) -> bool:
|
||||
def set_light_scene(target_scene: str) -> None:
|
||||
"""Sets the lights to the given scene, but only, if lights are already on. Does not correct count if lights are in an unexpected state.
|
||||
|
||||
Args:
|
||||
|
@ -201,7 +180,7 @@ def set_light_scene(target_scene: str) -> bool:
|
|||
|
||||
# Set lights to scene
|
||||
hue.set_group_scene(hue_conf["light_group"], target_scene)
|
||||
logging.debug(f"Light scene set to {target_scene}")
|
||||
logging.info(f"Light scene set to {target_scene}")
|
||||
|
||||
|
||||
def set_light_state(target_light_state: bool) -> bool:
|
||||
|
@ -223,12 +202,12 @@ def set_light_state(target_light_state: bool) -> bool:
|
|||
if target_light_state and target_scene:
|
||||
# Set to specific scene if exists
|
||||
hue.set_group_scene(hue_conf["light_group"], target_scene)
|
||||
logging.debug(
|
||||
logging.info(
|
||||
f"Light state changed to {target_light_state} with scene {target_scene}"
|
||||
)
|
||||
else:
|
||||
hue.set_group(hue_conf["light_group"], {"on": target_light_state})
|
||||
logging.debug(f"Light state changed to {target_light_state}")
|
||||
logging.info(f"Light state changed to {target_light_state}")
|
||||
|
||||
return previous_lights_state
|
||||
|
||||
|
@ -249,7 +228,7 @@ def update_scene():
|
|||
return
|
||||
|
||||
set_light_scene(scene)
|
||||
logging.debug(f"Updated scene at {datetime.now().time()} to {scene}.")
|
||||
logging.info(f"Updated scene at {datetime.now().time()} to {scene}.")
|
||||
|
||||
|
||||
def register_time_triggers():
|
||||
|
|
0
src/statistics/__init__.py
Normal file
0
src/statistics/__init__.py
Normal file
22
src/statistics/debug_logging.py
Normal file
22
src/statistics/debug_logging.py
Normal file
|
@ -0,0 +1,22 @@
|
|||
from sensors import Directions, PeopleCounter
|
||||
import logging
|
||||
import json
|
||||
|
||||
def _debug_log_change(countChange: int, directionState: dict) -> None:
|
||||
json_state = json.dumps(directionState, default=str)
|
||||
logging.debug(f"CHANGE;Count Change;{str(countChange)};Direction State;{json_state};")
|
||||
|
||||
def _debug_log_trigger(triggerState: dict) -> None:
|
||||
logging.debug(f"TRIGGER;Inside Triggered;{str(triggerState[Directions.INSIDE])};Outside Triggered;{str(triggerState[Directions.OUTSIDE])};")
|
||||
|
||||
def _debug_log_counting(countChange: int) -> None:
|
||||
logging.debug(f"COUNTING;Count Change;{str(countChange)};")
|
||||
|
||||
def _debug_log_measurement(direction: Directions, distance: float) -> None:
|
||||
logging.debug(f"MEASUREMENT;Direction;{str(direction)};Distance;{str(distance)};")
|
||||
|
||||
def register_debug_logger(counter: PeopleCounter) -> None:
|
||||
counter.hookChange(_debug_log_change)
|
||||
counter.hookCounting(_debug_log_counting)
|
||||
counter.hookTrigger(_debug_log_trigger)
|
||||
counter.hookMeasurement(_debug_log_measurement)
|
Loading…
Reference in a new issue