matrix-clock/src/main.py

150 lines
3.6 KiB
Python
Raw Normal View History

2023-10-11 01:34:34 +02:00
from fastapi import FastAPI, HTTPException
from datetime import datetime
import requests
from matrix import MatrixDisplay
2023-10-11 02:06:38 +02:00
from fastapi.middleware.cors import CORSMiddleware
2023-10-11 01:34:34 +02:00
import asyncio
2023-10-17 00:10:23 +02:00
from climate import Dht22Sensor
2023-10-11 01:34:34 +02:00
# Application instance; the route decorators in this module register on it.
app = FastAPI()

# Origins allowed to make cross-origin requests to this API.
origins = [
    "http://localhost",
    "http://localhost:8000",
    "http://raspberrypi",
    "http://192.168.178.54",
]

# CORS options gathered in one place before being handed to the middleware.
_cors_options = {
    "allow_origins": origins,
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_cors_options)
2023-10-11 01:34:34 +02:00
# Pin number passed to the DHT22 sensor wrapper below.
dht22_pin = 17

# Flag polled by display_time(); the request handlers in this module flip it
# to pause and resume the clock loop.
should_run_time_loop = True

# Shared hardware handles used by every endpoint.
matrix_display = MatrixDisplay()
dht22_sensor = Dht22Sensor(dht22_pin)
2023-10-11 01:34:34 +02:00
async def display_time():
    """Show the current time on the matrix, refreshing once per minute.

    Intended to run as a background task (it is scheduled with
    ``asyncio.create_task`` elsewhere in this module). The loop keeps going
    while the module-level ``should_run_time_loop`` flag is true and sleeps
    until the next minute boundary between refreshes.

    A ``requests`` failure while drawing the time is logged and the loop
    retries on the next minute.
    """
    # Local import so this block does not depend on the file's import header.
    import logging

    log = logging.getLogger(__name__)

    while should_run_time_loop:
        try:
            matrix_display.show_current_time()
        except requests.exceptions.RequestException:
            # This coroutine runs outside any request, so an HTTPException
            # here could never reach a client; it would only terminate the
            # task while the flag still claims the clock is running.
            log.exception("Failed to display time on the matrix display")
        # Sleep until the wall clock rolls over to the next minute.
        seconds_until_next_minute = 60 - datetime.now().second
        await asyncio.sleep(seconds_until_next_minute)
2023-10-11 01:48:50 +02:00
@app.on_event("startup")
async def start_clock_on_startup():
    """Schedule the clock loop once the application has started.

    Creating the task from a startup handler, rather than at import time,
    guarantees an event loop is running when ``asyncio.create_task`` is
    called, so the module can be imported without one.
    """
    # Keep a strong reference so the task cannot be garbage-collected
    # mid-execution (the event loop itself only holds weak references).
    app.state.clock_task = asyncio.create_task(display_time())
2023-10-11 01:34:34 +02:00
@app.post("/time")
async def start_time_loop():
    """Set the clock flag and schedule a fresh ``display_time`` task.

    NOTE(review): a task is created on every call and nothing here cancels
    an earlier one, so repeated calls may leave several loops running --
    confirm whether that is intended.
    """
    global should_run_time_loop

    should_run_time_loop = True
    asyncio.create_task(display_time())

    response = {"message": "Time loop started"}
    return response
2023-10-11 01:36:27 +02:00
@app.post("/off")
async def turn_off():
    """Clear the clock flag and call ``matrix_display.turn_off()``."""
    global should_run_time_loop

    # Clear the flag first so the clock loop exits at its next check.
    should_run_time_loop = False
    matrix_display.turn_off()

    return dict(message="Display turned off")
2023-10-17 00:10:23 +02:00
@app.post("/temperature")
async def temperature():
    """Read the DHT22 sensor and show the temperature on the matrix display.

    Returns:
        The value returned by ``dht22_sensor.read()``, or a
        ``{"message": ...}`` dict when that read yields ``None``.
    """
    global should_run_time_loop

    measurements = dht22_sensor.read()
    if measurements is None:
        return {"message": "Failed to read temperature"}

    clock_was_running = should_run_time_loop
    should_run_time_loop = False
    try:
        matrix_display.show_text("{0:0.1f}*C".format(measurements["temperature"]))
    finally:
        # Resume the clock even when the display call raises; otherwise the
        # flag would stay False and the clock would never come back.
        if clock_was_running:
            should_run_time_loop = True
            asyncio.create_task(display_time())
    return measurements
@app.post("/humidity")
async def humidity():
    """Read the DHT22 sensor and show the humidity on the matrix display.

    Returns:
        The value returned by ``dht22_sensor.read()``, or a
        ``{"message": ...}`` dict when that read yields ``None``.
    """
    global should_run_time_loop

    measurements = dht22_sensor.read()
    if measurements is None:
        return {"message": "Failed to read humidity"}

    clock_was_running = should_run_time_loop
    should_run_time_loop = False
    try:
        matrix_display.show_text("{0:0.1f}%".format(measurements["humidity"]))
    finally:
        # Resume the clock even when the display call raises; otherwise the
        # flag would stay False and the clock would never come back.
        if clock_was_running:
            should_run_time_loop = True
            asyncio.create_task(display_time())
    return measurements
2023-10-11 01:36:27 +02:00
@app.post("/flash")
async def flash(count: int = 1):
    """Flash the matrix display, then resume the clock if it was running.

    Args:
        count: Passed straight through to ``matrix_display.flash``.

    Returns:
        A ``{"message": ...}`` confirmation dict.
    """
    global should_run_time_loop

    clock_was_running = should_run_time_loop
    should_run_time_loop = False
    try:
        matrix_display.flash(count)
    finally:
        # Resume the clock even when flashing raises; otherwise the flag
        # would stay False and the clock would never come back.
        if clock_was_running:
            should_run_time_loop = True
            asyncio.create_task(display_time())
    return {"message": "Display flashed"}
2023-10-11 01:34:34 +02:00
@app.post("/message")
async def display_message(body: dict):
    """Show the ``"message"`` entry of the request body on the matrix display.

    The clock flag is cleared while the text is shown and, if the clock was
    running beforehand, it is re-enabled and a new ``display_time`` task is
    scheduled afterwards -- whether or not the display call succeeded.

    Raises:
        HTTPException: status 500 when the display call raises a
            ``requests`` exception.
    """
    global should_run_time_loop

    resume_clock = should_run_time_loop
    should_run_time_loop = False
    text = body.get("message")

    try:
        matrix_display.show_text(text)
    except requests.exceptions.RequestException as e:
        failure_detail = f"Failed to display message on the matrix display: {e}"
        raise HTTPException(status_code=500, detail=failure_detail)
    finally:
        if resume_clock:
            should_run_time_loop = True
            asyncio.create_task(display_time())

    return {"message": "Message displayed"}
@app.post("/stop")
async def stop_time_loop():
    """Clear the clock flag so ``display_time`` exits at its next check."""
    global should_run_time_loop

    should_run_time_loop = False

    response = {"message": "Time loop stopped"}
    return response