更新時間:2023年07月28日16時16分 來源:傳智教育 瀏覽次數(shù):
DataFrame對象可以從RDD轉(zhuǎn)換而來,都是分布式數(shù)據(jù)集 其實就是轉(zhuǎn)換一下內(nèi)部存儲的結(jié)構(gòu),轉(zhuǎn)換為二維表結(jié)構(gòu)。
將RDD轉(zhuǎn)換為DataFrame方式1:
調(diào)用spark
# 首先構(gòu)建一個RDD rdd[(name, age), ()] rdd = sc.textFile("../data/sql/people.txt").\ map(lambda x: x.split(',')).\ map(lambda x: [x[0], int(x[1])]) # 需要做類型轉(zhuǎn)換, 因為類型從RDD中探測 # 構(gòu)建DF方式1 df = spark.createDataFrame(rdd, schema = ['name', 'age'])
通過SparkSession對象的createDataFrame方法來將RDD轉(zhuǎn)換為DataFrame,這里只傳入列名稱,類型從RDD中進行推斷,是否允許為空默認為允許(True)。
# coding:utf8 # 演示DataFrame創(chuàng)建的三種方式 from pyspark.sql import SparkSession if __name__ == '__main__': spark = SparkSession.builder.\ appName("create df").\ master("local[*]").\ getOrCreate() sc = spark.sparkContext # 首先構(gòu)建一個RDD rdd[(name, age), ()] rdd = sc.textFile("../data/sql/people.txt").\ map(lambda x: x.split(',')).\ map(lambda x: [x[0], int(x[1])]) # 需要做類型轉(zhuǎn)換, 因為類型從RDD中探測 # 構(gòu)建DF方式1 df = spark.createDataFrame(rdd, schema = ['name', 'age']) # 打印表結(jié)構(gòu) df.printSchema() # 打印20行數(shù)據(jù) df.show() df.createTempView("ttt") spark.sql("select * from ttt where age< 30").show()將RDD轉(zhuǎn)換為DataFrame方式2:
通過StructType對象來定義DataFrame的“表結(jié)構(gòu)”轉(zhuǎn)換RDD
# 創(chuàng)建DF , 首先創(chuàng)建RDD 將RDD轉(zhuǎn)DF rdd = sc.textFile("../data/sql/stu_score.txt").\ map(lambda x:x.split(',')).\ map(lambda x:(int(x[0]), x[1], int(x[2]))) # StructType 類 # 這個類 可以定義整個DataFrame中的Schema schema = StructType().\ add("id", IntegerType(), nullable=False).\ add("name", StringType(), nullable=True).\ add("score", IntegerType(), nullable=False) # 一個add方法 定義一個列的信息, 如果有3個列, 就寫三個add, 每一個add代表一個StructField # add方法: 參數(shù)1: 列名稱, 參數(shù)2: 列類型, 參數(shù)3: 是否允許為空 df = spark.createDataFrame(rdd, schema) # coding:utf8 # 需求: 基于StructType的方式構(gòu)建DataFrame 同樣是RDD轉(zhuǎn)DF from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StringType, IntegerType if __name__ == '__main__': spark = SparkSession.builder.\ appName("create_df"). \ config("spark.sql.shuffle.partitions", "4"). \ getOrCreate() # SparkSession對象也可以獲取 SparkContext sc = spark.sparkContext # 創(chuàng)建DF , 首先創(chuàng)建RDD 將RDD轉(zhuǎn)DF rdd = sc.textFile("../data/sql/stu_score.txt").\ map(lambda x:x.split(',')).\ map(lambda x:(int(x[0]), x[1], int(x[2]))) # StructType 類 # 這個類 可以定義整個DataFrame中的Schema schema = StructType().\ add("id", IntegerType(), nullable=False).\ add("name", StringType(), nullable=True).\ add("score", IntegerType(), nullable=False) # 一個add方法 定義一個列的信息, 如果有3個列, 就寫三個add # add方法: 參數(shù)1: 列名稱, 參數(shù)2: 列類型, 參數(shù)3: 是否允許為空 df = spark.createDataFrame(rdd, schema) df.printSchema() df.show()
將RDD轉(zhuǎn)換為DataFrame方式3:
使用RDD的toDF方法轉(zhuǎn)換RDD
# StructType 類 # 這個類 可以定義整個DataFrame中的Schema schema = StructType().\ add("id", IntegerType(), nullable=False).\ add("name", StringType(), nullable=True).\ add("score", IntegerType(), nullable=False) # 一個add方法 定義一個列的信息, 如果有3個列, 就寫三個add # add方法: 參數(shù)1: 列名稱, 參數(shù)2: 列類型, 參數(shù)3: 是否允許為空 # 方式1: 只傳列名, 類型靠推斷, 是否允許為空是true df = rdd.toDF(['id', 'subject', 'score']) df.printSchema() df.show() # 方式2: 傳入完整的Schema描述對象StructType df = rdd.toDF(schema) df.printSchema() df.show() # coding:utf8 # 需求: 使用toDF方法將RDD轉(zhuǎn)換為DF from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StringType, IntegerType if __name__ == '__main__': spark = SparkSession.builder.\ appName("create_df"). \ config("spark.sql.shuffle.partitions", "4"). \ getOrCreate() # SparkSession對象也可以獲取 SparkContext sc = spark.sparkContext # 創(chuàng)建DF , 首先創(chuàng)建RDD 將RDD轉(zhuǎn)DF rdd = sc.textFile("../data/sql/stu_score.txt").\ map(lambda x:x.split(',')).\ map(lambda x:(int(x[0]), x[1], int(x[2]))) # StructType 類 # 這個類 可以定義整個DataFrame中的Schema schema = StructType().\ add("id", IntegerType(), nullable=False).\ add("name", StringType(), nullable=True).\ add("score", IntegerType(), nullable=False) # 一個add方法 定義一個列的信息, 如果有3個列, 就寫三個add # add方法: 參數(shù)1: 列名稱, 參數(shù)2: 列類型, 參數(shù)3: 是否允許為空 # 方式1: 只傳列名, 類型靠推斷, 是否允許為空是true df = rdd.toDF(['id', 'subject', 'score']) df.printSchema() df.show() # 方式2: 傳入完整的Schema描述對象StructType df = rdd.toDF(schema) df.printSchema() df.show()