Hyesung Oh

Pyspark 도입 후 고도화하기/ 3. 가독성 높이기 feat. transform spark3.0 본문

Data Engineering/Apache Spark

Pyspark 도입 후 고도화하기/ 3. 가독성 높이기 feat. transform spark3.0

혜성 Hyesung 2021. 11. 1. 23:55
반응형

Pyspark로 이전하면서 느꼈던 가장 큰 아쉬움 한가지는 바로, Scala의 Dataset API가 지원되지 않는다는 점이었습니다. Dataset API를 통해 객체지향 인터페이스를 마음껏 누리면서도 Spark tunsgten project 이후 (Scala code -> java object -> 자연스레 cpu, memory 오버헤드) CPU, Memory 작업에서 많은 최적화가 이루어져 성능도 우수한 편이었기에 개인적으로 선호했기 때문입니다. Pyspark에서 이러한 아쉬움을 달래기 위해 노력한 과정과 그 과정에서 바뀌게된 생각들을 공유하고자 합니다.


Code Style Guide

코드 스타일 가이드는 https://github.com/palantir/pyspark-style-guid를 우선적으로 참고하였습니다. 그리고 팀내부적으로 몇가지 규약을 추가 또는 제외하는 식으로 합의를 보았습니다.

Scala Dataset에서는 기존에 익숙한 객체지향 프로그래밍 방식과, 비즈니스 로직이 Scala 코드에서 곧바로 정의되어 있다는 점 때문에 디버깅이 용이했던 것 같습니다. 하지만 Pyspark Code의 핵심은 pyspark.sql.functions에 있는 native method들을 얼마나 잘활용하여 가독성있게 작성하느냐였습니다. 따라서 코드를 작성하다 보면 모든 비즈니스 로직은 withColumn으로 구현될 수 밖에 없었고, 이는 코드양이 많을 경우 코드를 이해하는 측면과 디버깅 측면에서 적잖게 부담이 었습니다.

Transform
다행히도, spark 3.0으로 빠르게 버전 업을 하고 난 이후 transform을 사용할 수 있게 되어 이러한 고민을 많이 덜 수 있었습니다. Scala에서 case class를 입력해줌으로써 데이터 처리 context를 전달하는 것과 비슷한 효과를 누릴 수 있었습니다. 코드를 통해 비교해보면 좋을 것 같습니다.
아래는 각각 동일한 결과물을 만드는 Scala, Python 코드 입니다.

scala

val AttributesDS = spark
  .sql(
    s"""
          | SELECT
          |     u.id,
          |     ~ 이하 생략
          |FROM source u 
          |  LEFT JOIN b p on u.~ = p.~
          |  LEFT JOIN c m on u.~ = m.~
          |""".stripMargin
  )
  .as[AttributeRaw]
  .map {
    case AttributeRaw(
          id,
          ~ 이하 생략
        ) =>
      var amount: Option[Int] = None
      val is_verified_new = is_verified == "Y" && amount.getOrElse(0) >= 100

      Attribute(
        id
        amount,
        is_verified_new,
        ~ 이하 생략
      )

  }

python

attributes_df = spark.sql(
        f"""
        SELECT
            id
            ~ 이하 생략
        FROM source u
          LEFT JOIN a p ON u.~ = p.~
          LEFT JOIN b m ON u.~ = m.~
        """
    ).transform(to_attributes)
    
def to_attributes(df: DataFrame) -> DataFrame:
    return df\
        .withColumn("column1", F.lit(None).cast(StringType())) \
        .withColumn("amount", calculate_amount(F.col("some column name"), F.col("is_verified"))) \
        .withColumn("is_verified_new", is_verified(F.col("some column name"))) \
        .select(USER_ATTRIBUTES)

개인적으로는 python 코드가 처음에 걱정했던 것 만큼 가독성 측면에서 부족하지 않다고 느껴졌습니다.
그 이유를 아래 정리해보았습니다.

첫번째, 두괄식이라 명료합니다. 코드 흐름을 그대로 따라가보면, to_user_attribute로 transform을 하겠다는 선언을 최상단에서 하고 있습니다. Scala 코드를 보면, amount와 is_verified_new 컬럼을 산출하는 코드 이후에 Attribute로 최종 변환됨을 인지하게 됩니다. 이부분을 AttributeRaw의 toAttribute함수에서 정의하여 아래와 같이 구현하면 동일합니다. 따라서 무승부입니다.

.as[AttributeRaw].map(ar.toAttribute)


두번째, 비즈니스 로직을 transform을 이용해 논리적인 단위로 분리할 수 있었습니다. transformation 과정이 길어지는 경우 코드 가독성에서 더욱 빛을 바랬습니다. 이는 Scala case class를 통해 데이터 처리 및 비즈니스로직 각 단계를 명시하는 효과와 동일했습니다. 기존 withColumn만으로 구현한다면 어떨까요? 상상에 맏기겠습니다.

def to_attributes(df: DataFrame) -> DataFrame:
    return df\
        .withColumn("column1", F.lit(None).cast(StringType())) \
        .withColumn("amount", calculate_amount(F.col("some column name"), F.col("is_verified"))) \
        .withColumn("is_verified_new", is_verified(F.col("some column name"))) \
        .select(USER_ATTRIBUTES)

하지만 위와 같이 withColumn상의 각 컬럼 단위 변환을 함수로 정의하고, 그보다 한단계 위에서 to_attribute 함수로 감싸줌으로서 scope와 context를 단계적으로 파악할 수 있게 되었습니다.

물론, 위의 코드는 아직 많이 부족한 단계이며, 부단한 개인의 연습과 팀내 코드리뷰를 통해서 더욱 개선되어질 수 있는 여지가 있다고 생각합니다. 미래의 제 자신이 과거 작성한 코드를 보고 반성하는 계기가 된다면 그거대로 좋을 것 같습니다.

더 좋은 아이디어나 제언이 있다면 댓글로 남겨주시면 감사드리겠습니다.
긴글 읽어주셔서 감사합니다.

반응형
Comments