屏蔽底层消息中间件MQ的差异,降低切换成本,统一消息的编程模型。
生产者
一、依赖
spring-cloud-starter-stream-rabbit (rabbitMQ中间件)
二、配置文件
server:
port: 8801
spring:
application:
name: cloud-stream-provider
cloud:
stream:
binders: #配置要绑定的rabbitmq的服务信息
defatuleRabbit:
type: rabbit #消息组件类型
environment:
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings:
output:
destination: studyExchange # 要使用exchange名称定义
content-type: application/json # 设置消息类型,文本设置为text/plain
binder: defaultRabbit #要绑定的消息服务具体设置
eureka:
client:
service-url:
defaultZone: http://localhost:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2 #设置心跳的时间间隔(默认是30秒)
lease-expiration-duration-in-seconds: 5 #如果现在超过了5秒的间隔(默认是90秒)
instance-id: send-8001.com # 在信息列表时显示主机名称
prefer-ip-address: true #访问的路径变为ip地址
三、启动类
@SpringBootApplication
public class StreamMQMain8801{
public static void main(String[] args){
SpringApplication.run(StreamMQMain8801.class,args);
}
}
四、业务类(发送消息接口、发送消息接口实现类)
public interface IMessageProvider{
public String send();
}
@EnableBinding(Source.class) //消息推送管道
public class MessageProvierImpl implements IMessageProvider{
@Resource
private MessageChannel output; //消息发送管道
@Override
public String send(){
String serial = UUID.randomUUID().toString();
output.send(MessageBuilder.withPayLoad(serial).build());
System.out.println("*****serial:"+serial);
return null;
}
}
五、Controller控制类
@RestController
public class SendMessageController{
@Resource
private IMessageProvider messageProvider;
@GetMapping(value = "/sendMessge")
public String sendMessage(){
return messageProvider.send();
}
}
六、测试
启动eureka,启动rabbitmq,启动消息生产者
访问:http://localhost:8801/sendMessage
消费者
一、依赖
spring-cloud-starter-stream-rabbit
二、配置文件
server:
port: 8802
spring:
application:
name: cloud-stream-consumer
cloud:
stream:
binders:
defaultRabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: localhost
port:5672
username: guest
password: guest
bindings:
input:
destination: studyExchange
content-type: application/json
binder: defaultRabbit
eureka:
client:
service-url:
defaultZont: http://localhost:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2
lease-expiration-duration-in-seconds: 5
instance-id: receive-8802.com
prefer-ip-address: true
三、启动类
@SpringBootApplication
public class StreamMQMain8802{
public static void main(String[] args){
SpringApplication.run(StreamMQMain8802.class,args);
}
}
四、接收消息
@Component
@EnableBinding(Sink.class)
public clas ReceiveMessageListenerController{
@Value("${server.port}")
private String serverPort;
@StreamListener(Sink.INPUT)
public void input(Message<String> message){
System.out.println("消费者1号,----->接收到的消息:"+ message.getPayload()+ "\t port:"+serverPort);
}
}
克隆一个消费者工程,运行产生的两个问题
- 有重复消费问题,即两个消费者工程同时收到了生产工程的消息(不同组可以重复消费,但同一组内发生竞争关系,只有一个可以消费)
- 消息持久化问题
一、配置文件更改(添加一个组的标记)
两个消费者工程分别配置(自定义配置分组,默认情况是不同的分组)
若避免重复消费,将配置文件的group配置成相同的名称