일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |
- login crawling
- BigQuery
- Counterfactual Explanations
- requests
- 상관관계
- hadoop
- flask
- API
- correlation
- 공분산
- GenericGBQException
- integrated gradient
- grad-cam
- gather_nd
- XAI
- Retry
- youtube data
- top_k
- GCP
- subdag
- chatGPT
- spark udf
- Airflow
- API Gateway
- UDF
- airflow subdag
- 유튜브 API
- tensorflow text
- TensorFlow
- session 유지
- Today
- Total
데이터과학 삼학년
[Spark] 스파크 사용 최적화 / 유의사항!!! 본문
스파크 사용 시 흔히 저지르는 실수와 최적화 방법
스파크는 빅데이터 처리를 위한 프레임워크로, 빠른 처리 속도로 유명합니다. 이전 글에서는 스파크의 기본 개념과 빠른 이유에 대해 다루었습니다. 이번 글에서는 스파크 애플리케이션의 성능과 메모리 활용을 개선하기 위해 알아야 할 흔한 실수와 최적화 방법에 대해 다루겠습니다. 여기에는 클러스터 최적화, 설정 값 조정, 코드 수준의 최적화 등이 포함됩니다.
실수 1: 지연 평가(Lazy Evaluation)를 이해하지 못함
스파크는 전통적인 스크립트처럼 코드 라인별로 실행되지 않습니다.
data = spark.read.csv("large_file.csv")
data.filter(data["age"] > 30)
print("Filtering done.")
위 코드에서 print 문은 실행되지만, 스파크는 아직 필터링을 처리하지 않았습니다. 스파크는 지연 평가를 사용하여, 변환 작업은 액션(예: .collect() 또는 .saveAsTextFile())이 트리거될 때만 실행됩니다. 즉각적인 실행을 기대하면 혼란스러울 수 있습니다.
변환 작업(지연된 연산)과 액션(실행을 트리거하는 연산)의 차이를 이해해야 합니다.
data = spark.read.csv("large_file.csv")
filtered_data = data.filter(data["age"] > 30)
filtered_data.show() # 이 액션이 실행을 트리거합니다.
###
실수 2: 데이터 분포를 고려하지 않고 기본 파티션 사용
잘못된 데이터 파티셔닝은 작업 부하 분배의 불균형을 초래하여 작업이 느려지거나 충돌할 수 있습니다.
파티셔닝 무시
데이터 파티셔닝은 성능에 매우 중요합니다. 특히 데이터셋을 조인할 때 그렇습니다. 적절한 파티셔닝 없이 스파크는 데이터를 반복적으로 셔플링하여 불필요한 성능 저하를 초래할 수 있습니다. 병렬 처리는 스파크 작업 튜닝에서 매우 중요한 역할을 합니다. 각 파티션 ~ 작업은 처리에 단일 코어를 필요로 합니다.
너무 많은 또는 너무 적은 파티션 사용
너무 많은 파티션은 불필요한 오버헤드를 추가할 수 있고, 너무 적은 파티션은 클러스터 자원을 충분히 활용하지 못할 수 있습니다. 일반적인 가이드라인은 코어 수의 2~3배 정도의 파티션을 가지는 것입니다. 큰 데이터셋에 대해 파티션을 조정하지 않으면 불균형한 처리가 발생할 수 있습니다.
large_data = spark.read.csv("large_file.csv")
print(large_data.rdd.getNumPartitions()) # 기본 파티션 수가 높거나 낮을 수 있습니다.
###
.repartition() 또는 .coalesce() 사용하여 파티셔닝 조정
partitioned_data = large_data.repartition(10)
print(partitioned_data.rdd.getNumPartitions()) # 이제 10개의 파티션으로 설정되었습니다.
.repartition(numPartitions): 파티션 수를 증가 또는 감소시키는 데 사용됩니다. 클러스터의 노드 간에 데이터를 셔플링하므로 비용이 많이 듭니다. 더 많은 파티션이 필요하거나 조인 작업 후 데이터를 고르게 분배할 때 유용합니다.
- .coalesce(numPartitions): 주로 파티션 수를 줄이는 데 사용됩니다. 동일한 노드 내에서 파티션을 병합하여 셔플링을 피하므로 .repartition()보다 저렴합니다. 특히 처리 후 출력 준비를 위해 파티션을 통합할 때 유용합니다.
실수 3: 캐시/퍼시스트를 적절히 사용하지 않음
데이터를 여러 번 사용할 때 캐시하지 않으면 스파크는 각 변환을 다시 계산합니다. 이는 변환이 복잡할 경우 비용이 많이 들 수 있습니다.
# 캐시 없이 데이터를 여러 번 사용
filtered_data = data.filter(data["age"] > 30)
filtered_data.count()
filtered_data.show()
각 액션은 필터링 단계를 다시 트리거합니다.
.cache() 또는 .persist() 사용
# 데이터를 재사용할 의도가 있다면 캐시합니다.
filtered_data = data.filter(data["age"] > 30).cache()
filtered_data.count()
filtered_data.show() # 이제 더 빠르게 실행됩니다.
캐시 및 퍼시스트의 부적절한 사용
많은 사람들이 캐시(예: cache() 또는 persist())를 간과하여 반복 알고리즘이나 반복 액세스에서 재계산을 피할 수 있습니다. 그러나 모든 것을 캐시하면 메모리 문제가 발생할 수 있습니다. 필요할 때만 캐시하고 데이터가 더 이상 필요하지 않을 때는 unpersist를 사용하여 캐시를 해제하세요.
너무 많은 변환 체인
중간 액션이나 캐시 없이 긴 변환 체인은 계보를 너무 복잡하게 만들어 실행 시간을 증가시킬 수 있습니다. 때로는 복잡한 체인을 분할하고 액션을 도입하거나 데이터를 캐시하는 것이 좋습니다.
실수 4: 셔플 작업의 잘못된 구성
셔플 작업(예: join, groupBy)은 네트워크와 메모리에 무거운 작업입니다. 잘못된 구성은 이러한 작업이 실패하게 만들 수 있습니다.
joined_data = large_data1.join(large_data2, "id")
joined_data.show()
데이터 크기에 따라 셔플 관련 구성을 조정하고 가능한 셔플을 줄이세요.
spark.conf.set("spark.sql.shuffle.partitions", "100") # 데이터 크기에 따라 조정
joined_data = large_data1.join(large_data2, "id")
joined_data.show()
실수 5: 큰 데이터셋을 드라이버로 수집
큰 데이터셋을 .collect() 또는 .take()로 드라이버로 가져오는 것은 메모리 오류를 일으킬 수 있습니다.
all_data = data.collect() # 전체 데이터셋을 드라이버로 수집
데이터셋이 너무 크면 드라이버가 충돌할 수 있습니다. 작은 데이터 하위 집합에 대해 take 또는 takeSample과 같은 액션을 사용하는 것이 좋으며, 데이터셋 크기가 관리 가능한 경우를 제외하고 collect를 피하세요.
sample_data = data.limit(1000).collect() # 관리 가능한 양만 수집
# 또는 데이터를 직접 저장소에 씁니다.
data.write.parquet("output_path")
실수 6: 최적화 기술을 생략
스파크에는 성능을 향상시키기 위한 최적화가 있지만, 이를 언제 사용해야 하는지 이해해야 합니다.
large_data.join(small_data, "id").show() # 큰 테이블과 작은 테이블의 느린 조
작은 테이블에 대해 브로드캐스트 조인을 사용하고, 스파크의 Catalyst 옵티마이저에 의해 최적화된 DataFrame API를 활용하세요.
# 작은 테이블에 대해 브로드캐스트 조인을 사용
from pyspark.sql.functions import broadcast
large_data.join(broadcast(small_data), "id").show() # 브로드캐스트가 조인을 최적화합니다.
실수 7: 비효율적인 집계
집계 작업을 실행할 때 성능에 미치는 영향을 이해하지 못함.
# 큰 데이터를 직접 그룹화하는 대신
data.groupBy("category").sum("amount").show()
# 가능한 경우 맵 사이드 집계를 사용
data = data.map(lambda x: (x['category'], x['amount'])) \\
.reduceByKey(lambda a, b: a + b)
groupByKey 대신 reduceByKey 사용: groupByKey는 동일한 키를 가진 모든 값을 단일 실행기로 보내 잠재적으로 메모리 문제를 일으킬 수 있습니다. 반면 reduceByKey는 각 파티션에서 로컬 축소를 수행한 후 데이터를 노드 간에 전송하여 일반적으로 더 효율적입니다.
참고자료
https://medium.com/@vinciabhinav7/apache-spark-common-mistakes-14407bebe259
'Hadoop' 카테고리의 다른 글
Spark DataFrame vs Pandas DataFrame (0) | 2023.05.31 |
---|---|
[Spark] 사용자 정의함수(UDF) (0) | 2023.03.06 |
Hadoop Ecosystem 하둡 에코시스템 간단정리 (0) | 2022.10.02 |