# Source: pkgbuilds/co2ampel-receiver/co2ampel-receiver.py
# Web-view metadata: 65 lines, 1.9 KiB, Python; timestamp 2024-01-16 22:10:54 +01:00

import os
import paho.mqtt.client as mqtt
from influxdb import InfluxDBClient
import json
import logging
# Configure the root logger so that records of level DEBUG and above are shown.
logging.basicConfig(level=logging.DEBUG)

# No client id is passed here, so a random one is generated.  An explicit id,
# e.g. mqtt.Client("client-id"), would have to be unique on the broker.
mqttc = mqtt.Client()

# NOTE(review): the InfluxDB endpoint and credentials are hard-coded in this
# file -- confirm that this is intended for the deployment.
influx_client = InfluxDBClient(
    host="localhost",
    port=8086,
    username="co2ampel",
    password="55zP5S5YOS5+zQ==",
    database="co2ampel",
)
def messageReceived(client, userdata, message):
    """Store one CO2 sensor reading received over MQTT in InfluxDB.

    The sensor id is the message topic with a leading "/co2ampel/" removed.
    The payload must be a UTF-8 encoded JSON object with a numeric "ppmCO2"
    entry.  The optional entries "description" and "temperatur" are stored
    as the fields "description" and "temperature".

    Malformed payloads and failed writes are logged and the message is
    dropped, so a single bad message does not raise out of the callback.

    Args:
        client: Client that delivered the message (unused).
        userdata: User data passed to the callback (unused).
        message: Received message; its ``topic`` and ``payload`` attributes
            are read.
    """
    print("received topic " + str(message.topic) + " with payload: " + str(message.payload))

    # Strip only the leading prefix; str.replace() would also remove any
    # later occurrence of "/co2ampel/" inside the sensor id.
    topic_prefix = "/co2ampel/"
    sensor_id = message.topic
    if sensor_id.startswith(topic_prefix):
        sensor_id = sensor_id[len(topic_prefix):]
    print("sensor id: " + sensor_id)

    try:
        message_as_json = json.loads(message.payload.decode("utf-8"))
        ppm_co2 = float(message_as_json["ppmCO2"])
    except (KeyError, TypeError, ValueError, OverflowError) as err:
        # ValueError also covers UnicodeDecodeError and json.JSONDecodeError;
        # TypeError covers payloads that are valid JSON but not an object.
        logging.warning("dropping malformed message on %s: %r", message.topic, err)
        return

    fields = {"ppmCO2": ppm_co2}

    optional_description = message_as_json.get("description")
    if optional_description:
        fields["description"] = optional_description

    optional_temperature = message_as_json.get("temperatur")
    # A reading of exactly 0 is falsy but still a valid measurement, so any
    # real number is accepted in addition to the plain truthiness check.
    is_number = isinstance(optional_temperature, (int, float)) and not isinstance(
        optional_temperature, bool
    )
    if optional_temperature or is_number:
        fields["temperature"] = optional_temperature

    insert_json = [
        {
            "measurement": "measurement",
            "tags": {
                "sensor": sensor_id,
            },
            "fields": fields,
        }
    ]
    print("sending")
    print(insert_json)

    try:
        written = influx_client.write_points(insert_json)
    except Exception:
        # Callback boundary: log the failure with its traceback and keep
        # processing later messages instead of raising out of the callback.
        logging.exception("insert fail")
        return

    if written:
        print("insert success")
    else:
        print("insert fail")
def on_connect(client, userdata, flags, rc):
    """Print the connection result and subscribe to all "/co2ampel/" topics.

    Prints the result code, followed by "(Success)" when ``rc`` is 0 and
    "(Fail)" otherwise, then subscribes to "/co2ampel/#" with QoS 0.

    Args:
        client: Client whose ``subscribe`` method is called.
        userdata: User data passed to the callback (unused).
        flags: Connection flags passed to the callback (unused).
        rc: Result code of the connection attempt.
    """
    print("Connected with result code " + str(rc))
    outcome = "(Success)" if rc == 0 else "(Fail)"
    print(outcome)
    # The subscription is made inside this callback so that it is renewed
    # when the connection is lost and re-established.
    client.subscribe("/co2ampel/#", qos=0)
# Attach the callbacks defined in this file to the module-level client.
mqttc.on_connect = on_connect
mqttc.on_message = messageReceived

# NOTE(review): broker credentials are hard-coded and port 1883 is used --
# confirm that this is intended for the deployment.
mqttc.username_pw_set(username="knurps", password="LEPEZ1ELYDUmjg")
mqttc.connect(host="ccc-p.org", port=1883, keepalive=60)

# Hand control over to the client's network loop.
mqttc.loop_forever()