图片来自 Pexels
eBay 广告数据平台为 eBay 第一方广告主(使用 Promoted Listing 服务的卖家)提供了广告流量、用户行为和效果数据分析功能。
广告卖家通过卖家中心(Seller Hub)的营销标签页、效果标签页和公开API,有效掌控和对比店铺的营销活动和推广商品的流量、销量的实时和历史数据,并通过网页或者 API 下载数据分析报告。
这一系统上线之初使用了自研的分布式 SQL 引擎,构建在对象存储系统之上。3 年前随着广告流量增加,我们把数据引擎切换到 Druid 上。
这一平台的主要挑战如下:
- 数据量大:每日的插入数据记录有数百亿条,每秒的插入峰值接近一百万条。
- 离线数据摄入:在不影响实时数据摄入的情况下,每天需要对前 1-2 天的数据进行在线替换。
根据上游数据团队发布清洗过的每日数据,广告数据平台需要在不影响查询的情况下每日替换实时数据,数据切换要求实现跨节点的全局原子操作。
- 完整性和一致性:面向卖家的财务数据,离线更新后的数据要求不能有遗漏和重复;实时数据要求端对端的延迟在十秒内。
Druid vs ClickHouse
Druid 于 2011 年由 Metamarkets 开发,是一款高性能列式在线分析和存储引擎。它于 2012 年开源,2015 年成为 Apache 基金会旗下项目。
Druid 在业界使用广泛,为千亿级数据提供亚秒级的查询延迟,擅长高可用、水平扩展。
另外为数据摄入提供了很多非常方便的聚合、转换模版,内建支持多种数据源,最快可以在几十分钟内配置好新的数据表,包括数据定义和数据摄入链路(Lambda 架构),大大提高了开发效率。
ClickHouse 由俄罗斯最大的搜索引擎公司 Yandex 研发,设计目标是支持 Yandex.Metrica(世界第二大 Web 分析平台)生成用户分析报表等核心功能。
ClickHouse 是一个数据库管理系统(DBMS),有数据库、表、视图、DDL、DML 等概念,并提供了较为完整的 SQL 支持。
其核心特性有如下几点:
- 高效的数据存储:通过数据压缩和列式存储,可以达到最高 10 倍的数据压缩率。
- 高效的数据查询:通过主键索引、向量化引擎处理、多处理器并发和分布式查询,最大压榨 CPU 的所有能力,在中小规模的数据量上尤为突出。
- 灵活的数据定义和接入:通过支持 SQL 语言、JDBC 和关系模型,降低学习和迁移成本,可以和其他现有数据的产品无缝集成。
为什么迁移?
运维
Druid 虽然提供了很多非常方便的数据摄入功能,但它的组件构成也较为复杂,节点类型有 6 种(Overload,Coordinator,Middle Manager,Indexer,Broker 和 Historical)。
除了自身的节点,Druid 还依赖于 MySQL 存储元数据信息、Zookeeper 选举 Coordinator 和 Overlord、HDFS 备份历史数据。
ClickHouse 的架构采用了对等节点的设计,节点只有一种类型,没有主从节点。如果使用了副本功能,则依赖于 Zookeeper 保存数据段的同步进度。
与此同时,eBay 的基础架构团队提出在定制 ClickHouse 的基础上,向产品团队提供列式数据库存储的服务。
除了运维和生命周期管理,基础架构团队对 ClickHouse 进行改造和二次开发,进一步提高了数据摄入和存储的效率,并在离线摄入方面弥补了和 Druid 的功能差距。
延时数据插入
Druid 通过引入实时数据的索引任务,把实时数据处理成一个个分段数据(segment),并归档成历史数据。成为分段数据之后,该时段数据即不可写入。
由于并发实时索引任务数的限制,我们设置了 3 个小时的窗口长度(每个小时一个任务),因此超过 3 个小时的数据就无法写入。
在某些极端情况下,例如上游数据延迟或者实时数据消费过于滞后,就会导致离线数据替换前这部分数据的缺失。ClickHouse 则没有这个限制,任意分区都可以随时写入。
主键优化
ClickHouse 支持的主键并不是传统意义下关系型数据库的主键。传统的主键要求每条表记录都有唯一的键值,通过查询主键可以唯一地查询到一条表记录。
而在 ClickHouse 中,主键定义了记录在存储中排序的顺序,允许重复,所以称之为排序键似乎更加合理。
事实上在 ClickHouse 里的主键定义通过 ORDER BY 声明,仅在个别场景中允许和排序键不一致(但必须是排序键的前缀)。
由于我们的产品是给卖家提供分析功能,几乎所有的查询限定在了单一卖家维度,因此通过主键按照卖家排序,可以极大地提高查询效率以及数据压缩率。
系统架构
图 1
如图 1 所示,系统由 4 个部分组成:
- 实时数据获取模块,接入 eBay 的行为和交易实时消息平台。
- 离线数据替换模块,接入 eBay 内部的数据仓库平台。
- ClickHouse 部署和外围数据服务。
- 报表服务,支撑广告主、商家后台和 eBay 公开 API。
实战经历
Schema 设计
ClickHouse 提供了丰富的 Schema 配置。这方面需要根据业务场景和数据模式反复斟酌和多次试验,因为不同的选择会对存储和性能有数量级的影响,一个错误的选择会导致后期巨大的调优和变更成本。
①表引擎
ClickHouse 的存储引擎的核心是合并树(MergeTree),以此为基础衍生出:
- 汇总合并树(SummingMergeTree)
- 聚合合并树(AggregationMergeTree)
- 版本折叠树(VersionCollapsingTree)等常用的表引擎
另外上述所有的合并树引擎都有复制功能(ReplicatedXXXMergeTree)的对应版本。
我们的广告数据平台的展示和点击数据选择了复制汇总合并树。这两类用户行为数据量极大,减小数据量节省存储开销并提升查询效率是模式设计的主要目标。
ClickHouse 在后台按照给定的维度汇总数据,降低了 60% 的数据量。
销售数据选择了普通的复制合并树,一方面由于销售数据对某些指标有除汇总以外的聚合需求,另一方面由于本身数据量不大,合并数据的需求并不迫切。
②主键
一般情况下,ClickHouse 表的主键(Primary Key)和排序键(Order By Key)相同,但是采用了汇总合并树引擎(SummingMergeTree)的表可以单独指定主键。
把一些不需要排序或者索引功能的维度字段从主键里排除出去,可以减小主键的大小(主键运行时需要全部加载到内存中),提高查询效率。
③压缩
ClickHouse支持列级别的数据压缩,显著地减少原始数据的存储量,这也是列存储引擎的巨大优势。查询阶段,较小的存储占用也可以减少 IO 量。
对不同列选择一种合适的压缩算法和等级,能把压缩和查询的平衡做到性价比最优。
ClickHouse 的所有列默认使用 LZ4 压缩。除此以外,一般的数据列可以选择更高压缩率的算法如 LZ4HC,ZSTD。
而对于类似时间序列的单调增长数据可以选择 DoubleDelta,Gorilla 等特殊压缩算法。
LZ4HC 和 ZSTD 等高压缩率的算法还可以自己选择压缩级别。在我们的生产数据集上,ZSTD 算法对 String 类型字段压缩效果较为显著。LZ4HC 是 LZ4 的高压缩比改进版,更适用于非字符串类型。
更高的压缩率意味着更少的存储空间,同时由于降低了查询的 IO 量,可以间接提升查询性能。
不过 CPU 也不是大风刮来的,数据的插入性能就成了牺牲品。根据我们内部测试的数据,在我们的生产数据集上使用 LZ4HC(6) 相比 LZ4 可以节省 30% 的数据,但实时数据摄取性能下降了 60%。
④低基
值得一提的是,对于基数较低的列(即列值多样性低),可以使用 LowCardinality 来降低原始存储空间(从而降低最终存储空间)。
如果在使用压缩算法的情况下对一字符串类型的列使用 LowCardinality,还能再缩小 25% 的空间量。
在我们的测试数据集上,如果整表组合使用 LowCardinality、LZ4HC(6) 和 ZSTD(15),整体压缩比大约在原来的 13% 左右。
离线数据替换
①挑战
针对广告主的数据报表要求数据准确、一致。实时的行为数据存在少量的 bot 数据(需要离线清除),另外广告的归因也需要在离线阶段重新调整。
因此我们引入了离线数据链路,在实时数据写入 24-72 小时之后,用离线数据替换实时数据。
其中的挑战如下:
- 广告系统每天需要处理的用户离线数据量近 1TB,在此之前,需要耗费大量时间将数据从 Hadoop 导入 Druid。
另外,导入期间的 I/O、CPU 和内存的开销对查询的压力不小。如何在保证数据一致性的同时,亦确保数据迁移的效率,是问题的关键。
- 如何在数据替换期间,确保用户可见的数据波动最小。这就要求数据替换操作是原子性的,或者至少对每个广告主都是原子的。
- 除了日常的离线数据更新,在数据仓库数据出现偏差遗漏时,需要支持大范围的数据修正和补偿。
作业调度要求保证日常工作及时完成,并尽快完成数据修正工作。此外还需要监控数据更新中的各种指标,以应对各种突发状况。
Druid 原生支持数据离线更新服务,我们与基础架构团队合作,在 ClickHouse 平台实现了这一功能。
②数据架构
对于整合在线数据和离线数据的大数据架构,业界通常的做法是 Lambda 架构。即离线层和在线层分别导入数据,在展示层进行数据的合并。
我们也大致上采用了这一架构。但具体的做法和经典有所不同。
ClickHouse 里数据分区(partition)是一个独立的数据存储单元,每一个分区都可以单独从现有表里脱离(detach)、引入(attach)和替换(replace)。
分区的条件可以自定义,一般按照时间划分。通过对数据表内数据分区的单个替换,我们可以做到查询层对底层数据更新的透明,也不需要额外的逻辑进行数据合并。
③Spark 聚合与分片
为了降低 ClickHouse 导入离线数据性能压力,我们引入了 Spark 任务对原始离线数据进行聚合和分片。每个分片可以分别拉取并导入数据文件,节省了数据路由、聚合的开销。
④数据更新任务管理
锁定分区拓扑结构:在处理数据前,离线数据更新系统向基础架构团队提供的服务请求锁定 ClickHouse 的分区拓扑结构,在此期间该分区的拓扑结构不会改变。
服务端根据预先定义好的数据表结构与分区信息返回数据的分片逻辑与分片 ID。
离线数据更新系统根据拓扑信息提交 Spark 任务。多张表的数据处理通过 Spark 并行完成,显著提升了数据更新的速度。
数据聚合与分片:对于每一张需要更新的表,启动一个 Spark 任务对数据进行聚合与分片。
根据 ClickHouse 服务端返回的表结构与分片拓扑将数据写入 Hadoop,同时输出数据替换阶段用于校验一致性的 checksum 与分片行数。
系统通过 Livy Server API 提交并轮询任务状态,在有任务失败的情况下进行重试,以排除 Spark 集群资源不足导致的任务失败。
离线数据更新不但要满足每天的批量数据更新需求,还需要支持过往数据的再次更新,以便同步上游数据在日常定时任务更新之外的数据变动。
我们利用平台团队封装的 Spring Batch 管理更新任务,按照日期将每天的数据划分为一个子任务。
通过 Spring Batch 实现的 Continuously Job 保证在同一时刻子任务在运行的唯一性,避免产生任务竞争问题。
对于过往数据的更新,我们将 Batch 任务分类,除了日常任务之外,还可以手动触发给定时间范围内的数据修正任务(如图 2)。
图 2
数据替换:在子任务中的所有 Spark Job 完成后,离线数据更新系统会调用基础架构团队提供的数据替换接口,发起数据替换请求。
服务端按照定义好的分区,将数据从 Hadoop 直接写入 ClickHouse,如图 3 所示。
图 3
离线数据更新系统的架构如图 4 所示:
图 4
MySQL 数据库用于记录数据替换过程中任务的状态与优先级,当 Spark Job 失败或者由于其他原因导致替换任务失败重启后,恢复任务的进度。
⑤原子性与一致性
为了保证数据替换的原子性,基础架构团队提供了分区替换的方式。在离线数据导入的过程中,首先创建目标分区的临时分区。当数据替换完毕并且校验完成之后,目标分区会被临时分区替换。
针对不同机器上不同分片的原子性替换问题,基础架构团队为每一条数据引入了数据版本。
对于每一个数据分区,都有对应的活跃版本号。直到待替换数据分区的所有分片都成功导入之后,分区的版本号进行更新。
上游应用的同一条 SQL 只能读取同一分区一个版本的数据,每个分区的数据替换只感觉到一次切换,并不会出现同时读取新旧数据的问题。
广告平台报表生成应用因此在 SQL 层面引入了相应的修改,通过引入固定的 WITH 和 PREWHERE 语句,在字典中查询出每个数据分区对应的版本号,并在查询计划中排除掉不需要的数据分区。
为了确保数据替换的一致性,在完成 Spark 数据处理之后,离线数据更新系统会计算各数据分片的校验码与数据总量。
当替换完毕之后,ClickHouse 服务端会对分片数据进行校验,确保在数据搬迁过程中没有数据丢失和重复。
数据查询
ClickHouse 支持 SQL 查询(不完全),有 HTTP 和 TCP 两种连接方式,官方和第三方的查询工具和库丰富。
用户可以使用命令行,JDBC 或者可视化工具快速进行数据查询的开发和调试。
ClickHouse 通过 MPP(Massively Parallel Processing)+SMP(Symmetric Multiprocessing)充分地利用机器资源,单条查询语句默认使用机器核数一半的 CPU。
因此 ClickHouse 不支持高并发的应用场景。在业务使用层面,最核心的问题是查询校验和并发控制,单条过大的查询或者过高的并发都会导致集群资源使用率过高,影响集群稳定性。
应用架构
eBay Seller Hub 通过 Reports Service 接入 ClickHouse 查询,Reports Service 提供了 Public 和 Internal 两套 API。
Internal API 提供给 Seller Hub 以及其他内部的已知应用使用,Public API 在 eBay Developers Program 开放给第三方开发者,详情见:
- https://developer.ebay.com/
图 5
Internal API 的查询直接提交内部线程池执行,线程池的大小根据 ClickHouse 的集群机器数量设置。查询请求执行前会进行校验,过滤所有非法以及资源不可预估的请求。
Public API 通过任务提交的方式异步执行查询,用户提交的查询任务存入 DB 中,Service 内部的 Schedule 定时扫表,根据任务的状态串行执行查询任务。
执行成功的任务上传生成 Report 到文件服务器,用户拿到 URL 后自行下载。执行失败的任务,根据错误类型(非法的请求,资源不足等)来选择是否在下一个周期再次执行。
测试发布
在生产环境部署完成后,我们开启了数据双写,往 ClickHouse 里不断地插入实时数据和离线数据,直到达到 Druid 的数据水平。
在数据一致性验证过后,我们镜像了一份生产服务的查询,然后把这些查询转发给 ClickHouse。
通过收集和对比 Druid 和 ClickHouse 的响应,我们能够验证 ClickHouse 链路的数据质量和查询性能。
之后的灰度阶段,我们逐渐提升 ClickHouse 服务生产系统的比例,并保持 Druid 继续运行,以保证出现问题可以及时回滚。
查询 GUI
数据可视化方面,我们需要提供类似 Turnilo 的可视化工具给开发、测试和 BI 人员使用。
ClickHouse 支持多种商业和开源的产品接入,我们选用了 Cube.JS,并进行了简单的二次开发。
图 6
总结
本文介绍了广告数据平台的基本情况,ClickHouse/Druid 的特点对比和团队使用 ClickHouse 替换 Druid 的架构方案。
ClickHouse 表现出了良好的性能和扩展能力,并且还在快速的迭代更新。
目前项目已经上线,接下来我们还会和大家继续分享过程中的碰到的一些问题和解决方法,欢迎大家持续关注。
作者:吴寒思、周路、余何
编辑:陶家龙
出处:转载自公众号eBay技术荟(ID:eBayTechRecruiting)