
[PySpark] 빈 데이터 프레임 생성하고 데이터 집어넣기

히또아빠 2023. 2. 10. 16:52

pyspark에서 다음과 같이 테이블의 스키마를 지정하고 빈 데이터 테이블을 만들 수 있다. 스키마 지정시 뭐 int, float, double 등 데이터 유형을 지정할 수 있는데 임의로 string 데이터 형식을 지정해뒀다. spark 세션과 config는 본인의 환경에 맞게 설정하면 된다.

from pyspark.sql import SparkSession
from pyspark import SparkConf

# spark-conf 
conf = SparkConf()
# conf.set("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.2.0")
conf.set("spark.driver.memory", "-g")
conf.set("spark.executor.memory", "-g")
# .config('spark.local.dir', '')\

spark = SparkSession.builder \
    .appName("test") \
    .master("local[*]") \
    .config(conf=conf) \

# creating an empty RDD to make a 
# DataFrame with no data
emp_RDD = spark.sparkContext.emptyRDD()

# define the schema of the dataframe
columns1_schema= StructType([StructField("id", StringType(), False), StructField("cat1", StringType(), False),\
                            StructField("cat1_cnt", StringType(), False), StructField("seed", StringType(), False)])

columns2_schema = StructType([StructField("ind", StringType(), False), StructField("id", StringType(), False), StructField("cat2", StringType(), False),\
                            StructField("cat2_cnt", StringType(), False)])
first_df = spark.createDataFrame(data = emp_RDD, schema = columns1_schema)
second_df = spark.createDataFrame(data = emp_RDD, schema = columns2_schema)


#| id|cat1|cat1_cnt|seed|

#|ind| id|cat2|cat2_cnt|

위처럼 생성된 빈 테이블에 임의로 데이터를 생성해 집어 넣어보자.

생성된 빈테이블에 데이터를 랜덤하게 발생시켜 만든 테이블을 union하는 방식이다. 스키마 구조가 맞기 때문에 적적할게 테이블이 구성됨을 알 수 있다. 데이터를 넣는 방식은 이것 말고도 spark.sql을 활용하는 방식도 있음.

import random

rows1 = [['%d'%i, random.randrange(100, 150), random.randrange(0, 31), random.randint(0,1)] for i in range(1,5001)]
columns1 = ['id', 'cat1', 'cat1_cnt', 'seed']

record1 = spark.createDataFrame(rows1, columns1)
df1 = first_df.union(record1)

rows2 = [[i, random.randint(1,5001), random.randrange(150, 200), random.randrange(0, 31)] for i in range(1, 300000)]
columns2 = ['ind', 'id', 'cat2', 'cat2_cnt']

record2 = spark.createDataFrame(rows2, columns2)
df2 = second_df.union(record2)


#| id|cat1|cat1_cnt|seed|
#|  1| 144|      30|   1|
#|  2| 127|      19|   0|
#|  3| 119|      12|   0|
#only showing top 3 rows

#|ind|  id|cat2|cat2_cnt|
#|  1|2868| 167|      16|
#|  2|1737| 157|      24|
#|  3|1476| 189|       3|
#only showing top 3 rows



