使用java +paho mqtt编写模拟发布温度及订阅的过程

news2024/11/28 8:34:56
  • 启动mqtt 服务
  •  创建项目,在项目中添加模块
  •  
  •  
  • 添加文件夹
    • 添加maven依赖
  •     <dependencies>
            <dependency>
                <groupId>org.eclipse.paho</groupId>
                <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
                <version>1.2.0</version>
            </dependency>
    
    
        </dependencies>
    • 编写订阅程序  名字没起好 后面有时间再调整
  • import org.eclipse.paho.client.mqttv3.IMqttClient;
    import org.eclipse.paho.client.mqttv3.MqttMessage;
    
    import java.util.Random;
    import java.util.concurrent.Callable;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    
    public class EngineTemperatureSensor implements Callable<Void> {
    
        // ... private members omitted
        IMqttClient client;
        public static final String TOPIC = "testTopic1/003";
    
        public EngineTemperatureSensor(IMqttClient client) {
            this.client = client;
        }
    
        @Override
        public Void call() throws Exception {
            if ( !client.isConnected()) {
                return null;
            }
            CountDownLatch receivedSignal = new CountDownLatch(10);
            client.subscribe("testTopic1/003", (topic, msg) -> {
                byte[] payload = msg.getPayload();
                // ... payload handling omitted
                //print out the message
                System.out.println("Received message: " + new String(payload));
                receivedSignal.countDown();
            });
            receivedSignal.await(1, TimeUnit.MINUTES);
    
            //print out the message
            System.out.println("Published message:2222222222222 " );
    
    
    
            return null;
        }
    
    }
  • 订阅:

  • import org.eclipse.paho.client.mqttv3.IMqttClient;
    import org.eclipse.paho.client.mqttv3.MqttMessage;
    
    import java.util.Random;
    import java.util.concurrent.Callable;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    
    public class EngineTemperatureSensor implements Callable<Void> {
    
        // ... private members omitted
        IMqttClient client;
        public static final String TOPIC = "testTopic1/003";
    
        public EngineTemperatureSensor(IMqttClient client) {
            this.client = client;
        }
    
        @Override
        public Void call() throws Exception {
            if ( !client.isConnected()) {
                return null;
            }
            CountDownLatch receivedSignal = new CountDownLatch(10);
            client.subscribe("testTopic1/003", (topic, msg) -> {
                byte[] payload = msg.getPayload();
                // ... payload handling omitted
                //print out the message
                System.out.println("Received message: " + new String(payload));
                receivedSignal.countDown();
            });
            receivedSignal.await(1, TimeUnit.MINUTES);
    
            //print out the message
            System.out.println("Published message:2222222222222 " );
    
    
    
            return null;
        }
    
    }

import java.util.Scanner;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;

public class c5M {

    //main5
    public static void main(String[] args) {
        System.out.println("Hello World");

        String publisherId = UUID.randomUUID().toString();
        ExecutorService executor = Executors.newSingleThreadExecutor();

        try {
            IMqttClient subscriber = new MqttClient("tcp://127.0.0.1:1883", publisherId);

            MqttConnectOptions options = new MqttConnectOptions();
            options.setAutomaticReconnect(true);
            options.setCleanSession(true);
            options.setConnectionTimeout(10);
            subscriber.connect(options);

            // 调用EngineTemperatureSensor
            EngineTemperatureSensor sensor = new EngineTemperatureSensor(subscriber);
            executor.submit(sensor); // 提交任务,但不阻塞主线程



            // 这里可以添加代码来等待用户输入或者其他信号来安全地关闭程序
            // 例如,你可以使用System.in.read()来等待用户输入
            System.out.println("Press Enter to exit...");
            new Scanner(System.in).nextLine(); // 等待用户输入

        } catch (Exception e) {
            //print e message
            //print seperator line
            System.out.println("))))))))))))))))))))))))");

            System.out.println(e.getMessage());
            throw new RuntimeException(e);

        } finally {
            // 确保最后关闭ExecutorService和MQTT客户端
            executor.shutdown(); // 提交的任务将不再被接受
            try {
                // 等待任务完成(可选,取决于你是否需要确保所有任务都完成)
                if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                    executor.shutdownNow(); // 取消正在执行的任务
                }
            } catch (InterruptedException ie) {
                executor.shutdownNow(); // 当前线程被中断,需要关闭ExecutorService
                Thread.currentThread().interrupt(); // 保留中断状态
            }
            // 关闭MQTT客户端(如果有必要的话)
            // 注意:这里可能需要额外的逻辑来处理MQTT客户端的关闭,具体取决于你的实现
        }

    }

}

发布代码:

import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;

import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class EngineTemperatureSensor implements Callable<Void> {

    // ... private members omitted
    IMqttClient client;
    public static final String TOPIC = "testTopic1/003";

    public EngineTemperatureSensor(IMqttClient client) {
        this.client = client;
    }

    @Override
    public Void call() throws Exception {
        if ( !client.isConnected()) {
            return null;
        }
        Random rnd = null;
        //double temp =  80 + rnd.nextDouble() * 20.0;
        double temp =  10 + 1.1 * 20.0;
        byte[] payload = String.format("T:%04.2f",temp)
                .getBytes();
        MqttMessage msg2= new MqttMessage(payload);

        msg2.setQos(0);
        msg2.setRetained(true);
        client.publish(TOPIC,msg2);

        //print out the message
        System.out.println("Published message: " + msg2);



        return null;
    }

}

 

import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;

import java.util.Scanner;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class mainc3 {

    // Main method
    public static void main(String[] args) {
        System.out.println("Hello World");

        String publisherId = UUID.randomUUID().toString();
        ExecutorService executor = Executors.newSingleThreadExecutor();

        try {
            IMqttClient publisher = new MqttClient("tcp://127.0.0.1:1883", publisherId);

            MqttConnectOptions options = new MqttConnectOptions();
            options.setAutomaticReconnect(true);
            options.setCleanSession(true);
            options.setConnectionTimeout(10);
            publisher.connect(options);

            // 调用EngineTemperatureSensor
            EngineTemperatureSensor sensor = new EngineTemperatureSensor(publisher);
            executor.submit(sensor); // 提交任务,但不阻塞主线程



            // 这里可以添加代码来等待用户输入或者其他信号来安全地关闭程序
            // 例如,你可以使用System.in.read()来等待用户输入
            System.out.println("Press Enter to exit...");
            new Scanner(System.in).nextLine(); // 等待用户输入

        } catch (Exception e) {
            //print e message
            //print seperator line
            System.out.println("))))))))))))))))))))))))");

            System.out.println(e.getMessage());
            throw new RuntimeException(e);

        } finally {
            // 确保最后关闭ExecutorService和MQTT客户端
            executor.shutdown(); // 提交的任务将不再被接受
            try {
                // 等待任务完成(可选,取决于你是否需要确保所有任务都完成)
                if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                    executor.shutdownNow(); // 取消正在执行的任务
                }
            } catch (InterruptedException ie) {
                executor.shutdownNow(); // 当前线程被中断,需要关闭ExecutorService
                Thread.currentThread().interrupt(); // 保留中断状态
            }
            // 关闭MQTT客户端(如果有必要的话)
            // 注意:这里可能需要额外的逻辑来处理MQTT客户端的关闭,具体取决于你的实现
        }





    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1851500.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

提升研发效率:三品PLM解决方案在汽车汽配行业的实践

随着全球汽车市场的快速发展&#xff0c;中国汽车汽配行业迎来了前所未有的发展机遇。然而&#xff0c;在这一过程中&#xff0c;企业也面临着诸多挑战&#xff0c;如研发能力的提升、技术资料管理的复杂性、以及跨部门协作的困难等。为了应对这些挑战&#xff0c;三品产品生命…

90V转12V1A恒压WT6039

90V转12V1A恒压WT6039 WT6039降压DC-DC转换器芯片专为处理宽泛的电压输入范围设计&#xff0c;支持从12V至90V。该芯片集成了关键功能&#xff0c;如使能控制开关、参考电源、误差放大器、过热保护、限流保护及短路保护等&#xff0c;以确保系统在各种操作条件下的安全与稳定性…

NSSCTF-Web题目14

目录 [CISCN 2019华东南]Web11和[NISACTF 2022]midlevel 1、题目 2、知识点 3、思路 [HDCTF 2023]SearchMaster 1、题目 2、知识点 3、思路 [CISCN 2019华东南]Web11和[NISACTF 2022]midlevel 这两道题目一样 1、题目 2、知识点 SSTI&#xff08;服务端模板注入漏洞&…

攻防世界-2-1

下载附件&#xff0c;发现是一张损坏的png文件&#xff0c;扔winhex里面修改文件头 修改之后发现还是打不开&#xff0c;提示CRC错误&#xff0c;脚本跑一下 循环冗余校验CRC&#xff08;Cyclic Redundancy Check&#xff09;是数据通信领域常用的一种数据传输检错技术。通过在…

企业级Web项目中应该如何做单元测试、集成测试和功能测试?

先自我介绍下&#xff1a; 本人有过10年测试经验&#xff0c;也参与过公安部网络安全产品测试交付、华为4G 网络设备测试交付、腾讯QQ空间APP产品测试交付。 关于“企业级Web项目中应该如何做单元测试、集成测试和功能测试”这个问题&#xff0c;我想给大家唠唠&#xff0c;我…

kafka(五)spring-kafka(2)详解与demo

一、简单的收发消息demo 父工程pom&#xff1a; <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0"xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation&qu…

外包IT运维解决方案

随着企业信息化进程的不断深入&#xff0c;IT系统的复杂性和重要性日益增加。高效的IT运维服务对于保证业务连续性、提升企业竞争力至关重要。外包IT运维解决方案通过专业的服务和技术支持&#xff0c;帮助企业降低运维成本、提高运维效率和服务质量。 本文结合《外包IT运维解…

Go语言的诞生背景

人不走空 &#x1f308;个人主页&#xff1a;人不走空 &#x1f496;系列专栏&#xff1a;算法专题 ⏰诗词歌赋&#xff1a;斯是陋室&#xff0c;惟吾德馨 目录 &#x1f308;个人主页&#xff1a;人不走空 &#x1f496;系列专栏&#xff1a;算法专题 ⏰诗词歌…

数字营销新玩法:拓新与裂变的完美结合

在当今这个飞速发展的数字化时代&#xff0c;数字营销已经成为了企业发展中至关重要的一环。拓新&#xff0c;简单来说就是不断去开拓新的客户群体&#xff0c;让更多的人了解并接触到我们的产品或服务。要做到这一点&#xff0c;那可得充分利用各种线上渠道。像热闹非凡的社交…

设计模式(七)创建者模式之建造者模式

这里写目录标题 概述需求需求类图BikeBuilderMobikeBuilderOfoBuilderDirectorClientClient优缺点使用场景 模式扩展ComputerClient创建者模式对比工厂方法模式VS建造者模式抽象工厂模式VS建造者模式 总结 概述 建造者模式又叫生成器模式&#xff0c;是一种对象构建模式。它可…

threejs视频融合 webgl

threejs三维视频融合 let objList []; const clock new THREE.Clock(); const container document.getElementById( container );const stats new Stats(); container.appendChild( stats.dom );const renderer new THREE.WebGLRenderer( { antialias: true } ); rendere…

Day1:二分查找704 移除元素27

题目链接704. 二分查找 - 力扣&#xff08;LeetCode&#xff09; int search(vector<int>& nums, int target) {int left 0;int right nums.size() - 1;int mid (right - left) / 2;while (left < right){if (target nums[mid]){return mid;}else if (target …

代码阅读器--Understand

代码阅读器--Understand 1 介绍2 安装步骤2.1 下载连接2.2 正常安装&#xff0c;设置自己的安装路径2.3 修改 understand.exe&#xff0c;搜索"areYouThere" &#xff0c; 用"IamNotHere!" 替代2.4 字节序替换 3 使用参考 1 介绍 Understand 的强大不言而…

用于制作耳机壳的UV树脂耳机壳UV胶价格高不高?

用于制作耳机壳的UV树脂耳机壳UV胶价格高不高&#xff1f; 制作耳机壳的UV树脂价格相对于一些其他材料可能会略高&#xff0c;但具体的价格取决于多个因素&#xff0c;如品牌、型号、质量等。一些高端的UV树脂品牌和型号可能会价格较高&#xff0c;但它们也通常具有更好的性能…

无法打开微软商店

今天给大家讲一下我在使用win11系统时遇到的一些问题&#xff0c;希望对出现类似情况的人有所帮助。 首先就是微软商店打不开的问题。相信许多windows系统的用户都会碰到此问题。我在打开时会出 现一直转圈的界面&#xff0c;在网上找了一些方法&#xff0c;但都没什么用处。…

提前还贷有“坑”?房产抵押经营贷避坑指南(十六大常见问题)

一、自己去银行还是找专人办理&#xff1f; 贷款这事儿&#xff0c;说起来容易&#xff0c;办起来可不简单。银行的大门敞开&#xff0c;但门槛却不低。很多人觉得自己资质不错&#xff0c;结果一申请才发现&#xff0c;条件这儿那儿都不符合&#xff0c;最后搞得心力交瘁&…

聚类算法(2)--- ISODATA算法

本篇文章是博主在人工智能等领域学习时&#xff0c;用于个人学习、研究或者欣赏使用&#xff0c;并基于博主对人工智能等领域的一些理解而记录的学习摘录和笔记&#xff0c;若有不当和侵权之处&#xff0c;指出后将会立即改正&#xff0c;还望谅解。文章分类在AI学习笔记&#…

【金】02Y90-60 大数据-HivetoMysQL

1、安装 Java 程序&#xff08;jdk&#xff09; 2、添加以下JAR包 3、确认配置成自己的数据库 ....

程序员为什么不能一次性写好,需要一直改Bug?

程序员在编写代码时不能一次性写好&#xff0c;而是需要不断修改Bug&#xff0c;这主要是由几个因素导致的&#xff1a; 复杂性&#xff1a;软件开发是一个高度复杂的过程&#xff0c;涉及到多个模块、功能、逻辑和数据的交互。即使是最有经验的程序员&#xff0c;也很难一次性…

[最新教程]Claude Sonnet 3.5注册方法详细步骤分享,新手小白收藏,文末免费送已注册的Claude账号

一.Claude sonnet 3.5大模型面世 6月21日&#xff0c;被称为“OpenAI 最强竞对”的大模型公司 Anthropic 发布了 Claude 3.5 系列模型中的第一个版本——Claude 3.5 Sonnet。 Anthropic 在官方博客中表示&#xff0c;Claude 3.5 Sonnet 提高了智能化的行业标准&#xff0c;在…