sjw_blog


  • 首页

  • 关于

  • 标签

  • 分类

  • 归档

Spark学习笔记(2) —— spark RDD

发表于 2018-03-23 | 分类于 spark

Spark学习笔记(2) —— Spark RDD

Spark RDD

四类操作

  • 创建操作: 用于RDD创建工作,RDD创建只有两种方法,中是来自于内存集合和外部存储系统,另一种是通过转换操作生成的RDD
  • 转换操作:将RDD通过一定的操作变换成新的RDD,比如HadoopRDD可以使用map操作变换为MappedRDD,RDD的转换操作是惰性操作。
  • 控制操作:进行RDD出九华,可以让RDD按不同的存储侧率保存在磁盘和内存中,比如cache接口默认将RDD缓存在内存中。
  • 行动操作:能够出发Spark运行的操作,李璐,对RDD进行collect就是行动操作。Spark中行动操作。

RDD的实现

  • 作业调度

当对RDD执行转换操作是,调度器会根据RDD的血统来构建有若干调度阶段组成的有向无环图,每个调度阶段包含尽可能多的连续窄依赖转换。调度器按照有向无环图顺序进行计算,并最总得到目标RDD。

  • 解析器集成

Scala

  • 内存管理

Spark 提供了3中持久化RDD的存储策略:未序列化Java对象存在内存中、序列化的数据存于内存中已经存储在磁盘中。第一个选项的性能是最优的,因为可以直接访问在Java虚拟机内存里的RDD对象;在空间有限的情况下,第二种方式可以让用户采用比Java对象更有效的内存组织方式,但代价是降低了性能;第三种策略使用于RDD太大的情形,每次重新计算该RDD会带来额外的资源开销。

对于内存使用LRU回收算法进行管理,当计算得到一个新的RDD分区,但没有足够的空间来存储时,系统会从最近最少使用的RDD中回收一个分区的空间,除非新加的RDD分区是这个RDD对应的分区,在这种情况下则会保留这个最少使用的RDD,避免多次调入调出数据。

  • 检查点支持

尽管血统可以用于错误后RDD的恢复,但是对于很长的血统的RDD来说,这样的恢复耗时较长,所以需要通过检查点操作保存到外部存储中。

  • 多用户管理

RDD模型将计算分解为多个相互独立的细粒度任务,这使得它在多用户集群能够支持多种资源共享算法。

  • RDD依赖关系

RDD中将依赖划分成了两种类型:窄依赖和宽依赖。

窄依赖是指每个父RDD的分区都之多被一个子RDD的分区使用(一对一), 宽依赖是多个子RDD的分区依赖一个父RDD的分区(多对一)

Spark学习笔记(1)

发表于 2018-03-22 | 分类于 spark

Spark学习笔记(1)

Spark 和 Hadoop 速度优势对比

  1. Spark 默认情况下的迭代过程的数据保存到内存中,后续的运行作业利用这些结果进行计算,而Hadoop每次计算结果都直接存储到磁盘中。
  2. 由于较复杂的数据计算任务需要多个步骤才能实现,且步骤之间具有依赖性。Hadoop需要借助Oozie等工具今夕处理。而Spark在执行任务之前,可以将这些步骤依赖关系形成DAG图(有向无环图),任务可以按图索骥,从而优化了计算路径。

Spark的生态圈

  • Spark Core 提供内存计算框架
  • Spark streaminging的实时处理应用
  • Spark SQL的即席查询
  • MLlib的机器学习
  • GraphX的图处理

spark技术堆栈

Spark与MapReduce比较

  1. Spark 把中间数据存储放在内存中,迭代运算效率高,MapReduce则是放在磁盘上,而Spark支持DAG图的分布式并行计算的编程框架

  2. Spark的容错性高,Spark引进了弹性分布式数据集(Resilient DIstributed Dataset,RDD)的概念,他是分布在一组节点中的只读对象集合,这些集合是弹性的,如果数据集一部分丢失,则可以根据“血统”(即允许基于数据衍生过程)对他们重建。另外在RDD计算时可以通过CheckPoint来实现容错。CheckPoint有两种方式,即CheckPoint Data和Logging The Update。

  3. Spark更加通用。Hadoop只提供了Map和Reduce两种操作,Spark提供的数据集操作类型有很多种,大致分为转换操作和行动操作两大类。转换操作包括Map,Filter,FlatMap,Sample,GroupByKey,ReduceBykey,Union,Join,Cogroup,MapValues,Sort和PartionBy等多种操作类型,行动操作包括Collect,Reduce,Lookup和Save等操作类型。另外,各个处理节点之间的通信模型不再像Hadoop只有Shuffle一种模式,用户可以命名,物化,控制中间结果的存储,分区等。

Spark core

连接Spark各种生态的核心
sparkcore

Spark streaming

Spark streaminging是一个对实时数据流进行高吞吐,高容错的六室处理系统,可以对多种数据源进行类似Map,Reduce和Join等复杂操作,并将结果保存到外部文件系统,数据库。相比其他的处理引擎要么只专注于流处理,要么只负责批处理,而Spark streaming最大的优势是听的处理引擎和RDD编程模型可以同时进行批处理于流处理。

对于传统流处理中一次处理一条记录的方式而言,Spark streaming使用的是将流数据离散化处理(Discretized streamings),通过该吹方式能够进行秒级以下的数据批处理。在Spark streaming处理过程中,Receiver并行接收数据,并将数据缓存至Spark工作节点的内存中。经过延迟优化后,Spark引擎对段任务能够进行批处理,并且可将结果暑促至其他系统中。

sparksteam

使用离散化流数据,Spark streaming将具有:

  1. 动态负载均衡:
    通过将数据划分为小批量,通过这种方式可以实现对资源更细粒度的分配。在传统实时流记录处理系统在输入数据流以键值进行分区处理情况下,如果一个节点计算压力较大超出了符合,会拖慢整个系统的处理速度。而Spark streaming会将作业任务动态平衡的分给各个几点。

  2. 快速故障恢复机制:
    在节点出现故障的情况下,传统流处理系统会在其他的节点上重启失败的连续算子,并可能重新运行先前数据流处理操作获取部分丢失数据。需要等新节点完成故障前的所有计算后整个系统才能够处理其他任务。在Spark中,计算分成多个小任务,保证任何节点运行后能够正确进行合并,在某个节点出现故障的情况,这个节点的任务将均匀的分散到集群中的节点进行计算,比单个节点计算能够更快的恢复数据。

  3. 批处理、流处理与交互式分析的一体化:
    Spark streaming把流式计算分解成一系列小的的批处理作业,也就是一段段离散数据流,每一段数据都转换成Spark中的RDD,然后将Spark streaming中对离散数据流处理操作变成对RDD的批处理操作。

Spark SQL

特点:

  • 引入了新的RDD类型SchemaRDD,可以向传统数据库定义表一样来定义SChemaRDD,SchemaRDD由定义了列数据类型的行对象构成。SchemaRDD既可以从RDD转换过来,也可以从Parquet文件读入,还可以使用HiveQL从Hive中获取。
  • 内嵌了Catalyst查询优化框架,在吧Sql解析成逻辑执行计划之后,利用Catalyst包里一些类和接口,执行了一些简单的执行计划优化,最后变成RDD的计算。
  • 在应用程序中可以混合使用不同来源的数据,如可以将来字HiveQl的数据和来自SQL的数据进行Jion操作。

Spark SQL性能优化:

  • 内存列存储:表数据在内存汇总存储不是采用原生态的JVM对象存储方式,而是采用内存列存储
  • 字节码生成技术:在Catalyst模块的Expressions增加了Codegen模块,使用动态字节码生成技术,对匹配的表达式采用特定的代码动态编译。另外对SQL表达式都做了CG优化。CG优化的实现主要还是依靠Scala运行的反射机制

MLBase/MLlib

MLBase分为4个部分:

  • MLRuntime:是由Spark Core提供的分布式内存计算框架,运行有Optimizer优化过的算法进行数据的计算并输出分析结果。
  • MLlib:是Spark实现一些常见的机器学习算法和使用程序。
  • MLI:s是一个特征抽取和高级ML编程抽象算法实现的API或平台。
  • ML Optimizer:会选择最合适的已经在内部实现好了的机器学习算法和相关参数来处理用户输入的数据。

GraphX

GraphX的核心抽象是Resilient Distributed Property Graph,一种电荷变都带属性的有向多重图。GraphX扩展了SparkRDD的抽象,他有Table和Graph两种视图,但只需要一份物理存储,两种视图都有自己独有的操作符,从而获得了灵活操作和执行效率。

Search for a Range

发表于 2018-03-16 | 分类于 Leetcode

[Leetcode] Search for a Range

Given an array of integers sorted in ascending order, find the starting and ending position of a given target value.

Your algorithm’s runtime complexity must be in the order of O(log n).

If the target is not found in the array, return [-1, -1].

For example,
Given [5, 7, 7, 8, 8, 10] and target value 8,
return [3, 4].

解题思路:
该题要求时间复杂度要在O(logn)内,所以使用折半查找法,在查找到对应的位置后,需要在该位置前后去查找相同的值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class Solution(object):
def searchRange(self, nums, target):
"""
:type nums: List[int]
:type target: int
:rtype: List[int]
"""
if nums == None or len(nums) == 0 or target < nums[0] or target > nums[-1]:
return [-1,-1]

l = 0
r = len(nums) -1
return self.helper(l, r, nums, target)

def helper(self, l, r, nums, target):
if l <= r:
m = (l + r)/2
if nums[m] == target:
i1 = m
i2 = m
while(i1-1 >=l and nums[i1-1] == target):
i1 -=1
while(i2+1 <len(nums) and nums[i2+1] == target):
i2 +=1
return [i1,i2]
elif nums[m] > target:
return self.helper(l, m-1, nums, target)
else:
return self.helper(m+1, r, nums, target)

else:
return [-1,-1]

Search in Rotated Sorted Array

发表于 2018-03-16 | 分类于 Leetcode

[Leetcode] Search in Rotated Sorted Array

Suppose an array sorted in ascending order is rotated at some pivot unknown to you beforehand.

(i.e., 0 1 2 4 5 6 7 might become 4 5 6 7 0 1 2).

You are given a target value to search. If found in the array return its index, otherwise return -1.

You may assume no duplicate exists in the array.

解题思路:
判断目标数字和数组头尾的大小比较,然后根据比较结果,进行遍历,同时添加遍历退出的判断。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class Solution(object):
def search(self, nums, target):
"""
:type nums: List[int]
:type target: int
:rtype: int
"""
if nums == None or len(nums) == 0:
return -1

if nums[0] <= target:
for n in range(len(nums)):
if target < nums[n]:
return -1
if target == nums[n]:
return n
elif nums[-1] >= target:
for n in range(len(nums))[::-1]:
if target > nums[n]:
return -1
if target == nums[n]:
return n
return -1

Divide Two Integers

发表于 2018-03-15 | 分类于 Leetcode

[Leetcode] Divide Two Integers

Divide two integers without using multiplication, division and mod operator.(在不使用乘法,除法和取余操作的情况下实现整数的除法运算。)

If it is overflow, return MAX_INT.

解题思路
最简单的方法就是用被除数一直减去除数,知道差小于除数,则循环次数等于商,最后的差为余数,这个算法时间复杂度为O(n)的。除了这个方法,还可以使用移位运算。

例如 f(32,5)=32/5=20/5+10/5+2/5=4+2+0=6

5左移2位就是20,而左移3位就是40超过了32,因此第一部分商为2*2=4.

对余数12再进行分析,左移2位是20,超过了12,则第二部分的商为2*1=2.

对余数2再进行分析,发现5不需要移位就比2大,因此第二部分商为0.

加和可得,商为6,余数为2

该题还需要注意边界值的处理,除数为0时,结果为int最大值,当被除数为最小int值且除数为-1时,结果为int最大值。

同时符号的处理也需要注意。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class Solution(object):
def divide(self, dividend, divisor):
"""
:type dividend: int
:type divisor: int
:rtype: int
"""
if divisor == 0:
return 2147483647;
if dividend == 0:
return 0;
if dividend == -2147483648 and divisor == -1:
return ~dividend
if (dividend < 0 and divisor > 0)or (dividend > 0 and divisor < 0):
sign = -1
else:
sign = 1
dividend = abs(dividend)
divisor = abs(divisor)
n = 0 #商
while(dividend >= divisor):
index = 1
while (dividend > divisor << index):
index +=1
index -=1
dividend -= divisor << index
n += 2**index
return n * sign

Remove Duplicates from Sorted Array

发表于 2018-03-15 | 分类于 Leetcode

[Leetcode] Remove Duplicates from Sorted Array

Given a sorted array, remove the duplicates in-place such that each element appear only once and return the new length.

Do not allocate extra space for another array, you must do this by modifying the input array in-place with O(1) extra memory.

Example:

1
2
3
4
Given nums = [1,1,2],

Your function should return length = 2, with the first two elements of nums being 1 and 2 respectively.
It doesn't matter what you leave beyond the new length.

解题思路
该题除了要求计算出不重复的数字长度外,还要返回一个不重复的数组,且不能增加额外的存储空间。第一个想法是,一边遍历一边把重复的数字从数组中删去。由于for循环中删除数组,index仍然会增加,所以用一个自定义的index(n)去遍历数组。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class Solution(object):
def removeDuplicates(self, nums):
"""
:type nums: List[int]
:rtype: int
"""
if nums == None or len(nums) == 0:
return 0
i = None
l = 0
n = 0
while n < len(nums):
if i == nums[n]:
nums.remove(nums[n])
# continue
else:
i = nums[n]
l +=1
n+=1
return l

这个方法提交之后,运行时间较长,看了大神的代码。他们不使用remove操作,因为这个操作用时较长,使用替换的方法。用两个标记,一个记录当前数组中遍历的位置,一个是新数组的位置,当两个位置的值一样时说明有重复,则当前数组位置+1,如果不一样,则新数组位置的值等于当前数组位置的值,更新新数组。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class Solution(object):
def removeDuplicates(self, nums):
"""
:type nums: List[int]
:rtype: int
"""
tail = 0
if not nums:
return 0
for i in range(1, len(nums)):
if nums[i] != nums[tail]:
tail += 1
nums[tail] = nums[i]
print nums
return tail + 1

Merge k Sorted Lists

发表于 2018-03-14 | 分类于 Leetcode

[Leetcode] Merge k Sorted Lists

Merge k sorted linked lists and return it as one sorted list. Analyze and describe its complexity.

解题思路:
使用分治算法。思路是先分成两个子任务,然后递归求子任务,最后回溯回来。这个题目也是这样,先把k个list分成两半,然后继续划分,直到剩下两个list就合并起来。这个算法的时间复杂度的计算:假设总共有k个list,每个list的最大长度是n,那么运行时间满足递推式T(k) = 2T(k/2)+O(n*k)。根据主定理,可以算出算法的总复杂度是O(nklogk)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class Solution(object):
def mergeKLists(self, lists):
"""
:type lists: List[ListNode]
:rtype: ListNode
"""
if lists == None or len(lists) == 0:
return None
return self.helper(lists, 0, len(lists)-1)

def helper(self,lists, l, r):
if l < r:
m = (l+r)/2
return self.merge(self.helper(lists,l,m),self.helper(lists,m+1,r))
return lists[l]

def merge(self,l1,l2):
dump = ListNode(0)
dump.next = l1
temp = dump
while(l1 != None and l2 != None):
if l1.val < l2.val:
l1= l1.next
else:
t = l2.next
temp.next = l2
l2.next = l1
l2 = t
temp = temp.next

if l2 != None:
temp.next = l2
return dump.next

第二种思路是使用堆排序。维护一个大小为k的堆,每次取堆顶的最小元素放到结果中,然后读取该元素的下一个元素放入堆中,重新维护好。因为每个链表是有序的,每次又是去当前k个元素中最小的,所以当所有链表都读完时结束,这个时候所有元素按从小到大放在结果链表中。这个算法每个元素要读取一次,即是k*n次,然后每次读取元素要把新元素插入堆中要logk的复杂度,所以总时间复杂度是O(nklogk)。空间复杂度是堆的大小,即为O(k)。

代码略

hexo博客上传

发表于 2018-03-14 | 分类于 hexo

[hexo] 博客上传简单步骤和命令

1、进入hexo目录

2、生成博客文件

1
hexo new "博客名字"

3、发布博客

1
2
3
4
5
6
7
8
hexo clean  #清除缓存 网页正常情况下可以忽略此条命令

hexo generate #生成静态页面至public目录

# 写好之后可以现在本地预览,确定无误之后再部署到Github上。
hexo server #开启预览访问端口(默认端口4000,'ctrl + c'关闭server)

hexo deploy #将.deploy目录部署到GitHub

hexo 命令简写形式

1
2
3
4
hexo n "博客" == hexo new "博客"
hexo g == hexo generate
hexo s == hexo server
hexo d == hexo deploy

Generate Parentheses

发表于 2018-03-14 | 分类于 Leetcode

[Leetcode] Generate Parentheses

Given n pairs of parentheses, write a function to generate all combinations of well-formed parentheses.

For example:
given n = 3, a solution set is:

“((()))”, “(()())”, “(())()”, “()(())”, “()()()”

解题思路:
这道题给定一个数字n,让生成共有n个括号的所有正确的形式,对于这种列出所有结果的题首先还是考虑用递归Recursion来解。由于字符串只有左括号和右括号两种字符,而且最终结果必定是左括号n个,右括号n个,所以我们定义两个变量left和right分别表示剩余左右括号的个数,如果在某次递归时,左括号的个数大于右括号的个数,说明此时生成的字符串中右括号的个数大于左括号的个数,即会出现’)(‘这样的非法串,所以这种情况直接返回,不继续处理。如果left和right都为0,则说明此时生成的字符串已有n个左括号和n个右括号,且字符串合法,则存入结果中后返回。如果以上两种情况都不满足,若此时left大于0,则调用递归函数,注意参数的更新,若right大于0,则调用递归函数,同样要更新参数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class Solution(object):
def generateParenthesis(self, n):
"""
:type n: int
:rtype: List[str]
"""
res = []
res = helper(n,n,'',res)
return res


def helper(left,right,out,res):
if left < 0 or right < 0 or left > right:
return res
if left == 0 and right == 0:
res.append(out)
return res
if left > 0:
helper(left -1, right, out + '(', res)
if right > 0:
helper(left, right -1, out + ')', res)
return res

别人用循环完成的代码:

1
2
3
4
5
6
7
8
9
class Solution(object):
def generateParenthesis(self, N):
if N == 0: return ['']
ans = []
for c in xrange(N):
for left in self.generateParenthesis(c):
for right in self.generateParenthesis(N-1-c):
ans.append('({}){}'.format(left, right))
return ans

Remove Nth Node From End of List

发表于 2018-02-05 | 分类于 Leetcode

[Leetcode] Remove Nth Node From End of List

Given a linked list, remove the nth node from the end of list and return its head.

For example:

1
2
3
Given linked list: 1->2->3->4->5, and n = 2.

After removing the second node from the end, the linked list becomes 1->2->3->5.

Note:
Given n will always be valid.
Try to do this in one pass.

解题思路:
用双指针,两个指针距离是n,当第一个指针到达末尾时,第二个指针就是到要删除的节点前面。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class Solution(object):
def removeNthFromEnd(self, head, n):
"""
:type head: ListNode
:type n: int
:rtype: ListNode
"""
p1=p2=head

for i in range(n):
p1=p1.next

if not p1:
return head.next
while p1.next:
p1=p1.next
p2=p2.next

p2.next=p2.next.next

return head
1…456

sujunwei

53 日志
5 分类
11 标签
© 2018 true
由 Hexo 强力驱动
|
主题 — NexT.Pisces v5.1.4