前段时间京东公开了面向第二个十二年的战略规划,表示京东将全面走向技术化,大力发展人工智能和机器人自动化技术,将过去传统方式构筑的优势全面升级。京东Y事业部顺势成立,该事业部将以服务泛零售为核心,着重智能供应能力的打造,核心使命是利用人工智能技术来驱动零售革新。 目前京东在全国范围内的运营256个大型仓库,按功能可划分为RDC、FDC、大件中心仓、大件卫星仓、图书仓和城市仓等等。RDC(Regional Distribution Center)即区域分发中心,可理解为一级仓库,向供货商采购的商品会优先送往这里,一般设置在中心城市,覆盖范围大。FDC(Forward Distribution Center)即区域运转中心,可理解为二级仓库,覆盖一些中、小型城市及边远地区,通常会根据需求将商品从RDC调配过来。 结合人工智能、大数据等技术,京东首先从供货商那里合理采购定量的商品到RDC,再根据实际需求调配到FDC,然后运往离客户最近的配送站,最后快递员将商品带到客户手中。这只是京东供应链体系中一个普通的场景,但正因为有这样的体系,使得京东对用户的响应速度大大提高,用户体验大大提升。 用户体验提升的同时也伴随着大量资金的投入和成本的提高,成本必须得到控制,整个体系才能发挥出最大的价值,于是对供应链的优化就显得至关重要了。 优化其实是一门运筹学问题,需考虑在各种决策目标之间如何平衡以达到最大收益,在这个过程中需要考虑很多问题,把这些考虑清楚,问题就容易解决了。举几个简单的例子:
虽然看上去这些问题都很容易回答,但仔细想想却又很难给出答案,原因就在于想要做到精确不是那么容易的事情,就拿补货来说,补的太多会增加库存成本,补的太少会增加缺货成本,只有合理的补货量才能做到成本最低。 借助机器学习、大数据等相关技术,京东在很多供应链优化问题上都已经实现系统化,由系统自动给出优化建议,并与生产系统相连接,实现全流程自动化。在这里有一项技术起着至关重要的低层支撑作用预测技术。据粗略估算,1%的预测准确度的提升可以节约数倍的运营成本。 怎样理解预测在供应链优化中的作用呢?拿商品补货举例,一家公司为了保证库房不缺货,可能会频繁的从供货商那里补充大量商品,这样做虽然不会缺货,但可能会造成更多卖不出去的商品积压在仓库中,从而使商品的周转率降低,库存成本增加。反之,这家公司有可能为了追求零库存而补很少的商品,但这就可能出现严重的缺货问题,从而使现货率降低,严重影响用户体验,缺货成本增加。于是问题就来了,要补多少商品才合适,什么时间补货,这就需要权衡考虑了,最终目的是要使库存成本和缺货成本达到一个平衡。 考虑一下极端情况,等库存降到零时再去补货,这时供货商接到补货通知后将货物运往仓库。但是这么做有个问题,因为运送过程需要时间,这段时间库房就缺货了。那怎么办呢?就是利用预测技术。利用预测我们可以计算出未来商品在途的这段时间里销量大概是多少,然后我们让仓库保证这个量,低于这个量就给供货商下达补货通知,于是问题得以解决。总而言之,预测技术在这里发挥了重要的作用,成为关键的一个环。 预测系统在整个供应链体系中处在最底层并且起到一个支撑的作用,支持上层的多个决策优化系统,而这些决策优化系统利用精准的预测数据结合运筹学技术得出最优的决策,并将结果提供给更上层的业务执行系统或是业务方直接使用。 目前,预测系统主要支持三大业务:销量预测、单量预测和GMV预测。其中销量预测主要支持商品补货、商品调拨;单量预测主要支持仓库、站点的运营管理;GMV预测主要支持销售部门计划的定制。 销量预测按照不同维度又可以分为RDC采购预测、FDC调拨预测、城市仓调拨预测、大建仓补货预测、全球购销量预测和图书促销预测等;单量预测又可分为库房单量预测、配送中心单量预测和配送站单量预测等(在这里“单量”并非指用户所下订单的量,而是将订单拆单后流转到仓库中的单量。例如一个用户的订单中包括3件物品,其中两个大件品和一个小件品,在京东的供应链环节中可能会将其中两个大件品组成一个单投放到大件仓中,而将那个小件单独一个单投放到小件仓中,单量指的是拆单后的量);GMV预测支持到商品粒度。 整体架构从上至下依次是:数据源输入层、基础数据加工层、核心业务层、数据输出层和下游系统。首先从外部数据源获取我们所需的业务数据,然后对基础数据进行加工清洗,再通过时间序列、机器学习等人工智能技术对数据进行处理分析,最后计算出预测结果并通过多种途径推送给下游系统使用。 数据源输入层:京东数据仓库中存储着我们需要的大部分业务数据,例如订单信息、商品信息、库存信息等等。而对于促销计划数据则大部分来自于采销人员通过Web系统录入的信息。除此之外还有一小部分数据通过文本形式直接上传到HDFS中。 基础数据加工层:在这一层主要通过Hive对基础数据进行一些加工清洗,去掉不需要的字段,过滤不需要的维度并清洗有问题的数据。 核心业务层:这层是系统的的核心部分,横向看又可分为三层:特征构建、预测算法和预测结果加工。纵向看是由多条业务线组成,彼此之间不发生任何交集。
预测结果输出层:将最终预测结果同步回京东数据仓库、MySql、HBase或制作成JSF接口供其他系统远程调用。 下游系统:包括下游任务流程、下游Web系统和其他系统。 预测系统核心层技术主要分为四层:基础层、框架层、工具层和算法层。 HDFS用来做数据存储,Yarn用来做资源调度,BDP(Big Data Platform)是京东自己研发的大数据平台,我们主要用它来做任务调度。 以Spark RDD、Spark SQL、Hive为主, MapReduce程序占一小部分,是原先遗留下来的,目前正逐步替换成Spark RDD。 选择Spark除了对性能的考虑外,还考虑了Spark程序开发的高效率、多语言特性以及对机器学习算法的支持。在Spark开发语言上我们选择了Python,原因有以下三点:
一方面我们会结合自身业务有针对性的开发一些算法,另一方面我们会直接使用业界比较成熟的算法和模型,这些算法都封装在第三方Python包中。我们比较常用的包有xgboost、numpy、pandas、sklearn、scipy和hyperopt等。
我们用到的算法模型非常多,原因是京东的商品品类齐全、业务复杂,需要根据不同的情况采用不同的算法模型。我们有一个独立的系统来为算法模型与商品之间建立匹配关系,有些比较复杂的预测业务还需要使用多个模型。我们使用的算法总体上可以分为三类:时间序列、机器学习和结合业务开发的一些独有的算法。 1. 机器学习算法主要包括GBDT、LASSO和RNN : GBDT:是一种迭代的决策树算法,该算法由多棵决策树组成,所有树的结论累加起来做最终答案。我们用它来预测高销量,但历史规律不明显的商品。 RNN:这种网络的内部状态可以展示动态时序行为。不同于前馈神经网络的是,RNN可以利用它内部的记忆来处理任意时序的输入序列,这让它可以更容易处理如时序预测、语音识别等。 LASSO:该方法是一种压缩估计。它通过构造一个罚函数得到一个较为精炼的模型,使得它压缩一些系数,同时设定一些系数为零。因此保留了子集收缩的优点,是一种处理具有复共线性数据的有偏估计。用来预测低销量,历史数据平稳的商品效果较好。 2. 时间序列主要包括ARIMA和Holt winters : ARIMA:全称为自回归积分滑动平均模型,于70年代初提出的一个著名时间序列预测方法,我们用它来主要预测类似库房单量这种平稳的序列。 Holt winters:又称三次指数平滑算法,也是一个经典的时间序列算法,我们用它来预测季节性和趋势都很明显的商品。 3. 结合业务开发的独有算法包括WMAStockDT、SimilarityModel和NewProduct等: WMAStockDT:库存决策树模型,用来预测受库存状态影响较大的商品。 SimilarityModel:相似品模型,使用指定的同类品数据来预测某商品未来销量。 NewProduct:新品模型,顾名思义就是用来预测新品的销量。 预测核心流程主要包括两类:以机器学习算法为主的流程和以时间序列分析为主的流程。
我们使用Spark SQL和Spark RDD相结合的方式来编写程序,对于一般的数据处理,我们使用Spark的方式与其他无异,但是对于模型训练、预测这些需要调用算法接口的逻辑就需要考虑一下并行化的问题了。我们平均一个训练任务在一天处理的数据量大约在500G左右,虽然数据规模不是特别的庞大,但是Python算法包提供的算法都是单进程执行。我们计算过,如果使用一台机器训练全部品类数据需要一个星期的时间,这是无法接收的,所以我们需要借助Spark这种分布式并行计算框架来将计算分摊到多个节点上实现并行化处理。 我们实现的方法很简单,首先需要在集群的每个节点上安装所需的全部Python包,然后在编写Spark程序时考虑通过某种规则将数据分区,比如按品类维度,通过groupByKey操作将数据重新分区,每一个分区是一个样本集合并进行独立的训练,以此达到并行化。流程如下图所示: 伪码如下: repartitionBy方法即设置一个重分区的逻辑返回(K,V)结构RDD,train方法是训练数据,在train方法里面会调用Python算法包接口。saveAsPickleFile是Spark Python独有的一个Action操作,支持将RDD保存成序列化后的sequnceFile格式的文件,在序列化过程中会以10个一批的方式进行处理,保存模型文件非常适合。 虽然原理简单,但存在着一个难点,即以什么样的规则进行分区,key应该如何设置。为了解决这个问题我们需要考虑几个方面,第一就是哪些数据应该被聚合到一起进行训练,第二就是如何避免数据倾斜。 针对第一个问题我们做了如下几点考虑:
针对第二个问题我们采用了如下的方式解决:
总之对于后两种处理方式可以单独通过一个Spark任务定期运行,并将这种分区规则保存。 注:《图解Spark:核心技术与案例实战》为本文作者所著。 《图解Spark:核心技术与案例实战》一书以Spark2.0版本为基础进行编写,系统介绍了Spark核心及其生态圈组件技术。其内容包括Spark生态圈、实战环境搭建和编程模型等,重点介绍了作业调度、容错执行、监控管理、存储管理以及运行架构,同时还介绍了Spark生态圈相关组件,包括了Spark SQL的即席查询、Spark Streaming的实时流处理、MLlib的机器学习、GraphX的图处理和Alluxio的分布式内存文件系统等。下面介绍京东预测系统如何进行资源调度,并描述如何使用Spark存储相关知识进行系统优化。 在图解Spark书的第六章描述了Spark运行架构,介绍了Spark集群资源调度一般分为粗粒度调度和细粒度调度两种模式。粗粒度包括了独立运行模式和Mesos粗粒度运行模式,在这种情况下以整个机器作为分配单元执行作业,该模式优点是由于资源长期持有减少了资源调度的时间开销,缺点是该模式中无法感知资源使用的变化,易造成系统资源的闲置,从而造成了资源浪费。 而细粒度包括了Yarn运行模式和Mesos细粒度运行模式,该模式的优点是系统资源能够得到充分利用,缺点是该模式中每个任务都需要从管理器获取资源,调度延迟较大、开销较大。 由于京东Spark集群属于基础平台,在公司内部共享这些资源,所以集群采用的是Yarn运行模式,在这种模式下可以根据不同系统所需要的资源进行灵活的管理。在YARN-Cluster模式中,当用户向YARN集群中提交一个应用程序后,YARN集群将分两个阶段运行该应用程序: 第一个阶段是把Spark的SparkContext作为Application Master在YARN集群中先启动;第二个阶段是由Application Master创建应用程序,然后为它向Resource Manager申请资源,并启动Executor来运行任务集,同时监控它的整个运行过程,直到运行完成。下图为Yarn-Cluster运行模式执行过程: 我们都知道大数据处理的瓶颈在IO。我们借助Spark可以把迭代过程中的数据放在内存中,相比MapReduce写到磁盘速度提高近两个数量级;另外对于数据处理过程尽可能避免Shuffle,如果不能避免则Shuffle前尽可能过滤数据,减少Shuffle数据量;最后,就是使用高效的序列化和压缩算法。在京东预测系统主要就是围绕这些环节展开优化,相关Spark存储原理知识可以参见图解Spark书第五章的详细描述。 由于资源限制,分配给预测系统的Spark集群规模并不是很大,在有限的资源下运行Spark应用程序确实是一个考验,因为在这种情况下经常会出现诸如程序计算时间太长、找不到Executor等错误。我们通过调整参数、修改设计和修改程序逻辑三个方面进行优化:
参数的调整虽然容易做,但往往效果不好,这时候需要考虑从设计的角度去优化:
为了进一步提高程序的运行效率,通过修改程序的逻辑来提高性能,主要是在如下方面进行了改进:避免过多的Shuffle、减少Shuffle时需要传输的数据和处理数据倾斜问题等。 1. 避免过多的Shuffle Spark提供了丰富的转换操作,可以使我们完成各类复杂的数据处理工作,但是也正因为如此我们在写Spark程序的时候可能会遇到一个陷阱,那就是为了使代码变的简洁过分依赖RDD的转换操作,使本来仅需一次Shuffle的过程变为了执行多次。我们就曾经犯过这样一个错误,本来可以通过一次groupByKey完成的操作却使用了两回。 业务逻辑是这样的:我们有三张表分别是销量(s)、价格(p)、库存(v),每张表有3个字段:商品id(sku_id)、品类id(category)和历史时序数据(data),现在需要按sku_id将s、p、v数据合并,然后再按category再合并一次,最终的数据格式是:[category,[[sku_id, s , p, v], [sku_id, s , p, v], […],[…]]]。一开始我们先按照sku_id category作为key进行一次groupByKey,将数据格式转换成[sku_id, category , [s,p, v]],然后按category作为key再groupByKey一次。 后来我们修改为按照category作为key只进行一次groupByKey,因为一个sku_id只会属于一个category,所以后续的map转换里面只需要写一些代码将相同sku_id的s、p、v数据group到一起就可以了。两次groupByKey的情况: 修改后变为一次groupByKey的情况: 多表join时,如果key值相同,则可以使用union groupByKey flatMapValues形式进行。比如:需要将销量、库存、价格、促销计划和商品信息通过商品编码连接到一起,一开始使用的是join转换操作,将几个RDD彼此join在一起。后来发现这样做运行速度非常慢,于是换成union groypByKey flatMapValue形式,这样做只需进行一次Shuffle,这样修改后运行速度比以前快多了。实例代码如下: 如果两个RDD需要在groupByKey后进行join操作,可以使用cogroup转换操作代替。比如, 将历史销量数据按品类进行合并,然后再与模型文件进行join操作,流程如下: 使用cogroup后,经过一次Shuffle就可完成了两步操作,性能大幅提升。 2. 减少Shuffle时传输的数据量
comebineyeByKey属于聚合类操作,由于它支持map端的聚合所以比groupByKey性能好,又由于它的map端与reduce端可以设置成不一样的逻辑,所以它支持的场景比reduceByKey多,它的定义如下: educeByKey和groupByKey内部实际是调用了comebineyeByKey, 我们之前有很多复杂的无法用reduceByKey来实现的聚合逻辑都通过groupByKey来完成的,后来全部替换为comebineyeByKey后性能提升了不少。 声明:文章版权归原作者所有 部分文章转自互联网 如有侵权请联系 [邮箱地址] 删除 |