添加rabbitmq依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
配置文件中加入rabbitmq配置
server:
port: 18072
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
在启动类上添加注解
@EnableRabbit
@SpringBootApplication
public class RabbitMqBootApplication {
public static void main(String[] args) {
SpringApplication.run(RabbitMqBootApplication.class, args);
}
}
创建Rabbit配置类
@Configuration
public class RabbitConfig {
//定义队列
public static final String SIMPLE_QUEUE_NAME = "mqTest1";
@Bean
public Queue simpleQueue() {
/**
durable(): 是否持久化,当mq 重启之后还在
exclusive(): 是否独占:只能有一个消费者监听这个队列,当Connection 关闭时,是否删除队列
autoDelete(): 是否自动删除,当没有Consumer 监听时,自动删除
withArgument(): 参数
*/
return QueueBuilder.durable(SIMPLE_QUEUE_NAME).build();
}
}
创建消费者
@Service
@Slf4j
public class RabbitMqConsumer {
@RabbitListener(queues = "mqTest1")
public void receive(@Payload String message){
log.info("收到了mqTest1队列消息:" + message);
}
}
创建生产者
@Slf4j
@RestController
@RequestMapping("/mq")
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/send")
public String send(String message){
rabbitTemplate.convertAndSend(RabbitConfig.SIMPLE_QUEUE_NAME,message);
return "发送 " + message + " 到" + RabbitConfig.SIMPLE_QUEUE_NAME;
}
}
压测
使用for循环创建20个线程,每个线程向队列中插入一百万条数据
@RequestMapping("/strongSend")
public String strongSend(){
for (int i = 0; i < 20; i++) {
new Thread(() -> {
for (int i1 = 0; i1 < 1000000; i1++) {
rabbitTemplate.convertAndSend(RabbitConfig.SIMPLE_QUEUE_NAME,
Thread.currentThread().getName());
}
}).start();
}
return "压测完成";
}
启动项目进行压测(记得把消费者关掉,或者消费者另启一个项目)
调用压测接口
进入rabbitmq管理页面查看
已经写入了96万数据,写入速度约每秒1.5万条,后面启动消费者进行消费即可