java和python实现mqtt

news2025/4/18 18:28:23

说明:

MQTT 异步通信系统功能文档

  1. 系统概述
    本系统基于 MQTT 协议实现异步通信,包含三个核心组件:

Broker(消息代理):负责消息的路由和转发。
Client(主客户端):定时发送时间戳消息并等待响应。
Echo Client(回显客户端):接收消息并原样返回。
所有组件均运行在本地(localhost),使用端口 10008 进行通信。

  1. 功能描述
    2.1 Broker(消息代理)
    持续运行,负责接收和转发消息。
    监听 localhost:10008,确保客户端之间的通信畅通。
    2.2 Client(主客户端)
    每秒发布当前时间戳到 /ping 主题。
    订阅 /pong 主题,等待回显消息。
    检测连接状态,在断开时终止运行。
    2.3 Echo Client(回显客户端)
    订阅 /ping 主题,接收主客户端发送的消息。
    将接收到的消息原样发布到 /pong 主题。
    检测连接状态,在断开时终止运行。

  2. 通信流程
    Client 发送消息
    每秒生成时间戳,发布到 /ping 主题。
    Echo Client 接收并回显
    从 /ping 获取消息,原样转发到 /pong 主题。
    Client 接收回显
    从 /pong 获取消息,打印并进入下一轮循环。
    如果任一客户端断开连接,系统会检测并终止运行。

  3. 运行机制
    异步架构:使用 asyncio 实现非阻塞并发,提高效率。
    自动重连:客户端连接失败时,以 100ms 间隔尝试重连。
    日志输出:各组件打印关键操作(如发送/接收消息),便于调试。

  4. 适用场景
    MQTT 协议学习:演示基本的发布/订阅机制。
    设备间通信模拟:测试消息收发逻辑。
    异步编程实践:展示 asyncio 与 MQTT 的结合使用。
    系统设计简洁,便于扩展或集成到更复杂的项目中。

包含功能:
MQTT服务器
运行在本地端口10008,支持匿名访问
数据存储于内存中,无持久化
程序终止时自动关闭服务
Ping客户端
每秒发送一条时间戳消息到主题/ping
订阅主题/pong,接收并显示该主题的消息
Echo客户端
监听主题/ping,收到消息后立即将内容转发至主题/pong
实现自动应答机制

/我是分割线

python部分

step101:C:\Users\wangrusheng\PycharmProjects\FastAPIProject1\hello.py


import asyncio
import time

import mqttools

BROKER_PORT = 10008


async def start_client():
    client = mqttools.Client('localhost', BROKER_PORT, connect_delays=[0.1])
    await client.start()

    return client


async def client_main():
    """Publish the current time to /ping and wait for the echo client to
    publish it back on /pong, with a one second interval.

    """

    client = await start_client()
    await client.subscribe('/pong')

    while True:
        print()
        message = str(int(time.time())).encode('ascii')
        print(f'client: Publishing {message} on /ping.')
        client.publish(mqttools.Message('/ping', message))
        message = await client.messages.get()
        print(f'client: Got {message.message} on {message.topic}.')

        if message is None:
            print('Client connection lost.')
            break

        await asyncio.sleep(1)


async def echo_client_main():
    """Wait for the client to publish to /ping, and publish /pong in
    response.

    """

    client = await start_client()
    await client.subscribe('/ping')

    while True:
        message = await client.messages.get()
        print(f'echo_client: Got {message.message} on {message.topic}.')

        if message is None:
            print('Echo client connection lost.')
            break

        print(f'echo_client: Publishing {message.message} on /pong.')
        client.publish(mqttools.Message('/pong', message.message))


async def broker_main():
    """The broker, serving both clients, forever.

    """

    broker = mqttools.Broker(('localhost', BROKER_PORT))
    await broker.serve_forever()


async def main():
    await asyncio.gather(
        broker_main(),
        echo_client_main(),
        client_main()
    )


asyncio.run(main())


step102:运行

(.venv) PS C:\Users\wangrusheng\PycharmProjects\FastAPIProject1> python hello.py 

client: Publishing b'1744796556' on /ping.
echo_client: Got b'1744796556' on /ping.
echo_client: Publishing b'1744796556' on /pong.
client: Got b'1744796556' on /pong.

client: Publishing b'1744796557' on /ping.
echo_client: Got b'1744796557' on /ping.
echo_client: Publishing b'1744796557' on /pong.
client: Got b'1744796557' on /pong.

client: Publishing b'1744796558' on /ping.
echo_client: Got b'1744796558' on /ping.
echo_client: Publishing b'1744796558' on /pong.
client: Got b'1744796558' on /pong.

end

/我是分割线

step201:C:\Users\wangrusheng\IdeaProjects\untitled2\build.gradle

plugins {
    id 'java'
}

group = 'org.example'
version = '1.0-SNAPSHOT'

repositories {
    mavenCentral()
}

dependencies {
    // MQTT 服务端依赖
    implementation 'io.moquette:moquette-broker:0.15'
    // MQTT 客户端依赖
    implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5'

    testImplementation platform('org.junit:junit-bom:5.10.0')
    testImplementation 'org.junit.jupiter:junit-jupiter'
}

test {
    useJUnitPlatform()
}

step202:C:\Users\wangrusheng\IdeaProjects\untitled2\src\main\java\org\example\Main.java

package org.example;

import org.eclipse.paho.client.mqttv3.MqttException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class Main {
    public static void main(String[] args) {
        // 启动MQTT服务器
        startMqttServer();

        // 等待服务器初始化
        try {
            Thread.sleep(1500);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        // 启动客户端
        startPingClient();
        startEchoClient();
    }

    private static void startMqttServer() {
        new Thread(() -> {
            try {
                System.out.println("Starting MQTT server...");
                MqttServer.startServer();
            } catch (Exception e) {
                System.err.println("Failed to start server: ");
                e.printStackTrace();
                System.exit(1);
            }
        }).start();
    }

    private static void startPingClient() {
        new Thread(() -> {
            try {
                MqttClientHandler client = new MqttClientHandler("ping-client");
                client.connect();
                client.subscribe("/pong", 0);

                // 定时发送ping消息
                ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
                scheduler.scheduleAtFixedRate(() -> {
                    try {
                        String timestamp = String.valueOf(System.currentTimeMillis() / 1000);
                        System.out.printf("client: Publishing %s on /ping.\n", timestamp);
                        client.publish("/ping", timestamp, 0);
                    } catch (MqttException e) {
                        e.printStackTrace();
                    }
                }, 0, 1, TimeUnit.SECONDS);

                // 保持线程运行
                Thread.sleep(Long.MAX_VALUE);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();
    }

    private static void startEchoClient() {
        new Thread(() -> {
            try {
                MqttClientHandler echoClient = new EchoClientHandler("echo-client");
                echoClient.connect();
                echoClient.subscribe("/ping", 0);

                // 保持线程运行
                Thread.sleep(Long.MAX_VALUE);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();
    }
}

step203:C:\Users\wangrusheng\IdeaProjects\untitled2\src\main\java\org\example\EchoClientHandler.java

package org.example;

import org.eclipse.paho.client.mqttv3.MqttMessage;

public class EchoClientHandler extends MqttClientHandler {
    public EchoClientHandler(String clientId) {
        super(clientId);
    }

    @Override
    public void messageArrived(String topic, MqttMessage message)   {
        System.out.printf("echo_client: Got %s on %s.\n", new String(message.getPayload()), topic);

        try {
            if ("/ping".equals(topic)) {
                String payload = new String(message.getPayload());
                System.out.printf("echo_client: Publishing %s on /pong.\n", payload);

                publish("/pong", payload, 0);
            }
        }catch (Exception e){
            e.printStackTrace();
        }

    }
}

step204:C:\Users\wangrusheng\IdeaProjects\untitled2\src\main\java\org\example\MqttServer.java

package org.example;

import io.moquette.broker.Server;
import io.moquette.broker.config.IConfig;
import io.moquette.broker.config.MemoryConfig;
import java.io.IOException;
import java.util.Properties;

public class MqttServer {
    private static Server mqttBroker;

    public static void startServer() throws IOException {
        if (mqttBroker == null) {
            mqttBroker = new Server();
            IConfig config = new MemoryConfig(serverConfig());
            mqttBroker.startServer(config);
            System.out.println("MQTT Broker started on port 10008");
            addShutdownHook();
        }
    }

    private static Properties serverConfig() {
        Properties props = new Properties();
        props.put("port", "10008"); // 修改端口
        props.put("host", "0.0.0.0");
        props.put("allow_anonymous", "true");
        props.put("persistence_store", "memory");
        return props;
    }


    private static void addShutdownHook() {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("Stopping MQTT broker...");
            mqttBroker.stopServer();
            System.out.println("MQTT broker stopped");
        }));
    }
}

step205:C:\Users\wangrusheng\IdeaProjects\untitled2\src\main\java\org\example\MqttClientHandler.java

package org.example;

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class MqttClientHandler implements MqttCallbackExtended {
    private static final String BROKER_URL = "tcp://localhost:10008";
    private final String clientId;
    private IMqttClient client;
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    public MqttClientHandler(String clientId) {
        this.clientId = clientId;
    }

    public void connect() throws MqttException {
        client = new MqttClient(BROKER_URL, clientId, new MemoryPersistence());
        client.setCallback(this);
        MqttConnectOptions options = new MqttConnectOptions();
        options.setAutomaticReconnect(true);
        options.setCleanSession(true);
        options.setConnectionTimeout(30);
        client.connect(options);
    }

    // 其他方法保持相同,只需修改messageArrived的日志输出
    @Override
    public void messageArrived(String topic, MqttMessage message) {
        System.out.printf("client: Got %s on %s.\n", new String(message.getPayload()), topic);
    }

    // 其他原有方法保持不变...




    public void publish(String topic, String content, int qos) throws MqttException {
        MqttMessage message = new MqttMessage(content.getBytes());
        message.setQos(qos);
        client.publish(topic, message);
    }

    public void subscribe(String topic, int qos) throws MqttException {
        client.subscribe(topic, qos);
    }

    public void disconnect() throws MqttException {
        client.disconnect();
        client.close();
        scheduler.shutdown();
    }

    @Override
    public void connectionLost(Throwable cause) {
        System.out.println("Connection lost, attempting reconnect...");
        scheduler.scheduleAtFixedRate(() -> {
            try {
                if (!client.isConnected()) {
                    client.reconnect();
                    System.out.println("Reconnected successfully");
                }
            } catch (MqttException e) {
                System.err.println("Reconnect failed: " + e.getMessage());
            }
        }, 0, 5, TimeUnit.SECONDS);
    }

    @Override
    public void connectComplete(boolean reconnect, String serverURI) {
        System.out.println("Connection established: " + (reconnect ? "Reconnected" : "New connection"));
    }


    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        System.out.println("Message delivery confirmed");
    }


}

step206:运行

ing ':org.example.Main.main()'> Task :compileJava UP-TO-DATE
> Task :processResources NO-SOURCE
> Task :classes UP-TO-DATE

> Task :org.example.Main.main()
Starting MQTT server...
MQTT Broker started on port 10008
log4j:WARN No appenders could be found for logger (io.moquette.broker.Server).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Connection established: New connection
Connection established: New connection
client: Publishing 1744797021 on /ping.
Message delivery confirmed
echo_client: Got 1744797021 on /ping.
echo_client: Publishing 1744797021 on /pong.
client: Publishing 1744797022 on /ping.
Message delivery confirmed
client: Got 1744797021 on /pong.
client: Publishing 1744797023 on /ping.
Message delivery confirmed
client: Publishing 1744797024 on /ping.
Message delivery confirmed
client: Publishing 1744797025 on /ping.
Message delivery confirmed
client: Publishing 1744797026 o

end

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2336539.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux——进程通信

我们知道,进程具有独立性,各进程之间互不干扰,但我们为什么还要让其联系,建立通信呢?比如:数据传输,资源共享,通知某个事件,或控制某个进程。因此,让进程间建…

【免费参会合集】2025年生物制药行业展会会议表格整理

全文精心整理, 建议今年参会前都好好收藏着,记得点赞! 医药人非常吃资源,资源从何而来?作为一名从事医药行业的工作者,可以很负责任的告诉诸位,其中非常重要的一个渠道就是会议会展! 建议所有医…

腾讯云开发+MCP:旅游规划攻略

1.登录注册好之后进入腾讯云开发 2.创建环境 4.创建好环境之后点击去开发 5.进入控制台后,选择AI,找到MCP 6.点击创建MCP Server 使用腾讯云开发创建MCP目前需要云开发入门版99/月,我没开通,所以没办法往下进行。

Sklearn入门之数据预处理preprocessing

、 Sklearn全称:Scipy-toolkit Learn是 一个基于scipy实现的的开源机器学习库。它提供了大量的算法和工具,用于数据挖掘和数据分析,包括分类、回归、聚类等多种任务。本文我将带你了解并入门Sklearn下的preprocessing在机器学习中的基本用法。 获取方式…

家用打印机性价比排名及推荐

文章目录 品牌性价比一、核心参数对比与场景适配二、技术类型深度解析三、不同场景选择 相关文章 品牌 性价比 一、核心参数对比与场景适配 兄弟T436W 优势: 微压电技术,打印头寿命长,堵头率低。 支持A4无边距和5G WiFi,适合照片…

数字电子技术基础(四十七)——使用Mutlisim软件来模拟74LS85芯片

目录 1 使用74LS85N芯片完成四位二进制数的比较 1.1原理介绍 1.2 器件选择 1.3 运行电路 2 使用74LS85N完成更多位的二进制比较 1 使用74LS85N芯片完成四位二进制数的比较 1.1原理介绍 对于74LS85 是一款 4 位数值比较器集成电路,用于比较两个 4 位二进制数&…

关于STM32创建工程文件启动文件选择

注意启动文件只要选择这几个 而不是要把所有都选上

LLC电路工作在容性区的风险

在t0时刻之前,Q6Q7导通,回路如下所示,此时A点电压是低压,B点电压是高压 在t0时刻时,谐振电流相位发生变换,在t1时刻,Q5,Q8导通,对于Q8MOS管来说,B点电压在Q6Q…

Linux Kernel 6

clone 系统调用(The clone system call) 在 Linux 中,使用 clone() 系统调用来创建新的线程或进程。fork() 系统调用和 pthread_create() 函数都基于 clone() 的实现。 clone() 系统调用允许调用者决定哪些资源应该与父进程共享&#xff0c…

【开源项目】Excel手撕AI算法深入理解(四):AlphaFold、Autoencoder

项目源码地址:https://github.com/ImagineAILab/ai-by-hand-excel.git 一、AlphaFold AlphaFold 是 DeepMind 开发的突破性 AI 算法,用于预测蛋白质的三维结构。它的出现解决了生物学领域长达 50 年的“蛋白质折叠问题”,被《科学》杂志评为…

第IV部分有效应用程序的设计模式

第IV部分有效应用程序的设计模式 第IV部分有效应用程序的设计模式第23章:应用程序用户界面的架构设计23.1设计考量23.2示例1:用于非分布式有界上下文的一个基于HTMLAF的、服务器端的UI23.3示例2:用于分布式有界上下文的一个基于数据API的客户端UI23.4要点第24章:CQRS:一种…

如何编制实施项目管理章程

本文档概述了一个项目管理系统的实施计划,旨在通过统一的业务规范和技术架构,加强集团公司的业务管控,并规范业务管理。系统建设将遵循集团统一模板,确保各单位项目系统建设的标准化和一致性。 实施范围涵盖投资管理、立项管理、设计管理、进度管理等多个方面,支持项目全生…

排序(java)

一.概念 排序:对一组数据进行从小到大/从大到小的排序 稳定性:即使进行排序相对位置也不受影响如: 如果再排序后 L 在 i 的前面则稳定性差,像图中这样就是稳定性好。 二.常见的排序 三.常见算法的实现 1.插入排序 1.1 直…

【HDFS入门】HDFS副本策略:深入浅出副本机制

目录 1 HDFS副本机制概述 2 HDFS副本放置策略 3 副本策略的优势 4 副本因子配置 5 副本管理流程 6 最佳实践与调优 7 总结 1 HDFS副本机制概述 Hadoop分布式文件系统(HDFS)的核心设计原则之一就是通过数据冗余来保证可靠性,而这一功能正是通过副本策略实现的…

智能 GitHub Copilot 副驾驶® 更新升级!

智能 GitHub Copilot 副驾驶 迎来重大升级!现在,所有 VS Code 用户都能体验支持 Multi-Context Protocol(MCP)的全新 Agent Mode。此外,微软还推出了智能 GitHub Copilot 副驾驶 Pro 订阅计划,提供更强大的…

【今日三题】添加字符(暴力枚举) / 数组变换(位运算) / 装箱问题(01背包)

⭐️个人主页:小羊 ⭐️所属专栏:每日两三题 很荣幸您能阅读我的文章,诚请评论指点,欢迎欢迎 ~ 目录 添加字符(暴力枚举)数组变换(位运算)装箱问题(01背包) 添加字符(暴力枚举) 添加字符 当在A的开头或结尾添加字符直到和B长度…

Linux——消息队列

目录 一、消息队列的定义 二、相关函数 2.1 msgget 函数 2.2 msgsnd 函数 2.3 msgrcv 函数 2.4 msgctl 函数 三、消息队列的操作 3.1 创建消息队列 3.2 获取消息队列并发送消息 3.3 从消息队列接收消息recv 四、 删除消息队列 4.1 ipcrm 4.2 msgctl函数 一、消息…

领慧立芯LHE7909可兼容替代TI的ADS1299

LHE7909是一款由领慧立芯(Legendsemi)推出的24位高精度Δ-Σ模数转换器(ADC),主要面向医疗电子和生物电势测量应用,如脑电图(EEG)、心电图(ECG)等设备。以下是…

MongoDB简单用法

图片中 MongoDB Compass 中显示了默认的三个数据库: adminconfiglocal 如果在 .env 文件中配置的是: MONGODB_URImongodb://admin:passwordlocalhost:27017/ MONGODB_NAMERAGSAAS💡 一、为什么 Compass 里没有 RAGSAAS 数据库?…

uniapp-商城-26-vuex 使用流程

为了能在所有的页面都实现状态管理,我们按照前面讲的页面进行状态获取,然后再进行页面设置和布局,那就是重复工作,vuex 就会解决这样的问题,如同类、高度提炼的接口来帮助我们实现这些重复工作的管理。避免一直在造一样的轮子。 https://vuex.vuejs.org/zh/#%E4%BB%80%E4…