文章目录
依赖 bus应用
接收的应用
使用bus 定义bus远程调用 A应用数据更新后通过bus数据同步给B应用
依赖
< dependency>
< groupId> org.springframework.cloud</ groupId>
< artifactId> spring-cloud-starter-bus-amqp</ artifactId>
</ dependency>
bus应用
接口
供内部其他应用使用,远程调用该接口实现各应用之间数据同步 参数1定义事件,参数2定义操作具体crud,参数3定义传参数据,参数4定义给哪个应用(nacos注册的应用名)同步数据
import cn. hutool. log. Log ;
import cn. hutool. log. LogFactory ;
import com. xyc. sms. common. bus. events. DataSyncEventEnum ;
import com. xyc. sms. common. bus. events. DataSyncEventFactory ;
import com. xyc. sms. common. bus. events. DataSyncOperateTypeEnum ;
import com. xyc. sms. common. entity. Result ;
import org. springframework. cloud. bus. ServiceMatcher ;
import org. springframework. context. ApplicationEventPublisher ;
import org. springframework. web. bind. annotation. * ;
import javax. annotation. Resource ;
@RestController
@RequestMapping ( "/default" )
public class DataSyncNotifyEventController {
private static final Log logger = LogFactory . get ( ) ;
@Resource
private ServiceMatcher busServiceMatcher;
@Resource
private ApplicationEventPublisher applicationEventPublisher;
@PostMapping ( "/publish/{eventEnum}/{operateType}" )
public Result publishDataSyncNotifyEvent ( @PathVariable ( "eventEnum" ) DataSyncEventEnum eventEnum,
@PathVariable ( "operateType" ) DataSyncOperateTypeEnum operateType,
@RequestBody Object obj,
@RequestParam ( value = "destination" , required = false ) String destination) {
try {
applicationEventPublisher. publishEvent ( DataSyncEventFactory . getInstanceForEvent (
eventEnum,
operateType,
obj,
busServiceMatcher. getServiceId ( ) ,
destination) ) ;
return Result . returnSuccessWithMsg ( "success" ) ;
} catch ( Exception e) {
logger. error ( e) ;
return Result . returnFail ( e. getMessage ( ) ) ;
}
}
}
import com. fasterxml. jackson. databind. ObjectMapper ;
import java. io. IOException ;
import java. lang. reflect. Constructor ;
import java. lang. reflect. InvocationTargetException ;
import java. util. Arrays ;
public class DataSyncEventFactory {
private static final ObjectMapper OM = new ObjectMapper ( ) ;
public static DataSyncEvent < ? > getInstanceForEvent ( DataSyncEventEnum eventEnum,
DataSyncOperateTypeEnum operateType,
Object source,
String originService,
String destinationService)
throws NoSuchMethodException , InvocationTargetException , InstantiationException , IllegalAccessException , IOException , ClassNotFoundException {
Constructor < ? > [ ] constructors = DataSyncEventFactory . getEventClass ( eventEnum) . getDeclaredConstructors ( ) ;
Constructor < ? > constructor = Arrays . stream ( constructors) . filter ( c -> c. getParameterCount ( ) == 4 ) . findFirst ( ) . orElseThrow (
NoSuchMethodException :: new ) ;
Object o = OM. readValue ( OM. writeValueAsString ( source) , constructor. getParameterTypes ( ) [ 1 ] ) ;
return ( DataSyncEvent < ? > ) constructor. newInstance ( operateType, o, originService, destinationService) ;
}
private static Class < ? > getEventClass ( DataSyncEventEnum eventEnum) throws ClassNotFoundException {
return Class . forName ( eventEnum. getEventClassName ( ) ) ;
}
}
用到的封装参数类
public enum DataSyncEventEnum {
BLACKLIST_SYN ( "com.xyc.sms.common.bus.events.dataSync.BlackListSyncEvent" ) ,
ROUTE_SYN ( "com.xyc.sms.common.bus.events.dataSync.RouteSyncEvent" ) ;
private final String eventClassName;
DataSyncEventEnum ( String eventClassName) {
this . eventClassName = eventClassName;
}
public String getEventClassName ( ) {
return eventClassName;
}
}
public enum DataSyncOperateTypeEnum implements Serializable {
ADD, UPD, DEL
}
public abstract class DataSyncEvent < T > extends RemoteApplicationEvent {
private DataSync < T > dataSync;
public DataSync < T > getDataSync ( ) {
return dataSync;
}
public void setDataSync ( DataSync < T > dataSync) {
this . dataSync = dataSync;
}
public DataSyncEvent ( DataSync < T > source, String originService, String destinationService) {
super ( source, originService, destinationService) ;
this . dataSync = source;
}
public String logPrint ( ) {
return String . format ( "{\"originService\":\"%s\",\"destinationService\":\"%s\",\"id\":\"%s\",\"dataSync\":%s,\"timestamp\":\"%s\"}" , this . getId ( ) , this . getOriginService ( ) , this . getDestinationService ( ) , Objects . nonNull ( this . dataSync) ? this . dataSync. toString ( ) : "null" , this . getTimestamp ( ) ) ;
}
public static class DataSync < T > implements Serializable {
private DataSyncOperateTypeEnum operateType;
private T data;
public DataSync ( ) {
}
public DataSync ( DataSyncOperateTypeEnum operateType, T data) {
this . operateType = operateType;
this . data = data;
}
public DataSyncOperateTypeEnum getOperateType ( ) {
return operateType;
}
public T getData ( ) {
return data;
}
@Override
public String toString ( ) {
return "{\"operateType\":" +
operateType
+ ",\"data\":" +
data
+ "}" ;
}
}
}
public enum ServiceEnum {
SMS_BLACK_API ( "sms-black-api" ) ,
SMS_RULES ( "sms-rules" ) ;
public final String serviceName;
ServiceEnum ( String serviceName) {
this . serviceName = serviceName;
}
}
接收的应用
监听器
推送过来的操作枚举类crud的值,决定执行哪个crud的具体方法 该类放在接收的应用中,其他顶部继承的类放在common
包中即可
import cn. hutool. core. collection. CollectionUtil ;
import cn. hutool. core. date. DateUtil ;
import cn. hutool. log. Log ;
import cn. hutool. log. LogFactory ;
import com. xyc. sms. common. bus. DataSyncListener ;
import com. xyc. sms. common. bus. events. dataSync. RouteSyncEvent ;
import com. xyc. sms. common. entity. sms. Route ;
import com. xyc. sms. rules. dao. boss. RouteMapper ;
import com. xyc. sms. rules. data. RuleSymbol ;
import com. xyc. sms. rules. service. SynService ;
import org. springframework. beans. factory. annotation. Autowired ;
import org. springframework. stereotype. Component ;
import java. util. List ;
import java. util. Objects ;
import java. util. Optional ;
import java. util. stream. Collectors ;
@Component
public class RouteSynNotifyListener extends DataSyncListener < RouteSyncEvent , List < Route > > {
private static final Log log = LogFactory . get ( ) ;
@Autowired
private RouteMapper routeMapper;
@Autowired
private SynService synService;
@Override
public void handleByADD ( List < Route > data) {
Optional . ofNullable ( data) . ifPresent ( ls -> {
if ( ls. isEmpty ( ) ) {
return ;
}
long l = System . currentTimeMillis ( ) ;
String time = DateUtil . formatDateTime ( ls. get ( 0 ) . getCreateTime ( ) ) ;
List < Route > list = routeMapper. selectByCreatetime ( time) ;
if ( CollectionUtil . isEmpty ( list) ) {
return ;
}
list. forEach ( r -> RuleSymbol. RouteMap . put ( r. getId ( ) , r) ) ;
synService. transformRoute ( list, ( s, r) -> RuleSymbol. RouteChannelMap . put ( s, r) ) ;
log. info ( "RouteSynNotifyListener - {} - add | createTime:{}" , ( System . currentTimeMillis ( ) - l) , time) ;
} ) ;
}
@Override
public void handleByUPD ( List < Route > data) {
Optional . ofNullable ( data) . ifPresent ( ls -> {
List < Integer > collect = ls. stream ( )
. map ( Route :: getId )
. filter ( Objects :: nonNull )
. collect ( Collectors . toList ( ) ) ;
if ( collect. isEmpty ( ) ) {
return ;
}
long l = System . currentTimeMillis ( ) ;
List < Route > c = collect. stream ( )
. map ( RuleSymbol. RouteMap :: get )
. filter ( Objects :: nonNull )
. collect ( Collectors . toList ( ) ) ;
if ( ! c. isEmpty ( ) ) {
synService. transformRoute ( c, ( s, r) -> RuleSymbol. RouteChannelMap . remove ( s, r) ) ;
}
List < Route > list = routeMapper. selectById ( collect) ;
if ( ! list. isEmpty ( ) ) {
list. forEach ( r -> RuleSymbol. RouteMap . put ( r. getId ( ) , r) ) ;
synService. transformRoute ( list, ( s, r) -> RuleSymbol. RouteChannelMap . put ( s, r) ) ;
}
log. info ( "RouteSynNotifyListener - {} - update | {}" , ( System . currentTimeMillis ( ) - l) , collect) ;
} ) ;
}
@Override
public void handleByDEL ( List < Route > data) {
Optional . ofNullable ( data) . ifPresent ( ls -> {
long l = System . currentTimeMillis ( ) ;
List < Route > collect = ls. stream ( )
. map ( r -> RuleSymbol. RouteMap . remove ( r. getId ( ) ) )
. filter ( Objects :: nonNull )
. collect ( Collectors . toList ( ) ) ;
if ( collect. isEmpty ( ) ) {
return ;
}
synService. transformRoute ( collect, ( s, r) -> RuleSymbol. RouteChannelMap . remove ( s) ) ;
log. info ( "RouteSynNotifyListener - {} - remove" , ( System . currentTimeMillis ( ) - l) ) ;
} ) ;
}
}
import cn. hutool. log. Log ;
import cn. hutool. log. LogFactory ;
import com. xyc. sms. common. bus. events. DataSyncEvent ;
import org. springframework. context. ApplicationListener ;
public abstract class DataSyncListener < T extends DataSyncEvent < D > , D > implements ApplicationListener < T > {
private static final Log logger = LogFactory . get ( ) ;
@Override
public void onApplicationEvent ( T event) {
logger. info ( "[DataSyncListener][onApplicationEvent] trigger event - {} - {}" , event. getClass ( ) . getName ( ) , event. logPrint ( ) ) ;
try {
triggerEvent ( event) ;
} catch ( Exception e) {
logger. error ( e) ;
}
}
public void triggerEvent ( T event) {
DataSyncEvent. DataSync < D > source = event. getDataSync ( ) ;
switch ( source. getOperateType ( ) ) {
case ADD:
handleByADD ( source. getData ( ) ) ;
return ;
case UPD:
handleByUPD ( source. getData ( ) ) ;
return ;
case DEL:
handleByDEL ( source. getData ( ) ) ;
return ;
default :
}
}
public abstract void handleByADD ( D data) ;
public abstract void handleByUPD ( D data) ;
public abstract void handleByDEL ( D data) ;
}
定义的事件类
package com. xyc. sms. common. bus. events. dataSync ;
import com. xyc. sms. common. bus. events. DataSyncEvent ;
import com. xyc. sms. common. bus. events. DataSyncOperateTypeEnum ;
import com. xyc. sms. common. entity. sms. Route ;
import java. util. List ;
public class RouteSyncEvent extends DataSyncEvent < List < Route > > {
private static final long serialVersionUID = - 501657066268464154L ;
public RouteSyncEvent ( DataSyncOperateTypeEnum operateType, List < Route > Routes , String originService, String destinationService) {
super ( new DataSync < > ( operateType, Routes ) , originService, destinationService) ;
}
}
public abstract class DataSyncEvent < T > extends RemoteApplicationEvent {
private DataSync < T > dataSync;
public DataSync < T > getDataSync ( ) {
return dataSync;
}
public void setDataSync ( DataSync < T > dataSync) {
this . dataSync = dataSync;
}
public DataSyncEvent ( DataSync < T > source, String originService, String destinationService) {
super ( source, originService, destinationService) ;
this . dataSync = source;
}
public String logPrint ( ) {
return String . format ( "{\"originService\":\"%s\",\"destinationService\":\"%s\",\"id\":\"%s\",\"dataSync\":%s,\"timestamp\":\"%s\"}" , this . getId ( ) , this . getOriginService ( ) , this . getDestinationService ( ) , Objects . nonNull ( this . dataSync) ? this . dataSync. toString ( ) : "null" , this . getTimestamp ( ) ) ;
}
public static class DataSync < T > implements Serializable {
private DataSyncOperateTypeEnum operateType;
private T data;
public DataSync ( ) {
}
public DataSync ( DataSyncOperateTypeEnum operateType, T data) {
this . operateType = operateType;
this . data = data;
}
public DataSyncOperateTypeEnum getOperateType ( ) {
return operateType;
}
public T getData ( ) {
return data;
}
@Override
public String toString ( ) {
return "{\"operateType\":" +
operateType
+ ",\"data\":" +
data
+ "}" ;
}
}
}
使用bus
定义bus远程调用
@FeignClient ( value= "sms-bus" , fallbackFactory = DataSyncNotifyEventServiceFallbackFactory . class )
public interface DataSyncNotifyEventService {
@PostMapping ( "/default/publish/{eventEnum}/{operateType}" )
Result publishDataSyncNotifyEvent ( @PathVariable ( "eventEnum" ) DataSyncEventEnum eventEnum,
@PathVariable ( "operateType" ) DataSyncOperateTypeEnum operateType,
@RequestBody Object obj,
@RequestParam ( "destination" ) String destination) ;
}
注入使用
A应用数据更新后通过bus数据同步给B应用
try {
result = dataSyncNotifyEventService. publishDataSyncNotifyEvent ( DataSyncEventEnum . ROUTE_SYN,
DataSyncOperateTypeEnum . ADD,
new ArrayList < Route > ( ) { {
Route r = new Route ( ) ;
r. setCreateTime ( date) ;
add ( r) ;
} } , ServiceEnum . SMS_RULES. serviceName) ;
log. info ( "新增路由调用通知同步所有服务 result:{}" , result) ;
} catch ( Exception e) {
log. error ( "同步异常 {}" , result, e) ;
}