树莓派+DHT11传感器(温度,湿度检测)实验Python实验

为了测试树莓派的GPIO接口,并尝试发出数据,做此实验并记录。


实验实现了以下内容: 

  1. 通过树莓派GPIO来连接并控制DHT11传感器,获得温度,湿度数据;
  2. 使用MQTT将树莓派所获取的温度与湿度数据广播;
  3. PC端通过订阅MQTT服务器的广播获取湿度与温度数据,并存入数据库。

1.树莓派连接控制DHT11传感器,获得温度,湿度数据并用MQTT广播

  1. 在树莓派未通电情况下连接GPIO引脚与DHT11;

启动树莓派,并根据所连接引脚进行编码。

# 此代码块在树莓派端运行
import time
import board
import adafruit_dht  # 读取DHT11数据使用的库,该库由官方提供下载
from datetime import datetime
import paho.mqtt.client as mqtt  # mqtt库
import json # mqtt 服务器地址/端口
broker = "192.168.137.57"
port = 1883
keepalive = 60
# 需要发布的主题
topic = "DHT11_message"# 实例化并连接
client = mqtt.Client()
client.connect(broker, port)Pin = board.D4  # 使用GPIO(BCM),注意根据官方提供的GPIO引脚图进行连接
dhtDevice = adafruit_dht.DHT11(Pin)while True:try:# 获取时间recDate = datetime.now().strftime('%Y-%m-%d %H:%M:%S')# 获取传感器数值temperature_c = dhtDevice.temperaturehumidity = dhtDevice.humidity# mqtt 传输msg = json.dumps({'Tem':temperature_c,'Hum':humidity,'Date':recDate})result = client.publish(topic, msg)status = result[0]if status == 0:print(f"Send `{msg}` to topic `{topic}`")else:print(f"Failed to send message to topic {topic}")print(msg)except RuntimeError as error:# Errors happen fairly often, DHT's are hard to read, just keep goingprint(error.args[0])time.sleep(2.0)continueexcept Exception as error:dhtDevice.exit()raise errortime.sleep(2.0)

2.订阅MQTT服务器的广播获取湿度与温度数据,并存入数据库

本实验中我将MQTT服务器搭建在树莓派上,所以MQTT服务器的地址就是树莓派目前的IP。当然也可以在PC端搭建MQTT服务器

'''
PC端DHT11数据接收,并写入数据库中
'''
import paho.mqtt.client as mqtt
import time
import json
import pymysql# 连接数据库用参数
DBHOST = 'localhost'
DBUSER = 'root'
DBPASS = '123456'  # 数据库登录密码
DBNAME = 'DHT11'   # 数据库 库名# 连接数据库并创建实例
try:# 连接数据库,所需参数(主机名,用户名,密码,数据库名)db = pymysql.connect(host=DBHOST, user=DBUSER, password=DBPASS, database=DBNAME,charset='utf8')print('连接成功')
except pymysql.Error as e:print('连接数据库失败'+str(e))cur = db.cursor()# MQTT服务器地址及端口
HOST = "192.168.000.000"  # 服务器ip地址
MQTT_PORT = 1883  # 服务器端口(基本默认为1883)# 访问mqtt数据部分
def on_connect(client, userdata, flags, rc):rc_status = ["连接成功", "协议版本错误", "无效的客户端标识", "服务器无法使用", "用户密码错误", "无授权"]print("connect:", rc_status[rc])def on_message(client, userdata, msg):print("主题:", msg.topic)res = msg.payload.decode("UTF-8")data = json.loads(res)print("消息:", data)write_mysql(data)#写入数据库
def write_mysql(data):sql ="insert into sensordata (nowtime,tem,hum) values (%s,%s,%s)"params = (data['Date'],data['Tem'],data['Hum'])db.ping(reconnect=True)cur.execute(sql,params)db.commit()print("数据库插入成功")def main():client = mqtt.Client()client.on_connect = on_connect  # 返回连接状态的回调函数client.connect(HOST, MQTT_PORT, keepalive=600)  # 连接服务器client.on_message = on_message  # 定义回调函数client.subscribe('DHT11_message', qos=0)  # 订阅主题client.loop_forever()           # 执行
main()


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部