Docker Compose 构建 EMQX 集群 实现mqqt 和websocket

news2025/1/7 12:26:40

EMQX 集群化管理mqqt真香

目录

#目录 /usr/emqx

容器构建

vim docker-compose.yml

version: '3'

services:
  emqx1:
    image: emqx:5.8.3
    container_name: emqx1
    environment:
    - "EMQX_NODE_NAME=emqx@node1.emqx.io"
    - "EMQX_CLUSTER__DISCOVERY_STRATEGY=static"
    - "EMQX_CLUSTER__STATIC__SEEDS=[emqx@node1.emqx.io,emqx@node2.emqx.io]"
    healthcheck:
      test: ["CMD", "/opt/emqx/bin/emqx", "ctl", "status"]
      interval: 5s
      timeout: 25s
      retries: 5
    networks:
      emqx-bridge:
        aliases:
        - node1.emqx.io
    ports:
      - 1883:1883
      - 8083:8083
      - 8084:8084
      - 8883:8883
      - 18083:18083 
    # volumes:
    #   - $PWD/emqx1_data:/opt/emqx/data

  emqx2:
    image: emqx:5.8.3
    container_name: emqx2
    environment:
    - "EMQX_NODE_NAME=emqx@node2.emqx.io"
    - "EMQX_CLUSTER__DISCOVERY_STRATEGY=static"
    - "EMQX_CLUSTER__STATIC__SEEDS=[emqx@node1.emqx.io,emqx@node2.emqx.io]"
    healthcheck:
      test: ["CMD", "/opt/emqx/bin/emqx", "ctl", "status"]
      interval: 5s
      timeout: 25s
      retries: 5
    networks:
      emqx-bridge:
        aliases:
        - node2.emqx.io
    # volumes:
    #   - $PWD/emqx2_data:/opt/emqx/data

networks:
  emqx-bridge:
    driver: bridge

启动

docker-compose up -d

集群状态

#查看集群状态
docker exec -it emqx1 sh -c "emqx ctl cluster status"

#验证
telnet 192.168.0.15 1883
#内网
nc -zv  192.168.0.15 1883 

#账户
admin
#默认密码
public

服务开放端口

1883,8083,8084,8883,18083

端口占用

EMQX 默认使用以下端口,请确保这些端口未被其他应用程序占用,并按照需求开放防火墙以保证 EMQX 正常运行。

端口协议描述
1883TCPMQTT over TCP 监听器端口,主要用于未加密的 MQTT 连接。
8883TCPMQTT over SSL/TLS 监听器端口,用于加密的 MQTT 连接。
8083TCPMQTT over WebSocket 监听器端口,使 MQTT 能通过 WebSocket 进行通信。
8084TCPMQTT over WSS (WebSocket over SSL) 监听器端口,提供加密的 WebSocket 连接。
18083HTTPEMQX Dashboard 和 REST API 端口,用于管理控制台和 API 接口。
4370TCPErlang 分布式传输端口,根据节点名称不同实际端口可能是 BasePort (4370) + Offset。
5370TCP集群 RPC 端口(在 Docker 环境下为 5369),根据节点名称不同实际端口可能是 BasePort (5370) + Offset。

前端js示

<!DOCTYPE html>
<html>
<head>
  <title>MQTT WebSocket Test</title>
  <script src="https://unpkg.com/mqtt/dist/mqtt.min.js"></script>
</head>
<body>
  <script>
    // 使用提供的客户端 ID 或生成一个唯一 ID
    const clientId = 'emqx_NjI4MT2';

    // 配置 WebSocket MQTT broker 地址
    const host = 'ws://127.0.0.1:8083/mqtt';

    // MQTT 连接选项
    const options = {
      keepalive: 60, // 心跳时间间隔
      clientId: clientId,
      protocolId: 'MQTT', // 协议 ID
      protocolVersion: 5, // 使用 MQTT 5 协议
      clean: true, // 是否清除会话
      reconnectPeriod: 1000, // 重连间隔时间 (ms)
      connectTimeout: 30 * 1000, // 连接超时时间 (ms)
      username: 'admin', // 设置用户名
      password: 'public', // 设置密码
      will: {
        topic: 'pushRanking/1',
        payload: 'Connection Closed abnormally..!',
        qos: 0,
        retain: false
      },
    };

    console.log('Connecting mqtt client');

    // 连接到 MQTT Broker
    const client = mqtt.connect(host, options);

    // 连接成功回调
    client.on('connect', () => {
      console.log('Connected to MQTT broker');

      // 订阅主题 pushRanking/#,支持通配符
      client.subscribe('pushRanking/1', { qos: 0 }, (err) => {
        if (!err) {
          console.log('Subscribed to topic: pushRanking/1');
        } else {
          console.error('Failed to subscribe:', err);
        }
      });
    });

    // 处理接收到的消息
    client.on('message', (topic, message) => {
      console.log(`Received message from topic "${topic}": ${message.toString()}`);
    });

    // 连接错误回调
    client.on('error', (err) => {
      console.log('Connection error:', err);
      client.end();
    });

    // 重新连接回调
    client.on('reconnect', () => {
      console.log('Reconnecting...');
    });

    // 连接关闭回调
    client.on('close', () => {
      console.log('Connection closed');
    });

    // 模拟消息发布以测试接收
    setTimeout(() => {
      client.publish('pushRanking/1', JSON.stringify({ msg: 'hello' }), { qos: 0 });
    }, 5000);
  </script>
</body>
</html>

后端

package emqx

import (
    "fmt"
    mqtt "github.com/eclipse/paho.mqtt.golang"
    "testing"
    "time"
)

func TestMQTT(t *testing.T) {
    // 创建 EMQX 客户端实例
    client := NewEMQXClient("tcp://127.0.0.1:1883", "test-client", "admin", "QfTzLy3cop9NOGWj")

    // 连接到 EMQX
    if err := client.Connect(); err != nil {
       fmt.Printf("Failed to connect: %v\n", err)
       return
    }
    defer client.Disconnect()

    // 订阅主题
    client.Subscribe("testtopic/#", 1, func(client mqtt.Client, msg mqtt.Message) {
       fmt.Printf("Message received: %s\n", msg.Payload())
    })

    // 发布消息
    client.Publish("testtopic/1", 1, false, "Hello from Golang!")

    // 保持连接一段时间以接收消息
    time.Sleep(10 * time.Second)
}

/*长连接的场景 DEMO

func main() {
    client := emqxclient.NewEMQXClient("tcp://broker.emqx.io:1883", "test-client", "", "")

    if err := client.Connect(); err != nil {
       fmt.Printf("Failed to connect: %v\n", err)
       return
    }
    // 使用 defer 确保程序退出时断开连接
    defer client.Disconnect()

    // 订阅主题
    client.Subscribe("test/topic", 1, func(client mqtt.Client, msg mqtt.Message) {
       fmt.Printf("Received message: %s\n", msg.Payload())
    })

    // 发布消息
    client.Publish("test/topic", 1, false, "Hello from Golang!")

    // 捕获退出信号
    signalChan := make(chan os.Signal, 1)
    signal.Notify(signalChan, os.Interrupt, syscall.SIGTERM)

    fmt.Println("Running... Press Ctrl+C to exit.")
    <-signalChan
    fmt.Println("Exiting...")
}



*/

/*JS 调用如下
import mqtt from 'mqtt';

const brokerURL = 'ws://broker.emqx.io:8083/mqtt'; // WebSocket 连接地址
const clientID = `mqttjs_${Math.random().toString(16).substr(2, 8)}`;

// 创建客户端
const client = mqtt.connect(brokerURL, {
  clientId: clientID,
  username: '', // 如需要认证,填入用户名
  password: '', // 如需要认证,填入密码
});

// 连接事件
client.on('connect', () => {
  console.log('Connected to EMQX');

  // 订阅主题
  client.subscribe('test/topic', (err) => {
    if (!err) {
      console.log('Subscribed to topic: test/topic');
    } else {
      console.error('Failed to subscribe:', err);
    }
  });

  // 发布消息
  client.publish('test/topic', 'Hello from JavaScript!');
});

// 接收消息事件
client.on('message', (topic, message) => {
  console.log(`Received message on topic "${topic}": ${message.toString()}`);
});

// 错误事件
client.on('error', (err) => {
  console.error('Connection error:', err);
});







*/

common封装调用

package emqx

import (
    "fmt"
    "time"

    mqtt "github.com/eclipse/paho.mqtt.golang"
)

type EMQXClient struct {
    client mqtt.Client
}

// NewEMQXClient 初始化 EMQX 客户端
func NewEMQXClient(broker string, clientID string, username string, password string) *EMQXClient {
    opts := mqtt.NewClientOptions().
       AddBroker(broker).
       SetClientID(clientID).
       SetUsername(username).
       SetPassword(password).
       SetKeepAlive(60 * time.Second).
       SetDefaultPublishHandler(func(client mqtt.Client, msg mqtt.Message) {
          fmt.Printf("Received message on topic: %s, message: %s\n", msg.Topic(), msg.Payload())
       }).
       SetPingTimeout(1 * time.Second)

    client := mqtt.NewClient(opts)
    return &EMQXClient{client: client}
}

// Connect 连接到 EMQX
func (c *EMQXClient) Connect() error {
    token := c.client.Connect()
    if token.Wait() && token.Error() != nil {
       return token.Error()
    }
    fmt.Println("Connected to EMQX broker")
    return nil
}

// Publish 发布消息
func (c *EMQXClient) Publish(topic string, qos byte, retained bool, payload interface{}) error {
    token := c.client.Publish(topic, qos, retained, payload)
    token.Wait()
    return token.Error()
}

// Subscribe 订阅主题
func (c *EMQXClient) Subscribe(topic string, qos byte, callback mqtt.MessageHandler) error {
    token := c.client.Subscribe(topic, qos, callback)
    token.Wait()
    return token.Error()
}

// Unsubscribe 取消订阅
func (c *EMQXClient) Unsubscribe(topics ...string) error {
    token := c.client.Unsubscribe(topics...)
    token.Wait()
    return token.Error()
}

// Disconnect 断开连接
func (c *EMQXClient) Disconnect() {
    c.client.Disconnect(250)
    fmt.Println("Disconnected from EMQX broker")
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2270437.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

067B-基于R语言平台Biomod2模型的物种分布建模与数据可视化-高阶课程【2025】

课程培训包含&#xff1a;发票全套软件脚本学习数据视频文件导师答疑 本教程旨在通过系统的培训学习&#xff0c;学员可以掌握Biomod2模型最新版本的使用方法&#xff0c;最新版包含12个模型&#xff08;ANN, CTA, FDA, GAM, GBM, GLM, MARS, MAXENT, MAXNET, RF, SRE, XGBOOST…

USB 中断传输的 PID 序列

文章目录 中断传输的 PID 序列全速设备抓包高速设备抓包参考中断传输的 PID 序列 端点在初始化后,从 DATA0 开始,每成功执行一个事务,数据包序列翻转一次(从 DATA0 变为DATA1 或从 DATA1 变为 DATA0)。 数据翻转和传输的个数没有直接关系,只由端点在初始化后处理的总数决…

ESP32物联网无线方案,智能穿戴设备联网通信,产品无线交互应用

在物联网的世界里&#xff0c;每一个设备都不再是孤立的个体&#xff0c;它们通过无线连接芯片相互连接&#xff0c;形成一个庞大的智能网络。这些芯片是实现万物互联的基础&#xff0c;它们使得设备能够相互沟通&#xff0c;共享数据&#xff0c;从而创造出无限的可能性。 这…

C语言格式输出

1.转换字符说明&#xff1a; 2.常用的打印格式&#xff1a; 在 C 语言中&#xff0c;格式输出主要依靠 printf 函数来实现。以下是一些 C 语言格式输出的代码举例及相关说明。 printf("%2d"&#xff0c;123)&#xff0c;因为输出的部分有三位数&#xff0c;但是要求…

GJB系统设计说明模板

GJB系统设计说明模板及详解 1 范围 1.1 标识 1.2 系统概述 1.3 文档概述 2 引用文档 GJB XXX XXX XXX&#xff1b; XXX XXX。 前2章通用不再赘述 3 系统级设计决策 系统设计决策的目的:对系统规格说明中的关键需求(包括功能、质量属性和设计约束)进行分析,得到系统级概念性架构…

windows编译llama.cpp GPU版本

Build 指南 https://github.com/ggerganov/llama.cpp/blob/master/docs/build.md 一、Prerequire 具体步骤&#xff08;以及遇到的坑&#xff09;&#xff1a; 如果你要使用CUDA&#xff0c;请确保已安装。 1.安装 最新的 cmake, git, anaconda&#xff0c; pip 配置pyt…

Unity WebGL 部署IIS

Unity WebGL 部署IIS iis添加网站WebGL配置文件WebGL Gzip模式浏览器加载速度优化iis添加网站 第一步在配置好IIS并且添加网站 WebGL配置文件 在web包Build文件夹同级创建web.config文件 web.config文件内容 <?xml version="1.0" encoding="UTF-8"?…

职场常用Excel基础03-自定义排序

大家好&#xff0c;今天和大家一起分享一下excel中的自定义排序~ 通过排序&#xff0c;用户可以快速地对表格中的数据进行整理&#xff0c;以便更直观地观察趋势、查找特定信息或为后续的数据分析做准备。除了标准的升序和降序排序外&#xff0c;Excel还提供了强大的自定义排序…

每天40分玩转Django:Django类视图

Django类视图 一、知识要点概览表 类别知识点掌握程度要求基础视图View、TemplateView、RedirectView深入理解通用显示视图ListView、DetailView熟练应用通用编辑视图CreateView、UpdateView、DeleteView熟练应用Mixin机制ContextMixin、LoginRequiredMixin理解原理视图配置U…

PgSQL如何用cmd命令行备份和还原数据库

一、备份 备份为压缩的二进制格式&#xff08;通常更快且占用空间更少&#xff09; pg_dump -U username -Fc -h hostname -p port -d dbname -F p -f backup.sql-U username&#xff1a;指定连接数据库的用户名&#xff08;默认是 postgres&#xff09;。-Fc&#xff1a;备…

QT-------------多线程

实现思路 QThread 类简介&#xff1a; QThread 是 Qt 中用于多线程编程的基础类。可以通过继承 QThread 并重写 run() 方法来创建自定义的线程逻辑。新线程的执行从 run() 开始&#xff0c;调用 start() 方法启动线程。 掷骰子的多线程应用程序&#xff1a; 创建一个 DiceThre…

VBA批量插入图片到PPT,一页一图

Sub InsertPicturesIntoSlides()Dim pptApp As ObjectDim pptPres As ObjectDim pptSlide As ObjectDim strFolderPath As StringDim strFileName As StringDim i As Integer 设置图片文件夹路径strFolderPath "C:\您的图片文件夹路径\" 请替换为您的图片文件夹路径…

当知识图谱遇上文本智能处理,会擦出怎样的火花?

目前以理解人类语言为入口的认知智能成为了人工智能发展的突破点&#xff0c;而知识图谱则是迈向认知智能的关键要素。达观数据在2018AIIA人工智能开发者大会承办的语言认知智能与知识图谱公开课上&#xff0c;三位来自企业和学术领域的专家分别从不同角度讲述的知识图谱的应用…

数据挖掘——回归算法

数据挖掘——回归算法 回归算法线性回归最小二乘法优化求解——梯度下降法逻辑回归逻辑回归函数逻辑回归参数估计逻辑回归正则化 决策树回归小结 回归算法 回归分析 如果把其中的一些因素&#xff08;房屋面积&#xff09;作为自变量&#xff0c;而另一些随自变量的变化而变化…

Ubuntu 24.04 LTS 解决网络连接问题

1. 问题描述 现象&#xff1a;ens33 网络接口无法获取 IPv4 地址&#xff0c;导致网络不可用。初步排查&#xff1a; 运行 ip a&#xff0c;发现 ens33 接口没有分配 IPv4 地址。运行 ping www.baidu.com&#xff0c;提示“网络不可达”。查看 NetworkManager 日志&#xff0c…

spring-boot启动源码分析(二)之SpringApplicationRunListener

在上一篇《spring-boot启动源码分析&#xff08;一&#xff09;之SpringApplication实例构造》后&#xff0c;继续看了一个月的Spring boot启动源码&#xff0c;初步把流程看完了&#xff0c;接下来会不断输出总结&#xff0c;以巩固这段时间的学习。同时也希望能帮到同样感兴趣…

基于N-HiTS神经层次插值模型的时间序列预测——cross validation交叉验证与ray tune超参数优化

论文链接&#xff1a;https://arxiv.org/pdf/2201.12886v3 N-HiTS: Neural Hierarchical Interpolation for TimeSeries Forecasting \begin{aligned} &\text{\large \color{#CDA59E}N-HiTS: Neural Hierarchical Interpolation for TimeSeries Forecasting}\\ \end{aligne…

Lumos学习王佩丰Excel第二十三讲:Excel图表与PPT

一、双坐标柱形图的补充知识 1、主次坐标设置 2、主次坐标柱形避让&#xff08;通过增加两个系列&#xff0c;挤压使得两个柱形挨在一起&#xff09; 增加两个系列 将一个系列设置成主坐标轴&#xff0c;另一个设成次坐标轴 调整系列位置 二、饼图美化 1、饼图美化常见设置 …

YK人工智能(三)——万字长文学会torch深度学习

2.1 张量 本节主要内容&#xff1a; 张量的简介PyTorch如何创建张量PyTorch中张量的操作PyTorch中张量的广播机制 2.1.1 简介 几何代数中定义的张量是基于向量和矩阵的推广&#xff0c;比如我们可以将标量视为零阶张量&#xff0c;矢量可以视为一阶张量&#xff0c;矩阵就是…

Java基于SpringBoot的甘肃非物质文化网站的设计与实现,附源码

博主介绍&#xff1a;✌Java老徐、7年大厂程序员经历。全网粉丝12w、csdn博客专家、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精彩专栏推荐订阅&#x1f447;&…