Implemented api

This commit is contained in:
Maximilian Giller 2023-10-11 01:34:34 +02:00
parent 0492325cd4
commit 22353f34ab
5 changed files with 117 additions and 170 deletions

View file

@ -1,2 +1,3 @@
gpiozero
luma.led_matrix
luma.led_matrix
fastapi

54
src/main.py Normal file
View file

@ -0,0 +1,54 @@
from fastapi import FastAPI, HTTPException
from datetime import datetime
import requests
from matrix import MatrixDisplay
import asyncio
app = FastAPI()
matrix_display = MatrixDisplay()
should_run_time_loop = True
async def display_time():
while should_run_time_loop:
try:
matrix_display.show_current_time()
except requests.exceptions.RequestException as e:
raise HTTPException(
status_code=500,
detail=f"Failed to display time on the matrix display: {e}",
)
seconds_until_next_minute = 60 - datetime.now().second
await asyncio.sleep(seconds_until_next_minute)
@app.post("/time")
async def start_time_loop():
global should_run_time_loop
should_run_time_loop = True
asyncio.create_task(display_time())
return {"message": "Time loop started"}
@app.post("/message")
async def display_message(body: dict):
global should_run_time_loop
should_run_time_loop = False
message_text = body.get("message")
try:
matrix_display.show_text(message_text)
except requests.exceptions.RequestException as e:
raise HTTPException(
status_code=500,
detail=f"Failed to display message on the matrix display: {e}",
)
return {"message": "Message displayed"}
@app.post("/stop")
async def stop_time_loop():
global should_run_time_loop
should_run_time_loop = False
return {"message": "Time loop stopped"}

61
src/matrix.py Normal file
View file

@ -0,0 +1,61 @@
import time
from datetime import datetime
from luma.core.interface.serial import spi, noop
from luma.core.render import canvas
from luma.led_matrix.device import max7219
from luma.core import legacy
from luma.core.virtual import viewport
from luma.core.legacy.font import proportional, CP437_FONT, LCD_FONT
class MatrixDisplay:
def __init__(self, *, contrast=0) -> None:
self.contrast = contrast
self.serial = spi(port=0, device=0, gpio=noop())
self.device = max7219(
self.serial, width=32, height=8, rotate=2, block_orientation=-90
)
self.device.contrast(self.contrast)
def show_text(self, text):
width = len(text) * 8 + 4 * 8
virtual = viewport(self.device, width + 4 * 8, height=8)
with canvas(virtual) as draw:
legacy.text(
draw, (4 * 8, 0), text, fill="white", font=proportional(CP437_FONT)
)
for offset in range(width):
virtual.set_position((offset, 0))
time.sleep(0.015)
def flash(self, count=1):
self.device.contrast(255)
while count > 0:
with canvas(self.device) as draw:
draw.rectangle((0, 0, 31, 7), outline="white", fill="white")
time.sleep(0.1)
with canvas(self.device) as draw:
draw.rectangle((0, 0, 31, 7), outline="black", fill="black")
time.sleep(0.1)
count -= 1
self.device.contrast(self.contrast)
def turn_off(self):
self.device.contrast(0)
with canvas(self.device) as draw:
draw.rectangle((0, 0, 31, 7), outline="black", fill="black")
self.device.contrast(self.contrast)
def show_current_time(self):
hour = str(datetime.now().hour).rjust(2, "0")
minute = str(datetime.now().minute).rjust(2, "0")
with canvas(self.device) as draw:
draw.rectangle((16, 0, 31, 7), outline="white", fill="white")
legacy.text(draw, (0, 1), hour, fill="white", font=proportional(CP437_FONT))
legacy.text(
draw, (17, 1), minute, fill="black", font=proportional(CP437_FONT)
)

View file

@ -1,133 +0,0 @@
import time
from datetime import datetime, timedelta
from luma.core.interface.serial import spi, noop
from luma.core.render import canvas
from luma.led_matrix.device import max7219
from luma.core import legacy
from luma.core.virtual import viewport
from luma.core.legacy.font import proportional, CP437_FONT, LCD_FONT
messageIncoming=False
stdContrast=0
directmessengerInitialized = False
def setMsgInc (state):
global messageIncoming
messageIncoming = state
def getMsgInc ():
global messageIncoming
return messageIncoming
def setDirectmessengerState (state):
global directmessengerInitialized
directmessengerInitialized = state
def getDirectmessengerState ():
global directmessengerInitialized
return directmessengerInitialized
def printTextToMatrix (text):
global device
width=len(text) * 8 + 4 * 8
virtual=viewport(device, width + 4 * 8, height=8)
with canvas(virtual) as draw:
legacy.text(draw, (4 * 8, 0), text, fill="white", font=proportional(CP437_FONT))
for offset in range(width):
virtual.set_position((offset, 0))
time.sleep(0.015)
def printFlash (count):
global device
device.contrast(255)
while count > 0:
with canvas(device) as draw:
draw.rectangle((0, 0, 31, 7), outline="white", fill="white")
time.sleep(0.1)
with canvas(device) as draw:
draw.rectangle((0, 0, 31, 7), outline="black", fill="black")
time.sleep(0.1)
count-=1
global stdContrast
device.contrast(stdContrast)
def printDark ():
global device
device.contrast(0)
with canvas(device) as draw:
draw.rectangle((0, 0, 31, 7), outline="black", fill="black")
global stdContrast
device.contrast(stdContrast)
def getEncodedMsg(message):
message.encoding='utf-8'
message=message.replace(chr(252), 'ue')
message=message.replace(chr(228), 'ae')
message=message.replace(chr(246), 'oe')
message=message.replace(chr(220), 'Ue')
message=message.replace(chr(196), 'Ae')
message=message.replace(chr(214), 'Oe')
return message
def initializeDirectmessenger ():
global directmessengerInitialized
try:
connection=stomp.Connection([('wncld.goip.de',61613)], keepalive=True)
connection.set_listener('', MyListener())
connection.start()
connection.connect('matrixclock', 'vj4.dV+vy2;ZBa4Y[Bg9=7<K{x9Z78', wait=True)
connection.subscribe(destination='/topic/directmessenger', id=1, ack='auto')
setDirectmessengerState(True)
except:
setDirectmessengerState(False)
def printClock ():
hour=str(datetime.now().hour).rjust(2, '0')
minute=str(datetime.now().minute).rjust(2, '0')
with canvas(device) as draw:
draw.rectangle((16, 0, 31, 7), outline="white", fill="white")
legacy.text(draw, (0, 1), hour, fill="white", font=proportional(CP437_FONT))
legacy.text(draw, (17, 1), minute, fill="black", font=proportional(CP437_FONT))
def timeInNightRange ():
now = datetime.now()
return now.hour < 6
def exitClock ():
printTextToMatrix("DOWN")
printDark()
serial=spi(port=0, device=0, gpio=noop())
device=max7219(serial, width=32, height=8, rotate=2, block_orientation=-90)
device.contrast(stdContrast)
printTextToMatrix("Guten Abend!")
while True:
#while getMsgInc():
# time.sleep(1)
# if timeInNightRange():
# printDark()
# now = datetime.now()
# timeUntilMorning = 6 - now.hour - (3600 - 60*now.minute - now.second)
# time.sleep(timeUntilMorning)
printClock()
#if getDirectmessengerState() == False :
# initializeDirectmessenger()
timeUntilNextMinute = 60 - datetime.now().second
time.sleep(timeUntilNextMinute)

View file

@ -1,36 +0,0 @@
import time
import sys
import stomp
from luma.core.interface.serial import spi, noop
from luma.core.render import canvas
from luma.led_matrix.device import max7219
from luma.core import legacy
from luma.core.legacy.font import proportional, CP437_FONT, LCD_FONT
class MyListener(stomp.ConnectionListener):
def on_error(self, headers, message):
print('received an error "%s"' % message)
def on_message(self, headers, message):
with canvas(device) as draw:
legacy.text(draw, (0, 0), message, fill="white", font=proportional(CP437_FONT))
print('received a message "%s"' % message)
conn = stomp.Connection([('192.168.178.36',61613)])
conn.set_listener('', MyListener())
conn.start()
conn.connect('matrixclock', 'vj4.dV+vy2;ZBa4Y[Bg9=7<K{x9Z78', wait=True)
conn.subscribe(destination='/topic/test', id=1, ack='auto')
serial = spi(port=0, device=0, gpio=noop())
device = max7219(serial, width=32, height=8, block_orientation=-90)
device.contrast(0)
while True:
try:
time.sleep(3)
except KeyboardInterrupt:
conn.disconnect()