FranciscoGarcíaParamés《长期投资》作品简介与读书感悟

《大数据和人工智能交流》头条号向广大初学者新增C、Java、Python、Scala、javascript等目前流行的计算机、大数据编程语言,希望大家以后关注本头条号更多的内容。(一)Spark简介1

《大数据和人工智能交流》头条号向广大初学者新增C 、Java 、Python 、Scala、javascript 等目前流行的计算机、大数据编程语言,希望大家以后关注本头条号更多的内容。

(一)Spark简介

1、什么是spark

Spark是一种基于内存计算的开源框架。它于2009年诞生于美国加州大学伯克利分校AMPLab,它最初属于研究项目,后来在2010年正式开源,2013年成立apache基金项目,到2014年成为apache基金顶级项目,2014年5月30日,spark1.0正式上线

Spark项目组核心成员在2013年创建了DataBricks公司,到目前为止已经在San Francisco连续举办了从2013年到2015年的spark峰会,会议得到大数据主流数据商

2、spark的特点

(1)轻量级的快速处理

Spark采用Scala语言编写,底层采用了Actor 模型的akka作为通讯框架,并发操作容易实现、功能强大;基于DAG图的执行引擎,减少多次计算之间中间结果写到Hdfs的开销RDD(分布式内存抽象)以基本一致的方式应对不同的大数据处理场景,spark2.0以后又完善DataSet等功能;提供Cache机制来支持需要反复迭代的计算或者多次数据共享,减少数据读取的IO开销;与Hadoop的MapReduce相比,Spark基于内存的运算比MR要快100倍;而基于硬盘的运算也要快10倍

(2)易于使用

Spark支持多种语言,包括Java、Scala、Python、R(1.4版本以后支持),这允许更多的开发者在自己熟悉的语言环境下进行工作,扩大了spark的应用范围。它自带80以上高级操作符,允许在shell中进行交互查询

(3)提供大数据整体解决方案

以其RDD模型的强大表现能力,逐渐形成了一套自己的生态圈,提供了整体的解决方案:主要包括Spark内存中批处理,Spark SQL交互式查询,Spark Streaming流式计算, GraphX和MLlib提供的常用图计算和机器学习算法

Spark支持实时的流处理,spark Streaming有简单、轻量、强大的API、容错能力强、集成性好的特点

(5)与Hadoop无缝衔接

Spark可以使用YARN作为它的集群管理器读取HDFS,HBase等一切Hadoop的数据,这个特性可以让用户轻而易举的迁移已有的hadoop应用

3、spark的整体架构

(1)Spark的整体架构如下图所示:

Spark提供了多种高级工具: Shark SQL应用于即时查询、Spark Streaming应用于流式计算、 MLlib应用于机器学习、GraphX应用于图处理

Spark可以基于自带的standalone集群管理器独立运行,也可以部署在Apache Mesos 和 Hadoop YARN 等集群管理器上运行

Spark可以访问存储在HDFS、 Hbase、Cassandra、Amazon S3、本地文件系统等等上的数据,Spark支持文本文件,序列文件,以及任何Hadoop的InputFormat

(2)Spark 架构设计

Spark应用在集群上作为独立的进程组来运行,在main程序中通过SparkContext来协调(称之为驱动程序)。具体的说,为了在集群上运行,SparkContext 可以连接至几种类型的Cluster Manager ( 既可以用Spark自己的独立的Cluster Manager,或者Mesos,也可以使用YARN),它们会分配应用的资源。一旦连接上,Spark 获得集群中节点上的Executor,这些进程可以运行计算并且为应用存储数据。接下来,它将发送应用代码(通过JAR或者Python文件定义传递给SparkContext) 至Executor最终,SparkContext将发送任务到Executor运行。Spark 运行架构如图10.9所示。

图10.9 Spark运行架构图

关于这个架构有几个需要注意的地方。

第一、每个应用获取到自己的Executor 进程,会在整个应用的生命周期中保持并且在多个线程中运行任务。这样做的优点是在调度方面(每个驱动程序调度自己的任务)和Executor方面( 来自不同应用的任务运行在不同的JVM中)把应用互相隔离。然而这也意味着若不把数据写到外部的存储系统中,数据就不能够在不同的Spark应用SparkContext 的实例)之间共享。

第二、 Spark 是不知道底层的Cluster Manager到底是什么类型的。只要它能够获得Executor进程,并且它们彼此之间可以通信,那么即使是在一个也 支持其他应用的Cluster Manager (例如,Mesos/YARN) 上运行, 也是相对简单的。

第三、driver程序必须在自己的生命周期内监听和接受来自它的Executor的连接请求同样的,driver 程序也必须可以从Worker Node上进行网络寻址。

第四、因为driver 调度了集群上的task,更好的方式应该是在相同的局域网中靠近Worker Node运行。如果不喜欢发送请求到远程的集群,不如打开一个RPC到diver并让它就近提交操作,而不是从很远的Worker Node上运行个driver。

(3)Spark 运行模式

系统当前支持3种Cluster Manager

第一、独立模式: 包含在Spark中使得它更容易安装集群的一个简单的Cluster Manager。

第二、 Mesos: 一个通用的Cluster Manager,它也可以运行Hadoop MapReduce和其他服务应用。

第三、Hadoop YARN Hadoop 2中的resource manager (资源管理器)。

不管采用什么运行模式,代码都是一样的,只是在提交的时候使用参数指定即可,具体在Spark应用程序开发案例中讲解。

4、Spark2.0版本新特性

第一:更容易的SQL和Streamlined APIs:

Spark 2.0主要聚焦于两个方面:对标准的SQL支持;统一DataFrame和Dataset API。

在编程API方面,我们对API进行了精简:

(1) 统一Scala和Java中DataFrames和Datasets的API

从Spark 2.0开始,DataFrame仅仅是Dataset的一个别名。有类型的方法(typed methods)(比如:map,filter,groupByKey)和无类型的方法(untyped methods)(比如:select,groupBy)目前在Dataset类上可用。同样,新的Dataset接口也在Structured Streaming中使用。因为编译时类型安全(compile-time type-safety)在Python和R中并不是语言特性,所以Dataset的概念并不在这些语言中提供相应的API。而DataFrame仍然作为这些语言的主要编程抽象

(2)SparkSession

一个新的切入点,用于替代旧的SQLContext和HiveContext。对于那些使用DataFrame API的用户,一个常见的困惑就是我们正在使用哪个context?现在我们可以使用SparkSession了,其涵括了SQLContext和HiveContext,仅仅提供一个切入点。需要注意的是为了向后兼容,旧的SQLContext和HiveContext目前仍然可以使用

(3)简单以及性能更好的Accumulator API

Spark 2.0中设计出一种新的Accumulator API,它拥有更加简洁的类型层次,而且支持基本类型。为了向后兼容,旧的Accumulator API仍然可以使用

(4)基于DataFrame的Machine Learning API可以作为主要的ML API了:

在Spark 2.0中, spark.ML包以其pipeline API将会作为主要的机器学习API了,而之前的spark.mllib仍然会保存,将来的开发会聚集在基于DataFrame的API上

(5)Machine learning pipeline持久化

现在用户可以保存和加载Spark支持所有语言的Machine learning pipeline和models。

(6)R的分布式算法

在R语言中添加支持了Generalized Linear Models (GLM),Naive Bayes,Survival Regression,and K-Means。

根据以往的调查,91%的用户认为Spark的最重要的方面就是性能,结果性能优化在Spark开发中都会看的比较重。

第二、速度更快:

Spark 2.0中附带了第二代Tungsten engine,这一代引擎是建立在现代编译器和MPP数据库的想法上,并且把它们应用于数据的处理过程中。主要想法是通过在运行期间优化那些拖慢整个查询的代码到一个单独的函数中,消除虚拟函数的调用以及利用CPU寄存器来存放那些中间数据。

第三、更智能:

作为首个尝试统一批处理与流处理计算的工具,Spark Streaming一直是大数据处理的领导者。首个流处理API叫做DStream,在Spark 0.7中初次引入,它为开发者提供了一些强大的特性,包括:只有一次语义,大规模容错,以及高吞吐

然而,在处理了数百个真实世界的Spark Streaming部署之后,我们发现需要在真实世界做决策的应用经常需要不止一个流处理引擎。他们需要深度整合批处理堆栈与流处理堆栈,整合内部存储系统,并且要有处理业务逻辑变更的能力。因此,各大公司需要不止一个流处理引擎,并且需要能让他们开发端对端&34;的全栈系统

Spark 2.0使用一个新的API:Structured Streaming模块来处理这些用例,与现有流系统相比,Structured Streaming有三个主要的改进:

(1)与批处理作业集成的API

想要运行流数据计算,开发者可针对DataFrame/Dataset API编写批处理计算,过程非常简单,而Spark会自动在流数据模式中执行计算,也就是说在数据输入时实时更新结果。强大的设计令开发者无需费心管理状态与故障,也无需确保应用与批处理作业的同步,这些都由系统自动解决。此外,针对相同的数据,批处理任务总能给出相同的结果

(2)与存储系统的事务交互

Structured Streaming会在整个引擎及存储系统中处理容错与持久化的问题,使得程序员得以很容易地编写应用,令实时更新的数据库可靠地提供、加入静态数据或者移动数据

(3)与Spark的其它组件的深入集成

Structured Streaming支持通过Spark SQL进行流数据的互动查询,可以添加静态数据以及很多已经使用DataFrames的库,还能让开发者得以构建完整的应用,而不只是数据流管道。未来,我们希望能有更多与MLlib及其它libraries的集成出现

(二)Spark的主要概念和整体运行过程

1、spark的主要概念

Spark任务提供多层分解的概念,Spark组件将用户的应用程序分解为内部执行任务并提供执行容器,资源管理器为spark组件提供资源管理和调度

2、spark的整体流程

Spark端到端流程如下图所示:

(1)Spark的Driver Program (简称Driver)包含用户的应用程序

(2)Driver完成task的解析和生成

(3)Driver向Cluster Manager(集群资源管理器)申请运行task需要的资源

(4)集群资源管理器为task分配满足要求的节点,并在节点按照要求创建Executor

创建的Executor向Driver注册

(5)Driver将spark应用程序的代码和文件传送给分配的executor

(6)executor运行task,运行完之后将结果返回给Driver或者写入HDFS或其他介质

3、Spark的安装

(1)在 官网下载Spark

地址为,打开上述链接,进入到下图,点击红框下载Spark-2.2.0-bin-hadoop2.7.tgz,如下图所示:

(2)将压缩包上传Linux并解压

tar –zxvf spark-2.2.0-bin-hadoop2.7.tgz

(3)在配置文件/etc/profile配置环境变量

export SPARK_HOME=/home/spark2/spark-2.2.0-bin-hadoop2.7

export PATH=PATH:SPARK_HOME/bin:$SPARK_HOME/sbin

(4)配置conf/slaves

打开配置文件conf/slaves,默认情况下没有slaves,需要使用cp命令复制slaves.template。

cp slaves.template slaves

加入slaves配置节点:

node001

node002

node003

(5)配置conf/spark-env.sh

打开配置文件spark-env.sh,如果不在/opt/app/spark-2.2.0-bin-hadoop2.7目录下,则使用该命令:

cp spark-env.sh.template spark-env.sh

在spark-env.sh加入如下内容,设置node0011为Master节点:

export JAVA_HOME=/usr/local/jdk18/jdk1.8

export SPARK_MASTER_IP=node0011

export SPARK_MASTER_PORT=7077

export SPARK_WORKER_CORES=1

export SPARK_WORKER_INSTANCES=1

export SPARK_WORKER_MEMORY=1024M

(6)向各节点分发Spark程序

进入node001机器/home/spark2/目录,使用如下命令把spark文件夹复制到node002和node003结点

cd /home/spark2

scp -r spark-2.2.0-bin-hadoop2.7 hadoop@hadoop2:/home/spark2

scp -r spark-2.2.0-bin-hadoop2.7 hadoop@hadoop3:/home/spark2

(7)启动Spark集群

cd /home/spark2/spark-2.2.0-bin-hadoop2.7/sbin

./start-all.sh

(8)使用jps在集群各个结点验证启动的进程

在node001上面运行的进程有:Worker和Master,在node002和node003上面运行的进程有Worker,在浏览器上输入http://node001:8080,查看Spark集群状态:

(9)验证客户端连接

在node001节点,进入spark的bin目录,使用spark-shell连接集群:

cd /home/spark2/spark-2.2.0-bin-hadoop2.7/bin

spark-shell --master spark://hadoop1:7077 --executor-memory 512m

(三)Spark的RDD对象

1、RDD概述

RDD是弹性分布式数据集,即一个RDD代表一个被分区的只读数据集。RDD是Spark操纵数据的一个高度抽象,即Spark所操作的数据集都是封装成RDD来进行操作的,例如Spark可以兼容处理Hadoop的HDFS数据文件,那么这个HDFS数据文件封装成Spark能够识别的RDD对象来完成数据的处理。

RDD的一个英文定义是:RDDs are fault-tolerant,parallel data structures that let users explicitly persist intermediate results in memory,control their partitioning to optimize data placement,and manipulate them using a rich set of operators. RDD是一个能够让用户可以准确的将中间结果数据持久化到内存中的一个可用错的并行数据结构,可以控制(RDD数据集)分区,优化数据存储,并且有一组丰富的操作集可以操作这份数据。

2、RDD 工作原理

RDD的运行主要分为三步:创建 RDD 对象, DAG 调度器创建执行计划, Task 调度器分配任务并调度 Worker 开始运。

下面是上图工作流程的详解:

第一、创建RDD,RDD创建后中间可能进行一些算子转换(Transformations)运算,比如map、flatMap、reduceByKey等运算,最后执行Action算子(这些算子是能够触发DAGScheduler执行的函数,例如collect、saveAsTextFile等)。

RDD对象的获取有如下3种方式:

(1) 通过文件加载(加载HDFS等文件)创建

sc.textFile(&34;)

(2)通过RDD之间的转换

使用parallelize将其它对象转化为RDD对象

val rdd=sc.parallelize(List(1,2,3))

通过makeRDD将其它对象转化为RDD对象

val rdd = sc.makeRDD(Array(1,2,3))

第二、Action会触发SparkContext的runJob方法,交给DAGScheduler处理,将DAG(directed acyclic graph ,有向无环图)生成Stage。

Stage的划分过程中,会涉及到宽依赖和窄依赖的概念,宽依赖是Stage的分界线,连续的窄依赖都属于同一Stage

比如上图中,在RDD G处调用了Action操作,在划分Stage时,会从G开始逆向分析,G依赖于B和F,其中对B是窄依赖,对F是宽依赖,所以F和G不能算在同一个Stage中,即在F和G之间会有一个Stage分界线。上图中还有一处宽依赖在A和B之间,所以这里还会分出一个Stage。最终形成了3个Stage,由于Stage1和Stage2是相互独立的,所以可以并发执行,等Stage1和Stage2准备就绪后,Stage3才能开始执行Stage有两个类型,最后的Stage为ResultStage类型,除此之外的Stage都是ShuffleMapStage类型

第三、然后他会将stage作为tasksets提交给底层的TaskScheduler,由TaskScheduler执行,由TaskScheduler启动Executor进行任务的计算。

(四)、RDD算子

(1)转换型算子:

使RDD相互转换的算子,如map、filter、flatMap、reduceByKey等。Transformations算子通常分为value型算子和Key-Value型算子。其中value型算子可以直接使用,而Key-Value型算子封装在PairRDDFunctions类中。

(2)Action(行动)算子

能够触发DAG执行的函数,比如collect、count、saveAsTextFile等。

下面举例说明常见的RDD的)转换型算子和Action算子的应用:

1、RDD的转换型算子

(1)RDD的map算子

val rdd1 = sc.parallelize(Array(1,2,3,4,5))

val r1 = rdd1.map (_ *2)

r1.foreach(println)

(2)filter算子

val rdd2 = sc.parallelize(1 to 10)

var r2 = rdd2.filter(_%2 == 0)

r2.foreach(println)

(3)RDD的flatMap算子

val rdd4 = sc.parallelize(Array(&34;,&34;,&34;))

val r4 = rdd4.flatMap (_ *2)//计算后合并结果集

r4.foreach(println)

(4)RDD的reduceByKey算子

val rdd5_1 = sc.makeRDD(Array((&34;,1),(&34;,1),(&34;,1),(&34;,1)))

val r5_1 = rdd5_1.reduceByKey(_ + _)

r5_1.foreach(println)

val r5_2 = rdd5_1.reduceByKey((x,y)=>x+y) ////或者写全函数

r5_2.foreach(println)

(5)RDD的join算子

val rdd6 = sc.makeRDD(Array((&34;,&34;),(&34;,&34;),(&34;,&34;)))

val rdd7 = sc.makeRDD(Array((&34;,&34;),(&34;,&34;),(&34;,&34;)))

val r8 = rdd6.join(rdd7) // 按照key关联结果集

r7.foreach(println)

2、RDD的Action算子

(1)RDD的first算子

val rdd1 = sc.parallelize(Array(&34;,&34;,&34;))

println(rdd1.first)

(2)RDD的take算子

val rdd2 = sc.parallelize(Array(1,2,3,4,5))

val arr2 = rdd2.take(3) //从对象中取出3个元素1,2,3

arr2.foreach (println)

(3)RDD的top算子

val rdd3 = sc.parallelize(Array(9,2,6,8,112))

val arr3 = rdd3.top(3) //降序排序后从对象中取出3个元素112,9,8

arr3.foreach (println)

(4)RDD的takeOrdered算子

val rdd4 = sc.parallelize(Array(9,2,6,8,112))

val arr4 = rdd4.takeOrdered(3) //升序排序后从对象中取出3个元素2,6,8

arr4.foreach (println)

(5)RDD的reduce算子

val rdd5 = sc.parallelize(Array(1,2,3,4,5))

println(rdd5.reduce((x,y)=>x*y))//1x2x3x4x5=120

(6)RDD的lookup算子

val rdd6 = sc.makeRDD(List((&34;,1),(&34;,1),(&34;,2),(&34;,1)))

val r6 = rdd6.lookup(&34;)//查询key值为&34;的value值

r6.foreach(println)

(7)RDD的countByValue算子

val rdd7 = sc.parallelize(List(&34;,&34;,&34;,&34;))

println(rdd7.countByValue()) //统计对象的数据出现次数

(8)最大值、最小值

val y = sc.parallelize(10 to 30)

println(y.max)

println(y.min)

(9)saveAsTextFile算子

val rdd = sc.parallelize(1 to 10 ,2)

rdd.saveAsTextFile(&34;)

(五)Spark的Scheduler阶段处理

Scheduler模块作为Spark核心的模块之一,充分体现了Spark与MapReduce的不同之处,体现了Spark DAG思想的精巧和设计的优雅。Scheduler模块分为两大主要部分,DAGScheduler和TaskScheduler,一个RDD运算如下图所示:

1、Scheduler阶段的DAGScheduler

DAGScheduler的作用把一个spark作业转换成成Stage的DAG(Directed Acyclic Graph有向无环图)图,根据RDD和Stage之间的关系,找出开销最小的调度方法,然后把Stage以TaskSet的形式提交给TaskScheduler

2、Scheduler阶段的TaskScheduler

TaskScheduler模块用于与DAGScheduler交互,负责任务的具体调度和运行。任务调度模块基于两个Trait:TaskScheduler和 SchedulerBackend

(1)TaskScheduler:定义了任务调度模块的对外接口(submitTasks等),供DAGScheduler调用

(2)TaskSchedulerImpl是TaskScheduler的具体实现类,完成资源与任务的调度

(3)SchedulerBackend封装了各种Backend,用于与底层资源调度系统交互,配合TaskSchedulerImpl实现任务执行所需的资源分配

(4)SchedulableBuilder负责taskset的调度

(5)TaskSetManager负责一个taskset中task的调度

TaskScheduler是trait,用于与DAGScheduler交互,主要负责任务的调度和运行,无具体实现,仅仅为对外统一接口。核心接口是: submitTasks,具体实现见TaskSchedulerImpl 中的submitTasks 。接收DAGScheduler的Task请求,分发Task到集群运行并监控运行状态,并将结果以event的形式汇报给DAGScheduler

TaskSchedulerImpl实现了TaskScheduler Trait,实现了资源和任务的调度。

核心接口是: ResourceOffers,根据提供的资源列表offers,返回满足条件的tasks,供SchedulerBackend 调用。资源和任务调度的核心思想:资源驱动。即当有空闲资源时,查看是否有task需要运行(遵循Locality)

3、TaskScheduler之SchedulableBuilder

SchedulableBuilder主要负责TaskSet的调度,核心接口是:getSortedTaskSetQueue,该接口返回排序后的TaskSetManager队列,该接口供TaskSchedulerImpl调用。SchedulableBuilder维护的是一颗树,根节点是rootpool,叶子节点是TaskSetManager对象

4、TaskScheduler之TaskSetManager

TaskSetManager主要负责一个taskset中task的调度和跟踪。核心接口是:resourceOffer,该接口根据输入的资源 在taskset内部调度一个task,主要考虑因素是Locality,该接口供TaskSchedulerImpl调

5、TaskScheduler之SchedulerBackend

SchedulerBackend是trait,封装了多种backend,用于与底层资源调度系统交互(如mesos/YARN),配合TaskScheduler实现具体任务执行所需的资源分配。核心接口是: reviveOffers,与TaskSchedulerImpl交互完成task的Launch

SchedulerBackend只关心资源,不关心task。提交资源供TaskSchedulerImpl分配task

(六)Spark的Shuffle

Shuffle中文翻译为&34;,需要Shuffle的关键性原因是某种具有共同特征的数据需要最终汇聚到一个计算节点上进行计算,运行Task的时候才会产生Shuffle。在一个Stage内部,不持久化中间结果,数据丢失重新计算依赖的RDD;但是在产生Shuffle的时候,会产生网络通信,这是需要持久化。持久化默认情况下放在磁盘中,也可以调整Spark的框架,将数据放在内存中,现在一般放在Local FileSystem上面,也可以放在Tachyon中,这些都可以通过调整Spark的配置和改造Spark源码来实现。

Spark本身的计算通常都是在内存中完成的,比如这样一个map结构的RDD:(String,Seq),key是字符串,value是一个Seq,如果只是对value进行一一映射的map操作,比如(1)先计算Seq的长度,(2)再把这个长度作为元素添加到Seq里面去。这两步计算,都可以在local完成,而事实上也是在内存中操作完成的,换言之,不需要跑到别的node上去拿数据,因此执行的速度是非常快的。但是,如果对于一个大的rdd,shuffle发生的时候,就会因为网络传输、数据序列化/反序列化产生大量的磁盘IO和CPU开销。这个性能上的损失是非常巨大的。要减少shuffle的开销,主要有要减少shuffle次数和减少shuffle的数据规模

Spark的操作模型是基于RDD的,当调用RDD的reduceByKey、groupByKey等类似的操作的时候,就需要有shuffle了

MapRduce和Spark的 Shuffle比较:

1、MapReduce

MapReduce中的Shuffle更像是洗牌的逆过程,把一组无规则的数据尽量转换成一组具有一定规则的数据

为 什么MapReduce计算模型需要Shuffle过程?我们都知道MapReduce计算模型一般包括两个重要的阶段:Map是映射,负责数据的过滤分 发;Reduce是规约,负责数据的计算归并。Reduce的数据来源于Map,Map的输出即是Reduce的输入,Reduce需要通过 Shuffle来获取数据。

从Map输出到Reduce输入的整个过程可以广义地称为Shuffle。Shuffle横跨Map端和Reduce端,在Map端包括Spill过程,在Reduce端包括copy和sort过程,如图所示

Spill过程包括输出、排序、溢写、合并等步骤,如图所示:

2、spark的shuffle

hadoop和spark的shuffle,重要的区别是数据的聚合和聚合数据的计算。Hadoop中聚合是对于同一分区的数据进行排序,键值相同的数据彼此相邻,从而达到聚合的目的,而聚合后的数据交给map端或者reduce端的combine函数处理

spark聚合及数据计算是交给Aggregator处理的,其定义如下:

case class Aggregator[K,V,C] (

createCombiner: V => C。

mergeValue: (C,V) => C。

mergeCombiners: (C,C) => C){…

}

以reduceByKey(_ + _)转换操作为例,介绍3个函数是如何实现数据聚合计算的,假设需要聚合的数据的键值对为:<&34;,1>、<&34;,1>、<&34;,1>,数据是无序的。

对于<&34;,1>、<&34;,1>,Spark通过散列函数计算键值&34;和&34;对应的哈希表地址,首先调用createCombiner对数据进行初始化操作,如图所示:

这时候,又有个数据<&34;,1>,&34;的hashcode当然也是&34;,这时候就调用mergeValue(oldValue,&34;)来计算新的值,对于reduceByKey函数,&34;得到新值2,如图:

对于规约、合并操作,调用mergeCombiners函数。

Spark的shuffle分为哈希shuffle和排序shuffle:

(1)hash方式

shuffle不排序,效率高;生成MXR个shuffle中间文件,一个分片一个文件; 产生和生成这些中间文件会产生大量的随机IO,磁盘效率低;shuffle时需要全部数据都放在内存,对内存消耗大;适合数据量能全部放到内存,reduce操作不需要排序的场景。

(2)Sort方式

shuffle需要排序;生成M个shuffle中间数据文件,一个Map所有分片放到一个数据文件中,外加一个索引文件记录每个分片在数据文件中的偏移量;shuffle能够借助磁盘(外部排序)处理庞大的数据集;数据量大于内存时只能使用Sort方式,也适用于Reduce操作需要排序的场景。

(七)Spark的Storage模块

Storage模块负责了Spark计算过程中所有的存储,包括基于Disk的和基于Memory的。用户在实际编程中,面对的是RDD,可以将RDD的数据通过调用org.apache.spark.rdd.RDD#cache将数据持久化;持久化的动作都是由Storage模块完成的。包括Shuffle过程中的数据,也都是由Storage模块管理的。可以说,RDD实现了用户的逻辑,而Storage则管理了用户的数据。

Storage模块主要分为两层:

1、通信层:storage模块采用的是master-slave结构来实现通信层,master和slave之间传输控制信息、状态信息,这些都是通过通信层来实现的

2、存储层:storage模块需要把数据存储到disk或是memory上面,有可能还需replicate到远端,这都是由存储层来实现和提供相应接口。

Storage模块提供了统一的操作类BlockManager,外部类与storage模块打交道都需要通过调用BlockManager相应接口来实现。

Storage模块存取的最小单位是数据块(Block),Block与RDD中的Partition一一对应,所以所有的转换或动作操作最终都是对Block进行操作。

Spark数据写入过程分析:

1、RDD的iterator调用CacheManager的getOrCompute函数

FranciscoGarcíaParamés《长期投资》作品简介与读书感悟

2、CacheManager调用BlockManager的put接口来写入数据

3、BlockManager根据输入的storageLevel来确定是写内存还是写硬盘

4、通知BlockManagerMaster有新的数据写入,在BlockManagerMaster中保存元数据

5、将写入的数据与其它slave worker进行同步(一般来说在本机写入的数据,都会另先一台机器来进行数据的备份,即replicanumber=1)

通信层和存储层分析:

1、通信层

(1)Driver和Executor都有一个BlockManager,里面都包含了BlockManagerMasterActor和BlockManagerSlaveActor

(2)BlockManagerMasterActor类主要负责控制消息和状态之间的传递和处理。传递的消息包括Register、StorageStatus、updateBlockInfo、getLocation等,消息处理包括返回或更新block以及executor的元数据信息,并调用BlockManagerSlaveActor(ref )与Executor通信

(3)BlockManagerSlaveActor类传递的消息主要是removeRdd、removeBlock等消息,消息处理主要是对本executor进行rdd和block的删除操作

2、存储层

(1)BlockManager包含了DiskStore类和MemoryStore类。

(2)DiskStore:每一个block都被存储为一个file,通过计算block id的hash值将block映射到文件中。

(3)MemoryStore:内部维护了一个hash map来管理所有的block,以block id为key将block存放到hash map中。

(八)RDD的缓存机制

1、缓存概述

通过对Spark RDD的学习我们知道RDD的转换是迟缓型的,而有时候我们希望能够多次使用同个 RDD,如果简单地对RDD执行调用操作,Spark每次都会重算 RDD以及它的所有依赖,这在迭代算法中消耗很大,此时我们可以让Spark对数据进行缓存或者持久化操作。

当Spark持久化存储一个RDD时,如果一个有持久化数据的节 点发生故障,Spark会在需要用到缓存数据时重新计算丢失的数据分区,为此可以把数据备份到多个节点以避免这种情况发生。

Spark中一个很重要的能力是将数据缓存(或称为持久化),在多个操作间都可以访问这些持久化的数据。当持久化一个RDD时, 每个节点会将本节点计算的数据块存储到内存、磁盘,或者存储多副本。具体的缓存策略可以根据StorageLevel进行设置,在该数据上的其他动作算子将直接使用内存中的数据。这样会让以后的动作算子计算速度加快(通常运行速度会加速10倍)。缓存是迭代算法和快速的交互式使用的重要工具。

RDD可以使用persist() 方法或cache()方法进行持久化。数据将会在第一次动作时进行计算,并在各个节点的内存中缓存。Spark 的缓存具有容错机制,如果一个缓存的RDD的某个分区丢失了,Spark将按照原来的计算过程,自动重新计算并进行缓存。

2.是否使用缓存对比

(1)不使用缓存。不使用缓存的情况如图10.7所示,执行流程分析如下。

①从HDFS读取数据。

②执行第一次count操作,肯定会从HDFS上读取数据,形成linesRDD,然后再针对linesRDD执行count操作,从而统计出HDFS文件的行数。

❷count③执行第二次count操作,还是会从HDFS上读取数据,形成linesRDD,然后再针对linesRDD执行count操作,从而统计出HDFS文件的行数。这里会多次读取HDFS上的文件并进行统计操作。

图10.7不使用缓存

默认情况下,这种针对大量数据的动作算子都是非常耗时的,那么多执行几次这样的动作算子,性能就会降低很多。在Spark中,如果对某个RDD进行多次操作,每次都要重新计算一个RDD,就会反复消耗大量的时间,从而大大降低整体性能,定要避免这种情况的发生。

(2)使用缓存。使用缓存的情况如图10.8 所示,执行流程分析如下。

①从HDFS读取数据。

②对linesRDD进行持久化操作,需要注意的是cache是惰性的。

③执行第一次count操作,肯定会从HDFS.上读取数据,形成linesRDD,然后再针对linesRDD执行count操作,从而统计出HDFS文件的行数;由于触发了动作算子,那么就会把linesRDD的数据进行持久化;虽然第一次count操作执行完了,但是也不会清除掉linesRDD中的数据,反而将数据缓存到内存或者磁盘中。

④执行第二次count操作,就不会重新从HDFS上读取数据,而是直接从linesRDD所在的节点的内存或者磁盘中取出linesRDD 的数据,进行count操作;这样就不需要多次计算同一个RDD,在大数据场景下,可以大幅度提升Spark应用的性能。

图10.8使用缓存

3.缓存策略

每个持久化的RDD可以使用不同的存储级别进行缓存,例如,持久化到磁盘、以序列化的Java对象形式持久化到内存(可以节省空间)、跨节点间复制、以堆外内存(off-heap)的方式存储在Alluxio(以前叫Tachyon,类似的产品还有ApacheIgnite)。这些存储级别通过给persist(方法传递- 个StorageLevel对象(Scala、 Java、Python)进行设置。cache() 方法是使用默认存储级别StorageLevel.MEMORY-ONLY (将反序列化的对象存储到内存中)的快捷设置方法,详细的存储级别如下。

(1) MEMORY-ONLY: 将RDD以反序列化Java对象的形式存储在JVM中。如果内存空间不够,部分数据分区将不再缓存,在每次需要用到这此数据时再重新进行计算。这是默认的级别。

(2) MEMORY-And-DISK: 将RDD以反序列化Java对象的形式存储在JVM中。如果内存空间不够,将未缓存的数据分区存储到磁盘,需要使用这些分区时再从磁盘读取。

(3) MEMORY-ONLY-SER: 将RDD以序列化的Java对象的形式进行存储(每个分区为一个 byte数组)。这种 方式会比反序列化对象的方式 节省很多空间,尤其是在使用快速序列化时会节省更多的空间,但是在读取时会增加CPU的计算负担。

(4) MEMORY-AND-DISK-SER: 类似于MEMORY ONLY SER,但是溢出的分区会存储到磁盘,而不是在用到它们时重新计算。

(5) DISK-ONLY: 只在磁盘上缓存RDD.

(6) MEMORY-ONLY-2,MEMORY-AND-DISK-2,等等: 与上面的级别功能相同,只不过每个分区在集群中两个节点上建立副本。

在shufle操作中(例如reducByKey),即便是用户没有调用persist方法,Spark也会自动缓存部分中间数据。这么做的目的是,在shuffle的过程中某个节点运行失败时,不需要重新计算所有的输入数据。如果用户想多次使用某个RDD强烈推荐在该RDD上调用persist方法。

4.如何选择存储级别

Spark的存储级别选择的核心问题是在内存使用率和CPU效率之间进行权衡建议按下面的过程进行存储级别的选择。

(1)如果使用默认的存储级别(MEMORY-ONLY),存储在内存中的RDD没有发生溢出,那么就选择默认的存储级别。默认存储级别可以最大程度地提高CPU的效率,可以使RDD上的操作以最快的速度运行。

(2)如果内存不能全部存储RDD,那么使用MEMORY-ONLY-SER,并挑选一个快速序列化库将对象序列化,以节省内存空间。使用这种存储级别,计算速度仍然很快。

(3)除了在计算该数据集的代价特别高或者在需要过滤大量数据的情况下,尽量不要将溢出的数据存储到磁盘。因为,重新计算这个数据分区的耗时与从磁盘读取这些数据的耗时差不多。

(4)如果想快速还原故障,建议使用多副本存储级别(例如,在服务出故障时需要快速恢复的场景下使用Spark作为Web应用的后台服务)。所有的存储级别都通过重新计算丢失的数据的方式,提供了完全容错机制。但是多副本级别在发生数据丢失时,不需要重新计算对应的数据库,可以让任务继续运行。

(5)在高内存消耗或者多任务的环境下,还处于实验阶段的OFF-HEAP模式有以下几个优势:支持多个executor使用Tachyon中的同一个内存池;显著减少了内存回收的代价:如果个别executor崩溃掉,缓存的数据不会丢失。

5. 移除数据

(九)Spark的共享变量

通常情况下,一个传递给Spark操作(例如map或reduce)的方法是在远程集群上的节点执行。方法在多个节点执行过程中使用的变量,是同一个变量的多个副本。这些变量以副本的方式拷贝到每个机器上,各个远程机器上变量的更新并不会传回驱动程序。为了满足两种常见的使用场景,Spark提供了两种特定类型的共享变量:广播变量( broadcast variables)和累加器( accumulators)。

1.广播变量( broadcast variables )

广播变量允许开发者将一个只读变量缓存到每台机器上,而不是给每个任务传递一个副本。例如,广播变量可以用一种高效的方式给每个节点传递一份比较大的数据集副本。在使用广播变量时,Spark也尝试使使用高效广播算法分发变量, 以降低通信成本。

Spank的action操作是通过一系列的阶段(stage)执行的,这些阶段是通过分布式的shuffle操作进行切分的。Spark自动广播每个阶段内任务需要的公共数据,数据使用序列化的形式进行缓存,并在每个任务运行前进行反序列化。这明确说明,只有跨越多个阶段的多个任务会使用相同的数据,或者在使用反序列化形式的数据特别重要的情况下,广播变量会有比较好的效果。

广播变量通过在一个变量v上调用SparkContext. broadcast)方法创建。广播变量是v的一个封装器, 可以通过value方法访问V的值。代码示例如下。

scala> val broadcastVar = sc.broadcast(Array(1,2,3))

broadcastVar: org. apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar.value

res0: Array[nt] = Array(1,2,3)

广播变量创建之后,在集群上执行的所有函数中,应该使用该广播变量代替原来的v值。所以,每个节点上的v最多分发一次,另外,对象v在厂播后不应该将被修改以保证分发到所有有节点上的广播变量有同样的值 (例如,在分发广 播变量之后,又对广播变量进行了修改,然后还需要将广播变量分发到新的节点)。

2.累加器(accumulators )

累加器只允许进行关联的added操作,因此在并行计算中可以支持特定的计算。累加器可以用于实现计数(类似在MapReduce中那样)或者求和。原生Spark支持数值型的累加器,开发者可以添加新的支持类型。创建累加器并命名之后,在Spark的UI界面上将会显示该累加器,这样可以帮助理解正在运行阶段的情况(注意,在Python中还不支持)。

一个累加器可以在原始值v上调用SparkContext.accumulator(v)方法,然后集群上正在运行的任务就可以使用add方法或+=操作对该累加器进行累加操作。只有驱动程序可以使用value方法读取累加器的值。

下面代码对数组中的元素进行求和。

scala> val accum = sc.accumulator(0,&34;)

accum: spark.Accumulator[Int] = 0

scala> sc.parallelize(Array(1,2,3,4).foreach(x => accum +=x)

10/09/29 18:41.08 INFO SparkContext: Tasks finished in 0.317106s

scala> accum.value

res2: Int= 10

上面的代码示例使用的是Spark内置的Int类型的累加器。开发者可以通过继承AccumulatorParam类来创建新的累加器类型。AccumulatorParam接口有两个方法:zero方法和addInPlace方法。Zero方法给数据类型提供了一个0值,addInPlace方法能够将两个值进行累加。例如,有一个表示数学上向量的vector类,我们可以写成如下形式。

object VectorAcumulatorParam extends AccumulatorParam[Vector]{

def zero(initialValue: Vector): Vector= {

Vector.zeros(initialValue.size)

}

def addInPlace(v1: Vector,v2: Vector): Vector= {

v1 += v2

}

}

val vecAccum = sc.accumulator(new Vector(…))(VectorAccumulatorParam)

(十)Spark性能调优

在应用开发中,需要根据具体的算法和应用场景选择调优方法,没有万能的解决方案,每一种调优方法只是适合特定场景。

1、通过配置参数

(1)配置文件spark-env.sh

export JAVA_HOME= /home/jdk1.7.0_67

export SPARK_MASTER_IP= 192.168.0.1 (指定主节点)

export SPARK_WORKER_CORES=3 (配置worker节点的cpu核数)export SPARK_MASTER_PORT= 8888 (spark URL端口,如spark://crxy172:8888)export SPARK_MASTER_WEBUI_PORT=8080 (master webUI地址)

export SPARK_WORKER_WEBUI_PORT=8081 (worker webUI地址)

export SPARK_WORKER_MEMORY=10g (每个worker占多少内存)

(2)在程序中通过Sparkconf对象添加

例如:

val conf = new SparkConf().setMaster(&34;).setAppName(&34;).set(&34;,&34;)

conf.set(&34;,&34;)//后面的值大于512m即可

conf.setMaster(&34;)

val sc = new SparkContext(conf)

(3)在程序中通过System.setProperty添加

如果在代码中添加,则需在SparkContext定义之前修改配置项,例如:

System.setProperty(&34;,&34;)

System.setProperty(&34;,&34;)

val conf = new SparkConf().setAppName(&34;)

val sc = new SparkContext(conf)

高级参数 配置方法:

spark-env.sh 配置(conf目录下)

export SPARK_JAVA_OPTS=&34; (用来设置GC参数)

export SPARK_WORKER_INSTANCES=2 (启动多少个worker,一台从节点机器可以有多个WORKER,一个worker启动一个JVM)

提示:SPARK_WORKER_CORES * SPARK_WORKER_INSTANCES= 每台机器总cores

应用场景:单台服务器的内存超过200G时。原因是单个机器内存配置超过200G时配置,因为java VM在超过200G的服务器上性能不好。

export SPARK_PID_DIR=/usr/local/spark/tmp (worker.pid或者master.pid默认位置在/tmp 文件夹)

说明:RHEL6系统默认会自动清理/tmp文件夹,默认是30天。

export spark.local.dir=/opt1/spark,/opt2/spark,/opt3/spark (各个磁盘挂载到不同opt目录下面)

说明:这是spark写shuffle输出的地⽅,增加IO输出到不同的磁盘等于加快了执行速度。

2、常见调优方法

(1)小分区合并问题

val rdd2=rdd1.filter(x=>x.contains(&34;)).filter(x=>x.contains(info)).collect

解决方式:采取RDD中重分区的函数进行数据紧缩,减少分区数,将小分区合并成大分区。

通过coalese函数来减少分区,该函数定义如下:

def coalesce(numPartitions: Int,shuffle: Boolean = false,partitionCoalescer: Option[PartitionCoalescer] = Option.empty)

(implicit ord: Ordering[T] = null)

: RDD[T] = withScope {

require(numPartitions > 0,s&34;)

if (shuffle) {

** Distributes elements evenly across output partitions,starting from a random partition. *

val distributePartition = (index: Int,items: Iterator[T]) => {

var position = (new Random(index)).nextInt(numPartitions)

items.map { t =>

/ Note that the hash code of the key will just be the key itself. The HashPartitioner

/ will mod it with the number of total partitions.

position = position + 1

(position,t)

}

} : Iterator[(Int,T)]

/ include a shuffle step so that our upstream tasks are still distributed

new CoalescedRDD(

new ShuffledRDD[Int,T,T](mapPartitionsWithIndex(distributePartition)。

new HashPartitioner(numPartitions))。

numPartitions。

partitionCoalescer).values

} else {

new CoalescedRDD(this,numPartitions,partitionCoalescer)

}

}

这个函数会返回含有numPartitions数量个分区的新RDD,即将整个RDD重分区。当分区由10000重分区到100个时候,由于前后两个阶段的分区是窄依赖的,所以不会产生shuffle操作。但如果分区量急剧减少,如极端情况从10000重分区为一个分区时,就会造成一个问题:数据分布到一个节点进行计算,完全无法开掘集群并行计算的能力,为了规避这个问题,设置shuffle=true,见上述代码逻辑分支。由于shuffle可以分割Stage,这就保证了上一阶段stage中的上游任务仍是10000个分区并行计算。如果不加shuffle,则上下游的任务合并为一个stage计算,这个stage便会在一个分区状况下进行并行计算。

同时还有个需求,即当前的每个分区数据量过大,需要将分区数增加,以利用并行计算能力,这就需要把shuffle设置为true,然后执行coalesce函数,将分区数增大,在这个过程中,默认使用Hash分区器将数据进行重分区repartition:

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {

coalesce(numPartitions,shuffle = true)

}

repartition方法会返回numPartitions个分区的新RDD。repartition本质上就是调用coalesce方法。因此如果用户不想进行shuffle,就需用coalesce配置分区,为了方便起见可直接调用repartition。

(2)倾斜问题

倾斜(skew)问题是分布式大数据计算中的重要问题,很多优化工作都围绕该问题展开。倾斜有任务倾斜和数据倾斜2种情况,数据倾斜导致的结果为任务倾斜,在个别分区上任务执行时间过长。当少量任务处理的数据量和其它任务差异过大时,任务进度长时间维持在99%(或者100%),此时任务监控页面有少量(1个或者几个)reduce子任务未完成。单一reduce的记录数与平均记录数差异过大,最长时远大于平均时长,常可能达到3倍甚至更多。

产生数据倾斜的原因大致有:

【1】key的数据分布不均,一般是分区key取得不好或者分区函数设计的不好

【2】业务数据本身就会产生数据倾斜

【3】结构化数据表设计问题

【4】某些sql语句会产生数据倾斜

产生任务倾斜的原因较为隐蔽,一般就是那台机器的正在执行的Executor执行时间过长,以为服务器架构、或者JVM、也可能是来自线程池的问题等等。

解决方式:可以考虑在其它并行处理方式中间加入聚集运算以减少倾斜数据量。

数据倾斜一般可以通过业务上将极不均匀的数据剔除解决。其实还有Skew Join的一种处理方式,将数据分两个阶段处理,倾斜的key数据作为数据源处理,剩下的key的数据再做同样的处理。二者分开做同样的处理。

产生原因可能是数据倾斜,也可能是执行任务的机器架构、OS、JVM各节点配置不同或其他原因。

解决方式:设置spark. Speculation=true把那些执行时间过长的节点去掉,重新调度分配任务,这个方式和Haoop的MapReduce的speculation是相通的。同时可以配置多长时间来推测执行,spark.speculation. interval用来设置执行间隔进行配置。在源代码中默认配置是100,如下:

val SPECULATION_INTERVAL_MS = conf.getTimeAsMs(&34;,&34;)

解决方案:

【1】增大任务数,减少每个分区数据量:增大任务数,也就是扩大分区量,同时减少单个分区的数据量

【2】对特殊key处理:空值映射为特定key,然后分发到不同节点,对空值不作处理

【3】广播:小数据量表直接广播,数据量大的表可以考虑切分为多个小表,多阶段进行Map Side Join

【4】聚集操作可以Map端聚集部分结果,然后Reduce端合并,减少reduce压力

【5】拆分RDD:将倾斜数据与原数据分离,分两个Job进行计算

(3)并行度

在分布式计算的环境下,如果不能正确配置并行度,就不能充分利用集群的并行计算能力。Spark会根据文件的大小,默认配置map阶段的任务数也就是分区数,也可以通过sparkContext.textFile等方法进行配置。Reduce阶段的任务数配置方式例如:

【1】在有关函数中进行配置,例如reduceByKey的第二个参数numPartitions

def reduceByKey(func: (V,V) => V,numPartitions: Int): RDD[(K,V)] = self.withScope {

reduceByKey(new HashPartitioner(numPartitions),func)

}

【2】配置spark.default.parallelism来配置默认任务数,spark官方推荐每个CPU core分配2到3个任务,即core num *2(或3)数量的并行度。如果并行度太高就会产生大量任务启动和切换的开销,如果太低就无法发挥集群并行计算能力,任务执行慢,同时可能会造成内存combine数据过多占用内存,而出现内存溢出异常。

分区器函数决定分区数量和分区方式,因为spark的任务数量由分区个数决定,一个分区对应一个任务,代码如下:

def defaultPartitioner(rdd: RDD[_],others: RDD[_]*): Partitioner = {

val rdds = (Seq(rdd) ++ others)

val hasPartitioner = rdds.filter(_.partitioner.exists(_.numPartitions > 0))

if (hasPartitioner.nonEmpty) {

hasPartitioner.maxBy(_.partitions.length).partitioner.get

} else {

if (rdd.context.conf.contains(&34;)) {

new HashPartitioner(rdd.context.defaultParallelism)

} else {

new HashPartitioner(rdds.map(_.partitions.length).max)

}

}

}

reduceByKey函数也可以通过默认分区器设置分区数量

(4)DAG调度执行优化

【1】同一个stage尽量容纳更多算子,以减少shuffle的发生。由于stage中的算子是按照流水线方式进行的,所以更多的Transformation放在一起执行能减少shuffle的开销和任务启动和切换的开销

【2】复用已经缓存过的数据,可以使用cache和persist函数将数据缓存在内存

(5)Java虚拟机、内存方面

JVM调优会专门有一个章节讲解。这里大致说下主要方法:将对象序列化存储减少对象存储空间;使用监控工具监控堆内存等内存空间。

上一篇 2022年12月16 13:40
下一篇 2022年12月07 21:39

相关推荐

关注微信