消息中间件-RocketMQ入门 消息发送的三种方式

news2025/1/13 8:09:26

消息中间件-RocketMQ入门 消息发送的三种方式

  • 消息中间件简介
    • 应用场景
    • 常用消息中间件
  • RocketMQ核心概念
    • 入门案例-生产者和消费者代码逻辑
    • 消息发送的三种方式
      • 同步发送
      • 异步发送
      • 一次性消息


消息中间件简介

应用场景

假设现在有订单微服务和积分微服务,正常请求流程之后是不是一个订单完成后给对应的用户加上积分,但如果积分微服务坏掉了,正常来说会回滚,但实际中情况中,积分完全可以晚一点加,没有什么影响

1.解决代码耦合的问题

在这里插入图片描述
解决问题的方法
在这里插入图片描述
这样订单微服务把参数发送给中间件之后就完成了它自己的任务,使微服务不用依赖其它微服务,就算中间件挂了也不需要担心,它虽然默认存储在内存里面,但也会在磁盘里面存一份

2.进行流量的削峰

在这里插入图片描述

3.数据分发

在这里插入图片描述
在这里插入图片描述
解决办法:
在这里插入图片描述

常用消息中间件

1.ActiveMQ是Apache出品,比较老的一个开源的消息中间件,以前在中小企业应用广泛.
2.Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统。它可以处理消费者规模的网站中的所有动作流数据。这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决.
3.RabbitMQ是一个基于Erlang语言开发的消息中间件,
RabbitMQ最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
对数据的一致性,稳定性和可靠性要求比较高的场景
4.RocketMQ是阿里巴巴在2012年开源的分布式消息中间件,目前已经捐赠给Apache软件基金会,并于2017年9月25日成为 Apache的顶级项目。作为经历过多次阿里巴巴双十一这种"超级工程"的洗礼并有稳定出色表现的国产中间件,以其高性能、低延时和高可靠等特性近年来已经也被越来越多的国内企业使用。
淘宝内部的交易系统使用了淘宝自主研发的Noify消息中间件,使用MySQL作为消息存储媒介,可完全水平扩容,为了进一步降低成本,我们认为存储部分可以进一步优化,2011年初,Linkin开源了Kafka这个优秀的消息中间件,淘宝中间件团队在对Kafka做过充分Review之后,Kaka无限消息堆积,高效的持久化速度吸引了我们,但是同时发现这个消息系统主要定位于日志传输,对于使用在淘宝交易、订单、充值等场景下还有诸多特性不满足,为此我们重新用Javai语言编写了RocketMQ,定位于非日志的可靠消息传输〈(日志场最也OK),目前RocketMQ在阿里集团被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理, binlog分发等场景。

在这里插入图片描述


RocketMQ核心概念

消息中间件里面集群了多个代理服务器,如何做到负载?

在创造RocketMQ的时候,它本身有一个轻量级的注册中心称为"NameServer命名服务",因为像Nacos和zookeeper这样复杂的注册中心,运行起来对性能肯定也会有一定的影响,倘若有一天该注册中心不开源不维护了,该中间件是不是也会因此遇到很大的麻烦
在这里插入图片描述

入门案例-生产者和消费者代码逻辑

第一步:创建两个两个项目,分别为生产者和消费者

在这里插入图片描述
创建生产者

第一步:导入依赖

<dependencies>
  <dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.4.0</version>
  </dependency>
</dependencies>

第二步:创建生产类模拟生产

public class Producer {
    public static void main(String[] args) throws Exception {
        //定义一个生产者对象
        DefaultMQProducer producer = new DefaultMQProducer("helloGroup");
        //连接nameServer
        producer.setNamesrvAddr("43.143.161.59:9876");
        //启动生产者
        producer.start();
        //设置消息发送的目的地
        String topic = "helloTopic";
        //发送消息
        for(int i=0;i<10;i++){
            Message msg = new Message(topic,("RocketMQ普通消息"+i).getBytes(Charset.defaultCharset()));
            SendResult result = producer.send(msg);
            System.out.println("发送状态"+result.getSendStatus());
        }
        System.out.println("消息发送完毕.");
        //关闭资源
        producer.shutdown();
    }
}

创建消费者

第一步:导入依赖

<dependencies>
  <dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.4.0</version>
  </dependency>
</dependencies>

第二步:创建消费类模拟接收

public class Consumer {
    public static void main(String[] args) throws Exception {
        //定义消息消费者(在同一个JVM中,消费者的组名不能重复)
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("helloConsumerGroup");
        //设置nameServer地址
        consumer.setNamesrvAddr("43.143.161.59:9876");
        //设置订阅的主题
        consumer.subscribe("helloTopic","*");
        //设置消息的监听器
        consumer.setMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for(MessageExt msg:list){
                    String s = new String(msg.getBody(), Charset.defaultCharset());
                    System.out.println("线程:"+Thread.currentThread()+",消息的内容:"+s);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //启动消费者
        consumer.start();
    }
}

消息发送的三种方式

同步发送

在这里插入图片描述
应用程序给消息中间件发送消息的时候。需要等待消息中间件将消息存储完毕之后,才响应回去。业务代码才能往下执行

异步发送

在这里插入图片描述
应用程序给消息中间件发送消息的时候,消息中间件收到这个消息之后,直接给应用程序响应了.(此时消息并没有完全存储到磁盘),消息中间件继续存储消息。存储完成(成功或者失败),通过回调地址通知有应用程序。消息存储的结果
示例代码

public class Producer {
    public static void main(String[] args) throws Exception {
        //定义一个生产者对象
        DefaultMQProducer producer = new DefaultMQProducer("helloGroup");
        //连接nameServer
        producer.setNamesrvAddr("43.143.161.59:9876");
        //启动生产者
        producer.start();
        //设置消息发送的目的地
        String topic = "helloTopic";
        //发送消息
        Message msg = new Message(topic,("RocketMQ异步消息").getBytes(Charset.defaultCharset()));
        System.out.println("消息发送前");
        //异步发送
        producer.send(msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("消息存储状态:"+sendResult.getSendStatus());
            }

            @Override
            public void onException(Throwable throwable) {
                System.out.println("消息发送出现异常");
            }
        });
        System.out.println("消息发送完毕.");
        TimeUnit.SECONDS.sleep(5);
        //关闭资源
        producer.shutdown();
    }

运行结果
在这里插入图片描述

业务逻辑处理 ----> 执行send方法,不需要等待消息中间件存储消息,可以直接执行业务逻辑代码

与同步发送相比,异步发送时间更短一点,响应更快一点,为了使响应时间更快的可以选择异步发送,但同步发送也有它自己的意义,同步发送更加可靠

一次性消息

应用程序给消息中间件发送消息的时候,不需要知道消息是否在消息中间存储了,只管发就是了.
在这里插入图片描述

public class Producer {
    public static void main(String[] args) throws Exception {
        //定义一个生产者对象
        DefaultMQProducer producer = new DefaultMQProducer("helloGroup");
        //连接nameServer
        producer.setNamesrvAddr("43.143.161.59:9876");
        //启动生产者
        producer.start();
        //设置消息发送的目的地
        String topic = "helloTopic";
        //发送消息
        Message msg = new Message(topic,("RocketMQ一次性消息").getBytes(Charset.defaultCharset()));
        System.out.println("消息发送前");
        producer.sendOneway(msg);
        System.out.println("消息发送完毕.");
        TimeUnit.SECONDS.sleep(5);
        //关闭资源
        producer.shutdown();
    }
}

运行结果
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/335971.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Java程序员:为了跳槽刷完1000道真题,想不到老板直接给我升职了

同事&#xff1a;前阵子听说你要跳槽&#xff0c;现在准备得怎么样啊&#xff1f; 程序员T&#xff1a;不跳了 同事&#xff1a;啊&#xff1f;为什么&#xff1f; 程序员T&#xff1a;涨薪了呗&#xff1f; 同事&#xff1a;真的吗&#xff1f;涨了多少&#xff1f;你自己…

DAMA数据管理知识体系指南之文档和内容管理

第10章 文档和内容管理 10.1 简介 文档和内容管理是对存储在关系数据库以外的信息的采集、存储、访问以及使用的控制活动。文档和内容管理的侧重点在完整性和访问控制上。因此&#xff0c;它与关系数据库的数据操作管理大致相同。由于多数非结构化数据与存储在结构化文件中的…

数据存储技术复习(四)未完

1.什么是NAS。一般用途服务器与NAS设备之间有何不同。NAS是一个基于IP的专用高性能文件共享和存储设备。—般用途服务器可用于托管任何应用程序&#xff0c;因为它运行的是一般用途操作系统NAS设备专用于文件服务。它具有专门的操作系统&#xff0c;专用于通过使用行业标准协议…

151、【动态规划】leetcode ——2. 01背包问题:二维数组+一维数组(C++版本)

题目描述 原题链接&#xff1a;2. 01背包问题 解题思路 &#xff08;1&#xff09;二维dp数组 动态规划五步曲&#xff1a; &#xff08;1&#xff09;dp[i][j]的含义&#xff1a; 容量为j时&#xff0c;从物品1-物品i中取物品&#xff0c;可达到的最大价值 &#xff08;2…

算法顶级比赛汇总

可参赛的算法比赛 阿里云天池大数据竞赛 时间&#xff1a;每年各个季度很多类型都会出题&#xff08;比赛总时间大概为两个月&#xff09; 内容&#xff1a;各个类型的算法题都会出、奖金上万不等 形式&#xff1a;在线提交&#xff08;提交后在线检查结果&#xff09;、离线…

简洁易懂:源码+实战讲解Redisson并发锁及看门狗自动续期

1 缘起 有一次同事问Redisson存储的键是否为hash&#xff1f; 我当时&#xff0c;没有看Redisson的相关源码&#xff0c;只知道应用&#xff0c; 所以没有办法回答&#xff0c;于是开始看看Redisson实现的源码&#xff0c; 顺便写了一个单机Redisson测试&#xff0c; 发现Redi…

leaflet 加载CSV数据,显示图形(代码示例046)

第046个 点击查看专栏目录 本示例的目的是介绍演示如何在vue+leaflet中加载CSV文件,将图形显示在地图上。 直接复制下面的 vue+openlayers源代码,操作2分钟即可运行实现效果; 注意如果OpenStreetMap无法加载,请加载其他来练习 文章目录 示例效果配置方式示例源代码(共74…

海思3559:BT656调试笔记

前言 海思3559a的sdk例子是没有提供BT1120和BT656视频接入的&#xff0c;但实际上硬件是可以支持接入的。不过前提是只支持逐行方式输入&#xff0c;不支持隔行视频&#xff0c;如果想输入PAL制式的隔行视频&#xff0c;请先用芯片转成逐行再接入。不知道是官方手册有意无意的忽…

弄懂自定义 Hooks 不难,改变开发认知有点不习惯

前言 我之前总结逻辑重用的时候&#xff0c;就一直在思考一个问题。 对于逻辑复用&#xff0c;render props 和 高阶组件都可以实现&#xff0c;同样官方说 Hooks 也可以实现&#xff0c;且还是在不增加额外的组件的情况下。 但是我在项目代码中&#xff0c;没有找到自定义 …

python | 第二章考试题和练习题

一、考试题 1、turtle八边形绘制 问题描述&#xff1a; 使用turtle库&#xff0c;绘制一个八边形。 参考代码&#xff1a; import turtle as t t.pensize(2) for i in range(8):t.fd(100)t.left(45) 2、turtle八角图形绘制 问题描述&#xff1a; 使用turtle库&#xff0c;…

SaleSmartly(ss客服)带你了解:缩短B2B销售周期的秘诀

缩短B2B销售周期的秘诀&#xff1a;即时聊天 关键词&#xff1a;B2B 销售&#xff1b;即时沟通&#xff1b;SaleSmartly&#xff08;ss客服&#xff09; 在B2B销售中&#xff0c;时间就是一切。在某些情况下&#xff0c;买家正在积极寻找即时解决方案&#xff0c;潜在客户以多种…

【2023unity游戏制作-mango的冒险】-开始画面API制作

&#x1f468;‍&#x1f4bb;个人主页&#xff1a;元宇宙-秩沅 hallo 欢迎 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! 本文由 秩沅 原创 收录于专栏&#xff1a;游戏制作 ⭐mango的冒险-开始画面制作⭐ 文章目录⭐mango的冒险-开始画面制作⭐&#x1f468;‍&…

EasyCVR视频云存储的架构解析与Sharelist云存挂载方法介绍

一、什么是视频云存储&#xff1f; 视频云存储主要用于为上层应用提供视频文件、结构化信息、事件信息的相关服务。云存储节点分为数据文件存储节点和结构化数据存储节点。数据文件存储节点主要用于视频、图片的存储。结构化数据存储节点用于存储结构化数据并提供相关服务。 …

【学习记录】PCA主成分分析 SVD奇异值分解

在看MSC-VO代码的过程中&#xff0c;大量出现了奇异值分解的内容&#xff0c;本身对这部分了解不多&#xff0c;这里补一下课&#xff0c;参考b站up主小旭学长的视频&#xff0c;链接为&#xff1a;PCA主成分分析和SVD主成分分析 PCA主成分分析 PCA根本目的在于让数据在损失尽…

机器学习笔记之生成模型综述(三)生成模型的表示、推断、学习任务

机器学习笔记之生成模型综述——表示、推断、学习任务引言生成模型的表示任务从形状的角度观察生成模型的表示任务从概率分布的角度观察生成模型的表示任务生成模型的推断任务生成模型的学习任务引言 上一节介绍了从监督学习、无监督学习任务的角度介绍了经典模型。本节将从表…

概率论面试题1:玫瑰花

概率论面试题 1. 一个活动&#xff0c;n个女生手里拿着长短不一的玫瑰花&#xff0c;无序的排成一排&#xff0c;一个男生从头走到尾&#xff0c;试图拿更长的玫瑰花&#xff0c;一旦拿了一朵就不能再拿其他的&#xff0c;错过了就不能回头&#xff0c;问最好的策略&#xff1…

3年自动化测试这水平?我还不如去招应届生

公司前段缺人&#xff0c;也面了不少测试&#xff0c;结果竟然没有一个合适的。一开始瞄准的就是中级的水准&#xff0c;也没指望来大牛&#xff0c;提供的薪资在10-20k&#xff0c;面试的人很多&#xff0c;但平均水平很让人失望。看简历很多都是3年工作经验&#xff0c;但面试…

什么是响应性?

响应性&#xff1a; 这个术语在今天的各种编程讨论中经常出现&#xff0c;但人们说它的时候究竟是想表达什么意思呢&#xff1f;本质上&#xff0c;响应性是一种可以使我们声明式地处理变化的编程范式。一个经常被拿来当作典型例子的用例即是 Excel 表格&#xff1a; 这里单元…

angular相关知识点总结

创建 angualr 组件和传值 angular组件其实就是个xxx.component.ts,本质还是ts文件一个html文件 1.创建组件&#xff1a;在Angular中&#xff0c;可以使用命令行工具ng generate component创建一个新组件。例如&#xff1a; ng generate component my-component这将创建一个名…

Ubuntu 系统下Docker安装与使用

Ubuntu 系统下Docker安装与使用Docker安装与使用Docker安装安装环境准备工作系统要求卸载旧版本Ubuntu 14.04 可选内核模块Ubuntu 16.04 使用 APT 安装安装 Docker CE使用脚本自动安装启动 Docker CE建立 docker 用户组测试 Docker 是否安装正确镜像加速Docker使用拉取镜像创建…