[Java] hystrix – How Fallback Is Invoked

Posted by 大军, today at 15:18

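Fallback is invoked in the following situations:

  • The circuit breaker is open
  • The semaphore rejects the request
  • The thread pool rejects the request
  • The executed method fails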

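From the earlier post on @EnableCircuitBreaker ("hystrix – @EnableCircuitBreaker那些事"), we know that a call ultimately reaches the execute method of HystrixCommand, which in turn calls queue.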

public R execute() {
    try {
        return queue().get();
    } catch (Exception e) {
        throw Exceptions.sneakyThrow(decomposeException(e));
    }
}
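
The relationship between execute and queue can be illustrated without Hystrix at all. The following is a minimal sketch, assuming a hypothetical BlockingCommandSketch class built on the JDK's CompletableFuture; it is not how Hystrix implements these methods, only the same "queue, then block on the Future" shape.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Supplier;

// Hypothetical stand-in for a command; this is not a Hystrix class.
final class BlockingCommandSketch<R> {
    private final Supplier<R> work;

    BlockingCommandSketch(Supplier<R> work) {
        this.work = work;
    }

    // Asynchronous entry point: returns a Future right away.
    Future<R> queue() {
        return CompletableFuture.supplyAsync(work);
    }

    // Synchronous entry point: queue the work, then block until it finishes.
    R execute() {
        try {
            return queue().get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            // Unwrap so the caller sees the original failure, not the wrapper.
            Throwable cause = e.getCause();
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            }
            throw new IllegalStateException(cause);
        }
    }
}
```

Calling new BlockingCommandSketch<String>(() -> "ok").execute() blocks until the supplier has run and then returns its value.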

The queue method is shown below. It obtains a Future object; the important part is the toObservable method.

public Future<R> queue() {
    // ... omitted
    final Future<R> delegate = toObservable().toBlocking().toFuture();
    // ... omitted
}

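This method defines the Observable. It first checks whether the value can be served from the request cache. If not, afterCache is built from hystrixObservable, which defers to applyHystrixSemantics, so applyHystrixSemantics is the method that gets called next.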

public Observable<R> toObservable() {
    // ... omitted
    final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
                return Observable.never();
            }
            return applyHystrixSemantics(_cmd);
        }
    };
    // ... omitted
    return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            // ... omitted
            final boolean requestCacheEnabled = isRequestCachingEnabled();
            final String cacheKey = getCacheKey();
            /* try from cache first */
            if (requestCacheEnabled) {
                HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
                if (fromCache != null) {
                    isResponseFromCache = true;
                    return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                }
            }
            Observable<R> hystrixObservable =
                    Observable.defer(applyHystrixSemantics)
                            .map(wrapWithAllOnNextHooks);
            Observable<R> afterCache;
            // put in cache
            if (requestCacheEnabled && cacheKey != null) {
                // wrap it for caching
                HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
                HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
                if (fromCache != null) {
                    // another thread beat us so we'll use the cached value instead
                    toCache.unsubscribe();
                    isResponseFromCache = true;
                    return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                } else {
                    // we just created an ObservableCommand so we cast and return it
                    afterCache = toCache.toObservable();
                }
            } else {
                afterCache = hystrixObservable;
            }
            return afterCache
                    .doOnTerminate(terminateCommandCleanup)     // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
                    .doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
                    .doOnCompleted(fireOnCompletedHook);
        }
    });
}
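
The "check the cache, then putIfAbsent, then use whichever entry won" pattern in the code above can be shown in isolation. This is a minimal sketch, assuming a hypothetical RequestCacheSketch class that stores CompletableFuture values in a ConcurrentHashMap; Hystrix's own request cache stores observables and is scoped to a request context, which this sketch does not model.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

// Hypothetical cache illustrating the get / putIfAbsent race handling.
final class RequestCacheSketch<R> {
    private final ConcurrentMap<String, CompletableFuture<R>> cache = new ConcurrentHashMap<>();

    CompletableFuture<R> getOrCompute(String key, Supplier<R> loader) {
        // Try the cache first.
        CompletableFuture<R> cached = cache.get(key);
        if (cached != null) {
            return cached;
        }
        // Publish our own placeholder; putIfAbsent returns the existing entry if we lost the race.
        CompletableFuture<R> mine = new CompletableFuture<>();
        CompletableFuture<R> winner = cache.putIfAbsent(key, mine);
        if (winner != null) {
            // Another caller beat us, so use its entry and never run our loader.
            return winner;
        }
        // We own the entry: run the loader on the calling thread and complete it.
        try {
            mine.complete(loader.get());
        } catch (RuntimeException e) {
            mine.completeExceptionally(e);
        }
        return mine;
    }
}
```

Two calls with the same key share one future, so the loader runs at most once per key.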

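Circuit-breaker check

This is the key piece of code. It first asks the circuit breaker whether the request is allowed. If the circuit is open, it calls handleShortCircuitViaFallback. If the circuit is not open, it then tries to acquire the execution semaphore, and calls handleSemaphoreRejectionViaFallback if no permit is available. If both checks pass, it calls executeCommandAndObserve.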

private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
    // ... omitted
    if (circuitBreaker.allowRequest()) {
        // ... omitted
        if (executionSemaphore.tryAcquire()) {
            try {
                /* used to track userThreadExecutionTime */
                executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                return executeCommandAndObserve(_cmd)
                        .doOnError(markExceptionThrown)
                        .doOnTerminate(singleSemaphoreRelease)
                        .doOnUnsubscribe(singleSemaphoreRelease);
            } catch (RuntimeException e) {
                return Observable.error(e);
            }
        } else {
            // no permit available: semaphore rejection
            return handleSemaphoreRejectionViaFallback();
        }
    } else {
        // circuit is open: short-circuit
        return handleShortCircuitViaFallback();
    }
}
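
The order of the checks (circuit breaker first, then semaphore, then the real call) can be modelled with plain JDK types. This is a minimal sketch, assuming a hypothetical SemanticsSketch class; the names, the boolean circuit flag, and the synchronous Supplier-based API are all illustrative and are not Hystrix APIs.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

// Hypothetical model of the decision order; none of these names are Hystrix APIs.
final class SemanticsSketch {
    private final AtomicBoolean circuitOpen = new AtomicBoolean(false);
    private final Semaphore permits;

    SemanticsSketch(int maxConcurrent) {
        this.permits = new Semaphore(maxConcurrent);
    }

    void openCircuit() {
        circuitOpen.set(true);
    }

    Semaphore permits() {
        return permits;
    }

    <R> R call(Supplier<R> action, Supplier<R> fallback) {
        // 1. Circuit open: skip the action entirely.
        if (circuitOpen.get()) {
            return fallback.get();
        }
        // 2. No permit available: reject without running the action.
        if (!permits.tryAcquire()) {
            return fallback.get();
        }
        // 3. Run the action; a failure also ends in the fallback.
        try {
            return action.get();
        } catch (RuntimeException e) {
            return fallback.get();
        } finally {
            permits.release();
        }
    }
}
```

In this sketch all three failure paths end in the same fallback supplier, which mirrors how the short-circuit and semaphore-rejection handlers both delegate to getFallbackOrThrowException.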

When the circuit is open

Let's first look at what happens once the circuit is open. getFallbackOrThrowException obtains fallbackExecutionChain through getFallbackObservable, whose main job is to call getFallback.

private Observable<R> handleShortCircuitViaFallback() {
    // ... omitted
    return getFallbackOrThrowException(this, HystrixEventType.SHORT_CIRCUITED, FailureType.SHORTCIRCUIT,
            "short-circuited", shortCircuitException);
    // ... omitted
}

private Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd, final HystrixEventType eventType, final FailureType failureType, final String message, final Exception originalException) {
    // ... omitted
    fallbackExecutionChain = getFallbackObservable();
    // ... omitted
    return fallbackExecutionChain
            .doOnEach(setRequestContext)
            .lift(new FallbackHookApplication(_cmd))
            .lift(new DeprecatedOnFallbackHookApplication(_cmd))
            .doOnNext(markFallbackEmit)
            .doOnCompleted(markFallbackCompleted)
            .onErrorResumeNext(handleFallbackError)
            .doOnTerminate(singleSemaphoreRelease)
            .doOnUnsubscribe(singleSemaphoreRelease);
    // ... omitted (the enclosing branches of the original method are elided in this excerpt)
}

@Override
final protected Observable<R> getFallbackObservable() {
    return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            try {
                return Observable.just(getFallback());
            } catch (Throwable ex) {
                return Observable.error(ex);
            }
        }
    });
}

getFallback is the entry point for invoking the fallback method. MethodExecutionAction invokes the method we configured through reflection.

@Override
protected Object getFallback() {
    final CommandAction commandAction = getFallbackAction();
    if (commandAction != null) {
        try {
            return process(new Action() {
                @Override
                Object execute() {
                    MetaHolder metaHolder = commandAction.getMetaHolder();
                    Object[] args = createArgsForFallback(metaHolder, getExecutionException());
                    return commandAction.executeWithArgs(metaHolder.getFallbackExecutionType(), args);
                }
            });
        } catch (Throwable e) {
            LOGGER.error(FallbackErrorMessageBuilder.create()
                    .append(commandAction, e).build());
            throw new FallbackInvocationException(unwrapCause(e));
        }
    } else {
        return super.getFallback();
    }
}
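
To make the reflection step concrete, here is a minimal sketch of invoking a method by name with java.lang.reflect. The ReflectiveFallbackSketch class, the GreetingService type, and the greetFallback method are all hypothetical and chosen only for illustration; the real MethodExecutionAction resolves the target method ahead of time from metadata rather than by name at call time.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

// Hypothetical example; the class and method names are illustrative only.
final class ReflectiveFallbackSketch {

    // A service whose fallback is looked up by name, much like a configured fallback method name.
    static final class GreetingService {
        public String greetFallback(String name) {
            return "hello, " + name + " (fallback)";
        }
    }

    // Looks up a public method by name and argument types, then invokes it.
    static Object invokeByName(Object target, String methodName, Object... args) {
        Class<?>[] types = new Class<?>[args.length];
        for (int i = 0; i < args.length; i++) {
            types[i] = args[i].getClass();
        }
        try {
            Method method = target.getClass().getMethod(methodName, types);
            return method.invoke(target, args);
        } catch (InvocationTargetException e) {
            // The fallback method itself threw; surface its cause.
            throw new IllegalStateException("fallback method failed", e.getCause());
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot invoke " + methodName, e);
        }
    }
}
```

For example, invokeByName(new ReflectiveFallbackSketch.GreetingService(), "greetFallback", "bob") calls greetFallback("bob") on the service instance.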

Semaphore isolation

Having read the circuit-open code above, this case is simple: it calls getFallbackOrThrowException in the same way.

private Observable<R> handleSemaphoreRejectionViaFallback() {
    // ... omitted
    return getFallbackOrThrowException(this, HystrixEventType.SEMAPHORE_REJECTED, FailureType.REJECTED_SEMAPHORE_EXECUTION,
            "could not acquire a semaphore for execution", semaphoreRejectionException);
}

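Thread-pool isolation

If the circuit is not open and a semaphore permit was acquired, executeCommandAndObserve is called. In this method, handleFallback handles several kinds of exception: thread-pool rejection, timeout, bad request (which emits the error instead of falling back), and any other failure. The method then calls executeCommandWithSpecifiedIsolation.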

private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
    // ... omitted
    final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
        @Override
        public Observable<R> call(Throwable t) {
            Exception e = getExceptionFromThrowable(t);
            executionResult = executionResult.setExecutionException(e);
            if (e instanceof RejectedExecutionException) {
                return handleThreadPoolRejectionViaFallback(e);
            } else if (t instanceof HystrixTimeoutException) {
                return handleTimeoutViaFallback();
            } else if (t instanceof HystrixBadRequestException) {
                return handleBadRequestByEmittingError(e);
            } else {
                /*
                 * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
                 */
                if (e instanceof HystrixBadRequestException) {
                    eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
                    return Observable.error(e);
                }
                return handleFailureViaFallback(e);
            }
        }
    };
    // ... omitted
    Observable<R> execution;
    if (properties.executionTimeoutEnabled().get()) {
        execution = executeCommandWithSpecifiedIsolation(_cmd)
                .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
    } else {
        execution = executeCommandWithSpecifiedIsolation(_cmd);
    }
    return execution.doOnNext(markEmits)
            .doOnCompleted(markOnCompleted)
            .onErrorResumeNext(handleFallback)
            .doOnEach(setRequestContext);
}
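
The branching inside handleFallback amounts to classifying the failure before deciding what to do with it. This is a minimal sketch of that classification, assuming a hypothetical FailureRoutingSketch class; the Route enum and the local BadRequestException type are stand-ins invented for the example, and the JDK's TimeoutException stands in for HystrixTimeoutException.

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeoutException;

// Hypothetical classifier mirroring the order of the instanceof checks above.
final class FailureRoutingSketch {

    enum Route { THREAD_POOL_REJECTED, TIMEOUT, BAD_REQUEST_NO_FALLBACK, FAILURE }

    // Local stand-in for HystrixBadRequestException; not the real class.
    static final class BadRequestException extends RuntimeException {
        BadRequestException(String message) {
            super(message);
        }
    }

    static Route route(Throwable t) {
        if (t instanceof RejectedExecutionException) {
            return Route.THREAD_POOL_REJECTED;      // fallback with a "rejected" event
        }
        if (t instanceof TimeoutException) {
            return Route.TIMEOUT;                   // fallback with a "timeout" event
        }
        if (t instanceof BadRequestException) {
            return Route.BAD_REQUEST_NO_FALLBACK;   // error is emitted, no fallback
        }
        return Route.FAILURE;                       // any other error: fallback
    }
}
```

Only the bad-request route skips the fallback; the other three all end in a fallback attempt.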

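This method mainly does two things. One is the subscribeOn call, which provides thread-pool isolation. The other is the normal invocation through getUserExecutionObservable, which is covered after the thread-pool part.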

private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
    if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
        // mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                // ... omitted
                return getUserExecutionObservable(_cmd);
                // ... omitted
            }
        }) /* ... chained operators omitted ... */
        .subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
            @Override
            public Boolean call() {
                return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
            }
        }));
    }
    // ... omitted
}
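
The effect of subscribeOn here is that the user code runs on a pool thread rather than on the caller's thread. The following is a rough analogy using a plain ExecutorService, assuming a hypothetical ThreadIsolationSketch helper; it does not use RxJava and does not reproduce Hystrix's scheduler, only the idea of moving the work to another thread while the caller waits.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;

// Hypothetical helper: runs an action on a pool thread and waits for the result.
final class ThreadIsolationSketch {

    static <R> R runIsolated(ExecutorService pool, Supplier<R> action) {
        // Wrap the supplier explicitly as a Callable so the submit overload is unambiguous.
        Callable<R> task = action::get;
        try {
            return pool.submit(task).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("isolated action failed", e.getCause());
        }
    }
}
```

Passing a supplier that returns Thread.currentThread().getName() shows that the action runs on one of the pool's threads, not on the calling thread.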

Creating the related classes

public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) {
    // refresh the thread-pool settings from dynamic configuration
    touchConfig();
    // create a HystrixContextScheduler, which in turn creates a ThreadPoolScheduler
    return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
}

This creates a HystrixContextScheduler object. The ThreadPoolScheduler object is used to create a ThreadPoolWorker, which is handed to the HystrixContextSchedulerWorker.

public Worker createWorker() {
    return new HystrixContextSchedulerWorker(actualScheduler.createWorker());
}

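The thread-pool check happens in the schedule method of HystrixContextSchedulerWorker. If there is no queue space left, it throws RejectedExecutionException; how that exception is caught was covered above.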

public Subscription schedule(Action0 action) {
    if (threadPool != null) {
        // this is where the thread pool's queue capacity is checked
        if (!threadPool.isQueueSpaceAvailable()) {
            throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold.");
        }
    }
    return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action));
}
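
The "check queue space before scheduling" guard can be reproduced with a bounded queue. This is a minimal sketch, assuming a hypothetical QueueGuardSketch class with a configurable rejection threshold; the threshold comparison is a simplified model of the check and not the library's own implementation.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionException;

// Hypothetical guard: rejects new work once the queue reaches a threshold.
final class QueueGuardSketch {
    private final BlockingQueue<Runnable> queue;
    private final int rejectionThreshold;

    QueueGuardSketch(int capacity, int rejectionThreshold) {
        this.queue = new ArrayBlockingQueue<>(capacity);
        this.rejectionThreshold = rejectionThreshold;
    }

    boolean isQueueSpaceAvailable() {
        return queue.size() < rejectionThreshold;
    }

    // Enqueues the task, or throws if the threshold (or the hard capacity) is reached.
    void schedule(Runnable task) {
        if (!isQueueSpaceAvailable()) {
            throw new RejectedExecutionException("queue size is at rejection threshold");
        }
        if (!queue.offer(task)) {
            throw new RejectedExecutionException("queue is full");
        }
    }

    int queued() {
        return queue.size();
    }
}
```

Keeping the threshold separate from the hard capacity lets the rejection point be lowered without rebuilding the queue, which is the reason for having two numbers in this sketch.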

Normal invocation

If everything above succeeds, getUserExecutionObservable is called, which eventually calls the run method.

private Observable<R> getUserExecutionObservable(final AbstractCommand<R> _cmd) {
    // ... omitted
    userObservable = getExecutionObservable();
    // ... omitted
}

@Override
final protected Observable<R> getExecutionObservable() {
    return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            try {
                return Observable.just(run());
            } catch (Throwable ex) {
                return Observable.error(ex);
            }
        }
    }) /* ... chained operators omitted ... */;
}

Here, too, our method is invoked through reflection.

protected Object run() throws Exception {
    LOGGER.debug("execute command: {}", getCommandKey().name());
    return process(new Action() {
        @Override
        Object execute() {
            return getCommandAction().execute(getExecutionType());
        }
    });
}

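Summary

The sections above covered the several paths that lead to a fallback call, as well as the normal flow. The original post shows the overall flow as a chart here.
[Figure: flow chart of the fallback invocation paths]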

This is an original work, licensed under Attribution-NonCommercial-NoDerivatives 4.0 International.