记一次项目所学(中间件等)–动态提醒功能(RocketMQ)
订阅发布模式与观察者模式
RocketMQ:纯java编写的开源消息中间件 高性能低延迟分布式事务
Redis : 高性能缓存工具,数据存储在内存中,读写速度非常快
RocketMQ相关工具类及配置实现
配置类
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.1</version>
</dependency>
//redis
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<version>2.2.2.RELEASE</version>
</dependency>
生产者发送消息工具类
public class RocketMQUtil {
//同步发送消息
public static void syncSendMsg(DefaultMQProducer producer, Message msg) throws Exception{
SendResult result = producer.send(msg);
System.out.println(result);
}
//异步发送消息
public static void asyncSendMsg(DefaultMQProducer producer, Message msg) throws Exception{
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
Logger logger = LoggerFactory.getLogger(RocketMQUtil.class);
logger.info("异步发送消息成功,消息id:" + sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
e.printStackTrace();
}
});
}
}
RocketMQ配置类
@Configuration
public class RocketMQConfig {
// rocketMQ名称服务器的地址
@Value("${rocketmq.name.server.address}")
private String nameServerAddr;
@Autowired
private RedisTemplate<String, String> redisTemplate;
@Autowired
private UserFollowingService userFollowingService;
//生产者
@Bean("momentsProducer")
public DefaultMQProducer momentsProducer() throws Exception{
DefaultMQProducer producer = new DefaultMQProducer(UserMomentsConstant.GROUP_MOMENTS);
producer.setNamesrvAddr(nameServerAddr);
producer.start();
return producer;
}
@Bean("momentsConsumer")
//push 为推送,还有拉取等consumer
public DefaultMQPushConsumer momentsConsumer() throws Exception{
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(UserMomentsConstant.GROUP_MOMENTS);
consumer.setNamesrvAddr(nameServerAddr);
//订阅 *表示所有内容
consumer.subscribe(UserMomentsConstant.TOPIC_MOMENTS, "*");
//消费者监听器,监听到后下一步操作
//registerMessageListener注册消息监听
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
//ConsumeConcurrentlyStatus并发处理
//MessageExt消息的扩充,ConsumeConcurrentlyContext为处理的上下文
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){
MessageExt msg = msgs.get(0);
if(msg == null){
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
//取出的是byte数组类型
String bodyStr = new String(msg.getBody());
UserMoment userMoment = JSONObject.toJavaObject(JSONObject.parseObject(bodyStr), UserMoment.class);
Long userId = userMoment.getUserId();
//定位粉丝id
List<UserFollowing>fanList = userFollowingService.getUserFans(userId);
for(UserFollowing fan : fanList){
//发到redis用户到redis拿
String key = "subscribed-" + fan.getUserId();
//把动态列表拿出来
String subscribedListStr = redisTemplate.opsForValue().get(key);
List<UserMoment> subscribedList;
if(StringUtil.isNullOrEmpty(subscribedListStr)){
subscribedList = new ArrayList<>();
}else{
//转换列表的类
subscribedList = JSONArray.parseArray(subscribedListStr, UserMoment.class);
}
subscribedList.add(userMoment);
//把列表再转成字符串放进去
redisTemplate.opsForValue().set(key, JSONObject.toJSONString(subscribedList));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
return consumer;
}
具体业务逻辑:
@Service
public class UserMomentsService {
@Autowired
private UserMomentsDao userMomentsDao;
@Autowired
private ApplicationContext applicationContext;
@Autowired
private RedisTemplate<String, String> redisTemplate;
public void addUserMoments(UserMoment userMoment) throws Exception {
userMoment.setCreateTime(new Date());
//crud
userMomentsDao.addUserMoments(userMoment);
DefaultMQProducer producer = (DefaultMQProducer)applicationContext.getBean("momentsProducer");
//主题 以及json的数组消息
Message msg = new Message(UserMomentsConstant.TOPIC_MOMENTS, JSONObject.toJSONString(userMoment).getBytes(StandardCharsets.UTF_8));
RocketMQUtil.syncSendMsg(producer, msg);
}
// 查询订阅动态
public List<UserMoment> getUserSubscribedMoments(Long userId) {
String key = "subscribed-" + userId;
//查出来的是String描述的json类型
String listStr = redisTemplate.opsForValue().get(key);
//返回的是List类型,要把查出来的String封装成一个一个的UserMoment再进List中
return JSONArray.parseArray(listStr, UserMoment.class);
}
}
PS:消费信息逻辑在配置类的Consumer中已经写好了