关于图数据库的概念,这里不作详细阐述。而是以图表的形式,对其与另外几种 NoSQL 产品进行比较。图数据库本身归属于 NoSQL 存储,而诸如KV 类型、宽表类型、文档类型、时序类型等其他 NoSQL 产品,各自具备独特的特性。从上图左侧的坐标轴中可以看到,从 KV 到宽表、文档,再到图,数据关联度和查询复杂度是越来越高的。前三者,即 KV、宽表和文档,主要关注的是单个记录内部的丰富性,但并未涉及记录间的关系。而图数据库则专注于处理这些关系。图数据库主要适用于需要挖掘深链路或多维度关系的业务场景。
接下来通过一个具体示例,再来对比一下图数据库与关系型数据库。这是社交网络中常见的一种表结构,包括四个数据表:用户表、好友关系表、点赞行为表以及笔记详情表。比如要查询 Tom 这个用户的好友所点赞的笔记的详细信息,那么可能需要编写一段冗长的 SQL 语句。在该 SQL 语句中,涉及到三个 join 操作,首先将用户表和好友关系表进行连接,从而获取 Tom 的所有好友信息。然后,将得到的中间结果与点赞行为表进行连接,以确定 Tom 的好友都点赞了哪些笔记。最后,还需要对先前生成的临时表和笔记详情表进行连接,以便最终获取这些笔记的全部内容。
关系型数据库中的 join 操作通常复杂度较高,其执行过程中需消耗大量的 CPU 资源、内存空间以及 IO,虽然我们可以通过精心的设计,例如针对所要关联的列创建索引,以降低扫描操作的比例,通过索引匹配来实现一定程度的性能提升。然而,这样的举措所产生的成本相对较高,因为所有新的场景都需要创建索引,要考虑如何撰写 SQL 中的 join 条件,选择哪个表作为驱动表等等,这些都需要耗费大量的精力和时间。
而如果采用图数据库,则会简单很多。首先进行图建模,创建两类顶点,分别为用户和笔记,同时创建两类边,一类是好友关系,即用户到用户的边;另一类是用户到笔记的点赞关系。当我们将这些数据存储到图数据库中时,它们在逻辑上呈现出一种网状结构,其关联关系已经非常明确。查询时,如上图中使用 Gremlin 语句,仅需四行代码即可获取到所需的信息。其中第一行 g.V().has('name', 'Tom'),用于定位 Tom 节点,两个 out 子句,第一个 out 子句用于查找 Tom 的好友,第二个 out 子句用于查找 Tom 的点赞笔记。当第二个 out 子句执行完毕后,就可以遍历所有外部的绿色顶点,即笔记节点。最后,读取它们的 content 属性。可以发现,与关系型数据库相比,图数据库的查询语句更加简洁、清晰易懂。
此外,图数据库还有一个更为显著的优势,就是在存储时,它已经将顶点及其关系作为一等公民进行设计和存储,因此在进行邻接边访问和关系提取时,效率极高。即使数据规模不断扩大,也不会导致查询时间显著增加。
2. 图数据库在小红书的使用场景
小红书是一个年轻的生活方式共享平台。在小红书,用户可以通过短视频、图片等方式,直观地记录生活的点点滴滴。在小红书内部,图数据库被广泛应用于多种场景中,下面将分别列举在线、近线以及离线场景的实例。
第一个案例是社交实时推荐功能。小红书具有典型的社区特性,用户可以在其中点赞、发布贴文、关注他人、转发信息等。譬如我进入某用户主页并停留了较长时间,那么系统便会判定我对该用户有兴趣,而这个用户可能同样吸引了他人的注意。因此,系统会将该用户的其他关注者以及他们所关注的其他用户推荐给我,因为我们有共同的兴趣爱好,所以他们的关注内容我也有可能感兴趣,这便是一种简单的实时推荐机制。
第二个案例是社区风控机制,小红书社区会对优质笔记或优质视频的创作者进行奖励,但这也给了一些羊毛党可乘之机,他们发布一些质量较低的帖子或笔记,将其发布在互刷群中,或者转发给亲朋好友,让他们点赞和转发,从而伪装成所谓的高质量笔记,以此来骗取平台的奖励。社区业务部门拥有一些离线算法,能够对已有的数据进行分析,识别出哪些用户和笔记属于作弊用户,在图中用红色的点标出。在近线场景中,系统会判断每个顶点在多跳关系内接触到的作弊用户的数量或比例,如果超过一定的阈值,则会将这个人标记为潜在的风险用户,即黄色的顶点,进而采取防范措施。
第三个案例是离线任务的调度问题,在大数据平台中,往往存在大量的离线任务,而任务之间的依赖关系错综复杂,如何合理地调度任务,成为一个棘手的问题。图结构非常适合解决这类问题,通过拓扑排序或其他算法,可以找出最受依赖的任务,并进行反向推理。
3. 业务上面临的困境
小红书在社交、风控及离线任务调度等场景中均采用了图数据库,然而在实际应用过程中遇到了诸多挑战。在此,简要介绍其中基于实时推荐场景的一个痛点。
业务诉求是能即时向用户推送可能感兴趣的“好友”或“内容”,如图所示,A 与 F 之间仅需经过三次跳跃即可到达,因此 A 与 F 构成了一种可推荐的关联关系,如果能即时完成此推荐,则能有效提升用户使用体验,提升留存率。然而,由于先前 REDgraph 在某些方面的能力尚未完善,业务一直只采用了一跳和两跳查询,未使用三跳,风控场景也是类似。
业务对时延的具体要求为,社交推荐要求三跳的 P99 低于 50 毫秒,风控则要求三跳的 P99 低于 200 毫秒,这是目前 REDgraph 所面临的一大难题。
那为何一至二跳可行,三跳及以上就难以实现呢?对此,我们基于图数据库与其他类型系统在工作负载的差异,做了一些难点与可行性分析。
首先在并发方面,OLTP 的并发度很高,而 OLAP 则相对较低。图的三跳查询,服务的仍然是在线场景,其并发度也相对较高,这块更贴近 OLTP 场景。
其次在计算复杂度方面,OLTP 场景中的查询语句较为简单,包含一到两个 join 操作就算是较为复杂的情况了,因此,OLTP 的计算复杂度相对较低。OLAP 则是专门为计算设计的,因此其计算复杂度自然较高。图的三跳查询则介于 OLTP 和 OLAP 之间,它虽不像 OLAP 那样需要执行大量的计算,但其访问的数据量相对于 OLTP 来说还是更可观的,因此属于中等复杂度。
第三,数据时效性方面,OLTP 对时效性的要求较高,必须基于最新的数据提供准确且实时的响应。而在 OLAP 场景中则没有这么高的时效要求,早期的 OLAP 数据库通常提供的是 T+1 的时效。图的三跳查询,由于我们服务的是在线场景,所以对时效性有一定的要求,但并不是非常高。使用一小时或 10 分钟前的状态进行推荐,也不会产生过于严重的后果。因此,我们将其定义为中等时效性。
最后,查询失败代价方面。OLTP 一次查询的成本较低,因此其失败的代价也低;而 OLAP 由于需要消耗大量的计算资源,因此其失败代价很高。图查询在这块,更像 OLTP 场景一些,但毕竟访问的数据量较大,因此同样归属到中等。
总结一下:图的三跳查询具备 OLTP 级别的并发度,却又有比一般 OLTP 大得多的数据访问量和计算复杂度,所以比较难在在线场景中使用。好在其对数据时效性的要求没那么高,也能容忍一些查询失败,所以我们能尝试对其优化。
正如前面提到的,在小红书,三跳查询的首要目标还是降低延迟。有些系统中会考虑牺牲一点时延来换取吞吐的大幅提升,而这在小红书业务上是不可接受的。如果吞吐上不去,还可以通过扩大集群规模来兜底,而如果时延高则直接不能使用了。
二、原架构问题分析
第二部分将详述原体系结构中所存在的问题及其优化措施。
1. RedGraph 整体架构
REDgraph 的整体结构如上图所示,其与当前较为流行的 NewSQL,如 TiDB 的架构构相似。采用了存储和计算分离的架构,并且存储是 shared-nothing 的。三类节点分别为 meta-server,元信息的管理;query-server,用户查询请求的处理;store-server,存储数据。
2. RedGraph 图切分方式
图切分的含义为,如果我们拥有一个巨大的图,规模在百亿到千亿水平,应该如何将其存储在分布式集群之中,以及如何对其进行切分。在工业界中,主要存在两种典型的切分策略,即边切分和点切分。
边切分,以顶点为中心,这种切分策略的核心思想是每个顶点会根据其 ID 进行哈希运算,并将其路由到特定的分片上。每个顶点上的每条边在磁盘中都会被存储两份,其中一份与起点位于同一分片,另一份则与终点位于同一分片。如上图中的例子,其中涉及到 ABC 三个顶点的哈希定位结果。在这个例子中,A 至 C 的这条出边,被放置在与 A 同一个节点上。同样,B 至 C 的出边跟 B 放到了一起,最后一个桶中保存了 C 以及 C 的入边,即由 A 和 B 指向 C 的两条入边。
点切分,与边切分相对应,以边为中心,每个顶点会在集群内保存多份。
这两种切分方式各有利弊。边切分的优点在于每个顶点与其邻居都保存在同一个分片中,因此当需要查询某个顶点的邻居时,其访问局部性极佳;其缺点在于容易负载不均,且由于节点分布的不均匀性,引发热点问题。点切分则恰恰相反,其优点在于负载较为均衡,但缺点在于每个顶点会被切成多个部分,分配到多个机器上,因此更容易出现同步问题。
REDgraph 作为一个在线的图查询系统,选择的是边切分的方案。
3. 优化方案 1.0
我们之前已经实施了一些优化,可以称之为优化方案 1.0。当时主要考虑的是如何快速满足用户需求,因此我们的方案包括:首先根据常用的查询模式提供一些定制化的算法,这些算法可以跳过解析、校验、优化和执行等繁琐步骤,直接处理请求,从而实现加速。其次,我们会对每个顶点的扇出操作进行优化,即每个顶点在向外扩展时,对其扩展数量进行限制,以避免超级点的影响,降低时延。此外,我们还完善了算子的下推策略,例如 filter、sample、limit 等,使其尽可能在存储层完成,以减少网络带宽的消耗。同时,我们还允许读从节点、读写线程分离、提高垃圾回收频率等优化。
然而,这些优化策略有一个共性,就是每个点都比较局部化和零散,因此其通用性较低。比如第一个优化,如果用户需要发起新的查询模式,那么此前编写的算法便无法满足其需求,需要另行编写。第二个优化,如果用户所需要的是顶点的全部结果,那此项也不再适用。第三个优化,如果查询中本身就不存在这些运算符,那么自然也无法进行下推操作。诸如此类,通用性较低,因此需要寻找一种更为通用,能够减少重复工作的优化策略。
4. 新方案思考
如上图中,是对一个耗时接近一秒的三跳查询的 profile 分析。我们发现在每一跳产出的记录数量上,第一跳至第二跳扩散了 200 多倍,第二跳至第三跳扩散了 20 多倍,表现在结果上,需要计算的数据行数从 66 条直接跃升至 45 万条,产出增长速度令人惊讶。此外,我们发现三跳算子在整个查询过程中占据了较大的比重,其在查询层的耗时更是占据了整个查询的 80% 以上。
那么应该如何进行优化呢?在数据库性能优化方面,有许多可行的方案,主要分为三大类:存储层的优化、查询计划的优化以及执行引擎的优化。
由于耗时大头在查询层,所以我们重点关注这块。因为查询计划的优化是一个无止境的工程,用户可能会写出各种查询语句,产生各种算子,难以找到一个通用且可收敛的方案来覆盖所有情况。而执行引擎则可以有一个相对固定的优化方案,因此我们优先选择了优化执行引擎。
图数据库的核心就是多跳查询执行框架,而其由于数据量大,计算量大,导致查询时间较长,因此我们借鉴了 MPP 数据库和其他计算引擎的思想,提出了分布式并行查询的解决方案。
原有的多跳查询执行流程如上图所示。假设我们要查询933 顶点的三跳邻居节点 ID,即检索到蓝圈中的所有顶点。经过查询层处理后,将生成右图所示执行计划,START 表示计划的起点,本身并无实际操作。GetNeighbor 算子则负责实际查询顶点的邻居,例如根据 933 找到 A 和 B。后续的 Project、InnerJoin 以及 Project 等操作均为对先前产生的结果进行数据结构的转换、处理及裁剪等操作,以确保整个计算流程的顺利进行。正是后续的这几个算子耗费的时延较高。
算子的物理执行过程如上图所示。查询服务器(Query Server)执行 START 指令后,将请求发送至存储节点(Store Server)中的一个,该节点获取其邻居信息,并反馈至查询层。查询层接收到结果后,会对其中的数据进行去重或其他相关处理,然后再次下发,此次的目标是另外两个 Store Server。这一步骤即为获取二度邻居的信息,返回至查询层后,再对这些结果进行汇总和去重处理,如此往复。
在整个流程中,我们明显观察到三个问题。首先,图中蓝色方框内的算子都是串行运行的,必须等待前一个计算完成后,才能执行下一个。对于大规模的数据,串行执行的效率显然无法与并行执行相提并论。其次,Query Server 内部存在一个同步点,即左侧标注为红色的字(等待所有响应返回),要求 query Server 等待所有存储节点的响应返回后,才能继续执行后续操作。若某一存储节点的数据量较大或负载过高,导致响应速度较慢,则会耗费大量时间在等待上,因此我们考虑取消同步等待的过程。最后,存储层的结果需要先转发回查询层进行简单处理,然后再向下发送,这无疑增加了不必要的转发成本。如果存储节点(Store Server)能够自行转发,便可避免一次网络转发过程,从而降低开销。
相应的解决策略便是三点:算子并行执行,取消同步点,以及让 Store Server 的结果直接转发。基于此,我们提出了如下的改造思路。
首先,查询服务器(Query Server)将整个执行计划以及执行计划所需的初始数据传输至存储服务器(Store Server),之后 Store Server 自身来驱动整个执行过程。以 Store Server 1 为例,当它完成首次查询后,便会根据结果 ID 所在的分区,将结果转发至相应的 Store Server。各个 Store Server 可以独立地继续进行后续操作,从而实现整个执行动作的并行化,并且无同步点,也无需额外转发。
需要说明的是,图中右侧白色方框比左侧要矮一些,这是因为数据由上方转到下方时,进行了分区下发,必然比在查询服务器接收到的总数据量要少。
可以看到,在各部分独立驱动后,并未出现等待或额外转发的情况,Query Server 只需在最后一步收集各个 Store Server 的结果并聚合去重,然后返回给客户端。如此一来,整体时间相较于原始模型得到了显著缩短。
三、分布式并行查询实现
分布式并行查询的具体实现,涉及到多个关键元素。接下来介绍其中一些细节。
1. 如何保证不对 1-2 跳产生负优化
首先一个问题是,在进行改造时如何确保不会对原始的 1-2 跳产生负优化。在企业内部进行新的改造和优化时,必须谨慎评估所采取的措施是否会对原有方案产生负优化。我们不希望新方案还未能带来收益,反而破坏了原有的系统。因此,架构总体上与原来保持一致。在 Store Server 内部插入了一层,称为执行层,该层具有网络互联功能,主要用于分布式查询的转发。Query Server 层则基本保持不变。
这样,当接收到用户的执行计划后,便可根据其跳数选择不同的处理路径。若为 1 至 2 跳,则仍沿用原有的流程,因为原有的流程能够满足 1-2 跳的业务需求,而 3 跳及以上则采用分布式查询。
2. 如何与原有执行框架兼容
第二个关键问题是如何维持与原有执行框架的兼容性,即在进行分布式技术改造时,不希望对原有代码进行大幅修改,而希望通过最小化的调整达到目的。这里参考了其他产品的一些思路,具体来说,就是在一些需要切换分区访问的算子(如 GetNeighbor 等)之前,添加具有路由功能的算子。这里有三种,分别为 Forward、Converge 和 Merge。Forward 的作用显而易见,即当遇到任何运算符时,表示数据需要转发给其他节点处理,而当前节点无法继续处理。Converge 运算符则是在整个执行计划的最后一步添加,用于指示最终结果应返回至最初接收用户请求的节点。在 Converge 后,还需添加一个 Merge 运算符,该节点在收到结果后需要进行聚合操作,然后才能将结果返回给客户端。如此修改后,我们只需实现这三个算子本身,无需对其他算子进行任何修改,且不会对网络层造成干扰,实现了极轻量级的改造。在执行计划修改的过程中,我们还进行了一些额外的优化,例如将 GroupBy、OrderBy 等算子也进行了下推处理。
3. 如何做热点处理
第三问题是如何进行热点处理,或者说是重复 ID 的处理。当整个执行流程改造成由 Store Server 自行驱动之后,会出现一种情况,例如边 AC 和边 BC 位于两个不同的 Store Server 上,查询都是单跳的操作,可能左侧的机器查询 AC 操作更快,而右侧的机器查询 BC 操作较慢,因此导致左侧的机器首先查找到 C,然后将结果转发给其他机器,向下一级中间机器查询 C 的邻居,即执行 GetNeighbor from C,右侧的节点虽然稍显滞后,但也需要执行查询 C 邻居操作。
若不进行任何操作,在中间节点便会对 C 的邻居进行两次查询,造成资源浪费。优化策略非常简单,即在每个存储节点之上添加 NeighborCache。本质是这样一个 Map 结构,每当读请求到来时,首先在 Map 中查找是否存在 C 的邻节点,若存在则获取,否则再访问存储层,访问完毕后填充 NeighborCache 的条目,每个条目的生存时间都非常短暂。之所以短暂,其充分性在于左右节点发出请求的间隔肯定不会很久,不会达到数秒的级别,否则业务上也无法承受。因此,NeighborCache 的每个条目也只需存活在秒级,超过则自动删除。必要性则在于 Map 的 Key 的组合模式,即 Vid+edgeType 这种组合模式还是非常多的,若不及时清理,内存很容易爆炸。此外,查询层从 Disk Store 中查询到数据并向 NeighborCache 回填时,也需进行内存检查,以避免 OOM。
4. 如何做负载均衡
第四个问题是怎么做负载均衡,包括两块,一个是存储的均衡,另一个是计算的均衡。
首先存储的均衡在以边切分的图存储里面其实是很难的,因为它天然的就是把顶点和其邻居全部都存在了一起,这是图数据库相比其他数据库的优势,也是其要承担的代价。所以目前没有一个彻底的解决方法,只能在真的碰到此问题时扩大集群规模,让数据的哈希打散能够更加均匀一些,避免多个热点都落在同一个机器的情况。而在目前的业务场景上来看,其实负载不均衡的现象不算严重,例如风控的一个比较大的集群,其磁盘用量最高和最低的也不超过 10%,所以问题其实并没有想象中的那么严重。
另外一个优化方法是在存储层及时清理那些过期的数据,清理得快的话也可以减少一些不均衡。
计算均衡的问题。存储层采用了三副本的策略,若业务能够接受弱一致的读取(实际上大多数业务均能接受),我们可以在请求转发时,查看三副本中的哪个节点负载较轻,将请求转发至该节点,以尽量平衡负载。此外,正如前文所述,热点结果缓存也是一种解决方案,只要热点处理速度足够快,计算的不均衡现象便不易显现。
5. 如何做流程控制
接下来的问题是如何进行流程控制。执行流程转变为由 Store Server 自行驱动之后,仅第一个 Stage 有 Driver 参与,而后续步骤则由 Worker 之间相互传输和控制。那么,Driver 应如何了解当前执行的阶段以及其对应的某个 Stage 何时可以开始执行呢?有一种解决方案便是要求每一个 Worker 在接收到请求后或下发请求后,向 Driver 回传一个响应,如此便可在 Driver 内记录所有节点的进度信息,这是可行的。
然而,此设计方案较重,因为 driver 并不需要深入了解每个节点的具体状态,它仅需判断自身是否具备执行条件,因此在工程实现中,我们采取了更为轻便的方式,即每个 Stage 生成一个 32 位的二进制数字 reqId,将其发送至 ACKer 确认器以传达相关信息。Acker 也以 32 位整数形式记录该信息,Stage1 同样会接收到 Stage 0 发来的 reqId,经过内部一系列处理后,它会将接收到的 reqId 与自身生成的 3 个 reqId 进行异或运算,并将异或结果再次发送至确认器。由于异或操作的特性,当两个数相同时,结果为 0,因此,当 0010 数进行异或运算后,这部分将变为 0。这就意味着 Stage 0 已经执行完毕。后续的所有阶段均采用类似的方式,当确认器的结果再次变为 0 时,表示整个执行流程已经完成,即前面的 Stage 0 至 Stage 3 已经读取完毕,此时可以执行 Stage 4,从而实现流程驱动。
另一个重要的问题便是全程链路的超时自检,例如在 Stage2 或 Stage3 的某一个节点上运行时间过长,此时不能让其余所有节点一直等待,因为客户端已经超时了。因此我们在每个算子内部的执行逻辑中都设置了一些埋点,用以检查算子的执行是否超过了用户侧的限制时间,一旦超过,便立即终止自身的执行,从而迅速地自我销毁,避免资源的无谓浪费。
以上就是对一些关键设计的介绍。
6. 性能测试
我们在改造工程完成后进行了性能测试,采用 LDBC 组织提供的 SNB 数据集,生成了一个 SF100 级别的社交网络图谱,规模达到 3 亿顶点,18 亿条边。我们主要考察其一跳、二跳、三跳、四跳等多项查询性能。
根据评估结果显示,在一跳和二跳情况下,原生查询和分布式查询性能基本相当,未出现负优化现象。从三跳起,分布式查询相较于原生查询能实现 50% 至 60% 的性能提升。例如,在 Max degree 场景下的分布式查询已将时延控制在 50 毫秒以内。在带有 Max degree 或 Limit 值的情况下,时延均在 200 毫秒以下。尽管数据集与实际业务数据集存在差异,但它们皆属于社交网络领域,因此仍具有一定的参考价值。
四跳查询,无论是原始查询还是分布式查询,其时延的规模基本上都在秒至十余秒的范围内。因为四跳查询涉及的数据量实在过于庞大,已达到数十万甚至百万级别,仅依赖分布式并行查询难以满足需求,因此需要采取其他策略。然而,即便如此,我们所提出的改进方案相较于原始查询模式仍能实现 50% 至 70% 的提升,效果还是很可观的。
四、总结与展望
我们结合 MPP 的思想,成功地对原有 REDgraph 的执行流程实现了框架级别上的革新,提出了一种较为通用的图中分布式并行查询方案。在完成改良后,至少在业务层面上,原本无法执行的三跳任务现在得以实现,这无疑是一项重大突破。同时,通过实验验证,效率得到了 50% 的显著提升。
随着小红书 DAU 的持续攀升,业务数据规模正逐步向着万亿的规模发展。在这样的大背景下,业务对多条查询的需求也将日益强烈。因此,该方案本身具有优化的潜力,具备落地的可能性,且有实际应用的场景。因此,我们将继续致力于提升 REDgraph 的查询能力。另外,尽管该方案主要在图数据库上实施,但其思想对于其他具有类似重查询需求的在线存储系统同样具有一定的参考价值。因此,其他产品也可借鉴此方案,设计出符合自身需求的高效执行框架。
最后。我们诚挚地邀请对技术有着极致追求、志同道合的同学们加入我们的团队。在此,我们特别推荐两个渠道:一是扫描上方二维码加入微信群,共同探讨图数据库相关的技术问题;二是关注小红书的技术公众号 REDtech,该公众号会不定期发布技术文章,欢迎大家关注和转发。
五、问答环节
Q:介绍中提到的 LDBC-SF 100 那个数据集选择测试样本的规模有多大?另外,分布式方式能够提升性能,但分布式实施过程中可能会带来消息通信的成本开支,反而可能导致测试结果表现不佳,可否介绍一下小红书的解决方法。
A:三跳基本上都是在几十万的量级。
关于分布式引发的消息通信,这确实是一个问题,但在我们的场景下,目前这还不是最严重的问题。因为每一跳,特别是三跳中产生的数据量是巨大的,计算算子处理这些数据量所需的时间已经远远超过了消息通信的耗时。尤其是在多跳并存的环境中,比如一跳和二跳,其实它们作为中间结果其数据量并不大,一跳只有几十上百个,二跳可能也就几万个,但是三跳作为最后的需要参与计算的结果直接到了几十万,所以通信开销跟这个比起来,其实是非常微小的。
在消息通信方面,我们也有一些解决思路,比如在发送端开一些很小的窗口(比如 5 毫秒)来做一些聚合,把那些目标点相同的请求进行聚合,这样可以减少一些通信的请求次数。