目录
- 截图
- 一、代码示例
- 1.1 pom.xml依赖
- 1.2 application.yml配置
- 1.3 启动类
- 1.4 配置类
- 1.5 消息实体
- 1.6 自定义错误
- 1.7 自定义注解
- 1.8 服务类
- 1.9 监听类
- 1.10 controller
截图
一、代码示例
1.1 pom.xml依赖
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the Redis Stream message-queue sample module. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>springboot-learning</artifactId>
        <groupId>com.learning</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>springboot-mq-redis</artifactId>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- Test-only starter: keep it off the runtime classpath. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-redis</artifactId>
            <version>2.5.11</version>
        </dependency>
        <!-- The Redisson Spring Data module must match the Spring Data Redis line pinned above (2.5.x). -->
        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson-spring-data-25</artifactId>
            <version>3.17.1</version>
        </dependency>
        <!-- fastjson: 1.2.83 is the 1.x release carrying the autotype deserialization fixes. -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
1.2 application.yml配置
# Connection settings bound to the "spring.redis" prefix.
spring:
  redis:
    host: 127.0.0.1
    port: 6379
    password:
    database: 0
    ssl: false
# Deployment mode read via ${redis.model}: 1 = single server, 2 = sentinel, 3 = cluster.
redis:
  model: 1
1.3 启动类
package com.learning.mq.redis;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * Spring Boot entry point for the Redis Stream message-queue sample.
 */
@SpringBootApplication
public class Application {

    /**
     * Boots the Spring application context.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication bootstrap = new SpringApplication(Application.class);
        bootstrap.run(args);
    }
}
1.4 配置类
package com.learning.mq.redis.config;
import lombok.Data;
/**
 * Settings for Redis cluster mode; held in the {@code cluster} field of {@code RedisProperties}.
 * Accessors are generated by Lombok's {@code @Data}.
 */
@Data
public class RedisClusterProperties {
    /**
     * Cluster state scan interval, in milliseconds.
     */
    private int scanInterval=5000;
    /**
     * Cluster nodes, as a comma-separated list of addresses.
     */
    private String nodes;
    /**
     * Node selection mode for read operations. Possible values: SLAVE (read from replica nodes only),
     * MASTER (read from master nodes only), MASTER_SLAVE (read from both).
     * The original note names SLAVE as the default; no initializer is set here, so the field is
     * {@code null} unless configured.
     */
    private String readMode;
    /**
     * Replica-node connection pool size. The original note lists a default of 64; this field has
     * no initializer, so it is 0 unless configured.
     */
    private int slaveConnectionPoolSize;
    /**
     * Master-node connection pool size. The original note lists a default of 64; this field has
     * no initializer, so it is 0 unless configured.
     */
    private int masterConnectionPoolSize;
    /**
     * Number of retries for a failed command. The original note lists a default of 3; this field
     * has no initializer, so it is 0 unless configured.
     */
    private int retryAttempts;
    /**
     * Interval between command retries, in milliseconds. The original note lists a default of
     * 1500; this field has no initializer, so it is 0 unless configured.
     */
    private int retryInterval;
    /**
     * Maximum number of failed executions. The original note lists a default of 3; this field has
     * no initializer, so it is 0 unless configured.
     */
    private int failedAttempts;
}
package com.learning.mq.redis.config;
import lombok.Data;
/**
 * Connection-pool settings; held in the {@code pool} field of {@code RedisProperties}.
 * Accessors are generated by Lombok's {@code @Data}.
 * NOTE(review): units and exact semantics of the individual fields are not documented in this
 * file; the hedged notes below are inferred from the names only.
 */
@Data
public class RedisPoolProperties {
    // presumably the maximum number of idle connections - TODO confirm
    private int maxIdle;
    // presumably the minimum number of idle connections - TODO confirm
    private int minIdle;
    // presumably the maximum number of active connections - TODO confirm
    private int maxActive;
    // presumably the maximum wait for a pooled connection; unit not stated - TODO confirm
    private int maxWait;
    // presumably the connect timeout; unit not stated - TODO confirm
    private int connTimeout;
    // presumably the socket timeout in milliseconds (initialised to 30000) - TODO confirm
    private int soTimeout = 30000;
    /**
     * Pool size.
     */
    private int size;
}
package com.learning.mq.redis.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
 * Redis connection settings bound from the {@code spring.redis} configuration prefix.
 * Getters and setters are generated by Lombok's {@code @Data}.
 */
@Component
@ConfigurationProperties(prefix = "spring.redis")
@Data
public class RedisProperties {
    /** Index of the Redis database to use. */
    private int database;
    /**
     * Time to wait for a node to reply to a command; counting starts once the command has been
     * sent successfully.
     */
    private int timeout;
    /** Password used to authenticate; may be blank. */
    private String password;
    /**
     * Pool configuration.
     */
    private RedisPoolProperties pool;
    /**
     * Cluster configuration.
     */
    private RedisClusterProperties cluster;
    /**
     * Sentinel configuration.
     */
    private RedisSentinelProperties sentinel;
    /** Host of the single Redis server. */
    private String host;
    /** Port of the single Redis server. */
    private String port;

    /**
     * Builds the single-server address.
     *
     * @return the host and port joined as {@code host:port}
     */
    public String getAddress() {
        return String.join(":", this.getHost(), this.getPort());
    }
}
package com.learning.mq.redis.config;
import lombok.Data;
/**
 * Settings for Redis sentinel mode; held in the {@code sentinel} field of {@code RedisProperties}.
 * Accessors are generated by Lombok's {@code @Data}.
 */
@Data
public class RedisSentinelProperties {
    /**
     * Name of the sentinel master.
     */
    private String master;
    /**
     * Sentinel nodes, as a comma-separated list of addresses.
     */
    private String nodes;
    /**
     * Sentinel configuration flag.
     * NOTE(review): presumably restricts writes to the master - the original comment does not say; confirm.
     */
    private boolean masterOnlyWrite;
    /**
     * NOTE(review): purpose undocumented in the original - presumably a maximum failure count; confirm.
     */
    private int failMax;
}
package com.learning.mq.redis.config;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.*;
import org.redisson.spring.data.connection.RedissonConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
 * Builds the Redisson client for the deployment mode selected by {@code redis.model}
 * (1 = single server, 2 = sentinel, 3 = cluster) and exposes the connection factory and
 * Redis templates that sit on top of it.
 */
@Configuration
@EnableConfigurationProperties(RedisProperties.class)
@Slf4j
public class RedissonConfig {
    private static final String PLAIN_SCHEME = "redis://";
    private static final String TLS_SCHEME = "rediss://";

    @Autowired
    private RedisProperties redisProperties;

    /** Raw value of {@code redis.model}; decides which client backs the connection factory. */
    @Value("${redis.model}")
    private String mode;

    /**
     * Redisson client for single-server mode ({@code redis.model=1}).
     * Pool sizes and timeouts are not set here, so Redisson's own defaults apply.
     *
     * @return a client connected to {@code spring.redis.host:port}
     */
    @Bean
    @ConditionalOnProperty(name = "redis.model", havingValue = "1")
    public RedissonClient redissonSingle() {
        log.info("redis.model=1, 为单机模式");
        Config config = new Config();
        SingleServerConfig serverConfig = config.useSingleServer()
                .setAddress(withScheme(redisProperties.getAddress()))
                .setDatabase(redisProperties.getDatabase());
        if (StringUtils.isNotBlank(redisProperties.getPassword())) {
            serverConfig.setPassword(redisProperties.getPassword());
        }
        return Redisson.create(config);
    }

    /**
     * Redisson client for sentinel mode ({@code redis.model=2}).
     *
     * @return a client connected through the configured sentinels
     * @throws IllegalStateException if {@code spring.redis.sentinel.nodes} is not configured
     */
    @Primary
    @Bean
    @ConditionalOnProperty(name = "redis.model", havingValue = "2")
    public RedissonClient redissonSentinel() {
        log.info("redis.model=2, 为哨兵模式");
        RedisSentinelProperties sentinel = redisProperties.getSentinel();
        if (sentinel == null || StringUtils.isBlank(sentinel.getNodes())) {
            throw new IllegalStateException("redis.model=2 requires spring.redis.sentinel.nodes to be configured");
        }
        Config config = new Config();
        SentinelServersConfig serverConfig = config.useSentinelServers()
                .addSentinelAddress(toRedisUris(sentinel.getNodes()))
                .setDatabase(redisProperties.getDatabase())
                .setMasterName(sentinel.getMaster())
                // TODO: change the read mode here if read/write splitting is introduced.
                .setReadMode(ReadMode.MASTER);
        // A timeout of 0 means "not configured"; keep Redisson's default instead of forcing 0 ms.
        if (redisProperties.getTimeout() > 0) {
            serverConfig.setTimeout(redisProperties.getTimeout());
        }
        if (StringUtils.isNotBlank(redisProperties.getPassword())) {
            serverConfig.setPassword(redisProperties.getPassword());
        }
        return Redisson.create(config);
    }

    /**
     * Connection factory backed by whichever Redisson client matches {@code redis.model}.
     *
     * @return the Redisson-based connection factory
     * @throws IllegalStateException if {@code redis.model} is not 1, 2 or 3
     */
    @Primary
    @Bean("redissonConnectionFactory")
    public RedissonConnectionFactory redissonConnectionFactory() {
        RedissonClient redisson;
        if ("1".equals(mode)) {
            redisson = redissonSingle();
        } else if ("2".equals(mode)) {
            redisson = redissonSentinel();
        } else if ("3".equals(mode)) {
            redisson = redissonCluster();
        } else {
            // No client bean exists for any other value, so fail with an explicit message.
            throw new IllegalStateException(
                    "Unsupported redis.model '" + mode + "': expected 1 (single), 2 (sentinel) or 3 (cluster)");
        }
        return new RedissonConnectionFactory(redisson);
    }

    /**
     * Redisson client for cluster mode ({@code redis.model=3}).
     * Retry and pool settings are not applied here, so Redisson's own defaults are used.
     *
     * @return a client connected to the configured cluster nodes
     * @throws IllegalStateException if {@code spring.redis.cluster.nodes} is not configured
     */
    @Bean
    @ConditionalOnProperty(name = "redis.model", havingValue = "3")
    RedissonClient redissonCluster() {
        log.info("redis.model=3, 为集群模式");
        RedisClusterProperties cluster = redisProperties.getCluster();
        if (cluster == null || StringUtils.isBlank(cluster.getNodes())) {
            throw new IllegalStateException("redis.model=3 requires spring.redis.cluster.nodes to be configured");
        }
        Config config = new Config();
        ClusterServersConfig serverConfig = config.useClusterServers()
                .addNodeAddress(toRedisUris(cluster.getNodes()))
                .setScanInterval(cluster.getScanInterval())
                .setReadMode(ReadMode.MASTER)
                .setPingConnectionInterval(1000);
        // A timeout of 0 means "not configured"; keep Redisson's default instead of forcing 0 ms.
        if (redisProperties.getTimeout() > 0) {
            serverConfig.setTimeout(redisProperties.getTimeout());
        }
        if (StringUtils.isNotBlank(redisProperties.getPassword())) {
            serverConfig.setPassword(redisProperties.getPassword());
        }
        return Redisson.create(config);
    }

    /**
     * Template with String keys and Jackson-serialized JSON values (type information embedded).
     *
     * @param redisConnectionFactory connection factory to bind the template to
     * @return the configured template
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(redisConnectionFactory);

        // Replace the default JDK serialization with Jackson JSON.
        Jackson2JsonRedisSerializer<Object> jsonSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        // Non-deprecated equivalent of enableDefaultTyping(NON_FINAL).
        // SECURITY NOTE(review): default typing with the mapper's default (permissive) validator lets
        // stored JSON name arbitrary classes; restrict the validator if Redis content is not trusted.
        objectMapper.activateDefaultTyping(objectMapper.getPolymorphicTypeValidator(),
                ObjectMapper.DefaultTyping.NON_FINAL);
        jsonSerializer.setObjectMapper(objectMapper);

        // Keys are plain strings; values (including hash values) are JSON.
        StringRedisSerializer keySerializer = new StringRedisSerializer();
        redisTemplate.setValueSerializer(jsonSerializer);
        redisTemplate.setKeySerializer(keySerializer);
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
        redisTemplate.setHashValueSerializer(jsonSerializer);
        redisTemplate.setDefaultSerializer(jsonSerializer);
        redisTemplate.setEnableDefaultSerializer(true);
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }

    /**
     * Template that treats both keys and values as strings.
     *
     * @param redisConnectionFactory connection factory to bind the template to
     * @return the configured template
     */
    @Bean
    public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory redisConnectionFactory) {
        StringRedisTemplate template = new StringRedisTemplate();
        template.setConnectionFactory(redisConnectionFactory);
        return template;
    }

    /** Prefixes {@code redis://} unless the address already carries a redis:// or rediss:// scheme. */
    private static String withScheme(String address) {
        String trimmed = address.trim();
        if (trimmed.startsWith(PLAIN_SCHEME) || trimmed.startsWith(TLS_SCHEME)) {
            return trimmed;
        }
        return PLAIN_SCHEME + trimmed;
    }

    /** Splits a comma-separated node list into Redisson URIs, ignoring blanks and surrounding spaces. */
    private static String[] toRedisUris(String commaSeparatedNodes) {
        return Arrays.stream(commaSeparatedNodes.split(","))
                .filter(StringUtils::isNotBlank)
                .map(RedissonConfig::withScheme)
                .toArray(String[]::new);
    }
}
1.5 消息实体
package com.learning.mq.redis.dto;
import lombok.Data;
/**
 * Message payload entity. Accessors are generated by Lombok's {@code @Data}.
 */
@Data
public class MessageDTO {
    // Text content of the message.
    private String content;
}
1.6 自定义错误
package com.learning.mq.redis.handler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.ErrorHandler;
/**
 * {@link ErrorHandler} that logs the failure passed to it and does nothing else.
 */
@Slf4j
public class CustomerErrorHandler implements ErrorHandler {
    /**
     * Logs the failure at error level together with its stack trace.
     *
     * @param e the failure being reported
     */
    @Override
    public void handleError(Throwable e) {
        // TODO: add real handling for consumption failures.
        log.error("发生了异常", e);
    }
}
1.7 自定义注解
package com.learning.mq.redis.annotation;
import java.lang.annotation.*;
/**
 * Marks a class as a Redis Stream message-queue listener and carries its subscription settings.
 */
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface RedisStreamMessageQueueListener {
    /**
     * Key of the stream (message queue) to listen on.
     *
     * @return the stream key
     */
    String value();

    /**
     * Consumer group name; defaults to the empty string when no group is given.
     *
     * @return the consumer group name
     */
    String group() default "";

    /**
     * Type that messages are deserialized into.
     *
     * @return the target type
     */
    Class<?> type();

    /**
     * Whether messages are acknowledged automatically.
     *
     * @return {@code true} to auto-acknowledge
     */
    boolean autoAck() default true;

    /**
     * Number of messages fetched per poll (the original note states the pool polls once per second).
     *
     * @return the batch size
     */
    int batchSize() default 5;

    /**
     * Whether the dead-letter queue is enabled.
     *
     * @return {@code true} to enable dead-lettering
     */
    boolean enableDeadLetter() default false;

    /**
     * Delivery-count threshold; once exceeded, the message is added to the dead-letter queue.
     *
     * @return the maximum number of deliveries
     */
    int maxDeliverTimes() default 3;

    /**
     * Threshold, in seconds, for how long a message may stay unacknowledged. Messages over the
     * threshold are re-delivered to another consumer (only when the group has several consumers).
     *
     * @return the timeout in seconds
     */
    int timeout() default 300;

    /**
     * Key of the dead-letter queue; required when {@link #enableDeadLetter()} is {@code true}.
     *
     * @return the dead-letter stream key
     */
    String deadLetterKey() default "";

    /**
     * Maximum length of the dead-letter queue.
     *
     * @return the maximum number of entries kept
     */
    int deadLetterMaxSize() default 1000;
}
1.8 服务类
package com.learning.mq.redis.service;
import com.alibaba.fastjson.JSON;
import com.learning.mq.redis.annotation.RedisStreamMessageQueueListener;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.dao.DataAccessException;
import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisStreamCommands;
import org.springframework.data.redis.connection.RedisZSetCommands;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@Slf4j
public class ListenerAnnotation implements ApplicationListener<ApplicationStartedEvent>, InitializingBean {
protected Logger streamMqLog = LoggerFactory.getLogger(ListenerAnnotation.class);
private static final String DISTUIBUTE_HANDLE_PENDING_DATA_LOCK = "distuibute_handle_pending_data_lock";
private static List<RedisStreamMessageQueueListener> ans = new ArrayList<>();
// 记录违规的死信队列keys(违规:死信队列又配置了死信队列)
private List<String> illegalDeadLetterKeys = new ArrayList<>();
private volatile static ListenerAnnotation listenerAnnotation;
private final RedisStreamMessageQueueStartService redisStreamMessageQueueStartService;
private RedisTemplate redisTemplate;
private final String applicationName;
private final RedissonClient redissonClient;
// 当前服务ip及端口号,用于动态消费者组的游标继承
private final String serviceIP;
private final String servicePort;
private ListenerAnnotation(RedisStreamMessageQueueStartService redisStreamMessageQueueStartService, RedisTemplate redisTemplate, String applicationName, RedissonClient redissonClient, String serviceIP, String servicePort) {
this.redisStreamMessageQueueStartService = redisStreamMessageQueueStartService;
this.redisTemplate = redisTemplate;
this.applicationName = applicationName;
this.redissonClient = redissonClient;
this.serviceIP = serviceIP;
this.servicePort = servicePort;
}
/**
* 单例
*
* @param redisStreamMessageQueueStartService
* @return
*/
public static ListenerAnnotation getListenerAnnotation(RedisStreamMessageQueueStartService redisStreamMessageQueueStartService, RedisTemplate redisTemplate, String group, RedissonClient redissonClient, String serviceIP, String servicePort) {
if (listenerAnnotation == null) {
synchronized (ListenerAnnotation.class) {
if (listenerAnnotation == null) {
listenerAnnotation = new ListenerAnnotation(redisStreamMessageQueueStartService, redisTemplate, group, redissonClient, serviceIP, servicePort);
}
}
}
return listenerAnnotation;
}
/**
* 循环处理keys中的pending数据
*/
private void handlePendingData() {
for (RedisStreamMessageQueueListener redisStreamMessageQueueListener : ans) {
String key = redisStreamMessageQueueListener.value();
String groupName = redisStreamMessageQueueListener.group();
StreamInfo.XInfoGroups groups = redisTemplate.opsForStream().groups(key);
if (groups.groupCount() > 0) {
Iterator<StreamInfo.XInfoGroup> groupIterator = groups.stream().iterator();
while (groupIterator.hasNext()) {
StreamInfo.XInfoGroup group = groupIterator.next();
if (StringUtils.equals(group.groupName(), groupName)) {
StreamInfo.XInfoConsumers consumers = redisTemplate.opsForStream().consumers(key, groupName);
if (group.pendingCount() > 0) {
PendingMessagesSummary pendingMessagesSummary = redisTemplate.opsForStream().pending(key, groupName);
PendingMessages pendingMessages = redisTemplate.opsForStream().pending(key, group.groupName(), pendingMessagesSummary.getIdRange(), 100);
if (pendingMessages.size() > 0) {
for (PendingMessage pendingMessage : pendingMessages) {
if (pendingMessage.getElapsedTimeSinceLastDelivery().toMillis() > redisStreamMessageQueueListener.timeout() * 1000) {
final String recordId = pendingMessage.getId().getValue();
streamMqLog.info("redis stream listen task. mqConfig: [{}] - stream: [{}] - group: [{}] - recordId: [{}] 发现消息超过限定时长[{}]s", redisStreamMessageQueueListener, key, group.groupName(), recordId, redisStreamMessageQueueListener.timeout());
try {
Map recordVal = null;
// 查询消息内容
List<MapRecord<String, Object, Object>> readRes = redisTemplate.opsForStream().range(key, Range.closed(recordId, recordId), new RedisZSetCommands.Limit().count(1));
if (readRes != null && readRes.size() > 0) {
for (MapRecord<String, Object, Object> v : readRes) {
if (StringUtils.equals(v.getId().getValue(), recordId)) {
recordVal = v.getValue();
}
}
}
if (recordVal != null) {
streamMqLog.info("redis stream listen task. stream: [{}] - group: [{}] - recordId: [{}] 查询到消息的内容为[{}]", key, group.groupName(), recordId, JSON.toJSONString(recordVal));
if (pendingMessage.getTotalDeliveryCount() <= redisStreamMessageQueueListener.maxDeliverTimes() && consumers.size() > 1) {
streamMqLog.info("redis stream listen task. stream: [{}] - group: [{}] - consumers: [{}]", key, group.groupName(), consumers);
// 移交其他消费者处理
String newConsumer = consumers.get(consumers.size() - 1).consumerName();
for (Iterator<StreamInfo.XInfoConsumer> consumerIterator = consumers.stream().iterator(); ; consumerIterator.hasNext()) {
StreamInfo.XInfoConsumer curConsumer = consumerIterator.next();
if (StringUtils.equals(curConsumer.consumerName(), pendingMessage.getConsumerName())) {
break;
} else {
newConsumer = curConsumer.consumerName();
}
}
if (StringUtils.isNotBlank(newConsumer)) {
streamMqLog.info("redis stream listen task. stream: [{}] - group: [{}] - recordId: [{}] 从消费者[{}]移交给消费者[{}]处理", key, group.groupName(), recordId, pendingMessage.getConsumerName(), newConsumer);
final String newConsumerFinal = newConsumer;
redisTemplate.execute(new RedisCallback<List<ByteRecord>>() {
@Override
public List<ByteRecord> doInRedis(RedisConnection connection) throws DataAccessException {
RedisStreamCommands.XClaimOptions claimOptions = RedisStreamCommands.XClaimOptions.minIdle(Duration.ofSeconds(1)).ids(pendingMessage.getId());
return connection.streamCommands().xClaim(key.getBytes(), groupName, newConsumerFinal, claimOptions.idle(Duration.ofSeconds(1)));
}
});
} else {
streamMqLog.warn("redis stream listen task. stream: [{}] - group: [{}] - recordId: [{}] 移交消费者失败,未查询到有效的新消费者!", key, group.groupName(), recordId);
}
} else {
// 记录日志或者放入配置好的队列中
if (redisStreamMessageQueueListener.enableDeadLetter()) {
String deadLetterKey = redisStreamMessageQueueListener.deadLetterKey();
if (StringUtils.isNotBlank(deadLetterKey) && !illegalDeadLetterKeys.contains(deadLetterKey) && redisStreamMessageQueueListener.deadLetterMaxSize() > 0) {
// 放入队列中
redisStreamMessageQueueStartService.coverSend(deadLetterKey, recordVal, new Integer(redisStreamMessageQueueListener.deadLetterMaxSize()).longValue());
streamMqLog.info("redis stream listen task. stream: [{}] - group: [{}] - recordId: [{}] 放入死信队列[{}]中", key, group.groupName(), recordId, deadLetterKey);
} else {
// 报错违规
streamMqLog.warn("redis stream listen task. stream: [{}] - group: [{}] - recordId: [{}] 放入死信队列[{}]失败!原因:死信队列为空或者违规套用死信队列,或未配置死信队列最大长度", key, group.groupName(), recordId, deadLetterKey);
if (streamMqLog.isDebugEnabled()) {
streamMqLog.info("redis stream listen task. stream: [{}] - group: [{}] - recordId: [{}] - data: [{}]", key, group.groupName(), recordId, JSON.toJSONString(recordVal));
}
}
}
}
} else {
streamMqLog.warn("redis stream listen task. stream: [{}] - group: [{}] - recordId: [{}] 未查询到消息的内容", key, group.groupName(), recordId);
}
} catch (Exception e) {
log.error("", e);
streamMqLog.error("redis stream listen task. stream: [{}] - group: [{}] - recordId: [{}] 处理数据失败,原因:{}", key, group.groupName(), recordId, e.getMessage());
} finally {
// ack数据
Long ackLen = redisTemplate.opsForStream().acknowledge(key, groupName, recordId);
streamMqLog.info("redis stream listen task. stream: [{}] - group: [{}] - recordId: [{}] 消息ack[{}]条", key, group.groupName(), recordId, ackLen);
}
}
}
}
}
}
}
}
}
}
/**
* 添加监听
*
* @param event
*/
@Override
public void onApplicationEvent(ApplicationStartedEvent event) {
Map<String, Object> beans = event.getApplicationContext().getBeansWithAnnotation(RedisStreamMessageQueueListener.class);
List<String> deadLetterKeys = new ArrayList<>();
for (Object bean : beans.values()) {
RedisStreamMessageQueueListener ca = bean.getClass().getAnnotation(RedisStreamMessageQueueListener.class);
String group = ca.group();
redisStreamMessageQueueStartService.listener(ca.value(), group, ca.type(), ca.autoAck(), ca.batchSize(), (StreamListener) bean);
if (StringUtils.isNotBlank(group)) {
ans.add(ca);
}
if (ca.enableDeadLetter() && StringUtils.isNotBlank(ca.deadLetterKey())) {
deadLetterKeys.add(ca.deadLetterKey());
}
}
// 获取违规死信队列(违规:死信队列又配置了死信队列)
ans.forEach(v -> {
if (v.enableDeadLetter() && StringUtils.isNotBlank(v.deadLetterKey()) && deadLetterKeys.contains(v.value())) {
illegalDeadLetterKeys.add(v.value());
}
});
}
@Override
public void afterPropertiesSet() {
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
RLock lock = redissonClient.getLock(DISTUIBUTE_HANDLE_PENDING_DATA_LOCK + applicationName);
boolean res = false;
try {
res = lock.tryLock(1, 10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
log.error("", e);
}
if (!res) {
return;
}
try {
this.handlePendingData();
} catch (Exception e) {
log.error("", e);
} finally {
if (lock.isHeldByCurrentThread()) {
lock.unlock();
} else {
log.warn("Lock not held by CurrentThread!!!");
}
}
},
0, 10, TimeUnit.SECONDS);
}
}
package com.learning.mq.redis.service;
import org.springframework.data.redis.stream.StreamListener;
/**
 * Operations for attaching stream listeners and for publishing to a stream.
 */
public interface RedisStreamMessageQueueStartService {
    /**
     * Adds a listener.
     *
     * @param event          key of the stream to listen on
     * @param groupName      consumer group name; may be blank
     * @param type           type that messages are deserialized into
     * @param autoAck        whether messages are acknowledged automatically
     * @param batchSize      number of messages fetched per poll
     * @param streamListener callback that receives the messages
     */
    void listener(String event, String groupName, Class type, boolean autoAck, int batchSize, StreamListener streamListener);

    /**
     * Publishes a value to a stream with a size limit.
     *
     * @param event   key of the stream to publish to
     * @param val     value to publish
     * @param maxSize size limit for the stream
     * @param <V>     type of the published value
     */
    <V> void coverSend(String event, V val, Long maxSize);
}
package com.learning.mq.redis.service;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.hash.ObjectHashMapper;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import com.learning.mq.redis.handler.CustomerErrorHandler;
import java.net.Inet4Address;
import java.net.UnknownHostException;
import java.time.Duration;
/**
 * Default {@link RedisStreamMessageQueueStartService}: creates consumer groups on demand, starts
 * one listener container per subscription and publishes size-capped records.
 */
@Slf4j
public class RedisStreamMessageQueueStartServiceImpl implements RedisStreamMessageQueueStartService {
    /** Exclusive upper bound of the host-derived consumer suffix (values are 0..31). */
    private static final int DATA_CENTER_SLOTS = 32;
    // Suffix appended to the group name to form this instance's consumer name.
    // NOTE(review): hosts whose names hash to the same slot end up sharing a consumer name.
    private final long dataCenterId = getDataCenterId();
    private final RedisTemplate<String, Object> redisTemplate;
    // Fallback cap used by coverSend when the caller passes no maxSize; 0 or less means "no cap".
    long maxLen;

    /**
     * @param redisTemplate template used for all stream operations
     * @param maxLen        fallback stream cap for {@link #coverSend}; {@code null} is treated as no cap
     */
    public RedisStreamMessageQueueStartServiceImpl(RedisTemplate<String, Object> redisTemplate, Long maxLen) {
        this.redisTemplate = redisTemplate;
        this.maxLen = maxLen == null ? 0L : maxLen;
    }

    /**
     * Creates the consumer group when one is named, then starts the subscription.
     *
     * @param event          key of the stream to listen on
     * @param groupName      consumer group name; blank means no group is created or used
     * @param type           type that messages are deserialized into
     * @param autoAck        whether messages are acknowledged automatically (only with a group)
     * @param batchSize      number of messages fetched per poll
     * @param streamListener callback that receives the messages
     */
    @Override
    public void listener(String event, String groupName, Class type, boolean autoAck, int batchSize, StreamListener streamListener) {
        // A blank group means a group-less subscription, so there is nothing to create.
        if (StringUtils.isNotBlank(groupName)) {
            createGroup(event, groupName);
        }
        startSubscription(event, groupName, type, autoAck, batchSize, streamListener);
    }

    /**
     * Appends {@code val} to the stream, then trims the stream (approximately) to its cap.
     *
     * @param event   key of the stream to publish to
     * @param val     value to publish
     * @param maxSize stream cap; when {@code null}, the constructor's {@code maxLen} is used if
     *                positive, otherwise the stream is not trimmed
     */
    @Override
    public <V> void coverSend(String event, V val, Long maxSize) {
        ObjectRecord<String, V> record = StreamRecords.newRecord()
                .ofObject(val)
                .withId(RecordId.autoGenerate())
                .withStreamKey(event);
        redisTemplate.opsForStream().add(record);
        if (maxSize != null) {
            redisTemplate.opsForStream().trim(event, maxSize, true);
        } else if (maxLen > 0) {
            // Previously a null maxSize failed with an NPE after the record had already been added.
            redisTemplate.opsForStream().trim(event, maxLen, true);
        }
        if (log.isDebugEnabled()) {
            log.debug("event {} send content {}", event, val);
        }
    }

    /**
     * Builds and starts a listener container for one subscription.
     * NOTE(review): the container is not retained, so it is never stopped explicitly.
     */
    private void startSubscription(String event, String groupName, Class type, boolean autoAck, int batchSize, StreamListener streamListener) {
        RedisConnectionFactory redisConnectionFactory = redisTemplate.getConnectionFactory();
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions options = StreamMessageListenerContainer
                .StreamMessageListenerContainerOptions
                .builder()
                .pollTimeout(Duration.ofSeconds(1))
                .targetType(type)
                .batchSize(batchSize)
                .hashValueSerializer(RedisSerializer.string())
                .objectMapper(new ObjectHashMapper())
                .errorHandler(new CustomerErrorHandler())
                .build();
        StreamMessageListenerContainer listenerContainer = StreamMessageListenerContainer
                .create(redisConnectionFactory, options);
        if (StringUtils.isBlank(groupName)) {
            // No group: plain stream read.
            listenerContainer.receive(
                    StreamOffset.create(event, ReadOffset.lastConsumed()),
                    streamListener);
        } else {
            if (autoAck) {
                listenerContainer.receiveAutoAck(
                        Consumer.from(groupName, groupName + dataCenterId),
                        StreamOffset.create(event, ReadOffset.lastConsumed()),
                        streamListener);
            } else {
                // Manual acknowledgement is left to the listener.
                listenerContainer.receive(
                        Consumer.from(groupName, groupName + dataCenterId),
                        StreamOffset.create(event, ReadOffset.lastConsumed()),
                        streamListener);
            }
        }
        listenerContainer.start();
    }

    /** Creates the consumer group; an already existing group is expected and only logged at info. */
    private void createGroup(String event, String groupName) {
        try {
            String addGroupResult = redisTemplate.opsForStream().createGroup(event, ReadOffset.latest(), groupName);
            if (!StringUtils.equals(addGroupResult, "OK")) {
                log.error("STREAM - Group create failed");
            }
        } catch (Exception e) {
            if (isGroupAlreadyExists(e)) {
                log.info("STREAM - Group create failed. reason: exist group.");
            } else {
                // Any other failure (connectivity, permissions, ...) must stay visible.
                log.warn("STREAM - Group create failed. stream: [{}] - group: [{}]", event, groupName, e);
            }
        }
    }

    /** Returns whether any exception in the cause chain carries the Redis {@code BUSYGROUP} error. */
    private static boolean isGroupAlreadyExists(Throwable failure) {
        int depth = 0;
        // The depth limit protects against a self-referencing cause chain.
        for (Throwable cur = failure; cur != null && depth < 16; cur = cur.getCause(), depth++) {
            if (StringUtils.contains(cur.getMessage(), "BUSYGROUP")) {
                return true;
            }
        }
        return false;
    }

    /**
     * Derives a value in {@code 0..31} from the local host name (sum of its code points modulo 32);
     * falls back to a random value in the same range when the host name cannot be resolved.
     */
    private static long getDataCenterId() {
        try {
            String hostName = Inet4Address.getLocalHost().getHostName();
            int[] ints = StringUtils.toCodePoints(hostName);
            int sums = 0;
            for (int b : ints) {
                sums += b;
            }
            return sums % DATA_CENTER_SLOTS;
        } catch (UnknownHostException e) {
            // The upper bound is exclusive, so pass 32 to cover the same 0..31 range as the hash.
            return RandomUtils.nextLong(0, DATA_CENTER_SLOTS);
        }
    }
}
1.9 监听类
package com.learning.mq.redis.listener;
import com.alibaba.fastjson.JSON;
import com.learning.mq.redis.annotation.RedisStreamMessageQueueListener;
import com.learning.mq.redis.dto.MessageDTO;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;
@Component
@RedisStreamMessageQueueListener(value = "message-stream", group = "consumerGroup", type = String.class, timeout = 20, autoAck = true)
@AllArgsConstructor
@Slf4j
public class MessageListener implements StreamListener<String, ObjectRecord<String, String>> {
private final RedisTemplate redisTemplate;
@Override
public void onMessage(ObjectRecord<String, String> message) {
try {
String stream = message.getStream();
RecordId id = message.getId();
String msg = message.getValue();
// MessageDTO messageDTO = JSON.parseObject(msg, MessageDTO.class);
log.info("消费的消息:{}", msg);
//当是消费组消费时,如果不是自动ack,则需要在这个地方手动ack
// redisTemplate.opsForStream().acknowledge("consumerGroup", message);
//移除消息
redisTemplate.opsForStream().delete(stream, id.toString());
} catch (Exception e) {
e.printStackTrace();
}
}
}
1.10 controller
package com.learning.mq.redis.controller;
import lombok.AllArgsConstructor;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
 * HTTP endpoint that publishes messages to the {@code message-stream} Redis stream.
 */
@RestController
@RequestMapping("/message")
@AllArgsConstructor
public class MessageController {
    private RedisTemplate redisTemplate;

    /**
     * Publishes the given text as a new stream record.
     *
     * <p>An earlier variant, now disabled, wrapped the text in a {@code MessageDTO} before
     * publishing; the raw string is sent instead.
     *
     * @param message text to publish
     * @return the id of the created record
     */
    @GetMapping("/save")
    public String save(@RequestParam String message){
        final String streamKey = "message-stream";
        ObjectRecord<String, String> outgoing = StreamRecords.newRecord()
                .in(streamKey)
                .ofObject(message)
                .withId(RecordId.autoGenerate());
        return redisTemplate.opsForStream().add(outgoing).toString();
    }
}