手写分布式配置中心(四)增加实时刷新功能(长轮询)

news2024/11/24 1:35:27

上一篇文章中实现了短轮询,不过短轮询的弊端也很明显,如果请求的频率较高,那么就会导致服务端压力大(并发高);如果请求的频率放低,那么客户端感知变更的及时性就会降低。所以我们来看另一种轮询方式,长轮询。
长轮询就是客户端发起请求,如果服务端的数据没有发生变更,那么就hold住请求,直到服务端的数据发生了变更,或者达到了一定的时间就会返回。这样就减少了客户端和服务端不断频繁连接和传递数据的过程,并且不会消耗服务端太多资源,而且客户端感知变更的及时性也会大大提高

代码在https://gitee.com/summer-cat001/config-center​​​​​​​

原理

要实现服务端长时间hold请求,就要利用到servlet异步的特性,因为web服务器会有一个线程池,每一个请求来了之后会提交给这个线程池去处理请求,如果一个任务很长时间都没完成的话就会一直占有这个线程,那么其他请求来了会发现线程池里没有可用的线程就会一直等,直到有空闲的线程,这样就会导致并发性大大的减少。所以需要采用异步响应的方式去实现,而比较方便实现异步http的方式就是Servlet3.0提供的AsyncContext 机制。asyncContext是为了把主线程返回给web服务器的线程池,不影响服务对其他客户端请求。会有线程专门处理这个长轮询,但并不是说每一个长轮询的http请求都要用一个线程阻塞在那。而是把长轮询的request的引用在一个集合中存起来,用一个或几个线程专门处理一批客户端的长轮询请求,这样就不需要为每一个长轮询单独分配线程阻塞在那了,从而大大降低了资源的消耗。注意,异步不是非阻塞,响应数据时还是要阻塞的。

服务端

服务端增加一个长轮询的接口

@PostMapping("/change/get/long")
    public Result<Void> getLongChangeConfig(@RequestBody Map<Long, Integer> configIdMap, HttpServletRequest request, HttpServletResponse response) {
        if (configIdMap == null || configIdMap.isEmpty()) {
            return Result.fail("配置参数错误");
        }
        response.setContentType("application/json;charset=UTF-8");

        AsyncContext asyncContext = request.startAsync();
        asyncContext.setTimeout(0);

        ConfigPolingTask configPolingTask = new ConfigPolingTask();
        configPolingTask.setAsyncContext(asyncContext);
        configPolingTask.setConfigPolingDataMap(configIdMap);
        configPolingTask.setEndTime(System.currentTimeMillis() + 28 * 1000);
        configService.configListener(configPolingTask);
        return null;
    }

主要就是把请求的配置id和版本的map、超时时间、asyncContext对象组装成一个任务,添加到任务池里,如有更新了配置,会去任务池里找是否有该配置id的任务,如果版本号大于任务的版本号,就将新配置返回给客户端。于此同时会有1个定时线程每1秒访问一下任务池,找到过期的任务,返回给客户端。客户端的请求过期时间是30秒,服务端过期时间定的是28秒,也就是配置没有改变的情况下,会hold请求28秒才返回,提前2秒返回是为了防止返回传输时间导致超过30秒,客户端断开链接。
 

@Slf4j
@Service
public class ConfigServiceImpl implements ConfigService {

    private ConfigDAO configDAO;
    private ConfigSyncService configSyncService;

    @Autowired
    private LocalConfigDAO localConfigDAO;

    @Autowired
    private LocalConfigSyncServiceImpl localConfigSyncService;

    @Value("${config.center.mode:0}")
    private int configCenterMode;

    private int respThreadNum;
    private final ExecutorService respExecutor;
    private final ConfigPolingTasksHolder configPolingTasksHolder;

    public ConfigServiceImpl() {
        configPolingTasksHolder = new ConfigPolingTasksHolder();
        //构建用于响应长轮询的线程池
        respExecutor = new ThreadPoolExecutor(100, 5000,
                0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(102400),
                this::newRespThread,
                new ThreadPoolExecutor.CallerRunsPolicy());
        //每1秒轮询执行一次任务超时检测
        ScheduledExecutorService timeoutCheckExecutor = new ScheduledThreadPoolExecutor(1, this::newCheckThread);
        timeoutCheckExecutor.scheduleAtFixedRate(this::responseTimeoutTask, 0, 1, TimeUnit.SECONDS);
    }

    @PostConstruct
    public void init() {
        ConfigCenterModeEnum configCenterModeEnum = ConfigCenterModeEnum.getEnum(configCenterMode);
        if (configCenterModeEnum == null) {
            throw new IllegalArgumentException("配置config.center.mode错误");
        }
        if (configCenterModeEnum == ConfigCenterModeEnum.STANDALONE) {
            this.configDAO = localConfigDAO;
            this.configSyncService = localConfigSyncService;
        }
    }

    @Override
    public Result<Void> insertConfig(ConfigBO configBO) {
        List<ConfigDO> configList = configDAO.getAllValidConfig();
        if (configList.stream().anyMatch(c -> c.getName().equals(configBO.getName()))) {
            return Result.fail("配置名重复");
        }
        ConfigDO configDO = new ConfigDO();
        configDO.setName(configBO.getName());
        configDO.setConfigData(configBO.getConfigData().toJSONString());
        configDAO.insertConfigDO(configDO);
        return Result.success(null);
    }

    @Override
    public Result<Void> updateConfig(ConfigBO configBO) {
        ConfigDO configDO = new ConfigDO();
        configDO.setId(configBO.getId());
        configDO.setName(configBO.getName());
        configDO.setConfigData(configBO.getConfigData().toJSONString());
        configDAO.updateConfig(configDO);
        configSyncService.publish(configBO.getId());
        return Result.success(null);
    }

    @Override
    public Result<Void> delConfig(long id, long updateUid) {
        configDAO.delConfig(id, updateUid);
        return Result.success(null);
    }

    @Override
    public Result<List<ConfigBO>> getAllValidConfig() {
        List<ConfigDO> configList = configDAO.getAllValidConfig();
        return Result.success(configList.stream().map(this::ConfigDO2BO).collect(Collectors.toList()));
    }

    @Override
    public void configListener(ConfigPolingTask configPolingTask) {
        //先将任务加到待响应列表中,然后再判断账号是否有改变,防止并发问题
        //如先判断再加进去,加入前如有变动,任务里无法感知到,空等到超时
        configPolingTasksHolder.addConfigTask(configPolingTask);

        List<ConfigBO> allValidConfig = getAllValidConfig().getData();
        List<ConfigVO> changeConfigList = getChangeConfigList(configPolingTask, allValidConfig);
        if (!changeConfigList.isEmpty()) {
            List<ConfigPolingTask> todoTask = configPolingTasksHolder.getExecuteTaskList(configPolingTask::equals);
            if (!todoTask.isEmpty()) {
                doResponseTask(configPolingTask, Result.success(changeConfigList));
            }
        }
    }

    @Override
    public void onChangeConfigEvent(long configId) {
        List<ConfigPolingTask> todoTasks = configPolingTasksHolder.getExecuteTaskList(
                configPolingTask -> configPolingTask.getConfigPolingDataMap().containsKey(configId));

        if (!todoTasks.isEmpty()) {
            List<ConfigBO> configList = Collections.singletonList(ConfigDO2BO(configDAO.getConfig(configId)));
            todoTasks.forEach(todoTask -> {
                List<ConfigVO> changeConfigList = getChangeConfigList(todoTask, configList);
                respExecutor.submit(() -> doResponseTask(todoTask, Result.success(changeConfigList)));
            });
        }
    }

    private List<ConfigVO> getChangeConfigList(ConfigPolingTask configPolingTask, List<ConfigBO> configList) {
        Map<Long, Integer> configPolingDataMap = configPolingTask.getConfigPolingDataMap();
        return configList.stream()
                .filter(configBO -> configPolingDataMap.containsKey(configBO.getId()))
                .filter(configBO -> configBO.getVersion() > configPolingDataMap.get(configBO.getId()))
                .map(ConfigServiceImpl::configBO2ConfigVO).collect(Collectors.toList());
    }

    private ConfigBO ConfigDO2BO(ConfigDO configDO) {
        ConfigBO configBO = new ConfigBO();
        configBO.setId(configDO.getId());
        configBO.setName(configDO.getName());
        configBO.setVersion(configDO.getVersion());
        configBO.setCreateTime(configDO.getCreateTime());
        configBO.setConfigData(JSON.parseObject(configDO.getConfigData()));
        return configBO;
    }

    //响应超时未改变的任务
    private void responseTimeoutTask() {
        List<ConfigPolingTask> timeoutTasks = configPolingTasksHolder.getExecuteTaskList(
                configPolingTask -> System.currentTimeMillis() >= configPolingTask.getEndTime());

        timeoutTasks.forEach(timeoutTask -> respExecutor.submit(() ->
                doResponseTask(timeoutTask, Result.success(new ArrayList<>()))));
    }

    private void doResponseTask(ConfigPolingTask configPolingTask, Result<?> result) {
        AsyncContext asyncContext = configPolingTask.getAsyncContext();
        try (PrintWriter writer = asyncContext.getResponse().getWriter()) {
            writer.write(JSON.toJSONString(result));
            writer.flush();
        } catch (Exception e) {
            log.error("doResponseTimeoutTask error,task:{}", configPolingTask, e);
        } finally {
            asyncContext.complete();
        }
    }

    private Thread newCheckThread(Runnable r) {
        Thread t = new Thread(r);
        t.setDaemon(true);
        t.setName("ConfigLongPollingTimeoutCheckExecutor");
        return t;
    }

    private Thread newRespThread(Runnable r) {
        Thread t = new Thread(r);
        t.setDaemon(true);
        t.setName("ConfigLongPollingTimeoutRespExecutor-" + respThreadNum++);
        return t;
    }

    public static ConfigVO configBO2ConfigVO(ConfigBO configBO) {
        ConfigVO configVO = new ConfigVO();
        configVO.setId(configBO.getId());
        configVO.setName(configBO.getName());
        configVO.setVersion(configBO.getVersion());
        configVO.setConfigData(configBO.getConfigData());
        configVO.setCreateTime(DateUtil.date2str1(configBO.getCreateTime()));
        return configVO;
    }
}
public class ConfigPolingTasksHolder {

    private final List<ConfigPolingTask> configPolingTasks;

    public ConfigPolingTasksHolder() {
        configPolingTasks = new ArrayList<>();
    }

    public synchronized void addConfigTask(ConfigPolingTask configPolingTask) {
        configPolingTasks.add(configPolingTask);
    }

    //将要处理的任务在任务列表中删除,并将其放到外面执行,防止锁的时间太长
    public synchronized List<ConfigPolingTask> getExecuteTaskList(Predicate<ConfigPolingTask> predicate) {
        List<ConfigPolingTask> resultTasks = new ArrayList<>();
        configPolingTasks.removeIf(configPolingTask -> {
            boolean res = predicate.test(configPolingTask);
            if (res) {
                resultTasks.add(configPolingTask);
            }
            return res;
        });
        return resultTasks;
    }
}
@Data
public class ConfigPolingTask {
    /**
     * 截止时间
     */
    private long endTime;

    /**
     * 异步请求
     */
    private AsyncContext asyncContext;

    /**
     * 配置轮询数据(配置id,版本)
     */
    private Map<Long, Integer> configPolingDataMap;
}

客户端

客户端就很简单了,只要循环发一个超时时间是30秒的http请求就行

public void startLongPolling() {
        polling("/config/change/get/long", null, 30000);
    }

    public void polling(String uri, Runnable runnable, int readTimeout) {
        Thread thread = new Thread(() -> {
            while (!Thread.interrupted()) {
                try {
                    Optional.ofNullable(runnable).ifPresent(Runnable::run);
                    Map<Long, List<ConfigDataBO>> refreshConfigMap = new HashMap<>();
                    configMap.values().forEach(configBO -> {
                        Optional.ofNullable(configBO.getConfigDataList()).ifPresent(cdList -> cdList.stream()
                                .filter(cd -> cd.getRefreshFieldList() != null && !cd.getRefreshFieldList().isEmpty())
                                .forEach(refreshConfigMap.computeIfAbsent(configBO.getId(), k1 -> new ArrayList<>())::add));
                    });
                    if (refreshConfigMap.isEmpty()) {
                        return;
                    }
                    Map<String, Integer> configIdMap = refreshConfigMap.keySet().stream()
                            .collect(Collectors.toMap(String::valueOf, configId -> configMap.get(configId).getVersion()));
                    HttpRespBO httpRespBO = HttpUtil.httpPostJson(url + uri, JSON.toJSONString(configIdMap), readTimeout);
                    List<ConfigVO> configList = httpResp2ConfigVOList(httpRespBO);
                    if (configList.isEmpty()) {
                        continue;
                    }
                    configList.forEach(configVO -> {
                        Map<String, Object> result = new HashMap<>();
                        DataTransUtil.buildFlattenedMap(result, configVO.getConfigData(), "");
                        ConfigBO configBO = this.configMap.get(configVO.getId());
                        configBO.setVersion(configVO.getVersion());

                        List<ConfigDataBO> configDataList = configBO.getConfigDataList();
                        Map<String, ConfigDataBO> configDataMap = configDataList.stream()
                                .collect(Collectors.toMap(ConfigDataBO::getKey, Function.identity()));
                        result.forEach((key, value) -> {
                            ConfigDataBO configDataBO = configDataMap.get(key);
                            if (configDataBO == null) {
                                configDataList.add(new ConfigDataBO(key, value.toString()));
                            } else {
                                configDataBO.setValue(value.toString());
                                List<RefreshFieldBO> refreshFieldList = configDataBO.getRefreshFieldList();
                                if (refreshFieldList == null) {
                                    refreshFieldList = new ArrayList<>();
                                    configDataBO.setRefreshFieldList(refreshFieldList);
                                }
                                refreshFieldList.forEach(refreshFieldBO -> {
                                    try {
                                        Field field = refreshFieldBO.getField();
                                        field.setAccessible(true);
                                        field.set(refreshFieldBO.getBean(), value.toString());
                                    } catch (Exception e) {
                                        log.error("startShortPolling set Field error", e);
                                    }
                                });
                            }
                        });

                    });
                } catch (Exception e) {
                    log.error("startShortPolling error", e);
                }
            }
        });
        thread.setName("startShortPolling");
        thread.setDaemon(true);
        thread.start();
    }

private List<ConfigVO> httpResp2ConfigVOList(HttpRespBO httpRespBO) {
        if (!httpRespBO.success()) {
            throw new IllegalArgumentException("获取配置失败:code:" + httpRespBO.getCode() + ",msg:" + httpRespBO.getMessage());
        }
        if (httpRespBO.getBody() == null) {
            throw new IllegalArgumentException("获取配置失败 body is null:code:" + httpRespBO.getCode() + ",msg:" + httpRespBO.getMessage());
        }
        Result<?> result = JSON.parseObject(new String(httpRespBO.getBody(), StandardCharsets.UTF_8), Result.class);
        if (result.failed()) {
            throw new IllegalArgumentException("获取配置失败 result:" + result);
        }
        return JSON.parseArray(JSON.toJSONString(result.getData()), ConfigVO.class);
    }
public class ClientTest {

    private String userName;

    private String userAge;

    private List<Object> education;

    public ClientTest() throws NoSuchFieldException {
        ConfigCenterClient configCenterClient = new ConfigCenterClient("http://localhost:8088");
        Map<String, String> configProperty = configCenterClient.getConfigProperty();
        this.userName = configProperty.get("user.name");
        this.userAge = configProperty.get("user.age");
        this.education = new ArrayList<>();
        int i = 0;
        while (configProperty.containsKey("user.education[" + i + "]")) {
            education.add(configProperty.get("user.education[" + (i++) + "]"));
        }

        configCenterClient.addRefreshField("user.name", new RefreshFieldBO(this, ClientTest.class.getDeclaredField("userName")));
        configCenterClient.startLongPolling();
    }

    public String toString() {
        return "姓名:" + userName + ",年龄:" + userAge + ",教育经历:" + education;
    }

    public static void main(String[] args) throws NoSuchFieldException, InterruptedException {
        ClientTest clientTest = new ClientTest();
        while (!Thread.interrupted()) {
            System.out.println(clientTest);
            Thread.sleep(1000);
        }
    }
}

效果

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1494942.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

总结归纳Kubernetes | 一站式速查知识,助您轻松驾驭容器编排技术(水平扩展控制)

Kubernetes&#xff0c;亦被称为K8s&#xff0c;是业界公认的容器编排巨擘&#xff0c;以其卓越的能力简化了容器化应用的部署、扩展和管理流程。通过其强大的功能&#xff0c;Kubernetes不仅提升了应用的可靠性和可伸缩性&#xff0c;还优化了资源利用率&#xff0c;为开发者和…

MM配置1-定义、复制、删除、检查工厂

配置步骤&#xff0c;如下图&#xff1a; 双击“复制&#xff0c;删除&#xff0c;检查工厂选项” 点击“复制”按钮&#xff0c;输入参考工厂&#xff0c;和要新建的工厂 复制完成后&#xff0c;返回到上一界面&#xff0c;双击“定义工厂”选项 选中新建的1070工厂&#xff0…

java常用技术栈,java面试带答案

前言 我们从一个问题引入今天的主题。 在日常业务开发中&#xff0c;我们可能经常听到 DBA 对我们说“不要”&#xff08;注意&#xff1a;不是禁止&#xff09;使用 join&#xff0c;那么为什么 DBA 对 join 这么抵触呢&#xff1f;是 join 本身有问题&#xff0c;还是我们使…

HarmonyOS 数据持久化之首选项 preferences

接下来 我们来说说数据持久化的问题 我们平时处理的数据都是在内存中处理的 我们应用一点重启 所有数据都消失了 肯恩是不符合一些场景的 harmonyos中 提供了比较多的解决方案 最多的两种是 用户首选项 关系型数据库 本文的话 我们就来看看 用户首选项 首先&#xff0c;什么…

Android开发新手入门教程,android笔试面试题

前言 尤其是在最近一段时间内&#xff0c;感觉一天天的时间过得又慢又快&#xff0c;慢的是感觉复工了以后在公司的8.9个小时简直算是煎熬了&#xff0c;快的是常常感觉时间一天天&#xff0c;一月月的过去了&#xff0c;可是发现自己还在原路踏步走。看似每天忙成狗&#xff…

【数学建模】层次分析

1.建立递阶层次结构模型 2.构造出各层次中的所有判断矩阵 对指标的重要性进行两两比较&#xff0c;构造判断矩阵&#xff0c;科学求出权重 矩阵中元素aij的意义是&#xff0c;第i个指标相对第j个指标的重要程度 对角线1&#xff0c;aijaji1 矛盾——>一致性检验

win11配置Mask DINO小白踩坑记录

win11配置Mask DINO踩坑记录 1 准备工作2 创建python环境和安装detectron22.1 安装前提2.2 安装流程2.2.1 cl.exe的错误2.2.2 SetuptoolsDeprecationWarning的错误 3 MaskDINO运行3.1 运行demo 前情提要&#xff1a;需要复现Mask DINO&#xff0c;但是实验室没有Linux的电脑&am…

+++

解法&#xff1a; 显然a可以为aik&#xff08;i为整数&#xff09;&#xff0c;b可以为bjk(j为整数&#xff09;。 若aikbjk。假定i>j&#xff0c;那么a<b。可以得到b-a(i-j)k 我是傻逼&#xff0c;不放代码了 #include<iostream> #include<vector> #inc…

Tomcat基础与Nginx的动静分离

一、TOMCAT基础功能 &#xff08;一&#xff09;自动解压war包 在配置文件中讲到&#xff0c;当接受到请求后&#xff0c;会匹配符合要求的Host&#xff0c;在配置文件中的Host只有一个&#xff0c;且规定了自动解压war包 自动解压war包 .war&#xff1a;WebApp打包,类zip格…

【ESP32 IDF】SPI层次结构SPI协议与SPI控制器结构

文章目录 前言一、SPI 程序层次1.1 硬件原理图1.2 硬件框图1.3 软件层次 二、SPI协议2.1 硬件连线2.2 如何访问SPI设备2.3 SPI 框图 总结 前言 SPI&#xff08;Serial Peripheral Interface&#xff09;是一种常见的串行通信协议&#xff0c;用于在微控制器和外部设备之间进行…

Qt QtCreator打开pro项目时出现假死现象

在Windows系统下&#xff0c;QtCreator打开pro项目时出现假死现象&#xff0c;左侧项目树形图无法展开&#xff0c;项目根节点一直在转圈。尝试关掉所有QtCreator进程&#xff0c;重新打开pro也是无济于事。 解决方案&#xff1a; 打开“运行”窗口&#xff08;快捷键&#x…

鸿蒙文章专题-2021年鸿蒙相关的文章废弃

#原因 至于为什么说2021年我的鸿蒙专栏的文章废弃了&#xff0c;只是说没有了参考意义&#xff0c;是因为鸿蒙4.0以前的版本语言从以Java为主过渡为以ArkTS为主。以前的Java版本的工程已经无法再使用了&#xff0c;后续的开发都必须以ArkTS开发语言为主。 其中而且整个项目结构…

操作教程|使用MeterSphere对恒生UFX系统进行压力测试

恒生UFX&#xff08;United Finance Exchange&#xff0c;统一金融交换&#xff09;系统&#xff08;以下简称为“UFX系统”&#xff09;&#xff0c;是一款帮助证券公司统一管理外部接入客户的系统&#xff0c;该系统整体上覆盖了期货、证券、基金、银行、信托、海外业务等各类…

Python 服务实现可观测性最佳实践

前言 本次实践主要是介绍 Python 服务通过无侵入的方式接入观测云进行全面的可观测。 环境信息 系统环境&#xff1a;主机环境开发语言&#xff1a;Python2.7APM 探针包&#xff1a;ddtrace 接入方案 准备工作 安装 DataKit 主机安装 DataKit # 需要把token 改成观测云…

xss.haozi.me靶机 通关

0x00 没有任何过滤可以直接注入<img srcx οnerrοralert(1)> 0x01 使用了testarea标签将我们的输入内容以普通的字符串进行展示 但是我们可以将标签进行闭合 </textarea><img srcx οnerrοralert(1)> 0x02 我们依然可以先闭合之后添加属性a" οncl…

Mendix 使用OIDC组件实现SSO|Azure Microsoft Entra ID 集成(原名:AD)

引言 在快节奏的软件开发领域&#xff0c;Mendix作为一款领先的低代码开发平台&#xff0c;为企业提供了快速构建、部署和迭代应用程序的能力。这种灵活性和效率使得Mendix成为了推动数字化转型的强大工具。随着企业应用数量的激增&#xff0c;单点登录&#xff08;SSO&#x…

【debug】element-ui时间控件回显后不可编辑且显示为空

问题&#xff1a;使用element-ui的时间控件回显数据&#xff0c;编辑数据没有反应&#xff1a;点时间和“确认”按钮都没反应。 输入框中会显示数据&#xff0c;但提交时的校验显示为空。 <el-form-item label"开始时间" prop"limitStartTime"><…

【数据结构】堆的TopK问题

大家好&#xff0c;我是苏貝&#xff0c;本篇博客带大家了解堆的TopK问题&#xff0c;如果你觉得我写的还不错的话&#xff0c;可以给我一个赞&#x1f44d;吗&#xff0c;感谢❤️ 目录 一. 前言二. TopK三. 代码 一. 前言 TOP-K问题&#xff1a;即求数据结合中前K个最大的元…

专业145+总分410+西工大西北工业大学827信号与系统考研经验电子信息与通信工程,海航,真题,大纲,参考书。

经过一年的努力&#xff0c;分数终于出来。今年专业课827信号与系统145&#xff08;很遗憾差了一点点满分&#xff0c;没有达到Jenny老师的最高要求&#xff09;&#xff0c;数一130&#xff0c;英语和政治也都比较平衡&#xff0c;总分410分&#xff0c;当然和信息通信考研Jen…

类变量和类方法【静态变量 static】

类变量和类方法【静态变量 static】 类变量什么是类变量如何定义类变量如何访问类变量类变量使用注意事项和细节 类方法类方法的基本介绍类方法的调用类方法应用案例类方法经典的使用场景类方法使用注意事项和细节 类变量 什么是类变量 类变量&#xff0c;也叫静态属性/静态变…