SpringCloud_Config配置中心和Bus消息总线和Stream消息驱动

news2024/11/15 17:24:57

文章目录

  • 一、SpringCloudConfig配置中心
    • 1、SpringCloudConfig配置中心的概论
    • 2、SpringCloudConfig配置中心的gitee仓库搭建
    • 3、SpringCloudConfig配置中心服务端的搭建
    • 4、SpringCloudConfig配置中心客户端的的搭建
    • 5、SpringCloudConfig配置中心客户端动态刷新配置文件
  • 二、SpringCloudBus消息总线
    • 1、SpringCloudBus消息总线的概论
    • 2、虚拟机安装RabbitMQ
    • 3、SpringCloudBus动态刷新全局广播
  • 三、SpringCloudStream消息驱动
    • 1、SpringCloudStream消息驱动的概论
    • 2、SpringCloudStream消息驱动的消息生产者
    • 3、SpringCloudStream消息驱动的消息消费者1
    • 4、SpringCloudStream消息驱动的消息消费者2
    • 5、SpringCloudStream消息驱动的分组消费
  • 总结

一、SpringCloudConfig配置中心

1、SpringCloudConfig配置中心的概论

  1. 在分布式系统中,由于服务数量巨多,为了方便服务配置文件统一管理,实时更新,所以需要分布式配置中心组件。
    SpringCloudConfig项目是一个解决分布式系统的配置管理问题。
    SpringCloudConfig项目包含了client和server两个部分。
  2. Config的作用:
    1)提供服务端和客户端支持
    2)集中管理各环境的配置文件
    3)配置文件修改之后,可以快速的生效
    4)可以进行版本管理
    5)支持大的并发查询
    6)支持各种语言

2、SpringCloudConfig配置中心的gitee仓库搭建

  1. 登录码云gitee,然后点击+号,新建仓库,输入仓库名称以及路径后,点击创建即可。
    在这里插入图片描述
    设置成开源的
    在这里插入图片描述
    随便写一下介绍,以及后面公开须知的三个选项都打上勾即可,点击保存即可。

  2. 将gitee的仓库拉取到指定的本地文件中
    1)在gitee中点击进入刚刚创建的仓库中,复制HTTPS的链接
    在这里插入图片描述

2)在存放的文件夹中右键选择Git Bash Here
在这里插入图片描述
3)在刚刚打开的git命令行中拉取gitee的远程仓库到本地文件中
先输入这两个命令
在这里插入图片描述
再拉取即可
在这里插入图片描述

  1. 创建并编写对应的yml配置文件
    1)在拉取的仓库内,也就是有.git文件的文件夹中创建config-dev.yml、config-prod.yml、config-test.yml文件
    在这里插入图片描述
    2)编辑config-dev.yml文件

    config:
        info: config-dev.yml version=1
    

    编辑config-test.yml文件

    config:
        info: config-test.yml version=1
    

    编辑config-prod.yml文件

    config:
        info: config-prod.yml version=1
    

    3)在.git的文件夹中右键选择Git Bash Here,即再次打开git命令行
    在这里插入图片描述
    即将刚刚创建的3个文件都上传到码云gitee。第一个指令是指将当前文件夹的文件上传,第二个指令是上传到新增文件配置中,第三个是上传指令。
    也就是说第一个是指定要上传的文件夹,第二是上传到哪个文件夹,第三个是上传。
    4)上传成功后可以看到如下
    在这里插入图片描述

3、SpringCloudConfig配置中心服务端的搭建

  1. 在cloud父项目中创建子模块项目cloud-config-server3344
    在这里插入图片描述

  2. 在cloud-config-server3344项目的POM文件中添加依赖

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <parent>
            <artifactId>cloud</artifactId>
            <groupId>com.zzx</groupId>
            <version>1.0-SNAPSHOT</version>
        </parent>
        <modelVersion>4.0.0</modelVersion>
    
        <artifactId>cloud-config-server3344</artifactId>
    
        <properties>
            <maven.compiler.source>17</maven.compiler.source>
            <maven.compiler.target>17</maven.compiler.target>
        </properties>
        <dependencies>
            <!--  引入Eureka client依赖  -->
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
            </dependency>
            <!--  引入 config 依赖-->
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-config-server</artifactId>
            </dependency>
            <!--  引入 web 依赖-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>1.18.22</version>
            </dependency>
        </dependencies>
    
    
    </project>
    
  3. 修改cloud-config-server3344项目的com.zzx包下的主启动类Main,修改为ConfigServer3344,修改代码如下

    package com.zzx;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.config.server.EnableConfigServer;
    /**
     * 主启动类
     */
    @SpringBootApplication
    @Slf4j
    // 开启配置中心功能
    @EnableConfigServer
    public class ConfigServer3344 {
        public static void main(String[] args) {
            SpringApplication.run(ConfigServer3344.class,args);
            log.info("****** ConfigServer3344服务 启动成功 *****");
        }
    }
    
  4. 在cloud-config-server3344项目的resources目录下,创建并编写application.yml文件
    1)代码如下

    server:
      port: 3344
    eureka:
      instance:
        # 注册的实例名
        instance-id: cloud-config-server3344
      client:
        service-url:
          # Eureka server的地址
          #单机
          #defaultZone: http://localhost:7001/eureka/
          #集群
          defaultZone: http://localhost:7001/eureka,http://localhost:7002/eureka
    spring:
      application:
        #设置应用名
        name: cloud-config-server
      cloud:
        config:
          server:
            git:
              # git仓库地址
              uri: https://gitee.com/zzx0402/cloud-config3344.git
              # 占位符url
              search-paths:
                - cloud-config
          # git仓库上面的分支名字
          label: master
    

    2)复制gitee的https的仓库链接,复制到上面uri那里
    在这里插入图片描述

  5. 测试
    1)启动eureka7001和eureka7002以及config3344服务
    2)在浏览器中访问:http://localhost:3344/config-test.yml
    在这里插入图片描述

4、SpringCloudConfig配置中心客户端的的搭建

  1. Config配置读取规则
    Config支持的请求的参数规则
    1)/{application}/{profile}[/{label}]
    2)/{application}-{profile}.yml
    3)/{label}/{application}-{profile}.yml
    4)/{application}-{profile}.properties
    5)/{label}/{application}-{profile}.properties
    其中{application} 就是应用名称;{profile} 就是配置文件的版本(dev、test、prod);{label} 表示 git 分支

  2. 在父工程cloud下,创建子模块项目cloud-config-client3355
    在这里插入图片描述

  3. 在config3355项目下的POM文件中添加如下依赖

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <parent>
            <artifactId>cloud</artifactId>
            <groupId>com.zzx</groupId>
            <version>1.0-SNAPSHOT</version>
        </parent>
        <modelVersion>4.0.0</modelVersion>
    
        <artifactId>cloud-config-client3355</artifactId>
    
        <properties>
            <maven.compiler.source>17</maven.compiler.source>
            <maven.compiler.target>17</maven.compiler.target>
        </properties>
        <dependencies>
            <!--  引入Eureka client依赖  -->
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
            </dependency>
            <!--  引入 config 依赖-->
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-config</artifactId>
            </dependency>
            <!--  引入 web 依赖-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>1.18.22</version>
            </dependency>
            <dependency>
    			  <groupId>org.springframework.cloud</groupId>
    			  <artifactId>spring-cloud-starter-bootstrap</artifactId>
    		</dependency>
    		
    
        </dependencies>
    
    
    </project>
    
  4. 修改在config3355项目下com.zzx包下的主启动类Main,修改为ConfigClientMain3355 ,代码如下

    package com.zzx;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    @SpringBootApplication
    @Slf4j
    public class ConfigClientMain3355 {
        public static void main(String[] args) {
            SpringApplication.run(ConfigClientMain3355.class,args);
            log.info("********** ConfigClientMain3355服务 启动成功 ********");
        }
    }
    
  5. 在config3355项目中的resources目录下创建bootstrap.yml配置文件,配置如下:

    server:
      port: 3355
    
    eureka:
      instance:
        # 注册名
        instance-id: cloud-config-client3355
      client:
        service-url:
          # Eureka server的地址
          #集群
          defaultZone: http://localhost:7001/eureka,http://localhost:7002/eureka
          #单机
          #defaultZone: http://localhost:7001/eureka/
    spring:
      application:
        #设置应用名
        name: cloud-config-client
      cloud:
        config:
          # 分支名字
          label: master
          # 应用名字
          name: config
          # 环境名
          profile: dev
          # config server 地址
          uri: http://localhost:3344
    
  6. 在config3355项目中的com.zzx.controller包下创建配置控制层类ConfigController,代码如下:

    package com.zzx.controller;
    
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    @RestController
    public class ConfigController {
        @Value("${config.info}")
        private String configInfo;
    
        /**
         * 读取配置文件的内容
         * @return
         */
        @GetMapping("getConfigInfo")
        public String getConfigInfo(){
            return configInfo;
        }
    }
    
    
  7. 测试
    1)启动Eureka7001和Eureka7002以及config3344和config3355服务
    2)在浏览器中访问:http://localhost:3355/getConfigInfo
    在这里插入图片描述

5、SpringCloudConfig配置中心客户端动态刷新配置文件

  1. 在gitee上修改配置文件后,config3344,也就是服务器端可以马上更新;但是config3355,也就是客户端没有更新。
    即需要实现客户端config3355的动态刷新配置文件的功能。

  2. 在config3355项目中的POM文件中添加actuator监控依赖

    <dependency>
    	<groupId>org.springframework.boot</groupId>
    	<artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    
  3. 在config3355项目中的bootstrap.yml文件中添加暴露监控端口的配置

    management:
      endpoints:
        web:
          exposure:
            include: "*"
    
  4. 在config3355项目中的com.zzx.controller包下的ConfigController类上添加如下注解

    @RefreshScope
    
  5. 测试
    1)配置完后重启config3355服务后
    在gitee中修改版本号
    在这里插入图片描述
    发现在浏览器上访问客户端一样是不会刷新,因为需要访问特定的url,以及使用POST请求。
    2)使用Postman工具来发送POST请求,进行手动刷新
    在这里插入图片描述
    3)再次打开浏览器访问:http://localhost:3355/getConfigInfo
    在这里插入图片描述
    此时已经更新。

二、SpringCloudBus消息总线

1、SpringCloudBus消息总线的概论

  1. Spring Cloud Bus通过建立多个应用之间的通信频道,管理和传播应用间的消息,从技术角度来说,应用了AMQP消息代理作为通道,通过MQ的广播机制实现消息的发送和接收。Bus支持两种消息代理:RabbitMQ和Kafka 。
    Spring Cloud Bus组件主要解决配置中心客户端手动刷新的问题。
  2. Spring Cloud Bus做配置更新的步骤:
    1)修改配置文件,提交代码触发post给客户端A发送bus/refresh
    2)客户端A接收到请求从Server端更新配置并且发送给Spring Cloud Bus
    3)Spring Cloud bus接到消息并通知给其它客户端
    4)其它客户端接收到通知,请求Server端获取最新配置
    5)全部客户端均获取到最新的配置
    即修改配置文件后,A更新并且通知消息总线;消息总线再通过广播发送消息给其他客户端,其他客户端更新。

2、虚拟机安装RabbitMQ

  1. 打开centos7的虚拟机,使用mobax终端连接该虚拟机,安装docker:yum install docker
  2. 启动dokcer:systemctl start docker
  3. 使用docker指令拉取RabbitMQ消息中间件的镜像:docker pull docker.io/macintoshplus/rabbitmq-management
  4. 启动RabbitMQ:docker run -d --name rabbitmq -e RABBITMQ_DEFAULT_USER=guest -e RABBITMQ_DEFAULT_PASS=guest -p 15672:15672 -p 5672:5672 docker.io/macintoshplus/rabbitmq-management
    即后台启动RabbitMQ,指定名字为rabbitmq,用户名和密码都为guest,可视化RabbitMQ端口15672,服务端口为5672,镜像名字为docker.io/macintoshplus/rabbitmq-management。
  5. IP是虚拟机的IP地址,浏览器访问:http://192.168.126.11:15672/在这里插入图片描述
  6. 此时用户名和密码都是上面指定的guest,登录后如图
    在这里插入图片描述

3、SpringCloudBus动态刷新全局广播

  1. 创建父项目cloud的子模块项目cloud-config-client3366
    1)直接复制cloud-config-client3355,粘贴到cloud父项目中,修改项目名字为cloud-config-client3366
    在这里插入图片描述
    2)修改cloud-config-client3366的POM文件中的项目名为cloud-config-client3366
    在这里插入图片描述
    3)右键cloud-config-client3366的POM文件,选择AddasMavenProject
    在这里插入图片描述
    4)将主启动类cloud-config-client3355都修改为cloud-config-client3366
    在这里插入图片描述
    5)修改bootstrap.yml文件的端口号和注册的实例名
    在这里插入图片描述

  2. 给config3344、config3355、config3366项目的POM文件添加RabiitMQ代理的消息总线BUS的依赖

    <!--    消息总线BUS    -->
    <dependency>
         <groupId>org.springframework.cloud</groupId>
         <artifactId>spring-cloud-starter-bus-amqp</artifactId>
    </dependency>
    
  3. 给config3344、config3355、config3366项目的yml文件添加rabbitmq的配置信息

    spring:
      rabbitmq:
        host: 192.168.126.11
        port: 5672
        username: guest
        password: guest
    
  4. 测试实现一次手动刷新配置,所有客户端都能刷新配置
    1)启动eureka7001和eureka7002和config3344和config3355以及config3366服务
    2)更新gitee上配置信息的版本号为3,然后提交
    在这里插入图片描述
    3)使用postman请求发送post请求更新配置信息
    在这里插入图片描述
    发送完成后,config3355和config3366服务都更新了配置,即只需要发送一个,其他客户端也会更新。

  5. 测试只刷新一个服务的配置
    1)更新gitee上配置信息的版本号为4,然后提交
    在这里插入图片描述
    2)使用postman请求发送post请求更新配置信息
    在这里插入图片描述
    即在全部更新的url路径上加上指定的应用名:端口号,即可实现单一服务的配置刷新。也就是说3355的版本号更新为4了,而3366不更新,还是3。

三、SpringCloudStream消息驱动

1、SpringCloudStream消息驱动的概论

  1. SpringCloudStream 是一个构建消息驱动微服务的框架。实现了一套轻量级的消息驱动的微服务框架。通过使用Spring Cloud Stream,可以有效简化开发人员对消息中间件的使用复杂度,让系统开发人员可以有更多的精力关注于核心业务逻辑的处理。
  2. SpringCloudStream组件主要解决屏蔽底层消息中间件的差异问题。

2、SpringCloudStream消息驱动的消息生产者

  1. 在cloud父工程下创建cloud-stream-rabbitmq-provider8001
    在这里插入图片描述

  2. 在cloud-stream-rabbitmq-provider8001项目中的POM文件中导入依赖
    在这里插入图片描述
    即导入这两个依赖的任意一个即可。

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <parent>
            <artifactId>cloud</artifactId>
            <groupId>com.zzx</groupId>
            <version>1.0-SNAPSHOT</version>
        </parent>
        <modelVersion>4.0.0</modelVersion>
    
        <artifactId>cloud-stream-rabbitmq-provider8001</artifactId>
    
        <properties>
            <maven.compiler.source>17</maven.compiler.source>
            <maven.compiler.target>17</maven.compiler.target>
        </properties>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
            </dependency>
            <!--  引入Eureka client依赖  -->
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>1.18.22</version>
            </dependency>
        </dependencies>
    
    
    </project>
    
  3. 在cloud-stream-rabbitmq-provider8001项目的com.zzx包下,修改主启动类Main的名字为StreamMain8001,修改代码如下

    package com.zzx;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    @SpringBootApplication
    @Slf4j
    public class StreamMain8001 {
        public static void main(String[] args) {
            SpringApplication.run(StreamMain8001.class,args);
            log.info("******** StreamMain8001 启动成功 *******");
        }
    }
    
  4. 在cloud-stream-rabbitmq-provider8001项目的resources目录下,创建application.yml配置文件,添加如下配置

    server:
      port: 8001
    eureka:
      instance:
        # 注册的实例名
        instance-id: cloud-stream-provider8001
      client:
        service-url:
          # Eureka server的地址
          #单机
          #defaultZone: http://localhost:7001/eureka/
          #集群
          defaultZone: http://localhost:7001/eureka,http://localhost:7002/eureka
    spring:
      application:
        #设置应用名
        name: cloud-stream-provider
      rabbitmq:
        host: 192.168.126.11
        port: 5672
        username: guest
        password: guest
      cloud:
        stream:
          bindings:
            # 广播消息 生产者绑定名称,myBroadcast是自定义的绑定名称,out代表生产者,0是固定写法
            myBroadcast-out-0:
              # 对应的真实的 RabbitMQ Exchange
              destination: my-broadcast-topic
    
    
    
    
  5. 在cloud-stream-rabbitmq-provider8001项目的com.zzx包下创建common包,在common包下创建消息实体类MyMessage,添加如下代码

    package com.zzx.common;
    
    import lombok.AllArgsConstructor;
    import lombok.Builder;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    
    import java.io.Serializable;
    
    /**
     * 消息实体类
     */
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    @Builder
    public class MyMessage implements Serializable {
        /**
         * 消息体
         */
        private String payload;
    }
    
    
  6. 在cloud-stream-rabbitmq-provider8001项目的com.zzx包下创建service包,在service包下创建消息接口IMessageProvider,添加如下代码

    package com.zzx.service;
    
    import org.springframework.stereotype.Service;
    
    /**
     * 发送消息的接口
     */
    public interface IMessageProvider {
        /**
         * 发送消息
         * @param message 消息的内容
         * @return
         */
        String send(String message);
    }
    
    
  7. 在cloud-stream-rabbitmq-provider8001项目的com.zzx.service包下创建impl包,在impl包下创建消息实现类MessageProviderImpl,添加如下代码

    package com.zzx.service.impl;
    
    import com.zzx.common.MyMessage;
    import com.zzx.service.IMessageProvider;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.cloud.stream.function.StreamBridge;
    import org.springframework.stereotype.Service;
    
    /**
     * 定义消息推送的管道
     */
    @Service
    public class MessageProviderImpl implements IMessageProvider {
        @Autowired
        private StreamBridge streamBridge;
        @Override
        public String send(String message) {
            MyMessage myMessage = new MyMessage();
            myMessage.setPayload(message);
            /**
             * 第一个参数为 生产者绑定的名称
             * 第二个参数为 发送的消息实体
             */
            streamBridge.send("myBroadcast-out-0",myMessage);
            return "success";
        }
    }
    
    
  8. 在cloud-stream-rabbitmq-provider8001项目的com.zzx包下创建controller包,在controller包下创建消息控制层类ProviderController ,添加如下代码

    package com.zzx.controller;
    
    import com.zzx.service.IMessageProvider;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    @RestController
    public class ProviderController {
        @Autowired
        private IMessageProvider iMessageProvider;
        /**
         * 发送消息
         * @param message 消息的内容
         * @return
         */
        @GetMapping("send")
        public String send(String message){
            return iMessageProvider.send(message);
        }
    }
    
    
  9. 测试
    1) 启动eureka7001和eureka7002和stream8001服务
    2)使用Postman测试
    在这里插入图片描述
    即测试消息生产者是否能接收到消息。
    3)在postman发送请求成功后,查看可视化rabbitmq时,可以看到exchange会被自动创建。
    在这里插入图片描述

3、SpringCloudStream消息驱动的消息消费者1

  1. 在cloud父工程下创建cloud-stream-rabbitmq-consumer8002
    在这里插入图片描述

  2. 在cloud-stream-rabbitmq-consumer8002项目的POM文件中引入依赖

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <parent>
            <artifactId>cloud</artifactId>
            <groupId>com.zzx</groupId>
            <version>1.0-SNAPSHOT</version>
        </parent>
        <modelVersion>4.0.0</modelVersion>
    
        <artifactId>cloud-stream-rabbitmq-consumer8002</artifactId>
    
        <properties>
            <maven.compiler.source>17</maven.compiler.source>
            <maven.compiler.target>17</maven.compiler.target>
        </properties>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
            </dependency>
            <!--  引入Eureka client依赖  -->
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>1.18.22</version>
            </dependency>
        </dependencies>
    </project>
    
  3. 在cloud-stream-rabbitmq-consumer8002项目的resources目录下,创建application.yml配置文件,添加如下配置

    server:
      port: 8002
    eureka:
      instance:
        # 注册的实例名
        instance-id: cloud-stream-consumer8002
      client:
        service-url:
          # Eureka server的地址
          #单机
          #defaultZone: http://localhost:7001/eureka/
          #集群
          defaultZone: http://localhost:7001/eureka,http://localhost:7002/eureka
    spring:
      application:
        #设置应用名
        name: cloud-stream-consumer
      rabbitmq:
        host: 192.168.126.11
        port: 5672
        username: guest
        password: guest
      cloud:
        stream:
          bindings:
            # 广播消息 消费者绑定名称,myBroadcast是自定义的绑定名称,out代表生产者,in代表消费者,0是固定写法
            myBroadcast-in-0:
              # 对应的真实的 RabbitMQ Exchange
              destination: my-broadcast-topic
    	function:
          # 定义出消费者
          definition: myBroadcast
    
  4. 将cloud-stream-rabbitmq-provider8001项目的com.zzx.common包以及该包下的类复制,粘贴cloud-stream-rabbitmq-consumer8002项目的com.zzx包下

  5. 在cloud-stream-rabbitmq-consumer8002项目的com.zzx.service包下,创建消费者类Consumer

    package com.zzx.service;
    
    import com.zzx.common.MyMessage;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.context.annotation.Bean;
    import org.springframework.stereotype.Component;
    
    @Component
    @Slf4j
    /**
     * 消费者类
     */
    public class Consumer {
        /**
         * 消费广播消息
         * @return
         */
        @Bean
        public java.util.function.Consumer<MyMessage> myBroadcast(){
            return myMessage -> {
                log.info("接收到了广播消息:{}",myMessage.getPayload());
            };
        }
    }
    
    
  6. 在cloud-stream-rabbitmq-consumer8002项目的com.zzx下,修改主启动类Main的名字为StreamConsumerMain8002,代码如下

    package com.zzx;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    /**
     * 主启动类
     */
    @SpringBootApplication
    @Slf4j
    public class StreamConsumerMain8002 {
        public static void main(String[] args) {
            SpringApplication.run(StreamConsumerMain8002.class,args);
            log.info("********** StreamConsumerMain8002 启动成功 ********");
        }
    }
    

4、SpringCloudStream消息驱动的消息消费者2

  1. 直接复制在cloud-stream-rabbitmq-consumer8002项目,粘贴到cloud父工程下,修改项目名为cloud-stream-rabbitmq-consumer8003。
    在这里插入图片描述
  2. 在cloud-stream-rabbitmq-consumer8003项目POM修改项目名为cloud-stream-rabbitmq-consumer8003
    在这里插入图片描述
  3. 在cloud-stream-rabbitmq-consumer8003项目右键选择AddasMavenProject
    在这里插入图片描述
  4. 在cloud-stream-rabbitmq-consumer8003项目的application.yml文件中,修改8002为8003
    在这里插入图片描述
  5. 在cloud-stream-rabbitmq-consumer8003项目的主启动类中修改8002为8003
    在这里插入图片描述
  6. 在cloud父工程的POM文件中添加子模块cloud-stream-rabbitmq-consumer8003

在这里插入图片描述

  1. 测试
    1)启动eureka7001和eureka7002和StreamMain8001和StreamConsumerMain8002以及StreamConsumerMain8003服务
    2)在Postman工具发送请求到8001
    在这里插入图片描述
    此时消息生产者8001会将消息进行广播到消息消费者8002和8003;在IDEA中8002和8003在接收到广播的消息后,会在控制台中打印该信息。

5、SpringCloudStream消息驱动的分组消费

  1. 在cloud-stream-rabbitmq-consumer8002和cloud-stream-rabbitmq-consumer8003的yml文件中的bindings下添加如下配置信息

    	# 分组消息
        myGroup-in-0:
          # 对应的真实的 RabbitMQ Exchange
          destination: my-group-topic
          # 同一分组的消费服务,只能有一个消费者消费到消息
          group: my-group-0
    function:
      # 定义出消费者
      definition: myBroadcast;myGroup
    

    此时跟前面的广播配置不同的是多了一个group,即分组。

  2. 在cloud-stream-rabbitmq-provider8001的yml文件中的bindings下添加如下配置信息

    # 分组消费 生产者绑定名称,myGroup是自定义的绑定名称,out代表生产者,0是固定写法
    myGroup-out-0:
      # 对应的真实的 RabbitMQ Exchange
      destination: my-group-topic  
    
  3. 在cloud-stream-rabbitmq-consumer8002和cloud-stream-rabbitmq-consumer8003的com.zzx.service包中的Consumer类中添加分组消费方法

    @Bean
        public java.util.function.Consumer<MyMessage> myGroup(){
            return myMessage -> {
                log.info("接收到了分组消息:{}",myMessage.getPayload());
            };
        }
    
  4. 在cloud-stream-rabbitmq-provider8001项目中的com.zzx.service包中的IMessageProvider中,添加如下接口方法

    String groupSend(String message);
    
  5. 在cloud-stream-rabbitmq-provider8001项目中的com.zzx.service.impl包中的MessageProviderImpl中,实现IMessageProvider接口的方法,代码如下

     @Override
        public String groupSend(String message) {
            MyMessage myMessage = new MyMessage();
            myMessage.setPayload(message);
            /**
             * 第一个参数为 生产者绑定的名称
             * 第二个参数为 发送的消息实体
             */
            streamBridge.send("myGroup-out-0",myMessage);
            return "success";
        }
    
  6. 在cloud-stream-rabbitmq-provider8001项目中的com.zzx.controller包中的ProviderController中,添加调用发送分组消息方法,代码如下

    /**
         * 发送分组消息
         * @param message 消息的内容
         * @return
         */
        @GetMapping("groupSend")
        public String groupSend(String message){
            return iMessageProvider.groupSend(message);
        }
    
  7. 测试
    1)启动eureka7001和eureka7002和StreamMain8001和StreamConsumerMain8002以及StreamConsumerMain8003服务
    2)在Postman工具发送请求到8001
    在这里插入图片描述
    此时调用groupSend方法,也就是分组方法,即8002和8003这两个消费者每次只有一个会接收到分组消息并在IDEA控制台进行打印。

总结

  1. 1)在分布式系统中,由于服务数量巨多,为了方便服务配置文件统一管理,实时更新,所以需要分布式配置中心组件SpringCloudConfig。
    即SpringCloudConfig项目是一个解决分布式系统的配置管理问题。
    2)SpringCloudConfig的配置文件需要存在gitee上,需要创建gitee仓库并设置为开源;创建完gitee的仓库后,需要在本地创建文件夹来存储配置文件,然后在该文件夹中打开git命令行,将gitee中的HTTPS链接复制到该命令行中,从而拉取该gitee远程仓库;最后在创建yml配置文件后,将这些文件上传到gitee上。
    3)SpringCloudConfig服务端的搭建,需要引入config的服务端和eureka客户端等依赖;以及yml文件需要配置gitee的HTTPS的远程仓库链接和分支等信息。config服务器端,即负责向Gitte远程仓库拉取配置文件的服务,以及将拉取过来的配置文件共享给config客户端。
    4)SpringCloudConfig客户端的搭建,需要引入config客户端和actuator监控和eureka客户端以及bootstrap等依赖;以及需要在bootstrap.yml文件中配置SpringCloudConfig服务端的服务器地址、gitee中的分支名、应用名和环境名和暴露监控端口等。而且动态刷新时还需要给ConfigController类上添加@RefreshScope注解,这样才能使用POST请求通过访问本机ip:端口/actuator/refresh路径的方法进行手动刷新gitee上修改的配置。
    手动刷新配置时,必须使用POST请求。
  2. 1)SpringCloudBus消息总线,即修改配置文件后,A更新后通知消息总线;消息总线再通过广播发送消息给其他客户端,其他客户端更新。
    消息总线主要用来解决配置中心客户端的手动刷新;可以用RabbitMQ或Kafka来作为代理。
    即一个客户端更新,全部客户端都会实现更新操作,而不用一个一个的手动更新。
    2)使用Centos7虚拟机,然后用docker来安装RabbitMQ。
    3)SpringCloudBus动态刷新全局广播的实现流程,需要给config的服务端和客户端的POM文件加上Bus的依赖和yml配置文件加上rabbitmq的配置。动态刷新全局广播,即一个客户端发送刷新消息到Bus消息总线即可实现全部客户端刷新配置文件。
    也可以实现单一服务刷新配置,即在全局的url上加上应用名:端口号即可。
  3. 1)SpringCloudStream是一个构建消息驱动微服务的框架,主要解决屏蔽底层消息中间件的差异问题。
    2)创建消息生产者项目,需要导入eureka客户端和stream-rabbit等依赖;配置rabbitmq信息和stream绑定生产者的交换机等信息。然后通过StreamBridge对象进行消息的发送等操作。
    3)创建消息的消费者项目,也需要导入eureka客户端和stream-rabbit等依赖;配置rabbitmq信息和stream绑定生产者的交换机等信息,但是绑定消费者名称时,需要指定为in。
    并且需要在definition属性指定消费者方法的名字;然后在类中需要实现这个消费者方法并且在方法上加上@Bean注解,将该类上加上@Component注解,即将该类中的使用@Bean的方法交给SpringIOC容器。在消息生产者通过广播发送消息到消息消费者,Spring就会调用该方法。
    4)分组消费,即在广播的基础上,在yml文件加上group属性,用于定义分组信息。在同一个分组内的所有消费者,当消费生产者每次发送消息只有一个消费者能接收到。
  4. 1)Config配置中心用来解决分布式系统的配置管理问题。
    2)但是Config需要配置中心的客户端去手动刷新配置,然而Bus消息总线使用消息中间件解决配置中心客户端的手动刷新,也就是说将消息发送给Bus消息总线,Bus消息总线给所有配置中心客户端发送更新的消息,但是配置中心服务端和客户端都需要yml文件中配置消息中间件的信息,以此进行绑定。
    3)Stream消息驱动用来解决屏蔽底层消息中间件的差异问题。即解决消息中间件不一致的问题,因为消息中间件有很多种。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/498333.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

如何用ChatGPT做品牌联名方案策划?

该场景对应的关键词库&#xff08;15个&#xff09;&#xff1a; 品牌、个人IP、社交话题、联名策划方案、调研分析、市场影响力、资源互补性、产品体验、传播话题、视觉形象设计、合作职权分配、销售转化、曝光目标、宣发渠道、品牌形象 提问模板&#xff08;1个&#xff09;…

Milvus应用开发实战【语义搜索】

美国总统竞选活动即将到来。 现在是回顾拜登政府上任头两年的一些演讲的好时机。 搜索一些演讲记录以了解更多关于白宫迄今为止关于某些主题的信息不是很好吗&#xff1f; 假设我们要搜索演讲的内容。 我们该怎么做&#xff1f; 我们可以使用语义搜索。 语义搜索是目前人工智能…

【谷粒商城之分布式锁Redisson-lock】

本笔记内容为尚硅谷谷粒商城分布式锁Redisson-lock部分 目录 一、分布式锁与本地锁 二、分布式锁实现 使用 RedisTemplate 操作分布式锁 三、Redisson 完成分布式锁 1、简介 2、导入依赖 3、配置 4、使用 1.可重入锁 2.公平锁&#xff08;Fair Lock&#xff09; 3…

记录-VUE中常用的4种高级方法

这里给大家分享我在网上总结出来的一些知识&#xff0c;希望对大家有所帮助 1. provide/inject provide/inject 是 Vue.js 中用于跨组件传递数据的一种高级技术&#xff0c;它可以将数据注入到一个组件中&#xff0c;然后让它的所有子孙组件都可以访问到这个数据。通常情况下&a…

DC-8通关详解

信息收集 漏洞发现 找个扫描器扫一下 msf试了几个exp都没用 那么手动找找 发现传参nid出存在sql注入 python sqlmap -u "http://192.168.45.144:80/?nid1" --union-cols1 -D d7db -T users -C name,pass --columns --tables --dump 用john爆密码 admin爆了20分钟没…

为Linux系统添加一块新硬盘,并扩展根目录容量

我的原来ubuntu20.04系统装的时候不是LVM格式的分区&#xff0c; 所以先将新硬盘转成LVM&#xff0c;再将原来的系统dd到新硬盘&#xff0c;从新硬盘的分区启动&#xff0c;之后再将原来的分区转成LVM&#xff0c;在融入进来 1&#xff1a;将新硬盘制作成 LVM分区 我的新硬盘…

Python进阶

1.Json数据格式&#xff08;用于不同语言的数据交互&#xff09; 特定格式的字符串 第一种形式的Json &#xff0c;转换成字典 第二种形式的Json&#xff0c;转换成字典列表 1.1 Python的Json转化 dumps 方法 Python转Json loads 方法 Json转Python 1.2 字典转json 需要…

Studio One6Mac中文免费版数字音乐工作站DAW

无论你是第一次接触数字音乐工作站&#xff08;DAW&#xff09;&#xff0c;还是第一次尝试 制作属于自己的音乐&#xff0c;Studio One 都能给你非凡的体验&#xff01;Studio One 6中文版是一款音乐制作软件&#xff0c;通过新的智能模板、直观的拖放工作流、可定制的用户界面…

C# 使用自带的组件PrintPreviewDialog 和 PrintDocument实现打印预览(一)

文章目录 前言相关内容了解打印预览功能1 创建winform工程2 创建要打印的测试数据3 绘制打印页绘制页脚绘制内容PrintPage事件 完整的代码工程小节 前言 有这么个需求&#xff1a;DataTable中有一些数据是需要给显示或直接可以连接打印机进行打印的&#xff0c; 查阅了一下资料…

jenkins共享ci阶段

jenkins共享ci阶段 需求 一个产品包含多个服务&#xff0c;这些服务的流水线都是类似的&#xff1a;制作制品构建并推送镜像构建并推送chart包触发自动部署。我们期望将流水线拆分为ci流水线、cd流水线&#xff0c;ci流水线包含&#xff1a;制作制品构建并推送镜像构建并推送…

蓝牙协议栈之L2CAP使用

目录 前言一、逻辑链路层及自适应协议层&#xff08;L2CAP&#xff09;二、常用的L2CAP术语三、L2CAP的工作模式四、L2CAP通道五、L2CAP帧类型六、Fragmentation/Recombination七、Segmentation/Reassembly八、L2CAP MTU九、Controller to Host Flow Control十、总结 前言 本文…

七个值得推荐的物联网分析平台

物联网分析平台是一种软件工具&#xff0c;可以帮助企业收集和分析来自其广泛的物联网设备的数据。企业可以通过物联网收集大量数据&#xff0c;从消费者支出模式到流量使用&#xff0c;物联网数据分析平台在帮助企业获得竞争优势所需的洞察力方面至关重要。 物联网分析平台已…

「2023 最新」 Github、Gitlab Git 工作流「常用」 git 命令、规范以及操作总结

Git commit 规范 关于提交信息的格式&#xff0c;可以遵循以下的规则&#xff1a; feat: 新特性&#xff0c;添加功能fix: 修改 bugrefactor: 代码重构docs: 文档修改style: 代码格式修改test: 测试用例修改chore: 其他修改, 比如构建流程, 依赖管理 Git 基础知识 当我们通过…

Midjourney生成LOGO指南

目录 常见的Logo 宠物店Logo Graphic Logo​ Lettermark Logo​ Geometric Logo​ Mascot Logo​ 增加风格——艺术运动​ 每个产品都有自己的专属名称&#xff0c;也有自己专属的Logo&#xff0c;经过前几篇的学习&#xff0c;我相信你也有了一定的基础&#xff0c;今天…

TiDB实战篇-PD调度常见问题处理方法

常见的问题 调度产生和执行 常见的调度类型 参数调度的速度 调度典型场景 Leader分布不均匀监控 leader分布算法&#xff0c;每一个leader的size作为总和&#xff0c;还有TiKV的剩余空间等等。 可以手动设置权重。 分布不均衡处理 TiKV节点下线速度慢 TiKV下线速度慢解决方法 …

一文说透IO多路复用select/poll/epoll

概述 如果我们要开发一个高并发的TCP程序。常规的做法是&#xff1a;多进程或者多线程。即&#xff1a;使用其中一个线程或者进程去监听有没有客户端连接上来&#xff0c;一旦有新客户端连接&#xff0c;就新开一个线程(进程)&#xff0c;将其扔到线程&#xff08;或进程&…

C++——类和对象[下]

0.关注博主有更多知识 C知识合集 目录 1.再谈构造函数 1.1初始化列表 1.2初始化列表的初始化顺序 1.3构造函数的隐式类型转换 1.4explicit关键字 2.static成员 2.1static成员变量 2.2static成员函数 3.友元 3.1友元函数 3.2友元类 4.内部类 5.匿名对象 6.编译器…

美颜sdk对于移动端视频直播的优化效果研究报告

随着移动互联网的快速发展&#xff0c;移动端视频直播应用也越来越受到用户的青睐。然而&#xff0c;对于许多用户来说&#xff0c;直播的画质却成为了一个令人头疼的问题。为了解决这个问题&#xff0c;许多直播应用开始引入美颜sdk&#xff0c;以期提升直播画质和用户体验。本…

凌思微-蓝牙框架-流程理解

1.蓝牙SOC芯片主函数流程 int main() { sys_init_app(); ble_init(); dev_manager_init(dev_manager_callback); gap_manager_init(gap_manager_callback); gatt_manager_init(gatt_manager_callback); rtos_init(); ble_task_init(); app_task_init(); vTaskStartScheduler();…

第8章:聚合函数

目录 一、常见的聚合函数 二、GROUP BY 的使用 三、HAVING 的使用&#xff0c;过滤数据 四、SQL底层的执行原理 五、练习 一、常见的聚合函数 1.概念 聚合函数作用于一组数据&#xff0c;并对一组数据返回一个值。 2.聚合函数的类型 AVG(),SUM(),MAX(),MIN(),COUNT() 3. AV…