【MQTT】mqtt 服务器部署--go 生产和消费demo

news2024/9/24 15:28:39

1. 背景

Golang 是谷歌开发的一种静态强类型、编译、并发和垃圾收集编程语言。围棋富有表现力,干净,高效。它的并发机制使得编写最大限度地利用多核和网络机器的程序变得容易,它的创新类型系统使得灵活和模块化的程序构造成为可能。Go 可以快速编译成机器代码,但具有垃圾收集的便利性和运行时反射的强大功能。它是一个快速的、静态类型的、编译语言的,就像一个动态类型的、直译语言的。

MQTT 是一种基于发布/订阅模型的轻量级物联网消息传递协议,它只需要很少的代码和带宽,就可以为物联网设备提供实时可靠的消息传递服务。它适用于硬件资源有限的设备和带宽有限的网络环境。因此,MQTT 协议广泛应用于物联网、移动互联网、物联网、电力等行业。源码GitHub - emqx/emqx-rel: Release Project for EMQ X Broker prior to 4.3. Newer releases are built here: https://github.com/emqx/emqx

本文主要介绍如何在 Golang 项目中使用 paho.MQTT.Golang 客户端库,并实现客户端与 MQTT 代理之间的连接、订阅和消息传递。

2.  设计原理

 3. 服务部署

方法一:命令安装Installation | EMQX 3.0 Documentation

  • apt-get install lksctp-tools
  • curl -s https://assets.emqx.com/scripts/install-emqx-deb.sh | sudo bash
  • sudo apt-get install emqx

 方法二:源码编译

  • $ git clone https://github.com/emqx/emqx-rel.git emqx-rel
    $ cd emqx-rel
    $ git checkout $(git describe --tags $(git rev-list --tags --max-count=1))
    $ make emqx-pkg
    $ ls _packages/emqx

 

 4. go 生产消费源码

common.go 

package main

import (
	"fmt"
	mqtt "github.com/eclipse/paho.mqtt.golang"
)

var broker = "你的broker ip"
var port = 1883
var userName = "emqx"
var passwd = "你的密码"
var topic = "topic/test"

func sub(client mqtt.Client, producer bool) {
	token := client.Subscribe(topic, 1, nil)
	token.Wait()
	if producer {
		fmt.Printf("Producer subscribed to topic %s", topic)
	} else {
		fmt.Printf("Consumer subscribed to topic %s", topic)
	}
}

producer.go

package main

import (
	"fmt"
	mqtt "github.com/eclipse/paho.mqtt.golang"
	"time"
)

var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
	fmt.Printf("Producer Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
}

var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
	fmt.Println("Connected")
}

var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
	fmt.Printf("Connect lost: %v", err)
}

func producerPoint() {
	opts := mqtt.NewClientOptions()
	opts.AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port))
	opts.SetClientID("go_mqtt_producer")
	opts.SetUsername(userName)
	opts.SetPassword(passwd)
	opts.SetKeepAlive(8 * time.Second)
	opts.SetDefaultPublishHandler(messagePubHandler)
	opts.OnConnect = connectHandler
	opts.OnConnectionLost = connectLostHandler
	client := mqtt.NewClient(opts)
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		panic(token.Error())
	}

	sub(client, true)
	publish(client)

	time.Sleep(30 * time.Second)
	client.Disconnect(250)
}

func publish(client mqtt.Client) {
	num := 10
	for i := 0; i < num; i++ {
		text := fmt.Sprintf("Message %d", i)
		token := client.Publish(topic, 0, false, text)
		token.Wait()
		time.Sleep(time.Second)
	}
}

consumer.go

package main

import (
	"fmt"
	mqtt "github.com/eclipse/paho.mqtt.golang"
	"time"
)

var messageRecHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
	fmt.Printf("Clenit Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
}

// 其实consumer既可以收到消息,也可以发送消息
// 作为互联网硬件收集器,采集的环境信息数据(温度、湿度等)发送到broker
// 作为互联网硬件执行器,可以接受broker的消息(执行指令信息,如显示文字、声音等),并根据消息执行硬件行为

func consumerPoint() {
	opts := mqtt.NewClientOptions()
	opts.AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port))
	opts.SetClientID("go_mqtt_consumer")
	opts.SetUsername(userName)
	opts.SetPassword(passwd)
	opts.SetKeepAlive(8 * time.Second)
	opts.SetDefaultPublishHandler(messageRecHandler)
	opts.OnConnect = connectHandler
	opts.OnConnectionLost = connectLostHandler
	client := mqtt.NewClient(opts)
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		panic(token.Error())
	}

	sub(client, false)
	time.Sleep(30 * time.Second)
	client.Disconnect(250)
}

main.go

package main

import "time"

func main() {
	go consumerPoint()
	go producerPoint()
	time.Sleep(30 * time.Second)
}

结果:

 5. 小结

  1. 其实consumer既可以收到消息,也可以发送消息
  2. 作为互联网硬件收集器,采集的环境信息数据(温度、湿度等)发送到broker
  3. 作为互联网硬件执行器,可以接受broker的消息(执行指令信息,如显示文字、声音等),并根据消息执行硬件行为

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/98125.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【博客561】利用隧道和conntrack实现NAT网关

利用隧道和conntrack实现NAT网关 场景&#xff1a;实现一个NAT网关来转发其它node的出外网流量 如&#xff1a;图中的2节点充当NAT网关来转发1的出外网流量 利用隧道和conntrack实现NAT网关 节点ip&#xff1a; node1是172.17.158.48&#xff0c;node2是172.17.158.46 1、配…

Clipper库 | 类型和填充规则

裁剪类型(ClipType) CT_INTERSECTION 0 CT_UNION 1 CT_DIFFERENCE 2 CT_XOR 3交集&#xff0c;AND (intersection) &#xff1a;主体和裁剪多边形相交的区域。并集&#xff0c;OR (union) - 主体和裁剪多边形两者合并的区域。非/差&#xff0c;NOT (difference) - 裁剪多边…

net/http 库的客户端实现(上)

前言 Go语言标准库 net/http 是一个非常强大的标准库&#xff0c;使得构建 HTTP 请求和编写 Web 服务器的工作变得非常简单。 我们来看看是他是如何实现客户端和服务端的。 使用示例 假设本地有一个GET方法的HTTP接口&#xff0c;响应 Hello World&#xff01; 使用 net/ht…

Cesium:实时数据渲染性能优化与内存泄漏问题分析

在基于Cesium.js三维可视化开发框架,采用“轮询”策略,实现单车点位数据的实时渲染的demo示例,线上部署完毕之后,竟发现出现了“内存泄漏”的问题。思前想后,反复调试,然而并没有找到引发泄露的根源所在,最后偶然间在《JavaScript高级程序设计(第4版)》中看到了问题的答…

HashTable源码解析

HashTable源码解析 简介 HashTable 是一个古老的&#xff08;JDK1.0 时就已存在&#xff09;线程安全的容器&#xff0c;其核心方法都是 synchronized 修饰的。 相反 HashMap 不是线程安全的。 HashTable与HashMap对比 二者继承体系 HashTable HashMap 从图中可以对比得出…

零基础的小白如何学习编程,该怎么入手学习?

零基础的小白如何学习编程&#xff0c;该怎么入手学习&#xff1f;这是一个被问烂透而有很有趣的话题了。听到这个问题时&#xff0c;小编的第一反应就是要弄清楚对方为什么要学习编程&#xff0c;这是一个很好地起点&#xff0c;清楚自己想要什么&#xff0c;才能去努力实现。…

【JY】 ABAQUS子程序UEL的有限元原理与应用

不等待即关注【简述ABAQUS中UEL子程序】ABAQUS作为成熟的商用有限元软件&#xff0c;可为高级用户提供特定的分析需求。ABAQUS常见的二次开发子程序包括&#xff1a;UMAT、VUMAT、UGENS、UEL和VUEL等。其中UEL/VUEL分别适用于ABAQUS的Standard/Explicit求解器。只有清楚有限元分…

零基础怎么学Python编程,新手常犯哪些错误?

Python是人工智能时代最佳的编程语言&#xff0c;入门简单、功能强大&#xff0c;深获初学者的喜爱。 很多零基础学习Python开发的人都会忽视一些小细节&#xff0c;进而导致整个程序出现错误。下面就给大家介绍一下Python开发者常犯的几个错误。 1、错误的使用变量。 在Pyt…

华为网工入门之eNSP小实验(5)--VLAN间相互通信的三种方法

VLAN间相互通信 实际网络部署中一般会将不同IP地址段划分到不同的VLAN。同VLAN且同网段的PC之间可直接进行通信&#xff0c;无需借助三层转发设备&#xff0c;该通信方式被称为二层通信。VLAN之间需要通过三层通信实现互访&#xff0c;三层通信需借助三层设备(路由器,三层交换…

高可用系列文章之二 - 传统分层架构技术方案

前文链接 高可用系列文章之一 - 概述 - 东风微鸣技术博客 (ewhisper.cn) 三 技术方案 3.1 概述 单点是系统高可用最大的风险和敌人&#xff0c;应该尽量在系统设计的过程中避免单点。 保障系统的高可用, 方法论上&#xff0c;高可用保证的原则是「集群化」(或 「冗余」), …

LeetCode HOT 100 —— 312.戳气球

题目 有 n 个气球&#xff0c;编号为0 到 n - 1&#xff0c;每个气球上都标有一个数字&#xff0c;这些数字存在数组 nums 中。 现在要求你戳破所有的气球。戳破第 i 个气球&#xff0c;你可以获得 nums[i - 1] * nums[i] * nums[i 1] 枚硬币。 这里的 i - 1 和 i 1 代表和 i…

别只关注chatGPT能不能写论文了,它还支持49中场景,代码都给你写好了,速领

简介 chatGPT最近非常不稳定&#xff0c;访问一不小心就出现了网络错误&#xff0c;根本就不能很好的使用。那么我们该怎么办呢&#xff1f;勇哥给大家想到了一个种办法&#xff0c;就是用程序去调用openapi的接口&#xff0c;这个接口虽然是收费的&#xff0c;但是可免费使用…

linux下源码编译cloudcompare(解决无法加载pcd文件的问题)

cloudcompare是一款点云处理软件&#xff0c;里面有很多算法&#xff0c;值得大家学习研究。 下面介绍linux下源码编译cloudcompare的方法。 1.安装依赖&#xff1a; sudo apt-get install doxygen sudo apt install cmake-curses-gui2.下载&#xff1a; git clone --recurs…

Qt之天气预报——界面优化篇(含源码+注释)

一、界面优化效果 下方为界面优化完成和优化前的效果对比。 优化前&#xff1a; 优化后&#xff1a; 二、优化内容 添加标题栏添加图片&#xff08;图图标素材源自阿里巴巴矢量图标库&#xff09;更新UI内容&#xff08;微调大小、布局比例&#xff09;添加鼠标事件函数&…

Java 教程

Java 教程 Java 是由 Sun Microsystems 公司于 1995 年 5 月推出的高级程序设计语言。 Java 可运行于多个平台&#xff0c;如 Windows, Mac OS 及其他多种 UNIX 版本的系统。 本教程通过简单的实例将让大家更好的了解 Java 编程语言。 移动操作系统 Android 大部分的代码采用…

RepVGG:一个结构重参数化网络

本文来自公众号“AI大道理” ResNet、DenseNet 等复杂的多分支网络可以增强模型的表征能力&#xff0c;使得训练效果更好。但是多分支的结构在推理的时候效率严重不足。 看起来二则不可兼得。 能否两全其美&#xff1f; RepVGG通过结构重参数化的方法&#xff0c;在训练的时候…

2022 年 Kubernetes 高危漏洞盘点

2022 年&#xff0c;Kubernetes继续巩固自己作为关键基础设施领域的地位。从小型到大型组织&#xff0c;它已成为广受欢迎的选择。出于显而易见的原因&#xff0c;这种转变使 Kubernetes 更容易受到攻击。但这还没有结束&#xff0c;开发人员通常将Kubernetes 部署与其他云原生…

【2022.12.18】备战春招Day13——每日一题 + 234. 回文链表 + 139. 单词拆分

【每日一题】1703. 得到连续 K 个 1 的最少相邻交换次数 题目描述 给你一个整数数组 nums 和一个整数 k 。 nums 仅包含 0 和 1 。每一次移动&#xff0c;你可以选择 相邻 两个数字并将它们交换。 请你返回使 nums 中包含 k 个 连续 1 的 最少 交换次数 输入&#xff1a;nums …

【数据结构】堆(二)——堆排序、TOP-K问题

作者&#xff1a;一个喜欢猫咪的的程序员 专栏&#xff1a;《数据结构》 喜欢的话&#xff1a;世间因为少年的挺身而出&#xff0c;而更加瑰丽。 ——《人民日报》 目录 堆排序&#xff1a;&#xff08;以小堆为例&#xff09; Heapsort函数…

C语言重点解剖关键字要点速记

1.在windows中&#xff0c;双击的本质是运行该程序&#xff0c;就是将程序(.exe)加载到内存当中去。任何程序在被运行之前都必须加载到内存当中去。 2.所有的变量本质都是在内存的某个位置开辟的。变量不能定义在硬盘上&#xff0c;因为变量必须在程序运行的时候才能被开辟&am…