学习记录1-10

发布时间 2024-01-10 22:11:03作者: 南北啊
今天学习了spark,

环境介绍:

2 spark简介

简单地说,spark扩展了MapReduce计算模型,数据在内存中并行式计算。

3 安装spark

①验证java是否安装:java -version,已安装为java1.8.0。

②验证Scala是否安装:scala -version。

如果未安装scala,scala的安装步骤:

  • 1)下载scala,下载网址:,本次选择了scala-2.13.1.tgz文件。
  • 2)执行命令tar -zxvf scala-2.13.1.tgz。
  • 3)设置环境变量:切换到root账户,在/etc/profile文件中配置export SCALA_HOME=/home/grid/scala和export PATH=$PATH:$SCALA_HOME/bin,然后source /etc/profile,gird账户也需要source /etc/profile。
  • 4)scala -version验证是否安装成功。

③下载和安装spark:

  • 1)本次下载的是spark-3.0.0-preview-bin-hadoop2.7.tgz。
  • 2)解压文件,tar -zxvf spark-3.0.0-preview-bin-hadoop2.7.tgz。
  • 3)输入spark-shell进入到spark,python使用pyspark进入。

4 RDD弹性分布式数据集

4.1 RDD基本概念

RDD,resilient distributed dataset,弹性分布式数据集。spark的RDD是不可变的、分布式的数据集合。

  1. RDD会被划分为多个分区,运行在集群的不同节点。
  2. RDD的数据类型可以是java、scala、python的数据类型,也可以是用户自定义的。
  3. 定义RDD时,spark会惰性计算这些值。只有spark开始转化操作时,了解到完整的数据转化链,才会去计算,计算真正需求的数据。
  4. 每当我们调用一个新的行动操作时,整个 RDD 都会从头开始计算。要避免这种低效的行为,用户可以将中间结果持久化。
  5. 如果不重用 RDD,就没有必要浪费存储空间,再加上“在定义RDD时spark惰性计算的”,也就是说数据并没有加载,需要时直接重新计算,这就是为什么称之为“弹性”。

4.2 创建RDD

两种方式:

读取外部数据集:textFile函数可以加载一个外部数据集。

使用已存在的数据集:把已有集合传递给SparkContext.parallelize函数。

创建RDD示例:(使用Python语言)

>>> lines = sc.textFile("file:///home/grid/spark3/README.md")
>>> lines.take(5)
[u'# Apache Spark', u'', u'Spark is a unified...']
>>> 
>>> dozen = sc.parallelize(range(1,13))
>>> dozen.take(12)
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]

4.3 RDD的两种操作

RDD的两种操作是:

转化操作:返回一个新的RDD的操作。

行动操作:向程序返回结果或把结果写入外部系统的操作,会触发实际的计算。

转化操作和行动操作的示例:使用filter函数过滤“python”、“java”相关的行,使用union函数合并这些行,使用take函数触发计算。

>>> lines = sc.textFile("file:///home/grid/spark3/README.md")
>>> java_lines = lines.filter(lambda x: 'Java' in x)
>>> java_lines.first()
u'high-level APIs in Scala, Java, Python, and R, and an optimized engine that'
>>> union_lines = python_lines.union(java_lines) 
>>> union_lines.take(10)
[u'high-level APIs ', u'## Interactive Python Shell',...]

注:省略部分输出结果。

4.4 向spark传递函数

可以传递lambda表达式、局部函数。

注意:传递类的函数时,不要直接写,要复制给一个变量,因为spark会把整个对象发到工作节点。

传递函数示例:

>>> def filter_scala_lines(s):
...     return 'Scala' in s
... 
>>> scala_lines = lines.filter(filter_scala_lines)
>>> scala_lines.collect()
[u'high-level APIs in Scala, ', u'## Interactive Scala Shell',...]

4.5 常见的转化操作和行动操作

常见的RDD转化操作:

  • map():将函数应用于 RDD 中的每个元素,将返回值构成新的 RDD
  • flatMap():将函数应用于 RDD 中的每个元素,将返回的迭代器的所有内容构成新的 RDD。通常用来切分单词
  • filter():返回一个由通过传给 filter()的函数的元素组成的 RDD
  • distinct():去重
  • sample(withReplacement, fraction, [seed]):对 RDD 采样,以及是否替换
  • union():生成一个包含两个 RDD 中所有元素的 RDD
  • intersection():求两个 RDD 共同的元素的 RDD
  • subtract():移除一个 RDD 中的内容(例如移除训练数据)
  • cartesian():与另一个 RDD 的笛卡儿积

常见的RDD行动操作:

  • collect() 返回 RDD 中的所有元素
  • count() RDD 中的元素个数
  • countByValue() 各元素在 RDD 中出现的次数
  • take(num) 从 RDD 中返回 num 个元素
  • top(num) 从 RDD 中返回最前面的 num个元素
  • takeOrdered(num)(ordering)从 RDD 中按照提供的顺序返回最前面的 num 个元素
  • takeSample(withReplacement, num, [seed])从 RDD 中返回任意一些元素
  • reduce(func) 并 行 整 合 RDD 中 所 有 数 据(例如 sum)
  • fold(zero)(func) 和 reduce() 一 样, 但 是 需 要提供初始值
  • aggregate(zeroValue)(seqOp, combOp)和 reduce() 相 似, 但 是 通 常返回不同类型的函数
  • foreach(func) 对 RDD 中的每个元素使用给定的函数

4.6 持久化

对 RDD 执行行动操作,每次都会重新计算RDD,这个时候可以使用persist函数对数据进行持久化。

出于不同的目的,我们可以为 RDD 选择不同的持久化级别:MEMORY_ONLY、DISK_ONLY等。