[Spark]스파크 복습 - 1(RDD)

Spark 1일차

  • Tips!
    • [Ctrl] + [L] : clear와 똑같이 화면 청소
    • sc는 sparkContext의 약자
    • 로컬호스트 8088번(localhost:8088)에 접속해서 여기 클릭

      • image
      • 이 곳에서 내가 했던 작업들에 대한 여러가지 정보나 로그들을 yarn이 관리하고 있는 것을 확인할 수 있다.
        • image
    • zeppelin 노트북 다운받을 때는 두 번째껄로 다운!
      • image


RDD

Transformation

  • filter, map, flatmap, zip, reduceByKey, sortBy, glom 등…

Action

  • collect, first, stats, take, reduce, fold, aggregate 등..

RDD 실습

transforamtion과 action

  • rdd01 = sc.range(0, 1000, 1, 2)
    rdd01.collect()
    
    • 0부터 999까지 수를 갖고 있는 배열을 생성
  • .collect()

    • 갖고 있는 내용 확인
  • .getNumPartitions()

    • 몇 개의 파티션으로 나뉘어서 저장되어 있는지 확인
  • .take()

    • 앞에 적어준 값 만큼 가져오는 함수
    • like pandas의 head()
  • rdd02 = rdd01.filter(lambda x:x % 2)
    rdd02.take(10)
    
    • 짝수가 아닌. 즉, 홀수만 필터링해서 rdd02에 저장
    • 앞의 10개 값만 가져오기
    • image
  • rdd03 = rdd01.filter(lambda x: not x % 2)
    rdd03.take(10)
    
    • not을 사용해서 짝수만 가져오기
    • 앞의 10개 값만 가져오기
    • image
  • countries = ['korea','united states america','united kingdom','japan','france','germany','italia','canada','korea']
    g8 = sc.parallelize(countries, 2)
    g8.collect()
    g8 = g8.distinct()
    g8.count()
    
    • countries를 파티션 2개로 나눠서 저장한 후 내용 확인
    • 중복되는 내용을 없애고 싶다면 .distinct()사용
    • image
  • g8_upper = g8.map(lambda x: x.upper())
    g8_upper.collect()
    
    • map()으로 타겟들에 upper()함수(대문자) 적용
    • image
  • g8_list01 = g8.map(lambda x:list(x))
    g8_list01.collect()
    
    • 단어 하나하나가 리스트로 바뀜
    • 맵은 각각의 요소에 적용
    • image
  • g8_list02 = g8.flatMap(lambda x: list(x))
    g8_list02.collect()
    
    • 플랫맵은 각각의 요소에 넣은 후 전체에 적용
    • 맵과 차이는 접시하나에 올리냐 여러개에 올리냐 차이인듯(flatMap이 한 접시)
    • image
  • counting = sc.range(1, 9, 1, 2)
    counting_g8 = counting.zip(g8)
    counting_g8.collect()
    
    • 1부터 9까지의 수를 g8과 zip해줌
    • image
  • score = [('강호동',10),('유재석',30),('강호동',30),('신동엽',70),('유재석',60)]
    score_rdd = sc.parallelize(score, 2)
    score_rdd.collect()
    score_rdd_rbk = score_rdd.reduceByKey(lambda x, y: x + y)
    score_rdd_rbk.collect()
    
    • 키를 기준(reduceByKey)으로 값들이 밸류를 연산해준다.
    • 여기서는 키값에 맞는 밸류들끼리 더해졌다.
    • image
  • nums = sc.parallelize([1,2,3,1,1,2,5,4],2)
    nums.sortBy(lambda x:x).collect()
    
    • sortBy()는 정렬을 도와준다.
    • image
  • arrs = g8.glom()
    arrs.collect()
    
    • glom() : 파티션 별로 배열 형태로 그루핑시켜줌
    • glom은 데이터의 크기가 너무 크면 에러가 잘 나니깐 꼭 주의!
    • france부터 japan이 파티션1, korea부터 germany가 파티션 2라고 보면되겠다.
    • image
  • nums.max()
    nums.min()
    nums.mean()
    nums.variance()
    nums.stdev()
    nums.stats()
    nums.countByValue()
    
    • min(), max() : 최대, 최소 값
    • mean() : 평균값
    • variance() : 분산
    • stdev() : 표준편차
    • stats() : 개수, 평균, 표준편차, 최대최소값 출력
    • countByValue() : 각 요소의 개수 카운트
    • image
  • g8.first()
    g8.take(3)
    g8.takeOrdered(3)
    g8.top(3)
    g8.count()
    
    • first() : 가장 처음에 위치한 값
    • take() : 적은 만큼의 앞의 값 가져오기
    • takeOrdered() : 정렬해서 값 가져오기
    • top() : 내림차순으로 정렬해서 가져오기
    • count() : 개수 가져오기
    • image
  • # 홀수데이터
    rdd02_sum = rdd02.sum()
    rdd02_sum
    # 짝수데이터
    rdd03_sum = rdd03.sum()
    rdd03_sum
    
    • sum() : 합 구하기
    • image
  • rdd02_fold = rdd02.fold(0, lambda x, y:x + y)
    rdd02_fold
    
    • fold(기본값, 연산) : 데이터를 연산
      • 파티션 단위 연산
    • 여기서는 홀수(rdd02)들이 다 더해지는 연산
    • image
  • rdd02_aggr = rdd02.aggregate(0, max, lambda x, y:x + y)
    rdd02_aggr
    
    • image
  • rdd02_reduce = rdd02.reduce(lambda x, y: x + y)
    
    • reduce() : 데이터를 병렬 연산
  • g8에서 가장 긴 단어와, 가장 짧은 단어를 찾기

    • # 가장 긴 단어
      def g8Max(x, y):
          if len(x) > len(y):
              return x
          else:
              return y
      g8_max_length = g8.reduce(g8Max)
      g8_max_length
      
    • image

    • # 가장 짧은 단어
      g8_min_length = g8.reduce(lambda x, y: x if len(x) < len(y) else y)
      g8_min_length
      
    • image
  • g8.saveAsTextFile("/tmp/g8")
    result = sc.textFile("/tmp/g8/part-000*")
    result.collect()
    
    • g8의 내용을 텍스트 파일로 저장하고 다시 불러오는 과정

    • image

    • 이 파일은 로컬저장소가 아닌 하둡 hdfs에 저장되어있다.

      image

    • hdfs dfs -cat /tmp/g8/part-00000

      image

    • ``hdfs dfs -cat /tmp/g8/part-00001`

      image

    • 파티션 별로 저장되는 것을 눈으로 확인할 수 있다.

  • hdfs(하둡의 프레임워크) 안에 폴더 만들기

    • hdfs dfs -mkdir -p /home/big
      
    • 로컬 호스트 페이지(localhost:9870)에서도 확인할 수 있다.

      • [Utilities] - [Browse the file System]
      • image
  • g8 = sc.textFile("/tmp/g8/part-000*")
    g8.collect()
    key = g8.keyBy(lambda x: x[0])
    key.collect()
    key.keys().collect()
    key.values().collect()
    
    • keyBy(내용) : 첫 글짜를 짤라서 키값으로 만들어 달라는 내용
    • keys() : 키값들
    • values() : 밸류값들
    • image
  • key.mapValues(lambda x: list(x)).collect()
    key.flatMapValues(lambda x: list(x)).collect()
    
    • mapValues() : values들 하나하나에 적용
    • flatMapValues() : 매핑한 결과를 전체에 적용하기 때문에 키 1개 : value 1개 이런식으로 적용
    • image
  • key.groupByKey().mapValues(lambda x: list(x)).collect()
    key.groupByKey().mapValues(lambda x: len(x)).collect()
    
    • key를 가지고 묶은 상태로 values를 list로 저장
    • len()을 사용하면 키 당 만들어진 애들의 개수가 출력
    • image
  • 그렇다면 [('g', 'germany'),... ('u', 'united states america, united kingdom')] 형태로 나오게 하려면 어떻게 해야할까?

    • key.reduceByKey(lambda x, y:x + ", " + y).collect()
      
    • image
  • key.countByKey()
    
    • key에 맞는 values의 개수가 나옴
    • image
  • g8 = sc.textFile("/tmp/g8/part-000*")
    g8_upper = g8.map(lambda x: x.upper())
    g8_upper.collect()
    
    • image


Spark를 사용해 WordCount(단어 세기)를 해보자.

  • data.zip(강사님께 받은 파일)을 드래그앤 드롭으로 우분투로 보내줬다.

    image

  • 폴더를 만들고 압축파일을 풀어보자.

    • mkdir data
    • unzip data.zip -d ./data

    image

  • 이 폴더를 hdfs에 올려보자.

    • hdfs dfs -put /home/big/data /home/big/data

    • 앞이 내가 가지고 있는 경로(ubuntu), 뒤쪽이 보낼 위치(hdfs)

    • hdfs dfs -ls /home/big/data 으로 잘 보내졌는지 확인

      image

  • spark_wc라는 파이썬 파일을 만들어 밑에 내용을 작성하자.

    • vim spark_wc.py

    • 밑에 내용 작성하고 저장

    • import sys, re
      from pyspark import SparkConf, SparkContext
          
      conf = SparkConf().setAppName('Word Count')
      sc = SparkContext(conf=conf)
          
      if (len(sys.argv) != 3):
          print("wordcount.py input_file output_dir 형태로 실행해 주세요")
          sys.exit(0)
      else:
          inputfile = sys.argv[1]
          outputdir = sys.argv[2]
              
      wordcount = sc.textFile(inputfile)\
                 .repartition(10)\
                 .filter(lambda x: len(x) > 0)\
                  .flatMap(lambda x: re.split('\W+', x))\
                   .filter(lambda x: len(x) > 0)\
                    .map(lambda x:(x.lower(), 1))\
                    .reduceByKey(lambda x, y: x + y)\
                    .map(lambda x:(x[1], x[0]))\
                    .sortByKey(ascending=False)\
                    .persist()
          
      wordcount.saveAsTextFile(outputdir)
      top10 = wordcount.take(5)
      result = []
      for counts in top10:
          result.append(counts[1])
      print(result)
          
      
  • 셰익스피어.txt(아까 받은 파일)의 내용을 result에 저장시켜 spark_wc.py 를 실행

    • spark-submit spark_wc.py ~/data/shakespeare.txt ~/result
  • 가장 많이 나온 5개가 출력된다.

    • image
  • 어떤 단어가 몇번 나왔는지 알 수 있다.

    • hdfs dfs -cat ~/result/part-00000
    • image
  • 이 파이썬 파일 해석하는게 오늘 숙제!

2022

[web]jQuery 복습 3

1 분 소요

[Noitce] 고쳐야하거나 틀린 것이 있으면 말씀해주세요!

[web]jQuery 복습 2

13 분 소요

[Noitce] 고쳐야하거나 틀린 것이 있으면 말씀해주세요!

[web]jQuery 복습 1

14 분 소요

[Noitce] 고쳐야하거나 틀린 것이 있으면 말씀해주세요!

[web]JavaScript 정리4

5 분 소요

[Noitce] 고쳐야하거나 틀린 것이 있으면 말씀해주세요!

[web]JavaScript 정리3

10 분 소요

[Noitce] 고쳐야하거나 틀린 것이 있으면 말씀해주세요!

[web]JavaScript 정리2

7 분 소요

[Noitce] 고쳐야하거나 틀린 것이 있으면 말씀해주세요!

[web]JavaScript 정리1

8 분 소요

[Noitce] 고쳐야하거나 틀린 것이 있으면 말씀해주세요!

[web]CSS 기초 정리

11 분 소요

[Noitce] 고쳐야하거나 틀린 것이 있으면 말씀해주세요!

[web]HTML 기초 정리

8 분 소요

[Noitce] 고쳐야하거나 틀린 것이 있으면 말씀해주세요!

[Pandas]pandas 연습

3 분 소요

[Noitce] 고쳐야하거나 틀린 것이 있으면 말씀해주세요!

맨 위로 이동 ↑

2021

[Python기초]module

1 분 소요

[Noitce] 고쳐야하거나 틀린 것이 있으면 말씀해주세요!

맨 위로 이동 ↑