作者:蒋晓伟(量仔)

整理:伍翀(云邪)

本文整理自 Flink Forward China 2018 大会上蒋晓伟老师的主题演讲《Apache Flink – Redefine Computation》

阿里巴巴有着世界上最大的电商平台,我们的平台有着海量的数据,总数据量上有数以EB,并且这个数据量每天在以数以PB的数量级在快速地增长,我们的平台每天产生数万亿条消息,在最高峰需要每秒钟处理数十亿条事件。今天我就给大家分享一下,在阿里巴巴我们是如何利用Flink来处理这海量的数据?我们先从流计算开始讲起。

流计算引擎

我先给大家举两个比较简单的场景,第一个可能是大家比较熟悉的双11的实时成交媒体大屏。在第一张图的大屏上大家可以看到,它会显示到目前为止双11的总成交量。大屏看起来虽然简单,但是却对计算提出了几个比较有意思的需求,第一个我们处理的是交易的数据,因为我们希望每笔交易都能够被计算在内,而且只被计算一次,我们需要精准的、只有一次的、正好一次的语义。同时当一笔交易发生之后,我们希望这笔交易能够尽快地在媒体的大屏当中显示出来。大家都知道每年双11媒体会看今年花了多少时间。如果我们的处理业务有比较大的延迟,可能会跟媒体造成不必要的误解。另外在双11当天会有很大的成交,所以我们的系统必须能够处理大量的数据。同时我们的系统希望是高可用的,能够保持我们业务平稳的运行。

我先给大家举两个比较简单的场景,第一个可能是大家比较熟悉的双11的实时成交媒体大屏。在第一张图的大屏上大家可以看到,它会显示到目前为止双11的总成交量。大屏看起来虽然简单,但是却对计算提出了几个比较有意思的需求,第一个我们处理的是交易的数据,因为我们希望每笔交易都能够被计算在内,而且只被计算一次,我们需要精准的、只有一次的、正好一次的语义。同时当一笔交易发生之后,我们希望这笔交易能够尽快地在媒体的大屏当中显示出来。大家都知道每年双11媒体会看今年花了多少时间。如果我们的处理业务有比较大的延迟,可能会跟媒体造成不必要的误解。另外在双11当天会有很大的成交,所以我们的系统必须能够处理大量的数据。同时我们的系统希望是高可用的,能够保持我们业务平稳的运行。

流计算不仅是能够用在实时分析的场景,同时他在实时的机器学习场景也是非常得重要。这是我给大家看的第二个例子,在这个例子里是搜索和推荐的实时个性化场景。在搜索和推荐当中,我们经常需要利用用户或者产品的实时特征去优化我们搜索和推荐的结果。比如说当一个商品快要售罄的时候,我们希望他不要在搜索结果中出现,以避免当用户点击一个商品进去之后却不能购买的不佳体验。

为了做到这一点,我们需要打通从数据产生、数据采集、数据处理,一直到在线上展示结果的一整条链路,并且能够让这条链路在秒级甚至亚秒级生效。这对实时计算的时效性提出了非常高的需求。同时在双11这一天,我们的流量会非常得大,这些海量的流数据对我们实时计算吞吐要求又上了一个台阶。所以我们可以看到,为了把阿里巴巴这么复杂的计算给支持好,我们需要一个强大的流计算引擎。

在三年前,我们经过仔细的评估和比较业界的各种实时计算引擎,我们决定采用Flink作为阿里巴巴的实时计算引擎。那么什么是 Flink?什么是流计算呢?大家对批处理可能比较熟悉。流计算跟批处理相比有一些显著得不一样,第一个不一样是流计算能够在不完整的数据上进行计算,当数据一边到来的时候,它就能一边计算。这也是流计算名字的由来,不需要等到数据完全到来才进行计算。正是因为这种计算模式使得流计算能够提供非常低的延迟。

为了更好地理解这个事情,我们来看一下如上图左边是一个最简单的流计算 WordCount 程序。从程序上可以看到,跟批处理的 WordCount 看起来是一样的,但是他的行为跟批处理却有点稍微不一样,它会从一个消息队列中不断地拿一条条消息,当消息到来的时候,会一条条地去处理每条消息。每条消息是一个句子,它会实时地打印出每个词到目前为止已经出现过多少次。和批处理不一样的是这个作业会一直在运行,每当新句子到来的时候,它都会不断的更新自己的计算结果。这样一个流计算的程序经过翻译会变成右边的 Flink DAG。每个 DAG 包括一个或多个算子,每个算子可能有多个并发度,在这个作业当中最重要的算子有两个,第一个是 Split 算子,他负责把每个句子拆成一个个单词,这些单词经过 Shuffle 到达了第二个算子,第二个算子会负责对每个单词出现的次数进行计数。所以大家可以看到 DAG 和 MapReduce 其实非常相像,实际上它就是一个流式的 MapReduce 。

但是这和传统的 MapReduce 却有一个非常不一样的地方。其中第二个算子是一个计数的算子,为了能够做到计数,他需要知道一个词到目前为止已经出现了多少次,才能在这之上加1得到新的出现次数。为了做到这一点,在 Flink 之前,业务方往往需要把这样的状态存在某个外部的存储中。这明显的增加了开发的难度,用户需要去维护一个外部的存储。更加糟糕的是,在一个分布式的系统里,出错是不可避免的,当出现错误的时候,如果状态是在一个外部的存储当中,当你试图恢复的时候,非常容易把一个词少数一次或者把一些词多数一遍!很难做到精准的语义。Flink 最大的一个创新是在流计算当中引入了状态的概念,Flink 作为一个流计算引擎,真正把状态给管理起来了。与之同样重要的是引入了 Chandy-Lamport Checkpoint 机制。这种 Checkpoint 的机制能够让一个快速运行的流计算作业,去拍一张全局一致的快照。所以当出现错误的时候,我们能够把系统回滚到上一次成功的快照,而保证所有状态的一致性,真正实现精准的正好一次的语义,这是 Flink 在流计算上的一个重大的突破。

我们来总结一下 Flink 的一些优势,Flink 作为一个流计算引擎,它能够提供很低的延迟,并与此同时提供很高的吞吐。他提供了精准的正好一次的语义,并且它有很好的容错性,能够从错误中快速地恢复。正是因为 Flink 在架构上的这些优点,我们决定使用 Flink 作为阿里巴巴流计算的引擎。

但是阿里巴巴的业务体量非常之庞大,为了把这么大的业务体量给支持好,我们在 Flink 之上也需要做一系列的改进,所以阿里巴巴维护了一个内部版本的 Flink ,它的名字叫做 Blink。我们在过去的几年内在 Flink 上做了一系列的改进,接下来我会介绍下我们做的改进。我们先快速介绍一下 Flink 的技术栈。

在最底层 Flink 可以跑在各种资源管理系统上,它可以以Standalone 的模式去跑,可以跑在 Yarn 的集群里,也可以跑在云的环境里,也可以跑在像 Kubernetes 这样的系统之上,在此之上我们有 Flink 的 Runtime,他负责运行一个分布式的 DAG。在 Runtime 之上我们有各种API层,有用来做批处理的 DataSet API,有用来做流处理的 DataStream API。

在每一层我们都做了很多的改进,比如说在 Runtime 层,我们和社区一起实现了一个分布式的架构,这种分布式架构去掉了 Flink 在集群上的一个性能的单点,同时让各个作业之间有更好的隔离,能够让 Flink 作业和集群去向资源管理系统动态地申请和释放资源。并且引入了一种抽象,使得 Flink 能够更容易的去对接新的资源管理系统。我们和社区一起基于 credit 重新实现了流控和反压机制,不但解决了原来可能的一个死锁问题,而且极大地提升了性能。在容错性方面,我们也做了一系列的改进,让 Flink 作业在出现错误的情况下,能够更高效地恢复。所有这一切的努力,极大地提升了 Flink 能够运行的规模、稳定性以及性能。但是在阿里巴巴的挑战不仅限于数据量和计算量,我们有着非常复杂的业务,这些业务上我们有着很多很多的开发者,如何提供提高开发者的效率,降低流计算的门槛,对推广流计算同样重要。所以在两年多前,我们和社区一起努力,大力推进了流式的 Flink SQL。我们和社区一起定义了流式 SQL 的语义,一起完善了流式 SQL 的大部分功能。

经过这一系列的努力,我们大大降低了流计算的使用门槛,使得流计算能够在阿里巴巴内部大规模的使用起来。我给大家看一下现在流计算在阿里巴巴的使用状况,我们在线上已经运行着上万个流计算的作业,我们的集群规模达到了上万台。在今年的双11,我们的集群更是创下了每秒钟处理 17 亿条消息的记录,并且这所有的一切都是在秒级的延迟下完成的。所以大家可以看到,Flink 作为一个流计算引擎,在阿里巴巴取得了极大的成功。那么是不是有可能用同样的技术,我们也能处理其他的一些计算场景,我再给大家看一个比较简单但是很有意思的例子。

统一的引擎

这个例子是搜索的索引创建流程。在一个搜索引擎,它需要先创建索引才能够对外服务,一般来说我们有个索引的全量的创建流程,每天会跑一次。这种全量的索引创建一般是通过批处理的技术来进行的。但如果我们只有这样一个批处理的流程,就会有全量索引创建完之后的任何更新必须要到下一天才能生效的问题。为了能够提供索引的实时性,我们引入了一个增量的索引更新的流程,这个一般是用流计算来完成的,在阿里巴巴我们使用 Flink 来做这件事情。

这种架构就是大家经常所说的 Lambda 架构。但是这种架构有两个问题,第一个问题是我们需要维护两套流程,一套增量流程,一套全量流程,这无疑让我们的开发难度、维护难度都增加了。同时这两套流程之间必须要有一定的自洽性,它们必须要保证一致。当业务变得越来越复杂的时候,这种一致性本身也会成为一个挑战,那么是不是有可能用一套技术把这两种流程都能够很好的支持起来?在回答这个问题之前,我们先看一下所有可能的计算的场景。

在这张图上大家看到的是一个简单的计算场景的分类,在最左边我们看到的是流计算的场景,在流计算场景它需要在不完整的数据上进行计算,并且能够提供非常低的延迟。在这张图的最右边是我们批处理的场景,在批处理场景我们需要完整的数据,但是它一般能有比较高的吞吐,而且能够允许你随便去改你的查询,在查询上能有很多的灵活性。但这个世界不是非白即黑,不是非流即批,其实在流计算和批处理之间有很多其他的计算场景,在延迟性、吞吐和查询的灵活性方面做了不同的权衡。

在几年之前,Flink 的创始人有一个很重要的洞察,他们意识到流计算作为一种技术,不仅能把流计算的场景给支持好,而且流计算作为一种技术,能够是所有不同计算的一个共同的抽象。这句话是什么意思呢?

如上图的上半部分大家看到的是一个流计算的作业,一般来说它的输入是一个无限的数据集。这张图的下方是批处理的一个作业,它所处理的数据量一般是有限的,所以批处理作业当把这些有限的数据处理完之后,它会退出,不是一直在运行的,流计算是一直在运行的一个作业。你可以想象一下,如果一个作业能够处理无限的数据输入,他一定也能处理有限的数据输入。所以从这个角度来说,批处理是流计算在输入是有限集情况下的一个特例。但是这仅仅是说明了从功能上流计算来用流计算的技术来处理批处理的场景是可行的。在性能上如果我们这样做是不是能够做到同样高效?

在几年前有研究人员做了一些这方面的调研,他比较了用 Flink 这种流式的方式来做批处理和传统的批处理方式在性能上的差异,他发现用 Flink 这种流式的方式来做批处理,在性能上有一定优势,这个优势的主要原因是流式的 Shuffle 能够更好地共同使用CPU、IO、和网络的带宽,通过更好地使用资源给我们带来了更高的性能。所以我们可以看到,用流的技术来做批处理,不仅在功能上是可行的,而且在性能上可能有一定的优势。 所以大概在一年半之前,我们大力投入了这个项目。我们希望以流计算技术为基础,去构建一个流批一体化的处理引擎。

为了做到这一点,我们需要对整个软件栈做一些架构上的调整。大家可以看到左边是现在 Flink 的架构图。它有几个明显的问题,第一个问题是在API层流和批已经割裂了,我们用 DataSet API 去处理批计算,用 DataStream API 去处理流计算。前面在流计算部分我们已经提到,引入流式 SQL 去降低流计算的门槛,但是流式 SQL 的好处不止于此。流式 SQL 还有一个同样重要或者更重要的作用,是他能够用一套语言把流计算和批处理的语义真正地统一起来。但我们流批一体化引擎的目标不仅仅是在语言层,我们希望整个实现也都是流和批一体化的,所以我们对 Flink 的现有架构做了一系列调整,右图即为我们现在的新架构。在新的架构图上我们引入了一个 Query Processor 层。在这一层我们定义了数据的格式,类型系统,以及各种各样的内置算子,使得上层的各种API能够共享这些代码。

接下来稍微详细地展开说一下我们所做的工作,在 Runtime 层,我们首先扩展了算子的定义,使得 Flink 的流式算子能够去更好地控制输入的处理顺序。比如说一个 HashJoin 的算子,往往需要把 build 端的所有消息全部处理完,才开始处理 probe 端的数据。这种对输入顺序的控制在以前是不可能的,但是我们通过扩展 Flink 算子的框架,使得这成为了可能。我们引入了插件化的调度机制,使得上层的引擎能够更好地控制算子的调度顺序。比如刚才所说的 HashJoin 的这种场景,我们希望把 build 端的算子全部执行完,再开始调度 probe 端的算子,这样能够更高效地利用集群的资源。同时我们也引入了一些性能的优化,比如说更加复杂的 chaining,使得能够提高我们 Runtime 的执行效率。在 Query 执行层,我们引入了二进制的数据格式,我们所有的表达式都能够直接在二进制的数据格式上进行操作,省去了序列化和反序列的开销,给我们带来了很大的性能提升。我们利用 CodeGen 的技术去动态地生成所有的算子,省掉了不必要的额外开销。我们的算子会动态地申请和使用资源,能够更好地利用资源,提升我们的效率,更加重要的是我们的算子对资源有着比较好的把控,你永远不用担心 OutOfMemory 的问题。我们的优化器是一个完全基于成本的优化器,我们利用成本去决定Join 的种类,Join 的顺序,聚合的策略等等。我们的优化器有着非常丰富的规则,同时为了让我们优化器更好的去做决定,我们支持了比较丰富的统计信息,所有的这一切不但在功能上极大地完善了流计算 SQL 和批处理 SQL 的功能,而且在性能上我们让流计算 SQL 和批处理 SQL 也都得到了极大的提升。

这里再分享一些性能上的数据。大家所看到的这个是 TBC-DS 的 benchmark,柱状图的高度代表了运行的总时间,所以高度越低说明性能越好。大家可以看到,阿里巴巴内部版的 Flink 在TPC-DS benchmark 相比 Spark 有着非常明显的性能优势。而且更加有意思的是,这种性能优势随着数据量的增加而变得越来越大。在实际的场景这种优势已经超过 Spark 三倍。一个很重要的原因是 Flink/Blink 它不依赖于使用大量的内存去 cache 数据来达到它的性能。

在流计算性能上我们也取得了类似的提升。我们线上的很多典型作业,它的性能是原来的3到5倍,并且使用新引擎的所有作业平均来说在性能上都提升了一倍以上。所以这一切可以看到新的引擎不但在功能上是一个提升,在性能上也是一个非常大的突破。新的引擎目前已经在阿里巴巴内部大规模上线,并且经历了今年双11和双12的考验。值得一提的是,在搜索的算法业务平台,新的引擎被同时用来进行流计算和批处理,它被用来实现了流批一体化的样本生成和特征抽取这些流程,我们能够处理的特征数达到了数千亿,而且每秒钟处理数亿条消息。在批处理上,我们单个作业处理的数据量已经超过400T,并且为了节省资源,我们的批处理作业是和搜索的在线引擎运行在同样的机器上。所以大家可以看到流批一体化已经在阿里巴巴取得了极大的成功,我们希望这种成功,以及阿里巴巴内部的经验都能够带回给社区。

我们明白除了一个强大的引擎之外,易用性也是非常重要。所以我们在生态上也做了一系列的工作。国内外很多公司都已经用 Hive 在做自己的批处理。在这种情况,如果安装了 Flink,你就可以直接用 Flink 去查询 Hive 的数据,真正能够做到在 Hive 引擎和 Flink 引擎之间的自由切换。同时我们在 Zeppelin 上也做了一系列的工作,在 Zeppelin 上增加了支持 Flink 的 Table API 和 Flink SQL 的功能,能让大家有更好的开发体验。

未来的引擎

接下来我给大家分享一下我们在其他的一些场景所做的一些探索。第一个场景是我们的一个机器学习的场景,左边大家看到的是一个标准的 PS (ParameterServer) 的架构图。 在标准的 PS 上,我们会有 Parameter Server —— 专门的参数服务器去管理这些参数,这种架构能够用 Flink 比较高效地实现。实现里有两个很有意思的地方,第一个有意思的地方是我们不需要专门的参数服务器,参数能够用 Flink 的状态管理起来,这是 Flink 流式计算引入状态创新的一个非常有意思的应用。第二个比较有意思的地方是我们对参数的更新是通过 Flink 引入的另外一个功能,叫做增量迭代的功能去进行的。通过这种实现方式,在生产上也取得了很大的成功。在一些算法上我们可以看到这种实现能处理模型的规模是其他类似计算引擎能够处理的模型规模的十倍以上。

同样的技术能够被应用在图计算上,我们能够用 Flink 的状态把一个图给管理起来,并且利用增量迭代去去更新图的每一步。在这方面我们也正在做一系列的探索,同时我们也正在探索如何去改进 Flink 对迭代的支持,我们会在流式的增量迭代中引入迭代的语义,同时我们将会提供给用户更加精细的控制,能够控制不同任务之间迭代进行的 pace。对同步有着更好、更细力度地掌控。所以大家所看到这一切都是用流计算技术去支持不同的大数据场景非常好的例子。

但是 Flink 的潜力不止于这些,我们来看一个不是大数据的计算场景。这个是一个应用和微服务的场景。对于一个复杂的业务应用,为了让我们的系统代码变得更加容易维护,我们往往需要把业务拆成多个微服务,每个微服务负责处理一部分事情。比如说在一个典型的电商业务,往往需要有着这样几个微服务,需要有一个微服务去管理库存,需要有个微服务去管理支付,需要另外一个微服务去管理物流等等。而实际上这个业务的过程往往就是把这些微服务串起来,相互调用的一个过程。

这些微服务之间的调用,在传统的做法里面是通过同步的调用去实现的。同步的调用有一个问题就是他需要用一个线程去等待,如果要支持很高的吞吐,系统里就会需要很多的线程。但当线程多了之后,系统的调度开销就会增加,从而影响实时性,影响对用户的体验,所以我们不能同时有太多的线程。这大大限制了吞吐,我们可以看到很多时候线上机器的使用率是非常低的。为了解决这个问题,大家不难想象一个比较直接了当的办法,就是把同步的调用变成异步,所以我们就这么去做。

但是仅仅把同步的调用变成异步是不行的,因为当你有着大量的异步调用之后,很有可能把异步的微服务给压垮。为了避免这种情况,保证系统的平稳运行,我们需要引入一种流控和反压的机制,保证在任何一个时刻不会有太多的异步调用在进行。同时我们线上业务的流量可能会随着时间在变化,当这些流量变化的时候,为了能够处理这些流量,我们需要扩容。而当这些流量消退的时候,我们又需要缩容,所以我们需要自动扩缩容的能力。讲到这儿大家可能已经看到,这里所需要处理的一些问题和流计算的技术非常的相似,其中有很多可以借鉴的地方。

但这种相似性不止于此,我们可以看到一个传统的微服务,它的状态往往是存储在外部的数据库当中。但是我们刚才说过,Flink 作为一个流计算引擎,它最大的一个创新是把状态引入了计算之中。我们是不是能够用 Flink 的状态,把这些原来存在数据库当中的数据给管理起来,而不再依赖外部的数据库呢?这是一个非常有意思的可能性。

Flink 里面还有另外一个很重要的特征是提供了精准的正好一次的语义。而在传统的数据库里,它提供了一个很重要的特征是事务,保证了ACID。但是在 Flink 的计算引擎当中,我们可以利用这种正好一次的语义去模拟传统事务的原子性,让我们的系统变得更加简单,用一个流计算的作业,就可以把你的业务逻辑和数据库的逻辑都包含起来,这是一个非常有意思的方向,具体可以看下 Stephan 在 Flink Forward China 2018 上做的主题演讲《Stream Processing takes on Everything》。

总结

最后总结一下,Flink 作为一个流计算的技术,首先很好地支持了流计算的场景。现在它能够用一个统一的技术把批处理也很好地支持起来。在机器学习、图计算等其他的大数据领域,它也有着自己的特色和优点。同时我们可以看到,Flink 除了在大数据之外,也在应用和微服务领域崭露头角,让我们看到了非常大的希望,一种全新的思路。在座的各位,你们都是 Flink 社区非常重要的一员,让我们一起努力,让 Flink 变得更强大,让 Flink 用流计算的技术去重新定义计算,谢谢大家。


欢迎转载,敬请在正文中标注并保留文章来源、原文链接、作者/译者等信息。

浙公网安备 33010802010061号 浙ICP备19028187号