diff --git a/src/sensors/__init__.py b/src/sensors/__init__.py index 54354ab..f4951fb 100644 --- a/src/sensors/__init__.py +++ b/src/sensors/__init__.py @@ -1,2 +1 @@ from sensors.tof_sensor import ToFSensor, Directions -from sensors.people_counter import PeopleCounter diff --git a/src/sensors/people_counter.py b/src/sensors/people_counter.py index 9c3b75d..e76a46c 100644 --- a/src/sensors/people_counter.py +++ b/src/sensors/people_counter.py @@ -1,4 +1,3 @@ -from typing import Dict from sensors import ToFSensor, Directions from datetime import datetime import threading @@ -13,11 +12,11 @@ TRIGGER_DISTANCES = "trigger_distances" END_DISTANCE = "end_distance" -class PeopleCounter (): +class PeopleCounter: def __init__(self, sensor: ToFSensor) -> None: self.sensor = sensor self.callbacks = {COUNTING_CB: [], TRIGGER_CB: [], CHANGE_CB: []} - self.maxTriggerDistance = 120 # In cm + self.maxTriggerDistance = 120 # In cm def hookCounting(self, cb) -> None: self.callbacks[COUNTING_CB].append(cb) @@ -37,11 +36,8 @@ class PeopleCounter (): def unhookChange(self, cb) -> None: self.callbacks[CHANGE_CB].remove(cb) - def getInitialDirectionState(self) -> Dict: - return { - Directions.INSIDE: [], - Directions.OUTSIDE: [] - } + def getInitialDirectionState(self) -> dict[Directions, list]: + return {Directions.INSIDE: [], Directions.OUTSIDE: []} def run(self) -> None: self.keepRunning = True @@ -60,13 +56,15 @@ class PeopleCounter (): if changed: countChange: int = self.getCountChange(self.directionState) - + # Hooks th = threading.Thread(target=self.handleCallbacks, args=(countChange,)) th.start() # Reset state if state is finalised - if not self.isDirectionTriggered(Directions.INSIDE) and not self.isDirectionTriggered(Directions.OUTSIDE): + if not self.isDirectionTriggered( + Directions.INSIDE + ) and not self.isDirectionTriggered(Directions.OUTSIDE): self.directionState = self.getInitialDirectionState() self.sensor.close() @@ -79,8 +77,11 @@ class PeopleCounter (): return 0 # Did every record start and end? - if directionState[direction][0][START_TIME] is None or directionState[direction][-1][END_TIME] is None: - return 0 # Return no change if not valid + if ( + directionState[direction][0][START_TIME] is None + or directionState[direction][-1][END_TIME] is None + ): + return 0 # Return no change if not valid # Get times into variables insideStart = directionState[Directions.INSIDE][0][START_TIME] @@ -131,7 +132,7 @@ class PeopleCounter (): def isTriggerDistance(self, distance: float) -> bool: #! TODO: Should be based on the distance from the ground, not from the sensor return distance <= self.maxTriggerDistance - + def handleCallbacks(self, countChange: int): self.handleChangeCallbacks(countChange) self.handleCountingCallbacks(countChange) @@ -148,34 +149,39 @@ class PeopleCounter (): def handleTriggerCallbacks(self) -> None: triggerState = { Directions.INSIDE: self.isDirectionTriggered(Directions.INSIDE), - Directions.OUTSIDE: self.isDirectionTriggered(Directions.OUTSIDE) + Directions.OUTSIDE: self.isDirectionTriggered(Directions.OUTSIDE), } - + for cb in self.callbacks[TRIGGER_CB]: cb(triggerState) def handleChangeCallbacks(self, countChange: int) -> None: for cb in self.callbacks[CHANGE_CB]: cb(countChange, self.directionState) - + def isDirectionTriggered(self, direction: Directions) -> bool: - return len(self.directionState[direction]) > 0 and self.directionState[direction][-1][END_TIME] is None + return ( + len(self.directionState[direction]) > 0 + and self.directionState[direction][-1][END_TIME] is None + ) def updateState(self, direction: Directions, distance: float) -> bool: triggered: bool = self.isTriggerDistance(distance) - + previouslyTriggered = False if len(self.directionState[direction]) > 0: previouslyTriggered = self.directionState[direction][-1][END_TIME] is None if triggered and not previouslyTriggered: # Set as new beginning for this direction - self.directionState[direction].append({ - START_TIME: datetime.now(), - END_TIME: None, - TRIGGER_DISTANCES: [distance], - END_DISTANCE: None - }) + self.directionState[direction].append( + { + START_TIME: datetime.now(), + END_TIME: None, + TRIGGER_DISTANCES: [distance], + END_DISTANCE: None, + } + ) return True elif not triggered and previouslyTriggered: # Set as end for this direction diff --git a/src/sensors/tof_sensor.py b/src/sensors/tof_sensor.py index def4206..f16389b 100644 --- a/src/sensors/tof_sensor.py +++ b/src/sensors/tof_sensor.py @@ -5,7 +5,7 @@ class Directions(str, Enum): INSIDE = "indoor" OUTSIDE = "outdoor" - def other(direction: 'Direction') -> 'Direction': + def other(direction: "Directions") -> "Directions": if direction is Directions.INSIDE: return Directions.OUTSIDE else: @@ -20,13 +20,11 @@ class ToFSensor: raise NotImplementedError() def setDirection(self, direction: Directions) -> None: - """Configure sensor to pick up the distance in a specific direction. - """ + """Configure sensor to pick up the distance in a specific direction.""" raise NotImplementedError() def getDistance(self) -> float: - """Returns new distance in cm. - """ + """Returns new distance in cm.""" raise NotImplementedError() def close(self) -> None: diff --git a/src/sensors/vl53l1x_sensor.py b/src/sensors/vl53l1x_sensor.py index 9cd01d0..da2c29a 100644 --- a/src/sensors/vl53l1x_sensor.py +++ b/src/sensors/vl53l1x_sensor.py @@ -1,4 +1,4 @@ -from sensor.tof_sensor import Directions, ToFSensor +from sensors import Directions, ToFSensor import VL53L1X # Reference: https://github.com/pimoroni/vl53l1x-python @@ -18,7 +18,7 @@ import VL53L1X # -class VL53L1XSensor (ToFSensor): +class VL53L1XSensor(ToFSensor): def __init__(self) -> None: super().__init__() @@ -39,11 +39,10 @@ class VL53L1XSensor (ToFSensor): # 3 = Long Range def setDirection(self, direction: Directions) -> None: - """Configure sensor to pick up the distance in a specific direction. - """ + """Configure sensor to pick up the distance in a specific direction.""" direction_roi = { Directions.INSIDE: VL53L1X.VL53L1xUserRoi(6, 3, 9, 0), - Directions.OUTSIDE: VL53L1X.VL53L1xUserRoi(6, 15, 9, 12) + Directions.OUTSIDE: VL53L1X.VL53L1xUserRoi(6, 15, 9, 12), } roi = direction_roi[direction] @@ -53,8 +52,7 @@ class VL53L1XSensor (ToFSensor): self.sensor.start_ranging(self.ranging) def getDistance(self) -> float: - """Returns new distance in cm. - """ + """Returns new distance in cm.""" distance = self.sensor.get_distance() return distance / 10