作者:陈胜威、舒建明 芒果TV 大数据架构师
StarRocks 小编导语:
芒果 TV 作为湖南广电旗下互联网视频平台,在“一云多屏,多元一体”的战略指导下,通过内容自制,培植核心竞争力,从独播、独特走向独创,并通过市场化运作完成 A 轮、B 轮融资,并于 2018 年 6 月,顺利实现资产重组,成为国内 A 股首家国有控股的视频平台。
芒果TV 基于StarRocks 构建了云原生实时湖仓,并已经大规模投产,线上性能提升至原有的 3-5 倍。通过 StarRocks,芒果TV 不仅统一了实时和离线的分析架构,提供了统一的分析体验,更在存算分离的模式下完成了验证上线,进一步增强集群的扩展性并降低成本。
#01
复杂挑战与革新:芒果TV 的统一分析之路
芒果TV 的数据管理部在集团内部承担了重要的数据分析任务,服务范围广泛、形式多样,包括生成固定报表、自定义报表、adhoc 查询、以及多种数据应用。自 2018 年起,芒果TV 全面采用 Hadoop 架构,并引入 Trino 作为主要的查询引擎,以满足各种数据需求。
然而,随着平台接入的业务越来越多,数据量越来越大,业务分析需求也变得更加实时、更加复杂和精细,Trino 的性能逐渐出现瓶颈。总结来说,平台面临两大主要挑战:
- 实时分析与离线分析共存
除了传统的离线分析,随着数据在业务中的应用不断深化,对实时分析的需求迫切增长。例如,需要实时统计计算一些关键业务指标(如 PV、UV 等),以及进行活动促销的实时数据分析等任务。这些都需要查询引擎能够高效地处理实时数据,并提供方便而极速的查询能力。
- 数据分析需求非常复杂
业务需求的复杂性直接反映在分析需求的复杂性上,其中包括一些典型的复杂场景(例如用户行为分析)以及一些复杂的业务指标计算(例如会员实时分摊、节目热度榜计算等)。这些任务不仅需要复杂的 SQL 查询,还可能需要使用大量用户自定义函数(UDF)。此外,平台还提供了用户自定义 adhoc 查询的功能,允许用户根据他们个性化的需求来自定义查询,从而可能使查询变得非常复杂。因此,我们需要一个强大的查询引擎,能够轻松应对这种复杂性,并提供极速的查询性能。
考虑到如此多样复杂的分析需求,最直观的解决方案可能是针对不同场景分别构建 OLAP 数据库,但由于芒果TV 内 Hadoop 数据体系已经存在并运行了相当长时间,因此无论是数据迁移还是查询迁移都面临着巨大的挑战。
因此,我们亟需一种统一的解决方案,以最大程度地利用现有资源,同时确保分析的统一性。即既能满足实时分析,又能满足离线数据即席查询,同时对复杂查询也能有优异的性能表现。
#02
为什么选择 StarRocks
在 2022 年 11 月,我们开始了 OLAP 迭代选型的工作。在选型过程中,我们明确了主要的七个需求:
-
极致的查询性能: 包括单表查询以及多表联合查询
-
实时分析能力: 支持数据的流式写入
-
灵活的分析能力: 自定义 UDF/UDAF 开发、维护方便,日常分析查询,特别是行为分析中需要使用大量的自定义的 UDF/UDAF
-
联邦查询能力: 支持常用的联邦查询,避免数据搬迁
-
良好的可扩展性: 支持存算分离架构
-
易于迁移: 支持标准化 SQL,且 SQL 语法最好能与 Trino 完全兼容
-
架构简单、易维护: 架构简单不依赖于过多其他组件,集群的扩容和缩容较方便,社区活跃便于维护与升级,遇到棘手问题能够得到社区支持
基于上述需求,我们进行了综合的市场调研和对比分析,主要关注了 ClickHouse、StarRocks 和 Doris 这三种主流 MPP 数据库引擎。
StarRocks 架构简单易运维,具备兼容 MySQL 协议、标准化 SQL,支持流式及批式数据等优点,同时 StarRocks 在查询性能上预期更加优秀,并且支持存算分离架构,整体朝着云原生实时湖仓的方向发展,与我们的规划非常吻合,因此最终选择了StarRocks 与线上的 Trino 环境进行了性能比对测试。
在调研中我们发现,阿里云EMR 提供了半托管的 StarRocks 服务。在阿里云EMR 上可以非常快速的快速创建和销毁集群,灵活调整集群规模,⽀持阿⾥云 ECS 和 ACK 部署等部署形式。最后我们选择了EMR on ECS 模式,主要因为可以帮我们解决以下问题:
-
即开即用的 StarRocks 服务,可以分钟级完成集群的搭建,便捷的监控运维能力,方便快速上手和降低运维管理成本。大大减少我们日常运维和管理的需求;
-
EMR 提供了灵活的弹性能力,结合业务高低峰不同,可以灵活配置弹性计算资源,保障业务稳定性;
因此最终选择了EMR StarRocks 与线上的 Trino 环境进行了性能比对测试。
测试情况如下:
StarRocks 2.3.2 64C 512G (独享测试集群)
Trino 358 704C 5632G (某业务线上集群)
-
性能上,在资源相差很多、且没有打开 Data Cache 的情况下,StarRocks 的性能还明显优于 Trino, 平均效率是原有的 2-3 倍 。StarRocks的 JAVA UDF/UDAF 在功能以及性能上也能达到预期。
-
功能上,StarRocks 具备优秀的分析能力,对各类复杂查询的支持得很好;并且在实时处理上的能力也非常优秀,可以很好地对接 Flink 实时数据流,并且支持部分列更新。
-
架构上,StarRocks 支持存储计算分离的架构,拥有非常好的可扩展性,并且易于维护,可以进一步帮助我们降低成本。
-
迁移成本上,StarRocks 兼容 Trino语法,更加易于迁移。我们将 Trino 的历史 SQL 进行了回放, SQL 语法兼容程度到达了 90%。
因此我们决定最终选择 StarRocks 作为新的统一分析引擎。
#03
实时离线双箭合璧,StarRocks 在芒果TV 的应用现状
引入 StarRocks 之后,芒果TV 的整体数据架构如下:
-
实时流:业务数据通过 Flink 加工后实时同步给 StarRocks,根据不同的业务场景选择 StarRocks 的明细模型(日志数据)、聚合模型(统计数据)、主键模型(业务数据)
-
离线流:Hive 上的离线数据保留在对象存储上,通过 StarRocks 来进行联邦查询。
通过 StarRocks,我们成功实现了离线和实时分析的统一体验,为各类指标、报表和大屏提供了高效的数据服务。
接下来展开讲解各应用场景的具体情况。
01 离线数据湖分析
芒果TV 大数据部为内部的所有业务部门提供了多种数据分析工具,包含固定报表、自定义看板等,同时也为分析师提供的一站式 SQL 分析平台。业务分析师可能会不定期地向平台手动发送查询,这些查询具有以下特点:
-
通常非常复杂。这些查询可能涉及到十几个表的关联查询以及深层次的嵌套查询等复杂操作。在这种情况下,Trino 性能表现不佳,导致平台无法为业务分析师提供交互式的查询分析体验。这也是我们迁移到 StarRocks 的主要原因。
-
基于 Trino 语法。分析师的查询习惯很难改变,并且一些历史查询可能非常复杂难以修改,这都对语法迁移造成了不小的挑战。
得益于 StarRocks 优秀的数据湖查询性能和 Trino 语法兼容能力,让我们平滑地对架构进行了升级。 目前, 我们已经成功将 StarRocks 应用于 经营分析类产品、内容分析类产品、用户分群、自定义 看板 等场景上线, 并 替换 了 所有 原本由 Trino 执行的 查询。
我们通过创建 Hive Catalog 的方式对 Hive 表进行访问,StarRocks 除了原生的向量化引擎、CBO 等功能,还针对湖上查询还做了很多性能优化,包括延迟物化、I/O 合并等。上线后, 我们发现 Hive Catalog 查询性能相较于 Trino 提升了 3.5 倍 。未来,我们也会尝试开启 Data Cache 来进一步提速查询,避免重复从远端拉取数据产生的 I/O 开销。目前已完成迁移的查询性能对比情况:
线上开启了 Trino 方言,仅需要通过一行命令set sql_dialect=‘trino’;即可将语法切换到 Trino 模式,大大降低了迁移的成本。
02 实时数据分析
目前 3.0 架构下已完成基于 Flink SQL+StarRocks 实时分析数仓搭建,基于已经搭建完毕的 FlinkSQL 的数仓分层体系,且由 StarRocks2.5X 版本升级到 StarRocks3.0X 并已大规模投入在生产环境中。
-
实时业务指标计算
除了常见的Unique Visitors(uv)、Video View(vv)、时长等实时指标以外,我们经常面临一些计算逻辑较为复杂、数据量庞大的实时计算需求,如会员实时分摊、节目热度榜单计算等。这类需求在之前实时计算框架中难以得到很好的支撑。通过 StarRocks 的聚合表模型、物化视图等特性,我们成功完成了对这类实时计算业务的支撑。
- 聚合模型:方便快捷的基础指标累计
StarRocks 的聚合模型支持定义聚合计算的指标列,在数据导入时自动完成指标聚合计算。因此在一些基础指标的计算场景,数据由 FlinkSQL 完成清洗后,会直接写入 StarRocks 聚合表中,完成用户观看时长、点击量、订单统计等基础指标的累计,减少后续计算的数据量 。 建表代码如下:
- 物化视图 :复杂计算场景的查询提速
StarRocks 的物化视图不仅支持内表和内表关联,也支持内表和外表关联。比如数据在 MySQL、Hudi、Hive 等外部引擎中,都可以通过 StarRocks 物化视图来查询加速,并设定定期刷新规则,从而避免手动调度关联任务。通过多级物化视图的建模方式,拆分复杂计算过程、实现不同数据源的联邦查询、实现结果复用。降低了计算复杂度的同时也提升了处理性能。并且物化视图还具备透明查询改写的能力,可以最大程度加速查询。
因此在一些复杂的计算场景下,我们选择使用物化视图来对数据进行加工处理。由于其易于创建和维护的特点,让查询加速更加的按需、简便。
- Bitmap 函数:精确去重查的加速利器
Bitmap 函数在我们的离线数仓中已经有了一段使用历史,通常是通过 Spark 自定义函数实现的。
所以 StarRocks 上线后,StarRocks 丰富的原生 Bitmap 函数就立即应用在了日常的查询分析中。在查询实时指标时,我们常常需要跟踪每 5 分钟的累积去重指标。传统的计算方法在数据量较大的情况下基本很难查出结果。利用 Bitmap 函数可以很好地解决这一问题,先按 5 分钟聚合,计算出每 5 分钟当前的用户合并结果。再通过窗口函数,计算截止到每个 5 分钟点的累积指标。
通过 Bitmap 函数,我们能够在资源可控的情况下实现精确去重的实时计算。
-
用户留存分析模型
StarRocks 在芒果TV 内部还承担了各种复杂场景的计算工作。以用户行为分析系统为例,这个系统基于用户海量行为数据,为用户提供了一站式、多模型、多维度的自助查询平台,包括事件分析、留存分析、漏斗分析、归因分析、路径分析、分布分析、用户生命周期分析等多种分析模型。
由于用户行为数据整体数据规模大,查询周期长,分析模型计算复杂,而集群资源有限等原因,查询性能一直是制约该系统大规模使用的瓶颈。
借助 StarRocks 强大的多维分析查询性能,我们取得了显著的性能提升。例如,事件分析和分布分析整体性能 提高了 5 倍以上 。而通过 StarRocks 提供的 JAVA UDF 功能实现的留存分析、漏斗分析、归因分析、路径分析、用户生命周期等高级分析模型也带来了 2-5 倍的性能提升 。
留存分析是一种衡量用户健康度、参与度的方法,它考察了完成起始行为的用户中,有多少人会进行后续行为。虽然社区提供了基础的留存和漏斗函数,但是比较通用。为了满足复杂的留存计算需求,我们基于 StarRocks 的 JAVA UDF 框架开发了 2 个自定义的聚合函数。
retention() 函数:汇总生成单个用户的活跃列表。函数通过传入的起始事件、回访事件状态,周期,计算范围等参数计算出单个用户的活跃周期列表。同时,为了减少计算过程中内存使用,减少 shuffle 的 I/O 以及减少函数 output 的数据量大小。计算过程中,我们用二进制的方式保存用户的活跃周期列表。如下:1 个 BYTE 有 8 位可以保存 8 个周期起始行为状态,4 个周期回访行为的状态。
基于以上设计,计算 30 天的 30 日留存,单个用户只需要 30/8+(30+30)/4 一共 19 个 BYTE 的大小,不到 3 个长整形的内存占用。
retention_sum() 函数:基于 retention() 的输出计算最终的留存结果,通过 type 参数控制计算留存或者连续流失。由于函数的输入为 retention() 函数已经计算好的各个用户的活跃列表。因此 retention_sum() 函数中,各个用户的留存数据可以在 update 阶段直接计算,这样 merge 和 finalize 阶段只需要进行简单的数据汇总,从而带来性能的提升。
与留存分析相同,基于 StarRocks 的 JAVA UDF 框架,我们同时还开发了漏斗分析、路径分析、归因分析、用户生命周期等自定义聚合函数用以满足不同场景的分析需求。
#04
基于 StarRocks 云原生湖仓一体的展望
目前,StarRocks 已经在芒果TV 内得到了广泛应用,我们也开始在离线场景率先尝试了存算分离(shared_data)模式。我们将数据存储于对象存储中,并通过在本地开启了 SSD 的磁盘缓存,获得了跟存算一体类似的性能体验。
此外,我们也做了一些权限同步工作,以确保 StarRocks 可以使用 Ranger 中 Trino 的权限认证。目前了解到社区也已经提供了官方的 Ranger 对接功能,并且可以直接复用已经配置的 Hive Service,这非常方便,我们计划尝试使用这一功能。
在湖仓一体分析上, StarRocks 作为性能优秀 MPP 数据库,同时也是一个高性能高扩展的分析和计算引擎,具有处理海量数据的能力。并且支持各种外部表 Catalog (Hive、MySQL、ElasticSearch 等 ),可以与湖仓数据进行联动,能通过外部 Catalog 进行湖仓数据转换。这使得 StarRocks 既可以作为数据存储的底座,又能作为访问数据湖仓桥梁。在后续新业务中我们也非常期待使用 StarRocks 来实现湖仓一体化。