Removed unnecessary abstraction; Implemented pattern endpoint
This commit is contained in:
parent
30571d2439
commit
075e7d277a
8 changed files with 47 additions and 114 deletions
|
@ -1,34 +1,14 @@
|
||||||
import asyncio
|
import asyncio
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
import logging
|
|
||||||
import os
|
import os
|
||||||
from config import climate_log_file, dht22_pin
|
from config import climate_log_file, dht22_pin
|
||||||
|
|
||||||
from handler.matrix.matrix_display import MatrixDisplayAdapter
|
from .handler.dht22_climate import Dht22Climate
|
||||||
from handler.climate.climate import ClimateAdapter
|
from .handler.matrix_display import MatrixDisplay
|
||||||
|
|
||||||
climate_sensor: ClimateAdapter = None
|
|
||||||
matrix_display: MatrixDisplayAdapter = None
|
|
||||||
|
|
||||||
try:
|
climate_sensor = Dht22Climate(dht22_pin)
|
||||||
from handler.matrix.luma_matrix_display import LumaMatrixDisplay
|
matrix_display = MatrixDisplay()
|
||||||
|
|
||||||
matrix_display = LumaMatrixDisplay()
|
|
||||||
except:
|
|
||||||
from handler.matrix.dummy_matrix_display import DummyMatrixDisplay
|
|
||||||
|
|
||||||
logging.info("Using Dummy Matrix Display")
|
|
||||||
matrix_display = DummyMatrixDisplay()
|
|
||||||
|
|
||||||
try:
|
|
||||||
from handler.climate.dht22_climate import Dht22Climate
|
|
||||||
|
|
||||||
climate_sensor = Dht22Climate(dht22_pin)
|
|
||||||
except:
|
|
||||||
from handler.climate.dummy_climate import DummyClimate
|
|
||||||
|
|
||||||
logging.info("Using Dummy Climate")
|
|
||||||
climate_sensor = DummyClimate()
|
|
||||||
|
|
||||||
|
|
||||||
async def log_temperature():
|
async def log_temperature():
|
||||||
|
@ -57,3 +37,8 @@ async def display_time():
|
||||||
|
|
||||||
seconds_until_next_minute = 60 - datetime.now().second
|
seconds_until_next_minute = 60 - datetime.now().second
|
||||||
await asyncio.sleep(seconds_until_next_minute)
|
await asyncio.sleep(seconds_until_next_minute)
|
||||||
|
|
||||||
|
|
||||||
|
async def display_pattern(*args, **kwargs):
|
||||||
|
while True:
|
||||||
|
await matrix_display.pattern(*args, **kwargs)
|
||||||
|
|
|
@ -1,11 +0,0 @@
|
||||||
class ClimateAdapter:
|
|
||||||
def __init__(self):
|
|
||||||
self.last_read = None
|
|
||||||
|
|
||||||
def read(self):
|
|
||||||
raise NotImplemented()
|
|
||||||
|
|
||||||
def get_last_read(self):
|
|
||||||
if self.last_read is None:
|
|
||||||
return self.read()
|
|
||||||
return self.last_read
|
|
|
@ -1,7 +0,0 @@
|
||||||
from handler.climate.climate import ClimateAdapter
|
|
||||||
|
|
||||||
|
|
||||||
class DummyClimate(ClimateAdapter):
|
|
||||||
def read(self):
|
|
||||||
self.last_read = {"temperature": 22.3, "humidity": 50.1}
|
|
||||||
return self.last_read
|
|
|
@ -1,8 +1,7 @@
|
||||||
import Adafruit_DHT
|
import Adafruit_DHT
|
||||||
from handler.climate.climate import ClimateAdapter
|
|
||||||
|
|
||||||
|
|
||||||
class Dht22Climate(ClimateAdapter):
|
class Dht22Climate():
|
||||||
def __init__(self, pin):
|
def __init__(self, pin):
|
||||||
super().__init__()
|
super().__init__()
|
||||||
self.sensor = Adafruit_DHT.AM2302
|
self.sensor = Adafruit_DHT.AM2302
|
|
@ -1,40 +0,0 @@
|
||||||
from handler.matrix.matrix_display import MatrixDisplayAdapter
|
|
||||||
import logging
|
|
||||||
|
|
||||||
|
|
||||||
class DummyMatrixDisplay(MatrixDisplayAdapter):
|
|
||||||
def __init__(self, *, contrast=0, text_speed=0.02) -> None:
|
|
||||||
self.contrast = contrast
|
|
||||||
self.text_speed = text_speed
|
|
||||||
logging.info("DummyMatrixDisplay: Initialized")
|
|
||||||
|
|
||||||
def set_contrast(self, contrast: int):
|
|
||||||
"""Set contrast for all actions.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
contrast (int): [0, 255]
|
|
||||||
"""
|
|
||||||
self.contrast = contrast
|
|
||||||
logging.info(f"DummyMatrixDisplay: setting contrast to {contrast}")
|
|
||||||
|
|
||||||
def show_text(self, text):
|
|
||||||
logging.info(f"DummyMatrixDisplay: Showing text '{text}'")
|
|
||||||
|
|
||||||
def flash(self, count=1, contrast=None):
|
|
||||||
if contrast:
|
|
||||||
logging.info(
|
|
||||||
f"DummyMatrixDisplay: flashing {count} times with contrast {contrast}"
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
logging.info(
|
|
||||||
f"DummyMatrixDisplay: flashing {count} times with general contrast"
|
|
||||||
)
|
|
||||||
|
|
||||||
def turn_off(self):
|
|
||||||
logging.info(f"DummyMatrixDisplay: turning off")
|
|
||||||
|
|
||||||
def turn_full(self):
|
|
||||||
logging.info(f"DummyMatrixDisplay: turning full")
|
|
||||||
|
|
||||||
def show_current_time(self):
|
|
||||||
logging.info(f"DummyMatrixDisplay: showing time")
|
|
|
@ -1,27 +0,0 @@
|
||||||
class MatrixDisplayAdapter:
|
|
||||||
def __init__(self, *, contrast=0, text_speed=0.02) -> None:
|
|
||||||
self.contrast = contrast
|
|
||||||
self.text_speed = text_speed
|
|
||||||
|
|
||||||
def set_contrast(self, contrast: int):
|
|
||||||
"""Set contrast for all actions.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
contrast (int): [0, 255]
|
|
||||||
"""
|
|
||||||
self.contrast = contrast
|
|
||||||
|
|
||||||
def show_text(self, text):
|
|
||||||
raise NotImplementedError()
|
|
||||||
|
|
||||||
def flash(self, count=1, contrast=None):
|
|
||||||
raise NotImplementedError()
|
|
||||||
|
|
||||||
def turn_off(self):
|
|
||||||
raise NotImplementedError()
|
|
||||||
|
|
||||||
def turn_full(self):
|
|
||||||
raise NotImplementedError()
|
|
||||||
|
|
||||||
def show_current_time(self):
|
|
||||||
raise NotImplementedError()
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
import asyncio
|
||||||
import time
|
import time
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from luma.core.interface.serial import spi, noop
|
from luma.core.interface.serial import spi, noop
|
||||||
|
@ -7,10 +8,8 @@ from luma.core import legacy
|
||||||
from luma.core.virtual import viewport
|
from luma.core.virtual import viewport
|
||||||
from luma.core.legacy.font import proportional, CP437_FONT
|
from luma.core.legacy.font import proportional, CP437_FONT
|
||||||
|
|
||||||
from handler.matrix.matrix_display import MatrixDisplayAdapter
|
|
||||||
|
|
||||||
|
class MatrixDisplay:
|
||||||
class LumaMatrixDisplay(MatrixDisplayAdapter):
|
|
||||||
def __init__(self, *, contrast=0, text_speed=0.02) -> None:
|
def __init__(self, *, contrast=0, text_speed=0.02) -> None:
|
||||||
self.contrast = contrast
|
self.contrast = contrast
|
||||||
self.text_speed = text_speed
|
self.text_speed = text_speed
|
||||||
|
@ -69,6 +68,33 @@ class LumaMatrixDisplay(MatrixDisplayAdapter):
|
||||||
with canvas(self.device) as draw:
|
with canvas(self.device) as draw:
|
||||||
draw.rectangle((0, 0, 31, 7), outline="white", fill="white")
|
draw.rectangle((0, 0, 31, 7), outline="white", fill="white")
|
||||||
|
|
||||||
|
async def pattern(self, pattern: str = "0,1", off_ms: int = 500, on_ms: int = 500):
|
||||||
|
# Preprocess pattern
|
||||||
|
pattern_steps = [step.strip() == "1" for step in pattern.split(",")]
|
||||||
|
|
||||||
|
class SimplifiedStep:
|
||||||
|
state: bool
|
||||||
|
size: int
|
||||||
|
|
||||||
|
def __init__(self, state: bool) -> None:
|
||||||
|
self.state = state
|
||||||
|
self.size = 1
|
||||||
|
|
||||||
|
pattern_simplified: list[SimplifiedStep] = []
|
||||||
|
for step in pattern_steps:
|
||||||
|
if len(pattern_simplified) == 0 or step != pattern_simplified[-1].state:
|
||||||
|
pattern_simplified.append(SimplifiedStep(step, off_ms, on_ms))
|
||||||
|
else:
|
||||||
|
pattern_simplified[-1].size += 1
|
||||||
|
|
||||||
|
# Execute pattern
|
||||||
|
for step in pattern_simplified:
|
||||||
|
if step.state:
|
||||||
|
self.turn_full()
|
||||||
|
else:
|
||||||
|
self.turn_off()
|
||||||
|
await asyncio.sleep(step.size * (on_ms if step.state else off_ms) / 1000)
|
||||||
|
|
||||||
def show_current_time(self):
|
def show_current_time(self):
|
||||||
self.device.contrast(self.contrast)
|
self.device.contrast(self.contrast)
|
||||||
hour = str(datetime.now().hour).rjust(2, "0")
|
hour = str(datetime.now().hour).rjust(2, "0")
|
|
@ -90,6 +90,14 @@ async def flash(count: int = 1, contrast: Optional[int] = None):
|
||||||
return {"message": "Display flashed"}
|
return {"message": "Display flashed"}
|
||||||
|
|
||||||
|
|
||||||
|
@app.post("/pattern")
|
||||||
|
async def flash(pattern: str="0,1", off_ms: int = 500, on_ms: int = 500):
|
||||||
|
await queue.set_idle_action(
|
||||||
|
matrix_display.pattern, pattern=pattern, off_ms=off_ms, on_ms=on_ms
|
||||||
|
)
|
||||||
|
return {"message": "Activated pattern."}
|
||||||
|
|
||||||
|
|
||||||
@app.post("/contrast")
|
@app.post("/contrast")
|
||||||
async def contrast(contrast: Optional[int] = None):
|
async def contrast(contrast: Optional[int] = None):
|
||||||
if contrast:
|
if contrast:
|
||||||
|
|
Loading…
Reference in a new issue