1 简介
如图,多个客户端发送请求调用(消费者)项目中的findOne方法,这时候在这个项目中的线程池中会发申请与请求数量相同的线程数,对EurekaServiceProvider(服务提供者)的getUserById方法发起调用,每个线程都要调用一次,在高并发的场景下,这样势必会对服务提供者项目产生巨大的压力。
请求合并就是将单个请求合并成一个请求,去调用服务提供者,从而降低服务提供者负载的,一种应对高并发的解决办法
2 请求合并的原理
NetFlix在Hystrix为我们提供了应对高并发的解决方案----请求合并,如下图
通过请求合并器设置延迟时间,将时间内的,多个请求单个的对象的方法中的参数(id)取出来,拼成符合服务提供者的多个对象返回接口(getUsersByIds方法)的参数,指定调用这个接口(getUsersByIds方法),返回的对象List再通过一个方法(mapResponseToRequests方法),按照请求的次序将结果对象对应的装到Request对应的Response中返回结果。
3 请求合并适用的场景
在服务提供者提供了返回单个对象和多个对象的查询接口,并且单个对象的查询并发数很高,服务提供者负载较高的时候,我们就可以使用请求合并来降低服务提供者的负载
4 请求合并带来的问题
问题:即然请求合并这么好,我们是否就可以将所有返回单个结果的方法都用上请求合并呢?答案自然是否定的!
原因:
1. 我们为这个请求人为的设置了延迟时间,这样在并发不高的接口上使用请求缓存,会降低响应速度
2. 实现请求合并比较复杂
5 实现方式
修改OrderService,添加下面方法
@HystrixCollapser(batchMethod = "findAll", collapserProperties = {
@HystrixProperty(name = "timerDelayInMilliseconds", value = "300")
})
public Future<String> findOne(Integer id) {
System.out.println("被合并的请求");
return null;
}
@HystrixCommand
public List<String> findAll(List<Integer> ids) {
System.out.println("合并的请求");
String url = "http://study-user/getUserAll?ids={1}";
return restTemplate.getForObject(url, List.class, StringUtils.join(ids, ","));
}
在OrderController进行测试
@RequestMapping("/findTest")
public String findTest() throws ExecutionException, InterruptedException {
HystrixRequestContext context = HystrixRequestContext.initializeContext();
Future<String> f1 = orderService.findOne(1);
Future<String> f2 = orderService.findOne(2);
Thread.sleep(150);
Future<String> f3 = orderService.findOne(3);
String res = f1.get() + " , " + f2.get() + " , " + f3.get();
context.close();
return res;
}
测试接口
查看日志输出
注解讲解:
以下面代码为例
@HystrixCommand(groupKey = "productStockOpLog", commandKey = "addProductStockOpLog",
fallbackMethod = "addProductStockOpLogFallback",
commandProperties = {
@HystrixProperty(name =
"execution.isolation.thread.timeoutInMilliseconds", value = "400"),//指定多久超时,单位毫秒。超
时进fallback
@HystrixProperty(name = "circuitBreaker.requestVolumeThreshold", value
= "10"),//判断熔断的最少请求数,默认是10;只有在一个统计窗口内处理的请求数量达到这个阈值,才会进行熔断与否
的判断
@HystrixProperty(name = "circuitBreaker.errorThresholdPercentage",
value = "10"),//判断熔断的阈值,默认值50,表示在一个统计窗口内有50%的请求处理失败,会触发熔断
}
)
public void addProductStockOpLog(Long sku_id, Object old_value, Object new_value) throws
Exception {
if (new_value != null && !new_value.equals(old_value)) {
doAddOpLog(null, null, sku_id, null, ProductOpType.PRODUCT_STOCK, old_value != null
? String.valueOf(old_value) : null, String.valueOf(new_value), 0, "C端", null);
}
}
public void addProductStockOpLogFallback(Long sku_id, Object old_value, Object new_value)
throws Exception {
LOGGER.warn("发送商品库存变更消息失败,进入Fallback,skuId:{},oldValue:{},newValue:{}",
sku_id, old_value, new_value);
}
@HystrixCommand(groupKey="UserGroup", commandKey = "GetUserByIdCommand",
commandProperties = {
@HystrixProperty(name =
"execution.isolation.thread.timeoutInMilliseconds", value = "100"),//指定多久超时,单位毫秒。超
时进fallback
@HystrixProperty(name = "circuitBreaker.requestVolumeThreshold", value
= "10"),//判断熔断的最少请求数,默认是10;只有在一个统计窗口内处理的请求数量达到这个阈值,才会进行熔断与否
的判断
@HystrixProperty(name = "circuitBreaker.errorThresholdPercentage",
value = "10"),//判断熔断的阈值,默认值50,表示在一个统计窗口内有50%的请求处理失败,会触发熔断
},
threadPoolProperties = {
@HystrixProperty(name = "coreSize", value = "30"),
@HystrixProperty(name = "maxQueueSize", value = "101"),
@HystrixProperty(name = "keepAliveTimeMinutes", value = "2"),
@HystrixProperty(name = "queueSizeRejectionThreshold", value = "15"),
@HystrixProperty(name = "metrics.rollingStats.numBuckets", value =
"12"),
@HystrixProperty(name = "metrics.rollingStats.timeInMilliseconds",
value = "1440")
})
超时时间(默认1000ms,单位:ms)
(1)hystrix.command.default.execution.isolation.thread.timeoutInMilliseconds
在调用方配置,被该调用方的所有方法的超时时间都是该值,优先级低于下边的指定配置
(2)hystrix.command.HystrixCommandKey.execution.isolation.thread.timeoutInMilliseconds
在调用方配置,被该调用方的指定方法(HystrixCommandKey方法名)的超时时间是该值
线程池核心线程数 hystrix.threadpool.default.coreSize(默认为10)
Queue
(1)hystrix.threadpool.default.maxQueueSize(最大排队长度。默认-1,使用SynchronousQueue。其他值则使用 LinkedBlockingQueue。如果要从-1换成其他值则需重启,即该值不能动态调整,若要动态调整,需要使用到下边 这个配置)
(2)hystrix.threadpool.default.queueSizeRejectionThreshold(排队线程数量阈值,默认为5,达到时拒 绝,如果配置了该选项,队列的大小是该队列)
注意:如果maxQueueSize=-1的话,则该选项不起作用
断路器
(1)hystrix.command.default.circuitBreaker.requestVolumeThreshold(当在配置时间窗口内达到此数量 的失败后,进行短路。默认20个)
For example, if the value is 20, then if only 19 requests are received in the rolling
window (say a window of 10 seconds) the circuit will not trip open even if all 19 failed.
简言之,10s内请求失败数量达到20个,断路器开。
(2)hystrix.command.default.circuitBreaker.sleepWindowInMilliseconds(短路多久以后开始尝试是否恢复,默认5s)
(3)hystrix.command.default.circuitBreaker.errorThresholdPercentage(出错百分比阈值,当达到此阈值后,开始短路。默认50%)
fallback
hystrix.command.default.fallback.isolation.semaphore.maxConcurrentRequests(调用线程允许请求HystrixCommand.GetFallback()的最大数量,默认10。超出时将会有异常抛出,注意:该项配置对于THREAD隔离模式也起作用)