Hyesung Oh

Pyspark 도입 후 고도화하기/ 4. Optimization feat. spark-default.conf 본문

Data Engineering/Apache Spark

Pyspark 도입 후 고도화하기/ 4. Optimization feat. spark-default.conf

혜성 Hyesung 2021. 11. 2. 00:50
반응형

조금만 검색해보면 spark Performance tuning과 관련된 좋은 참고자료들이 많이 있습니다. 그 중에서 실제 팀에서 적용하여 효과를 보고있는 내용만 선별하여 공유하고자 합니다.


크게 코드 레벨에서의 최적화와 configuration 레벨에서의 최적화 두 가지가 있을 것 같습니다.

1. Code Level Opimization

point 1. filter -> aggregation.
aggregation을 하게 되면 driver 노드에 많은 부하가 있을 수 있습니다. 따라서 이럴 경우 reduceByKey를 이용하여 driver로 전송되는 데이터 사이즈를 최대한 줄이는게 포인트입니다.

point 2. Iterator 최대한 활용하기
driver node로 데이터를 불러와서 작업을 해야하는 경우, toLocalIterator를 활용하여 메모리 부하를 줄였습니다. Iterator와 List의 차이점에 대한 설명은 생략하겠습니다.

iter = info_df\ ~ 중략 .distinct().orderBy(~).persist().toLocalIterator() 
result = asyncio.run(send(iter))

point3. persist 활용하기
spark 에서 wide transformation(left join, groupby, distinct 등)은 성능상 주요 병목이며 그 비용은 분산환경에서 다루는 데이터의 사이즈를 고려했을 때 결코 무시할 수 없습니다. 따라서 lazy evaluation 작동흐름에서 해당 action이 두번이상 적용되는 경우를 잘 캐치하여, persist를 통해 memory(storage memory 영역)에 캐싱함으로써 transformation 비용을 절감할 수 있습니다.

DataFrame의 persist와 rdd의 cache의 차이점에 대한 설명은 여타 블로그에 잘설명되어있어 따로 다루지 않겠습니다.
아래는 코드 예시입니다.

refined_df = (
        temp_df
            .withColumn(생)
            ~ 이하 생략략
    )

refined_df.persist().count()

aggregated_df = (
    refined_df
        .select("temp_unique_id", "event_params")
        ~ groupby, filter, join 생략
)

target_df = (
    refined_df
        .select("temp_unique_id", "properties")
        ~ 이하 groupb, filter, join 생략
)

보시다시피 aggregated_df와 target_df에서 refined_df가 모두 사용되고 있습니다. 따라서 refined_df.persist()를 통해 이전 연산 결과를 각 노드에 분산 저장하게 하였습니다. 여기서 중요한 점은, persist는 lazy하게 동작한다는 점입니다. persist()까지만 해주게 되면 의도한 것과 다르게 동작할 수 있으며, 실제로 겪었던 이슈기도 합니다.

아래는 SparkByExamples 문서에서 persist에 대해 설명하는 부분을 첨부했습니다.

Spark automatically monitors every persist() and cache() calls you make and it checks usage on each node and drops persisted data if not used or using least-recently-used (LRU) algorithm. As discussed in one of the above section you can also manually remove using unpersist() method.Spark caching and persistence is just one of the optimization techniques to improve the performance of Spark jobs.For RDD cache() default storage level is ‘MEMORY_ONLY‘ but, for DataFrame and Dataset, default is ‘MEMORY_AND_DISK‘On Spark UI, the Storage tab shows where partitions exist in memory or disk across the cluster.Dataset cache() is an alias for persist(StorageLevel.MEMORY_AND_DISK)Caching of Spark DataFrame or Dataset is a lazy operation, meaning a DataFrame will not be cached until you trigger an action.

 

2. spark-default.conf 활용하기

Spark 3.2.0 docs
Spark 각 버전별 configuration docs를 참고하고 있으며 정말 많은 옵션이 있기 때문에 모두 활용하거나 외우고 있진 못합니다. 필요할 때 마다 하나씩 찾아서 적용해나가고 있습니다.

 

Configuration - Spark 3.2.0 Documentation

Spark Configuration Spark provides three locations to configure the system: Spark properties control most application parameters and can be set by using a SparkConf object, or through Java system properties. Environment variables can be used to set per-mac

spark.apache.org

그 중 애용하고 있는 옵션 3가지 정도를 선별해 보았습니다.

spark.dynamicAllocation

 

Configuration - Spark 3.2.0 Documentation

Spark Configuration Spark provides three locations to configure the system: Spark properties control most application parameters and can be set by using a SparkConf object, or through Java system properties. Environment variables can be used to set per-mac

spark.apache.org

node별 executor 수를 스테이지 로드 부하에 따라 dynamic 하게 allocation하여 node 자원을 최대한 활용할 수 있게 해줍니다. AWS 환경이라면 EMR 4.4.0 이후부터는 default true입니다. 하지만 이는 무분별하게 사용될 시, 아래와 같은 문제를 겪을 수 있으니 아래 옵션들을 적절히 설정하여 사용해야합니다.

spark.default.parallelism Default number of partitions in RDDs returned by transformations like join, reduceByKey, and parallelize when not set by user. 2X number of CPU cores available to YARN containers.
spark.driver.memory Amount of memory to use for the driver process, i.e. where SparkContext is initialized. (for example, 1g, 2g). Setting is configured based on the instance types in the cluster. However, because the Spark driver application may run on either the master or one of the core instances (for example, in YARN client and cluster modes, respectively), this is set based on the smaller of the instance types in these two instance groups.
spark.executor.memory Amount of memory to use per executor process. (for example, 1g, 2g) Setting is configured based on the core and task instance types in the cluster.
spark.executor.cores The number of cores to use on each executor. Setting is configured based on the core and task instance types in the cluster.
spark.executor.instances The number of executors. Setting is configured based on the core and task instance types in the cluster. Set unless spark.dynamicAllocation.enabled explicitly set to true at the same time.

spark application에서 겪는 성능상 대표적인 문제는 shufflememory spill이라 생각됩니다.
기본적으로 shulffle이 주요병목인 경우, broadcast join을 사용하여 개선한 사례가 많았습니다.
memory spill은 데이터가 메모리에 fit이 되지 않을 때 발생할 수 있으며, 심한 경우 cpu, memory utilization이 떨어져 성능에 영향을 줄 수 있습니다. 이경우 partition 수를 늘리고 executor의 core 수를 줄임으로서 극복한 사례가 많았습니다.

Adaptive query execution
하지만 이러한 고민들을 최근에는 Spark에서 자체적으로 많이 덜어주게 되었습니다. 바로 Adaptive query execution입니다. 아래는 Spark Submit 게시글에서 가져온 내용입니다.

So over the years, there has been an extensive and continuous effort to improve Spark SQL’s query optimizer. And one of the various improvement is the cost optimization framework. Adaptive query execution looks to tackle such issues by optimizing and adjusting query plans based on runtime statistics. It reduces manual effort of tuning Sparks shuffle partitions, and it dynamically changes sort, merge, join into broadcast-hash join, and also it is able to handle skew joins. This Spark query execution is available in our newest version which is seven dot X and Spark three dot O. There is also a different talk in the Spark Summit that you might wanna watch if you’re interested in learning more.

Adaptive Query Execution by examples

 

Spark 3.0 - Adaptive Query Execution with Example — SparkByExamples

Adaptive Query Execution (AQE) is one of the greatest features of Spark 3.0 which reoptimizes and adjusts query plans based on runtime statistics

sparkbyexamples.com

Adaptive Query Execution Docs

 

Performance Tuning - Spark 3.2.0 Documentation

spark.apache.org

을 읽어보시면 도움이 될 거라 생각합니다.

maximizeResourceAllocation
마지막으로 AWS EMR maximizeResourceAllocation 옵션입니다.
이는 EMR specific한 옵션이며 다른 vendor에서는 다른 명칭을 사용하고 있을 수 있습니다. 해당 옵션을 통해 node type별 exeuctor 및 driver의 cpu, memory 값을 크게 신경쓰지 않게 되었습니다.
설정법 및 자세한 설명은 Configure Spark - AWS EMR 문서를 읽어보는 것을 추천드립니다.

반응형
Comments