[Spark]스파크 복습 - 3(DataFrame/Spark SQL)

Spark 3일차

  • hdfs dfs -put 보낼(로컬) 받을(하둡)


파일 읽기/쓰기

  • dataframewrite.mode : append, overwrite, error/errorifexists, ignore
  • 2010년 비행운행정보를 요약한 csv파일을 불러오자.
    • csv_file01 = spark.read.format("csv").option("header", "true").load("/home/big/data/flights/csv/2010-summary.csv")
    • csv_file02 = spark.read.option("header","true").csv("/home/big/data/flights/csv/2010-summary.csv")
  • 만약 파일을 저장하고 싶다면?
    • csv_file02.write.format("csv").mode("overwrite").save("/tmp/csv") 이런 식으로 저장
  • 저장된 파일을 우분투에서 확인해보자.
    • hdfs dfs -ls /tmp/csv
    • image
    • hdfs dfs -cat /tmp/csv/*.csv
    • image
  • 이번엔 json 파일을 불러보자.
    • json_file01 = spark.read.format("json").load("/home/big/data/flights/json/2010-summary.json")
  • 반대로 json 파일을 저장하려면?
    • json_file01.write.format("json").mode("append").save("/tmp/json")
  • 저장한 파일을 확인해보자.
    • hdfs dfs -ls /tmp/json
    • image
  • 그 외의 방식들
    • parquet : spark 컬럼 기반 데이터 저장 방식
    • orc : hadoop 컬럼 기반 데이터 저장 방식
    • text
    • db



집계(Rollup, Cube..)

  • 2일차에서 썼던 데이터파일을 다시 가져오자. 모두 가져오기에는 너무 용량이 커서 내 컴퓨터가 버거워하기 때문에 ㅠㅠ 축소된 데이터만 가져와야했다.

    • retails = spark.read.format("csv").option("header","true").option("inferSchema", "true").load("/home/big/data/retails/2010-12-01.csv")
  • 그리고 2일차와 환경을 맞추기 위해 함수를 불러오고, 데이터프레임을 만들어줬다.

    • from pyspark.sql.functions import to_date, col
      date_df = retails.withColumn("date", to_date(col("InvoiceDate"), "yyyy-MM-dd HH:mm:ss"))
      
  • CustomerId와 StockCode를 기준으로 그룹화

    • notnull_df = date_df.drop()
          
      # sql쿼리를 사용하기 위한 테이블 생성
      notnull_df.createOrReplaceTempView("notnullDf")
          
      spark.sql("""
      SELECT CustomerId, StockCode, sum(Quantity)
      FROM notnullDf
      GROUP BY CustomerId, StockCode
      ORDER BY CustomerId DESC, StockCode DESC
      """).show()
      

      image

  • Date와 Country컬럼을 기준으로 Rollup해서 Quantity의 합계를 집계해줬다. 그리고 Date를 기준으로 정렬하여 rollup_df를 만들어 줌

    • from pyspark.sql.functions import sum
          
      # selectExpr를 사용해서 SUM(Quantity)부분 싱글쿼테이션이 아닌 백틱 `이다.
      rollup_df = notnull_df.rollup("Date","Country").agg(sum("Quantity")).selectExpr("Date","Country","`SUM(Quantity)` as total_quantity").orderBy("Date")
          
      rollup_df.show()
      

      image

    • # Country가 null인 것 찾기
      rollup_df.where("Country IS NULL").show()
      

      image

  • 이번엔 Cube를 사용해보자.

    • notnull_df.cube("Date","Country").agg(sum(col("Quantity"))).select("Date","Country","SUM(Quantity)").show()
      

      image

      • (Date, Country), (null, Country), (null, null), (Date, null) 형태로 집계



조인(Join)

  • 이번에는 스파크에서 조인(join)을 해보자.

    • # 데이터 만들기
      person = spark.createDataFrame(
      [
          (1, "shin dongyeop", 2, [1]),
          (2, "seo janghun", 3, [2]),
          (3, "you jaeseok", 1, [1,2]),
          (4, "kang hodong", 0, [0])
      ]
      ).toDF("id", "name", "program", "job")
          
      program = spark.createDataFrame(
      [
          (1, "MBC", "놀면 뭐하니"),
          (2, "KBS", "불후의 명곡"),
          (3, "SBS", "미운 우리 새끼"),
          (4, "JTBC", "뭉쳐야 찬다")
      ]
      ).toDF("id", "broadcaster", "program")
          
          
      job = spark.createDataFrame([
          (1, "main mc"), (2, "member")
      ]).toDF("id", "job")
          
      person.show()
      program.show()
      job.show()
      

      image

    • # spark.sql fucntions를 사용하기 위해 테이블 생성
      person.createOrReplaceTempView("person")
      program.createOrReplaceTempView("program")
      job.createOrReplaceTempView("job")
      
    • # person이 가진 program과 program의 id가 같은 것을 활용해 join
      person.join(program, person['program'] == program['id']).show()
          
      # sql사용
      spark.sql("SELECT * FROM person JOIN program ON(person.program = program.id)").show()
          
      # 위에 조건을 변수에 담아서 사용 가능하다.
      conditions = person.program == program.id
      person.join(program, conditions).show()
          
      # inner 조인임을 명시할때는
      person.join(program, conditions, "inner").show()
      

      image

    • # full outer join
      person.join(program, conditions, "outer").show()
          
      # sql 사용
      spark.sql("SELECT * FROM person FULL OUTER JOIN program ON(person.program = program.id)").show()
      

      image

    • # left, right outer join
      person.join(program, conditions, "left_outer").show()
      person.join(program, conditions, "right_outer").show()
      

      image

      image

    • # semi : inner join인 결과에서 왼쪽 부분만 가져오는 것
      person.join(program, conditions, "left_semi").show()
      

      image

    • # anti : inner join하고 남는 애들만 나옴
      person.join(program, conditions, "left_anti").show()
      

      image

    • # cross join : 교차해서 나올 수 있는 모든 경우 join(Cartesian곱)
      person.crossJoin(program).show()
          
      person.join(program, how = "cross").show()
      

      image

    • # person과 job을 조인해보자.
      from pyspark.sql.functions import expr
          
      # 이런식으로 하면 모호해서 어떤 job인지 잘 모른다.
      # person.join(job, expr("array_contains(job, id)")).show()
          
      # 따라서 withColumnRenamed를 사용해 컬럼명을 변경해주고 조인
      person.withColumnRenamed("id", "num").withColumnRenamed("job", "role")\
      .join(job, expr("array_contains(role, id)")).show()
      

      image

    • # 서브쿼리를 사용해, sbs에서 프로그램을 하고 있는 사람을 출력하자.
      spark.sql("""
      SELECT * FROM PERSON WHERE PROGRAM = (SELECT ID FROM PROGRAM WHERE BROADCASTER = 'SBS')
      """).show()
      

      image

2022

[web]jQuery 복습 3

1 분 소요

[Noitce] 고쳐야하거나 틀린 것이 있으면 말씀해주세요!

[web]jQuery 복습 2

13 분 소요

[Noitce] 고쳐야하거나 틀린 것이 있으면 말씀해주세요!

[web]jQuery 복습 1

14 분 소요

[Noitce] 고쳐야하거나 틀린 것이 있으면 말씀해주세요!

[web]JavaScript 정리4

5 분 소요

[Noitce] 고쳐야하거나 틀린 것이 있으면 말씀해주세요!

[web]JavaScript 정리3

10 분 소요

[Noitce] 고쳐야하거나 틀린 것이 있으면 말씀해주세요!

[web]JavaScript 정리2

7 분 소요

[Noitce] 고쳐야하거나 틀린 것이 있으면 말씀해주세요!

[web]JavaScript 정리1

8 분 소요

[Noitce] 고쳐야하거나 틀린 것이 있으면 말씀해주세요!

[web]CSS 기초 정리

11 분 소요

[Noitce] 고쳐야하거나 틀린 것이 있으면 말씀해주세요!

[web]HTML 기초 정리

8 분 소요

[Noitce] 고쳐야하거나 틀린 것이 있으면 말씀해주세요!

[Pandas]pandas 연습

3 분 소요

[Noitce] 고쳐야하거나 틀린 것이 있으면 말씀해주세요!

맨 위로 이동 ↑

2021

[Python기초]module

1 분 소요

[Noitce] 고쳐야하거나 틀린 것이 있으면 말씀해주세요!

맨 위로 이동 ↑