教育行業(yè)A股IPO第一股(股票代碼 003032)

全國(guó)咨詢(xún)/投訴熱線:400-618-4000

多種方法創(chuàng)建DataFrame【大數(shù)據(jù)技術(shù)文章】

更新時(shí)間:2021年03月23日14時(shí)14分 來(lái)源:傳智教育 瀏覽次數(shù):

在Spark2.0版本之前,Spark SQL中的SQLContext是創(chuàng)建DataFrame和執(zhí)行SQL的入口,我們可以利用HiveContext接口,通過(guò)HiveQL語(yǔ)句操作Hive表數(shù)據(jù),實(shí)現(xiàn)數(shù)據(jù)查詢(xún)功能。而在Spark2.0之后,Spark使用全新的SparkSession接口替代SQLContext及HiveContext接口完成數(shù)據(jù)的加載、轉(zhuǎn)換、處理等功能。

創(chuàng)建SparkSession對(duì)象可以通過(guò)“SparkSession.builder().getOrCreate()”方法獲取,但當(dāng)我們使用Spark-Shell編寫(xiě)程序時(shí),Spark-Shell客戶(hù)端會(huì)默認(rèn)提供了一個(gè)名為“sc”的SparkContext對(duì)象和一個(gè)名為“spark”的SparkSession對(duì)象,因此我們可以直接使用這兩個(gè)對(duì)象,不需要自行創(chuàng)建。啟動(dòng)Spark-Shell命令如下所示。

$ spark-shell --master local[2]

在啟動(dòng)Spark-Shell完成后,效果如圖1所示。

DataFrame的創(chuàng)建方法【大數(shù)據(jù)文章】

圖1 啟動(dòng)Spark-Shell

在圖1中可以看出,SparkContext、SparkSession對(duì)象已創(chuàng)建完成。創(chuàng)建DataFrame有多種方式,最基本的方式是從一個(gè)已經(jīng)存在的RDD調(diào)用toDF()方法進(jìn)行轉(zhuǎn)換得到DataFrame,或者通過(guò)Spark讀取數(shù)據(jù)源直接創(chuàng)建。

在創(chuàng)建DataFrame之前,為了支持RDD轉(zhuǎn)換成DataFrame及后續(xù)的SQL操作,需要導(dǎo)入spark.implicits._包啟用隱式轉(zhuǎn)換。若使用SparkSession方式創(chuàng)建DataFrame,可以使用spark.read操作,從不同類(lèi)型的文件中加載數(shù)據(jù)創(chuàng)建DataFrame,具體操作API如表1所示。

表1 spark.read操作

代碼示例 描述
spark.read.text("people.txt") 讀取txt格式的文本文件,創(chuàng)建DataFrame                                 
spark.read.csv ("people.csv") 讀取csv格式的文本文件,創(chuàng)建DataFrame
spark.read.json("people.json") 讀取json格式的文本文件,創(chuàng)建DataFrame
spark.read.parquet("people.parquet") 讀取parquet格式的文本文件,創(chuàng)建DataFrame

1.?dāng)?shù)據(jù)準(zhǔn)備

在HDFS文件系統(tǒng)中的/spark目錄中有一個(gè)person.txt文件,內(nèi)容如文件1所示。

文件1 person.txt

 zhangsan 20
 lisi 29
 wangwu 25
 zhaoliu 30
 tianqi 35
 jerry 40

2.通過(guò)文件直接創(chuàng)建DataFrame

我們通過(guò)Spark讀取數(shù)據(jù)源的方式進(jìn)行創(chuàng)建DataFrame,在Spark-Shell輸入下列代碼:

scala > val personDF = spark.read.text("/spark/person.txt")
personDF: org.apache.spark.sql.DataFrame = [value: String]
scala > personDF.printSchema()
root
 |-- value: String (Nullable = true)

從上述返回結(jié)果personDF的屬性可以看出,創(chuàng)建DataFrame對(duì)象完成,之后調(diào)用DataFrame的printSchema()方法可以打印當(dāng)前對(duì)象的Schema元數(shù)據(jù)信息。從返回結(jié)果可以看出,當(dāng)前value字段是String數(shù)據(jù)類(lèi)型,并且還可以為Null。

使用DataFrame的show()方法可以查看當(dāng)前DataFrame的結(jié)果數(shù)據(jù),具體代碼和返回結(jié)果如下所示。

scala > personDF.show()
+-------------+                          
|   value   |
+-------------+
|1 zhangsan 20|
|2 lisi    29|
|3 wangwu  25|
|4 zhaoliu 30|
|5 tianqi  35|
|6 jerry  40|
+-------------+

從上述返回結(jié)果看出,當(dāng)前personDF對(duì)象中的6條記錄就對(duì)應(yīng)了person.txt文本文件中的數(shù)據(jù)。

3.RDD轉(zhuǎn)換DataFrame

調(diào)用RDD的toDF()方法,可以將RDD轉(zhuǎn)換為DataFrame對(duì)象,具體代碼如下所示。

   scala > val lineRDD = sc.textFile("/spark/person.txt").map(_.split(" "))
   lineRDD: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[6] at
   map at <console>:24
   scala > case class Person(id:Int,name:String,age:Int)
   defined class Person
   scala > val personRDD = 
lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))
   personRDD: org.apache.spark.rdd.RDD[Person] = MapPartitionsRDD[7] at map
   at <console>:27
   scala > val personDF = personRDD.toDF()
  personDF: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more
  field]
  scala > personDF.show
  +----+--------+----+
  | id |  name | age|
  +----+--------+----+
  | 1 |zhangsan | 20|
  | 2 |lisi   |  29|
  | 3 |wangwu  |  25|
  | 4 |zhaoliu |  30|
  | 5 |tianqi  |  35|
  | 6 |jerry  |  40|
  +----+--------+----+
  scala > personDF.printSchema
  root
   |-- id: integer (nullable = false)
   |-- name: string (nullable = true)
   |-- age: integer (nullable = false)

在上述代碼中,第1行代碼將文本文件轉(zhuǎn)換成RDD,第4行代碼定義Person樣例類(lèi),相當(dāng)于定義表的Schema元數(shù)據(jù)信息,第6行代碼表示使RDD中的數(shù)組數(shù)據(jù)與樣例類(lèi)進(jìn)行關(guān)聯(lián),最終會(huì)將RDD[Array[String]]更改為RDD[Person],第9行代碼表示調(diào)用RDD的toDF()方法,就可以把RDD轉(zhuǎn)換成了DataFrame了。第12-27行代碼表示調(diào)用DataFrame方法并從返回結(jié)果可以看出,RDD對(duì)象成功轉(zhuǎn)換DataFrame。




猜你喜歡:

Redis、傳統(tǒng)數(shù)據(jù)庫(kù)、HBase以及Hive的區(qū)別

怎樣安裝和配置Sqoop?

DataFrame是什么意思?與RDD相比有哪些優(yōu)點(diǎn)?

大數(shù)據(jù)Hadoop生態(tài)圈包含哪些子系統(tǒng)?

傳智教育大數(shù)據(jù)項(xiàng)目開(kāi)發(fā)培訓(xùn)

0 分享到:
和我們?cè)诰€交談!