일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
- AWS SageMaker
- DataEngineering
- Terraform
- 하둡에코시스템
- Data engineering
- Spark structured streaming
- mlops
- 빅데이터
- 데이터엔지니어링
- 하둡
- kafka
- 개발자혜성
- 빅데이터플랫폼
- 클라우데라
- 데이터엔지니어
- recommendation system
- pyspark
- dataengineer
- 블로그
- BigData
- 추천시스템
- eks
- Python
- 개발자
- hadoop
- redis bloom filter
- apache spark
- kubernetes
- spark
- cloudera
- Today
- Total
Hyesung Oh
Pyspark 도입 후 고도화하기/ 3. 가독성 높이기 feat. transform spark3.0 본문
Pyspark 도입 후 고도화하기/ 3. 가독성 높이기 feat. transform spark3.0
혜성 Hyesung 2021. 11. 1. 23:55Pyspark로 이전하면서 느꼈던 가장 큰 아쉬움 한가지는 바로, Scala의 Dataset API가 지원되지 않는다는 점이었습니다. Dataset API를 통해 객체지향 인터페이스를 마음껏 누리면서도 Spark tunsgten project 이후 (Scala code -> java object -> 자연스레 cpu, memory 오버헤드) CPU, Memory 작업에서 많은 최적화가 이루어져 성능도 우수한 편이었기에 개인적으로 선호했기 때문입니다. Pyspark에서 이러한 아쉬움을 달래기 위해 노력한 과정과 그 과정에서 바뀌게된 생각들을 공유하고자 합니다.
Code Style Guide
코드 스타일 가이드는 https://github.com/palantir/pyspark-style-guid를 우선적으로 참고하였습니다. 그리고 팀내부적으로 몇가지 규약을 추가 또는 제외하는 식으로 합의를 보았습니다.
Scala Dataset에서는 기존에 익숙한 객체지향 프로그래밍 방식과, 비즈니스 로직이 Scala 코드에서 곧바로 정의되어 있다는 점 때문에 디버깅이 용이했던 것 같습니다. 하지만 Pyspark Code의 핵심은 pyspark.sql.functions에 있는 native method들을 얼마나 잘활용하여 가독성있게 작성하느냐였습니다. 따라서 코드를 작성하다 보면 모든 비즈니스 로직은 withColumn으로 구현될 수 밖에 없었고, 이는 코드양이 많을 경우 코드를 이해하는 측면과 디버깅 측면에서 적잖게 부담이 었습니다.
Transform
다행히도, spark 3.0으로 빠르게 버전 업을 하고 난 이후 transform을 사용할 수 있게 되어 이러한 고민을 많이 덜 수 있었습니다. Scala에서 case class를 입력해줌으로써 데이터 처리 context를 전달하는 것과 비슷한 효과를 누릴 수 있었습니다. 코드를 통해 비교해보면 좋을 것 같습니다.
아래는 각각 동일한 결과물을 만드는 Scala, Python 코드 입니다.
scala
val AttributesDS = spark
.sql(
s"""
| SELECT
| u.id,
| ~ 이하 생략
|FROM source u
| LEFT JOIN b p on u.~ = p.~
| LEFT JOIN c m on u.~ = m.~
|""".stripMargin
)
.as[AttributeRaw]
.map {
case AttributeRaw(
id,
~ 이하 생략
) =>
var amount: Option[Int] = None
val is_verified_new = is_verified == "Y" && amount.getOrElse(0) >= 100
Attribute(
id
amount,
is_verified_new,
~ 이하 생략
)
}
python
attributes_df = spark.sql(
f"""
SELECT
id
~ 이하 생략
FROM source u
LEFT JOIN a p ON u.~ = p.~
LEFT JOIN b m ON u.~ = m.~
"""
).transform(to_attributes)
def to_attributes(df: DataFrame) -> DataFrame:
return df\
.withColumn("column1", F.lit(None).cast(StringType())) \
.withColumn("amount", calculate_amount(F.col("some column name"), F.col("is_verified"))) \
.withColumn("is_verified_new", is_verified(F.col("some column name"))) \
.select(USER_ATTRIBUTES)
개인적으로는 python 코드가 처음에 걱정했던 것 만큼 가독성 측면에서 부족하지 않다고 느껴졌습니다.
그 이유를 아래 정리해보았습니다.
첫번째, 두괄식이라 명료합니다. 코드 흐름을 그대로 따라가보면, to_user_attribute로 transform을 하겠다는 선언을 최상단에서 하고 있습니다. Scala 코드를 보면, amount와 is_verified_new 컬럼을 산출하는 코드 이후에 Attribute로 최종 변환됨을 인지하게 됩니다. 이부분을 AttributeRaw의 toAttribute함수에서 정의하여 아래와 같이 구현하면 동일합니다. 따라서 무승부입니다.
.as[AttributeRaw].map(ar.toAttribute)
두번째, 비즈니스 로직을 transform을 이용해 논리적인 단위로 분리할 수 있었습니다. transformation 과정이 길어지는 경우 코드 가독성에서 더욱 빛을 바랬습니다. 이는 Scala case class를 통해 데이터 처리 및 비즈니스로직 각 단계를 명시하는 효과와 동일했습니다. 기존 withColumn만으로 구현한다면 어떨까요? 상상에 맏기겠습니다.
def to_attributes(df: DataFrame) -> DataFrame:
return df\
.withColumn("column1", F.lit(None).cast(StringType())) \
.withColumn("amount", calculate_amount(F.col("some column name"), F.col("is_verified"))) \
.withColumn("is_verified_new", is_verified(F.col("some column name"))) \
.select(USER_ATTRIBUTES)
하지만 위와 같이 withColumn상의 각 컬럼 단위 변환을 함수로 정의하고, 그보다 한단계 위에서 to_attribute 함수로 감싸줌으로서 scope와 context를 단계적으로 파악할 수 있게 되었습니다.
물론, 위의 코드는 아직 많이 부족한 단계이며, 부단한 개인의 연습과 팀내 코드리뷰를 통해서 더욱 개선되어질 수 있는 여지가 있다고 생각합니다. 미래의 제 자신이 과거 작성한 코드를 보고 반성하는 계기가 된다면 그거대로 좋을 것 같습니다.
더 좋은 아이디어나 제언이 있다면 댓글로 남겨주시면 감사드리겠습니다.
긴글 읽어주셔서 감사합니다.
'Data Engineering > Apache Spark' 카테고리의 다른 글
Long running Spark Job Problem: NodeManager is unhealthy (0) | 2022.04.27 |
---|---|
Pyspark 도입 후 고도화하기/ 4. Optimization feat. spark-default.conf (0) | 2021.11.02 |
Pyspark 도입 후 고도화하기/ 2. Pyspark 작동 원리 feat. Py4J (2) | 2021.11.01 |
Pyspark 도입 후 고도화하기/ 1. 프로젝트 구조 (0) | 2021.11.01 |
spark-shell 실행 메커니즘 이해하기 (0) | 2021.01.22 |