RabbitMq架构设计原理

news2025/1/19 20:40:41

1、消息中间件

1.1、什么是消息中间件

消息中间件基于队列模型实现异步/同步传输数据

作用:

  • 支撑高并发
  • 异步解耦
  • 流量削峰
  • 降低耦合度

1.2、传统的HTTP请求有什么缺点

  • Http请求基于请求响应模型,高并发情况下,客户端发送大量的请求到服务器端可能导致服务器端请求堆积
  • Tomcat服务器处理每个请求都会有自己的独立的线程,如果超过最大线程数就会将该请求缓存到队列中,如果请求堆积过多的情况下,就会导致服务器崩溃。所以一般会在 nginx 入口处实现限流

在这里插入图片描述

  • Http请求处理业务逻辑比较耗时的情况下,就会导致客户端一直等待,阻塞等待过程中会导致客户端超时发生重试策略,可能会引发幂等性问题

接口是为Http协议的情况下,最好不要处理比较耗时的业务逻辑,耗时的业务逻辑应该交给多线程或者是MQ来处理

1.3、MQ的应用场景

  • 异步调用第三方服务发送短信

  • 处理一些比较耗时的操作

2、同步、多线程、以及MQ处理业务逻辑的区别

2.1、同步发送Http 请求

需求:客户端发送请求到服务器端,服务器端实现会员注册业务逻辑

1、插入会员数据需要1s

2、发送登录短信提醒 3s

3、发送新人优惠卷 3s

总共响应需要6s时间,可能会导致客户端阻塞6s,用户体验极其不好
在这里插入图片描述

2.2、多线程处理业务逻辑

插入会员数据之后单独开启一个线程异步处理发送短信和发送新人优惠卷业务

此时客户端只需要等待 1s

优点: 适合小项目实现异步

缺点: 多开线程会消耗服务器CPU资源

在这里插入图片描述

2.3、MQ实现业务逻辑

先向数据库中插入一条会员数据,然后向MQ中投递一个消息,MQ服务器再将消息推送给消费者异步解耦处理发送短信和发送优惠卷

在这里插入图片描述

Mq和多线程之间的区别

  • Mq可以实现异步解耦、流量削峰
  • 多线程也可以实现异步,但是消耗CPU资源,没有实现解耦

3、Mq消息中间件名词

  • Producer 生产者:投递消息到MQ服务器端

  • Consumer 消费者:从MQ服务器端获取消息处理业务逻辑

  • Broker MQ服务器端

  • Topic 主题:分类业务逻辑发送短信主题、发送优惠券主题

  • Queue 存放消息模型 队列 先进先出 后进后出原则 数组/链表

  • Message 生产者投递消息报文:json

4、简单实现Mq的思路

4.1、基于多线程队列简单实现Mq

思路:

  • 首先需要一个生产者线程将消息投递到Mq,这里用LinkedBlockingDeque<JSONObject>队列来模拟Mq
  • 然后需要一个消费者线程从Mq中拿到消息进行消费

代码:

public class ThreadMQ {

    private static LinkedBlockingDeque<JSONObject> msgs = new LinkedBlockingDeque<JSONObject>();

    public static void main(String[] args) {
        // 生产线程
        new Thread(()-> {
                try {
                    while (true) {
                        Thread.sleep(1000);
                        JSONObject data = new JSONObject();
                        data.put("userId", "123");
                        // 存入消息
                        msgs.offer(data);
                    }
                } catch (Exception e) {
                }
        }, "生产者").start();
        // 消费者线程
        new Thread(()-> {
                try {
                    while (true) {
                        JSONObject data = msgs.poll();
                        if (data != null) {
                            System.out.println(Thread.currentThread().getName() + "," + data);
                        }
                    }

                } catch (Exception e) {

                }

        }, "消费者").start();
    }
    /**
     * 运行结果:
     * 消费者,{"userId":"123"}
     * 消费者,{"userId":"123"}
     * 消费者,{"userId":"123"}
     * 消费者,{"userId":"123"}
     * 消费者,{"userId":"123"}
     * 消费者,{"userId":"123"}
     * 消费者,{"userId":"123"}
     * 消费者,{"userId":"123"}
     * 消费者,{"userId":"123"}
     * ...
     */
}

4.2、基于Netty实现Mq

思路:

1、生产者nettyClient端发送请求给MQ服务器端(nettyServer端),MQ服务器端在将该消息内容发送给消费者

2、消费者nettyClient端与MQ服务器端(nettyServer端)保持长连接,MQ服务器端保存消费者连接

在这里插入图片描述

请求体格式:

body:{
    "msg":{
        "userId":"123456",
         "age":"23"
    },
    "type":"producer",  //类型
    "topic":""  //主题
}

生产者投递消息给MQ服务器端,MQ服务器端需要缓存该消息

如果mq服务器端宕机之后,消息如何保证不丢失

持久化机制

如果mq接收到生产者投递消息,如果消费者不在的情况下,该消息是否会丢失?

消息确认机制

答案是不会丢失,消息确认机制必须要消费者消费成功该消息之后,再通知Mq 服务器端删除该消息

  • Mq 服务器端将该消息推送消费者
  • 消费者此时已经和Mq 服务器保持长连接
  • 消费者主动拉取消息

Mq如何实现抗高并发思想

Mq消费者根据自身能力情况 ,拉取mq服务器端消息消费。默认的情况下是取出一条消息

缺点:存在延迟的问题

需要考虑mq消费者提高速率的问题:消费者实现集群、消费者批量获取消息即可。

Demo:

Mq 服务器端:

public class MayiktNettyMQServer {
    public void bind(int port) throws Exception {
        /**
         * Netty 抽象出两组线程池BossGroup和WorkerGroup
         * BossGroup专门负责接收客户端的连接, WorkerGroup专门负责网络的读写。
         */
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        ServerBootstrap bootstrap = new ServerBootstrap();
        try {
            bootstrap.group(bossGroup, workerGroup)
                    // 设定NioServerSocketChannel 为服务器端
                    .channel(NioServerSocketChannel.class)
                    //BACKLOG用于构造服务端套接字ServerSocket对象,标识当服务器请求处理线程全满时,
                    //用于临时存放已完成三次握手的请求的队列的最大长度。如果未设置或所设置的值小于1,Java将使用默认值50。
                    .option(ChannelOption.SO_BACKLOG, 100)
                    // 服务器端监听数据回调Handler
                    .childHandler(new MayiktNettyMQServer.ChildChannelHandler());
            //绑定端口, 同步等待成功;
            ChannelFuture future = bootstrap.bind(port).sync();
            System.out.println("当前服务器端启动成功...");
            //等待服务端监听端口关闭
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //优雅关闭 线程组
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            // 设置异步回调监听
            ch.pipeline().addLast(new MayiktNettyMQServer.MayiktServerHandler());
        }
    }
    //主业务逻辑
    public static void main(String[] args) throws Exception {
        int port = 9008;
        new MayiktNettyMQServer().bind(port);
    }
    private static final String type_consumer = "consumer";
    private static final String type_producer = "producer";
    private static LinkedBlockingDeque<String> msgs = new LinkedBlockingDeque<>();
    private static ArrayList<ChannelHandlerContext> ctxs = new ArrayList<>();

    // 生产者投递消息的:topicName
    public class MayiktServerHandler extends SimpleChannelInboundHandler<Object> {

        /**
         * 服务器接收客户端请求
         *
         * @param ctx
         * @param data
         * @throws Exception
         */
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Object data)
                throws Exception {
            JSONObject clientMsg = getData(data);
            String type = clientMsg.getString("type");
            switch (type) {
                case type_producer:
                    producer(clientMsg);
                    break;
                case type_consumer:
                    consumer(ctx);
                    break;
            }
        }

        /**
         * 消费者
         * @param ctx
         */
        private void consumer(ChannelHandlerContext ctx) {
            // 保存消费者连接
            ctxs.add(ctx);
            // 主动拉取mq服务器端缓存中没有被消费的消息
            String data = msgs.poll();
            if (StringUtils.isEmpty(data)) {
                return;
            }
            // 将该消息发送给消费者
            byte[] req = data.getBytes();
            ByteBuf firstMSG = Unpooled.buffer(req.length);
            firstMSG.writeBytes(req);
            ctx.writeAndFlush(firstMSG);
        }

        /**
         * 生产者
         * @param clientMsg
         */
        private void producer(JSONObject clientMsg) {
            // 缓存生产者投递 消息
            String msg = clientMsg.getString("msg");
            msgs.offer(msg);

            //需要将该消息推送消费者
            ctxs.forEach((ctx) -> {
                // 将该消息发送给消费者
                String data = msgs.poll();
                if (data == null) {
                    return;
                }
                byte[] req = data.getBytes();
                ByteBuf firstMSG = Unpooled.buffer(req.length);
                firstMSG.writeBytes(req);
                ctx.writeAndFlush(firstMSG);
            });
        }

        private JSONObject getData(Object data) throws UnsupportedEncodingException {
            ByteBuf buf = (ByteBuf) data;
            byte[] req = new byte[buf.readableBytes()];
            buf.readBytes(req);
            String body = new String(req, "UTF-8");
            return JSONObject.parseObject(body);
        }


        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.flush();
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                throws Exception {

            ctx.close();
        }
    }
}

客户端消费者端:

public class MayiktNettyMQConsumer {
    public void connect(int port, String host) throws Exception {
        //配置客户端NIO 线程组
        EventLoopGroup group = new NioEventLoopGroup();
        Bootstrap client = new Bootstrap();
        try {
            client.group(group)
                    // 设置为Netty客户端
                    .channel(NioSocketChannel.class)
                    /**
                     * ChannelOption.TCP_NODELAY参数对应于套接字选项中的TCP_NODELAY,该参数的使用与Nagle算法有关。
                     * Nagle算法是将小的数据包组装为更大的帧然后进行发送,而不是输入一次发送一次,因此在数据包不足的时候会等待其他数据的到来,组装成大的数据包进行发送,虽然该算法有效提高了网络的有效负载,但是却造成了延时。
                     * 而该参数的作用就是禁止使用Nagle算法,使用于小数据即时传输。和TCP_NODELAY相对应的是TCP_CORK,该选项是需要等到发送的数据量最大的时候,一次性发送数据,适用于文件传输。
                     */
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new MayiktNettyMQConsumer.NettyClientHandler());
                        }
                    });

            //绑定端口, 异步连接操作
            ChannelFuture future = client.connect(host, port).sync();
            //等待客户端连接端口关闭
            future.channel().closeFuture().sync();
        } finally {
            //优雅关闭 线程组
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        int port = 9008;
        MayiktNettyMQConsumer client = new MayiktNettyMQConsumer();
        try {
            client.connect(port, "127.0.0.1");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public class NettyClientHandler extends ChannelInboundHandlerAdapter {


        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {

            JSONObject data = new JSONObject();
            data.put("type", "consumer");
            // 生产发送数据
            byte[] req = data.toJSONString().getBytes();
            ByteBuf firstMSG = Unpooled.buffer(req.length);
            firstMSG.writeBytes(req);
            ctx.writeAndFlush(firstMSG);
        }

        /**
         * 客户端读取到服务器端数据
         *
         * @param ctx
         * @param msg
         * @throws Exception
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf buf = (ByteBuf) msg;
            byte[] req = new byte[buf.readableBytes()];
            buf.readBytes(req);
            String body = new String(req, "UTF-8");
            System.out.println("消费者客户端接收到服务器端请求:" + body);
        }

        // tcp属于双向传输

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    }
}

客户端生产者端:

public class MayiktNettyMQProducer {
    public void connect(int port, String host) throws Exception {
        //配置客户端NIO 线程组
        EventLoopGroup group = new NioEventLoopGroup();
        Bootstrap client = new Bootstrap();
        try {
            client.group(group)
                    // 设置为Netty客户端
                    .channel(NioSocketChannel.class)
                    /**
                     * ChannelOption.TCP_NODELAY参数对应于套接字选项中的TCP_NODELAY,该参数的使用与Nagle算法有关。
                     * Nagle算法是将小的数据包组装为更大的帧然后进行发送,而不是输入一次发送一次,因此在数据包不足的时候会等待其他数据的到来,组装成大的数据包进行发送,虽然该算法有效提高了网络的有效负载,但是却造成了延时。
                     * 而该参数的作用就是禁止使用Nagle算法,使用于小数据即时传输。和TCP_NODELAY相对应的是TCP_CORK,该选项是需要等到发送的数据量最大的时候,一次性发送数据,适用于文件传输。
                     */
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new MayiktNettyMQProducer.NettyClientHandler());
                        }
                    });

            //绑定端口, 异步连接操作
            ChannelFuture future = client.connect(host, port).sync();
            //等待客户端连接端口关闭
            future.channel().closeFuture().sync();
        } finally {
            //优雅关闭 线程组
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        int port = 9008;
        MayiktNettyMQProducer client = new MayiktNettyMQProducer();
        try {
            client.connect(port, "127.0.0.1");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public class NettyClientHandler extends ChannelInboundHandlerAdapter {


        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {

            JSONObject data = new JSONObject();
            data.put("type", "producer");
            JSONObject msg = new JSONObject();
            msg.put("userId", "123456");
            msg.put("age", "23");
            data.put("msg", msg);
            // 生产发送数据
            byte[] req = data.toJSONString().getBytes();
            ByteBuf firstMSG = Unpooled.buffer(req.length);
            firstMSG.writeBytes(req);
            ctx.writeAndFlush(firstMSG);
        }

        /**
         * 客户端读取到服务器端数据
         *
         * @param ctx
         * @param msg
         * @throws Exception
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf buf = (ByteBuf) msg;
            byte[] req = new byte[buf.readableBytes()];
            buf.readBytes(req);
            String body = new String(req, "UTF-8");
            System.out.println("客户端接收到服务器端请求:" + body);
        }

        // tcp属于双向传输

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    }
}

5、主流Mq区别对比

特性ActiveMQRabbitMQRocketMQkafka
开发语言javaerlangjavascala
单机吞吐量万级万级10万级10万级
时效性ms级us级ms级ms级以内
可用性高(主从架构)高(主从架构)非常高(分布式架构)非常高(分布式架构)
功能特性成熟的产品,在很多公司得到应用;有较多的文档;各种协议支持较好基于erlang开发,所以并发能力很强,性能极其好,延时很低管理界面较丰富MQ功能比较完备,扩展性佳只支持主要的MQ功能,像一些消息查询,消息回溯等功能没有提供,毕竟是为大数据准备的,在大数据领域应用广。

简单来说:小型的微服务应用可以选择使用RabbitMq,大型应用、数据量大的情况下就可以选择使用 RocketMq 或者 Kafka

6、RabbitMq

6.1、RabbitMq简单介绍

RabbitMq是实现了高级消息队列协议(AMQP)的开源消息代理软件,也称为面向对象的中间件

官网:https://www.rabbitmq.com/

在这里插入图片描述

1、点对点的队列(简单队列)

2、工作(公平性)队列模式

3、发布订阅模式

在这里插入图片描述

4、路由模式

5、通配符模式

6、RPC

6.2、RabbitMQ环境的基本安装

笔者觉得使用docker安装超级方便,所以介绍下Linux 中如何安装

1、拉取镜像

docker pull rabbitmq:3-management

2、运行MQ容器

docker run \
 -e RABBITMQ_DEFAULT_USER=itcast \
 -e RABBITMQ_DEFAULT_PASS=123321 \
 --name mq \
 --hostname mq1 \
 -p 15672:15672 \
 -p 5672:5672 \
 -d \
 rabbitmq:3-management

3、docker ps 查看镜像是否运行

4、然后就可以访问啦

6.3、Rabbitmq管理平台中心

RabbitMQ 管理平台地址 http://127.0.0.1:15672

默认账号:guest/guest 用户可以自己创建新的账号

像mysql有数据库的概念并且可以指定用户对库和表等操作的权限。那RabbitMQ呢?

RabbitMQ也有类似的权限管理。在RabbitMQ中可以虚拟消息服务器VirtualHost,每个VirtualHost相当月一个相对独立的RabbitMQ服务器,每个VirtualHost之间是相互隔离的。exchange、queue、message不能互通

默认的端口 15672:rabbitmq管理平台端口号 http协议

默认的端口 5672: rabbitmq消息中间内部通讯的端口

默认的端口 25672: rabbitmq集群的端口号

6.4、RabbitMQ常见名词

  • /Virtual Hosts:分类

  • /队列 存放我们消息

  • Exchange 分派我们消息在那个队列存放起来 类似于nginx

6.5、快速入门RabbitMQ简单队列

首先需要在RabbitMQ平台创建Virtual Hosts 和队列

  • Virtual Hosts :/meiteVirtualHosts

  • 队列:mayikt

创建连接:

public class RabbitMQConnection {

    /**
     * 创建连接
     *
     * @return
     * @throws IOException
     * @throws TimeoutException
     */
    public static Connection getConnection() throws IOException, TimeoutException {
        //1.创建connectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //2.配置Host
        connectionFactory.setHost("192.168.181.103");
        //3.设置Port
        connectionFactory.setPort(5672);
        //4.设置账户和密码
        connectionFactory.setUsername("itcast");
        connectionFactory.setPassword("123321");
        //5.设置VirtualHost
        connectionFactory.setVirtualHost("/meiteVirtualHosts");
        return connectionFactory.newConnection();
    }
}

编写生产者代码:

public class Producer {
    private static final String QUEUE_NAME = "mayikt";

    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建一个新连接
        Connection connection = RabbitMQConnection.getConnection();
        //2.设置channel
        Channel channel = connection.createChannel();
        //3.发送消息
        String msg = "哈哈哈";
        channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
        System.out.println("消息投递成功");
        channel.close();
        connection.close();
    }
}

编写消费者代码:

public class Consumer {
    private static final String QUEUE_NAME = "mayikt";

    public static void main(String[] args) throws IOException, TimeoutException, IOException, TimeoutException {
        // 1.创建连接
        Connection connection = RabbitMQConnection.getConnection();
        // 2.设置通道
        Channel channel = connection.createChannel();
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("消费者获取消息:" + msg);
                // 消费者完成 消费该消息
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // 3.监听队列
        channel.basicConsume(QUEUE_NAME, false, defaultConsumer);

    }
}

启动生产者并投递两次消息:

在这里插入图片描述

此时mq控制台:

在这里插入图片描述

然后启动消费者:

在这里插入图片描述

再看mq 控制台:

在这里插入图片描述

消息被成功消费

6.6、RabbitMQ如何保证消息不丢失

Mq如何保证消息不丢失:

1、 生产者角色

  • 确保生产者投递消息到MQ服务器端成功。

2、 消费者角色

在rabbitmq情况下:

  • 必须要将消息消费成功之后,才会将该消息从mq服务器端中移除。

在kafka中的情况下:

  • 不管是消费成功还是消费失败,该消息都不会立即从mq服务器端移除。

3、 Mq服务器端 在默认的情况下 都会对队列中的消息实现持久化,持久化硬盘。

使用消息确认机制+持久化

A.消费者确认收到消息机制

channel.basicConsume(QUEUE_NAME, false, defaultConsumer);

注:第二个参数值为false代表关闭RabbitMQ的自动应答机制,改为手动应答。

在处理完消息时,返回应答状态。

channel.basicAck(envelope.getDeliveryTag(), false);

B.生产者确认投递消息成功 使用Confirm机制 或者事务消息

在这里插入图片描述

在这里插入图片描述

C.RabbitMQ默认创建是持久化

在这里插入图片描述

代码中设置 durable为 true

参数名称详解:

  • durable是否持久化 : durable为持久化、 Transient 不持久化

  • autoDelete 是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除,可以通过RabbitMQ Management,查看某个队列的消费者数量,当consumers = 0时队列就会自动删除

Demo

生产者:

public class Producer {
    private static final String QUEUE_NAME = "mayikt";

    public static void main(String[] args) {
        try {
            //1.创建一个新连接
            Connection connection = RabbitMQConnection.getConnection();
            //2.设置channel
            Channel channel = connection.createChannel();
            //3.发送消息
            String msg = "哈哈哈";
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            boolean result = channel.waitForConfirms();
            if (result) {
                System.out.println("消息投递成功");
            } else {
                System.out.println("消息投递失败");
            }
            channel.close();
            connection.close();
        } catch (Exception e) {

        }
    }
}

消费者:

public class Consumer {
    private static final String QUEUE_NAME = "mayikt";

    public static void main(String[] args) throws IOException, TimeoutException, IOException, TimeoutException {
        // 1.创建连接
        Connection connection = RabbitMQConnection.getConnection();
        // 2.设置通道
        Channel channel = connection.createChannel();
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("消费者获取消息:" + msg);
                // 消费者完成 消费者通知给mq服务器端删除该消息
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // 3.监听队列
        channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
    }
}

6.7、RabitMQ工作队列模型

默认的传统队列是均摊消费,存在不公平性;如果每个消费者速度不一样的情况下,均摊消费是不公平的,应该是能者多劳。

在这里插入图片描述

采用工作队列:

在通道中只需要设置basicQos为1即可,表示MQ服务器每次只会给消费者推送1条消息必须手动ack确认之后才会继续发送

channel.basicQos(1);

6.8、RabbitMQ交换机

  • Direct exchange(直连交换机)

  • Fanout exchange(扇型交换机)

  • Topic exchange(主题交换机)

  • Headers exchange(头交换机)

再来理解下相关概念:

/Virtual Hosts 区分不同的团队

队列 存放消息

交换机 路由消息存放在那个队列中 类似于nginx

路由key 分发规则

6.8.1、fanout交换机

生产者发送一条消息,经过交换机转发到多个不同的队列,多个不同的队列就有多个不同的消费者。

在这里插入图片描述

在这里插入图片描述

原理:

  1. 需要创建两个队列 ,每个队列对应一个消费者;

  2. 队列需要绑定交换机;

  3. 生产者投递消息到交换机中,交换机再将消息分配给两个队列中都存放起来;

  4. 消费者从队列中获取这个消息。

Demo

生产者代码:

public class ProducerFanout {

    /**
     * 定义交换机的名称
     */
    private static final String EXCHANGE_NAME = "fanout_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        //  创建Connection
        Connection connection = RabbitMQConnection.getConnection();
        // 创建Channel
        Channel channel = connection.createChannel();
        // 通道关联交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true);
        String msg = "每特教育";
        channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
        channel.close();
        connection.close();
    }

}

邮件消费者代码:

public class MailConsumer {
    /**
     * 定义邮件队列
     */
    private static final String QUEUE_NAME = "fanout_email_queue";
    /**
     * 定义交换机的名称
     */
    private static final String EXCHANGE_NAME = "fanout_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println("邮件消费者...");
        // 创建我们的连接
        Connection connection = RabbitMQConnection.getConnection();
        // 创建我们通道
        final Channel channel = connection.createChannel();
        // 关联队列消费者关联队列
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("邮件消费者获取消息:" + msg);
            }
        };
        // 开始监听消息 自动签收
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);

    }
}

短信消费者:

public class SmsConsumer {
    /**
     * 定义短信队列
     */
    private static final String QUEUE_NAME = "fanout_email_sms";
    /**
     * 定义交换机的名称
     */
    private static final String EXCHANGE_NAME = "fanout_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println("短信消费者...");
        // 创建我们的连接
        Connection connection = RabbitMQConnection.getConnection();
        // 创建我们通道
        final Channel channel = connection.createChannel();
        // 关联队列消费者关联队列
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("短信消费者获取消息:" + msg);
            }
        };
        // 开始监听消息 自动签收
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);

    }
}

6.8.2、Direct路由模式

当交换机类型为direct类型时,根据队列绑定的路由键转发到具体的队列中存放消息

在这里插入图片描述

Demo

生产者代码:

public class ProducerDirect {

    /**
     * 定义交换机的名称
     */
    private static final String EXCHANGE_NAME = "direct_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        //  创建Connection
        Connection connection = RabbitMQConnection.getConnection();
        // 创建Channel
        Channel channel = connection.createChannel();
        // 通道关联交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
        String msg = "每特教育";
        channel.basicPublish(EXCHANGE_NAME, "email", null, msg.getBytes());
        channel.close();
        connection.close();
    }

}

邮件消费者:

public class MailConsumer {
    /**
     * 定义邮件队列
     */
    private static final String QUEUE_NAME = "direct_email_queue";
    /**
     * 定义交换机的名称
     */
    private static final String EXCHANGE_NAME = "direct_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println("邮件消费者...");
        // 创建我们的连接
        Connection connection = RabbitMQConnection.getConnection();
        // 创建我们通道
        final Channel channel = connection.createChannel();
        // 关联队列消费者关联队列
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "email");
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("邮件消费者获取消息:" + msg);
            }
        };
        // 开始监听消息 自动签收
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);

    }
}

短信消费者:

public class SmsConsumer {
    /**
     * 定义短信队列
     */
    private static final String QUEUE_NAME = "direct_sms_queue";
    /**
     * 定义交换机的名称
     */
    private static final String EXCHANGE_NAME = "direct_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println("短信消费者...");
        // 创建我们的连接
        Connection connection = RabbitMQConnection.getConnection();
        // 创建我们通道
        final Channel channel = connection.createChannel();
        // 关联队列消费者关联队列
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "sms");
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("短信消费者获取消息:" + msg);
            }
        };
        // 开始监听消息 自动签收
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);

    }
}

6.8.3、Topic 主题模式

当交换机类型为topic类型时,根据队列绑定的路由键模糊转发到具体的队列中存放。

在这里插入图片描述

Demo

生产者:路由key 是 mayikt.sms

public class ProducerTopic {

    /**
     * 定义交换机的名称
     */
    private static final String EXCHANGE_NAME = "topic_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        //  创建Connection
        Connection connection = RabbitMQConnection.getConnection();
        // 创建Channel
        Channel channel = connection.createChannel();
        // 通道关联交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
        String msg = "每特教育";
        channel.basicPublish(EXCHANGE_NAME, "mayikt.sms", null, msg.getBytes());
        channel.close();
        connection.close();
    }

}

邮件消费者:路由key 是 mayikt.* ,与生产者可以匹配上,成功消费到消息

public class MailConsumer {
    /**
     * 定义邮件队列
     */
    private static final String QUEUE_NAME = "topic_email_queue";
    /**
     * 定义交换机的名称
     */
    private static final String EXCHANGE_NAME = "topic_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println("邮件消费者...");
        // 创建我们的连接
        Connection connection = RabbitMQConnection.getConnection();
        // 创建我们通道
        final Channel channel = connection.createChannel();
        // 关联队列消费者关联队列
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "mayikt.*");
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("邮件消费者获取消息:" + msg);
            }
        };
        // 开始监听消息 自动签收
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);

    }
}

短信消费者:路由key 是meite.* 与生产者不能匹配, 消费不到消息

public class SmsConsumer {
    /**
     * 定义短信队列
     */
    private static final String QUEUE_NAME = "topic_sms_queue";
    /**
     * 定义交换机的名称
     */
    private static final String EXCHANGE_NAME = "topic_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println("短信消费者...");
        // 创建我们的连接
        Connection connection = RabbitMQConnection.getConnection();
        // 创建我们通道
        final Channel channel = connection.createChannel();
        // 关联队列消费者关联队列
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "meite.*");
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("短信消费者获取消息:" + msg);
            }
        };
        // 开始监听消息 自动签收
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);

    }
}

7、SpringBoot整合RabbitMq

1、导入依赖

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.2.1.RELEASE</version>
</parent>
<dependencies>

    <!-- springboot-web组件 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- 添加springboot对amqp的支持 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
    </dependency>
    <!--fastjson -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.49</version>
    </dependency>

2、配置类

@Component
public class RabbitMQConfig {
    /**
     * 定义交换机
     */
    private String EXCHANGE_SPRINGBOOT_NAME = "/mayikt_ex";


    /**
     * 短信队列
     */
    private String FANOUT_SMS_QUEUE = "fanout_sms_queue";
    /**
     * 邮件队列
     */
    private String FANOUT_EMAIL_QUEUE = "fanout_email_queue";

    /**
     * 配置smsQueue
     *
     * @return
     */
    @Bean
    public Queue smsQueue() {
        return new Queue(FANOUT_SMS_QUEUE);
    }

    /**
     * 配置emailQueue
     *
     * @return
     */
    @Bean
    public Queue emailQueue() {
        return new Queue(FANOUT_EMAIL_QUEUE);
    }

    /**
     * 配置fanoutExchange
     *
     * @return
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(EXCHANGE_SPRINGBOOT_NAME);
    }

    // 绑定交换机 sms
    @Bean
    public Binding bindingSmsFanoutExchange(Queue smsQueue, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(smsQueue).to(fanoutExchange);
    }

    // 绑定交换机 email
    @Bean
    public Binding bindingEmailFanoutExchange(Queue emailQueue, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(emailQueue).to(fanoutExchange);
    }
}

3、配置文件

spring:
  rabbitmq:
    ####连接地址
    host: 192.168.181.103
    ####端口号
    port: 5672
    ####账号
    username: itcase
    ####密码
    password: 123321
    ### 地址
    virtual-host: /meiteVirtualHosts

4、生产者

@RestController
public class FanoutProducer {

    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * 发送消息
     *
     * @return
     */
    @RequestMapping("/sendMsg")
    public String sendMsg(String msg) {
        /**
         * 1.交换机名称
         * 2.路由key名称
         * 3.发送内容
         */
        amqpTemplate.convertAndSend("/mayikt_ex", "", msg);
        return "success";
    }
}

5、消费者

@Slf4j
@Component
@RabbitListener(queues = "fanout_email_queue")
public class FanoutEmailConsumer {

    @RabbitHandler
    public void process(String msg) {
        log.info(">>邮件消费者消息msg:{}<<", msg);
    }
}
@Slf4j
@Component
@RabbitListener(queues = "fanout_sms_queue")
public class FanoutSmsConsumer {

    @RabbitHandler
    public void process(String msg) {
        log.info(">>短信消费者消息msg:{}<<", msg);
    }
}

8、Mq如何获取消费结果

应该根据业务来定

消费者消费成功的结果:

  1. 能够在数据库中成功插入一条数据

  2. Rocketmq自带全局消息id,能够根据该全局消息获取消费结果

原理:

  • 生产者投递消息到mq服务器,mq服务器端在这时候返回一个全局的消息id
  • 消费者消费该消息成功之后,消费者会给mq服务器端发送通知标记该消息消费成
  • 生产者获取到该消息全局id,每隔2s时间调用mq服务器端接口 查询该消息是否有被消费成功。

异步返回一个全局id,前端使用ajax定时主动查询;

在rocketmq中,根据自带消息id查询是否消费成功

9、RabbitMQ死信队列

思维导图:

在这里插入图片描述

9.1、死信队列产生的背景

RabbitMQ死信队列俗称,备胎队列;

消息中间件因为某种原因拒收该消息后,可以转移到死信队列中存放,死信队列也可以有交换机和路由key

9.2、产生死信队列的原因

  • 消息投递到MQ中存放 消息已经过期 消费者没有及时的获取消息,

    消息如果存放到mq服务器中过期之后,会转移到备胎死信队列存放。

  • 队列达到最大的长度 (队列容器已经满了)

  • 消费者消费多次消息失败,就会转移存放到死信队列中

在这里插入图片描述

9.3、死信队列的架构原理

死信队列和普通队列区别不是很大

普通队列与死信队列都有自己独立的交换机和路由key队列和消费者

区别:

  1. 生产者投递消息先投递到普通交换机中,普通交换机再将该消息投到普通队列中缓存起来,普通队列对应有自己的独立普通消费者。

  2. 如果生产者投递消息到普通队列中,普通队列发现该消息一直没有被消费者消费的情况下,在这时候会将该消息转移到死信(备胎)交换机中,死信(备胎)交换机对应有自己独立的 死信队列 对应 独立死信消费者。

9.4、死信队列应用场景

  • 30分钟订单超时设计

  • Redis过期key

死信延迟队列实现:

  • 创建一个普通队列, 没有对应的消费者消费消息,在30分钟过后

    就会将该消息转移到死信备胎消费者实现消费。

  • 死信备胎消费者会根据该订单号码查询是否已经支付过,如果没有支付的情况下

    则会开始回滚库存操作。

10、RabbitMQ消息幂等问题

思维导图:

在这里插入图片描述

10.1、RabbitMQ消息自动重试机制

当消费者处理执行业务代码的时候,如果抛出异常的情况下,在这时候mq会自动触发重试机制,默认的情况下rabbitmq是无限次数的重试。

应该对RabbitMQ重试次数实现限制,比如最多重试5次,每次间隔3s;重试多次还是失败的情况下,存放到死信队列或者存放到数据库表中记录后期人工补偿

在什么情况下消费者需要实现重试策略?

  • 消费者获取消息后,调用第三方接口,但是调用第三方接口失败呢?是否需要重试?

    该情况下需要实现重试策略,网络延迟只是暂时调用不通,重试多次有可能会调用通。

  • 消费者获取消息后,因为代码问题抛出数据异常,是否需要重试?

    该情况下是不需要实现重试策略,就算重试多次,最终还是失败的。
    可以将日志存放起来,后期通过定时任务或者人工补偿形式

如果是重试多次还是失败消息,需要重新发布消费者版本实现消费, 那么就可以使用死信队列

Mq在重试的过程中,有可能会引发消费者重复消费的问题

Mq消费者需要解决

  • 幂等性问题 : 保证数据唯一

解决方式:

生产者在投递消息的时候,生成一个全局唯一id,放在消息中。

  • 消费者获取到该消息,可以根据该全局唯一id 实现去重复

  • 全局唯一id 根据业务来定 比如订单号作为全局的唯一id

实际上还是需要在数据库层面解决数据防重复

  • 业务逻辑是在做insert操作时 使用唯一主键约束

  • 业务逻辑是在做update操作 使用乐观锁

10.2、如何合理选择消息重试

  • 消费者获取消息后,调用第三方接口,但是调用第三方接口失败呢?是否需要重试 ?

  • 消费者获取消息后,应该代码问题抛出数据异常,是否需要重试?

总结:如果消费者处理消息时,因为代码原因抛出异常是需要从新发布版本才能解决的,那么就不需要重试,重试也解决不了该问题的。存放到死信队列或者是数据库表记录、后期人工实现补偿。

10.3、消费者重试过程中,如何避免幂等性问题

  • 重试的过程中,为了避免业务逻辑重复执行,建议根据全局id提前查询,如果存在

    的情况下,就无需再继续重试。

  • 重试的次数最好有一定间隔次数,在数据库底层保证数据唯一性,比如加上唯一id

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/427859.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MySQL索引15连问,你能坚持到第几问?

目录 1.索引是什么? 2.MySQL索引有哪些类型 3.索引什么时候会失效? 4.哪些场景不适合建立索引? 5.为什么要用 B树&#xff0c;为什么不用二叉树? 6.一次B树索引树查找过程 7.什么是回表? 如何减少回表? 8.什么是覆盖索引? 9.聊聊索引的最左前缀原则 10.索引下…

Phind——一款面向开发人员的AI搜索引擎

目录前言一、Phind优点二、使用方法总结前言 Phind是一款面向开发人员的AI搜索引擎&#xff0c;它由大语言模型&#xff08;Large Language Model&#xff0c;LLM&#xff09;驱动 。相比于传统的搜索引擎&#xff0c;Phind有以下优势&#xff1a;自然语言搜索、面向开发者、AI…

【数据结构】期中考试一把梭(通宵版上)

前言 红中(Hong_zhong) CSDN内容合伙人、2023年新星计划web安全方向导师、 吉林师范大学网安大一的一名普通学生、摸鱼拿过大挑校二、 华为MindSpore截至目前最年轻的优秀开发者、IK&N战队队长、 阿里云专家博主、华为网络安全云享专家、腾讯云自媒体分享计划博主、 划了…

URL 和 HandlerMapping建立映射(11)

上一篇https://blog.csdn.net/chen_yao_kerr/article/details/130194864 我们已经分析了Spring MVC的配置&#xff0c;并且说明了如何通过注解的方式去替换各种各样的xml配置文件。本篇将更深入分析&#xff1a; 取代 springmvc.xml 配置 之前我们说过&#xff0c;定义一个类…

简述API(电商数据API)网关的概念和功能

API 网关 ( API gateway ) 前言 在 IOT &#xff08; 物联网 &#xff09;中&#xff0c;当我们的一些设备。例如&#xff08; 监控、传感器等 &#xff09;需要将收集到的数据和信息进行汇总时&#xff0c;我们就需要一个 API。&#xff08;如果你需要Taobao/JD/pinduoduo平台…

OpenAI-ChatGPT最新官方接口《语音智能转文本》全网最详细中英文实用指南和教程,助你零基础快速轻松掌握全新技术(六)(附源码)

Speech to text 语音智能转文本Introduction 导言Quickstart 快速开始Transcriptions 转录python代码cURL代码Translations 翻译python代码cURL代码Supported languages 支持的语言Longer inputs 长文件输入Prompting 提示其它资料下载Speech to text 语音转文本 Learn how to …

一句话设计模式11:过滤器模式

过滤器模式: 直接看 java8的filter; 文章目录过滤器模式: 直接看 java8的filter;前言一、过滤器模式的作用二、如何实现过滤器模式直接上代码总结前言 过滤器模式一般使用场景是: 过滤集合中的不同元素的一种手段,其实平时开发中你经常用,但是你不知道而已;(心里话: 这也算一种…

C++命名空间

C命名空间 C命名空间是一种用于组织代码的机制&#xff0c;它可以将全局命名空间划分为更小的、独立的部分&#xff0c;从而避免命名冲突和名字空间污染。在本文中&#xff0c;我们将介绍C命名空间的基本概念、使用方法和注意事项。 什么是命名空间&#xff1f; 命名空间是C…

QT

多平台C图形用户界面应用程序框架 集成了很多可以直接运用的图形的库 应用在windowns10系统 新建项目 有三种基类可以选择&#xff0c;开发是基于这三种基类的基础上&#xff0c;利用软件支持的QT语言进行界面元素添加与优化 代码添加&#xff08;添加代码时&#xff0c;大小…

flutter实战(1)-配置安装

目录支持的OS安装SDKwindows找到windows对应的SDK安装LINUXsnapd手动IDEMacLinux 或者 Windows 平台支持的OS 有以下这些OS可以安装配置flutter 安装SDK windows 要想安装和运行 Flutter&#xff0c;你的开发环境至少应该满足如下的需求&#xff1a; 操作系统&#xff1a;W…

组合预测模型 | SSA-LSTM、LSTM麻雀算法优化长短期记忆神经网络时间序列预测(Matlab程序)

组合预测模型 | SSA-LSTM、LSTM麻雀算法优化长短期记忆神经网络时间序列预测(Matlab程序) 目录 组合预测模型 | SSA-LSTM、LSTM麻雀算法优化长短期记忆神经网络时间序列预测(Matlab程序)预测结果评价指标基本介绍程序设计参考资料预测结果 评价指标 SSA-LSTM优化得到的最优…

[TIFS 2022] FLCert:可证明安全的联邦学习免受中毒攻击

FLCert: Provably Secure Federated Learning Against Poisoning Attacks | IEEE Journals & Magazine | IEEE Xplore 摘要 由于其分布式性质&#xff0c;联邦学习容易受到中毒攻击&#xff0c;其中恶意客户端通过操纵其本地训练数据和/或发送到云服务器的本地模型更新来毒…

阿里“通义千问”大模型上线!让生成式AI更贴近中国人生活

阿里版的 ChatGPT 语言大模型来了。 张勇在峰会上表示&#xff0c;阿里巴巴所有产品未来将接入“通义千问”大模型&#xff0c;进行全面改造。他认为&#xff0c;面向AI时代&#xff0c;所有产品都值得用大模型重新升级。 目前&#xff0c;钉钉、天猫精灵等产品已接入通义千问测…

PYQT5学习笔记00——Pycharm环境搭建以及配置项目虚拟环境教程

1、安装基本环境 需要的基本环境有python3.x的解释器、pip包管理工具以及pipenv虚拟环境管理工具。   我们安装了python后&#xff0c;pip包管理工具会自带安装&#xff0c;pipenv虚拟环境管理工具我们使用pycharm即可&#xff0c;无需使用python自带的。 python解释器下载地…

【Git代码仓库托管】上海道宁为您提供构建、扩展和交付安全软件的完整开发人员平台

GitHub是用于 构建、扩展和交付安全软件的 完整开发人员平台 通过提高开发人员速度的工具 推动创新 加快高质量软件开发 GitHub提供无限的存储库 一流的版本控制和 世界上强大的开源社区 因此您的团队可以 更高效地协同工作 开发商介绍 GitHub归属于微软公司&#xf…

Java EE企业级应用开发(SSM)第6章

第6章Spring MVC应用一.预习笔记 1.Spring MVC的请求参数 项目的基础配置 web.xml springmvc-config.xml jar包资源引入&#xff1a; 1-1&#xff1a;获取默认参数 jsp页面如下&#xff1a; Controller如下&#xff1a; 1-2&#xff1a;简单数据类型&#xff08;获取数据不…

sqlplus / as sysdba无法登陆

dba你快用你无敌sysdba登陆数据库&#xff01; 导言&#xff1a;as sysdba属于特殊的数据库权限&#xff0c;使用的是系统认证&#xff0c;sqlplus sys/passtns as sysdba用的才是你的密码文件中的设置 #认证设置问题 配置在sqlnet.ora文件 在linux下 #sqlnet.authenticati…

MySQL-binlog+dump备份还原

目录 &#x1f341;binlog日志恢复 &#x1f342;binlog介绍 &#x1f342;Binlog的用途 &#x1f342;开启binary log功能 &#x1f342;配置binlog &#x1f341;mysqldump &#x1f342;数据库的导出 &#x1f342;数据库的导入 &#x1f341;mysqldumpbinlog &#x1f990;…

小程序系统API调用

目录&#xff1a; 1 网络请求API和封装 2 展示弹窗和页面分享 3 设备信息和位置信息 4 小程序Storage存储 5 页面跳转和数据传递 6 小程序登录流程演练 小程序的网络请求&#xff0c;不管是post还是get的请求的数据都是写在data里面的。 网络请求一般写在onLoad()的页面生…

debian部署docker(傻瓜式)

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 debian10部署dockerdebian10部署docker&#xff08;傻瓜式&#xff09;一、准备工作二、**使用 APT 安装&#xff0c;注意要先配置apt网络源**1.配置网络源2.官方下载三、安装…