2021-12-03 16:36:20 +01:00
|
|
|
from typing import Dict
|
2021-11-03 00:37:17 +01:00
|
|
|
from sensor.tof_sensor import ToFSensor, Directions
|
2021-10-07 01:37:48 +02:00
|
|
|
from datetime import datetime
|
2021-12-03 16:48:26 +01:00
|
|
|
import threading
|
2021-10-07 01:37:48 +02:00
|
|
|
|
|
|
|
|
|
|
|
# Keys of PeopleCounter.callbacks; each maps to the list of callables
# registered for that kind of event.
COUNTING_CB = "counting"  # called with the count change when it is non-zero
TRIGGER_CB = "trigger"  # called with a per-direction "currently triggered" dict
CHANGE_CB = "changes"  # called with the count change and the direction state

# Keys of a single record stored in a direction's state list.
START_TIME = "start"  # datetime at which the direction became triggered
END_TIME = "end"  # datetime at which the trigger ended; None while still active
|
|
|
|
|
|
|
|
|
|
|
|
class PeopleCounter:
    """Derive people-count changes from a sensor polled in two directions.

    The sensor is alternately pointed at ``Directions.INSIDE`` and
    ``Directions.OUTSIDE``. For each direction a list of records is kept,
    each record holding the time the direction became triggered
    (``START_TIME``) and the time the trigger ended (``END_TIME``, ``None``
    while still active). From the order in which both directions were
    triggered and released, ``getCountChange`` derives ``+1`` (someone
    entered), ``-1`` (someone left) or ``0``.

    Interested parties register callbacks through the ``hook*`` methods.
    Every callback invocation runs in its own thread.
    """

    def __init__(self, sensor: ToFSensor) -> None:
        """Create a counter that reads from ``sensor``.

        ``sensor`` must provide ``open()``, ``close()``,
        ``setDirection(direction)`` and ``getDistance()``.
        """
        self.sensor = sensor
        self.callbacks = {COUNTING_CB: [], TRIGGER_CB: [], CHANGE_CB: []}
        self.maxTriggerDistance = 120  # In cm
        # Initialised here so that the state helpers can be used before run()
        # has been called.
        self.keepRunning = False
        self.directionState = self.getInitialDirectionState()

    def hookCounting(self, cb) -> None:
        """Register ``cb(countChange)``; called only on non-zero changes."""
        self.callbacks[COUNTING_CB].append(cb)

    def unhookCounting(self, cb) -> None:
        """Remove a counting callback; raises ValueError if not registered."""
        self.callbacks[COUNTING_CB].remove(cb)

    def hookTrigger(self, cb) -> None:
        """Register ``cb(triggerState)``; called on every state change."""
        self.callbacks[TRIGGER_CB].append(cb)

    def unhookTrigger(self, cb) -> None:
        """Remove a trigger callback; raises ValueError if not registered."""
        self.callbacks[TRIGGER_CB].remove(cb)

    def hookChange(self, cb) -> None:
        """Register ``cb(countChange, directionState)``; called on every state change."""
        self.callbacks[CHANGE_CB].append(cb)

    def unhookChange(self, cb) -> None:
        """Remove a change callback; raises ValueError if not registered."""
        self.callbacks[CHANGE_CB].remove(cb)

    def getInitialDirectionState(self) -> Dict:
        """Return a fresh state with an empty record list per direction."""
        return {
            Directions.INSIDE: [],
            Directions.OUTSIDE: [],
        }

    def run(self) -> None:
        """Poll the sensor until ``keepRunning`` is set to False.

        Each iteration switches to the other direction, reads one distance
        and updates the state of that direction. Whenever the state changed,
        the count change is evaluated and all hooks are notified.

        The sensor is closed when the loop ends, including when the sensor or
        the dispatching of a hook raises.
        """
        self.keepRunning = True
        direction = Directions.INSIDE
        self.directionState = self.getInitialDirectionState()

        self.sensor.open()
        try:
            while self.keepRunning:
                # Switch to other direction
                direction = Directions.other(direction)
                self.sensor.setDirection(direction)

                distance = self.sensor.getDistance()
                triggered = self.isTriggerDistance(distance)
                if not self.updateState(direction, triggered):
                    continue

                countChange = self.getCountChange(self.directionState)

                # Hooks
                self.handleChangeCallbacks(countChange)
                self.handleCountingCallbacks(countChange)
                self.handleTriggerCallbacks()

                # Reset records only once a passage has been counted.
                # Resetting after every single change would throw away the
                # records of the other direction, so getCountChange could
                # never see a complete passage and would always return 0.
                if countChange != 0:
                    self.directionState = self.getInitialDirectionState()
        finally:
            self.sensor.close()

    def getCountChange(self, directionState) -> int:
        """Evaluate ``directionState`` and return the resulting count change.

        Returns ``1`` when the doorframe was passed towards the inside,
        ``-1`` when it was passed towards the outside and ``0`` when the
        records are incomplete or do not describe a full passage.
        """
        # Is valid?
        for direction in Directions:
            records = directionState[direction]

            # Is there at least one record for every direction?
            if not records:
                return 0

            # Did every record start and end?
            if records[0][START_TIME] is None or records[-1][END_TIME] is None:
                return 0  # Return no change if not valid

        # Get times into variables
        insideStart = directionState[Directions.INSIDE][0][START_TIME]
        insideEnd = directionState[Directions.INSIDE][-1][END_TIME]
        outsideStart = directionState[Directions.OUTSIDE][0][START_TIME]
        outsideEnd = directionState[Directions.OUTSIDE][-1][END_TIME]

        # In what direction is the doorframe entered and left?
        # Entering doorframe in the inside direction
        enteringInside: bool = outsideStart < insideStart
        # Leaving doorframe in the inside direction
        leavingInside: bool = outsideEnd < insideEnd

        # They have to be the same, otherwise they switch directions in between
        if enteringInside != leavingInside:
            # Someone did not go all the way
            # Either
            # Inside  -######-
            # Outside ---##---
            # or
            # Inside  ---##---
            # Outside -######-
            return 0

        # Are those times overlapping or disjunct?
        if insideEnd < outsideStart or outsideEnd < insideStart:
            # They are disjunct
            # Either
            # Inside  -##-----
            # Outside -----##-
            # or
            # Inside  -----##-
            # Outside -##-----
            return 0

        # What direction is the person taking?
        if enteringInside:
            # Entering the inside
            # Inside  ---####-
            # Outside -####---
            return 1

        # Leaving the inside
        # Inside  -####---
        # Outside ---####-
        return -1

    def isTriggerDistance(self, distance: float) -> bool:
        """Return True when ``distance`` is at most ``maxTriggerDistance``."""
        # TODO: Should be based on the distance from the ground, not from the sensor
        return distance <= self.maxTriggerDistance

    def handleCountingCallbacks(self, countChange: int) -> None:
        """Start one thread per counting callback, passing ``countChange``.

        Does nothing when ``countChange`` is 0.
        """
        # Only notify counting on actual count change
        if countChange == 0:
            return

        for cb in self.callbacks[COUNTING_CB]:
            threading.Thread(target=cb, args=(countChange,)).start()

    def handleTriggerCallbacks(self) -> None:
        """Start one thread per trigger callback with the current trigger state.

        The state maps each direction to True when its latest record has
        started but not yet ended.
        """
        triggerState = {}
        for direction in (Directions.INSIDE, Directions.OUTSIDE):
            records = self.directionState[direction]
            triggerState[direction] = (
                len(records) > 0 and records[-1][END_TIME] is None
            )

        for cb in self.callbacks[TRIGGER_CB]:
            threading.Thread(target=cb, args=(triggerState,)).start()

    def handleChangeCallbacks(self, countChange: int) -> None:
        """Start one thread per change callback.

        Each callback receives ``countChange`` and the current
        ``directionState`` object.
        """
        # NOTE(review): the state object itself is handed over, not a copy;
        # updateState keeps mutating its record lists while callbacks run.
        for cb in self.callbacks[CHANGE_CB]:
            threading.Thread(
                target=cb, args=(countChange, self.directionState)
            ).start()

    def getDirectionTime(self, direction: Directions, time: str) -> datetime:
        """Return the ``time`` entry of the latest record of ``direction``.

        ``time`` is ``START_TIME`` or ``END_TIME``. Returns ``None`` when the
        direction has no records, or when the requested entry is still unset.
        """
        records = self.directionState[direction]
        if not records:
            return None

        return records[-1][time]

    def updateState(self, direction: Directions, triggered: bool) -> bool:
        """Record a trigger start or end for ``direction``.

        Returns True when the state changed, i.e. a new record was started
        or the open record was ended; False otherwise.
        """
        # A direction counts as currently triggered while its latest record
        # has no end time yet.
        currentlyTriggered = False
        if self.directionState[direction]:
            currentlyTriggered = (
                self.getDirectionTime(direction, END_TIME) is None
            )

        if triggered and not currentlyTriggered:
            # Set as new beginning for this direction
            self.directionState[direction].append({
                START_TIME: datetime.now(),
                END_TIME: None,
            })
            return True

        if not triggered and currentlyTriggered:
            # Set as end for this direction
            self.directionState[direction][-1][END_TIME] = datetime.now()
            return True

        return False
|