IoT物联网观察之美图架构师详解从0-1构建大数据平台

来源:华体会全站官网登录入口 发布时间:2024-01-26 20:10:46 阅读: 1

  如今大数据在各行业的应用愈来愈普遍:运营基于数据关注运营效果,产品基于数据分析关注转化率情况,开发基于数据衡量系统优化效果等。

  美图公司有美拍、美图秀秀、美颜相机等十几个 app,每个 app 都会基于数据做个性化推荐、搜索、报表分析、反作弊、广告等,整体对数据的业务需求比较多、应用也比较广泛。

  因此美图数据技术团队的业务背景大多数表现在:业务线多以及应用较为广泛。这也是促使我们搭建数据平台的一个最主要的原因,由业务驱动。

  如图所示是我们数据平台的整体架构。在数据收集这部分,我们构建一套采集服务端日志系统 Arachnia,支持各 app 集成的客户端 SDK,负责收集 app 客户端数据;同时也有基于 DataX 实现的数据集成(导入导出);Mor 爬虫平台支持可配置的爬取公网数据的任务开发。

  数据存储层主要是依据业务特点来选不一样的存储方案,目前主要有用到 HDFS、MongoDB、Hbase、ES 等。在数据计算部分,当前离线计算其实是基于 Hive&MR、实时流计算是 Storm 、 Flink 以及还有另外一个自研的 bitmap 系统 Naix。

  在数据开发这块我们构建了一套数据工坊、数据总线分发、任务调度等平台。数据可视化与应用部分主要是基于客户的真实需求构建一系列数据应用平台,包括:A/B 实验平台、渠道推广跟踪平台、数据可视化平台、用户画像等等。

  右侧展示的是一些各组件都可能依赖的基础服务,包括地理位置、元数据管理、唯一设备标识等。

  如下图所示是基本的数据架构流图,典型的 lamda 架构,从左端数据源收集开始,Arachnia、AppSDK 分别将服务端、客户端数据上报到代理服务 collector,通过解析数据协议,把数据写到 kafka,然后实时流会经过一层数据分发,最终业务消费 kafka 数据来进行实时计算。

  离线会由 ETL 服务负责从 Kafka dump 数据到 HDFS,然后异构数据源(比如 MySQL、Hbase 等)主要基于 DataX 以及 Sqoop 进行数据的导入导出,最终通过 hive、kylin、spark 等计算把数据写入到各类的存储层,最后通过统一的对外 API 对接业务系统和我们自己的可视化平台等。

  除了那些第三方提供的基本指标其他分析、推荐等都没办法实现。所以有从 0 到 1 的过程,让我们自己有数据可以用;

  ,让更多的人参与数据开发、使用到数据,而不仅仅局限于数据研发人员使用,所以就涉及到把数据、计算存储能力开放给各个业务线,而不是握在自己手上;

  在当数据开放了以后,业务方会要求数据任务能否跑得更快,能否秒出,能否更实时;另外一方面,为满足业务需求集群的规模慢慢的变大,因此会开始考虑满足业务的同时,怎么来实现更节省资源。

  美图现在是处于第二与第三阶段的过渡期,在逐渐完备数据开放的同时,也逐步提升查询分析效率,以及开始考虑怎么进行优化成本。接下来会重点介绍 0 到 1 以及数据开放这两个阶段我们平台的实践以及优化思路。

  从 0 到 1 解决从数据采集到最终能够正常的使用数据。如图 4 所示是数据收集的演进过程,从刚开始使用类似 umeng、flurry 这类的免费第三方平台,到后面快速使用 rsync 同步日志到一台服务器上存储、计算,再到后面快速开发了一个简单的python脚本支持业务服务器上报日志,最终我们开发了服务端日志采集系统 Arachnia 以及客户端 AppSDK。

  数据采集是数据的源头,在整个数据链路中是相对重要的环节,需要更多关注:数据是不是完整、数据是不是支持实时上报、数据埋点是否规范准确、以及维护管理成本。因此我们的日志采集系统要满足以下需求:

  基于以上需求我们没使用 flume、scribe、fluentd,最终选择自身开发一套采集系统 Arachnia。

  上图是 Arachnia 的简易架构图,它通过系统大脑进行集中式管理。puppet 模块主要作为单个 IDC 内统一汇总 Agent 的 metrics,中转转发的 metrics 或者配置热更命令。采集器 Agent 主要是运维平台负责安装、启动后从 brain 拉取到配置,并开始采集上报数据到 collector。

  接着看 Arachnia 的实践优化,首先是 at least once 的可靠性保证。不少的系统都是采用把上报失败的数据通过 WAL 的方式记录下来,重试再上报,以免上报失败丢失。我们的实践是去掉 WAL,增加了 coordinator 来统一的分发管理 tx 状态。

  开始采集前会从 coordinator 发出 txid,source 接收到信号后开始采集,并交由 sink 发送数据,发送后会ack tx,告诉 coordinator 已经 commit。coordinator 会进行校验确认,然后再发送 commit 的信号给 source、sink 更新状态,最终 tx 完 source 会更新采集进度到持久层(默认是本地 file)。该方式如果在前面 3 步有问题,则数据没有发送成功,不会重复执行;如果后面 4 个步骤失败,则数据会重复,该 tx 会被重放。

  基于上文的 at least once 可靠性保证,有些业务方是需要唯一性的,我们这边支持为每条日志生成唯一 ID 标识。另外一个数据采集系统的主要实践是:唯一定位一个文件及给每条日志做唯一的 MsgID,方便业务方可以基于 MsgID 在发生日志重复时能在后面做清洗。

  数据上报之后由 collector 负责解析协议推送数据到 Kafka,那么 Kafka 如何落地到 HDFS 呢? 首先看美图的诉求:

  涉及到较多业务线因此有多种数据格式,所以要支持多种数据格式的序列化,包括 json、avro、特殊分隔符等;

  支持因为机器故障、服务问题等导致的数据落地失败重跑,而且需要能有比较快速的重跑能力,因为一旦这块故障,会影响到后续各个业务线的数据使用;

  支持可配置的 HDFS 分区策略,能支持各个业务线相对灵活的、不一样的分区配置;

  支持一些特殊的业务逻辑处理,包括:数据校验、过期过滤、测试数据过滤、注入等;

  基于上述诉求痛点,美图从 Kafka 落地到 HDFS 的数据服务实现方式如图 7 所示。

  落地成功后会把这次处理的 meta 信息(包括 topic、partition、开始的 offset、结束的offset)存储到 MySQL。下次再处理的时候,会从上次处理的结束的 offset 开始读取消息,开始新一批的数据消费落地。

  实现了基本功能后难免会遇到一些问题,比如不同的业务 topic 的数据量级是不一样的,这样会导致一次任务需要等待 partition 数据量最多以及处理时间最长的 mapper 结束,才能结束整个任务。那我们如何来解决这个问题呢?系统设计中有个不成文原则是:分久必合、合久必分,针对数据倾斜的问题我们采用了类似的思路。

  首先对数据量级较小的 partition 合并到一个 inputsplit,达到一个 mapper 可以处理多个业务的 partition 数据,最终落地写多份文件。

  另外对数据量级较大的 partition 支持分段拆分,平分到多个 mapper 处理同一个 partition,这样就实现了更均衡的 mapper 处理,能更好地应对业务量级的突增。

  除了数据倾斜的问题,还出现各种原因导致数据 dump 到 HDFS 失败的情况,比如因为 kafka 磁盘问题、hadoop 集群节点宕机、网络故障、外部访问权限等导致该 ETL 程序出现异常,最终可能导致因为未 close HDFS 文件导致文件损坏等,需要重跑数据。那我们的数据时间分区基本都是以天为单位,用原来的方式可能会导致一个天粒度的文件损坏,解析无法读取。

  我们采用了分两阶段处理的方式:mapper 1 先把数据写到一个临时目录,mapper 2 把 Hdfs 的临时目录的数据 append 到目标文件。这样当 mapper1 失败的时候可以直接重跑这个批次,而不用重跑整天的数据;当 mapper2 失败的时候能直接从临时目录 merge 数据替换最终文件,减少了重新 ETL 天粒度的过程。

  在数据的实时分发订阅写入到 kafka1 的数据基本是每个业务的全量数据,但是针对需求方大部分业务都只关注某个事件、某小类别的数据,而不是任何业务都消费全量数据做处理,所以我们增加了一个实时分发 Databus 来解决这一个问题。

  Databus 支持业务方自定义分发 rules 往下游的 kafka 集群写数据,方便业务方订阅处理自己想要的数据,并且支持更小粒度的数据重复利用。

  有了原始数据并且能做离线、实时的数据开发以后,随之而来的是数据开发需求的井喷,数据研发团队应接不暇。所以我们通过数据平台的方式开放数据计算、存储能力,赋予业务方有数据开发的能力。

  对实现元数据管理、任务调度、数据集成、DAG 任务编排、可视化等不一一赘述,主要介绍数据开放后,美图对稳定性方面的实践心得。

  数据开放和系统稳定性是相爱相杀的关系:一方面,开放了之后不再是有数据基础的研发人员来做,经常会遇到提交非法、高资源消耗等问题的数据任务,给底层的计算、存储集群的稳定性造成了比较大的困扰;另外一方面,实际上也是因为数据开放,才不断推进我们一定要提高系统稳定性。

  针对不少的高资源、非法的任务,我们第一步考虑能否在 HiveSQL 层面能做一些校验、限制。如图 13 所示是 HiveSQL 的整个解析编译为可执行的 MR 的过程:

  首先基于 Antlr 做语法的解析,生成 AST,接着做语义解析,基于AST 会生成 JAVA 对象 QueryBlock。基于 QueryBlock 生成逻辑计划后做逻辑优化,最后生成物理计划,进行物理优化后,最终转换为一个可执行的 MR 任务。

  我们主要在语义解析阶段生成 QueryBlock 后,拿到它做了不少的语句校验,包括:非法操作、查询条件限制、高资源消耗校验判断等。

  我们完整地对 Hive、Hadoop 集群做了一次升级。还在于之前在低版本有 fix 一些问题以及合并一些社区的 patch,在后面新版本都有修复;另外一个原因是新版本的特性及性能方面的优化。我们把 Hive 从 0.13 版本升级到 2.1 版本,Hadoop 从 2.4 升级到 2.7;

  对 Hive 做了 HA 的部署优化。我们把 HiveServer 和 MetaStoreServer 拆分开来分别部署了多个节点,避免合并在一个服务部署运行相互影响;

  之前执行引擎基本都是 On MapReduce 的,我们也在做 Hive On Spark 的迁移,逐步把线上任务从 Hive On MR 切换到 Hive On Spark;

  拉一个内部分支对平时遇到的一些问题做 bugfix 或合并社区 patch 的特性;

  在平台稳定性方面的实践最后一部分是提高权限、安全性,防止对集群、数据的非法访问、攻击等。提高权限主要分两块:API 访问与集群。

  :上文提到我们有 OneDataAPI,提供给各个业务系统访问数据的统一 API。这方面主要是额外实现了一个统一认证 CA 服务,业务系统必须接入 CA 拿到 token 后来访问OneDataAPI,OneDataAPI 在 CA 验证过后,合法的才允许真正访问数据,从而防止业务系统能任意访问所有数据指标。

  :目前主要是基于 Apache Ranger 来统一各类集群,包括 Kafka、Hbase、Hadoop 等做集群的授权管理和维护;

  以上就是美图在搭建完数据平台并开放给各个业务线使用后,对平台稳定性做的一些实践和优化。

  ,看业务的整体体量是否比较大、业务线是否比较广、需求量是否多到极度影响我们的生产力。如果都是肯定答案,那可优先考虑尽快搭建数据平台,以更高效、快速提高数据的开发应用效率。如果本身的业务量级、需求不多,不一定非得套大数据或者搭建多么完善的数据平台,以快速满足支撑业务优先。

  ,比如关注数据源采集的完整性、时效性、设备的唯一标识,多在平台的稳定性方面做优化和实践,为业务方提供一个稳定可靠的平台。

  在提高分析决策效率以及规模逐渐扩大后需要对成本、资源做一些优化和思考。

  【2020全球物联网黑科技大赛暨物联网TOP100强创新排行榜火热启动】

  随着5G的全面部署,物联网郑重进入了爆发期,各大企业纷纷布局物联网,抢占物联网制高点。2020年,全球正经历了一场罕见的疫情,而物联网技术在这场抗疫斗争中发挥了及其重要的作用。全球物联网大会将汇聚各方资源,在历届大会创新成果评选的基础上,正式推出了“全球物联网黑科技大赛暨物联网TOP100强创新排行榜”的活动。

  本届大赛将广泛关注物联网、云计算、大数据、人工智能、工业互联网、区块链、5G等战略性新兴起的产业如何创新性地应用于智慧城市建设领域。同时将深入挖掘智慧交通、智慧社区、智慧家庭、智慧健康、智慧教育、智慧环保、智慧农业、智慧电网、智慧能源、智能硬件等公众关心的民生领域,面向全球广泛征集,评选出优秀的技术、产品和应用案例。

  组委会将邀请物联网行业的头部企业按照不同的细致划分领域进行分赛场的路演。围绕企业大咖组成企业导师团队,带领企业从分赛场脱颖而出。全部赛程将历时4个月,总决赛将在12月会师海南,同时将举办“2020全球物联网黑科技大赛成果点映和颁奖盛典”。

  这是全球物联网大会三年总结后的一次全面思考,更是一次创新突破。我们大家都希望实现黑科技的新律动,成就物联网黑科技的创新发展。参赛申报将于10月31日截止,参赛单位按照大赛启动通知及有关要求,组织申报材料,并报送大赛组委会,具体详情欢迎垂询中关村物联网产业联盟。