博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spark中parallelize函数和makeRDD函数的区别
阅读量:7250 次
发布时间:2019-06-29

本文共 2637 字,大约阅读时间需要 8 分钟。

hot3.png

我们知道,在Spark中创建RDD的创建方式大概可以分为三种:

  • 从集合中创建RDD;
  • 从外部存储创建RDD;
  • 从其他RDD创建。

而从集合中创建RDD,Spark主要提供了两种函数:parallelize和makeRDD。我们可以先看看这两个函数的声明:

def parallelize[T: ClassTag](    seq: Seq[T],    numSlices: Int = defaultParallelism):RDD[T]def makeRDD[T: ClassTag](    seq: Seq[T],    numSlices: Int = defaultParallelism): RDD[T]def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T]

我们可以从上面看出makeRDD有两种实现,而且第一个makeRDD函数接收的参数和parallelize完全一致。其实第一种makeRDD函数实现是依赖了parallelize函数的实现,来看看Spark中是怎么实现这个makeRDD函数的:

def makeRDD[T: ClassTag](    seq: Seq[T],    numSlices: Int = defaultParallelism): RDD[T] = withScope {    parallelize(seq, numSlices)}

我们可以看出,这个makeRDD函数完全和parallelize函数一致。但是我们得看看第二种makeRDD函数函数实现了,它接收的参数类型是Seq[(T, Seq[String])],Spark文档的说明是

Distribute a local Scala collection to form an RDD, with one or more location preferences (hostnames of Spark nodes) for each object. Create a new partition for each collection item.

原来,这个函数还为数据提供了位置信息,来看看我们怎么使用:

scala> val iteblog1 = sc.parallelize(List(1,2,3))iteblog1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at parallelize at 
:21scala> val iteblog2 = sc.makeRDD(List(1,2,3))iteblog2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[11] at makeRDD at
:21scala> val seq = List((1, List("iteblog.com", "sparkhost1.com", "sparkhost2.com")),(2, List("iteblog.com", "sparkhost2.com")))seq: List[(Int, List[String])] = List((1,List(iteblog.com, sparkhost1.com,sparkhost2.com)),(2,List(iteblog.com, sparkhost2.com)))scala> val iteblog3 = sc.makeRDD(seq)iteblog3: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at makeRDD at
:23scala> iteblog3.preferredLocations(iteblog3.partitions(1))res26: Seq[String] = List(iteblog.com, sparkhost2.com)scala> iteblog3.preferredLocations(iteblog3.partitions(0))res27: Seq[String] = List(iteblog.com, sparkhost1.com, sparkhost2.com)scala> iteblog1.preferredLocations(iteblog1.partitions(0))res28: Seq[String] = List()

我们可以看到,makeRDD函数有两种实现,第一种实现其实完全和parallelize一致;而第二种实现可以为数据提供位置信息,而除此之外的实现和parallelize函数也是一致的,如下:

def parallelize[T: ClassTag](    seq: Seq[T],    numSlices: Int = defaultParallelism): RDD[T] = withScope {    assertNotStopped()    new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())}def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = withScope {    assertNotStopped()    val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap    new ParallelCollectionRDD[T](this, seq.map(_._1), seq.size, indexToPrefs)}

都是返回ParallelCollectionRDD,而且这个makeRDD的实现不可以自己指定分区的数量,而是固定为seq参数的size大小。

转载于:https://my.oschina.net/u/1859996/blog/698420

你可能感兴趣的文章
Linux命令(15):type命令
查看>>
第一单元作业
查看>>
Azure云端部署Exchange 2016双数据中心—Part6(DAG切换测试)
查看>>
通过ansible部署高可用LNAMMKP架构
查看>>
IBM Aix系统添加硬盘步骤
查看>>
“esxcli software vib” commands to patch an ESXi 5.x/6.x host (2008939)
查看>>
heartbeat管理与虚拟IP介绍
查看>>
Syslog-ng+Rsyslog收集日志:RELP可靠传输,替代UDP、TCP(五)
查看>>
课程第八天内容《基础交换八》补充案例
查看>>
ionic 之 基本布局
查看>>
nginx开启目录浏览
查看>>
32位Linux设置超大Oracle SGA的分析
查看>>
const 的用法总结
查看>>
2017企业网盘年终盘点|机遇与挑战并存,寡头显现
查看>>
将linux用在开发环境中
查看>>
在 Cent OS 6.5 中安装桌面环境
查看>>
liquibase判断mysql表字段是否存在
查看>>
透彻理解VLAN技术
查看>>
linux-Centos 7下bond与vlan技术的结合
查看>>
sqoop2安装配置
查看>>