效果图
数据库表及实体模型参考
创建消息事件模型
/**
 * Domain event requesting that a system message (and optionally an SMS) be sent
 * to a set of target parties.
 * <p>
 * The event is configured fluently: every {@code build*}/{@code add*} method returns
 * {@code this}. {@link #getSystemMsgSendVO()} converts the collected state into the
 * {@code SystemMsgSendVO} that is handed to the message service.
 */
public class SystemMsgSendEvent implements DomainEvent {
    /**
     * Placeholder stored in {@link #targetPartyIds} in place of a blank id.
     * Per the original author's note this keeps faulty business data from turning the
     * message into a "send to everybody" broadcast.
     */
    private static final String BLANK_TARGET_PARTY_ID = "";

    /** Event id; generated lazily by {@link #getEventId()} when left blank. */
    private String eventId;
    /** Execution point of the event; defaults to CURR_THREAD_AFTER_COMMITTED. */
    private ExecutePoint executePoint;
    /** Operate info of the action that triggered the message. */
    private OperateInfo operateInfo;
    /** Message type; the original note points to the XysdMsgConstants.MSG_TYPE_ constants. */
    private String msgType;
    /** Message content, either a TextMsgBody or an ObjectMsgBody. */
    private MsgBody<?> msgBody;
    @ApiModelProperty(value="目标组织ids")
    private Set<String> targetPartyIds;
    @ApiModelProperty(value="目标组织拥有的资源")
    private Set<String> targetPartyRes;
    @ApiModelProperty(value="目标组织机构类型ids")
    private Set<String> targetPartyOrgTypes;
    @ApiModelProperty(value="忽略的组织ids")
    private Set<String> ignorePartyIds;
    @ApiModelProperty(value="忽略的领导级别")
    private Set<String> ignoreLeaderTypes;
    @ApiModelProperty(value="目标组织中 拥有的职务(没有指定默认只给科员发) 主要领导:leader_main/主管领导:leader_zg/科长:leader_kz/副科长:leader_fkz")
    private Set<String> targetPartyLeaderDuties;

    @ApiModelProperty(value="短信签名 可指定 默认值在配置文件中 指定“” 表示没有签名")
    private String signature;
    @ApiModelProperty(value="短信内容")
    private String content;
    @ApiModelProperty(value="短信模板编码 采用模板发送短信时必填")
    private String templateCode;
    @ApiModelProperty(value="短信模板参数 非必填")
    private Map<String,Object> templateParams;
    @ApiModelProperty(value="仅给离线用户发送 默认true")
    private boolean onlyOfflineUser=true;

    /** Value returned by {@link #throwException()}; defaults to {@code true}. */
    private boolean throwException=true;

    /** Creates an event that uses the CURR_THREAD_AFTER_COMMITTED execute point. */
    public SystemMsgSendEvent(){
        this.executePoint=ExecutePoint.CURR_THREAD_AFTER_COMMITTED;
    }

    /**
     * Creates an event for the given message type.
     * Notice messages ({@code SystemMsgSendVO.msgType_notice}) switch the execute point
     * to NEW_THREAD_AFTER_COMMITTED.
     *
     * @param msgType message type
     */
    public SystemMsgSendEvent(String msgType){
        this();
        this.msgType=msgType;
        if(SystemMsgSendVO.msgType_notice.equals(msgType)) {
            this.executePoint=ExecutePoint.NEW_THREAD_AFTER_COMMITTED;
        }
    }

    @Override
    public OperateInfo getOperateInfo() {
        return operateInfo;
    }
    public String getMsgType() {
        return msgType;
    }
    public MsgBody<?> getMsgBody() {
        return msgBody;
    }
    public Set<String> getTargetPartyIds() {
        return targetPartyIds;
    }
    public Set<String> getTargetPartyOrgTypes() {
        return targetPartyOrgTypes;
    }
    public Set<String> getIgnorePartyIds() {
        return ignorePartyIds;
    }
    public Set<String> getTargetPartyRes() {
        return targetPartyRes;
    }
    public Set<String> getIgnoreLeaderTypes() {
        return ignoreLeaderTypes;
    }
    public Set<String> getTargetPartyLeaderDuties() {
        return targetPartyLeaderDuties;
    }

    /**
     * Sets the resources a target party must own.
     * The method name carries a historical typo; it is kept for existing callers.
     * Prefer {@link #buildTargetPartyRes(Set)}.
     */
    public SystemMsgSendEvent bulidTargetPartyRes(Set<String> targetPartyRes) {
        this.targetPartyRes = targetPartyRes;
        return this;
    }

    /** Correctly spelled equivalent of {@link #bulidTargetPartyRes(Set)}. */
    public SystemMsgSendEvent buildTargetPartyRes(Set<String> targetPartyRes) {
        return this.bulidTargetPartyRes(targetPartyRes);
    }

    /** Sets the event id. */
    public SystemMsgSendEvent buildEventId(String eventId) {
        this.eventId = eventId;
        return this;
    }

    /** Sets the execute point of the event. */
    public SystemMsgSendEvent buildExecutePoint(ExecutePoint executePoint) {
        this.executePoint = executePoint;
        return this;
    }

    /** Sets the operate info of the event. */
    public SystemMsgSendEvent buildOperateInfo(OperateInfo operateInfo) {
        this.operateInfo = operateInfo;
        return this;
    }

    /** Sets the leader types that are ignored when resolving recipients. */
    public SystemMsgSendEvent buildLeaderTypes(Set<String> leaderTypes) {
        this.ignoreLeaderTypes = leaderTypes;
        return this;
    }

    /** Sets the leader duties that should receive the message. */
    public SystemMsgSendEvent buildTargetPartyLeaderDuties(Set<String> targetPartyLeaderDuties) {
        this.targetPartyLeaderDuties = targetPartyLeaderDuties;
        return this;
    }

    /** Targets all four leader duties: leader_main, leader_zg, leader_kz and leader_fkz. */
    public SystemMsgSendEvent buildTargetPartyLeaderDuties() {
        Set<String> allDuties = new HashSet<>();
        allDuties.add("leader_main");
        allDuties.add("leader_zg");
        allDuties.add("leader_kz");
        allDuties.add("leader_fkz");
        this.targetPartyLeaderDuties = allDuties;
        return this;
    }

    /**
     * Sets a plain-text message body.
     *
     * @param msgContent text content, must not be blank
     * @throws RuntimeException when the content is blank or a non-text body was already set
     */
    public SystemMsgSendEvent buildMsgBody(String msgContent) {
        if(StringUtils.isBlank(msgContent)) throw new RuntimeException("消息内容不能为空");
        if(this.msgBody!=null&&!(this.msgBody instanceof TextMsgBody)) {
            throw new RuntimeException("非文本消息体不能设置文本内容");
        }
        this.msgBody = new TextMsgBody(msgContent);
        return this;
    }

    /**
     * Adds several key/value pairs to the object message body.
     *
     * @param params alternating keys (cast to {@code String}) and values
     * @throws RuntimeException when a non-object body was already set or the number of
     *                          arguments is odd
     */
    public SystemMsgSendEvent addMsgBodyDatas(Object... params) {
        ObjectMsgBody objectBody = this.requireObjectMsgBody();
        if(params.length%2!=0) throw new RuntimeException("对象消息体数据必须成对出现");
        for (int i=0;i<params.length;i += 2) {
            objectBody.addMsgBodyData((String)params[i],params[i+1]);
        }
        return this;
    }

    /**
     * Adds one key/value pair to the object message body.
     *
     * @throws RuntimeException when a non-object body was already set
     */
    public SystemMsgSendEvent addMsgBodyData(String key, Object val) {
        this.requireObjectMsgBody().addMsgBodyData(key, val);
        return this;
    }

    /** Returns the object message body, creating it on first use; rejects any other body type. */
    private ObjectMsgBody requireObjectMsgBody() {
        if(this.msgBody==null) this.msgBody = new ObjectMsgBody();
        if(!(this.msgBody instanceof ObjectMsgBody)) {
            throw new RuntimeException("非对象消息体不可设置消息体数据");
        }
        return (ObjectMsgBody)this.msgBody;
    }

    /**
     * Adds target party ids. Blank ids are stored as the empty-string placeholder rather
     * than being dropped; a {@code null} array is treated like a single blank id.
     */
    public SystemMsgSendEvent addTargetPartyIds(String... partyIds) {
        if(this.targetPartyIds==null) this.targetPartyIds = new HashSet<>();
        if(partyIds==null) {
            this.targetPartyIds.add(BLANK_TARGET_PARTY_ID);
            return this;
        }
        for (String partyId : partyIds) {
            this.targetPartyIds.add(normalizeTargetPartyId(partyId));
        }
        return this;
    }

    /**
     * Adds target party ids. Blank ids are stored as the empty-string placeholder rather
     * than being dropped; a {@code null} collection is treated like a single blank id.
     */
    public SystemMsgSendEvent addTargetPartyIds(Collection<String> partyIds) {
        if(this.targetPartyIds==null) this.targetPartyIds = new HashSet<>();
        if(partyIds==null) {
            this.targetPartyIds.add(BLANK_TARGET_PARTY_ID);
            return this;
        }
        for (String partyId : partyIds) {
            this.targetPartyIds.add(normalizeTargetPartyId(partyId));
        }
        return this;
    }

    /** Maps a blank id to the placeholder and leaves every other id untouched. */
    private static String normalizeTargetPartyId(String partyId) {
        return StringUtils.isBlank(partyId) ? BLANK_TARGET_PARTY_ID : partyId;
    }

    /** Adds target organisation type ids; blank entries and a {@code null} array are ignored. */
    public SystemMsgSendEvent addTargetPartyOrgTypes(String... partyIds) {
        if(this.targetPartyOrgTypes==null) this.targetPartyOrgTypes = new HashSet<>();
        if(partyIds==null) return this;
        for (String orgType : partyIds) {
            addIfNotBlank(this.targetPartyOrgTypes, orgType);
        }
        return this;
    }

    /** Adds target organisation type ids; blank entries and a {@code null} collection are ignored. */
    public SystemMsgSendEvent addTargetPartyOrgTypes(Collection<String> partyIds) {
        if(this.targetPartyOrgTypes==null) this.targetPartyOrgTypes = new HashSet<>();
        if(partyIds==null) return this;
        for (String orgType : partyIds) {
            addIfNotBlank(this.targetPartyOrgTypes, orgType);
        }
        return this;
    }

    /** Adds party ids to ignore; blank entries and a {@code null} array are ignored. */
    public SystemMsgSendEvent addIgnorePartyIds(String... partyIds) {
        if(this.ignorePartyIds==null) this.ignorePartyIds = new HashSet<>();
        if(partyIds==null) return this;
        for (String partyId : partyIds) {
            addIfNotBlank(this.ignorePartyIds, partyId);
        }
        return this;
    }

    /** Adds party ids to ignore; blank entries and a {@code null} collection are ignored. */
    public SystemMsgSendEvent addIgnorePartyIds(Collection<String> partyIds) {
        if(this.ignorePartyIds==null) this.ignorePartyIds = new HashSet<>();
        if(partyIds==null) return this;
        for (String partyId : partyIds) {
            addIfNotBlank(this.ignorePartyIds, partyId);
        }
        return this;
    }

    /** Adds {@code value} to {@code target} unless it is blank. */
    private static void addIfNotBlank(Set<String> target, String value) {
        if(StringUtils.isBlank(value)) return;
        target.add(value);
    }

    /** // Not used yet: the SMS service does not support content-based sending so far.
    // Builds the parameters for sending an SMS by raw content.
    public SystemMsgSendEvent buildSendSmsParams(String signature, String content) {
        this.signature=signature;
        this.content=content;
        return this;
    }
    **/

    /**
     * Sets the parameters for sending an SMS through a template.
     * The call is a no-op unless the configuration property {@code sendSMS} equals "true".
     */
    public SystemMsgSendEvent buildSendSmsParams(String signature, String templateCode,
            Map<String,Object> templateParams) {
        if(!"true".equals(BaseConstants.getProperty("sendSMS","false"))) {
            return this;
        }
        this.signature=signature;
        this.templateCode=templateCode;
        this.templateParams=templateParams;
        return this;
    }

    /**
     * Builds the send VO from the state collected so far, including the SMS parameters.
     * Text and object bodies are passed to the same constructor, so no distinction is needed.
     */
    public SystemMsgSendVO<?> getSystemMsgSendVO() {
        SystemMsgSendVO<?> vo = new SystemMsgSendVO<>(this.operateInfo, this.msgType, this.msgBody,
                this.targetPartyIds, this.targetPartyOrgTypes, this.ignorePartyIds, this.targetPartyRes,
                this.ignoreLeaderTypes, this.targetPartyLeaderDuties);
        vo.buildSendSmsParams(signature, content, templateCode, templateParams, onlyOfflineUser);
        return vo;
    }

    /** Sets whether the SMS goes to offline users only ({@code false} sends it to every user). */
    public SystemMsgSendEvent onlyOfflineUser(boolean onlyOfflineUser){
        this.onlyOfflineUser = onlyOfflineUser;
        return this;
    }

    /** Sets the flag returned by {@link #throwException()}. */
    public SystemMsgSendEvent buildThrowException(boolean throwException) {
        this.throwException = throwException;
        return this;
    }

    @Override
    public boolean throwException() {
        return throwException;
    }

    /**
     * Returns the event id. When none was supplied, an id is generated once and remembered,
     * so repeated calls return the same value.
     */
    @Override
    public String getEventId() {
        if(StringUtils.isBlank(this.eventId)) {
            this.eventId = Utils.getUUID("");
        }
        return this.eventId;
    }

    @Override
    public AccessTokenUser getOperator() {
        return operateInfo.getOperator();
    }

    @Override
    public Date obtainEventTime() {
        return operateInfo.obtainNotNullOperateTime();
    }

    /** This event carries no generic event data; listeners use the typed getters instead. */
    @Override
    public <R> R getEventData() {
        return null;
    }

    @Override
    public ExecutePoint obtainExecutePoint() {
        return this.executePoint;
    }

    @Override
    public String getEventType() {
        return SystemMsgSendEvent.class.getName();
    }
}
消息体模型
/**
 * Generic holder for the payload of a message.
 *
 * @param <T> type of the wrapped payload
 */
public class MsgBody<T> {
    /** Wrapped payload; protected so that subclasses can access it directly. */
    protected T msgBody;

    /** @return the wrapped payload, possibly {@code null} */
    public T getMsgBody() {
        return this.msgBody;
    }

    /** @param msgBody the payload to wrap */
    public void setMsgBody(T msgBody) {
        this.msgBody = msgBody;
    }
}
/**
 * Message body whose payload is a key/value container ({@code OtherData}).
 * According to the original author this is the model used most often.
 */
@ApiModel(value="ObjectMsgBody ",description="对象消息体")
@JsonIgnoreProperties(ignoreUnknown=true)
public class ObjectMsgBody extends MsgBody<OtherData> {
    /** Creates a body without payload; the payload is created lazily by {@link #getMsgBody()}. */
    public ObjectMsgBody() {
    }
    /**
     * Creates a body whose payload already contains the entry {@code "xiaoxzsType"}.
     *
     * @param xiaoxzsType value stored under the key {@code "xiaoxzsType"}
     */
    public ObjectMsgBody(String xiaoxzsType) {
        this.getMsgBody().put("xiaoxzsType", xiaoxzsType);
    }
    /**
     * Adds one entry to the payload.
     *
     * @param key entry key
     * @param val entry value
     * @return this body, to allow chaining
     */
    public ObjectMsgBody addMsgBodyData(String key, Object val) {
        this.getMsgBody().put(key, val);
        return this;
    }
    /**
     * Returns the payload, creating an empty {@code OtherData} when none is set yet.
     */
    @Override
    public OtherData getMsgBody() {
        if(this.msgBody==null) this.msgBody=new OtherData();
        // NOTE(review): the field is statically typed OtherData, so this check only matters when
        // the erased generic field holds some other object at runtime — presumably a plain map
        // left there by deserialization; confirm. In that case the entries are copied over.
        if(!(this.msgBody instanceof OtherData)) {
            OtherData msgBody=new OtherData();
            msgBody.putAll(this.msgBody);
            this.msgBody = msgBody;
        }
        return this.msgBody;
    }
}
/**
 * Message body whose payload is a plain text string.
 */
@ApiModel(value="TextMsgBody ",description="文本消息体")
@JsonIgnoreProperties(ignoreUnknown=true)
public class TextMsgBody extends MsgBody<String> {

    /** Creates a text body without content. */
    public TextMsgBody() {
    }

    /**
     * Creates a text body with the given content.
     *
     * @param msgContent text stored as the payload
     */
    public TextMsgBody(String msgContent) {
        this.msgBody = msgContent;
    }
}
发送事件
/**
 * Unlocks a data form and, for one specific form type, notifies the user who filled it in.
 * <p>
 * The form must exist, its status must equal {@code AssessDataForm.STATUS_YSB}, and the
 * operator's organisation must match the configured {@code sysBelongOrgId}; otherwise a
 * {@link RuntimeException} is thrown. After the form is unlocked and persisted, a
 * {@code SystemMsgSendEvent} is published when the form type name matches.
 *
 * @param operateInfo operate info of the current operator
 * @param id          id used to look up the data form
 * @param reason      reason passed on to {@code AssessDataForm.unlock}
 * @throws RuntimeException when one of the preconditions above is not met
 */
public void unlock(OperateInfo operateInfo, String id, String reason) {
    AssessDataForm assessDataForm = assessDataFormRepository.findByDataFormId(id);
    if (assessDataForm == null) throw new RuntimeException("xxx");
    if (!AssessDataForm.STATUS_YSB.equals(assessDataForm.getStatus())) throw new RuntimeException("xxxxx");
    boolean sysBelongOrgId = BaseConstants.getProperty("sysBelongOrgId", "").equals(operateInfo.obtainOperateOrgId());
    if (!sysBelongOrgId) throw new RuntimeException("xxxx");

    assessDataForm.unlock(operateInfo, reason);
    assessDataFormRepository.createOrUpdate(assessDataForm);
    this.handleBackUpdateScoreRecordStatus(operateInfo, assessDataForm);

    // Only one form type triggers a notification; the name is read once and reused below.
    String formTypeName = assessDataForm.getBaseInfo().getFormType().getName();
    if (!"xxx".equals(formTypeName)) return;

    // Business handling is done: publish the reminder message.
    DomainEventPublisherFactory.getRegisteredPublisher().publishEvent(
        new SystemMsgSendEvent((String) XysdMsgConstants.MSG_TYPE_SYSTEM_GONGZZS.getId())
            .buildEventId(assessDataForm.getId() + "_sendMsg")
            .buildExecutePoint(ExecutePoint.NEW_THREAD_AFTER_COMMITTED)
            .buildOperateInfo(operateInfo) // operator
            .addMsgBodyData("recordId", assessDataForm.getId())
            .addMsgBodyData("recordType", RelateProjConstants.PROJTYPE_ASSESS_DATAFORM)
            .addMsgBodyData("title", "数据填报")
            .addMsgBodyData("content", "您单位填报的" + formTypeName + "被退回,请修改后重新上报。")
            .addMsgBodyData("type", "tuihxg") // returned for modification
            .addMsgBodyData("sendTime", CalendarUtils.toString(operateInfo.obtainNotNullOperateTime(), "yyyy-MM-dd HH:mm")) // operate time
            .addTargetPartyIds(assessDataForm.getBaseInfo().getFormUser().getId()) // id of the user who filled in the form
    );
}
定义处理消息事件的监听器
/**
 * Listener that turns a {@code SystemMsgSendEvent} into a call to the system message service.
 */
@Service
public class NeedSendSystemMsgEventListener implements IDomainEventListener {

    @Autowired
    private ISystemMsgService systemMsgService;

    /** Only events whose type equals the class name of {@code SystemMsgSendEvent} are handled. */
    @Override
    public boolean listenOn(String eventType) {
        String handledType = SystemMsgSendEvent.class.getName();
        return handledType.equals(eventType);
    }

    /**
     * Sends the message described by the event. Nothing happens while the configuration
     * property {@code qyStatus} is "true" (migration mode) or when the event has another type.
     */
    @Override
    public void onEvent(DomainEvent domainEvent) {
        boolean migrating = BaseConstants.getProperty("qyStatus", "false").equals("true");
        if (migrating || !(domainEvent instanceof SystemMsgSendEvent)) {
            return;
        }
        SystemMsgSendVO<?> sendVO = ((SystemMsgSendEvent) domainEvent).getSystemMsgSendVO();
        boolean isNotice = SystemMsgSendVO.msgType_notice.equals(sendVO.getMsgType());
        if (!isNotice) {
            this.systemMsgService.sendSystemMsg(sendVO);
            return;
        }
        this.systemMsgService.sendSystemNotice(sendVO);
    }
}
定义消息处理服务接口
/**
 * REST endpoint that sends a system message described by the request body.
 *
 * @param vo   send parameters deserialized from the request body
 * @param hreq current request, used to resolve the operate info
 * @return API result whose data is always {@code null}
 */
@ApiOperation(value="消息-发送系统消息", httpMethod="POST")
@RequestMapping(value="/system/msg/send",method= RequestMethod.POST)
public ApiResultDTO<String> sendSystemMsg(@RequestBody SystemMsgSendVO<MsgBody<?>> vo,
        HttpServletRequest hreq){
    return RestAPITemplate.restapi(() -> {
        AccessTokenUserAssembler assembler = new AccessTokenUserAssembler();
        msgService.sendSystemMsg(assembler.getOperateInfoFromReq( hreq), vo);
        return null;
    });
}
/**
 * Sends a system message to every user matching the targeting rules of {@code vo}.
 * <p>
 * Steps: resolve the candidate users, filter them by leader duty, make sure every
 * recipient has a system message group (creating missing ones asynchronously), create
 * the message per group asynchronously, optionally send an SMS, and finally log the
 * users for whom sending failed.
 *
 * @param operateInfo operate info of the caller; replaced by {@code vo.getOperateInfo()}
 *                    when the VO carries one
 * @param vo          send parameters (message type, body, targeting rules)
 */
public void sendSystemMsg(OperateInfo operateInfo, SystemMsgSendVO<?> vo) {
    if(vo.getOperateInfo()!=null) operateInfo=vo.getOperateInfo();
    String msgType=vo.getMsgType();
    // NOTE(review): the instanceof below tests the payload returned by getMsgBody().getMsgBody()
    // against TextMsgBody, not the body wrapper itself — confirm this is intended.
    String msgContentType=(vo.getMsgBody()==null||(vo.getMsgBody().getMsgBody() instanceof TextMsgBody))
            ?Msg.MSGCONTENTTYPE_TEXT:Msg.MSGCONTENTTYPE_OBJECT;
    String msgContent=vo.obtainMsgContent();

    // Translate the targeting rules into a party query; empty rules add no filter.
    PartyFindVO qvo=new PartyFindVO();
    if(CollectionUtils.isNotEmpty(vo.getTargetPartyIds())) {
        qvo.setRootIds(new ArrayList<String>(vo.getTargetPartyIds()));
    }
    if(CollectionUtils.isNotEmpty(vo.getTargetPartyOrgTypes())) {
        qvo.setPartyOrgTypes(new ArrayList<String>(vo.getTargetPartyOrgTypes()));
    }
    if(CollectionUtils.isNotEmpty(vo.getIgnorePartyIds())) {
        qvo.setIgnorePartyIds(new ArrayList<String>(vo.getIgnorePartyIds()));
    }
    if(CollectionUtils.isNotEmpty(vo.getTargetPartyRes())){
        qvo.setHasAnyResUser(new ArrayList<String>(vo.getTargetPartyRes()));
    }
    // By default only users owning the reminder menu resource are considered.
    qvo.setHasAllResUser(Arrays.asList((String) XysdSysConstants.RES_MODULE_SHOUY_TIX.getId()));
    qvo.setHasRes(true);
    qvo.setPartyTypes(Arrays.asList((String)XysdConstants.PARTY_TYPE__USER.getId()));
    qvo.setLoadZhiWOtherData(true); // load duty information
    List<SysDataSimpleDTO> users=ThreadLocalCache.fetchAPIData(
        "findSysDataSimpleDTOPartys.",
        ()->sysGatewayService.findSysDataSimpleDTOPartys(qvo),
        "获取目标人员信息失败"
    );
    if(CollectionUtils.isEmpty(users)) return;

    // Filter by duty: users without any leader duty always receive the message,
    // leaders only when their duty was requested explicitly.
    Set<String> duties = vo.getTargetPartyLeaderDuties()==null?new LinkedHashSet<>():vo.getTargetPartyLeaderDuties();
    List<SysDataSimpleDTO> targetUsers = new ArrayList<>();
    for (SysDataSimpleDTO user : users) {
        OtherData otherData = user.getOtherData();
        boolean leaderMain = otherData!=null&&Boolean.TRUE.equals(otherData.get(SystemMsgSendVO.leader_duties_leader_main));
        boolean leaderZg = otherData!=null&&Boolean.TRUE.equals(otherData.get(SystemMsgSendVO.leader_duties_leader_zg));
        boolean leaderKz = otherData!=null&&Boolean.TRUE.equals(otherData.get(SystemMsgSendVO.leader_duties_leader_kz));
        boolean leaderFkz = otherData!=null&&Boolean.TRUE.equals(otherData.get(SystemMsgSendVO.leader_duties_leader_fkz));
        boolean noLeaderDuty = !leaderMain&&!leaderZg&&!leaderKz&&!leaderFkz;
        boolean requestedDuty = (leaderMain&&duties.contains(SystemMsgSendVO.leader_duties_leader_main))
                ||(leaderZg&&duties.contains(SystemMsgSendVO.leader_duties_leader_zg))
                ||(leaderKz&&duties.contains(SystemMsgSendVO.leader_duties_leader_kz))
                ||(leaderFkz&&duties.contains(SystemMsgSendVO.leader_duties_leader_fkz));
        if(noLeaderDuty||requestedDuty){
            targetUsers.add(user);
        }
    }

    // Map the system group id of every recipient to the recipient.
    Map<String,SysDataSimpleDTO> groupIdToTargetUser=new HashMap<>();
    for (SysDataSimpleDTO targetUser : targetUsers) {
        MsgGroup group=MsgGroup.buildSystemMsgGroup(msgType, targetUser.getId());
        groupIdToTargetUser.put(group.getId(), targetUser);
    }

    // Start with all recipients marked as "missing a group" and drop those whose group exists.
    Map<String,SysDataSimpleDTO> errorGroupIdToTargetUser=new HashMap<>(groupIdToTargetUser);
    List<MsgGroup> groups=this.msgDomainService.findMsgGroupsByIds(errorGroupIdToTargetUser.keySet());
    for (MsgGroup group : groups) {
        errorGroupIdToTargetUser.remove(group.getId());
    }

    // Create the missing groups asynchronously, then wait for the results.
    Map<String,Future<MsgGroup>> groupIdToFuture=new HashMap<>();
    for (Map.Entry<String,SysDataSimpleDTO> entry: errorGroupIdToTargetUser.entrySet()) {
        groupIdToFuture.put(entry.getKey(), this.createMsgGroupInThread(operateInfo, msgType, entry.getValue()));
    }
    for (Future<MsgGroup> future : groupIdToFuture.values()) {
        try {
            MsgGroup group = future.get();
            if(group==null) continue;
            groups.add(group);
            errorGroupIdToTargetUser.remove(group.getId());
        } catch (Exception e) {
            logger.error("创建消息组失败",e);
        }
    }

    // A concurrent request may have created a group we failed to create: look once more.
    if(errorGroupIdToTargetUser.size()>0) {
        List<MsgGroup> createErrorGroups=this.msgDomainService.findMsgGroupsByIds(errorGroupIdToTargetUser.keySet());
        for (MsgGroup group : createErrorGroups) {
            groups.add(group);
            errorGroupIdToTargetUser.remove(group.getId());
        }
    }

    // Create and send the message per group asynchronously; failed groups are recorded.
    groupIdToFuture=new HashMap<String,Future<MsgGroup>>();
    for (MsgGroup group : groups) {
        groupIdToFuture.put(group.getId(), this.createMsgInThread(operateInfo, group, msgContentType, msgContent));
    }
    for (Map.Entry<String,Future<MsgGroup>> entry : groupIdToFuture.entrySet()) {
        MsgGroup group = null;
        try {
            group = entry.getValue().get();
        } catch (Exception e) {
            logger.error("创建消息失败",e);
        }
        if(group!=null) {
            continue;
        }
        errorGroupIdToTargetUser.put(entry.getKey(),groupIdToTargetUser.get(entry.getKey()));
    }

    // Send the SMS alongside the message when there is content.
    if(StringUtils.isNotBlank(msgContent)){
        this.handleSendMsgSameTimeSendSmS(operateInfo, vo, targetUsers);
    }

    if(errorGroupIdToTargetUser.size()==0) return;
    // Log the names of the users the message could not be delivered to.
    StringBuilder names=new StringBuilder();
    for (SysDataSimpleDTO targetUser : errorGroupIdToTargetUser.values()) {
        if(names.length()>0) names.append("、");
        // otherData may be null (see the duty filter above), so guard before reading fullName.
        OtherData otherData=targetUser.getOtherData();
        String name=otherData==null?null:otherData.obtainVal("fullName");
        if(StringUtils.isBlank(name)) name=targetUser.getName();
        names.append(name);
    }
    logger.warn("["+names+"]发送["+msgType+"]消息["+msgContent+"]失败");
}
/**
 * Submits the creation of a system message group for one user to the async task component.
 *
 * @param operateInfo operate info passed on to the domain service
 * @param msgType     message type of the group
 * @param targetUser  user the group is created for
 * @return future yielding the created group, or {@code null} when creation failed
 */
private Future<MsgGroup> createMsgGroupInThread(final OperateInfo operateInfo,final String msgType,
        final SysDataSimpleDTO targetUser){
    return asyncTaskComponent.runTaskInThreadPool((ignored)->{
        try {
            AccessTokenUser groupOwner=new AccessTokenUser(targetUser.buildSysDataSimpleValObj(),targetUser.getOtherData());
            return this.msgDomainService.createSystemMsgGroup(operateInfo, msgType, groupOwner);
        } catch (Exception e) {
            // Failures are reported to the caller as a null result.
            logger.warn("创建消息组失败",e);
            return null;
        }
    }, null);
}
/**
 * Submits the creation of a message in the given group to the async task component.
 *
 * @param operateInfo    operate info passed on to the domain service
 * @param group          group the message is sent to
 * @param msgContentType content type of the message
 * @param msgContent     content of the message
 * @return future yielding {@code group} on success, or {@code null} when sending failed
 */
private Future<MsgGroup> createMsgInThread(OperateInfo operateInfo, MsgGroup group,
        String msgContentType, String msgContent
) {
    return asyncTaskComponent.runTaskInThreadPool((ignored)->{
        MsgGroup outcome;
        try {
            this.msgDomainService.sendMsgByGroup(operateInfo, group, msgContentType, msgContent);
            outcome = group;
        } catch (Exception e) {
            // Failures are reported to the caller as a null result.
            logger.warn("创建消息失败",e);
            outcome = null;
        }
        return outcome;
    }, null);
}
/**
 * Creates the system message group of one user and adds that user as its member.
 *
 * @param operateInfo operate info passed on when adding the member
 * @param msgType     message type the group is built for
 * @param targetUser  user who owns the group and becomes its only member
 * @return the newly created group
 */
public MsgGroup createSystemMsgGroup(OperateInfo operateInfo, String msgType, AccessTokenUser targetUser) {
    MsgGroup systemGroup=MsgGroup.buildSystemMsgGroup(msgType, targetUser.getUserId());
    this.msgGroupRepository.createMsgGroup(systemGroup);

    // The target user is the single member of the group.
    List<AccessTokenUser> members=new ArrayList<AccessTokenUser>();
    members.add(targetUser);
    this.msgGroupAddUsers(operateInfo,systemGroup,members);

    return systemGroup;
}
/**
 * Creates a message in a group, links it to the group members and publishes an event
 * so the message can be pushed to clients.
 * <p>
 * When a message with the same unique key is already stored, that message is returned
 * and nothing else happens.
 *
 * @param operateInfo    operate info used to build the message and the follow-up event
 * @param group          group the message belongs to
 * @param msgContentType content type of the message
 * @param msgContent     content of the message
 * @return the stored message (the existing one when the unique key was already present)
 */
public Msg sendMsgByGroup(OperateInfo operateInfo, MsgGroup group, String msgContentType,
        String msgContent
) {
    // Create the message, unless one with the same unique key already exists.
    Msg msg=new Msg(operateInfo, group, msgContentType, msgContent);
    Msg exists = this.msgRepository.findByUniqueKey(msg.getUniqueKey());
    if(exists != null) return exists;
    this.msgRepository.createMsg(msg);
    // Record this message as the group's last message, but only if it is newer.
    // NOTE(review): the "last_msg_time<?1" predicate will not match a NULL last_msg_time —
    // confirm the column is always initialised.
    this.createSQLQueryByParams(
        "update T_MSG_GROUP set last_msg_id=?0,last_msg_time=?1 where last_msg_time<?1 and id=?2",
        msg.getId(), msg.getSendTime().getTime(), group.getId()
    ).executeUpdate();
    // Non-system groups: create the message relation for the sender.
    if(!MsgGroup.GROUPTYPE_SYSTEM.equals(group.getGroupType())) {
        MsgRelUser msgRelUser=new MsgRelUser(msg);
        this.msgRepository.createMsgRelUser(msgRelUser);
    }
    // Bulk-create the message/user relations for the group members with one INSERT ... SELECT.
    Map<String,Object> params=new HashMap<String,Object>();
    StringBuffer sql=new StringBuffer();
    sql.append("INSERT INTO T_MSG_REL_USER")
    .append("(")
    .append("id,")// relation id
    .append("msg_id,")// message id
    .append("group_id,")// message group id
    .append("rel_type,")// user relation type
    .append("user_id,")// related user id
    .append("user_name,")// related user name
    .append("user_dept_id,")// department id of the user at that time
    .append("user_dept_name,")// department name of the user at that time
    .append("user_org_id,")// organisation id of the user at that time
    .append("user_org_name,")// organisation name of the user at that time
    .append("viewed,")// whether the message has been read
    //.append("viewTime,")// time the message was read
    .append("removed")// whether the message has been deleted
    .append(") ")
    ;
    // The relation id is the message id, an underscore and the user id concatenated in SQL.
    sql.append("SELECT ")
    .append(SqlAdapterUtils.concat("'"+msg.getId()+"'","'_'","gu.user_id")).append(" as id,")
    .append("'").append(msg.getId()).append("'").append(" as msg_id,")
    .append("gu.group_id as group_id,")
    .append("'").append(MsgRelUser.RELTYPE_RECEIVER).append("'").append(" as rel_type,")
    .append("gu.user_id as user_id,")
    .append("gu.user_name as user_name,")
    .append("gu.user_dept_id as user_dept_id,")
    .append("gu.user_dept_name as user_dept_name,")
    .append("gu.user_org_id as user_org_id,")
    .append("gu.user_org_name as user_org_name,")
    .append("0 as viewed,")
    //.append("null as viewTime,")
    .append("0 as removed")
    .append(" ")
    ;
    sql.append("FROM T_MSG_GROUP_USER gu ");
    sql.append("WHERE gu.group_id=:groupId ");
    params.put("groupId",group.getId());
    // Non-system groups: skip the sender, whose relation was already created above.
    if(!MsgGroup.GROUPTYPE_SYSTEM.equals(group.getGroupType())) {
        sql.append("and gu.user_id<>:senderId ");
        params.put("senderId",msg.getSender().getId());
    }
    this.createSQLQueryByMapParams(sql.toString(), params).executeUpdate();
    // Make the group visible for every member that currently has it hidden.
    this.createSQLQueryByParams(
        "update T_MSG_GROUP_USER set show_group=1 where group_id=?0 and show_group=0",
        group.getId()
    ).executeUpdate();
    // Publish a "SendMsgContentToClient" event carrying the message id.
    DomainEvent de=new DefaultDomainEvent(msg.getId()+"_SendMsgContentToClient", operateInfo,
        ExecutePoint.NEW_THREAD_AFTER_COMMITTED, Utils.buildMap("msgId",msg.getId()),
        operateInfo.obtainNotNullOperateTime(), "SendMsgContentToClient"
    );
    DomainEventPublisherFactory.getRegisteredPublisher().publishEvent(de);
    return msg;
}
向客户端发送消息监听
/**
 * Pushes message content to clients after a message has been sent.
 * <p>
 * Handles events of type {@code "SendMsgContentToClient"}. With at most one registered
 * instance of this service the content is pushed through the local WebSocket handler;
 * otherwise every registered instance is asked over HTTP to push it.
 */
@Service
@Transactional
@SuppressWarnings("unchecked")
public class HandleSendMsgContentToClientListener extends JpaBaseQueryService implements IDomainEventListener {
    @Autowired
    private IMsgQueryService msgQueryService;
    @Autowired
    private MsgDomainService msgDomainService;
    @Autowired
    private ITokenService tokenService;
    @Value("${spring.application.name}")
    private String serviceName;
    @Autowired
    private DiscoveryClient discoveryClient;
    @Autowired
    private RestTemplate restTemplate;
    @Autowired
    private MsgWebSocketHandler msgWebSocketHandler;

    /**
     * Resolves recipients and content from the event data and pushes the content.
     * The event data either holds {@code msgId} (recipients are the message's related
     * users except the sender) or {@code userIds} plus {@code msgContent}.
     */
    @Override
    public void onEvent(DomainEvent event) {
        Map<String,Object> eventData=event.getEventData();
        if(eventData==null) return;
        Set<String> userIds=null;
        String msgContent=null;
        String msgId=(String)eventData.get("msgId");
        if(StringUtils.isNotBlank(msgId)) {
            // Send by message id.
            ShowMsgDTO<?> msgDTO=this.msgQueryService.getShowMsgDTOById(null,msgId);
            if(msgDTO==null) return;
            MsgGroup group=this.safeGet(MsgGroup.class, msgDTO.getGroupId());
            if(group==null) {
                // Without the group the push payload cannot be built.
                logger.warn("Message group not found for message "+msgId+", skip pushing to clients");
                return;
            }
            MsgGroupDTO groupDTO=new MsgGroupDTO(group.getId(), group.getGroupType(), group.getGroupName());
            userIds=new HashSet<>();
            msgContent=JsonConverter.toJsonStr(WebSocketMsgDTO.message(groupDTO,msgDTO));
            List<MsgRelUser> rels=this.msgDomainService.findMsgRelUserByMsgIds(Collections.singleton(msgId));
            for (MsgRelUser rel : rels) {
                // The sender does not get the message pushed back.
                if(MsgRelUser.RELTYPE_SENDER.equals(rel.getRelType())) {
                    continue;
                }
                userIds.add(rel.getUser().getId());
            }
        } else {
            // Send explicit content to explicit users.
            userIds=(Set<String>)eventData.get("userIds");
            msgContent=(String)eventData.get("msgContent");
        }
        if(CollectionUtils.isEmpty(userIds)) return;

        // Look up all instances of this service in the registry (retry with the upper-case name).
        List<ServiceInstance> serviceInstances = discoveryClient.getInstances(serviceName);
        if(CollectionUtils.isEmpty(serviceInstances)) {
            serviceInstances = discoveryClient.getInstances(StringUtils.upperCase(serviceName));
        }
        if(CollectionUtils.isEmpty(serviceInstances)||serviceInstances.size()<=1) {
            // No other instance is registered: push through the local WebSocket handler.
            this.msgWebSocketHandler.sendMessageToUsers(userIds, msgContent);
            return;
        }

        // Several instances: ask each of them over HTTP to push the WebSocket message.
        SendMessageToUsersVO vo=new SendMessageToUsersVO(userIds, msgContent);
        HttpHeaders httpHeaders = new HttpHeaders();
        httpHeaders.setContentType(MediaType.APPLICATION_JSON);
        String access_token=event.getOperator()!=null?event.getOperator().getAccessToken():null;
        if(StringUtils.isNotBlank(access_token)) {
            httpHeaders.add("access_token", access_token);
        }else {
            // No user token available: fall back to an inner token.
            String innerToken=this.createInnerToken(event.getOperator());
            httpHeaders.add("inner_token", innerToken);
        }
        HttpEntity<SendMessageToUsersVO> req = new HttpEntity<>(vo, httpHeaders);
        for (ServiceInstance serviceInstance : serviceInstances) {
            try {
                restTemplate.postForObject(serviceInstance.getUri()+"/"+serviceName+"/_api/send/msg/to_client",
                        req, ApiResultDTO.class);
            } catch (Exception e) {
                // One unreachable instance must not prevent notifying the others.
                logger.warn("通知其他服务发送消息失败",e);
            }
        }
    }

    /**
     * Creates an inner access token for the given user, or for a synthetic
     * "xysd-msg" user when {@code user} is {@code null}.
     *
     * @throws RuntimeException wrapping the original failure when the token cannot be created
     */
    private String createInnerToken(AccessTokenUser user) {
        try {
            AccessTokenUser tokenUser=user;
            if(tokenUser==null){
                tokenUser=new AccessTokenUser(new SysDataSimpleValObj("xysd-msg","消息系统"),null);
            }
            ApiResultDTO<String> apiData=this.tokenService.createInnerToken(tokenUser);
            return apiData.getData();
        } catch (Exception e) {
            // Keep the original exception as the cause instead of printing the stack trace.
            throw new RuntimeException("生成内部访问令牌错误"+e.getMessage(), e);
        }
    }

    @Override
    public boolean listenOn(String eventType) {
        return "SendMsgContentToClient".equals(eventType);
    }
}
通过websocket向客户端发送消息
/**
 * WebSocket handler that also describes how it is registered:
 * its handshake interceptor, the paths it handles and the allowed origins.
 */
public interface IWebSocketHandler extends WebSocketHandler {

    /** @return the handshake interceptor used for this handler */
    HandshakeInterceptor getHandshakeInterceptor();

    /** @return the plain WebSocket paths handled; defaults to {@code /api/websocket} */
    default String[] getHandlePaths() {
        String[] paths = {"/api/websocket"};
        return paths;
    }

    /** @return the SockJS paths handled; defaults to {@code /api/sockjs} */
    default String[] getHandleSockJSPaths() {
        String[] paths = {"/api/sockjs"};
        return paths;
    }

    /** @return the allowed cross-origin patterns; defaults to {@code *} */
    default String[] getAllowedOriginPatterns() {
        String[] patterns = {"*"};
        return patterns;
    }
}
/**
 * WebSocket handler of the message module.
 * <p>
 * Keeps the open sessions of every user of this server instance, forwards text messages
 * received from clients to the message service and pushes messages to online users.
 * <p>
 * Thread-safety: the user map is a {@code ConcurrentHashMap}; each per-user session list
 * is only read or modified while holding its own monitor; the server-wide session count
 * is an atomic counter.
 */
@Component
public class MsgWebSocketHandler implements IWebSocketHandler {
    private Logger logger=LogManager.getLogger(this.getClass());
    @Autowired
    private IMsgService msgService;
    @Autowired
    private IMsgQueryService msgQueryService;
    /** Number of sessions currently registered on this server instance. */
    private final java.util.concurrent.atomic.AtomicInteger sessionC=new java.util.concurrent.atomic.AtomicInteger();
    /** Open sessions of online users: userId -> sessions of that user. */
    private Map<String,List<WebSocketSession>> userIdToWsSessions = new ConcurrentHashMap<>();

    /**
     * Registers a session for the user.
     *
     * @return {@code false} when the server already holds more than 5000 sessions or the
     *         user already has 5 sessions on this server; {@code true} otherwise
     */
    private boolean addUserWsSession(WebSocketSession wsSession, AccessTokenUser user) {
        String userId=user.getUserId();
        List<WebSocketSession> userWsSessions=
                this.userIdToWsSessions.computeIfAbsent(userId, key -> new ArrayList<WebSocketSession>());
        int serverSessionCount=this.sessionC.get();
        int userSessionCount;
        boolean added=false;
        synchronized (userWsSessions) {
            userSessionCount=userWsSessions.size();
            // Limits: at most 5000 sessions per server and 5 sessions per user on one server.
            if(serverSessionCount<=5000&&userSessionCount<5) {
                userWsSessions.add(wsSession);
                added=true;
            }
            // if(userWsSessions.size()==1) {
            //     // first session of the user: broadcast the online state to other users
            //     WebSocketMsgDTO<AccessTokenUser> msg=WebSocketMsgDTO.online(user);
            //     this.sendMessageToAllClient(JsonConverter.toJsonStr(msg),user.getUserId());
            // }
        }
        logger.info("当前话量:"+serverSessionCount);
        logger.info("用户【"+user.getUserName()+"】会话量"+userSessionCount);
        if(added) {
            this.sessionC.incrementAndGet();
        }
        return added;
    }

    /**
     * Unregisters a session of the user.
     *
     * @return {@code true} when the session was registered and has been removed
     */
    private boolean removeUserWsSession(WebSocketSession wsSession, AccessTokenUser user) {
        String userId=user.getUserId();
        List<WebSocketSession> userWsSessions=this.userIdToWsSessions.get(userId);
        if(userWsSessions==null) return false;
        boolean removed;
        synchronized (userWsSessions) {
            removed=userWsSessions.remove(wsSession);
        }
        if(!removed) return false;
        // synchronized (this.userIdToWsSessions) {
        //     // last session of the user: broadcast the offline state to other users
        //     if(userWsSessions.size()==0) {
        //         WebSocketMsgDTO<AccessTokenUser> msg=WebSocketMsgDTO.offline(user);
        //         this.sendMessageToAllClient(JsonConverter.toJsonStr(msg),user.getUserId());
        //     }
        // }
        this.sessionC.decrementAndGet();
        return true;
    }

    @Override
    public HandshakeInterceptor getHandshakeInterceptor() {
        return WebSocketAPIInterceptor.singleton();
    }

    /**
     * Called after a client connection has been established.
     * Rejects sessions without operate info or exceeding the session limits; accepted
     * sessions immediately receive the user's message groups.
     *
     * @param wsSession the new session
     * @throws Exception when sending to or closing the session fails
     */
    @Override
    public void afterConnectionEstablished(WebSocketSession wsSession) throws Exception {
        Map<String, Object> attrs = wsSession.getAttributes();
        OperateInfo operateInfo = (OperateInfo)attrs.get(WebSocketAPIInterceptor.ATTR_KEY_OPERATEINFO);
        if(operateInfo==null) {
            if (wsSession.isOpen()) {
                wsSession.sendMessage(new TextMessage(JsonConverter.toJsonStr(WebSocketMsgDTO.error("非法操作用户"))));
                wsSession.close(CloseStatus.POLICY_VIOLATION);
            }
            return;
        }
        AccessTokenUser user = operateInfo.getOperator();
        boolean added=this.addUserWsSession(wsSession, user);
        if(!added) {
            if (wsSession.isOpen()) {
                wsSession.sendMessage(new TextMessage(JsonConverter.toJsonStr(WebSocketMsgDTO.error("会话窗口过多"))));
                wsSession.close(CloseStatus.POLICY_VIOLATION);
            }
            // The session was rejected and closed: nothing more may be sent on it.
            return;
        }
        MsgGroupQueryVO vo=new MsgGroupQueryVO();
        List<ShowMsgGroupDTO> groups=this.msgQueryService.findUserMsgGroupDTOs(operateInfo, vo);
        wsSession.sendMessage(new TextMessage(JsonConverter.toJsonStr(WebSocketMsgDTO.feedback(groups))));
    }

    /**
     * Called when a client sends a message. Text payloads are parsed as
     * {@code ReceivedMsgDTO} and sent to the addressed group; the result (or the error)
     * is fed back to the client. Non-text payloads are ignored.
     *
     * @param wsSession session the message arrived on
     * @param wsMessage received message
     * @throws Exception when feeding back to the client fails
     */
    @Override
    public void handleMessage(WebSocketSession wsSession, WebSocketMessage<?> wsMessage) throws Exception {
        Map<String, Object> attrs = wsSession.getAttributes();
        OperateInfo operateInfo = (OperateInfo)attrs.get(WebSocketAPIInterceptor.ATTR_KEY_OPERATEINFO);
        if(operateInfo==null) {
            if (wsSession.isOpen()) {
                wsSession.sendMessage(new TextMessage(JsonConverter.toJsonStr(WebSocketMsgDTO.error("非法操作用户"))));
                wsSession.close(CloseStatus.POLICY_VIOLATION);
            }
            return;
        }
        AccessTokenUser user = operateInfo.getOperator();
        Object payload = wsMessage.getPayload();
        if(!(payload instanceof String)) {
            // Binary or pong frames carry no text payload; casting them would fail.
            logger.debug("Ignoring non-text WebSocket message from user ["+user.getUserId()+"]");
            return;
        }
        String msg = (String)payload;
        logger.info("接收到用户【"+user.getUserName()+"】["+user.getUserId()+"]发送的消息【"+msg+"】");
        ReceivedMsgDTO dto=JsonConverter.jsonStrToObject(msg, ReceivedMsgDTO.class);
        if(dto==null) {
            wsSession.sendMessage(new TextMessage(JsonConverter.toJsonStr(WebSocketMsgDTO.error("消息格式错误"))));
            return;
        }
        try {
            ShowMsgDTO<String> showMsg=this.msgService.sendTextMsgByGroup(operateInfo, dto.getGroupId(), dto.getMsgContent());
            if(showMsg == null) {
                return;
            }
            // Feed the stored message back to the sending client.
            wsSession.sendMessage(new TextMessage(JsonConverter.toJsonStr(WebSocketMsgDTO.feedback(dto.getClientMsgId(),showMsg))));
        } catch (Exception e) {
            logger.error("发送用户【"+user.getUserName()+"】["+user.getUserId()+"]消息【"+msg+"】失败", e);
            // Feed the error back to the sending client.
            wsSession.sendMessage(new TextMessage(JsonConverter.toJsonStr(WebSocketMsgDTO.feedbackError(dto.getClientMsgId(),e.getMessage()))));
        }
    }

    /**
     * Called after a transport error: unregisters the session and closes it if still open.
     *
     * @param wsSession affected session
     * @param throwable the transport error
     * @throws IOException when closing the session fails
     */
    @Override
    public void handleTransportError(WebSocketSession wsSession, Throwable throwable) throws IOException {
        Map<String, Object> attrs = wsSession.getAttributes();
        OperateInfo operateInfo = (OperateInfo)attrs.get(WebSocketAPIInterceptor.ATTR_KEY_OPERATEINFO);
        if(operateInfo==null) {
            if (wsSession.isOpen()) wsSession.close(CloseStatus.POLICY_VIOLATION);
            return;
        }
        AccessTokenUser user = operateInfo.getOperator();
        this.removeUserWsSession(wsSession, user);
        if (wsSession.isOpen()) {
            // Original author's note: when Chrome refreshes a page with an established
            // connection, this close throws an exception, whereas Firefox does not.
            wsSession.close(CloseStatus.BAD_DATA);
        }
        logger.warn("用户【"+user.getUserName()+"】["+user.getUserId()+"]消息传输错误,关闭连接");
    }

    /**
     * Called after the connection to a client has been closed: unregisters the session.
     *
     * @param wsSession   closed session
     * @param closeStatus status the connection was closed with
     */
    @Override
    public void afterConnectionClosed(WebSocketSession wsSession, CloseStatus closeStatus) {
        Map<String, Object> attrs = wsSession.getAttributes();
        OperateInfo operateInfo = (OperateInfo)attrs.get(WebSocketAPIInterceptor.ATTR_KEY_OPERATEINFO);
        if(operateInfo==null) return;
        AccessTokenUser user = operateInfo.getOperator();
        this.removeUserWsSession(wsSession, user);
        logger.info("用户【"+user.getUserName()+"】["+user.getUserId()+"]"+"断开链接");
    }

    /**
     * Partial messages are not supported.
     *
     * @return always {@code false}
     */
    @Override
    public boolean supportsPartialMessages() {
        return false;
    }

    /**
     * Sends a message to several users.
     *
     * @param userIds ids of the recipients
     * @param msg     text to send
     * @return per user id, whether the message reached at least one of the user's sessions
     */
    public Map<String,Boolean> sendMessageToUsers(Set<String> userIds, String msg) {
        Map<String,Boolean> results=new HashMap<>();
        if(CollectionUtils.isEmpty(userIds)) return results;
        for (String userId : userIds) {
            results.put(userId, this.sendMessageToUser(userId, msg));
        }
        return results;
    }

    /**
     * Sends a message to every open session of one user.
     *
     * @param userId id of the recipient
     * @param msg    text to send
     * @return {@code true} when the message was sent on at least one session
     */
    private boolean sendMessageToUser(String userId, String msg) {
        List<WebSocketSession> userWsSessions=this.userIdToWsSessions.get(userId);
        if(userWsSessions==null) return false;
        // Iterate over a snapshot: the live list is modified concurrently when sessions open or close.
        List<WebSocketSession> sessionSnapshot;
        synchronized (userWsSessions) {
            sessionSnapshot=new ArrayList<WebSocketSession>(userWsSessions);
        }
        if(sessionSnapshot.isEmpty()) return false;
        TextMessage textMessage = new TextMessage(msg);
        boolean result = false;
        for (WebSocketSession wsSession : sessionSnapshot) {
            if (wsSession==null||!wsSession.isOpen()) continue;
            try {
                wsSession.sendMessage(textMessage);
                result = true;
            } catch (Exception e) {
                // A failing session must not prevent delivery to the user's other sessions.
                logger.warn("Failed to send message on one session of user ["+userId+"]", e);
            }
        }
        if(result) {
            logger.info("向用户["+userId+"]发送消息【"+msg+"】成功");
        }else {
            logger.info("向用户["+userId+"]发送消息【"+msg+"】失败");
        }
        return result;
    }

    /**
    // Broadcast: send a message to all users (currently disabled).
    private boolean sendMessageToAllClient(String msg, String ignoreUserId) {
        logger.info("向所有用户发送消息【"+msg+"】");
        TextMessage textMessage = new TextMessage(msg);
        for (Map.Entry<String, List<WebSocketSession>> entrySet : this.userIdToWsSessions.entrySet()) {
            if(CollectionUtils.isEmpty(entrySet.getValue())) continue;
            if(entrySet.getKey().equals(ignoreUserId)) continue;
            for (WebSocketSession wsSession : entrySet.getValue()) {
                if (wsSession==null||!wsSession.isOpen()) continue;
                try {
                    wsSession.sendMessage(textMessage);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        return true;
    }
    **/
}
页面展示请求接口
/**
 * REST endpoint that returns one batch of a message group's history.
 * Resolves the operator from the request and delegates to the message query service.
 *
 * @param groupId   id of the message group
 * @param lastMsgId id of the last message already held by the caller; optional
 * @param querySize number of messages to fetch; optional
 * @param hreq      the current HTTP request, used to resolve the operator
 * @return the API result wrapping the batch of messages
 */
@ApiOperation(value="消息-分批获取历史消息", httpMethod="GET")
@ApiImplicitParams({
@ApiImplicitParam(name="groupId",value="消息组id"),
@ApiImplicitParam(name="lastMsgId",value="最后一条消息的id 空表示查最后{querySize}调",required=false),
@ApiImplicitParam(name="querySize",value="查询条数 默认10")
})
@RequestMapping(value="/history/find/by_last_id",method= RequestMethod.GET)
public ApiResultDTO<List<ShowMsgDTO<?>>> findHistoryMsgByLastMsgId(
        @RequestParam("groupId")String groupId,
        @RequestParam(name="lastMsgId",required=false)String lastMsgId,
        @RequestParam(name="querySize",required=false)Integer querySize,
        HttpServletRequest hreq
){
    return RestAPITemplate.restapi(() -> {
        AccessTokenUserAssembler userAssembler = new AccessTokenUserAssembler();
        OperateInfo requester = userAssembler.getOperateInfoFromReq(hreq);
        List<ShowMsgDTO<?>> history =
                msgQueryService.findHistoryMsgByLastMsgId(requester, groupId, lastMsgId, querySize);
        return history;
    });
}
Service服务
/**
 * Loads one batch of a group's message history for the current operator, newest first.
 *
 * <p>The message identified by {@code lastMsgId} (if found) supplies the upper
 * bound: only messages sent before it are returned. Tables are scanned year by
 * year, starting with the current year ({@code T_MSG}) and continuing with
 * {@code T_MSG_<year>} for earlier years, as long as a {@code MsgYear} entry
 * exists for the year and fewer than {@code querySize} messages were collected.
 * Only non-revoked messages of the group that are related to the operator
 * through the matching {@code T_MSG_REL_USER} table are considered.
 *
 * @param operateInfo the current operation context; supplies the operator id
 * @param groupId     id of the message group
 * @param lastMsgId   id of the last message the caller already has
 * @param querySize   maximum number of messages to return; {@code null} means 10
 * @return the messages converted to display DTOs, at most {@code querySize} entries
 */
public List<ShowMsgDTO<?>> findHistoryMsgByLastMsgId(OperateInfo operateInfo, String groupId,
        String lastMsgId, Integer querySize) {
    Msg msg = this.msgDomainService.getMsgById(lastMsgId);
    Date beginTime = null; // reserved: queries may later be windowed by half-year
    Date endTime = msg != null ? msg.getSendTime() : null;
    if (querySize == null) {
        querySize = 10;
    }
    List<Msg> msgs = new ArrayList<Msg>();
    Map<Integer,MsgYear> yearToMsgYear = this.msgDomainService.getAllMsgYears();
    int currYear = CalendarUtils.getCurrentYear();
    String tableName = "T_MSG";
    for (int year = currYear; yearToMsgYear.get(year) != null; year--) {
        if (year < currYear) {
            tableName = "T_MSG_" + year;
        }
        Map<String,Object> params = new HashMap<String,Object>();
        StringBuilder sql = new StringBuilder();
        sql.append("select m.id,m.create_ip,m.create_time,m.group_id,m.msg_content_type,m.msg_content,m.msg_type,m.revoked,m.sender_dept_id,m.sender_dept_name,m.sender_org_id,m.sender_org_name,m.send_time,m.sender_id,m.sender_name,m.unique_key ");
        sql.append("from ").append(tableName).append(" m ");
        sql.append("join ").append(tableName.replace("T_MSG","T_MSG_REL_USER")).append(" mu on mu.msg_id=m.id ");
        sql.append("where m.revoked=0 and m.group_id=:groupId and mu.user_id=:thisUserId ");
        params.put("groupId", groupId);
        params.put("thisUserId", operateInfo.obtainOperatorId());
        if (beginTime != null) {
            sql.append("and m.send_time>=:beginTime ");
            params.put("beginTime", beginTime);
        }
        if (endTime != null && year == currYear) {
            // NOTE(review): the id tie-breaker for equal send times is applied only to the
            // current-year table; confirm this is intended when lastMsgId lives in an older table.
            sql.append("and (m.send_time<:endTime or (m.send_time=:endTime and m.id<:lastMsgId)) ");
            params.put("lastMsgId", lastMsgId);
            params.put("endTime", endTime);
        } else if (endTime != null) {
            sql.append("and m.send_time<:endTime ");
            params.put("endTime", endTime);
        }
        sql.append("order by m.send_time desc,m.id desc ");
        // presumably page 1 with a page size of querySize — verify against createPageSQLQueryByMapParams
        List<Msg> datas = this.createPageSQLQueryByMapParams(sql.toString(), 1, querySize, params)
                .addEntity(Msg.class).list();
        if (CollectionUtils.isEmpty(datas)) {
            continue;
        }
        // Take exactly as many rows as are still missing. subList's end index is
        // exclusive, so the former "-1" dropped one row and the batch never filled up.
        int remaining = querySize - msgs.size();
        if (datas.size() > remaining) {
            msgs.addAll(datas.subList(0, remaining));
        } else {
            msgs.addAll(datas);
        }
        if (msgs.size() >= querySize) {
            break;
        }
    }
    List<ShowMsgDTO<?>> msgDTOs = new MsgDTOAssembler(beansFactoryService)
            .toUserShowMsgDTOs(operateInfo.getOperator(), msgs);
    return msgDTOs;
}
封装数据模型
ApiModel(value="ShowMsgDTO ",description="展示的消息信息dto")
@JsonIgnoreProperties(ignoreUnknown=true)
public class ShowMsgDTO<T> {
@ApiModelProperty(value="id")
private String id;
@ApiModelProperty(value="groupId")
private String groupId;
@ApiModelProperty(value="消息发送者")
private SysDataSimpleValObj sender;//消息发送者
@ApiModelProperty(value="消息发送者部门")
private SysDataSimpleValObj sendDept;
@ApiModelProperty(value="消息发送者机构")
private SysDataSimpleValObj sendOrg;
@ApiModelProperty(value="消息发送时间")
@JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT+8")
private Date sendTime;
@ApiModelProperty(value="消息体类型 text:文本/object:对象")
private String msgBodyType;
@ApiModelProperty(value="消息体")
private T msgBody;
@ApiModelProperty(value="是否已阅")
private boolean viewed;
//省略get/set方法
public ShowMsgDTO() {
super();
}
public ShowMsgDTO(String id, String groupId, SysDataSimpleValObj sender, SysDataSimpleValObj sendDept,
SysDataSimpleValObj sendOrg, Date sendTime, String msgBodyType, T msgBody, boolean viewed
) {
super();
this.id = id;
this.groupId = groupId;
this.sender = sender;
this.sendDept = sendDept;
this.sendOrg = sendOrg;
this.sendTime = sendTime;
this.msgBodyType = msgBodyType;
this.msgBody = msgBody;
this.viewed = viewed;
}
}