Created proper sensor package

This commit is contained in:
Maximilian Giller 2022-08-06 00:11:53 +02:00
parent 4886983f96
commit e9db6ec497
4 changed files with 38 additions and 37 deletions

View file

@ -1,2 +1 @@
from sensors.tof_sensor import ToFSensor, Directions
from sensors.people_counter import PeopleCounter

View file

@ -1,4 +1,3 @@
from typing import Dict
from sensors import ToFSensor, Directions
from datetime import datetime
import threading
@ -13,11 +12,11 @@ TRIGGER_DISTANCES = "trigger_distances"
END_DISTANCE = "end_distance"
class PeopleCounter ():
class PeopleCounter:
def __init__(self, sensor: ToFSensor) -> None:
self.sensor = sensor
self.callbacks = {COUNTING_CB: [], TRIGGER_CB: [], CHANGE_CB: []}
self.maxTriggerDistance = 120 # In cm
self.maxTriggerDistance = 120 # In cm
def hookCounting(self, cb) -> None:
self.callbacks[COUNTING_CB].append(cb)
@ -37,11 +36,8 @@ class PeopleCounter ():
def unhookChange(self, cb) -> None:
self.callbacks[CHANGE_CB].remove(cb)
def getInitialDirectionState(self) -> Dict:
return {
Directions.INSIDE: [],
Directions.OUTSIDE: []
}
def getInitialDirectionState(self) -> dict[Directions, list]:
return {Directions.INSIDE: [], Directions.OUTSIDE: []}
def run(self) -> None:
self.keepRunning = True
@ -60,13 +56,15 @@ class PeopleCounter ():
if changed:
countChange: int = self.getCountChange(self.directionState)
# Hooks
th = threading.Thread(target=self.handleCallbacks, args=(countChange,))
th.start()
# Reset state if state is finalised
if not self.isDirectionTriggered(Directions.INSIDE) and not self.isDirectionTriggered(Directions.OUTSIDE):
if not self.isDirectionTriggered(
Directions.INSIDE
) and not self.isDirectionTriggered(Directions.OUTSIDE):
self.directionState = self.getInitialDirectionState()
self.sensor.close()
@ -79,8 +77,11 @@ class PeopleCounter ():
return 0
# Did every record start and end?
if directionState[direction][0][START_TIME] is None or directionState[direction][-1][END_TIME] is None:
return 0 # Return no change if not valid
if (
directionState[direction][0][START_TIME] is None
or directionState[direction][-1][END_TIME] is None
):
return 0 # Return no change if not valid
# Get times into variables
insideStart = directionState[Directions.INSIDE][0][START_TIME]
@ -131,7 +132,7 @@ class PeopleCounter ():
def isTriggerDistance(self, distance: float) -> bool:
#! TODO: Should be based on the distance from the ground, not from the sensor
return distance <= self.maxTriggerDistance
def handleCallbacks(self, countChange: int):
self.handleChangeCallbacks(countChange)
self.handleCountingCallbacks(countChange)
@ -148,34 +149,39 @@ class PeopleCounter ():
def handleTriggerCallbacks(self) -> None:
triggerState = {
Directions.INSIDE: self.isDirectionTriggered(Directions.INSIDE),
Directions.OUTSIDE: self.isDirectionTriggered(Directions.OUTSIDE)
Directions.OUTSIDE: self.isDirectionTriggered(Directions.OUTSIDE),
}
for cb in self.callbacks[TRIGGER_CB]:
cb(triggerState)
def handleChangeCallbacks(self, countChange: int) -> None:
for cb in self.callbacks[CHANGE_CB]:
cb(countChange, self.directionState)
def isDirectionTriggered(self, direction: Directions) -> bool:
return len(self.directionState[direction]) > 0 and self.directionState[direction][-1][END_TIME] is None
return (
len(self.directionState[direction]) > 0
and self.directionState[direction][-1][END_TIME] is None
)
def updateState(self, direction: Directions, distance: float) -> bool:
triggered: bool = self.isTriggerDistance(distance)
previouslyTriggered = False
if len(self.directionState[direction]) > 0:
previouslyTriggered = self.directionState[direction][-1][END_TIME] is None
if triggered and not previouslyTriggered:
# Set as new beginning for this direction
self.directionState[direction].append({
START_TIME: datetime.now(),
END_TIME: None,
TRIGGER_DISTANCES: [distance],
END_DISTANCE: None
})
self.directionState[direction].append(
{
START_TIME: datetime.now(),
END_TIME: None,
TRIGGER_DISTANCES: [distance],
END_DISTANCE: None,
}
)
return True
elif not triggered and previouslyTriggered:
# Set as end for this direction

View file

@ -5,7 +5,7 @@ class Directions(str, Enum):
INSIDE = "indoor"
OUTSIDE = "outdoor"
def other(direction: 'Direction') -> 'Direction':
def other(direction: "Directions") -> "Directions":
if direction is Directions.INSIDE:
return Directions.OUTSIDE
else:
@ -20,13 +20,11 @@ class ToFSensor:
raise NotImplementedError()
def setDirection(self, direction: Directions) -> None:
"""Configure sensor to pick up the distance in a specific direction.
"""
"""Configure sensor to pick up the distance in a specific direction."""
raise NotImplementedError()
def getDistance(self) -> float:
"""Returns new distance in cm.
"""
"""Returns new distance in cm."""
raise NotImplementedError()
def close(self) -> None:

View file

@ -1,4 +1,4 @@
from sensor.tof_sensor import Directions, ToFSensor
from sensors import Directions, ToFSensor
import VL53L1X
# Reference: https://github.com/pimoroni/vl53l1x-python
@ -18,7 +18,7 @@ import VL53L1X
#
class VL53L1XSensor (ToFSensor):
class VL53L1XSensor(ToFSensor):
def __init__(self) -> None:
super().__init__()
@ -39,11 +39,10 @@ class VL53L1XSensor (ToFSensor):
# 3 = Long Range
def setDirection(self, direction: Directions) -> None:
"""Configure sensor to pick up the distance in a specific direction.
"""
"""Configure sensor to pick up the distance in a specific direction."""
direction_roi = {
Directions.INSIDE: VL53L1X.VL53L1xUserRoi(6, 3, 9, 0),
Directions.OUTSIDE: VL53L1X.VL53L1xUserRoi(6, 15, 9, 12)
Directions.OUTSIDE: VL53L1X.VL53L1xUserRoi(6, 15, 9, 12),
}
roi = direction_roi[direction]
@ -53,8 +52,7 @@ class VL53L1XSensor (ToFSensor):
self.sensor.start_ranging(self.ranging)
def getDistance(self) -> float:
"""Returns new distance in cm.
"""
"""Returns new distance in cm."""
distance = self.sensor.get_distance()
return distance / 10