Implemented co2 library

This commit is contained in:
Maximilian Giller 2024-08-10 02:36:09 +02:00
parent e2fa06d94e
commit cb50f230b6
4 changed files with 10 additions and 57 deletions

View file

@ -5,3 +5,4 @@ uvicorn
fastapi-cors
Adafruit_DHT
requests
mh_z19

View file

@ -1,7 +1,7 @@
import asyncio
from datetime import datetime
import os
from config import climate_log_file, dht22_pin, mhz19co2_serial_port
from config import climate_log_file, dht22_pin
from handler.dht22_climate import Dht22Climate
from handler.matrix_display import MatrixDisplay
@ -9,7 +9,7 @@ from handler.mhz19_co2 import Mhz19Co2
climate_sensor = Dht22Climate(dht22_pin)
co2_sensor = Mhz19Co2(mhz19co2_serial_port)
co2_sensor = Mhz19Co2()
matrix_display = MatrixDisplay()

View file

@ -1,3 +1,2 @@
dht22_pin = 17
mhz19co2_serial_port = "/dev/serial0"
climate_log_file = "./climate.csv"

View file

@ -1,69 +1,22 @@
import serial
import mh_z19
import logging
class Mhz19Co2:
def __init__(self, serial_port: str):
def __init__(self):
self.last_read = None
self.serial_port = serial_port
self.baud_rate = 9600
self.byte_size = 8
self.parity = "N"
self.stop_bits = 1
self.timeout = None
def get_last_read(self) -> int | None:
if self.last_read is None:
return self.read()
return self.last_read
def read(self) -> int | None:
ser = None
try:
ser = serial.Serial(
port=self.serial_port,
baudrate=self.baud_rate,
bytesize=self.byte_size,
parity=self.parity,
stopbits=self.stop_bits,
timeout=self.timeout,
)
data: dict | None = mh_z19.read()
# send "Read CO2" command
command_data = bytes([0xFF, 0x01, 0x86, 0x00, 0x00, 0x00, 0x00, 0x00, 0x79])
ser.write(command_data)
# read "Return Value (CO2 concentration)"
data = ser.read(9)
# print("data[2]", data[2])
# print("data[3]", data[3])
# print("data[4]", data[4])
# print("data[5]", data[5])
# print("data[6]", data[6])
# print("data[7]", data[7])
# print("data[8]", data[8])
# show CO2 concentration
concentration = data[2] * 256 + data[3]
# print(f"=== send data ===")
# print(f"send: {command_data}")
# print(f"=== read data ===")
# print(f"data: {data}")
# print(f"data[2] {data[2]}")
# print(f"data[3] {data[3]}")
# print(f"CO2 Concentration {concentration} ppm")
# print(f"=== === ===")
# print(f"")
except Exception as e:
logging.exception(f"Error reading data: {e}", exc_info=e)
if not data or "co2" not in data.keys():
return None
finally:
if ser:
ser.close()
ser = None
self.last_read = concentration
return concentration
self.last_read = data["co2"]
return self.last_read