文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

图解 Kafka 源码之 NetworkClient 网络通信组件架构设计

2024-11-30 17:18

关注

大家好,我是 华仔, 又跟大家见面了。

上篇主要带大家深度剖析了「发送网络 I/O 的 Sender 线程的架构设计」,消息先被暂存然后调用网络I/O组件进行发送,今天主要聊聊「真正进行网络 I/O 的 NetworkClient 的架构设计」深度剖析下消息是如何被发送出去的。

认真读完这篇文章,我相信你会对 Kafka NetworkClient 的源码有更加深刻的理解。

这篇文章干货很多,希望你可以耐心读完。

一、总的概述

继续通过「场景驱动」的方式,来看看消息是如何在客户端被累加和待发送的。

在上篇中,我们知道了消息被 Sender 子线程先暂存到 KafkaChannel 的 Send 字段中,然后调用 NetworkClient#client.poll() 进行真正发送出去,如下图所示「6-11步」。

NetworkClient 为「生产者」、「消费者」、「服务端」等上层业务提供了网络I/O的能力。在 NetworkClient 内部使用了前面介绍的 Kafka 对 NIO 的封装组件,同时做了一定的封装,最终实现了网络I/O能力。NetworkClient  不仅仅用于客户端与服务端的通信,也用于服务端之间的通信。

接下来我们就来看看,「NetworkClient 网络I/O组件的架构实现以及发送处理流程」,为了方便大家理解,所有的源码只保留骨干。

二、NetworkClient 架构设计

NetworkClient 类是 KafkaClient 接口的实现类,它内部的重要字段有「Selectable」、「InflightRequest」以及内部类 「MetadataUpdate」。

github 源码地址如下:

https://github.com/apache/kafka/blob/2.7.0/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java

https://github.com/apache/kafka/blob/2.7.0/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java

https://github.com/apache/kafka/blob/2.7.0/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java

https://github.com/apache/kafka/blob/2.7.0/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java

https://github.com/apache/kafka/blob/2.7.0/clients/src/main/java/org/apache/kafka/clients/ClientResponse.java

1、关键字段

public class NetworkClient implements KafkaClient {
// 状态枚举值
private enum State {
ACTIVE,
CLOSING,
CLOSED
}

// 用于执行网络 I/O 的选择器
private final Selectable selector;
// Metadata元信息的更新器, 它可以尝试更新元信息
private final MetadataUpdater metadataUpdater;

// 管理集群所有节点连接的状态
private final ClusterConnectionStates connectionStates;

// 当前正在发送或等待响应的请求集合
private final InFlightRequests inFlightRequests;

// 套接字发送数据的缓冲区的大小(以字节为单位)
private final int socketSendBuffer;

// 套接字接收数据的缓冲区的大小(以字节为单位)
private final int socketReceiveBuffer;

// 表示客户端id,标识客户端身份
private final String clientId;

// 向服务器发送请求时使用的当前关联 ID
private int correlation;

// 单个请求等待服务器确认的默认超时
private final int defaultRequestTimeoutMs;

// 重连的退避时间
private final long reconnectBackoffMs;

private final boolean discoverBrokerVersions;
// broker 端版本
private final ApiVersions apiVersions;
// 存储着要发送的版本请求,key 为 nodeId,value 为构建请求的 Builder
private final Map<String, ApiVersionsRequest.Builder> nodesNeedingApiVersionsFetch = new HashMap<>();
// 取消的请求集合
private final List<ClientResponse> abortedSends = new LinkedList<>();

从该类属性字段来看比较多,这里说几个关键字段:

  1. selector:Kafka 自己封装的 Selector,该选择器负责监听「网络I/O事件」、「网络连接」、「读写操作」。
  2. metadataUpdater:NetworkClient 的内部类,主要用来实现Metadata元信息的更新器, 它可以尝试更新元信息。
  3. connectionStates:管理集群所有节点连接的状态,底层使用 Map实现,NodeConnectionState 枚举值表示连接状态,并且记录了最后一次连接的时间戳。
  4. inFlightRequests:用来保存当前正在发送或等待响应的请求集合。
  5. socketSenderBuffer:表示套接字发送数据的缓冲区的大小。
  6. socketReceiveBuffer:表示套接字接收数据的缓冲区的大小。
  7. clientId:表示客户端id,标识客户端身份。
  8. reconnectBackoffMs:表示重连的退避事件,为了防止短时间内大量重连造成的网络压力,设计了这么一个时间段,在此时间段内不得重连。

2、关键方法

NetworkClient 类的方法也不少,这里针对关键方法逐一讲解下。

(1)ready()


@Override
public boolean ready(Node node, long now) {
// 空节点
if (node.isEmpty())
throw new IllegalArgumentException("Cannot connect to empty node " + node);
// 1、判断节点是否准备好发送请求
if (isReady(node, now))
return true;
// 2、判断节点连接状态
if (connectionStates.canConnect(node.idString(), now))
// if we are interested in sending to a node and we don't have a connection to it, initiate one
// 3、初始化连接,但此时不一定连接成功了
initiateConnect(node, now);

return false;
}


@Override
public boolean isReady(Node node, long now) {
// if we need to update our metadata now declare all requests unready to make metadata requests first priority
// 当发现正在更新元数据时,会禁止发送请求 && 当连接没有创建完毕或者当前发送的请求过多时,也会禁止发送请求
return !metadataUpdater.isUpdateDue(now) && canSendRequest(node.idString(), now);
}


private boolean canSendRequest(String node, long now) {
// 三个条件必须都满足
return connectionStates.isReady(node, now) && selector.isChannelReady(node) &&
inFlightRequests.canSendMore(node);
}

该方法表示某个节点是否准备好并可以发送请求,主要做了三件事:

  1. 先判断节点是否已经准备好连接并接收请求了,需要满足以下四个条件:
  • !metadataUpdater.isUpdateDue(now):不能是正在更新元数据的状态,且元数据不能过期。
  • canSendRequest(node.idString(), now):此处有3个条件。(1)、客户端和 node 连接是否处于 ready 状态;(2)、客户端和 node 的 channel 是否建立好;(3)、inFlightRequests 中对应的节点是否可以接收更多的请求。
  1. 如果连接好返回 true 表示准备好,如果没有准备好接收请求,则会尝试与对应的 Node 连接,此处也需要满足两个条件:
  • 首先连接必须是 isDisconnected,不能是 connecteding 状态,即客户端与服务端的连接状态是没有连接上

  • 两次重试之间时间差要大于重试退避时间,目的就是为了避免网络拥塞,防止重连过于频繁造成网络压力过大

  1. 最后初始化连接。

(2)initiateConnect()


private void initiateConnect(Node node, long now) {
String nodeConnectionId = node.idString();
try {
// 1、更新连接状态为正在连接
connectionStates.connecting(nodeConnectionId, now, node.host(), clientDnsLookup);
// 获取连接地址
InetAddress address = connectionStates.currentAddress(nodeConnectionId);
log.debug("Initiating connection to node {} using address {}", node, address);
// 2、调用 selector 尝试异步进行连接,后续通过selector.poll进行监听事件就绪
selector.connect(nodeConnectionId,
new InetSocketAddress(address, node.port()),
this.socketSendBuffer,
this.socketReceiveBuffer);
} catch (IOException e) {
log.warn("Error connecting to node {}", node, e);
// Attempt failed, we'll try again after the backoff
connectionStates.disconnected(nodeConnectionId, now);
// Notify metadata updater of the connection failure
metadataUpdater.handleServerDisconnect(now, nodeConnectionId, Optional.empty());
}
}

该方法主要是进行初始化连接,做了两件事:

  1. 调用 connectionStates.connecting() 更新连接状态为正在连接。
  2. 调用 selector.connect() 异步发起连接,此时不一定连接上了,后续 Selector.poll() 会监听连接是否准备好并完成连接,如果连接成功,则会将  ConnectionState 设置为 CONNECTED。

当连接准备好后,接下来我们来看下发送相关的方法。

(3)send()、doSend()


public final class ClientRequest {
// 节点地址
private final String destination;
// ClientRequest 中通过 requestBuilder 给不同类型的请求设置不同的请求内容
private final AbstractRequest.Builder requestBuilder;
// 请求头的 correlationId
private final int correlationId;
// 请求头的 clientid
private final String clientId;
// 创建时间
private final long createdTimeMs;
// 是否需要进行响应
private final boolean expectResponse;
// 请求的超时时间
private final int requestTimeoutMs;
// 回调函数 用来处理响应
private final RequestCompletionHandler callback;
......
}


@Override
public void send(ClientRequest request, long now) {
doSend(request, false, now);
}

// 检测请求版本是否支持,如果支持则发送请求
private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now) {
// 确认是否活跃
ensureActive();
// 目标节点id
String nodeId = clientRequest.destination();
// 是否是 NetworkClient 内部请求 这里为 false
if (!isInternalRequest) {
// 检测是否可以向指定 Node 发送请求,如果还不能发送请求则抛异常
if (!canSendRequest(nodeId, now))
throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
}
AbstractRequest.Builder builder = clientRequest.requestBuilder();
try {
// 检测版本
NodeApiVersions versionInfo = apiVersions.get(nodeId);
// ... 忽略
// builder.build()是 ProduceRequest.Builder,结果是ProduceRequest
// 调用 doSend 方法
doSend(clientRequest, isInternalRequest, now, builder.build(version));
} catch (UnsupportedVersionException unsupportedVersionException) { log.debug("Version mismatch when attempting to send {} with correlation id {} to {}", builder, clientRequest.correlationId(), clientRequest.destination(), unsupportedVersionException);
// 请求的版本不协调,那么生成 clientResponse
ClientResponse clientResponse = new ClientResponse(clientRequest.makeHeader(builder.latestAllowedVersion()),
clientRequest.callback(), clientRequest.destination(), now, now,
false, unsupportedVersionException, null, null);
// 添加到 abortedSends 集合里
abortedSends.add(clientResponse);
}
}


private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
// 目标节点地址
String destination = clientRequest.destination();
// 生成请求头
RequestHeader header = clientRequest.makeHeader(request.version());
if (log.isDebugEnabled()) {
log.debug("Sending {} request with header {} and timeout {} to node {}: {}",
clientRequest.apiKey(), header, clientRequest.requestTimeoutMs(), destination, request);
}
// 1、构建 NetworkSend 对象 结合请求头和请求体,序列化数据,保存到 NetworkSend
Send send = request.toSend(destination, header);
// 2、构建 inFlightRequest 对象 保存了发送前的所有信息
InFlightRequest inFlightRequest = new InFlightRequest(
clientRequest,
header,
isInternalRequest,
request,
send,
now);
// 3、把 inFlightRequest 加入 inFlightRequests 集合里
this.inFlightRequests.add(inFlightRequest);
// 4、调用 Selector 异步发送数据,并将 send 和对应 kafkaChannel 绑定起来,并开启该 kafkaChannel 底层 socket 的写事件,等待下一步真正的网络发送
selector.send(send);
}

@Override
public boolean active() {
// 判断状态是否是活跃的
return state.get() == State.ACTIVE;
}

// 确认是否活跃
private void ensureActive() {
if (!active())
throw new DisconnectException("NetworkClient is no longer active, state is " + state);
}

从上面源码可以看出此处发送并不是真正的网络发送,而是先将数据发送到缓存中。

  1. 首先最外层是 send() ,里面调用 doSend() 。
  2. 这里的 doSend() 主要的作用是判断 inFlightRequests 集合上对应的节点是不是能发送请求,需要满足三个条件:
  1. 最后再次调用另一个 doSend(),用来最终的请求发送到缓存中。步骤如下:

综上可以得出这里的发送过程其实是把要发送的请求先封装成 inFlightRequest,然后放到 inFlightRequests 集合里,然后放到对应 channel 的字段 NetworkSend 里缓存起来。总之,这里的发送过程就是为了下一步真正的网络I/O发送而服务的

接下来看下真正网络发送的方法。

(4)poll()

该方法执行网络发送并把响应结果「pollSelectionKeys 的各种读写」做各种状态处理,此处是通过调用 handleXXX() 方法进行处理的,代码如下:


@Override
public List<ClientResponse> poll(long timeout, long now) {
// 确认是否活跃
ensureActive();
// 取消发送是否为空
if (!abortedSends.isEmpty()) {
// If there are aborted sends because of unsupported version exceptions or disconnects,
// handle them immediately without waiting for Selector#poll.
List<ClientResponse> responses = new ArrayList<>();
handleAbortedSends(responses);
completeResponses(responses);
return responses;
}
// 1、尝试更新元数据
long metadataTimeout = metadataUpdater.maybeUpdate(now);
try {
// 2、执行网络 I/O 操作,真正读写发送的地方,如果客户端的请求被完整的处理过了,会加入到completeSends 或 complteReceives 集合中
this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));
} catch (IOException e) {
log.error("Unexpected error during I/O", e);
}

// process completed actions
long updatedNow = this.time.milliseconds();
// 响应结果集合:真正的读写操作, 会生成responses
List<ClientResponse> responses = new ArrayList<>();
// 3、完成发送的handler,处理 completedSends 集合
handleCompletedSends(responses, updatedNow);
// 4、完成接收的handler,处理 completedReceives 队列
handleCompletedReceives(responses, updatedNow);
// 5、断开连接的handler,处理 disconnected 列表
handleDisconnections(responses, updatedNow);
// 6、处理连接的handler,处理 connected 列表
handleConnections();
// 7、处理版本协调请求(获取api版本号) handler
handleInitiateApiVersionRequests(updatedNow);
// 8、超时连接的handler,处理超时连接集合
handleTimedOutConnections(responses, updatedNow);
// 9、超时请求的handler,处理超时请求集合
handleTimedOutRequests(responses, updatedNow);
// 10、完成响应回调
completeResponses(responses);

return responses;
}

这里的步骤比较多,我们按照先后顺序讲解下。

  1. 尝试更新元数据。
  2. 调用 Selector.poll() 执行真正网络 I/O 操作,可以点击查看 图解 Kafka 源码网络层实现机制之 Selector 多路复用器 主要操作以下3个集合。
  • connected集合:已经完成连接的 Node 节点集合。
  • completedReceives集合:接收完成的集合,即 KafkaChannel 上的 NetworkReceive 写满后会放入这个集合里。
  • completedSends集合:发送完成的集合,即 channel 上的 NetworkSend 读完后会放入这个集合里。
  1. 调用 handleCompletedSends() 处理 completedSends 集合。
  2. 调用 handleCompletedReceives() 处理 completedReceives 队列。
  3. 调用 handleDisconnections() 处理与 Node 断开连接的请求。
  4. 调用 handleConnections() 处理 connected 列表。
  5. 调用 handleInitiateApiVersionRequests() 处理版本号请求。
  6. 调用 handleTimedOutConnections() 处理连接超时的 Node 集合。
  7. 调用 handleTimedOutRequests() 处理 inFlightRequests 集合中的超时请求,并修改其状态。
  8. 调用 completeResponses() 完成每个消息自定义的响应回调。

接下来看下第 3~9 步骤的方法实现。

(5)handleCompletedSends()

当 NetworkClient 发送完请求后,就会调用 handleCompletedSends 方法,表示请求已经发送到 Broker 端了。


private void handleCompletedSends(List<ClientResponse> responses, long now) {
// if no response is expected then when the send is completed, return it
// 1、遍历 completedSends 发送完成的请求集合,通过调用 Selector 获取从上一次 poll 开始的请求
for (Send send : this.selector.completedSends()) {
// 2、从 inFlightRequests 集合获取该 Send 关联对应 Node 的队列取出最新的请求,但并没有从队列中删除,取出后判断这个请求是否期望得到响应
InFlightRequest request = this.inFlightRequests.lastSent(send.destination());
// 3、是否需要响应, 如果不需要响应,当Send请求完成时,就直接返回.还是有request.completed生成的ClientResponse对象
if (!request.expectResponse) {
// 4、如果不需要响应就取出 inFlightRequests 中该 Sender 关联对应 Node 的 inFlightRequest,即提取最新的请求
this.inFlightRequests.completeLastSent(send.destination());
// 5、调用 completed() 生成 ClientResponse,第一个参数为null,表示没有响应内容,把请求添加到 Responses 集合
responses.add(request.completed(null, now));
}
}
}

该方法主要用来在客户端发送请求后,对响应结果进行处理,做了五件事:

  1. 遍历 seletor 中的 completedSends 集合,逐个处理完成的 Send 对象。
  2. 从 inFlightRequests 集合获取该 Send 关联对应 Node 的队列中第一个元素,但并没有从队列中删除,取出后判断这个请求是否期望得到响应。
  3. 判断是否需要响应。
  4. 如果不需要响应就删除 inFlightRequests 中该 Sender 关联对应 Node 的 inFlightRequest,对于 Kafka 来说,有些请求是不需要响应的,对于发送完不用考虑是否发送成功的话,就构建 callback 为 null 的 Response 对象。
  5. 通过 InFlightRequest.completed(),生成 ClientResponse,第一个参数为 null 表示没有响应内容,最后把 ClientResponse 添加到 Responses 集合。

从上面源码可以看出,「completedSends」集合与「InflightRequests」集合协作的关系。

但是这里有个问题:如何保证从 Selector 返回的请求,就是对应到 InflightRequests 集合队列的最新的请求呢?

completedSends 集合保存的是最近一次调用 poll() 方法中发送成功的请求「发送成功但还没有收到响应的请求集合」。而 InflightRequests 集合存储的是已经发送但还没收到响应的请求。每个请求发送都需要等待前面的请求发送完成,这样就能保证同一时间只有一个请求正在发送,因为 Selector 返回的请求是从上一次 poll 开始的,这样就对上了。

「completedSends」的元素对应着「InflightRequests」集合里对应队列的最后一个元素, 如下图所示:

(6)handleCompletedReceives()

当 NetworkClient 收到响应时,就会调用 handleCompletedReceives 方法。


private void handleCompletedReceives(List<ClientResponse> responses, long now) {
// 1、遍历 CompletedReceives 响应集合,通过 Selector 返回未处理的响应
for (NetworkReceive receive : this.selector.completedReceives()) {
// 2、获取发送请求的 Node id
String source = receive.source();
// 3、从 inFlightRequests 集合队列获取已发送请求「最老的请求」并删除(从 inFlightRequests 删除,因为inFlightRequests 存储的是未收到请求响应的 ClientRequest,现在请求已经有响应了,就不需要保存了)
InFlightRequest req = inFlightRequests.completeNext(source);
// 4、解析响应,并且验证响应头,生成 responseStruct 实例
Struct responseStruct = parseStructMaybeUpdateThrottleTimeMetrics(receive.payload(), req.header,throttleTimeSensor, now);
// 生成响应体
AbstractResponse response = AbstractResponse.parseResponse(req.header.apiKey(), responseStruct, req.header.apiVersion());
....
// If the received response includes a throttle delay, throttle the connection.
// 流控处理
maybeThrottle(response, req.header.apiVersion(), req.destination, now);
// 5、判断返回类型
if (req.isInternalRequest && response instanceof MetadataResponse)
// 处理元数据请求响应
metadataUpdater.handleSuccessfulResponse(req.header, now, (MetadataResponse) response);
else if (req.isInternalRequest && response instanceof ApiVersionsResponse)
// 处理版本协调响应
handleApiVersionsResponse(responses, req, now, (ApiVersionsResponse) response);
else
// 普通发送消息的响应,通过 InFlightRequest.completed(),生成 ClientResponse,将响应添加到 responses 集合中
responses.add(req.completed(response, now));
}
}

// 解析响应,并且验证响应头,生成 responseStruct 实例
private static Struct parseStructMaybeUpdateThrottleTimeMetrics(ByteBuffer responseBuffer, RequestHeader requestHeader, Sensor throttleTimeSensor, long now) {
// 解析响应头
ResponseHeader responseHeader = ResponseHeader.parse(responseBuffer,
requestHeader.apiKey().responseHeaderVersion(requestHeader.apiVersion()));
// 解析响应体
Struct responseBody = requestHeader.apiKey().parseResponse(requestHeader.apiVersion(), responseBuffer);
// 验证请求头与响应头的 correlation id 必须相等
correlate(requestHeader, responseHeader);
if (throttleTimeSensor != null && responseBody.hasField(CommonFields.THROTTLE_TIME_MS))
throttleTimeSensor.record(responseBody.get(CommonFields.THROTTLE_TIME_MS), now);
return responseBody;
}

该方法主要用来处理接收完毕的网络请求集合,做了五件事:

  1. 遍历 selector 中的 completedReceives 集合,逐个处理完成的 Receive 对象。
  2. 获取发送请求的 Node id。
  3. 从 inFlightRequests 集合队列获取已发送请求「最老的请求」并删除(从 inFlightRequests 删除,因为inFlightRequests 存储的是未收到请求响应的 ClientRequest,现在请求已经有响应了,就不需要保存了)。
  4. 解析响应,并且验证响应头,生成 responseStruct 实例,生成响应体。
  5. 处理响应结果,此处分为三种情况:
  • 处理元数据请求响应,则调用 metadataUpdater.handleSuccessfulResponse()。
  • 处理版本协调响应,则调用 handleApiVersionsResponse()。
  • 普通发送消息的响应,通过 InFlightRequest.completed(),生成 ClientResponse,将响应添加到 responses 集合中。

从上面源码可以看出,「completedReceives」集合与「InflightRequests」集合也有协作的关系, completedReceives 集合指的是接收到的响应集合,如果请求已经收到响应了,就可以从 InflightRequests 删除了,这样 InflightRequests 就起到了可以防止请求堆积的作用。

与 「completedSends」正好相反,「completedReceives」集合对应 「InflightRequests」集合里对应队列的第一个元素,如下图所示:

(7)leastLoadedNode()


@Override
public Node leastLoadedNode(long now) {
// 从元数据中获取所有的节点
List<Node> nodes = this.metadataUpdater.fetchNodes();
if (nodes.isEmpty())
throw new IllegalStateException("There are no nodes in the Kafka cluster");
int inflight = Integer.MAX_VALUE;

Node foundConnecting = null;
Node foundCanConnect = null;
Node foundReady = null;

int offset = this.randOffset.nextInt(nodes.size());
for (int i = 0; i < nodes.size(); i++) {
int idx = (offset + i) % nodes.size();
Node node = nodes.get(idx);
// 节点是否可以发送请求
if (canSendRequest(node.idString(), now)) {
// 获取节点的队列大小
int currInflight = this.inFlightRequests.count(node.idString());
// 如果为 0 则返回该节点,负载最小
if (currInflight == 0) {
// if we find an established connection with no in-flight requests we can stop right away
log.trace("Found least loaded node {} connected with no in-flight requests", node);
return node;
} else if (currInflight < inflight) { // 如果队列大小小于最大值
// otherwise if this is the best we have found so far, record that
inflight = currInflight;
foundReady = node;
}
} else if (connectionStates.isPreparingConnection(node.idString())) {
foundConnecting = node;
} else if (canConnect(node, now)) {
if (foundCanConnect == null ||
this.connectionStates.lastConnectAttemptMs(foundCanConnect.idString()) >
this.connectionStates.lastConnectAttemptMs(node.idString())) {
foundCanConnect = node;
}
} else {
log.trace("Removing node {} from least loaded node selection since it is neither ready " +
"for sending or connecting", node);
}
}

// We prefer established connections if possible. Otherwise, we will wait for connections
// which are being established before connecting to new nodes.
if (foundReady != null) {
log.trace("Found least loaded node {} with {} inflight requests", foundReady, inflight);
return foundReady;
} else if (foundConnecting != null) {
log.trace("Found least loaded connecting node {}", foundConnecting);
return foundConnecting;
} else if (foundCanConnect != null) {
log.trace("Found least loaded node {} with no active connection", foundCanConnect);
return foundCanConnect;
} else {
log.trace("Least loaded node selection failed to find an available node");
return null;
}
}

该方法主要是选出一个负载最小的节点,如下图所示:

三、InflightRequests 集合设计

通过上面的代码分析,我们知道「InflightRequests」集合的作用就是缓存已经发送出去但还没有收到响应的  ClientRequest 请求集合。底层是通过 ReqMap> 实现,其中 key 是 NodeId,value 是发送到对应 Node 的 ClientRequest 请求队列,默认为5个,参数:max.in.flight.requests.per.connection 配置请求队列大小。它为每个连接生成一个双端队列,因此它能控制请求发送的速度。

其作用有以下2个:

  1. 节点是否正常:收集从「开始发送」到「接收响应」这段时间的请求,来判断要发送的 Broker 节点是否正常,请求和连接是否超时等等,也就是说用来监控发送到哥哥节点请求是否正常
  2. 节点的负载情况:Deque 队列到一定长度后就认为某个 Broker 节点负载过高了。

final class InFlightRequests {
// 每个连接最大执行中的请求数
private final int maxInFlightRequestsPerConnection;
// 节点 Node 至客户端请求双端队列 Deque 的映射集合,key为 NodeId, value 是请求队列
private final Map<String, Deque<NetworkClient.InFlightRequest>> requests = new HashMap<>();

// 线程安全的 inFlightRequestCount
private final AtomicInteger inFlightRequestCount = new AtomicInteger(0);
// 设置每个连接最大执行中的请求数
public InFlightRequests(int maxInFlightRequestsPerConnection) {
this.maxInFlightRequestsPerConnection = maxInFlightRequestsPerConnection;
}

这里通过「场景驱动」的方式来讲解关键方法,当有新请求需要发送处理时,会在队首入队。而实际被处理的请求,则是从队尾出队,保证入队早的请求先得到处理。

1、canSendMore()

先来看下发送条件限制, NetworkClient 调用这个方法用来判断是否还可以向指定 Node 发送请求。


public boolean canSendMore(String node) {
// 获取节点对应的双端队列
Deque<NetworkClient.InFlightRequest> queue = requests.get(node);
// 判断条件 队列为空 || (队首已经发送完成 && 队列中没有堆积更多的请求)
return queue == null || queue.isEmpty() ||
(queue.peekFirst().send.completed() && queue.size() < this.maxInFlightRequestsPerConnection);
}

从上面代码可以看出限制条件,队列虽然可以存储多个请求,但是新的请求要是加进来条件是上一个请求必须发送成功。

条件判断如下:

  1. queue == null || queue.isEmpty(),队列为空就能发送。
  2. 判断 queue.peekFirst().send.completed() 队首是否发送完成。
  • 如果队首的请求迟迟发送不出去,可能就是网络的原因,因此不能继续向此 Node 发送请求。
  • 队首的请求与对应的 KafkaChannel.send 字段指向的是同一个请求,为了避免未发送的消息被覆盖掉,也不能让 KafkaChannel.send 字段指向新请求
  1. queue.size() < this.maxInFlightRequestsPerConnection,该条件就是为了判断队列中是否堆积过多请求,如果 Node 已经堆积了很多未响应的请求,说明这个节点出现了网络拥塞,继续再发送请求,则可能会超时。

2、add() 入队


public void add(NetworkClient.InFlightRequest request) {
// 这个请求要发送到哪个 Broker 节点上
String destination = request.destination;
// 从 requests 集合中根据给定请求的目标 Node 节点获取对应 Deque 双端队列 reqs
Deque<NetworkClient.InFlightRequest> reqs = this.requests.get(destination);
// 如果双端队列reqs为null
if (reqs == null) {
// 构造一个双端队列 ArrayDeque 类型的 reqs
reqs = new ArrayDeque<>();
// 将请求目标 Node 节点至 reqs 的映射关系添加到 requests 集合
this.requests.put(destination, reqs);
}
// 将请求 request 添加到 reqs 队首
reqs.addFirst(request);
// 增加计数
inFlightRequestCount.incrementAndGet();
}

3、completeNext() 出队最老请求


public NetworkClient.InFlightRequest completeNext(String node) {
// 根据给定 Node 节点获取客户端请求双端队列 reqs,并从队尾出队
NetworkClient.InFlightRequest inFlightRequest = requestQueue(node).pollLast();
// 递减计数器
inFlightRequestCount.decrementAndGet();
return inFlightRequest;
}

对比下入队和出队这2个方法,「入队 add()」时是通过 addFirst() 方法添加到队首的,所以队尾的请求是时间最久的,也是应该先处理的,所以「出队 completeNext()」是通过 pollLast(),将队列中时间最久的请求袁术移出进行处理。

4、lastSent() 获取最新请求​


public NetworkClient.InFlightRequest lastSent(String node) {
return requestQueue(node).peekFirst();
}

5、completeLastSent() 出队最新请求


public NetworkClient.InFlightRequest completeLastSent(String node) {
// 根据给定 Node 节点获取客户端请求双端队列 reqs,并从队首出队
NetworkClient.InFlightRequest inFlightRequest = requestQueue(node).pollFirst();
// 递减计数器
inFlightRequestCount.decrementAndGet();
return inFlightRequest;
}

最后我们来看看「InflightRequests」,表示正在发送的请求,存储着请求发送前的所有信息。

另外它支持生成响应 ClientResponse,当正常收到响应时,completed()会根据响应内容生成对应的 ClientResponse,当连接突然断开后,disconnected() 会生成 ClientResponse 对象,代码如下:

static class InFlightRequest {
// 请求头
final RequestHeader header;
// 这个请求要发送到哪个 Broker 节点上
final String destination;
// 回调函数
final RequestCompletionHandler callback;
// 是否需要进行响应
final boolean expectResponse;
// 请求体
final AbstractRequest request;
// 发送前是否需要验证连接状态
final boolean isInternalRequest; // used to flag requests which are initiated internally by NetworkClient
// 请求的序列化数据
final Send send;
// 发送时间
final long sendTimeMs;
// 请求的创建时间,即 ClientRequest 的创建时间
final long createdTimeMs;
// 请求超时时间
final long requestTimeoutMs;
.....

public ClientResponse completed(AbstractResponse response, long timeMs) {
return new ClientResponse(header, callback, destination, createdTimeMs, timeMs,
false, null, null, response);
}


public ClientResponse disconnected(long timeMs, AuthenticationException authenticationException) {
return new ClientResponse(header, callback, destination, createdTimeMs, timeMs,
true, null, authenticationException, null);
}
}

中间的部分代码请移步到星球查看

五、完整请求流程串联

一条完整的请求主要分为以下几个阶段:

  1. 调用 NetworkClient 的 ready(),连接服务端。
  2. 调用 NetworkClient 的 poll(),处理连接。
  3. 调用 NetworkClient 的 newClientRequest(),创建请求 ClientRequest。
  4. 然后调用 NetworkClient 的 send(),发送请求。
  5. 最后调用 NetworkClient 的 poll(),处理响应。

1、创建连接过程

NetworkClient 发送请求之前,都需要先和 Broker 端创建连接。NetworkClient 负责管理与集群的所有连接。

2、生成请求过程

3、发送请求过程

4、处理响应过程

(1)请求发送完成

(2)请求收到响应

(3)执行处理响应

六、总结

这里,我们一起来总结一下这篇文章的重点。

1、开篇总述消息消息被 Sender 子线程先将消息暂存到 KafkaChannel 的 send 中,等调用「poll方法」执行真正的网络I/O 操作,从而引出了为客户端提供网络 I/O 能力的 「NetworkClient 组件」。

2、带你深度剖析了「NetworkClient 组件」 、「InflightRequests」、「ClusterConnectionState」的实现细节。

3、最后带你串联了整个消息发送请求和处理响应的流程,让你有个更好的整体认知。

来源:华仔聊技术内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯