LoadBalancer 负载均衡
通过Eureka服务器进行服务注册与发现, 那么现在来看看,它的负载均衡到底是如何实现的,实际上之前演示的负载均衡是依靠LoadBalancer实现的。
在2020年前的SpringCloud版本是采用Ribbon作为负载均衡实现,但是2020年的版本之后SpringCloud把Ribbon移除了,进而用自己编写的LoadBalancer替代。
那么,负载均衡是如何进行的呢?
负载均衡
实际上,在添加@LoadBalanced
注解之后,会启用拦截器对我们发起的服务调用请求进行拦截(注意这里是针对我们发起的请求进行拦截), 叫做LoadBalancerInterceptor
,它实现ClientHttpRequestInterceptor
接口:
java
@FunctionalInterface
public interface ClientHttpRequestInterceptor {
ClientHttpResponse intercept(HttpRequest request, byte[] body, ClientHttpRequestExecution execution) throws IOException;
}
1
2
3
4
5
2
3
4
5
java
public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {
private LoadBalancerClient loadBalancer;
private LoadBalancerRequestFactory requestFactory;
public LoadBalancerInterceptor(LoadBalancerClient loadBalancer, LoadBalancerRequestFactory requestFactory) {
this.loadBalancer = loadBalancer;
this.requestFactory = requestFactory;
}
public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
// for backwards compatibility
this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
}
@Override
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
final ClientHttpRequestExecution execution) throws IOException {
final URI originalUri = request.getURI();
String serviceName = originalUri.getHost();
Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
return this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution));
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
LoadBalancerInterceptor 调用会使用LoadBalancerClient 的execute 方法,而他的实现是 BlockingLoadBalancerClient
java
public class BlockingLoadBalancerClient implements LoadBalancerClient {
public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
String hint = this.getHint(serviceId);
LoadBalancerRequestAdapter<T, DefaultRequestContext> lbRequest = new LoadBalancerRequestAdapter(request, new DefaultRequestContext(request, hint));
Set<LoadBalancerLifecycle> supportedLifecycleProcessors = this.getSupportedLifecycleProcessors(serviceId);
supportedLifecycleProcessors.forEach((lifecycle) -> {
lifecycle.onStart(lbRequest);
});
ServiceInstance serviceInstance = this.choose(serviceId, lbRequest);
if (serviceInstance == null) {
supportedLifecycleProcessors.forEach((lifecycle) -> {
lifecycle.onComplete(new CompletionContext(Status.DISCARD, lbRequest, new EmptyResponse()));
});
throw new IllegalStateException("No instances available for " + serviceId);
} else {
return this.execute(serviceId, serviceInstance, lbRequest);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
ServiceInstance serviceInstance = this.choose(serviceId, lbRequest);
可以看到在这里会调用choose方法自动获取对应的服务实例信息,所以,实际上在进行负载均衡的时候,会向Eureka发起请求,选择一个可用的对应服务,然后会返回此服务的主机地址等信息。 然后逐层返回。
自定义的策略
自定义负载均衡策略
LoadBalancer默认提供了两种负载均衡策略:
- RandomLoadBalancer - 随机分配策略
- (默认) RoundRobinLoadBalancer - 轮询分配策略
现在我们希望修改默认的负载均衡策略,可以进行指定,比如我们现在希望用户服务采用随机分配策略,我们需要先创建随机分配策略的配置类(不用加@Configuration
):
java
public class LoadBalancerConfig {
//将官方提供的 RandomLoadBalancer 注册为Bean
@Bean
public ReactorLoadBalancer<ServiceInstance> randomLoadBalancer(Environment environment, LoadBalancerClientFactory loadBalancerClientFactory) {
String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
return new RandomLoadBalancer(loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), name);
}
}
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
接着我们需要为对应的服务指定负载均衡策略,直接使用注解即可:
java
@Configuration
@LoadBalancerClient(value = "userservice", //指定为 userservice 服务,只要是调用此服务都会使用我们指定的策略
configuration = LoadBalancerConfig.class) //指定我们刚刚定义好的配置类
public class BeanConfig {
@Bean
@LoadBalanced
RestTemplate template() {
return new RestTemplate();
}
}
1
2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11
拓展
灰度版本的负载均衡如何实现?
OpenFeign实现负载均衡
官方文档:https://docs.spring.io/spring-cloud-openfeign/docs/current/reference/html/
Feign和RestTemplate一样,也是HTTP客户端请求工具,但是它的使用方式更加便捷。首先是依赖:
xml
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
1
2
3
4
5
2
3
4
5
接着在启动类添加@EnableFeignClients
注解:
java
@SpringBootApplication
@EnableFeignClients
public class BorrowApplication {
public static void main(String[] args) {
SpringApplication.run(BorrowApplication.class, args);
}
}
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
那么现在我们需要调用其他微服务提供的接口,该怎么做呢?我们直接创建一个对应服务的接口类即可:
java
@FeignClient("service-user")
public interface UserClient {
//路径保证和其他微服务提供的一致即可
@GetMapping("/user/{uid}")
User getUserById(@PathVariable("uid") int uid); //参数和返回值也保持一致
}
1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9
接着我们直接注入使用
java
class BorrowServiceImpl {
@Resource
UserClient userClient;
@Override
public UserBorrowDetail getUserBorrowDetailByUid(int uid) {
List<Borrow> borrow = mapper.getBorrowsByUid(uid);
User user = userClient.getUserById(uid);
//这里不用再写IP,直接写服务名称bookservice
List<Book> bookList = borrow
.stream()
.map(b -> template.getForObject("http://bookservice/book/" + b.getBid(), Book.class))
.collect(Collectors.toList());
return new UserBorrowDetail(user, bookList);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
访问,可以看到结果依然是正确的,并且我们可以观察一下两个用户微服务的调用情况,也是以负载均衡的形式进行的。
原理
Feign 的实现其实也是通过动态代理实现的,所以我们观察ReflectiveFeign
这个类,他的newInstance()
方法帮助我们生成代理类, 而代理增强类FeignInvocationHandler
的invoke()
方法去是先方法的调用。
java
class FeignInvocationHandler implements InvocationHandler {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if ("equals".equals(method.getName())) {
try {
Object otherHandler =
args.length > 0 && args[0] != null ? Proxy.getInvocationHandler(args[0]) : null;
return equals(otherHandler);
} catch (IllegalArgumentException e) {
return false;
}
} else if ("hashCode".equals(method.getName())) {
return hashCode();
} else if ("toString".equals(method.getName())) {
return toString();
}
return dispatch.get(method).invoke(args);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
dispatch.get(method)
返回的是一个 SynchronousMethodHandler
调用的invoke 方法会低啊用executeAndDecode 方法。 这个方法里封装的实际执行和响应解码。
java
class SynchronousMethodHandler {
@Override
public Object invoke(Object[] argv) throws Throwable {
// 封装请求
RequestTemplate template = buildTemplateFromArgs.create(argv);
// 读写超时的一些配置
Options options = findOptions(argv);
// 重试的机制
Retryer retryer = this.retryer.clone();
while (true) {
try {
// 关键
return executeAndDecode(template, options);
} catch (RetryableException e) {
try {
retryer.continueOrPropagate(e);
} catch (RetryableException th) {
Throwable cause = th.getCause();
if (propagationPolicy == UNWRAP && cause != null) {
throw cause;
} else {
throw th;
}
}
if (logLevel != Logger.Level.NONE) {
logger.logRetry(metadata.configKey(), logLevel);
}
continue;
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
关于这个 client 就是我们的请求客户端 FeignBlockingLoadBalancerClient
java
class SynchronousMethodHandler {
Object executeAndDecode(RequestTemplate template, Options options) throws Throwable {
Request request = targetRequest(template);
// 是否打印日志
if (logLevel != Logger.Level.NONE) {
logger.logRequest(metadata.configKey(), logLevel, request);
}
Response response;
long start = System.nanoTime();
try {
// 重点
response = client.execute(request, options);
// ensure the request is set. TODO: remove in Feign 12
response = response.toBuilder()
.request(request)
.requestTemplate(template)
.build();
} catch (IOException e) {
if (logLevel != Logger.Level.NONE) {
logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime(start));
}
throw errorExecuting(request, e);
}
long elapsedTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
if (decoder != null)
return decoder.decode(response, metadata.returnType());
CompletableFuture<Object> resultFuture = new CompletableFuture<>();
asyncResponseHandler.handleResponse(resultFuture, metadata.configKey(), response,
metadata.returnType(),
elapsedTime);
try {
if (!resultFuture.isDone())
throw new IllegalStateException("Response handling not done");
return resultFuture.join();
} catch (CompletionException e) {
Throwable cause = e.getCause();
if (cause != null)
throw cause;
throw e;
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
那这个FeignBlockingLoadBalancerClient
干什么呢,看名字就是负载均衡客户端,点进去看一下 发现,里面有一个loadBalancerClient 其实就是和之前的SpringCloudLoadBalancer
一样, 是一个BlockingLoadBalancerClient
,使用它去帮我们的feignClient 和之前一样选择一个实例, 然后告诉FeignClient 在调用
java
public class FeignBlockingLoadBalancerClient implements Client {
private static final Log LOG = LogFactory.getLog(FeignBlockingLoadBalancerClient.class);
private final Client delegate;
private final LoadBalancerClient loadBalancerClient;
private final LoadBalancerClientFactory loadBalancerClientFactory;
@Override
public Response execute(Request request, Request.Options options) throws IOException {
final URI originalUri = URI.create(request.url());
String serviceId = originalUri.getHost();
Assert.state(serviceId != null, "Request URI does not contain a valid hostname: " + originalUri);
String hint = getHint(serviceId);
DefaultRequest<RequestDataContext> lbRequest = new DefaultRequest<>(
new RequestDataContext(buildRequestData(request), hint));
Set<LoadBalancerLifecycle> supportedLifecycleProcessors = LoadBalancerLifecycleValidator
.getSupportedLifecycleProcessors(
loadBalancerClientFactory.getInstances(serviceId, LoadBalancerLifecycle.class),
RequestDataContext.class, ResponseData.class, ServiceInstance.class);
supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStart(lbRequest));
// 使用SpringCloud 的 BlockingLoadBalancer 选择一个实例
ServiceInstance instance = loadBalancerClient.choose(serviceId, lbRequest);
org.springframework.cloud.client.loadbalancer.Response<ServiceInstance> lbResponse = new DefaultResponse(
instance);
if (instance == null) {
String message = "Load balancer does not contain an instance for the service " + serviceId;
if (LOG.isWarnEnabled()) {
LOG.warn(message);
}
supportedLifecycleProcessors.forEach(lifecycle -> lifecycle
.onComplete(new CompletionContext<ResponseData, ServiceInstance, RequestDataContext>(
CompletionContext.Status.DISCARD, lbRequest, lbResponse)));
return Response.builder().request(request).status(HttpStatus.SERVICE_UNAVAILABLE.value())
.body(message, StandardCharsets.UTF_8).build();
}
String reconstructedUrl = loadBalancerClient.reconstructURI(instance, originalUri).toString();
Request newRequest = buildRequest(request, reconstructedUrl);
// feignClient 去调用
return executeWithLoadBalancerLifecycleProcessing(delegate, options, newRequest, lbRequest, lbResponse,
supportedLifecycleProcessors);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
最后调用feignClient 的 Default实现类的 execute()
还是默认的 HttpURLConnection
java
class Default implements Client {
@Override
public Response execute(Request request, Options options) throws IOException {
HttpURLConnection connection = convertAndSend(request, options);
return convertResponse(connection, request);
}
HttpURLConnection convertAndSend(Request request, Options options) throws IOException {
final URL url = new URL(request.url());
final HttpURLConnection connection = this.getConnection(url);
if (connection instanceof HttpsURLConnection) {
HttpsURLConnection sslCon = (HttpsURLConnection) connection;
if (sslContextFactory != null) {
sslCon.setSSLSocketFactory(sslContextFactory);
}
if (hostnameVerifier != null) {
sslCon.setHostnameVerifier(hostnameVerifier);
}
}
connection.setConnectTimeout(options.connectTimeoutMillis());
connection.setReadTimeout(options.readTimeoutMillis());
connection.setAllowUserInteraction(false);
connection.setInstanceFollowRedirects(options.isFollowRedirects());
connection.setRequestMethod(request.httpMethod().name());
Collection<String> contentEncodingValues = request.headers().get(CONTENT_ENCODING);
boolean gzipEncodedRequest = this.isGzip(contentEncodingValues);
boolean deflateEncodedRequest = this.isDeflate(contentEncodingValues);
boolean hasAcceptHeader = false;
Integer contentLength = null;
for (String field : request.headers().keySet()) {
if (field.equalsIgnoreCase("Accept")) {
hasAcceptHeader = true;
}
for (String value : request.headers().get(field)) {
if (field.equals(CONTENT_LENGTH)) {
if (!gzipEncodedRequest && !deflateEncodedRequest) {
contentLength = Integer.valueOf(value);
connection.addRequestProperty(field, value);
}
} else {
connection.addRequestProperty(field, value);
}
}
}
// Some servers choke on the default accept string.
if (!hasAcceptHeader) {
connection.addRequestProperty("Accept", "*/*");
}
if (request.body() != null) {
if (disableRequestBuffering) {
if (contentLength != null) {
connection.setFixedLengthStreamingMode(contentLength);
} else {
connection.setChunkedStreamingMode(8196);
}
}
connection.setDoOutput(true);
OutputStream out = connection.getOutputStream();
if (gzipEncodedRequest) {
out = new GZIPOutputStream(out);
} else if (deflateEncodedRequest) {
out = new DeflaterOutputStream(out);
}
try {
out.write(request.body());
} finally {
try {
out.close();
} catch (IOException suppressed) { // NOPMD
}
}
}
return connection;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79