matrix-clock/src/main.py

174 lines
4.6 KiB
Python
Raw Normal View History

2023-10-17 17:57:32 +02:00
import os
2023-10-11 01:34:34 +02:00
from fastapi import FastAPI, HTTPException
from datetime import datetime
import requests
2023-12-17 21:04:39 +01:00
from handler.history import get_recent_entries
from handler.matrix import MatrixDisplay
2023-10-11 02:06:38 +02:00
from fastapi.middleware.cors import CORSMiddleware
2023-10-11 01:34:34 +02:00
import asyncio
2023-12-17 21:04:39 +01:00
from handler.climate import Dht22Sensor
2023-10-11 01:34:34 +02:00
app = FastAPI()

# Origins that are allowed to call this API cross-origin (handed to the CORS
# middleware below).
origins = [
    "http://localhost",
    "http://localhost:8000",
    "http://raspberrypi",
    "http://192.168.178.84",
]

# Accept any method and any header from the origins listed above, with
# credentials allowed.
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
2023-10-11 01:34:34 +02:00
# Flag checked by display_time() on every iteration; the endpoints flip it to
# stop or resume the clock loop.
should_run_time_loop: bool = True

# Pin handed to the DHT22 sensor wrapper (presumably a GPIO pin number --
# confirm against handler.climate).
dht22_pin: int = 17

# CSV file that log_temperature() appends climate readings to; the path is
# relative to the process working directory.
climate_log_file: str = "./climate.csv"

# Shared hardware handles used by the background tasks and the endpoints.
matrix_display = MatrixDisplay()
dht22_sensor = Dht22Sensor(dht22_pin)
2023-10-11 01:34:34 +02:00
2023-12-17 21:04:39 +01:00
2023-10-17 17:57:32 +02:00
# Start background service to log temperature and humidity every minute
async def log_temperature():
    """Append one climate reading to ``climate_log_file`` every 60 seconds.

    The CSV file is created with a ``timestamp,temperature,humidity`` header
    when it does not exist yet. Each iteration asks the sensor for a reading;
    a ``None`` result is skipped without writing anything. The coroutine never
    returns on its own.
    """
    log_exists = os.path.isfile(climate_log_file)
    if not log_exists:
        # First run: start the file with the column header.
        with open(climate_log_file, "w") as log:
            log.write("timestamp,temperature,humidity\n")

    while True:
        reading = dht22_sensor.read()
        if reading is not None:
            with open(climate_log_file, "a") as log:
                stamp = datetime.now().isoformat()
                log.write(
                    f"{stamp},{reading['temperature']},{reading['humidity']}\n"
                )
        await asyncio.sleep(60)
2023-10-11 01:34:34 +02:00
2023-12-17 21:04:39 +01:00
2023-10-11 01:34:34 +02:00
# Bumped by every display_time() call so that an older loop which is still
# sleeping can tell, when it wakes up, that a newer loop has taken over.
_time_loop_generation = 0


async def display_time():
    """Redraw the current time on the matrix display once per minute.

    The loop runs until ``should_run_time_loop`` becomes false or until a
    newer ``display_time()`` call supersedes it. The endpoints in this module
    restart the clock by scheduling a fresh ``display_time()`` task, often
    without the previous loop ever observing the flag as false (it is asleep
    while the flag is toggled). The generation check makes such an older loop
    exit on its next wake-up instead of piling up alongside the new one.

    A ``requests`` failure while drawing is logged and retried at the next
    minute. This coroutine runs as a background task, so there is no HTTP
    client to receive an ``HTTPException``; raising one would only kill the
    clock loop.
    """
    global _time_loop_generation
    import logging

    _time_loop_generation += 1
    my_generation = _time_loop_generation

    while should_run_time_loop and my_generation == _time_loop_generation:
        try:
            matrix_display.show_current_time()
        except requests.exceptions.RequestException:
            logging.getLogger(__name__).exception(
                "Failed to display time on the matrix display"
            )
        # Sleep until (roughly) the start of the next minute so the displayed
        # time changes right after the minute rolls over.
        seconds_until_next_minute = 60 - datetime.now().second
        await asyncio.sleep(seconds_until_next_minute)
2023-10-11 01:48:50 +02:00
# Strong references to the long-running background tasks. The event loop only
# keeps weak references to tasks, so a task nobody else references may be
# garbage-collected before it finishes.
_background_tasks = set()


@app.on_event("startup")
async def start_background_tasks():
    """Start the clock loop and the climate logger when the server starts.

    Scheduling the tasks from a startup handler (instead of at import time)
    guarantees that an event loop is running when ``asyncio.create_task`` is
    called, so merely importing this module no longer requires one.
    """
    for job in (display_time, log_temperature):
        task = asyncio.create_task(job())
        _background_tasks.add(task)
        # Drop the reference again once the task has finished.
        task.add_done_callback(_background_tasks.discard)
2023-10-11 01:48:50 +02:00
2023-10-11 01:34:34 +02:00
@app.post("/time")
async def start_time_loop():
    """Enable the clock flag and schedule a new ``display_time()`` task."""
    global should_run_time_loop

    should_run_time_loop = True
    asyncio.create_task(display_time())

    response = {"message": "Time loop started"}
    return response
2023-10-11 01:36:27 +02:00
@app.post("/off")
async def turn_off():
    """Clear the clock flag and switch the matrix display off."""
    global should_run_time_loop

    # Clear the flag first so the clock loop does not keep redrawing the time
    # on later iterations.
    should_run_time_loop = False
    matrix_display.turn_off()

    response = {"message": "Display turned off"}
    return response
2023-10-17 00:10:23 +02:00
@app.post("/temperature")
async def temperature():
    """Show the most recent temperature reading on the display and return it.

    Returns:
        The object returned by ``dht22_sensor.get_last_read()`` (indexed here
        by ``"temperature"``), or ``{"message": "Failed to read temperature"}``
        when no reading is available.

    While the text is shown the clock flag is cleared; if the clock was
    running beforehand it is re-enabled and a fresh ``display_time()`` task is
    scheduled. The restore happens in a ``finally`` block so that a failure in
    ``show_text`` cannot leave the clock switched off.
    """
    global should_run_time_loop

    measurements = dht22_sensor.get_last_read()
    if measurements is None:
        return {"message": "Failed to read temperature"}

    was_clock_running = should_run_time_loop
    should_run_time_loop = False
    try:
        matrix_display.show_text("{0:0.1f}*C".format(measurements["temperature"]))
    finally:
        if was_clock_running:
            should_run_time_loop = True
            asyncio.create_task(display_time())

    return measurements
@app.post("/humidity")
async def humidity():
    """Show the most recent humidity reading on the display and return it.

    Returns:
        The object returned by ``dht22_sensor.get_last_read()`` (indexed here
        by ``"humidity"``), or ``{"message": "Failed to read humidity"}`` when
        no reading is available.

    While the text is shown the clock flag is cleared; if the clock was
    running beforehand it is re-enabled and a fresh ``display_time()`` task is
    scheduled. The restore happens in a ``finally`` block so that a failure in
    ``show_text`` cannot leave the clock switched off.
    """
    global should_run_time_loop

    measurements = dht22_sensor.get_last_read()
    if measurements is None:
        return {"message": "Failed to read humidity"}

    was_clock_running = should_run_time_loop
    should_run_time_loop = False
    try:
        matrix_display.show_text("{0:0.1f}%".format(measurements["humidity"]))
    finally:
        if was_clock_running:
            should_run_time_loop = True
            asyncio.create_task(display_time())

    return measurements
2023-10-25 17:44:45 +02:00
@app.post("/history")
async def history():
    """Return recent climate log entries read from ``climate_log_file``.

    Passes ``24 * 60`` as the entry count to ``get_recent_entries``; with the
    logger writing one row per minute that corresponds to one day of data.
    """
    minutes_per_day = 24 * 60
    recent = get_recent_entries(climate_log_file, minutes_per_day)
    return recent
2023-10-25 17:44:45 +02:00
2023-10-11 01:36:27 +02:00
@app.post("/flash")
async def flash(count: int = 1):
    """Flash the matrix display.

    Args:
        count: Value forwarded to ``matrix_display.flash`` (defaults to 1).

    Returns:
        ``{"message": "Display flashed"}``.

    While flashing the clock flag is cleared; if the clock was running
    beforehand it is re-enabled and a fresh ``display_time()`` task is
    scheduled. The restore happens in a ``finally`` block so that a failure in
    ``matrix_display.flash`` cannot leave the clock switched off.
    """
    global should_run_time_loop

    was_clock_running = should_run_time_loop
    should_run_time_loop = False
    try:
        matrix_display.flash(count)
    finally:
        if was_clock_running:
            should_run_time_loop = True
            asyncio.create_task(display_time())

    return {"message": "Display flashed"}
2023-10-11 01:34:34 +02:00
@app.post("/message")
async def display_message(body: dict):
    """Show the text from the request body on the matrix display.

    Args:
        body: JSON object of the request; its ``"message"`` entry is the text
            to display.

    Returns:
        ``{"message": "Message displayed"}`` on success.

    Raises:
        HTTPException: 422 when ``body`` has no ``"message"`` entry, 500 when
            the display call fails with a ``requests`` error.

    While the text is shown the clock flag is cleared; if the clock was
    running beforehand it is re-enabled and a fresh ``display_time()`` task is
    scheduled, whether or not the display call succeeded.
    """
    global should_run_time_loop

    # Validate before touching the clock state so a bad request has no side
    # effects on the display.
    message_text = body.get("message")
    if message_text is None:
        raise HTTPException(
            status_code=422,
            detail="Request body must contain a 'message' field",
        )

    was_clock_running = should_run_time_loop
    should_run_time_loop = False
    try:
        matrix_display.show_text(message_text)
    except requests.exceptions.RequestException as e:
        raise HTTPException(
            status_code=500,
            detail=f"Failed to display message on the matrix display: {e}",
        ) from e
    finally:
        if was_clock_running:
            should_run_time_loop = True
            asyncio.create_task(display_time())

    return {"message": "Message displayed"}
@app.post("/stop")
async def stop_time_loop():
    """Clear the clock flag so the clock loop exits at its next check.

    The display itself is left untouched.
    """
    global should_run_time_loop

    should_run_time_loop = False

    response = {"message": "Time loop stopped"}
    return response