sjw_blog


  • 首页

  • 关于

  • 标签

  • 分类

  • 归档

换钱的方法数

发表于 2018-05-24 | 分类于 面试

给定数组arr,arr中所有的值都为正数且不重复。每个值代表一种面值的货币,没中面值的货币可以使用人一丈,再给定一个正数aim代表要找的钱数,求换钱有多少种方法。
解法一:暴力递归

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

public int coins1(int[] arr , int aim){
if (arr == null || arr.length == 0 || aim < 0){
return -1;
}
return process1(arr,0,aim);
}

public int process1(int[] arr, index, int aim){
res = 0;
if (index == arr.length){
res = aim==0? 1:0;
}
else{
for (int i = 0; arr[index] *i <=aim; i++){
res += process1(arr,index+1,aim-arr[index]*i);
}
}
return res;
}

解法二:记忆搜索。使用map记录已经计算过的值,当使用0张5元和1张10元情况和使用2张5元和0张10元的情况是一样的。所以记录index和aim-arr[index]*i值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public int cosins1(int[] arr, int aim){
if(arr == null || arr.length == 0 || aim < 0){
return -1;
}
int[][] map = new int[arr.lenth+1][aim+1];
return process1(arr,index,aim,map);
}

public int process1(int[] arr, int index, int aim, int[][] map){
int res = 0;
if (index == arr.length){
res = aim ==0 ?1:0;
}
else{
int maxvalue = 0;
for (int i=0; arr[index] *i<=aim; i++){
mapvalue = int[index+1][aim-arr[index]*i];
if (mapvalue !=0){
res += mavalue ==-1?0:mapvalue;
}else{
res += process1(arr,index+1,aim,map);
}
}
}
map[index][aim] = res == 0? -1:res;
return res;
}

解法三:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public int coins3(int[] arr, int aim){
if(arr == null || arr.length == 0 || aim < 0){
return -1;
}

int[][] dp = new int[arr.length][aim+1];
for(int i=0; i< arr.length; i++){
dp[i][0] = 1;
}

for (int j = 1; arr[0] * j<=aim; j++){
dp[0][j] = 1;
}

int num = 0;

for(int i = 1; i< arr.length; i++){
for(int j=1; j <=aim; j++){
num = 0;
for(int k = 0; j-arr[i] * k>=0; k++){
num += dp[i-1][j-arr[i] * k];
}
dp[i][j] = num;
}
}
return dp[arr.length][aim];
}

解法三:动态规划

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public int coins3(int[] arr, int aim){
if(arr == null || arr.length == 0 || aim < 0){
return -1;
}

int[][] dp = new int[arr.length][aim+1];
for (int i =0; i< arr.length; i++){
dp[i][0] = 1;
}

for (int j = 1; j *arr[0] <= aim; j++){
dp[0][j*arr[0]] = 1;
}

for (int i = 1; i< arr.length; i++){
for (int j = 1; j<= aim; j++){
dp[i][j] = dp[i-1][j];
dp[i][j] +=j-arr[i] >= 0 ? dp[i][j-arr[i]]:0;
}
}

}

解法四:动态规划+空间压缩

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public int coins3(int[] arr, int aim){
if(arr == null || arr.length == 0 || aim < 0){
return -1;
}

int[] dp = new int[aim+1];


for (int j = 1; j *arr[0] <= aim; j++){
dp[j*arr[0]] = 1;
}

for (int i = 1; i< arr.length; i++){
for (int j = 1; j<= aim; j++){
dp[j] +=j-arr[i] >= 0 ? dp[j-arr[i]]:0;
}
}

}

通过本题目的优化过程,可以梳理出暴力递归通用的优化过程。一般先想到暴力递归的过程,在通过查看函数中有哪些参数是不发生变化的,忽略不变化的,只看变化的,并且可以表示递归过程的参数,找到这些参数之后,记忆搜索的方法就可以写出来,将计算完成的记录保存到map中,并在下次直接拿来使用。之后看每个位置是通过哪个位置求出来的,先求出被依赖的位置,就能写出动态规划,如果依赖的位置

换钱最少货币数

发表于 2018-05-24 | 分类于 面试

给定数组arr,arr中所有的值都是正数且不重复。每个值代表一种面值的货币,每种面值的货币可以使用任意张,再给定一个整数aim代表要找的钱数,求组成aim的最少货币数。

解法一:
时间空间复杂度都为O(N*aim)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public int mincoinsl(int[] arr, int aim){
if (arr == null || arr.length == 0 || aim < 0){
return -1;
}
int n = arr.length;
int max = Integer.MAX_VALUE;
int[][] dp = new int[n][aim+1];
for (int j = 1; j < aim +1; j ++){
dp[0][j] = max;
if (j - arr[0] >= 0 && dp[0][j-arr[0]] != max){
dp[0][j] = dp[0][j-arr[0]] + 1;
}
}

int left = 0;
for (int i = 1; i < n; i++){
for (int j = 1; j < aim +1; j++){
left = max;
if (j - arr[i] >=0 && dp[i][j-arr[i]] != max) {
left = dp[i][j-arr[i]] +1;
}
dp[i][j] = Math.min(left,dp[i-1][j]);
}
}

return dp[n-1][aim] != max ? dp[n-1][aim]:-1;
}

解法二:
使用空间压缩,只使用一维数组保存之前的值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public int mincoinsl(int[] arr, int aim){
if (arr == null || arr.length== 0|| aim <0){
return -1;
}
int n = arr.length;
max = Integer.MAX_VALUE;
int[] dp = new int[aim+1];

for (int j = 1; j < aim+1; j++){
dp[j] = max;
if (j-arr[0]>=0 && dp[j-arr[0]] !=max){
dp[j] = dp[j-arr[0]] + 1;
}
}

int left = 0;
for (int i = 1; i < n; i ++){
for (int j = 1; j < aim+1; j++){
if (j- arr[i]>=0&& dp[j-arr[i]] !=max)
left = dp[j-arr[i]] +1;
dp[j]= Math.min(left,dp[j]);
}
}
return dp[aim] != max ? dp[aim]:-1;
}

Spark MLlib

发表于 2018-03-29 | 分类于 spark

Spark MLlib介绍

MLlib是Spark机器学习库,它的目标是让机器学习更加容易和可伸缩星。MLlib是MLBase的一部分,其中MLBase分为4个部分,分别是MLlib、MLI,ML Optimizer和MLRuntime。

  • ML Optimizer会选择它认为最适合的已经在内部实现好了的机器学习算法和相关参数,来处理用户输入的数据,并返回模型或其他的帮助分析的结果。
  • MLI 是一个进行特征抽取和高级ML编程抽象的算法实现的API或平台
  • MLlib是Spark实现一些常见的机器学习算法和使用程序。
  • MLRuntime基于Spark计算框架,将Spark的分布式计算应用到机器学习领域。

MLlib提供了常用机器学习算法的实现,包括分类、回归,聚集、系统过滤和降维等。使用MLlib来做机器学习。

MLlib主要包含两部分。

  • 底层基础:主要包括Spark的运行库、矩阵库和想良苦,其中向量接口和矩阵接口基于Netlib和BLAS/LAPACK开发的先写袋鼠库Breeze。MLlib支持本地的密集向量和稀疏向量,并且支持标量向量;它同时支持本地矩阵和分布式矩阵,支持的分布式矩阵分为RowMatrix、IndexedRowMatrix和CoordinateMatrix等。
  • 算法库:包含分类、回归、聚集、协同过滤、梯度下降和特征提取和变换的呢该算法。

spark1

Spark MLlib数据类型

  1. 本地变量

    本地向量是由整型索引和双精度浮点型数值组成的数据结构,本地向量的使用方便了MLlib对数据的描述和操作。其中本地向量分为密集向量和稀疏向量两种类型,密集向量通过一个浮点数组来表示,而稀疏向量通过索引和值两个并列的数据来表示,比如,向量(1.0,0.0,3.0)可以以密集格式[1.0,0.0,3.0]或者稀疏格式(3,[0,2],[1.0,3.0])表示,后者第一个3表示向量长度,即列数。

  2. 标签点

    标签点由一个本地向量(密集或稀疏)和一个类标签组成。其中使用双精度浮点型来存储一个类标签,在二元分类中标签或为0(负向)或为1(正向),而在多元分类中标签应该是从0开始的索引,如0,1,2……

  3. 本地矩阵

    MLlib中的矩阵是向量型的RDD,分为本地矩阵和分布式矩阵两种。本地矩阵有整型行列索引数据和对应的双精度浮点型值数据组成并存储。MLlib支持密集矩阵,实体值以列优先的方式存储在一个双精度浮点数组中。本地矩阵提供了密集矩阵和稀疏矩阵两种实现方法。

  4. 分布式矩阵

    分布式矩阵有长整型行列索引和双精度浮点型数据组成,分布式存储在一个或多个RDD中。MLlib实现了三类分布式矩阵存储格式,分别为行矩阵、行索引矩阵、三元组矩阵和分块矩阵四种。

    • 行矩阵:行矩阵是一个面向行的分布式矩阵,通过一个RDD来代表所有的行,每一行就是一个本地向量。行矩阵直接通过RDD[Vector]来定义并可以用来统计平均数、方差、协同方差等。
    • 行索引矩阵:行索引矩阵和行矩阵类似,但其行索引具有特定含义,本质上是一个含有索引信息的行数据集合,每一行有长整型索引和一个本地向量组成。行索引可从一个RDD[IndexedRow]实例创建,IndexedRow是(Long,Vector)的封装类,剔除行索引矩阵中的行索引信息就变成一个行矩阵。
    • 三元组矩阵:三元组矩阵是一个分布式矩阵,其实体集合是一个RDD,每一个实体是一个(i:Long,j:Long,value:Double)三元组,其中i代表行索引,j代表列索引,value代表实体的值。三元组矩阵常用于稀疏星比较高的计算中,是由RDD[MatrixEntry]来构建的。MatrixEntry是一个Tuple类型的元素,其中包含行列和元素值。
    • 分块矩阵:分块矩阵是支持矩阵分块RDD的分布式矩阵,其中矩阵分块有((Int,Int),Matrix)元组所构成,(Int,Int)表示该分块矩阵所处福矩阵的索引位置,Matrix表示该索引位置上的子矩阵。分块矩阵支持矩阵的加法和乘法,并设有辅助函数验证用于检查分块矩阵是否设置正确。

Spark Streaming

发表于 2018-03-28 | 分类于 spark

Spark Streaming 简介

Spark Steaming 是Spark核心API的一个扩展,具有吞吐量高,容错能力强的实时流数据处理系统,支持包括Kafka、Flume、HFDS/S3、Twitter、ZeroMQ以及TCP Sockets等数据源,获取数据后可以使用Map、Reduce、Join和Window等高级函数进行复杂算法的处理,处理结果存储到文件系统、数据库火战士到仪表盘等,其中Spark Streaming数据处理流程如图所示:

spark1

Spark的各个子组件都是基于Spark核心,Spark Streaming在内部的处理 机制是:先接收实时流的数据,并根据一定的时间间隔拆分成一批批的数据,这些批数据在Spark内核对应一个RDD实例,因此,流数据的DStream可以看成一组RDDs,然后调用Spark核心的作业处理这些批数据,最终得到处理后的一批批结果数据。通俗点理解的话,在流数据分成一批批后,通过一个先进先出的队列,然后Spark核心的作业从该队列中一次取出一个个批数据,把批数据封装成一个RDD,然后进行处理,这是一个典型的生产者消费者模型。

术语定义

  1. 离散流(Discretized Stream)或DStream

    DStream 作为Spark Streaming的基础抽象,它代表持续性的数据流。这里数据流既可以通过外部输入源获取,也可以通过现有的DStream转换操作来获得。在内部实现上,DStream有一组时间序列上连续的RDD来表示。
    在DStream中定义了名为generatedRDDs离散数据流,它是以时间为键,RDD为值的哈希列表,在流数据接收过程中,远远不断地把接收到的数据放入到该列表中,而对于不需要的就RDD从该列表中删除。

Spark Streaming特点

  1. 流式处理

    Spark Streaming是讲流式计算分解成一系列短小的批处理作业。这里的批处理引擎是Spark Core,也就是把Spark Streaming的输入数据按照批处理间隔(如1s)分成一段一段的数据(Discretized Stream),每一段数据都转换成Spark中的RDD(Resilient Distributed Dataset),然后将Spark Streaming中对DStream的Transformation操作变为针对Spark中对RDD的Transformation操作,将RDD经过操作变成中检结果保存在内存中。整个流式计算根据业务的需求可以对中间的结果进行叠加或者存储到外部设备。

  2. 高容错

    对于流式计算来说,容错性至关重要。每一个RDD都是一个不可变的分布式可重算的数据集,其记录着确定性的操作“血统”,所以只要输入数据是可容错的,那么任意一个RDD的分区出错或不可用,都是可以利用原始输入数据通过转换操作而重新算出的。

  3. 低延迟

    Spark Streaming将流式计算分解成多个Spark Job,对于每一段数据的处理都会经过Spark DAG图分解以及Spark的任务集的调度过程。对于目前版本的Spark Streaming而言,其最小的Batch Size的选取在0.5~2s,所以Spark Streaming能够满足除对实时性要求非常高之外的所有流式准实时计算场景。

  4. 吞吐量高

    Spark目前在EC2上已经能够线性扩展到100个节点,可以以树苗的延迟处理6GB/s的数据量,其吞吐量也比流行的Storm高2~5倍。

Spark Streaming 运行架构

Spark Stream相对其他流处理系统最大的优势在于流处理引擎和数据处理在同一个软件栈,其中Spark Streaming功能主要包括流处理引擎的流数据接收与存储以及批处理作业的生成与管理,而Spark核心负责处理SPark Streaming发送过来的作业。Spark Streaming 分成为Driver端和Client端,运行在Driver端为StreamingContext十里。改十里包括DStreamGraph和JobScheduler等,而Client包括ReciveSupervisor和Reciver等。

如何仅用递归函数和栈操作逆序一个栈

发表于 2018-03-26 | 分类于 leetcode

一个栈一次压入1、2、3、4、5,那么从栈顶到栈底分别为5、4、3、2、1.将这个栈转置后,从栈顶到栈底为1、2、3、4、5,也就是实现栈中元素的逆序,但是只能用递归函数来实现,不能用其他数据结构。

解题思路

本题考查栈的操作和递归函数的设计,需要设计两个递归函数。
函数一:将栈stack的栈底元素返回并移除。

1
2
3
4
5
6
7
8
9
10
public static int getAndRemoveLastElement(Stack<Integer> stack){
int result = stack.pop();
if (stack.isEmpty()){
return result;
}else{
int last = getAndRemoveLastElement(stack);
stack.push(result);
return last;
}
}

fun1

函数二:逆序一个栈。

1
2
3
4
5
6
7
8
public static void reverse(Stack<Integer> stack){
if (stack.isEmpty()){
return;
}
int i = getAndRemoveLastElement(stack);
reverse(stack);
stack.push(i);
}

fun2

Spark SQL

发表于 2018-03-26 | 分类于 spark

Spark SQL 简介

Spark SQL是spark1.0版本中加入的组件,是Spark生态系统中最活跃的组件之一。它能够利用Spark进行结构化数据的存储和操作,结构化数据既可以来自外部结构化数据源(当前支持Hive、JSON和Parquet等操作,Spark1.2版本开始对JDBC/ODBC等的支持),也可以通过向已有RDD增加Schema的方式得到。

DataFrame/Dataset介绍

Spark的RDD API 比传统的MapReduce API 在易用性有了巨大的提升,但是对于没有MapReduce和函数式变成经验的新手,RDD Api还是存在一定的门槛。另一方面,数据分析人员所使用的R和Pandas等传统数据分析工具虽然提供只管的API,却由于这些工具只能处理单机的数据,无法胜任大数据处理任务。为了解决这两个问题,从Spark SQL 1.3版本开始在原有SchemaRDD的基础上提供了与R和Pandas风格类似的DataFrame Api,新的DataFrameAPi降低了学习门槛,还支持了Scala,java和python

在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类细雨传统数据库汇总的二维表哥。与RDD的主要区别在于;前者带有Schema源数据,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。由于无法知道RDD数据集内部的结构,SPark作业执行只能在调度阶段层面进行简单通用的优化,而对于DataFrame带有数据集内部的结构,可以根据这些信息进行针对性的优化。

spark1

DataFrame优点

  1. 代码简洁

    DataFrame最大的优点之一就是能够使用更精简的代码。代码量Hadoop MapReduce>Python RDD API>Python DataFrame API。

  2. 提升执行效率

    Python DataFrame API相对于Python RDD API的执行效率有了5倍的提升,这是因为DataFrame API实际上仅仅组黄了一段体积小巧的逻辑查询计划,Python端只需要将查询计划发送到JVM端即可,计算任务的大头都有JVM端负责。使用Python RDD API时,Python VM和JVM之间需要进行大量跨进程数据交换。
    使用Scala DataFrame API的版本也要比Python RDD API快一倍,因为PYthonRDD API是函数试的,强调不变形,在大部分场景下倾向于创建新对象而不是修改老对象,这点岁软可以实现整洁的API,但使得SPark在应用程序运行过程中倾向创建大量临时对象,对GC造成较大压力。而在Spark SQL 中尽可能重用对象,这样虽然在内部打破了不变形,但在数据返回给用户时,还会重新转化为不可变数据,这样用户利用DataFrame API能能够使用到优化效果。

    spark2

  3. 减少数据存取

    分析大数据最有效的方法就是忽略无关的数据,根据查询条件进行适当的裁剪。对一些格式,Spark SQL可以根据数据文件中附带的统计信息来进行剪枝。在这类数据格式中,数据是分段保存的,每段数据都带有最大值、最小值和NULL值数量等统计信息。当统计信息表名某一数据段肯定不包括符合查询条件的目标数据时,改数据段就可以直接跳过(例如某整数列A某段的最大值为100,而查询条件要求a>200)

Spark SQL 运行原理

通用SQL执行原理

在传统关系型数据库中,最基本的SQL查询语句如SELECT fieldA,fieldB,fieldC FROM tableA WHERE fieldA > 10,由Projection(fieldA,fieldB,fieldC)、Data Source(tableA)和Filter(fieldA > 10)三部分组成,分别对应SQL查询过程中的Result、Data Source和Operation。也就是说SQL语句按Result->Data Source->Operation的次序来描述的,
而实际执行SQL语句的过程中是按照Operation->Data Source->Result的顺序来执行。
spark3

具体执行过程如下:

  • 词法和语法解析(Parse):对读入的SQL语句进行词法和语法解析,分辨出SQL语句中哪些词是关键词(如SELECT、FROM和WHERE),哪些是表达式、哪些是Projection、哪些是Data Source等,判断SQL语句是否规范,并形成逻辑计划。
  • 绑定(Bind):将SQL语句和数据库的数据字典(列、表和视图等)进行绑定(Bind),如果相关的Projection和Data Source等都存在的话,则表示这个SQL语句是可以执行的。
  • 优化(Optimize):一般的数据库会提供几个执行计划,这些计划一般都有运行统计数据,数据库会在这些计划中选择一个最优计划。
  • 执行(Execute):执行签名的步骤获取的最优执行计划,返回从数据库中查询的数据集。

关系数据库在运行过程中,会在缓冲解析锅的SQL语句,在后续的过程中如果能够命中缓存SQL就可以直接返回可执行的计划,比如重新运行刚运行过的SQL语句,坑你直接从数据库的缓冲池中获取返回结果。

SparkSQL运行架构

Spark SQL先换将SQL语句进行解析(Parse)形成一个Tree,然后使用Rule对Tree进行绑定、优化等处理过程,通过模式匹配对不同类型的节点采用不同的操作。而Spark SQL的查询优化器是Catalyst,他负责处理查询语句的解析、绑定、优化和生成物理计划等过程,Catalyst是Spark SQL最和新部分,其性能优劣将决定整体的性能。

Spark SQL由Core、Catalyst、Hive和Hive-ThriftServer 4个部分组成。

  • Core:负责处理数据的输入/输出,从不同的数据源获取数据(如RDD、Parquet文件和JSON文件等),然后将查询结果输出成Data Frame。
  • Catalyst:负责处理查询语句的整个处理过程

Spark学习笔记(6) —— 容错和HA

发表于 2018-03-26

Spark学习笔记(6) —— 容错和HA

所谓容错是指一个系统的部分出现错误的情况还能够持续地提供服务,不会因为一些细微的错误导致系统性能严重下降或者出现系统瘫痪。在一个集群出现机器故障、网络问题等是常态,尤其集群达到较大规模后,很可能较频繁出现机器故障不能进行提供服务,因此对于分布式集群需要进行容错设计。

Executor异常

Spark支持多种运行模式,这些运行模式中的集群管理器会为任务分配运行资源,在运行资源中启动Executor,由Executor是负责执行任务的运行,最终把任务运行状态发送给Driver。下面以独立运行模式分析Executor出现异常的情况。

Executor异常容错过程图

  1. 首先看Executor的启动过程:在集群中由Master给应用程序分配资源后,然后在Worker中启动ExecutorRunner,而ExecutorRunner根据当前的运行模式启动CoarseGrainedExecutorBackend进程,当改进程会向Driver发送注册Executor信息,如果注册成功,则在其内部启动Executor。Executor由ExecutorRunner进行管理,当Executor出现异常时,由ExecutorRunner捕获该异常并发送ExecutorStateChanged消息给Worker。
  2. Worker接收到ExecutorStateChanged消息时,在Worker的handleExecutorStateChanged方法中,根据Executor状态进行信息更新,同时把Executor状态信息转发给Master。
  3. Master接收到Executor状态变换消息后,如果发现Executor出现异常退出,则调用Master schedule方法,尝试获取可用的Worker节点并启动Executor,而这个Worker很可能不是失败之前运行Executor的Worker节点。该尝试系统会进行10次,如果超过10次,则标记改应用运行失败并移除,集群中也移除该应用。

Worker异常

Spark独立运行模式才有的是Master/Slave的结构,其中Slave是有Worker来担任的,在运行的时候会发送信条给Master,让Master知道Worker的试试状态,另一方面Master也会检测注册的Worker是否超时,因为在集群运行过程中,可能由于机器宕机或者进程被杀死等原因造成Worker进程异常退出。

work异常检测

当Worker初心超时时,Master调用timeOutDeadWorkers方法进行处理,在处理时根据Worker运行的是Executor和Driver处理。

  • 如果是Executor,Master先把改Worker上运行的Executer发送消息ExecutorUpdated给对应的Driver,告知Executor已经丢失,同时把这些Executor从其应用程序运行列表中删除。另外,相关Executor的异常也需要按照之前Executor异常处理。
  • 如果是Driver,则判断是否设置重新启动。如果需要重新启动,则调用Master.schedule方法进行调度,分配合适节点重启Driver;如果不需要重启,则删除该应用程序。

Master异常

Master作为Spark独立运行模式中的核心,如果Master出现异常,则整个集群的运行情况和资源将无法进行管理。Spark在集群运行的时候,Master将启动一个或多个Standby Master,当Master出现异常的时候,Standby Master将根据一定规则确定其中一个接管Master。在独立运行模式中,Spark支持如下几种策略:

  • ZOOKEEPER:集群的元数据持久化到ZooKeeper中,当Master出现异常时,ZooKeeper会通过选举机制选举出新的Master,新的Master接管时需要从ZooKeeper获取持久化信息并根据这些信息回复集群状态。
  • FILESYSTEM:集群的元数据持久化到本地文件系统中,当Master出现异常时,只要在该机器上重新启动Master,启动后新的Master获取持久化信息并根据这些信息回复集群状态。
  • CUSTOM:自定义回复方式。
  • NONE:不持久化集群的元数据,当Master出现异常时,新启动的Master不进行恢复集群状态,而是直接接管集群。

Master异常容错

Spark学习笔记(5) —— 调度算法

发表于 2018-03-25 | 分类于 spark

Spark学习笔记(5) —— 调度算法

应用程序之间

ClusterManager提供了资源的分配和管理,而在独立运行模式中Master提供了资源管理调度功能。在调度过程中,Master先启动等待列表中应用程序的Driver,这些Driver尽可能分散在集群的Worker节点上,然后根据集群的内存和CPU使用情况,对等待的应用程序进行资源分配,在分配算法上根据先来先分配,先分配的应用程序会尽可能地获取满足条件的资源,后分配的应用程序只能在剩余资源中再次筛选。如果没有合适资源的应用程序只能等待,知道其他应用程序释放。该侧率可以认为是有条件的FIFO策略。

作业与调度阶段之间

Spark应用程序提交执行时,会根据RDD依赖关系形成有向无环图(DAG),然后交给DAGScheduler进行划分作业和调度阶段。这些作业之间可以没有任何依赖关系,对于多个作业之间的调度,Spark目前提供了两种调度策略:一种是FIFO模式,这也是目前默认的模式;另一种是FAIR模式,该模式的调度可以通过两个参数的配置来决定Job执行的邮箱模式。

在FAIR算法中,现货区两个调度的饥饿成都,饥饿程度为正在运行的任务是否小于最小任务,如果是,则表示该调度处于饥饿成都。获取饥饿程度后进行如下比较:

  • 如果某个调度处于饥饿状态另外一个非饥饿状态,则先满足处于饥饿状态的调度;
  • 如果两个调度都处于饥饿状态,则比较资源比,先满足资源比小的调度;
  • 如果两个调度都出与非饥饿状态,则比较权重比,先满足权重比小的调度;
  • 以上情况均相同的情况,根据调度的名称排序。

任务之间

数据本地性

数据的计算尽可能在数据所在节点上进行,这样可以减少数据在网络上传输。在Spark中数据本地性优先级从高到底为PROCESS_LOCAL>NONE_LOCAL>NO_PREF>RACK_LOCAL>ANY,即最好的是任务运行的节点内存中存在数据、次好的是同一个Node(同一机器)上的,再次是同机架上,最后是任意位置。其中任务数据本地性通过以上情况来确定:

  • 如果任务处于作业开始的调度阶段内,这些任务对应的RDD分区都有首选运行位置,该位置也是任务运行首选位置,数据本地性为NODE_LOCAL。
  • 如果任务处于非作业开头的调度阶段,可以根据父调度阶段运行的位置得到任务的首选位置,这种情况下,如果Executor处于活动状态,则数据本地性为PROCESS_LOCAL;如果Executor不处于活动状态,但存在父调度阶段运行结果,则数据本地性为NODE_LOCAL。
  • 如果没有首选位置,则数据本地性为NO_PREF。

延迟执行

在任务分配运行节点时,先判断任务最佳运行节点是否空闲,如果该节点没有足够的资源运行该任务,在这种情况下任务会等待一定时间;如果在等待内该节点释放出足够的资源,则任务在该节点运行,如果还是不足会找出次佳的节点进行运行。通过这样的方式进行能地让任务运行在更高级别数据本地性的节点,从而减少磁盘I/O和网络传输。一般来说只对PROCESS_LOCAL和NODE_LOCAL两个数据本地级别进行等待。

Spark任务分配的原则就是让任务运行在数据本地性优先级高的节点上,甚至可以为此等待一定的时间。

Spark学习笔记(4) —— 作业执行原理

发表于 2018-03-25 | 分类于 spark

Spark学习笔记(4) —— 作业执行原理

Spark的作业和任务调度系统是其核心,它能够有效地进行调度是因为对任务的DAG划分和容错,是的它对低层到顶层的各个模块之间的调度和处理都显得游刃有余。

  • 作业(job): RDD中由行动操作所生成的一个或多个调度阶段。
  • 调度阶段(stage): 每个作业会因为RDD之间的依赖关系拆分成多组任务集合,称为调度阶段,也叫任务集(TaskSet)。调度阶段的划分是由DAGScheduler来划分的。调度阶段有Shuffle Map Stage和Result Stage两种
  • 任务(task): 分发到Executor上的工作任务,是Spark实际执行应用的最小单元。
  • DAGScheduler: DAGScheduler是面向调度阶段的任务调度器,负责接收Spark应用提交的作业,根据RDD的依赖关系划分调度阶段,并提交调度阶段给TaskCheduler。
  • TaskScheduler: TaskScheduler是面向任务的调度器,它接收DagScheduler提交过来的调度阶段,然后以把任务分发到Work节点运行,由Worker节点的Executor来运行任务。

Spark的作业调度主要是指基于RDD的一系列操作构成一个作业,然后在Executor中执行。这些操作算子主要分为转换操作和行动操作,对于转换操作的计算是lazy级别的,也就是延迟执行,只有出现了行动操作才出发了作业的提交。在Spark调度中最重要的是DAGScheduler和TaskScheduler两个调度器,其中,DAGSscheduler负责任务的逻辑调度。将作业拆分成不同阶段的具有依赖关系的任务集,而TaskCheduler则负责具体任务的调度执行。

spark1

  1. Spark应用程序进行各种转换操作,通过行动操作触发作业运行。提交之后根据RDD之间的依赖关系构建DAG图,DAG图提交给DAGScheduler进行解析。
  2. DAGScheduler是面向调度阶段的高层次的调度器,DAGScheduler把DAG拆分成相互依赖的调度阶段,拆分调度阶段是以RDD的依赖是否为宽依赖,当遇到宽依赖就划分为新的调度阶段。每个调度阶段包含一个或多个任务,这些任务形成任务集,提交给底层调度器TaskCheduer进行调度运行。另外,DAGScheduler记录哪些RDD被存入磁盘等物化动作,同时要寻求任务的最优化调度,例如数据本地星等;DAGScheduler监控运行调度阶段过程,如果某个调度阶段运行失败,则需要重新提交该调度阶段。
  3. 每个TaskScheduler只为一个SparkContext实例服务,TaskScheduler接收来自DAGScheduler发送过来的任务集,TaskSchduler收到任务集后负责把任务集以任务的形式一个个分发到集群Worker节点的Executor中去运行。如果某个任务运行失败,TaskScheduler要负责重试。另外,如果TaskScheduler发现某个任务一直未运行完,就可能启动同样的任务运行同一个任务,哪一个任务先运行完就用哪个任务的结果。
  4. Worker中的Executor收到TaskScheduler发送过来的任务后,以多线程的方式运行,每一个线程负责一个任务。任务运行结束后要返回给TaskScheduler,不同类型的任务,返回的方式也不同。ShuffleMapTask返回的是一个MapStatus对象,而不是结果本身;ResultTask根据结果大小的不同,返回的方式又可以分为两类。

Spark学习笔记(3) —— 消息通信原理

发表于 2018-03-25 | 分类于 spark

Spark学习笔记(3) —— 消息通信原理

Spark 消息通信架构

Spark定义了通信框架接口,这些接口实现中调用Netty的具体方法。通信框架使用了工厂设计模式实现,这种设计方式实现了对Netty的解耦,能够根据需要引入其他的消息通信工具。

spark1

Spark 启动消息通信

Spark启动过程中主要是进行Master与Worker之间的通信,首先由Worker节点想Master发送注册消息,然后Master处理完毕后,返回注册成功消息或失败消息,如果成功注册,则Worker定时发送心跳消息给Master。

spark2

详细过程:

  1. 当Master启动后,随之启动个Worker,Worker启动是会创建消息通信环境RpcEnv和终端点EndPoint,并向Master发送注册Worker的消息RegisterWorker。

    由于Worker可能需要注册到多个Master中,在Worker的tryRegisterAllMasters方法中创建注册线程池registerMasterThreadPool,把需要申请注册的请求放在该线程池中,然后通过该线程池启动注册线程。在该注册过程中,获取Master终端店引用,截止调用RegisterWithMaster方法,根据master终端店引用的send方法发送注册RegisterWorker消息。

  2. Master收到消息后,需要对Worker发送的消息进行验证,记录。如果注册成功,则发送RegisteredWorker消息给对应的Worker,告诉他已经完成注册,随之进行步骤3,Worker定期发送心跳信息给Master;如果注册失败,则发送registerWorkerFailed消息。

    在Master中,Master接到注册消息后,先判断Master当前状态是否是standby,如果是则忽略注册消息。如果在注册列表中发现该Worker的编号,则返回重复注册的失败消息,如果没有问题,则使用registerWorker方法把该Worker加入到列表中。

Spark 运行时消息通信

spark3

用户提交应用程序时,应用程序的SparkContext会向Master发送应用注册消息。

Master接收到注册应用信息后,Master将该应用放入应用列表中,并返回RegisteredApplication消息给应用程序。

同时Master在Worker列表中选取运行应用的Worker,发送LaunchExecutor消息给Worker,通知Worker启动Executor。

Executor启动成功后,Worker返回ExecutorStateChanged消息给Master,通知Executor容器已经创建完毕。

同时Executor会向SparkContext发送注册成功消息。

当SparkContext的RDD出发行动操作后,将创建RDD的DAG,通过DAGScheduler进行划分Stage,并将Stage转化为TaskSe。

接着有TaskScheduler向注册的Executor发送执行消息,Executor接收到任务消息后启动并运行。

最后当所有任务运行时,有Driver处理结果并回收资源。

1…3456

sujunwei

53 日志
5 分类
11 标签
© 2018 true
由 Hexo 强力驱动
|
主题 — NexT.Pisces v5.1.4