[Spark]스파크 복습 - 2(DataFrame/Spark SQL)

Spark 2일차

DataFrame/Spark SQL

개념

  • Spark SQL
    • Query 작업을 가능하게 해주는 라이브러리
    • ANSI 표준 SQL, HIVE SQL 등의 문법을 사용해 데이터 관리
    • RDD-Like 타입 사용 (DataFrame, DataSet)
  • DataFrame
    • row(행) + column(열) 형식을 가지는 분산 테이블 형태의 렉션
    • schema : 데이터프레임의 컬럼, 데이터타입 등을 정의
    • execution plan : 연산이 데이터에 적용되는 순서 정의
    • ansi sql : ANSI 표준 SQL을 사용하여 데이터 질의 가능
      • spark.sql("ANSI SQL")


실습

  • myRange = spark.range(1000).toDF("number")
    myRange.head(10)
    myRange.tail(10)
    
    • number라는 컬럼을 가진 DF을 생성
    • pandas처럼 head tail 사용
    • image
  • divisBy2 = myRange.where("number % 2 = 0")
    divisBy2.head(5)
    divisBy2.tail(5)
    
    • 기존 myRange DF에서 짝수 값만 빼서 데이터프레임 생성
    • image
  • 기존에 저장해놓은 csv파일 불러오기

    • flights2010 = spark.read.csv("/home/big/data/flights/csv/2010-summary.csv")
      flights2010.printSchema()
      flights2010.take(5)
      
    • printSchema()로 구조 확인

    • 컬럼명을 지정해주지않으면 자동으로 지정해줌(ex) c0, c1)

    • 맨 첫 row는 컬럼명으로 생각된다.

    • image

    • flights2010 = spark.read.option("header","true").csv("/home/big/data/flights/csv/2010-summary.csv")
      flights2010.printSchema()
      flights2010.take(5)
      
    • header를 지정해주니 맨 첫줄의 값이 컬럼명으로 지정된다.

    • image

    • flights2010.sort("count").explain()
      
    • explain() : 실행계획을 볼 수 있음.

    • image

    • 결과는 정렬될 것인데 그 과정을 나타낸 것임
  • 이번에는 2015년도 비행운행정보 json파일을 가져오자.

    • f2015 = spark.read.format("json").load("/home/big/data/flights/json/2015-summary.json")
      f2015.show()
      
      • show() : 데이터베이스처럼 로우 x 컬럼 형태로 출력해줌(디폴트 20개)
      • image
      • 다 보고싶다면. df.show(df)
    • f2015.createOrReplaceTempView("flights2015")
          
      sqls = spark.sql("""
      SELECT DEST_COUNTRY_NAME, COUNT(*)
      FROM flights2015
      GROUP BY DEST_COUNTRY_NAME
      """)
      sqls.show()
      
      • f2015라는 table을 만들어주고, spark.sql 안에 ANSI표준 쿼리를 이용해서 sql명령을 작성했다.
      • 국가 이름을 기준으로 그룹핑해줌
      • image
    • dfs = f2015.groupBy("DEST_COUNTRY_NAME").count()
      dfs.show()
      
      • 위에 spark.sql을 사용했을 때와 결과가 같다.
      • image
      • 쿼리에 대응되는 메소드를 사용할 수도 있다
    • sqls.explain()
      dfs.explain()
      
      • 두 개의 과정을 한번 살펴보면 완전 똑같다.
      • image
    • spark.sql("SELECT MAX(COUNT) FROM FLIGHTS2015").take(1)
          
      from pyspark.sql.functions import max
      f2015.select(max("count")).take(1)
      
      • pyspark.sql.functions모듈에 ansi표준 쿼리에 대한 내용들이 들어있다.
      • 가장 큰 값인 행 하나만 가져오는것
    • f2015.select("DEST_COUNTRY_NAME").show(5)
          
      spark.sql("SELECT DEST_COUNTRY_NAME FROM flights2015 LIMIT 5").show()
      
      • image
      • show(5)와 limit 5가 동일하게 쓰일 수 있음
    • spark.sql("SELECT DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME FROM flights2015 LIMIT 5").show()
          
      f2015.select("DEST_COUNTRY_NAME","ORIGIN_COUNTRY_NAME").show(5)
      
      • 여러개의 컬럼을 가져올때는 ,(COMMA)를 이용
      • image
    • from pyspark.sql.functions import expr, col
      f2015.select(expr("DEST_COUNTRY_NAME"), col("DEST_COUNTRY_NAME")).show(5)
      
      • pyspark.sql.functions 모듈을 사용해보자.
      • expr과 col은 동일하게 쓰이는 것을 확인할 수 있다.
      • image
      • 그럼 과연 두 가지의 차이는 무엇일까?
    • f2015.select(expr("DEST_COUNTRY_NAME as destination")).show(5)
      f2015.select(col("DEST_COUNTRY_NAME").alias("destination")).show(5)
      
      • expr에서는 as와 같은 표현식을 사용할 수 있다.
      • col은 말그대로 컬럼만 가져올 때 사용하는 것이다.
      • image
    • spark.sql("SELECT DEST_COUNTRY_NAME destination FROM flights2015 LIMIT 5").show()
      
      • spark.sql을 사용해 ansi표준쿼리로 위와 같은 코드를 짜보았다.
      • image
    • f2015.selectExpr("DEST_COUNTRY_NAME as destination").show(5)
      
      • image
      • df.selectExpr()을 사용해도 select를 하면서 alias등 표현식을 사용할 수 있다.
    • f2015.selectExpr("*","(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as domestic_flight").show()
      
      • 도착지와 출발지가 같은 값을 찾아서 국내선 컬럼을 만들 수도 있다.
      • image
    • spark.sql("SELECT AVG(COUNT), COUNT(DISTINCT(DEST_COUNTRY_NAME)) FROM flights2015").show()
          
      f2015.selectExpr("AVG(COUNT)", "COUNT(DISTINCT(DEST_COUNTRY_NAME))").show()
      
      • image
    • # literal = 값 자체
      from pyspark.sql.functions import lit
      f2015.select(expr("*"), lit(1).alias("one")).show(5)
          
      spark.sql("SELECT *, 1 as one FROM flights2015 LIMIT 5").show()
      
      • lit은 literal(값 자체)를 뜻한다.
      • 단순히 1값이 들어가있는 컬럼을 만든것임
      • image
    • f2015.withColumn("DOMESTIC_FLIGHT", expr("DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME")).show()
      
      • withColumnselect * 과 같다.
      • image
    • f2015.withColumnRenamed("DEST_COUNTRY_NAME", "DESTINATION").show()
      
      • withColumnRenamed는 원본의 컬럼명을 바꿔주는 메서드
      • image
    • f2015.drop("count").show()
      f2015.drop("count").columns
      
      • drop()은 컬럼을 삭제
      • .columns로 현재 있는 컬럼명들을 볼 수 있다.
      • image
    • f2015.withColumn("count2", col("count").cast("string")).summary()
      f2015.withColumn("count2", col("count").cast("string")).show()
      spark.sql("SELECT *, CAST(COUNT AS STRING) AS COUNT2 FROM FLIGHTS2015").show()
      
      • cast() : 컬럼을 복제하면서 속성을 바꿔줌(str)
      • image
    • f2015.filter(col("count") < 2).show(5)
      f2015.where("count < 2").show(5)
      spark.sql("SELECT * FROM flights2015 WHERE count < 2 LIMIT 5").show()
      
      • image
      • filter() : WHERE절과 같이 조건을 걸 수 있음
    • # WHERE가 동시에 실행된다.
      f2015.where(col("count") < 2).where(col("ORIGIN_COUNTRY_NAME") != "Croatia").show(5)
      spark.sql("SELECT * FROM FLIGHTS2015 WHERE COUNT < 2 AND ORIGIN_COUNTRY_NAME != 'Croatia' LIMIT 5").show()
      
      • image
      • WHERE절에서 AND조건을 사용하는것 처럼 여러 조건을 동시에 적용시킬 수 도 있다. where().where()
    • f2015.select("DEST_COUNTRY_NAME").distinct().count()
          
      spark.sql("SELECT COUNT(DISTINCT(DEST_COUNTRY_NAME)) FROM FLIGHTS2015").show()
      
      • image
      • distinct()는 중복되는 값들을 제거해준다.
      • count()는 개수를 셀 때 사용
    • from pyspark.sql import Row
      new_rows = [
          Row("Korea","Korea",5),
          Row("Korea","Wakanda",1)
      ]
      rdd_rows = sc.parallelize(new_rows)
      schema = f2015.schema
      df_rows = spark.createDataFrame(rdd_rows, schema)
      df_rows.show()
      
      • f2015의 구조(schema)를 가져와서 새로 만든 df_rows에도 적용할 수 있다.
      • image
    • # f2015와 df_rows를 union하고, count = 1이고 origin~ us가 아닌 데이터를 출력하자.
      f2015.union(df_rows).where("count=1").where(col("ORIGIN_COUNTRY_NAME") != "United States").show()
      
      • union()으로 데이터프레임 두 개를 합친다.
      • 그리고 조건 2가지는 where().where()로 해결
      • image
    • f2015.sort("count").show(f2015.count())
      f2015.orderBy(col("count").asc()).show(f2015.count())
      spark.sql("SELECT * FROM FLIGHTS2015 ORDER BY COUNT ASC").show(f2015.count())
      
      • count개수를 기준으로 오름차순으로 정렬한다.
      • image
      • 내림차순으로 하고싶으면 desc로 설정
    • # DEST ~ 내림차순, COUNT는 오름차순 정렬 후 5개만 출력
      f2015.orderBy(col("DEST_COUNTRY_NAME").desc(), col("count").asc()).show(5)
          
      spark.sql("SELECT * FROM FLIGHTS2015 ORDER BY DEST_COUNTRY_NAME DESC, COUNT ASC LIMIT 5").show()
      
      • image
      • order by를 중첩해서 사용하는 방법
    • f2015.limit(5).show()
      
      • .limit()도 사용할 수 있음


  • 이번엔 새로운 데이터를 가지고 다뤄보았다.

    • retails = spark.read.format("csv").option("header","true").option("inferSchema", "true").load("/home/big/data/retails/2010-12-01.csv")
          
      retails.printSchema()
      
      • printSchema()를 사용해 구조 확인
      • image
    • retails.createOrReplaceTempView("retails")
      retails.show()
      spark.sql("SELECT * FROM retails").show()
      
      • spark.sql방식도 사용하기 위해 테이블을 생성해준다.
      • 테이블 확인
    • retails.where(col("InvoiceNo") != 536365).select("InvoiceNo", "Description").show(5)
          
      retails.where("InvoiceNo <> 536365").show(5, False)
      
      • show(,False)를 하면 생략되어있는 내용까지 나온다.
      • image
    • from pyspark.sql.functions import instr
      priceFilter = col("UnitPrice") > 600
      descriptFilter = instr(retails.Description, "POSTAGE") >= 1
      retails.where(retails.StockCode.isin("DOT")).where(priceFilter | descriptFilter).show()
          
      spark.sql("""
      SELECT * FROM retails 
      WHERE StockCode in ('DOT')
      AND (UnitPrice > 600 OR INSTR(Description, 'POSTAGE') >= 1)
      """).show()
      
      • image
      • 필터를 만들어서 조건에 필터를 적용
      • |는 or과 같음
      • instr부분은 Description에 POSTAGE라는 값이 있으면 인덱스를 알려줌
    • dotCodeFilter = col("StockCode") == "DOT"
      priceFilter = col("UnitPrice") > 600
      descriptFilter = instr(col("Description"), "POSTAGE") >= 1
          
      retails.withColumn("isExpensive", dotCodeFilter & (priceFilter | descriptFilter)).where("isExpensive")\
      .select("UnitPrice", "isExpensive").show(5)
          
      spark.sql("""
      SELECT UnitPrice, (StockCode = 'DOT' AND (UnitPrice > 600 OR INSTR(Description, 'POSTAGE') >= 1)) as isExpensive
      FROM retails
      WHERE (StockCode = 'DOT' AND (UnitPrice > 600 OR INSTR(Description, 'POSTAGE') >= 1))
      """).show()
      
      • image
      • 이것이 비싼 것인지 알려주는 isExpensive 컬럼을 추가했다.
      • withColumn은 새 컬럼을 추가할 때 사용
      • 조건을 잘 확인해야함
    • # (현재 갯수 * 가격)^2 + 5
      from pyspark.sql.functions import pow
      quantity = pow(col("Quantity") * col("UnitPrice"), 2) + 5
      retails.select(col("CustomerId"), quantity.alias("myQuantity")).show(5)
          
      spark.sql("""
      SELECT CustomerId, (POWER((Quantity * UnitPrice), 2) + 5) as myQuantity
      FROM retails
      LIMIT 5
      """).show()
      
      • image
      • pyspark.sql.functions 모듈의 pow함수를 이용해 제곱을 할 수 있다.
      • 일반 Query문에서는 POWER를 사용하면 제곱을 할 수 있다.
    • from pyspark.sql.functions import round, bround
      retails.select(round(lit("2.5")), bround(lit("2.5")), lit("2.5")).show(5)
          
      spark.sql('SELECT ROUND(2.5), BROUND(2.5) FROM retails').show(5)
      
      • image
      • round(), bround()를 사용해서 반올림, 내림을 할 수 있다.
    • retails.describe().show()
          
      from pyspark.sql.functions import count, mean, stddev_pop, min, max
      retails.select(count("UnitPrice"), mean("UnitPrice"), stddev_pop("UnitPrice"), min("UnitPrice"), max("UnitPrice")).show()
      
      • image
      • describe는 count, mean, stdev, min, max를 summary 해준다.
      • pyspark.sql.functions 모듈에서 이 함수들을 가져와 사용해도 된다.
      • image
    • from pyspark.sql.functions import monotonically_increasing_id
      retails.select("*", monotonically_increasing_id()).show(5)
      
      • image
      • 자동으로 숫자가 올라가는 monotonically_increasing_id()
    • from pyspark.sql.functions import initcap
      retails.select(initcap(col("Description"))).show()
      
      • 첫 글자만 대문자로 바꿔주는 initcap()
      • 중략된 부분을 다 보고싶다면 show(,False)
      • image
    • from pyspark.sql.functions import lower, upper
      retails.select(col("Description"), lower(col("Description")), upper(col("Description"))).show(5, False)
      
      • 소문자로 만드는 lower(), 대문자로 만드는 upper()
      • image
    • from pyspark.sql.functions import ltrim, rtrim, trim, lpad, rpad
      retails.select(ltrim(lit('     hello     ')).alias("ltrim"), rtrim(lit('     hello     ')).alias("rtrim"), trim(lit("     hello     ")).alias("trim"), lpad(lit("hello"), 10, '*'), rpad(lit("hello"), 10, '*')).show(1)
          
      spark.sql("""
      SELECT LTRIM('     hello     ') as ltrim, RTRIM('     hello     ') as rtrim, TRIM('     hello     ') as trim, LPAD('hello', 10, '*') as lpad, RPAD('hello', 10, '*') as rpad
      FROM retails
      """).show(1)
      
      • image
      • ltrim() : 왼쪽 공백 없애기
      • rtrim() : 오른쪽 공백 없애기
      • trim() : 양쪽 공백 없애기
      • trim을 할 때는 tap(\t)은 지워지지 않는다!
      • lpad() : 오른쪽으로 정렬 후 왼쪽 공백 채우기
      • rpad() : 왼쪽으로 정렬 후 오른쪽 공백 채우기\
    • from pyspark.sql.functions import regexp_replace
      regex_str = "BLACK|WHITE|RED|GREEN|BLUE"
      retails.select(regexp_replace(col("Description"), regex_str, "COLOR").alias("color"), col("Description")).show(5, False)
          
      spark.sql("""
      SELECT REGEXP_REPLACE(Description, 'BLACK|WHITE|RED|GREEN|BLUE', 'COLOR') as color, Description
      from retails
      """).show(5, False)
      
      • image
      • regexp_replace() : 컬럼 선택 후 정규식에 맞게 선택 해서 새 컬럼을 만들어줌
      • BLAKC, WHITE, RED, GREEN, BLUE라는 값이 Description이라는 열의 값에 존재한다면 color로 바꿔달라는 정규식을 사용
    • from pyspark.sql.functions import translate
      retails.select(translate(col("Description"), "ABCD", "1234"), col("Description")).show(5, False)
      
      • image
      • regexp_replace 정규식과 비슷한 기능의 translate()
      • A는 1, B는 2, C는 3, D는 4로 변하는 것을 확인할 수 있다.
    • from pyspark.sql.functions import regexp_extract
      extract_str = "(BLACK|WHITE|RED|GREEN|BLUE)"
      retails.select(regexp_extract(col("Description"), extract_str, 1).alias("extract"), col("Description")).show(5, False)
      
      • image
      • 해당 패턴(자바 정규식 형태)에 맞는 값을 가져오는 regexp_extract()
      • 처음에 찾은놈을 하나를 가져와서 뒤에오는 놈은 값이 반환되지 않는다.
    • containsBlack = instr(col("Description"), "BLACK") >= 1
      containsWhite = instr(col("Description"), "WHITE") >= 1
      retails.withColumn("hasBlackWhite", containsBlack | containsWhite).select("Description", "hasBlackWhite").show(5, False)
      
      • image
      • Description 컬럼에 BLACK, WHITE값이 존재했을 때의 조건을 만들어서 새로운 컬럼을 만드는 withColumn()를 사용했다.


  • 이번에는 날짜를 다뤄보자.

    • from pyspark.sql.functions import current_date, current_timestamp
      date_df = spark.range(10).withColumn("today_date", current_date()).withColumn("now_timestamp", current_timestamp())
      date_df.show()
      
      • image
      • current_date(), current_timestamp() : 현재 날짜(년,월,일)과 현재 시간(년,월,일,시,분,초)를 가져오는 메서드
    • date_df.createOrReplaceTempView("dateTable")
      spark.sql("SELECT * FROM dateTable").show()
      
      • image
      • Table을 생성 후 확인
    • from pyspark.sql.functions import date_add, date_sub
      date_df.select(date_sub(col("today_date"),5), date_add(col("today_date"), 5)).show(1)
          
      spark.sql("SELECT DATE_SUB(today_date, 5) as sub, DATE_ADD(today_date, 5) as add FROM dateTable").show(1)
      
      • image
      • date_add() : 날짜를 더하기
      • date_sub() : 날짜를 빼기
    • from pyspark.sql.functions import datediff, months_between, to_date
          
      date_df.withColumn("week_ago", date_sub(col("today_date"), 7)).select(datediff(col("week_ago"), col("today_date"))).show(1)
          
      date_df.select(to_date(lit("2022-03-15")).alias("now"), to_date(lit("2022-05-13")).alias("end")).select(months_between(col("now"), col("end"))).show(1)
      
      • image
      • datediff(): 두 개의 날짜의 일수 차이를 구하는 메서드
      • image
      • months_between() : 두 날짜의 개월 수 차이를 출력
      • 1.9~~ 로 약 2달 차이가 나는 것을 확인할 수 있다.
    • date_df.select(to_date(lit('2022-12-32'))).show(1)
      
      • image
      • 만약 2022-12-32일을 선택하면 null값이 리턴된다.
    • # simpleDateFormat (java)
      dateFormat = 'yyyy-dd-MM'
      clean_date = spark.range(1).select(to_date(lit('2022-11-12'), dateFormat).alias('date'))
      clean_date.show()
      
      • image
      • 날짜를 년-일-월 형식으로 포맷했기 때문에 실제 날짜가 2022-12-11로 나온다.


  • Null 관련 함수들을 다뤄보자.

    • null_df = sc.parallelize(
      [
          Row(name='Kang', phone='010-0000-0000', address='Seoul'),
          Row(name='Shin', phone='010-1111-1111', address=None),
          Row(name='You', phone=None, address=None)
      ]
      ).toDF()
      null_df.show()
          
      null_df.createOrReplaceTempView("nullTable")
      spark.sql("SELECT * FROM nullT")
      
      • image
      • Row로 한 줄 객체를 만들어서 그 객체 3개를 RDD로 만들었다.
      • 그리고 그 RDD를 .toDF()를 사용해 DataFrame으로 만들었다.
      • 그 데이터프레임을 사용해 Table을 만들고 그것을 SQL문으로 조회했다.
    • from pyspark.sql.functions import coalesce
      null_df.select(coalesce(col("address"), col("phone")).alias("coalesce")).show()
      
      • image
      • coalesce() : null이 아닌 첫 번째 컬럼 값을 출력한다.
      • 그래서 address에 값이 null이 아니면 값을 가져오고, 만약 null이라면 phone의 값을 가져온다.
    • # coalesce외에 null과 관련된 함수들
      spark.sql("""
      SELECT IFNULL(NULL, 'VALUE'), NULLIF('SAME','SAME'), NULLIF('SAME','NOTSAME'), NVL(NULL, 'VALUE'), NVL2(NULL, 'VALUE', 'VALUE2'), NVL2('NOTNULL', 'VALUE', 'VALUE2')
      FROM nullTable
      """).show(1)
      
      • 그 외에 null과 관련된 함수들
        • image
        • ifnull : 첫 번째 값이 null이면 두번째 값 return
        • nullif : 두 값이 같으면 null
        • nvl : 첫 번째 값이 null이면 두 번째 값 return
        • nvl2 : 첫 번째 값이 null이면 두 번째 값, 아니면 세번째 값 return
    • # DataFrameNaFunction : drop
      null_df.count() # 3
      null_df.na.drop().count() # 1
      null_df.na.drop('any').count() # 1
      null_df.na.drop('all').count() # 3
      null_df.na.drop('all', subset=['phone']).count() # 2
      null_df.na.drop('all', subset=['address']).count() # 1
      null_df.na.drop('all', subset=['phone', 'address']).count() # 2
          
      
      • image
      • na.drop() : null값이 존재하면 삭제?(제외) 해버린다.
      • na.drop() 안에는 'all'이나 'any'가 들어갈 수 있다.
      • na.drop('any') : null 값이 하나라도 존재한다면 삭제
      • na.drop('all') : 모든 값이 null값이면 삭제
      • subset=['컬럼명'] 속성을 사용해서 특정 컬럼 값이 null 값인 것을 삭제할 수 있다.
    • # DataFrameNaFunction : fill
      null_df.na.fill('n/a').show()
      null_df.na.fill('n/a', subset=['name','address']).show()
          
      fill_cols_val = {"phone": "070-000-0000", "address":"street"}
      null_df.na.fill(fill_cols_val).show()
      
      • image
      • na.fill() : null값이 존재하면 지정한 값으로 채워주기
      • dictionary형 객체를 이용해서 한번에 여러 개의 값들을 채울 수도 있다.
    • # DataFrameNaFunction : replace
      null_df.na.replace(["Seoul"], ["서울"], "address").show()
      
      • image
      • 기존 Seoul을 서울로 변경


  • 구조체 : dataframe 안에 dataframe

    • retails.selectExpr("(Description, InvoiceNo) as complex", "*").show(5, False)
      
      • image
    • from pyspark.sql.functions import struct
      complex_df = retails.select(struct("Description", "InvoiceNo").alias("complex"))
      complex_df.createOrReplaceTempView("complexdf")
      spark.sql('SELECT * FROM complexdf').show()
      
      • image
      • struct() : 컴플렉스 타입으로 컬럼들을 묶어버림
      • Description과 InvoiceNo을 합친 데이터프레임
      • complexdf라는 테이블을 생성했다.
    • complex_df.select("complex").show(5, False)
      # Description만 가져오기
      complex_df.select("complex.Description").show(5, False)
      # InvoiceNo만 가져오기
      complex_df.select(col("complex").getField("InvoiceNo")).show(5, False)
      
      • image
      • getField() : 특정 컬럼을 가져오는 함수
    • complex_df.select("complex.*").show(5,False)
      
      • image
      • "컬럼명.*" 을 사용하면 묶여있던 컬럼들이 다 조회된다.


  • 다시 기존 데이터를 사용해서 배열 가지고 놀아보자.

    • 여기서는 spark자체의 배열 함수를 사용하기 때문에 sql문으로 따로 사용할 수 없다고한다.

    • from pyspark.sql.functions import split
      retails.select(split(col("Description"), " ")).show(5, False)
      retails.select(split(col("Description"), " ").alias("arrays")).selectExpr("arrays[0]").show(5)
      
      • image
      • split() : 설정한 것을 기준으로 자름(여기서는 공백)
      • 자른 것들 중 첫 값을 가져올 수 있다.
    • from pyspark.sql.functions import size
      retails.select(size(split(col("Description"), " ")).alias("array_size")).show(5)
      
      • image
      • size() : 해당 배열의 길이(갯수)를 가져오는 함수
      • 공백을 기준으로 값을 나누고 그 배열의 길이를 출력했다.
    • from pyspark.sql.functions import array_contains
      retails.select(array_contains(split(col("Description"), " "), "WHITE")).show(5)
      
      • image
      • array_contains() : 배열에 해당 값이 존재하는지 확인하는 함수
      • WHITE란 값이 존재하는지 안하는지 확인할 수 있다.
    • from pyspark.sql.functions import create_map
          
      retails.select(create_map(col("StockCode"), col("Description")).alias("complex_map")).show(5, False)
          
      retails.select(create_map(col("StockCode"), col("Description")).alias("complex_map")).selectExpr("complex_map['84406B']").show()
      
      • image
      • create_map() : 키와 밸류의 한쌍으로 만들어주는 함수
      • image
      • 특정 키값(84406B)을 넣어서 데이터를 조회하면 해당되는 value값이 나온다.


  • 사용자 정의함수

    • def power3(value):
          return value**3
          
      # user define function
      from pyspark.sql.functions import udf
      pow3 = udf(power3)
          
      user_def_df = spark.range(5).toDF("num")
      user_def_df.select(pow3(col("num")), col("num")).show()
      
      • image
      • 넣은 인수를 3제곱한 값이 나오게하는 함수를 power3를 만들었다.
      • udf() : 이 함수는 사용자 정의 함수를 만들어주는 함수
      • 그래서 udf를 이용해 pow3라는 이름으로 세제곱시키는 함수를 등록해줬다.
      • 0부터 4까지 수를 만들어 DF를 만들어주고 pow3가 제대로 작동하는지 확인
      • spark를 껐다 키면 함수가 사라진다.


  • 모든 retails 데이터를 가져와서 묶어서 또 다뤄보자.

    • retails_all = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/home/big/data/retails/*.csv")
      
      • load할 때 asterisk(*)를 사용해서 모두 불러옴
    • retails_all.count()
      
      • image
      • 총 541909개의 Row가 있는 것을 확인
    • retails_all.printSchema()
      
      • image
      • 구조는 이러하다.
    • retails_all.createOrReplaceTempView("retailsAll")
      
      • retailsAll 이라는 테이블을 생성해주자.
      • retails_all. 을 적고 탭을 두번 누르면 사용할 수 있는 함수가 모두 나온다.
  • 여러가지 조회와 집계 함수 등을 다뤄보자.

    • from pyspark.sql.functions import count
      retails_all.select(count("StockCode")).show()
      spark.sql("SELECT COUNT(*) FROM retailsALL").show()
      
      • image
      • 똑같이 데이터가 541909개인 것을 확인할 수 있다.
    • from pyspark.sql.functions import countDistinct
      retails_all.select(countDistinct("*").alias("countDistinct")).show()
      spark.sql("SELECT COUNT(DISTINCT(*))) AS COUNTDISTINCT FROM retailsAll").show()
      
      • image
      • countDistinct() : 중복되는 값을 제하고 개수를 센다.
      • 중복을 없애고 데이터가 약 40만개로 줄은 것을 확인할 수 있다.
    • from pyspark.sql.functions import first, last
      retails_all.select(first("StockCode"), last("StockCode")).show()
      
      • image
      • first() : 첫 값 반환
      • last() : 마지막 값 반환
    • from pyspark.sql.functions import min, max
      retails_all.select(min("Quantity"), max("Quantity")).show()
      
      • image
      • min() : 최소 값
      • max() : 최대 값
    • from pyspark.sql.functions import sumDistinct
      retails_all.select(sumDistinct("Quantity")).show()
      spark.sql("SELECT SUM(DISTINCT(Quantity)) FROM retailsALL").show()
      
      • image
      • sumDistinct() : 중복값 빼고 다 더해버리기
    • from pyspark.sql.functions import count, sum, avg, expr
      retails_all.select(count("Quantity").alias("countQuantity"), sum("Quantity").alias("sumQuantity"), avg("Quantity").alias("avgQuantity"), expr("mean(Quantity)").alias("meanQuantity")).show()
      
      • image
      • avg() : 평균을 구하는 함수
      • sum() : 값들을 더하는 함수
      • expr() : 표현식을 가능하게 하는 함수
    • from pyspark.sql.functions import var_pop, stddev_pop, var_samp, stddev_samp
          
      retails_all.select(var_pop("Quantity").alias("varpop"), stddev_pop("Quantity").alias("stddevpop"), var_samp("Quantity").alias("varsamp"), stddev_samp("Quantity").alias("stddevsamp")).show()
      
      • image

      • 분산, 표준편차
        var_pop : 모집단 분산(모분산)
        stddev_pop : 모집단 표준편차(모표준편차)
        var_samp : 표본 분산
        stddev_samp : 표본 표준편차
        
      • 모, 표준의 차이가 엄청나게 크지 않다.
    • from pyspark.sql.functions import corr, covar_pop, covar_samp
          
      retails_all.select(corr("InvoiceNo", "Quantity").alias("corr"), covar_pop("InvoiceNo", "Quantity").alias("pop"), covar_samp("InvoiceNo", "Quantity").alias("samp")).show()
      
      • image

      • 공분산, 상관관계
        corr: 피어슨 상관관계 (DataFrame.corr과 같음)
        covar_pop : 모집단 공분산
        covar_samp : 표본집단 공분산
        
    • from pyspark.sql.functions import collect_set, collect_list, agg
      retails_all.agg(collect_set("Country"), collect_list("Country")).show()
      
      • image

      • agg : aggregate
        collect_set : 중복 X
        collect_list : 중복 O
        
    • retails_all.groupBy("InvoiceNo","CustomerId").count().show()
          
      spark.sql("SELECT InvoiceNo, CustomerId, COUNT(*) FROM retailsAll GROUP BY InvoiceNo, CustomerId").show()
      
      • image
      • groupBy() : 그룹으로 묶어준다.
      • 여기에서는 InvoiceNo와 CustomerId로 묶어서 개수를 세주었다.
    • retails_all.groupBy("InvoiceNo").agg(count("Quantity").alias("quan"), expr("count(Quantity)")).show()
      
      • image
      • 송장번호별로 그룹지어서 quantity를 세기
    • from pyspark.sql.functions import to_date, col
          
          
      date_df = retails_all.withColumn("date", to_date(col("InvoiceDate"), "yyyy-MM-dd HH:mm:ss"))
          
      date_df.createOrReplaceTempView("date_df")
          
          
      from pyspark.sql.window import Window
      from pyspark.sql.functions import desc
          
      window_function = Window.partitionBy("CustomerId", "date").orderBy(desc("Quantity")).rowsBetween(Window.unboundedPreceding, Window.currentRow)
          
      max_quantity = max(col("Quantity")).over(window_function)
          
      from pyspark.sql.functions import dense_rank, rank
          
      win_dense_rank = dense_rank().over(window_function)
      win_rank = rank().over(window_function)
          
      date_df.where("CustomerId IS NOT NULL").orderBy("CustomerId").select(col("CustomerId"), col("date"), col("Quantity"), win_rank.alias("quantityRank"), win_dense_rank.alias("quantityDense"), max_quantity.alias("quantityMax")).show(5,False)
      
      • image

2022

[web]jQuery 복습 3

1 분 소요

[Noitce] 고쳐야하거나 틀린 것이 있으면 말씀해주세요!

[web]jQuery 복습 2

13 분 소요

[Noitce] 고쳐야하거나 틀린 것이 있으면 말씀해주세요!

[web]jQuery 복습 1

14 분 소요

[Noitce] 고쳐야하거나 틀린 것이 있으면 말씀해주세요!

[web]JavaScript 정리4

5 분 소요

[Noitce] 고쳐야하거나 틀린 것이 있으면 말씀해주세요!

[web]JavaScript 정리3

10 분 소요

[Noitce] 고쳐야하거나 틀린 것이 있으면 말씀해주세요!

[web]JavaScript 정리2

7 분 소요

[Noitce] 고쳐야하거나 틀린 것이 있으면 말씀해주세요!

[web]JavaScript 정리1

8 분 소요

[Noitce] 고쳐야하거나 틀린 것이 있으면 말씀해주세요!

[web]CSS 기초 정리

11 분 소요

[Noitce] 고쳐야하거나 틀린 것이 있으면 말씀해주세요!

[web]HTML 기초 정리

8 분 소요

[Noitce] 고쳐야하거나 틀린 것이 있으면 말씀해주세요!

[Pandas]pandas 연습

3 분 소요

[Noitce] 고쳐야하거나 틀린 것이 있으면 말씀해주세요!

맨 위로 이동 ↑

2021

[Python기초]module

1 분 소요

[Noitce] 고쳐야하거나 틀린 것이 있으면 말씀해주세요!

맨 위로 이동 ↑