029-从零搭建微服务-消息队列(一)

news2025/1/22 8:53:16

写在最前

如果这个项目让你有所收获,记得 Star 关注哦,这对我是非常不错的鼓励与支持。

源码地址(后端):mingyue: 🎉 基于 Spring Boot、Spring Cloud & Alibaba 的分布式微服务架构基础服务中心

源码地址(前端):mingyue-ui: 🎉 基于 Vue3 + TS + Vite + Element plus 等技术,适配 MingYue 后台微服务

文档地址:Wiki - Gitee.com

消息队列

消息队列(Message Queue)是一种用于在分布式系统中进行异步通信的通信模式和技术。它允许不同的组件或服务之间通过发送和接收消息来进行通信,而无需直接耦合它们的实现细节。消息队列通常用于解耦系统的不同部分,提高系统的可伸缩性、可靠性和灵活性。

以下是消息队列的一些关键特点和概念:

  1. 消息生产者(Producer): 这是向消息队列发送消息的组件或应用程序。生产者将消息发送到队列中,通常包括一些有关消息内容的元数据。

  2. 消息队列(Queue): 这是用于存储消息的中间件组件,消息在这里排队等待被处理。消息队列通常支持不同的消息传递模式,例如先进先出(FIFO)或发布/订阅模式。

  3. 消息消费者(Consumer): 这是从消息队列接收消息并进行处理的组件或应用程序。消费者订阅特定队列,并在有新消息可用时接收并处理它们。

  4. 消息代理(Message Broker): 这是协调消息的发送和接收的中间件服务。消息代理通常负责消息的路由、传递和确保消息的可靠性。

  5. 消息确认(Acknowledgment): 消费者在成功处理消息后,通常会向消息队列发送确认,以告知队列消息已被处理。这确保了消息不会被重复处理。

  6. 消息持久性(Message Durability): 消息队列通常支持消息的持久性,这意味着即使在消息被传递给消费者之后,消息仍然会在系统中存储,以确保不会丢失。

  7. 消息超时(Message Timeout): 有时候,消息队列会设置消息的超时时间,以确保消息在一定时间内被处理,否则可能会被认为是过期消息。

  8. 发布/订阅模式(Publish/Subscribe): 这是一种消息传递模式,其中生产者将消息发布到一个主题(topic),而不是特定的队列,然后多个消费者订阅该主题以接收消息。这种模式支持广播消息。

使用场景

  • 异步通信:允许不同的系统组件异步通信,提高系统的响应性能。

  • 解耦组件:降低系统中不同组件之间的耦合,使得系统更容易维护和扩展。

  • 负载均衡:通过分发消息给多个消费者来平衡工作负载。

  • 消息传递可靠性:确保消息的可靠传递,即使在系统中的故障情况下也能保证不丢失消息。

  • 日志和审计:用于记录和审计系统活动,以便后续分析和故障排除。

技术选型

一些常见的消息队列实现包括 RabbitMQ、RocketMQ、Kafka等,选择适合特定应用场景的消息队列是关键,因为它会影响系统的性能、可靠性和可扩展性。不同的场景可能更适合不同的消息队列系统。

基础对比
RabbitMQRocketMQKafka
推出时间2007年2012年2012年
所属Pivotal开源,Mozilla阿里开源,ApacheLinkin开源,Apache
社区活跃度
开发语言ErlangJavaScala、Java
支持的协议AMQP自己定义一套自行定义一套(基于TCP)
吞吐量万级(5.95w/s)十万级(11.6w/s)十万级(17.3w/s)
topic数量对吞吐量的影响topic达到几百,几千个时,吞吐量会有较小幅度的下降topic达到几十,几百个时,吞吐量会大幅度下降
时效性微秒级毫秒级毫秒级
可用性高(主从架构)非常高(分布式架构)非常高(分布式架构)
使用场景适用于各种规模的应用程序,尤其适合需要多语言支持的场景。适用于大规模的企业应用和互联网场景,尤其在阿里巴巴等大型公司中得到广泛应用。适用于大数据处理、实时数据流分析、事件溯源等高吞吐量场景。
功能对比
RabbitMQRocketMQKafka
延迟队列
死信队列
优先级队列
消息回溯
消焦持久化
消魚确认机制单条OffsetOffset
消息TTL
消息重复支持at least once、at most once支持at least once支持at least once、at most once
消息顺序性消费者加锁分区有序
消息事务
消息过滤
消息查询
消息重新消费
消费模式队列模式广播模式+集群模式流模式
消费推拉模式Pull、PushPull、PushPull
批量发送

选型总结

通过对RabbitMQ、RocketMQ、Kafka 基础与功能两个维度对比,本项目将采用 RocketMQ、Kafka 两个消息队列。

RocketMQ 适用场景

  • 高性能、高可用性的消息传递场景,例如实时数据分析、电商秒杀等。

  • 需要强大的消息过滤和消息追踪功能的场景,例如广告投放、用户推送等。

  • 需要分布式事务支持的场景,RocketMQ提供了分布式事务消息特性。

Kafka 适用场景

  • 需要高吞吐量和低延迟的实时数据处理场景,例如用户行为日志分析、实时监控等。

  • 需要保留大量历史数据并支持数据回溯的场景,例如大数据分析、数据仓库等。

  • 需要构建事件驱动架构的场景,Kafka可以作为事件源和消息总线。

Docker 安装 RocketMQ

创建目录结构

具体内容可以参考:mingyue/docker/rocketmq

rocketmq
  /broker1
    /conf
      broker.conf
    /logs
      README.md
    /store
      README.md
  /namesrv
    /logs
      README.md
docker-compose.yml

编写 docker-compose rocketmq 服务

version: '3.8'
services:
  mingyue-mqnamesrv:
    image: apache/rocketmq:4.9.4
    container_name: mingyue-mqnamesrv
    ports:
      - "9876:9876"
    environment:
      JAVA_OPT: -server -Xms512m -Xmx512m
    command: sh mqnamesrv
    volumes:
      - ./rocketmq/namesrv/logs:/home/rocketmq/logs/rocketmqlogs
​
  mingyue-mqbroker1:
    image: apache/rocketmq:4.9.4
    container_name: mingyue-mqbroker1
    ports:
      - "10911:10911"
      - "10909:10909"
      - "10912:10912"
    environment:
      JAVA_OPT_EXT: -server -Xms512M -Xmx512M -Xmn256m
    command: sh mqbroker -c /home/rocketmq/rocketmq-4.9.4/conf/broker.conf
    depends_on:
      - mingyue-mqnamesrv
    volumes:
      - ./rocketmq/broker1/conf/broker.conf:/home/rocketmq/rocketmq-4.9.4/conf/broker.conf
      - ./rocketmq/broker1/logs:/home/rocketmq/logs/rocketmqlogs
      - ./rocketmq/broker1/store:/home/rocketmq/store
​
  mingyue-mqconsole:
    image: styletang/rocketmq-console-ng
    container_name: mingyue-mqconsole
    ports:
      - "19876:19876"
    links:
      - mingyue-mqnamesrv:mqnamesrv #可以用mqnamesrv这个域名访问rocketmq服务
    environment:
      JAVA_OPTS: -Dserver.port=19876 -Drocketmq.namesrv.addr=mqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false
    depends_on:
      - mingyue-mqnamesrv

启动测试

启动前先执行部分目录赋予读写权限,例:chmod 777 /docker/rocketmq/broker1/logs

访问 mingyue-mqconsole 可以打开 Dashboard 页面即可:http://ip:19876/#/

Docker 安装 Kafka

创建目录结构

具体内容可以参考:mingyue/docker/kafka

kafka
  /data
    README.md
docker-compose.yml

编写 docker-compose kafka 服务

version: '3.8'
services:
  mingyue-zookeeper:
    image: 'bitnami/zookeeper:3.8.0'
    container_name: mingyue-zookeeper
    ports:
      - "2181:2181"
    environment:
      TZ: Asia/Shanghai
      ALLOW_ANONYMOUS_LOGIN: "yes"
      ZOO_SERVER_ID: 1
      ZOO_PORT_NUMBER: 2181
      # 自带的控制台 一般用不上可自行开启
      ZOO_ENABLE_ADMIN_SERVER: "no"
      # 自带控制台的端口
      ZOO_ADMIN_SERVER_PORT_NUMBER: 8080
​
  mingyue-kafka:
    image: 'bitnami/kafka:3.2.0'
    container_name: mingyue-kafka
    ports:
      - "9092:9092"
    environment:
      TZ: Asia/Shanghai
      # 更多变量 查看文档 https://github.com/bitnami/bitnami-docker-kafka/blob/master/README.md
      KAFKA_BROKER_ID: 1
      # 监听端口
      KAFKA_CFG_LISTENERS: PLAINTEXT://:9092
      # 实际访问ip 本地用 127 内网用 192 外网用 外网ip
      KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://宿主机IP:9092
      KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
      ALLOW_PLAINTEXT_LISTENER: "yes"
    volumes:
      - /docker/kafka/data:/bitnami/kafka/data
    depends_on:
      - mingyue-zookeeper
    links:
      - mingyue-zookeeper:zookeeper #可以用zookeeper这个域名访问zookeeper服务
​
  mingyue-kafka-manager:
    image: sheepkiller/kafka-manager:latest
    container_name: mingyue-kafka-manager
    ports:
      - "19092:19092"
    environment:
      ZK_HOSTS: mingyue-zookeeper:2181
      APPLICATION_SECRET: letmein
      KAFKA_MANAGER_USERNAME: mingyue
      KAFKA_MANAGER_PASSWORD: mingyue123
      KM_ARGS: -Dhttp.port=19092
    depends_on:
      - mingyue-kafka
    links:
      - mingyue-zookeeper:zookeeper #可以用zookeeper这个域名访问zookeeper服务

启动测试

启动前先执行部分目录赋予读写权限,例:chmod 777 /docker/kafka/data`

访问 mingyue-kafka-manager 可以打开 Clusters 页面即可:http://mingyue-mq:19092/

Spring Cloud Stream

Spring Cloud Stream 是一个用于构建与共享消息系统连接的高度可扩展的事件驱动微服务的框架。该框架提供了一个基于已经建立和熟悉的 Spring 成语和最佳实践的灵活编程模型,包括支持持久的 pub/sub 语义、消费者组和有状态分区。

说人话:Spring Cloud Stream 是 Spring 用来整合各种 MQ 中间件的框架。

Spring Cloud Stream的核心构建块

  • Destination Binders(目标绑定器):目标指的是 Kafka 还是 RabbitMQ,绑定器就是封装了目标中间件的包。如果操作的是 kafka 就使用 kafka binder ,如果操作的是 RabbitMQ 就使用 rabbitmq binder。

  • Destination Bindings(目标绑定):MQ 中间件与最终用户提供的应用程序代码(生产者/消费者)之间的桥梁,提供消息的“生产者”和“消费者”(由目标绑定器创建)

  • Message(消息):一种规范化的数据结构,生产者和消费者基于这个数据结构通过外部消息系统与目标绑定器和其他应用程序通信。

Spring Cloud Stream 架构图

Spring Cloud Stream 应用程序由中间件中立的核心组成。该应用程序通过在外部代理暴露的目的地和代码中的输入/输出参数之间建立绑定,与外部世界进行通信。建立绑定所需的经纪人特定细节由特定于中间件的 Binder 实现处理。

  • Middleware:消息中间件,如RabbitMQ、Kafka、RocketMQ等。

  • Binder:可以认为是适配器,用来将Stream与中间件连接起来,不同的Binder对应不同的中间件,需要我们配置。

  • Application:由Stream封装的消息机制,很少自定义开发。

  • Inputs:输入,可以自定义开发。

  • Outputs:输出,可以自定义开发。

小结

本节介绍了什么是消息队列、以及选择什么样的消息队列,如何对比,最终选择了 Kafka 与 RocketMQ。然后给出了 Docker 一件部署 Kafka 与 RocketMQ 的 docker-compose 脚本。阐述了什么是 Spring Cloud Stream,未来将会使用 Spring Cloud Stream 作为 MQ 中间价的框架。

下面我们就使用 Spring Cloud Stream 来搭建代码与 MQ 之间的桥梁~~~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1046275.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

1992-2021年省市县经过矫正的夜间灯光数据(GNLD、VIIRS)

1992-2021年省市县经过矫正的夜间灯光数据(GNLD、VIIRS) 1、时间:1992-2021年3月,其中1992-2013年为年度数据,2013-2021年3月为月度数据 2、来源:1992-2013年来源于DMSP、2013-2021年3月来自VIIRS 3、范…

spring AOP源码阅读分析

理论知识 AOP是面向切面编程(Aspect Oriented Programming)的意思。定义一些切点(pointcut),然后可以在切点织入一些通知(advice),对切点方法进行代理增强,与核心业务逻辑分离开来,以提高系统的可维护性、可扩展性和重…

网工内推 | 网络工程师,软考证书优先,六险一金,包吃

01 科力信息 招聘岗位:网络工程师 职责描述: 1、负责蚌埠项目的设备安装及调试; 2、对边界网络运行中的监控、故障排除、问题处理。 任职要求: 1、2年及以上网络相关工作经验,有交通管理网络运维经验优先&#xff1b…

【移动端测试工具】Appium自动化测试工具安装与配置

文章目录 一、JAVA环境配置检查是否已安装java jdk 二、android SDK安装1.下载android sdk压缩包2.解压压缩包3.安装SDK Manager4.sdk环境变量配置5.验证sdk是否安装成功 三、node JS安装1.下载node.js安装包2.安装node.js3.环境配置4.测试完成验证5.安装淘宝镜像并检验是否安装…

Android MeasureSpec测量规格

文章目录 Android MeasureSpec测量规格概述MeasureSpec组成常用APIMeasureSpec源码分析getChildMeasureSpec源码分析总结 Android MeasureSpec测量规格 概述 MeasureSpec指View的测量规格,MeasureSpec是View的一个静态内部类。 View的MeasureSpec是根据自身的布局…

SoloX:Android和iOS性能数据的实时采集工具

SoloX:Android和iOS性能数据的实时采集工具 github地址:https://github.com/smart-test-ti/SoloX 最新版本:V2.7.6 一、SoloX简介 SoloX是开源的Android/iOS性能数据的实时采集工具,目前主要功能特点: 无需ROOT/越狱…

Java调用操作系统命令的输出乱码问题解决

本篇解决的问题 使用Java 的Runtime调用操作系统的命令,出现异常时使用getErrorStream()获取错误信息的字节流,转换该字节流为字符串显示时,出现乱码。 Java调用操作系统命令 这里以Windows 操作系统为例, 调用cd 命令切换路径…

SAP 销售订单审批状态参数设置

定义权限码 BS52 Spro->控制->内部订单->订单主数据->状态管理->定义状态管理授权码 创建状态参数文件 BS02 SPRO->销售与分销->销售->销售凭证->定义并分配状态参数文件->定义状态参数文件 1)命名,描述 设置对象类型:销…

记录一个iOS UITableView 正在刷新的时候修改数据源导致的崩溃

首先看一下崩溃堆栈信息 由于tableview 调用layoutsubViews 执行到代理方法 -(CGFloat)tableView:(UITableView *)tableView heightForRowAtIndexPath:(NSIndexPath *)indexPath{ 由于是崩溃在系统方法里面的,我们无法直接看到是因为调用哪个方法导致的崩溃 后来…

秦时明月沧海手游礼包码,秦时明月沧海兑换码

在玩《秦时明月沧海》手游时,你可能会遭到礼包码的诱惑。如果你还没找到可用的兑换码,这里有一些可供使用的礼包码,赶快领取吧! 关注【娱乐天梯】,获取内部福利号 1. 礼包码:QIN0809 包含:金镒…

面试打底稿⑤ 项目一的第一部分

简历原文 抽查部分 项目描述 该项目旨在服务广州地区的快递物流,实现了下单、快递员取派件、订单转运单、线路规划、网点设置等功能。 责任描述 登录系统优化,双token三验证模式实现设置token状态、提高登录安全性的效果 模拟问答 1.能简单介绍一下…

2023-9-27 JZ18 删除链表的结点

题目链接: 删除链表的结点 import java.util.*;/** public class ListNode {* int val;* ListNode next null;* public ListNode(int val) {* this.val val;* }* }*/public class Solution {/*** 代码中的类名、方法名、参数名已经指定,请…

【C++】友元函数 ( 友元函数简介 | 友元函数声明 | 友元函数语法 | 友元函数声明不受访问控制限制 | 友元函数参数要求 )

文章目录 一、友元函数简介二、友元函数声明1、友元函数语法2、友元函数声明不受访问控制限制3、友元函数参数要求4、友元函数示例 三、完整代码示例 - 友元函数 一、友元函数简介 在 C 语言中 , " 友元函数 " 是 与 类 相关联的函数 , " 友元函数 " 不是…

【Verilog 教程】6.6Verilog 仿真激励

关键词:testbench,仿真,文件读写 Verilog 代码设计完成后,还需要进行重要的步骤,即逻辑功能仿真。仿真激励文件称之为 testbench,放在各设计模块的顶层,以便对模块进行系统性的例化调用进行仿真…

对象存储,从单机到分布式的演进

关于数据存储的相关知识,请大家关注“数据存储张”,各大平台同名。 通过《什么是云存储?从对象存储说起》我们对对象存储的历史、概念和基本使用有了一个大概的认识。而且我们以Minio为例,通过单机部署的模式实际操作了一下对象存储的GUI,感受了一下对象存储的用法。 在上…

word中使用latex多行公式,矩阵公式

\eqarray{H& [h(x_1)^T,\cdots,h(x_N)^T]^T \\ & [\matrix{g(w_1 x_1b_1) & \cdots & g(w_L x_1b_L) \\ \vdots & \ddots & \vdots \\ g(w_1 x_Nb_1) & \cdots & g(w_L x_Nb_L)}]_{N \times L}}&的引起的那条竖线可以通过backspace或者del…

人工智能AI 全栈体系(七)

第一章 神经网络是如何实现的 神经网络不仅仅可以处理图像,同样也可以处理文本。由于处理图像讲起来比较形象,更容易理解,所以基本是以图像处理为例讲解的。 七、词向量 图像处理之所以讲起来比较形象,是因为图像的基本元素是像…

VB6.0开发文件管理小数据库-基于ACCESS

今天的客户也是小客户,需要对文件的一些操作记录在数据库里面,这里采用的数据库ACCCESS,用的是VB自带的可视化数据管理器创建的mdb数据库文件。实现了数据的增删改查。简单方便。基本可以用这套代码模板实现大部分的数据库功能了。想研发或学…

自定义ElementPlus主题颜色

构建工具采用Vite CSS预处理器采用Sass 一.准备定制化的样式文件 1.安装Sass npm i sass -D 2.创建好文件目录 3.书写样式 ElementPlus默认样式. //index.scss/* 只需要重写你需要的即可 */ forward element-plus/theme-chalk/src/common/var.scss with ($colors: (prim…

腾讯云秒杀活动是什么?如何参与?

腾讯云是国内知名的云计算服务提供商之一,为了吸引更多的用户,腾讯云会不定期地推出各种各样的优惠活动,其中最受大家欢迎的就是“腾讯云秒杀活动”。本文将为大家详细介绍腾讯云秒杀活动参与方式以及购买攻略。 一、腾讯云秒杀活动是什么&am…