spark 10

Hadoop의 Mapreduce(맵리듀스) vs Spark(스파크)?

하둡 Hdfs 상에 데이터를 저장하고 PySpark로 분석하면서 정확한 시스템 용어나 개념에 대해서 정리해야지 말만하다가 이제라도 공부하고 정리하려고 한다. 다는 모르더라도 기본적인 것은 알고쓰자! 1.하둡(Hdoop)이란 - HDFS, MapReduce? 하둡은 대규모 데이터를 저장하고 처리하는데 사용되는 오픈소스 분산 컴퓨팅 프레임 웤이다. 아파치 소프트웨어 재단에서 개발하고 관리하며, 대용량(large-scalable) 데이터를 여러 대의 컴퓨터 클러스터에서 처리할 수 있도록 설계되어 있다. 여기서 우리가 알고 가야할 핵심적인 개념을 뽑자면 hdfs와 mapreduce이다. Hadoop 분산 파일 시스템(Hadoop Distributed File System, HDFS) HDFS는 대용량 데이터를 ..

[PySpark] 오버샘플링(oversampling), 언더샘플링(undersampling)

기계 학습에서 불균형 훈련 데이터 세트로 분류 문제를 다룰 때 오버 샘플링과 언더 샘플링은 결과를 개선하는 두 가지 쉽고 종종 효과적인 방법이다. 1.불균형(imbalanced) 데이터란? 데이터셋의 한 클래스(레이블) 수가 다른 클래스보다 상당히 많거나 적을때 데이터의 클래스가 불균형 상태라고 말한다. '불균형'이라는것이 상당히 분석가나 과학자의 입장에서 주관적인 판단이 개입된다. 그리고 실제 문제를 ML(머신러닝, 기계학습)으로 해결할 때 깔끔하게 균형잡힌 데이터를 만나기는 힘들다. (예를들어, 이상치 탐지, 사기탐지 데이터, 질병 데이터 등) 머신러닝 모델은 맹목적으로 다수의 클래스를 대부분 학습하기 때문에 대다수 예측 데이터를 학습된 다수의 클래스로 예측하려는 경향이 있다. 그렇기 때문에, 소수 ..

[PySpark] 특정(여러) 문자열(strings)이 포함된 데이터 필터로 뽑아내기.

우선 데이터 프레임 내에서 한 string 스키마 구조의 컬럼에서 특정 문자열이 포함된 데이터를 filter로 걸러내는 코드는 아래와 같다. 아래에서는 search strings list에 있는 단어들을 포함하고 있는 데이터를 뽑아내는 방법이다. 여러 단어들이 아닌 한 단어를 포함하고 있는 데이터를 뽑고자 할때는 reduce 함수를 쓸 필요 없이 아래와 같이 pyspark 내장함수를 활용하면 된다. df.filter(col("col1").contains("기내용")) - 기내용이 포함된 데이터 산출 df.filter(~col("col1").contains("기내용")) - 기내용이 포함되지 않은 데이터 산출 from pyspark.sql import SparkSession from pyspark.sql...

[PySpark] array_intersect로 array간 같은 value값 찾기

다음과 같은 'data'라는 데이터 프레임이 있다. 그리고 데이터 프레임에 collect_list(site_app)이라는 array 구조의 컬럼이 있는데 해당 컬럼에서 list_A의 list value 값이 몇개가 있는지 궁금하다. data = [("oaid_1", ["app1", "app2", "app3"], [10, 20, 30], [1, 2, 3]), ("oaid_2", ["app2", "app4", "app5"], [15, 25, 35], [2, 4, 5]), ("oaid_3", ["app1", "app3", "app4"], [12, 22, 32], [1, 3, 4])] # Create a DataFrame with the sample data columns = ["oaid", "collect_l..

[PySpark] explode, explode_outer 함수 차이

다음과 같은 스키마를 가진 테이블을 생성했다. an_array 라는 array 컬럼 하나와 a_map 으로 묶어진 key - value 컬럼이 존재한다 df = spark.createDataFrame( [(1, ["foo", "bar"], {"x": 1.0}), (2, [], {}), (3, None, None)], ("id", "an_array", "a_map") ) df.show() +---+----------+----------+ | id| an_array| a_map| +---+----------+----------+ | 1|[foo, bar]|{x -> 1.0}| | 2| []| {}| | 3| null| null| +---+----------+----------+ df.printSchema()..

[빅데이터를 지탱하는 기술] Ch5.빅데이터 파이프관리

5-1.워크플로우 관리 정해진 업무를 원활하게 진행하기 위한 구조 워크플로 관리도구 정기적으로 태스크 실행 비정상적인 상태 감지후 해결 ex) Airflow, azkaban, Digdag, Luigi, Oozie 워크플로 관리도구와 태스크 정해진 개별 반복 처리를 태스크(Task)라고 함 도구를 사용하는 이유는 태스크 실행의 실패 때문 태스크 수 증가로 재실행이 어려워 도구를 활용함 워크플로 관리 도구의 기능 정기적인 스케쥴로 태스크 실행후 통지 태스크간 의존 관계를 정하구 정해진 순서대로 실행 태스크 실행결과 보관, 오류 발생시 재실행 워크플로 관리 도구의 종류 선언 형과 스크립트 형 선언형(declarative) XML, YAML 등의 서식으로 워크플로 기술 미리 제공된 기능만 이용가능하지만 최소한의..

[빅데이터를 지탱하는 기술] Ch3.빅데이터 분산 처리 프레임워크

3-1 대규모 분산 처리의 프레임워크 구조화 데이터(structured data) 스키마(schema) 명확하게 정의된 데이터 테이블 비구조화 데이터(unstured data) 스키마가 없는 데이터 텍스트, 이미지 스키마리스 데이터(schemaless data) CSV, JSON(인터넷을 통해 주고받는 데이터), XML 서식은 정해져 있지만, 컬럼 수나 데이터형은 명확X 일반적으로 구조화 데이터 압축률 높이기 위해 열지향 스토리지로 저장 즉, MPP DB로 저장하거나 Hadoop 상에서 열 지향 스토리지 형식으로 변환 구조화된 데이터중 시간에 따라 증가하는 데이터를 팩트 테이블, 그에 따른 부속 데이터를 디멘젼 테이블데이터 구조화 열 지향 스토리지의 작성 MPP DB의 경우 제품에 따라 스토리지의 형식이..

[프로그래머스] 데이터 엔지니어 study - 7주차

1.Spark 기초 hadoop이 1세대라면 스파크 2세대 빅데이터 처리 기술 YARN등을 분산환경으로 사용 Scala로 작성됨 등장 버클리 대학 AMPLab에서 아파치 오픈소스 프로젝트로 2013년 시작 Databricks 스타트업 창업 Spark 3.0 구성 Spark Core: pandas의 데이터 프레임과 같음 Spark SQL Spark ML: scikit learn의 분산 버전 Spark Streaming Spark GraphX 자체 서버 엔진을 들고있는게 아니라 하둡(YARN), Kubernetes 와 같은 resource 매니저 위에서 돌아감 Spark vs MapReduce Spark 기본적으로 메모리 기반 → 메모리 부족시 디스크 사용 하둡(YARN) 이외에도 다른 분산 컴퓨팅 환경 지..

[PySpark] SparkConf로 Spark 환경설정

Spark를 사용하기 이전에 SparkConf 객체를 사용해 Java system properties를 사용한다. 다같이 사용하는 분석서버에서 데이터 처리나 분석을 할 때 상황에 맞게 적절한 core수 제어나 메모리를 지정한다면 제한된 리소스로 효율적인 데이터 처리 및 분석이 가능하다. from pyspark.sql import SparkSession from pyspark import SparkConf # spark-conf conf = SparkConf() conf.set("spark.driver.memory", "50g") conf.set("spark.executor.memory", "30g") conf.set("spark.ui.port","4051") spark = SparkSession.buil..

[PySpark] 백분위수(percentile), 사분위수(quantile)

약 1억 5천만건 데이터에서 특정한 값 기준으로 상위 n개의 데이터를 뽑아내는데 orderBy 후 limit(n)을 이용해 추출했다. 근데 너무 오래걸리더라... 값을 기준으로 정렬하는 연산방식이 spark에서 비효율적이라고 어디서 본거 같은데... 그래서 분위수 값을 구해 cut-off 방식으로 데이터를 추출했는데 시간이 더 적게 걸리더라. 우선, 예시를 들기 위해 다음과 같이 PySpark 세팅과 데이터 프레임을 구성했다. 그리고 visit(string 형식)의 값에 대해 분위수 값을 구하려고 한다. from pyspark.sql import SparkSession spark = SparkSession.builder.master("local[1]").appName("regexp_replace").ge..