Implement debug logging of measurements

This commit is contained in:
Maximilian Giller 2024-02-14 23:51:44 +01:00
parent aa5b6d3621
commit d399d9d098
3 changed files with 42 additions and 2 deletions

View file

@ -6,6 +6,7 @@ import threading
COUNTING_CB = "counting" COUNTING_CB = "counting"
TRIGGER_CB = "trigger" TRIGGER_CB = "trigger"
CHANGE_CB = "changes" CHANGE_CB = "changes"
MEASUREMENT_CB = "measurement"
START_TIME = "start_time" START_TIME = "start_time"
END_TIME = "end_time" END_TIME = "end_time"
TRIGGER_DISTANCES = "trigger_distances" TRIGGER_DISTANCES = "trigger_distances"
@ -15,9 +16,15 @@ END_DISTANCE = "end_distance"
class PeopleCounter: class PeopleCounter:
def __init__(self, sensor: ToFSensor, maxTriggerDistanceInCm: int = 90) -> None: def __init__(self, sensor: ToFSensor, maxTriggerDistanceInCm: int = 90) -> None:
self.sensor = sensor self.sensor = sensor
self.callbacks = {COUNTING_CB: [], TRIGGER_CB: [], CHANGE_CB: []} self.callbacks = {COUNTING_CB: [], TRIGGER_CB: [], CHANGE_CB: [], MEASUREMENT_CB: []}
self.maxTriggerDistance = maxTriggerDistanceInCm self.maxTriggerDistance = maxTriggerDistanceInCm
def hookMeasurement(self, cb) -> None:
self.callbacks[MEASUREMENT_CB].append(cb)
def unhookMeasurement(self, cb) -> None:
self.callbacks[MEASUREMENT_CB].remove(cb)
def hookCounting(self, cb) -> None: def hookCounting(self, cb) -> None:
self.callbacks[COUNTING_CB].append(cb) self.callbacks[COUNTING_CB].append(cb)
@ -54,6 +61,9 @@ class PeopleCounter:
distance: float = self.sensor.getDistance() distance: float = self.sensor.getDistance()
changed: bool = self.updateState(direction, distance) changed: bool = self.updateState(direction, distance)
th = threading.Thread(target=self.handleMeasurementCallbacks, args=(direction, distance))
th.start()
if changed: if changed:
countChange: int = self.getCountChange(self.directionState) countChange: int = self.getCountChange(self.directionState)
@ -133,6 +143,10 @@ class PeopleCounter:
#! TODO: Should be based on the distance from the ground, not from the sensor #! TODO: Should be based on the distance from the ground, not from the sensor
return distance <= self.maxTriggerDistance return distance <= self.maxTriggerDistance
def handleMeasurementCallbacks(self, direction: Directions, distance: float) -> None:
for cb in self.callbacks[MEASUREMENT_CB]:
cb(direction, distance)
def handleCallbacks(self, countChange: int): def handleCallbacks(self, countChange: int):
self.handleChangeCallbacks(countChange) self.handleChangeCallbacks(countChange)
self.handleCountingCallbacks(countChange) self.handleCountingCallbacks(countChange)

View file

@ -1,9 +1,10 @@
from sensors import VL53L1XSensor, VL53L3CXSensor, PeopleCounter from sensors import VL53L1XSensor, VL53L3CXSensor, PeopleCounter
from statistics.debug_logging import register_debug_logger
from datetime import time from datetime import time
import logging import logging
LOG_FILE_PATH = "log.txt" # Path for logs LOG_FILE_PATH = "log.txt" # Path for logs
logging.getLogger().setLevel(logging.INFO) logging.getLogger().setLevel(logging.DEBUG)
# If the distance (in cm) is lower or equal to this value, the people counter will trigger # If the distance (in cm) is lower or equal to this value, the people counter will trigger
MAX_TRIGGER_DISTANCE = 110 MAX_TRIGGER_DISTANCE = 110
@ -32,3 +33,5 @@ hue_conf = {
sensor = VL53L1XSensor() sensor = VL53L1XSensor()
counter: PeopleCounter = PeopleCounter(sensor, MAX_TRIGGER_DISTANCE) # Sensor object counter: PeopleCounter = PeopleCounter(sensor, MAX_TRIGGER_DISTANCE) # Sensor object
register_debug_logger(counter)

View file

@ -0,0 +1,23 @@
from ..sensors.tof_sensor import Directions
from ..sensors.people_counter import PeopleCounter
import logging
import json
def debug_log_change(countChange: int, directionState: dict) -> None:
json_state = json.dumps(directionState)
logging.debug(f"CHANGE;Count Change;{str(countChange)};Direction State;{json_state};")
def debug_log_trigger(triggerState: dict) -> None:
logging.debug(f"TRIGGER;Inside Triggered;{str(triggerState[Directions.INSIDE])};Outside Triggered;{str(triggerState[Directions.OUTSIDE])};")
def debug_log_counting(countChange: int) -> None:
logging.debug(f"COUNTING;Count Change;{str(countChange)};")
def debug_log_measurement(direction: Directions, distance: float) -> None:
logging.debug(f"MEASUREMENT;Direction;{str(direction)};Distance;{str(distance)};")
def register_debug_logger(counter: PeopleCounter) -> None:
counter.hookChange(debug_log_change)
counter.hookCounting(debug_log_counting)
counter.hookTrigger(debug_log_trigger)
counter.hookMeasurement(debug_log_measurement)