bboyjing's blog

跟开涛学架构四【Hystrix的使用】

Hystrix是Netflix开源的一款针对分布式系统的延迟和容错库,目的是用来隔离分布式服务故障。它是通过线程池和信号量来是实现的,现在没有概念没关系,先来看怎么用吧。

Hystrix简单使用

我们在real_server_1项目上做测试,如下两步即可完成:

  1. 添加hystrix依赖:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    <repositories>
    <repository>
    <id>netflix</id>
    <name>netflix oss</name>
    <url>https://dl.bintray.com/netflixoss/maven/</url>
    </repository>
    </repositories>
    <dependency>
    <groupId>com.netflix.hystrix</groupId>
    <artifactId>hystrix-core</artifactId>
    <version>1.5.13</version>
    </dependency
  2. 新建类测试类:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    public class HelloWorldCommand extends HystrixCommand<String> {
    private final String name;
    public HelloWorldCommand(String name) {
    super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
    this.name = name;
    }
    @Override
    protected String run() throws Exception {
    return "Hello " + name +" thread:" + Thread.currentThread().getName();
    }
    public static void main(String[] args) throws Exception {
    /**
    * Command对象的使用有如下三种方式
    * 注:每个Command对象不是单例的,不能重用
    */
    // 同步调用
    HelloWorldCommand helloWorldCommand = new HelloWorldCommand("Synchronous-hystrix");
    System.out.println("result=" + helloWorldCommand.execute());
    // Future方式的异步调用
    helloWorldCommand = new HelloWorldCommand("Asynchronous-hystrix");
    Future<String> future = helloWorldCommand.queue();
    System.out.println("result=" + future.get());
    // RxJava响应式
    helloWorldCommand = new HelloWorldCommand("RxJava-hystrix");
    Observable<String> observable = helloWorldCommand.observe();
    observable.asObservable().subscribe(result -> System.out.println("result=" + result));
    System.out.println("mainThread=" + Thread.currentThread().getName());
    }
    }

看下HelloWorldCommand类的输出:

1
2
3
4
result=Hello Synchronous-hystrix thread:hystrix-ExampleGroup-1
result=Hello Asynchronous-hystrix thread:hystrix-ExampleGroup-2
result=Hello RxJava-hystrix thread:hystrix-ExampleGroup-3
mainThread=main

从输出可以看出,线程池的概念已经很明显了。HelloWorldCommand类可以理解为hystrix的执行单元,
继承自HystrixCommand,具体的处理逻辑在run方法中。该Command的执行(run方法的调用)是在单独的线程中,由hystrix管理。还有核心的降级、熔断等处理,这里就不演示了,推荐个博客给各位吧。

引入Hystrix到项目中

用过Sping Cloud的同学知道,它已经集成了Netflix的一套服务化治理的工具,这里我们不准备采用其starter,而是通过自己配置来熟悉。下面以real_server_1为例:

  1. 注释掉ConsulConfiguration.java的@Configuration,因为这里用不上。
  2. 添加处理类:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    @Component
    @Scope("prototype")
    public class RealServiceCommand extends HystrixCommand<String> {
    protected RealServiceCommand() {
    super(HystrixCommandGroupKey.Factory.asKey("RealServiceGroup"));
    }
    @Override
    protected String run() throws Exception {
    return "Thread:" + Thread.currentThread().getName() + " is handling...";
    }
    }
    @Scope("prototype")
    public class RealServerController {
    @Autowired
    private RealServiceCommand realServiceCommand;
    ...
    }

    这样就配置好了,有一点要注意的是,RealServiceCommand类和RealServerController类都要配成prototype,这样感觉有点恶心,回头看看springboot是如何优雅地解决的。

上面配置了最基本的实现方式,下面来试着处理下降级和熔断。降级是通过重写getFallback()方法实现的,简单点来说就是当run()方法执行出现某些错误时,将跳转到getFallback()相关逻辑。熔断机制相当于电路的跳闸功能,意思就是当调用该Command类请求错误比例达到指定阈值时,直接调用getFallback()方法。先来配置下降级处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Component
@Scope("prototype")
public class RealServiceCommand extends HystrixCommand<String> {
...
@Override
protected String run() throws Exception {
System.out.println("Thread:" + Thread.currentThread().getName() + " is handling...");
throw new RuntimeException();
//return "Thread:" + Thread.currentThread().getName() + " is handling...";
}
@Override
protected String getFallback() {
return "Thread:" + Thread.currentThread().getName() + " fallback.";
}
}

来发个请求测试下:

1
2
3
> curl localhost:8081
from real server 1.Thread:hystrix-RealServiceGroup-2 fallback.%
# 同时服务端打印出Thread:hystrix-RealServiceGroup-1 is handling...

从返回的数据也可以看出降级的意思,当请求的服务(run())不可用时,选择另一个替代方案(getFallback())。再来看下熔断的配置:

1
2
3
4
5
6
7
8
9
10
11
12
public class RealServiceCommand extends HystrixCommand<String> {
protected RealServiceCommand() {
super(HystrixCommand.Setter
.withGroupKey(HystrixCommandGroupKey.Factory.asKey("RealServiceGroup"))
.andCommandPropertiesDefaults(
// 10s内失败请求数超过1个时熔断器开始生效
HystrixCommandProperties.Setter().withCircuitBreakerRequestVolumeThreshold(1)
)
);
}
...
}

自行测试下,当连续发两个请求时,第二个请求将直接进入到getFallback()方法,起到了熔断的效果。
本章节我们学习了Hystrix的皮毛,路还很长啊,再接再厉吧。