Kafka Log

概括 #

FileRecords #

FileRecords 读写 #

ByteBufferMessageSet #

功能与MemoryRecords类似
主要功能

  1. 将Message集合按照指定的压缩类型进行压缩
  2. 提供迭代器,实现深层迭代和浅层迭代两种迭代方式
  3. 提供消息验证和分配

LogValidator #

OffsetIndex #

为了提高查找消息的性能,从Kafka0.8版本开始,为每个日志文件添加了对应的索引文件。
OffsetIndex对象对应管理磁盘上的一个索引文件

Kafka使用稀疏索引的方式构建消息的索引,它不保证每个消息在索引文件中都有对应的索引项,这算是磁盘空间,内存空间、查找时间等多个方面的折中
不断减小索引文件大小的目的是为了将索引文件映射到内存,在OffsetIndex中使用 MappedByteBuffer 将索引文件映射到内存中

LogSegment #

为了防止Log文件过大,将Log切分成多个日志文件,每个日志文件对应一个LogSegment。 在LogSegment中封装了一个FileRecord和一个OffsetIndex对象,提供日志文件和索引文件的读写功能以及其他辅助功能

read #

  1. 将absoluteOffset转换成Index File中使用的相对offset,得到17 通过OffsetIndex.lookup()方法查找Index File的到(7,700)这个索引项
  2. 根据(7,700)索引项,从 FileRecords 中position=700处,查找absoluteOffset为1017的消息
  3. 通过 FileRecords.searchFor() 方法遍历查找,得到 (1018,800) 这个位置消息

recover #

根据日志文件重建,同时验证日志文件中消息的合法性。在重建索引过程中,如果遇到压缩消息,需要进行解压,主要原因是因为索引项中保存的相对offset是第一条消息的offset。而外部消息offset是压缩消息集合中最后一条消息的offset。

Log #

Log是对多个 LogSegment 对象的顺序组合,形成一个逻辑的日志。
为了实现快速定位 LogSegment,Log 使用跳表对 LogSegment 进行管理

appendAsLeader() #

append #

read #

Log.read() 实现了读取消息的功能,它的实现逻辑是:通过segments跳表,快速定位到读取的起始 LogSegment 并从中读取消息。
Log.append()通过加锁进行同步控制,Log.read() 并没有加锁操作,因为它在开始查询消息之前会将 nextOffsetMetadata 保存成方法的局部变量,从而避免线程安全的问题
从日志中读取消息。
参数: 
startOffset – 开始读取的偏移量
maxLength – 要读取的最大字节数
maxOffset – 要读取的偏移量,独占。 (即此偏移量不包含在结果消息集中)
minOneMessage – 如果为 true,即使超过maxLength (如果存在),也会返回第一条消息
isolationLevel – fetcher 的隔离级别。 READ_UNCOMMITTED 隔离级别具有传统的读取语义(例如,消费者仅限于获取高水位线)。 在 READ_COMMITTED 中,消费者仅限于获取最后一个稳定的偏移量。 此外,在 READ_COMMITTED 中,在获取之后查询事务索引以收集获取范围内的中止事务列表,消费者使用该列表在将获取的记录返回给用户之前对其进行过滤。 请注意,从关注者获取的信息始终使用 READ_UNCOMMITTED。
返回: 
获取数据信息包括获取起始偏移元数据和读取的消息

LogManager #

在一个Broker上的所有 Log 都是由 LogManager 进行管理的。LogManager 提供了加载 Log、创建 Log集合 删除Log集合、查询Log集合等功能
三个周期性的后台任务以及 Cleaner线程

  • log-flusher任务:日志刷写
  • log-retention任务:日志保留
  • recovery-point-checkpoint: 检查点刷新
  • Cleaner:日志清理

结构 #

定时任务 #

cleanupLogs #

flush操作,检查要刷新的脏日志 #

帮助Broker进行Log恢复 #

日志压缩 #

通过 log-retention 任务,Kafka 服务端可以避免出现大量日志占满磁盘的情况。
log-retention 任务配置可以对整个Broker设置全局配置,也可以对某些特定的Topic 配置

Kafka还提供了 日志压缩功能,通过此功能可以有效地减小日志文件的大小,缓解磁盘紧张的情况

Log在写入消息时就是将消息追加到 activeSegment 的日志文件末尾。为了避免 activeSegment 成为热点,activeSegment 不会 参与日志压缩操作,而是只压缩其余的只读的 LogSegment。
在日志压缩过程中启动多个 Cleaner 线程,可以通过调整 Cleaner 线程池中的线程数量,优化并发压缩的性能,减少对整个服务端性能的影响。 一般情况下,Log的数据量很大,为了避免Cleaner 线程与其他业务线程长时间竞争CPU,并不会将除 activeSegment 之外的所有 LogSegment 在一次压缩操作中全部处理掉,而是将这些 LogSegment 分批进行压缩

Cleaner #

删除 log #