matrix-clock/src/main.py
2023-10-17 17:57:32 +02:00

169 lines
4.4 KiB
Python

import os
from fastapi import FastAPI, HTTPException
from datetime import datetime
import requests
from matrix import MatrixDisplay
from fastapi.middleware.cors import CORSMiddleware
import asyncio
from climate import Dht22Sensor
app = FastAPI()
origins = [
"http://localhost",
"http://localhost:8000",
"http://raspberrypi",
"http://192.168.178.54"
]
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
should_run_time_loop = True
dht22_pin = 17
climate_log_file = "./climate.csv"
matrix_display = MatrixDisplay()
dht22_sensor = Dht22Sensor(dht22_pin)
# Start background service to log temperature and humidity every minute
async def log_temperature():
# If file does not exist, create it and write header
if not os.path.isfile(climate_log_file):
with open(climate_log_file, "w") as f:
f.write("timestamp,temperature,humidity\n")
while True:
measurements = dht22_sensor.read()
if measurements is not None:
with open(climate_log_file, "a") as f:
f.write("{},{},{}\n".format(
datetime.now().isoformat(),
measurements["temperature"],
measurements["humidity"]
))
await asyncio.sleep(60)
async def display_time():
while should_run_time_loop:
try:
matrix_display.show_current_time()
except requests.exceptions.RequestException as e:
raise HTTPException(
status_code=500,
detail=f"Failed to display time on the matrix display: {e}",
)
seconds_until_next_minute = 60 - datetime.now().second
await asyncio.sleep(seconds_until_next_minute)
asyncio.create_task(display_time())
asyncio.create_task(log_temperature())
@app.post("/time")
async def start_time_loop():
global should_run_time_loop
should_run_time_loop = True
asyncio.create_task(display_time())
return {"message": "Time loop started"}
@app.post("/off")
async def turn_off():
global should_run_time_loop
should_run_time_loop = False
matrix_display.turn_off()
return {"message": "Display turned off"}
@app.post("/temperature")
async def temperature():
measurements = dht22_sensor.get_last_read()
if measurements is None:
return {"message": "Failed to read temperature"}
global should_run_time_loop
was_clock_runnign = should_run_time_loop
should_run_time_loop = False
matrix_display.show_text("{0:0.1f}*C".format(measurements["temperature"]))
if was_clock_runnign:
should_run_time_loop = True
asyncio.create_task(display_time())
return measurements
@app.post("/humidity")
async def humidity():
measurements = dht22_sensor.get_last_read()
if measurements is None:
return {"message": "Failed to read humidity"}
global should_run_time_loop
was_clock_runnign = should_run_time_loop
should_run_time_loop = False
matrix_display.show_text("{0:0.1f}%".format(measurements["humidity"]))
if was_clock_runnign:
should_run_time_loop = True
asyncio.create_task(display_time())
return measurements
@app.post("/flash")
async def flash(count: int = 1):
global should_run_time_loop
was_clock_runnign = should_run_time_loop
should_run_time_loop = False
matrix_display.flash(count)
if was_clock_runnign:
should_run_time_loop = True
asyncio.create_task(display_time())
return {"message": "Display flashed"}
@app.post("/message")
async def display_message(body: dict):
global should_run_time_loop
was_clock_runnign = should_run_time_loop
should_run_time_loop = False
message_text = body.get("message")
try:
matrix_display.show_text(message_text)
except requests.exceptions.RequestException as e:
raise HTTPException(
status_code=500,
detail=f"Failed to display message on the matrix display: {e}",
)
finally:
if was_clock_runnign:
should_run_time_loop = True
asyncio.create_task(display_time())
return {"message": "Message displayed"}
@app.post("/stop")
async def stop_time_loop():
global should_run_time_loop
should_run_time_loop = False
return {"message": "Time loop stopped"}