# Source: matrix-clock/src/main.py (file-viewer header: 106 lines, 2.6 KiB, Python)

import asyncio
from typing import Optional

from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware

from actions import dht22_sensor, display_time, log_temperature, matrix_display
from config import climate_log_file
from handler.action_queue import ActionQueue
from handler.history import get_recent_entries
# Shared action queue used by the route handlers below.
queue = ActionQueue()

app = FastAPI()

# Origins that are allowed to make cross-origin requests to this API.
origins = [
    "http://localhost",
    "http://localhost:8000",
    "http://raspberrypi",
    "http://192.168.178.84:8000",
    "http://192.168.178.84",
]

app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Strong references to background tasks. The event loop itself only holds
# weak references, so a task nobody references may be garbage-collected
# before it finishes.
_background_tasks = set()


# NOTE: ``on_event`` is deprecated in recent FastAPI releases in favour of a
# ``lifespan`` handler, but it is kept here because it works on old and new
# versions alike and needs no additional imports.
@app.on_event("startup")
async def _start_background_work():
    """Install the default idle action and start the temperature logger.

    Runs once the event loop is up. ``queue.set_idle_action`` is awaited by
    every route handler in this module, so calling it bare at import time
    only created a coroutine that was never executed; here it is awaited.
    ``asyncio.create_task`` likewise needs a running loop, which is only
    guaranteed inside a startup hook rather than at import time.
    """
    await queue.set_idle_action(display_time)

    logger_task = asyncio.create_task(log_temperature())
    _background_tasks.add(logger_task)
    # Drop the reference again once the task has finished.
    logger_task.add_done_callback(_background_tasks.discard)
@app.post("/time")
async def start_time_loop():
    """Make ``display_time`` the queue's idle action.

    Returns:
        A dict with a confirmation ``"message"``.
    """
    confirmation = {"message": "Time loop started"}
    await queue.set_idle_action(display_time)
    return confirmation
@app.post("/full")
async def turn_full():
    """Make ``matrix_display.turn_full`` the queue's idle action.

    Returns:
        A dict with a confirmation ``"message"``.
    """
    confirmation = {"message": "Full screen turned on"}
    await queue.set_idle_action(matrix_display.turn_full)
    return confirmation
@app.post("/off")
async def turn_off():
    """Make ``matrix_display.turn_off`` the queue's idle action.

    Returns:
        A dict with a confirmation ``"message"``.
    """
    confirmation = {"message": "Display turned off"}
    await queue.set_idle_action(matrix_display.turn_off)
    return confirmation
@app.post("/temperature")
async def temperature():
    """Queue the last temperature reading for display and return the reading.

    Fetches the result of ``dht22_sensor.get_last_read()``. When a reading is
    available, its ``"temperature"`` value is formatted with one decimal
    place and queued as text for the matrix display.

    Returns:
        The reading as returned by the sensor module, or a dict with a
        failure ``"message"`` when no reading is available.
    """
    reading = dht22_sensor.get_last_read()
    if reading is not None:
        label = "{0:0.1f}*C".format(reading["temperature"])
        await queue.add_action_to_queue(matrix_display.show_text, label)
        return reading
    return {"message": "Failed to read temperature"}
@app.post("/humidity")
async def humidity():
    """Queue the last humidity reading for display and return the reading.

    Fetches the result of ``dht22_sensor.get_last_read()``. When a reading is
    available, its ``"humidity"`` value is formatted with one decimal place
    and queued as text for the matrix display.

    Returns:
        The reading as returned by the sensor module, or a dict with a
        failure ``"message"`` when no reading is available.
    """
    reading = dht22_sensor.get_last_read()
    if reading is not None:
        label = "{0:0.1f}%".format(reading["humidity"])
        await queue.add_action_to_queue(matrix_display.show_text, label)
        return reading
    return {"message": "Failed to read humidity"}
@app.post("/history")
async def history():
    """Return the most recent entries from the climate log.

    Requests ``24 * 60`` entries from ``climate_log_file``.
    NOTE(review): that is one day's worth only if the log gains one entry
    per minute -- confirm against ``log_temperature``.
    """
    entries_per_day = 60 * 24
    return get_recent_entries(climate_log_file, entries_per_day)
@app.post("/flash")
async def flash(count: int = 1, contrast: Optional[int] = None):
    """Queue a ``matrix_display.flash`` action.

    Args:
        count: Forwarded to ``matrix_display.flash`` as ``count``.
        contrast: Forwarded to ``matrix_display.flash`` as ``contrast``;
            ``None`` is passed through unchanged.

    Returns:
        A dict with a confirmation ``"message"``.
    """
    flash_options = {"count": count, "contrast": contrast}
    await queue.add_action_to_queue(matrix_display.flash, **flash_options)
    return {"message": "Display flashed"}
@app.post("/contrast")
async def contrast(contrast: Optional[int] = None):
    """Optionally set the display contrast, then report the current value.

    Args:
        contrast: New contrast level. When omitted (``None``) the display
            is left untouched and only the current value is reported.

    Returns:
        A dict whose ``"contrast"`` key holds ``matrix_display.contrast``.
    """
    # Compare against None instead of relying on truthiness: a plain
    # ``if contrast:`` silently ignores an explicit request for level 0.
    if contrast is not None:
        matrix_display.set_contrast(contrast)
    return {"contrast": matrix_display.contrast}
@app.post("/message")
async def display_message(body: dict):
    """Queue the text from the request body for display.

    Args:
        body: JSON object from the request; its ``"message"`` value is the
            text handed to ``matrix_display.show_text``.

    Returns:
        A dict with a confirmation ``"message"``.

    Raises:
        HTTPException: With status 422 when ``"message"`` is missing or
            null, so that ``None`` is never queued and reported as shown.
    """
    message_text = body.get("message")
    if message_text is None:
        raise HTTPException(
            status_code=422,
            detail="Request body must contain a 'message' field",
        )
    await queue.add_action_to_queue(matrix_display.show_text, message_text)
    return {"message": "Message displayed"}