First prototype
This commit is contained in:
parent
9af1079c6c
commit
3ea7d7453d
3 changed files with 148 additions and 13 deletions
125
src/peoplecounter.py
Normal file
125
src/peoplecounter.py
Normal file
|
@ -0,0 +1,125 @@
|
|||
from sensor.tofsensor import ToFSensor, Directions
|
||||
from datetime import datetime
|
||||
|
||||
|
||||
COUNTING_CB = "counting"
|
||||
START_TIME = "start"
|
||||
END_TIME = "end"
|
||||
|
||||
|
||||
class PeopleCounter ():
|
||||
def __init__(self, sensor: ToFSensor) -> None:
|
||||
self.sensor = sensor
|
||||
self.callbacks = {COUNTING_CB: []}
|
||||
self.maxTriggerDistance = 120 # In cm
|
||||
|
||||
def hookCounting(self, cb) -> None:
|
||||
self.callbacks[COUNTING_CB].append(cb)
|
||||
|
||||
def unhookCounting(self, cb) -> None:
|
||||
self.callbacks[COUNTING_CB].remove(cb)
|
||||
|
||||
def run(self) -> None:
|
||||
self.keepRunning = True
|
||||
direction = Directions.INSIDE
|
||||
self.directionState = {
|
||||
Directions.INSIDE: {
|
||||
START_TIME: None, END_TIME: None
|
||||
},
|
||||
Directions.OUTSIDE: {
|
||||
START_TIME: None, END_TIME: None
|
||||
}
|
||||
}
|
||||
|
||||
while self.keepRunning:
|
||||
# Switch to other direction
|
||||
direction: Directions = Directions.other(direction)
|
||||
|
||||
self.sensor.setDirection(direction)
|
||||
|
||||
distance: float = self.sensor.getDistance()
|
||||
triggered: bool = self.isTriggerDistance(distance)
|
||||
changed: bool = self.updateState(direction, triggered)
|
||||
|
||||
if changed:
|
||||
countChange: int = self.getCountChange(self.directionState)
|
||||
self.handleCallbacks(countChange)
|
||||
|
||||
def getCountChange(self, directionState) -> int:
|
||||
# Is valid?
|
||||
for direction in Directions:
|
||||
if directionState[direction][START_TIME] is None or directionState[direction][END_TIME] is None:
|
||||
return 0 # Return no change if not valid
|
||||
|
||||
# Get times into variables
|
||||
insideStart = directionState[Directions.INSIDE][START_TIME]
|
||||
insideEnd = directionState[Directions.INSIDE][END_TIME]
|
||||
outsideStart = directionState[Directions.OUTSIDE][START_TIME]
|
||||
outsideEnd = directionState[Directions.OUTSIDE][END_TIME]
|
||||
|
||||
# In what direction is the doorframe entered and left?
|
||||
# Entering doorframe in the inside direction
|
||||
enteringInside: bool = outsideStart < insideStart
|
||||
# Leaving dooframe in the inside direction
|
||||
leavingInside: bool = outsideEnd < insideEnd
|
||||
|
||||
# They have to be the same, otherwise they switch directions in between
|
||||
if enteringInside != leavingInside:
|
||||
# Someone did not go all the way
|
||||
# Either
|
||||
# Inside -######-
|
||||
# Outside ---##---
|
||||
# or
|
||||
# Inside ---##---
|
||||
# Outside -######-
|
||||
return 0
|
||||
|
||||
# Are those times overlapping or disjunct?
|
||||
if insideEnd < outsideStart or outsideEnd < insideStart:
|
||||
# They are disjunct
|
||||
# Either
|
||||
# Inside -##-----
|
||||
# Outside -----##-
|
||||
# or
|
||||
# Inside -----##-
|
||||
# Outside -##-----
|
||||
return 0
|
||||
|
||||
# What direction is the person taking?
|
||||
if enteringInside:
|
||||
# Entering the inside
|
||||
# Inside ---####-
|
||||
# Outside -####---
|
||||
return 1
|
||||
else:
|
||||
# Leaving the inside
|
||||
# Inside -####---
|
||||
# Outside ---####-
|
||||
return -1
|
||||
|
||||
def isTriggerDistance(self, distance: float) -> bool:
|
||||
#! TODO: Should be based on the distance from the ground, not them the sensor
|
||||
return distance <= self.maxTriggerDistance
|
||||
|
||||
def handleCallbacks(self, countChange: int) -> None:
|
||||
if countChange == 0:
|
||||
# Do nothing if there is no change
|
||||
return
|
||||
|
||||
for cb in self.callbacks[COUNTING_CB]:
|
||||
cb(countChange)
|
||||
|
||||
def updateState(self, direction: Directions, triggered: bool) -> bool:
|
||||
currentlyTriggered = self.directionState[direction] == None
|
||||
|
||||
if triggered and not currentlyTriggered:
|
||||
# Set as new beginning for this direction
|
||||
self.directionState[direction][START_TIME] = datetime.now()
|
||||
self.directionState[direction][END_TIME] = None
|
||||
return True
|
||||
elif not triggered and currentlyTriggered:
|
||||
# Set as new end for this direction
|
||||
self.directionState[direction][END_TIME] = datetime.now()
|
||||
return True
|
||||
|
||||
return False
|
|
@ -1,16 +1,25 @@
|
|||
from enum import Enum
|
||||
|
||||
|
||||
class Direction(Enum):
|
||||
INDOOR = "indoor"
|
||||
OUTDOOR = "outdoor"
|
||||
class Directions(Enum):
|
||||
INSIDE = "indoor"
|
||||
OUTSIDE = "outdoor"
|
||||
|
||||
def other(self, direction: Direction) -> Direction:
|
||||
if direction is Directions.INSIDE:
|
||||
return Directions.OUTSIDE
|
||||
else:
|
||||
return Directions.INSIDE
|
||||
|
||||
def __iter__(self):
|
||||
return [self.INSIDE, self.OUTSIDE]
|
||||
|
||||
|
||||
class ToFSensor:
|
||||
def open(self):
|
||||
def open(self) -> None:
|
||||
raise NotImplementedError()
|
||||
|
||||
def setDirection(self, direction: Direction):
|
||||
def setDirection(self, direction: Directions) -> None:
|
||||
"""Configure sensor to pick up the distance in a specific direction.
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
@ -19,6 +28,6 @@ class ToFSensor:
|
|||
"""Returns new distance in cm.
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def close(self):
|
||||
|
||||
def close(self) -> None:
|
||||
raise NotImplementedError()
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
from sensor.tofsensor import Direction, ToFSensor
|
||||
from sensor.tofsensor import Directions, ToFSensor
|
||||
import VL53L1X
|
||||
|
||||
# Reference: https://github.com/pimoroni/vl53l1x-python
|
||||
|
@ -22,7 +22,7 @@ class VL53L1XSensor (TofSensor):
|
|||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
|
||||
def open(self):
|
||||
def open(self) -> None:
|
||||
self.sensor = VL53L1X.VL53L1X(i2c_bus=1, i2c_address=0x29)
|
||||
self.sensor.open()
|
||||
|
||||
|
@ -37,12 +37,13 @@ class VL53L1XSensor (TofSensor):
|
|||
# 1 = Short Range
|
||||
# 2 = Medium Range
|
||||
# 3 = Long Range
|
||||
def setDirection(self, direction: Direction):
|
||||
|
||||
def setDirection(self, direction: Directions) -> None:
|
||||
"""Configure sensor to pick up the distance in a specific direction.
|
||||
"""
|
||||
direction_roi = {
|
||||
Direction.INDOOR: VL53L1X.VL53L1xUserRoi(6, 3, 9, 0),
|
||||
Direction.OUTDOOR: VL53L1X.VL53L1xUserRoi(6, 15, 9, 12)
|
||||
Directions.INSIDE: VL53L1X.VL53L1xUserRoi(6, 3, 9, 0),
|
||||
Directions.OUTSIDE: VL53L1X.VL53L1xUserRoi(6, 15, 9, 12)
|
||||
}
|
||||
|
||||
roi = direction_roi[direction]
|
||||
|
@ -58,6 +59,6 @@ class VL53L1XSensor (TofSensor):
|
|||
|
||||
return distance / 10
|
||||
|
||||
def close(self):
|
||||
def close(self) -> None:
|
||||
self.sensor.stop_ranging()
|
||||
self.sensor.close()
|
||||
|
|
Loading…
Reference in a new issue