Hystrix工作原理三
异常处理
Hystrix异常类型
- HystrixRuntimeException
- HystrixBadRequestException
- HystrixTimeoutException
- RejectedExecutionException
HystrixRuntimeException
HystrixCommand
失败时抛出, 不会触发fallback.
HystrixBadRequestException
用提供的参数或状态表示错误的异常, 而不是执行失败. 与其他HystrixCommand
抛出的异常不同, 这个异常不会触发fallback
, 也不会记录进failure
的指标, 因而也不会触发断路器,
应该在用户输入引起的错误是抛出, 否则会它与容错和后退行为的目的相悖.
不会触发fallback, 也不会记录到错误的指标中, 也不会触发断路器.
RejectedExecutionException
线程池发生reject
时抛出
HystrixTimeoutException
在HystrixCommand.run()
或者HystrixObservableCommand.construct()
时抛出, 会记录timeout
的次数. 如果希望某些类型的失败被记录为timeout
, 应该将这些类型的失败包装为HystrixTimeoutException
异常处理
ignoreExceptions
final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
@Override
public Observable<R> call(Throwable t) {
circuitBreaker.markNonSuccess();
Exception e = getExceptionFromThrowable(t);
executionResult = executionResult.setExecutionException(e);
if (e instanceof RejectedExecutionException) {
return handleThreadPoolRejectionViaFallback(e);
} else if (t instanceof HystrixTimeoutException) {
return handleTimeoutViaFallback();
} else if (t instanceof HystrixBadRequestException) {
return handleBadRequestByEmittingError(e);
} else {
/*
* Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
*/
if (e instanceof HystrixBadRequestException) {
eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
return Observable.error(e);
}
return handleFailureViaFallback(e);
}
}
};
Feign中响应状态码处理
Feign使用SynchronousMethodHandler
做请求的执行和响应的处理. 响应处理的部分, 对[200, 300)
区间的状态, 会将response返回; 如果是404
, 根据@FeignClient
中decode404
(默认为false)和方法返回值判断是否熔断, 如果响应返回404
, decode
为false
, 同时方法返回值不是void
, 会包装成FeignException
抛出; 其他的状态, 通过包装成FeignException
抛出.
FeignException
是RuntimeException
的实现, 如果没有ignore的话, 会计入熔断器的计算中.
final class SynchronousMethodHandler implements MethodHandler {
Object executeAndDecode(RequestTemplate template) throws Throwable {
...
if (response.status() >= 200 && response.status() < 300) {
if (void.class == metadata.returnType()) {
return null;
} else {
return decode(response);
}
} else if (decode404 && response.status() == 404 && void.class != metadata.returnType()) {
return decode(response);
} else {
throw errorDecoder.decode(metadata.configKey(), response);
}
...
}
}
Ribbon中响应状态码处理
在Zuul中, 路由使用Ribbon
做负载均衡, 同时使用Hystrix
做断路器, 使用RibbonCommand
接口的实现. RibbonCommand
的实现并没有对响应编码封装异常, 因此也不会触发熔断器.
AbstractRibbonCommand
是RibbonCommand
的抽象实现, 所有其他实现的父类. 核心run()
方法并没有针对响应编码重新封装异常.
public abstract class AbstractRibbonCommand<LBC extends AbstractLoadBalancerAwareClient<RQ, RS>, RQ extends ClientRequest, RS extends HttpResponse>
extends HystrixCommand<ClientHttpResponse> implements RibbonCommand {
...
@Override
protected ClientHttpResponse run() throws Exception {
final RequestContext context = RequestContext.getCurrentContext();
RQ request = createRequest();
RS response;
boolean retryableClient = this.client instanceof AbstractLoadBalancingClient
&& ((AbstractLoadBalancingClient)this.client).isClientRetryable((ContextAwareRequest)request);
if (retryableClient) {
response = this.client.execute(request, config);
} else {
response = this.client.executeWithLoadBalancer(request, config);
}
context.set("ribbonResponse", response);
// Explicitly close the HttpResponse if the Hystrix command timed out to
// release the underlying HTTP connection held by the response.
//
if (this.isResponseTimedOut()) {
if (response != null) {
response.close();
}
}
return new RibbonHttpResponse(response);
}
...
}
Observable.error(ex)
会捕获run()
方法抛出的异常.
public abstract class HystrixCommand<R> extends AbstractCommand<R> implements HystrixExecutable<R>, HystrixInvokableInfo<R>, HystrixObservable<R> {
...
final protected Observable<R> getExecutionObservable() {
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
try {
return Observable.just(run());
} catch (Throwable ex) {
return Observable.error(ex);
}
}
}).doOnSubscribe(new Action0() {
@Override
public void call() {
// Save thread on which we get subscribed so that we can interrupt it later if needed
executionThread.set(Thread.currentThread());
}
});
}
...
}
Hystrix 超时处理
在Hystrix版本1.4之前, Seamphore策略是不支持超时的. 目前spring-cloud-netflix
的1.4.4中使用的是1.5.12
如果开启了timeout, HystrixCommand会lift
一个HystrixObservableTimeoutOperator
到Observable
中.
abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
...
Observable<R> execution;
if (properties.executionTimeoutEnabled().get()) {
execution = executeCommandWithSpecifiedIsolation(_cmd)
.lift(new HystrixObservableTimeoutOperator<R>(_cmd));
} else {
execution = executeCommandWithSpecifiedIsolation(_cmd);
}
return execution.doOnNext(markEmits)
.doOnCompleted(markOnCompleted)
.onErrorResumeNext(handleFallback)
.doOnEach(setRequestContext);
}
}
这个HystrixObservableTimeoutOperator
会添加注册TimeListener
. TimeListener
是以tick
的方式运行, 即启动一个线程延迟executionTimeoutInMilliseconds
运行, 然后每次在executionTimeoutInMilliseconds + n * executionTimeoutInMilliseconds
时运行.
如果判断操作超时? 看tick
方法的实现, 线程每次运行时, 尝试修改Command的状态从NOT_EXECUTED
到TIMED_OUT
. 如果成功, 说明运行超时. 最后抛出HystrixTimeoutException
异常, 被handleFallback
处理.
// if we can go from NOT_EXECUTED to TIMED_OUT then we do the timeout codepath
// otherwise it means we lost a race and the run() execution completed or did not start
if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
// report timeout failure
originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey);
// shut down the original request
s.unsubscribe();
final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, hystrixRequestContext, new Runnable() {
@Override
public void run() {
child.onError(new HystrixTimeoutException());
}
});
timeoutRunnable.run();
//if it did not start, then we need to mark a command start for concurrency metrics, and then issue the timeout
}