MQTT客户端实战:从连接到通信。详细说明MQTT客户端和MQTT代理进行通信

news2024/9/25 16:38:22

在这里插入图片描述

EMQX安装

EMQX服务器安装

安装文档,见链接不另外写

https://docs.emqx.com/zh/emqx/latest/deploy/install-ubuntu.html

启动 EMQX

启动为一个 systemd 服务:

sudo systemctl start emqx

在windows安装客户端

在线 MQTT WebSocket 客户端工具,MQTTX Web 是开源的 MQTT 5.0 浏览器客户端,但是经我测试没有成功,好像有bug.

建议使用MQTT 5.0 命令行客户端工具。使用命令行上的 MQTTX,旨在帮助开发者在不需要使用图形化界面的基础上,也能更快的开发和调试 MQTT 服务与应用。
在这里插入图片描述
由于是后期被写的博文,图是借官方的。请自行区分一下。

平台安装后的地址

1,平台的地址

  • http://127.0.0.1:18083
    后台登录 用户名:test 密码:test

Laravel中处理MQTT订阅

1,安装MQTT客户端库

在Laravel项目中安装一个MQTT客户端库。你可以使用Composer来安装 php-mqtt/client:

composer require php-mqtt/client

2, 新建command文件

文件路径:app/Console/Commands/MqttClientCommand.php

这段PHP代码是一个用于处理MQTT消息的命令行工具,它使用了Simps的MQTT客户端库。代码中定义了两个类:MQTTUserConfig 和 MqttClientCommand。

MQTTUserConfig 类定义了一些常量,这些常量用于配置MQTT连接。

MqttClientCommand 类继承自 Illuminate\Console\Command,是一个命令行工具,用于订阅或发布MQTT消息。

<?php

namespace App\Console\Commands;

use App\Http\Controllers\Wxapi\DeviceReportController;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\DB;

use Simps\MQTT\Protocol\Types;
use Simps\MQTT\Protocol\V5;
use Simps\MQTT\Tools\Common;
use Simps\MQTT\Client;
use Simps\MQTT\Config\ClientConfig;
use Simps\MQTT\Hex\ReasonCode;

use Swoole\Coroutine;
use Illuminate\Support\Facades\Redis;

class MQTTUserConfig
{    
    const SIMPS_MQTT_REMOTE_HOST = '*';
    const SIMPS_MQTT_PORT = 1883;
    const SIMPS_MQTT_SUBSCRIBE_PORT = 8083;
    const SIMPS_MQTT_USER = 'test*';
    const SIMPS_MQTT_PASSWORD = 'test*';
}

class MqttClientCommand extends Command
{
    protected $signature = 'mqtt:handle {param1}';

    protected $description = '订阅物联网mqtt消息 param1:null 订阅消息, param1:public 发布消息';
    protected  $mqtt ;
    const SWOOLE_MQTT_CONFIG = [
        'open_mqtt_protocol' => true,
        'package_max_length' => 2 * 1024 * 1024,
        'connect_timeout' => 5.0,
        'write_timeout' => 5.0,
        'read_timeout' => 5.0,
    ];

    //模拟设备
    const CLiENT_IDs = [
        'mqttx_devA',
        'mqttx_devB',
        'mqttx_devC',
        'mqttx_devD'
    ];

    public function __construct()
    {
        parent::__construct();
    }

    public function handle()
    {

        $param1 =$this->argument('param1');
//        $param2 =$this->argument('param2');
        if ($param1=='subscribe') {
            $this->info('启动订阅...');
            $this->subscribeMqtt();
        } elseif ($param1=='public') {
            $this->info('启动发布...');
            $this->publishMQTT();
        }
        echo '\r\n\r\n分配工作执行完成!!!';

    }



    protected function getTestMQTT5ConnectConfig()
        {

            $config = new ClientConfig();
            $UserConfig = new MQTTUserConfig();
            return $config->setUserName($UserConfig::SIMPS_MQTT_USER)
                ->setPassword($UserConfig::SIMPS_MQTT_PASSWORD)
                ->setClientId(Client::genClientID())
                ->setKeepAlive(10)
                ->setDelay(3000) // 3s
                ->setMaxAttempts(5)
                ->setProperties([
                    'session_expiry_interval' => 60,
                    'receive_maximum' => 65535,
                    'topic_alias_maximum' => 65535,
                ])
                ->setProtocolLevel(5)
                ->setSwooleConfig( [
                    'open_mqtt_protocol' => true,
                    'package_max_length' => 2 * 1024 * 1024,
                    'connect_timeout' => 5.0,
                    'write_timeout' => 5.0,
                    'read_timeout' => 5.0,
                ]);

    }

    private function heartbeat($message) {
        if ($message) {
            parse_str($message,$array);
            $device = $array['imei'];
            $hash = ':mqtt:heartbeat:online'.":{$device}";
            Redis::expire($hash,30);  ##30s有效
            Redis::sAdd($hash,1);
        }

    }
    /*
     * 订阅
     *  private function subscribeMqtt(){


        Coroutine\run(function () {
            $client = new Client('39.108.230.87', 1883, $this->getTestMQTT5ConnectConfig());
            ....
     */
    private function subscribeMqtt(){

        Coroutine\run(function () {
            $UserConfig = new MQTTUserConfig();
            $client = new Client($UserConfig::SIMPS_MQTT_REMOTE_HOST, 1883,
            $this->getTestMQTT5ConnectConfig());
            $will = [
                'topic' => 'simps-mqtt/dinweiyi/delete',
                'qos' => 1,
                'retain' => 0,
                'message' => 'byebye',
                'properties' => [
                    'will_delay_interval' => 60,
                    'message_expiry_interval' => 60,
                    'content_type' => 'test',
                    'payload_format_indicator' => true, // false 0 1
                ],
            ];
            $client->connect(true, $will);

            $topics['simps-mqtt/dinweiyi/subscribe_message'] = [
                'qos' => 2,
                'no_local' => true,
                'retain_as_published' => true,
                'retain_handling' => 2,
            ];

            $res = $client->subscribe($topics);
            $timeSincePing = time();
            var_dump($res);

            echo '\r\n\r\n connect success !!!';
            while (true) {
                try {
                    $buffer = $client->recv();
                    $message = null;
                    if ($buffer && $buffer !== true) {
                        $message = $buffer["message"];

                        // QoS1 PUBACK
                        if ($buffer['type'] === Types::PUBLISH && $buffer['qos'] === 1) {
                            $client->send(
                                [
                                    'type' => Types::PUBACK,
                                    'message_id' => $buffer['message_id'],
                                ],
                                false
                            );
                        }
                        if ($buffer['type'] === Types::DISCONNECT) {
                            echo sprintf(
                                "Broker is disconnected, The reason is %s [%d]\n",
                                ReasonCode::getReasonPhrase($buffer['code']),
                                $buffer['code']
                            );
                            $client->close($buffer['code']);
                            break;
                        }
                        $reportObj = new DeviceReportController();

                        $ret = $reportObj->store($message);
                        var_dump("182>>>",$ret);
                        unset($reportObj);
                    }
                    if ($timeSincePing <= (time() - $client->getConfig()->getKeepAlive())) {
                        $buffer = $client->ping();
                        if ($buffer) {
                            echo 'send ping success ...' ;
                            $this->heartbeat($message);
                            $timeSincePing = time();
                        }
                    }

                } catch (\Throwable $e) {
                    throw $e;
                }
            }

        });


    }

    protected function getMessage() {
        $client_ids = [
            'mqttx_devA',
//            'mqttx_devB',
            'mqttx_devC',
            'mqttx_devD'
        ];
        $message = [];
        $message['clientID'] = self::CLiENT_IDs[array_rand($client_ids)];
        $message['time'] = time();
        $message['location'] = ["x"=>rand(1000,9999),"y"=>rand(1000,9999)];
        return json_encode($message);
    }
    /*
     * 发布
     */
    public function publishMQTT() {
        Coroutine\run(function () {
            $UserConfig = new MQTTUserConfig();
            $client = new Client($UserConfig::SIMPS_MQTT_REMOTE_HOST, $UserConfig::SIMPS_MQTT_PORT,
            $this->getTestMQTT5ConnectConfig());
            $client->connect();
            while (true) {
                $message = $this->getMessage();
                $response = $client->publish(
                    'simps-mqtt/user/subscribe_message',
                    $message,
                    1,
                    0,
                    0,
                    [
                        'topic_alias' => 1,
                        'message_expiry_interval' => 12,
                    ]
                );
                var_dump( 'publishMQTT>>>',$message);
                Coroutine::sleep(1);
            }
        });
    }

}

3, 代码流程图

使用Mermaid语法描述的上述PHP代码的流程图:

subscribe
public
收到消息
心跳超时
开始
构造函数 __construct
handle 方法
param1 参数
调用 subscribeMqtt
调用 publishMQTT
Coroutine 运行 subscribeMqtt
创建 MQTT 客户端并连接
设置遗嘱消息
订阅主题
接收消息
处理消息
发送心跳
心跳函数 heartbeat
存储消息
是否断开连接
关闭连接
Coroutine 运行 publishMQTT
创建 MQTT 客户端并连接
循环发布消息
获取测试消息
发布消息
结束

流程说明:

  1. 开始:程序启动。
  2. 构造函数 __construct:初始化命令行工具。
  3. handle 方法:处理命令行输入。
  4. param1 参数:根据输入的参数决定是订阅还是发布。
  5. 调用 subscribeMqtt:如果参数是subscribe,则调用此方法。
  6. 调用 publishMQTT:如果参数是public,则调用此方法。
  7. Coroutine 运行 subscribeMqtt:在协程中运行订阅方法。
  8. 创建 MQTT 客户端并连接:创建MQTT客户端并连接到服务器。
  9. 设置遗嘱消息:设置遗嘱消息,以便在客户端意外断开时发送。
  10. 订阅主题:订阅特定的MQTT主题。
  11. 接收消息:持续监听并接收消息。
  12. 处理消息:对接收到的消息进行处理。
  13. 心跳函数 heartbeat:检查设备心跳。
  14. 存储消息:将消息存储到数据库或其他存储系统。
  15. 是否断开连接:检查客户端是否断开连接。
  16. 关闭连接:如果断开,则关闭连接。
  17. Coroutine 运行 publishMQTT:在协程中运行发布方法。
  18. 创建 MQTT 客户端并连接:创建MQTT客户端并连接到服务器。
  19. 循环发布消息:循环发布消息。
  20. 获取测试消息:生成要发布的测试消息。
  21. 发布消息:将消息发布到MQTT服务器。
  22. 结束:程序结束。

后台常驻运行

1,php artisan命令在后台运行
  1. 打开您的终端或SSH到您的服务器。
  2. 使用nohup命令运行您的Artisan命令进行测试,如下所示
php /www/wwwroot/denwei_laraveladmin/artisan mqtt:handle subscribe

3.命令行的php的版本与web php的版本号要一致

2,使用宝塔的守护进程开启进程

在这里插入图片描述
也可以添加守护进程。
以上2种最好是只选一个

测试

打开emqx web ,在浏览器输入http://127.0.0.0.1:18083/#/websocket

在这里插入图片描述
主题:
主题跟php代码内的主题是一致的。
Payload:
是发出的字符串。由于在测试中遇到json字符串转换失败。所以选择了组装字符格式。
已发送
会出现发布的主题和内容

检查发送的结果

打开数据库,检查device_report表是否成功。成功应下图所示:
在这里插入图片描述

实操完成

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2164149.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

唯众智能化控制箱

为满足智慧城市、雪亮工程、智能交通、智慧农业等领域大数据信息化的管理要求&#xff0c;唯众自主研发设计了智能化控制箱&#xff0c;该产品是一款集智能网络传输、温湿度监测、门锁控制于一体的综合系统。该系统由先进的I/O网络模块、高精度传感器、强大的管理后端以及便捷的…

window下 php 安装 lua扩展

1.执行php -v 看看自己的php是什么版本 2.下载对应版本 https://pecl.php.net/package/lua 3.安装 php_lua.dll 放在 php的ext文件夹下 liblua.dll放在php的根目录下 4.编辑php.ini 5.重启 6. 执行 php -m

计算机毕业设计之:云中e百货微信小程序设计与实现(源码+文档+定制)

博主介绍&#xff1a; ✌我是阿龙&#xff0c;一名专注于Java技术领域的程序员&#xff0c;全网拥有10W粉丝。作为CSDN特邀作者、博客专家、新星计划导师&#xff0c;我在计算机毕业设计开发方面积累了丰富的经验。同时&#xff0c;我也是掘金、华为云、阿里云、InfoQ等平台…

国内可用ChatGPT-4中文镜像网站整理汇总【持续更新】

一、GPT中文镜像网站 ① yixiaai.com 支持GPT4、4o以及o1&#xff0c;支持MJ绘画 ② chat.lify.vip 支持通用全模型&#xff0c;支持文件读取、插件、绘画、AIPPT ③ AI Chat 支持GPT3.5/4&#xff0c;4o以及MJ绘画 二、模型知识 o1/o1-mini&#xff1a;最新的版本模型&am…

跑lvs出现soft connect怎么处理?

首先&#xff0c;我们先了解一下什么是soft connect。简而言之&#xff0c;就是工具会将所有连接在psub上的信号认作soft connect&#xff08;也就是short&#xff09;。如图1所示&#xff0c;VSS和AVSS都接到了p上&#xff0c;它们通过psub便有了soft connect。 如果有soft co…

AfuseKt1.3.6-10110功能强大的安卓网络视频播放器,支持多种在线存储和媒体管理平台!

AfuseKt 是一款功能强大的安卓网络视频播放器&#xff0c;专为满足用户对多样化媒体播放需求而设计。它不仅支持多种流行的在线存储和媒体管理平台&#xff0c;如阿里云盘、Alist、WebDAV和Emby等&#xff0c;还提供了刮削功能和海报墙展示&#xff0c;使得用户能够更加便捷地管…

解锁视频生成新时代! 探索智谱CogVideoX-2b:轻松生成6秒视频的详细指南

文章目录 一、CogVideoX的诞生背景二、 创建丹摩平台实例三、 环境配置与依赖安装四、模型文件与配置五、 模型运行六、使用 Web 界面生成视频 一、CogVideoX的诞生背景 CogVideoX 的推出标志着视频生成技术进入了一个全新的阶段。在视频生成领域&#xff0c;长期以来一直存在效…

2024 年海上安全:技术集成商需要考虑的几件事

今年&#xff0c;海上发生了许多新的冲突&#xff0c;从索马里海盗到红海商船遇袭。这些事件表明&#xff0c;2024 年&#xff0c;安全专业人员做好准备帮助客户应对海上紧急情况&#xff08;无论是什么情况&#xff09;是多么重要。 技术是任何安全战略的关键推动因素。掌握最…

Python 课程19-FastAPI

前言 FastAPI 是一个用于构建 API 的现代化、快速的 Python Web 框架。它基于 Python 的 type hints 构建&#xff0c;能够自动生成 API 文档并提供出色的性能。FastAPI 的设计目标是简单易用、高性能和支持异步操作&#xff0c;因此它非常适合开发高并发的 Web 应用程序和 AP…

c# 子类继承父类接口问题

在C#中&#xff0c;子类并不直接“继承”父类继承的接口&#xff0c;但子类的确会继承父类对接口的实现&#xff08;如果父类实现了该接口&#xff09;。这里有一些关键的概念需要澄清&#xff1a; 接口继承&#xff1a;当一个类实现了某个接口时&#xff0c;它必须实现接口中…

Webpack教程-概述

什么是Webpack Webpack是一个静态资源打包工具。它以一个或多个文件作为打包入口&#xff0c;将整个项目所有的文件编译组合成一个或多个文件进行输出。(输出的文件即编译好的文件&#xff0c;就可以在浏览器上运行) Webpack官网 核心概念 entry (入口) entiry 指webpack…

2024下最全软考机考操作事项、绘图指南合集!

从2023年下半年软考全部科目改革为机考方式后&#xff0c;到现在已经进行过两场考试&#xff0c;鉴于有很多考生是初次参加软考&#xff0c;就给大家介绍下关于软考机考的具体操作指南&#xff0c;希望对大家有所帮助。 一、操作事项 在考试正式开始前&#xff0c;软考办会开放…

ClickHouse复杂查询单表亿级数据案例(可导出Excel)

通过本篇博客&#xff0c;读者可以了解到如何在 ClickHouse 中高效地创建和管理大规模销售数据。随机数据生成和复杂查询的示例展示了 ClickHouse 的强大性能和灵活性。掌握这些技能后&#xff0c;用户能够更好地进行数据分析和决策支持&#xff0c;提升业务洞察能力。 表结构…

性能测试1初步使用Jmeter

当你看到这边文章的时候&#xff0c;详细你已经知道啥是性能测试&#xff0c;以及也听说过Jmeter了&#xff0c;所以不过多介绍&#xff0c;这里&#xff0c;只是帮助你快速的使用Jmeter来测试接口。 1获取安装包 官网下载地址&#xff1a;https://jmeter.apache.org/downloa…

力扣19 删除链表的倒数第N个节点 Java版本

文章目录 题目描述代码 题目描述 给你一个链表&#xff0c;删除链表的倒数第 n 个结点&#xff0c;并且返回链表的头结点。 示例 1&#xff1a; 输入&#xff1a;head [1,2,3,4,5], n 2 输出&#xff1a;[1,2,3,5] 示例 2&#xff1a; 输入&#xff1a;head [1], n 1 …

erlang学习:Linux命令学习4

顺序控制语句学习 if&#xff0c;else对文件操作 判断一个文件夹是否存在&#xff0c;如果存在则进行删除&#xff0c;如果不存在则创建该文件夹&#xff0c;并复制一份该脚本后&#xff0c;删除该脚本 if [ -d "/erlangtest/testdir"]; then echo "删除文件夹…

数字化转型:国内证书哪个更有用

探讨数字化转型&#xff0c;有哪些国内证书推荐&#xff1f;让我们一起来了解一下。 软考-系统集成项目管理工程师(中项)/信息系统项目管理师(高项)&#xff1a;由人社部和工信部联合颁发&#xff0c;紧密贴合国内IT领域的项目管理实际需求。 这两个软考科目没有考试门槛限制…

AI 文生图快速入门教程:让 Stable Diffusion 更易于上手

Stable Diffusion 是一个强大的 AI 图像生成工具&#xff0c;但它可能会消耗大量资源。在本指南中&#xff0c;我们将学习如何使用 AUTOMATIC1111 的 Stable Diffusion WebUI 来设置它。同时&#xff0c;我们将在 DigitalOcean GPU Droplet 云服务器上运行它&#xff0c;通过 H…

python爬虫:从12306网站获取火车站信息

代码逻辑 初始化 (init 方法)&#xff1a; 设置请求头信息。设置车站版本号。 同步车站信息 (synchronization 方法)&#xff1a; 发送GET请求获取车站信息。返回服务器响应的文本。 提取信息 (extract 方法)&#xff1a; 从服务器响应中提取车站信息字符串。去掉字符串末尾的…

钰泰-ETA6027限流开关IC

描述 ETA6027 是一种负载开关&#xff0c;可为可能遇到大电流条件的系统和负载提供全面保护。ETA6027 提供 70mΩ 限流开关&#xff0c;可在 2.1-6V 的输入电压范围内工作。电流限制可通过精密电阻器进行外部编程&#xff0c;范围为 75mA 至 2.2A。开关控制由能够直接与低电压…