独家解读!京东高可用分布式流数据存储的架构设计
京东于 2018 年对其自研的消息队列中间件 JMQ 进行了一次彻底的重构,升级为 JournalQ。相比上一代产品,JournalQ 大幅提升了性能,功能上增加了 Kafka、MQTT 等协议的支持,提供更加完善的事务机制;设计上采用了存储与计算分离的模式,数据存储层从 JournalQ 中分离出来作为一个独立的中间件产品,高可用分布式的流数据存储:JournalKeeper。基于这种存储计算分离的设计,JournalQ 在产品的定位上从单纯的消息数据管道升级为流数据的存储分发平台。
笔者作为架构师,全程参与了 JournalQ 和 JournalKeeper 的设计和开发。这篇文章中,我将跟大家分享在开发这两款产品过程中的一些技术心得和实践经验。
流数据存储并不是当下技术圈火热的话题之一,甚至很少人会听到过这个话题,更少的人会在实际业务中使用一款流数据存储的产品。那京东为什么要开发这样一款流数据存储呢?
一切还需要从数据治理说起。随着微服务架构的普及,服务治理的理念已经深入每个开发者的心中。我们先回顾一下服务架构的演进过程:从最原始的单体应用,发展为烟筒式架构,然后是 SOA 模式,直到现在流行的微服务架构,服务的粒度被拆分的更细,服务的复用能力更强,服务间耦合度更低,直接带来的益处是降低了总体拥有成本。
和服务治理一样,当企业拥有的数据规模发展到一定阶段,数据也需要被治理。同样回顾一下数据存储架构的发展过程:早期业务规模不大时,单体服务配合单个数据库就可以满足需求;随着业务规模逐步扩大,数据规模也越来大,单体数据库已经无法满足性能和容量的需求,普遍的解决办法是对数据库进行分库分表,并且为了提高性能和可靠性,采用读写分离的架构。具备一定规模的互联网公司,往往业务分工更加细致,对数据的使用方式也更加多样化,分库分表已经不能满足其业务需求。例如,对于同样一份数据,搜索团队需要把数据存储在 ElasticSearch 中以便于提升搜索性能;大数据团队希望把实时数据接入到 Kafka 中,离线数据存放到 HDFS 中,以便于其计算和分析;负责在线业务的团队,需要将数据存放到 Redis 中用于缓存,获得更好的在线访问体验,等等。
为了满足不同的业务需求,同一份数据被转换成各种特定的数据格式,存放在各种各样数据库中。这种多副本的数据结构的优点是显而易见的:每个副本的数据结构都是基于特定业务的查询方式进行优化,并且选用最适合的数据库进行存储,可以达到最佳的查询性能。
为此付出的代价是耗费了大量存储和计算资源。为了维护数据新鲜,每一份数据副本都要实时或者定期从上游数据源进行数据同步,当数据量很大的时候,这种 ETL 操作需要大量的计算资源;每一份数据为了保证查询性能和可靠性,需要存放多个数据副本,为了确保数据可靠性,还需要定期备份数据快照,这些副本和快照都需要占用大量的存储资源。另外一个问题是数据耦合,当业务需要对某个数据库的数据结构变更时,还需要考虑是否能满足下游数据的需求,这种在不同的数据库之间直接进行数据同步的方式,造成了事实上的数据依赖。
为了治理这种数据乱象,在不降低各种业务性能的前提下,减少对存储和计算资源的使用,解决数据耦合问题,我们提出了如下这种数据架构:
我们这里面提到的“流数据”相比大家熟知的流计算中对应的概念更加宽泛一些,几乎所有的数据在产生的源头都可以认为是“流数据”,例如:
Nginx 收到的 Http 请求;
微服务计算后生成的更新数据的 SQL;
从页面和 APP 采集到的埋点数据;
各种应用程序的日志等。
将流数据从产生的源头就实时存入流数据平台,各业务系统统一从流数据平台获取数据经过必要的计算和转换后,存入对应的业务数据库中。数据使用方可以像使用消息队列一样从数据流平台获取订阅数据的实时推送,也可以按照指定的位置或者时间来进行数据定期的数据同步,实现了批流一体的模式。统一数据订阅避免了数据多次 ETL 浪费的计算资源。并且由于数据流的可回溯性,不需要对数据流本身备份数据快照,数据的使用方可以也可以减少数据快照的密度,节省了存储资源。使用统一的数据流平台,隔离了数据的生产者和数据使用方,有效的解决了数据耦合的问题。
当然,数据流存储也不是万能的,这种存储形式只支持按照时间和位置进行查询,并不适合业务系统直接使用,所以其定位还是一个数据存储、交换和分发的平台。
数据库和中间件这类 PaaS 层的基础设施类软件,近些年的发展趋势是越来越专业化、精细化。只在一个很窄的领域内解决一两个特定的问题,但是在这个领域内,具备极致的性能和体验,可以以极高的性能的处理海量的数据。我们的流数据存储也是这样一种设计思路,它的功能非常的简单,就是存储流数据,但需要具备存储海量数据的能力,并且具备非常高的性能。
我们在设计这款产品的时候,给它定义了如下这些特性:
有序:数据必须是严格有序的,不同顺序有可能导致完全不一样的结果。
Append Only: 数据只能追加写入,并且写入成功的数据具有不可变的特性。
此外,它还需要具备其它数据存储集群相同的一些通用特性,包括:
分布式: 支持集群模式,可以水平扩展;
高性能:具有远超一般结构化数据库的至少一个数量级超高的性读写能,这样整个系统才不会因为引入这个流数据存储而显著的降低总体性能;
可靠性:单节点损坏不会丢失数据;
顺序一致性:集群中所有节点按照一致的顺序更新数据,简单的说,刚刚写入的数据不要求立刻在所有节点都能读到,经过一个短暂的时延后数据陆续更新至所有节点是可以接受的。
近乎无限的容量。
我们请专门的测试团队对 JournalQ 进行了极限性能的压测,测试结果显示,单节点的极限写入性能为:32,961,776 条每秒,并且在极限情况下具有非常好的稳定性,响应时延的 tp99 不超过 1ms。数据同步读取的性能与写入性能相当,可以满足同步读写的要求,做到“写入多快就读取多快”。测试环境如下:
测试服务器:32C/256G/4TB SSD/ 万兆以太网
测试每条消息大小为:1KB
压缩方式:LZ4 压缩
接下来分享一下我们在实现过程中性能优化的一些经验。
对于数据存储类的系统,决定其读写性能的根本因素是存储结构的设计。JournalKeeper 采用了一种非常简单高效的存储结构,如下图所示:
数据按照顺序依次写入 Journal 文件中,然后将每条数据的全局偏移量作为索引值,按照同样的顺序记录在 Index 文件中。考虑到单个文件的大小限制,把 Journal 和 Index 都拆分成多个连续的文件,每个文件的文件名就是文件内第一条数据的全局偏移量。
数据写入时,由于流数据尾部追加写入的特性,只要一直保存索引和数据尾部的所在的文件和偏移量,就可以直接进行写数据操作,因此写入的时间复杂度为 O(1)。
读取的查找过程稍微复杂一些:
首先需要根据给定的索引序号找到对应的索引文件。由于每个索引的长度固定为 16 个字节,索引序号 x16 即可以计算出索引的全局偏移量。
JournalKeeper 把每个分区的索引文件的文件名(即这个文件第一条索引的全局偏移量)都存放在一个跳表中,找到索引所在文件的过程相当于在跳表中进行一次搜索,其时间复杂度为:O(logn),其中 n 为 Index 文件的个数;
找到文件用,用索引全局偏移量减去文件名就可以找到索引在文件中的位置,通过读取索引获得数据在 Journal 中的全局偏移量 ;
根据数据的全局偏移量查找数据的过程和查找索引类似,其时间复杂度为:O(logm),其中 m 为 Journal 文件的个数;
总体的读取时间复杂度为:O(logn)+O(logm)
其中 n 和 m 分别为 Index 文件和 Jouranl 文件的数量,考虑到 n 和 m 远远小于数据的总数,可以近似的认为:O(logn)+O(logm)≈O(1)
在 JournalKeeper 中,流数据是存储在磁盘中的,为了提高读写的性能,我们为其设计了一套定制的内存缓存系统。经测试,在正常读写的情况下,这套缓存的命中率约为 99.96%,几乎全部的读请求都可以命中缓存,提升了读性能的同时,还可以将几乎全部的磁盘 IO 用于数据写入,进一步提升了数据写入的性能。
在缓存页粒度的选择时,JournalKeeper 使用了最简单的策略:将整个文件缓存在内存中。无论是 Journal 文件还是 Index 文件,每个缓存页面对应一个文件。这种设计的优势在于,不需要再为缓存页编写单独的查找算法,只需要复用文件的查找算法即可,并且缓存页和文件的对应关系也变得非常简单。
不足之处是,如果只是为了读取文件中的一小部分数据,不得不加载整个文件,这种设计显然是不太经济的。但是考虑到流数据的顺序连续读写特性,随机的读写非常少,更多的读写方式从某个位置开始连续的向后读写,这种场景下,较大的缓存粒度不仅很少会出现“数据读到内存中却最终没有被使用”的情况,反而可以避免频繁的换页带来的性能抖动。
另外一个问题是,缓存页比较大,从磁盘加载整个文件到内存中的耗费的时间相对较长。我们针对这个问题做了二方面的优化。
大多数应用对流数据的访问有一个特性:越新的数据访问概率越高。比如像消息队列,正常情况下生产的数据马上就会被消费掉。数据在写入磁盘前一定会经过内存,那我们就没必要在读的时候再从磁盘上重新加载一次,直接从内存中读出来更快,而且节省了宝贵又特别慢的磁盘 IO,这个我们称为 读写共页,这是第一项优化。
第二项优化叫 异步预加载,原理非常简单但是效果很好。既然是连续读写,那上一个文件读写完成后,有非常大的概率会继续读写下一个文件。基于这个特性,当读写到接近文件的尾部时,JournalKeeper 会开启一个异步线程,把下一个文件先加载好,这样不仅能解决大文件加载慢的问题,还能避免同步加载文件导致的卡顿和性能抖动。
在内存管理方面,为了避免 JVM 频繁的垃圾回收造成的卡顿,JournalKeeper 选择使用堆外内存作为缓存。使用堆外内存的好处是性能更好,多数情况下可以减少一次内存拷贝。JournalKeeper 自己进行内存管理,避免了不可预期的 FullGC。
最后说一下缓存的淘汰策略,内存空间是有限的,不断有新的页需要缓存必然要淘汰一些缓存页。JournalKeeper 采用一种改进的 LRU 策略 PLRU。LRU 淘汰最近最少使用的页,JournalKeeper 根据流数据存储的特点,在淘汰时增加了一个考量维度:页面位置(即文件名)与尾部的距离。因为越是靠近尾部的数据,被访问的概率越大。这样综合考虑下的淘汰算法,不仅命中率更高,还能有效的避免“挖坟”问题:例如某个客户端正在从很旧的位置开始的向后读取一批历史数据,内存中的缓存很快都会被替换成这些历史数据,相当于大部分缓存资源都被消耗掉了,这样会导致其他客户端的访问命中率下降。加入位置权重后,比较旧的页面会很快被淘汰掉,减少挖坟对系统的影响。
说完了存储接下来聊一聊代码本身的优化。
首先更正一个在很多开发者的观念里都存在的误区:高并发并不等于高性能。在很多开发者的认知里,应用增加并发后性能确实得到了成倍的提升。其实根本的原因是单个并发的性能没有很好的优化,没有做到充分的利用计算资源,大部分时间都浪费在等待上了。
对于计算密集型的应用,瓶颈资源是 CPU,理想情况下,最高效的方式 CPU 有几个核就起几个线程,这样才是最充分的利用 CPU 资源。启动了过多的线程,反而会有一部分 CPU 时间在 CPU 上下文切换被浪费掉了。但如果代码优化的不够好,比如说每次计算出一批结果后把计算结果写到磁盘里,在写磁盘等待 IO 的这段时间内,这个线程对应的 CPU 核心是处于闲置状态的。这种情况下启动更多的线程,操作系统会自动把 CPU 调度给其它线程,这样看起来提高并发确实带来了性能提升。但我们要知道,只不过是因为我们的代码优化的不够充分,操作系统替我们的程序做了一些调度优化而已,总体的性能并没有达到最优的状态。
所以,做极致的性能优化,最先要解决的是减少等待。
实际开发过程中,可用的方法有很多,这里面分享几个比较简单实用方法:
异步化:将你的线程模型都改成异步化,比如使用 CompletableFuture、RxJava 等异步框架,避免等待那些可能耗时的操作结果。
拆分流程:把一个很长的流程拆分成几个短的流程。
减少锁:设计时尽量少的使用共享资源,减少锁的使用。
减少锁等待:实在需要使用锁的的地方,尽量减少锁的粒度或者用读写锁,减少锁的等待时间;
一般来说消息队列都是生产的时候需要处理的业务逻辑相对比较多,我们看下 JournalQ 是如何优化它这部分设计的。
写入数据的流程如下:
Producer 发消息给 Leader Broker;
Leader Broker 解析处理消息;
Leader Broker 将想消息复制给所有的 Follower Broker,同时异步将消息写入磁盘;
Leader Broker 收到大多数 Follower Broker 的复制成功确认后,给 Producer 回响应告知消息发送成功。
对于这个流程,我们设计的线程模型是这样的:
图中白色的细箭头是数据流,蓝色的箭头是控制流,白色的粗箭头代表远程调用。
这里我们设计了 6 组线程,将一个大的流程拆成了 6 个小流程。并且整个过程完全是异步化的。除了 JournalCache 的加载和卸载需要对文件加锁以外,没有用到其它的锁。每个小流程都不会等待其它流程的共享资源(没有数据需要处理时等待上游流程提供数据的情况除外),并且只要有数据就能第一时间处理。
说完了单节点的性能优化,我们来谈整个集群的架构。
从实用角度出发,我们在设计一个集群或者一个系统的总体架构时,需要在 CAPC 这几个方面进行取舍:
一致性 (Consistency)
可用性 (Availability)
性能 (Performance)
复杂度 (Complexity)
举个例子,现在很多微服务的应用都是用 MySQL 存储在线业务数据,为了加快业务访问会使用 Redis 缓存部分 MySQL 中的数据。这种设计提升了系统整体的性能,付出的代价是牺牲了数据的一致性:从 Redis 中读出的数据有可能并不是最新的,在某些特定应用的场景下,这种暂时的数据不一致是可以接受的。
系统的复杂度是容易被忽略的考量指标。过于复杂的设计更难于实现和维护,会大幅提高系统的总体拥有成本,因此在其它三个考量因素都可以接受的范围内,尽量采用简单的设计总是一个不错的选择。
如果可能的话,可以将服务设计成无状态的。无状态服务的设计让集群的结构更加简单,天然支持水平扩容。对于有状态的服务,可以尝试将存储和计算逻辑分离为无状态的计算服务和有状态的存储服务,然后用一致性的存储来保存状态数据。
很多分布式系统选择 Apache ZooKeeper(以下简称 ZK)用于存储状态数据,ZK 一主多从的架构和其自动选举机制很好的平衡了数据可靠性、一致性和可用性,并且具有相对不错的性能。JouralQ 的上一代产品 JMQ 也使用 ZK 存储元数据,但我们在运维 JMQ 的过程中也遇到了一些 ZK 的问题:
可维护性问题: 运维人员部署和运维 JMQ 集群时,不得不一并维护 ZK 集群,并且 ZK 集群故障会影响到 JMQ 集群。
多机房部署的问题:京东的 JMQ 集群包含超过 2000 个节点部署在全球多个机房中,当机房间的链路出现问题时,在拥有少数节点的机房中 ZK 集群将处于不可用状态,不可避免的会对使用 ZK 的 JMQ 集群产生影响。
数据容量的问题:ZK 本身的容量是有上限的(我们的经验数据是 500MB 左右),否则很容易导致选举失败,陷入反复选举集群不可用的状态。
选举速度慢:ZK 选举完成后,还需要完成超过半数以上节点的数据同步过程才能提供服务,当数据量比较大时数据同步的耗时也比较长,导致不可用时间也会相应变长。
考虑到上述问题,在设计 JournalKeeper 时,我们决定基于 Raft 协议自行实现分布式协调相关的服务,并把这部分功能直接集成到 JournalKeeper 的服务进程中,避免运维不必要的协调服务集群。
JournalKeeper 不仅使用 Raft 来维护其元数据,Raft 协议也被用来维护存储的流数据的一致性。我们为对于每个数据流(可以理解为一个 Topic)都创建一个 Raft 集群,集群的每个节点为一个虚拟进程,Leader 节点提供流数据写入服务,所有节点都可以提供流数据的读服务。
关于 Raft 一致性算法本身,大家可以参考作者在 GitHub 上的 主页:https://raft.github.io 和 论文:https://raft.github.io/raft.pdf。
Raft 的优点在于:
强一致:严格按照 Raft 协议实现的集群可以提供最高等级的一致性保证。
快速选举:Raft 的选举算法非常简单高效,大多数情况向通过一轮投票即可选出新的 Leader,并且选举完成后 Leader 立刻就可以提供服务,不需要等待数据同步。
易于理解:Raft 相比于其它的一致性算法,更易于理解和实现。
Raft 协议也存在一些不足之处:
首先,Raft 的大多数原则限制了集群的规模,一般来说,集群的节点数设置为 3、5 或 7 个,更多的节点数量会显著拖慢选举和复制的过程。受限于一致性的要求,Leader 只能顺序处理写入请求,处理写入请求过程中需要等待数据安全复制到大多数节点上。集群节点越多,Leader 的出流量更高,复制的时延更大,将导致集群的写入的性能下降。类似的,集群节点越多,选举的过程越慢,由于选举过程中集群是处于不可用状态的,过多的节点数量会降低集群的可用率。
原生的 Raft 协议并不能直接满足 JournalKeeper 的需求,我们在实现过程中对协议的算法做了一些适应性的调整,牺牲了部分一致性,用以换取性能的极大提升。
对于流数据存储来说,并不需要强一致,顺序一致已经可以满足需求。刚刚写入的日志在通过短暂的复制后才能读到是可以接受的。
JournalKeeper 在支持强一致的同时,提供另外一种比更宽松的高性能一致性实现:顺序一致性,来缓解性能和可用性的问题。顺序一致不要求在同一时刻所有节点的状态都保证完全相同,只要保证集群各节点按照一致的顺序保存同一份日志即可。Raft 协议中,已经提交的日志具有不变性,也就是说在集群任何一个节点上同一个位置,只要这个位置已经提交,读到的日志就是一样的。基于这个保证,对于流数据(也就是 Raft 的日志),可以把读请求分流到 Follower 节点上。
将一致性约束放宽至顺序一致的前提下,JournalKeeper 的所有的节点都可以提供读服务,实现了读写分离,大幅提高了集群整体的读性能。并且,可以通过增加 Follower 的数量来水平扩容,集群的节点数量越多,总体的读性能越好。通过将读请求的压力从 Leader 分流到 Followers 上去,相对的提高了写入性能。
我们将两种一致性混合使用,在一致性、性能和可用性三方面达到一个相对最优的平衡:
对于元数据的访问,通过 Leader 读写确保强一致;
对于流数据的写请求,通过 Leader 写入保证流数据的顺序和一致性;
对于流数据的读请求,不需要严格一致,通过 Follower 读取;
为了提高集群的吞吐量,需要用更多的节点数量分摊压力,但增加节点数量又会导致集群的写性能和可用率下降。JournalKeeper 提出了一种新的角色 观察者 (OBSERVER) 来解决这一矛盾。集群中的节点被划分为如下 2 种角色:
选民(VOTER) 拥有选举权和被选举权的节点,可以成为 Leader、Follower 或 Candidate 三种状态。
观察者(OBSERVER) 没有选举权和被选举权的节点,提供只读服务,只从集群的其它节点上复制已提交的日志。
选民节点即 Raft 中的节点,可以成为 Leader、Follower 或 Candidate,参与选举和复制过程。观察者从集群的其它节点拉取已提交的日志,更新自己的日志和提交位置。观察者节点提供和选民节点完全相同的读服务。
观察者既可以从选民节点拉取日志,也可以从其它观察者节点拉取日志。为观察者节点提供日志的节点无需维护观察者节点的状态,观察者节点也无需固定从某一个节点上拉取数据。观察者对于选民来说是透明的,选民无需感知观察者,这样确保 Raft 中定义的选举和复制的算法无需做任何变更,不破坏原有的安全性。观察者可以提供和所有选民一样的读服务,因此可以通过增加观察者的数量来提升集群的吞吐量。观察者不参与选举和复制的过程,增加观察者的数量不会拖慢选举和复制的性能。
集群节点超过一定数量时,大量的观察者节都从少量的选民节点拉取数据,可能会导致网络拥塞。这种情况下,可以使用多级复制的结构来分散日志复制的流量。需要注意的是,复制的层级越多,处于边缘的节点更新到最新状态的所需的时间越长。
针对 Raft 线性复制的性能较差的问题,JournalKeeper 在保证一致性的前提下,给出了一种并行复制的实现,能显著降低日志复制的平均时延,提升总体吞吐量。
在 Raft 中,串行复制的流程是:
读取:Leader 读取数据,构建复制请求;
网络传输:Leader 将复制请求发送给 Follower;
写入:Follower 收到日志后写入内存或磁盘,构建响应;
网络传输:Follower 将响应发送给 Leader;
提交:Leader 收到响应,如果满足条件则提交已完成复制的日志。
并行复制的思路是,Leader 并行发送复制请求,Follower 中维护一个按照日志位置排序请求列表,按照日志位置串行处理这些复制请求,Leader 按照位置顺序处理响应。也就是说整个复制流程拆分成上面的 5 个小流程,其中 1、2、4 三个小流程可以并发,3、5 为了保证数据一致性不能并发,依然串行执行。对于并发后可能出现的乱序和数据空洞问题,可以通过对请求按照数据的位置进行排序和少量数据重传解决,具体的实现细节大家可以参照 JournalKeeper 的源码或文档。
如果说单节点的性能优化更多的是一些小的方法和技巧,这个在中国传统文化里面称之为“术”。而集群层面的架构设计更多的是一些大方向的选择和取舍,这个称之为“道”,也就是道理的“道”。没有最好的架构,只有最适合的架构,所谓有一得必有一失,一个优秀的架构师,不仅要有具备足够的技术能力,更要有足够的高度和大局观,懂得在宏观层面做好把握和取舍,方能成就优秀产品。
JournalQ 和 JournalKeeper 这两款中间件产品将会在近期开源,也请大家多关注,谢谢!
点个在看少个 bug ?
关注公众号:拾黑(shiheibook)了解更多
[广告]赞助链接:
四季很好,只要有你,文娱排行榜:https://www.yaopaiming.com/
让资讯触达的更精准有趣:https://www.0xu.cn/
随时掌握互联网精彩
- 1 习主席复信里的中外情谊 7991760
- 2 搞“人草大战” 副市长等9人被处理 7982913
- 3 32岁飞行员失联车内情况被还原 7835484
- 4 提升银发经济含“金”量 7752780
- 5 伊万卡:永久退出政坛 7681921
- 6 张馨予老公年薪15万 7521335
- 7 女副校长婚内出轨疑为情夫举报 7483481
- 8 哈尔滨一冰雕多处血印?媒体核查 7335706
- 9 还有8天刑满释放的他改判死缓 7234186
- 10 爆料人:刘诗诗就是不想承认离婚 7165296