一、介绍
(1)提供统一接口操作不同厂商的消息队列组件,降低学习成本
(2)生产者和消费者只需操作binder对象即可与消息队列交互,生产者output,消费者input
(3)核心概念:发布订阅、消费组、分区
(4)使用topic模式
二、项目搭建
(1)生产者
a、编写pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>demo20220821</artifactId>
<groupId>com.wsh.springcloud</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>cloud-stream-rabbitmq-provider8801</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>com.wsh.springcloud</groupId>
<artifactId>cloud-api-common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
b、编写application.yml
server:
port: 8801
spring:
application:
name: cloud-stream-rabbit-provider
cloud:
stream:
binders:
defaultRabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: 192.168.0.166
port: 5672
username: guest
password: guest
bindings:
output:
destination: testExchange
content-type: application/json
binder: defaultRabbit
eureka:
client:
# 客户端设置为true
register-with-eureka: true
# 客户端设置为true
fetch-registry: true
service-url:
# defaultZone: http://localhost:7001/eureka
defaultZone: http://eureka1.com:7001/eureka, http://eureka2.com:7002/eureka
instance:
instance-id: cloudSreamRabbitProvider8801
prefer-ip-address: true
management:
endpoints:
web:
exposure:
include: "*"
c、编写启动类
package com.wsh.springcloud;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
/**
* @ClassName ConfigMain3344
* @Description: TODO
* @Author wshaha
* @Date 2023/10/15
* @Version V1.0
**/
@SpringBootApplication
@EnableEurekaClient
public class StreamRabbitMqProvider8801 {
public static void main(String[] args) {
SpringApplication.run(StreamRabbitMqProvider8801.class, args);
}
}
d、编写接口及实现类
package com.wsh.springcloud.service;
public interface IMessageProvider {
public String send();
}
package com.wsh.springcloud.service;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
/**
* @ClassName MessageProviderImpl
* @Description: TODO
* @Author wshaha
* @Date 2023/10/15
* @Version V1.0
**/
@EnableBinding(Source.class)
public class MessageProviderImpl implements IMessageProvider {
@Autowired
@Qualifier("output")
private MessageChannel messageChannel;
@Override
public String send() {
messageChannel.send(MessageBuilder.withPayload("hello").build());
return null;
}
}
e、编写Controller
package com.wsh.springcloud.controller;
import com.wsh.springcloud.service.IMessageProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @ClassName MessageController
* @Description: TODO
* @Author wshaha
* @Date 2023/10/15
* @Version V1.0
**/
@RestController
public class MessageController {
@Autowired
private IMessageProvider messageProvider;
@GetMapping("/sendMessage")
public void sendMessage(){
messageProvider.send();
}
}
(2)编写消费者
a、编写pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>demo20220821</artifactId>
<groupId>com.wsh.springcloud</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>cloud-stream-rabbitmq-consumer8802</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>com.wsh.springcloud</groupId>
<artifactId>cloud-api-common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
b、编写application.yml
server:
port: 8802
spring:
application:
name: cloud-stream-rabbit-provider
cloud:
stream:
binders:
defaultRabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: 192.168.0.166
port: 5672
username: guest
password: guest
bindings:
input:
destination: testExchange
content-type: application/json
binder: defaultRabbit
eureka:
client:
# 客户端设置为true
register-with-eureka: true
# 客户端设置为true
fetch-registry: true
service-url:
# defaultZone: http://localhost:7001/eureka
defaultZone: http://eureka1.com:7001/eureka, http://eureka2.com:7002/eureka
instance:
instance-id: cloudSreamRabbitProvider8801
prefer-ip-address: true
management:
endpoints:
web:
exposure:
include: "*"
c、编写启动类
package com.wsh.springcloud;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
/**
* @ClassName ConfigMain3344
* @Description: TODO
* @Author wshaha
* @Date 2023/10/15
* @Version V1.0
**/
@SpringBootApplication
@EnableEurekaClient
public class StreamRabbitMqConsumer8802 {
public static void main(String[] args) {
SpringApplication.run(StreamRabbitMqConsumer8802.class, args);
}
}
d、编写Controller
package com.wsh.springcloud.controller;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.web.bind.annotation.RestController;
/**
* @ClassName ConsumerController
* @Description: TODO
* @Author wshaha
* @Date 2023/10/15
* @Version V1.0
**/
@RestController
@EnableBinding(Sink.class)
public class ConsumerController {
@StreamListener(Sink.INPUT)
public void receiveMessage(Message<String> message){
System.out.println(message.getPayload());
}
}
(3)运行
三、解决消息重复消费
(1)绑定同一交换机且不同组的消费者会收到相同消息
(2)解决方式,同一组的消费者只有一个消费者会收到消息,故配置这群消费者为同一个组即可
(3)配置
四、消息持久化
(1)定义分组后会实现消息持久化,原理:没定义分组时,服务对应的队列是autodelete,服务停止后就删除队列,手续发送的消息无法收到