Kafka Server

KafkaServer 架构 #

SocketServer #

AbstractServerThread #

Processor #

RequestChannel #

KafkaServer 初始化 #

如何保证客户端请求的顺序
Processor.run(), 其中有多处注册/取消OP_READ事件以及注册/取消OP_WRITE事件的操作,通过这些操作的组合可以保证每个连接
只有一个请求和一个对应的响应,从而实现请求的顺序性

KafkaServer 流程 #

  1. KafkaProducer线程创建ProducerRecord后,会将其缓存进RecordAccumulator
  2. Sender线程从RecordAccumulator中获取缓存的消息,放入KafkaChannel.send字段中待发送,同时放入InFlightResquests队列中等待响应
  3. 客户端会通过KSelector将请求发送出去
  4. 在服务端,Processor线程使用KSelector读取请求并暂存到stageReceives队列中,KSelector.poll()之后,请求被转移到completeReceives队列中
  5. Processor将请求进行一些解析操作后,放入RequestChannel.requestQueue队列。Handler线程会从RequestQueue队列中取出请求进行处理,将处理之后生成的响应放入RequestChannel.responseQueue队列。
  6. Processor 线程从其对应的 RequestChannel.responseQueue 队列中取出响应并放入 inflightResponses 队列中缓存,当响应发送出去之后会将其从 inflightResponses 中删除。

API层 #