matrix-clock/src/main.py

121 lines
2.9 KiB
Python

import asyncio
import logging
from typing import Optional
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from actions import (
climate_sensor,
display_time,
log_temperature,
matrix_display,
display_pattern,
)
from config import climate_log_file
from handler.action_queue import ActionQueue
from handler.history import get_recent_entries
# Raise the root logger to INFO so informational records are not filtered out.
logging.getLogger().setLevel(logging.INFO)

# Start services
# Keep a module-level reference to the background task: the event loop only
# holds weak references to tasks, so a task whose result is discarded may be
# garbage-collected before it completes.
# NOTE(review): asyncio.create_task() raises RuntimeError when no event loop is
# running, so this module has to be imported from inside a running loop —
# confirm that the deployment's ASGI server does so.
_temperature_logger_task = asyncio.create_task(log_temperature())

# Queue through which every endpoint below schedules display actions.
# presumably the constructor argument is the initial idle action — confirm
# against ActionQueue.
queue = ActionQueue(matrix_display.show_current_time)

app = FastAPI()
# Origins that the CORS middleware registered below accepts requests from.
origins = [
    "http://localhost",
    "http://localhost:8000",
    "http://raspberrypi",
    "http://192.168.178.84:8000",
    "http://192.168.178.84",
]

# Register CORS handling on the app: only the origins listed above are
# allowed, credentials are allowed, and every method and header is accepted.
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
@app.post("/time")
async def start_time_loop():
    """Handle POST /time: install ``display_time`` as the queue's idle action.

    Returns:
        A dict with a fixed confirmation message.
    """
    confirmation = {"message": "Time loop started"}
    await queue.set_idle_action(display_time)
    return confirmation
@app.post("/full")
async def turn_full():
    """Handle POST /full: install ``matrix_display.turn_full`` as the idle action.

    Returns:
        A dict with a fixed confirmation message.
    """
    idle_action = matrix_display.turn_full
    await queue.set_idle_action(idle_action)
    return {"message": "Full screen turned on"}
@app.post("/off")
async def turn_off():
    """Handle POST /off: install ``matrix_display.turn_off`` as the idle action.

    Returns:
        A dict with a fixed confirmation message.
    """
    idle_action = matrix_display.turn_off
    await queue.set_idle_action(idle_action)
    return {"message": "Display turned off"}
@app.post("/temperature")
async def temperature():
    """Handle POST /temperature: queue the last temperature reading for display.

    Fetches ``climate_sensor.get_last_read()``. When a reading is available,
    its ``"temperature"`` value is formatted to one decimal place followed by
    ``*C`` and queued for ``matrix_display.show_text``.

    Returns:
        The reading itself, or a dict with a failure message when the sensor
        returned ``None``.
    """
    reading = climate_sensor.get_last_read()
    if reading is not None:
        text = f"{reading['temperature']:0.1f}*C"
        await queue.add_action_to_queue(matrix_display.show_text, text)
        return reading
    return {"message": "Failed to read temperature"}
@app.post("/humidity")
async def humidity():
    """Handle POST /humidity: queue the last humidity reading for display.

    Fetches ``climate_sensor.get_last_read()``. When a reading is available,
    its ``"humidity"`` value is formatted to one decimal place followed by
    ``%`` and queued for ``matrix_display.show_text``.

    Returns:
        The reading itself, or a dict with a failure message when the sensor
        returned ``None``.
    """
    reading = climate_sensor.get_last_read()
    if reading is None:
        return {"message": "Failed to read humidity"}

    text = format(reading["humidity"], "0.1f") + "%"
    await queue.add_action_to_queue(matrix_display.show_text, text)
    return reading
@app.post("/history")
async def history():
    """Handle POST /history: return recent entries from the climate log file.

    Returns:
        Whatever ``get_recent_entries`` yields for ``climate_log_file`` when
        asked for ``24 * 60`` entries.
    """
    # 24 * 60 entries; presumably one day's worth at one entry per minute —
    # confirm against the logging interval used by log_temperature.
    entries_wanted = 24 * 60
    recent = get_recent_entries(climate_log_file, entries_wanted)
    return recent
@app.post("/flash")
async def flash(count: int = 1, contrast: Optional[int] = None):
    """Handle POST /flash: queue a ``matrix_display.flash`` action.

    Args:
        count: Forwarded to ``matrix_display.flash`` as ``count``.
        contrast: Forwarded to ``matrix_display.flash`` as ``contrast``;
            ``None`` when the caller did not supply one.

    Returns:
        A dict with a fixed confirmation message.
    """
    flash_options = {"count": count, "contrast": contrast}
    await queue.add_action_to_queue(matrix_display.flash, **flash_options)
    return {"message": "Display flashed"}
@app.post("/pattern")
async def start_pattern(pattern: str = "01", step_ms: int = 500):
    """Handle POST /pattern: install ``display_pattern`` as the idle action.

    This handler was previously also named ``flash``, which rebound the
    module-level name of the ``/flash`` handler. The route path and the
    parameters are unchanged.

    Args:
        pattern: Forwarded to ``display_pattern`` as ``pattern``.
        step_ms: Forwarded to ``display_pattern`` as ``step_ms``.

    Returns:
        A dict with a fixed confirmation message.
    """
    await queue.set_idle_action(display_pattern, pattern=pattern, step_ms=step_ms)
    return {"message": "Activated pattern."}
@app.post("/contrast")
async def contrast(contrast: Optional[int] = None):
    """Handle POST /contrast: optionally set, then report, the display contrast.

    Args:
        contrast: New contrast value. When omitted (``None``) the current
            contrast is left untouched.

    Returns:
        A dict whose ``"contrast"`` key holds ``matrix_display.contrast`` as
        read after any update.
    """
    # Compare against None rather than relying on truthiness, so an explicit
    # contrast of 0 is applied instead of being silently ignored. Any range
    # validation is left to matrix_display.set_contrast.
    if contrast is not None:
        matrix_display.set_contrast(contrast)
    return {"contrast": matrix_display.contrast}
@app.post("/message")
async def display_message(body: dict):
    """Handle POST /message: queue the body's ``"message"`` value for display.

    Args:
        body: Request payload; only its ``"message"`` key is read.

    Returns:
        A dict with a fixed confirmation message.
    """
    # dict.get yields None when "message" is absent; that value is forwarded
    # to matrix_display.show_text unchanged.
    await queue.add_action_to_queue(matrix_display.show_text, body.get("message"))
    return {"message": "Message displayed"}