bl双性强迫侵犯h_国产在线观看人成激情视频_蜜芽188_被诱拐的少孩全彩啪啪漫画

PySpark進階--深入剖析wordcount.py-創新互聯

在本文中, 我們借由深入剖析wordcount.py, 來揭開Spark內部各種概念的面紗。我們再次回顧wordcount.py代碼來回答如下問題

成都創新互聯公司專注于企業成都營銷網站建設、網站重做改版、夏津網站定制設計、自適應品牌網站建設、H5技術成都做商城網站、集團公司官網建設、成都外貿網站制作、高端網站制作、響應式網頁設計等建站業務,價格優惠性價比高,為夏津等各大城市提供網站開發制作服務。
  1. 對于大多數語言的Hello Word示例,都有main()函數, wordcount.py的main函數,或者說調用Spark的main() 在哪里

  2. 數據的讀入,各個RDD數據如何轉換

  3. map與flatMap的工作機制,以及區別

  4. reduceByKey的作用

WordCount.py 的代碼如下:

from __future__ import print_functionimport sysfrom operator import add# SparkSession:是一個對Spark的編程入口,取代了原本的SQLContext與HiveContext,方便調用Dataset和DataFrame API# SparkSession可用于創建DataFrame,將DataFrame注冊為表,在表上執行SQL,緩存表和讀取parquet文件。from pyspark.sql import SparkSessionif __name__ == "__main__":    # Python 常用的簡單參數傳入     if len(sys.argv) != 2:         print("Usage: wordcount <file>", file=sys.stderr)         exit(-1)             # appName 為 Spark 應用設定一個應用名,改名會顯示在 Spark Web UI 上     # 假如SparkSession 已經存在就取得已存在的SparkSession,否則創建一個新的。     spark = SparkSession\         .builder\         .appName("PythonWordCount")\         .getOrCreate()             # 讀取傳入的文件內容,并寫入一個新的RDD實例lines中,此條語句所做工作有些多,不適合初學者,可以截成兩條語句以便理解。     # map是一種轉換函數,將原來RDD的每個數據項通過map中的用戶自定義函數f映射轉變為一個新的元素。原始RDD中的數據項與新RDD中的數據項是一一對應的關系。     lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])        # flatMap與map類似,但每個元素輸入項都可以被映射到0個或多個的輸出項,最終將結果”扁平化“后輸出      counts = lines.flatMap(lambda x: x.split(' ')) \                   .map(lambda x: (x, 1)) \                   .reduceByKey(add)                     # collect() 在驅動程序中將數據集的所有元素作為數組返回。 這在返回足夠小的數據子集的過濾器或其他操作之后通常是有用的。由于collect 是將整個RDD匯聚到一臺機子上,所以通常需要預估返回數據集的大小以免溢出。                  output = counts.collect()         for (word, count) in output:         print("%s: %i" % (word, count))     spark.stop()
Spark 入口 SparkSession

Spark2.0中引入了SparkSession的概念,它為用戶提供了一個統一的切入點來使用Spark的各項功能,這邊不妨對照Http Session, 在此Spark就在充當Web service的角色,程序調用Spark功能的時候需要先建立一個Session。因此看到getOrCreate()就很容易理解了, 表明可以視情況新建session或利用已有的session。

    spark = SparkSession\         .builder\         .appName("PythonWordCount")\         .getOrCreate()

既然將Spark 想象成一個Web server, 也就意味著可能用多個訪問在進行,為了便于監控管理, 對應用命名一個恰當的名稱是個好辦法。Web UI并不是本文的重點,有興趣的同學可以參考 ?Spark Application’s Web Console

加載數據

在建立SparkSession之后, 就是讀入數據并寫入到Dateset中。

    lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])

為了更好的分解執行過程,是時候借助PySpark了, PySpark是python調用Spark的 API,它可以啟動一個交互式Python Shell。為了方便腳本調試,暫時切換到Linux執行

# pysparkPython 2.7.6 (default, Jun 22 2015, 17:58:13)  [GCC 4.8.2] on linux2 Type "help", "copyright", "credits" or "license" for more information. Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 17/02/23 08:30:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 17/02/23 08:30:31 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0 17/02/23 08:30:31 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException 17/02/23 08:30:32 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException Welcome to       ____              __      / __/__  ___ _____/ /__     _\ \/ _ \/ _ `/ __/  '_/   /__ / .__/\_,_/_/ /_/\_\   version 2.1.0       /_/ Using Python version 2.7.6 (default, Jun 22 2015 17:58:13) SparkSession available as 'spark'.>>> ds = spark.read.text('/home/spark2.1/spark/examples/src/main/python/a.txt')>>> type(ds) <class 'pyspark.sql.dataframe.DataFrame'>>>> print ds DataFrame[value: string]>>> lines = ds.rdd

交互式Shell的好處是可以方便的查看變量內容和類型。此刻文件a.txt已經加載到lines中,它是RDD(Resilient Distributed Datasets)彈性分布式數據集的實例。

RDD操作

RDD在內存中的結構可以參考論文, 理解RDD有兩點比較重要:

一是RDD一種只讀、只能由已存在的RDD變換而來的共享內存,然后將所有數據都加載到內存中,方便進行多次重用。

二是RDD的數據默認情況下存放在集群中不同節點的內存中,本身提供了容錯性,可以自動從節點失敗中恢復過來。即如果某個節點上的RDD partition,因為節點故障,導致數據丟了,那么RDD會自動通過自己的數據來源重新計算該partition。

為了探究RDD內部的數據內容,可以利用collect()函數, 它能夠以數組的形式,返回RDD數據集的所有元素。

>>> lines = ds.rdd>>> for i in lines.collect():...     print i... Row(value=u'These examples give a quick overview of the Spark API. Spark is built on the concept of distributed datasets, which contain arbitrary Java or Python objects.')

lines存儲的是Row object類型,而我們希望的是對String類型進行處理,所以需要利用map api進一步轉換RDD

>>> lines_map = lines.map(lambda x: x[0])>>> for i in lines_map.collect():...     print i... These examples give a quick overview of the Spark API. Spark is built on the concept of distributed datasets, which contain arbitrary Java or Python objects.

為了統計每個單詞的出現頻率,需要對每個單詞分別統計,那么第一步需要將上面的字符串以空格作為分隔符將單詞提取出來,并為每個詞設置一個計數器。比如 These出現次數是1, 我們期望的數據結構是['There', 1]。但是如何將包含字符串的RDD轉換成元素為類似 ['There', 1] 的RDD呢?

>>> flat_map = lines_map.flatMap(lambda x: x.split(' '))>>> rdd_map = flat_map.map(lambda x: [x, 1])>>> for i in rdd_map.collect():...     print i... [u'These', 1] [u'examples', 1] [u'give', 1] [u'a', 1] [u'quick', 1]

下圖簡要的講述了flatMap 和 map的轉換過程。

PySpark進階--深入剖析wordcount.py

transfrom.png

不難看出,map api只是為所有出現的單詞初始化了計數器為1,并沒有統計相同詞,接下來這個任務由reduceByKey()來完成。在rdd_map 中,所有的詞被視為一個key,而key相同的value則執行reduceByKey內的算子操作,因為統計相同key是累加操作,所以可以直接add操作。

>>> from operator import add>>> add_map = rdd_map.reduceByKey(add)>>> for i in add_map.collect():...     print i... (u'a', 1) (u'on', 1) (u'of', 2) (u'arbitrary', 1) (u'quick', 1) (u'the', 2) (u'or', 1)>>> print rdd_map.count()26>>> print add_map.count()23

根據a.txt 的內容,可知只有 of 和 the 兩個單詞出現了兩次,符合預期。

總結

以上的分解步驟,可以幫我們理解RDD的操作,需要提示的是,RDD將操作分為兩類:transformation與action。無論執行了多少次transformation操作,RDD都不會真正執行運算,只有當action操作被執行時,運算才會觸發。也就是說,上面所有的RDD都是通過collect()觸發的, 那么如果將上述的transformation放入一條簡練語句中, 則展現為原始wordcount.py的書寫形式。

counts = lines.flatMap(lambda x: x.split(' ')) \                   .map(lambda x: (x, 1)) \                   .reduceByKey(add)

而真正的action 則是由collect()完成。

output = counts.collect()

至此,已經完成了對wordcount.py的深入剖析,但是有意的忽略了一些更底層的執行過程,比如DAG, stage, 以及Driver程序。

作者:或然子
鏈接:https://www.jianshu.com/p/067907b23546
來源:簡書
簡書著作權歸作者所有,任何形式的轉載都請聯系作者獲得授權并注明出處。

另外有需要云服務器可以了解下創新互聯cdcxhl.cn,海內外云服務器15元起步,三天無理由+7*72小時售后在線,公司持有idc許可證,提供“云服務器、裸金屬服務器、高防服務器、香港服務器、美國服務器、虛擬主機、免備案服務器”等云主機租用服務以及企業上云的綜合解決方案,具有“安全穩定、簡單易用、服務可用性高、性價比高”等特點與優勢,專為企業上云打造定制,能夠滿足用戶豐富、多元化的應用場景需求。

網頁標題:PySpark進階--深入剖析wordcount.py-創新互聯
文章網址:http://vcdvsql.cn/article20/ccchjo.html

成都網站建設公司_創新互聯,為您提供定制開發網站導航Google網站維護商城網站營銷型網站建設

廣告

聲明:本網站發布的內容(圖片、視頻和文字)以用戶投稿、用戶轉載內容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網站立場,如需處理請聯系客服。電話:028-86922220;郵箱:631063699@qq.com。內容未經允許不得轉載,或轉載時需注明來源: 創新互聯

外貿網站建設