Implemented matrix and climate abstractions, and fixed startup

This commit is contained in:
Maximilian Giller 2023-12-24 15:07:26 +01:00
parent 393b14374b
commit d9cd7a8688
9 changed files with 141 additions and 23 deletions

View file

@ -1,12 +1,34 @@
import asyncio
from datetime import datetime
import logging
import os
from config import climate_log_file, dht22_pin
from handler.climate import Dht22Sensor
from handler.matrix import MatrixDisplay
matrix_display = MatrixDisplay()
dht22_sensor = Dht22Sensor(dht22_pin)
from handler.matrix.matrix_display import MatrixDisplayAdapter
from handler.climate.climate import ClimateAdapter
climate_sensor: ClimateAdapter = None
matrix_display: MatrixDisplayAdapter = None
try:
from handler.matrix.luma_matrix_display import LumaMatrixDisplay
matrix_display = LumaMatrixDisplay()
except:
from handler.matrix.dummy_matrix_display import DummyMatrixDisplay
logging.info("Using Dummy Matrix Display")
matrix_display = DummyMatrixDisplay()
try:
from handler.climate.dht22_climate import Dht22Climate
climate_sensor = Dht22Climate(dht22_pin)
except:
from handler.climate.dummy_climate import DummyClimate
logging.info("Using Dummy Climate")
climate_sensor = DummyClimate()
async def log_temperature():
@ -16,7 +38,7 @@ async def log_temperature():
f.write("timestamp,temperature,humidity\n")
while True:
measurements = dht22_sensor.read()
measurements = climate_sensor.read()
if measurements is not None:
with open(climate_log_file, "a") as f:
f.write(

View file

@ -20,6 +20,10 @@ class ActionQueue:
while len(self.queued_actions) > 0:
self.state = QueueState.POPPING
action = self.queued_actions.pop(0)
if action is None:
break
await action[0](*(action[1]), **(action[2]))
self.state = QueueState.IDLE

View file

@ -0,0 +1,11 @@
class ClimateAdapter:
def __init__(self):
self.last_read = None
def read(self):
raise NotImplemented()
def get_last_read(self):
if self.last_read is None:
return self.read()
return self.last_read

View file

@ -1,20 +1,17 @@
import Adafruit_DHT
from handler.climate.climate import ClimateAdapter
class Dht22Sensor:
class Dht22Climate(ClimateAdapter):
def __init__(self, pin):
super().__init__()
self.sensor = Adafruit_DHT.AM2302
self.pin = pin
self.last_read = None
def read(self):
humidity, temperature = Adafruit_DHT.read_retry(self.sensor, self.pin)
if humidity is not None and temperature is not None:
self.last_read = {'temperature': temperature, 'humidity': humidity}
self.last_read = {"temperature": temperature, "humidity": humidity}
return self.last_read
else:
return None
def get_last_read(self):
if self.last_read is None:
self.read()
return self.last_read

View file

@ -0,0 +1,7 @@
from handler.climate.climate import ClimateAdapter
class DummyClimate(ClimateAdapter):
def read(self):
self.last_read = {"temperature": 22.3, "humidity": 50.1}
return self.last_read

View file

@ -0,0 +1,40 @@
from handler.matrix.matrix_display import MatrixDisplayAdapter
import logging
class DummyMatrixDisplay(MatrixDisplayAdapter):
def __init__(self, *, contrast=0, text_speed=0.02) -> None:
self.contrast = contrast
self.text_speed = text_speed
logging.info("DummyMatrixDisplay: Initialized")
def set_contrast(self, contrast: int):
"""Set contrast for all actions.
Args:
contrast (int): [0, 255]
"""
self.contrast = contrast
logging.info(f"DummyMatrixDisplay: setting contrast to {contrast}")
def show_text(self, text):
logging.info(f"DummyMatrixDisplay: Showing text '{text}'")
def flash(self, count=1, contrast=None):
if contrast:
logging.info(
f"DummyMatrixDisplay: flashing {count} times with contrast {contrast}"
)
else:
logging.info(
f"DummyMatrixDisplay: flashing {count} times with general contrast"
)
def turn_off(self):
logging.info(f"DummyMatrixDisplay: turning off")
def turn_full(self):
logging.info(f"DummyMatrixDisplay: turning full")
def show_current_time(self):
logging.info(f"DummyMatrixDisplay: showing time")

View file

@ -5,10 +5,12 @@ from luma.core.render import canvas
from luma.led_matrix.device import max7219
from luma.core import legacy
from luma.core.virtual import viewport
from luma.core.legacy.font import proportional, CP437_FONT, LCD_FONT
from luma.core.legacy.font import proportional, CP437_FONT
from handler.matrix.matrix_display import MatrixDisplayAdapter
class MatrixDisplay:
class LumaMatrixDisplay(MatrixDisplayAdapter):
def __init__(self, *, contrast=0, text_speed=0.02) -> None:
self.contrast = contrast
self.text_speed = text_speed
@ -25,7 +27,7 @@ class MatrixDisplay:
Args:
contrast (int): [0, 255]
"""
self.contrast = contrast
super().set_contrast(contrast)
self.device.contrast(self.contrast)
def show_text(self, text):

View file

@ -0,0 +1,27 @@
class MatrixDisplayAdapter:
def __init__(self, *, contrast=0, text_speed=0.02) -> None:
self.contrast = contrast
self.text_speed = text_speed
def set_contrast(self, contrast: int):
"""Set contrast for all actions.
Args:
contrast (int): [0, 255]
"""
self.contrast = contrast
def show_text(self, text):
raise NotImplementedError()
def flash(self, count=1, contrast=None):
raise NotImplementedError()
def turn_off(self):
raise NotImplementedError()
def turn_full(self):
raise NotImplementedError()
def show_current_time(self):
raise NotImplementedError()

View file

@ -1,17 +1,18 @@
import asyncio
import logging
from typing import Optional
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from actions import dht22_sensor, display_time, log_temperature, matrix_display
from actions import climate_sensor, display_time, log_temperature, matrix_display
from config import climate_log_file
from handler.action_queue import ActionQueue
from handler.history import get_recent_entries
queue = ActionQueue()
queue.set_idle_action(display_time)
logging.getLogger().setLevel(logging.INFO)
queue = ActionQueue()
app = FastAPI()
origins = [
@ -30,8 +31,6 @@ app.add_middleware(
allow_headers=["*"],
)
asyncio.create_task(log_temperature())
@app.post("/time")
async def start_time_loop():
@ -53,7 +52,7 @@ async def turn_off():
@app.post("/temperature")
async def temperature():
measurements = dht22_sensor.get_last_read()
measurements = climate_sensor.get_last_read()
if measurements is None:
return {"message": "Failed to read temperature"}
@ -66,7 +65,7 @@ async def temperature():
@app.post("/humidity")
async def humidity():
measurements = dht22_sensor.get_last_read()
measurements = climate_sensor.get_last_read()
if measurements is None:
return {"message": "Failed to read humidity"}
@ -104,3 +103,12 @@ async def display_message(body: dict):
message_text = body.get("message")
await queue.add_action_to_queue(matrix_display.show_text, message_text)
return {"message": "Message displayed"}
async def main():
await queue.set_idle_action(display_time)
if __name__ == "__main__":
asyncio.create_task(main())
asyncio.create_task(log_temperature())