[Spark]스파크 복습 - 4(Streaming/Mlib/번외/파이프라인)

Spark 4일차

Spark Streaming

image

  • DStream(Discretized Stream[데이터의 연속적인 흐름]) : 데이터 추상화
    • 연속적인 데이터 스트림에서 생성된 연속적인 RDD 시퀀스
    • 소켓, 메시징 시스템, 스트리밍 api 등의 데이터 소스를 받아서 생성
    • RDD가 작성되는 것과 같은 방식으로 입력 데이터 저장(내부적으로 RDD)
    • Tumbling window, Sliding window

image

  • spark 사이트 예제
  1. vim streaming_test.py
  2. 사이트 예제 복사해서 붙여넣기 후 저장
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Create a local StreamingContext with two working thread and batch interval of 1 second
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)

# Create a DStream that will connect to hostname:port, like localhost:9999
lines = ssc.socketTextStream("localhost", 9999)

# Split each line into words
words = lines.flatMap(lambda line: line.split(" "))

# Count each word in each batch
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)

# Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.pprint()

ssc.start()             # Start the computation
ssc.awaitTermination()  # Wait for the computation to terminate

image

  1. nc -lk 9999 입력

  2. 새 프롬프트 창 실행 후 spark-submit streaming_test.py localhost 9999

    • 그러면 뭐가 계속 작동되는 것이 보인다.
  3. 기존의 프롬프트창에서 뭔가를 입력해보자.

    • 그러면 새 프롬프트창에서 5초마다 입력해준 값들을 받아준다.

    image



MLlib

  • java, scala, python, r 등의 인터페이스에 모두 사용 가능하다.
  • RDD 기반의 Machine Learning LIBrary다.
  • pyspark.mllib -> pyspark.ml(DataFrame 기반의 Machine Learning Package로 전환)
    • pipline : 각 stage 생성 및 연결, 실행
    • spark.mllib -: 저 수준 rdd api 사용
    • spark.ml : dataframe api 사용
  • 예제
# 벡터 생성
from pyspark.ml.linalg import Vectors
denseVec = Vectors.dense(1.0, 2.0, 3.0)
size = 3
idx = [1, 2]
values = [2.0, 3.0]
sparseVec = Vectors.sparse(size, idx, values)
# 샘플 로딩
df = spark.read.json("/home/big/data/simple-ml/")
df.show()
# 고객 건강 데이터셋
# color : 건강 등급
# lab : 실제 건강 상태
# value1 / value2 : 사이트 체류 시간 / 구매 소요 시간

###
from pyspark.ml.feature import RFormula
# RFormula : 데이터 변환 설정
# MLlib에서 사용할 수 있는 데이터 형태로 변환 (Double / Vector[Double] : 레이블 / 값)
# ~ : target과 term 분리
# + : 연결
# - : 삭제
# : 상호작용 (곱셈)
# . : 모든 컬럼
# 모든 변수를 사용해서, value1과 color / value2와 color를 곱해서 새로운 값으로 변경
supervised = RFormula(formula="lab ~ . + color:value1 + color:value2")

# dataframe
fittedRF = supervised.fit(df)
preparedDF = fittedRF.transform(df)
preparedDF.show(5, False)
# 여기까지가 데이터 전처리

# trainset / testset 분리
train, test = preparedDF.randomSplit([0.7, 0.3])
# 모델 알고리즘
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(labelCol="label", featuresCol="features")
# lr에서 사용할 수 있는 파라미터 (하이퍼파라미터)
print(lr.explainParams())
# 학습 모델 생성
fittedLR = lr.fit(train)
# 학습 모델로 예측할 때 형태 확인 
fittedLR.transform(train).select("label", "prediction").show()
### 사전학습이라고 함

# 파이프라인
train, test = df.randomSplit([0.7, 0.3])
# 변환
from pyspark.ml.feature import RFormula
rForm = RFormula()

from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression().setLabelCol("label").setFeaturesCol("features")
from pyspark.ml import Pipeline
# 변환 및 모델 튜닝을 pipeline의 단계로 생성
stages = [rForm, lr]
pipeline = Pipeline().setStages(stages)

# 모델 학습
# ParamGridBuilder : 하이퍼파라미터를 조합해주는 객체
# elasticNetParam : 0 ~ 1 사이의 부동소수점 지정하여 가중치 생성
# regParam : 0보다 크거나 같은 값 지정. 가중치 생성
from pyspark.ml.tuning import ParamGridBuilder
params = ParamGridBuilder().addGrid(rForm.formula, ["lab ~ . + color:value1", "lab ~ . + color:value1 + color:value2"]).addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]).addGrid(lr.regParam, [0.1, 0.2]).build()
# rformular 2개 (color:value1, color: value2)
# eleasticnetparam 3개 (0.0, 0.5, 1.0)
# regparam 2개 (0.1,  0.2)
# 하이퍼파라미터를 변경하여 총 12개 학습

# 평가 객체
# 이진 분류 평가
# areaUnderROC : 그래프 아래쪽 면적을 계산하여 모델 평가 (얼마나 잘 맞추고 있는지)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator().setMetricName("areaUnderROC").setRawPredictionCol("prediction").setLabelCol("label")

# k-fold (k-겹 교차검증)
# training set을 k개의 fold로 나누고, fold 중 하나를 validation set으로 사용 (나머지는 training set)
from pyspark.ml.tuning import TrainValidationSplit
tvs = TrainValidationSplit().setTrainRatio(0.75).setEstimatorParamMaps(params).setEstimator(pipeline).setEvaluator(evaluator)

# 파이프라인 구동 (학습)
tvsFitted = tvs.fit(train)

# 평가
evaluator.evaluate(tvsFitted.transform(test))



번외 : 채팅로그파일 가지고 분석하기

  • 미리 pip install numpy로 넘파이 패키지를 다운받아 놔야 한다.

  • 우리들의 줌 채팅창 대화가 담겨있는 all.csv 를 우분투 /home 으로 옮겨준다.

    image

  • cat all.csv 로 내용 확인

    image

  • 하둡으로 파일 옮기기 : hdfs dfs -put ./all.csv /home/big/all.csv

  • 잘 올라갔는지 확인 : hdfs dfs -ls /home/big

    image

  • 파이스파크 실행 : pyspark

  • pyspark에서 all.csv파일 불러오기

    • multi = spark.read.format("csv").option('header', "true").load("/home/big/all.csv")
  • 대화내용 줄 수 확인 : multi.count()

    image

  • 테이블 생성 후 확인

    • multi.createOrReplaceTempView("multi")
    • multi.show(multi.count())
    • 컬럼은 이렇게 구성되어있음

    image

  • 채팅을 가장 많이 보낸 사람 순서대로 정렬해보자.(7등을 했다..ㅋㅋㅋㅋ😅)

from pyspark.sql.functions import col
multi.groupBy("chat_from").count().orderBy(col("count").desc()).show()

image

  • 강사님에게 DM을 가장 많이 보낸 사람 순으로 정렬하기
multi.where(col("chat_to") == "이동헌강사").groupBy("chat_from", "chat_to")\
.count().orderBy(col("count").desc()).show()

image

  • 시간 별 채팅 횟수
from pyspark.sql.functions import substring
multi.groupBy(substring("chat_time", 1, 2)).count().orderBy(substring("chat_time", 1, 2)).show()

image

  • 날짜 별 채팅 횟수
multi.groupBy("chat_date").count().orderBy("chat_date").show()

image

  • 날짜와 시간 별 채팅 횟수
multi.groupBy("chat_date", substring("chat_time", 1, 2))\
.count().orderBy("chat_date",substring("chat_time", 1, 2)).show()

image

  • 시간 별 각 인원들의 채팅 수
hour_chat = multi.groupBy(substring("chat_time", 1, 2).alias("hour"), "chat_from")\
.count().orderBy(substring("chat_time", 1, 2), col("count").desc())

hour_chat.show(hour_chat.count())

image

  • 바로 위 데이터를 파일로 저장해보자.
hour_chat.write.format("csv").mode("overwrite").save("hour_chat")

로컬 호스트에서 확인하니 RDD가 하나씩 나뉘어서 저장되어있다.

image

hour_chat.coalesce(1).write.format("csv").mode("overwrite").save("/home/big/hour_chat")

image

coalesce()를 사용해서 데이터를 나뉘지 않고 하나에 몰아서 저장된 것을 확인할 수 있다.


  • 이제 이 과정들을 파일로 만드는 방법을 지금부터 해보자.
  • vim hour_chat.py 생성 후 내용 작성
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, substring

# SparkSession와 클러스터매니져 yarn을 사용해 세션 객체 생성
spark = SparkSession.builder.master("yarn").appName("hour_chat").getOrCreate()

multi = spark.read.format("csv").option("header", "true").load("/home/big/all.csv")

hour_chat = multi.groupBy(substring("chat_time", 1, 2).alias("hour"), "chat_from")\
.count().orderBy(substring("chat_time", 1, 2), col("count").desc())

hour_chat.coalesce(1).write.foramt("csv").mode("overwrite").save("/hour_chat")
  • 우분투에서 설치 : pip instal pyspark
  • 파이썬으로 pyspark 파일 실행 : python hour_chat.py
    • spark-submit hour_chat.py 도 가능
  • 잘 올라갔는지 확인 : hdfs dfs -cat /hour_chat/*.csv

  • 하둡 끄기 : stop.all.sh


  • vim multi.sh 생성 후 작성 후 저장!
# 하둡 등 On
start-all.sh

# 대기 시간 설정
sleep 20

pyspark
  • sudo chomd 755 multi.sh
    • 권한 속성을 변경해주자.
  • ./multi.sh 실행
    • 하둡이 켜지고 pyspark까지 켜진다.



Zeppelin 연결

  • 제플린 연결
  1. sudo vim ~/.bashrc

    # zeppelin
    export ZEPPELIN_HOME=/home/big/zeppelin
    export PATH=$PATH:$ZEPPELIN_HOME/bin
    
  2. source ~/.bashrc

  3. cd $ZEPPELIN_HOME/conf

    • ls

    image

  4. cp zeppelin-env.sh.template zeppelin-env.sh

  5. vim zeppelin-env.sh

    • 좌표 19,1

    • export JAVA_HOME=/home/big/java
      
    • 좌표 79,1

    • export PARK_HOME=/home/big/spark
      
    • 좌표 89,1

    • export HADOOP_CONF_DIR=/home/big/hadoop/etc/hadoop
      
  6. cp zeppelin-site.xml.template zeppelin-site.xml

  7. vim zeppelin-site.xml

    • 좌표 24,1 value 부분(어디서든 접속 가능하게)

    • <value>0.0.0.0</value>
      
    • 좌표 30,1 value부분

    • <value>8082</value>
      
  8. cd

  9. start-all.sh

  10. zeppelin-daemon.sh start

    image

  11. FireFox에서 localhost:8082 로 접속

image

  1. [anonymous] - [interpreter] 클릭

image

  1. spark 검색 후 Edit 클릭

    image

  2. 내용 수정 후 SAVE

    • spark.master -> yarn
    • spark.submit.deployMode -> client
    • image
  3. [Notebook] - [Create New Note] 에서 노트 생성

  4. 노트북에 내용 작성 후 Run

    %pyspark
        
    test = [1,2,3,4,5]
    test_rdd = sc.parallelize(test)
        
    test_rdd.collect()
    
  5. zeppelin-daemon.sh stop, stop-all.sh



MySQL 연결

설치

  1. sudo apt install mysql-server -y

  2. sudo service mysql start

  3. sudo mysql_secure_installation

    • 복잡한 패스워드 할꺼? No
    • anonymous user 삭제할꺼? Yes
    • 원격 접속 허용? Yes
    • db 제거 가능, 접근 가능 ? Yes
    • 주요한 테이블 지금 다시 리로드? Yes
  4. cd /etc/mysql/mysql.conf.d

  5. sudo vim mysqld.cnf

    • 어떤 IP든 접속 허용하게 만들기

    • 31, 32번째 줄

    • bind-adderss = 0.0.0.0
      mysqlx-bind-address = 0.0.0.0
      
  6. cd

  7. sudo mysql -u root -p

  8. use mysql;

  9. alter user 'root'@'localhost' identified with mysql_native_password by '1234';

  10. CREATE USER 'root'@'%' IDENTIFIED BY '1234';

    • %는 ip 0.0.0.0 과 같다.
  11. GRANT ALL PRIVILEGES ON *.* TO 'root'@'localhost' WITH GRANT OPTION;

    • 모든 권한을 root의 localhost에게 줌
  12. GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' WITH GRANT OPTION;

    • 모든 권한을 %한테 줌
  13. flush privileges;

    • 부여한 권한 저장
  14. exit

연결

  1. Mysql 사이트로 가서 Community Downloads -> Connector/J

image

  1. Ubuntu Linux 20 버전 선택 후 다운로드 클릭

    image

  2. 링크 주소 복사

    image

  3. 우분투에서 설치

    • wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java_8.0.28-1ubuntu20.04_all.deb
  4. 압축 풀기

  • sudo dpkg -i mysql-connector-java_8.0.28-1ubuntu20.04_all.deb
  1. cd /usr/share/java/

    • ls

      image

  2. cd $SPARK_HOME/conf

  3. vim spark-defaults.conf

    • 작성

      spark.jars   			/usr/share/java/mysql-connector-java-8.0.26.jar
      
  4. pyspark 켜기

    • start-all.sh
    • pyspark
  5. pyspark에서 mysql에 접속하기 위한 변수 만들기

    user="root"
    password="1234"
    url="jdbc:mysql://localhost:3306/mysql"
    driver="com.mysql.cj.jdbc.Driver"
    dbtable="test"
    
  6. DB에서 데이터 불러오기

    test_df = spark.read.format("jdbc").option("user", user).option("password", password)\
    .option("url", url).option("driver", driver).option("dbtable", dbtable).load()
        
    # .options로도 가능하다.
    # test_df = spark.read.format("jdbc").options(user=user, password=password, url=url, driver=driver, dbtable=dbtable).load()
    
  7. 연결된 DB의 Table 확인

    test_df.show()
    
  8. 새로운 데이터를 만들어 Insert준비

    test_insert = [(3, "mysql"), (4, "zeppelin")]
    insert_df = sc.parallelize(test_insert).toDF(["id", "name"])
    insert_df.show()
    
  9. 지정한 DB의 테이블에 append(Insert)

    insert_df.write.jdbc(url, dbtable, "append", properties={"driver": driver, "user": user, "password": password})
    
  10. 제대로 들어갔는지 확인

    test_df.show()
    



MongoDB와 연결

설치

  1. MongoDB 홈페이지 [Resources] - [Documentation] - [Server] - [Installation]

    image

  2. 해당 링크 복사

    image

  3. 우분투에서 설치

    • wget -qO - https://www.mongodb.org/static/pgp/server-5.0.asc | sudo apt-key add -
    • echo "deb [ arch=amd64,arm64 ] https://repo.mongodb.org/apt/ubuntu focal/mongodb-org/5.0 multiverse" | sudo tee /etc/apt/sources.list.d/mongodb-org-5.0.list
  4. 우분투 패키지 APT 업데이트 및 업그레이드

    sudo apt-get update
    sudo apt-get upgrade -y
    
  5. mongodb 패키지 설치

    sudo apt install -y mongodb-org
    
  6. MongoDB 시동 걸기

    sudo systemctl start mongod
    
  7. mongodb 실행 : mongo

pyspark랑 연결

  1. 데이터 삽입 후 확인, 확인되면 종료

    db.test.insert({"id":"10", "name":"mongo"})
    db.test.find()
    
  2. vim spark-defaults.conf 에 내용 작성

    spark.mongodb.input.uri		mongodb://localhost/test
    spark.mongodb.output.uri	mongodb://localhost/test
    # spark.jar.packages			org.mongodb.spark:mongo-spark-connector_2.12:3.0.1
    
  3. spark.jars.packages설정이 제대로 동작하지 않아서, 실행할 때 –packages 옵션으로 dependency명시

    • pyspark --packages org.mongodb.spark:mongo-spark-connector_2.12:3.0.1
    • jar들을 모두 가져옴
  4. pyspark 실행 : pyspark

  5. 잘 연결되었는지 테스트

    • test라는 DB에서 test라는 collection에서 데이터를 가져와서 확인

      test = spark.read.format("mongo").option("database", "test").option("collection", "test").load()
      test.show()
      
    • 데이터프레임 만들어서 insert해보기

      insert_df = spark.createDataFrame([("11", "mongo-spark")],["id", "name"])
      insert_df.write.format("mongo").option("database", "test").option("collection", "test").mode("append").save()
      
    • insert 잘 되었는지 확인

      test = spark.read.format("mongo").option("database", "test").option("collection", "test").load()
      test.show()
      

2022

[web]jQuery 복습 3

1 분 소요

[Noitce] 고쳐야하거나 틀린 것이 있으면 말씀해주세요!

[web]jQuery 복습 2

13 분 소요

[Noitce] 고쳐야하거나 틀린 것이 있으면 말씀해주세요!

[web]jQuery 복습 1

14 분 소요

[Noitce] 고쳐야하거나 틀린 것이 있으면 말씀해주세요!

[web]JavaScript 정리4

5 분 소요

[Noitce] 고쳐야하거나 틀린 것이 있으면 말씀해주세요!

[web]JavaScript 정리3

10 분 소요

[Noitce] 고쳐야하거나 틀린 것이 있으면 말씀해주세요!

[web]JavaScript 정리2

7 분 소요

[Noitce] 고쳐야하거나 틀린 것이 있으면 말씀해주세요!

[web]JavaScript 정리1

8 분 소요

[Noitce] 고쳐야하거나 틀린 것이 있으면 말씀해주세요!

[web]CSS 기초 정리

11 분 소요

[Noitce] 고쳐야하거나 틀린 것이 있으면 말씀해주세요!

[web]HTML 기초 정리

8 분 소요

[Noitce] 고쳐야하거나 틀린 것이 있으면 말씀해주세요!

[Pandas]pandas 연습

3 분 소요

[Noitce] 고쳐야하거나 틀린 것이 있으면 말씀해주세요!

맨 위로 이동 ↑

2021

[Python기초]module

1 분 소요

[Noitce] 고쳐야하거나 틀린 것이 있으면 말씀해주세요!

맨 위로 이동 ↑