文章目录
1、发送消息 KafkaService
2、生产者 service-album -> AlbumInfoServiceImpl
2.1、新增 saveAlbumInfo()
2.2、更新 updateAlbumInfo()
2.3、删除 removeAlbumInfo()
3、消费者 service-search -> AlbumListener.java
新增:如果是公开的专辑,则发送消息给 Kafka,search 服务通过监听器获取消息,并将新增的数据同步到 ES。
更新:如果是公开的专辑,则发送消息给 Kafka,search 服务通过监听器获取消息,并将更新后的数据同步到 ES;如果是私有的专辑,则发送消息给 Kafka,search 服务通过监听器获取消息,并从 ES 中删除对应数据。
删除:发送消息给 Kafka,search 服务通过监听器获取消息,并从 ES 中删除对应数据。
1、发送消息 KafkaService
package com. atguigu. tingshu. common. service ;
import org. slf4j. Logger ;
import org. slf4j. LoggerFactory ;
import org. springframework. beans. factory. annotation. Autowired ;
import org. springframework. kafka. core. KafkaTemplate ;
import org. springframework. kafka. support. SendResult ;
import org. springframework. stereotype. Service ;
import java. util. concurrent. CompletableFuture ;
/**
 * Thin wrapper around {@link KafkaTemplate} for publishing string messages.
 *
 * <p>Sending is asynchronous: both {@code sendMsg} overloads return as soon as the
 * record has been handed to the template, and a failure is only reported through
 * the error log written by the completion callback.
 */
@Service
public class KafkaService {

    private static final Logger logger = LoggerFactory.getLogger(KafkaService.class);

    // Parameterized instead of raw so that key/value types are checked at compile time.
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Sends {@code msg} to {@code topic}, passing {@code null} for both partition and key.
     *
     * @param topic target topic name
     * @param msg   message payload
     */
    public void sendMsg(String topic, String msg) {
        this.sendMsg(topic, null, null, msg);
    }

    /**
     * Sends {@code msg} to {@code topic} with an explicit partition and key.
     *
     * @param topic     target topic name
     * @param partition target partition, may be {@code null}
     * @param key       record key, may be {@code null}
     * @param msg       message payload
     */
    public void sendMsg(String topic, Integer partition, String key, String msg) {
        CompletableFuture<SendResult<String, String>> future =
                this.kafkaTemplate.send(topic, partition, key, msg);
        future.whenCompleteAsync((result, ex) -> {
            if (ex != null) {
                // The throwable is passed as the trailing argument so the full stack
                // trace and cause chain are logged, not only the message text.
                logger.error("生产者发送消息失败!原因:{}", ex.getMessage(), ex);
            }
        });
    }
}
whenCompleteAsync:为异步操作注册一个回调,当异步操作完成时(无论成功还是失败)异步执行该回调;这里用它在消息发送失败时记录错误日志。
2、生产者 service-album -> AlbumInfoServiceImpl
2.1、新增 saveAlbumInfo()
新增:如果是公开的专辑,则发送消息给 Kafka,search 服务通过监听器获取消息,并将新增的数据同步到 ES。
@Slf4j
@Service
@SuppressWarnings ( { "unchecked" , "rawtypes" } )
public class AlbumInfoServiceImpl extends ServiceImpl < AlbumInfoMapper , AlbumInfo > implements AlbumInfoService {
@Autowired
private AlbumAttributeValueMapper attributeValueMapper;
@Autowired
private AlbumStatService albumStatService;
@Autowired
private KafkaService kafkaService;
@Transactional ( rollbackFor = Exception . class )
@Override
public void saveAlbumInfo ( AlbumInfoVo albumInfoVo) throws FileNotFoundException {
AlbumInfo albumInfo = new AlbumInfo ( ) ;
BeanUtils . copyProperties ( albumInfoVo, albumInfo) ;
Long userId = AuthContextHolder . getUserId ( ) ;
albumInfo. setUserId ( userId == null ? 1 : userId) ;
albumInfo. setTracksForFree ( 5 ) ;
albumInfo. setSecondsForFree ( 30 ) ;
albumInfo. setStatus ( SystemConstant . ALBUM_STATUS_PASS ) ;
this . save ( albumInfo) ;
Long albumInfoId = albumInfo. getId ( ) ;
List < AlbumAttributeValueVo > albumAttributeValueVoList = albumInfoVo. getAlbumAttributeValueVoList ( ) ;
if ( ! CollectionUtils . isEmpty ( albumAttributeValueVoList) ) {
albumAttributeValueVoList. forEach ( albumAttributeValueVo -> {
AlbumAttributeValue albumAttributeValue = new AlbumAttributeValue ( ) ;
BeanUtils . copyProperties ( albumAttributeValueVo, albumAttributeValue) ;
albumAttributeValue. setAlbumId ( albumInfoId) ;
this . attributeValueMapper. insert ( albumAttributeValue) ;
} ) ;
}
this . albumStatService. saveAlbumStat ( albumInfoId) ;
if ( StringUtils . equals ( albumInfo. getIsOpen ( ) , "1" ) ) {
this . kafkaService. sendMsg ( KafkaConstant . QUEUE_ALBUM_UPPER , albumInfoId. toString ( ) ) ;
}
}
}
2.2、更新 updateAlbumInfo()
更新:如果是公开的专辑,则发送消息给 Kafka,search 服务通过监听器获取消息,并将更新后的数据同步到 ES;如果是私有的专辑,则发送消息给 Kafka,search 服务通过监听器获取消息,并从 ES 中删除对应数据。
@Slf4j
@Service
@SuppressWarnings ( { "unchecked" , "rawtypes" } )
public class AlbumInfoServiceImpl extends ServiceImpl < AlbumInfoMapper , AlbumInfo > implements AlbumInfoService {
@Autowired
private AlbumAttributeValueMapper attributeValueMapper;
@Autowired
private KafkaService kafkaService;
@Transactional
@Override
public void updateAlbumInfo ( Long albumId, AlbumInfoVo albumInfoVo) {
AlbumInfo albumInfo = new AlbumInfo ( ) ;
BeanUtils . copyProperties ( albumInfoVo, albumInfo) ;
albumInfo. setId ( albumId) ;
this . updateById ( albumInfo) ;
this . attributeValueMapper. delete ( new LambdaUpdateWrapper < AlbumAttributeValue > ( ) . eq ( AlbumAttributeValue :: getAlbumId , albumId) ) ;
List < AlbumAttributeValueVo > albumAttributeValueVoList = albumInfoVo. getAlbumAttributeValueVoList ( ) ;
if ( ! CollectionUtils . isEmpty ( albumAttributeValueVoList) ) {
albumAttributeValueVoList. forEach ( albumAttributeValueVo -> {
AlbumAttributeValue albumAttributeValue = new AlbumAttributeValue ( ) ;
BeanUtils . copyProperties ( albumAttributeValueVo, albumAttributeValue) ;
albumAttributeValue. setAlbumId ( albumId) ;
this . attributeValueMapper. insert ( albumAttributeValue) ;
} ) ;
}
if ( StringUtils . equals ( albumInfoVo. getIsOpen ( ) , "1" ) ) {
this . kafkaService. sendMsg ( KafkaConstant . QUEUE_ALBUM_UPPER , albumId. toString ( ) ) ;
} else {
this . kafkaService. sendMsg ( KafkaConstant . QUEUE_ALBUM_LOWER , albumId. toString ( ) ) ;
}
}
}
2.3、删除 removeAlbumInfo()
删除:发送消息给 Kafka,search 服务通过监听器获取消息,并从 ES 中删除对应数据。
@Slf4j
@Service
@SuppressWarnings ( { "unchecked" , "rawtypes" } )
public class AlbumInfoServiceImpl extends ServiceImpl < AlbumInfoMapper , AlbumInfo > implements AlbumInfoService {
@Autowired
private AlbumAttributeValueMapper attributeValueMapper;
@Autowired
private AlbumStatMapper albumStatMapper;
@Autowired
private KafkaService kafkaService;
@Transactional
@Override
public void removeAlbumInfo ( Long albumId) {
this . removeById ( albumId) ;
this . albumStatMapper. delete ( new LambdaUpdateWrapper < AlbumStat > ( ) . eq ( AlbumStat :: getAlbumId , albumId) ) ;
this . attributeValueMapper. delete ( new LambdaUpdateWrapper < AlbumAttributeValue > ( ) . eq ( AlbumAttributeValue :: getAlbumId , albumId) ) ;
this . kafkaService. sendMsg ( KafkaConstant . QUEUE_ALBUM_LOWER , albumId. toString ( ) ) ;
}
}
3、消费者 service-search -> AlbumListener.java
package com. atguigu. tingshu. search. listener ;
@Component
public class AlbumListener {
@Autowired
private AlbumInfoFeignClient albumInfoFeignClient;
@Autowired
private UserInfoFeignClient userInfoFeignClient;
@Autowired
private CategoryFeignClient categoryFeignClient;
@Autowired
private ElasticsearchTemplate elasticsearchTemplate;
@KafkaListener ( topics = KafkaConstant . QUEUE_ALBUM_UPPER )
public void upper ( String albumId) {
if ( StringUtils . isBlank ( albumId) ) {
return ;
}
Result < AlbumInfo > albumInfoResult = this . albumInfoFeignClient. getAlbumInfo ( Long . valueOf ( albumId) ) ;
Assert . notNull ( albumInfoResult, "同步数据时,获取专辑信息失败!" ) ;
AlbumInfo albumInfo = albumInfoResult. getData ( ) ;
Assert . notNull ( albumInfo, "同步数据时,没有对应的专辑!" ) ;
AlbumInfoIndex albumInfoIndex = new AlbumInfoIndex ( ) ;
BeanUtils . copyProperties ( albumInfo, albumInfoIndex) ;
Result < UserInfoVo > userInfoVoResult = this . userInfoFeignClient. getUserById ( albumInfo. getUserId ( ) ) ;
Assert . notNull ( userInfoVoResult, "数据导入时,获取主播信息失败!" ) ;
UserInfoVo userInfoVo = userInfoVoResult. getData ( ) ;
if ( userInfoVo != null ) {
albumInfoIndex. setAnnouncerId ( userInfoVo. getId ( ) ) ;
albumInfoIndex. setAnnouncerName ( userInfoVo. getNickname ( ) ) ;
}
Result < BaseCategoryView > categoryResult = this . categoryFeignClient. getAllLevelCategories ( albumInfo. getCategory3Id ( ) ) ;
Assert . notNull ( categoryResult, "数据导入时,获取分类信息失败!" ) ;
BaseCategoryView baseCategoryView = categoryResult. getData ( ) ;
if ( baseCategoryView != null ) {
albumInfoIndex. setCategory1Id ( baseCategoryView. getCategory1Id ( ) ) ;
albumInfoIndex. setCategory2Id ( baseCategoryView. getCategory2Id ( ) ) ;
}
int playNum = ( new Random ( ) . nextInt ( 100 ) + 1 ) * 10000 ;
albumInfoIndex. setPlayStatNum ( playNum) ;
int subscribeNum = ( new Random ( ) . nextInt ( 100 ) + 1 ) * 10000 ;
albumInfoIndex. setSubscribeStatNum ( subscribeNum) ;
int buyNum = ( new Random ( ) . nextInt ( 100 ) + 1 ) * 10000 ;
albumInfoIndex. setBuyStatNum ( buyNum) ;
int commentNum = ( new Random ( ) . nextInt ( 100 ) + 1 ) * 10000 ;
albumInfoIndex. setCommentStatNum ( commentNum) ;
albumInfoIndex. setHotScore ( playNum * 0.1 + commentNum * 0.2 + subscribeNum * 0.3 + buyNum * 0.4 ) ;
Result < List < AlbumAttributeValue > > albumAttributeValueResult = this . albumInfoFeignClient. getAlbumAttributeValue ( albumInfo. getId ( ) ) ;
Assert . notNull ( albumAttributeValueResult, "数据导入时,获取标签及值失败!" ) ;
List < AlbumAttributeValue > albumAttributeValues = albumAttributeValueResult. getData ( ) ;
if ( ! CollectionUtils . isEmpty ( albumAttributeValues) ) {
albumInfoIndex. setAttributeValueIndexList ( albumAttributeValues. stream ( ) . map ( albumAttributeValue -> {
AttributeValueIndex attributeValueIndex = new AttributeValueIndex ( ) ;
BeanUtils . copyProperties ( albumAttributeValue, attributeValueIndex) ;
return attributeValueIndex;
} ) . collect ( Collectors . toList ( ) ) ) ;
}
this . elasticsearchTemplate. save ( albumInfoIndex) ;
}
@KafkaListener ( topics = KafkaConstant . QUEUE_ALBUM_LOWER )
public void lower ( String albumId) {
if ( StringUtils . isBlank ( albumId) ) {
return ;
}
this . elasticsearchTemplate. delete ( albumId, AlbumInfoIndex . class ) ;
}
}