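LoadBalancer Load Balancing

So far we have used the Eureka server for service registration and discovery. Now let's look at how load balancing is actually implemented. The load balancing demonstrated earlier is in fact provided by LoadBalancer.

Spring Cloud releases before 2020 used Ribbon as the load-balancing implementation. Starting with the 2020 release, Spring Cloud removed Ribbon and replaced it with its own LoadBalancer.

So how is load balancing carried out?

Load balancing

Once the @LoadBalanced annotation is added, an interceptor is enabled that intercepts the service calls we make (note that it intercepts the outgoing requests we initiate). The interceptor is called LoadBalancerInterceptor, and it implements the ClientHttpRequestInterceptor interface: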

java

@FunctionalInterface
public interface ClientHttpRequestInterceptor {
    ClientHttpResponse intercept(HttpRequest request, byte[] body, ClientHttpRequestExecution execution) throws IOException;
}
java
public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {

    private LoadBalancerClient loadBalancer;

    private LoadBalancerRequestFactory requestFactory;

    public LoadBalancerInterceptor(LoadBalancerClient loadBalancer, LoadBalancerRequestFactory requestFactory) {
        this.loadBalancer = loadBalancer;
        this.requestFactory = requestFactory;
    }

    public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
        // for backwards compatibility
        this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
    }

    @Override
    public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
                                        final ClientHttpRequestExecution execution) throws IOException {
        final URI originalUri = request.getURI();
        String serviceName = originalUri.getHost();
        Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
        return this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution));
    }

}

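The first step of intercept() above, treating the host part of the request URL as the service name, can be tried out without Spring. The following is a minimal sketch that uses only java.net.URI; ServiceNameSketch and its serviceName method are hypothetical names introduced for illustration and are not part of Spring Cloud.

```java
import java.net.URI;

// Hypothetical helper (not part of Spring Cloud) mirroring the first lines of intercept().
final class ServiceNameSketch {

    // Returns the host part of the URL, which the interceptor treats as the service name.
    static String serviceName(String url) {
        URI uri = URI.create(url);
        String host = uri.getHost();
        if (host == null) {
            throw new IllegalStateException("Request URI does not contain a valid hostname: " + uri);
        }
        return host;
    }

    public static void main(String[] args) {
        System.out.println(serviceName("http://userservice/user/1"));
        // java.net.URI does not accept '_' in a hostname, so getHost() yields null here
        System.out.println(URI.create("http://user_service/user/1").getHost());
    }
}
```

One practical consequence follows from the second call: a service name containing an underscore has no parsable host, so the Assert.state check in the interceptor would fail for it.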
LoadBalancerInterceptor calls the execute method of LoadBalancerClient, whose implementation is BlockingLoadBalancerClient:

java
public class BlockingLoadBalancerClient implements LoadBalancerClient {

    public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
        String hint = this.getHint(serviceId);
        LoadBalancerRequestAdapter<T, DefaultRequestContext> lbRequest = new LoadBalancerRequestAdapter(request, new DefaultRequestContext(request, hint));
        Set<LoadBalancerLifecycle> supportedLifecycleProcessors = this.getSupportedLifecycleProcessors(serviceId);
        supportedLifecycleProcessors.forEach((lifecycle) -> {
            lifecycle.onStart(lbRequest);
        });
        ServiceInstance serviceInstance = this.choose(serviceId, lbRequest);
        if (serviceInstance == null) {
            supportedLifecycleProcessors.forEach((lifecycle) -> {
                lifecycle.onComplete(new CompletionContext(Status.DISCARD, lbRequest, new EmptyResponse()));
            });
            throw new IllegalStateException("No instances available for " + serviceId);
        } else {
            return this.execute(serviceId, serviceInstance, lbRequest);
        }
    }

}

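ServiceInstance serviceInstance = this.choose(serviceId, lbRequest); This line calls the choose method to obtain the matching service instance. So when load balancing takes place, the service is looked up in Eureka, one available instance of it is selected, and its host address and related information are returned and then passed back up layer by layer.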

Custom strategies

Customizing the load-balancing strategy

LoadBalancer ships with two load-balancing strategies:

  • RandomLoadBalancer - random selection
  • (default) RoundRobinLoadBalancer - round-robin selection

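To make the difference between the two strategies concrete, here is a simplified sketch of both selection rules over a plain list of addresses. It is not the Spring Cloud source: StrategySketch, RoundRobin and RandomPick are hypothetical names, and instances are represented as strings rather than ServiceInstance objects.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-ins for the two strategies; instances are plain strings here.
final class StrategySketch {

    static final class RoundRobin {
        private final AtomicInteger position = new AtomicInteger(0);

        String choose(List<String> instances) {
            if (instances.isEmpty()) {
                return null;
            }
            // Mask the sign bit so the index stays non-negative after integer overflow
            int pos = position.getAndIncrement() & Integer.MAX_VALUE;
            return instances.get(pos % instances.size());
        }
    }

    static final class RandomPick {
        String choose(List<String> instances) {
            if (instances.isEmpty()) {
                return null;
            }
            return instances.get(ThreadLocalRandom.current().nextInt(instances.size()));
        }
    }

    public static void main(String[] args) {
        List<String> instances = Arrays.asList("localhost:8101", "localhost:8102");
        RoundRobin roundRobin = new RoundRobin();
        for (int i = 0; i < 4; i++) {
            System.out.println("round robin -> " + roundRobin.choose(instances));
        }
        System.out.println("random      -> " + new RandomPick().choose(instances));
    }
}
```

Round-robin needs shared state (the counter), while random selection is stateless. Both return null for an empty list, which corresponds to the "No instances available" branch seen in execute.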
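Suppose we want to change the default strategy, for example so that the user service uses random selection. First create a configuration class for the random strategy (it does not need @Configuration):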

java
public class LoadBalancerConfig {
    // Register the officially provided RandomLoadBalancer as a Bean
    @Bean
    public ReactorLoadBalancer<ServiceInstance> randomLoadBalancer(Environment environment, LoadBalancerClientFactory loadBalancerClientFactory) {
        String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
        return new RandomLoadBalancer(loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), name);
    }
}

Next, assign the strategy to the target service, which only takes an annotation:

java

@Configuration
@LoadBalancerClient(value = "userservice",      // applies to userservice: every call to this service uses the strategy we specify
        configuration = LoadBalancerConfig.class)   // the configuration class we just defined
public class BeanConfig {
    @Bean
    @LoadBalanced
    RestTemplate template() {
        return new RestTemplate();
    }
}

Extension

How would load balancing for a gray (canary) release be implemented?

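One possible direction for the question above, sketched under assumptions: tag each instance with a version, narrow the candidate list to the version a request asks for, and only then apply the normal selection rule. Everything in this sketch is hypothetical (the GrayReleaseSketch class, the Instance type, the version field, the fallback rule); in a real project the same filtering would have to be wired into Spring Cloud LoadBalancer, for example through a custom load balancer bean registered the same way as LoadBalancerConfig above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of version-based candidate filtering for a gray release.
final class GrayReleaseSketch {

    // Minimal stand-in for a service instance carrying a version tag (assumed metadata).
    static final class Instance {
        final String address;
        final String version;

        Instance(String address, String version) {
            this.address = address;
            this.version = version;
        }
    }

    // Keeps only instances of the wanted version; falls back to all instances
    // when no version is requested or no instance matches.
    static List<Instance> candidates(List<Instance> all, String wantedVersion) {
        if (wantedVersion == null) {
            return all;
        }
        List<Instance> matched = new ArrayList<>();
        for (Instance instance : all) {
            if (wantedVersion.equals(instance.version)) {
                matched.add(instance);
            }
        }
        return matched.isEmpty() ? all : matched;
    }

    public static void main(String[] args) {
        List<Instance> all = Arrays.asList(
                new Instance("localhost:8101", "v1"),
                new Instance("localhost:8102", "v2"));
        for (Instance instance : candidates(all, "v2")) {
            System.out.println("gray candidate -> " + instance.address);
        }
    }
}
```

Falling back to the full list when nothing matches is a design choice: it favors availability over strict isolation, and a stricter setup might return an empty list instead.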
Load balancing with OpenFeign

Official documentation: https://docs.spring.io/spring-cloud-openfeign/docs/current/reference/html/

Like RestTemplate, Feign is an HTTP client tool, but it is more convenient to use. First, add the dependency:

xml

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>

Next, add the @EnableFeignClients annotation to the main application class:

java

@SpringBootApplication
@EnableFeignClients
public class BorrowApplication {
    public static void main(String[] args) {
        SpringApplication.run(BorrowApplication.class, args);
    }
}

Now, how do we call an endpoint exposed by another microservice? We simply create an interface for that service:

java

@FeignClient("service-user")
public interface UserClient {

    // The path must match the one exposed by the target microservice
    @GetMapping("/user/{uid}")
    User getUserById(@PathVariable("uid") int uid);  // parameters and return type must match as well

}

Then we inject it and use it directly:

java
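class BorrowServiceImpl implements BorrowService {   // interface name assumed; @Override below requires one
    @Resource
    BorrowMapper mapper;       // type name assumed; only the field name appears in this section
    @Resource
    RestTemplate template;     // the @LoadBalanced RestTemplate bean from BeanConfig
    @Resource
    UserClient userClient;

    @Override
    public UserBorrowDetail getUserBorrowDetailByUid(int uid) {
        List<Borrow> borrow = mapper.getBorrowsByUid(uid);

        // The user lookup now goes through the Feign client
        User user = userClient.getUserById(uid);
        // No IP address is needed here; use the service name bookservice directly
        List<Book> bookList = borrow
                .stream()
                .map(b -> template.getForObject("http://bookservice/book/" + b.getBid(), Book.class))
                .collect(Collectors.toList());
        return new UserBorrowDetail(user, bookList);
    }
}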

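Call the endpoint again and the result is still correct. Observing how the two user-service instances are called shows that these requests are load balanced as well.

How it works

Feign is also implemented with dynamic proxies. Looking at the ReflectiveFeign class, its newInstance() method generates the proxy for us, and the invoke() method of the proxy's invocation handler, FeignInvocationHandler, carries out the method call.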

java
class FeignInvocationHandler implements InvocationHandler {
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if ("equals".equals(method.getName())) {
            try {
                Object otherHandler =
                        args.length > 0 && args[0] != null ? Proxy.getInvocationHandler(args[0]) : null;
                return equals(otherHandler);
            } catch (IllegalArgumentException e) {
                return false;
            }
        } else if ("hashCode".equals(method.getName())) {
            return hashCode();
        } else if ("toString".equals(method.getName())) {
            return toString();
        }
        return dispatch.get(method).invoke(args);
    }
}

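The dispatch idea in FeignInvocationHandler above can be reproduced with the JDK's java.lang.reflect.Proxy alone. The sketch below is only an illustration of the technique, not Feign's code: ProxySketch, UserApi and MethodHandler are hypothetical names, and instead of sending an HTTP request each handler just returns a string describing the call.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical miniature of the proxy + dispatch-map pattern.
final class ProxySketch {

    interface UserApi {
        String getUserById(int uid);
    }

    // One handler per interface method, looked up by Method at call time.
    interface MethodHandler {
        Object invoke(Object[] args) throws Throwable;
    }

    @SuppressWarnings("unchecked")
    static <T> T newInstance(Class<T> type) {
        Map<Method, MethodHandler> dispatch = new HashMap<>();
        for (Method method : type.getMethods()) {
            // A real client would build and send a request here; the sketch only describes the call
            dispatch.put(method, args -> method.getName() + Arrays.toString(args));
        }
        InvocationHandler handler = (proxy, method, args) -> {
            // equals/hashCode/toString are declared on Object and have no entry in the map
            if (method.getDeclaringClass() == Object.class) {
                switch (method.getName()) {
                    case "equals":
                        return proxy == args[0];
                    case "hashCode":
                        return System.identityHashCode(proxy);
                    default:
                        return "proxy for " + type.getSimpleName();
                }
            }
            return dispatch.get(method).invoke(args);
        };
        return (T) Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[] {type}, handler);
    }

    public static void main(String[] args) {
        UserApi api = newInstance(UserApi.class);
        System.out.println(api.getUserById(7));
        System.out.println(api);
    }
}
```

As in the Feign handler, the Object methods are answered by the handler itself, and every interface method is routed through the map.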
dispatch.get(method) returns a SynchronousMethodHandler. Its invoke method calls executeAndDecode, which wraps the actual request execution and the response decoding.

java
class SynchronousMethodHandler {
    @Override
    public Object invoke(Object[] argv) throws Throwable {
        // Build the request template from the arguments
        RequestTemplate template = buildTemplateFromArgs.create(argv);
        // Options such as connect and read timeouts
        Options options = findOptions(argv);
        // Retry mechanism
        Retryer retryer = this.retryer.clone();
        while (true) {
            try {
                // The key call
                return executeAndDecode(template, options);
            } catch (RetryableException e) {
                try {
                    retryer.continueOrPropagate(e);
                } catch (RetryableException th) {
                    Throwable cause = th.getCause();
                    if (propagationPolicy == UNWRAP && cause != null) {
                        throw cause;
                    } else {
                        throw th;
                    }
                }
                if (logLevel != Logger.Level.NONE) {
                    logger.logRetry(metadata.configKey(), logLevel);
                }
                continue;
            }
        }
    }
}

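The while (true) loop in invoke above is a plain retry loop: try the call, and on a retryable failure either give up or go around again. A minimal sketch of that control flow, with assumptions stated: RetrySketch and callWithRetry are hypothetical names, IOException stands in for Feign's RetryableException, and a simple attempt counter replaces the Retryer (no back-off delay is modeled).

```java
import java.io.IOException;
import java.util.concurrent.Callable;

// Hypothetical, simplified retry loop; not Feign's Retryer.
final class RetrySketch {

    // Runs the action until it succeeds or maxAttempts calls have failed with IOException.
    static <T> T callWithRetry(Callable<T> action, int maxAttempts) throws Exception {
        int attempt = 1;
        while (true) {
            try {
                return action.call();
            } catch (IOException e) {
                if (attempt >= maxAttempts) {
                    throw e; // attempts exhausted: propagate the last failure
                }
                attempt++;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        String result = callWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new IOException("simulated failure " + calls[0]);
            }
            return "ok after " + calls[0] + " calls";
        }, 5);
        System.out.println(result);
    }
}
```

Whether any retry happens in a real application depends on the configured Retryer. To my knowledge Spring Cloud OpenFeign registers Retryer.NEVER_RETRY unless another Retryer bean is provided, so this is worth verifying for the version in use.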
The client used here is our request client, FeignBlockingLoadBalancerClient:

java
class SynchronousMethodHandler {

    Object executeAndDecode(RequestTemplate template, Options options) throws Throwable {
        Request request = targetRequest(template);
        // Log the request if logging is enabled
        if (logLevel != Logger.Level.NONE) {
            logger.logRequest(metadata.configKey(), logLevel, request);
        }

        Response response;
        long start = System.nanoTime();
        try {
            // The key call
            response = client.execute(request, options);
            // ensure the request is set. TODO: remove in Feign 12
            response = response.toBuilder()
                    .request(request)
                    .requestTemplate(template)
                    .build();
        } catch (IOException e) {
            if (logLevel != Logger.Level.NONE) {
                logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime(start));
            }
            throw errorExecuting(request, e);
        }
        long elapsedTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);


        if (decoder != null)
            return decoder.decode(response, metadata.returnType());

        CompletableFuture<Object> resultFuture = new CompletableFuture<>();
        asyncResponseHandler.handleResponse(resultFuture, metadata.configKey(), response,
                metadata.returnType(),
                elapsedTime);

        try {
            if (!resultFuture.isDone())
                throw new IllegalStateException("Response handling not done");

            return resultFuture.join();
        } catch (CompletionException e) {
            Throwable cause = e.getCause();
            if (cause != null)
                throw cause;
            throw e;
        }
    }

}

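So what does FeignBlockingLoadBalancerClient do? As the name suggests, it is a load-balancing client. Stepping into it, we find a loadBalancerClient field which, just as with Spring Cloud LoadBalancer earlier, is a BlockingLoadBalancerClient. It selects an instance for our Feign client in the same way as before, and the rewritten request is then handed to the underlying Feign client to make the call.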

java
public class FeignBlockingLoadBalancerClient implements Client {

	private static final Log LOG = LogFactory.getLog(FeignBlockingLoadBalancerClient.class);

	private final Client delegate;

	private final LoadBalancerClient loadBalancerClient;

	private final LoadBalancerClientFactory loadBalancerClientFactory;

    @Override
    public Response execute(Request request, Request.Options options) throws IOException {
        final URI originalUri = URI.create(request.url());
        String serviceId = originalUri.getHost();
        Assert.state(serviceId != null, "Request URI does not contain a valid hostname: " + originalUri);
        String hint = getHint(serviceId);
        DefaultRequest<RequestDataContext> lbRequest = new DefaultRequest<>(
                new RequestDataContext(buildRequestData(request), hint));
        Set<LoadBalancerLifecycle> supportedLifecycleProcessors = LoadBalancerLifecycleValidator
                .getSupportedLifecycleProcessors(
                        loadBalancerClientFactory.getInstances(serviceId, LoadBalancerLifecycle.class),
                        RequestDataContext.class, ResponseData.class, ServiceInstance.class);
        supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStart(lbRequest));
        // Use Spring Cloud's BlockingLoadBalancerClient to choose an instance
        ServiceInstance instance = loadBalancerClient.choose(serviceId, lbRequest);
        org.springframework.cloud.client.loadbalancer.Response<ServiceInstance> lbResponse = new DefaultResponse(
                instance);
        if (instance == null) {
            String message = "Load balancer does not contain an instance for the service " + serviceId;
            if (LOG.isWarnEnabled()) {
                LOG.warn(message);
            }
            supportedLifecycleProcessors.forEach(lifecycle -> lifecycle
                    .onComplete(new CompletionContext<ResponseData, ServiceInstance, RequestDataContext>(
                            CompletionContext.Status.DISCARD, lbRequest, lbResponse)));
            return Response.builder().request(request).status(HttpStatus.SERVICE_UNAVAILABLE.value())
                    .body(message, StandardCharsets.UTF_8).build();
        }
        String reconstructedUrl = loadBalancerClient.reconstructURI(instance, originalUri).toString();
        Request newRequest = buildRequest(request, reconstructedUrl);
        // Let the delegate Feign client make the call
        return executeWithLoadBalancerLifecycleProcessing(delegate, options, newRequest, lbRequest, lbResponse,
                supportedLifecycleProcessors);
    }
}

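The reconstructURI step in the code above swaps the service name in the URL for the address of the chosen instance. The following sketch shows one way to do that with java.net.URI only; UriRewriteSketch and its reconstruct method are hypothetical, and the host and port are passed in directly instead of being read from a ServiceInstance.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical helper showing the idea behind reconstructURI; not the Spring Cloud implementation.
final class UriRewriteSketch {

    // Keeps scheme, path, query and fragment, and replaces host and port with the instance's.
    static URI reconstruct(URI original, String host, int port) {
        try {
            return new URI(original.getScheme(), original.getUserInfo(), host, port,
                    original.getPath(), original.getQuery(), original.getFragment());
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Cannot rewrite " + original, e);
        }
    }

    public static void main(String[] args) {
        URI original = URI.create("http://userservice/user/1?detail=true");
        System.out.println(reconstruct(original, "192.168.0.5", 8101));
    }
}
```

After this rewrite the delegate client sees an ordinary URL and needs no knowledge of service discovery.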
Finally, execute() of the Default implementation of Feign's Client is called, which by default still uses HttpURLConnection:

java

class Default implements Client {
    @Override
    public Response execute(Request request, Options options) throws IOException {
        HttpURLConnection connection = convertAndSend(request, options);
        return convertResponse(connection, request);
    }
    
    HttpURLConnection convertAndSend(Request request, Options options) throws IOException {
        final URL url = new URL(request.url());
        final HttpURLConnection connection = this.getConnection(url);
        if (connection instanceof HttpsURLConnection) {
            HttpsURLConnection sslCon = (HttpsURLConnection) connection;
            if (sslContextFactory != null) {
                sslCon.setSSLSocketFactory(sslContextFactory);
            }
            if (hostnameVerifier != null) {
                sslCon.setHostnameVerifier(hostnameVerifier);
            }
        }
        connection.setConnectTimeout(options.connectTimeoutMillis());
        connection.setReadTimeout(options.readTimeoutMillis());
        connection.setAllowUserInteraction(false);
        connection.setInstanceFollowRedirects(options.isFollowRedirects());
        connection.setRequestMethod(request.httpMethod().name());

        Collection<String> contentEncodingValues = request.headers().get(CONTENT_ENCODING);
        boolean gzipEncodedRequest = this.isGzip(contentEncodingValues);
        boolean deflateEncodedRequest = this.isDeflate(contentEncodingValues);

        boolean hasAcceptHeader = false;
        Integer contentLength = null;
        for (String field : request.headers().keySet()) {
            if (field.equalsIgnoreCase("Accept")) {
                hasAcceptHeader = true;
            }
            for (String value : request.headers().get(field)) {
                if (field.equals(CONTENT_LENGTH)) {
                    if (!gzipEncodedRequest && !deflateEncodedRequest) {
                        contentLength = Integer.valueOf(value);
                        connection.addRequestProperty(field, value);
                    }
                } else {
                    connection.addRequestProperty(field, value);
                }
            }
        }
        // Some servers choke on the default accept string.
        if (!hasAcceptHeader) {
            connection.addRequestProperty("Accept", "*/*");
        }

        if (request.body() != null) {
            if (disableRequestBuffering) {
                if (contentLength != null) {
                    connection.setFixedLengthStreamingMode(contentLength);
                } else {
                    connection.setChunkedStreamingMode(8196);
                }
            }
            connection.setDoOutput(true);
            OutputStream out = connection.getOutputStream();
            if (gzipEncodedRequest) {
                out = new GZIPOutputStream(out);
            } else if (deflateEncodedRequest) {
                out = new DeflaterOutputStream(out);
            }
            try {
                out.write(request.body());
            } finally {
                try {
                    out.close();
                } catch (IOException suppressed) { // NOPMD
                }
            }
        }
        return connection;
    }
}


Released under the MIT License.