文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

Spring Cloud Gateway网关全局核心过滤器路由执行过程详解

2024-11-30 15:57

关注

1 RouteToRequestUrlFilter

根据路由配置的url信息,构建成为要访问的目标地址,如下路由配置:

spring:
cloud:
gateway:
enabled: true
# 全局超时配置
httpclient:
connect-timeout: 10000
response-timeout: 5000
discovery:
locator:
enabled: true
lowerCaseServiceId: true
# 这里是全局过滤器,也就是下面在介绍过滤器执行的时候一定会执行StripPrefixGatewayFilterFactory#apply
# 返回的过滤器,如下路由配置:该过滤器会将你的请求转换为:http://localhost:8088/demos,保存到上下文中
# ServerWebExchange#getAttributes().put(GATEWAY_REQUEST_URL_ATTR, newRequest.getURI())
default-filters:
- StripPrefix=1
routes:
- id: R001
uri: http://localhost:8787
predicates:
- Path=/api-1
// 这里集合返回{}
Set<LoadBalancerLifecycle> supportedLifecycleProcessors = LoadBalancerLifecycleValidator
.getSupportedLifecycleProcessors(clientFactory.getInstances(serviceId, LoadBalancerLifecycle.class),
RequestDataContext.class, ResponseData.class, ServiceInstance.class);
DefaultRequest<RequestDataContext> lbRequest = new DefaultRequest<>(new RequestDataContext(
new RequestData(exchange.getRequest()), getHint(serviceId, loadBalancerProperties.getHint())));
// choose负载查找指定服务(order-server)
return choose(lbRequest, serviceId, supportedLifecycleProcessors).doOnNext(response -> {
if (!response.hasServer()) {
supportedLifecycleProcessors.forEach(lifecycle -> lifecycle
.onComplete(new CompletionContext<>(CompletionContext.Status.DISCARD, lbRequest, response)));
throw NotFoundException.create(properties.isUse404(), "Unable to find instance for " + url.getHost());
}
ServiceInstance retrievedInstance = response.getServer();
URI uri = exchange.getRequest().getURI();
// if the `lb:<scheme>` mechanism was used, use `<scheme>` as the default,
// if the loadbalancer doesn't provide one.
String overrideScheme = retrievedInstance.isSecure() ? "https" : "http";
if (schemePrefix != null) {
overrideScheme = url.getScheme();
}
DelegatingServiceInstance serviceInstance = new DelegatingServiceInstance(retrievedInstance, overrideScheme);
URI requestUrl = reconstructURI(serviceInstance, uri);
exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);
exchange.getAttributes().put(GATEWAY_LOADBALANCER_RESPONSE_ATTR, response);
supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStartRequest(lbRequest, response));
}).then(chain.filter(exchange))
.doOnError(throwable -> supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete(
new CompletionContext(CompletionContext.Status.FAILED,
throwable, lbRequest, exchange.getAttribute(GATEWAY_LOADBALANCER_RESPONSE_ATTR)))))
.doOnSuccess(aVoid -> supportedLifecycleProcessors.forEach(
lifecycle -> lifecycle.onComplete(new CompletionContext(
CompletionContext.Status.SUCCESS, lbRequest, exchange.getAttribute(GATEWAY_LOADBALANCER_RESPONSE_ATTR),
new ResponseData(exchange.getResponse(), new RequestData(exchange.getRequest()))))));
}


protected URI reconstructURI(ServiceInstance serviceInstance, URI original) {
return LoadBalancerUriTools.reconstructURI(serviceInstance, original);
}


private Mono> choose(Request lbRequest, String serviceId,
Set supportedLifecycleProcessors) {
// 从order-service对应的ApplicationContext中查找ReactorServiceInstanceLoadBalancer
ReactorLoadBalancer loadBalancer = this.clientFactory.getInstance(serviceId,
ReactorServiceInstanceLoadBalancer.class);
if (loadBalancer == null) {
throw new NotFoundException("No loadbalancer available for " + serviceId);
}
supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStart(lbRequest));
// 查找服务实例
return loadBalancer.choose(lbRequest);
}


private String getHint(String serviceId, Map hints) {
String defaultHint = hints.getOrDefault("default", "default");
String hintPropertyValue = hints.get(serviceId);
return hintPropertyValue != null ? hintPropertyValue : defaultHint;
}
}


// 轮询算分
public class RoundRobinLoadBalancer implements ReactorServiceInstanceLoadBalancer {
final AtomicInteger position;
ObjectProvider serviceInstanceListSupplierProvider;


public Mono> choose(Request request) {
// 接下面ClientFactoryObjectProvider中获取ServiceInstanceListSupplier
ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider
.getIfAvailable(NoopServiceInstanceListSupplier::new);
return supplier.get(request).next().map(serviceInstances -> processInstanceResponse(supplier, serviceInstances));
}


private Response processInstanceResponse(ServiceInstanceListSupplier supplier,
List serviceInstances) {
Response serviceInstanceResponse = getInstanceResponse(serviceInstances);
if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) {
((SelectedInstanceCallback) supplier).selectedServiceInstance(serviceInstanceResponse.getServer());
}
return serviceInstanceResponse;
}


private Response getInstanceResponse(List instances) {
if (instances.isEmpty()) {
return new EmptyResponse();
}
// TODO: enforce order?
int pos = Math.abs(this.position.incrementAndGet());
ServiceInstance instance = instances.get(pos % instances.size());
return new DefaultResponse(instance);
}
}


class ClientFactoryObjectProvider implements ObjectProvider {
private final NamedContextFactory clientFactory;
// type = ServiceInstanceListSupplier
private final Class type;
// name = order-service
private final String name;


private ObjectProvider delegate() {
if (this.provider == null) {
// 从order-service对应ApplicationContext中获取ServiceInstanceListSupplier
// 这里最终返回的是:DiscoveryClientServiceInstanceListSupplier
this.provider = this.clientFactory.getProvider(this.name, this.type);
}
return this.provider;
}
}


public class LoadBalancerClientConfiguration {
@Configuration(proxyBeanMethods = false)
@ConditionalOnReactiveDiscoveryEnabled
@Order(REACTIVE_SERVICE_INSTANCE_SUPPLIER_ORDER)
public static class ReactiveSupportConfiguration {


@Bean
@ConditionalOnBean(ReactiveDiscoveryClient.class)
@ConditionalOnMissingBean
@ConditionalOnProperty(value = "spring.cloud.loadbalancer.configurations", havingValue = "default", matchIfMissing = true)
public ServiceInstanceListSupplier discoveryClientServiceInstanceListSupplier(
ConfigurableApplicationContext context) {
// 这里最终构建的是:DiscoveryClientServiceInstanceListSupplier
return ServiceInstanceListSupplier.builder().withDiscoveryClient().withCaching().build(context);
}
}
}


public final class ServiceInstanceListSupplierBuilder {
public ServiceInstanceListSupplierBuilder withDiscoveryClient() {
this.baseCreator = context -> {
// 先从order-service对应的ApplicationContext中查找ReactiveDiscoveryClient,如果你没有自定义,那么就会从
// 父容器中查找,如果你使用的nacos,那么会返回NacosReactiveDiscoveryClient
ReactiveDiscoveryClient discoveryClient = context.getBean(ReactiveDiscoveryClient.class);
return new DiscoveryClientServiceInstanceListSupplier(discoveryClient, context.getEnvironment());
};
return this;
}
}

总结:

  1. 获取地址
    获取上一步中保存在上下文的地址
    URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
  2. 获取LoadBalancerLifecycle
    取得当前服务(order-service),对应的AnnotationConfigApplicationContext中配置的LoadBalancerLifecycle,该负载均衡生命周期能够监控负载均衡的执行过程。该类是泛型类,3个泛型参数,类型依次为:RequestDataContext.class, ResponseData.class, ServiceInstance.class。
  3. 获取ReactorServiceInstanceLoadBalancer
    获取当前服务(order-server),对应的AnnotationConfigApplicationContext中配置的ReactorServiceInstanceLoadBalancer。每一个服务都有一个对应的默认配置类LoadBalancerClientConfiguration,该配置类中有默认的RoundRobinLoadBalancer。我们可以为具体的服务提供LoadBalancerClientSpecification 类型的Bean,该类我们可以指定你要配置的serviceId及配置类,在配置类中我们可以自定义ReactorServiceInstanceLoadBalancer 的实现类Bean。
  4. 选择服务
    在上一步中获得ReactorServiceInstanceLoadBalancer后,接下来就是选取一个服务实例了。
  5. 重构URI
    上一步中获取了ServiceInstance 后就能够重构URL了,当前的URL为: http://localhost:9090/orders 构建后:http://localhost:9093/storages ,将该URL保存到上下文中 exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);

3 NettyRoutingFilter

public class NettyRoutingFilter implements GlobalFilter {
private final HttpClient httpClient;


public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
// 从上下文中获取解析后的目标地址
URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);
// ...
// 获取上下文中的路由信息
Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);
// getHttpClient获取客户端信息
Flux<HttpClientResponse> responseFlux = getHttpClient(route, exchange).headers(headers -> {
// ...
}).request(method).uri(url).send((req, nettyOutbound) -> {
// 发送网络请求
return nettyOutbound.send(request.getBody().map(this::getByteBuf));
}).responseConnection((res, connection) -> {
exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res);
// 建立的Connection对象保存到上下文中,在后续的NettyWriteResponseFilter中会获取该对象获取响应数据
exchange.getAttributes().put(CLIENT_RESPONSE_CONN_ATTR, connection);
ServerHttpResponse response = exchange.getResponse();
HttpHeaders headers = new HttpHeaders();
res.responseHeaders().forEach(entry -> headers.add(entry.getKey(), entry.getValue()));
String contentTypeValue = headers.getFirst(HttpHeaders.CONTENT_TYPE);
if (StringUtils.hasLength(contentTypeValue)) {
exchange.getAttributes().put(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR, contentTypeValue);
}
setResponseStatus(res, response);
HttpHeaders filteredResponseHeaders = HttpHeadersFilter.filter(getHeadersFilters(), headers, exchange,
Type.RESPONSE);
if (!filteredResponseHeaders.containsKey(HttpHeaders.TRANSFER_ENCODING)
&& filteredResponseHeaders.containsKey(HttpHeaders.CONTENT_LENGTH)) {
response.getHeaders().remove(HttpHeaders.TRANSFER_ENCODING);
}
exchange.getAttributes().put(CLIENT_RESPONSE_HEADER_NAMES, filteredResponseHeaders.keySet());
response.getHeaders().putAll(filteredResponseHeaders);
return Mono.just(res);
});


// 从路由中的元数据中获取response-timeout响应超时时间
Duration responseTimeout = getResponseTimeout(route);
if (responseTimeout != null) {
responseFlux = responseFlux
// 设置超时时间
.timeout(responseTimeout,
Mono.error(new TimeoutException("Response took longer than timeout: " + responseTimeout)))
.onErrorMap(TimeoutException.class,
th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), th));
}
return responseFlux.then(chain.filter(exchange));
}


protected HttpClient getHttpClient(Route route, ServerWebExchange exchange) {
// 从路由的元数据中获取配置的连接超时时间:connect-timeout
Object connectTimeoutAttr = route.getMetadata().get(CONNECT_TIMEOUT_ATTR);
if (connectTimeoutAttr != null) {
Integer connectTimeout = getInteger(connectTimeoutAttr);
// 设置Netty的连接超时时间
// io.netty.channel.ChannelOption
return this.httpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout);
}
return httpClient;
}
}

总结:

  1. 获取URL
    获取上一步保存在上下文中的URL
    URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);
  2. 设置当前路由状态
    设置当前路由已经路由状态
    setAlreadyRouted(exchange);
    exchange.getAttributes().put(GATEWAY_ALREADY_ROUTED_ATTR, true);
  3. 获取路由
    Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);
    获取当前的Route信息。主要就用来获取配置路由时提供的配置信息,比如:超时时间设置,如上配置。RoutePredicateHandlerMapping#getHandlerInternal方法中保存路由到上下文中
  4. 构建HttpClient
    通过上一步取得的Route对象,配置HttpClient相关属性,比如:超时时间。配置基本的http相关信息,建立连接后将Connection对象保存到上下文中,供下一个过滤器获取响应数据

4 NettyWriteResponseFilter

该过滤器的作用是处理由NettyRoutingFilter中建立的HTTP请求(包括:请求参数,请求头,建立连接);在NettyRoutingFilter中会将建立连接后的Connection保存到ServerWebExchange上下文中。

public class NettyWriteResponseFilter implements GlobalFilter, Ordered {
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
// NOTICE: nothing in "pre" filter stage as CLIENT_RESPONSE_CONN_ATTR is not added
// until the NettyRoutingFilter is run
// @formatter:off
return chain.filter(exchange)
.doOnError(throwable -> cleanup(exchange))
.then(Mono.defer(() -> {
Connection connection = exchange.getAttribute(CLIENT_RESPONSE_CONN_ATTR);
if (connection == null) {
return Mono.empty();
}
ServerHttpResponse response = exchange.getResponse();
// TODO: needed?
final Flux<DataBuffer> body = connection
.inbound()
.receive()
.retain()
.map(byteBuf -> wrap(byteBuf, response));
MediaType contentType = null;
try {
contentType = response.getHeaders().getContentType();
}
// 根据不同的ContentType做不同的响应
return (isStreamingMediaType(contentType)
? response.writeAndFlushWith(body.map(Flux::just))
: response.writeWith(body));
})).doOnCancel(() -> cleanup(exchange));
// @formatter:on
}


protected DataBuffer wrap(ByteBuf byteBuf, ServerHttpResponse response) {
DataBufferFactory bufferFactory = response.bufferFactory();
if (bufferFactory instanceof NettyDataBufferFactory) {
NettyDataBufferFactory factory = (NettyDataBufferFactory) bufferFactory;
return factory.wrap(byteBuf);
}
// MockServerHttpResponse creates these
else if (bufferFactory instanceof DefaultDataBufferFactory) {
DataBuffer buffer = ((DefaultDataBufferFactory) bufferFactory).allocateBuffer(byteBuf.readableBytes());
buffer.write(byteBuf.nioBuffer());
byteBuf.release();
return buffer;
}
throw new IllegalArgumentException("Unkown DataBufferFactory type " + bufferFactory.getClass());
}


private void cleanup(ServerWebExchange exchange) {
Connection connection = exchange.getAttribute(CLIENT_RESPONSE_CONN_ATTR);
if (connection != null && connection.channel().isActive() && !connection.isPersistent()) {
connection.dispose();
}
}


private boolean isStreamingMediaType(@Nullable MediaType contentType) {
return (contentType != null && this.streamingMediaTypes.stream().anyMatch(contentType::isCompatibleWith));
}
}

总结:

  1. 取得Connection
    取得上一步中保存的Connection
    Connection connection = exchange.getAttribute(CLIENT_RESPONSE_CONN_ATTR);
  2. 响应内容
    输出微服务端响应的数据
final Flux<DataBuffer> body = connection
.inbound()
.receive()
.retain()
.map(byteBuf -> wrap(byteBuf, response));

以上就是Gateway在处理一个路由请求的执行流程

完毕!!!

来源:实战案例锦集内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯