具体代码
依赖:
<dependencies>
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.code.gson/gson -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.5</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.amqp/spring-rabbit -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.1.4.RELEASE</version>
</dependency>
<!-- https://mvnrepository.com/artifact/junit/junit -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework/spring-test -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>5.1.5.RELEASE</version>
<scope>test</scope>
</dependency>
</dependencies>
配置文件:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
</beans>
写配置:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<rabbit:connection-factory id="connectionFactory" host="192.168.200.166"
port="5672" username="ljj" password="123456"
virtual-host="/demo"/>
<!--创建一个交换机-->
<!--auto-declare="true":交换机已存在则直接使用,不存在则自动创建-->
<rabbit:topic-exchange name="springexchange" auto-declare="true">
</rabbit:topic-exchange>
<!--RabbitTemplate-->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"
exchange="springexchange"></rabbit:template>
<!--创建面板工具-->
<rabbit:admin connection-factory="connectionFactory"></rabbit:admin>
</beans>
News.java:
package com.pb.entity;
import java.io.Serializable;
import java.util.Date;
/**
 * Serializable news item with a source, a title, a creation time and a body.
 *
 * <p>{@link Date} is mutable, so the creation time is defensively copied whenever it
 * enters or leaves this object. Callers therefore cannot change the stored value by
 * mutating a {@code Date} they passed in or received. The copy is always a plain
 * {@code java.util.Date} carrying the same millisecond value.
 */
public class News implements Serializable {
    private static final long serialVersionUID = 8707083474774769028L;

    private String source;
    private String title;
    private Date creatTime;
    private String content;

    /** Creates an empty news item; all fields are {@code null}. */
    public News() {
    }

    /**
     * Creates a fully populated news item.
     *
     * @param source    origin of the news
     * @param title     headline
     * @param creatTime creation time, may be {@code null}; copied, not stored by reference
     * @param content   body text
     */
    public News(String source, String title, Date creatTime, String content) {
        this.source = source;
        this.title = title;
        this.creatTime = copyOf(creatTime);
        this.content = content;
    }

    /** @return the serialization version identifier of this class */
    public static long getSerialVersionUID() {
        return serialVersionUID;
    }

    /** @return the origin of the news */
    public String getSource() {
        return source;
    }

    /** @param source the origin of the news */
    public void setSource(String source) {
        this.source = source;
    }

    /** @return the headline */
    public String getTitle() {
        return title;
    }

    /** @param title the headline */
    public void setTitle(String title) {
        this.title = title;
    }

    /** @return a copy of the creation time, or {@code null} if none is set */
    public Date getCreatTime() {
        return copyOf(creatTime);
    }

    /** @param creatTime the creation time, may be {@code null}; copied, not stored by reference */
    public void setCreatTime(Date creatTime) {
        this.creatTime = copyOf(creatTime);
    }

    /** @return the body text */
    public String getContent() {
        return content;
    }

    /** @param content the body text */
    public void setContent(String content) {
        this.content = content;
    }

    /** Returns an independent copy of {@code date}, passing {@code null} through unchanged. */
    private static Date copyOf(Date date) {
        return date == null ? null : new Date(date.getTime());
    }
}
NewsProducer.java:
/**
 * First skeleton of the news producer: it only holds the {@link RabbitTemplate}
 * and exposes a setter for it.
 */
public class NewsProducer {

    /** Template assigned through {@link #setRabbitTemplate(RabbitTemplate)}. */
    private RabbitTemplate rabbitTemplate;

    /**
     * Stores the template this producer will use.
     *
     * @param rabbitTemplate the template to keep
     */
    public void setRabbitTemplate(final RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }
}
然后再去配置文件中创建一个bean:
<!--创建一个bean-->
<bean id="newsProducer" class="com.pb.producer.NewsProducer">
<property name="rabbitTemplate" ref="rabbitTemplate" />
</bean>
然后去加载bean
package com.pb.producer;
import com.pb.entity.News;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import java.util.Date;
/**
 * Publishes {@link News} messages through an injected {@link RabbitTemplate}.
 */
public class NewsProducer {

    /** Template assigned through {@link #setRabbitTemplate(RabbitTemplate)}. */
    private RabbitTemplate rabbitTemplate;

    /**
     * Stores the template used for publishing.
     *
     * @param rabbitTemplate the template to keep
     */
    public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    /**
     * Converts {@code news} to a message and sends it with the given routing key.
     * No exchange is named here, so the template's own default exchange applies.
     *
     * @param routingKey routing key attached to the message
     * @param news       payload to send
     */
    public void send(String routingKey, News news) {
        rabbitTemplate.convertAndSend(routingKey, news);
    }

    /**
     * Loads {@code beans.xml}, sends two sample messages and shuts the context down.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // try-with-resources closes the context (and the resources it owns) once the
        // messages are sent; previously the context was never closed.
        try (ClassPathXmlApplicationContext ac = new ClassPathXmlApplicationContext("beans.xml")) {
            NewsProducer newsProducer = ac.getBean(NewsProducer.class);
            newsProducer.send("a.123213", new News("xxx", "xxx", new Date(), "xxx"));
            newsProducer.send("b.sadfwe", new News("sss", "sss", new Date(), "sss"));
            System.out.println("发送成功...");
        }
    }
}
运行:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<rabbit:connection-factory id="connectionFactory" host="192.168.200.166"
port="5672" username="ljj" password="123456"
virtual-host="/demo"/>
<!--创建面板工具-->
<rabbit:admin connection-factory="connectionFactory"></rabbit:admin>
<!--创建队列-->
<rabbit:queue name="springqueue" auto-declare="true" auto-delete="false" exclusive="false" durable="false"/>
<!--和交换机进行绑定-->
<rabbit:topic-exchange name="springexchange" auto-declare="true">
<rabbit:bindings>
<rabbit:binding pattern="a.#" queue="springqueue" />
<rabbit:binding pattern="b.#" queue="springqueue" />
</rabbit:bindings>
</rabbit:topic-exchange>
</beans>
在创建监听器的时候先创建一个实体:
实体与生产端一样
NewsConsumer.java:
package com.pb.consumer;
import com.pb.entity.News;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
/**
 * Prints received {@link News} items to standard output.
 */
public class NewsConsumer {

    /**
     * Writes the source, title, creation time and content of {@code news} on one
     * tab-separated line.
     *
     * @param news the received news item
     */
    public void getMessage(News news) {
        StringBuilder line = new StringBuilder();
        line.append(news.getSource()).append("\t");
        line.append(news.getTitle()).append("\t");
        line.append(news.getCreatTime()).append("\t");
        line.append(news.getContent());
        System.out.println(line.toString());
    }

    /**
     * Loads {@code beans.xml}. The context is intentionally not closed here.
     * NOTE(review): presumably it stays open so a listener declared in that file
     * keeps consuming; confirm against the configuration.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ApplicationContext context = new ClassPathXmlApplicationContext("beans.xml");
    }
}
再到配置文件中添加:
<!--创建一个监听-->
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener ref="newsConsumer" method="getMessage" queue-names="springqueue"></rabbit:listener>
</rabbit:listener-container>
<!--创建消费者的bean-->
<bean id="newsConsumer" class="com.pb.consumer.NewsConsumer"></bean>
运行:
使用RabbitAdmin管理MQ(这样子出不来 有没有人 会? 求助)
/**
 * Spring-driven JUnit 4 test that declares three exchanges through {@link RabbitAdmin}.
 * The application context is loaded from {@code classpath:beans.xml}.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("classpath:beans.xml")
public class RabbitTest {

    /** Admin helper injected from the test context. */
    @Autowired
    private RabbitAdmin rabbitAdmin;

    /** Declares one topic, one direct and one fanout exchange. */
    @Test
    public void test() {
        TopicExchange topic = new TopicExchange("aaa");
        rabbitAdmin.declareExchange(topic);

        DirectExchange direct = new DirectExchange("bbb");
        rabbitAdmin.declareExchange(direct);

        FanoutExchange fanout = new FanoutExchange("ccc");
        rabbitAdmin.declareExchange(fanout);
    }
}
迁移到c
package com.pb;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import javax.annotation.Resource;
import java.util.HashMap;
/**
 * Spring-driven JUnit 4 test that manages broker objects through {@link RabbitAdmin}
 * and publishes through {@link RabbitTemplate}. The application context is loaded
 * from {@code classpath:beans.xml}.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("classpath:beans.xml")
public class RabbitTest {

    /** Admin helper used to declare and delete exchanges, queues and bindings. */
    @Resource
    private RabbitAdmin rabbitAdmin;

    /** Template used to publish the sample message. */
    @Resource
    private RabbitTemplate rabbitTemplate;

    /** Declares the topic exchange {@code aaa2} and the queue {@code dadeng}. */
    @Test
    public void test() {
        TopicExchange exchange = new TopicExchange("aaa2", false, false);
        rabbitAdmin.declareExchange(exchange);

        Queue queue = new Queue("dadeng");
        rabbitAdmin.declareQueue(queue);
    }

    /**
     * Binds queue {@code dadeng} to exchange {@code springexchange} with pattern
     * {@code #}, then publishes a text message with routing key {@code aaa}.
     */
    @Test
    public void test1() {
        Binding binding = new Binding(
                "dadeng",
                Binding.DestinationType.QUEUE,
                "springexchange",
                "#",
                new HashMap<String, Object>());
        rabbitAdmin.declareBinding(binding);

        rabbitTemplate.convertAndSend("aaa", "大灯最近有点飘");
    }

    /** Deletes exchange {@code springexchange} and queue {@code dadeng}. */
    @Test
    public void test2() {
        rabbitAdmin.deleteExchange("springexchange");
        rabbitAdmin.deleteQueue("dadeng");
    }
}
我们先用这个:
运行test1:
删除
/** Cleanup step: deletes the exchange and the queue by name. */
@Test
public void test2() {
// Delete the exchange named "springexchange".
rabbitAdmin.deleteExchange("springexchange");
// Delete the queue named "dadeng".
rabbitAdmin.deleteQueue("dadeng");
}
运行:
Springboot与RabbitMQ整合-1 生产者:
依赖:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.12</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.pb</groupId>
<artifactId>springbootconsumer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>springbootconsumer</name>
<description>springbootconsumer</description>
<properties>
<java.version>17</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
Product
配置文件:yml
server:
port: 9000
spring:
rabbitmq:
username: test
password: 123
host: 192.168.235.128
port: 5672
virtual-host: /demo
connection-timeout: 20000
publisher-confirm-type: CORRELATED #确认消息是否传递到mq
publisher-returns: true #如果说我们的消息没有匹配的路由则退回给生产者
template:
mandatory: true #如果说我们的消息没有匹配的路由则退回给生产者
Emp.java:
package com.pb.com.pb.entity;
import java.io.Serializable;
/**
 * Serializable employee record with a number, a name and an age.
 *
 * <p>NOTE(review): if this object is transferred with Java serialization, the class
 * presumably needs the same fully qualified name on the sending and receiving side;
 * verify that both projects declare it in the same package.
 */
public class Emp implements Serializable {

    private static final long serialVersionUID = 6010911148025965132L;

    /** Employee number. */
    private Integer empno;

    /** Employee name. */
    private String name;

    /** Employee age. */
    private Integer age;

    /** Creates an empty employee; all fields are {@code null}. */
    public Emp() {
    }

    /**
     * Creates a fully populated employee.
     *
     * @param empno employee number
     * @param name  employee name
     * @param age   employee age
     */
    public Emp(Integer empno, String name, Integer age) {
        this.empno = empno;
        this.name = name;
        this.age = age;
    }

    /** @return the employee number */
    public Integer getEmpno() {
        return this.empno;
    }

    /** @param empno the employee number */
    public void setEmpno(Integer empno) {
        this.empno = empno;
    }

    /** @return the employee name */
    public String getName() {
        return this.name;
    }

    /** @param name the employee name */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the employee age */
    public Integer getAge() {
        return this.age;
    }

    /** @param age the employee age */
    public void setAge(Integer age) {
        this.age = age;
    }
}
EmpProducer.java:
/**
 * Spring component that publishes a sample {@code Emp} through {@link RabbitTemplate}.
 */
@Component
public class EmpProducer {

    /** Exchange the sample message is published to. */
    private static final String EXCHANGE = "springbootexchange";

    /** Routing key attached to the sample message. */
    private static final String ROUTING_KEY = "#";

    @Resource
    private RabbitTemplate rabbitTemplate;

    /** Sends one hard-coded employee to the exchange. */
    public void send() {
        Emp payload = new Emp(1001, "光光", 20);
        rabbitTemplate.convertAndSend(EXCHANGE, ROUTING_KEY, payload);
    }
}
报错!!!!
我们手动创建一个交换机:
/**
 * Spring component that publishes a sample {@code Emp} and logs publisher confirms
 * and returned (unroutable) messages.
 */
@Component
public class EmpProducer {

    @Resource
    private RabbitTemplate rabbitTemplate;

    /** Prints the ack flag and the cause reported for each publisher confirm. */
    RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            System.out.println(ack + "\t" + cause);
        }
    };

    /** Prints body, exchange and routing key of a message handed back to the producer. */
    RabbitTemplate.ReturnsCallback returnsCallback = new RabbitTemplate.ReturnsCallback() {
        @Override
        public void returnedMessage(ReturnedMessage msg) {
            Message message = msg.getMessage();
            // Decode with an explicit charset instead of the platform default so the
            // output does not depend on the host configuration.
            // NOTE(review): the payload may not be text at all (it depends on the
            // message converter in use), in which case this is only a best-effort dump.
            String str = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
            System.err.println("信息为:" + str);
            System.err.println("交换机为:" + msg.getExchange());
            System.err.println("路由为:" + msg.getRoutingKey());
        }
    };

    /**
     * Registers the confirm and return callbacks on the template, then sends one
     * hard-coded employee to exchange {@code springbootexchange} with routing key {@code #}.
     */
    public void send() {
        // Register the publisher-confirm callback.
        rabbitTemplate.setConfirmCallback(confirmCallback);
        // Register the returned-message callback.
        rabbitTemplate.setReturnsCallback(returnsCallback);
        rabbitTemplate.convertAndSend("springbootexchange", "#", new Emp(1001, "光光", 20));
    }
}
测试:
消费者:
配置文件:
spring:
rabbitmq:
username: ljj
password: 123456
host: 192.168.200.166
port: 5672
virtual-host: /demo
listener:
simple:
acknowledge-mode: manual #手动签收
concurrency: 1
max-concurrency: 5
NewsConsumer.java:
package com.pb.consumer;
import com.pb.entity.Emp;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Map;
/**
 * Listener component that consumes {@code Emp} messages from queue {@code bootqueue}
 * and acknowledges each one explicitly.
 */
@Component
public class NewsConsumer {

    /**
     * Handles one message: prints the employee name and number, then acknowledges the
     * delivery on the channel.
     *
     * <p>The binding declares queue {@code bootqueue}, topic exchange
     * {@code springbootexchange} and routing pattern {@code #}. The exchange name must
     * match the producer's exactly; the previous value had a trailing space, which
     * names a different exchange.
     *
     * <p>NOTE(review): the explicit {@code basicAck} assumes the listener container
     * runs in manual acknowledge mode; confirm in the application configuration.
     *
     * @param emp     the converted message payload
     * @param channel the channel the message was delivered on, used for the ack
     * @param headers message headers; the delivery tag is read from them
     * @throws IOException if the acknowledgement cannot be sent
     */
    @RabbitListener(
            bindings = @QueueBinding( // bind queue and exchange
                    value = @Queue(name = "bootqueue"), // queue
                    exchange = @Exchange(name = "springbootexchange", type = "topic"), // exchange
                    key = "#" // routing pattern
            )
    )
    public void getMessage(@Payload Emp emp, Channel channel, @Headers Map<String,Object> headers) throws IOException {
        System.out.println("接收到了消息:" + emp.getName() + "\t" + emp.getEmpno());
        // Delivery tag identifying this message on the channel.
        Long id = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        // false = acknowledge only this delivery, not all earlier ones.
        channel.basicAck(id, false);
    }
}
启动服务:消费
运行生产者:test