"""Bridge CO2 sensor readings from MQTT into InfluxDB.

Subscribes to every topic below ``/co2ampel/`` and writes each JSON payload
as one point of the ``measurement`` series, tagged with the sensor id taken
from the topic.

Connection settings can be overridden through ``CO2AMPEL_*`` environment
variables; when a variable is unset the value that used to be hard-coded in
this script is used.
"""
import json
import logging
import os

import paho.mqtt.client as mqtt
from influxdb import InfluxDBClient

logging.basicConfig(level=logging.DEBUG)

# Topic prefix shared by the subscription and the sensor-id extraction.
TOPIC_PREFIX = "/co2ampel/"

# mqttc = mqtt.Client("client-id")
# but note that the client id must be unique on the broker. Leaving the client
# id parameter empty will generate a random id for you.
# NOTE(review): the on_connect signature below is the paho-mqtt 1.x callback
# style -- confirm the installed paho-mqtt version before upgrading.
mqttc = mqtt.Client()

# NOTE(review): the default passwords below are committed to source control.
# Prefer supplying them through the environment and rotating these values.
influx_client = InfluxDBClient(
    os.environ.get("CO2AMPEL_INFLUX_HOST", "localhost"),
    int(os.environ.get("CO2AMPEL_INFLUX_PORT", "8086")),
    os.environ.get("CO2AMPEL_INFLUX_USER", "co2ampel"),
    os.environ.get("CO2AMPEL_INFLUX_PASSWORD", "55zP5S5YOS5+zQ=="),
    os.environ.get("CO2AMPEL_INFLUX_DATABASE", "co2ampel"),
)


def messageReceived(client, userdata, message):
    """Store one sensor reading received over MQTT in InfluxDB.

    The payload must be a UTF-8 encoded JSON object with a numeric
    ``ppmCO2`` member; ``description`` and ``temperatur`` are optional.
    Malformed payloads and failed writes are logged and skipped so that a
    single bad message cannot terminate the MQTT network loop.

    Args:
        client: The paho client that received the message (unused).
        userdata: User data registered on the client (unused).
        message: The received message; ``topic`` and ``payload`` are read.
    """
    print("received topic" + str(message.topic) + "with payload: " + str(message.payload))

    topic = message.topic
    # Strip only the leading prefix; a blanket replace() would also remove
    # the same text if it reappeared deeper inside the topic.
    if topic.startswith(TOPIC_PREFIX):
        sensor_id = topic[len(TOPIC_PREFIX):]
    else:
        sensor_id = topic
    print("sensor id: " + sensor_id)

    try:
        reading = json.loads(message.payload.decode("utf-8"))
        ppm_co2 = float(reading["ppmCO2"])
    except (ValueError, KeyError, TypeError) as err:
        # ValueError covers invalid UTF-8, invalid JSON and non-numeric
        # values; KeyError a missing "ppmCO2"; TypeError a payload that is
        # not a JSON object or a null value.
        logging.warning("ignoring malformed payload on %s: %r", topic, err)
        return

    fields = {"ppmCO2": ppm_co2}

    description = reading.get("description")
    if description:
        fields["description"] = description

    temperature = reading.get("temperatur")
    # A plain truthiness test would discard a legitimate reading of 0.
    if temperature is not None and temperature != "":
        fields["temperature"] = temperature

    points = [
        {
            "measurement": "measurement",
            "tags": {
                "sensor": sensor_id,
            },
            "fields": fields,
        }
    ]

    print("sending")
    print(points)
    try:
        written = influx_client.write_points(points)
    except Exception:
        # Callback boundary: the client reports failures by raising, and an
        # exception escaping from here could stop the MQTT loop.
        logging.exception("InfluxDB write failed for sensor %s", sensor_id)
        written = False

    if written:
        print("insert success")
    else:
        print("insert fail")


def on_connect(client, userdata, flags, rc):
    """Report the connection result and subscribe to the sensor topics.

    Args:
        client: The paho client that connected; used to subscribe.
        userdata: User data registered on the client (unused).
        flags: Response flags sent by the broker (unused).
        rc: Connection result code; 0 is reported as success.
    """
    print("Connected with result code " + str(rc))
    if rc == 0:
        print("(Success)")
    else:
        print("(Fail)")
    # Subscribing in on_connect() means that if we lose the connection and
    # reconnect then subscriptions will be renewed.
    client.subscribe(TOPIC_PREFIX + "#", 0)


mqttc.username_pw_set(
    os.environ.get("CO2AMPEL_MQTT_USER", "knurps"),
    os.environ.get("CO2AMPEL_MQTT_PASSWORD", "LEPEZ1ELYDUmjg"),
)
mqttc.on_message = messageReceived
mqttc.on_connect = on_connect
mqttc.connect(
    os.environ.get("CO2AMPEL_MQTT_HOST", "ccc-p.org"),
    int(os.environ.get("CO2AMPEL_MQTT_PORT", "1883")),
    60,
)
mqttc.loop_forever()