跳到主要内容

13 篇博文 含有标签「用户案例」

查看所有标签

用户案例

导读: Apache Doris 是小米集团内部应用最为广泛的 OLAP 引擎之一,本文主要从数据的角度分析 A/B 实验场景查询的性能现状,探讨基于 Apache Doris 的性能优化的解决方案。经过一系列基于 Doris 的性能优化和测试,A/B 实验场景查询性能的提升超过了我们的预期。希望本次分享可以给有需要的朋友提供一些参考。

作者|小米集团大数据工程师 乐涛

业务背景

A/B 实验是互联网场景中对比策略优劣的重要手段。为了验证一个新策略的效果,需要准备原策略 A 和新策略 B 两种方案。随后在总体用户中取出一小部分,将这部分用户完全随机地分在两个组中,使两组用户在统计角度无差别。将原策略 A 和新策略 B 分别展示给不同的用户组,一段时间后,结合统计方法分析数据,得到两种策略生效后指标的变化结果,并以此判断新策略 B 是否符合预期。

图片

小米 A/B 实验平台是一款通过 A/B 实验的方式,借助实验分组、流量拆分与科学评估来辅助完成科学的业务决策,最终实现业务增长的一款运营工具产品。其广泛的应用于产品研发生命周期中各个环节:

图片

数据平台架构

本文主要从数据的角度分析 A/B 实验场景查询的性能现状,探讨一下基于 Apache Doris 的性能优化的解决方案。A/B实验平台的架构如下图所示:

图片

  • 平台使用的数据主要包含平台自用的实验配置数据、元数据,以及业务方上报的日志数据。
  • 由于业务方引入 SDK,并与分流服务进行交互,日志数据中包含其参与的实验组 ID 信息。
  • 用户在实验平台上配置、分析、查询,以获得报告结论满足业务诉求。

鉴于 AB 实验报告各个业务方上报数据的链路都大体类似,我们就拿头部业务方广告业务举例,数据流程如下图所示:

图片

从上图可知,整个数据链路并不复杂,日志数据传入后,经过必要的数据处理和清洗工作进入 Talos(小米自研消息队列),通过 Flink 任务以明细数据的形式实时写入到 Doris 表中,同时 Talos 数据也会同步到 Hive 表进行备份,以便问题排查和数据修复。

出于对高效写入以及字段增减需求的考虑,Doris 明细表以 Duplicate 模型来建模

CREATE TABLE `dwd_xxxxxx` (
`olap_date` int(11) NULL COMMENT "分区日期",
`user_id` varchar(256) NULL COMMENT "用户id",
`exp_id` varchar(512) NULL COMMENT "实验组ID",
`dimension1` varchar(256) NULL COMMENT "",
`dimension2` varchar(256) NULL COMMENT "",
......
`dimensionN` bigint(20) NULL COMMENT "",
`index1` decimal(20, 3) NULL COMMENT "",
......
`indexN` int(11) NULL COMMENT "",

) ENGINE=OLAP
DUPLICATE KEY(`olap_date`, `user_id`)
COMMENT "OLAP"
PARTITION BY RANGE(`olap_date`)
(
PARTITION p20221101 VALUES [("20221101"), ("20221102")),
PARTITION p20221102 VALUES [("20221102"), ("20221103")),
PARTITION p20221103 VALUES [("20221103"), ("20221104"))
)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 300
;
数据现状分析

在提速之前,小米 A/B 实验平台完成实验报告查询的 P95 时间为小时级,实验报告使用数据的方式存在诸多的性能问题,直接影响业务部门做运营和决策的效率。

报告查询基于明细

当前报告查询的数据来源为明细表,而明细表的数据量巨大:

图片

而且,实验报告的查询条件中时间范围常常横跨多天。基于历史查询报告统计,查询条件中时间范围大于一天的报告占比 69.1%,具体的时间跨度占比分布如下:

图片

明细数据的巨大扫描量给集群带来了不小的压力,且由于报告查询存在并发以及 SQL 的拆分,如果一个 SQL 请求不能快速的返回结果释放资源,也会影响到请求的排队状况。因此在工作时间段内 Doris 集群BE节点 CPU 负载状况基本是持续满载,磁盘 IO 也持续处于高负荷状态,如下图所示:

图片

BE节点CPU使用率

图片

BE节点磁盘IO

个人思考:

  • 当前报告所有查询基于明细数据,且平均查询时间跨度为 4 天,查询扫描数据量上百亿。由于扫描数据量级大,计算成本高,给集群造成较大压力,导致数据查询效率不高。
  • 如果通过对数据进行预聚合处理,控制 Scan Rows 和 Scan Bytes,减小集群的压力,查询性能会大幅提升。

字段查询热度分层分布

由于之前流程管控机制相对宽松,用户添加的埋点字段都会进入到明细表中,导致字段冗余较多。统计历史查询报告发现,明细表中常用的维度和指标只集中在部分字段,且查询热度分层分布:

图片

图片

参与计算的指标也集中在部分字段,且大部分都是聚合计算(sum)或可以转化为聚合计算(avg):

图片

个人思考:

  • 明细表中参与使用的维度只占 54.3%,高频使用的维度只占 15.2%,维度查询频次分层分布。
  • 数据聚合需要对明细表中维度字段做取舍,选择部分维度进行上卷从而达到合并的目的,但舍弃部分字段必然会影响聚合数据对查询请求的覆盖情况。而维度查询频次分层分布的场景非常适合根据维度字段的热度做不同层次的数据聚合,同时兼顾聚合表的聚合程度和覆盖率。

实验组 ID 匹配效率低

当前明细数据的格式为:

图片

明细数据中的实验组 ID 以逗号分隔的字符串形式聚拢在一个字段中,而实验报告的每条查询语句都会使用到exp_id过滤,查询数据时使用 LIKE 方式匹配,查询效率低下。

个人思考:

  • 将实验组 ID 建模成一个单独的维度,可使用完全匹配代替 LIKE 查询,且可利用到 Doris 索引,提高数据查询效率。
  • 将逗号分隔的实验组 ID 直接打平会引起数据量的急剧膨胀,因此需要设计合理的方案,同时兼顾到数据量和查询效率。

进组人数计算有待改进

进组人数查询是实验报告的必查指标,因此其查询速度很大程度上影响实验报告的整体查询效率,当前主要问题如下:

  • 当进组人数作为独立指标计算时,使用近似计算函数APPROX_COUNT_DISTINCT处理,是通过牺牲准确性的方式提升查询效率。
  • 当进组人数作为复合指标的分母进行计算时,使用COUNT DISTINCT处理,此方式在大数据量计算场景效率较低。

个人思考:

  • AB实验报告的数据结论会影响到用户决策,牺牲准确性的方式提升查询效率是不可取的,特别是广告这类涉及金钱和业绩的业务场合,用户不可能接受近似结果。
  • 进组人数使用的COUNT DISTINCT计算需要依赖明细信息,这也是之前查询基于明细数据的重要因素。必须为此类场景设计新的方案,使进组人数的计算在保证数据准确的前提下提高效率。

数据优化方案

基于以上的数据现状,我们优化的核心点是将明细数据预聚合处理,通过压缩数据来控制 Doris 查询的 Scan Rows 和 Scan Bytes。与此同时,使聚合数据尽可能多的覆盖报告查询。从而达到,减小集群的压力,提高查询效率的目的。

新的数据流程如下图所示:

图片

整个流程在明细链路的基础上增加聚合链路,Talos 数据一方面写入 Doris 明细表,另一方面增量落盘到 Iceberg 表中,Iceberg 表同时用作回溯明细数据以及生成聚合数据,我们通过工场 Alpha(小米自研数据开发平台)的实时集成和离线集成保证任务的稳定运行和数据的一致性。

选取高频使用维度聚合

在生成数据聚合的过程中,聚合程度与请求覆盖率是负相关的。使用的维度越少,能覆盖的请求就越少,但数据聚合程度越高;使用的维度越多,覆盖的请求也越多,但数据粒度就越细,聚合程度也越低。因此需要在聚合表建模的过程中取得一个平衡。

我们的具体做法是:拉取历史(近半年)查询日志进行分析,根据维度字段的使用频次排序确认进入聚合表的优先级。在此基础上得出聚合表的覆盖率和数据量随着建模字段增加而变化的曲线,如下图所示:

图片

其中覆盖率根据历史请求日志代入聚合表计算得出。

我们的原则是:针对 OLAP 查询,聚合表的数据量应尽可能的控制在单日 1 亿条以内,请求覆盖率尽可能达到 80%以上

因此不难得出结论:选择 14 个维度字段对聚合表建模比较理想,数据量能控制到单日 8 千万条左右,且请求覆盖率约为 83%

使用物化视图

在分析报告历史查询日志时,我们发现不同的维度字段查询频次有明显的分层:

图片

Top7 维度字段几乎出现在所有报告的查询条件之中,对于如此高频的查询,值得做进一步的投入,使查询效率尽可能的提升到最佳。Doris 的物化视图能够很好的服务于此类场景。

什么是物化视图?

物化视图是一种特殊的物理表,其中保存基于基表(base table)部分字段进一步上卷聚合的结果。

虽然在物理上独立存储,但它是对用户透明的。为一张基表配置好物化视图之后,不需要为其写入和查询做任何额外的工作:

  • 当向基表写入和更新数据时,集群会自动同步到物化视图,并通过事务方式保证数据一致性。
  • 当对基表进行查询时,集群会自动判断是否路由到物化视图获取结果。当查询字段能被物化视图完全覆盖时,会优先使用物化视图。

因此我们的查询路由如下图所示:

图片

用户的查询请求会尽可能的路由到聚合表物化视图,然后是聚合表基表,最后才是明细表。

如此使用多梯度的聚合模型的配合来应对热度分层的查询请求,使聚合数据的效能尽可能的发挥到最大。

精确匹配取代 LIKE 查询

既然物化视图这么好用,为什么我们不是基于 Doris 明细表配置物化视图,而是单独开发聚合表呢?是因为明细数据中的实验组ID字段存储和查询方式并不合理,聚合数据并不适合通过明细数据直接上卷来得到。

上文中已经提到,exp_id(实验组ID)在明细表中以逗号分隔的字符串进行存储,查询数据时使用 LIKE 方式匹配。作为 AB 实验报告查询的必查条件,这种查询方式无疑是低效的。

我们希望的聚合方式如下图所示:

图片

我们需要将exp_id字段拆开,把数据打平,使用精确匹配来取代LIKE查询,提高查询的效率。

控制聚合表数据量

如果只做拆分打平的处理必然会导致数据量的激增,未必能达到正向优化的效果,因此我们还需要想办法来压缩exp_id打平后的数据量:

  • 聚合表选取维度字段建模的时候,除了上文提到的,以字段的使用频次热度作为依据之外,也要关注字段的取值基数,进行综合取舍。如果取值基数过高的维度字段进入聚合表,必然会对控制聚合表的数据量造成阻碍。因此,我们在保证聚合表请求覆盖量的前提下,酌情舍弃部分高基数(取值有十万种以上)的维度。
  • 从业务的角度尽可能过滤无效数据(比如一个实验组的流量为 0% 或者 100%,业务上就没有对照的意义,用户也不会去查,这样的数据就不需要进入聚合表)。

经过这一系列步骤,最终聚合表的数据量被控制在单日约 8000 万条,并没有因为 exp_id打平而膨胀。

值得一提的是,exp_id字段拆分后,除了查询从LIKE匹配变为精确匹配,还额外带来了两项收益:

  • 字段从String类型变为Int类型,作为查询条件时的比对效率变高。
  • 能利用Doris的前缀索引布隆过滤器等能力,进一步提高查询效率。

使用 BITMAP 去重代替 COUNT DISTINCT

要提速实验报告查询,针对进组人数(去重用户数)的优化是非常重要的一个部分。作为一个对明细数据强依赖的指标,我们如何在不丢失明细信息的前提下,实现像 Sum,Min,Max 等指标一样高效的预聚合计算呢?BITMAP 去重计算可以很好的满足我们的需求。

什么是BITMAP去重?

BITMAP 去重简单来说就是建立一种数据结构,表现形式为内存中连续的二进制位(bit),参与去重计算的每个元素(必须为整型)都可以映射成这个数据结构的一个 bit 位的下标,如下图所示:

图片

计算去重用户数时,数据以 bit_or的方式进行合并,以bit_count的方式得到结果。更重要的是,如此能实现去重用户数的预聚合。BITMAP 性能优势主要体现在两个方面:

  • 空间紧凑:通过一个 bit 位是否置位表示一个数字是否存在,能节省大量空间。以 Int32 为例,传统的存储空间为 4 个字节,而在 BITMAP 计算时只需为其分配 1/8 字节(1个 bit 位)的空间。
  • 计算高效:BITMAP 去重计算包括对给定下标的 bit 置位,统计 BITMAP 的置位个数,分别为 O(1) 和 O(n) 的操作,并且后者可使用 CLZ,CTZ 等指令高效计算。此外,BITMAP 去重在 Doris 等 MPP 执行引擎中还可以并行加速处理,每个节点各自计算本地子 BITMAP,而后进行合并。

当然,以上只是一个简化的介绍,这项技术发展至今已经做了很多优化实现,比如RoaringBitmap,感兴趣的同学可以看看:https://github.com/RoaringBitmap/RoaringBitmap

全局字典

要实现 BITMAP 去重计算,必须保证参与计算的元素为 UInt32 / UInt64,而我们的user_idString类型,因此我们还需设计维护一个全局字典,将user_id映射为数字,从而实现 BITMAP 去重计算。

由于聚合数据目前只服务于离线查询,我们选择基于Hive表实现全局字典,其流程如下:

图片

指标聚合

生成 Doris 聚合表时,将 user_id作为查询指标以 BITMAP 类型来存储,其他常规查询指标则通过 COUNT/SUM/MAX/MIN 等方式聚合:

图片

如此明细表和聚合表的指标计算对应关系如下:

图片

优化效果

SQL视角

查询请求转换成 SQL 之后,在明细表和聚合表的表现对比如下:

图片

  • 常规聚合指标查询的性能提升自不必说(速度提升 50~60 倍)
  • 进组人数查询性能的提升也非常可观(速度提升 10 倍左右)

集群视角

SQL 查询的快进快出,使查询占用的资源能快速释放,对集群压力的缓解也有正向的作用。

Doris 集群 BE 节点 CPU 使用情况和磁盘IO 状况的改变效果显著:

图片

需要说明的是,集群状况的改善(包括实验报告查询 P95 提升)并不全归功于数据预聚合优化工作,这是各方合力协作(如产品业务形态调整,后端查询引擎排队优化,缓存调优,Doris 集群调优等)的综合结果。

小技巧

由于业务查询需求的多样,在查询明细表时,会出现一个字段既作为维度又作为指标来使用的情况。

如广告业务表中的targetConvNum(目标转化个数)字段,此字段的取值为 0 和 1,查询场景如下:

--作为维度
select targetConvNum,count(distinct user_id)
from analysis.doris_xxx_event
where olap_date = 20221105
and event_name='CONVERSION'
and exp_id like '%154556%'
group by targetConvNum;
--作为指标
select sum(targetConvNum)
from analysis.doris_xxx_event
where olap_date = 20221105
and event_name='CONVERSION'
and exp_id like '%154556%';

如果这个字段被选取进入聚合表,应该如何处理呢?

我们的处理方式是:

  • 在聚合表中把这类字段建模成维度
  • 聚合表中需要一个计数指标 cnt,表示聚合表中一条数据由明细表多少条数据聚合得
  • 当这类字段被作为指标查询时,可将其与cnt指标配合计算得到正确结果

明细表查询:

select sum(targetConvNum)
from analysis.doris_xxx_event
where olap_date = 20221105
and event_name='CONVERSION'
and exp_id like '%154556%';

对应的聚合表查询:

select sum(targetConvNum * cnt)
from agg.doris_xxx_event_agg
where olap_date = 20221105
and event_name = 'CONVERSION'
and exp_id = 154556;

结束语

经过这一系列基于 Doris 的性能优化和测试,A/B 实验场景查询性能的提升超过了我们的预期。值得一提的是,Doris 较高的稳定性和完备的监控、分析工具也为我们的优化工作提效不少。 希望本次分享可以给有需要的朋友提供一些参考。

最后,感谢 SelectDB 公司和 Apache Doris 社区对我们的鼎力支持。Apache Doris 是小米集团内部应用最为广泛的 OLAP 引擎之一,目前集团内部正在推进最新的向量化版本升级工作。未来一段时间我们将会把业务优化工作和 Doris 最新的向量化版本进行适配,进一步助力业务的正向发展。

用户案例

作者|360数科中间件团队

编辑整理|SelectDB

作为以人工智能驱动的金融科技平台,360数科携手金融合作伙伴,为尚未享受到普惠金融服务的优质用户提供个性化的互联网消费金融产品,致力于成为连接用户与金融合作伙伴的科技平台。360数科旗下产品主要有 360借条、360小微贷、360分期等,截止目前,已累计帮助 141 家金融机构为 4300 万用户提供授信服务、为2630万用户提供借款服务、单季促成交易金额1106.75亿元。同时作为国内领先的信贷科技服务品牌,360数科在三季度累计注册用户数首次突破 2 亿。

业务需求

随着金融科技业务的不断发展,对数据的安全性、准确性、实时性提出了更严格的要求,早期 Clickhouse 集群用于分析、标签业务场景,但是存在稳定性较低、运维复杂和表关联查询较慢等问题,除此之外,我们业务中有部分报表数据分散存储在各类 DB 中,这也导致维护管理复杂度较高,亟需做出优化和重构。

系统选型及对比

基于以上需求及痛点,我们对实时数仓的选型目标提出了明确的需求,我们希望新的 MPP 数据库具有以下几个特点:

  • 数据写入性能高,查询秒级

  • 兼容标准的 SQL 协议

  • 表关联查询性能优秀

  • 丰富的数据模型

  • 运维复杂度低

  • 社区活跃

  • 对商业友好,无法律风险

2022年3月开始,我们对符合以上特点的数据库 Apache Doris 展开了为期两个月的调研测试。以下是 Apache Doris 1.1.2 在各个方面的满足情况。

基于上述情况,我们决定采用 Apache Doris,除了可以满足上文提到的几个特点,我们还考虑以下几个方面:

  1. Clickhouse 由于 Join 查询限制、函数局限性、数据模型局限性(只插入,不更新)、以及可维护性较差等原因,更适合日志存储以及保留当前存量业务,不满足我们当前的业务需求。

  2. 目前Apache Doris 社区活跃、技术交流更多,SelectDB 针对社区有专职的技术支持团队,在使用过程中遇到问题均能快速得到响应解决。

  3. Apache Doris 风险更小,对商业友好,无法律风险。大数据领域 Apache 基金会项目构成了事实标准,在 360数科内部已有广泛应用,且Apache 开源协议对商业友好、无法律风险,不会有协议上的顾虑。

平台架构

360数科大数据平台(毓数)提供一站式大数据管理、开发、分析服务,覆盖大数据资产管理、数据开发及任务调度、自助分析及可视化、统一指标管理等多个数据生命周期流程。在整个 OLAP 中,目前 Apache Doris 主要运用离线数仓分析加速、自助 BI 报表等业务场景。

在引入 Doris 后,考虑已有数据分析业务以及数据规模,Doris 集群将先同步部分业务上优先级更高的数据。通过上述架构图可以看到,依托 Doris 强大的查询性能,我们将把 Doris 架设在 Hive 数仓的上层,为特定场景进行查询加速,这样的架构建设起来成本很低,只需要完成数据从 Hive 数仓到 Doris 集群的导入适配,因为 Doris 集群并没有产生任何新表,可以直接复用已经建设好的数据血缘关系。

数据导入方案,我们在调研了 Stream Load 和 Broker Load 之后,从导入性能、开发成本上进行了评估,在导入性能上,Broker Load 要比 Stream Load 略胜一筹,而在开发成本上两种方式并没有明显的差异。而且对于大表的同步,Broker Load 的导入方式可以做到单表一次导入一个事务,而 Stream Load 在单表数据量超 10G 时则需要拆分后进行数据导入。因此数据导入选择使用 Broker Load 来进行。

数仓即席查询方案,我们自行开发的查询引擎支持多查询引擎动态切换的机制,通过识别查询数据的元信息对本次查询做自动的查询引擎(Doris/Presto/Spark/Hive)路由和故障切换。

Doris 支持原生 MySql 协议,对标准 SQL 支持良好,使得 Doris 可以和一些 BI 工具(帆软、观远等)无缝结合,因此单独搭建了一个 Doris 报表分析集群作为 BI 工具数据源。

应用实践

Doris 对 Hive数仓的查询加速方案

在即席查询场景中,传统的查询引擎(Hive/Spark/Presto)越来越满足不了数据开发者、数据分析师对查询响应性能提出的高要求,动辄几十秒甚者分钟级的查询耗时极大的限制了相关场景的开发效率。

为提高查询性能,我们通过架设的 Doris 数仓加速层来缩短查询耗时,目前我们在不开启 Doris 缓存、不开启用物化视图等优化策略的情况下,命中 Doris 即席查询平均耗时即可从几分钟缩短至 5 秒内。

未来我们将通过分析相关查询的特征,通过开启缓存、创建相关物化视图等策略来进一步优化 Doris 的查询性能。

实现 Doris 加速的核心是支持查询引擎动态切换,查询引擎动态切换的工作机制如下:

查询引擎会及时收集 Hive 和 Doris 的元信息,包括库、表、表字段、表行数等信息,在用户提交即席查询请求时,首先会解析出用户查询的表,并按照如下顺序判断:

  • 查询的表是否已在 Doris 同步

  • Doris 表和 Hive 表结构是否相同

  • Doris 表和 Hive 表表行数是否一致

如果以上要求均被满足,则会将该查询路由到 Doris,否则会依次按照 Presto、Spark、Hive 的顺序进行路由查询,当查询出现异常时,也会按照该顺序依次进行故障转移。

慢查询慢导入分析

对于慢查询和慢导入,Doris 提供了完善的 Profile 机制,在了解相关技术细节后,我们在线上集群开启了 Profile 收集,通过调度任务定时收集慢查询、慢导入的 Profile 信息并落库。

Doris 提供的 Profile 信息非常详细,例如 OLAP_SCAN_NODE 提供了原始的扫描行数,各个索引的过滤行数,每个 Instance 的 EXCHANGE_NODE 提供了接收的数据总行数和接收的数据量大小。这些信息为查询调优提供了详细的依据,我们在使用过程中针对快速定位查询性能的瓶颈进行了优化,取得了良好的效果。

建表规范

在我们的使用场景中,有下列类型的表:

  • pda表:每日全量更新,即每日分区存储全量快照数据

  • pdi表: 每日增量更新,即每日分区存储增量数据

  • a表:全量不分区表

  • s表:静态非每日更新数据

由于当前 Doris 集群中所有的表都是基于 Hive 数仓中各层级的表同步而来,因此目前仅使用了 Duplcate 模型和 Unique 模型,对于 pda、pdi 和 a 表,为了降低 Doris 表的分区数,减轻 FE 元数据管理压力,我们在建 Doris 表时均启用了根据日期划分的动态分区特性,较久远的历史数据我们按年、月的维度分区归档,近期的数据按日、小时分区,未来我们计划通过程序自动识别完成历史分区的归档合并。

对于 pda 表使用场景,pda 表需要每日同步全量数据,我们采用了 Duplicate 模型,不考虑使用 Unique 模型数据去重的原因是 Doris 的导入模型本身就提供了基于任务 Label 的数据一致性保证,同步时一次调度周期的 pda 表的一个分区的导入任务能产生唯一且不变的 Label,因此我们可以保证即使错误执行了多次,该分区的数据仍然不会重复。另外,因为 Duplicate 模型相比于 Unique 模型,在导入和查询阶段均不会做预聚合去重,所以可以一定程度上加速导入和查询的性能。

对于 pdi 表使用场景,因在实际使用中 pdi 表存在少数对历史数据的部分更新场景(绝大部分是数据更新场景,基本没有数据删除场景),考虑到 Doris 数据表的分区可用性,我们采用了 Unique 模型,这样在更新历史分区的数据时不必做重建分区操作。

对于 a 表使用场景,因业务上可以接受短时间数据不可用情况,我们启用了动态分区,在做数据导入时,每次导入都会先删除历史分区,然后将全量数据导入今天的分区内,这样做的考虑是杜绝重建表操作,且实施成本相对比较低,因此我们没有采取动态更新视图绑定当日分区的方案。

在 Doris 之前的版本中,尚未实现 Hive 元数据变更同步和管理功能,为了提高效率开发了 Doris 建表工具,我们通过选择和配置数仓集群、Hive 表名、数据模型、Bucket 数量等参数,自动关联 Hive 表,解析表字段并生成对应的建表语句。经过与社区沟通得知,最近即将发布的 1.2 新版本中已经实现 Multi Catalog,支持 Hive 元数据的对接和 Schema 的自动同步,可以极大程度上减少这一部分的工作。

监控体系

当前 Doris 集群监控体系分为主机指标监控告警、日志告警和集群指标监控告警,总体监控体系如下。

主机指标监控是基于 Open-Falcon 开发的监控告警平台,主要采集 Doris 集群节点的 CPU、IO、内存、磁盘等相关指标并进行监控告警。

集群指标监控参考了 Doris 官方文档提供的基于 PrometheusGrafana 和集群指标监控方案。

日志告警仍然是基于我们的监控告警平台,主要用于监控 Doris 服务日志中容易识别但其他监控方式成本较高的监控、告警场景,是其他两种监控的补充。通过日志监控告警,我们能够准确识别数据导入任务的失败原因并能进行及时的推送通知。

问题排查和审计日志

为了及时排查一些极端的集群问题,上述针对 Doris 的监控体系建设仍然是不够的。为了在集群 BE 出现异常宕机时快速定位堆栈,需要在所有的 BE 节点开启 Core Dump。除此之外,审计日志在集群的日常运维中也发挥了重要作用。

对于 Doris 集群的审计日志收集一般可以通过 2 种方式:

  • 第一种方式是通过日志收集组件、收集各个 FE 节点上的 fe.audit.log

  • 第二种方式是通过安装 Doris 提供的 Auditloader 插件(下载 Doris 源码,该插件在 doris/fe_plugins/auditloader,具体使用文档可参考官方文档:审计日志插件)。

考虑到第二种方式操作更简单,因此采用此方式进行日志采集。不过在使用 Auditloader 插件的过程中,陆续发现和修复了一些插件问题,并向社区提交了 PR,与此同时,我们定制开发了内部控制台,便于查看集群的同步任务情况,数据分布情况以及进行审计日志的检索。

审计日志为集群 BE 崩溃时具体 SQL 定位、客户端访问统计、查询 SQL 耗时统计、访问 SQL 特征分析等提供了详细的信息。例如,数据开发曾经反馈查询 Doris SQL 失败,检索日志出现了大量连接数超限的异常,我们通过审计日志,迅速定位到了问题原因是由于上游导入工作流 Bug 在短时间内创建较多的数据库连接。另外,对于曾经使用的低版本 Doris 出现数次 BE 异常宕机问题,我们通过 gdb 调试工具定位到崩溃时 SQL 的 query_id 后,配合审计日志也能快速的定位到导致崩溃的具体 SQL。

优化实践

数据导入实践和调优

初期数据源主要来自 Hive 数仓,因此大部分数据导入以 Broker Load 方式为主。大数据平台自助导入任务工作流适配了 Doris Broker Load 导入方式,数据开发零代码——通过简单的勾选配置即可完成自助的 Doris 数据导入工作流创建。

而在 Broker Load 的使用过程中,我们也陆续遇到了一些问题,这里拿出几个典型的问题和一些调优经验来分享。

在 Broker Load 导入时遇到的问题:

  1. 因表分桶数设置过少造成 Broker Load 导入失败,具体表现为导入任务失败且异常信息为:
tablet writer write failed, tablet_id=xxx, txn_id=xxx, err=-238

我们推测造成 -238 错误的原因可能是分桶设置太少,接着我们通过 BE 节点的挂载数据来查看单个 Tablet 下的文件大小,我们发现单个 Tablet 的文件占用空间远大于官方推荐的 10GB 上限范围,这也证明了我们的推测正确,因此我们通过适当提高 Doris 表的分桶数,使得这个问题有了较大的缓解。

顺便说一下,如果出现 -235(旧版本是-215)异常,一般是由于 Compaction 过慢导致 Tablet 版本堆积超过限制,这个时候通过 Grafana 看到 BE Compaction Score 在导入前后有明显的波动,而且绝对值很高。如果遇到此问题可以参阅 ApacheDoris 公众号文章:Doris 最佳实践-Compaction调优(3) 对Compaction过程进行调优。

  1. 因 Hive 表字段变更导致 Broker Load 导入失败:

Hive 表在使用过程中会有一些 DDL 的执行,从而导致表字段新增,我们数仓的 Hive 表均使用 ORC 格式存储,那么就会导致 Hive 表中部分历史分区的 ORC 文件中字段信息缺失(缺失新增字段),而新分区的 ORC 文件中字段是正常的,这个时候如果对历史数据重新导入,就会有下面的异常信息:

detailMessage: ParseError : Invalid column selected xxx

在阅读了 Broker Load 相关代码后确认了问题原因:在一次 Broker Load 导入过程中,导入任务的字段解析器会读取一个 ORC 文件头解析字段信息,但解析器只会解析一次,如果一次导入过程中同时有新、历史分区的 ORC 文件,那么就可能导致任务失败。

修复的方法也很简单,只需针对每个 ORC 文件重新解析一次文件头的字段信息即可。在了解问题原因及分析解决思路后,我们也和社区的同学一起修复了这个问题并提交了相关 PR。

  1. 遇到空 ORC 文件时 Broker Load 导入失败:

这个问题的错误表现和问题 2 比较类似,具体原因是 Broker Load 导入过程没有对 ORC 文件做判空,遇到空 ORC 文件仍会尝试解析 ORC 文件字段信息导致报错,我们把这个问题反馈给社区后,社区的同学很快修复了该问题。

  1. Broker Load 导入任务出现 Broker list path exception. path=hdfs:xxx

创建 Broker Load 任务,使用 Kerberos 认证访问 HDFS 的 Hive 文件导入数据,Hive 文件路径中分区和下一级目录使用通配符 *,访问所有分区所有文件,任务提交后隔 40 多秒出现如下的错误:

type:ETL_RUN_FAIL; msg:errCode = 2, detailMessage = Broker list path exception. path=hdfs:xxx

在阅读了 Broker Load 的访问 HDFS 相关代码后确认了问题原因,Broker Load 调用 HDFS 的 LS、DU 方法时会获取文件目录信息,由于路径下的文件过多导致耗时会超过 45 秒,而 Thrift 设置的 Socket 请求超时默认小于 40 秒,所以出现了上述的 RPC 异常,问题反馈社区后,对 FE 增加了配置参数broker_timeout_ms,设置为 90 秒后解决问题。

关于 Broker Load 的导入性能调优策略

我们针对 Broker Load 导入调优的主要方向在确保 Doris 集群不承压的情况下尽可能提高导入并发度,下面根据 2 个典型的案例来说明:

  1. 部分 pdi/pda 表因为数据规模太大导致全量导入耗时过长 (导入数据源是 Hive分区表

部分 pdi/pda 表数据规模在 T 级别,在进行全量导入时,如果只提交一个 Broker Load Job ,将因为导入任务的并发不够,导致导入耗时达到 5-6 小时。针对此问题,我们可以对导入任务进行 Job 拆分,在大数据平台也适配这种场景,支持任务的自动拆分和重试机制,具体的拆分方式如下图:

不过要注意的是,拆分后可能会对集群有较高的写入压力,要及时监控导入任务和集群的状态,特别针对 -235 的情况可能需要进行 Compaction 调优。

  1. 部分 ads 表因为数据规模太大导致全量导入耗时过长 (导入数据源是Hive非分区表

数据开发对部分报表的同步时效提出了很高的要求,我们在针对性的优化表同步时效时,发现一些表导入耗时较长,但通过集群监控体系发现相关表同步期间,BE、FE 节点的 CPU、内存、磁盘 IO 、网卡 IO 并没有达到瓶颈,集群的 Compaction Score 在此期间也一直稳定在低位,且整个同步过程同步任务均未出现-235、-238等相关的错误,我们推测瓶颈可能还是在导入任务的并发程度上。

因为有些表在 Hive 数仓是非分区的表,所以第 1 种通过划分分区范围拆分多个导入 Job 的方式就行不通了,理论上仍然可以通过划分不同的 HDFS 文件来拆分 Job,但是这种方式在毓数大数据平台还需要进一步去适配,所以我们还是优先考虑通过调整集群配置的方式彻底解决此问题:

首先可以通过适当调高 FE 的max_broker_concurrency去提高 Scan HDFS 文件阶段的并发度(最高调高至 BE 节点数),而对于 Table Sink 阶段,可通过调高 FE 的default_load_parallelism(设置fe.conf,可调整到 BE 节点数)和 send_batch_parallelism参数( SQL Session 执行set global send_batch_parallelism=5或在提交 Broker Load 中的 PROPERTIES 中指定,最高调整到 5,如果超过此值,需要同步调整 be.conf max_send_batch_parallelism_per_job 参数),提高该阶段并发度。通过提高 Broker Load Job 各阶段导入的并发度,相关报表的同步时效显著提升,这里我们选取 5 张典型表为例,优化前后的同步时效表现如下:

双机房容灾建设

为了保障 Doris 集群的可用性,我们需要为 Doris 集群提供双机房容灾能力。Doris 目前虽然可以通过不同的 Tag 将 BE 分组部署在多个机房,但是无法解决机房出现问题时的 FE 可用性问题。经过方案调研分析,我们决定通过自行开发 Replicator 主从同步插件去实施双机房容灾建设,具体的架构如下:

通过在主集群安装 Replicator 插件,Replicator 插件会拦截并解析主集群执行的全量 SQL,然后经过过滤操作,筛选涉及库、表结构变更和数据增、删、改相关的 SQL,并将相关 SQL(部分 SQL 需要改写)发送到备集群进行重放。除此之外,我们在 Doris 控制台开发了 Validator 数据校验程序,定期校验主备集群间的数据结构差异和数据差异并上报,在主集群因各种问题导致不可用时,直接通过切换 DNS 解析地址到备集群 LVS 地址完成主备集群的切换。

总结规划

效果总结

从 2022 年3月份开始进行对实时数仓沟通进行调研,7月份正式上线生产,集群数据规模快速增长。目前,生产环境共有 2 个集群,数百张表,几十 TB 数据,每日有数百个同步工作流在运行,几十亿规模的数据新增/更新。在此规模下,Doris 对业务支持良好,稳定运行。

  • Doris 集群架构清晰简单,不依赖其他组件,数据模型简单,数据导入方式多样化且适配成本很低,使得我们可以快速完成前期的调研测试并在短时间内上线实施。
  • Doris 集群作为目前公司 BI 工具的重要数据源,承载了相当一部分的报表分析业务,极大加速了报表分析的时效性。Doris 上线 3+月的时间,已经承载了小部分即席查询场景,大大缩短了相关查询的耗时。
  • Doris 具有完善的监控机制和审计机制,极大的降低了我们的运维工作
  • Doris 社区十分活跃,在我们使用 Doris 过程中遇到的一些疑难问题,官方也可以及时进行响应、处理。

未来规划

在近期的规划中,我们希望 Doris 能支撑更多的业务场景、发挥更大价值,例如基于 Doris 建立实时数仓、基于 Doris 重构用户行为画像、Doris HIVE 外表特性等。同时我们计划通过分析用户的查询 SQL 特征,结合 Doris 的查询缓存和物化视图特性,进一步提升查询效率。通过开发集群探查工具,实时探测集群数据表的数据分布情况,比如 Tablet 有没有过大,Tablet 数据分布是否均匀等,综合探查集群的运行情况并自动给出优化建议。

目前我们使用了 Doris 有大半年时间,在这半年期间一直保持和社区同学进行交流(提交 Issues 和 PRs),非常感谢 SelectDB 团队一直以来对我们的技术支持。最后祝 Apache Doris 越来越好,为基础软件建设添砖加瓦。

用户案例

导读: 长期以来,Apache Doris在小米集团都有着广泛的应用。随着小米互联网业务的快速发展,用户对Apache Doris的查询性能提出了更高的要求,Doris 向量化版本在小米内部上线已经迫在眉睫。在 SelectDB 公司和 Apache Doris 社区的鼎力支持下,我们在小米 A/B实验场景对 Doris 1.1.2 向量化版本进行了一系列的调优操作,使得查询性能和稳定性有了显著地提升。

背景

2019 年 9 月,为了满足小米互联网增长分析业务中近实时、多维分析查询的需求,小米集团首次引入了Apache Doris。在过去的三年时间里,Apache Doris 已经在小米内部得到了广泛的应用,支持了集团数据看板、广告投放、广告BI、新零售、用户行为分析、A/B实验平台、天星数科、小米有品、用户画像、小米造车等小米内部数十个业务,并且在小米内部形成了一套以 Apache Doris 为核心的数据生态 小米集团作为 Apache Doris 最早期的用户之一,一直深度参与社区建设,参与 Apache Doris 的稳定性打磨。

为了保证线上服务的稳定性,小米内部基于 Apache Doris 社区的 0.13 版本进行迭代,为小米的业务提供稳定的报表分析和 BI看板服务,经过业务的长时间打磨,内部 Doris 0.13 版本已经非常稳定。但是,随着小米互联网业务的发展,用户对 Doris 的查询性能提出了更高的要求,Doris 0.13 版本在某些场景下逐渐难以满足业务需求了。与此同时,Apache Doris 社区在快速发展,社区发布的 1.1 版本已经在计算层和存储层全面支持了向量化,查询性能相比非向量化版本有了明显地提升,基于此,小米内部的 Apache Doris 集群进行向量化版本升级势在必行。

场景介绍

小米的 A/B实验平台对 Doris 查询性能的提升有着迫切的需求,因此我们选择优先在小米的 A/B实验平台上线 Apache Doris 向量化版本,也就是 1.1.2 版本。

小米的A/B实验平台是一款通过 A/B测试的方式,借助实验分组、流量拆分与科学评估等手段来辅助完成科学的业务决策,最终实现业务增长的一款运营工具产品。在实际业务中,为了验证一个新策略的效果,通常需要准备原策略A 和新策略B 两种方案。 随后在总体用户中取出一小部分,将这部分用户完全随机地分在两个组中,使两组用户在统计角度无差别。将原策略A和新策略B分别展示给不同的用户组,一段时间后,结合统计方法分析数据,得到两种策略生效后指标的变化结果,并以此来判断新策略B 是否符合预期。

图1-小米的A/B实验简介

小米的A/B实验平台有几类典型的查询应用:用户去重、指标求和、实验协方差计算等,查询类型会涉及较多的 Count(distinct)、Bitmap计算、Like语句等。

上线前验证

我们基于 Doris 1.1.2 版本搭建了一个和小米线上 Doris 0.13 版本在机器配置和机器规模上完全相同的测试集群,用于向量化版本上线前的验证。验证测试分为两个方面:单 SQL 串行查询测试和批量 SQL 并发查询测试。在这两种测试中,我们在保证两个集群数据完全相同的条件下,分别在 Doris 1.1.2 测试集群和小米线上 Doris 0.13 集群执行相同的查询 SQL 来做性能对比。我们的目标是,Doris 1.1.2 版本在小米线上 Doris 0.13 版本的基础上有 1 倍的查询性能提升。

两个集群配置完全相同,具体配置信息如下:

  • 集群规模:3 FE + 89 BE

  • BE节点CPU: Intel(R) Xeon(R) Silver 4216 CPU @ 2.10GHz 16核 32线程 × 2

  • BE节点内存:256GB

  • BE节点磁盘:7.3TB × 12 HDD

单SQL串行查询测试

在该测试场景中,我们选取了小米A/B 实验场景中 7 个典型的查询 Case,针对每一个查询 Case,我们将扫描的数据时间范围分别限制为 1 天、7 天和 20 天进行查询测试,其中单日分区数据量级大约为 31 亿(数据量大约 2 TB),测试结果如图所示:

图2-单日分区查询耗时

图3-7日分区查询耗时

图4-20日分区查询耗时

根据以上小米A/B 实验场景下的单SQL串行查询测试结果所示,Doris 1.1.2 版本相比小米线上Doris 0.13版本至少有 3~5 倍的性能提升,效果显著,提升效果远高于预期。

批量 SQL 并发查询测试

在并发测试中,我们将小米A/B 实验场景的查询 SQL 按照正常的业务并发分别提交到 Doris 1.1.2 测试集群和小米线上 Doris 0.13 集群,对比观察两个集群的状态和查询延迟。测试结果为,在完全相同的机器规模、机器配置和查询场景下,Doris 1.1.2 版本的查询延迟相比线上 Doris 0.13 版本整体上升了 1 倍,查询性能下降非常明显,另外,Doris 1.1.2 版本稳定性方面也存在比较严重的问题,查询过程中会有大量的查询报错。Doris 1.1.2 版本在小米A/B 实验场景并发查询测试的结果与我们的预期差别较大。并发查询测试过程中,我们遇到了几个比较严重的问题:

  • CPU使用率上不去

查询下发到 Doris 1.1.2 版本所在的集群,CPU 使用率最多只能打到 50% 左右,但是完全相同的一批查询下发到线上 Doris 0.13 版本的集群,CPU使用率可以打到接近 100%。因此推测 Doris 1.1.2 版本在小米 A/B 实验场景中将机器的 CPU 利用不起来造成了查询性能大幅度降低。

图5-Doris 1.1.2版本和Doris 0.13版本CPU使用率对比

  • 查询持续报错

用户并发提交查询的时候会出现如下报错,后续的查询任务均无法执行,集群完全处于不可用的状态,只有重启 BE 节点才能恢复。

RpcException, msg: timeout when waiting for send fragments RPC. Wait(sec): 5, host: 10.142.86.26

用户提交查询的时候也会频繁出现如下报错:

detailMessage = failed to initialize storage reader. tablet=440712.1030396814.29476aaa20a4795e-b4dbf9ac52ee56be, res=-214, backend=10.118.49.24
  • Like 语句查询较慢

在小米 A/B实验场景有较多的使用 Like 语句进行字符串模糊匹配的查询,在并发测试过程中,该类查询普遍性能较低。

  • 内存拷贝耗时较长

并发查询测试过程中,SQL 整体执行较慢,通过抓取查询过程中的 CPU 火焰图,发现读取字符串类型数据的时候内存拷贝会占用较多时间。

图6-CPU火焰图

调优实践

为了解决 Doris 1.1.2 版本在小米 A/B实验场景并发测试过程中暴露出的性能和稳定性问题,推动 Doris 向量化版本尽快在小米 A/B实验平台上线,我们和 SelectDB 公司以及 Apache Doris 社区一起对 Doris 1.1.2 版本进行了一系列的调优工作。

提升 CPU 使用率

针对并发查询时 CPU 使用率上不去的问题,我们截取了查询过程中BE进程的函数调用栈,通过分析发现,有较多的内存分配和释放操作在等锁,这可能会造成 CPU 使用率上不去。

  • 函数调用栈
#0  sys_futex (v3=0, a2=0x0, t=0x7f786c9e7a00, v=<optimized out>, o=128, a=0x560451827c48 <tcmalloc::Static::pageheap_lock_>) at /root/doris/doris/be/src/gutil/linux_syscall_support.h:2419
#1 SpinLockDelay (loop=1822369984, value=2, w=0x560451827c48 <tcmalloc::Static::pageheap_lock_>) at /root/doris/doris/be/src/gutil/spinlock_linux-inl.h:80
#2 base::internal::SpinLockDelay (w=w@entry=0x560451827c48 <tcmalloc::Static::pageheap_lock_>, value=2, loop=loop@entry=20) at /root/doris/doris/be/src/gutil/spinlock_linux-inl.h:68
#3 0x000056044cfd825d in SpinLock::SlowLock (this=0x560451827c48 <tcmalloc::Static::pageheap_lock_>) at src/base/spinlock.cc:118
#4 0x000056044f013a25 in Lock (this=<optimized out>) at src/base/spinlock.h:69
#5 SpinLockHolder (l=<optimized out>, this=0x7f786c9e7a90) at src/base/spinlock.h:124
#6 (anonymous namespace)::do_malloc_pages(tcmalloc::ThreadCache*, unsigned long) () at src/tcmalloc.cc:1360
...
#0  sys_futex (v3=0, a2=0x0, t=0x7f7494858b20, v=<optimized out>, o=128, a=0x560451827c48 <tcmalloc::Static::pageheap_lock_>) at /root/doris/doris/be/src/gutil/linux_syscall_support.h:2419
#1 SpinLockDelay (loop=-1803179840, value=2, w=0x560451827c48 <tcmalloc::Static::pageheap_lock_>) at /root/doris/doris/be/src/gutil/spinlock_linux-inl.h:80
#2 base::internal::SpinLockDelay (w=w@entry=0x560451827c48 <tcmalloc::Static::pageheap_lock_>, value=2, loop=loop@entry=2) at /root/doris/doris/be/src/gutil/spinlock_linux-inl.h:68
#3 0x000056044cfd825d in SpinLock::SlowLock (this=0x560451827c48 <tcmalloc::Static::pageheap_lock_>) at src/base/spinlock.cc:118
#4 0x000056044f01480d in Lock (this=<optimized out>) at src/base/spinlock.h:69
#5 SpinLockHolder (l=<optimized out>, this=0x7f7494858bb0) at src/base/spinlock.h:124
#6 (anonymous namespace)::do_free_pages(tcmalloc::Span*, void*) [clone .constprop.0] () at src/tcmalloc.cc:1435
...
  • Doris 内存管理机制

Doris 中使用 TCMalloc 进行内存管理。根据所分配和释放内存的大小,TCMalloc 将内存分配策略分为小内存管理和大内存管理两类。

图7-TCMalloc内存管理机制

(1)小内存管理

TCMalloc 使用了 ThreadCache、CentralCache 和 PageHeap 三层缓存来管理小内存的分配和释放。

对于每个线程,TCMalloc 都为其单独维护了一个 ThreadCache,每个 ThreadCache 中包含了多个单独的 FreeList,每个 FreeList 中缓存了 N 个固定大小的可供分配的内存单元。进行小内存分配时,会直接从 ThreadCache 中进行内存分配,相应地,小内存的回收也是将空闲内存重新放回 ThreadCache 中对应的 FreeList 中。由于每个线程都有自己独立的 ThreadCache,因此从 ThreadCache 中分配或回收内存是不需要加锁的,可以提升内存管理效率。

内存分配时,如果 ThreadCache 中对应的 FreeList 为空,则需要从 CertralCache 中获取内存来补充自身的 FreeList。CentralCache 中维护了多个 CentralFreeList 链表来缓存不同大小的空闲内存,供各线程的 ThreadCache 取用。由于 CentralCache 是所有线程共用的,因此 ThreadCache 从 CentralCache 中取用或放回内存时是需要加锁的。为了减小锁操作的开销,ThreadCache 一般从 CentralCache 中一次性申请或放回多个空闲内存单元。

当 CentralCache 中对应的 CentralFreeList 为空时,CentralCache 会向 PageHeap 申请一块内存,并将其拆分成一系列小的内存单元,添加到对应的 CentralFreeList 中。PageHeap 用来处理向操作系统申请或释放内存相关的操作,并提供了一层缓存。PageHeap 中的缓存部分会以 Page 为单位、并将不同数量的 Page 组合成不同大小的 Span,分别存储在不同的 SpanList 中,过大的 Span 会存储在一个 SpanSet 中。CentralCache 从 PageHeap 中获取的内存可能来自 PageHeap 的缓存,也可能是来自 PageHeap 向系统申请的新内存。

(2)大内存管理

大内存的分配和释放直接通过 PageHeap 来实现,分配的内存可能来自 PageHeap 的缓存,也可能来自 PageHeap 向系统申请的新内存。PageHeap 向系统申请或释放内存时需要加锁。

TCMalloc 中的 aggressive_memory_decommit 参数用来配置是否会积极释放内存给操作系统。当设置为 true 时,PageHeap 会积极地将空闲内存释放给操作系统,节约系统内存;当该配置设置为 false 时,PageHeap 会更多地将空闲内存进行缓存,可以提升内存分配效率,不过会占用更多的系统内存;在 Doris 中该参数默认为 true

通过分析查询过程中的调用栈发现,有比较多的线程卡在 PageHeap 向系统申请或释放内存的等锁阶段,因此,我们尝试将 aggressive_memory_decommit 参数设为false,让 PageHeap 对空闲内存进行更多的缓存。果然,调整完成之后,CPU 使用率可以打到几乎 100%。在 Doris 1.1.2 版本,数据在内存中采用列式存储,因此,会相比于 Doris 0.13 版本行存的方式有更大的内存管理开销。

图8-调优后Doris 1.1.2测试集群的CPU使用率

社区相关的PR:

https://github.com/apache/doris/pull/12427

缓解 FE 下发 Fragment 超时的问题

在 Doris 1.1.2 版本,如果一个查询任务的 Fragment 数量超过一个,查询计划就会采用两阶段执行(Two Phase Execution)策略。在第一阶段,FE 会下发所有的 Fragment 到 BE 节点,在 BE 上对 Fragment 执行相应的准备工作,确保 Fragment 已经准备好处理数据;当 Fragment 完成准备工作,线程就会进入休眠状态。在第二阶段,FE 会再次通过 RPC 向 BE 下发执行 Fragment 的指令,BE 收到执行 Fragment 的指令后,会唤醒正在休眠的的线程,正式执行查询计划。

RpcException, msg: timeout when waiting for send fragments RPC. Wait(sec): 5, host: 10.142.86.26

在用户执行查询时,会持续有上面的报错,并导致任何查询无法执行。通过截取进程的调用栈,分析发现大量的线程均在休眠状态,均阻塞在 Fragment 完成准备工作并休眠等待被唤醒的状态。排查发现,查询计划的两阶段执行机制中存在 Bug,如果执行计划被FE取消,BE 上已经完成 Fragment 准备工作并休眠等待的线程就不会被唤醒,导致 BE 上的 Fragment 线程池被耗尽,后续所有查询任务的 Fragment 下发到 BE 节点之后,因为没有线程资源都会等待直到 RPC 超时。

为了解决这个问题,我们从社区引入了相关的修复 Patch,为休眠的线程增加了超时唤醒机制,如果线程被超时唤醒,Fragment 会被取消,进而释放线程资源,极大地缓解了 FE 下发执行计划时 RPC 超时的问题。

该问题还未完全解决,当查询并发很大时还会偶发地出现。另外,我们还引入了 Doris 社区相关的其他 Patch 来缓解该问题,比如:减小执行计划的 Thrift Size,以及使用池化的 RPC Stub 替换单一的 RPC Stub 。

社区相关的PR如下:

https://github.com/apache/doris/pull/12392

https://github.com/apache/doris/pull/12495

https://github.com/apache/doris/pull/12459

修复 Tablet 元数据汇报的 Bug

在 Doris 中,BE 会周期性地检查当前节点上所有 Tablet 是否存在版本缺失,并向 FE 汇报所有 Tablet 的状态和元信息,由 FE 对每一个 Tablet 的三副本进行对比,确认其中的异常副本,并下发 Clone 任务,通过 Clone 正常副本的数据文件来恢复异常副本缺失的版本。

detailMessage = failed to initialize storage reader. tablet=440712.1030396814.29476aaa20a4795e-b4dbf9ac52ee56be, res=-214, backend=10.118.49.24

在该报错信息中,错误代码res=-214 (OLAP_ERR_VERSION_NOT_EXIST)表示查询计划执行过程中在 BE 上初始化 Rowset Reader 的时候出现异常,对应的数据版本不存在。在正常情况下,如果 Tablet 的某一个副本存在版本缺失,FE 生成执行计划的时候就不会让查询落在该副本上,然而,查询计划在 BE 上执行的过程中却发现版本不存在,则说明 FE 并没有检测到该副本存在版本缺失。

通过排查代码发现,BE 的 Tablet 汇报机制存在 Bug,当某一个副本存在版本缺失时,BE 并没有将这种情况正常汇报给 FE,导致这些存在版本缺失的异常副本并没有被 FE 检测到,因此不会下发副本修复任务,最终导致查询过程中会发生res=-214的报错。

社区相关的 PR 如下:

https://github.com/apache/doris/pull/12415

优化 Like 语句性能

在 Doris 1.1.2 版本中使用 Like 语句进行字符串模糊匹配查询时,Doris 底层其实是使用了标准库中的std::search()函数对存储层读出的数据进行逐行匹配,过滤掉不满足要求的数据行,完成 Like 语句的模糊匹配。通过调研和对比测试发现,GLIBC 库中的std::strstr()函数针对字符串匹配比std::search()函数有 1 倍以上的性能提升。最终我们使用std::strstr()函数作为 Doris 底层的字符串匹配算法,将 Doris 底层字符串匹配的性能可以提升 1 倍。

优化内存拷贝

在小米的场景中有很多字符串类型的查询字段,Doris 1.1.2 版本使用 ColumnString 对象来存储内存中的一列字符串数据,底层使用了 PODArray 结构来实际存储字符串。执行查询时,需要从存储层逐行读取字符串数据,在这个过程中需要多次对 PODArray 执行 Resize 操作来为列数据申请更大的存储空间,执行 Resize 操作会引起对已经读取的字符串数据执行内存拷贝,而查询过程中的内存拷贝非常耗时,对查询性能影响极大。

为了降低字符串查询过程中内存拷贝的开销,我们需要尽量减少对 PODArray 执行 Resize 操作的次数。鉴于小米 A/B实验场景中同一列不同行的字符串长度相对比较均匀,我们尝试预先为需要读取的字符串申请足够的内存来减少 Resize 的次数,进而降低内存拷贝的开销。在数据扫描时,每个 Batch 需要读取的数据行数是确定的(假设为 n),当字符串数据读取完指定的前 m(在小米的场景中,该值配置为100,m < n)行时,我们根据前 m 行的 PODArray 大小预估所有 n 行字符串数据需要的 PODArray 大小,并为其提前申请内存,避免后面逐行读取时多次执行内存申请和内存拷贝。

内存预估公式为:

所需PODArray总大小 = (当前PODArray总大小 / m)* n

图9-优化内存拷贝开销

当然,该方法只是对所需的内存进行了预估,根据预估的大小提前申请了内存,减少了后面逐行读取字符串时大量的 Resize 操作,减少了内存申请和内存拷贝的次数,并不能完全消除字符串读取过程中的内存拷贝。该优化方案只对一列中字符串长度比较均匀的情况有效,内存的预估相对会比较接近实际内存。如果一列中字符串长度差别较大,该方法的效果可能不甚明显,甚至可能会造成内存浪费。

调优测试结果

我们基于小米的 A/B实验场景对 Doris 1.1.2 版本进行了一系列调优,并将调优后的 Doris 1.1.2 版本与小米线上 Doris 0.13 版本分别进行了并发查询测试。测试情况如下:

测试1

我们选择了 A/B 实验场景中一批典型的用户去重、指标求和以及协方差计算的查询 Case(SQL 总数量为 3245)对两个版本进行并发查询测试,测试表的单日分区数据大约为 31 亿(数据量大约 2 TB),查询的数据范围会覆盖最近一周的分区。测试结果如图所示,Doris 1.1.2 版本相比 Doris0.13版本,总体的平均延迟降低了大约 48%,P95 延迟降低了大约 49%。在该测试中,Doris 1.1.2 版本相比 Doris0.13 版本的查询性能提升了接近 1 倍。

图10-查询平均延迟和P95延迟

测试2

我们选择了 A/B实验场景下的 7 份 A/B 实验报告对两个版本进行测试,每份 A/B 实验报告对应小米 A/B实验平台页面的两个模块,每个模块对应数百或数千条查询 SQL。每一份实验报告都以相同的并发向两个版本所在的集群提交查询任务。测试结果如图所示,Doris 1.1.2 版本相比 Doris 0.13 版本,总体的平均延迟降低了大约 52%。在该测试中,Doris 1.1.2 版本相比 Doris 0.13 版本的查询性能提升了超过 1 倍。

图11-查询平均延迟

测试3

为了验证调优后的 Doris 1.1.2 版本在小米 A/B 实验场景之外的性能表现,我们选取了小米用户行为分析场景进行了 Doris 1.1.2 版本和 Doris 0.13 版本的并发查询性能测试。我们选取了 2022年10月24日、25日、26日和 27日这 4 天的小米线上真实的行为分析查询 Case 进行对比查询,测试结果如图所示,Doris 1.1.2 版本相比 Doris 0.13 版本,总体的平均延迟降低了大约7 7%,P95 延迟降低了大约 83%。在该测试中,Doris 1.1.2 版本相比 Doris 0.13 版本的查询性能有 4~6 倍的提升。

图12-查询平均延迟和P95延迟

结束语

经过一个多月的性能调优和测试,Apache Doris 1.1.2 版本在查询性能和稳定性方面已经达到了小米 A/B实验平台的上线要求,在某些场景下的查询性能甚至超过了我们的预期,希望本次分享可以给有需要的朋友一些可借鉴的经验参考。

最后,感谢 SelectDB 公司和 Apache Doris 社区对我们的鼎力支持,感谢衣国垒老师在我们版本调优和测试过程中的全程参与和陪伴。Apache Doris 目前已经在小米集团内部得到了广泛地应用,并且业务还再持续增长,未来一段时间我们将逐步推动小米内部其他的 Apache Doris 业务上线向量化版本。

用户案例

导读: 随着业务量快速增长,云积互动对数据的实时性及灵活性提出更高要求,早期基于 CDH 的大数据平台已无法满足当前难度以及复杂度较高的的业务需求,因此云积互动于 2021 引进 Apache Doris 在部分业务中使用,并在使用过程中逐渐发掘出 Apache Doris 更多强大之处以及优势,最终决定在 2022 年全面应用 Apache Doris ,基于 Apache Doris 来构建云积互动企业级实时离线统一数仓。

业务背景

云积互动,全称深圳市云积分科技有限公司,成立于 2014 年,是国内领先的 AI 驱动的消费者运营服务提供商,致力于发展消费者运营相关理论、技术、算法、模型及软件工具,为全球消费性企业提供基于 AI 的消费者运营系统及运营策略服务,打造消费者运营领域最佳服务和实践标准,帮助企业构建消费者运营核心能力,以应对当前及未来的场景化运营挑战。目前已成为天猫、京东、抖音、腾讯等主流电商和社交平台深度合作伙伴,服务客户 2300+,其中世界 500 强客户超过 18 家,包括全球第一的美妆、日化、医药集团,均深度服务超过 7 年。

业务需求

云积互动的主要业务是以消费者运营为核心,包含了会员通,CRM,策略营销,数据资产等一系列业务,如下图所示。

图片

早期云积互动的大数据需求较少,起步也比较晚,2019 年才开始搭建基于 CDH 的大数据平台,因此大数据平台的主要目的是为了满足早期较为单一的 BI 数据看板及报表功能。近年来,随着业务量快速增长,数据量的增长,业务对数据的实时性及灵活性提出更高的要求,大数据平台也从早期的只需要满足单一的 BI 服务需求,扩展到需要支持各业务线,包含圈人服务,人群分析,AI 智能数据等多种业务需求。早期基于 CDH 的大数据平台已无法满足当前难度以及复杂度较高的的业务需求。

大数据平台的迭代

早期数仓架构

早期公司业务量较少,基于 Hive+Spark 构建的离线数仓即可满足早期大数据的需求。早期架构主要用于支持 BI 相关功能,数据大屏,自助报表等应用,大部分的指标仅要求 T+1 的指标。

下图为云积互动早期数仓架构,早期的数据源主要为业务数据库 MySQL 以及日志,数据通过 Streamsets 实时采集数据并经 ETL 后传入 ODS 层,存储到 Kudu 中,通过 Impala 对 ODS 层的数据进行处理,实现实时查询业务的需求。通过 Hive 构建了离线数仓的 DWD、DWS 以及 DIM 层,使用 Spark 进行离线任务的计算与调度,最终处理并计算完成的数据输出到 MySQL 和 Kylin 中,应用于上层业务应用及分析。

图片

存在的问题

  • 查询效率低:使用 Impala 多表查询速度太慢,亿级别表 Join 时,查询时间基本上在 3 分钟以上,部分复杂查询会在超过 3 分钟左右判定超时;同时使用 Impala 并行查询对内存消耗较大,影响其他任务运行。
  • 存储成本太高:使用多个系统存储数据( Hive,Hbase,Kudu),存储成本较高,随着业务量的增长,数据量指数级的增多,存储成本更是成倍数增加。
  • 开发难度大:数仓开发基于代码,不能满足灵活的指标需求,当分析需求越来越多时,存在多个场景组合查询、自定义等查询场景,面对这样的场景,必须进行再开发,开发和时间成本都很高。
  • 数据链路长:较长的链路使得数据的一致性很难保证,数据在某一环节出现问题,排查难度高,运维成本也会增加。

技术选型

基于业务对数据实时性及灵活性更高的要求,我们在 2021 年初对当前市面上较为流行的分析引擎 ClickHouse 和 Apache Doris 进行了调研,调研中我们发现, Apache Doris 具有高性能、简单易用、实现成本低等诸多优势。基于此,我们决定在部分业务上开始使用 Apache Doris,在使用的过程中逐渐发掘出 Apache Doris 更多强大之处以及优势,Apache Doris 在很多方面十分贴合我们的诉求,因此,我们决定在 2022 年全面应用 Apache Doris 在数据仓库中,基于 Apache Doris 构建云积互动企业级数仓,选择 Apache Doris 的主要原因如下:

1. 该架构开发效率高,查询性能远高于 Hive。

  • 数仓 ETL 由原来的 Spark 任务改为 Doris SQL 任务,使用 SQL 开发模式可进行快速迭代,开发效率提升了近一倍。
  • Doris 查询支持物化视图索引加速,Doris 在 1.0 版本开始引入了向量化引擎,性能提升 2~3 倍,平均查询耗时降低了 60%。

2. 该架构对 OLAP 支持更好,支持更为灵活的查询。

  • Doris 支持 Cube 函数,实现了 Kylin 的多维计算功能,极大的减少了 SQL 的开发量。
  • Doris 支持 Bitmap 类型,可实现人群之间的快速交并差计算并落地新人群。

3. 支持主键唯一和聚合模型的表,极大的减少了开发难度。

  • 使用主键唯一模型可做到依据主键数据覆盖,自动实现数据更新功能。
  • 使用聚合模型的表,减少了 ETL 过程中的 Join 操作,同时解决了上层数据到达时间不一致而导致的数据关联不上的问题。

新数仓架构

最开始使用新的数仓架构主要是要解决圈人速度慢的问题,圈人服务的核心在于人群圈选,通过 SQL 代码或标签取值组合等多种方式,实现人群查找,帮客户找到符合画像的人群,现在各行各业都会设计广告营销场景,其中也包括云积互动,而如何快速准确找到对的人推送广告就成了大数据场景需要解决的问题。当时我们只是在部分功能上使用了 Apache Doris,用 Apache Doris 替代了 Spark+Impala 来实现实时圈人功能,出乎意料的是,Apache Doris 投入使用之后效果极佳,新版架构的实时圈人业务平均每个任务耗时由 3~5 分钟降低到 10 秒左右,并且在人群落地方面,使用存储更小的 Bitmap 代替原来的人群落地为表,不仅数据管理方便,而且磁盘空间占用减少了 80% 左右。

除此之外,在一段时间的使用和学习中我们发现 Apache Doris 丰富的功能和核心优势,综上原因,我们产生了用 Apache Doris 数仓替代 Hive 数仓的想法,并迅速的付诸于实践。

当确定数仓使用 Apache Doris 之后,结合当前的业务需求以及早期架构需要解决的问题,需要将多平台数据打通,构建统一数据口径和数据指标,我们将数据仓库构建分为:ODS 层,DWD 层,DWS 层和 ADS 层,下图为各个分层主要负责的数据类型。

图片

新数仓的分层逻辑如下:

  • ODS 层: 从业务侧、日志系统和埋点系统等拉取过来的数据,按照原字段名存入 ODS 层。
  • DWD 层: 数据按照维度进行拆分,轻度聚合,将多个平台数据按照同一标准定义进行处理。
  • DWS 层: 主要负责数据的聚合、数据宽表、基于维度的一些计算指标等等。值得注意的是,DWS 层中的部分表使用了 AGGREGATE KEY 模型, AGGREGATE KEY 模型可以提前聚合数据,适合报表和多维度业务,可以有效避免数据汇总时的 Join 操作,部分指标可以使用该表特性实现,无需敲代码,降低了开发成本。
  • ADS 层: 各业务模块根据各自的需求,将数据从 DWS 层汇聚数据指标到 ADS 层。

新架构设计

图片

数据接入:

业务数据 MySQL 存储在多台 RDS 中,因 Binlog 的留存时间较短,且数据存放于多服务器,同时还进行了分库存储,因此如何接入历史数据以及如何同时接入多个库的数据成了棘手的问题。在调研过程中我们发现 FlinkCDC 可以完美解决上述问题:FlinkCDC 可以在接入历史数据之后自动切换为读取 Binlog,且 2.x 版本已经支持断点续传,支持水平扩展,支持动态添加表。

日志和埋点数据我们采用 Kafka + Doris Routine Load 导入方式,Routine Load 支持支持用户提交一个常驻的导入任务,通过不断的从指定的数据源读取数据,将数据导入到 Doris 中,支持 Json 解析,并且可以做一些简单的 ETL,极大的减少了代码开发的工作。

数据加工:

数据加工采用了 Doris SQL、Insert into 的方式将增量计算完的结果导入到数仓分层中(ODS/DWD/DWS/ADS)。因业务需求对数据的实时性允许存在一点延迟性,因此将 Dolphinscheduler 设置为每 5 分钟调度一次增量 SQL;同时设置数仓每一层错峰执行任务,避免任务堵塞。

对于数据量比较小的表可以用一个 SQL 完成导入,而对于数据量大的表,为避免 union,需要分成多个 insert into 来执行。但有些大表的逻辑是多个大表的 Join 结果,对于这种场景,我们应用 AGGREGATE KEY 模型的表来解决,利用表的聚合特性来代替 SQL 的 Join 操作。

任务调度:

任务调度一直沿用 Dolphinscheduler,页面化的操作简单方便,且对 SQL 的支持友好,整个大数据平台的任务都是通过该调度器完成。目前使用了 2.x 以上的版本,支持使用钉钉报警和邮件报警的功能来监控任务,任务失败将通过钉钉或者邮件发送。

监控:

使用 Prometheus+Grafana 对 Doris 集群和 Flink 任务进行监控管理,页面化的监控,极大的减少运维成本。

优化方案

  1. 使用 Flink CDC 启动多个同步任务后,磁盘 IO 飙升导致查询延迟变高。
  • 对接多个数据源 CDC 任务同步数据时,中间数据写入 Kafka 进行合并和数据消峰,减少写入 Doris 的任务数。
  • 调整 DorisSink 写入频率,控制每批次数据量
  • 优化部分表的分区分桶,降低数据分片数量
  1. Doris 1.1.0-rc05 版本偶发后台合并线程持续合并已删除的 Tablet, 合并持续失败且数据版本最多的那个 Tablet 的版本数量升高至 150 左右。
  • 使用元数据管理工具 meta_tool 删除已失效的 Tablet 元数据,版本数量显著下降,稳定在 30 左右

  • 对大表的超过两年的数据做冷备份,减少大表的 Tablet 数,降低整个 Doris 集群对 Tablet 的管理压力。

图片3. Bitmap 存储散列用户 ID,使用 Bitmap 相关函数计算时性能较差。

  • 用户唯一 ID 通过字符串转换生成,基数大且非常稀疏;使用全局字典生成紧凑的 ID 代替,优化后性能提高近五倍。

总结与收益

  • Apache Doris 构建的离线+实时数仓一体化,采用 SQL 开发,并用 Dolphinscheduler 一键部署调度,极大的降低开发难度和开发工作量,可进行快速迭代以满足目前行业日益增长的数据需求。
  • 新架构采用 Flink+Doris 的架构体系,FlinkCDC+StreamLoad 可以做到流批一体化数据接入,减少了组件的使用,解决了数据的冗余存储,服务器资源节省了 30%数据存储磁盘占用减少 40% ,同时组件的运维成本大大减少。
  • Doris 的易用性极高,支持 MySQL 协议和标准 SQL,各业务线均可通过查询 MySQL 的方式进行数据查询,极大的减少了学习成本。
  • 从 2021 年 Apache Doris 上线云积互动的第一个业务至今,Apache Doris 在云积互动内部已成为大数据服务的基础,承担了包括人群分析、报表查询、指标计算等场景下的在线/离线需求,在较小的集群规模下支持了每天近 2 万次的用户在线分析查询。

未来规划

目前新的数据仓库已经建设完成,基于 Apache Doris 较多优异特性以及与业务需求较高吻合性,当前团队已经在着手搭建基于 Apache Doris 的数据质量管理和数据血缘,后续我们计划基于 Apache Doris 搭建数据指标体系。下面简单分享一下我们在数据质量管理和数据管理的实现想法。

1. 数据质量管理: 在 Apache Doris 的 ODS 层建表时设定一些非空主键,这些字段都是业务逻辑上的必须字段,当数据接入会给定一些默认值,这样就可以清晰的分类出这些数据,在质量分析中进行输出;在 ETL 中也会存在一些逻辑错误数据,这类数据会通过定时的 Doris SQL 脚本进行输出,同时也可以反馈到业务侧进行数据修复。2. 数据血缘: 依托 Doris 提供的 SQL 审计功能,使用采集工具 Filebeat/Logstash 持续采集审计日志发送到 Kafka,使用开源的 SQL 解析工具或者抽取 Doris 的 SQL 解析模块针对 DDL 或者 DML 进行解析,解析后的数据存入图数据库或者关系型数据库供业务端展示;该功能的实现对于数据问题排查、数据资产管理均有意义。

围绕着 Apache Doris 为核心的数据平台建设目前也在一直迭代发展,当然在使用中也发现了该产品的一些需要优化的地方,但不可否认它优秀的性能和丰富的功能,后续我们也将持续不断地进行优化,将优化方案贡献给 Apache Doris 社区。

作者介绍:

王杰:云积互动大数据团队 leader,负责数据平台研发及数据治理

蒙磊:云积互动大数据高级开发,负责数据平台研发和数仓开发

用户案例

作者:付帅,橙联(中国)有限公司数字化团队,大数据研发经理,负责数字化团队数据中台的研发以及 OLAP 引擎的应用落地及性能优化。

业务背景

橙联股份是一家服务全球跨境电商的科技公司,致力于通过市场分析、系统研发及资源整合,为客户提供物流、金融、大数据等多方面的服务产品,为全球跨境电商提供高品质、全方位的服务解决方案。

随着公司业务的发展和数据的不断增长,早期基于 MySQL 的传统数仓架构已经无法应对公司数据的快速增长。业务的需求和运营的决策对于数据时效性的要求越来越高,对数仓准实时能力的需求越发强烈。为了适应快速的增长需求,橙联于 2022 年正式引入 Apache Doris,以 Apache Doris 为核心构建了新的数仓架构,构建过程中对服务稳定性、查询稳定性、数据同步等多方面进行了优化,同时建立了以 Apache Doris 为核心的数据中台,在这一过程中积累了诸多使用及优化经验,在此分享给大家。

数仓架构演进

早期数仓架构

公司在成⽴初期业务量不⼤,数据团队规模⽐较⼩,对数据的需求仅局限于少量 T + 1 定制化报表需求。因此,早期数仓架构十分简单,如下图所示,直接使用 MySQL 构建数仓集市层,使用数仓集市层的数据进行报表开发,基于商业化的报表平台向需求方提供 T + 1 的数据报表服务。

存在的问题

  1. 随着公司业务规模扩大、数据量激增以及对数据时效性要求不断提高,使⽤ MySQL 进⾏数据分析越来越不能满⾜业务⽅的要求。

  2. 没有对数仓进⾏分层划域,烟囱式的开发模式数据复⽤性很差,开发成本⾼,业务⽅提出的需求不能快速得到响应。

  3. 对数据的质量和元数据的管理缺乏管控。

新数仓架构

为了解决旧架构日益凸显的问题,适应快速增长的数据和业务需求,今年正式引入 Apache Doris 构建新的数仓架构。

选择 Apache Doris 的原因:

易⽤性 - 在当前应用场景下,引入新技术,将面临大量报表迁移问题,因此必须要考虑的产品易用性问题,而 Apache Doris 在学习成本、报表迁移成本、服务运维成本上有着非常优秀的表现,具体包括:

  1. 采⽤ MySQL 协议和语法,⽀持标准 SQL,可以通过各类客户端⼯具来访问 Doris,能够与 BI ⼯具⽆缝对接
  2. ⽀持多表 Join,针对不同场景的 Join 提供了多种优化⽅案
  3. ⽣态扩展完善,离线数据的⾼效批量导⼊、流式数据的低延迟实时导⼊都有很好的⽀持
  4. 相较于业界其他热⻔ OLAP 数据库,Doris 的分布式架构⾮常简洁,只有 FE、BE 两个进程,运⾏不依赖任何第三⽅系统
  5. ⽀持弹性伸缩,对于部署、运维⾮常友好

性能 - 当前报表存在大量降耦聚合操作,对多表关联的查询性能和实时查询的时效性有着十分高的要求,而 Apache Doris 基于 MPP 架构实现,并自带了⾼效的列式存储引擎,可以支持:

  1. 数据的预聚合以及预聚合结果的⾃动更新
  2. 数据的实时更新
  3. ⾼并发查询

基于以上的原因,最终选择了以 Apache Doris 为核心构建新的数仓。

架构介绍

Apache Doris 的数仓架构十分简洁,不依赖 Hadoop 生态组件,构建及运维成本较低。

如以上架构图所示,我们的数据源共有 4 种,业务数据 MySQL、文件系统 CSV、埋点数据和第三方系统 API;针对不同的需求,使用了不同的数据导入方式,文件数据导入使用 Doris Stream Load,离线数据使用 DataX doriswriter 进行数据初始化,实时增量数据使用 Flink CDC + Flink Doris Connector 的方式进行数据同步;数据存储和计算层使用了 Doris ,在分层设计上采用 ODS(Operation Data Store 数据准备区,也称为贴源层)、 明细层 DWD、中间层 DWM、服务层 DWS、应用层 ADS 的分层思想,ODS 之后的分层数据通过 DolphinScheduler 调度 Doris SQL 进行增量和全量的数据更新。最终上层数据应用使用自研的一站式数据服务平台,可以和 Apache Doris 无缝对接,提供自助报表、自助取数、数据大屏、用户行为分析的数据应用服务。

基于 Apache Doris 的数仓架构方案可同时支持离线和准实时应用场景,准实时的 Apache Doris 数仓可以覆盖 80% 以上的业务场景。这套架构大大降低了研发成本,提高了开发效率。

当然在架构构建过程中也遇到一些问题和挑战,我们针对问题进行了相应的优化。

Apache Doris 构建数仓优化方案

在数仓的使用过程中,主要遇到三方面问题。首先是服务稳定性问题,其次是查询速度逐渐变慢的问题,最后是 Doris 数据同步和 Doris SQL 调度问题。具体体现在以下:

服务稳定性

优化前

在 Apache Doris 使用初期,FE 和 BE 的部署方式如下:

  • FE 负责元数据的管理、用户请求接入、查询的解析规划,资源占用较低,因此将 FE 和其他大数据组件混合部署 FE*3。

  • BE 负责数据存储、计算、查询计划的执行,资源占用较大,因此 BE 进行独立部署且分配了较多的资源 BE(16C 128G 4T1)7。

基于以上方式部署,使用初期运行的稳定性还不错。然而在使用了一段时间之后,这种部署方式暴露的问题就越来越明显。

存在的问题

  • 首先,FE 混合部署存在资源竞争。其他组件与 FE 服务竞争资源,导致 FE 资源不足,服务运行稳定性不好。具体问题表现在:每当机器资源使用率打满,就会导致 FE 节点无法连接,长时间获取不到心跳而被 FE 集群判定为离线。

  • 其次,BE 单磁盘存在 Compaction 效率低的问题。初期,我们在部署 BE 时,每个节点只分配了 1 块 4T 的磁盘,虽然磁盘的空间并不小,但是磁盘的数量比较少,Compaction 线程数只有 2,Compaction 效率很低,这是导致 BE Compaction Score 不健康的原因之⼀。

  • Compaction 配置参数

compaction_task_num_per_disk 每个磁盘上的任务数,默认为 2

max_compaction_threads Compaction 线程的总数,默认为 10

total_permits_for_compaction_score Compaction 任务配额,默认 10000

Compaction 工作机制:Apache Doris 的数据写⼊模型使⽤了与 LSM-Tree 类似的数据结构。数据以追加(Append)的⽅式写⼊磁盘,在读逻辑中,需要通过 Merge-on-Read 合并处理写入的数据。Merge-on-Read 会影响读取的效率,为了降低数据读取时需要合并的数据量,使⽤ LSM-Tree 的系统会引⼊后台数据合并逻辑,以⼀定策略定期的对数据进⾏合并。

优化后

为了解决以上的问题,对部署方式进行了优化以提升服务的稳定性:

  • FE 进行独⽴部署,避免了 FE 混合部署资源竞争问题

  • BE 进行磁盘拆分,多磁盘部署,从原来一块 4T 磁盘变更为 5 块 1T 磁盘,使 BE Compaction 线程数提升 5 倍,Compaction 效率、磁盘 I/O 带宽也得到了提升

  • 增加客户端连接代理层 ProxySQL,对 FE 连接进⾏负载均衡,解决 FE 连接单点问题

  • 增加 Supervisor 对 FE、BE 服务进程状态监控,FE、BE 进程意外宕机可以快速恢复

经过以上对部署的优化,Apache Doris 服务的稳定性有了很大的提升,基本可以满足目前对稳定性的需求。

查询稳定性

初期刚部署时,无论进行数据导入还是数据查询,执行起来都比较顺畅。但随着承载的表和数据导入作业数量不断增多,查询稳定性问题逐渐暴露出来。

优化前

存在的问题

随着使用时间和数据量的增加,集群开始频繁出现不可用的问题,主要体现在以下几个方面:

  • DDL 操作很难执行,查询速度变得比较缓慢

  • FE 服务频繁出现 OOM 宕机,有时候甚至出现无法连接的情况

下图是生产环境某张表的体积的大小和 Tablet 数量的情况。这张表的体积只有 275M,但是 Tablet 的数量却达到了 7410,这非常不合理。进一步排查确认整个集群 Tablet 数量非常庞大,集群只有 5T 的数据量,然而 Tablet 数量达到 150 万。

最初我们对 Apache Doris 表数据量大小、分区粒度、Bucket 数量、Tablet 数量的关系及 Tablet 数量对集群的影响没有清晰的概念。开发人员在 Apache Doris 使用中更多的是追求查询速度,将大部分的动态分区表的分区粒度设置的比较小,分区 Bucket 数量设置却比较大。

经过与 Apache Doris 社区小伙伴的沟通交流,了解到 Tablet 数量过大可能会导致元数据管理和运维压力增大,出现查询速度缓慢、FE 不稳定的问题。

优化方案

首先明确 Tablet 数量的计算方式,Tablet 数量 = 分区数量 Bucket 数量 副本数。结合当前实际使用情况,确认可以在分区粒度和 Bucket 数量上进⾏优化。我们将分区的粒度从按天、按周分区更改为按月分区,Bucket 数量按照数据体积大小进行合理的配置。如下图所示,是建议数据体积大小对应的 Bucket 数量设定。

本次的优化目标是将 Tablet 数量从 150 万降低到 15 万,同时我们也对未来的增长速度进行了规划,在三副本情况下,期望 Tablet 数量增长速度是 30000/TB。

优化后

实际上,在仅对 ODS 表进⾏了分区粒度和 Bucket 数量调整后,集群 Tablet 数量从 150 万下降到了 50 万,效果显著。

优化前的 FE

下图是 FE JVM Heap Stat 的监控情况,每当 FE 执行 Checkpoint 时,元数据就会在内存中复制一份。体现在 FE JVM Heap Stat 上就是形成一个个的波峰。优化之前 FE 对内存占用几乎持续在 90% 以上,而且每一个波峰都非常的尖锐。

优化后的 FE

优化后,FE 堆内存占用明显下降,波峰也变得很平缓。FE 的稳定性得到了比较明显的提升。

优化前、后的 BE

BE Compaction Score 监控反映版本的堆积情况,版本堆积的数值在 100 内属于正常范围,超过 100 说明集群可能存在潜在风险。上文也讲到,查询时需要先进行文件合并,再进行数据查询,如果 Tablet 版本过多,版本合并会影响到查询的速度和稳定性。

经过磁盘的部署优化和 Tablet 优化后,BE Compaction Score 可以稳定在 50 以内,查询的稳定性和性能都得到了非常大的提升。

数据同步优化

优化前:

MySQL 数据同步使用 Flink CDC -> Kafka -> Flink Doris Connector -> Doris 的方式全量 + 增量进入 Apache Doris。

在这个方案中,虽然 Flink CDC 支持全量历史数据的初始化,但由于历史遗留问题,部分表数据量较大,单表有几亿数据,而且这种表大多是没有设置任何分区和索引,在执行简单的 COUNT 查询时都需要花费十几分钟的时间。

其次,Flink CDC 虽然可以进行增量数据同步,但对于这类表的全量数据初始化几乎是不能实现的,因为 Flink CDC 做全量同步要先读取全量数据,然后对数据分块,再做数据同步,这种情况下,读取是非常非常缓慢的。

优化后

针对这种情况,在数据同步上,我们做了以下优化:

全量同步使用 MySQL Dump -> CSV -> Doris Stream Load -> Doris

增量同步使用 Flink CDC -> Kafka -> Flink Doris Connector -> Doris

数据调度优化

我们在使用 DolphinScheduler 进行 Doris SQL 的任务调度时,同一 node 下配置多条 SQL 时会出现 node 执行状态异常的情况,导致工作流 DAG 的 node 依赖失效,前一个节点未执行完,后一个节点就开始执行,结果会有缺数据甚至没有数据的情况。这个问题是因为 DolphinScheduler 2.x 在同一个 node 下不支持按顺序执行 MySQL 的多段 SQL,而 Doris 在 DolphinScheduler 中使用 MySQL 数据源创建连接。

此问题在 DolphinScheduler 3.0.0 版本被修复,配置中可以设置多段 SQL 的分隔符,解决了 DAG 依赖关系失效的问题。

Apache Doris 元数据管理和数据血缘实现方案

在没有元数据管理和数据血缘之前,我们经常会遇到一些问题,比如想找一个指标,却不知道指标在哪张表,只能找相关开发人员来确认,当然也存在开发人员忘记指标的位置和逻辑的情况。因此只能通过层层筛选确认,此过程十分耗费时间。

之前我们将表的分层划域、指标口径、负责人等信息放在 Excel 表中,这种维护方式很难保证其完整性,维护起来也比较困难。当需要对数仓进行优化时,无法确认哪些表是可以复用的、哪些表是可以合并的。当需要对表结构进行变更时,或者需要对指标的逻辑进行修改时,也无法确定变更是否会对下游的报表产生影响。

在以上问题背景下,我们经常遭到用户的投诉,接下来介绍如何通过元数据管理和数据血缘分析方案来解决这些问题。

实现方案

元数据管理和数据血缘是围绕 Apache Doris 展开,同时对 DolphinScheduler 的元数据进行了整合。

我们将元数据分为物理元数据和业务元数据两大类:

  • 物理元数据维护表的属性信息和调度信息

  • 业务元数据维护数据在应用过程中约定的口径和规范信息

数据血缘实现了表级血缘和字段级血缘:

  • 表级血缘支持粗粒度表关系和跨层引用分析

  • 字段级血缘支持细粒度的影响分析

上图中,右侧表格是物理元数据业务,元数据指标和血缘分析能够提供的数据服务。

接下来,一起看下元数据管理和数据血缘的架构和工作原理。

架构介绍

元数据管理和数据血缘实现方案技术栈

数据采集:使用 Apache Doris 提供的审计日志插件 Doris Audit Plugin 进行数据采集

数据存储:对审计日志插件做了定制化开发,使用 Kafka 存储 Doris 审计日志数据

血缘解析:使用 Druid 进行 Doris SQL 解析

血缘关系存储:使用 Nebula Graph 存储血缘关系数据

业务元数据:因为业务元数据经常发生 CRUD,因此使用 MySQL 存储业务元数据信息

搜索数据:使用 ElasticSearch 存储血缘关系查询索引以及表和字段的搜索索引数据

接下来介绍一下个架构四个组成部分:审计日志的采集和清洗服务、血缘解析服务、元数据信息整合服务、应用接口服务。

Apache Doris 审计日志的采集/清洗服务

考虑到如果将数据清洗逻辑放在审计日志插件中,当数据清洗逻辑发生变更,可能会出现数据遗漏,这样会对血缘分析和元数据管理产生影响,所以我们将审计日志插件数据采集和数据清洗进行了解耦,对 Apache Doris 的审计日志插件进行了改造,改造后审计日志插件可以实现审计日志数据的格式化以及将数据发送到 Kafka 的功能。

数据清洗服务,首先在清洗逻辑中增加数据重排逻辑,针对多个审计日志插件发送的数据进行重新排序,解决数据乱序的问题。其次把非标准 SQL 转化成标准 SQL,虽然 Apache Doris 支持 MySQL 协议以及标准 SQL 语法,但有一些建表语句、SQL 查询语法与标准 SQL 存在一定差异,因此将非标准 SQL 转化为 MySQL 的标准语句,最后将数据发送到 ES 和 Kafka 中。

血缘解析服务

血缘解析服务使用 Druid 进行 Doris SQL 的解析,通过 Druid 抽象语法树逐层递归获取表和字段的血缘关系,最后将血缘关系数据封装发送到图数据库、血缘查询索引发送到 ES 。进行血缘解析的同时会将物理元数据和业务元数据发送到对应存储位置。

元数据信息整合服务

元数据信息整合服务借鉴了 Metacat 的架构实现方案。

Connector Manager 负责创建 Apache Doris 和 DolphinScheduler 的元数据链接,同时也支持后续其他类型数据源接入的扩展。

Meta Service 负责元数据信息获取的具体实现。Apache Doris 元数据信息主要从 information Schema 库、Restful API、以及 SHOW SQL 的查询结果三种途径来获取。DolphinScheduler 的工作流元数据信息和调度记录信息从 DolphinScheduler 元数据库获取。

应用接口服务

我们提供了 3 种类型的应用接口服务,分别是血缘应用接口服务、元数据应用接口服务和数据行为分析应用接口服务。

  • 血缘应用接口服务提供表、字段、血缘关系、影响分析的查询服务。

  • 元数据应用接口服务提供元数据的查询和字段搜索的服务。

  • 数据行为分析应用接口服务提供表结构变更记录、数据读写记录、产出信息的查询服务。

以上就是元数据管理和数据血缘分析架构的整体方案的全部内容介绍。

总结及收益

今年我们完成了以 Apache Doris 为核心的准实时数仓建设,Apache Doris 经过半年的使用和优化,现在已经趋于稳定,能够满足我们生产的要求。

  1. 新的准实时数仓,对数据计算效率、数据时效的提升是巨大的。

以 On Time Delivery 业务场景报表计算为例,计算 1000w 单轨迹节点时效变化,使用 Apache Doris 之前需要计算 2 个多小时,并且计算消耗的资源非常大,只能在空闲时段进行错峰计算;使用 Apache Doris 之后,只需要 3min 就可以完成计算,之前每周更新一次的全链路物流时效报表,现在可以做到每 10 分钟更新最新的数据,达到了准实时的数据时效。

  1. 得益于 Apache Doris 的标准化 SQL,上手难度小,学习成本低,报表的迁移工作全员都可以参与。

原来报表使用 PowerBI 进行开发,需要对 PowerBI 有非常深入的了解,学习成本很高,开发周期也很长,而且 PowerBI 不使用标准 SQL,代码可读性差;现在基于 Doris SQL 加上自研的拖拉拽形式的报表平台,报表的开发成本直线下降,大部分需求的开发周期从周下降到了天。

未来规划

后续我们也将继续推进基于 Apache Doris 的数据中台建设,对元数据管理的完善、数据血缘的解析率持续进行优化,考虑到数据血缘是大家都渴望的应用,在未来血缘解析成熟后,我们会考虑将其贡献给社区。

与此同时,我们正在着手进行用户行为分析平台的构建,也在考虑使用 Apache Doris 作为核心的存储和计算引擎。目前 Apache Doris 在部分分析场景支持的函数还不够丰富,例如在有序窗口漏斗分析场景,虽然 Apache Doris 已经支持了 window_funnel 函数,但是每层漏斗转化的计算需要用到的 Array 相关计算函数还没有得到很好的支持。不过好在即将发布的 Apache Doris 1.2 版本将包含了 Array 类型以及相关函数,相信未来在越来越多的分析场景中 Apache Doris 都将得到落地。

用户案例

作者:赵伟,思必驰大数据高级研发,10 年大数据开发和设计经验,负责大数据平台基础技术和 OLAP 分析技术开发。社区贡献:Doris-spark-connector 的实时读写和优化。

业务背景

思必驰是国内专业的对话式人工智能平台公司,拥有全链路的智能语音语言技术,致力于成为全链路智能语音及语言交互的平台型企业,自主研发了新一代人机交互平台 DUI 和人工智能芯片 TH1520,为车联网、IoT 及政务、金融等众多行业场景合作伙伴提供自然语言交互解决方案。

思必驰于 2019 年首次引入 Apache Doris ,基于 Apache Doris 构建了实时与离线一体的数仓架构。相对于过去架构,Apache Doris 凭借其灵活的查询模型、极低的运维成本、短平快的开发链路以及优秀的查询性能等诸多方面优势,如今已经在实时业务运营、自助/对话式分析等多个业务场景得到运用,满足了 设备画像/用户标签、业务场景实时运营、数据分析看板、自助 BI、财务对账等多种数据分析需求。在这一过程中我们也积累了诸多使用上的经验,在此分享给大家。

架构演进

早期业务中离线数据分析是我们的主要需求,近几年,随着业务的不断发展,业务场景对实时数据分析的要求也越来越高,早期数仓架构逐渐力不从心,暴露出很多问题。为了满足业务场景对查询性能、响应时间及并发能力更高的要求,2019 年正式引入 Apache Doris 构建实时离线一体的数仓架构。

以下将为大家介绍思必驰数仓架构的演进之路,早期数仓存在的优缺点,同时分享我们选择 Apache Doris 构建新架构的原因以及面临的新问题与挑战。

早期数仓架构及痛点

如上图所示,早期架构基于 Hive +Kylin 来构建离线数仓,实时数仓架基于 Spark+MySQL 来构建实时分析数仓。

我们业务场景的数据源主要分为三类,业务数据库如 MySQL,应用系统如 K8s 容器服务日志,还有车机设备终端的日志。数据源通过 MQTT/HTTP 协议、业务数据库 Binlog 、Filebeat 日志采集等多种方式先写入 Kafka 。在早期架构中,数据经 Kafka 后将分为实时和离线两条链路,首先是实时部分,实时部分链路较短,经过 Kafka 缓冲完的数据通过 Spark 计算后放入 MySQL 中进行分析,对于早期的实时分析需求,MySQL 基本可以满足分析需求。而离线部分则由 Spark 进行数据清洗及计算后在 Hive 中构建离线数仓,并使用 Apache Kylin 构建 Cube,在构建 Cube 之前需要提前做好数据模型的的设计,包括关联表、维度表、指标字段、指标需要的聚合函数等,通过调度系统进行定时触发构建,最终使用 HBase 存储构建好的 Cube。

早期架构的优势:

  1. 早期架构与 Hive 结合较好,无缝对接 Hadoop 技术体系。

  2. 离线数仓中基于 Kylin 的预计算、表关联、聚合计算、精确去重等场景,查询性能较高,在并发场景下查询稳定性也较高。

早期架构解决了当时业务中较为紧迫的查询性能问题,但随着业务的发展,对数据分析要求不断升高,早期架构缺点也开始逐渐凸显出来。

早期架构的痛点:

  1. 依赖组件多。Kylin 在 2.x、3.x 版本中强依赖 Hadoop 和 HBase ,应用组件较多导致开发链路较长,架构稳定性隐患多,维护成本比很高。

  2. Kylin 的构建过程复杂,构建任务容易失败。Kylin 构建需要进行打宽表、去重列、生成字典,构建 Cube 等如果每天有 1000-2000 个甚至更多的任务,其中至少会有 10 个甚至更多任务构建失败,导致需要大量时间去写自动运维脚本。

  3. 维度/字典膨胀严重。维度膨胀指的是在某些业务场景中需要多个分析条件和字段,如果在数据分析模型中选择了很多字段而没有进行剪枝,则会导致 Cube 维度膨胀严重,构建时间变长。而字典膨胀指的是在某些场景中需要长时间做全局精确去重,会使得字典构建越来越大,构建时间也会越来越长,从而导致数据分析性能持续下降。

  4. 数据分析模型固定,灵活性较低。在实际应用过程中,如果对计算字段或者业务场景进行变更,则要回溯部分甚至全部数据。

  5. 不支持数据明细查询。早期数仓架构是无法提供明细数据查询的,Kylin 官方给的解决方法是下推给 Presto 做明细查询,这又引入了新的架构,增加了开发和运维成本。

架构选型

为解决以上问题,我们开始探索新的数仓架构优化方案,先后对市面上应用最为广泛的 Apache Doris、Clickhouse 等 OLAP 引擎进行选型调研。相较于 ClickHouse 的繁重运维、各种各样的表类型、不支持关联查询等,结合我们的 OLAP 分析场景中的需求,综合考虑,Apache Doris 表现较为优秀,最终决定引入 Apache Doris 。

新数仓架构

如上图所示,我们基于 Apache Doris 构建了实时+离线一体的新数仓架构,与早期架构不同的是,实时和离线的数据分别进行处理后均写入 Apache Doris 中进行分析。

因历史原因数据迁移难度较大,离线部分基本和早期数仓架构保持一致,在 Hive 上构建离线数仓,当然完全可以在 Apache Doris 上直接构建离线数仓。

相对早期架构不同的是,离线数据通过 Spark 进行清洗计算后在 Hive 中构建数仓,然后通过 Broker Load 将存储在 Hive 中的数据写入到 Apache Doris 中。这里要说明的, Broker Load 数据导入速度很快,天级别 100-200G 数据导入到 Apache Doris 中仅需要 10-20 分钟。

实时数据流部分,新架构使用了 Doris-Spark-Connector 来消费 Kafka 中的数据并经过简单计算后写入 Apache Doris 。从架构图所示,实时和离线数据统一在 Apache Doris 进行分析处理,满足了数据应用的业务需求,实现了实时+离线一体的数仓架构。

新架构的收益:

  1. 极简运维,维护成本低,不依赖 Hadoop 生态组件。Apache Doris 的部署简单,只有 FE 和 BE 两个进程, FE 和 BE 进程都是可以横向扩展的,单集群支持到数百台机器,数十 PB 的存储容量,并且这两类进程通过一致性协议来保证服务的高可用和数据的高可靠。这种高度集成的架构设计极大的降低了一款分布式系统的运维成本。在使用 Doris 三年时间中花费的运维时间非常少,相比于基于 Kylin 搭建的早期架构,新架构花费极少的时间去做运维。

  2. 链路短,开发排查问题难度大大降低。基于 Doris 构建实时和离线统一数仓,支持实时数据服务、交互数据分析和离线数据处理场景,这使得开发链路变的很短,问题排查难度大大降低。

  3. 支持 Runtime 形式的 Join 查询。Runtime 类似 MySQL 的表关联,这对数据分析模型频繁变更的场景非常友好,解决了早期结构数据模型灵活性较低的问题。

  4. 同时支持 Join、聚合、明细查询。解决了早期架构中部分场景无法查询数据明细的问题。

  5. 支持多种加速查询方式。支持上卷索引,物化视图,通过上卷索引实现二级索引来加速查询,极大的提升了查询响应时间。

  6. 支持多种联邦查询方式。支持对 Hive、Iceberg、Hudi 等数据湖和 MySQL、Elasticsearch 等数据库的联邦查询分析。

问题和挑战:

在建设新数仓架构过程中,我们遇到了一些问题:

  • 高并发场景对 Apache Doris 查询性能存在一定影响。我们分别在 Doris 0.12 和 Doris 1.1 版本上进行测试,同一时间同样的 SQL,10 并发和 50 并发进行访问,性能差别较大。

  • 在实时写入场景中,当实时写入的数据量比较大时,会使得 IO 比较密集,导致查询性能下降。

  • 大数据量下字符串精确去重较慢。目前使用的是 count distinct 函数、Shuffle 和聚合算子去重,此方式算力比较慢。当前业内常见的解决方法一般是针对去重列构建字典,基于字典构建 Bitmap 索引后使用 Bitmap 函数去重。目前 Apache Doris 只支持数字类型的 Bitmap 索引,具有一定的局限性。

业务场景的应用

Apache Doris 在思必驰最先应用在实时运营业务场景以及自助/对话式分析场景,本章节将介绍两个场景的需求及应用情况。

实时运营业务场景

首先是实时运营业务场景,如上图所示,实时运营业务场景的技术架构和前文所述的新版数仓架构基本一致:

  • 数据源:数据源新版架构图中一致,包括 MySQL 中的业务数据,应用系统埋点数据以及设备和终端日志。

  • 数据导入:离线数据导入使用 Broker Load,实时数据导入使用 Doris-Spark-Connector 。

  • 数据存储与开发:几乎所有的实时数仓全部在 Apache Doris 构建,有一部分离线数据放在 Airflow 上执行 DAG 跑批任务。

  • 数据应用:最上层是业务侧提出的业务分析需求,包括大屏展示,数据运营的实时看板、用户画像、BI 看板等。

在实时运营业务场景中,数据分析的需求主要有两方面:

  • 由于实时导入数据量比较大,因此对实时数据的查询效率要求较高

  • 在此场景中,有 20+ 人的团队在运营,需要同时开数据运营的看板,因此对实时写入的性能和查询并发会有比较高的要求。

自助/对话式分析场景

除以上之外,Apache Doris 在思必驰第二个应用是自助/对话式分析场景。

如上图所示,在一般的 BI 场景中,用户方比如商务、财务、销售、运营、项目经理等会提出需求给数据分析人员,数据分析人员在 BI 平台上做数据看板,最终把看板提供给用户,用户从 BI 看板上获取所需信息,但是有时候用户想要查看明细数据、定制化的看板需求,或者在某些场景需做任意维度的上卷或者下钻的分析,一般场景下 BI 看板是不支持的的,基于以上所述用户需求,我们打造了自助对话式 BI 场景来解决用户定制化的需求。

与一般 BI 场景不同的是,我们将自助/对话式 BI 场景从数据分析人员方下沉到用户方,用户方只需要通过打字,描述数据分析的需求。基于我们公司自然语言处理的能力,自助/对话式 BI 场景会将自然语言转换成 SQL,类似 NL2SQL 技术,需要说明的是这里使用的是定制的自然语言解析,相对开源的 NL2SQL 命中率高、解析结果更精确。当自然语言转换成 SQL 后,将 SQL 给到 Apache Doris 查询得到分析结果。由此,用户通过打字就可以随时查看任意场景下的明细数据,或者任意字段的上卷、下钻。

相比 Apache Kylin、Apache Druid 等预计算的 OLAP 引擎,Apache Doris 符合以下几个特点:

  • 查询灵活,模型不固定,支持自由定制场景。

  • 支持表关联、聚合计算、明细查询。

  • 响应时间要快速。

因此我们很顺利的运用 Apache Doris 实现了自助/对话式分析场景。同时,自助/对话式分析在我们公司多个数据分析场景应用反馈非常好。

实践经验

基于上面的两个场景,我们使用过程当中积累了一些经验和心得,分享给大家。

数仓 表设计:

  1. 千万级(量级供参考,跟集群规模有关系)以下的数据表使用 Duplicate 表类型,Duplicate 表类型同时支持聚合、明细查询,不需要额外写明细表。

  2. 当数据量比较大时,使用 Aggregate 聚合表类型,在聚合表类型上做上卷索引,使用物化视图优化查询、优化聚合字段。由于 Aggregate 表类型是预计算表,会丢失明细数据,如有明细查询需求,需要额外写一张明细表。

  3. 当数据量又大、关联表又多时,可用 ETL 先写成宽表,然后导入到 Doris,结合 Aggregate 在聚合表类型上面做优化,也可以使用官方推荐 Doris 的 Join 优化:https://doris.apache.org/zh-CN/docs/dev/advanced/join-optimization/doris-join-optimization

写入:

  1. 通过 Spark Connector 或 Flink Connector 替代 Routine Load: 最早我们使用的是 Routine Load 实时写入 BE 节点, Routine Load 的工作原理是通过 SQL 在 FE 节点起一个类似于 Task Manager 的管理,把任务分发给 BE 节点,在 BE 节点起 Routine Load 任务。在我们实时场景并发很高的情况下,BE 节点 CPU 峰值一般会达到 70% 左右,在这个前提下,Routine Load 也跑到 BE 节点,将严重影响 BE 节点的查询性能,并且查询 CPU 也将影响 Routine Load 导入, Routine Load 就会因为各种资源竞争死掉。面对此问题,目前解决方法是将 Routine Load 从 BE 节点拿出来放到资源调度上,用 Doris-Spark/Flink-Connector 替换 Routine Load。当时 Doris-spark-Connector 还没有实时写入的功能,我们根据业务需求进行了优化,并将方案贡献给社区。

  2. 通过攒批来控制实时写入频率:当实时写入频率较高时,小文件堆积过多、查询 IO 升高,小文件排序归并的过程将导致查询时间加长,进而出现查询抖动的情况。当前的解决办法是控制导入频次,调整 Compaction 的合并线程、间隔时间等参数,避免 Tablet 下小文件的堆积。

查询:

  1. 增加 SQL 黑名单,控制异常大查询。个别用户在查询时没有加 where 条件,或者查询时选择的时间范围较长,这种情况下 BE 节点的 SQL 会把磁盘的负载和 CPU 拉高,导致其他节点的 SQL 查询变慢,甚至出现 BE 节点宕机的情况。目前的解决方案是使用 SQL 黑名单禁止全表及大量分区实时表的查询。

  2. 使用 SQL Cache 和 SQL Proxy 实现高并发访问。同时使用 SQL Cache 和 SQL Proxy 的原因在于,SQL Cache 的颗粒度到表的分区,如果数据发生变更, SQL Cache 将失效,因此 SQL Cache 缓存适合数据更新频次较低的场景(离线场景、历史分区等)。对于数据需要持续写到最新分区的场景, SQL Cache 则是不适用的。当 SQL Cache 失效时 Query 将全部发送到 Doris 造成重复的 Runtime 计算,而 SQL Proxy 可以设置一秒左右的缓存,可以避免相同条件的重复计算,有效提高集群的并发。

存储:

使用 SSD 和 HDD 做热温数据存储周期的分离,近一年以内的数据存在 SSD,超过一年的数据存在 HDD。Apache Doris 支持对分区设置冷却时间,但只支持创建表分区时设置冷却的时间,目前的解决方案是设置自动同步逻辑,把历史的一些数据从 SSD 迁移到 HDD,确保 1 年内的数据都放在 SSD 上。

升级:

升级前一定要备份元数据,也可以使用新开集群的方式,通过 Broker 将数据文件备份到 S3 或 HDFS 等远端存储系统中,再通过备份恢复的方式将旧集群数据导入到新集群中。

升级前后性能对比

思必驰最早是从 0.12 版本开始使用 Apache Doris 的,在今年我们也完成了从 0.15 版本到最新 1.1 版本的升级操作,并进行了基于真实业务场景和数据的性能测试。

从以上测试报告中可以看到,总共 13 个测试 SQL 中,前 3 个 SQL 升级前后性能差异不明显,因为这 3 个场景主要是简单的聚合函数,对 Apache Doris 性能要求不高,0.15 版本即可满足需求。而在 Q4 之后的场景中 ,SQL 较为复杂,Group By 有多个字段、多个字段聚合函数以及复杂函数,因此升级新版本后带来的性能提升非常明显,平均查询性能较 0.15 版本提升 2-3 倍。由此,非常推荐大家去升级到 Apache Doris 最新版本。

总结和收益

  1. Apache Doris 支持构建离线+实时统一数仓,一个 ETL 脚本即可支持实时和离线数仓,大大缩短开发周期,降低存储成本,避免了离线和实时指标不一致等问题。

  2. Apache Doris 1.1.x 版本开始全面支持向量化计算,较之前版本查询性能提升 2-3 倍。经测试,Apache Doris 1.1.x 版本在宽表场景的查询性能已基本与 ClickHouse 持平。

  3. 功能强大,不依赖其他组件。相比 Apache Kylin、Apache Druid、ClickHouse 等,Apache Doris 不需要引入第 2 个组件填补技术空档。Apache Doris 支持聚合计算、明细查询、关联查询,当前思必驰超 90% 的分析需求已移步 Apache Doris 实现。 得益于此优势,技术人员需要运维的组件减少,极大降低运维成本。

  4. 易用性极高,支持 MySQL 协议和标准 SQL,大幅降低用户学习成本。

未来计划

  1. Tablet 小文件过多的问题。Tablet 是 Apache Doris 中读写数据最小的逻辑单元,当 Tablet 小文件比较多时会产生 2 个问题,一是 Tablet 小文件增多会导致元数据内存压力变大。二是对查询性能的影响,即使是几百兆的查询,但在小文件有几十万、上百万的情况下,一个小小的查询也会导致 IO 非常高。未来,我们将做一个 Tablet 文件数量/大小比值的监控,当比值在不合理范围内时及时进行表设计的修改,使得文件数量和大小的比值在合理的范围内。

  2. 支持基于 Bitmap 的字符串精确去重。业务中精确去重的场景较多,特别是基于字符串的 UV 场景,目前 Apache Doris 使用的是 Distinct 函数来实现的。未来我们会尝试的在 Apache Doris 中创建字典,基于字典去构建字符串的 Bitmap 索引。

  3. Doris-Spark-Connector 流式写入支持分块传输。Doris-Spark-Connector 底层是复用的 Stream Load,工作机制是攒批,容易出现两个问题,一是攒批可能会会出现内存压力导致 OOM,二是当 Doris-Spark-Connector 攒批时,Spark Checkpoint 没有提交,但 Buffer 已满并提交给 Doris,此时 Apacche Doris 中已经有数据,但由于没有提交 Checkpoint,假如此时任务恰巧失败,启动后又会重新消费写入一遍。未来我们将优化此问题,实现 Doris-Spark-Connector 流式写入支持分块传输。

用户案例

业务背景

因增长分析业务需要,小米集团于 2019 年首次引入了 Apache Doris 。经过三年时间的发展,目前 Apache Doris 已经在广告投放、新零售、增长分析、数据看板、天星数科、小米有品、用户画像等小米内部数十个业务中得到广泛应用 ,并且在小米内部已经形成一套以 Apache Doris 为核心的数据生态。

当前 Apache Doris 在小米内部已经具有数十个集群、总体达到数百台 BE 节点的规模,其中单集群最大规模达到近百台节点,拥有数十个流式数据导入产品线,每日单表最大增量 120 亿、支持 PB 级别存储,单集群每天可以支持 2W 次以上的多维分析查询。

架构演进

小米引入 Apache Doris 的初衷是为了解决内部进行用户行为分析时所遇到的问题。随着小米互联网业务的发展,各个产品线利用用户行为数据对业务进行增长分析的需求越来越迫切。让每个业务产品线都自己搭建一套增长分析系统,不仅成本高昂,也会导致效率低下。因此能有一款产品能够帮助他们屏蔽底层复杂的技术细节,让相关业务人员能够专注于自己的技术领域,可以极大提高工作效率。基于此,小米大数据和云平台联合开发了增长分析系统 Growing Analytics(下文中简称 GA ),旨在提供一个灵活的多维实时查询和分析平台,统一数据接入和查询方案,帮助业务线做精细化运营。(此处内容引用自:基于 Apache Doris 的小米增长分析平台实践

分析、决策、执行是一个循环迭代的过程,在对用户进行行为分析后,针对营销策略是否还有提升空间、是否需要在前端对用户进行个性化推送等问题进行决策,帮助小米实现业务的持续增长。这个过程是对用户行为进行分析-决策-优化执行-再分析-再决策-再优化执行的迭代过程。

历史架构

增长分析平台立项于 2018 年年中,当时基于开发时间和成本,技术栈等因素的考虑,小米复用了现有各种大数据基础组件(HDFS, Kudu, SparkSQL 等),搭建了一套基于 Lamda 架构的增长分析查询系统。GA 系统初代版本的架构如下图所示,包含了以下几个方面:

  • 数据源:数据源是前端的埋点数据以及可能获取到的用户行为数据。
  • 数据接入层:对埋点数据进行统一的清洗后打到小米内部自研的消息队列 Talos 中,并通过 Spark Streaming 将数据导入存储层 Kudu 中。
  • 存储层:在存储层中进行冷热数据分离。热数据存放在 Kudu 中,冷数据则会存放在 HDFS 上。同时在存储层中进行分区,当分区单位为天时,每晚会将一部分数据转冷并存储到 HDFS 上。
  • 计算层/查询层:在查询层中,使用 SparkSQL 对 Kudu 与 HDFS 上数据进行联合视图查询,最终把查询结果在前端页面上进行显示。

在当时的历史背景下,初代版本的增长分析平台帮助我们解决了一系列用户运营过程中的问题,但同时在历史架构中也存在了两个问题:

第一个问题: 由于历史架构是基于 SparkSQL + Kudu + HDFS 的组合,依赖的组件过多导致运维成本较高。原本的设计是各个组件都使用公共集群的资源,但是实践过程中发现执行查询作业的过程中,查询性能容易受到公共集群其他作业的影响,容易抖动,尤其在读取 HDFS 公共集群的数据时,有时较为缓慢。

第二个问题: 通过 SparkSQL 进行查询时,延迟相对较高。SparkSQL 是基于批处理系统设计的查询引擎,在每个 Stage 之间交换数据 Shuffle 的过程中依然需要落盘操作,完成 SQL 查询的时延较高。为了保证 SQL 查询不受资源的影响,我们通过添加机器来保证查询性能,但是实践过程中发现,性能提升的空间有限,这套解决方案并不能充分地利用机器资源来达到高效查询的目的,存在一定的资源浪费。 (此处内容引用自:基于 Apache Doris 的小米增长分析平台实践

针对上述两个问题,我们的目标是寻求一款计算存储一体的 MPP 数据库来替代我们目前的存储计算层的组件,在通过技术选型后,最终我们决定使用 Apache Doris 替换老一代历史架构。

基于 Apache Doris 的新版架构

当前架构从数据源获取前端埋点数据后,通过数据接入层打入 Apache Doris 后可以直接查询结果并在前端进行显示。

选择 Doris 原因:

  • Doris 具有优秀的查询性能,能够满足业务需求。

  • Doris 支持标准 SQL ,用户使用与学习成本较低。

  • Doris 不依赖于其他的外部系统,运维简单。

  • Doris 社区拥有很高活跃度,有利于后续系统的维护升级。

新旧架构性能对比

我们选取了日均数据量大约 10 亿的业务,分别在不同场景下进行了性能测试,其中包含 6 个事件分析场景,3 个留存分析场景以及 3 个漏斗分析场景。经过对比后,得出以下结论:

  • 在事件分析的场景下,平均查询所耗时间降低了 85%
  • 在留存分析和漏斗分析场景下,平均查询所耗时间降低了 50%

应用实践

随着接入业务的增多和数据规模的增长,让我们也遇到不少问题和挑战,下面我们将介绍在使用 Apache Doris 过程中沉淀出来的一些实践经验

数据导入

小米内部主要通过 Stream Load 与 Broker Load 以及少量 Insert 方式来进行 Doris 的数据导入。数据一般会先打到 Talos 消息队列中,并分为实时数据和离线数据两个部分。

实时数据写入 Apache Doris 中:

一部分业务在通过 Flink 对数据进行处理后,会通过 Doris 社区提供的 Flink Doris Connector 组件写入到 Doris 中,底层依赖于 Doris Stream Load 数据导入方式。也有一部分会通过 Spark Streaming 封装的 Stream Load 将数据导入到 Doris 中。

离线数据写入 Apache Doris 中:

离线数据部分则会先写到 Hive 中,再通过小米的数据工场将数据导入到 Doris 中。用户可以直接在数据工场提交 Broker Load 任务并将数据直接导入 Doris 中,也可以通过 Spark SQL 将数据导入 Doris 中。Spark SQL 方式则是依赖了 Doris 社区提供的 Spark Doris Connector 组件,底层也是对 Doris 的 Stream Load 数据导入方式进行的封装。

数据查询

用户通过数据工场将数据导入至 Doris 后即可进行查询,在小米内部是通过小米自研的数鲸平台来做查询的。用户可以通过数鲸平台对 Doris 进行查询可视化,并实现用户行为分析(为满足业务的事件分析、留存分析、漏斗分析、路径分析等行为分析需求,我们为 Doris 添加了相应的 UDF 和 UDAF )和用户画像分析。

虽然目前依然需要将 Hive 的数据导过来,但 Doris 社区也正在支持湖仓一体能力,在后续实现湖仓一体能力后,我们会考虑直接通过 Doris 查询 Hive 与 Iceberg 外表。值得一提的是,Doris 1.1 版本已经实现支持查询 Iceberg 外表能力。 同时在即将发布的 1.2 版本中,还将支持 Hudi 外表并增加了 Multi Catalog ,可以实现外部表元数据的同步,无论是查询外部表的性能还是接入外表的易用性都有了很大的提升。

Compaction 调优

Doris 底层采用类似 LSM-Tree 方式,支持快速的数据写入。每一次的数据导入都会在底层的 Tablet 下生成一个新的数据版本,每个数据版本内都是一个个小的数据文件。单个文件内部是有序的,但是不同的文件之间又是无序的。为了使数据有序,在 Doris 底层就会存在 Compaction 机制,异步将底层小的数据版本合并成大的文件。Compaction 不及时就会造成版本累积,增加元数据的压力,并影响查询性能。由于 Compaction 任务本身又比较耗费机器 CPU、内存与磁盘资源,如果 Compaction 开得太大就会占用过多的机器资源并影响到查询性能,同时也可能会造成 OOM。针对以上问题,我们一方面从业务侧着手,通过以下方面引导用户:

  • 通过引导业务侧进行合理优化,对表设置合理的分区和分桶,避免生成过多的数据分片。
  • 引导用户尽量降低数据的导入频率 增大单次数据导入的量,降低 Compaction 压力。
  • 引导用户避免过多使用会在底层生成 Delete 版本的 Delete 操作。在 Doris 中 Compaction 分为 Base Compaction 与 Cumulative Compaction。Cumulative Compaction 会快速的把大量新导入的小版本进行快速的合并,在执行过程中若遇到 Delete 操作就会终止并将当前 Delete 操作版本之前的所有版本进行合并。由于 Cumulative Compaction 无法处理 Delete 版本,在合并完之后的版本会和当前版本一起放到 Base Compaction 中进行。当 Delete 版本特别多时, Cumulative Compaction 的步长也会相应变短,只能合并少量的文件,导致 Cumulative Compaction 不能很好的发挥小文件合并效果。

另一方面我们从运维侧着手:

  • 针对不同的业务集群配置不同的 Compaction 参数。 部分业务是实时写入数据的,需要的查询次数很多,我们就会将 Compaction 开的大一点以达到快速合并目的。而另外一部分业务只写今天的分区,但是只对之前的分区进行查询,在这种情况下,我们会适当的将 Compaction 放的小一点,避免 Compaction 占用过大内存或 CPU 资源。到晚上导入量变少时,之前导入的小版本能够被及时合并,对第二天查询效率不会有很大影响。
  • 适当降低 Base Compaction 任务优先级并增加 Cumulative Compaction 优先级。 根据上文提到的内容,Cumulative Compaction 能够快速合并大量生成的小文件,而 Base Compaction 由于合并的文件较大,执行的时间也会相应变长,读写放大也会比较严重。所以我们希望 Cumulative Compaction 优先、快速的进行。
  • 增加版本积压报警。 当我们收到版本积压报警时,动态调大 Compaction 参数,尽快消耗积压版本。
  • 支持手动触发指定表与分区下数据分片的 Compaction 任务。 由于 Compaction 不及时,部分表在查询时版本累积较多并需要能够快速进行合并。所以,我们支持对单个表或单个表下的某个分区提高 Compaction 优先级。

目前 Doris 社区针对以上问题已经做了 一系列的优化 ,在 1.1 版本中 大幅增强了数据 Compaction 能力,对于新增数据能够快速完成聚合,避免分片数据中的版本过多导致的 -235 错误以及带来的查询效率问题。\ 首先,在 Doris 1.1 版本中,引入了 QuickCompaction,增加了主动触发式的 Compaction 检查,在数据版本增加的时候主动触发 Compaction。同时通过提升分片元信息扫描的能力,快速的发现数据版本多的分片,触发 Compaction。通过主动式触发加被动式扫描的方式,彻底解决数据合并的实时性问题。

同时,针对高频的小文件 Cumulative Compaction,实现了 Compaction 任务的调度隔离,防止重量级的 Base Compaction 对新增数据的合并造成影响。

最后,针对小文件合并,优化了小文件合并的策略,采用梯度合并的方式,每次参与合并的文件都属于同一个数据量级,防止大小差别很大的版本进行合并,逐渐有层次的合并,减少单个文件参与合并的次数,能够大幅的节省系统的 CPU 消耗。在社区 1.1 新版本的测试结果中,不论是 Compaction 的效率、CPU 的资源消耗,还是高频导入时的查询抖动,效果都有了大幅的提升。

具体可以参考: Apache Doris 1.1 特性揭秘:Flink 实时写入如何兼顾高吞吐和低延时

监控报警

Doris 的监控主要是通过 Prometheus 以及 Grafana 进行。对于 Doris 的报警则是通过 Falcon 进行。

小米内部使用 Minos 进行集群部署。Minos 是小米内部自研并开源的大数据服务进程管理工具。在完成 Doris 集群部署后会更新至小米内部的轻舟数仓中。在轻舟数仓中的节点注册到 ZooKeeper 后,Prometheus 会监听 ZooKeeper 注册的节点,同时访问对应端口,拉取对应 Metrics 。在这之后,Grafana 会在面板上对监控信息进行显示,若有指标超过预设的报警阈值,Falcon 报警系统就会在报警群内报警,同时针对报警级别较高或某些无法及时响应的警告,可直接通过电话呼叫值班同学进行报警。

另外,小米内部针对每一个 Doris 集群都有 Cloud - Doris 的守护进程。Could - Doris 最大功能是可以对 Doris 进行可用性探测。比如我们每一分钟对 Doris 发送一次 select current timestamp(); 查询,若本次查询 20 秒没有返回,我们就会判断本次探测不可用。小米内部对每一个集群的可用性进行保证,通过上述探测方法,可以在小米内部输出 Doris 可用性指标。

小米对 Apache Doris 的优化实践

在应用 Apache Doris 解决业务问题的同时,我们也发现了 Apache Doris 存在的一些优化项,因此在与社区进行沟通后我们开始深度参与社区开发,解决自身问题的同时也及时将开发的重要 Feature 回馈给社区,具体包括 Stream Load 两阶段提交(2PC)、单副本数据导入、Compaction 内存限制等。

Stream Load 两阶段提交(2PC)

遇到的问题

在 Flink 和 Spark 导入数据进 Doris 的过程中,当某些异常状况发生时可能会导致如下问题:

Flink 数据重复导入 Flink 通过周期性 Checkpoint 机制处理容错并实现 EOS,通过主键或者两阶段提交实现包含外部存储的端到端 EOS。Doris-Flink-Connector 1.1 之前 UNIQUE KEY 表通过唯一键实现了 EOS,非 UNIQUE KEY 表不支持 EOS。

Spark SQL 数据部分导入 通过 SparkSQL 从 Hive 表中查出的数据并写入 Doris 表中的过程需要使用到 Spark Doris Connector 组件,会将 Hive 中查询的数据通过多个 Stream Load 任务写入 Doris 中,出现异常时会导致部分数据导入成功,部分导入失败。

Stream Load 两阶段提交设计

以上两个问题可以通过导入支持两阶段提交解决,第一阶段完成后确保数据不丢且数据不可见,这就能保证第二阶段发起提交时一定能成功,也能够保证第二阶段发起取消时一定能成功。

Doris 中的写入事务分为三步:

  1. 在 FE 上开始事务,状态为 Prepare ;
  2. 数据写入 BE;
  3. 多数副本写入成功的情况下,提交事务,状态变成 Committed,并且 FE 向 BE 下发 Publish Version 任务,让数据立即可见。

引入两阶段提交之后,第 3 步变为状态修改为 Pre Commit,Publish Version 在第二阶段完成。用户在第一阶段完成后(事务状态为 Pre Commit ),可以选择在第二阶段放弃或者提交事务。

Doris-Flink-Connector 1.1 使用两阶段 Stream Load 并支持 Flink 两阶段提交实现了 EOS,只有全局的 Checkpoint 完成时,才会发起 Sream Load 的第二阶段提交,否则发起第二阶段放弃。

解决 SparkSQL 数据部分导入

Doris-Spark-Connector 使用两阶段 Stream Load 之后,成功的 Task 通过 Stream Load 第一阶段将写入数据到 Doris (Pre Commit 状态,不可见),当作业成功后,发起所有 Stream Load 第二阶段提交,作业失败时,发起所有 Stream Load 第二阶段取消。这就确保了不会有数据部分导入的问题。

单副本数据导入优化

单副本数据导入设计

Doris 通过多副本机制确保数据的高可靠以及系统高可用。 写入任务可以按照使用的资源分为计算和存储两类:排序、聚合、编码、压缩等使用的是 CPU 和内存的计算资源,最后的文件存储使用存储资源,三副本写入时计算和存储资源会占用三份。

那能否只写一份副本数据在内存中,待到单副本写入完成并生成存储文件后,将文件同步到另外两份副本呢?答案是可行的,因此针对三副本写入的场景,我们做了单副本写入设计。单副本数据在内存中做完排序、聚合、编码以及压缩后,将文件同步至其他两个副本,这样很大程度上可以节省出 CPU 和内存资源。

性能对比测试

Broker Load 导入 62G 数据性能对比 导入时间: 三副本导入耗时 33 分钟,单副本导入耗时 31 分钟。

内存使用: 内存使用上优化效果十分明显,三副本数据导入的内存使用是单副本导入的三倍。单副本导入时只需要写一份内存,但是三副本导入时需要写三份内存,内存优化达到了 3 倍。

CPU 消耗对比: 三副本导入的 CPU 消耗差不多是单副本的三倍。

并发场景性能对比

测试中向 100 个表并发导入数据,每个表有 50 个导入任务,任务总数为 5000 个。单个 Stream Load 任务导入的数据行是 200 万行,约为 90M 的数据。测试中开了 128 个并发, 单副本导入和三副本导入进行了对比:

导入时间: 3 副本导入耗时 67 分钟,而后单副本耗时 27 分钟完成。导入效率相当提升两倍以上。

内存使用: 单副本的导入会更低。

CPU 消耗对比: 由于都已经是开了并发在导入,CPU 开销都比较高,但是单副本导入吞吐提升明显。

Compaction 内存限制

之前 Doris 在单机磁盘一次导入超过 2000 个 Segment 的情况下,Compaction 有内存 OOM 的问题。对于当天写入但不查当天数据而是查询之前的数据业务场景,我们会把 Compaction 稍微放的小一点,避免占用太大的内存,导致进程 OOM。Doris 之前每个磁盘有固定的线程做存储在这个盘上的数据的 Compaction,没有办法在全局进行管控。因为我们要限制单个节点上面内存的使用,所以我们将该模式改成了生产者-消费者模式:

生产者不停的从所有的磁盘上面生产任务,之后将生产任务提交到线程池中。我们可以很好的把控线程池的入口,达到对 Compaction 的限制。我们在合并时会把底层的小文件进行归并排序,之后在内存里给每一个文件开辟 Block,所以我们可以近似认为占用的内存量与文件的数量是相关的,从而可以通过对单节点上同时执行合并的文件数量做限制,来达到控制内存的效果。

我们增加了对单个 BE Compaction 合并的文件数量的限制。 若正在进行的 Compaction 的文件数量超过或等于当前限制时,后续提交上来的任务就需要等待,等到前面的 Compaction 任务做完并将指标释放出来后,后边提交进来的那些任务才可以进行。

通过这种方式,我们对某些业务场景做了内存的限制,很好的避免集群负载高时占用过多内存导致 OOM 的问题。

总结

自从 Apache Doris 从 2019 年上线第一个业务至今,目前 Apache Doris 已经在小米内部服务了数十个业务、集群数量达到数十个、节点规模达到数百台、每天完成数万次用户在线分析查询,承担了包括增长分析和报表查询等场景绝大多数在线分析的需求。

与此同时,以上所列小米对于 Apache Doris 的优化实践,已经有部分功能已经在 Apache Doris 1.0 或 1.1 版本中发布,有部分 PR 已经合入社区 Master,在不久后发布的 1.2 新版本中应该就会与大家见面。随着社区的快速发展,有越来越多小伙伴参与到社区建设中,社区活跃度有了极大的提升。Apache Doris 已经变得越来越成熟,并开始从单一计算存储一体的分析型 MPP 数据库走向湖仓一体的道路,相信在未来还有更多的数据分析场景等待去探索和实现。

相关链接:

SelectDB 官方网站:

https://selectdb.com

Apache Doris 官方网站:

http://doris.apache.org

Apache Doris Github:

https://github.com/apache/doris

Apache Doris 开发者邮件组:

dev@doris.apache.org

用户案例

导读:知乎基于业务需求搭建了 DMP 平台,本文详细的介绍了 DMP 的工作原理及架构演进过程,同时介绍了 Apache Doris 在 DMP 平台的应用实践,本文对大家了解 DMP 工作方式很有帮助,欢迎阅读。

DMP 业务背景

DMP 平台是大家老生常谈的话题。在早期广告系统出现之后就拥有了类似的 DMP 平台,比如:腾讯的广点通、阿里巴巴的达摩盘等都是业界做的比较好的 DMP 平台典型。而知乎搭建属于自己的 DMP 平台,一方面是因为知乎有相关的站内运营业务;另一方面也是因为我们可以通过搭建 DMP 平台支持内部系统对接、同时还可以协助完成相关业务发展以及定制化需求建设的目的。

DMP 业务包含:业务模式、业务场景以及业务需求。

img

图 1.1 DMP 业务

DMP 平台设计的方向:为了找到我们的核心客户,并在后续对我们的核心客户完成如广告投放等营销操作,让核心客户跟我们的内容之间能够有更好的匹配。

业务模式

DMP 平台业务模式

  • 从站外转站内。典型场景是广告主在进行广告投放的过程中,如何通过 Mapping 将可能出现的站外人群转到站内,并在站内的系统上承接这些用户包。
  • 从站内转站外。 在知乎内先找到定向用户后再去用这些用户在三方投广告。
  • 站内运营。 包括内容运营,用户运营以及活动运营。一方面可以增加知乎相关内容的宣传,另一方面进行客户定位并精准解决某些客户的问题与需求。与此同时,我们也可以通过活动设计来提升业务效果。

业务场景

基于这三种业务模式,主要应用的业务场景:

  • 信息流方面。 拿推荐场景举例:推荐场景中会有定向推荐以及定向提权两种诉求。定向推荐是我们把推送内容定向推送给某些用户,而定向提权是我们把推送内容在被推送的用户身上完成提权并重新打分。
  • 广告侧实时竞价。 得知该用户身上挂了哪些广告之后可以进行实时竞价,通过排序选出最适合该用户的广告。
  • 详情页。 详情页中会有弹窗提示:比如说某个用户点击某个详情之后,若该用户没有达到目标条件,会弹窗引导来该用户达到条件。
  • 活动平台。 设置活动的目标用户。针对不一样的目标群体,展示不同的活动信息。
  • 触达系统。 比如在推送消息、弹窗和短信时,可以拿到一类具体的用户,之后向该类用户进行发布相应的 Push 和站内信等。
  • 站外投放。 找到合适的用户群并在站外为其投放相应的广告。

业务需求

基于业务模式场景,在人群方面能做的事情可以分为三类:

对接系统

一般分为以下 3 种情况:

  • 该用户命中了哪些人群包。拿广告系统为例,该人群包 ID 可以 Mapping 成一个广告,也就是该用户命中了哪些广告。
  • 内部人群包。人群包对内部而言就是把内容推荐给谁,或者给谁发布内容的 Push。
  • 对外部的广告。当我们筛选出一类用户需要投放在站外时,这时候就是在使用对外部的人群包。对于这两个人群包之间的区别而言,人群 ID 会有不同:一种是站内的通用 ID,另外一种是基于不同投放平台上对应的对外 ID。

人群定向

人群定向包括导入/导出、基于某些特征进行标签圈选、人群泛化、用户量预估等。

  • 人群泛化,拿到比较小的种子人群包后,基于规则寻找相似特征,再通过对相似特征的置信度进行调整,扩展更多的人群。
  • 用户量预估,选中一批用户后需要立即了解这批用户的数量有多少。

人群洞察

包括画像洞察,用户的内部画像以及两个不同人群包之间的对比分析。

业务流程

由于当前 DMP 业务的三种场景面向人群不同,会提供向站内与站外不同系统来完成这批人群相关的运营动作。

据此情况,我们组织人群定向功能、获取到目标用户之后进行 Mapping ,拿到用户在站内或站外投放的效果回收之后,获取目标用户进行构成分析与对比分析,进行用户洞察。若目标达成,那么本次投放顺利达成;若目标未达成,运营侧会做相关假设:是否可以再加一个特征或特殊操作进一步提升业务?提出假设之后,设计 AB 实验,经过 AB 试验后,我们又会对目标人群进行一些调整。以上就是我们的运营流程。

img

图 1.2 DMP 业务流程

站内运营自闭环

人群定向。 通过标签圈选,选择历史上有活动效果或导入喜欢此活动的人群,进行泛化完成基础人群包选择,以此来确定目标人群。

进行投放。 由于很多业务在推荐侧的信息流、触达系统、详情系统以及广告引擎等系统中进行对接,可以利用以上系统和业务来完成对目标用户在站内不同流量场景投放。

投放之后。 获取本次投放的效果并进行分析。比如我们做的操作是发 Push,谁点击了 Push、阅读时间等行为,可以分析有哪些用户更喜欢我们此次发布的 Push,从而获得目标用户的典型特征。

若此次 Push 的点击量达成推送目标,那么目标完成;若点击量没有达成目标,我们会进行一个假设,比如最初预测点击 Push 的男性>女性,但最后得出的结果相反时,我们会通过 TGI 算法进行排序,找出这两次差别的典型特征,完成设计并产出 AB 实验。

通过 AB 实验我们可以对前后的人群包再做一次对比并发布相关的 Push。如果点击量有所提升,我们在后续过程中就会不断的完成循环,最终找到基于我们运营场景的领域的精准用户。

站内向站外投放

基于已经积累的用户特征数据,找出在知乎内部有几率产生站外效果的人群,并划出该类人群的范围。再通过 Mapping,可以把站内的 ID 转换成在三方投放平台上产出的 ID 并进行投放。

由于这个过程我们的站内系统不同,并不能直接拿到相应的埋点数据供我们进行数据链路建设,所以就必须要通过三方投放平台上下载相应的埋点数据,通过类似的场景完成数据导入后再进行后续流程的建设。这也就导致了整个过程的效果回收会比较长。

站外转站内

假如我是一名知乎站外的广告主,我要投放一个牙膏类的产品,但是我对知乎的用户并不是特别了解。通过前期所做的运营调研,可以发现历史购买牙膏的人群包是什么样子。那么就可以把前期调研所得到的人群包通过 ID Mapping 转换为知乎 ID 并导入生成目标人群。但是广告主拿到购买牙膏的人群可能存在与知乎用户重合度较低的情况。这时候启用第二个功能,也就是人群泛化功能。

人群泛化会把导入人数较少的种子人群连接到知乎,这个过程可以对用户达成的所有特征在 AI 模型中完成训练。可以训练出种子人群在知乎所有用户特征下的模型是什么,之后再把所有用户的全部特征灌入得到的模型之中进行推理。这样得到带有置信度的目标用户。

若广告主认为基于之前的调研结果来看,相关目标人群在知乎中为 1,000 万左右,此时我们就可以选择对于目标用户的置信度。比如说当置信度为 0.7 时,得到结果为 2,000 万;之后我们把置信度调整为 0.8 时,得到结果为 1,000 万,此时我们就可以选择 0.8 的置信度完成广告引擎的对接并进行投放后分析效果。

基于上述运营流程,我们可以抽象出 DMP 平台最核心的功能包括洞察、定向以及 ID mapping。

画像特征

img

图 1.3 DMP 画像特征

我们根据上述的用户画像,构建出了画像特征。其中标签是最重要的部分,也是离散部分。连续部分包括了用户的停留时长以及相关的用户行为,比如:某人在某地做了什么事等,这些都属于连续特征。特征方面,在该特征还没有打上标签之前,我们会统称为普通特征。

功能梳理

img

图 1.4 DMP 功能梳理

基于 DMP 平台的功能,向右侧拓展为业务功能。业务功能会服务于运营、销售或站内的应用系统,包括人群定向、人群洞察以及相关的 ID Mapping。向左侧拓展是信息量巨大且十分重要的特征接入部分。

当前 DMP 平台由于单从标签方面就有 250 万的标签量级,在用户 X 标签也有 1100 亿相关用户数据,同时业务方面对部分标签具有实时性要求。这也就导致在特征接入过程中需要做很多事情。

接下来将为大家介绍具体功能。

  • 人群定向。 人群定向方面整体上分为导入与导出、特征圈选以及人群泛化这三个功能。
  • 人群洞察。 包括构成分析和对比分析两种功能。构成分析部分我们可以简单理解为一个饼图或柱状图。对比分析是多个人群对比分析。
  • ID Mapping。 整体上将无论是 oai、idfa、手机号,全部硬生成知乎的连续统一 ID,而且这个连续 ID 基本是严格自增的。
  • 特征接入
    • 建设方式分为实时特征及离线特征
    • 标签组方面有离线和实时两种接入方式。其中树状标签主要用来应对复杂场景,如用户对某话题在阅读和互动方面的是多选的树形结构。

DMP 架构与实现

img

图 2.1 DMP 业务架构

我认为架构对于实现最终目标是很重要的阶段,但并不是必要阶段。只要我们把所有功能都进行完善就可以完成我们所有的业务实践,但是这样会导致在系统经过不断膨胀后,所对应的维护成本也会不断变高,稳定性变差,最后导致没有人可以维护的窘迫情况发生。架构主要可以为我们解决在多个复杂业务功能场景下,如何以低成本的方式进行维护迭代并有目标的去针对某个模块进行优化,但并不能解决实际的业务功能问题。

基于以上我对架构的认知,对业务以及整体 DMP 架构进行拆解:

DMP 使用用户

DMP 系统对接的是 3 类用户:

  • 平台方面,包括广告平台、信息流、广告引擎以及触达系统。
  • 操作人员,包括运营、投放以及销售等业务相关操作人员。
  • 诸如特征开发的产品及相关内部产品。

而这三部分人所对接的最前台的系统也是不一样的。

首先我们认为,平台或系统方面会与 DMP 的接口层对接。接口主要分为三种:

第一种接口是诸如广告引擎和信息流经常请求用户命中了哪些人群包列表。 在广告引擎内,完成请求之后就可以直接把人群包列表变成某个广告 ID 并完成竞价。信息流与广告引擎类似:当前用户若命中了我们要提权某内容或领域标签时,我们就会进行提权。该接口的设计就是典型的高稳定性、高并发、高吞吐。我们可以通过线上数据来进行该接口与其他接口的承载差别对比:该接口当前承载了 10 万 qps,由于接口对接了公司的核心系统,因此不能有任何抖动与故障,对它的稳定性要求达到 S 级,所以该接口也有多机缓存和高并发方面的相关设计,需要能够达到高稳定性、高并发、高吞吐的目标。

第二部分是站内与站外的人群包,该部分和上述内容也比较类似,都会对接到我们最核心的系统。一旦人群包无法圈选人群,后面整体的营销与定向投放也都会受到影响。对于 DMP 前台部分,该部分和接口层存在着明显区别:DMP 前台主要对接的是我们的内部运营同学与销售同学。DMP 前台若产生异常情况,只是会不能进行新的洞察以及人群定向的,不会影响正常使用历史人群。由于该部分会对接众多的销售和人群而不是对接重请求的接口,使用的复杂性也就必须要降到最低,减少在运营方面的培训成本,所以 DMP 前台就需要具备操作简单且使用成本低的特点。

第三方面是对接我们的内部系统,这部分主要会降低我们日常开发的成本。

DMP 核心功能

DMP 能够支持人群圈选、泛化、人群洞察的核心业务模块;支持标签生产, ID Mapping 还有计算任务运维和存储方面的功能。

DMP 业务模块

DMP 业务模块分为上下两层,向上的业务层实现新增功能的低成本化,重点在于可扩展性;向下的业务层随着人群与业务功能的增长,整体的开发或技术投入成本不会有太大的产出,也就是资源上的可扩展性。

DMP 基础设施

最下方是基础设施,需要保证基础设施相关的稳定性。

我们判断接口的依据是请求的接口主要承载是 Redis;Doris 主要承载了 DMP 前台和整体业务功能;后台部分主要承载是 MySQL 与 TiDB。以上是我们当前具体底层数据库的相关承载方面。

有人会问 Redis 成本是否会太高?不会的。因为核心的圈选人群逻辑都是在 Doris 上实现的,存放的大量相关标签都是通过 Doris 进行存放,只有在某个广告要指定某目标人群的某几个特征进行排列组合并且完成泛化时,我们会圈选出某个人群包 ID 对应的结果,最后才导出存放到 Redis 中。因此 Redis 的主要目的是用来扛高并发,实际的存放量很少。

DMP 平台功能盘点

功能盘点主要分为业务向与基础向两部分。

img

图 2.2 DMP 平台功能盘点-业务向

业务向

业务向我们能够支持人群定向以及人群洞察两部分能力。

人群定向:

  • 人群预估:比如说对性别、年龄、感兴趣的话题、该用户手机品牌是等多个条件进行排列组合,要求能够在 1 秒内完成精确结构的人群特征量级预估。
  • 人群圈选:经过精确结构的人群数量预估后,可以在分钟级别内将预估结果转化为要进行投放和使用的相关人群包。
  • 人群包泛化:泛化的能力要求尽可能简单,比如说我选择有历史的人群包后,就可以进行人群泛化并有具体的执行度选择。

人群洞察:

  • 可以探索当前活动入口画像,并完成流量回收。比如说我向 100 万人发布了推送,其中有 3 万人点击,那么可以对这 3 万人进行流量回收,与已推送的 100 万人进行对比,就可以这 3 万人明显的用户特征,方便我们后续提取出更精准的用户群体。

基础向

另外 DMP 架构还有一些基础功能,包括了主要特征建设、ID mapping 以及计算任务运维。

img

图 2.3 DMP 平台功能盘点-基础向

这三个基础功能不仅可以让我们快速完成实时和批量计算,还能够帮助我们解决新老版本滚动上线的问题。因为我们当前无论是通过 AI、数据采买、特征筛选,找到一个用户,即使是性别这种最基础的特征,也是在不断优化的过程,但每次优化是没有办法快速进行运营影响的评估,因此就需要做到多版本灰度上线,并进行滚动上线。

特征建设

特征整体有两部分,一种是原子特征,一种是派生特征。

  • 在建设原子特征时,我们就需要从离线或实时数据中生产大量相同基准的特征。
  • 对于派生特征,会基于已生产的特征再生产一个特征。举例:假如我们认为某群体是高消费能力群体,放在一个简单的场景中,我们可能会圈选出一位在 18-25 岁之间并在一二线城市的女性,并认为这样的特征可能是对化妆品消费能力比较高的群体特征。之后我们就会把该特征作为派生特征进行存储并去加快后续计算速度并降低运营筛选的成本。

特征建设可以做到能力隔离,以此来提升我们特征建设和上线效率。

Mapping 能力

包括设备 ID Mapping、用户特征 ID Mapping、泛化特征 ID mapping。该部分整体场景主要是统一 ID,并将 ID 从差别较大、类型不同的不连续 ID 变成连续统一的 int 型自增 ID。

计算任务运维

任务运维主要是完成 DAG 的调度与计算资源管理。如果大家用过 Doris 的话,就会知道 Doris 会使用最快的速度完成每一个 SQL 的执行。因此在进行人群预估时就需要做好排队的速度,否则突然有一波运营动作或热点事件时,可能会出现预估出多个人群包的状况并把所有资源都占满,这样都会互相受到影响,所以就需要通过任务运维进行资源的优先级排队,逐一执行相关人群包的圈选工作。

总结

  • 特征建设可以做到能力隔离,以此来提升我们特征建设和上线效率。
  • ID Mapping 屏蔽了我们 ID Mapping 的困难成本。我们会分为完成原子特征建设、完成派生特征建设以及进行基础设施的建设这三部分。当基础设施建设同学完成屏蔽或在架构上隔离之后,特征建设的同学就不需要管 ID Mapping 方面的问题,只需要管专注于建设特征即可。
  • 计算任务运维部分,对于业务开发同学并不需要知道底层到底发生了什么,为此我们要有一个同学完成对底层的封装后向上层提供一个接口,业务侧可以直接使用底层的功能的同时屏蔽了底层的复杂性。通过抽象与屏蔽,可以明显的提升最终上线与建设的效率,并能让其他某些工作从研发侧转移到运营侧。

举例: 我们当前有两种特征,第一种是原子特征。在形成原子特征的过程中,写一个 SQL 就可以形成一个特征。分析师与业务产品均可以参与特征的建设过程。第二种是派生特征。我们在运营后台上具备派生特征的交并差的能力,一些业务上的运营动作可以直接在管理后台进行操作并完成派生特征的建设。这样主要的工作量从研发侧逐渐转移到了产品侧与业务侧,明显的提升了各种能力和特征上线的效率。

DMP 核心介绍

DMP 核心部分有两方面:数据的写入/导入以及快查/快读。写入和导入是链路及存储的一部分,快查和快读我会在后续进行介绍。

特征数据链路及存储

img

图 3.1 特征数据链路及存储

写入部分流程首先是离线链路:离线链路会从各个业务的 Hive 存储上跑相关的 SQL 并生成一个 Tag 表。我们会在 Hive 上落一份 Tag 表后完成离线 Mapping。这个离线 Mapping 过程会请求通过用户设备核心自动生成统一连续的用户 ID,同时在离线 Mapping 的过程会把 imei、idfa、oaid 等数据进行转换和唯一绑定,若过程结束后发现新用户,则生成新 ID,若是老用户则获取用户 ID。通过这个过程,生成 ID mapping 的表,再进行大小写等复杂流程就可以得到用户唯一 ID 与映射 ID 的 Mapping 表,这就是我们得到的第一个表。

接着我们会在 ID Mapping 后进行枚举采集:当前标签组是 125 个,由 120 个离线特征和 5 个实施特征组成。当我们完成这 125 个相关数据的开发之后,数据相应的原子特征就可以通过 Mapping 直接拿出来。之所以要进行枚举采集是因为用户在使用过程中需要标签的搜索功能,当用户搜索标签时,250 万人工录入的成本过高,因此我们在离线和实时处理的过程中会将枚举采集出来,并且通过 Bulk Load 的写到 ElasticSearch 中。在这个过程也会生成连续的自增 ID 去映射用户标签的倒排表,也就是 tag_map 表,这是我们得到的第二个表。另外还存在第三个表用户行为表,这张表是我们在实时数仓方面构建的,因此没有单独强调那一部分。

基于上述三张表的部分,我们形成了三套存储:

  • 第一套是在 ElasticSearch 上的搜索标签存储。
  • 第二套是在 Doris 上,也是最核心的存储。
  • 第三套是整体 ID Mapping 的存储。

获取到这 3 个存储后,可以进行多种 Join 和查询,为后续的洞察及人群定向提供了基础。

接下来为大家公布几个量级:用户 X 标签量级,为 1,100 亿;ID Mapping 是一个宽表,量级是 8.5 亿;ElastichSearch,量级是 250 万。这三个量级也是我们为什么选择 ElasticSearch 和 Doris 的原因。

人群定向流程

上述的数据导入后形成 3 张表,这里是利用这 3 张表产生人群相关定向和人群包。

img

图 3.2 人群定向流程

人群定向流程分为两种:

  • 第一个是通过购物车筛选人群标签后进行人群预估,最后完成人群圈选回写到 Redis 的流程。
  • 第二个是人群泛化,通过 AI 平台完成 AI 模型的整体训练及人群的推理,再回写到 Doris 中,通过置信度进行选择并打上标签。

简单介绍一下这两个流程的过程:

整体的标签搜索。 用户的前台在产出标签搜索的事件之后就会去完成标签的搜索:通过思考各种名字组合寻找想要的标签后,我们会把这个标签放在标签购物车中并立。这个过程就是不停的向人群购物车中加各种标签和组合条件后,查看人群数量的过程。

这个过程存在的原因是在日常运营使用中,我们会对每次推广或目标群体进行量级预估。如果这个事件原本只涉及 200 万到 300 万左右的人群,经过人群圈选预估出来是 5,000 万,那么肯定是我们圈选条件不够精准,这个情况下我们就需要逐渐添加各种精准的条件,并把圈选控制在合适范围的量级后再形成人群包,所以这个过程会不断进行循环并获取到合适的标签/特征的组合。在获取到合适的组合之后,我们需要确定这个标签的目标和人群是,这个过程就会生成人群包。生成人群包的过程会进行连表操作并关联原数据,同时也会关联 ID Mapping 的表。若出现导出到站外的情况,则会做 ID Mapping 的表并完成站外的 ID 转换。之后再把导出的人群包 ID 与人群 ID 写入 Reids 中,写入之后进行通知。

如果只需要提供人群包来发布推送和短信等的业务就不需要写到 Redis 之中,这样可以大量释放存储并写到离线存储上。比如说一方面是 HDFS,另外一方面是我们对接的对象存储就会写到这些存储之中。由这些存储直接传给推动系统后,信息系统就可以直接拿到人群包并批量的给相关人群发布相关 Push 或推送。

人群泛化。 人群泛化流程最开始可能会有上传人群包的过程,也有可能没有。这个过程主要解决有些业务中,我们拥有某些历史活动的人群并需要进行人群泛化的问题。如果说它的人群包之前点击过我们的 Push,可以直接筛选,筛选完成之后关联所有的用户特征进行用户训练,模型训练完成后再对全站用户进行推理,推理出一批带有置信度的人群 ID 的结果并返回写到 Doris 之中。在这个过程中会同时发起另外一个流程,此流程会对用户侧的泛化的结果进行筛选,可以根据合适的置信度选择合适的数量。

接下来为大家介绍几个常用流程: 在开发完成之后,最核心的流程就是加标签和购物车并完成圈选后,传统的人群进行泛化的流程。但是经过和运营侧沟通后,我们发现日常工作中,运营侧实际上会将我们这几个流程反复进行叠加使用,实际的使用有这么几种:拿到带有历史效果的人群并进行泛化,但是完成泛化之后效果他的用户特征也会被相应被扩大,之后再叠加本次运营特点的标签后完成圈选并进行使用。

第二种是获取到历史效果后进行洞察和分析。 包括查看用户的画像后再重新根据标签关系圈选,之后又叠加了一次历史正向人群包后再去进行泛化。泛化之后再实现分发条件,最后再进行圈选,将该人群包给广告与相关的投放业务。运营侧会做很多基于原子能力以外更复杂的一些组合后再进行使用。

人群定向性能优化

背景

img

图 3.3 人群定向性能优化 背景与难点

当前 DMP 系统中有两大功能,第一大功能是人群定向,另外一大功能是人群洞察。基于这两大功能会有一个底层的功能是建设各种用户方面的画像特征。当我们完成拆解之后,我们就会发现人群定向的这部分功能是运营侧或业务侧的痛点。

场景要求

  • 人群预估,针对投放和营销场景,运营侧会有人数预期,那么会构建相应规模的购物车,持续在购物车中加入新的特征,需要立即看到新的特征加入之后会圈选出多少人,而不是每次加入新的特征后都需要很长时间的等待。
  • 人群圈选,针对热点运营。运营侧在日常工作中会持续跟进发生的各种热点事件,当发生了某些热点事件后,要快速的圈选出人群包发布 Psuh 和 推荐。如果圈选过程需要好几分钟,就会错过热点事件。

难点

  • 第一个数据量极大,如上图标注。
  • 第二个期望时间很短,人群预估与人群筛选分别能够在一秒钟内和一分钟内完成。

性能优化(1)

第一阶段优化我们通过了以下几点来解决这两个问题:

img

图 3.3 人群定向性能优化 第一阶段

倒排索引和按条件查询

img

图 3.4 人群定向性能优化 倒排索引及 ID Mapping

  • 首先,倒排索引方面,我们将查询条件由原先的 and or not 改成了 bitmap 函数的交并差;同时我们把连续数值打散成为了离散标签。举例:用户的年龄是大于 0,小于 100 的 int 型,如果按照数字顺序进行筛选,运营侧是不好把控的,圈选的过程中也会导致使用效果不理想。因此我们把按照顺序排列的年龄打上另外的标签,称为年龄段,比如 18-25,0-18 等。
  • 接着,把原先的 and or not 的查询转换为了倒排索引的相关查询,原先建立的表就会变成按照 tag_group 、tag_value_id 、置信区间的标识、bitmap 的顺序排序。同时基于这部分我们也需要进行 ID Mapping,ID Mapping 在导入的过程中的核心就是要把用户 ID 变成连续自增的。

查询逻辑变更

img

图 3.5 人群定向性能优化 查询逻辑变更

原先的查询条件是 where 条件中的 and、or、not,现在经过复杂的手段,把原先的查询条件修改成 bitmap_and,bitmap_or,bitmap_not,我们通过业务代码,将外部运营通过可视化后台配置的 and、or、not 的逻辑全部改为函数式的逻辑,相当于把 where 条件放到了函数和聚合逻辑之中。

但经过优化之后还会存在 2 个问题:

第一个问题是单一的 bitmap 过大,第二个问题是 bitmap 的空间分散。这两个问题集中导致每次进行交并差聚合时网络 IO 特别高。

底层 Doris 中用的是 brpc。在数据交换的过程中,因为每一个单一的 bitmap 都很大,就会导致 brpc 传输拥堵,有时甚至会出现上百兆的 bitmap 进行交换的情况。上百兆的 bitmap 进行交并差计算时性能很低,基本上我们想要达它达到 1 分钟圈选人群,1 秒钟进行人群预估是不可能的。

性能优化(2)

基于仍存在的问题,我们进行了第二阶段的优化。

img

图 3.6 人群定向性能优化 第二阶段

分而治之

第二阶段的核心的思路是分治。当我们进行了第一波上线后,发现人群预估能力是分钟级别,圈选基本上要到 10 分钟开外了。分治的思路是将全站的用户全部打成连续自增 ID 后,按照某个程度进行分组。比如说 0-100 万是一组,100 万-200 万是一组...逐渐分为几个组别。全站用户的交并差,可以等价于分组之后的交并差结果之和。

img

图 3.7 人群定向性能优化 分治

数据预置

当我们发现这个规律之后,通过分而治之可以做相关的数据预置。利用 Doris 中 Colocate group 特性,把每个分组内 100 万人全部放到某一台物理机上,避免网络的开销。

算子优化

全部放到某一个物理机上之后,就可以把聚合的算子由原先 bitmap_and_not 的 bitmap not 和 bitmap count 替换成一个函数来实现。此时基于 Doris 团队的新版本,增加了类似 bitmap_and_not_count 的组合函数后,性能相对于原先的嵌套函数有了比较明显的优化。

解决方案

基于上述解决思路,我们设计了新的解决方案。

新的解决方案以上 3 个思路进行拆分,包括查询逻辑的变更,预估变成子逻辑的求和、人群圈选变成子逻辑的合并。

  • 由于把原先几个 bitmap 的计算变成了多个小组 bitmap 计算,能进一步的提升多线程的并行度,使计算速度提升;同时也对代码进行了优化,将可复合的 bitmap_and_or_not 函数在提交时合并成同一个函数;在写入过程中把分组 ID 和相应的百万分组进行写入调整。
  • 离线和实时之中都会写相应的 tag 表。在完成 tag 表的写入之后可以把每一个 tag 之中不同的 user tag 写到不同的物理机上:比如可以将 300 万拆开分别写在三台不同的物理机上,完成物理机方面的区隔。这里借助了 Colocate group 以及 Group key 进行设置。完成写入之后,计算过程从原先的整体计算变成独立按照每一个 Group 进行计算。由于整体的 bitmap 很大,每一个独立的 Group 又都在一台物理机上面进行计算,速度有非常明显的提升。
  • 在每一个 Group 计算之后进行合并,合并之后,人群预估变成了不同物理机上面的数字简单加和,结果基本达到秒出。人群圈选也就变成了不同物理机上面的 bitmap,再 Shuffle 出去做最后的合并,这个过程量级很小,可以做到 1 分钟之内输出结果。

优化结果

下面两张截图分别是还没有进行合并之前以及合并之后的查询计划。

img图 2.7 人群定向性能优化 数据预置

优化前: 在查询的过程中,首先我们需要针对某一个 tag 做一个 bitmap_and 和 bitmap_not 或者 bitmap_or,在这之后另外几个 tag 也会做相同的聚合,在聚合完之后再做一次 Shuffle,最后进行 Join。同时另外的部分也会进行聚合,经过聚合之后再进行 Shuffle 和 Join。

这几次聚合过程中,每一个 tag 都有非常高的成本,都需要经过聚合—网络传输—再聚合—再网络传输的过程后再做 Join。

优化后: 查询计划有了非常明显的改变。只需要通过一个函数在合并的过程中进行查询,合并完成之后就可以完成最终的结果合并。无论是 int 类型的相加还是 bitmap 的合并都只有最后一层,速度有显著提升。原先人均预估可需要分钟级别完成,优化后,只需要几百毫秒便可完成,即使是复杂到上千个条件也只需要一秒就可以完成。

人群圈选也和上述过程类似:在条件复杂的情况下,可以做到一分多钟到两分多种之间完成。如果只有几十到 一百个的条件的话,人群圈选都可以在一分钟左右完成。

整个过程主要对数据进行了拆分,由 Doris 的 Colocate 原理把拆分后的数据提前预置在某一台物理机上面,通过优化,可以满足大部分场景的运营要求。

未来及展望

业务向

img

图 4.1 未来与展望 业务向

如红色框选所见,当前的系统流程是人群定向之后进行 Mapping,在用户洞察上是围绕人群进行建设的,同时与各个业务侧在 Mapping、洞察以及人群等环节进行对接。但是在这个流程中,如何通过运营达成目标、如何设计 AB 方案,两个部分是松耦合的。

未来我们希望 DMP 运营平台不光是松耦合的模式,而且能够在在业务上执行强耦合、强绑定的模式。这样的运营模式在使用过程中会更舒服,可以完全在 DMP 平台上完成了整体运营流程,并可以根据运营效果设计相关的 AB 实验,不断优化。

技术向

img

图 4.2 未来与展望 技术向

技术建设过程中,最主要的就是圈选人群。运营侧甚至会选几百个条件进行人群圈选。而这些运营人员可能分属在不同业务,这会导致他们的基础条件写得很相似。对于这种相似的基础条件我们会人工建立相应的 bitmap 进行预合并,再去基于此特征圈选,由于预合并的缘故会明显提升我们后续的执行速度。

第一个是查询效率。 对所有运营的人群圈选进行定期扫描及 SQL Parser。经过解析自动设计 SQL 的聚合条件进行预聚合,合成相应的 bitmap 的同时注册到相关的特征。在人群圈选时我们也会通过相同的 SQL Parser 自动将原先圈选的 SQL 改写,在改写之前可能会有好几十个特征,而他们又正好等于某一个派生特征的结果,此时就可以直接替换成派生特征。这个举动能进一步的提升我们查询的圈选速率。

第二个是导入速度。 我们经过五天的时间,每天需要导入大概 2TB 的数据量,存储了 11TB 的数据,数据量比较大,我们希望在导入的过程中可以进一步的提速。当前我们了解到业界有做 Spark 直接撰写具体 OLAP 引擎文件,我们也在思考是否可以通过 Spark 直接撰写 Doris Tablet 文件并挂载到 FE 上面,让我们能够快速完成导入或写入。

Q&A 环节

Q:知乎的标签体系有多少标签?记录量是多少?后台是一张还是多张的大宽表?在人群圈选的时候进行表链接,业务人员能否实时显示圈选出的人群特征和人群数量?

A:知乎的标签体系很大,包含了用户、内容、商业以及业务方面治理与安全等很多方面的标签,DMP 系统方面主要会与用户方面的标签进行对接。就单论通过认证且正在使用的标签组而言就有将近 700 多个,如果在加上业务方面在提未认证标签可以达到上千个。对于我们正在使用的用户方面的标签有 120 个标签组以及 5 个实时标签,总共 125 个标签。

记录量方面有 1100 亿的记录量。

后台不是一张宽表。在子标签完成生成后,会生成出独立的 tag1、tag2、tag3 的数据源表。经过我们将这些表写入 DMP 之后最终才会变成一个大宽表,在 DMP 中是问题中的一个大宽表,在业务中则是每个独立的标签表。多张大宽表在进行人群标签圈选时会进行连接,我们在经过数据处理后,会将数据写入到一张表中而不再是一张大宽表。

由于我们的优化,在这一张表中的存储的文件已经不会再按照 Tag ID 这种查询进度缓慢的方式进行分散。我们会按照存储的 Key,比如说 0~100 万的 ID 都会分在相同的地方进行存储。我们在计算的过程会在同一台物理机扫描出来,在经过聚合逻辑后就可以拿到结果。所以也就能够做到实时圈选相关数量的结果。

Q:人群圈选是基于经验进行标签组合圈选吗?投入后的效果如何分析?是独立的分析平台工具吗?如何知道投放人群包的转化率?转化是否回到打标签中利用另外的分析平台进行分析?

A:人群圈选可以分为两部分。第一部分是我们基于运营的经验进行圈选,这个部分中又分为已知人群圈选与未知人群圈选两个分支。

已知人群圈选,意味着运营已经对这个场景非常明确。能够熟知我们在运营的用户群体就是某个性别以及用户年龄段等,这时候我们就会基于历史经验进行圈选。对于完全未知的用户特征,我们会直接圈大盘。

这两种运营流程的区别就在于已知用户群体圈选的准确率会更高。基于已知的结果,我们几乎不再需要不用进行 AB 实验就可以完成本次投放。对于完全未知的用户特征而言,如果直接圈大盘的话,我们就一定需要进行小流量的 AB 实验发现点击 Push 的用户都满足某一个兴趣后,就可以基于这部分兴趣积累经验,之后再设计一个 AB 实验并调整人群特征至合适场景,直到效果逐渐的达到期望目标后,就会从未知的人群变为已知人群。

还存在另外一种经验。比如说广告主的经验,广告主可能在知乎中并没有历史投放经验,但是广告主知道购买过我的产品人群有哪些,比如说他们手机号的加密 MD5 或手机 idfa 的加密 MD5 等,这样就可以将其他站投放过的效果完成导入,形成基本的人群。通过人群泛化,和站内所有的特征进行 Join 后去训练模型,通过 AI 的能力自动寻找到我的历史购买人群有怎么样的显著特征,之后就可以完成这部分泛化的全选。基于泛化的全选后,还是会经过相同的链路并完成这部分的数次循环循,之后就可以知道我这个场景下应该投放给哪些用户。

转化率我们在单独的地方进行查看,这也是我后期想要集成在 DMP 平台内做到的功能。我们在单独的页面上可以看不同 Push 的转化率。DMP 平台上面只能通过效果回收进行查看。

Q:后台都是基于 Doris 吗?多少节点是一个集群呢?

A:后台主要的计算方面都是基于 Doris。在高吞吐方面我们也依赖于 Redis。TPP 方面我们用了 TiDB。当前 Doris 集群是 6 节点,64 核心 256g 的 BE;3 个 FE 是 6 节点,16 核心 32g 的集群配置。

Q:人群放大靠谱吗?所有的人群圈选占比有多大,用的是什么算法?

A:人群放大是比较靠谱的。从运营侧的反馈可以得知:如果只通过广告主或只通过基于列入历史运营效果拿到的数据基本上无法支持完成本次运营,但是如果把我们所有的特征都加入并进行训练的话,基本每次都会有比较明显的提升,在 CTR 方面,能够达到 80%-90%。置信度调整为了 80%。

人群圈选业务使用占比会比一般圈选要少一些。对于一般圈选而言,我们当前历史上已有的特征也带有置信度。我们基于这些已有特征基本就可以完成绝大部分的运营工作。而人群泛化主要是用来解决的是当我对这部分客户完全没有认知,同时又想将站内全部随机大盘用户导入,进行用户群体特征探测的情况。这个过程其实对运营侧而言工作量比较大,只有在这种特定情况下才会选用泛化,所以泛化的占比按照比例来讲是不多的。比如说每天有 300 个基于特征和标签的定向,而每天基于算法方面的泛化是 1~2 次。

用的是什么算法我还没有细看过。当前我们会通过数据来调用 AI 同学的相关的算法。我们当前提供的就是将用户的所有特征都准备好后灌入到 AI 的自动训练的模型之中。在完成训练之后,我们再调用这个模型并把所有特征都灌入进行推理。

Q:AB 如果要用 Reids 查标签该如何设计?要如何保持实时性呢?

A:对于问题中 A 表和 B 表要查标签,数据量会爆炸,这个情况是的确存在的。所以我建议做标签,最好所有的标签都在这一个表里。通过我们当前经历的探索得出的结论,我们对于该问题的解决方案就是每一台物理机可能会存多个 100 万,但是要确保每一个 100 万的分段都在同一台物理机上,它就可以变成这台物理机的 Scan 以及聚合之后进行直接运算,所以它就不存在双表的 Join 问题,可以直接在表内进行聚合。我们这边有好几个类似于 bitmap and or not 的标签的计算,但是在算子方面,算子已经是被合并在聚合算子里面并完成聚合,聚合后再做一个最终的数据合并,这样的话性能会好很多,而且也能避免 A 表和 B 表做 Join 的结果。

对于第二个问题,我们完成人群的 ID 聚合都会通过这个函数。当这个函数走完之后,它会生成当前投放特征下的人群列表,我才完成 Join。在这个时候,普通的 Join 就不会有非常爆炸的数量,也不会涉及到上千亿的快速的查询计算。

Q:可以解读一下关于 250 万个标签的相关内容吗?

A:大家可以在图 1.3 中看到,出现像 250 万个标签主要是因为一个性别在标签组内算作 1,而在标签方面则会拥有男、女、其他 3 个标签。在手机品牌中,一个标签组下我们当前也是收录了将近 20 多个手机品牌的标签。之后还有话题兴趣的标签组中相当多的话题兴趣的标签数量。比如说知乎站内其实有很多话题,某些用户可能对影视内容感兴趣,也可能对母婴内容感兴趣,同时也可能对教育或学生内容感兴趣,以上的话题兴趣有具有连续的共性点。连续标签方面我们会在后续的文章中继续为大家介绍。当前用户画像的内容方面,如果从标签进行分组,都是属于离散标签。连续标签更多的是用户行为或者是操作数值等。

Q:标签和特征的关系是什么?标签又是怎样建立的?

A:我们定义特征是要比较比标签大的,可以理解为我们当前的特征中 90% 是标签,剩下 10% 是用户行为的比例。

相关链接:

Apache Doris 官方网站:

http://doris.apache.org

Apache Doris Github:

https://github.com/apache/doris

Apache Doris 开发者邮件组:

dev@doris.apache.org

用户案例

导读:同程数科成立于 2015 年,是同程集团旗下的旅游产业金融服务平台。2020 年,同程数科基于 Apache Doris 丰富的数据接入方式、优异的并行运算能力、极简运维等特性,引入 Apache Doris 进行数仓架构 2.0 的搭建。本文详细讲述了架构 1.0 到 2.0 的演进过程及 Doris 的应用实践,希望对大家有所帮助。


业务背景

业务介绍

同程数科是同程集团旗下的旅游产业金融服务平台,前身是同程金服,正式成立于 2015 年。同程数科以“数字科技引领旅游产业”为愿景,坚持以科技的力量,赋能我国旅游产业。

目前,同程数科的业务涵盖产业金融服务、消费金融服务、金融科技及数字科技等板块,累计服务覆盖超过千万用户和 76 座城市。

图片

图 1.1 业务场景-业务介绍

业务需求

主要包含四大类:

  • 看板类:主要包括业务实时驾驶舱以及 T+1 业务看板等。
  • 预警类:主要包括风控熔断、资金异常以及流量监控等。
  • 分析类:主要包括及时性数据查询分析以及临时取数等。
  • 财务类:主要包括清算以及支付对账需求。

图片

图 1.2 业务场景-业务需求

综合以上业务需求,我们进行了系统架构建设。

架构演进之 1.0

工作流程

图片

图 2.1 架构演变-架构 1.0

架构 1.0 是前几年非常流行的以 SteamSets 和 Apache Kudu 为核心的第一代架构。

该架构通过 StreamSets 进行数据库 Binlog 采集后实时写入 Apache Kudu 中,最后通过 Apache Impala 和可视化工具进行查询和使用。这个过程存在架构链路较长以及 SteamSets 对部分配置复用性表现欠佳的问题,另外 Apache Kudu 的多表关联与大表关联存在一定的性能瓶颈,且对 IO 方面要求较高。

图 2.1 下半部分中实时计算流程的应用与上半部分较为相近,在实时计算中,埋点数据发送到 Kafka 后会通过 Flink 进行实时计算,并将计算结果数据落入分析库与 Hive 库中用于数仓关联。

优势与不足

图片

图 2.2 架构演变 优点与缺点

优势:

  • 架构 1.0 选择了 CDH 全家桶。CDH 提供了众多大数据组件,可以相互集成并投入使用,同时其配置相对灵活。
  • 使用的 SteamSets 支持可视化拖拉式与配置式的开发方式,因此开发人员对 SteamSets 的接受程度较高。。

不足:

  • 组件引入过多,维护成本随之增加;当数据出现问题时,排查与修复链路相对较长。
  • 多种技术架构和过长的开发链路,提高了数仓人员的学习成本与要求,数仓人员需要在不同地方转换后再进行开发,导致开发流程不顺畅、开发效率降低。
  • Apache Kudu 在大表关联 Join 方面性能差强人意。
  • 由于架构使用 CDH 构建,离线集群和实时集群未进行分离,形成资源相互竞争;离线跑批的过程中对 IO 或磁盘消耗较大,无法保证实时数据的及时性。
  • 虽然 SteamSets 配备了预警能力,但作业恢复能力仍相对欠缺。配置多个任务时对 JVM 的消耗较大,导致恢复速度较慢。

架构演进之 2.0

工作流程

由于架构 1.0 的不足远多于优点,在 2020 年,我们调研了市面许多进行实时开发的组件,发现了 Apache Doris,通过调研对比,最终决定将 Apache Doris 引入了架构 2.0。

图片

图 3.1 架构演变-架构 2.0

引入 Apache Doris 后,对整体架构进行了以下改造:

  • 通过 Canal 的 CDC 能力,将 MySQL 数据采集到 Kafka 中。因 Apache Doris 与 Kafka 的契合度较高,可以便捷地使用 Routine Load 进行数据加载与接入。
  • 对原有离线计算的数据链路进行了细微调整。对于存储在 Hive 中的数据,Apahce Doris 支持通过 Broker Load 将 Hive 数据引入进来,因此离线集群的数据可以直接加载进 Doris 之中。

选型 Doris

图片

图 3.2 架构 2.0-选型 Doris

在选型的过程中,Apache Doris 整体表现堪称惊艳:

  • 数据接入: 提供了丰富的数据导入方式,能够支持众多数据源的接入。
  • 数据连接: Doris 支持 JDBC 与 ODBC 等连接方式,对 BI 工具的可视化展示比较友好,能够便捷地与 BI 工具进行连接,另外 Doris 实现了 MySQL 协议层,可以通过各类 Client 工具直接访问 Doris。
  • SQL 语法: Doris 支持标准 SQL,语法向 MySQL 兼容,对于数仓人员学习成本较低;
  • MPP 并行计算: Doris 基于 MPP 架构提供了非常优秀的并行计算能力,对于大表 Join 支持得非常好。
  • 最重要的一点: Doris 官方文档非常健全,对于用户而言上手较快。

系统选型调研时,我们也了解了 ClickHouse,ClickHouse 对 CPU 的利用率较高,在单表查询时表现比较优秀,但是在多查询高 QPS 的情况下表现欠佳。

结合以上几点因素,最终我们选择了 Apache Doris。

Doris 部署架构

图片

图 3.3 架构 2.0-Doris 部署架构

Apache Doris 部署架构极为简单,主要是 FE 和 BE:

FE 是前端节点,主要进行用户请求的接入、元数据和集群的管理以及查询计划的生成。

BE 是后端节点,主要进行数据存储以及查询计划的生成及执行。

Doris 运维十分简便:

3 月份我们对机房的机器进行了滚动式迁移,12 台 Doris 节点机器在三天内全部迁移完成,整体操作较为简单,主要用于机器下架、搬移及上架;FE 扩容与缩容动作花费的时间也不多,只运用了 Add 与 Drop 等简单指令。

特别注意:尽量不要使用类似于 Drop 等指令直接对 BE 进行操作。当使用 Drop 指令进行强制删除时,Doris 会提示并要求手动确认是否删除,强制删除后数据将无法恢复。因此建议采用接触方式下线节点,该方式在数据迁移工作完成之后,可以直接将 BE 节点再次加入,较为灵活。

Doris 实时系统架构

图片

图 3.4 Doris 实时系统架构

数据源: 在实时系统架构中,数据源来自产业金融、消费金融、风控数据等业务线,通过 Canal 和 API 接口进行采集。

数据采集: Canal 通过 Canal- Admin 进行数据采集后,将数据发送到 Kafka 消息队列之中,再通过 Routine Load 接入到 Doris 集群。

Doris 数仓: Doris 集群构建了数据仓库的三层分层,分别是:使用了 Unique 模型的 DWD 明细层 、 Aggregate 模型的 DWS 汇总层以及 ADS 应用层。

数据应用: 架构应用于实时看板、数据及时性分析以及数据服务三方面。

Doris 新数仓特点

图片

图 3.5 Doris 新数仓特点

数据导入方式简便,根据不同场景采用 3 种不同的导入方式:

  • Routine Load:主要用于业务数据的接入并作为消费 Kafka 的常驻任务存在。当我们提交 Rountine Load 任务时,Doris 内部会有一个常驻进程实时消费 Kafka ,不断从 Kafka 中读取数据导入进 Doris 中。
  • Broker Load:进行如基础维度表及历史数据等离线数据导入任务。
  • Insert Into:用于定时跑批作业,负责将 DWD 层数据处理,形成 DWS 层以及 ADS 层。

Doris 的良好数据模型,提升了我们的开发效率:

  • Unique 模型在 DWD 层接入时使用,可以有效防止重复消费数据。
  • Aggregate 模型用作聚合。在 Doris 中,Aggregate 支持如 Sum、Replace、Min 、Max 4 种方式的聚合模型,聚合的过程中使用 Aggregate 底层模型可以减少很大部分 SQL 代码量,不再需要自己做 Sum、Min、Max 等动作,对于从 DWD 层到 DWS/ADS 层较为友好。

Doris 使用门槛低,查询效率高:

  • 支持 MySQL 协议,支持标准 SQL,查询语法高度兼容 MySQL,对分析人员较为友好。
  • 支持物化视图与 Rollup 物化索引。物化视图底层类似 Cube 的概念与预计算的过程,与 Kylin 中以空间换时间的方式类似,均是在底层生成特殊的表,在查询中命中物化视图时将快速响应。

特别提示:物化视图虽然很有帮助,但在过多使用时,每个物化视图均需要占用额外的存储空间,数据导入时将会导致效率下降。

Doris 极简的系统架构,运维成本低:

  • 系统只有 BE 和 FE 两个模块,不依赖如 Zookeeper 等三方组件,部署简单。
  • 针对 FE 和 BE 的操作进行了监控配置,发生异常时会进行及时性重启。

Doris 经验总结

图片

图 4.1 如何更友好地使用 Doris

在使用 Apache Doris 的过程中,我们整理了一部分经验,帮助开发人员更友好地使用 Doris 。对于开发人员,最关注的地方有以下几点:

  • 开发方面: 如何将外部数据接入 Doris 并快速实现 ETL 开发,这会影响开发人员的报表产出速度。
  • 调度管理: 开发人员不希望在开发完成并上线任务后,出现报错或不稳定的情况,需要保证任务调度的稳定性以及调度恢复能力。
  • 数据查询: 由于生产与办公网络中间有隔断,办公网络不能直接使用生产网络的连接,并且无法通过客户端的形式解决网络隔断,只能通过 Web 形式解决,如何安全便捷地进行查询和分析成为开发人员关注的问题。
  • 集群管理: 集群出现异常状况时能够及时进行捕捉及自动化处理。

总而言之,我们希望建设一个高效率、高质量,高稳定性的平台。

Doris 开发优化

根据开发者关注的几个问题,我们进行了一些开发优化。

数据接入

数据接入方面进行了半自动化相关工作并做了快速生成组件,可以根据数据源/表生成 Routine Load 脚本,只要对 Kafka 的 Broker 或 Topic 进行修改就可以快速形成 Routine Load 任务。Broker Load 任务与 Routine Load 类似,在选择数仓源之后就可以及时生成 Broker Load 所需脚本。在接入 Doris 时需要提前创建表,对于这方面也可以进行类似操作,通过源快速生成创建语句。

图片

图 5.1 数据平台- Doris 开发

上述主要运用了底层元数据,根据不同的数据源拿到不同的元数据后就可以对任务进行快速生成。

提交动作和维护管理

在任务生成后,我们在 Routine Load 方面进行了封装。由于 Routine Load 是常驻进程,我们只需要再进行一次提交,状态就会变成 Running ,若出现异常状态会被检测出来,监控方面在后续会向大家进行展示。

图片

图 5.2 数据平台- Doris 开发

监控与管理

我们可以在对提交的 Routine Load 进行查询并检查是否存在异常,同时可以将我们需要关注的 Routine Load 加入监控中,监控会定期对任务进行自动扫描,发生问题时会进行提示并尝试将任务重新拉起。

Broker Load 同样可以对任务进行监测。针对于 Broker Load Label 名称不能重复的问题,我们采取生成 UUID 的方式进行解决,以此更好地帮助大家提升使用体验。

图片

图 5.3 数据平台- Doris 开发

如上图展示,我们可以在 Routine Load 中进行暂停和终止的动作,帮助大家更好地使用开发的作业与管理。

自研查询页面,集成 Doris Help 功能

由于生产与办公网段隔离,我们只能通过 Web 进行查询。之前我们曾尝试使用 Hue 集成 Doris 进行查询的方案,Doris 支持通过 MySQL 协议连接到 Hue ,但如果我们集成 Hue 的话,所有人都可以通过 Hue 查询 Doris 的数据,安全性问题无法保证,无法满足我们对权限的要求。

图片

图 5.4 数据平台-Doris 数据查询

所以我们在自己的平台内开发了查询页面来解决此问题。图中左边部分可以根据 DB 列出下面所有的表,右边部分是查询分析页面与查询结果,是我们自行开发的类似于 Navicat 的客户端软件。

同时我们对 Doris Help 功能进行了功能集成,在大家在不知道如何使用 Doris 时提供帮助。通过集成 Doris Help,我们可以通过关键字搜索的功能进行语法和示例查询解决问题。

即使没有集成 Doris Help,也可以在 FE 节点自带的 Web 页面进行查看,FE 节点内置可以查看整个集群信息且具备 Help 功能的 Web 页面。在我们实现自研查询页面并集成 Doris Help 后,可以直接使用,从而跳过需要使用 Admin 账号连接才可以使用 FE 的步骤。

Doris 集群监控页面

同时我们开发了 Doris 集群监控页面,在集群监控页面中可以看到 FE 、BE 以及 Broker 的节点状况。当集群发生异常状况时,监控系统会发送自动提醒并尝试将集群拉起,同时也可以通过页面化的形式观察节点的健康度情况。

图片

图 5.5 数据平台- Doris 集群监控

对于 Doris 上层应用而言,主要还是依赖 Doris 提供的 API 与指令完成 Doris 上层的应用动作,我们做的只是将 Doris 提供的指令针对使用者进行更友好地集成以及页面化展示。

新架构的收益

图片

图 6.1 新架构收益

  • 数据接入: 在早期通过 SteamSets 进行数据接入的过程中需要手动建立 Kudu 表。由于缺乏工具,整个建表和创建任务的过程需要 20-30 分钟。如今可以通过平台与快速构建语句实现数据快速接入,每张表的接入过程从之前的 20-30 分钟缩短到现在的 3-5 分钟,性能提升了 5-6 倍。
  • 数据开发: 在早期架构中进行聚合或其他动作时,需要写大量长篇幅 SQL 代码。使用 Doris 之后,我们可以直接使用 Doris 中自带的 Unique、Aggregate 等数据模型及可以很好支持日志类场景的 Duplicate 模型,在 ETL 过程中大幅度加快开发过程。
  • 查询分析: Doris 底层带有物化视图及 Rollup 物化索引等功能,可以提升查询效率,同时 Doris 底层对于大表关联进行了诸多优化策略,如 Runtime Filter 以及其他 Join 和自定义优化策略。相较于 Doris,Apache Kudu 则需要有较为深入的优化经验才能更好地使用。
  • 数据报表: 最初使用 Kudu 报表查询需要 1-2 分钟才能够完成渲染,而 Doris 则是秒级甚至是毫秒级的响应速度。
  • 环境维护: Doris 没有 Hadoop 生态系统的复杂度,整个链路较为清晰,维护成本远低于 Hadoop,尤其是在集群迁移过程中,Doris 的运维便捷性尤为突出。

未来展望

图片

图 7.1 未来展望

  • 尝试引入 Doris Manager: 社区中正在进行关于 Doris Manager 宣导,后续我们也准备引入并积极参与 Doris Manager 进行集群维护与管理。
  • 实现基于 Flink CDC 的数据接入: 当前架构中没有引入 Flink CDC ,而是继续沿用了 Canal 采集到 Kafka 后再采集到 Doris 中的架构,链路相对来说较长。采用 Flink CDC 虽然可以继续精简整体架构,但是还需要写一定代码量,对于 BI 人员直接使用感受并不友好,我们希望数仓人员只需要 SQL 或在页面上完成操作就可以使用。在 3.0 架构的规划中,我们计划引入 Flink CDC 功能并对上层应用进行扩充。Flink CDC 的引入为大家带来“快就是慢,慢就是快”的思想理念,当然 Flink 社区的发展速度很快,只有在充分学习大家的经验后,才可以更友好地引入,并在学习经验的过程中对架构进行迭代与优化。
  • 紧跟社区迭代计划: 我们正在使用的 Doris 版本相对较老,现在新版本 Doris 在内存管理、查询性能等方面有了较大幅度的提升,后续我们将紧跟社区迭代节奏对集群进行升级并体现新特性。
  • 强化建设相关体系: 我们现在的指标体系管理如报表元数据、业务元数据等维护与管理依旧有待提高。数据质量监控方面,虽然目前包含了数据质量监控功能,但对于整个平台监控与数据自动化监控方面还需要强化与改善。

相关链接:

Apache Doris 官方网站:

http://doris.apache.org

Apache Doris Github:

https://github.com/apache/doris

Apache Doris 开发者邮件组:

dev@doris.apache.org

用户案例

蜀海供应链是集销售、研发、采购、生产、品保、仓储、运输、信息、金融为一体的餐饮供应链服务企业,为广大餐饮连锁企业及零售客户提供整体食材供应链解决方案服务。因其业务比较复杂,2020 年底完成了以 Apache Doris 为核心的架构升级,并在 2021 年开始建设以 Apache Doris 为核心的数据中台。

在使用 Doris 之前,我们采用了 CDH 这套数据平台,用了很多组件,但其链路过长,并且开发和维护成本比较大,最后没有引入一个很好的 OLAP 系统。

因为我们的数据历史包袱比较轻,经过对 Apache Doris 的调研和测试,决定使用以 Apache Doris 为核心建设数据平台,它有以下优势:

  • 同时支持高并发点查询和高吞吐的 Ad-hoc 查询。
  • 同时支持离线批量导入和实时数据导入。
  • 同时支持明细和聚合查询。
  • 兼容 MySQL 协议和标准 SQL。
  • 支持 Rollup Table 和 Rollup Table 的智能查询路由。
  • 支持较好的多表 Join 策略和灵活的表达式查询。
  • 支持 Schema 在线变更。
  • 支持 Range 和 Hash 二级分区。
  • 高可用,能容忍部分节点挂掉。
  • 运维简单,部署,维护,升级都比较简单,不依赖外部组件。

架构图如下:

img

由于之前已经对元数据,数据服务,接入数据质量,血缘关系的建设做过介绍,本文将从数据接入,数据服务编排,数据安全,Doris 应用等方面进行介绍。

数据接入

数据接入功能是数据开发的重要一环,我们开发了一套数据接入系统,在 Web 端操作,实现零代码数据接入到 Doris,以下为主要功能介绍:

  • 订阅 MySQL Binlog,入仓到 Doris 表。
  • 订阅 Kafka Topic,入仓到 Doris 表。
  • 数据动态清洗,在页面编写代码即可完成数据入仓之前的转换。
  • 接入任务合并,为节省资源,支持分库分表在一个任务接入,支持多个 TOPIC 在一个任务接入。
  • 动态数据质量校验,配置字段质量规则,进行接入数据质量校验。
  • 入仓加密,再接入过程中,可以对敏感数据进行加密后再入到 Doris 表。
  • 错误数据管理,因为网络或者数据错误等原因,在页面可完成数据的重新入仓。
  • 数据接入链路监控,比如错误数据监控,数据生产链路异常监控,数据消费链路异常监控,任务数据接入趋势图,集群数据接入趋势图等。

数据服务编排

数据服务是供业务系统调用 API 获取数据的一个系统。可以在页面进行 API 新建、编辑、在线开发调试、设置限流、上下线等操作。由于 API 之间可能存在业务逻辑关系,并且不能在配置同一个 API,我们开发了数据服务编排功能,通过拖拉拽的方式,让 API 之间能够进行编排并进行数据传递,对外提供 API 时,仍然暴露的是一个 API。

数据安全建设

数据安全是一个很大的话题,涉及到方方面面,这里从数据加密,数据权限和数仓数据备份几个方面进行简单介绍。

数据入仓加密

在数据接入过程中,可以选择对字段进行加密,当接入到 Doris 表后,就已经是加密的数据,后续的数据分析,可以利用密钥进行解密。

数据权限

由于公司查看报表的人员分布很广,对于同一个数据模型,每个城市每个区域的销售,运营,工厂人员,管理人员等人员查看到的数据是不一样的,需要精确控制到行权限和列权限,所以我们在 Doris 上层开发了一套数据权限系统,通过配置化的方式,完成数据权限配置,可以精确到行权限和列权限。BI 报表系统作为一个接入方,引入数据权限客户端并实现相应抽象方法即可。

数仓数据备份

我们以 Doris 作为存储和计算的核心,Doris 本身数据已经是多副本存储,但是考虑到容灾,我们还是会对核心接入数据进行备份到 HDFS,为此开发了一套数仓数据备份系统,把 Doris 表按照全量或者分区,定时备份到 HDFS。

Doris 的应用

我们用 Doris 承载了数据分析的计算和存储。此外,还存在一个这样的场景:业务的 MySQL 数据库数据一直在增长,大量的历史数据影响业务线上性能,而且不能直接删除,因为还有低频的历史数据查询,为此,我们基于 Doris 开发了一套业务历史数据归档系统,可以定时把不再变更的历史数据进行增量归档,通过数据服务系统提供数据查询,把归档的数据推送给业务方,业务方进行校验,并删除历史数据。

收益

目前以 Doris 为核心的数据平台,已经支撑了公司几十个业务系统的数据查询和数据分析需求。为BI智能分析,各业务系统提供了优异的查询性能,并且极大减少了数据平台维护,数据开发,数据中台建设的成本。

  • 数据实时接入稳定可靠,通过 Stream Load,实时接入了数千张表,每天接入数据总条数在亿级别,非常稳定可靠;
  • 支持高并发高性能的数据在线分析查询,每天对 Doris 的在线分析查询次数在百万级别,大部分 SQL 在毫秒级别,慢 SQL 也有很大优化空间,并且 Doris 会自动做一些场景下的查询优化;
  • 通过直接查询原始接入表,建立物化试图,建立索引,支撑了多个低延时高并发的实时查询需求。并且多表 Join 性能优异;

其他:

  • Doris 的整体架构简单,运维成本很低,可在线滚动升级,可节省人力专注于数据中台的建设以及业务开发;
  • Doris 高度兼容 MySQL 协议,交互式查询分析,提供高效的数据开发体验;
  • 高可用,数据分区多副本存储,不会因为部分节点的异常导致整体服务不可用;
  • 广泛生态兼容,社区提供了和 Flink,Datax 等大数据交互的 Doris 插件,通过 Broker 导入导出数据简单快捷;
  • 社区活跃,Doris 功能和性能在不断的扩充和提升,遇到问题能够得到社区的密切帮助。

相关链接:

Apache Doris 官方网站:

http://doris.apache.org

Apache Doris Github:

https://github.com/apache/doris

Apache Doris 开发者邮件组:

dev@doris.apache.org

用户案例

导读:本文是货拉拉大数据引擎负责人杨秋吉在 DataFunSummit 2022 多维分析架构峰会上的演讲分享,分享的主题是《货拉拉基于 Apache Doris 的 OLAP 体系演进及建设方法》,详细讲解了货拉拉从 OLAP1.0 到 3.0 的演进过程,其中不乏有值得借鉴的方法论以及深刻的技术思考,希望能对大家有所帮助。

业务背景

货拉拉成立于 2013 年,成长于粤港澳大湾区,是一家从事同城、跨城货运、企业版物流服务、搬家、汽车销售及车后市场服务的互联网物流公司。截至 2022 年 4 月,货拉拉的业务范围已经覆盖了国内 352 座城市,月活司机达到 58 万,月活用户达到 760 万,包含 8 条以上的业务线。

货拉拉大数据体系为支撑公司业务,现在已经成立三个 IDC 集群、拥有上千台规模的机器数量,存储量达到了 20PB、日均任务数达到了 20k 以上,并且还处在快速增长的过程中。

大数据体系

货拉拉大数据体系从下往上分为 5 层,最下面的是基础层和接入层,这两层主要会提供基础数据的存储、计算以及集群的管理功能。在基础层和接入层之上是平台层和数仓。在平台层之中包含了数据研发平台和数据治理平台,基于平台层的能力和数据仓库的数据体系,在这之上面包含了含有业务属性的服务层和应用层。整个体系自下而上相互支持,实现支持业务和赋能业务的能力。

img

图 1.1 货拉拉大数据体系

数据处理流

货拉拉典型的数据处理流,可以分成数据集成、采集、数据的存储计算、数据服务四部分,同时也包含了实时、离线以及在线三大业务场景。

img

图 1.2 货拉拉大数据数据流

在数据采集阶段会存在实时采集和离线采集两条路线。

  • 实时采集比较典型的场景为用户端上埋点会直接同步到大数据平台做存储,供后续的在线和离线计算使用。
  • 离线的数据主要是来自于业务方的数据库,会通过天或者是小时定期采集到大数据存储中,以供后续使用。

中间是数据的存储和计算阶段。在离线场景中会通过对数据 ETL 之后转换为构造数仓的分层体系。实时比较典型的场景为数据在经过 Flink 的处理后会直接落在线存储系统,类似于 HBase 和 OLAP 等等,为后续的业务系统提供数据服务。

OLAP 演进概览

货拉拉从 2021 年开始进行 OLAP 的技术研究,截至目前已经经历 3 个阶段:

  • 2021 年上半年为货拉拉的 OLAP1.0 阶段,这个阶段我们主要是支持公司的罗盘业务,我们引入的是能够提供较好的单表依据和查询能力的 Apache Druid 引擎。
  • 2021 年下半年为货拉拉的 OLAP2.0 阶段,我们支持了智能定位工具,这个阶段引入了够提供单表明细查询,并且还有较高的压缩率 ClickHouse。
  • 今年为货拉拉的 OLAP3.0 阶段,伴随着公司业务需求的不断增多,我们也会需要用到多数据源的关联分析。基于此,由于 Apache Doris 具备大表关联分析的能力,我们引入了 Apache Doris 引擎。

img

图 2.1 货拉拉 OLAP 体系演进过程

OLAP1.0 孕育期

业务需求分析

先看下没有引入 OLAP 之前的业务数据流:

img

图 3.1 OLAP1.0 业务场景

根据该图可以看到业务的数据通过实时和离线处理之后会落在 MySQL,MySQL 之中储存了维度聚合之后的结果数据,这也意味着会在 Flink 之中做大量的聚合分析,根据业务需要的相应维度所做的一系列组合都是在 Flink 之中做实时聚合,最后将结果储存到 MySQL。

存在的问题:

  • 存在存储瓶颈,类似于 Kylin 之中的维度爆炸的问题。
  • 开发成本、高效率低。当业务侧需要新增维度的时候会需要对 Flink 中的所有作业都做一定的修改,然后再重新上线。
  • 无法支持部分聚合需求。

对于存在的这些问题,我们经过分析之后,总结出了 3 个背后存在的需求点:

  • 业务侧希望能够横向扩容,解决存储瓶颈。
  • 希望能够自由组合维度做分析,提升业务侧开发效率。
  • 希望能够支持任意维度实现跨度的分析。

解决方案

根据业务需求,并通过调研,我们决定使用 OLAP 引擎来支持业务需求。那我们如何选择一款 OLAP 引擎,并把它稳定的应用到生产之中呢?

我们总结了如下的 4 个步骤作为解决思路:

img

图 3.2 OLAP 1.0 解决思路

技术调研

技术调研阶段,我们对比了 Durid、ClickHouse、Kylin、Presto 和 Doris 等等引擎。结合我们上述的 3 个业务需求,最终我们选择了 Druid 引擎

原因是 Druid 除了能够满足我们的业务需求之外,还有一个比较重要的影响因素是 Druid 引擎是纯 Java 开发,与我们的技术栈比较吻合,可控性更高。

img

图 3.3 OLAP1.0 技术调研

POC 阶段

POC 过程中,从以下 3 个步骤着手:

  • 功能验证。在功能验证中,我们会收集业务侧的 SQL,之后提取 SQL Pattern,然后再根据 Druid 引擎的 Rollup 语义做 SQL 的改写,涉及到大量 UDF 的改写、Rollup 语义兼容以及 Count Distinct 语义兼容等等。
  • 性能验证。我们会直接采用业务真实的数据和业务真实的 SQL 来执行。验证过程中我们会将 Cache 关闭,分别统计 P75、P90、P99 的查询耗时。在这过程中,我们会发现有部分查询的性能没有达到要求,之后我们会做性能分析。Druid 引擎本身没有比较完善的性能分析工具,不能够很好的打印出它的执行计划以及各个算子的耗时,所以我们采用了第三方的 Arthas 火焰图进行分析。定位了相应的算子后,最终我们通过优化我们建表导数的逻辑以及索引构建的逻辑,并主要通过调整 Segment 大小的同时加入物化视图的方法,进行一些参数的调整以此来优化性能。
  • 准确性验证。我们将业务真实数据同时写 Hive 表和 Druid,之后跑 Hive SQL 和 Druid SQL,来进行数据质量的校对。在这个过程中我们会发现例如 StringLast 函数等一些函数会在特定的场景下出现计算值不稳定的问题。

img

图 3.4 OLAP1.0 POC 验证

稳定性保障

当 POC 验证完成之后,接下来我们会进行稳定性的保障。我们将稳定性保障分为事前、事中、事后 3 个阶段

img

图 3.5 OLAP1.0 稳定性保障

上线阶段

当稳定性保障建立完成之后就会进入到上线阶段。上线过程我们同样分成了 3 个阶段

  • OLAP 测试阶段。在这个阶段中,业务的数据会接入到 Druid 之中,但是业务的真实查询还是通过原来的 MySQL 库。这个阶段主要会验证 Druid 引擎的数据质量和 Druid 集群的稳定性。
  • 上线观察阶段。在这个阶段,业务的查询会切回到 Druid。同时旧的 MySQL 链路还没有下线,业务侧能够随时切回 MySQL 链路。
  • OLAP 运行稳定阶段。我们会把 MySQL 旧的链路下线,做资源的回收。

img

图 3.6 OLAP1.0 上生产

问题总结

下面总结了 1.0 阶段时遇到的问题:

  • 数据导入部分中,实时数据乱序为典型问题。
  • 在数据准确性验证阶段发现 StringLast 的函数值不稳定。
  • Durid 没有一个高效的精准去重的函数。

img

图 3.7 OLAP1.0 问题总结

OLAP2.0 完善期

业务需求分析

在 OLAP2.0 阶段主要有以下 4 个业务需求:

img

图 4.1 OLAP2.0 业务需求分析

下图是简单的业务工具的截图,从图中可以看到,OLAP2.0 需要能够支持汇总与明细,同时基于这些能力能够做一个快速的问题定位。

img

图 4.2 OLAP2.0 业务需求分析骤去实现。

解决方案

img

图 4.3 OLAP2.0 技术调研

OLAP2.0 我们引入了 CliclkHouse。ClickHouse 能够比较好地支持复杂的数据类型,同时因为业务侧是埋点数据,所以对于实时导入语义要求并没有那么高。

没有采用 Druid 主要是有 2 个原因:

  • Druid 对于复杂的数据结构支持度并不是很好。
  • Druid 虽然能够支持明细查询,但是 Druid 的明细查询和聚合查询得分成不同的表,这样就会额外的引入一系列的存储成本。

剩下的部分就是 POC 、上生产的步骤,这两个步骤和 OLAP1.0 阶段比较类似,所以在这里就不过多展开介绍。

OLAP3.0 成熟期

业务需求分析

2022 年随着公司业务的发展,更多的产品线对于多数据源关联场景下的在线分析需求也会变得越来越迫切。比如说 AB 实验场景与实时数仓场景,这两个场景对于多表关联需求,尤其是大表的多表关联需求也变得越来越迫切。

img

图 5.1 OLAP3.0 需求分析

举一个 AB 实验的例子。从下图可以看到,例子中是需要把 AB 实验的一个数据和后面相应的司机与用户的埋点数据关联到一起并做分析。在这种情况下,我们就会发现之前的两种工具都会存在一系列的弊端。

img

图 5.2 OLAP3.0 需求分析

解决方案

技术调研

在技术调研阶段我们观察了 Druid 和 ClickHouse。Druid 引擎可以支持一些维表的简单 Join,ClickHouse 则能够支持 Broadcast 这种基于内存的 Join,但是对于大数据量千万级甚至亿级的一些表的 Join 而言,ClickHouse 的性能表现不是很好。

img

图 5.3 OLAP3.0 技术调研

接下来我们对 Doris 进行了调研,我们发现 Doris 是能够支持小表的 Join,对大表的话也同样能够支持基于 Shuffle 的 Join,对于复杂数据类型(Array、JSon)的支持,经过跟 Apache Doris 社区沟通,预计将在 2022 年 7 月份的新版本中发布。通过在多个维度和需求满足度上进行对比,我们最终选择了 Apache Doris,也是因为 Apache Doris 的 SQL 支持度非常的完善。

img

图 5.4 OLAP3.0 技术调研

POC 阶段

我们除了引用业务真实的数据和场景做验证以外,还引入了 TPC-DS 的数据集做了验证。

在多表关联的场景下对单天数据进行查询,对 5 亿左右的数据量进行 Join,TP75 大概是 9 秒左右。在数据质量阶段我们也是把 TPC- DS 的数据集以及业务真实数据集,分别在 Hive 和 Doris 里面做了双跑验证,发现两者都是能够完全对得上的。

img

图 5.5 OLAP3.0 POC

稳定性保障

与之前一样依然是从事前的容量评估和压测、事中的监控和定位来进行。

img

图 5.6 OLAP3.0 稳定性测试

下面是我们的监控图,主要是关于 Compaction 相关的一些监控,感兴趣的同学可以看看。(文末 QA 环节有部分讲解)

img

图 5.7 OLAP3.0 稳定性监控

问题总结

第一个问题是查询性能的优化。

业务侧的需求为 7 天的查询 RT 需要在 5 秒内完成,在优化前,我们发现 7 天的查询 RT 是在 30 秒左右。对于这个问题,我们的主要优化策略是把小表 Join 大表改成了大表 Join 小表,主要原理是因为 Doris 默认会使用右表的数据去构建一个 Hashtable。

还有类似下图中的情况:union all 是在子查询中,然后再和外层的另外一张大表做 Join 的查询方式。这种查询方式没有用到 Runtime Filter 的特性,因此我们将 union all 提到子查询外,这样就能够用到 Runtime Filter,这应该是由于这里的条件下没有推下去所导致的。同时运行时采用的 Bloom Filter 是可以将 HashKey 条件下推到大表 Scan 阶段做过滤。在经过对这两者优化之后便能够满足我们的查询性能需求了。

img

图 5.8 OLAP3.0 问题 1

第二个问题是 UnhealthyTablet 不下降,并且在查询阶段会出现 -230 的报错。

这个问题的场景是我们在没有停 FIink 写任务的时候,对 BE 机器交替重启,重启完会出现很多 UnhealthyTablet。经过我们后续的分析发现,其中一个原因主要是在 Coordinator BE 在做二阶段提交的时候比较巧合,Coordinator BE 的二阶段提交 Commit 后,也就是大部分的副本是已经 Commit 后且在 Publish 前,在这短短的时间范围内 BE 机器被重启,这也导致会出现 Tablet 状态不一致的情况。同时由于我们当时把参数调整的过大,导致了 Compaction 压力过大。

最后的解决办法:与 Aapache Doris 社区的同学经过互助排查,引入了社区 1.1.0 的 Patch,同时对相应的数据做了恢复。

img

图 5.9 OLAP3.0 问题 2

参数优化

  • 打开 Profile。Doris 对于查询的性能分析具有非常好的 Profile 文件,这一点是非常赞的!我们可以看到各个算子在每一个阶段查询耗时以及数据处理量,这方面相比于 Druid 来说是非常便捷的!
  • 调大单个查询的内存限制,同时把 BE 上的执行个数由 1 个调整成为 8 个,并且增加了 Compaction 在单个磁盘下的数据量。对于 Stream Load,我们把 Json 格式的最大的内存由 100 兆调整成为 150 兆,增大了 Rowset 内 Segment 的数量,并且开启了 SQL 级和 Partition 级的缓存。

img

图 5.10 OLAP3.0 参数优化

数据流

下图是使用 Doris 之后的数据流图:

img

图 5.11 OLAP3.0 数据流

数据流中,我们在 Flink 中做的事情已经很少了,经过数据简单的 ETL 后就可以把数据直接灌入到 Doris。经过 Doris 一系列的聚合计算、union 计算以及多表关联计算之后,业务侧就可以直接查询 Doris 来获取相关数据。

总结与思考

总结:我们 OLAP 的引进主要还是从业务需求的角度出发来匹配合适的引擎,为业务精细化运维提供技术支持。在这之后,我们也思考了一套较为完善的上线流程及稳定性保证方案,为业务的平稳运行提供能力保障。

思考:我们认为很难有单个引擎能够富含各种场景。因此在技术选型时,需要针对于需求特点和引擎特点进行合理选择。

后续规划

我们希望可以向 OLAP 平台化发展,通过实现自助化建模的同时在这方面做一些多引擎的路由,使其能够支持各类聚合、明细以及关联等场景。

img

图 6.1 后续规划 OLAP 平台化

除 OLAP 平台化之外,后续我们的引擎演进计划从高效、稳定和内核演进三部分来进行。

img

图 6.2 后续规划 引擎演进

稳定性方面:对 Doris 还要继续深入内核理解,提供一定的二次开发。另外 Doris 社区的相关原理以及代码级别的教程数量十分丰富,这也间接性降低了我们深入 Doris 原理的难度。

内核演进方面:我们发现 Doris 基本能够覆盖 Druid 所有场景,因此后续计划以 Doris 引擎为主,Clickhous 引擎为辅,逐渐将 Druid 的相关业务向 Doris 迁移。

Q&A 环节

Q:刚才讲到了后续要从 Druid 引擎迁移到 Doris,要实现迁移的成本有多大呢?

A:迁移成本方面和我们之前的成本是一样的。我们上线的时候也会采用以下方式:先把业务的数据同时往 Druid 和 Doris 之中写,写完之后的业务迁移会涉及一些 SQL 改造。因为 Doris 更加接近 MySQL 的协议,比起 Druid SQL 会更加便捷,所以这部分的迁移成本不是很大。

Q:刚才介绍的第二个场景之中的监控图都看了哪些指标呢?

A:关于监控图,我们会比较关注 Doris 的数据导入。而在数据导入部分,我们最关注的就是 Compaction 的效率,是否有 Compaction 的堆积。我们现在还是采用的默认参数,也就是 Compaction 的分数就代表它的版本号,所以我们监控的更多的是它的版本。对于这方面的监控,社区也已经有了比较完善的相应技术方案,我们也是参考了社区的技术方案来进行了监控的指标搭建。

Q:从指标上看,Doris 的实时服务在线查询性能怎么样?在数据导入情况下性能损耗可以从这些指标上看出来吗?

A:实时导入方面主要是从 Compaction 的效率来看。结合到我们这边的业务场景,最多的一张表,单表一天也有 6 亿到 10 亿的数据量的导入,也是一张埋点。另外关于峰值,它的 QPS 也是能达到千到万的,所以导入这一块压力不是很大。

Q:SQL 缓存和分区缓存实际效果怎么样?

A:SQL 缓存方面效果还好,对于很多离线场景,尤其是首页这种查询的数据量而言。比如以昨天或者是过去一个小时之前的这种情况来说,SQL 缓存命中率会非常高。分区级缓存方面,我们分区的时间还是设的是小时级,这意味着如果这个查询里面涉及到的一些分区在一个小时内没有数据更新的话,那么就会走 SQL 缓存;如果有更新的话就会走分区级缓存。总体来看效果还好,但是我们这边命中比较多的还是 SQL 级的缓存。

Q:Doris 的查询导入合并和缓存的 BE 节点的内存一般怎么分配?

A:缓存方面我们分配的不大,还是采用的偏默认的 1G 以内。导入方面我们设计的是 parallel_fragment_exec_instance_num 这个参数,大概在 8G 左右。

Q:可以解释一下 OLAP3.0 的解决思路吗?

A:对于 OLAP3.0 方面来说,业务的主要诉求就是大表 Join。除此之外,还有一些类似于导入的进度一致等等。

在大表 Join 方面,我们也对比了很多的引擎。Druid 这方面就是偏维表;Clickhouse 这方面还是偏基于内存方面的 Broadcast。正因如此,主要是基于大表 Join 的出发点,我们选择引入了在 Join 这方面能力更强的 Doris。

Q:Druid、ClickHouse 和 Doris 应该都是近实时的,就是 Near Real-time,他们的写入不是立刻可见的,是这样吗?

A:是这样的。像 Doris 和 ClickHouse 之前的写入都是 Flink 直接去写,我们也没有完全做到来一条数据就写一条,都是一个微批次。一个批次最大可以达到 150 兆的数据堆积,写入一次的时间间隔也是到 10 秒左右,没有做到完全的实时写入。

Q:方便透露一下货拉拉目前 Doris 的集群的使用情况,比如机器的数量和数据量吗?

A:我们的集群数量还不算很多,10 多台。

Q:对于 Doris 的运维方面,它的便捷性和 Druid、ClickHouse、Kylin、Presto 这些相比,有很好的扩展性吗?

A:我们觉得是有的。第一个是在我们 Druid 方面碰到了一个比较大的痛点,就是它的角色特别多,有 6 种角色,所以需要部署的机器会非常多。另外一点是 Druid 的外部依赖也非常多,Druid 依赖于 HDFS、离线导入还需要有 Hadoop 集群。

第二个是 ClickhHouse 方面,我们当时使用的版本对于 Zookeeper 也是有比较大的依赖。另外,ClickHouse 也是偏伪分布式的,有点类似于数据库的一种分表。Doris 自身就只有 FE、BE,外部依赖会非常少,所以我们从部署的角度同时考虑到 Doris 的横向扩展方面,Doris 的扩缩容也能够做到自平衡,所以相比而言 Doris 会更好一些。

Q:在实时特征场景下,分钟级的数据更新对服务性能要求比较高,可以用 Doris 吗?能达到 TP99 200 毫秒以下吗?

A:TP99 能够否达到 200 毫秒以下主要和你查询 SQL 相关。

例如我们这边的很多涉及到大表 Join 的查询,涉及的分区数据量大概在 10 亿量别,业务侧对于查询性能要求是 5 秒以内,通过 Doris 是可以满足我们需求的。如果是实时特征这种业务,是否能达到 200 毫秒可能需要经过一轮实际测试才能得到结果。

相关链接:

Apache Doris 官方网站:

http://doris.apache.org

Apache Doris Github:

https://github.com/apache/doris

Apache Doris 开发者邮件组:

dev@doris.apache.org

用户案例

导读:近几年随着跨境电商行业的快速发展,Lifewit 业务达到近十倍的增长,原先的痛点已经严重影响到用户的数据使用体验。技术端需要随着业务的飞速发展不断升级迭代适应业务的增长。Lifewit 规划了从旧数据架构进化成目前基于 Apache Doris 构建的轻量级业财一体化数据平台,来系统化地解决旧架构存在的痛点,打通业务数据和财务数据,构建综合数据平台提供全业务链自助数据分析能力,支撑完善的数据报表体系和高效的数据分析。

业务背景

Lifewit 是一家专注于打造全球创新家居场景品牌的企业,通过自主研发、自主设计、品牌策划、技术驱动、垂直供应链、数字化人力资源形成一套“六位一体”化的特色经营体系。Lifewit 拥有自主的 B2C 品牌商城,深耕 Amazon 平台,同步入驻全球潜力电商平台,销售市场已覆盖全球上百个国家,服务上千万全球客户。

在我们业务场景中,数据大多来源于各个平台报表和各个业务系统产生的数据,旧架构直接基于关系型数据库构建报表数据,数据源系统多而复杂,还经常发生变化;复杂计算缺少分层建设导致拖垮从库;ETL 存在多种形态,没有统一建设和管理,排查问题比较艰难;源头数据的变化导致大量的下游表发生差异,需要及时重新计算;但发生变化的数据影响面分析困难,异常问题排查耗时人工成本高。这些都属于旧数据架构的一些痛点。

经过了近几年跨境电商行业的快速发展,我们的业务达到近十倍的增长,原先的痛点已经严重影响到用户的数据使用体验。技术端需要随着业务的飞速发展不断升级迭代适应业务的增长,所以我们今年规划了从旧数据架构进化成目前基于 Apache Doris 构建的公司轻量级业财一体化数据平台,来系统化的解决旧架构存在的痛点,打通业务数据和财务数据,构建综合数据平台提供全业务链自助数据分析能力,支撑完善的数据报表体系和高效的数据分析。

整体架构

数据架构

CECP: 老综合业务系统,核心模块是供应链和财务相关,逐步升级成 LBP

LBP:Lifewit 新业务平台,覆盖公司全业务链业务平台

LDP:Lifewit 新数据平台,覆盖公司全业务链数据应用

数据调度:LDP 采用 Airlfow 承接调度能力,Airflow 是一个使用 Python 语言编写的 Data Pipeline 调度和监控工作流的平台。Airflow 是通过 DAG(Directed acyclic graph 有向无环图)来管理任务流程的调度工具,支持自定义 Operator/Hook,还支持触发规则自定义,具备丰富的可扩展能力。

数据采集:LDP 目前实现主要是分钟级和小时级任务,支持两类采集,一类定时 API 增量采集,一类定时 OLTP 数据源增量采集,OLTP 增量采集直接构建在 Airflow,通过 Doris 连接 OLTP 从库数据源进行自定义规则采集。

数据仓库和数据应用都是基于 Doris 构建,数据视图层基于开源版/商业版 BI 软件构建。

元数据:自研,支持 PG、MySQL、Doris 的元数据自动化采集和管理。

数据质量:自研,支持自定义 SQL 对数据仓库,数据应用层指标进行自定义监控和告警。

数据血缘:目前的开源数据血缘不太适合我们公司,还在调研 DBT 中,第二期重点考虑 DBT 生产化可行性。

测试集群概况

测试环境配置:

  • 三台 8 核 16 G 云服务器
  • 三台 BE,一台 FE,其中一台 BE 混布
  • Ubuntu 18.04,CPU 支持 avx2

目前我们还处于数据架构升级的初始阶段,仅接入了部分销售数据,测试集群规模如下,目前已接入业务的数据量在千万级别,后续持续会有更多旧数据业务以及新的业务线接入进来。

集群监控

基于 Grafana+Promethus 构建集群监控可视化,以下监控图是测试环境监控部分截图。

数据采集

采集方案

LDP 目前实现主要是分钟级和小时级任务,通过 Airflow+Doris 轻量级支持 2 类采集。

一类定时 API 增量采集,通过调用 ERP 等其他业务系统的 API 进行增量数据采集到 Doris ODS 层。

一类定时 OLTP 数据源增量采集,OLTP 增量采集直接构建在 Airflow ,通过 Doris 连接 OLTP 从库数据源进行自定义规则采集,达到增量数据源源不断的进入 Doris ODS 层。

第二期支持实时采集 Binlog 入 Doris ODS 功能。

采集接入

ODBC 环境搭建

注意:所有 BE 都需如法安装,并保持相同配置

安装操作系统 ODBC 驱动:

apt install unixodbc




检查是否安装成功:

安装 MySQL ODBC 驱动:

选型:

img

下载地址:https://downloads.mysql.com/archives/c-odbc/

放置 Lib 目录

注册 ODBC 驱动:

myodbc-installer -a -d -n "MySQL ODBC 8.0.11 Unicode Driver" -t "Driver=/usr/lib/mysql-odbc-8.0.11/libmyodbc8w.so"

myodbc-installer -a -d -n "MySQL ODBC 5.3.13 Unicode Driver" -t "Driver=/usr/lib/mysql-odbc-5.3.13/libmyodbc5w.so"

查看是否注册成功

myodbc-installer -d -l

MySQL ODBC 5.3.13 Unicode Driver

MySQL ODBC 8.0.11 Unicode Driver

  • 验证 ODBC 连接 MySQL

编辑 /etc/odbc.ini 文件:

[mysql]
Description = Data source MySQL
Driver = MySQL ODBC 8.0.11 Unicode Driver
Server = 192.168.20.17
Host = 192.168.20.17
Database = test
Port = 23306
User = root
Password = sakdfwexkjsga134wesdgdsa4

执行

isql -v mysql

至此操作系统层通过 ODBC 是可连接到 MySQL。

ODBC 接入 Doris

编辑 be/conf/odbcinst.ini 增加以下配置:

[MySQL ODBC 8.0.11 Unicode Driver]
Description = ODBC for MySQL 8
Driver = /usr/lib/mysql-odbc-8.0.11/libmyodbc8w.so
FileUsage = 1

[MySQL ODBC 5.3.13 Unicode Driver]
Description = ODBC for MySQL 5
Driver = /usr/lib/mysql-odbc-5.3.13/libmyodbc5w.so
FileUsage = 1




ODBC 如何使用

创建 Resource:

create external resource test_resource properties(
"type"="odbc_catalog",
"odbc_type" = "mysql",
"host"="127.0.0.1",
"port"="23306",
"user"="root",
"password"="sakdfwexkjsga134wesdgdsa4",
"database"="test_db",
"driver"="MySQL ODBC 8.0.11 Unicode Driver"
);

创建外部表:

CREATE EXTERNAL TABLE `sku` (
`id` int(11) NULL COMMENT "",
`sku` varchar(64) NULL COMMENT "",
`name` varchar(128) NULL COMMENT "",
`type` varchar(128) NULL COMMENT "",
`creator_id` int(11) NULL COMMENT "",
`create_time` datetime NULL COMMENT "",
`updater_id` int(11) NULL COMMENT "",
`update_time` datetime NULL COMMENT ""
) ENGINE=ODBC
COMMENT "TEST"
PROPERTIES (
"odbc_catalog_resource" = "test_resource",
"database" = "test_db",
"table" = "sku"
)

具体使用场景:

  • 从外表定时增量采集到 Doris,主要是通过 AirFlow 定时任务执行 insert into select 语句方式采集
  • 查询时直连外表(数据量小),业务表很多,无需采集数据即可方便直接查询,若外表数据过大,或查询批次太高不建议直连

数据仓库

分层设计

SRC:数据源,主要来自各业务系统和亚马逊报表,以 PG、MySQL、ES 为主,采用 Doris ODBC 外部表实时直连从库,用于采集。

ODS:原始数据层,存放原始数据,主要是离线/实时写入的数据,与数据来源保持一致,还原数据过程。

DWD:数据明细层,根据需求从 SRC/ODS 层清洗数据存储到 Doris 中,采用 Uniq 模型。

DWS:轻度汇总层,从 DWD 轻度汇总数据,采用 Uniq 模型,构建命名规范、口径一致的统计指标,为上层提供公共指标。

ADS:数据应用层,和业务强相关的数据应用层,构建 ADS 是以需求为驱动,应用层主要是各个业务方或者部门基于 DWD 和 DWS 建立的数据集市。

DVS:数据视图层,BI 可视化对应的视图表,在 DVS 直接抽取和计算来自从 ADS、DWS 等层次的数据。

根据实际业务复杂性会存在跨层建设场景,不会严格按照每一层进行建设。

从外表采集数据到 DWD 层:

根据业务规则生成 DWS 层数据:

通过 Airflow 编写简单的 Python 代码进行任务调度编排:

Airflow 作业销售数据报表作业 DAG:

维度 Join 好处

传统基于 Hadoop 生态构建数据仓库,在进行建模的时候,广泛的采用的是大宽表,将指标列和维度列放在同一张表上。这会带来一个问题:当维度修改的时候,需要对数据任务进行重跑对数据进行回溯,重新聚合计算,这样的话回溯时间越长需要消耗时间越久。

我们使用 Doris 做存储和分析,由于 Doris 具备多表 Join 性能良好,采用星型关联表来建模,可以支持维度的动态修改,降低数据重跑回溯的成本。

数据可视化

数据可视化属于 LDP 数据视图层,截图属于销售看板应用,数据来自 DWS/ADS 层销售数据。构建了销售数据的多维度的自助分析能力,主要使用用户是运营中心。销售数据属于我们第一期的建设范围,其他业务陆续接入。

数据质量

数据质量

新数据架构建了基础的作业流和复杂的作业流,随着业务任务量增加,作业的故障问题对于用户来说容忍度会越来越低,如何监控生产作业的稳定性,避免经常在发生用户投诉后才发现任务异常,对于数据平台来说极为重要的环节。

我们数据架构的作业健康分 2 类,作业质量(即 DAG/TASK 的质量),数据质量(即数据指标,数据时效等数据类质量)。

DAG 质量和 TASK 质量就需要定时监控 DAG 和 TASK 元数据(存储在 MySQL 数据库内),监控 DAG 和 TASK 增量运行健康情况,定义监控规则是监控 TASK 还是 DAG,具体的监控细节是捕获到何种异常进行对应的分级告警。解决作业失败发现不及时导致发生重大故障问题,解决数据堆积导致最终结果交付延迟问题。

数据质量,涵盖数据指标,数据时效等,以及反向要求数据底层需具备一定时限的自愈机制,降低数据质量异常频率。通过数据质量定时作业检查配置好的质量规则,通过监控数据质量结果,达到统一告警的机制。我们 LDP 架构的刚上线,服务的数据应用不多,系统化的数据质量还未完全铺开。第一期主要先针对具体数据应用常见问题构建分模块的数据质量应用进行监控告警或自愈。第二期进行数据质量系统化建设。

元数据

作业质量和数据质量的管理,离不开元数据和数据血缘的建设,广义的 LDP 数据血缘涵盖任务血缘(Airflow 的 DAG ),作业血缘(Airflow 的 DAG 内部的 TASK ),数据血缘(和 Airflow 无关,在整个数据平台,数据生产形成的数据血缘链路),只有掌握了数据流的具体流向才能识别单点故障对整个数据平台的影响,而不是遇到问题只是单点解决问题,无法找到波及面,更不用说如何及时的修复波及面。

第一期的数据平台我们任务不多,没有做到完整的数据血缘采集,只实现了元数据管理。对接入数据平台的所有库,表,字段,计算逻辑,依赖关系进行统一管理。

通过依赖关系的维护,以及具体应用的指标监控,来识别异常指标波及面进行人工的异常分析和作业重跑。

第二期进行完整的数据血缘采集,实现完整的通用的数据自愈和故障影响面自动分析功能。

数据自愈

第一期的数据自愈主要是针对具体应用需求进行开发,本次生产作业是在两层之间增加一个数据健康检查任务,由于报表数据和业务数据经常发生变动,导致 DWD、DWS 的数据和 SRC 层数据发生偏差,需要寻找有偏差的数据,并通过 Airflow 重跑任务,当前采用 Delete + Insert 方式。

img

旅行者

img

健康检查时间段有限制,不可能无条件检查历史数据,于是需要一个方案进行更久以前的各层数据比对、汇总和告警。

  • 健康检查例行过去 30 天的数据,数据对不上将触发重跑
  • 30 天以外的数据用新任务负责低频检查和告警

目前数据质量还是针对具体业务实现具体的告警规则,下一阶段实现通用的数据质量管理体系。

实践总结

数据质量

  • MySQL ODBC 版本选择问题:

img

具体选型见 ODBC 环境搭建环节,版本选择不对可能导致 BE 挂掉。

  • -235 问题:

img

img

解决方案:

curl -X POST http://{be_ip}:
{be_http_port}/api/update_config?min_compaction_failure_interval_sec=30&persist=true

在 Cumulative Compaction 过程中,当选中的 Tablet 没能成功地进行版本合并,则会等待一段时间后才会再次有可能被选中。等待的这段时间就是这个配置的值,默认 5s 在插入速率过大,而批次量过小时容易产生,此时需要调大配置,减少插入速率,增加单次插入量。

新架构的收益

  • 采用基于 Apache Doris 的数据平台方案减轻了传统大数据搭建的服务器成本和运维人力成本
  • 数据平台方案整个链路和传统 Hadoop 数仓链路相比很大缩短,链路越短,数据稳定性维护越简单
  • 磁盘占用量大幅度降低。旧数据架构存在大量索引和分区等优化策略,导致某些表的索引比业务数据还大,使用 Doris 后存储所占用的资源大幅降低。
  • 数据分层结构清晰。根据三种不同特性的数据模型设计不同层次的表结构。Uniq 作为 DWD 或者 ODS 层,Uniq/Agg 作为 DWS 层,Agg 作为 ADS 层。
  • 查询速度提升。BI 查询聚合好的 ADS 层数据,发挥 Agg 模型最大效能。
  • ODBC 模式的采集方式减少 ETL 流程,降低复杂度,提高开发效率。
  • 物化视图自动刷新。PG 的物化视图和源表数据分离,源表数据变动需要手动重刷容易出错。Doris 自动刷新和查询透明机制,直查源表自动匹配最优物化视图
  • 由于良好的多表 Join 性能,采用星型关联表来建模,可以支持维度动态修改,降低回溯成本

后续演进

随着 LDP 数据平台服务的数据应用越来越多,后续对整个 LDP 数据平台架构需要更丰富的功能,更实时,稳定,安全的数据交付能力,更便捷的平台管理能力。

LDP 数据平台第二期优先功能范围:

  • 数据血缘和数据自愈实现,任意表延迟多层自动修复
  • 更实时采集 Binlog,支撑实时数仓建设
  • 通用数据质量,支持任意数据源,任意指标的自定义监控和告警

目前基于 Apache Doris 的 LDP 数据平台在乐活科技的第一个数据应用得到用户的广泛认可, 用户更加期待后续数据应用可以快速产出和赋能业务。感谢 Apache Doris 社区给予的支持,使我们能够快速构建轻量级 LDP 数据平台的基建设施,祝愿 Apache Doris 社区发展越来越好!

相关链接:

Apache Doris 官方网站:

http://doris.apache.org

Apache Doris Github:

https://github.com/apache/doris

Apache Doris 开发者邮件组:

dev@doris.apache.org

用户案例

导读: 物易云通目前已成为国内产融供应链运营服务平台的领军企业之一,平台年交易额超过 200 亿元 随着公司业务的快速发展,对数据计算分析的时效要求也越来越高。经数据团队的调研对比,于 2021 年引入了 Apache Doris 作为实时数据仓库。实战过程中获得一些经验,在此分享给大家。

业务背景

武汉物易云通网络科技有限公司成立于 2015 年 6 月,总部位于湖北省武汉市东湖高新区。作为国内产业互联网的探索先行者,公司致力于将产业互联网思维与新一代信息技术深化应用于煤炭、建筑、再生资源三大业务领域,以标准化、场景化、数字化的供应链综合服务解决能力,开创互联网化的“供应链技术+物流服务+金融场景”的产融协同新生态。目前公司已成为国内产融供应链运营服务平台的领军企业之一,平台年交易额超过 200 亿元。公司入选 2020 年中国互联网企业综合实力 100 强,2021 年武汉市软件收入第一名。

随着公司业务的快速发展,对数据计算分析的时效要求也越来越高。之前的产品已经无法应对庞大的数据量,为解决这一问题,数据团队通过调研对比,在 2021 年引入了 Apache Doris 作为实时数据仓库。基于 Apache Doris 建设实时数仓的过程中,沉淀了许多经验,借此机会分享给大家。

数仓架构演进

公司创业之初,是使用 MySQL 作为 BI 仓库,每天增量卸数后导入,通过定时调度存储过程进行计算。该方案能快速满足公司的跨库数据关联计算的需求,但是随着业务发展,数据和任务不断增多,MySQL 已难以支持,另外该方案局限性比较大,如果业务表存在物理删除或者没有数据更新时间的情况下,则会导致数据不准。

为了解决上述问题,我们搭建了一套 CDH 作为数据仓库。通过 Canal 订阅 MySQL 的 Binlog 到 Kafka,进行编写消费程序,将数据写入 Hbase,然后增量合并到 Hive 中,通过 Oozie 调度计算脚本。

然而离线 T+1 的数据只能满足一部分的业务需求,因此我们需要一套能快速查询实时数据的数据仓库,同时可以支持离线需求和实时需求,经过许多产品的调研对比,证明 Apache Doris 可以很好地实现我们的业务需求。

img

Doris 数仓架构通过 Flink CDC 实时接入生产库数据到 Doris,支持实时 OLAP,然后通过海豚调度器定时执行 SQL 脚本,替代 Hive 的离线数据计算任务。

新架构的优势

1、数据处理架构简单 新的架构里我们使用了 Flink CDC 来做数据同步(Flink CDC 内置了一套 Debezium 和 Kafka 组件,但这个细节对用户屏蔽),它不但可以读取增量,还能读取全量数据,然后将数据通过 Stream load 的方式写入 Doris。

2、一份数据,实时全量 由于 Hive 查询很慢,所以之前是把 Hive 的数据通过 Sqoop 推送到 MySQL 进行查询,即有多份数据存储在不同的 MySQL 上,维护和存储成本都很高,并且 Hive 里只有 T-1 的数据,需要每天写脚本合并。Doris 支持 MySQL 协议,可直接查询,同时 Doris 支持主键数据去重及更新,有实时的全量数据,解决了实时报表和在线 OLAP 的需求。

3、架构简单,易于部署维护 相对于 Hadoop 那一套各种组件来说,Doris 部署维护非常简单。

4、一键全库接入,结构实时同步 通过自研的数据易平台,实现了 MySQL 一键全库接入 Doris,即通过页面选择后,一键生成 Flink CDC 任务在 Yarn 上持续运行。而且通过解析 Binlog 里面的 DDL 语句,将其转化为 Doris 语法,利用 Doris 的 Online Schema Change 特性,实时同步生产数据库的表结构变更,保障了表结构一致,新增字段数据一致。

5、秒级查询 Doris 查询是秒级,Hive 是分钟级,跑批的效率提升了 20-30 倍。而之前用的是 Impala 加速 Hive 的查询,每个表在使用前都要 Refresh 一次,非常麻烦,并且 Count Distinct Impala 近似计算不准确。

系统重点功能

数据接入

第一步选择需要接入的 MySQL 库类型,默认是 A,即最常见的全局库名唯一。 img

另外还存在几种其他的情况:

B、全局有多个名称相同、结构不一致的数据库。比如:部分大表做了数据切割归档到另一台机器上了。

C、全局有多个名称相同、结构一致的数据库,即分库。我们需要将数据合并到一个 Doris 库表中,方便数据分析。

D、全局有多个名称不同、结构一致的库。比如:DB租户 A 的库, DB租户 B 的库,我们也是要把数据合并分析。

第二步,选择 MySQL 库实例,进行提交(如果不想接入全库,可以勾选部分表)。对应的目标数据库是 Doris 里面的 ODS 贴源层,和生产数据保持一致,一个库一个任务,可以视情况调整内存等参数。

img

注意: 通过列表可以进行任务管理,恢复任务是运用了 Flink CDC 的 Checkpoint 机制,在任务异常挂掉时可以恢复运行。Flink CDC 任务目前是运行在 Yarn 上。

img

数据计算

我们在数据易平台上开发了数据计算任务功能,用户编写 SQL 后,点击 SQL 解析,即可自动识别出脚本里用到了哪些来源表,生成了哪些结果表,最终在海豚调度器里生成对应的任务和上游任务 Depend 关系。

img

说明: 为了保障各个 T+1 报表的数据一致性,我们做了 ODS 层到 DWD 层的一套计算任务,即每天 0 点将 ODS 层中近 2 天的增量数据 insert into 到 DWD 层进行更新,后续 T+1 的计算任务统一从 DWD 层进行计算。

注意: 需要把物理删除变成逻辑删除,使用时剔除。如果直接在 ODS 里面同步物理删除,会导致 DWD 层里无法通过增量方式同步删除。

新架构的收益

降低资源成本

当前我们的集群配置为 5 台阿里云 ESC,16 核 64G。在相同集群配置下,1000 个表的每日增量数据合并任务,用 Hive 需要 3-5 小时,用 Spark 需要 2-3 小时,然而同样的需求 Drois 运用 Unique Key 模型完成只需要 10 分钟,大幅提前了后续计算任务的开始时间。

另外,因 Hive 跑得慢,我们后续的几百个 Hive 计算任务,排队情况很严重,不得不把一些优先级低的任务排到下午甚至晚上,日任务全部跑完需要十几个小时。而我们把全部批任务迁移到 Doris 上计算后,全部任务跑完只需要 2 小时不到,后续增加新的需求任务完全无压力。

总而言之,使用 Doris 后,报表数据的更新时间大幅提前,临时的数据查询需求响应时长大幅缩短,至少节约了每年几万的大数据集群扩容成本,同时获得了各部门的认可。

提升开发效率

随着公司业务快速的发展,会不断的有新的数据分析需求,就需要我们接入新库新表,给老表加字段等,这对于 Hive 数仓是非常痛苦的,表要重建、全量数据要抽,这就需要每周有半天时间都在处理这些事情。

在使用 Doris 作为数仓后,通过我们的数据易平台配置 Flink CDC 任务快速接入 MySQL 库表的全量+增量数据,同时利用 Doris 的 Online Schema Change 特性,实时同步 Binlog 里的 DDL 表结构变更到 Doris,数据接入数仓零开发成本。

另外因为 Doris 支持 MySQL 协议直接对接数据可视化应用,我们不需要再把结果数据从 Hive 推到 MySQL 里提供数据服务,节约了数据库资源,减少了开发步骤。

体现数据价值

Doris 有审计日志,我们可以通过日志,分析出每个表每天的查询使用情况,以便我们评估跟进数据价值、下线废弃报表及任务。另外还可以预警资源消耗多、查询慢的查询语句,帮助用户进行语法优化等。

问题与经验

1、MySQL 和 Doris 字段类型不一致 MySQL 的 Blob、Mediumint、Year、Text 等字段类型在 Doris 中没有,需要我们转换成 Doris 对应的字段类型,Varchar 的长度我们对应在 Doris 需扩大成 3 倍。

2、MySQL DDL 语法兼容性问题 MySQL 的 Bigint Unsigned、AUTO_INCREMENT、CURRENT_TIMESTAMP 等语法在 Doris 里不支持。

3、多个大表 Join 计算时,内存使用过大,导致 BE 挂掉,影响数据写入。 目前 Doris 新版本已对内存控制这部分进行优化。

4、Hive 和 Doris 差异 将 Hive 计算脚本改成 Doris 计算脚本时遇到一些语法问题,如:

  • Doris 不支持 Lateral View ,升级新版本已解决。
  • 之前的一些 Hive UDF 函数是 Java 写的,Doris 不支持,我们用另外的程序对数据做的二次加工处理,后续 Doris 新版本会支持。
  • Doris 缺少一些函数,如 Last_Day 通过取日期下个月的第一天再减一天来实现, Collect_Set 通过先去重再 Group_Concat 实现等。

5、分析函数问题

  • 分析函数 XX() over(partition by) 在外层和子查询中同时存在时,报 errCode = 2, detailMessage = can't support。我们通过将子查询数据放入临时表解决该问题,后面 Doris 1.0 版本已解决该问题。
  • 多个 lag PARTITION by 函数和 min PARTITION by 一起使用时,有乱码的情况。撰文时该 Bug 已修复,等待合并发版。

6、Doris 动态分区 动态分区字段必须为 Date 到月、周、日,不能根据写入的数据自动创建分区,目前我们通过建表时指定初始化的分区数解决此问题。

7、Stream Load 写入过于频繁报错 Stream Load 写入 Doris,写入太频繁会报错误码 235 问题,同样的表 Routine Load 不会出现这个问题,我们通过批量提交解决,Doris 新版本已优化该问题。

以上问题在向社区反馈后,得到了社区的快速响应。截止目前,上述问题基本上都已经得到修复,并且将在即将发布的新版中上线。

相关链接:

Apache Doris 官方网站:

http://doris.apache.org

Apache Doris Github:

https://github.com/apache/doris

Apache Doris 开发者邮件组:

dev@doris.apache.org