Kafka与RocketMq文件存储机制对比

2021-09-26

一个商业化消息队列的性能好坏,其文件存储机制设计是衡量一个消息队列服务技术水平和最关键指标之一。

开头问题

kafka文件结构和rocketMQ文件结构是什么样子?特点是什么?

一、目录结构

Kafka

Kafka以partition为单元分片存储消息

Kafka部分名词解释如下:

  • Broker:消息中间件处理结点,一个Kafka节点就是一个broker,多个broker可以组成一个Kafka集群。
  • Topic:一类消息
  • Partition:topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列。
  • Segment:partition物理上由多个segment组成
  • offset:每个partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到partition中。partition中的每个消息都有一个连续的序列号叫做offset,用于partition唯一标识一条消息.

partition(分片目录)

为方便理解以单broker为例,假设建立一个broker建立的topic是kafka-topic-01,partition数量是3, 会形成以下目录

#1、分区目录文件
drwxr-x--- 2 root root 4096 Jul 26 19:35 kafka-topic-01-0
drwxr-x--- 2 root root 4096 Jul 24 20:15 kafka-topic-01-1
drwxr-x--- 2 root root 4096 Jul 24 20:15 kafka-topic-01-2
  • 名称topic+有序序号
  • 一个partition(目录)中的数据被切分为多个大小相等的segment(段)数据文件中
  • partition中消息只能顺序写读

segment(分段消息)

分为三个文件

  • 索引文件.index
  • 日志文件.log
  • 时间戳索引文件.timeindex
#2、分区目录中的日志数据文件和日志索引文件
-rw-r----- 1 root root 512K Jul 24 19:51 00000000000000000000.index
-rw-r----- 1 root root 1.0G Jul 24 19:51 00000000000000000000.log
-rw-r----- 1 root root 768K Jul 24 19:51 00000000000000000000.timeindex
-rw-r----- 1 root root 512K Jul 24 20:03 00000000000022372103.index
-rw-r----- 1 root root 1.0G Jul 24 20:03 00000000000022372103.log
-rw-r----- 1 root root 768K Jul 24 20:03 00000000000022372103.timeindex
-rw-r----- 1 root root 512K Jul 24 20:15 00000000000044744987.index
-rw-r----- 1 root root 1.0G Jul 24 20:15 00000000000044744987.log
-rw-r----- 1 root root 767K Jul 24 20:15 00000000000044744987.timeindex
-rw-r----- 1 root root  10M Jul 24 20:21 00000000000067117761.index
-rw-r----- 1 root root 511M Jul 24 20:21 00000000000067117761.log
-rw-r----- 1 root root  10M Jul 24 20:21 00000000000067117761.timeindex
  • segment文件以偏移量命名,数值最大64位long类型

segment内部-index文件

  • 索引文件采用稀疏索引(即有的消息不能找到对应的索引),目的是节省存储空间
  • 定长,占8个字节
消息单元的存储结构

字段名

说明

relativeOffset(4)

相对偏移量,相对baseOffset来说

position(4)

物理地址,日志文件中的物理地址

如何查找消息

如offset的值是368772

1.根据offset找到所在的segment,根据二分查找,找到消息所在的log文件0000000000000368769.log和索引文件0000000000000368769.index

2.计算下差368772-368769=3,在索引文件中也是二分查找,定位到是<3,497>记录,即对应的物理位置是497,从而找到消息

3.根据物理位置497在0000000000000368769.log文件找到消息。

segment内部-timeIndex文件

根据指定的时间戳查找偏移量信息

  • 文件名:以时间戳命名
  • 定长,12个字节
  • 时间戳只能递增,追加的时间戳小于之前的时间戳,不予添加

字段名

说明

timestamp(8)

当前日志分段最大时间戳

relativeOffset(4)

时间戳对应的相对偏移量

segment内部-log文件

RocketMQ

rocketMQ把所有topic中的消息都commitLog中

存储的文件主要分为:

  • commitlog: 存储消息实体
  • consumequeue: 按Topic和队列存储消息的offset
  • index: index按key、tag、时间等存储

commitlog(物理队列)

文件地址:${user.home} \store\${commitlog}${fileName}

  • 存放该broke所有topic的消息
  • 默认1G大小
  • 以偏移量为文件名,当一个文件写满时则创建新文件,这样的设计主要是方便根据消息的物理偏移量,快速定位到消息所在的物理文件
  • 一个消息存储单元是不定长的
  • 顺序写但是随机读

consumeQueue(消费队列)

文件地址:${storeRoot}\consumequeue\${topicName}\${queueId}\${fileName}

  • 文件名:跟commitlog一样以偏移量作为文件名
  • 按topic和queueId纬度分别存储消息commitLogOffset、size、tagHashCode
  • 一个存储单元是20个字节的定长的
  • 顺序读顺序写
消息单元的存储结构

字段名

说明

offset(8)

commitlog的偏移量

size(4)

commitlog消息大小

tagHashCode

tag的哈希值

indexFile(索引文件)

文件地址:${user.home}\store\index\${fileName}

  • 以时间作为文件名
  • 一个存储单元是20个字节定长的
  • 一个indexFile最多存储2000w条消息

索引文件(Index)提供消息检索的能力,主要在问题排查和数据统计等场景应用

如何查找消息

  1. 消费者顺序读取consumerQueue,获取到物理offset,根据物理offset去commitlog文件中随机读取消息实体

二、如何保存消息消费进度

Kafka

方式一:zookeeper存储

0.9之前老版本

消费者如果是根据javaapi来消费,也就是【kafka.javaapi.consumer.ConsumerConnector,通过配置参数【zookeeper.connect】来消费。这种情况下,消费者的offset会更新到zookeeper的【consumers/{group}/offsets/{topic}/{partition}】目录下,例如:

[zk: localhost(CONNECTED) 0] get /kafka/consumers/zoo-consumer-group/offsets/my-topic/0
5662
cZxid = 0x20006d28a
ctime = Wed Apr 12 18:20:51 CST 2017
mZxid = 0x30132b0ed
mtime = Tue Aug 22 18:53:22 CST 2017
pZxid = 0x20006d28a
cversion = 0
dataVersion = 5758
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 4
numChildren = 0

保存方式:

consumer在从broker读取消息后,可以选择commit,该操作会在Zookeeper中存下该consumer在该partition下读取的消息的offset,该consumer下一次再读该partition时会从下一条开始读取。如未commit,下一次读取的开始位置会跟上一次commit之后的开始位置相同

方式二:broker存储

broker 存放 offset 是 kafka 从 0.9 版本开始

存储位置:

consumer 默认将 offset 持久化保存在 Kafka 一个内置的 topic 中,该 topic 为__consumer_offsets。

提交offset分为:自动提交和手动提交

保存方式:

消费者正常运行,除了持久化一份消费offset到broker中,还会在内存中保存一份消费进度offset,所以当消费者都正常运行时__consumer_offsets使用的比较少。当消费者崩溃或者balance时,会从broker中拉取最后一次消费offset。

RocketMQ

方式一:集群模式

集群模式:topic中的一条消息只会同一个消费者组中的一个消费者消费,不会被多个消费者消费

对offset的管理分为本地模式和远程模式。本地模式是以文本文件的形式存储在客户端,而远程模式是将数据保存到broker端,对应的数据结构分别为LocalFileOffsetStore和RemoteBrokerOffsetStore。

集群模式使用的是远程模式。

存储位置:

ocketMQ的broker端中,offset的是以json的形式持久化到磁盘文件中,文件路径为${user.home}/store/config/consumerOffset.json

{
    "offsetTable": {
        "topic-name@consumer-group": {
            "0": 88526, 
            "1": 88528
        }
    }
}

保存方式:

定时持久化到broker磁盘ConsumerOffset.json

consumer从broker拉取消息后,Broker更新消费进度,仅仅是更新了内存中的offsetTable表,并没有涉及到ConsumerOffset.json这个文件。broker启动时会启动一个定时任务(默认5秒),来定时把消费offset持久化到磁盘consumerOffset.json,保存的过程是先将原来的文件存到ConsumerOffset.json.bak文件中,然后将新的内容存入ConsumerOffset.json文件

方式二:广播模式

广播模式:一条消息会被每个消费者消费

当消费模式为广播模式时,offset使用本地模式存储,因为每条消息会被所有的消费者消费,每个消费者管理自己的消费进度,各个消费者之间不存在消费进度的交集。

三、特点

Kafka

为什么要设计成partition中多segment

  • 一个就是上面提到的如果使用单个 Partition 来管理数据,顺序往 Partition 中累加写势必会造成单个 Partition 文件过大,读消息是顺序读的(调用FileMessageSet的searchFor方法),文件过大,查询效率下降
  • 另一个原因是 Kafka 消息记录不是一直堆堆堆,默认是有日志清除策略的。要么是日志超过设定的保存时间触发清理逻辑,要么就是 Topic 日志文件超过阈值触发清除逻辑,如果是一个大文件删除是要锁文件的这时候写操作就不能进行。因此设置分段存储对于清除策略来说也会变得更加简单,只需删除较早的日志块即可

清理数据功能-日志清理

  1. 基于时间

日志删除任务会定时(默认5分钟执行一次)检查是否有保留时间超过设定阈值(默认保存7天)可删除的segment文件。

  1. 基于日志大小

日志删除任务会检查当前日志的大小是否超过设定的阈值retentionSize来寻找可删除的日志分段的文件集合deletableSegments,参考下图所示

基于日志大小的保留策略与基于时间的保留策略类似,其首先计算日志文件的总大小size和retentionSize的差值diff,即计算需要删除的日志总大小,然后从日志文件中的第一个日志分段开始进行查找可删除的日志分段的文件集合deletableSegments。查找出deletableSegments之后就执行删除操作

基于日志起始偏移量

该删除策略具体是删除某日志分段的下一个日志分段的baseOffset小于等于logStartOffset的部分。

压缩数据

Producer 端压缩、Broker 端保持、Consumer 端解压缩

在Kafka中,压缩可能发生在两个地方:生产者端和Broker端。broker端保存的也是压缩的消息,传输到consumer端再进行解压缩

在吞吐量方面:LZ4 > Snappy > zstd / GZIP

RocketMQ

RocketMQ的CommitLog文件采用混合型存储

即所有的Topic下的消息队列共用同一个CommitLog的日志数据文件。感觉这样会增加随机读的概率,可以学着kakfa按topic隔离。

预加载MappedFile文件

消息写入时,每次都回去去mappedFileQueue中去拿mappedfile。而这个mappedfile是由后台运行的AllocateMappedFileService服务线程去创建和预分配的。这样下次获取时候直接返回就可以不用等待MappedFile创建分配所产生的时间延迟

文件预热

我们拿到mmapedfile文件,可能pagecache中还是出现页数据不存在的情况,所以rocketmq增加了预热

有一个warmMappedFile方法,它会把当前映射的文件,每一页遍历多去,写入一个 0 字节,然后再调用mlockmadvise(MADV_WILLNEED)

mlock:可以将进程使用的部分或者全部的地址空间锁定在物理内存中,防止其被交换到 swap 空间。

madvise:给操作系统建议,说这文件在不久的将来要访问的,因此,提前读几页可能是个好主意

四、读写方式

通过哪些I/O机制来访问index和segment文件呢?可以分为写和读两块:

Kafka

写(生产)消息:

  • index文件较小,可以直接用mmap进行内存映射
  • segment文件较大,可以采用普通的write(FileChannel.write),由于是顺序写PageCache,可以达到很高的性能

读(消费)消息:

  • index文件仍然通过mmap读,缺页中断的可能性较小
  • segment可以使用sendfile进行零拷贝的发送给消费者,达到非常高的性能

RocketMQ

写(生产)消息:

  • CommitLog、ConsumerQueue都使用MMAP进行写

读(消费)消息:

  • commitLog和consumerQueue文件都是MMAP读

五、存储关键技术—Mmap、PageCache、sendfile

Mmap

普通读文件过程

大体流程如下:

  1. 进程使用系统调用向内核发起文件读取请求,此时会有用户态转为内核态的过程。
  2. 内核访问文件系统。
  1. 如果有 cache 直接返回数据,没有开始读取磁盘
  2. 读取成功将 page1 读取到 cache 中完成第一次 copy
  1. 通知内核读取完毕(不同IO模型实现不同)
  2. 将数据从位于内核空间的 cache 拷贝到进程空间,完成第二次拷贝。

这里简单说一下为啥要拷贝到进程中:进程之间是相互隔离的,而且在常规操作下进程无法访问内核数据,所以得将 cache 拷贝到进程当中,给进程使用。

  • 耗时主要集中在内核切换、copy时长

Mmap映射

没有数据拷贝,映射的是数据地址

mmap 把文件映射到用户空间里的虚拟内存,省去了从内核缓冲区复制到用户空间的过程,文件中的位置在虚拟内存中有了对应的地址,可以像操作内存一样操作这个文件,相当于已经把整个文件放入内存。mmap 在完成了 read、write 相同效果的同时不仅省去了内核到进程的内存拷贝过程,而且还可以实现数据的共享操作:一个文件可以同时被多个进程、内核映射,如果映射的文件被内核或其他进程修改,那么最终的结果也会反映到映射当中。

  • 映射的虚拟内存可以被多个进程读写
  • 少了每次内核态pageCache到进程私有内存的拷贝
  • 可以简单看成直接操作pageCache

Mmap的限制

  • 映射文件的大小有限制,一般最大1.5G~2G
  • MMAP 使用的是虚拟内存,和 PageCache 一样是由操作系统来控制刷盘的,虽然可以通过 force() 来手动控制,小内存场景不适合。

OS的PageCache机制

PageCache是OS对文件的缓存,用于加速对文件的读写。一般来说,程序对文件进行顺序读写的速度几乎接近于内存的读写访问,这里的主要原因就是在于OS使用PageCache机制对读写访问操作进行了性能优化,将一部分的内存用作PageCache

一、文件读取

如果一次读取文件时出现未命中(cache miss)PageCache的情况,OS从物理磁盘上访问读取文件的同时,会顺序对其他相邻块的数据文件进行预读取(ps:顺序读入紧随其后的少数几个页面)。这样,只要下次访问的文件已经被加载至PageCache时,读取操作的速度基本等于访问内存

二、文件写入

OS会先写入至Cache内,随后通过异步的方式由pdflush内核线程将Cache内的数据刷盘至物理磁盘上对于文件的顺序读写操作来说,读和写的区域都在OS的PageCache内,此时读写性能接近于内存。不是顺序写,当pageCache中发现漏页,还是会去吧磁盘中数据拉到pageCache再写

sendfile

FileChannel#tranferTo transferFrom实现零拷贝

kafka消费的时候使用了零拷贝的sendfile。pagecache数据不经过内核切换直接拷贝到socket buffer传统的数据发送需要发送4次上下文切换,采用sendfile系统调用之后,数据直接在内核态交换,系统上下文切换减少 为2次。根据测试结果,可以提高60%的数据发送性能。

六、参考

https://t1mek1ller.github.io/2019/11/13/kafka-rocketmq-storage/

https://tech.meituan.com/2015/01/13/kafka-fs-design-theory.html

swap 空间: swap space是磁盘上的一块区域,可以是一个分区,也可以是一个文件,或者是他们的组合。当RAM满了后,并且需要更多内存空间时,使用磁盘空间代替RAM空间