Implemented sensor data series analysation

This commit is contained in:
Maximilian Giller 2021-12-03 16:36:20 +01:00
parent 4a11a3d990
commit 22db4a8283

View file

@ -1,9 +1,12 @@
from typing import Dict
from sensor.tof_sensor import ToFSensor, Directions from sensor.tof_sensor import ToFSensor, Directions
from datetime import datetime from datetime import datetime
import logging import logging
COUNTING_CB = "counting" COUNTING_CB = "counting"
ACTIVITY_CB = "activity"
CHANGE_CB = "changes"
START_TIME = "start" START_TIME = "start"
END_TIME = "end" END_TIME = "end"
@ -11,7 +14,7 @@ END_TIME = "end"
class PeopleCounter (): class PeopleCounter ():
def __init__(self, sensor: ToFSensor) -> None: def __init__(self, sensor: ToFSensor) -> None:
self.sensor = sensor self.sensor = sensor
self.callbacks = {COUNTING_CB: []} self.callbacks = {COUNTING_CB: [], ACTIVITY_CB: [], CHANGE_CB: []}
self.maxTriggerDistance = 120 # In cm self.maxTriggerDistance = 120 # In cm
def hookCounting(self, cb) -> None: def hookCounting(self, cb) -> None:
@ -20,17 +23,28 @@ class PeopleCounter ():
def unhookCounting(self, cb) -> None: def unhookCounting(self, cb) -> None:
self.callbacks[COUNTING_CB].remove(cb) self.callbacks[COUNTING_CB].remove(cb)
def hookActivity(self, cb) -> None:
self.callbacks[ACTIVITY_CB].append(cb)
def unhookActivity(self, cb) -> None:
self.callbacks[ACTIVITY_CB].remove(cb)
def hookChange(self, cb) -> None:
self.callbacks[CHANGE_CB].append(cb)
def unhookChange(self, cb) -> None:
self.callbacks[CHANGE_CB].remove(cb)
def getInitialDirectionState(self) -> Dict:
return {
Directions.INSIDE: [],
Directions.OUTSIDE: []
}
def run(self) -> None: def run(self) -> None:
self.keepRunning = True self.keepRunning = True
direction = Directions.INSIDE direction = Directions.INSIDE
self.directionState = { self.directionState = self.getInitialDirectionState()
Directions.INSIDE: {
START_TIME: None, END_TIME: None
},
Directions.OUTSIDE: {
START_TIME: None, END_TIME: None
}
}
self.sensor.open() self.sensor.open()
while self.keepRunning: while self.keepRunning:
@ -47,19 +61,27 @@ class PeopleCounter ():
countChange: int = self.getCountChange(self.directionState) countChange: int = self.getCountChange(self.directionState)
self.handleCallbacks(countChange) self.handleCallbacks(countChange)
# Reset records
self.directionState = self.getInitialDirectionState()
self.sensor.close() self.sensor.close()
def getCountChange(self, directionState) -> int: def getCountChange(self, directionState) -> int:
# Is valid? # Is valid?
for direction in Directions: for direction in Directions:
if directionState[direction][START_TIME] is None or directionState[direction][END_TIME] is None: # Is there at least one record for every direction?
if len(directionState[directionState]) <= 0:
return 0
# Did every record start and end?
if directionState[direction][0][START_TIME] is None or directionState[direction][-1][END_TIME] is None:
return 0 # Return no change if not valid return 0 # Return no change if not valid
# Get times into variables # Get times into variables
insideStart = directionState[Directions.INSIDE][START_TIME] insideStart = directionState[Directions.INSIDE][0][START_TIME]
insideEnd = directionState[Directions.INSIDE][END_TIME] insideEnd = directionState[Directions.INSIDE][-1][END_TIME]
outsideStart = directionState[Directions.OUTSIDE][START_TIME] outsideStart = directionState[Directions.OUTSIDE][0][START_TIME]
outsideEnd = directionState[Directions.OUTSIDE][END_TIME] outsideEnd = directionState[Directions.OUTSIDE][-1][END_TIME]
# In what direction is the doorframe entered and left? # In what direction is the doorframe entered and left?
# Entering doorframe in the inside direction # Entering doorframe in the inside direction
@ -113,17 +135,28 @@ class PeopleCounter ():
for cb in self.callbacks[COUNTING_CB]: for cb in self.callbacks[COUNTING_CB]:
cb(countChange) cb(countChange)
def getDirectionTime(self, direction: Directions, time: str) -> datetime:
if len(self.directionState[direction]) <= 0:
return None
return self.directionState[direction][-1][time]
def updateState(self, direction: Directions, triggered: bool) -> bool: def updateState(self, direction: Directions, triggered: bool) -> bool:
currentlyTriggered = self.directionState[direction][END_TIME] == None currentlyTriggered = False
if len(self.directionState[direction]) > 0:
currentlyTriggered = self.getDirectionTime(
direction, END_TIME) is None
if triggered and not currentlyTriggered: if triggered and not currentlyTriggered:
# Set as new beginning for this direction # Set as new beginning for this direction
self.directionState[direction][START_TIME] = datetime.now() self.directionState[direction].append({
self.directionState[direction][END_TIME] = None START_TIME: datetime.now(),
END_TIME: None
})
return True return True
elif not triggered and currentlyTriggered: elif not triggered and currentlyTriggered:
# Set as new end for this direction # Set as end for this direction
self.directionState[direction][END_TIME] = datetime.now() self.directionState[direction][-1][END_TIME] = datetime.now()
return True return True
return False return False