项目接口请求合并

news2025/1/11 8:01:11

请求合并到底有什么意义呢?我们来看下图。

图片

假设我们3个用户(用户id分别是1、2、3),现在他们都要查询自己的基本信息,请求到服务器,服务器端请求数据库,发出3次请求。我们都知道数据库连接资源是相当宝贵的,那么我们怎么尽可能节省连接资源呢?

这里把数据库换成被调用的远程服务,也是同样的道理。

我们改变下思路,如下图所示。

图片

我们在服务器端把请求合并,只发出一条SQL查询数据库,数据库返回后,服务器端处理返回数据,根据一个唯一请求ID,把数据分组,返回给对应用户。

技术手段

  • LinkedBlockQueue 阻塞队列

  • ScheduledThreadPoolExecutor 定时任务线程池

  • CompleteableFuture future 阻塞机制(Java 8 的 CompletableFuture 并没有 timeout 机制,后面优化,使用了队列替代)

代码实现

查询用户的代码

public interface UserService {

    Map<String, Users> queryUserByIdBatch(List<UserWrapBatchService.Request> userReqs);
}
@Service
public class UserServiceImpl implements UserService {

    @Resource
    private UsersMapper usersMapper;

    @Override
    public Map<String, Users> queryUserByIdBatch(List<UserWrapBatchService.Request> userReqs) {
        // 全部参数
        List<Long> userIds = userReqs.stream().map(UserWrapBatchService.Request::getUserId).collect(Collectors.toList());
        QueryWrapper<Users> queryWrapper = new QueryWrapper<>();
        // 用in语句合并成一条SQL,避免多次请求数据库的IO
        queryWrapper.in("id", userIds);
        List<Users> users = usersMapper.selectList(queryWrapper);
        Map<Long, List<Users>> userGroup = users.stream().collect(Collectors.groupingBy(Users::getId));
        HashMap<String, Users> result = new HashMap<>();
        userReqs.forEach(val -> {
            List<Users> usersList = userGroup.get(val.getUserId());
            if (!CollectionUtils.isEmpty(usersList)) {
                result.put(val.getRequestId(), usersList.get(0));
            } else {
                // 表示没数据
                result.put(val.getRequestId(), null);
            }
        });
        return result;
    }
}

合并请求的实现

package com.springboot.sample.service.impl;

import com.springboot.sample.bean.Users;
import com.springboot.sample.service.UserService;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.*;

/***
 * zzq
 * 包装成批量执行的地方
 * */
@Service
public class UserWrapBatchService {
    @Resource
    private UserService userService;

    /**
     * 最大任务数
     **/
    public static int MAX_TASK_NUM = 100;


    /**
     * 请求类,code为查询的共同特征,例如查询商品,通过不同id的来区分
     * CompletableFuture将处理结果返回
     */
    public class Request {
        // 请求id 唯一
        String requestId;
        // 参数
        Long userId;
        //TODO Java 8 的 CompletableFuture 并没有 timeout 机制
        CompletableFuture<Users> completableFuture;

        public String getRequestId() {
            return requestId;
        }

        public void setRequestId(String requestId) {
            this.requestId = requestId;
        }

        public Long getUserId() {
            return userId;
        }

        public void setUserId(Long userId) {
            this.userId = userId;
        }

        public CompletableFuture getCompletableFuture() {
            return completableFuture;
        }

        public void setCompletableFuture(CompletableFuture completableFuture) {
            this.completableFuture = completableFuture;
        }
    }

    /*
    LinkedBlockingQueue是一个阻塞的队列,内部采用链表的结果,通过两个ReenTrantLock来保证线程安全
    LinkedBlockingQueue与ArrayBlockingQueue的区别
    ArrayBlockingQueue默认指定了长度,而LinkedBlockingQueue的默认长度是Integer.MAX_VALUE,也就是无界队列,在移除的速度小于添加的速度时,容易造成OOM。
    ArrayBlockingQueue的存储容器是数组,而LinkedBlockingQueue是存储容器是链表
    两者的实现队列添加或移除的锁不一样,ArrayBlockingQueue实现的队列中的锁是没有分离的,即添加操作和移除操作采用的同一个ReenterLock锁,
    而LinkedBlockingQueue实现的队列中的锁是分离的,其添加采用的是putLock,移除采用的则是takeLock,这样能大大提高队列的吞吐量,
    也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
     */
    private final Queue<Request> queue = new LinkedBlockingQueue();

    @PostConstruct
    public void init() {
        //定时任务线程池,创建一个支持定时、周期性或延时任务的限定线程数目(这里传入的是1)的线程池
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);

        scheduledExecutorService.scheduleAtFixedRate(() -> {
            int size = queue.size();
            //如果队列没数据,表示这段时间没有请求,直接返回
            if (size == 0) {
                return;
            }
            List<Request> list = new ArrayList<>();
            System.out.println("合并了 [" + size + "] 个请求");
            //将队列的请求消费到一个集合保存
            for (int i = 0; i < size; i++) {
                // 后面的SQL语句是有长度限制的,所以还要做限制每次批量的数量,超过最大任务数,等下次执行
                if (i < MAX_TASK_NUM) {
                    list.add(queue.poll());
                }
            }
            //拿到我们需要去数据库查询的特征,保存为集合
            List<Request> userReqs = new ArrayList<>();
            for (Request request : list) {
                userReqs.add(request);
            }
            //将参数传入service处理, 这里是本地服务,也可以把userService 看成RPC之类的远程调用
            Map<String, Users> response = userService.queryUserByIdBatch(userReqs);
            //将处理结果返回各自的请求
            for (Request request : list) {
                Users result = response.get(request.requestId);
                request.completableFuture.complete(result);    //completableFuture.complete方法完成赋值,这一步执行完毕,下面future.get()阻塞的请求可以继续执行了
            }
        }, 100, 10, TimeUnit.MILLISECONDS);
        //scheduleAtFixedRate是周期性执行 schedule是延迟执行 initialDelay是初始延迟 period是周期间隔 后面是单位
        //这里我写的是 初始化后100毫秒后执行,周期性执行10毫秒执行一次
    }

    public Users queryUser(Long userId) {
        Request request = new Request();
        // 这里用UUID做请求id
        request.requestId = UUID.randomUUID().toString().replace("-", "");
        request.userId = userId;
        CompletableFuture<Users> future = new CompletableFuture<>();
        request.completableFuture = future;
        //将对象传入队列
        queue.offer(request);
        //如果这时候没完成赋值,那么就会阻塞,直到能够拿到值
        try {
            return future.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        return null;
    }
}

控制层调用

/***
 * 请求合并
 * */
@RequestMapping("/merge")
public Callable<Users> merge(Long userId) {
    return new Callable<Users>() {
        @Override
        public Users call() throws Exception {
            return userBatchService.queryUser(userId);
        }
    };
}

Callable是什么可以参考:

https://blog.csdn.net/baidu_19473529/article/details/123596792

模拟高并发查询的代码

package com.springboot.sample;

import org.springframework.web.client.RestTemplate;

import java.util.Random;
import java.util.concurrent.CountDownLatch;

public class TestBatch {
    private static int threadCount = 30;

    private final static CountDownLatch COUNT_DOWN_LATCH = new CountDownLatch(threadCount); //为保证30个线程同时并发运行

    private static final RestTemplate restTemplate = new RestTemplate();

    public static void main(String[] args) {


        for (int i = 0; i < threadCount; i++) {//循环开30个线程
            new Thread(new Runnable() {
                public void run() {
                    COUNT_DOWN_LATCH.countDown();//每次减一
                    try {
                        COUNT_DOWN_LATCH.await(); //此处等待状态,为了让30个线程同时进行
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }

                    for (int j = 1; j <= 3; j++) {
                        int param = new Random().nextInt(4);
                        if (param <=0){
                            param++;
                        }
                        String responseBody = restTemplate.getForObject("http://localhost:8080/asyncAndMerge/merge?userId=" + param, String.class);
                        System.out.println(Thread.currentThread().getName() + "参数 " + param + " 返回值 " + responseBody);
                    }
                }
            }).start();

        }
    }
}

测试效果

图片

图片

要注意的问题

  • Java 8 的 CompletableFuture 并没有 timeout 机制

  • 后面的SQL语句是有长度限制的,所以还要做限制每次批量的数量,超过最大任务数,等下次执行(本例中加了MAX_TASK_NUM判断)

使用队列的超时解决Java 8 的 CompletableFuture 并没有 timeout 机制

  

核心代码

package com.springboot.sample.service.impl;

import com.springboot.sample.bean.Users;
import com.springboot.sample.service.UserService;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.*;

/***
 * zzq
 * 包装成批量执行的地方,使用queue解决超时问题
 * */
@Service
public class UserWrapBatchQueueService {
    @Resource
    private UserService userService;

    /**
     * 最大任务数
     **/
    public static int MAX_TASK_NUM = 100;


    /**
     * 请求类,code为查询的共同特征,例如查询商品,通过不同id的来区分
     * CompletableFuture将处理结果返回
     */
    public class Request {
        // 请求id
        String requestId;

        // 参数
        Long userId;
        // 队列,这个有超时机制
        LinkedBlockingQueue<Users> usersQueue;


        public String getRequestId() {
            return requestId;
        }

        public void setRequestId(String requestId) {
            this.requestId = requestId;
        }

        public Long getUserId() {
            return userId;
        }

        public void setUserId(Long userId) {
            this.userId = userId;
        }

        public LinkedBlockingQueue<Users> getUsersQueue() {
            return usersQueue;
        }

        public void setUsersQueue(LinkedBlockingQueue<Users> usersQueue) {
            this.usersQueue = usersQueue;
        }
    }

    /*
    LinkedBlockingQueue是一个阻塞的队列,内部采用链表的结果,通过两个ReenTrantLock来保证线程安全
    LinkedBlockingQueue与ArrayBlockingQueue的区别
    ArrayBlockingQueue默认指定了长度,而LinkedBlockingQueue的默认长度是Integer.MAX_VALUE,也就是无界队列,在移除的速度小于添加的速度时,容易造成OOM。
    ArrayBlockingQueue的存储容器是数组,而LinkedBlockingQueue是存储容器是链表
    两者的实现队列添加或移除的锁不一样,ArrayBlockingQueue实现的队列中的锁是没有分离的,即添加操作和移除操作采用的同一个ReenterLock锁,
    而LinkedBlockingQueue实现的队列中的锁是分离的,其添加采用的是putLock,移除采用的则是takeLock,这样能大大提高队列的吞吐量,
    也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
     */
    private final Queue<Request> queue = new LinkedBlockingQueue();

    @PostConstruct
    public void init() {
        //定时任务线程池,创建一个支持定时、周期性或延时任务的限定线程数目(这里传入的是1)的线程池
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);

        scheduledExecutorService.scheduleAtFixedRate(() -> {
            int size = queue.size();
            //如果队列没数据,表示这段时间没有请求,直接返回
            if (size == 0) {
                return;
            }
            List<Request> list = new ArrayList<>();
            System.out.println("合并了 [" + size + "] 个请求");
            //将队列的请求消费到一个集合保存
            for (int i = 0; i < size; i++) {
                // 后面的SQL语句是有长度限制的,所以还要做限制每次批量的数量,超过最大任务数,等下次执行
                if (i < MAX_TASK_NUM) {
                    list.add(queue.poll());
                }
            }
            //拿到我们需要去数据库查询的特征,保存为集合
            List<Request> userReqs = new ArrayList<>();
            for (Request request : list) {
                userReqs.add(request);
            }
            //将参数传入service处理, 这里是本地服务,也可以把userService 看成RPC之类的远程调用
            Map<String, Users> response = userService.queryUserByIdBatchQueue(userReqs);
            for (Request userReq : userReqs) {
                // 这里再把结果放到队列里
                Users users = response.get(userReq.getRequestId());
                userReq.usersQueue.offer(users);
            }

        }, 100, 10, TimeUnit.MILLISECONDS);
        //scheduleAtFixedRate是周期性执行 schedule是延迟执行 initialDelay是初始延迟 period是周期间隔 后面是单位
        //这里我写的是 初始化后100毫秒后执行,周期性执行10毫秒执行一次
    }

    public Users queryUser(Long userId) {
        Request request = new Request();
        // 这里用UUID做请求id
        request.requestId = UUID.randomUUID().toString().replace("-", "");
        request.userId = userId;
        LinkedBlockingQueue<Users> usersQueue = new LinkedBlockingQueue<>();
        request.usersQueue = usersQueue;
        //将对象传入队列
        queue.offer(request);
        //取出元素时,如果队列为空,给定阻塞多少毫秒再队列取值,这里是3秒
        try {
            return usersQueue.poll(3000,TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }
}
...省略..

    @Override
    public Map<String, Users> queryUserByIdBatchQueue(List<UserWrapBatchQueueService.Request> userReqs) {
        // 全部参数
        List<Long> userIds = userReqs.stream().map(UserWrapBatchQueueService.Request::getUserId).collect(Collectors.toList());
        QueryWrapper<Users> queryWrapper = new QueryWrapper<>();
        // 用in语句合并成一条SQL,避免多次请求数据库的IO
        queryWrapper.in("id", userIds);
        List<Users> users = usersMapper.selectList(queryWrapper);
        Map<Long, List<Users>> userGroup = users.stream().collect(Collectors.groupingBy(Users::getId));
        HashMap<String, Users> result = new HashMap<>();
        // 数据分组
        userReqs.forEach(val -> {
            List<Users> usersList = userGroup.get(val.getUserId());
            if (!CollectionUtils.isEmpty(usersList)) {
                result.put(val.getRequestId(), usersList.get(0));
            } else {
                // 表示没数据 , 这里要new,不然加入队列会空指针
                result.put(val.getRequestId(), new Users());
            }
        });
        return result;
    }

...省略...

小结

请求合并,批量的办法能大幅节省被调用系统的连接资源,本例是以数据库为例,其他RPC调用也是类似的道理。缺点就是请求的时间在执行实际的逻辑之前增加了等待时间,不适合低并发的场景。

源码:https://gitee.com/apple_1030907690/spring-boot-kubernetes/tree/v1.0.5

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1082158.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

PO、DTO、VO、BO到底是什么用在哪一层

简介 PO &#xff08;Persistant Object&#xff09;&#xff1a;此对象与数据库表结构一一对应&#xff0c;通过 DAO 层向上传输数据源对象。DTO&#xff08;Data Transfer Object&#xff09;&#xff1a;数据传输对象&#xff0c;Service 或 Manager 向外传输的对象。BO&a…

linux C++ vscode连接mysql

1.linux使用Ubuntu 2.Ubuntu安装vscode 2.1 安装的是snap版本,直接打开命令行执行 sudo snap install --classic code 3.vscode配置C 3.1 直接在扩展中搜索C安装即可 我安装了C, Chinese, code runner, 安装都是同理 4.安装mysql sudo apt update sudo apt install mysql-…

【算法】双向冒泡排序

// // Created by Lunau on 2023/10/11. // #include<stdio.h> #include <cstdlib>void swap(int &i,int &j) {int tp i;i j;j tp; }/*** 双向冒泡排序&#xff0c;采用双指针* param arr* param len*/ void Sort(int arr[], int len) {int p0, q len-…

干货!SRC漏洞挖掘项目实战经验分享

目录 一、hunter上搜索web.title”nacos”&#xff0c;查找中国境内的资产&#xff0c;定位到两个地址。 二、访问一下8086端口&#xff0c;界面很明显是nacos&#xff0c;直接抓包&#xff0c;创建用户。 三、登录网站&#xff0c;里面看到配置管理。 四、查看下redis.yml…

驾驶数字未来:汽车业界数字孪生技术的崭新前景

随着数字化时代的到来&#xff0c;汽车行业正经历着前所未有的变革。数字孪生技术&#xff0c;作为一种前沿的数字化工具&#xff0c;正在为汽车行业带来革命性的影响&#xff0c;不仅改变着汽车制造和维护的方式&#xff0c;也为消费者带来了前所未有的体验。让我们一起探讨&a…

许战海战略文库|无增长则衰亡:中小型制造企业增长困境

竞争环境不是匀速变化&#xff0c;而是加速变化。企业的衰退与进化、兴衰更迭在不断发生&#xff0c;这成为一种不可避免的现实。事实上&#xff0c;在产业链竞争中增长困境不分企业大小&#xff0c;而是一种普遍存在的问题&#xff0c;许多收入在1亿至10亿美元间的制造企业也同…

【WSN】模拟无线传感器网络研究(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

【python海洋专题十七】读取几十年的OHC数据,画四季图

本期内容 读取多年数据&#xff0c;画四季图 Part01. 多年数据处理 图片 多年年平均的OHC分布 图片 春夏秋冬&#xff1a; ohc_all_new np.reshape(ohc_new, (12, 80, 29, 27), order‘C’) ohc_all_season_mean np.mean(ohc_all_new, axis1) ohc_season np.reshape(o…

移动网络 为何ping 不通自己的公网 IP?

移动网络 ping 不通自己的公网 ip&#xff0c;显示请求超时怎么办&#xff1f; 题主所谓的公网IP&#xff0c;并没有配置在任何主机的TCP/IP协议栈上&#xff0c;所以Ping不通。在浩瀚的互联网上去寻找答案&#xff0c;包括ChatGPT机器人&#xff0c;答案就是“禁Ping了“。可…

js数组按照id删除

1.使用filter()方法实现JavaScript数组id删除 &#xff08;该方法写在methods里面&#xff09; DelSkill(row){ console.log(row,"ww"); this.personSkillTableData this.personSkillTableData.filter(item > item.skillId ! row.skillId) }, 在上述代码中&am…

2023年天猫双11:草柴APP如何找到2023天猫双十一优惠券怎么领取使用?

2023年天猫双11活动整体分为三波&#xff0c;第一波为现货售卖&#xff0c;第二波为双11红包雨&#xff0c;第三波为尾款支付。2023天猫双十一可参加活预售、满减、红包、优惠券。具体如何使用草柴APP找到2023天猫双11要购买商品的优惠券&#xff0c;请查看详细教程。 草柴APP如…

十六、代码校验(1)

本章概要 测试 单元测试JUnit测试覆盖率的幻觉 你永远不能保证你的代码是正确的&#xff0c;你只能证明它是错的。 让我们先暂停编程语言特性的学习&#xff0c;看看一些代码基础知识。特别是能让你的代码更加健壮的知识。 测试 如果没有测试过&#xff0c;它就是不能工作的…

EPLAN_003#常用功能(三)

一、图形编辑 按空格键结束 二、对象捕捉、设计模式 设计模式类似于CAD中 基于基准点 复制等功能 三、比例缩放、拉伸、修剪、修改长度、圆角、倒角 四、标注 五、窗口宏、符号宏&#xff08;在编辑菜单中&#xff0c;或者右键&#xff09; 快捷键&#xff1a;CTRLF5 打开的时候…

Matlab之数组、包含分配给类别的值函数categorical

一、功能 categorical 是为一组有限的离散类别&#xff08;例如 High、Med 和 Low&#xff09;赋值的数据类型。这些类别可以采用您指定的数学排序&#xff0c;例如 High > Med > Low&#xff0c;但这并非必须。分类数组可用来有效地存储并方便地处理非数值数据&#xf…

2023-10-11 LeetCode每日一题()

2023-10-11每日一题 一、题目编号 2512. 奖励最顶尖的 K 名学生二、题目链接 点击跳转到题目位置 三、题目描述 给你两个字符串数组 positive_feedback 和 negative_feedback &#xff0c;分别包含表示正面的和负面的词汇。不会 有单词同时是正面的和负面的。 一开始&…

ROS学习笔记(六)---服务通信机制

1. 服务通信是什么 在ROS中&#xff0c;服务通信机制是一种点对点的通信方式&#xff0c;用于节点之间的请求和响应。它允许一个节点&#xff08;服务请求方&#xff09;向另一个节点&#xff08;服务提供方&#xff09;发送请求&#xff0c;并等待响应。 服务通信机制在ROS中…

极坐标和直角坐标的积分转换

如下图所示&#xff0c;当 θ 不变&#xff0c; ρ 增大为 ρ Δ ρ ( 即 ρ 1 ) 时 \theta不变&#xff0c;\rho增大为\rho \Delta \rho(即\rho_1)时 θ不变&#xff0c;ρ增大为ρΔρ(即ρ1​)时, 面积的增量为&#xff1a; 1 2 θ ( ρ Δ ρ ) 2 − 1 2 θ ρ 2 θ ρ…

第二章 进程与线程 十九、死锁的概念

目录 一、定义 二、死锁、饥饿和死循环的区别 三、死锁的必要条件 四、死锁的处理策略 五、总结 一、定义 死锁是指两个或多个进程等待对方释放自己所持有的资源&#xff0c;导致所有进程都被阻塞&#xff0c;无法继续执行。这种情况可能会导致系统瘫痪&#xff0c;需要通…

(二)实现Bean属性依赖注入功能【手撸Spring】

一、前言 在上一篇手撸Spring之实现一个简易版IoC容器&#xff0c;我们先把最简单的IoC架子搭了起来&#xff0c;即实现了一个Bean的注入&#xff0c;体会了依赖反转的过程。这里提到的依赖反转&#xff0c;大家肯定非常耳熟了&#xff0c;也就是将业务对Bean的依赖反转了&…

【网络】网络编程——带你手搓简易TCP服务端(echo服务器)+客户端(四种版本)

这里写自定义目录标题 前言正式开始用生活中的例子来讲解TCP服务端和客户端代码讲解服务端基本框架创建套接字 bindlisten监听accept接收连接通信单线程版多进程①版多进程②版多线程版线程池版 客户端 收尾 前言 本篇主要讲解套接字编程&#xff0c;以TCP服务端和客户端为主…