SpringCloud Hystrix 源码解析 —— 命令执行(一)之正常执行逻辑

综合编程 2017-12-31

摘要: 原创出处 http://www.iocoder.cn/Hystrix/command-execute-first-run/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文主要基于 Hystrix 1.5.X 版本

������关注 微信公众号:【芋道源码】 有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有 源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言 将得到 认真 回复。 甚至不知道如何读源码也可以请教噢
  4. 新的 源码解析文章 实时 收到通知。 每周更新一篇左右
  5. 认真的 源码交流微信群。

1. 概述

本文主要分享 Hystrix 命令执行(一)之正常执行逻辑

建议 :对 RxJava 已经有一定的了解的基础上阅读本文。

Hystrix 执行命令整体流程如下图:

FROM 《【翻译】Hystrix文档-实现原理》「流程图」

  • 框 :Hystrix 命令执行的过程。
  • 圈 :本文分享的部分 —— 正常执行逻辑。

推荐 Spring Cloud 书籍:

2. #applyHystrixSemantics(…)

《Hystrix 源码解析 —— 执行结果缓存》 里,我们看到 #toObservable() 方法里的 第 11 至 19 行 ,当缓存特性 未开启 ,或者缓存 未命中 时,使用 applyHystrixSemantics 传入 Observable#defer(...) 方法,声明 执行命令 的 Observable。

创建 applyHystrixSemantics 变量,代码如下 :

// `AbstractCommand#toObservable()` 方法
 1: final Func0<Observable> applyHystrixSemantics = new Func0<Observable>() {
 2: @Override
 3: public Observable call(){
 4: // commandState 处于 UNSUBSCRIBED 时,不执行命令
 5: if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
 6: return Observable.never();
 7: }
 8: // 获得 执行Observable
 9: return applyHystrixSemantics(_cmd);
 10: }
 11: };
  • 第 5 至 7 行 :当 commandState 处于 UNSUBSCRIBED 时,不执行命令。
  • 第 9 行 :调用 #applyHystrixSemantics(...) 方法,获得执行 Observable 。

#applyHystrixSemantics(...) 方法,代码如下 :

 1: private Observable applyHystrixSemantics(final AbstractCommand _cmd){
 2: // TODO 【2003】【HOOK】
 3: // mark that we're starting execution on the ExecutionHook
 4: // if this hook throws an exception, then a fast-fail occurs with no fallback. No state is left inconsistent
 5: executionHook.onStart(_cmd);
 6: 
 7: /* determine if we're allowed to execute */
 8: if (circuitBreaker.attemptExecution()) {
 9: // 获得 信号量
10: final TryableSemaphore executionSemaphore = getExecutionSemaphore();
11: 
12: // 信号量释放Action
13: final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
14: final Action0 singleSemaphoreRelease = new Action0() {
15: @Override
16: public void call(){
17: if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
18: executionSemaphore.release();
19: }
20: }
21: };
22: 
23: // TODO 【2011】【Hystrix 事件机制】
24: final Action1 markExceptionThrown = new Action1() {
25: @Override
26: public void call(Throwable t){
27: eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
28: }
29: };
30: 
31: // 信号量 获得
32: if (executionSemaphore.tryAcquire()) {
33: try {
34: // 标记 executionResult 调用开始时间
35: /* used to track userThreadExecutionTime */
36: executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
37: 
38: // 获得 执行Observable
39: return executeCommandAndObserve(_cmd)
40: .doOnError(markExceptionThrown)
41: .doOnTerminate(singleSemaphoreRelease)
42: .doOnUnsubscribe(singleSemaphoreRelease);
43: } catch (RuntimeException e) {
44: return Observable.error(e);
45: }
46: } else {
47: return handleSemaphoreRejectionViaFallback();
48: }
49: } else {
50: return handleShortCircuitViaFallback();
51: }
52: }
  • 第 5 行 :TODO 【2003】【HOOK】
  • 第 8 行 :TODO 【2012】【链路健康度】
  • 第 10 行 :调用 #getExecutionSemaphore() 方法,获得 信号量 ( TryableSemaphore )对象,在「3. TryableSemaphore」详细解析。
  • 第 13 至 21 行 :信号量释放 Action ,用于下面【执行命令 Observable】的 #doOnTerminate(Action)#doOnUnsubscribe(Action) 方法( 见第 41 至 42 行 )。
  • 第 24 至 29 行 :TODO 【2011】【Hystrix 事件机制】
  • 第 32 行 :调用 TryableSemaphore#tryAcquire() 方法, 信号量 ( TryableSemaphore )使用成功,在「3. TryableSemaphore」详细解析。
  • 第 36 行 :标记 executionResult调用 开始时间。
  • 第 39 行 :调用 #executeCommandAndObserve() 方法,获得【执行命令 Observable】。在「4. #executeCommandAndObserve(…)」详细解析。
  • 第 43 至 45 行 :若发生异常,调用 Observable#error(Exception) 方法返回 Observable 。
  • 第 46 至 48 行 : 信号量 ( TryableSemaphore )使用失败,调用 #handleSemaphoreRejectionViaFallback() 方法,处理信号量拒绝的失败回退逻辑,在 《Hystrix 源码解析 —— 命令执行(四)之失败回退逻辑》 详细解析。
  • 第 49 至 51 行 :链路处于 熔断 状态,调用 #handleShortCircuitViaFallback() 方法,处理链路熔断的失败回退逻辑,在 《Hystrix 源码解析 —— 命令执行(四)之失败回退逻辑》 详细解析。

3. TryableSemaphore

com.netflix.hystrix.AbstractCommand.TryableSemaphore ,Hystrix 定义的信号量 接口 。代码如下 :

interface TryableSemaphore{
 
 boolean tryAcquire();
 
 void release();
 
 int getNumberOfPermitsUsed();
}
  • 从 API 上,Java 自带的 java.util.concurrent.Semaphore 都能满足,为什么不使用它呢?继续一起往下看。

TryableSemaphore 共有两个子类实现 :

  • TryableSemaphoreNoOp
  • TryableSemaphoreActual

3.1 TryableSemaphoreNoOp

com.netflix.hystrix.AbstractCommand.TryableSemaphoreNoOp无操作 的信号量。代码如下 :

/* package */static class TryableSemaphoreNoOp implements TryableSemaphore{

 public static final TryableSemaphore DEFAULT = new TryableSemaphoreNoOp();

 @Override
 public boolean tryAcquire(){
 return true;
 }

 @Override
 public void release(){
 
 }

 @Override
 public int getNumberOfPermitsUsed(){
 return 0;
 }

}
  • 从实现上看, #tryAcquire() 方法,每次都返回的是 true#release() 方法,无任何操作。这个是 为什么 ?在 Hystrix 里提供了两种 执行隔离策略
    • Thread ,该方式不使用信号量,因此使用 TryableSemaphoreNoOp ,这样每次调用 #tryAcquire() 都能返回 true 。在 《Hystrix 源码解析 —— 命令执行(二)之执行隔离策略》 详细解析该方式。
    • Semaphore ,该方式使用信号量,因此使用 TryableSemaphoreActual ,这样每次调用 #tryAcquire() 根据情况返回 true / false 。在「3.2 TryableSemaphoreActual」详细解析。

3.2 TryableSemaphoreActual

com.netflix.hystrix.AbstractCommand.TryableSemaphoreActual真正的 的信号量实现。不过实际上,TryableSemaphoreActual 更加像一个 计数器 。代码如下 :

/* package */static class TryableSemaphoreActual implements TryableSemaphore{
 protected final HystrixProperty numberOfPermits;
 private final AtomicInteger count = new AtomicInteger(0);

 public TryableSemaphoreActual(HystrixProperty numberOfPermits){
 this.numberOfPermits = numberOfPermits;
 }

 @Override
 public boolean tryAcquire(){
 int currentCount = count.incrementAndGet();
 if (currentCount > numberOfPermits.get()) {
 count.decrementAndGet();
 return false;
 } else {
 return true;
 }
 }

 @Override
 public void release(){
 count.decrementAndGet();
 }

 @Override
 public int getNumberOfPermitsUsed(){
 return count.get();
 }

}
  • numberOfPermits 属性,信号量 上限com.netflix.hystrix.strategy.properties.HystrixProperty 是一个接口,当其使用类似 com.netflix.hystrix.strategy.properties.archaius.IntegerDynamicProperty 动态 属性的实现时,可以实现动态调整信号量的 上限 ,这就是上文提到的为什么不使用 java.util.concurrent.Semaphore 的原因之一。
  • count 属性,信号量使用数量。&#55357;&#56898;,这是为什么说 TryableSemaphoreActual 更加像一个 计数器 的原因。
  • 另一个不使用 java.util.concurrent.Semaphore 的原因,TryableSemaphoreActual 无 阻塞 获取信号量的需求,使用 AtomicInteger 可以达到更轻量级的实现。

3.3 #getExecutionSemaphore()

调用 #getExecutionSemaphore() 方法,获得信号量对象,代码如下 :

/**
* 执行命令(正常执行)信号量映射
* KEY :命令名 {@link #commandKey}
*/
/* each circuit has a semaphore to restrict concurrent fallback execution */
protected static final ConcurrentHashMap executionSemaphorePerCircuit = new ConcurrentHashMap();
 
protected TryableSemaphore getExecutionSemaphore(){
 if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.SEMAPHORE) {
 if (executionSemaphoreOverride == null) {
 TryableSemaphore _s = executionSemaphorePerCircuit.get(commandKey.name());
 if (_s == null) { // 不存在时,创建 TryableSemaphoreActual
 // we didn't find one cache so setup
 executionSemaphorePerCircuit.putIfAbsent(commandKey.name(), new TryableSemaphoreActual(properties.executionIsolationSemaphoreMaxConcurrentRequests()));
 // assign whatever got set (this or another thread)
 return executionSemaphorePerCircuit.get(commandKey.name());
 } else {
 return _s;
 }
 } else {
 return executionSemaphoreOverride;
 }
 } else {
 // return NoOp implementation since we're not using SEMAPHORE isolation
 return TryableSemaphoreNoOp.DEFAULT;
 }
}
  • 根据 执行隔离策略 不同获取不同的信号量实现 :
    • Thread ,该方式不使用信号量,因此使用 TryableSemaphoreNoOp 。
    • Semaphore ,该方式使用信号量,因此使用 TryableSemaphoreActual 。
      • 相同的 commandKey ,使用相同的 TryableSemaphoreActual 。

4. #executeCommandAndObserve(…)

调用 #executeCommandAndObserve(...) 方法,获得【执行命令 Observable】。代码如下 :

 1: private Observable executeCommandAndObserve(final AbstractCommand _cmd){
 2: // TODO 【】
 3: final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
 4: 
 5: // TODO 【2007】【executionResult】用途
 6: final Action1 markEmits = new Action1() {
 7: @Override
 8: public void call(R r){
 9: if (shouldOutputOnNextEvents()) {
10: executionResult = executionResult.addEvent(HystrixEventType.EMIT);
11: eventNotifier.markEvent(HystrixEventType.EMIT, commandKey);
12: }
13: if (commandIsScalar()) {
14: long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
15: eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
16: executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
17: eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
18: circuitBreaker.markSuccess();
19: }
20: }
21: };
22: 
23: // TODO 【2007】【executionResult】用途
24: final Action0 markOnCompleted = new Action0() {
25: @Override
26: public void call(){
27: if (!commandIsScalar()) {
28: long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
29: eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
30: executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
31: eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
32: circuitBreaker.markSuccess();
33: }
34: }
35: };
36: 
37: // 失败回退逻辑 Func1
38: final Func1<Throwable, Observable> handleFallback = new Func1<Throwable, Observable>() {
39: @Override
40: public Observable call(Throwable t){
41: circuitBreaker.markNonSuccess();
42: Exception e = getExceptionFromThrowable(t);
43: executionResult = executionResult.setExecutionException(e);
44: if (e instanceof RejectedExecutionException) {
45: return handleThreadPoolRejectionViaFallback(e);
46: } else if (t instanceof HystrixTimeoutException) {
47: return handleTimeoutViaFallback();
48: } else if (t instanceof HystrixBadRequestException) {
49: return handleBadRequestByEmittingError(e);
50: } else {
51: /*
52: * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
53: */
54: if (e instanceof HystrixBadRequestException) {
55: eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
56: return Observable.error(e);
57: }
58: 
59: return handleFailureViaFallback(e);
60: }
61: }
62: };
63: 
64: // TODO 【2008】【请求缓存】
65: final Action1<Notification> setRequestContext = new Action1<Notification>() {
66: @Override
67: public void call(Notification rNotification){
68: setRequestContextIfNeeded(currentRequestContext);
69: }
70: };
71: 
72: Observable execution;
73: if (properties.executionTimeoutEnabled().get()) {
74: execution = executeCommandWithSpecifiedIsolation(_cmd)
75: .lift(new HystrixObservableTimeoutOperator(_cmd)); // 超时
76: } else {
77: execution = executeCommandWithSpecifiedIsolation(_cmd);
78: }
79: 
80: return execution.doOnNext(markEmits)
81: .doOnCompleted(markOnCompleted)
82: .onErrorResumeNext(handleFallback)
83: .doOnEach(setRequestContext);
84: }
  • 第 3 行 :TODO 【2012】【请求上下文】
  • 第 6 至 21 行 :TODO 【2007】【executionResult】用途
  • 第 24 至 35 行 :TODO 【2007】【executionResult】用途
  • 第 38 至 62 行 :失败回退逻辑 Func1 ,在 《Hystrix 源码解析 —— 请求执行(四)之失败回退逻辑》 详细解析。
  • 第 65 至 70 行 :TODO 【2012】【请求上下文】
  • 第 72 至 78 行 :调用 #executeCommandWithSpecifiedIsolation(...) 方法,获得【执行命令 Observable】,在「5. #executeCommandWithSpecifiedIsolation(…)」详细解析。
  • 第 80 至 83 行 :返回【执行命令 Observable】。

5. #executeCommandWithSpecifiedIsolation(…)

调用 #executeCommandWithSpecifiedIsolation(...) 方法,获得【执行命令 Observable】。代码如下 :

 1: private Observable executeCommandWithSpecifiedIsolation(final AbstractCommand _cmd){
 2: if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
 3: // mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)
 4: return Observable.defer(new Func0<Observable>() {
 5: @Override
 6: public Observable call(){
 7: 
 8: // 标记 executionResult 执行已发生
 9: executionResult = executionResult.setExecutionOccurred();
 10: 
 11: // 设置 commandState 为 USER_CODE_EXECUTED
 12: if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
 13: return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
 14: }
 15: 
 16: // TODO 【2002】【metrics】
 17: metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);
 18: 
 19: // TODO 【2009】【执行超时】
 20: if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
 21: // the command timed out in the wrapping thread so we will return immediately
 22: // and not increment any of the counters below or other such logic
 23: return Observable.error(new RuntimeException("timed out before executing run()"));
 24: }
 25: 
 26: // 设置 线程状态 为 ThreadState.STARTED
 27: if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
 28: // TODO 【2002】【metrics】
 29: //we have not been unsubscribed, so should proceed
 30: HystrixCounters.incrementGlobalConcurrentThreads();
 31: threadPool.markThreadExecution();
 32: 
 33: // TODO 【2010】【endCurrentThreadExecutingCommand】
 34: // store the command that is being run
 35: endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
 36: 
 37: // 标记 executionResult 使用线程执行
 38: executionResult = executionResult.setExecutedInThread();
 39: /**
40: * If any of these hooks throw an exception, then it appears as if the actual execution threw an error
41: */
 42: try {
 43: // TODO 【2003】【HOOK】
 44: executionHook.onThreadStart(_cmd);
 45: executionHook.onRunStart(_cmd);
 46: executionHook.onExecutionStart(_cmd);
 47: 
 48: // 获得 执行Observable
 49: return getUserExecutionObservable(_cmd);
 50: } catch (Throwable ex) {
 51: return Observable.error(ex);
 52: }
 53: } else {
 54: //command has already been unsubscribed, so return immediately
 55: return Observable.empty();
 56: }
 57: }
 58: }).doOnTerminate(new Action0() {
 59: @Override
 60: public void call(){
 61: if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.TERMINAL)) {
 62: handleThreadEnd(_cmd);
 63: }
 64: if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.TERMINAL)) {
 65: //if it was never started and received terminal, then no need to clean up (I don't think this is possible)
 66: }
 67: //if it was unsubscribed, then other cleanup handled it
 68: }
 69: }).doOnUnsubscribe(new Action0() {
 70: @Override
 71: public void call(){
 72: if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.UNSUBSCRIBED)) {
 73: handleThreadEnd(_cmd);
 74: }
 75: if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.UNSUBSCRIBED)) {
 76: //if it was never started and was cancelled, then no need to clean up
 77: }
 78: //if it was terminal, then other cleanup handled it
 79: }
 80: }).subscribeOn(threadPool.getScheduler(new Func0() { // TODO 芋艿:Scheduler
 81: @Override
 82: public Boolean call(){
 83: return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
 84: }
 85: }));
 86: } else {
 87: return Observable.defer(new Func0<Observable>() {
 88: @Override
 89: public Observable call(){
 90: // 标记 executionResult 执行已发生
 91: executionResult = executionResult.setExecutionOccurred();
 92: 
 93: // 设置 commandState 为 USER_CODE_EXECUTED
 94: if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
 95: return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
 96: }
 97: 
 98: // TODO 【2002】【metrics】
 99: metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE);
100: 
101: // TODO 【2010】【endCurrentThreadExecutingCommand】
102: // semaphore isolated
103: // store the command that is being run
104: endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
105: try {
106: // TODO 【2003】【HOOK】
107: executionHook.onRunStart(_cmd);
108: executionHook.onExecutionStart(_cmd);
109: 
110: // 获得 执行Observable
111: return getUserExecutionObservable(_cmd); //the getUserExecutionObservable method already wraps sync exceptions, so this shouldn't throw
112: } catch (Throwable ex) {
113: //If the above hooks throw, then use that as the result of the run method
114: return Observable.error(ex);
115: }
116: }
117: });
118: }
119: }
  • 根据 执行隔离策略 不同,创建 不同 的【执行命令 Observable】。仔细对比下,大体逻辑都是相同的,差别在于 执行隔离策略Thread 时,使用 RxJava Scheduler 以及对 线程 的处理。
  • 第 2 至 85 行 : 执行隔离策略Thread

    • 第 9 行 :标记 executionResult 执行已发生。
    • 第 12 至 14 行 :设置 commandStateUSER_CODE_EXECUTED 。若设置失败,调用 Observable#error(Exception) 方法返回 Observable 。
    • 第 17 行 :TODO 【2002】【metrics】
    • 第 20 至 24 行 :TODO 【2009】【执行超时】
    • 第 27 行 :设置 threadStateThreadState.STARTED 成功。
      • 第 30 至 31 行 :TODO 【2002】【metrics】
      • 第 35 行 :TODO 【2010】【endCurrentThreadExecutingCommand】
      • 第 38 行 :标记 executionResult 使用 线程 执行。
      • 第 44 至 46 行 :TODO 【2003】【HOOK】
      • 第 49 行 :调用 #getUserExecutionObservable(...) 方法, 创建 【执行命令 Observable】。
      • 第 50 至 52 行 :若发生异常,调用 Observable#error(Exception) 方法返回 Observable 。
    • 第 53 至 56 行 :设置 threadStateThreadState.STARTED 失败,执行命令此时已经被 取消 ,调用 Observable#empty() 方法返回 Observable 。
    • 第 58 至 68 行 :调用 Observable#doOnTerminate(...) 方法,添加 Action0 。 #handleThreadEnd(...) 方法,点击 链接 查看。
    • 第 69 至 79 行 :调用 Observable#doOnUnsubscribe(...) 方法,添加 Action0 。
    • 第 80 至 85 行 :调用 Observable#subscribeOn(Scheduler) 方法,指定 Observable 自身 在哪个调度器上执行。
  • 第 86 至 118 行 : 执行隔离策略SEMAPHORE

    • 第 91 行 :[ 与第 9 行 相同 ]。
    • 第 94 至 96 行 :[ 与第 12 至 14行 相同 ]。
    • 第 99 行 :[ 与第 17 行 类似 ]。
    • 第 104 行 :[ 与第 35 行 相同 ]。
    • 第 107 至 108 行 :[ 与第 45 至 46 行 相同 ]。
    • 第 111 行 :[ 与第 49 行 相同 ]。
    • 第 112 至 115 行 :[ 与第 50 至 52 行 相同 ]。

6. #getUserExecutionObservable(…)

调用 #getUserExecutionObservable(...) 方法,创建【执行命令 Observable】。代码如下 :

 1: private Observable getUserExecutionObservable(final AbstractCommand _cmd){
 2: Observable userObservable;
 3: 
 4: try {
 5: userObservable = getExecutionObservable();
 6: } catch (Throwable ex) {
 7: // the run() method is a user provided implementation so can throw instead of using Observable.onError
 8: // so we catch it here and turn it into Observable.error
 9: userObservable = Observable.error(ex);
10: }
11: 
12: return userObservable
13: .lift(new ExecutionHookApplication(_cmd)) // TODO 【2003】【HOOK】
14: .lift(new DeprecatedOnRunHookApplication(_cmd)); // 已废弃
15: }
  • 第 5 行 :调用 #getExecutionObservable() 方法,创建【执行命令 Observable】。 #getExecutionObservable() 是个 抽象 方法,代码如下 :

    protected abstract Observable getExecutionObservable();
    
    • HystrixCommand 实现了该方法,在「7. #getExecutionObservable」详细解析。
  • 第 6 至 10 行 :若发生异常,调用 Observable#error(Exception) 方法返回 Observable 。

  • 第 12 至 14 行 :返回【执行命令 Observable】。
    • 第 13 行 :TODO 【2003】【HOOK】

7. #getExecutionObservable()

调用 HystrixCommand#getExecutionObservable() 方法,创建【执行命令 Observable】。代码如下 :

 1: @Override
 2: final protected Observable getExecutionObservable(){
 3: return Observable.defer(new Func0<Observable>() {
 4: @Override
 5: public Observable call(){
 6: try {
 7: return Observable.just(run());
 8: } catch (Throwable ex) {
 9: return Observable.error(ex);
10: }
11: }
12: }).doOnSubscribe(new Action0() {
13: @Override
14: public void call(){
15: // 记录 执行线程
16: // Save thread on which we get subscribed so that we can interrupt it later if needed
17: executionThread.set(Thread.currentThread());
18: }
19: });
20: }
21: 
22: protected abstract R run() throws Exception;
  • 第 3 至 11 行 :调用 Observable#defer(Func0<Observable) 方法,创建【执行命令 Observable】。
    • 第 7 行 :调用 #run() 方法, 运行正常执逻辑 。通过 Observable#just(...) 方法,返回创建【执行命令 Observable】。
  • 第 12 至 19 行 :调用 #doOnSubscribe(...) 方法,添加 Action 。该操作记录执行线程( executionThread ) 。 executionThread 用于 HystrixCommand#queue() 方法,返回的 Future 结果,可以调用 Future#cancel(Boolean) 方法,点击 链接 查看该方法。
  • 第 22 行 : #run() 抽象 方法,实现该方法, 运行正常执逻辑

8. CommandState

com.netflix.hystrix.AbstractCommand.CommandState ,命令状态,代码如下 :

protected enum CommandState {
 NOT_STARTED, OBSERVABLE_CHAIN_CREATED, USER_CODE_EXECUTED, UNSUBSCRIBED, TERMINAL
}

状态变迁如下图 :

9. ThreadState

com.netflix.hystrix.AbstractCommand.ThreadState ,线程状态,代码如下 :

protected enum ThreadState {
 NOT_USING_THREAD, STARTED, UNSUBSCRIBED, TERMINAL
}

状态变迁如下图 :

666. 彩蛋

对 Hystrix 和 RxJava 慢慢更有感觉了。

柳暗花明又一村。

继续加油!

胖友,分享一波朋友圈可好!

极客头条

责编内容by:极客头条 (源链)。感谢您的支持!

您可能感兴趣的

阿里Dubbo疯狂更新,关Spring Cloud什么事?_技术栈微信半月刊第07期... 阿里Dubbo疯狂更新,关Spring Cloud什么事?_技术栈微信半月刊第07期 51CTO技术栈微信半月刊第三期,为您分享最热门,最前沿关于开发架构、系统运维、大数据、人工智能等一线技术解析和实践案例等深度干货文章,愿我们一起悦享技术,成就CTO梦想!欢迎订阅! ...
Spring Cloud(八)高可用的分布式配置中心 Spring Cloud Config... 在分布式系统中,由于服务数量巨多,为了方便服务配置文件统一管理,实时更新,所以需要分布式配置中心组件。在Spring Cloud中,有分布式配置中心组件spring cloud config ,它支持配置服务放在配置服务的内存中(即本地),也支持放在远程Git仓库中。在spring cloud co...
Microservices and Spring Cloud Config Server With microservices , we create a central config server where all configurable parameters of micro-services are written version controlled. The benefi...
Spring Cloud中Hystrix 线程隔离导致ThreadLocal数据丢失(续)... 上篇文章 《Spring Cloud中Hystrix 线程隔离导致ThreadLocal数据丢失》 我们对ThreadLocal数据丢失进行了详细的分析,并通过代码的方式复现了这个问题。 在上篇文章的末尾我也说了思路给大家提供了,如果需要能够在Hystrix 为线程隔离模式也能正确传...
几种常见的微服务架构方案——ZeroC IceGrid、Spring Cloud、基于消息队列、Do... 微服务架构是当前很热门的一个概念,它不是凭空产生的,是技术发展的必然结果。虽然微服务架构没有公认的技术标准和规范草案,但业界已经有一些很有影响力的开源微服务架构平台,架构师可以根据公司的技术实力并结合项目的特点来选择某个合适的微服务架构平台,以此稳妥地实施项目的微服务化改造或开发进程。 本文盘点...