跳到主要内容

5 篇博文 含有标签「技术分享」

查看所有标签

技术分享

作者介绍:

刘常良:Apache Doris Contributor,SelectDB 存储层研发工程师。

吴迪:Apache Doris Committer,SelectDB 生态研发工程师。

在 OLAP 的业务场景中,Schema Change 是一个相对常见的业务需求,当上游数据源维度发生变化时,通常需要将数仓中的表结构进行相应的变更。相对于业界其他 OLAP 数据库,Apache Doris 对于 Schema Change 的支持非常友好,可支持 Online Schema Change,进行加减列或修改列类型时无须停服,保证了系统的高可用和业务的平稳运转。但在部分场景下,Schema Change 也存在一定的瓶颈,例如:在面对大数据量宽表场景下, Schema Change 执行效率相对较低、耗费时间较长;另外基于 Flink 和 Doris 构建实时数仓时,因 Schema Change 是异步作业,一旦上游表发生维度变化,需要自己维护 Schema Change 的执行状态,并在完成后重启 Flink Job,无法做到自动化变更,冗长复杂的操作流程无疑增加了许多开发和运维的成本,且可能会带来消费数据的积压。

基于此,Apache Doris 将在 1.2.0 版本推出新功能 Light Schema Change ,我们将通过本文为大家揭秘 Light Schema Change 的设计与实现,并分享其如何提升表结构变更的执行效率、以及如何在 Flink + Doris 实时数仓场景中实现 DDL 操作的自动同步。

需求背景

在正式介绍之前,需要认识一下 Apache Doris 1.2.0 版本之前支持的 3 种 Schema Change 方式,均是异步作业,分别为:

  1. Hard Linked Schema Change: 无需转换数据,直接完成。新摄入的数据都按照新的Schema处理,对于旧数据,新加列的值直接用对应数据类型的默认值填充。例如加列操作 ALTER TABLE site_visit ADD COLUMN click bigint SUM default '0';

  2. Hard Sort Schema Change: 改变了列的排序方式,需对数据进行重新排序。例如删除排序列中的一列, 字段重排序 ALTER TABLE site_visit DROP COLUMN city;

  3. Hard Direct Schema Change: 重刷全量数据,成本最高。当修改列的类型,稀疏索引中加一列时需要按照这种方法进行 ALTER TABLE site_visit MODIFY COLUMN username varchar(64);

而值得注意的是,1.2.0 版本新增的 Light Schema Change 新特性正是 替换了 加减列操作的 Hard Linked Schema Change 流程 在了解 Light Schema Change 的优势之前,我们需要先知道 Hard Linked Schema Change 的技术原理。

Hard Linked Schema Change 在作用于加减 Value 列时,当接收到加减 Value 列的 DDL,Doris FE(下文简称 FE)会发起一个异步的作业,并立刻返回。作业主要做以下事情:

  1. 创建新的 Tablet。

  2. 等待已经开始的导入完成。

  3. 将当前已有的数据文件 Hard Link 到新 Tablet 对应的数据目录下。

在使用过程中,我们发现 Hard Linked Schema Change 存在几个明显的问题:

  1. 当集群规模和表数据量达到一定数量时,Hard Linked Schema Change 等待时间就会明显增加。

  2. 在 Hard Linked Schema Change 过程中,如果有导入任务,为保证 Schema Change 的事务性,会对新/旧 Tablet 进行双写,Schema Change 则需要等待导入任务完成之后才可以进行。在遇到这种情况时,用户往往只有 2 个选择:一个是等待 Schema Change 完成再进行导入;一个是接受双写的代价。而这两种选择对用户都不太友好。

  3. Hard Linked Schema Change 无法处理 Delete Predicate。如果用户在 Schema Change 之前调用过 Delete 语句,Doris 不会立刻删除数据,而是记一个Rowset,并把 Delete Predicate 和此 Rowset 进行关联;如果在做 Hard Linked Schema Change 过程中,发现 Delete Predicate,则会转化为 Sort/Direct Schema Change,对数据进行重写。

Light Schema Change 的设计与实现

相较于 Hard Linked Schema Change 的作业流程,Apache Doris 1.2.0 新版本的 Light Schema Change 的实现原理就要简单的多,只需要在加减 Value 列的时候,对 FE 中表的元数据进行修改并持久化。在设计过程中,需要考虑到以下实现细节:

解决 Schema 不一致问题

由于 Light Schema Change 只修改了 FE 的元数据,没有同步给 BE,而 BE 对读写操作依赖于自身的 Schema,这时候就会出现 Schema 不一致的问题。为了解决此问题,我们对 BE 读写流程进行了修改。主要包含以下方面:

  1. 读取数据的时候下发 Schema 。Schema 每一列都有相应的 Unique ID,该 Unique ID 由 BE 的每个 Tablet 负责产生和赋值,但对于所有的 Tablet,其 Schema 列的 Unique ID 都是一致的,因此将此过程移在 FE 上去实现。FE 在生成查询计划时,会把最新的 Schema 附在其中,并一起发给 BE,BE 会拿最新的 Schema 读取数据,以此来解决读过程中 Schema 的不一致。

  2. 将 Schema 持久化到 Rowset 的元数据中。FE 在发起导入任务的时候,会把最新的 Schema 一起下发给 BE,BE 会根据最新的 Schema 对数据进行写入并与 Rowset 绑定,将该 Schema 持久化到 Rowset 的元数据之中,实现了 Rowset 数据的自解析,解决了写过程中 Schema 的不一致

  3. 在进行 Compaction 的时候,选取需要进行 Compaction 的 Rowset 中最新的 Schema,作为Compaction 之后 Rowset 所对应的 Schema,以此来解决拥有不同 Schema 的 Rowset 合并问题。

全局 Schema Cache

由于 Rowset 的元数据一直存储在内存中,如果每个 RowsetMeta 都存储一份 Schema,会对内存造成较大的压力。为了解决这个问题,实现了一个全局的 Schema Cache 管理相同的 Schema,这样就算有成千上万个 Rowset,只要 Schema 相同,内存中只会存在一份 Schema。

支持物化视图

Light Schema Change 也实现了对物化视图的支持。对读写流程修改之后,物化视图也可以正常读写。同时,如果要删除的列在物化视图中是 Value 列,则会与主表一起触发 Light Schema Change;如果主表的 Value 列是物化视图中的 Key 列,则需要发起异步任务,对物化视图进行 Sort/Direct Schema Change。

解决数据重写问题

由于 Delete Predicate 绑定了 Rowset,且每个 Rowset 都绑定了Schema,当 Delete Predicate 所涉及的列被删除后,可以通过寻找到对应的 Rowset,Merge 该列的信息进当前的 Schema 中,这样对 Delete Predicate 之前的数据也可以正常过滤。解决了数据中有 Delete Predicate 需要重写数据的问题。

以上就是 Light Schema Change 功能实现过程中对 Doris 进行的修改,在使用的时候只需在建表的时候指定参数即可打开 Light Schema Change 功能,如下所示:

CREATE TABLE IF NOT EXISTS `customer` (
`c_custkey` int(11) NOT NULL COMMENT "",
`c_name` varchar(26) NOT NULL COMMENT "",
`c_address` varchar(41) NOT NULL COMMENT "",
`c_city` varchar(11) NOT NULL COMMENT "",
`c_nation` varchar(16) NOT NULL COMMENT "",
`c_region` varchar(13) NOT NULL COMMENT "",
`c_phone` varchar(16) NOT NULL COMMENT "",
`c_mktsegment` varchar(11) NOT NULL COMMENT ""
)
DUPLICATE KEY(`c_custkey`)
DISTRIBUTED BY HASH(`c_custkey`) BUCKETS 32
PROPERTIES (
"replication_num" = "1",
"light_schema_change" = "true"
);

性能对比

为进一步体验 Light Schema Change 的执行效率,我们在 1 FE 1 BE 的集群上对加减列操作分别在有导入任务时和无导入任务时进行了对比。硬件配置为 16C 64G,数据均在 SSD 盘,使用了TPC-H SF100 的 lineitem 表,数据量约74G,具体测试对比如下:

无导入任务时

加列:

  1. Hard Linked Schema Change: 耗时 1s 310ms。

  1. Light Schema Change: 耗时 7ms

减列:

  1. Hard Linked Schema Change: 耗时 1s 438ms

  1. Light Schema Change: 耗时 3ms

由上面测试可以看出,Light Schema Change 加减列速度远快于 Hard Linked Schema Change,并且随着 BE 节点和表数据量的增多,Hard Linked Schema Change 的耗时是远高于 Light Schema Change 的,原因是 Light Schema Change 只需要和 FE Master 进行交互,并可以实现同步返回。

有导入任务时

加列

  1. Hard edLink Schema Change: 耗时 13 mins

Light Schema Change: 耗时 3 ms

从上面测试可以看出,同样的加列行为,如果有导入任务,Hard Linked Schema Change 需要等待导入的完成,才可以做 Schema Change,而 Light Schema Change 无需等待毫秒内即可完成导入。

欢迎读者尝试上手做一下有导入任务时的减列对比测试,也可以做一下在 Schema Change 过程中进行导入的情况下,两种 Schema Change 的速度对比。

Flink 结合 Light Schema Change 实现 DDL 同步

之前在基于 Apache Doris 和 Apache Flink 构建实时数仓时,当上游数据源发生表结构变更时,Doris 在同步 DDL 操作时主要有以下痛点:

  1. 在发起 Schema Change 后为了避免双写对集群产生的压力,通常会选择阻塞上游数据,在等待 Schema Change 操作执行完成后再解除阻塞,这个时候如果遇到一些数据量特别大的表时,往往会造成 Flink 上游数据积压;

  2. 需要自己处理解析 SQL 语句、发起 Schema Change 及维护执行状态等操作

  3. 修改 Schema 后,需要同步修改 Flink 中 Schema 相关的参数并重启 Job

而在 1.2.0 新版本实现 Light Schema Change 后,DDL 的同步就变得非常简易。利用 Flink CDC 的 DataStream API,可获取到上游业务数据库的 DDL 变更记录,在 Doris 对应数据表中开启 Light Schema Change,即可实现 DDL 自动同步。核心步骤如下:

  1. 在 CDC Source 中开启 DDL 变更同步;

  2. 在 Doris Sink 中对上游数据进行判断,识别 DDL 操作(add/drop column)并解析;

  3. 对 Table 进行校验,判断是否可以进行 Light Schema Change 操作;

  4. 对 Table 发起 Schema Change 操作。

Light Schema Change 可以保证 DDL 在毫秒级执行完成,避免了双写以及阻塞数据的问题;

同时 Flink Doris Connector 封装了序列化类 JsonDebeziumSchemaSerializer,在作业启动的时候,只需指定序列化方式即可,无需关心 Schema Change 的底层逻辑,以及无需重启Job。

总结

通过 Light Schema Change ,使得 Apache Doris 在面对上游数据表维度变化时,可以更加快速稳定实现表结构同步,保证系统的高效且平稳运转,具体体现在:

  1. 执行效率提升明显,且存在导入任务时效率提升更为显著。Light Schema Change 无需对 Tablet 双写,无需等待导入任务完成。在相同集群下,相较于过去版本,Schema Change 效率从数秒或数分钟提升至数毫秒,极大幅度提升了执行效率。

  2. 作业流程更加简单,加减列只需修改 FE 元数据,不需要与 BE 进行交互,实现同步返回,避免较长等待时间,以及因异步长时间执行而可能导致的误操作行为,提升了系统容错性和稳健性。

  3. Flink CDC 结合 Light Schema Change 快速实时同步 DDL ,有效解决了双写以及阻塞数据等问题,避免了增删列需要修改程序且需要停服重启的操作,实现 DDL 毫秒级快速同步,进一步提升了实时数仓数据处理和分析链路的时效性与便捷性。

Apache Doris 1.2.0 版本即将发布,如果想体验最新特性欢迎大家扫码加入下方社群中,如有任何问题/建议可通过下方论坛进行反馈,社区专家将帮助你更快定位和解决问题。

GitHub 论坛:https://github.com/apache/doris/discussions/12134

技术分享

导读:Apache Doris 使用 C++ 语言实现了执行引擎,C++ 开发过程中,影响开发效率的一个重要因素是指针的使用,包括非法访问、泄露、强制类型转换等。本文将会通过对 Sanitizer 和 Core Dump 分析工具的介绍来为大家分享:如何快速定位 Apache Doris 中的 C++ 问题,帮助开发者提升开发效率并掌握更高效的开发技巧。

Apache Doris 是一款高性能 MPP 分析型数据库,出于性能的考虑,Apache Doris 使用了 C++ 语言实现了执行引擎。在 C++ 开发过程中,影响开发效率的一个重要因素是指针的使用,包括非法访问、泄露、强制类型转换等。Google Sanitizer 是由 Google 设计的用于动态代码分析的工具,在 Apache Doris 开发过程中遭遇指针使用引起的内存问题时,正是因为有了 Sanitizer,使得问题解决效率可以得到数量级的提升。除此以外,当出现一些内存越界或非法访问的情况导致 BE 进程 Crash 时,Core Dump 文件是非常有效的定位和复现问题的途径,因此一款高效分析 CoreDump 的工具也会进一步帮助更加快捷定位问题。

本文将会通过对 Sanitizer 和 Core Dump 分析工具的介绍来为大家分享:如何快速定位 Apache Doris 中的 C++ 问题,帮助开发者提升开发效率并掌握更高效的开发技巧。

Sanitizer 介绍

定位 C++ 程序内存问题常用的工具有两个,Valgrind 和 Sanitizer。

二者的对比可以参考:https://developers.redhat.com/blog/2021/05/05/memory-error-checking-in-c-and-c-comparing-sanitizers-and-valgrind

其中 Valgrind 通过运行时软件翻译二进制指令的执行获取相关的信息,所以 Valgrind 会非常大幅度的降低程序性能,这就导致在一些大型项目比如 Apache Doris 使用 Valgrind 定位内存问题效率会很低。

而 Sanitizer 则是通过编译时插入代码来捕获相关的信息,性能下降幅度比 Valgrind 小很多,使得能够在单测以及其它测试环境默认使用 Saintizer。

Sanitizer 的算法可以参考:https://github.com/google/sanitizers/wiki/AddressSanitizerAlgorithm

在 Apache Doris 中,我们通常使用 Sanirizer 来定位内存问题。LLVM 以及 GNU C++ 有多个 Sanitizer:

  • AddressSanitizer(ASan)可以发现内存错误问题,比如 use after free,heap buffer overflow,stack buffer overflow,global buffer overflow,use after return,use after scope,memory leak,super large memory allocation;
  • AddressSanitizerLeakSanitizer (LSan)可以发现内存泄露;
  • MemorySanitizer(MSan)可以发现未初始化的内存使用;
  • UndefinedBehaviorSanitizer (UBSan)可以发现未定义的行为,比如越界数组访问、数值溢出等;
  • ThreadSanitizer (TSan)可以发现线程的竞争行为;

其中 AddressSanitizer, AddressSanitizerLeakSanitizer 以及 UndefinedBehaviorSanitizer 对于解决指针相关的问题最为有效。

Sanitizer 不但能够发现错误,而且能够给出错误源头以及代码位置,这就使得问题的解决效率很高,通过一些例子来说明 Sanitizer 的易用程度。

可以参考此处使用 Sanitizer:https://github.com/apache/doris/blob/master/be/CMakeLists.txt

Sanitizer 和 Core Dump 配合定位问题非常高效,默认 Sanitizer 不生成 Core Dump 文件,可以使用如下环境变量生成 Core Dump 文件,建议默认打开。

可以参考:https://github.com/apache/doris/blob/master/bin/start_be.sh

export ASAN_OPTIONS=symbolize=1:abort_on_error=1:disable_coredump=0:unmap_shadow_on_exit=1

使用如下环境变量让 UBSan 生成代码栈,默认不生成。

export UBSAN_OPTIONS=print_stacktrace=1

有时候需要显示指定 Symbolizer 二进制的位置,这样 Sanitizer 就能够直接生成可读的代码栈。

export ASAN_SYMBOLIZER_PATH=your path of llvm-symbolizer

Sanitizer 使用举例

Use after free

User after free 是指访问释放的内存,针对 use after free 错误,AddressSanitizer 能够报出使用释放地址的代码栈,地址分配的代码栈,地址释放的代码栈。比如:https://github.com/apache/doris/issues/9525中,使用释放地址的代码栈如下:

82849==ERROR: AddressSanitizer: heap-use-after-free on address 0x60300074c420 at pc 0x56510f61a4f0 bp 0x7f48079d89a0 sp 0x7f48079d8990
READ of size 1 at 0x60300074c420 thread T94 (MemTableFlushTh)
#0 0x56510f61a4ef in doris::faststring::append(void const*, unsigned long) /mnt/ssd01/tjp/incubator-doris/be/src/util/faststring.h:120
// 更详细的代码栈请前往https://github.com/apache/doris/issues/9525查看

此地址初次分配的代码栈如下:

previously allocated by thread T94 (MemTableFlushTh) here:
#0 0x56510e9b74b7 in __interceptor_malloc (/mnt/ssd01/tjp/regression_test/be/lib/palo_be+0x536a4b7)
#1 0x56510ee77745 in Allocator<false, false>::alloc_no_track(unsigned long, unsigned long) /mnt/ssd01/tjp/incubator-doris/be/src/vec/common/allocator.h:223
#2 0x56510ee68520 in Allocator<false, false>::alloc(unsigned long, unsigned long) /mnt/ssd01/tjp/incubator-doris/be/src/vec/common/allocator.h:104

地址释放的代码栈如下:

0x60300074c420 is located 16 bytes inside of 32-byte region [0x60300074c410,0x60300074c430)
freed by thread T94 (MemTableFlushTh) here:
#0 0x56510e9b7868 in realloc (/mnt/ssd01/tjp/regression_test/be/lib/palo_be+0x536a868)
#1 0x56510ee8b913 in Allocator<false, false>::realloc(void*, unsigned long, unsigned long, unsigned long) /mnt/ssd01/tjp/incubator-doris/be/src/vec/common/allocator.h:125
#2 0x56510ee814bb in void doris::vectorized::PODArrayBase<1ul, 4096ul, Allocator<false, false>, 15ul, 16ul>::realloc<>(unsigned long) /mnt/ssd01/tjp/incubator-doris/be/src/vec/common/pod_array.h:147

有了详细的非法访问地址代码栈、分配代码栈、释放代码栈,问题定位就会非常容易。

说明:限于文章篇幅,示例中的栈展示不全,完整代码栈可以前往对应 Issue 中进行查看。

heap buffer overflow

AddressSanitizer 能够报出 heap buffer overflow 的代码栈。

比如https://github.com/apache/doris/issues/5951 里的,结合运行时生成的 Core Dump 文件就可以快速定位问题。

==3930==ERROR: AddressSanitizer: heap-buffer-overflow on address 0x60c000000878 at pc 0x000000ae00ce bp 0x7ffeb16aa660 sp 0x7ffeb16aa658
READ of size 8 at 0x60c000000878 thread T0
#0 0xae00cd in doris::StringFunctions::substring(doris_udf::FunctionContext*, doris_udf::StringVal const&, doris_udf::IntVal const&, doris_udf::IntVal const&) ../src/exprs/string_functions.cpp:98

memory leak

AddressSanitizer 能够报出哪里分配的内存没有被释放,就可以快速的分析出泄露原因。

==1504733==ERROR: LeakSanitizer: detected memory leaks
Direct leak of 688128 byte(s) in 168 object(s) allocated from:
#0 0x560d5db51aac in __interceptor_posix_memalign (/mnt/ssd01/doris-master/VEC_ASAN/be/lib/doris_be+0x9227aac)
#1 0x560d5fbb3813 in doris::CoreDataBlock::operator new(unsigned long) /home/zcp/repo_center/doris_master/be/src/util/core_local.cpp:35
#2 0x560d5fbb65ed in doris::CoreDataAllocatorImpl<8ul>::get_or_create(unsigned long) /home/zcp/repo_center/doris_master/be/src/util/core_local.cpp:58
#3 0x560d5e71a28d in doris::CoreLocalValue::CoreLocalValue(long)

https://github.com/apache/doris/issues/10926

https://github.com/apache/doris/pull/3326

异常分配

分配过大的内存 AddressSanitizer 会报出 OOM 错误,根据栈以及 Core Dump 文件可以分析出何处分配了过大内存。栈举例如下:

Fix PR 见:https://github.com/apache/doris/pull/10289

UBSan 能够高效发现强制类型转换的错误,如下方 Issue 链接中描述,它能够精确的描述出强制类型转换带来错误的代码,如果不能在第一现场发现这种错误,后续因为指针错误使用,会比较难定位。

Issue:https://github.com/apache/doris/issues/9105

UndefinedBehaviorSanitizer 也比 AddressSanitizer 及其它的更容易发现死锁。

比如:https://github.com/apache/doris/issues/10309

程序维护内存 Pool 时 AddressSanitizer 的使用

AddressSanitizer 是编译器针对内存分配、释放、访问 生成额外代码来实现内存问题分析的,如果程序维护了自己的内存 Pool,AddressSanitizer 就不能发现 Pool 中内存非法访问的问题。这种情况下需要做一些额外的工作来使得 AddressSanitizer 尽可能工作,主要是使用 ASAN_POISON_MEMORY_REGION 和 ASAN_UNPOISON_MEMORY_REGION 管理内存是否可以访问,这种方法使用比较难,因为 AddressSanitizer 内部有地址对齐等的处理。出于性能以及内存释放等原因,Apache Doris 也维护了内存分配 Pool ,这种方法不能确保 AddressSanitizer 能够发现所有问题。

可以参考:https://github.com/apache/doris/pull/8148

当程序维护自己的内存池时,按照 https://github.com/apache/dorisw/pull/8148 中方法,use after free 错误会变成 use after poison。但是 use after poison 不能够给出地址失效的栈(https://github.com/google/sanitizers/issues/191),从而导致问题的定位分析仍然很困难。

因此建议程序维护的内存 Pool 可以通过选项关闭,这样在测试环境就可以使用 AddressSanitizer 高效地定位内存问题。

Core dump 分析工具

分析 C++ 程序生成的 Core Dump 文件经常遇到的问题就是怎么打印出 STL 容器中的值以及 Boost 中容器的值,有如下三个工具可以高效的查看 STL 和 Boost 中容器的值。

STL-View

可以将此文件 https://github.com/dataroaring/tools/blob/main/gdb/dbinit_stl_views-1.03.txt 放置到~/.gdbinit 中使用 STL-View。STL-View 输出非常友好,支持 pvector,plist,plist_member,pmap,pmap_member,pset,pdequeue,pstack,pqueue,ppqueue,pbitset,pstring,pwstring。以 Apache Doris 中使用 pvector 为例,它能够输出 vector 中的所有元素。

(gdb) pvector block.data
elem[0]: $5 = {
column = {
<COW<doris::vectorized::IColumn>::intrusive_ptr<doris::vectorized::IColumn const>> = {
t = 0x606000fdc820
}, <No data fields>},
type = {
<std::__shared_ptr<doris::vectorized::IDataType const, (__gnu_cxx::_Lock_policy)2>> = {
<std::__shared_ptr_access<doris::vectorized::IDataType const, (__gnu_cxx::_Lock_policy)2, false, false>> = {<No data fields>},
members of std::__shared_ptr<doris::vectorized::IDataType const, (__gnu_cxx::_Lock_policy)2>:
_M_ptr = 0x6030069e9780,
_M_refcount = {
_M_pi = 0x6030069e9770
}
}, <No data fields>},
name = {
static npos = 18446744073709551615,
_M_dataplus = {
<std::allocator<char>> = {
<__gnu_cxx::new_allocator<char>> = {<No data fields>}, <No data fields>},
members of std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >::_Alloc_hider:
_M_p = 0x61400006e068 "n_nationkey"
},
_M_string_length = 11,
{
_M_local_buf = "n_nationkey\000\276\276\276\276",
_M_allocated_capacity = 7957695015158701934
}
}
}
elem[1]: $6 = {
column = {
<COW<doris::vectorized::IColumn>::intrusive_ptr<doris::vectorized::IColumn const>> = {
t = 0x6080001ec220
}, <No data fields>},
type = {
...

Pretty-Printer

GCC 7.0 开始支持了 Pretty-Printer 打印 STL 容器,可以将以下代码放置到~/.gdbinit 中使 Pretty-Printer 生效。

注意:/usr/share/gcc/python 需要更换为本机对应的地址。

python
import sys
sys.path.insert(0, '/usr/share/gcc/python')
from libstdcxx.v6.printers import register_libstdcxx_printers
register_libstdcxx_printers (None)
end

以 vector 为例, Pretty-Printer 能够打印出详细内容。

(gdb) p block.data
$1 = std::vector of length 7, capacity 8 = {{
column = {
<COW<doris::vectorized::IColumn>::intrusive_ptr<doris::vectorized::IColumn const>> = {
t = 0x606000fdc820
}, <No data fields>},
type = std::shared_ptr<const doris::vectorized::IDataType> (use count 1, weak count 0) = {
get() = 0x6030069e9780
},
name = "n_nationkey"
}, {
column = {
<COW<doris::vectorized::IColumn>::intrusive_ptr<doris::vectorized::IColumn const>> = {
t = 0x6080001ec220
}, <No data fields>},
type = std::shared_ptr<const doris::vectorized::IDataType> (use count 1, weak count 0) = {
get() = 0x6030069e9750
},
name = "n_name"
}, {
column = {
<COW<doris::vectorized::IColumn>::intrusive_ptr<doris::vectorized::IColumn const>> = {
t = 0x606000fd52c0
}, <No data fields>},
type = std::shared_ptr<const doris::vectorized::IDataType> (use count 1, weak count 0) = {
get() = 0x6030069e9720
},
name = "n_regionkey"
}, {
column = {
<COW<doris::vectorized::IColumn>::intrusive_ptr<doris::vectorized::IColumn const>> = {
t = 0x6030069e96b0
}, <No data fields>},
type = std::shared_ptr<const doris::vectorized::IDataType> (use count 1, weak count 0) = {
get() = 0x604000a66160
},
name = "n_comment"

Boost Pretty Printer

因为 Apache Doris 使用 Boost 不多,因此不再举例。

可以参考:https://github.com/ruediger/Boost-Pretty-Printer

总结

有了 Sanitizer 能够在单测、功能、集成、压力测试环境及时发现问题,最重要的是大多数时候都可以给出程序出问题的关联现场,比如内存分配的调用栈,释放内存的调用栈,非法访问内存的调用栈,配合 Core Dump 可以查看现场状态,解决 C++ 内存问题从猜测变成了有证据的现场分析。

作者介绍:杨勇强,SelectDB 联合创始人兼产品 VP,同时也是 Apache Doris Committer。曾担任百度智能云存储部总架构师,主导构建了云存储技术产品体系,是 Linux 内核社区贡献者。

— End —

相关链接:

SelectDB 官方网站:

https://selectdb.com

Apache Doris 官方网站:

http://doris.apache.org

Apache Doris Github:

https://github.com/apache/doris

Apache Doris 开发者邮件组:

dev@doris.apache.org

技术分享

导读:Apache Doris Routine Load 支持了将 Kafka 数据接入 Apache Doris,并保障了数据接入过程中的事务性操作。Apache Pulsar 定位为一个云原生时代企业级的消息发布和订阅系统。那么 Apache Pulsar 用户如何将数据接入 Apache Doris 呢?本次分享将介绍利用 KoP 如何将 Pulsar 数据快速且无缝接入 Apache Doris。

KoP 架构介绍:

KoP 是 Kafka on Pulsar 的简写,顾名思义就是如何在 Pulsar 上实现对 Kafka 数据的读写。KoP 将 Kafka 协议处理插件引入 Pulsar Broker 来实现 Apache Pulsar 对 Apache Kafka 协议的支持。将 KoP 协议处理插件添加到现有 Pulsar 集群后,用户不用修改代码就可以将现有的 Kafka 应用程序和服务迁移到 Pulsar。

Apache Pulsar 主要特点如下:

  • 利用企业级多租户特性简化运营。
  • 避免数据搬迁,简化操作。
  • 利用 Apache BookKeeper 和分层存储持久保留事件流。
  • 利用 Pulsar Functions 进行无服务器化事件处理。

KoP 架构如下图,通过图可以看到 KoP 引入一个新的协议处理插件,该协议处理插件利用 Pulsar 的现有组件(例如 Topic 发现、分布式日志库-ManagedLedger、cursor 等)来实现 Kafka 传输协议。

kop架构

Routine Load 订阅 Pulsar 数据思路

Apache Doris Routine Load 支持了将 Kafka 数据接入 Apache Doris,并保障了数据接入过程中的事务性操作。Apache Pulsar 定位为一个云原生时代企业级的消息发布和订阅系统,已经在很多线上服务使用。那么 Apache Pulsar 用户如何将数据数据接入 Apache Doris 呢,答案是通过 KoP 实现。

由于 Kop 直接在 Pulsar 侧提供了对 Kafka 的兼容,那么对于 Apache Doris 来说可以像使用 Kafka 一样使用 Plusar。整个过程对于 Apache Doris 来说无需任务改变,就能将 Pulsar 数据接入 Apache Doris,并且可以获得 Routine Load 的事务性保障。

--------------------------
| Apache Doris |
| --------------- |
| | Routine Load | |
| --------------- |
--------------------------
|Kafka Protocol(librdkafka)
------------v--------------
| --------------- |
| | KoP | |
| --------------- |
| Apache Pulsar |
--------------------------

操作实战

一. Pulsar Standalone 安装环境准备:

  1. JDK 安装:略
  2. 下载 Pulsar 二进制包,并解压:
#下载
wget https://archive.apache.org/dist/pulsar/pulsar-2.10.0/apache-pulsar-2.10.0-bin.tar.gz
#解压并进入安装目录
tar xvfz apache-pulsar-2.10.0-bin.tar.gz
cd apache-pulsar-2.10.0

二. KoP 组件编译和安装:

  1. 下载 KoP 源码
git clone https://github.com/streamnative/kop.git
cd kop
  1. 编译 KoP 项目:
mvn clean install -DskipTests
  1. protocols 配置:在解压后的 apache-pulsar 目录下创建 protocols文 件夹,并把编译好的 nar 包复制到 protocols 文件夹中。
mkdir apache-pulsar-2.10.0/protocols
# mv kop/kafka-impl/target/pulsar-protocol-handler-kafka-{{protocol:version}}.nar apache-pulsar-2.10.0/protocols
cp kop/kafka-impl/target/pulsar-protocol-handler-kafka-2.11.0-SNAPSHOT.nar apache-pulsar-2.10.0/protocols
  1. 添加后的结果查看:
[root@17a5da45700b apache-pulsar-2.10.0]# ls protocols/
pulsar-protocol-handler-kafka-2.11.0-SNAPSHOT.nar

三. KoP 配置添加:

  1. 在 standalone.conf 或者 broker.conf 添加如下配置
#kop适配的协议
messagingProtocols=kafka
#kop 的NAR文件路径
protocolHandlerDirectory=./protocols
#是否允许自动创建topic
allowAutoTopicCreationType=partitioned
  1. 添加如下服务监听配置
# Use `kafkaListeners` here for KoP 2.8.0 because `listeners` is marked as deprecated from KoP 2.8.0 
kafkaListeners=PLAINTEXT://127.0.0.1:9092# This config is not required unless you want to expose another address to the Kafka client.
# If it’s not configured, it will be the same with `kafkaListeners` config by default
kafkaAdvertisedListeners=PLAINTEXT://127.0.0.1:9092
brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor
brokerDeleteInactiveTopicsEnabled=false

​ 当出现如下错误:

java.lang.IllegalArgumentException: Broker has disabled transaction coordinator, please enable it before using transaction.

​ 添加如下配置,开启 transactionCoordinatorEnabled

kafkaTransactionCoordinatorEnabled=true
transactionCoordinatorEnabled=true

这个错误一定要修复,不然看到的现象就是使用 Kafka 自带的工具:bin/kafka-console-producer.sh和bin/kafka-console-consumer.sh在Pulsar 上进行数据的生产和消费正常,但是在 Apache Doris 中数据无法同步过来。

四. Pulsar 启动

#前台启动
#bin/pulsar standalone
#后台启动
pulsar-daemon start standalone

五. 创建 Doris 数据库和建表

#进入Doris
mysql -u root -h 127.0.0.1 -P 9030
# 创建数据库
create database pulsar_doris;
#切换数据库
use pulsar_doris;
#创建clicklog表
CREATE TABLE IF NOT EXISTS pulsar_doris.clicklog
(
`clickTime` DATETIME NOT NULL COMMENT "点击时间",
`type` String NOT NULL COMMENT "点击类型",
`id` VARCHAR(100) COMMENT "唯一id",
`user` VARCHAR(100) COMMENT "用户名称",
`city` VARCHAR(50) COMMENT "所在城市"
)
DUPLICATE KEY(`clickTime`, `type`)
DISTRIBUTED BY HASH(`type`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);

六. 创建 Routine Load 任务

CREATE ROUTINE LOAD pulsar_doris.load_from_pulsar_test ON clicklog
COLUMNS(clickTime,id,type,user)
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "false",
"format" = "json"
)
FROM KAFKA
(
"kafka_broker_list" = "127.0.0.1:9092",
"kafka_topic" = "test",
"property.group.id" = "doris"
);

上述命令中的参数解释如下:

  • pulsar_doris :Routine Load 任务所在的数据库
  • load_from_pulsar_test:Routine Load 任务名称
  • clicklog:Routine Load 任务的目标表,也就是配置 Routine Load 任务将数据导入到 Doris 哪个表中。
  • strict_mode:导入是否为严格模式,这里设置为 False。
  • format:导入数据的类型,这里配置为 Json。
  • kafka_broker_list:Kafka Broker 服务的地址
  • kafka_broker_list:Kafka Topic 名称,也就是同步哪个 Topic 上的数据。
  • property.group.id:消费组 ID

七. 数据导入和测试

  1. 数据导入

​ 构造一个 ClickLog 的数据结构,并调用 Kafka 的 Producer 发送 5000 万条数据到 Pulsar。

​ ClickLog 数据结构如下:

public class ClickLog {
private String id;
private String user;
private String city;
private String clickTime;
private String type;
... //省略getter和setter
}

​ 消息构造和发送的核心代码逻辑如下:

       String strDateFormat = "yyyy-MM-dd HH:mm:ss";
@Autowired
private Producer producer;
try {
for(int j =0 ; j<50000;j++){
int batchSize = 1000;
for(int i = 0 ; i<batchSize ;i++){
ClickLog clickLog = new ClickLog();
clickLog.setId(UUID.randomUUID().toString());
SimpleDateFormat simpleDateFormat = new SimpleDateFormat(strDateFormat);
clickLog.setClickTime(simpleDateFormat.format(new Date()));
clickLog.setType("webset");
clickLog.setUser("user"+ new Random().nextInt(1000) +i);
producer.sendMessage(Constant.topicName, JSONObject.toJSONString(clickLog));
}
}
} catch (Exception e) {
e.printStackTrace();
}
  1. ROUTINE LOAD 任务查看 执行 SHOW ALL ROUTINE LOAD FOR load_from_pulsar_test \G;命令,查看导入任务的状态。
mysql>  SHOW ALL ROUTINE LOAD FOR load_from_pulsar_test \G;
*************************** 1. row ***************************
Id: 87873
Name: load_from_pulsar_test
CreateTime: 2022-05-31 12:03:34
PauseTime: NULL
EndTime: NULL
DbName: default_cluster:pulsar_doris
TableName: clicklog1
State: RUNNING
DataSourceType: KAFKA
CurrentTaskNum: 1
JobProperties: {"partitions":"*","columnToColumnExpr":"clickTime,id,type,user","maxBatchIntervalS":"20","whereExpr":"*","dataFormat":"json","timezone":"Europe/London","send_batch_parallelism":"1","precedingFilter":"*","mergeType":"APPEND","format":"json","json_root":"","maxBatchSizeBytes":"209715200","exec_mem_limit":"2147483648","strict_mode":"false","jsonpaths":"","deleteCondition":"*","desireTaskConcurrentNum":"3","maxErrorNum":"0","strip_outer_array":"false","currentTaskConcurrentNum":"1","execMemLimit":"2147483648","num_as_string":"false","fuzzy_parse":"false","maxBatchRows":"300000"}
DataSourceProperties: {"topic":"test","currentKafkaPartitions":"0","brokerList":"127.0.0.1:9092"}
CustomProperties: {"group.id":"doris","kafka_default_offsets":"OFFSET_END","client.id":"doris.client"}
Statistic: {"receivedBytes":5739001913,"runningTxns":[],"errorRows":0,"committedTaskNum":168,"loadedRows":50000000,"loadRowsRate":23000,"abortedTaskNum":1,"errorRowsAfterResumed":0,"totalRows":50000000,"unselectedRows":0,"receivedBytesRate":2675000,"taskExecuteTimeMs":2144799}
Progress: {"0":"51139566"}
Lag: {"0":0}
ReasonOfStateChanged:
ErrorLogUrls:
OtherMsg:
1 row in set (0.00 sec)
ERROR:
No query specified

从上面结果可以看到 totalRows 为 50000000,errorRows 为 0。说明数据不丢不重的导入 Apache Doris 了。

  1. 数据统计验证

    执行如下命令统计表中的数据,发现统计的结果也是 50000000,符合预期。

mysql> select count(*) from clicklog;
+----------+
| count(*) |
+----------+
| 50000000 |
+----------+
1 row in set (3.73 sec)
mysql>

通过 KoP 我们实现了将 Apache Pulsar 数据无缝接入 Apache Doris ,无需对 Routine Load 任务进行任何修改,并保障了数据导入过程中的事务性。与此同时,Apache Doris 社区已经启动了 Apache Pulsar 原生导入支持的设计,相信在不久后就可以直接订阅 Pulsar 中的消息数据,并保证数据导入过程中的 Exactly-Once 语义。

相关链接:

Apache Doris 官方网站:

http://doris.apache.org

Apache Doris Github:

https://github.com/apache/doris

Apache Doris 开发者邮件组:

dev@doris.apache.org

技术分享

导读:随着数据实时化需求的日益增多,数据的时效性对企业的精细化运营越来越重要,使得实时数仓在这一过程中起到了不可替代的作用。本文将基于用户遇到的问题与挑战,揭秘 Apache Doris 1.1 特性,对 Flink 实时写入 Apache Doris 的优化实现与未来规划进行详细的介绍。

背景

随着数据实时化需求的日益增多,数据的时效性对企业的精细化运营越来越重要,在海量数据中,如何能实时有效的挖掘出有价值的信息,快速的获取数据反馈,协助公司更快的做出决策,更好的进行产品迭代,实时数仓在这一过程中起到了不可替代的作用

在这种形势下,Apache Doris 作为一款实时 MPP 分析型数据库脱颖而出,同时具备高性能、简单易用等特性,具有丰富的数据接入方式,结合 Flink 流式计算,可以让用户快速将 Kafka 中的非结构化数据以及 MySQL 等上游业务库中的变更数据,快速同步到 Doris 实时数仓中,同时 Doris 提供亚秒级分析查询的能力,可以有效地满足实时 OLAP、实时数据看板以及实时数据服务等场景的需求。

挑战

通常实时数仓要保证端到端高并发以及低延迟,往往面临诸多挑战,比如:

  • 如何保证端到端的秒级别数据同步
  • 如何快速保证数据可见性
  • 在高并发大压力下,如何解决大量小文件写入的问题?
  • 如何确保端到端的 Exactly Once 语义?

结合这些挑战,同时对用户使用 Flink+Doris 构建实时数仓的业务场景进行深入调研,在掌握了用户使用的痛点之后,我们在 Doris 1.1 版本中进行了针对性的优化,大幅提升实时数仓构建的用户体验,同时提升系统的稳定性,系统资源消耗也得到了大幅的优化。

优化

流式写入

Flink Doris Connector 最初的做法是在接收到数据后,缓存到内存 Batch 中,通过攒批的方式进行写入,同时使用 batch.size、batch.interval 等参数来控制 Stream Load 写入的时机。这种方式通常在参数合理的情况下可以稳定运行,一旦参数不合理导致频繁的 Stream Load,便会引发 Compaction 不及时,从而导致 version 过多的错误(-235);其次,当数据过多时,为了减少 Stream Load 的写入时机,batch.size 过大的设置还可能会引发 Flink 任务的 OOM。为了解决这个问题,我们引入了流式写入

  1. Flink 任务启动后,会异步发起一个 Stream Load 的 Http 请求。

  2. 接收到实时数据后,通过 Http 的分块传输编码(Chunked transfer encoding)机制持续向 Doris 传输数据。

  3. 在 Checkpoint 时结束 Http 请求,完成本次 Stream Load 写入,同时异步发起下一次 Stream Load 的请求。

  4. 继续接收实时数据,后续流程同上。

由于采用 Chunked 机制传输数据,就避免了攒批对内存的压力,同时将写入的时机和 Checkpoint 绑定起来,使得 Stream Load 的时机可控,并且为下面的 Exactly-Once 语义提供了基础。

Exactly-Once

Exactly-Once 语义是指即使在机器或应用出现故障的情况下,也不会重复处理数据或者丢失数据。Flink 很早就支持 End-to-End 的 Exactly-Once 场景,主要是通过两阶段提交协议来实现 Sink 算子的 Exactly-Once 语义。在 Flink 两阶段提交的基础上,同时借助 Doris 1.0 的 Stream Load 两阶段提交,Flink Doris Connector 实现了 Exactly Once 语义,具体原理如下:

  1. Flink 任务在启动的时候,会发起一个 Stream Load 的 PreCommit 请求,此时会先开启一个事务,同时会通过 Http 的 Chunked 机制将数据持续发送到 Doris。

  1. 在 Checkpoint 时,结束数据写入,同时完成 Http 请求,并且将事务状态设置为预提交(PreCommitted),此时数据已经写入 BE,对用户不可见。

  1. Checkpoint 完成后,发起 Commit 请求,并且将事务状态设置为提交(Committed),完成后数据对用户可见。

  1. Flink 应用意外挂掉后,从 Checkpoint 重启时,若上次事务为预提交(PreCommitted)状态,则会发起回滚请求,并且将事务状态设置为 Aborted。

基于此,可以借助 Flink Doris Connector 实现数据实时入库时数据不丢不重。

秒级别数据同步

高并发写入场景下的端到端秒级别数据同步以及数据的实时可见能力,需要 Doris 具备如下几方面的能力:

事务处理能力

Flink 实时写入以 Stream Load 2PC 的方式与 Doris 进行交互,需要 Doris 具备对应的事务处理能力,保障事务基本的 ACID 特性,在高并发场景下支撑 Flink 秒级别的数据同步。

数据版本的快速聚合能力

Doris 里面一次导入会产生一个数据版本,在高并发写入场景下必然带来的一个影响是数据版本过多,且单次导入的数据量不会太大。持续的高并发小文件写入场景对 Doris 并不友好,极其考验 Doris 数据合并的实时性以及性能,进而会影响到查询的性能。Doris 在 1.1 中大幅增强了数据 Compaction 能力,对于新增数据能够快速完成聚合,避免分片数据中的版本过多导致的 -235 错误以及带来的查询效率问题。

首先,在 Doris 1.1 版本中,引入了 QuickCompaction,增加了主动触发式的 Compaction 检查,在数据版本增加的时候主动触发 Compaction。同时通过提升分片元信息扫描的能力,快速的发现数据版本多的分片,触发 Compaction。通过主动式触发加被动式扫描的方式,彻底解决数据合并的实时性问题。

同时,针对高频的小文件 Cumulative Compaction,实现了 Compaction 任务的调度隔离,防止重量级的 Base Compaction 对新增数据的合并造成影响。

最后,针对小文件合并,优化了小文件合并的策略,采用梯度合并的方式,每次参与合并的文件都属于同一个数据量级,防止大小差别很大的版本进行合并,逐渐有层次的合并,减少单个文件参与合并的次数,能够大幅的节省系统的 CPU 消耗。

Doris 1.1 对高并发导入、秒级别数据同步、数据实时可见等场景都做了针对性优化,大大增加了 Flink + Doris 系统的易用性以及稳定性,节省了集群整体资源。

效果

在调研的通用场景中,使用 Flink 同步上游 Kafka 中的非结构化数据,经过 ETL 后使用 Flink Doris Connector 将数据实时写入 Doris 中。这里客户场景极其严苛,上游维持以每秒 10w 的超高频率写入,需要数据能够在 5s 内完成上下游同步,实现秒级别的数据可见。这里 Flink 配置为 20 并发,Checkpoint 间隔 5s,Doris 1.1 的表现相当优异。具体体现在如下几个方面:

Compaction 实时性

数据能快速合并,Tablet 数据版本个数维持在 50 以下, Compaction Score 稳定。相比于之前高并发导入频出的 -235 问题,Compaction 合并效率有 10+ 倍提升

CPU 资源消耗

Doris 1.1 针对小文件的 Compaction 进行了策略优化,在上述高并发导入场景,CPU 资源消耗下降 25%。

QPS 查询延迟稳定

通过降低 CPU 使用率,减少数据版本的个数,提升了数据整体有序性,从而减少了 SQL 查询的延迟。

秒级别数据同步场景(极限大压力)

单 BE 单 Tablet,客户端 30 并发极限 Stream Load 压测,数据在实时性<1s,Compaction Score 优化前后对比

使用建议

数据实时可见场景

对延迟要求特别严格的场景,比如秒级别数据同步,通常意味着单次导入文件较小,此时建议调小 cumulative_size_based_promotion_min_size_mbytes,单位是 MB,默认 64,可以设置成 8,能够很大程度提升 Compaction 的实时性。

高并发场景

对于高并发的写入场景,可以通过增加 Checkpoint 的间隔来减少 Stream Load 的频率,比如 Checkpoint 可以设置为 5-10s,不仅可以增加 Flink 任务的吞吐,也可以减少小文件的产生,避免给 Compaction 造成更多压力。

此外,对数据实时性要求不高的场景,比如分钟级别的数据同步,可以增加 Checkpoint 的间隔,比如 5-10 分钟,此时 Flink Doris Connector 依然能够通过两阶段提交 +checkpoint 机制来保证数据的完整性。

未来规划

实时 Schema Change

目前通过 Flink CDC 实时接入数据时,当上游业务表进行 Schema Change 操作时,必须先手动修改 Doris 中的 Schema 和 Flink 任务中的 Schema,最后再重启任务,新的 Schema 的数据才可以同步过来。这样使用方式需要人为的介入,会给用户带来极大的运维负担。后续会针对 CDC 场景做到支持 Schema 实时变更,上游的 Schema Change 实时同步到下游,全面提升 Schema Change 的效率。

Doris 多表写入

目前 Doris Sink 算子仅支持同步单张表,所以对于整库同步的操作,需要手动在 Flink 层面进行分流,写到多个 Doris Sink 中,这无疑增加了开发者的难度,在后续版本中我们也将支持单个 Doris Sink 同步多张表,这样就大大的简化了用户的操作。

自适应的 Compaction 参数调优

目前 Compaction 策略参数较多,在大部分通用场景能发挥较好的效果,但是在一些特殊场景下并不能高效的发挥作用。我们将在后续版本中持续优化,针对不同的场景,进行自适应的 Compaction 调优,在各类场景下提高数据合并效率,提升实时性。

单副本 Compaction

目前的 Compaction 策略是各 BE 单独进行,在后续版本中我们将实现单副本 Compaction,通过克隆快照的方式实现 Compaction 任务,减少集群 2/3 的 Compaction 任务,降低系统的负载,把更多的系统资源留给用户侧。

相关链接:

Apache Doris 官方网站:

http://doris.apache.org

Apache Doris Github:

https://github.com/apache/doris

Apache Doris 开发者邮件组:

dev@doris.apache.org

技术分享

1.概览

这篇教程将展示如何使用 Flink CDC + Iceberg + Doris 构建实时湖仓一体的联邦查询分析,Doris 1.1 版本提供了 Iceberg 的支持,本文主要展示 Doris 和 Iceberg 怎么使用,同时本教程整个环境是都基于伪分布式环境搭建,大家按照步骤可以一步步完成。完整体验整个搭建操作的过程。

1.1 软件环境

本教程的演示环境如下:

  1. Centos7
  2. Apache doris 1.1
  3. Hadoop 3.3.3
  4. hive 3.1.3
  5. Fink 1.14.4
  6. flink-sql-connector-mysql-cdc-2.2.1
  7. Apache Iceberg 0.13.2
  8. JDK 1.8.0_311
  9. MySQL 8.0.29
wget https://archive.apache.org/dist/hadoop/core/hadoop-3.3.3/hadoop-3.3.3.tar.gz
wget https://archive.apache.org/dist/hive/hive-3.1.3/apache-hive-3.1.3-bin.tar.gz
wget https://dlcdn.apache.org/flink/flink-1.14.4/flink-1.14.4-bin-scala_2.12.tgz
wget https://search.maven.org/remotecontent?filepath=org/apache/iceberg/iceberg-flink-runtime-1.14/0.13.2/iceberg-flink-runtime-1.14-0.13.2.jar
wget https://repository.cloudera.com/artifactory/cloudera-repos/org/apache/flink/flink-shaded-hadoop-3-uber/3.1.1.7.2.9.0-173-9.0/flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar

1.2 系统架构

  1. 首先我们从 Mysql 数据中使用 Flink 通过 Binlog 完成数据的实时采集
  2. 然后再 Flink 中创建 Iceberg 表,Iceberg 的元数据保存在 hive 里
  3. 最后我们在 Doris 中创建 Iceberg 外表
  4. 在通过 Doris 统一查询入口完成对 Iceberg 里的数据进行查询分析,供前端应用调用,这里 iceberg 外表的数据可以和 Doris 内部数据或者 Doris 其他外部数据源的数据进行关联查询分析

Doris 湖仓一体的联邦查询:

  1. Doris 通过 ODBC 方式支持:MySQL,Postgresql,Oracle ,SQLServer
  2. 同时支持 Elasticsearch 外表
  3. 1.0 版本支持 Hive 外表
  4. 1.1 版本支持 Iceberg 外表
  5. 1.2 版本支持 Hudi 外表

2.环境安装部署

2.1 安装 Hadoop、Hive

tar zxvf hadoop-3.3.3.tar.gz
tar zxvf apache-hive-3.1.3-bin.tar.gz

配置系统环境变量

export HADOOP_HOME=/data/hadoop-3.3.3
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export HADOOP_HDFS_HOME=$HADOOP_HOME
export HIVE_HOME=/data/hive-3.1.3
export PATH=$PATH:$HADOOP_HOME/bin:$HIVE_HOME/bin:$HIVE_HOME/conf

2.2 配置 hdfs

2.2.1 core-site.xml

vi etc/hadoop/core-site.xml

<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://localhost:9000</value>
</property>
</configuration>

2.2.2 hdfs-site.xml

vi etc/hadoop/hdfs-site.xml

  <configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
<property>
<name>dfs.namenode.name.dir</name>
<value>/data/hdfs/namenode</value>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>/data/hdfs/datanode</value>
</property>
</configuration>

2.2.3 修改 Hadoop 启动脚本

sbin/start-dfs.sh

sbin/stop-dfs.sh

在文件开始加上下面的内容

HDFS_DATANODE_USER=root
HADOOP_SECURE_DN_USER=hdfs
HDFS_NAMENODE_USER=root
HDFS_SECONDARYNAMENODE_USER=root

sbin/start-yarn.sh

sbin/stop-yarn.sh

在文件开始加上下面的内容

YARN_RESOURCEMANAGER_USER=root
HADOOP_SECURE_DN_USER=yarn
YARN_NODEMANAGER_USER=root

2.3 配置 yarn

这里我改变了 Yarn 的一些端口,因为我是单机环境和 Doris 的一些端口冲突。你可以不启动 yarn

vi etc/hadoop/yarn-site.xml

<property>
<name>yarn.resourcemanager.address</name>
<value>jiafeng-test:50056</value>
</property>
<property>
<name>yarn.resourcemanager.scheduler.address</name>
<value>jiafeng-test:50057</value>
</property>
<property>
<name>yarn.resourcemanager.resource-tracker.address</name>
<value>jiafeng-test:50058</value>
</property>
<property>
<name>yarn.resourcemanager.admin.address</name>
<value>jiafeng-test:50059</value>
</property>
<property>
<name>yarn.resourcemanager.webapp.address</name>
<value>jiafeng-test:9090</value>
</property>
<property>
<name>yarn.nodemanager.localizer.address</name>
<value>0.0.0.0:50060</value>
</property>
<property>
<name>yarn.nodemanager.webapp.address</name>
<value>0.0.0.0:50062</value>
</property>

vi etc/hadoop/mapred-site.xm

<property>
<name>mapreduce.jobhistory.address</name>
<value>0.0.0.0:10020</value>
</property>
<property>
<name>mapreduce.jobhistory.webapp.address</name>
<value>0.0.0.0:19888</value>
</property>
<property>
<name>mapreduce.shuffle.port</name>
<value>50061</value>
</property>

2.2.4 启动 hadoop

sbin/start-all.sh

2.4 配置 Hive

2.4.1 创建 hdfs 目录

hdfs dfs -mkdir -p /user/hive/warehouse
hdfs dfs -mkdir /tmp
hdfs dfs -chmod g+w /user/hive/warehouse
hdfs dfs -chmod g+w /tmp

2.4.2 配置 hive-site.xml

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://localhost:3306/hive?createDatabaseIfNotExist=true</value>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>root</value>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>MyNewPass4!</value>
</property>
<property>
<name>hive.metastore.warehouse.dir</name>
<value>/user/hive/warehouse</value>
<description>location of default database for the warehouse</description>
</property>
<property>
<name>hive.metastore.uris</name>
<value/>
<description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.</description>
</property>
<property>
<name>javax.jdo.PersistenceManagerFactoryClass</name>
<value>org.datanucleus.api.jdo.JDOPersistenceManagerFactory</value>
</property>
<property>
<name>hive.metastore.schema.verification</name>
<value>false</value>
</property>
<property>
<name>datanucleus.schema.autoCreateAll</name>
<value>true</value>
</property>
</configuration>

2.4.3 配置 hive-env.sh

加入一下内容

HADOOP_HOME=/data/hadoop-3.3.3

2.4.4 hive 元数据初始化

schematool -initSchema -dbType mysql

2.4.5 启动 hive metaservice

后台运行

nohup bin/hive --service metaservice 1>/dev/null 2>&1 &

验证

lsof -i:9083
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
java 20700 root 567u IPv6 54605348 0t0 TCP *:emc-pp-mgmtsvc (LISTEN)

2.5 安装 MySQL

具体请参照这里:

使用 Flink CDC 实现 MySQL 数据实时入 Apache Doris

2.5.1 创建 MySQL 数据库表并初始化数据

CREATE DATABASE demo;
USE demo;
CREATE TABLE userinfo (
id int NOT NULL AUTO_INCREMENT,
name VARCHAR(255) NOT NULL DEFAULT 'flink',
address VARCHAR(1024),
phone_number VARCHAR(512),
email VARCHAR(255),
PRIMARY KEY (`id`)
)ENGINE=InnoDB ;
INSERT INTO userinfo VALUES (10001,'user_110','Shanghai','13347420870', NULL);
INSERT INTO userinfo VALUES (10002,'user_111','xian','13347420870', NULL);
INSERT INTO userinfo VALUES (10003,'user_112','beijing','13347420870', NULL);
INSERT INTO userinfo VALUES (10004,'user_113','shenzheng','13347420870', NULL);
INSERT INTO userinfo VALUES (10005,'user_114','hangzhou','13347420870', NULL);
INSERT INTO userinfo VALUES (10006,'user_115','guizhou','13347420870', NULL);
INSERT INTO userinfo VALUES (10007,'user_116','chengdu','13347420870', NULL);
INSERT INTO userinfo VALUES (10008,'user_117','guangzhou','13347420870', NULL);
INSERT INTO userinfo VALUES (10009,'user_118','xian','13347420870', NULL);
tar zxvf flink-1.14.4-bin-scala_2.12.tgz

然后需要将相应的依赖拷贝到 Flink 安装目录下的 lib 目录下。

下面将几个 Hadoop 和 Flink 里没有的依赖下载地址放在下面

wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.2.1/flink-sql-connector-mysql-cdc-2.2.1.jar
wget https://repo1.maven.org/maven2/org/apache/thrift/libfb303/0.9.3/libfb303-0.9.3.jar
wget https://search.maven.org/remotecontent?filepath=org/apache/iceberg/iceberg-flink-runtime-1.14/0.13.2/iceberg-flink-runtime-1.14-0.13.2.jar
wget https://repository.cloudera.com/artifactory/cloudera-repos/org/apache/flink/flink-shaded-hadoop-3-uber/3.1.1.7.2.9.0-173-9.0/flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar

其他的:

hadoop-3.3.3/share/hadoop/common/lib/commons-configuration2-2.1.1.jar
hadoop-3.3.3/share/hadoop/common/lib/commons-logging-1.1.3.jar
hadoop-3.3.3/share/hadoop/tools/lib/hadoop-archive-logs-3.3.3.jar
hadoop-3.3.3/share/hadoop/common/lib/hadoop-auth-3.3.3.jar
hadoop-3.3.3/share/hadoop/common/lib/hadoop-annotations-3.3.3.jar
hadoop-3.3.3/share/hadoop/common/hadoop-common-3.3.3.jar
adoop-3.3.3/share/hadoop/hdfs/hadoop-hdfs-3.3.3.jar
hadoop-3.3.3/share/hadoop/client/hadoop-client-api-3.3.3.jar
hive-3.1.3/lib/hive-exec-3.1.3.jar
hive-3.1.3/lib/hive-metastore-3.1.3.jar
hive-3.1.3/lib/hive-hcatalog-core-3.1.3.jar
bin/start-cluster.sh
 bin/sql-client.sh embedded

开启 checkpoint,每隔 3 秒做一次 checkpoint

Checkpoint 默认是不开启的,我们需要开启 Checkpoint 来让 Iceberg 可以提交事务。 并且,mysql-cdc 在 binlog 读取阶段开始前,需要等待一个完整的 checkpoint 来避免 binlog 记录乱序的情况。

注意:

这里是演示环境,checkpoint 的间隔设置比较短,线上使用,建议设置为 3-5 分钟一次 checkpoint。

Flink SQL> SET execution.checkpointing.interval = 3s;
[INFO] Session property has been set.

2.6.3 创建 Iceberg Catalog

CREATE CATALOG hive_catalog WITH (
'type'='iceberg',
'catalog-type'='hive',
'uri'='thrift://localhost:9083',
'clients'='5',
'property-version'='1',
'warehouse'='hdfs://localhost:8020/user/hive/warehouse'
);

查看 catalog

Flink SQL> show catalogs;
+-----------------+
| catalog name |
+-----------------+
| default_catalog |
| hive_catalog |
+-----------------+
2 rows in set

2.6.4 创建 Mysql CDC 表

 CREATE TABLE user_source (
database_name STRING METADATA VIRTUAL,
table_name STRING METADATA VIRTUAL,
`id` DECIMAL(20, 0) NOT NULL,
name STRING,
address STRING,
phone_number STRING,
email STRING,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = 'MyNewPass4!',
'database-name' = 'demo',
'table-name' = 'userinfo'
);

查询 CDC 表:

select * from user_source;

2.6.5 创建 Iceberg 表

---查看catalog
show catalogs;
---使用catalog
use catalog hive_catalog;
--创建数据库
CREATE DATABASE iceberg_hive;
--使用数据库
use iceberg_hive;

2.6.5.1 创建表
CREATE TABLE all_users_info (
database_name STRING,
table_name STRING,
`id` DECIMAL(20, 0) NOT NULL,
name STRING,
address STRING,
phone_number STRING,
email STRING,
PRIMARY KEY (database_name, table_name, `id`) NOT ENFORCED
) WITH (
'catalog-type'='hive'
);

从 CDC 表里插入数据到 Iceberg 表里

use catalog default_catalog;

insert into hive_catalog.iceberg_hive.all_users_info select * from user_source;

然后停掉任务,我们去查询 iceberg 表

select * from hive_catalog.iceberg_hive.all_users_info

我们也可以通过 Hive 建好 Iceberg 表,然后通过 Flink 将数据插入到表里

下载 Iceberg Hive 运行依赖

 wget https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-hive-runtime/0.13.2/iceberg-hive-runtime-0.13.2.jar

在 hive shell 下执行:

SET engine.hive.enabled=true;
SET iceberg.engine.hive.enabled=true;
SET iceberg.mr.catalog=hive;
add jar /path/to/iiceberg-hive-runtime-0.13.2.jar;

创建表

CREATE EXTERNAL TABLE iceberg_hive(
`id` int,
`name` string)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
LOCATION 'hdfs://localhost:8020/user/hive/warehouse/iceber_db/iceberg_hive'
TBLPROPERTIES (
'iceberg.mr.catalog'='hadoop',
'iceberg.mr.catalog.hadoop.warehouse.location'='hdfs://localhost:8020/user/hive/warehouse/iceber_db/iceberg_hive'
);

然后再 Flink SQL Client 下执行下面语句将数据插入到 Iceberg 表里

INSERT INTO hive_catalog.iceberg_hive.iceberg_hive values(2, 'c');
INSERT INTO hive_catalog.iceberg_hive.iceberg_hive values(3, 'zhangfeng');

查询这个表

select * from hive_catalog.iceberg_hive.iceberg_hive

3. Doris 查询 Iceberg

Apache Doris 提供了 Doris 直接访问 Iceberg 外部表的能力,外部表省去了繁琐的数据导入工作,并借助 Doris 本身的 OLAP 的能力来解决 Iceberg 表的数据分析问题:

  1. 支持 Iceberg 数据源接入 Doris
  2. 支持 Doris 与 Iceberg 数据源中的表联合查询,进行更加复杂的分析操作

3.1 安装 Doris

这里我们不在详细讲解 Doris 的安装,如果你不知道怎么安装 Doris 请参照官方文档:快速入门

3.2 创建 Iceberg 外表

CREATE TABLE `all_users_info`
ENGINE = ICEBERG
PROPERTIES (
"iceberg.database" = "iceberg_hive",
"iceberg.table" = "all_users_info",
"iceberg.hive.metastore.uris" = "thrift://localhost:9083",
"iceberg.catalog.type" = "HIVE_CATALOG"
);

参数说明:

  • ENGINE 需要指定为 ICEBERG
  • PROPERTIES 属性:
    • iceberg.hive.metastore.uris:Hive Metastore 服务地址
    • iceberg.database:挂载 Iceberg 对应的数据库名
    • iceberg.table:挂载 Iceberg 对应的表名,挂载 Iceberg database 时无需指定。
    • iceberg.catalog.type:Iceberg 中使用的 catalog 方式,默认为 HIVE_CATALOG,当前仅支持该方式,后续会支持更多的 Iceberg catalog 接入方式。
mysql> CREATE TABLE `all_users_info`
-> ENGINE = ICEBERG
-> PROPERTIES (
-> "iceberg.database" = "iceberg_hive",
-> "iceberg.table" = "all_users_info",
-> "iceberg.hive.metastore.uris" = "thrift://localhost:9083",
-> "iceberg.catalog.type" = "HIVE_CATALOG"
-> );
Query OK, 0 rows affected (0.23 sec)

mysql> select * from all_users_info;
+---------------+------------+-------+----------+-----------+--------------+-------+
| database_name | table_name | id | name | address | phone_number | email |
+---------------+------------+-------+----------+-----------+--------------+-------+
| demo | userinfo | 10004 | user_113 | shenzheng | 13347420870 | NULL |
| demo | userinfo | 10005 | user_114 | hangzhou | 13347420870 | NULL |
| demo | userinfo | 10002 | user_111 | xian | 13347420870 | NULL |
| demo | userinfo | 10003 | user_112 | beijing | 13347420870 | NULL |
| demo | userinfo | 10001 | user_110 | Shanghai | 13347420870 | NULL |
| demo | userinfo | 10008 | user_117 | guangzhou | 13347420870 | NULL |
| demo | userinfo | 10009 | user_118 | xian | 13347420870 | NULL |
| demo | userinfo | 10006 | user_115 | guizhou | 13347420870 | NULL |
| demo | userinfo | 10007 | user_116 | chengdu | 13347420870 | NULL |
+---------------+------------+-------+----------+-----------+--------------+-------+
9 rows in set (0.18 sec)

3.3 同步挂在

当 Iceberg 表 Schema 发生变更时,可以通过 REFRESH 命令手动同步,该命令会将 Doris 中的 Iceberg 外表删除重建。

-- 同步 Iceberg 表
REFRESH TABLE t_iceberg;

-- 同步 Iceberg 数据库
REFRESH DATABASE iceberg_test_db;

3.4 Doris 和 Iceberg 数据类型对应关系

支持的 Iceberg 列类型与 Doris 对应关系如下表:

IcebergDoris描述
BOOLEANBOOLEAN
INTEGERINT
LONGBIGINT
FLOATFLOAT
DOUBLEDOUBLE
DATEDATE
TIMESTAMPDATETIMETimestamp 转成 Datetime 会损失精度
STRINGSTRING
UUIDVARCHAR使用 VARCHAR 来代替
DECIMALDECIMAL
TIME-不支持
FIXED-不支持
BINARY-不支持
STRUCT-不支持
LIST-不支持
MAP-不支持

3.5 注意事项

  • Iceberg 表 Schema 变更不会自动同步,需要在 Doris 中通过 REFRESH 命令同步 Iceberg 外表或数据库。
  • 当前默认支持的 Iceberg 版本为 0.12.0,0.13.x,未在其他版本进行测试。后续后支持更多版本。

3.6 Doris FE 配置

下面几个配置属于 Iceberg 外表系统级别的配置,可以通过修改 fe.conf 来配置,也可以通过 ADMIN SET CONFIG 来配置。

  • iceberg_table_creation_strict_mode

    创建 Iceberg 表默认开启 strict mode。 strict mode 是指对 Iceberg 表的列类型进行严格过滤,如果有 Doris 目前不支持的数据类型,则创建外表失败。

  • iceberg_table_creation_interval_second

    自动创建 Iceberg 表的后台任务执行间隔,默认为 10s。

  • max_iceberg_table_creation_record_size

    Iceberg 表创建记录保留的最大值,默认为 2000. 仅针对创建 Iceberg 数据库记录。

4. 总结

这里 Doris On Iceberg 我们只演示了 Iceberg 单表的查询,你还可以联合 Doris 的表,或者其他的 ODBC 外表,Hive 外表,ES 外表等进行联合查询分析,通过 Doris 对外提供统一的查询分析入口。

自此我们完整从搭建 Hadoop,hive、flink 、Mysql、Doris 及 Doris On Iceberg 的使用全部介绍完了,Doris 朝着数据仓库和数据融合的架构演进,支持湖仓一体的联邦查询,给我们的开发带来更多的便利,更高效的开发,省去了很多数据同步的繁琐工作,快快来体验吧。

相关链接:

Apache Doris 官方网站:

http://doris.apache.org

Apache Doris Github:

https://github.com/apache/doris