175 lines
4.5 KiB
Python
175 lines
4.5 KiB
Python
import os
|
|
from fastapi import FastAPI, HTTPException
|
|
from datetime import datetime
|
|
import requests
|
|
from matrix import MatrixDisplay
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
import asyncio
|
|
from climate import Dht22Sensor
|
|
|
|
app = FastAPI()
|
|
|
|
origins = [
|
|
"http://localhost",
|
|
"http://localhost:8000",
|
|
"http://raspberrypi",
|
|
"http://192.168.178.54"
|
|
]
|
|
|
|
app.add_middleware(
|
|
CORSMiddleware,
|
|
allow_origins=origins,
|
|
allow_credentials=True,
|
|
allow_methods=["*"],
|
|
allow_headers=["*"],
|
|
)
|
|
|
|
should_run_time_loop = True
|
|
dht22_pin = 17
|
|
climate_log_file = "./climate.csv"
|
|
|
|
matrix_display = MatrixDisplay()
|
|
dht22_sensor = Dht22Sensor(dht22_pin)
|
|
|
|
# Start background service to log temperature and humidity every minute
|
|
async def log_temperature():
|
|
# If file does not exist, create it and write header
|
|
if not os.path.isfile(climate_log_file):
|
|
with open(climate_log_file, "w") as f:
|
|
f.write("timestamp,temperature,humidity\n")
|
|
|
|
while True:
|
|
measurements = dht22_sensor.read()
|
|
if measurements is not None:
|
|
with open(climate_log_file, "a") as f:
|
|
f.write("{},{},{}\n".format(
|
|
datetime.now().isoformat(),
|
|
measurements["temperature"],
|
|
measurements["humidity"]
|
|
))
|
|
await asyncio.sleep(60)
|
|
|
|
async def display_time():
|
|
while should_run_time_loop:
|
|
try:
|
|
matrix_display.show_current_time()
|
|
except requests.exceptions.RequestException as e:
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail=f"Failed to display time on the matrix display: {e}",
|
|
)
|
|
|
|
seconds_until_next_minute = 60 - datetime.now().second
|
|
await asyncio.sleep(seconds_until_next_minute)
|
|
|
|
|
|
|
|
asyncio.create_task(display_time())
|
|
asyncio.create_task(log_temperature())
|
|
|
|
|
|
|
|
@app.post("/time")
|
|
async def start_time_loop():
|
|
global should_run_time_loop
|
|
should_run_time_loop = True
|
|
asyncio.create_task(display_time())
|
|
return {"message": "Time loop started"}
|
|
|
|
|
|
@app.post("/off")
|
|
async def turn_off():
|
|
global should_run_time_loop
|
|
should_run_time_loop = False
|
|
matrix_display.turn_off()
|
|
return {"message": "Display turned off"}
|
|
|
|
|
|
|
|
@app.post("/temperature")
|
|
async def temperature():
|
|
measurements = dht22_sensor.get_last_read()
|
|
if measurements is None:
|
|
return {"message": "Failed to read temperature"}
|
|
|
|
|
|
global should_run_time_loop
|
|
was_clock_runnign = should_run_time_loop
|
|
should_run_time_loop = False
|
|
|
|
|
|
matrix_display.show_text("{0:0.1f}*C".format(measurements["temperature"]))
|
|
|
|
if was_clock_runnign:
|
|
should_run_time_loop = True
|
|
asyncio.create_task(display_time())
|
|
|
|
return measurements
|
|
|
|
|
|
@app.post("/humidity")
|
|
async def humidity():
|
|
measurements = dht22_sensor.get_last_read()
|
|
if measurements is None:
|
|
return {"message": "Failed to read humidity"}
|
|
|
|
|
|
global should_run_time_loop
|
|
was_clock_runnign = should_run_time_loop
|
|
should_run_time_loop = False
|
|
|
|
|
|
matrix_display.show_text("{0:0.1f}%".format(measurements["humidity"]))
|
|
|
|
if was_clock_runnign:
|
|
should_run_time_loop = True
|
|
asyncio.create_task(display_time())
|
|
|
|
return measurements
|
|
|
|
|
|
@app.post("/history")
|
|
async def history():
|
|
day_entry_count = 24 * 60
|
|
return get_recent_entries("history.csv", day_entry_count)
|
|
|
|
|
|
@app.post("/flash")
|
|
async def flash(count: int = 1):
|
|
global should_run_time_loop
|
|
was_clock_runnign = should_run_time_loop
|
|
should_run_time_loop = False
|
|
|
|
matrix_display.flash(count)
|
|
|
|
if was_clock_runnign:
|
|
should_run_time_loop = True
|
|
asyncio.create_task(display_time())
|
|
return {"message": "Display flashed"}
|
|
|
|
|
|
@app.post("/message")
|
|
async def display_message(body: dict):
|
|
global should_run_time_loop
|
|
was_clock_runnign = should_run_time_loop
|
|
should_run_time_loop = False
|
|
message_text = body.get("message")
|
|
try:
|
|
matrix_display.show_text(message_text)
|
|
except requests.exceptions.RequestException as e:
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail=f"Failed to display message on the matrix display: {e}",
|
|
)
|
|
finally:
|
|
if was_clock_runnign:
|
|
should_run_time_loop = True
|
|
asyncio.create_task(display_time())
|
|
return {"message": "Message displayed"}
|
|
|
|
|
|
@app.post("/stop")
|
|
async def stop_time_loop():
|
|
global should_run_time_loop
|
|
should_run_time_loop = False
|
|
return {"message": "Time loop stopped"}
|