这是本人学习的总结,主要学习资料如下
- 马士兵教育
- 1、Overview
- 2、execute()
- 2.1、Overview
- 2.2、示例
- 3、queue()
- 3.1、Overview
- 3.2、示例
- 4、observe()
- 4.1、Overview
- 4.2、示例
- 5、toObservable()
- 5.1、observe()和toObservable()的区别
1、Overview
我们知道Hystrix
是通过HystrixCommand
或HystrixObservableCommand
两个对象来处理服务请求,服务端以此来简单快速地实现熔断,限流,服务降级等功能。
这两个对象有四个方法execute(),queue(),observe(),toObservable()
,这篇文章就是讲解这四个方法如何使用,以及各自有什么特性。
这个Hytrix
的架构流程图。
2、execute()
2.1、Overview
HystrixCommand
都拥有这个方法。
execute()
是阻塞式的执行逻辑。从源码中能看出来它获取到Future
后直接执行了get()
方法,自然线程就会阻塞直到对应的job
执行完成返回结果。
public R execute() {
try {
return this.queue().get();
} catch (Exception var2) {
throw Exceptions.sneakyThrow(this.decomposeException(var2));
}
}
2.2、示例
首先是写一个实现HytrixCommnad
的类,复写其中的run()
方法。实际开发中run()
方法里就应该写我们的逻辑代码。这里只是简单打印一下Hello World
。
为了验证execute()
是阻塞式执行,我特意让线程沉睡了800ms
。
public class CommandDemo extends HystrixCommand<String> {
String name;
public CommandDemo(String name) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("command")));
this.name = name;
}
@Override
protected String run() throws Exception {
// 模拟业务逻辑
Thread.sleep(800);
String result = "Hello, World! I'm " + name;
return result;
}
}
这是测试代码,可以看到执行时间永远是大于800ms
,所以execute()
是阻塞式执行。
public class CommandTest {
@Test
public void execute() {
HystrixCommand<String> commandDemo = new CommandDemo("demo case");
long before = System.currentTimeMillis();
String execute = commandDemo.execute();
long after = System.currentTimeMillis();
System.out.println(after - before);
System.out.println(execute);
}
}
3、queue()
3.1、Overview
只有HystrixComand
拥有这个方法。
queue()
是阻塞式的。它返回一个Future
对象,get()
方法会阻塞我们的线程。
3.2、示例
实现HystrixCommand
的代码不变,这里只展示调用queue()
的代码。
因为是非阻塞的,所以打印的时间基本不可能超过800ms
。
@Test
public void queue() throws Exception{
HystrixCommand<String> commandDemo = new CommandDemo("demo case");
long before = System.currentTimeMillis();
Future<String> queue = commandDemo.queue();
long after = System.currentTimeMillis();
System.out.println("queue方法执行的时间:" + (after - before) + "ms");
System.out.println(queue.get());
}
4、observe()
4.1、Overview
HystrixComand
和HystrixObserveCommand
拥有这个方法。
observe()
返回一个Observe
对象。它比较灵活,可以以阻塞式运行线程,也可以以非阻塞式运行线程。
4.2、示例
- 这是阻塞式运行线程。需要用到
toBlocking().single()
之类的方法,这些方法会阻塞线程直到结果返回。
@Test
public void observe() throws Exception{
HystrixCommand<String> commandDemo = new CommandDemo("demo case");
long before = System.currentTimeMillis();
Observable<String> observe = commandDemo.observe();
System.out.println(observe.toBlocking().single());
long after = System.currentTimeMillis();
System.out.println("observe方法执行的时间:" + (after - before) + "ms");
}
- 这是非阻塞式调用,主线程不会受到阻塞。我们需要使用
subscribe(Subscriber subscriber)
方法,覆写参数Subscriber
中的onCompleted()
,onError()
和onNext()
三个方法。
线程执行错误会调用onError()
,顺利执行结束后则会依次执行onNext()
和onCompleted()
在示例中,最后一行沉睡了主线程3000ms
。如果不沉睡的话主线程会早于observe
的线程结束,导致我们看不到运行结果。
@Test
public void observe2() throws Exception{
HystrixCommand<String> commandDemo = new CommandDemo("demo case");
long before = System.currentTimeMillis();
Observable<String> observe = commandDemo.observe();
observe.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("Complete...");
}
@Override
public void onError(Throwable throwable) {
System.out.println("Error...");
}
@Override
public void onNext(String s) {
System.out.println(s);
long after = System.currentTimeMillis();
System.out.println("observe方法执行的时间:" + (after - before) + "ms");
}
});
System.out.println("主线程的执行时间:" + (System.currentTimeMillis() - before) + "ms");
Thread.sleep(3000);
}
5、toObservable()
5.1、observe()和toObservable()的区别
HystrixComand
和HystrixObserveCommand
拥有这个方法。
toObservable()
返回一个Observe
对象,用法和observe()
一模一样,这里不重复展示。
observe()
和toObserve()
的区别主要是加载run()
方法的时机不同。
observe()
的执行顺序:- 执行
run()
方法。 - 加载注册
Subscribe
对象。 - 将
run()
方法的结果注入到Subscribe
对象的onNext()
方法中。
- 执行
toObservable()
的执行顺序的一二步则是相反:- 加载注册
Subscribe
对象。 - 执行
run()
方法。 - 将
run()
方法的结果注入到Subscribe
对象的onNext()
方法中。
- 加载注册
执行顺序的不同可以通过Subscribe
的onStart()
方法来验证,这个方法在onNext()
之前进行。
对于observe()
来说,onStart()
先执行;而toObservable()
则是run()
先执行。
这是验证代码,需要注意的是,在执行subscribe()
方法之前,主线程沉睡一秒,确保observe()
和toObservable()
有足够的时间完成注册。
@Test
public void toObservable() throws Exception{
HystrixCommand<String> commandDemo = new CommandDemo("demo case");
long before = System.currentTimeMillis();
Observable<String> toObservable = commandDemo.toObservable();
// Observable<String> toObservable = commandDemo.observe();
Thread.sleep(1000);
toObservable.subscribe(new Subscriber<String>() {
@Override
public void onStart() {
System.out.println("Start...");
}
@Override
public void onCompleted() {
System.out.println("Complete...");
}
@Override
public void onError(Throwable throwable) {
System.out.println("Error...");
}
@Override
public void onNext(String s) {
System.out.println(s);
long after = System.currentTimeMillis();
System.out.println("observe方法执行的时间:" + (after - before) + "ms");
}
});
System.out.println("主线程的执行时间:" + (System.currentTimeMillis() - before) + "ms");
Thread.sleep(3000);
}
这是observe()
的执行结果,run()
方法执行在前。
这是toObservable()
的执行结果,onStart()
执行在前