pom 添加redis
<!-- redis 缓存操作 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
发布消息
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
/**
* @author zpy
* @date 2023/8/9 13:48
*/
@Slf4j
@Component
public class RedisMessageUtils {
@Autowired
private StringRedisTemplate stringRedisTemplate;
/**
* 向通道发布消息
* topic 通道名称
*/
public void sendMessage(String topic, Object message) {
if (!StringUtils.hasText(topic)) {
return;
}
try {
stringRedisTemplate.convertAndSend(topic, message);
log.info("发送消息成功,topic:{},message:{}", topic, message);
} catch (Exception e) {
log.info("发送消息失败,topic:{},message:{}", topic, message);
e.printStackTrace();
}
}
}
接收消息
/**
* Redis消息监听器容器
* 这个容器加载了RedisConnectionFactory和消息监听器
* 可添加多个不同话题的redis监听器,需要将消息监听器和消息频道绑定,
* 通过反射调用消息订阅处理器的相关方法进行业务处理
*
* @param redisConnectionFactory 连接工厂
* @param listener Redis消息监听器
* @param MessageListenerAdapter Redis消息监听适配器
* @return RedisMessageListenerContainer 消息监听容器
*/
@Bean
@SuppressWarnings("all")
public RedisMessageListenerContainer container(RedisMessageListener listener) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactorySlave());
// 所有的订阅消息,都需要在这里进行注册绑定
// new PatternTopic(TOPIC_NAME1) 表示发布信息的频道
// 可以添加多个频道以及配置不同的频道
container.addMessageListener(listener, new PatternTopic("消息频道名称"));
//container.addMessageListener(adapter, new PatternTopic(SystemConstants.TOPIC_NAME2));
container.setTaskExecutor(springSessionRedisTaskExecutor());
/**
* 设置序列化对象
* 特别注意:1. 发布的时候和订阅方都需要设置序列化
* 2. 设置序列化对象必须放在 {加入消息监听器} 这步后面,不然接收器接收不到消息
*/
/*Jackson2JsonRedisSerializer seria = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
seria.setObjectMapper(objectMapper);
container.setTopicSerializer(seria);*/
return container;
}
/** 自定义接收消息的线程池
*/
@Bean
public ThreadPoolTaskExecutor springSessionRedisTaskExecutor() {
ThreadPoolTaskExecutor springSessionRedisTaskExecutor = new ThreadPoolTaskExecutor();
springSessionRedisTaskExecutor.setCorePoolSize(8);
springSessionRedisTaskExecutor.setMaxPoolSize(15);
springSessionRedisTaskExecutor.setKeepAliveSeconds(10);
springSessionRedisTaskExecutor.setQueueCapacity(1000);
springSessionRedisTaskExecutor.setThreadNamePrefix("redis-");
return springSessionRedisTaskExecutor;
}
测试
@Scheduled(cron = "0/10 * * * * *")
private void init(){
for(int i=0;i<50;i++){
//处理请求
redisMessageUtils.sendMessage("接收频道名称", UUID.randomUUID().toString());
}
}