Spark SQL支持兩種方式將現有RDD轉換為DataFrame。
第一種方法使用反射來推斷RDD的schema并創建DataSet然后將其轉化為DataFrame。這種基于反射方法十分簡便,但是前提是在您編寫Spark應用程序時就已經知道RDD的schema類型。
第二種方法是通過編程接口,使用您構建的StructType,然后將其應用于現有RDD。雖然此方法很麻煩,但它允許您在運行之前并不知道列及其類型的情況下構建DataSet
創新互聯是一家集成都網站制作、成都網站建設、網站頁面設計、網站優化SEO優化為一體的專業網站設計公司,已為成都等多地近百家企業提供網站建設服務。追求良好的瀏覽體驗,以探求精品塑造與理念升華,設計最適合用戶的網站頁面。 合作只是第一步,服務才是根本,我們始終堅持講誠信,負責任的原則,為您進行細心、貼心、認真的服務,與眾多客戶在蓬勃發展的市場環境中,互促共生。
方法如下
1.將RDD轉換成Rows
2.按照第一步Rows的結構定義StructType
3.基于rows和StructType使用createDataFrame創建相應的DF
測試數據為order.data
1 小王 電視 12 2015-08-01 09:08:31
1 小王 冰箱 24 2015-08-01 09:08:14
2 小李 空調 12 2015-09-02 09:01:31
代碼如下:
object RDD2DF {
/**
* 主要有兩種方式
* 第一種是在已經知道schema已經知道的情況下,我們使用反射把RDD轉換成DS,進而轉換成DF
* 第二種是你不能提前定義好case class,例如數據的結構是以String類型存在的。我們使用接口自定義一個schema
* @param args
*/
def main(args: Array[String]): Unit = {
val spark=SparkSession.builder()
.appName("DFDemo")
.master("local[2]")
.getOrCreate()
// rdd2DFFunc1(spark)
rdd2DFFunc2(spark)
spark.stop()
}
/**
* 提前定義好case class
* @param spark
*/
def rdd2DFFunc1(spark:SparkSession): Unit ={
import spark.implicits._
val orderRDD=spark.sparkContext.textFile("F:\\JAVA\\WorkSpace\\spark\\src\\main\\resources\\order.data")
val orderDF=orderRDD.map(_.split("\t"))
.map(attributes=>Order(attributes(0),attributes(1),attributes(2),attributes(3),attributes(4)))
.toDF()
orderDF.show()
Thread.sleep(1000000)
}
/**
*總結:第二種方式就是通過最基礎的DF接口方法,將
* @param spark
*/
def rdd2DFFunc2(spark:SparkSession): Unit ={
//TODO: 1.將RDD轉換成Rows 2.按照第一步Rows的結構定義StructType 3.基于rows和StructType使用createDataFrame創建相應的DF
val orderRDD=spark.sparkContext.textFile("F:\\JAVA\\WorkSpace\\spark\\src\\main\\resources\\order.data")
//TODO: 1.將RDD轉換成Rows
val rowsRDD=orderRDD
// .filter((str:String)=>{val arr=str.split("\t");val res=arr(1)!="小李";res})
.map(_.split("\t"))
.map(attributes=>Row(attributes(0).trim,attributes(1),attributes(2),attributes(3).trim,attributes(4)))
//TODO: 2.按照第一步Rows的結構定義StructType
val schemaString="id|name|commodity|age|date"
val fields=schemaString.split("\\|")
.map(filedName=>StructField(filedName,StringType,nullable = true))
val schema=StructType(fields)
//TODO: 3.基于rows和StructType使用createDataFrame創建相應的DF
val orderDF= spark.createDataFrame(rowsRDD,schema)
orderDF.show()
orderDF.groupBy("name").count().show()
orderDF.select("name","commodity").show()
Thread.sleep(10000000)
}
}
case class Order(id:String,name:String,commodity:String,age:String,date:String)
在實際生產環境中,我們其實選擇的是方式二這種進行創建DataFrame的,因為我們生產中很難提前定義case class ,因為業務處理之后字段常常會發生意想不到的變化,所以一定要掌握這種方法。
baidu CN A E [01/May/2018:02:15:52 +0800] 2 61.237.59.0 - 112.29.213.35:80 0 movieshow2000.edu.chinaren.com GET http://movieshow2000.edu.chinaren.com/user_upload/15316339776271455.mp4 HTTP/1.1 - bytes 13869056-13885439/25136186 TCP_HIT/206 112.29.213.35 video/mp4 16374 16384 -:0 0 0 - - - 11451601 - "JSP3/2.0.14" "-" "-" "-" http - 2 v1.go2yd.com 0.002 25136186 16384 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - 1531818470104-11451601-112.29.213.66#2705261172 644514568
baidu CN A E [01/May/2018:02:25:33 +0800] 2 61.232.37.228 - 112.29.213.35:80 0 github.com GET http://github.com/user_upload/15316339776271/44y.mp4 HTTP/1.1 - bytes 13869056-13885439/25136186 TCP_HIT/206 112.29.213.35 video/mp4 83552 16384 -:0 0 0 - - - 11451601 - "JSP3/2.0.14" "-" "-" "-" http - 2 v1.go2yd.com 0.002 25136186 16384 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - 1531818470104-11451601-112.29.213.66#2705261172 644514568
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
object LogConverUtil {
private val struct=StructType(
Array(
StructField("domain",StringType)
,StructField("url",StringType)
,StructField("pv",LongType)
,StructField("traffic",LongType)
,StructField("date",StringType)
)
)
def getStruct():StructType={
struct
}
def parseLog(logLine:String): Row ={
val sourceFormat=new SimpleDateFormat("[dd/MMM/yyyy:hh:mm:ss +0800]",Locale.ENGLISH)
val targetFormat=new SimpleDateFormat("yyyyMMddhh")
try{
val fields=logLine.split("\t")
val domain=fields(10)
val url=fields(12)
val pv=1L
val traffic=fields(19).trim.toLong
val date=getFormatedDate(fields(4),sourceFormat,targetFormat)
Row(domain,url,pv,traffic,date)
}catch {
case e:Exception=>Row(0)
}
}
/**
*
* @param sourceDate Log中的未格式化日期 [01/May/2018:01:09:45 +0800]
* @return 按照需求格式化字段 2018050101
*/
def getFormatedDate(sourceDate: String, sourceFormat: SimpleDateFormat, targetFormat: SimpleDateFormat) = {
val targetTime=targetFormat.format(sourceFormat.parse(sourceDate))
targetTime
}
}
import org.apache.spark.sql.SparkSession
object SparkCleanJob {
def main(args: Array[String]): Unit = {
val spark=SparkSession.builder()
.master("local[2]")
.appName("SparkCleanJob")
.getOrCreate()
val logRDD=spark.sparkContext.textFile("file:///D:/baidu.log")
// logRDD.take(2).foreach(println(_))
//調用LogConverUtil里的parseLog方法和getStruct方法獲得Rows對象和StructType對象
val logDF=spark.createDataFrame(logRDD.map(LogConverUtil.parseLog(_)),LogConverUtil.getStruct())
logDF.show(false)
logDF.printSchema()
}
}
+------------------------------+-------------------------------------------------------------------------+---+-------+----------+
|domain |url |pv |traffic|date |
+------------------------------+-------------------------------------------------------------------------+---+-------+----------+
|movieshow2000.edu.chinaren.com|http://movieshow2000.edu.chinaren.com/user_upload/15316339776271455.mp4 |1 |16374 |2018050102|
|github.com |http://github.com/user_upload/15316339776271/44y.mp4 |1 |83552 |2018050102|
|yooku.com |http://yooku.com/user_upload/15316339776271x0.html |1 |74986 |2018050101|
|rw.uestc.edu.cn |http://rw.uestc.edu.cn/user_upload/15316339776271515.mp4 |1 |55297 |2018050101|
|github.com |http://github.com/user_upload/15316339776271x05.mp4 |1 |26812 |2018050102|
|movieshow2000.edu.chinaren.com|http://movieshow2000.edu.chinaren.com/user_upload/15316339776271y4.html |1 |50392 |2018050103|
|github.com |http://github.com/user_upload/15316339776271x15.html |1 |40092 |2018050101|
|movieshow2000.edu.chinaren.com|http://movieshow2000.edu.chinaren.com/user_upload/153163397762714z.mp4 |1 |8368 |2018050102|
|movieshow2000.edu.chinaren.com|http://movieshow2000.edu.chinaren.com/user_upload/15316339776271/5z.html |1 |29677 |2018050103|
|rw.uestc.edu.cn |http://rw.uestc.edu.cn/user_upload/153163397762710w.mp4 |1 |26124 |2018050102|
|yooku.com |http://yooku.com/user_upload/15316339776271yz.mp4 |1 |32219 |2018050101|
|yooku.com |http://yooku.com/user_upload/153163397762713w.html |1 |90389 |2018050101|
|movieshow2000.edu.chinaren.com|http://movieshow2000.edu.chinaren.com/user_upload/15316339776271z/.html |1 |15623 |2018050101|
|yooku.com |http://yooku.com/user_upload/1531633977627142.html |1 |53453 |2018050103|
|yooku.com |http://yooku.com/user_upload/15316339776271230.mp4 |1 |20309 |2018050102|
|movieshow2000.edu.chinaren.com|http://movieshow2000.edu.chinaren.com/user_upload/15316339776271/4w1.html|1 |87804 |2018050103|
|movieshow2000.edu.chinaren.com|http://movieshow2000.edu.chinaren.com/user_upload/15316339776271y5y.html |1 |69469 |2018050103|
|yooku.com |http://yooku.com/user_upload/15316339776271011/.mp4 |1 |3782 |2018050103|
|github.com |http://github.com/user_upload/15316339776271wzw.mp4 |1 |89642 |2018050102|
|github.com |http://github.com/user_upload/15316339776271/1/.mp4 |1 |63551 |2018050103|
+------------------------------+-------------------------------------------------------------------------+---+-------+----------+
only showing top 20 rows
root
|-- domain: string (nullable = true)
|-- url: string (nullable = true)
|-- pv: long (nullable = true)
|-- traffic: long (nullable = true)
|-- date: string (nullable = true)
Process finished with exit code 0
注:除了這種使用RDD讀取文本進而轉化成DataFrame之外,我們也會使用自定義DefaultSource來直接將text轉化成DataFrame
本文名稱:SparkRDD轉換成DataFrame的兩種方式
網站鏈接:http://vcdvsql.cn/article26/poogcg.html
成都網站建設公司_創新互聯,為您提供品牌網站設計、網站排名、微信公眾號、電子商務、建站公司、品牌網站制作
聲明:本網站發布的內容(圖片、視頻和文字)以用戶投稿、用戶轉載內容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網站立場,如需處理請聯系客服。電話:028-86922220;郵箱:631063699@qq.com。內容未經允許不得轉載,或轉載時需注明來源: 創新互聯