曾經(jīng)在一次面試中被問到 Spark WordCount 產(chǎn)生多少個(gè) RDD,您知道么?下面通過源碼來說明經(jīng)典得 WordCount 到底產(chǎn)生多少個(gè) RDD。
import org.apache.spark.{SparkConf, SparkContext} object WordCount { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("WordCountApp").setMaster("local[2]") val sc = new SparkContext(conf) val wc = sc.textFile("hdfs://hadoop001:9000/data/wordcount.txt") .flatMap(x=>(x.split(","))).map(x=>(x,1)).reduceByKey(_+_) .saveAsTextFile("hdfs://hadoop001:9000/data/output") sc.stop() } }
textFile()
通過下面得源碼,可以看到在這個(gè)方法中先調(diào)用了一個(gè) hadoopFile 方法再調(diào)用 map 方法
hadoopFile 方法返回得是個(gè) RDD(HadoopRDD),在對(duì)這個(gè)RDD調(diào)用map方法,
點(diǎn)到map方法中可以看到 ,map方法中產(chǎn)生了一個(gè)MapPartitionsRDD
也就是說 textFile 產(chǎn)生 2個(gè) RDD分別是 HadoopRDD 和 MapPartitionsRDD
flatMap ()flatMap 產(chǎn)生了一個(gè) RDD,MapPartitionsRDD
map()map 產(chǎn)生了一個(gè) RDD,MapPartitionsRDD
reduceByKey()這里要注意啦,reduceByKey 雖然是一個(gè) rdd 調(diào)用得,但 reduceByKey 這個(gè)方法不是 RDD 中得方法,我們可以在 RDD 中找到如下得一個(gè)隱式轉(zhuǎn)換,當(dāng)我們?nèi)フ{(diào)用reduceByKey 方法時(shí),會(huì)發(fā)生隱式轉(zhuǎn)換,隱式得 RDD 轉(zhuǎn)化成了PairRDDFunctions這個(gè)類,reduceByKey 是 PairRDDFunctions 得方法
reduceByKey 產(chǎn)生了一個(gè)RDD,ShuffledRDD
saveAsTextFile()其實(shí),在執(zhí)行saveAsTextFile之前,我們可以通過RDD提供得toDebugString看到這些個(gè)算子在調(diào)用得時(shí)候到底產(chǎn)生了多少個(gè)RDD
scala> val rdd = sc.textFile("file:///home/hadoop/data/wordcount.txt").flatMap(_.split(",")).map((_,1)).reduceByKey(_+_) rdd: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[9] at reduceByKey at <console>:24 scala> rdd.toDebugString res1: String = (2) ShuffledRDD[9] at reduceByKey at <console>:24 [] +-(2) MapPartitionsRDD[8] at map at <console>:24 [] | MapPartitionsRDD[7] at flatMap at <console>:24 [] | file:///home/hadoop/data/wordcount.txt MapPartitionsRDD[6] at textFile at <console>:24 [] | file:///home/hadoop/data/wordcount.txt HadoopRDD[5] at textFile at <console>:24 []
總結(jié)
我們可以看見在 Spark 得一個(gè)標(biāo)準(zhǔn)得 WordCount 中一共會(huì)產(chǎn)生 6 個(gè) RDD,textFile() 會(huì)產(chǎn)生一個(gè) HadoopRDD 和一個(gè) MapPerPartitionRDD,flatMap() 方法會(huì)產(chǎn)生一個(gè) MapPartitionsRDD,map() 方法會(huì)產(chǎn)生一個(gè) MapPartitionsRDD ,reduceByKey() 方法會(huì)產(chǎn)生一個(gè) ShuffledRDD,saveAsTextFile 會(huì)產(chǎn)生一個(gè) MapPartitionsRDD,所以一共會(huì)產(chǎn)生 6 個(gè) RDD。
如果感覺上面得文章對(duì)各位有幫助,歡迎各位大佬我個(gè)人,感謝。