更新時(shí)間:2021年09月08日18時(shí)13分 來(lái)源:傳智教育 瀏覽次數(shù):
當(dāng)case類不能提前定義的時(shí)候,就需要采用編程方式定義Schema信息,定義DataFrame主要包含3個(gè)步驟,具體如下:
(1)創(chuàng)建一個(gè)Row對(duì)象結(jié)構(gòu)的RDD;
(2)基于StructType類型創(chuàng)建Schema;
(3)通過(guò)SparkSession提供的createDataFrame()方法來(lái)拼接Schema。
根據(jù)上述步驟,創(chuàng)建SparkSqlSchema. scala文件,使用編程方式定義Schema信息的具體代碼如文件4-3所示。
文件4-3 SparkSqlSchema.scala
import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sq1.types. {IntegerType,StringType,StructField,StructType} import org.apache.spark.sql.(DataFrame,Row,Sparkession) object SparkSqlSchema { def main(args: Array[string]): Unit=( //1.創(chuàng)建SparkSession val spark: sparkSession=Sparksession.bullder() .appName ("SparkSq1Schema") .master ("1oca1[2]") .getOrCreate () //2.獲取sparkConttext對(duì)象 val sc: SparkContext=spark.sparkContext //設(shè)置日志打印級(jí)別 sc.setLogLevel ( "WARN") //3.加載數(shù)據(jù) val dataRDD:RDD[String]=sc.textFile("D://spark//person.txt") //4.切分每一行 val dataArrayRDD:RDD[ Array[string]]=dataRDD.map( .split(" ")) //5.加載數(shù)據(jù)到Row對(duì)象中 val personRDD:RDD[Row]= dataArrayRDD.map(x=>Row(x(0).toInt,x(1),x(2).toInt)) //6.創(chuàng)建Schema val schema:StructType=StructType(Seq( StructField("id",IntegerType,false), StructField("name",StringType,false), StructField("age", IntegerType, false) )) //7.利用personRDD與Schema創(chuàng)建DataFrame val personDF:DataFrame=spark.createDataFrame(personRDD,schema) //8.DSL操作顯示DataFrame的數(shù)據(jù)結(jié)果 personDF . show () //9.將DataFrame注冊(cè)成表 personDF.createOrReplaceTempView ("t_person") //10.sq1語(yǔ)句操作 spark.sq1 ("select¥from t_ person") .show() //11.關(guān)閉資源 sc.stop() spark.stop ()
在文件4-3中,第9~23行代碼表示將文件轉(zhuǎn)換成為RDD的基本步驟,第25~29行代碼即為編程方式定義Schema的核心代碼,Spark SQL提供了Class StructType( val fields:Array[StructField])類來(lái)表示模式信息,生成一個(gè)StructType對(duì)象,需要提供fields作為輸入?yún)?shù),fields是個(gè)集合類型,StructField(name,dataTypenullable)參數(shù)分別表示為字段名稱、字段數(shù)據(jù)類型、字段值是否允許為空值,根據(jù)person.txt文本數(shù)據(jù)文件分別設(shè)置id、name、age字段作為Schema,第31行代碼表示通過(guò)調(diào)用spark.createDataFrame()方法將RDD和Schema進(jìn)行合并轉(zhuǎn)換為DataFrame,第33~40行代碼即為操作DataFrame進(jìn)行數(shù)據(jù)查詢。
猜你喜歡:Kerberos是什么?Kerberos怎樣做身份認(rèn)證?
如何對(duì)序列執(zhí)行切片操作?【Python切片教程】
怎樣使用CLI調(diào)動(dòng)Hive的一些功能?
北京校區(qū)