Improved code structure and implemented improved async action handling with queue and idle actions

This commit is contained in:
Maximilian Giller 2023-12-18 00:21:37 +01:00
parent feefd55e20
commit f860108b52
4 changed files with 114 additions and 105 deletions

40
src/actions.py Normal file
View file

@ -0,0 +1,40 @@
import asyncio
from datetime import datetime
import os
from config import climate_log_file, dht22_pin
from handler.climate import Dht22Sensor
from handler.matrix import MatrixDisplay
matrix_display = MatrixDisplay()
dht22_sensor = Dht22Sensor(dht22_pin)
async def log_temperature():
# If file does not exist, create it and write header
if not os.path.isfile(climate_log_file):
with open(climate_log_file, "w") as f:
f.write("timestamp,temperature,humidity\n")
while True:
measurements = dht22_sensor.read()
if measurements is not None:
with open(climate_log_file, "a") as f:
f.write(
"{},{},{}\n".format(
datetime.now().isoformat(),
measurements["temperature"],
measurements["humidity"],
)
)
await asyncio.sleep(60)
async def display_time():
while True:
try:
matrix_display.show_current_time()
except Exception as e:
raise "Failed to display time on the matrix display: {e}"
seconds_until_next_minute = 60 - datetime.now().second
await asyncio.sleep(seconds_until_next_minute)

2
src/config.py Normal file
View file

@ -0,0 +1,2 @@
dht22_pin = 17
climate_log_file = "./climate.csv"

View file

@ -0,0 +1,52 @@
import asyncio
from typing import Optional
from enum import Enum
class QueueState(Enum):
STOPPED = 0
IDLE = 1
POPPING = 2 # As in: Popping actions from the queue and performing them
class ActionQueue:
def __init__(self, idle_action=None) -> None:
self.queued_actions: list = []
self.idle_action: Optional[any] = None
self.state = QueueState.STOPPED
self.background_task = None
if idle_action:
self.set_idle_action(idle_action)
async def run_queue(self):
while len(self.queued_actions) > 0:
self.state = QueueState.POPPING
action = self.queued_actions.pop(0)
action[0](*(action[1]), **(action[2]))
self.state = QueueState.IDLE
self.idle_action()
async def stop_queue(self):
if self.background_task is None:
return
self.state = QueueState.STOPPED
self.background_task.cancel()
async def restart_queue(self):
await self.stop_queue()
self.background_task = asyncio.create_task(self.run_queue())
async def add_action_to_queue(self, action, *args, **kwargs):
self.queued_actions.append((action, args, kwargs))
if self.state == QueueState.IDLE:
await self.restart_queue()
async def set_idle_action(self, action):
self.idle_action = action
if self.state == QueueState.IDLE:
await self.restart_queue()

View file

@ -1,13 +1,15 @@
import os
from fastapi import FastAPI, HTTPException
from datetime import datetime
import requests
from handler.history import get_recent_entries
from handler.matrix import MatrixDisplay
from fastapi.middleware.cors import CORSMiddleware
import asyncio import asyncio
from handler.climate import Dht22Sensor
import requests
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from actions import dht22_sensor, display_time, log_temperature, matrix_display
from config import climate_log_file
from handler.action_queue import ActionQueue
from handler.history import get_recent_entries
queue = ActionQueue(display_time)
app = FastAPI() app = FastAPI()
origins = [ origins = [
@ -25,66 +27,18 @@ app.add_middleware(
allow_headers=["*"], allow_headers=["*"],
) )
should_run_time_loop = True
dht22_pin = 17
climate_log_file = "./climate.csv"
matrix_display = MatrixDisplay()
dht22_sensor = Dht22Sensor(dht22_pin)
# Start background service to log temperature and humidity every minute
async def log_temperature():
# If file does not exist, create it and write header
if not os.path.isfile(climate_log_file):
with open(climate_log_file, "w") as f:
f.write("timestamp,temperature,humidity\n")
while True:
measurements = dht22_sensor.read()
if measurements is not None:
with open(climate_log_file, "a") as f:
f.write(
"{},{},{}\n".format(
datetime.now().isoformat(),
measurements["temperature"],
measurements["humidity"],
)
)
await asyncio.sleep(60)
async def display_time():
while should_run_time_loop:
try:
matrix_display.show_current_time()
except requests.exceptions.RequestException as e:
raise HTTPException(
status_code=500,
detail=f"Failed to display time on the matrix display: {e}",
)
seconds_until_next_minute = 60 - datetime.now().second
await asyncio.sleep(seconds_until_next_minute)
asyncio.create_task(display_time())
asyncio.create_task(log_temperature()) asyncio.create_task(log_temperature())
@app.post("/time") @app.post("/time")
async def start_time_loop(): async def start_time_loop():
global should_run_time_loop await queue.set_idle_action(display_time)
should_run_time_loop = True
asyncio.create_task(display_time())
return {"message": "Time loop started"} return {"message": "Time loop started"}
@app.post("/off") @app.post("/off")
async def turn_off(): async def turn_off():
global should_run_time_loop await queue.set_idle_action(matrix_display.turn_off)
should_run_time_loop = False
matrix_display.turn_off()
return {"message": "Display turned off"} return {"message": "Display turned off"}
@ -94,15 +48,9 @@ async def temperature():
if measurements is None: if measurements is None:
return {"message": "Failed to read temperature"} return {"message": "Failed to read temperature"}
global should_run_time_loop await queue.add_action_to_queue(
was_clock_runnign = should_run_time_loop matrix_display.show_text, "{0:0.1f}*C".format(measurements["temperature"])
should_run_time_loop = False )
matrix_display.show_text("{0:0.1f}*C".format(measurements["temperature"]))
if was_clock_runnign:
should_run_time_loop = True
asyncio.create_task(display_time())
return measurements return measurements
@ -113,15 +61,9 @@ async def humidity():
if measurements is None: if measurements is None:
return {"message": "Failed to read humidity"} return {"message": "Failed to read humidity"}
global should_run_time_loop await queue.add_action_to_queue(
was_clock_runnign = should_run_time_loop matrix_display.show_text, "{0:0.1f}%".format(measurements["humidity"])
should_run_time_loop = False )
matrix_display.show_text("{0:0.1f}%".format(measurements["humidity"]))
if was_clock_runnign:
should_run_time_loop = True
asyncio.create_task(display_time())
return measurements return measurements
@ -134,40 +76,13 @@ async def history():
@app.post("/flash") @app.post("/flash")
async def flash(count: int = 1): async def flash(count: int = 1):
global should_run_time_loop await queue.add_action_to_queue(matrix_display.flash, count)
was_clock_runnign = should_run_time_loop
should_run_time_loop = False
matrix_display.flash(count)
if was_clock_runnign:
should_run_time_loop = True
asyncio.create_task(display_time())
return {"message": "Display flashed"} return {"message": "Display flashed"}
@app.post("/message") @app.post("/message")
async def display_message(body: dict): async def display_message(body: dict):
global should_run_time_loop
was_clock_runnign = should_run_time_loop
should_run_time_loop = False
message_text = body.get("message") message_text = body.get("message")
try: await queue.add_action_to_queue(matrix_display.show_text, message_text)
matrix_display.show_text(message_text)
except requests.exceptions.RequestException as e:
raise HTTPException(
status_code=500,
detail=f"Failed to display message on the matrix display: {e}",
)
finally:
if was_clock_runnign:
should_run_time_loop = True
asyncio.create_task(display_time())
return {"message": "Message displayed"} return {"message": "Message displayed"}
@app.post("/stop")
async def stop_time_loop():
global should_run_time_loop
should_run_time_loop = False
return {"message": "Time loop stopped"}