matrix-clock/src/main.py

107 lines
2.5 KiB
Python
Raw Normal View History

import asyncio
from typing import Optional
2023-10-11 01:34:34 +02:00
import requests
from fastapi import FastAPI, HTTPException
2023-10-11 02:06:38 +02:00
from fastapi.middleware.cors import CORSMiddleware
2023-10-11 01:34:34 +02:00
from actions import dht22_sensor, display_time, log_temperature, matrix_display
from config import climate_log_file
from handler.action_queue import ActionQueue
from handler.history import get_recent_entries
2023-12-23 18:57:55 +01:00
queue = ActionQueue()
queue.set_idle_action(display_time)
2023-10-11 01:34:34 +02:00
app = FastAPI()
2023-10-11 02:06:38 +02:00
origins = [
"http://localhost",
"http://localhost:8000",
"http://raspberrypi",
"http://192.168.178.84:8000",
2023-12-17 21:04:39 +01:00
"http://192.168.178.84",
2023-10-11 02:06:38 +02:00
]
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
2023-10-17 17:57:32 +02:00
asyncio.create_task(log_temperature())
2023-10-11 01:48:50 +02:00
2023-10-11 01:34:34 +02:00
@app.post("/time")
async def start_time_loop():
await queue.set_idle_action(display_time)
2023-10-11 01:34:34 +02:00
return {"message": "Time loop started"}
@app.post("/full")
async def turn_full():
await queue.set_idle_action(matrix_display.turn_full)
return {"message": "Full screen turned on"}
2023-10-11 01:36:27 +02:00
@app.post("/off")
async def turn_off():
await queue.set_idle_action(matrix_display.turn_off)
2023-10-11 01:36:27 +02:00
return {"message": "Display turned off"}
2023-10-17 00:10:23 +02:00
@app.post("/temperature")
async def temperature():
2023-10-17 17:57:32 +02:00
measurements = dht22_sensor.get_last_read()
2023-10-17 00:10:23 +02:00
if measurements is None:
return {"message": "Failed to read temperature"}
2023-12-17 21:04:39 +01:00
await queue.add_action_to_queue(
matrix_display.show_text, "{0:0.1f}*C".format(measurements["temperature"])
)
2023-12-17 21:04:39 +01:00
2023-10-17 00:10:23 +02:00
return measurements
@app.post("/humidity")
async def humidity():
2023-10-17 17:57:32 +02:00
measurements = dht22_sensor.get_last_read()
2023-10-17 00:10:23 +02:00
if measurements is None:
return {"message": "Failed to read humidity"}
2023-12-17 21:04:39 +01:00
await queue.add_action_to_queue(
matrix_display.show_text, "{0:0.1f}%".format(measurements["humidity"])
)
2023-12-17 21:04:39 +01:00
2023-10-17 00:10:23 +02:00
return measurements
2023-10-25 17:44:45 +02:00
@app.post("/history")
async def history():
day_entry_count = 24 * 60
2023-10-25 17:45:45 +02:00
return get_recent_entries(climate_log_file, day_entry_count)
2023-10-25 17:44:45 +02:00
2023-10-11 01:36:27 +02:00
@app.post("/flash")
async def flash(count: int = 1):
await queue.add_action_to_queue(matrix_display.flash, count)
2023-12-17 21:04:39 +01:00
2023-10-11 01:36:27 +02:00
return {"message": "Display flashed"}
@app.post("/contrast")
async def contrast(contrast: Optional[int] = None):
if contrast:
matrix_display.set_contrast(contrast)
return {"contrast": matrix_display.contrast}
2023-10-11 01:34:34 +02:00
@app.post("/message")
async def display_message(body: dict):
message_text = body.get("message")
await queue.add_action_to_queue(matrix_display.show_text, message_text)
2023-10-11 01:34:34 +02:00
return {"message": "Message displayed"}