JRT实现缓存协议

news2024/11/21 0:21:11

上一篇介绍的借助ORM的增、删、改和DolerGet方法,ORM可以很精准的知道热点数据做内存缓存。那么就有一个问题存在,即部署了多个站点时候,如果用户在一个Web里修改数据了,那么其他Web的ORM是不知道这个变化的,其他Web还是缓存的老数据的话就会造成其他Web命中的缓存数据是老的,造成不可靠问题。

那么就需要一种机制来解决多Web缓存不一致问题,参照ECP实现,把Web分为主服务器和从服务器。主服务器启动TCP服务端,从服务器启动TCP客户端连接主服务器,从服务器和主服务器之间一直保留TCP长连接用来通知缓存变化数据。这样在一个服务器增、删、改数据后就能及时通知其他服务器更新缓存,这样所有服务器的缓存数据都是可信赖的。

首先提取发送数据的对象类型
在这里插入图片描述

然后实现ECP管理类ECPManager来管理启动服务端和客户端

package JRT.DAL.ORM.Global;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;

import JRT.Core.Util.LogUtils;
import JRT.DAL.ORM.Global.ECPDto;

/**
 * 企业缓存协议,基于TCP在多台服务器直接共享缓存的更新,从而确保每个Web下的缓存数据的可靠性,这里实现TCP服务端当做小机,其他服务当做客户端和小机连接。
 * 小机会分发增加数据(增加数据分发与否倒是不影响缓存可靠性,主要是修改和删除)、修改数据、删除数据的执行记录给每个客户端,客户端按收到的记录把数据推入缓存
 */
public class ECPManager {
    /**
     * 要推入TCP的缓存队列
     */
    public static ConcurrentLinkedDeque ECPQuen = new ConcurrentLinkedDeque();

    /**
     * 主处理进程
     */
    private static Thread MainThread = null;

    /**
     * 发送数据的操作对象
     */
    public static Socket Sender = null;

    /**
     * 写操作对象
     */
    public static PrintWriter Writer = null;

    /**
     * 数据编码
     */
    private static String Encode="GBK";

    /**
     * 缓存所有客户端
     */
    private static ConcurrentHashMap<String,Socket> AllClient= new ConcurrentHashMap<>();

    /**
     * IP地址
     */
    private static String IP="";

    /**
     * 端口
     */
    private static  int Port=1991;

    /**
     * 是否启用了ECP
     */
    private static boolean UseEcp=false;

    /**
     * 加入缓存,直接缓存,具体的后续有缓存管理器线程维护缓存,这里只管加入队列即可
     *
     * @param obj
     * @throws Exception
     */
    public static void InECPToQuen(ECPDto obj) throws Exception{
        ECPQuen.add(obj);
    }

    /**
     * push目前的ECP缓存数据到远程服务器,供GlobalManager定时器定时推送
     */
    public static void TryPushEcp() throws Exception
    {
        try {
            //如果是客户端,先检查连接
            if (!IP.isEmpty()) {
                //重连
                if (Sender == null || Sender.isClosed()) {
                    LogUtils.WriteSecurityLog("ECP尝试重连:" + IP + ":" + Port);
                    StartEcpManager(IP, Port);
                    Thread.sleep(1000);
                }
                if (Sender == null || Sender.isClosed()) {
                    return;
                } else {
                    LogUtils.WriteSecurityLog("ECP尝试重连成功:" + IP + ":" + Port);
                }
            }
            List<ECPDto> pushList=null;
            //从缓存队列里面弹出数据push
            while (ECPQuen.size() > 0) {
                ECPDto obj = (ECPDto)ECPQuen.pop();
                if (obj != null) {
                    //启用了ECP就push到服务端,否则就只更新自己缓存
                    if(UseEcp==true)
                    {
                        //初始化push列表
                        if(pushList==null)
                        {
                            pushList=new ArrayList<>();
                        }
                        pushList.add(obj);
                    }
                    else
                    {
                        //转换成数据实体推入缓存
                        JRT.DAL.ORM.Global.GlobalManager.InCache(obj);
                    }
                }
            }
            //直接列表一次推送,没有启用ECP的话这个列表一直是空
            if(pushList!=null)
            {
                //客户端推送
                if (!IP.isEmpty()) {
                    Writer.print(JRT.Core.Util.JsonUtil.Object2Json(pushList));
                    Writer.flush();
                }
                //服务端推送
                else {
                    //给每个连接的客户端推送信息
                    for (String ip : AllClient.keySet()) {
                        Socket oneClient = AllClient.get(ip);
                        //移除关闭的客户端
                        if (oneClient.isClosed()) {
                            AllClient.remove(ip);
                        }
                        PrintWriter oneWriter = new PrintWriter(new OutputStreamWriter(oneClient.getOutputStream(), Encode), false);
                        oneWriter.print(JRT.Core.Util.JsonUtil.Object2Json(pushList));
                        oneWriter.flush();
                    }
                }
            }
        }
        catch (Exception ex)
        {
            LogUtils.WriteExceptionLog("推送数据到ECP异常", ex);
        }
    }

    /**
     * 启动ECP管理
     * @param ip
     * @param port
     */
    public static void StartEcpManager(String ip, int port)
    {
        IP=ip;
        Port=port;
        //当客户端
        if (!ip.isEmpty()) {
            MainThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    //得到输入流
                    InputStream inputStream = null;
                    //创建Socket对象并连接到服务端
                    Socket socket = null;
                    try {
                        //创建Socket对象并连接到服务端
                        socket = new Socket(ip, port);
                        Sender = socket;
                        Writer = new PrintWriter(new OutputStreamWriter(Sender.getOutputStream(), Encode), false);
                        //得到输入流
                        inputStream = socket.getInputStream();
                        //IO读取
                        byte[] buf = new byte[10240];
                        int readlen = 0;
                        //阻塞读取数据
                        while ((readlen = inputStream.read(buf)) != -1) {
                            try {
                                String ecpJson = new String(buf, 0, readlen, Encode);
                                //得到ECP实体
                                List<ECPDto> dtoList = (List<ECPDto>)JRT.Core.Util.JsonUtil.Json2Object(ecpJson, ECPDto.class);
                                if(dtoList!=null&&dtoList.size()>0)
                                {
                                    for(ECPDto dto:dtoList) {
                                        //转换成数据实体推入缓存
                                        JRT.DAL.ORM.Global.GlobalManager.InCache(dto);
                                    }
                                }
                            }
                            catch (Exception ee)
                            {
                                LogUtils.WriteExceptionLog("ECP处理主服务器数据异常", ee);
                            }
                        }
                    }
                    catch (IOException ex) {
                        LogUtils.WriteExceptionLog("ECP侦听TCP服务异常", ex);
                    }
                    finally {
                        Sender=null;
                        try {
                            if (inputStream != null) {
                                //关闭输入
                                inputStream.close();
                            }
                            if (Writer != null) {
                                Writer.flush();
                                //关闭输出
                                Writer.close();
                                Writer=null;
                            }
                            if (socket != null) {
                                // 关闭连接
                                socket.close();
                            }
                        }
                        catch (Exception ex) {
                            LogUtils.WriteExceptionLog("释放TCP资源异常", ex);
                        }

                    }
                }
            });

        }
        //当服务端
        else {
            MainThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    //得到输入流
                    InputStream inputStream = null;
                    //创建Socket对象并连接到服务端
                    Socket socket = null;
                    try {
                        ServerSocket serverSocket = new ServerSocket(port);
                        //增加一个无限循环
                        while (true) {
                            //等待客户端连接,阻塞
                            Socket clientSocket = serverSocket.accept();
                            //按IP存客户端连接
                            String clientIP=clientSocket.getInetAddress().getHostAddress();
                            AllClient.put(clientIP,clientSocket);
                            //接收客户端消息
                            Thread ClientThread = new Thread(new Runnable() {
                                @Override
                                public void run() {
                                    try {
                                        //得到输出流
                                        Writer = new PrintWriter(new OutputStreamWriter(clientSocket.getOutputStream(), Encode), false);
                                        //得到输入流
                                        InputStream inputStream = clientSocket.getInputStream();
                                        //IO读取
                                        byte[] buf = new byte[10240];
                                        int readlen = 0;
                                        //阻塞读取数据
                                        while ((readlen = inputStream.read(buf)) != -1) {
                                            String ecpJson = new String(buf, 0, readlen, Encode);
                                            //得到ECP实体
                                            List<ECPDto> dtoList = (List<ECPDto>)JRT.Core.Util.JsonUtil.Json2Object(ecpJson, ECPDto.class);
                                            if(dtoList!=null&&dtoList.size()>0)
                                            {
                                                for(ECPDto dto:dtoList) {
                                                    //转换成数据实体推入缓存
                                                    JRT.DAL.ORM.Global.GlobalManager.InCache(dto);
                                                }
                                                //给每个连接的客户端推送信息
                                                for (String ip : AllClient.keySet()) {
                                                    Socket oneClient = AllClient.get(ip);
                                                    //移除关闭的客户端
                                                    if (oneClient.isClosed()) {
                                                        AllClient.remove(ip);
                                                    }
                                                    PrintWriter oneWriter = new PrintWriter(new OutputStreamWriter(oneClient.getOutputStream(), Encode), false);
                                                    oneWriter.print(ecpJson);
                                                    oneWriter.flush();
                                                }
                                            }
                                        }
                                    }
                                    catch (Exception ee)
                                    {
                                        LogUtils.WriteExceptionLog("ECP处理客户端数据异常", ee);
                                    }
                                }
                            });
                            ClientThread.start();
                        }
                    }
                    catch (IOException ex) {
                        LogUtils.WriteExceptionLog("侦听仪器TCP异常", ex);
                    }
                    finally {
                        try {
                            if (inputStream != null) {
                                //关闭输入
                                inputStream.close();
                            }
                            if (Writer != null) {
                                Writer.flush();
                                //关闭输出
                                Writer.close();
                            }
                            if (socket != null) {
                                // 关闭连接
                                socket.close();
                            }
                        } catch (Exception ex) {
                            LogUtils.WriteExceptionLog("释放TCP资源异常", ex);
                        }

                    }
                }
            });
        }
        //启动主进程
        MainThread.start();
        UseEcp=true;
    }

    /**
     * 返回ECP待处理队列的json数据供调试看是否符合预期
     * @return
     */
    public static String ViewECPQuenDate() throws Exception
    {
        return JRT.Core.Util.JsonUtil.Object2Json(ECPQuen);
    }

}

ECP管理类再对接上GlobalManager

package JRT.DAL.ORM.Global;

import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;

import JRT.Core.MultiPlatform.JRTConfigurtaion;
import JRT.Core.Util.LogUtils;
import JRT.DAL.ORM.Global.OneGlobalNode;
import JRT.DAL.ORM.Global.ECPDto;

/**
 * 实现内存模拟global的效果
 */
public class GlobalManager {
    /**
     * 在内存里缓存热点数据
     */
    private static ConcurrentHashMap<String, ConcurrentHashMap<String, OneGlobalNode>> AllHotData = new ConcurrentHashMap<>();

    /**
     * 要缓存数据的队列
     */
    private static ConcurrentLinkedDeque TaskQuen = new ConcurrentLinkedDeque();

    /**
     * 管理缓存的定时器
     */
    private static Timer ManageTimer = new Timer();

    /**
     * 缓存的最大对象数量
     */
    public static Integer GlobalCacheNum = 100000;

    /**
     * 当前的缓存数量
     */
    private static AtomicInteger CurCacheNum=new AtomicInteger(0);

    /**
     * 最后删除数据的时间
     */
    private static Long LastDeleteTime=null;

    /**
     * 加入缓存,直接缓存,具体的后续有缓存管理器线程维护缓存,这里只管加入队列即可
     *
     * @param obj
     * @throws Exception
     */
    public static void InCache(ECPDto obj) throws Exception{
        TaskQuen.add(obj);
    }

    /**
     * 通过主键查询数据
     * @param model
     * @param id
     * @param <T>
     * @return
     * @throws Exception
     */
    public static <T> T DolerGet(T model,Object id) throws Exception
    {
        //实体的名称
        String modelName = model.getClass().getName();
        if(AllHotData.containsKey(modelName))
        {
            //命中数据,克隆返回
            if(AllHotData.get(modelName).containsKey(id))
            {
                OneGlobalNode node=AllHotData.get(modelName).get(id);
                //更新时间
                node.Time=JRT.Core.Util.TimeParser.GetTimeInMillis();
                Object retObj=JRT.Core.Util.JsonUtil.CloneObject(node.Data);
                return (T)retObj;
            }
        }
        return null;
    }

    /**
     * 启动缓存数据管理的线程
     */
    public static void StartGlobalManagerTask() throws Exception{
        //最大缓存数量
        String GlobalCacheNumConf = JRTConfigurtaion.Configuration("GlobalCacheNum");
        if (GlobalCacheNumConf != null && !GlobalCacheNumConf.isEmpty()) {
            GlobalCacheNum = JRT.Core.Util.Convert.ToInt32(GlobalCacheNumConf);
        }
        //定时任务
        TimerTask timerTask = new TimerTask() {
            @Override
            public void run() {
                try {
                    //缓存队列的数据并入缓存
                    while (TaskQuen.size() > 0) {
                        //处理要加入缓存的队列
                        DealOneDataQuen();
                    }
                    //尝试推送ECP数据到主服务
                    JRT.DAL.ORM.Global.ECPManager.TryPushEcp();
                    //清理多余的缓存数据,这里需要讲究算法,要求在上百万的缓存数据里快速找到时间最久远的数据
                    if(CurCacheNum.get()>GlobalCacheNum)
                    {
                        //每轮清理时间处于上次清理时间和当前时间前百分之5的老数据
                        long Diff=(JRT.Core.Util.TimeParser.GetTimeInMillis()-LastDeleteTime)/20;
                        //留下数据的最大时间
                        long LeftMaxTime=LastDeleteTime+Diff;
                        //遍历所有的热点数据
                        for (String model : AllHotData.keySet()) {
                            ConcurrentHashMap<String, OneGlobalNode> oneTableHot=AllHotData.get(model);
                            //记录要删除的数据
                            List<String> delList=new ArrayList<>();
                            for (String key : oneTableHot.keySet()) {
                                OneGlobalNode one=oneTableHot.get(key);
                                //需要删除的数据
                                if(one.Time<LeftMaxTime)
                                {
                                    delList.add(key);
                                }
                            }
                            //移除时间久的数据
                            for(String del:delList)
                            {
                                oneTableHot.remove(del);
                            }
                        }
                    }
                    //清理时间久远的缓存数据
                } catch (Exception ex) {
                    LogUtils.WriteExceptionLog("处理Global缓存异常", ex);
                }
            }
        };
        //尝试启动ECP管理器
        String IP = JRTConfigurtaion.Configuration("ECPIP");
        String Port = JRTConfigurtaion.Configuration("ECPPort");
        if(Port!=null&&!Port.isEmpty())
        {
            if(IP==null)
            {
                IP="";
            }
            //启动ECP管理器
            JRT.DAL.ORM.Global.ECPManager.StartEcpManager(IP,JRT.Core.Util.Convert.ToInt32(Port));
        }
        //启动缓存管理定时器
        ManageTimer.schedule(timerTask, 0, 500);

    }

    /**
     * 通过实体名称获得实体类型信息
     *
     * @param modelName 实体名称
     * @return
     */
    private static Class GetTypeByName(String modelName) throws Exception {
        return JRT.Core.Util.ReflectUtil.GetType(JRT.Core.MultiPlatform.JRTConfigurtaion.Configuration("ModelName") + ".Entity." + modelName, JRT.Core.MultiPlatform.JRTConfigurtaion.Configuration("ModelName"));
    }


    /**
     * 处理队列里的一条数据并入缓存
     */
    private static void DealOneDataQuen() {
        try {
            Object obj = TaskQuen.pop();
            if (obj != null) {
                ECPDto dto=(ECPDto)obj;
                //添加或者更新缓存
                if(dto.Cmd.equals("A")||dto.Cmd.equals("U")) {
                    Class type = GetTypeByName(dto.Model);
                    Object entity = JRT.Core.Util.JsonUtil.Json2Object(dto.Data, type);
                    JRT.DAL.ORM.Common.TableInfo tableInfo = JRT.DAL.ORM.Common.ModelToSqlUtil.GetTypeInfo(entity);
                    //实体的名称
                    String modelName = dto.Model;
                    //得到数据的主键
                    String id = tableInfo.ID.Value.toString();
                    if (!AllHotData.containsKey(modelName)) {
                        ConcurrentHashMap<String, OneGlobalNode> map = new ConcurrentHashMap<>();
                        AllHotData.put(modelName, map);
                    }
                    //更新数据
                    if (AllHotData.get(modelName).containsKey(id)) {
                        AllHotData.get(modelName).get(id).Data = entity;
                        AllHotData.get(modelName).get(id).Time = JRT.Core.Util.TimeParser.GetTimeInMillis();
                    }
                    //加入到缓存
                    else {
                        OneGlobalNode node = new OneGlobalNode();
                        node.Data = entity;
                        node.Time = JRT.Core.Util.TimeParser.GetTimeInMillis();
                        AllHotData.get(modelName).put(id, node);
                        //缓存数量加1
                        CurCacheNum.addAndGet(1);
                        //记录时间
                        if (LastDeleteTime == null) {
                            LastDeleteTime = JRT.Core.Util.TimeParser.GetTimeInMillis();
                        }
                    }
                }
                //删除缓存
                else if(dto.Cmd.equals("D"))
                {
                    Class type = GetTypeByName(dto.Model);
                    Object entity = JRT.Core.Util.JsonUtil.Json2Object(dto.Data, type);
                    JRT.DAL.ORM.Common.TableInfo tableInfo = JRT.DAL.ORM.Common.ModelToSqlUtil.GetTypeInfo(entity);
                    //实体的名称
                    String modelName = dto.Model;
                    //得到数据的主键
                    String id = tableInfo.ID.Value.toString();
                    if (AllHotData.containsKey(modelName)&&AllHotData.get(modelName).containsKey(id)) {
                        AllHotData.get(modelName).remove(id);
                    }
                }
                //清空表缓存
                else if(dto.Cmd.equals("CLEAR"))
                {
                    //实体的名称
                    String modelName = dto.Model;
                    if (AllHotData.containsKey(modelName)) {
                        AllHotData.get(modelName).clear();
                    }
                }
            }
        }
        catch (Exception ex) {
            LogUtils.WriteExceptionLog("处理Global缓存添加异常", ex);
        }
    }

    /**
     * 返回Global的json数据供调试看是否符合预期
     * @return
     */
    public static String ViewGlobalJson() throws Exception
    {
        return JRT.Core.Util.JsonUtil.Object2Json(AllHotData);
    }

    /**
     * 返回Global待处理队列的json数据供调试看是否符合预期
     * @return
     */
    public static String ViewGlobalTaskQuenDate() throws Exception
    {
        return JRT.Core.Util.JsonUtil.Object2Json(TaskQuen);
    }

}

增删改和DolerGet调整

/**
     * 保存对象,不抛异常,执行信息通过参数输出
     *
     * @param entity   实体对象
     * @param outParam 输出执行成功或失败信息,执行成功时输出执行记录主键
     * @param <T>      实体类型约束
     * @return影响行数
     */
    @Override
    public <T> int Save(T entity, OutValue outValue, OutParam outParam) throws Exception {
        int row = 0;
        PreparedStatement stmt = null;
        boolean innerT = false;     //标识是否内部开启事务
        String sql = "";
        try {
            //根据实体对象获取表信息
            JRT.DAL.ORM.Common.TableInfo tableInfo = JRT.DAL.ORM.Common.ModelToSqlUtil.GetTypeInfo(entity);
            HashParam hash = new HashParam();
            //获取插入SQL语句
            sql = JRT.DAL.ORM.Common.ModelToSqlUtil.GetInsertSqlByTableInfo(Manager().GetIDbFactory(factoryName), tableInfo, hash, false);
            //写SQL日志
            JRT.Core.Util.LogUtils.WriteSqlLog("执行插入SQL:" + sql + ";SQL参数:" + JRT.Core.Util.JsonUtil.Object2Json(hash.GetParam()));
            //获取ID列
            String idKey = tableInfo.ID.Key;
            //声明式SQL,并设置参数
            if (!idKey.isEmpty()) {
                stmt = Manager().Connection().prepareStatement(sql, new String[]{idKey});
            } else {
                stmt = Manager().Connection().prepareStatement(sql);
            }
            String paraSql = DBParaUtil.SetDBPara(stmt, hash.GetParam());
            row = stmt.executeUpdate();
            ResultSet rowID = stmt.getGeneratedKeys();
            JRT.Core.Util.LogUtils.WriteSqlLog("参数:" + paraSql);
            //保存成功返回记录主键,返回影响记录数 1
            if (row == 1) {
                if (rowID.next() && (!idKey.isEmpty())) {
                    outValue.Value = rowID.getInt(idKey);
                    //设置RowID到实体
                    JRT.Core.Util.ReflectUtil.SetObjValue(entity, tableInfo.ID.Key, rowID.getInt(idKey));
                    //尝试把数据推入缓存队列
                    Manager().TryPushToCache(entity, "A");
                }
            } else {
                outParam.Code = OutStatus.ERROR;
                outParam.Message = "保存数据失败,执行保存返回:" + row;
            }
            return row;
        } catch (Exception ex) {
            outParam.Code = OutStatus.ERROR;
            //操作异常,判断如果开启事务,则回滚事务
            if (Manager().Hastransaction) {
                if (!Manager().RollTransaction()) {
                    outParam.Message = "保存数据失败!" + ex.getCause().getMessage() + ";回滚事务失败。";
                }
            }
            outParam.Message = "保存数据失败!" + ex.getCause().getMessage() + "执行SQL:" + sql;
        }
        //操作结束释放资源
        finally {
            if (stmt != null) {
                stmt.close();
            }
            //如果上层调用未开启事务,则调用结束释放数据库连接
            if (!Manager().Hastransaction) {
                Manager().Close();
            }
        }
        return row;
    }


/**
     * 更新实体对象
     *
     * @param entity        实体对象
     * @param param         更新条件,有条件就按条件更新,没有条件就按主键更新
     * @param outParam      输出执行成功或失败的信息
     * @param updateColName 更新属性名集合,无属性则更新实体的所有属性
     * @param joiner        连接符,为空或不给则按则按且连接,给的话长度应该比参数长度少1,如: and
     * @param operators     操作符,为空或不给的话各条件按等来比较,给的话长度应该跟参数长度一样,如: !=
     * @param <T>           类型限定符
     * @return 影响行数
     */
    @Override
    public <T> int Update(T entity, HashParam param, OutParam outParam, List<String> updateColName, List<String> joiner, List<String> operators) throws Exception {
        PreparedStatement stmt = null;
        if (outParam == null) outParam = new OutParam();
        int row = 0;
        boolean innerT = false;     //标识是否内部开启事务
        try {
            //根据实体获取表信息
            JRT.DAL.ORM.Common.TableInfo tableInfo = JRT.DAL.ORM.Common.ModelToSqlUtil.GetTypeInfo(entity);
            HashParam hash = new HashParam();
            //获取更新的SQL语句
            String sql = JRT.DAL.ORM.Common.ModelToSqlUtil.GetUpdateSqlByTableInfo(Manager().GetIDbFactory(factoryName), tableInfo, param, updateColName, joiner, operators, hash);
            //写SQL日志
            JRT.Core.Util.LogUtils.WriteSqlLog("执行更新SQL:" + sql);
            //声明式SQL,并且设置参数
            stmt = Manager().Connection().prepareStatement(sql);
            String paraSql = DBParaUtil.SetDBPara(stmt, hash.GetParam());
            row = stmt.executeUpdate();
            JRT.Core.Util.LogUtils.WriteSqlLog("参数:" + paraSql);
            if (row == 1) {
                //尝试把数据推入缓存队列
                Manager().TryPushToCache(entity, "U");
            }
            outParam.Code = OutStatus.SUCCESS;
            outParam.Message = "更新数据成功。";
            return row;
        } catch (Exception ex) {
            //操作异常,判断如果开启了事务,就回滚事务
            outParam.Code = OutStatus.ERROR;
            if (Manager().Hastransaction) {
                if (!Manager().RollTransaction()) {
                    outParam.Message = "更新数据失败!" + ex.getCause().getMessage() + ";回滚事务失败。";
                }
            }
            outParam.Message = "更新数据失败!" + ex.getCause().getMessage();
        }
        //操作结束释放资源
        finally {
            if (stmt != null) {
                stmt.close();
            }
            //如果上层调用未开启事务,则调用结束释放数据库连接
            if (!Manager().Hastransaction) {
                Manager().Close();
            }
        }
        return row;
    }


/**
     * 根据条件删除记录
     *
     * @param entity    实体对象
     * @param param     删除条件,有条件按条件删除,没有条件按主键删除
     * @param outParam  输出执行成功或失败的信息
     * @param joiner    多条件逻辑连接符,为空或不给则按则按且连接,给的话长度应该比参数长度少1,如: and
     * @param operators 操作符,为空或不给的话各条件按等来比较,给的话长度应该跟参数长度一样,如: !=
     * @param <T>       类型限定符
     * @return 影响行数
     */
    @Override
    public <T> int Remove(T entity, HashParam param, OutParam outParam, List<String> joiner, List<String> operators) throws Exception {
        PreparedStatement stmt = null;
        if (outParam == null) outParam = new OutParam();
        int row = 0;
        try {
            //根据实体对象获取表信息
            JRT.DAL.ORM.Common.TableInfo tableInfo = JRT.DAL.ORM.Common.ModelToSqlUtil.GetTypeInfo(entity);
            HashParam hash = new HashParam();
            //获取删除SQL语句
            String sql = JRT.DAL.ORM.Common.ModelToSqlUtil.GetDeleteSqlByTableInfo(Manager().GetIDbFactory(factoryName), tableInfo, param, joiner, operators, hash);
            //写SQL日志
            JRT.Core.Util.LogUtils.WriteSqlLog("执行删除SQL:" + sql);
            //声明式SQL,并设置参数
            stmt = Manager().Connection().prepareStatement(sql);
            String paraSql = DBParaUtil.SetDBPara(stmt, hash.GetParam());
            row = stmt.executeUpdate();
            if (row == 1) {
                //尝试把数据推入缓存队列
                Manager().TryPushToCache(entity, "D");
            }
            JRT.Core.Util.LogUtils.WriteSqlLog("参数:" + paraSql);
            outParam.Code = OutStatus.SUCCESS;
            outParam.Message = "删除数据成功。";
            return row;
        } catch (Exception ex) {
            //操作异常,判断如果开启了事务,就回滚事务
            outParam.Code = OutStatus.ERROR;
            if (Manager().Hastransaction) {
                if (!Manager().RollTransaction()) {
                    outParam.Message = "更新数据失败!" + ex.getCause().getMessage() + ";回滚事务失败。";
                }
            }
            outParam.Message = "更新数据失败!" + ex.getCause().getMessage();
        }
        //操作结束释放资源
        finally {
            if (stmt != null) {
                stmt.close();
            }
            //如果上层调用未开启事务,则调用结束释放数据库连接
            if (!Manager().Hastransaction) {
                Manager().Close();
            }
        }
        return row;
    }

/**
     * 通过主键查询数据,带缓存的查询,用来解决关系库的复杂关系数据获取,顶替Cache的$g
     *
     * @param model 实体
     * @param id    主键
     * @param <T>
     * @return
     * @throws Exception
     */
    public <T> T DolerGet(T model, Object id) throws Exception {
        T ret = GlobalManager.DolerGet(model, id);
        //命中缓存直接返回
        if (ret != null) {
            return ret;
        }
        else {
            //调用数据库查询
            ret = GetById(model, id);
            //找到数据,推入缓存
            if(ret!=null) {
                ECPDto dto = new ECPDto();
                dto.Cmd = "A";
                dto.Model = ret.getClass().getSimpleName();
                dto.Data = JRT.Core.Util.JsonUtil.Object2Json(ret);
                //通知存入缓存
                GlobalManager.InCache(dto);
            }
        }
        return ret;
    }

/**
     * 通过主键查询数据,主业务数据没找到会按历史切割数量找历史表
     *
     * @param model
     * @param id
     * @param <T>
     * @return
     * @throws Exception
     */
    public <T> T GetById(T model, Object id) throws Exception {
        JRT.DAL.ORM.Common.TableInfo tableInfo = JRT.DAL.ORM.Common.ModelToSqlUtil.GetTypeInfo(model);
        List<ParamDto> param = new ArrayList<>();
        ParamDto p = new ParamDto();
        p.Key = tableInfo.ID.Key;
        p.Value = id;
        param.add(p);
        //创建实体集合
        List<T> lstObj = FindAll(model, param, "", -1, -1, "", null, null);
        //结果为空,返回一个新建的对象
        if (lstObj.size() == 0) {
            //从历史表取数据
            String HisTableName = tableInfo.TableInfo.HisTableName();
            if (!HisTableName.isEmpty()) {
                int cutNum=0;
                //指定了切割列按切割列切割
                if(!tableInfo.TableInfo.CutHisColName().isEmpty())
                {
                    cutNum=JRT.Core.Util.Convert.ToInt32(JRT.Core.Util.ReflectUtil.GetObjValue(model,tableInfo.TableInfo.CutHisColName()).toString());
                }
                else
                {
                    cutNum=JRT.Core.Util.Convert.ToInt32(tableInfo.ID.Value.toString());
                }
                //除以历史页大小算到数据该放入哪个历史表
                int hisNum = cutNum/HistoryPage;
                //分割所有历史实体
                String[] HisTableNameArr = HisTableName.split("^");
                //存放页小于所有历史表数据就做移动
                if (hisNum < HisTableNameArr.length) {
                    String HisModelName = HisTableNameArr[hisNum];
                    //得到历史表的实体
                    Class cHis = GetTypeByName(HisModelName);
                    //克隆得到历史表的对象
                    Object hisData = JRT.Core.Util.JsonUtil.CloneObject(model, cHis);
                    //创建实体集合
                    List<T> lstHisObj = DolerFindAll(0,hisData, param, "", -1, -1, "", null, null);
                    //结果为空,返回一个新建的对象
                    if (lstHisObj.size() > 0) {
                        return lstHisObj.get(0);
                    }
                }
            }
            return null;
        }
        //否则返回第一个实体
        else {
            return lstObj.get(0);
        }
    }

    /**
     * 把数据安装维护的历史表大小移入历史表
     *
     * @param model 实体数据
     * @param <T>   泛型
     * @return 是否成功
     * @throws Exception
     */
    public <T> boolean MoveToHistory(T model) throws Exception {
        JRT.DAL.ORM.Common.TableInfo tableInfo = JRT.DAL.ORM.Common.ModelToSqlUtil.GetTypeInfo(model);
        String HisTableName = tableInfo.TableInfo.HisTableName();
        if (!HisTableName.isEmpty()) {
            //分割所有历史实体
            String[] HisTableNameArr = HisTableName.split("^");
            int cutNum=0;
            //指定了切割列按切割列切割
            if(!tableInfo.TableInfo.CutHisColName().isEmpty())
            {
                cutNum=JRT.Core.Util.Convert.ToInt32(JRT.Core.Util.ReflectUtil.GetObjValue(model,tableInfo.TableInfo.CutHisColName()).toString());
            }
            else
            {
                cutNum=JRT.Core.Util.Convert.ToInt32(tableInfo.ID.Value.toString());
            }
            //除以历史页大小算到数据该放入哪个历史表
            int hisNum = cutNum/HistoryPage;
            //存放页小于所有历史表数据就做移动
            if (hisNum < HisTableNameArr.length) {
                String HisModelName = HisTableNameArr[hisNum];
                //得到历史表的实体
                Class cHis = GetTypeByName(HisModelName);
                //克隆得到历史表的对象
                Object newData = JRT.Core.Util.JsonUtil.CloneObject(model, cHis);
                OutParam out = new OutParam();
                //保存历史数据
                int saveRet = Save(newData, out);
                if (saveRet == 1) {
                    saveRet = Remove(model, out);
                }
                if (saveRet == 1) {
                    return true;
                }
            }
        }
        return false;
    }

连接管理类调整,根据是否带事务在操作执行成功后把数据推入ECP队列供Global管理器往主服务推送分发

package JRT.DAL.ORM.DBUtility;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Savepoint;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

import JRT.DAL.ORM.DBUtility.C3P0Util;
import com.mchange.v2.c3p0.ComboPooledDataSource;
import JRT.DAL.ORM.DBUtility.IDbFactory;
import JRT.DAL.ORM.Global.ECPDto;

/**
 * 连接和事务管理
 */
public class DBManager {
    /**
     * 驱动名称
     */
    private String factoryName="";

    /**
     * 当前对象的驱动
     */
    private IDbFactory factory=null;


    /**
     * 存数据库连接对象
     */
    private Connection connection=null;

    /**
     * 要转入缓存的临时数据
     */
    private List<ECPDto> ToEcpTmp=null;

    /**
     * 尝试把数据推入缓存队列
     * @param obj 对象
     * @param Oper 操作 A:添加  U:更新  D:删除
     * @throws Exception
     */
    public void TryPushToCache(Object obj,String Oper) throws Exception
    {
        ECPDto dto=new ECPDto();
        dto.Cmd=Oper;
        dto.Model=obj.getClass().getSimpleName();
        dto.Data=JRT.Core.Util.JsonUtil.Object2Json(obj);
        //有事务就推缓存
        if(Hastransaction=true)
        {
            if(ToEcpTmp==null)
            {
                ToEcpTmp=new ArrayList<>();
            }
            ToEcpTmp.add(dto);
        }
        else
        {
            //没事务的直接推入缓存队列
            JRT.DAL.ORM.Global.ECPManager.InECPToQuen(dto);
        }
    }

    /**
     * 为每个数据库驱动存储工厂
     */
    private static ConcurrentHashMap<String, IDbFactory> hsFact = new ConcurrentHashMap<>();

    /**
     * 为每个数据库驱动存储连接池
     */
    private static ConcurrentHashMap<String, ComboPooledDataSource> hsPoll = new ConcurrentHashMap<>();

    /**
     * 得到驱动对象
     * @param factoryName
     * @return
     */
    public IDbFactory GetIDbFactory(String factoryName)
    {
        if(factory==null)
        {
            factory=hsFact.get(factoryName);
        }
        return factory;
    }


    /**
     * 尝试初始化连接池
     * @param factoryName
     */
    public static void TryInitConnPool(String factoryName) throws Exception
    {
        if(factoryName=="")
        {
            factoryName="LisMianDbFactory";
        }
        if(!hsPoll.containsKey(factoryName))
        {
            IDbFactory factory=JRT.Core.Context.ObjectContainer.GetTypeObject(factoryName);
            hsPoll.put(factoryName,C3P0Util.GetConnPool(factory));
            if(!hsFact.containsKey(factoryName))
            {
                hsFact.put(factoryName,factory);
            }
        }
    }

    /**
     * 构造函数
     * @param factName 驱动配置名称
     * @throws Exception
     */
    public DBManager(String factName) throws Exception
    {
        factoryName=factName;
        TryInitConnPool(factoryName);
    }

    /**
     * 存数据库连接对象
     */
    public Connection Connection() throws Exception
    {
        if(connection==null)
        {
            connection=hsPoll.get(factoryName).getConnection();
        }
        return connection;
    }

    /**
     * 标识是否开启事务
     */
    public boolean Hastransaction = false;

    /**
     * 存储开启多次事务的保存点,每次调用BeginTransaction开启事务是自动创建保存点
     */
    public LinkedList<Savepoint> Transpoints = new LinkedList<Savepoint>();

    /**
     * 获取开启的事务层级
     * @return
     */
    public int GetTransactionLevel()
    {
        return this.Transpoints.size();
    }

    /**
     * 释放数据库连接
     * @return true成功释放,false释放失败
     */
    public boolean Close() throws Exception
    {
        if(connection!=null)
        {
            connection.setAutoCommit(true);
            connection.close();
        }
        connection=null;
        return true;
    }


    /**
     * 此方法开启事务
     * @return  true开启事务成功,false开始事务失败
     */
    public boolean BeginTransaction() throws Exception
    {
        try
        {
            this.Connection().setAutoCommit(false);
            this.Hastransaction = true;
            Savepoint savepoint = this.Connection().setSavepoint();
            Transpoints.addLast(savepoint);
            return true;
        }

        catch (SQLException sqle)
        {
            JRT.Core.Util.LogUtils.WriteExceptionLog("开启事务失败!" + sqle.getMessage(), sqle);
        }
        return false;
    }

    /**
     * 回滚上一层事务
     * @return true回滚事务成功,false回滚事务失败
     */
    public boolean RollTransaction() throws Exception
    {
        //删除临时数据
        if(ToEcpTmp!=null) {
            ToEcpTmp.clear();
            ToEcpTmp=null;
        }
        //未开启事务时,算回滚事务成功
        if (!this.Hastransaction)
        {
            return true;
        }
        try
        {
            if (this.Transpoints.size() == 0)
            {
                this.Connection().rollback();
                this.Hastransaction = false;
            }
            else
            {
                Savepoint point = this.Transpoints.poll();
                this.Connection().rollback(point);
            }
            return true;
        }
        catch (SQLException sqle)
        {
            JRT.Core.Util.LogUtils.WriteExceptionLog("事务回滚失败!" + sqle.getMessage(),sqle);
            throw sqle;
        }
        finally
        {
            if (!this.Hastransaction)
            {
                Close();
            }
        }
    }

    /**
     * 回滚开启的全部事务
     * @return true回滚事务成功,false回滚事务失败
     */
    public boolean RollTransactionAll() throws Exception
    {
        //删除临时数据
        if(ToEcpTmp!=null) {
            ToEcpTmp.clear();
            ToEcpTmp=null;
        }
        //未开启事务时,算回滚事务成功
        if (!this.Hastransaction)
        {
            return true;
        }
        try
        {
            this.Connection().rollback();
            this.Hastransaction = false;
            return true;
        }
        catch (SQLException sqle)
        {
            JRT.Core.Util.LogUtils.WriteExceptionLog("回滚所有事务层级失败!" + sqle.getMessage(),sqle);
            throw sqle;
        }
        finally
        {
            Close();
        }
    }

    /**
     * 提交事务
     * @return true提交事务成功,false提交事务失败
     */
    public boolean CommitTransaction() throws Exception
    {
        try
        {
            //临时数据推入缓存
            if(ToEcpTmp!=null)
            {
                for(ECPDto obj:ToEcpTmp)
                {
                    //没事务的直接推入缓存队列
                    JRT.DAL.ORM.Global.ECPManager.InECPToQuen(obj);
                }
            }
            this.Connection().commit();
            this.Hastransaction = false;
            return true;
        }
        catch (SQLException sqle)
        {
            JRT.Core.Util.LogUtils.WriteExceptionLog("提交事务失败!" + sqle.getMessage(),sqle);
        }
        finally
        {
            //提交事务,不论成功与否,释放数据库连接
            try
            {
                Close();
            }
            catch (Exception ex)
            {

            }
        }
        return false;
    }

}





增加的配置
在这里插入图片描述

这样就有一个理论上比较靠谱的缓存机制了,业务用SQL查到主列表数据后,调用DolerGet的获得各种周边相关数据来组装给前台返回,就不用每个写业务的人自己考虑写复杂的级联查询数据库受不了,自己缓存数据时候缓存是否可靠,自己不缓存数据时候调用太多数据库交互又慢的问题。DolerGet基本可以满足比较无脑的多维取数据组装的要求。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1269257.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

强基固本,红海云数字化重塑提升国企干部管理能力

国有企业的干部管理体系建设具有重要的战略意义&#xff0c;对于构建高素质专业化的干部队伍&#xff0c;推动企业高质量发展至关重要。特别是在党的二十大以后&#xff0c;建设中国特色现代企业制度&#xff0c;在完善公司治理中加强党的领导&#xff0c;加强党管干部党管人才…

15.spring源码解析-invokeBeanFactoryPostProcessors

BeanFactoryPostProcessor接口允许我们在bean正是初始化之前改变其值。此接口只有一个方法: void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory);有两种方式可以向Spring添加此对象: 通过代码的方式: context.addBeanFactoryPostProcessor 通过xml…

【UGUI】事件侦听EventSystem系统0学

前言介绍 EventSystem是Unity UGUI中的一个重要组件&#xff0c;用于处理用户输入事件&#xff0c;如点击、拖拽、滚动等。它负责将用户输入事件传递给合适的UI元素&#xff0c;并触发相应的事件回调函数&#xff08;就是你想要做的事情&#xff0c;自定义函数&#xff09;。 …

Chrome显示分享按钮

分享按钮不见了&#xff01; Chrome://flags Chrome Refresh 2023 Disabled 左上角的标签搜索会到右上角。

《第一行代码:Android》第三版-3.4.4体验Activity的生命周期

本文的代码是在主Activity中&#xff0c;重载了几个生命周期函数&#xff0c;在日志中打印出对应的日志信息&#xff0c;有两个按钮&#xff0c;负责启动另外的Activity&#xff0c;并回到主Activity 由此查看日志&#xff0c;来体会生命周期。 MainActivity.kt 文件如下 pac…

井盖倾斜监测方式,智能井盖传感器效果

大家是否都曾经想过&#xff0c;为什么路面上的井盖容易发生事故&#xff1f;其实这主要是因为井盖倾斜或者位移等异常状态出现时&#xff0c;由于人员巡查的范围较大从而无法及时察觉所导致的。为了保障道路行人和车辆的安全&#xff0c;对于井盖的监测需要不断完善和升级。而…

C++-多态

目录 一.多态的概念 二.多态的条件 三.对实现多态的条件进行解释 四.override和final 五.三重对比 六.虚函数表和虚函数表指针 七.静态的多态和动态的多态 一.多态的概念 多态的概念&#xff1a;通俗来说&#xff0c;就是多种形态&#xff0c;具体点就是去完成某个行为…

《第一行代码:Android》第三版-如何为一个Activity添加layout文件

确切地说就是讲如何给一个不带view的Activity添加一个view&#xff0c;就是添加一个layout文件。 新建安卓项目&#xff0c;如果选择&#xff1a;就会给你创建一个没有view的Activity&#xff0c;如果后来你发现需要为这个Activity添加view&#xff0c;就是添加一个布局文件怎…

高速USB转以太网芯片CH397 UBOOT使用教程

简介 CH397 是一款高集成度、低功耗的 USB 网卡芯片&#xff0c;内置青稞 RISC-V 处理器、符合 USB2.1 协议规 范的高速 USB 控制器及收发器 PHY、以及符合 IEEE802.3 协议规范、支持 10M/100M 网络的以太网 MACPHY。已适配各类台式电脑、笔记本电脑、平板电脑、游戏机等的标准…

香港身份、香港永居身份、香港护照区别,三种证件之间是什么关系?

香港身份、香港永居身份、香港护照区别&#xff0c;三种证件之间是什么关系&#xff1f; 在港“通常性”住满7年之后&#xff0c;可以申请永居身份&#xff01; 香港身份&#xff1a;也可以称之为临时身份&#xff0c;无论通过香港优才计划、高才通计划、专才计划或者留学拿身份…

ASUS(华硕) B760M-AYW WIFI D4_解决wifi不能使用

1、最近新购买了一套 diy电脑主机&#xff0c;选用的是 ASUS B760M-AYW WIFI D4电脑主板 win10 系统&#xff0c;到货后 发现右下角电脑图标处及网络适配器中 没有wifi选项 首先 在官网和旗舰店客服处&#xff0c;确认了 该主板 有集成wifi模块&#xff0c;鲨鱼鳍天线未安装…

一篇让小白彻底搞懂性能调优

什么是性能调优&#xff1f;(what) 为什么需要性能调优&#xff1f;(why) 什么时候需要性能调优&#xff1f;(when) 什么地方需要性能调优&#xff1f;(where) 什么人来进行性能调优&#xff1f;(who) 怎么样进行性能调优&#xff1f;(How) 硬件配置&#xff1a;CUP Xeon…

【获奖作品公开】和鲸社区全方位协力师生备战中国大学生计算机设计大赛

11 月 23 日&#xff0c;2021 - 2023 年&#xff08;第 14 - 16 届&#xff09;中国大学生计算机设计大赛获奖作品正式上线和鲸社区&#xff01; 中国大学生计算机设计大赛&#xff08;下简称“大赛”或“4C”&#xff09;是由教育部认证、我国高校面向本科生最早的赛事之一&a…

一文读懂Asyncio

什么是Asyncio asyncio 是用来编写并发代码的库&#xff0c;使用async/await语法。 asyncio 被用作多个提供高性能 Python 异步框架的基础&#xff0c;包括网络和网站服务&#xff0c;数据库连接库&#xff0c;分布式任务队列等等。 asyncio 往往是构建 IO 密集型和高层级结构化…

无人机覆盖路径规划综述

摘要&#xff1a;覆盖路径规划包括找到覆盖某个目标区域的每个点的路线。近年来&#xff0c;无人机已被应用于涉及地形覆盖的多个应用领域&#xff0c;如监视、智能农业、摄影测量、灾害管理、民事安全和野火跟踪等。本文旨在探索和分析文献中与覆盖路径规划问题中使用的不同方…

二叉树:已知先序中序求后序或者其他(秒解)

看图找规律哈&#xff0c;不明白的在评论区找我哦

【精选】600兆 保研资料包 文书+面经+择校择导经验 等等 干货满满

推免上岸学长精选保研资料包 资源简介 保研上岸某中9计算机专业学长精心整理的600多兆保研资料包&#xff0c;内容覆盖全面&#xff0c;一次将保研涉及到的面试PPT、个人陈述、中英文自我介绍、英语问题、保研常识、专家推荐信、联系导师邮件、往年保研经验贴&面经、面试…

苹果TF签名全称TestFlight签名,需要怎么做才可以上架呢?

如果你正在开发一个iOS应用并准备进行内测&#xff0c;TestFlight是苹果提供的一个免费的解决方案&#xff0c;它使开发者可以邀请用户参加应用的测试。以下是一步步的指南&#xff0c;教你如何利用TestFlight进行内测以便于应用后续可以顺利上架App Store。 1: 准备工作 在测…

【Openstack Train安装】九、Nova安装

Nova是OpenStack中最核心的组件&#xff0c;它负责根据需求提供虚拟机服务并管理虚拟机生命周期&#xff0c;包括虚拟机创建、虚拟机调度和热迁移等。 Nova的子组件包括nova-api、nova-compute、nova-scheduler、nova-conductor、nova-db、nova-console等等。 本文介绍Nova安装…

Mysql 不执行索引问题与优化

难以查找的隐藏问题 及 解决办法&#xff1a; 问题总结&#xff1a;