文章目录
1、kafka确保消息不丢失? 1.1、生产者端确保消息不丢失 1.2、kafka服务端确保消息不丢失 1.3、消费者确保正确无误的消费
2、生产者发送消息 KafkaService 3、UserInfoServiceImpl -> login() 4、service-account - > AccountListener.java
1、kafka确保消息不丢失?
1.1、生产者端确保消息不丢失
发送模式:发后即忘、同步阻塞确认、异步非阻塞确认 生产者acks模式:props.put(“acks”, “all”)、acks: all(-1) 配置重试:props.put(“retries”, 3)、retries: 3
1.2、kafka服务端确保消息不丢失
kafka是文件型的消息中间件,不会单纯的因为服务器宕机导致消息丢失 消息的log日志文件损坏:搭建kafka集群(副本)
1.3、消费者确保正确无误的消费
偏移量提交 自动提交:enable-auto-commit: true 手动提交:ack-mode: manual_immediate:同步提交 异步提交(推荐) 偏移量重置: auto-offset-reset: earliest -> 如果有偏移量则继续消费,如果偏移量没了,从头重新进行消费,可能会存在幂等性问题 auto-offset-reset: latest -> 如果有偏移量则继续消费,如果偏移量不存在,只消费新消息,旧消息没消费完就丢掉了 auto-offset-reset: none -> 如果有偏移量则继续消费,如果偏移量不存在,抛出异常 消费者重试:重试主题和死信主题, @RetryableTopic()
2、生产者发送消息 KafkaService
package com. atguigu. tingshu. common. service ;
import org. slf4j. Logger ;
import org. slf4j. LoggerFactory ;
import org. springframework. beans. factory. annotation. Autowired ;
import org. springframework. kafka. core. KafkaTemplate ;
import org. springframework. kafka. support. SendResult ;
import org. springframework. stereotype. Service ;
import java. util. concurrent. CompletableFuture ;
@Service
public class KafkaService {
private static final Logger logger = LoggerFactory . getLogger ( KafkaService . class ) ;
@Autowired
private KafkaTemplate kafkaTemplate;
public void sendMsg ( String topic, String msg) {
this . sendMsg ( topic, null , null , msg) ;
}
public void sendMsg ( String topic, Integer partition, String key, String msg) {
CompletableFuture < SendResult > future = this . kafkaTemplate. send ( topic, partition, key, msg) ;
future. whenCompleteAsync ( ( result, ex) -> {
if ( ex != null ) {
logger. error ( "生产者发送消息失败!原因:{}" , ex. getMessage ( ) ) ;
}
} ) ;
}
}
whenCompleteAsync:异步完成时的处理、当异步操作完成时
3、UserInfoServiceImpl -> login()
此时 service-user 是生产者 发送消息
@Slf4j
@Service
@SuppressWarnings ( { "unchecked" , "rawtypes" } )
public class UserInfoServiceImpl extends ServiceImpl < UserInfoMapper , UserInfo > implements UserInfoService {
@Autowired
private WxMaService wxMaService;
@Autowired
private RedisTemplate redisTemplate;
@Autowired
private UserAccountFeignClient userAccountFeignClient;
@Autowired
private KafkaService kafkaService;
@Override
public Map < String , Object > login ( String code) {
HashMap < String , Object > map = new HashMap < > ( ) ;
try {
WxMaJscode2SessionResult sessionInfo = this . wxMaService. getUserService ( ) . getSessionInfo ( code) ;
String openid = sessionInfo. getOpenid ( ) ;
UserInfo userInfo = this . getOne ( new LambdaQueryWrapper < UserInfo > ( ) . eq ( UserInfo :: getWxOpenId , openid) ) ;
if ( userInfo == null ) {
userInfo = new UserInfo ( ) ;
userInfo. setWxOpenId ( openid) ;
userInfo. setNickname ( "这家伙太懒" + IdWorker . getIdStr ( ) ) ;
userInfo. setAvatarUrl ( "https://img0.baidu.com/it/u=1633409170,3159960019&fm=253&fmt=auto&app=138&f=JPEG?w=500&h=500" ) ;
this . save ( userInfo) ;
this . kafkaService. sendMsg ( KafkaConstant . QUEUE_USER_REGISTER , userInfo. getId ( ) . toString ( ) ) ;
}
String token = UUID . randomUUID ( ) . toString ( ) ;
UserInfoVo userInfoVo = new UserInfoVo ( ) ;
BeanUtils . copyProperties ( userInfo, userInfoVo) ;
this . redisTemplate. opsForValue ( ) . set ( RedisConstant . USER_LOGIN_KEY_PREFIX + token, userInfoVo, RedisConstant . USER_LOGIN_KEY_TIMEOUT , TimeUnit . SECONDS ) ;
map. put ( "token" , token) ;
return map;
} catch ( WxErrorException e) {
throw new GuiguException ( ResultCodeEnum . LOGIN_AUTH ) ;
}
}
}
4、service-account - > AccountListener.java
此时 service-account 是消费者 接收消息
@Slf4j
@Component
public class AccountListener {
@Autowired
private UserAccountService userAccountService;
@RetryableTopic ( backoff = @Backoff ( 2000 ) )
@KafkaListener ( topics = KafkaConstant . QUEUE_USER_REGISTER )
public void listen ( String userId, Acknowledgment ack) {
if ( StringUtils . isBlank ( userId) ) {
ack. acknowledge ( ) ;
return ;
}
this . userAccountService. saveAccount ( Long . valueOf ( userId) ) ;
ack. acknowledge ( ) ;
}
}