프로그래밍 38

[PySpark] array_intersect로 array간 같은 value값 찾기

다음과 같은 'data'라는 데이터 프레임이 있다. 그리고 데이터 프레임에 collect_list(site_app)이라는 array 구조의 컬럼이 있는데 해당 컬럼에서 list_A의 list value 값이 몇개가 있는지 궁금하다. data = [("oaid_1", ["app1", "app2", "app3"], [10, 20, 30], [1, 2, 3]), ("oaid_2", ["app2", "app4", "app5"], [15, 25, 35], [2, 4, 5]), ("oaid_3", ["app1", "app3", "app4"], [12, 22, 32], [1, 3, 4])] # Create a DataFrame with the sample data columns = ["oaid", "collect_l..

[PySpark] Union(= unionAll) 함수로 두 데이터 프레임 합치기

PySpark union() 및 unionAll() 함수는 동일한 스키마 또는 구조의 둘 이상의 DataFrame을 병합하는 데 사용된다. 그러면 union 과 unionAll 함수의 차이는 뭘 까? 결론적으로 말하자면 두 함수의 차이는 없다. Spark 2.0.0 버전부터는 unionAll()은 사용되지 않고 union으로 사용된다. 참고사항으로 SQL 언어에서 Union은 중복을 제거하지만 UnionAll은 중복 레코드를 포함한 두 개의 데이터 세트를 병합한다. PySpark에서는 둘 다 동일하게 동작하며 DataFrame 중복을 제거하기 위해선 duplicate() 함수를 사용하는 것이 좋다. import pyspark from pyspark.sql import SparkSession spark =..

[PySpark] explode, explode_outer 함수 차이

다음과 같은 스키마를 가진 테이블을 생성했다. an_array 라는 array 컬럼 하나와 a_map 으로 묶어진 key - value 컬럼이 존재한다 df = spark.createDataFrame( [(1, ["foo", "bar"], {"x": 1.0}), (2, [], {}), (3, None, None)], ("id", "an_array", "a_map") ) df.show() +---+----------+----------+ | id| an_array| a_map| +---+----------+----------+ | 1|[foo, bar]|{x -> 1.0}| | 2| []| {}| | 3| null| null| +---+----------+----------+ df.printSchema()..

[PySpark] dense 벡터와 sparse 벡터, UDF로 sparse vector 만들기

1.vector 개념 희소 벡터를 생성하려면 벡터 길이(엄격하게 증가해야 하는 0이 아닌 값과 0이 아닌 값의 인덱스)를 제공해야 합니다. pyspark.mllib.linag.Vecotors 라이브러리는 dense(고밀도), sparse(희소) 두 유형의 로컬 벡터를 지원한다. 희소 벡터는 벡터안에 숫자가 0이 많은 경우에 사용한다. 희소 벡터를 생성하려면 벡터 길이와 0이 아닌 값과 0이 아닌 값의 인덱스 총 세가지의 인자를 넣어야 한다. tip! 일반적으로 ML 모델 feature 구성시 vector assembler를 활용해서 features를 하나의 벡터로 구성한다. Vector Assembler가 메모리를 적게 사용하는 형식 중 하나를 기준으로 dense 벡터와 sparse 벡터를 선택한다. 또..

[PySpark] array 값 합계 컬럼 생성하기

PySpark의 Column() 함수를 사용하여 열에 있는 배열 값의 합계를 계산하려면 expr() 함수를 aggregate() 함수와 함께 사용할 수 있다. from pyspark.sql import SparkSession from pyspark.sql.functions import col, expr, aggregate # Create a SparkSession spark = SparkSession.builder.getOrCreate() # Create a sample DataFrame data = [("Alice", [1, 2, 3]), ("Bob", [4, 5]), ("Charlie", [6, 7, 8, 9])] df = spark.createDataFrame(data, ["Name", "Numbe..

[pandas] 날짜차이 컬럼 생성하기

판다스 데이터 프레임에서 특정 고정 날짜와 datetime64 열 사이의 기간을 나타내는 새 열을 만들려면 datetime64 열에서 고정 날짜를 뺄 수 있습니다. 그러면 고정 날짜와 각 날짜 시간 값 사이의 시간 차이를 나타내는 time delta64 데이터 유형이 생성된다. import pandas as pd # Example DataFrame df = pd.DataFrame({'Date': ['2023-05-10', '2023-05-11', '2023-05-12'], 'Time': ['09:00:00', '10:30:00', '14:15:00']}) # Convert 'Date' and 'Time' columns to datetime64 type df['Date'] = pd.to_datetime(d..

[PySpark] 데이터프레임 값을 리스트로 반환하기

pyspark 데이터프레임의 특정 column의 value들을 리스트로 반환하는 방법이다. 여기서 x[2]는 cat2 column을 지칭하고 고유값이 아니라 전체 value를 list로 반환하려면 distinct() 없이 작업을 하면 된다. df2.show(3) #+---+----+----+--------+ #|ind| id|cat2|cat2_cnt| #+---+----+----+--------+ #| 1|2868| 167| 16| #| 2|1737| 157| 24| #| 3|1476| 189| 3| #+---+----+----+--------+ #only showing top 3 rows cat2_list = df2.rdd.map(lambda x: x[2]).distinct().collect() cat..

[PySpark] 빈 데이터 프레임 생성하고 데이터 집어넣기

pyspark에서 다음과 같이 테이블의 스키마를 지정하고 빈 데이터 테이블을 만들 수 있다. 스키마 지정시 뭐 int, float, double 등 데이터 유형을 지정할 수 있는데 임의로 string 데이터 형식을 지정해뒀다. spark 세션과 config는 본인의 환경에 맞게 설정하면 된다. from pyspark.sql import SparkSession from pyspark import SparkConf # spark-conf conf = SparkConf() # conf.set("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.2.0") conf.set("spark.driver.memory", "-g") conf.set("spark.exec..

[PySpark] 랜덤표본추출(sample, sampleBy, take_Sample)

Pyspark 데이터 프레임에서 랜덤 샘플링하는 방법을 찾아보다가 세가지 방법이 있길래 정리하려고 한다. 우선, 샘플링하고자 하는 데이터 프레임을 생성했다. from pyspark.sql import SparkSession spark = SparkSession.builder.master("local[1]").appName("regexp_replace").getOrCreate() address = [("RJFK-SLFKW-DFG1T","M BS", "1"), ("aedw-dg93r-d62g1","W SE", "2"), ("DFGE-FD23k-DA4G1", "M GJ", "3"), ("dssf-dg93r-d62g1","W SE", "4"), ("grdg-dg93r-d62g1","W SE", "5"), ("h..

[PySpark] SparkConf로 Spark 환경설정

Spark를 사용하기 이전에 SparkConf 객체를 사용해 Java system properties를 사용한다. 다같이 사용하는 분석서버에서 데이터 처리나 분석을 할 때 상황에 맞게 적절한 core수 제어나 메모리를 지정한다면 제한된 리소스로 효율적인 데이터 처리 및 분석이 가능하다. from pyspark.sql import SparkSession from pyspark import SparkConf # spark-conf conf = SparkConf() conf.set("spark.driver.memory", "50g") conf.set("spark.executor.memory", "30g") conf.set("spark.ui.port","4051") spark = SparkSession.buil..