如何在Python中使用MQTT

描述

Python 是一种跨平台的计算机程序设计语言,是 ABC 语言的替代品,属于面向对象的动态类型语言。它最初被设计用于编写自动化脚本,随着版本的不断更新和语言新功能的添加,越来越多被用于独立的、大型项目的开发。

 

MQTT 是一个物联网传输协议,用于轻量级的发布/订阅式消息传输,旨在为低带宽和不稳定的网络环境中的物联网设备提供可靠的网络服务。其轻量、简单、开放和易于实现等特点,使得它适用范围更加广泛。

 

本文主要介绍如何在 Python 项目中使用 paho-mqtt 客户端库 ,实现客户端与 MQTT 服务器的连接、订阅、取消订阅、收发消息等功能。

一、项目准备

本项目使用 Python 3.10 进行开发测试。

 

用户可用以下命令来确认 Python 的版本:

 

python3 --version 

Python 3.10.9

 

测试设备:

 

瑞科慧联(RAK)网关 RAK7268 V2、带温湿度传感器的数据采集器 Sensor Hub

二、选择 MQTT 客户端库

paho-mqtt 是目前 Python 中使用较多的 MQTT 客户端库。它为 Python 2.7 或 3.x 版本以上的客户端类提供了对 MQTT v3.1 和 v3.1.1 的支持,还提供了一些帮助程序功能。这使得消息发布到 MQTT 服务器变得更简单。

三、Pip 安装 Paho MQTT 客户端

Pip 是 Python 包管理工具。该工具提供了对 Python 包的查找、下载、安装、卸载的功能。

 

pip3 install paho.mqtt

四、Python MQTT 使用

1、连接 MQTT 服务器

本文将使用瑞科慧联 LoRaWAN® 网关提供的内置 MQTT 服务,该服务基于 Mosquitto 的开源消息代理。服务器接入信息如下:

  • Broker: 192.168.230.1
  • TCP Port: 1883

2、导入 Paho MQTT 客户端

from paho.mqtt import client as mqtt

3、设置 MQTT Broker 连接参数

设置 MQTT Broker 连接地址,端口以及 topic,同时调用 Python random.randint  函数随机生成 MQTT 客户端 id。

 

MQTT_SERVER_IP = "192.168.230.1"

MQTT_PORT = 1883

4、编写 MQTT 连接函数

编写连接回调函数  on_connect,该函数将在客户端连接后会被调用。在该函数中可以依据 rc 来判断客户端是否连接成功。同时可创建一个 MQTT 客户端连接到 broker.emqx.io

 

def mqtt_connect(MQTT_SERVER_IP, MQTT_PORT):

    """连接MQTT服务器"""

    client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))

    mqttClient = mqtt.Client(client_id)

    mqttClient.on_connect = on_connect  # 返回连接状态的回调函数

    mqttClient.on_message = on_message  # 返回订阅消息回调函数

    MQTT_HOST = MQTT_SERVER_IP  # MQTT服务器地址

    # MQTT_PORT = MQTT_PORT  # MQTT端口

    mqttClient.username_pw_set("username", "password")  # mqtt服务器账号密码

    mqttClient.connect(MQTT_HOST, MQTT_PORT, 60)

    mqttClient.loop_start()  # 启用线程连接

    return mqttClient

5、发布消息

定义一个 while 循环语句,在循环中设置每秒调用 MQTT 客户端 publish 函数向 /python/mqtt 主题发送消息。

 

ddef on_publish():

    # 发布消息

    msg_count = 0

    while True:

        time.sleep(1)

        mqttClient = mqtt_connect(MQTT_SERVER_IP, MQTT_PORT)

        topic = 'application/1/device/0000000000000444/tx' # 发布的主题,订阅时需要使用这个主题才能订阅此消息

        msg = '{"confirmed": true,"data": "SGVsbG8=","fPort": 10}'

        result = mqttClient.publish(topic, msg)

        status = result[0]

        if status == 0:

            print('第{}条消息发送成功'.format(msg_count))

        else:

            print('第{}条消息发送失败'.format(msg_count))

        msg_count += 1

6、订阅消息

编写消息回调函数  on_message,函数将在客户端从 MQTT Broker 收到消息后被调用,并打印出订阅的 topic 名称以及接收到的消息内容。

 

def on_subscribe():

    """订阅主题:mqtt/demo"""

    mqttClient = mqtt_connect(MQTT_SERVER_IP, MQTT_PORT)

    while True:

        mqttClient.subscribe("application/#", 2)

        time.sleep(1)

7、完整代码

消息订阅代码

 

#!/usr/bin/python

from paho.mqtt import client as mqtt

import time

import json

# from settings import *

import base64



"""

网关通过mqtt发出数据

json - ok

probuf - no

"""

MQTT_SERVER_IP = "192.168.230.1"

MQTT_PORT = 1883

def on_connect(client, userdata, flags, rc):

    """一旦连接成功, 回调此方法"""

    rc_status = ["连接成功", "协议版本错误", "无效的客户端标识", "服务器无法使用", "用户名或密码错误", "无授权"]

    print("connect:",rc_status[rc])

def on_message(client, userdata, msg):

    """一旦订阅到消息, 回调此方法"""

    print("主题" + msg.topic + " 消息" + str(msg.payload.decode('gbk')))

    print("主题" + msg.topic + " 消息" + str(msg.payload.decode()))

    try:

        temp = json.loads(msg.payload.decode())

        # client.disconnect()

        deveui = temp['devEUI']

        print("devEUI: ", deveui)

        data = temp['data']

        print("解码前的data为: ", data)

        data_decode = base64.b64decode(data).hex()

        print("解码后的data为: ", data_decode)

        str1 = data_decode[4:]

        if str1[0:4]=="0167":

            a = int(str1[4:8],16)*0.1 

            print("温度:", a,"℃")

            if str1[8:12]=="0268":

               b = int(str1[12:16],16)

            print("湿度:", b,"%RH")

        elif str1[0:4]=="0268":

            c = int(str1[4:8],16)

            print("湿度:", c,"%RH")                        

    except Exception as e:

        print(e)

def mqtt_connect(MQTT_SERVER_IP, MQTT_PORT):

    """连接MQTT服务器"""

    client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))

    mqttClient = mqtt.Client(client_id)

    mqttClient.on_connect = on_connect  # 返回连接状态的回调函数

    mqttClient.on_message = on_message  # 返回订阅消息回调函数

    MQTT_HOST = MQTT_SERVER_IP  # MQTT服务器地址

    # MQTT_PORT = MQTT_PORT  # MQTT端口

    mqttClient.username_pw_set("username", "password")  # mqtt服务器账号密码

    mqttClient.connect(MQTT_HOST, MQTT_PORT, 60)

    mqttClient.loop_start()  # 启用线程连接

    return mqttClient

def on_subscribe():

    """订阅主题:mqtt/demo"""

    mqttClient = mqtt_connect(MQTT_SERVER_IP, MQTT_PORT)

    while True:

        mqttClient.subscribe("application/#", 2)

        # allure.attach("gateway/" + GATEWAY_EUI + "/event/up", name="topic")

        # mqttClient.subscribe("gateway/ac1f09fffe08f099/event/up", 2)

        time.sleep(1)

if __name__ == '__main__':

    on_subscribe()

 

消息发布代码

 

#!/usr/bin/python

from paho.mqtt import client as mqtt

import time

import json

# from settings import *

import base64



"""

网关通过mqtt发出数据

json - ok

probuf - no

"""

MQTT_SERVER_IP = "192.168.230.1"

MQTT_PORT = 1883

def on_connect(client, userdata, flags, rc):

    """一旦连接成功, 回调此方法"""

    rc_status = ["连接成功", "协议版本错误", "无效的客户端标识", "服务器无法使用", "用户名或密码错误", "无授权"]

    print("connect:",rc_status[rc])

def mqtt_connect(MQTT_SERVER_IP, MQTT_PORT):

    """连接MQTT服务器"""

    client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))

    mqttClient = mqtt.Client(client_id)

    mqttClient.on_connect = on_connect  # 返回连接状态的回调函数

    MQTT_HOST = MQTT_SERVER_IP  # MQTT服务器地址

    # MQTT_PORT = MQTT_PORT  # MQTT端口

    mqttClient.username_pw_set("username", "password")  # mqtt服务器账号密码

    mqttClient.connect(MQTT_HOST, MQTT_PORT, 60)

    mqttClient.loop_start()  # 启用线程连接

    return mqttClient

def on_publish():

    # 发布消息

    msg_count = 0

    while True:

        time.sleep(1)

        mqttClient = mqtt_connect(MQTT_SERVER_IP, MQTT_PORT)

        topic = 'application/x/device/x/tx' # 发布的主题,订阅时需要使用这个主题才能订阅此消息

        msg = '{"confirmed": true,"data": "SGVsbG8=","fPort": 10}' #需要发布的消息内容

        result = mqttClient.publish(topic, msg)

        status = result[0]

        if status == 0:

            print('第{}条消息发送成功'.format(msg_count))

        else:

            print('第{}条消息发送失败'.format(msg_count))

        msg_count += 1

if __name__ == '__main__':

    on_publish()

 

测试

 

消息发布

运行 MQTT 消息发布代码,将看到客户端连接成功,并且成功将消息发布。

 

物联网

 

消息订阅

通过瑞科慧联带温湿度传感器的 Sensor hub 进行数据传输,订阅并解析数据结果如下:

 

物联网

五、总结

至此,我们完成了使用  paho-mqtt  客户端连接到 LoRaWAN® 网关内置 MQTT 服务器,并实现了测试客户端与 MQTT 服务器的连接、消息发布和订阅并解析。

 

与 C ++ 或 Java 之类的高级语言不同,Python 比较适合设备侧的业务逻辑实现。使用 Python 可以减少代码上的逻辑复杂度,降低与设备的交互成本。未来,我们相信在物联网领域 Python 将会有更广泛的应用!

 

打开APP阅读更多精彩内容
声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉

全部0条评论

快来发表一下你的评论吧 !

×
20
完善资料,
赚取积分