首先這個Scala spark程序和spark的鏈接,跟sql編程類似。首先new 一個新的val context = SparkContext()
對象,然后還要用到val conf = SparkConf.setMaster("local").setAppName("WordCount")
這個是配置信息,比如這個是本地連接所以里面是local,然后后面那個是程序的名字,這個寫完之后,吧這個conf對象放在SparkContext(conf)
這里面。然后在程序的最后,用完了要關閉連接,context.stop()
,使用stop方法關閉
先在D盤,把要測試的文件數(shù)據(jù)準備好
思路:首先連接之后,第一步是讀取文件,使用textFile()
方法,里面的參數(shù)是要讀取的文件的路徑,然后把文件一行一行的讀取出來。第二步是使用flatMap(_.split(" "))
方法,進行map映射和扁平化,把單詞按照空格分割開。第三步是groupBy(word =>word)
按照單詞進行分組,一樣的單詞分到一組。第四步map()映射進行模式匹配,取去key和他的集合的size也就是單詞出現(xiàn)的次數(shù)。然后使用collect()
方法將結果采集打印,最后使用foreach(println)
進行遍歷。
package com.atguigu.bigdata.spark.core.wc
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
class spark01_WordCount {}
object spark01_WordCount{def main(args: Array[String]): Unit = {// Application 我們自己寫的應用程序
// Spark 框架
//用我們的應用程序去連接spark 就跟那個sql 編程一樣
//TODD建立和Spark 框架的連接
//1、Java里面是Conntection 進行連接
//2、Scala 里有個類似的,SparkContext()
//2.1 SparkConf()配置不然不曉得連的哪個. setMaster() 里面是本地連接,setAppName() 里面是app的名稱
val sparkConf = new SparkConf().setMaster("local").setAppName("WordCount")
val context = new SparkContext(sparkConf)
println(context)
//TODD 執(zhí)行業(yè)務操作
//1、讀取文件,獲取一行一行的數(shù)據(jù) 這一步是扁平化
//hello word
val value = context.textFile("D:\\wc.txt") //textFile 可以吧文件一行一行的讀出來
//2、將數(shù)據(jù)進行拆分,形成一個一個的單詞
//扁平化:將整體拆分為個體的操作
//"hello word" =>hello,word
val danci: RDD[String] = value.flatMap(a =>a.split(" ")) //根據(jù)空格進行拆分
//3、將數(shù)據(jù)根據(jù)單詞進行分組,便于統(tǒng)計
//(hello,hello,hello,hello,hello),(word,word,word) 這個樣子的
//按照單詞進行分組
val wordGroup = danci.groupBy(word =>word) //按照單詞進行分組
//4、對分組數(shù)的數(shù)據(jù)進行轉(zhuǎn)換
//(hello,hello,hello,hello,hello),(word,word,word)
//(hello,5),(word,3)
val wordToCount = wordGroup.map{//模式匹配
case (word,list) =>{(word,list.size) //匹配,第一個是單詞。第二個是長度,這個長度就是單詞出現(xiàn)的次數(shù)
}
}
//5、將轉(zhuǎn)換結果采集到控制臺打印出來
val tuples = wordToCount.collect() //collect()方法,將結果采集打印
tuples.foreach(println)
//TODD 關閉連接
context.stop() //這樣就關閉連接了
}
}
3、復雜版 WordCount因為之前那個是用size方法得到次數(shù),但是這樣就不像是一個聚合操作,所以使用map映射,然后使用reduce 進行聚合操作,這樣來得到單詞出現(xiàn)的次數(shù)。
package com.atguigu.bigdata.spark.core.wc
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
//復雜版wordcount
class spark01_fuzaWrodCount {}
object spark01_fuzaWrodCount{def main(args: Array[String]): Unit = {//之前是使用size 方法,得出單詞出現(xiàn)的次數(shù),但是那樣實現(xiàn)不像是個聚合功能,所以我們改善一下
val sparkConf = new SparkConf().setMaster("local").setAppName("WordCount")
val context = new SparkContext(sparkConf)
println(context)
//TODD 執(zhí)行業(yè)務操作
//1、讀取文件,獲取一行一行的數(shù)據(jù) 這一步是扁平化
//hello word
val value = context.textFile("D:\\wc.txt") //textFile 可以吧文件一行一行的讀出來
//2、將數(shù)據(jù)進行拆分,形成一個一個的單詞
//扁平化:將整體拆分為個體的操作
//"hello word" =>hello,word
val danci: RDD[String] = value.flatMap(a =>a.split(" ")) //根據(jù)空格進行拆分
val wordToOne: RDD[(String, Int)] = danci.map(word =>(word, 1)) //直接在這一步統(tǒng)計單詞出現(xiàn)的次數(shù)
val wordGroup: RDD[(String, Iterable[(String, Int)])] = wordToOne.groupBy(t =>t._1) //然后按照方式,取第一個元素為分組的依據(jù)
val wordToCount = wordGroup.map{//這一步不是用size了
case (word,list) =>{list.reduce(
(t1,t2) =>{(t1._1,t1._2 + t2._2)
}
)
}
}
//這里不是直接size,而是進行reduce,聚合操作,將key給加起來
//val wordCount2 = wordGroup.map{case (word,list)=>{ list.reduce((t1,t2)=>{(t1._1,t1._2+t2._2)})}}
val array: Array[(String, Int)] = wordToCount.collect() //采集結果打印輸出
array.foreach(println) //foreach()方法進行遍歷
//TODD 關閉連接
context.stop() //這樣就關閉連接了
}
}
4、Spark 框架WordcountSpark框架里面有個方法,分組和聚合可以一個方法完成reduceByKey(_ + _)
,這樣大大減少了代碼量,從讀取文件進來,到輸出結果四五行就能完成這個案例。
package com.atguigu.bigdata.spark.core.wc
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
//使用saprk框架進行統(tǒng)計
class spark02_sparkCount {}
object spark02_sparkCount{def main(args: Array[String]): Unit = {//之前是使用size 方法,得出單詞出現(xiàn)的次數(shù),但是那樣實現(xiàn)不像是個聚合功能,所以我們改善一下
val sparkConf = new SparkConf().setMaster("local").setAppName("WordCount")
val context = new SparkContext(sparkConf)
println(context)
//TODD 執(zhí)行業(yè)務操作
//1、讀取文件,獲取一行一行的數(shù)據(jù) 這一步是扁平化
//hello word
val value = context.textFile("D:\\wc.txt") //textFile 可以吧文件一行一行的讀出來
//2、將數(shù)據(jù)進行拆分,形成一個一個的單詞
//扁平化:將整體拆分為個體的操作
//"hello word" =>hello,word
val danci: RDD[String] = value.flatMap(a =>a.split(" ")) //根據(jù)空格進行拆分
val wordToOne: RDD[(String, Int)] = danci.map(word =>(word, 1)) //直接在這一步統(tǒng)計單詞出現(xiàn)的次數(shù)
//Spark 框架提供了更多的功能,可以將分組和聚合使用一個功能實現(xiàn)
//reduceByKey():相同的key的數(shù)據(jù),可以對value進行reduce聚合 這是spark提供的功能
val wordCount = wordToOne.reduceByKey((x,y) =>x+y) //相當于同一個key 進行累加_ + _ 可以簡化成這樣
val array: Array[(String, Int)] = wordCount.collect() //采集結果打印輸出
array.foreach(println) //foreach()方法進行遍歷
//TODD 關閉連接
context.stop() //這樣就關閉連接了
}
}
簡化下來就是這幾步
你是否還在尋找穩(wěn)定的海外服務器提供商?創(chuàng)新互聯(lián)www.cdcxhl.cn海外機房具備T級流量清洗系統(tǒng)配攻擊溯源,準確流量調(diào)度確保服務器高可用性,企業(yè)級服務器適合批量采購,新人活動首月15元起,快前往官網(wǎng)查看詳情吧
網(wǎng)站題目:SparkWordCount案例-創(chuàng)新互聯(lián)
網(wǎng)頁網(wǎng)址:http://vcdvsql.cn/article8/dgdcop.html
成都網(wǎng)站建設公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站內(nèi)鏈、用戶體驗、響應式網(wǎng)站、網(wǎng)頁設計公司、動態(tài)網(wǎng)站、品牌網(wǎng)站制作
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)
猜你還喜歡下面的內(nèi)容