SpringCloud之MQ笔记分享

news2024/9/29 18:06:54

MQ异步通信

初始MQ

同步通信

优点:时效性较强,可以以及得到结果

Feign就属于同步方式–问题:

  • 耦合问题
  • 性能下降(中间的等待时间)
  • 资源浪费
  • 级联失败

异步通信

优点

  • 耦合度低
  • 性能提升,吞吐量高
  • 故障隔离
  • 服务无强依赖,解决级联失败问题
  • 流量削峰

缺点

  • 依赖于broker的可靠性,安全性,吞吐能力
  • 架构复杂了,业务没有明显的流程先,不好追踪管理

MQ常见框架

什么是MQ:信息队列,存放消息的队列,也就是时间驱动架构中的broker

RabbitMQActiveMQRocketMQKafka
公司/社区RabbitApache阿里Apache
开发语言ErlangJavaJavaScala&Java
协议支持AMQP,XMPP,SMTP,STOMPOpenWire,STOMP,REST,XMPP,AMQP自定义协议自定义协议
可用性一般
单机吞吐量一般非常高
消息延迟微秒级毫秒级毫秒级毫秒以内
消息可靠性一般一般

推荐RabbitMQ

RabbitMQ快速入门

案例

发布者

package cn.itcast.mq.helloworld;



import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class PublisherTest {
    @Test
    public void testSendMessage() throws IOException, TimeoutException {
        // 1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
        factory.setHost("xxx.xx.xxx.xxx");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("xxx");
        factory.setPassword("xxxxx");
        // 1.2.建立连接
        Connection connection = factory.newConnection();

        // 2.创建通道Channel
        Channel channel = connection.createChannel();

        // 3.创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.发送消息
        String message = "hello, rabbitmq1!";
        channel.basicPublish("", queueName, null, message.getBytes());
        System.out.println("发送消息成功:【" + message + "】");

        // 5.关闭通道和连接
        channel.close();
        connection.close();

    }
}

消费者

package cn.itcast.mq.helloworld;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ConsumerTest {

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
        factory.setHost("xxx.xxx.xx.xxx");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("xxx");
        factory.setPassword("xxx");
        // 1.2.建立连接
        Connection connection = factory.newConnection();

        // 2.创建通道Channel
        Channel channel = connection.createChannel();

        // 3.创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.订阅消息
        channel.basicConsume(queueName, true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 5.处理消息
                String message = new String(body);
                System.out.println("接收到消息:【" + message + "】");
            }
        });
        System.out.println("等待接收消息。。。。");
    }
}

运行消费者后运行发布者,就可以接收到发布者的信息(因为消费者的函数是异步,所以发布者可以随时发,消费者一直等着)

SpringAMQP

什么是AMQP:一种协议,API规范

SpringAMQP:底层是rabbitMQ

基础实现

使用SpringAMQP实现Helllo World中的基础消息队列功能

1,引入依赖

<!--AMQP依赖,包含RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2,基础配置

配置基础配置并且工具类并测试

基础配置

spring:
  rabbitmq:
    host: xxx.xx.xxx.xx
    port: 5672
    username: xxx
    password: xxx
    virtual-host: /

测试类

package cn.itcast.mq.spring;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testSendMessage(){
        String queueName = "simple.queue";
        String message = "hello,spring amqp";
        rabbitTemplate.convertAndSend(queueName,message);
    }
}

测试结果

前面几个是之前的,第三个和后面的是我自己设置的

image-20230226153614657

上述为发布者,消费者需要:

1,配置yml文件

2,定义一个类

这里的核心是使用@Component自动装配到Spring,使用RabbitListener进行队列的监听,得到返回值msg就打印

package cn.itcast.mq.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class SpringRabbitListener {
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueue(String msg){
        System.out.println("消费者接收到消息:【"+msg+"】");
    }
}

WorkQueue模型

当一个消费者能力足够强,而另一个比较弱,合理的处理方式应该是强的消费者处理更多的消息,而弱的处理少的(根据能力分工)

所以WorkQueue模型就是为了让同一个队列的消费者,能力强的处理更多的消息,核心在于prefetch控制预提取的信息数量

生产者循环发送消息到simple.queue:发送消息

@Test
public void testWorkQueue() throws InterruptedException {
// 队列名称
String queueName = "simple.queue";
// 消息
String message = "hello, message__";
for (int i = 0; i < 50; i++) {
// 发送消息
rabbitTemplate.convertAndSend(queueName, message + i);
        // 避免发送太快
        Thread.sleep(20);
     }
}

消费者监听:两个消费者

@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
System.out.println("spring 消费者1接收到消息:【" + msg + "】");
    Thread.sleep(25);
}
    
@RabbitListener(queues = "simple.queue") 
public void listenSimpleQueueMessage2(String msg) throws InterruptedException {
System.err.println("spring 消费者2接收到消息:【" + msg + "】");
Thread.sleep(100);
}

Work模型的使用:

  • 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理

  • 通过设置prefetch来控制消费者预取的消息数量

交换机:exchange

exchange:exchange负责消息路由,而不是存储,路由失败则消息丢失

FanoutExchange的使用

创建两个队列,并进行绑定

package cn.itcast.mq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FanoutConfig {
    //itcast.exchage
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("ylw.fanout");
    }
    //fanout.queue1
    @Bean
    public Queue fanoutQueue1() {
        return new Queue("fanout.queue1");
    }
    @Bean
    public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {
        return BindingBuilder
                .bind(fanoutQueue1)
                .to(fanoutExchange);
    }
    //fanout.queue2
    @Bean
    public Queue fanoutQueue2() {
        return new Queue("fanout.queue2");
    }
    @Bean
    public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {
        return BindingBuilder
                .bind(fanoutQueue2)
                .to(fanoutExchange);
    }
}

Consumer声明两个消费者

@RabbitListener(queues = "fanout.queue1")

public void listenFanoutQueue1(String msg) 
{System.out.println("消费者1接收到Fanout消息:【" + msg + "】");}
@RabbitListener(queues = "fanout.queue2") 

public void listenFanoutQueue2(String msg) 
{System.out.println("消费者2接收到Fanout消息:【" + msg + "】");}

发布者发送消息

@Test
public void testFanoutExchange() {    
    // 队列名称    
    String exchangeName = "itcast.fanout";    
    // 消息
    String message = "hello, everyone!";    
    // 发送消息,参数分别是:交互机名称、RoutingKey(暂时为空)、消息     
    rabbitTemplate.convertAndSend(exchangeName, "", message);}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/375195.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

机器学习经典算法——决策树(Decision Tree)

决策树的基本原理 决策树是⼀种分⽽治之的决策过程。⼀个困难的预测问题&#xff0c;通过树的分⽀节点&#xff0c;被划分成两个或多个较为简单的⼦集&#xff0c;从结构上划分为不同的⼦问题。将依规则分割数据集的过程不断递归下去。随着树的深度不断增加&#xff0c;分⽀节…

Django-版本信息介绍-版本选择

文章目录1.如何获取Django1.1.选项1:获取最新的正式版本1.2.选项2:获取4.2的beta版1.3.选项3:获取最新的开发版本2.得到之后3.支持版本4.选择版本1.如何获取Django Django在BSD许可下是开源的。我们建议使用最新版本的Python 3。支持Python 2.7的最新版本是Django 1.11 LTS。请…

判断一个用字符串表达的数字是否可以被整除

一.问题引出 当一个数字很大的时候,我们常用字符串进行表达,(超过了int和long等数据类型可以存储的最大范围),但是这个时候我们该如何判断他是否可以被另一个数整除呢? 这个时候我们不妨这样来考虑问题,每次将前边求模之后的数保存下来,然后乘以10和这一位的数字进行相加的操…

Linux 阻塞和非阻塞 IO 实验

目录 一、阻塞和非阻塞简介 1、IO 概念 2、阻塞与非阻塞 二、等待队列 1、等待队列头 2、等待队列项 3、将队列项添加/移除等待队列头 4、等待唤醒 5、等待事件 三、轮询 1、应用程序的非阻塞函数 2、Linux 驱动下的 poll 操作函数 四、阻塞IO之等待事件唤醒 添加…

JavaScript split()方法

JavaScript split()方法 目录JavaScript split()方法一、定义和用法二、语法三、参数值四、返回值五、更多实例5.1 省略分割参数5.2 使用limit参数5.3 使用一个字符作为分割符一、定义和用法 split() 方法用于把一个字符串分割成字符串数组。 二、语法 string.split(separat…

Linux驱动中的open函数是如何从软件打通硬件呢?

一、前言 打开文件是Linux系统中最基本的操作之一&#xff0c;open函数可以实现打开文件的功能。下面我将为您介绍open函数打通上层到底层硬件的详细过程。 二、open函数打通软硬件介绍 open函数是系统调用中的一种&#xff0c;其原型定义在头文件unistd.h中&#xff1a; #…

webpack模块化的原理

commonjs 在webpack中既可以书写commonjs模块也可以书写es模块&#xff0c;而且不用考虑浏览器的兼容性问题&#xff0c;我们来分析一下原理。 首先搞清楚commonjs模块化的处理方式&#xff0c;简单配置一下webpack&#xff0c;写两个模块编译一下看一下&#xff1a; webpac…

【数据结构与算法】——第八章:排序

文章目录1、基本概念1.1 什么是排序1.2 排序算法的稳定性1.3 排序算法的分类1.4 内排序的方法2、插入排序2.1 直接插入排序2.2 直接插入排序2.3 希尔排序3、交换排序3.1 冒泡排序3.2 快速排序4、选择排序4.1 简单选择排序4.2 树形选择排序4.3 堆排序4.4 二路归并排序5、基数排序…

Benchmark测试——fio——源码分析

1. main 1.1 parse_options() 解析选项&#xff0c;更新数据结构 1.1.1 fio_init_options() 1.1.2 fio_test_cconv(&def_thread.o) <cconv.c> 1.1.2.1 convert_thread_options_to_cpu() 传递options给数据结构 1.1.3 parse_cmd_line() switch语句多路选择&am…

Vue3电商项目实战-商品详情模块8【23-商品详情-评价组件-图片预览、24-商品详情-评价组件-★分页组件】

文章目录23-商品详情-评价组件-图片预览24-商品详情-评价组件-★分页组件23-商品详情-评价组件-图片预览 目的&#xff1a;封装一个组件展示 图片列表 和 预览图片 功能。 大致步骤&#xff1a; 准备一个组件导入goods-comment.vue使用起来&#xff0c;传入图片数据展示图片列…

华为OD机试模拟题 用 C++ 实现 - 快递货车(2023.Q1)

最近更新的博客 【华为OD机试模拟题】用 C++ 实现 - 最多获得的短信条数(2023.Q1)) 文章目录 最近更新的博客使用说明快递货车题目输入输出示例一输入输出Code使用说明 参加华为od机试,一定要注意不要完全背诵代码,需要理解之后模仿写出,通过率才会高。 华为 OD 清单…

逻辑地址和物理地址转换

在操作系统的学习中&#xff0c;很多抵挡都会涉及虚拟地址转换为物理地址的计算&#xff0c;本篇就简单介绍一下在分页存储管理、分段存储管理、磁盘存储管理中涉及的地址转换问题。 虚拟地址与物理地址 编程一般只有可能和逻辑地址打交道&#xff0c;比如在 C 语言中&#x…

常用逻辑运算符

逻辑符号表格 逻辑符号含义描述&按位与将数字转成二进制计算&#xff0c;两个位都为1时&#xff0c;结果才为1|或两个位都为0时&#xff0c;结果才为0 &#xff0c;反知任何一个为1结果为1^异或两个位相同为0&#xff0c;不同为1<<左移整体二进位全部左移若干位&…

《数据库系统概论》学习笔记——第五章:数据库完整性

教材为数据库系统概论第五版&#xff08;王珊&#xff09; 本章概念比较多&#xff0c;稍微记一下 数据库的完整性 数据库的完整性指的是数据的正确性和相容性 完整性&#xff1a;指数据是符合现实世界语义、反映当前实际状况的 相容性&#xff1a;数据库同一对象在不同关系表…

操作系统——7.进程的定义,组成,组成方式和特征

目录 1.概述 ​编辑2.定义 2.1单道程序 2.2多道程序 2.3进程定义 3.进程的组成 3.1进程的组成内容 3.2 PCB中的内容 4.进程的组织 4.1进程的两种组织方式 4.2链接方式 4.3索引方式 5.进程的特征 6.小结 这篇文章&#xff0c;我们主要来学习一下进程的定义&#xff0…

深度剖析Java集合之Properties

Properties 前面我们介绍了各种各样的Map ,今天我们介绍一个我们常用来保存程序运行配置参数的一个类,Properties 它的操作和Map 非常像,不一样的是它提供了和文件的交互操作,也就是说我们的配置可以保存到文件里或者是从文件里加载。 源代码 首先我们看一下这个类的继承…

[FI业务流程] - 未清项管理 (XOPVW, XLGCLR, X_UJ_CLR)

1 业务背景 未清项管理&#xff08;OIM: Open Item Management&#xff09;是SAP系统中针对资产负债表科目&#xff08;Balance Sheet Accounts&#xff09;的一个功能。 对于过账到未清项管理类型的科目下的凭证&#xff0c;首先会被标记为“未清项”也即open状态&#xff1b…

解密游戏推荐系统的建设之路

作者&#xff1a;vivo 互联网服务器团队- Ke Jiachen、Wei Ling 本文从零开始介绍了游戏推荐项目的发展历程&#xff0c;阐述了大型项目建设中遇到的业务与架构问题以及开发工程师们的解决方案&#xff0c;描绘了游戏推荐项目的特点以及业务发展方向&#xff0c;有着较好的参考…

内存保护_1:Tricore芯片MPU模块介绍

上一篇 | 返回主目录 | 下一篇 内存保护_1&#xff1a;Tricore芯片MPU模块介绍1 何为MPU2 MPU相关的硬件子系统2.1 基于地址范围保护逻辑说明2.1.1 地址范围寄存器2.1.2 读、写、执行权限寄存器2.1.3 保护集设置位2.1.4 内存保护功能使能位2.1.5 核的内存保护范围获取说明2.1.6…

【Proteus仿真】【STM32单片机】粮仓温湿度控制系统设计

文章目录一、功能简介二、软件设计三、实验现象联系作者一、功能简介 本项目使用Proteus8仿真STM32单片机控制器&#xff0c;使用声光报警模块、LCD1602显示模块、DHT11温湿度模块、继电器模块、加热加湿除湿风扇等。 主要功能&#xff1a; 系统运行后&#xff0c;LCD1602显示…