Hyesung Oh

Pyspark 도입 후 고도화하기/ 1. 프로젝트 구조 본문

Data Engineering/Apache Spark

Pyspark 도입 후 고도화하기/ 1. 프로젝트 구조

혜성 Hyesung 2021. 11. 1. 17:05
반응형

Scala Spark에서 Pyspark로의 이전을 진행하며 겪었던 시행착오와 고민들을 공유하고자 합니다.


 

프로젝트 구조

spark 아래에 core, tasks, util 폴더를 두었습니다.
tasks: spark-submit 호출시 PY_FILE로 넘겨주는 python file이며 spark context를 초기화하며 pyspark dataframe api를 이용하여 비즈니스로직을 구현합니다.
core: 모든 Spark application에서 데이터 로드, 처리, 적재시 공통적으로 사용되는 논리적인 개념을 추상화한 모듈을 정의했습니다. 아래에서 자세히 다루겠습니다.
util: tasks 또는 core에서 공통적으로 사용되는 패턴 또는 utility 기능들을 정의했습니다.

1. tasks
- app
- model
- table

app
DI -> 데이터 로드 -> 변환 -> 적재 흐름을 정의합니다.
model
도메인마다의 비즈니스로직을 Pyspark DataFrame API를 이용해 정의합니다. 각 로직은 함수단위로 분리하여 가독성을 최대한 높이고자 하였습니다. 가독성에 대한 고민은 추후 포스트에서 자세히 다루도록 하겠습니다.
table
각 도메인이 적재되는 Hive Table에 대한 명세를 정의합니다. table_name, partition, column 등의 정보를 주로 정의합니다. 데이터 베이스는 개발자가 직접 정의하지 않고, Runtime에 injection 되도록 구현했습니다.

2. core
보통 Spark에서 ELT 또는 ETL을 할 때에 신경써야할 부분은 크게 3가지 입니다.
- date
- database
- table

이를 context라는 개념으로 추상화 했습니다. 쉽게 말해, runtime에 injection되는 dependency들을 핸들링하는 부분입니다. tasks는 비즈니스로직만을 담당하도록 함으로써 유연성과 확장성을 고려하였습니다.

Context
코드레벨에서 context에 대해 살펴보도록 하겠습니다.

@dataclass(frozen=True) 
class HiveContext: 
  database: str date: 
  date = None 
  extra_partition_params: Dict[str, Union[str, int]] = field(default_factory=dict)

Spark에서 적재하는 데이터는 Hive External Table로 관리하고 있으며 metastore는 EMR에서 AWS Glue Catalog를 사용하고 있습니다. 테이블은 주로 date 컬럼을 기준으로 partitioning 하고 있으며 그 외 추가 파티셔닝이 필요한 경우 extra_partition_params에 정의하고 있습니다.

if __name__ == "__main__": 
  today_date = normal_date_formatter.parse(sys.argv[1]) 
  database = sys.argv[2] 
  hive_context = HiveContext(database, today_date)

위는 Run time(Drvier node에서 SparkContext 초기화)에 외부로부터 date, database 정보를 주입받는 부분입니다.
저희팀에서는 Airflow(정확히는 AWS Managed Airflow 서비스인 MWAA)를 이용하여 Spark Application간의 의존성 및 스케줄을 관리하고 있습니다. 해당 부분에 대해서는 추후 포스트에서 자세히 다루도록 하겠습니다.

HiveTable

class HiveTable(ABC): 
  def __call__(self, ctx: HiveContext = None): 
      if not ctx: 
          if self._ctx: 
              return self 
          raise ValueError("HiveContext is not provided.") 
      self._ctx = ctx 
      return self 
  def __init__(self, table: str, num_partitions: int, coalesce: bool = False): 
      self.table = table 
      self.num_partitions = num_partitions 
      self.coalesce = coalesce 
      self._ctx: Optional[HiveContext] = None 
  @property 
  def full_table_name(self): 
  	return f"{self._ctx.database}.{self.table}" 
 
  def drop(self): 
  	spark.sql(f"DROP TABLE {self.full_table_name}") @abstractmethod 
    
  def save(self, df: DataFrame): 
  	pass

HiveTable에서 HiveContext를 Runtime에 초기화하여 최종 처리된 데이터를 저장하는 로직을 담당하고 있습니다. 그 외 메소드는 partitioned table, unpartitioned table 이냐에 따라 자식 클래스에서 상속하여 별도 구현해주고 있습니다.

3. Util
공통적으로 자주 사용되는 디자인패턴 및 기능들을 정의하였습니다. 아래는 몇가지 예시입니다.
1. 싱글톤 패턴

class MetaSingleton(type):
    _instance = {}

    def __call__(cls, *args, **kwargs):
        if cls not in cls._instance:
            cls._instance[cls] = super(MetaSingleton, cls).__call__(*args, **kwargs)
        return cls._instance[cls]

2. DataFrame rename

def rename_df(df: DataFrame, column_names: List[str]) -> DataFrame:
    return df.toDF(*column_names)

3. Cast column type

def cast_columns(df: DataFrame, **col_name_and_type: DataType) -> DataFrame:
    for col_name, cast_to in col_name_and_type.items():
        df = df.withColumn(col_name, F.col(col_name).cast(cast_to))
    return df

 

 

피드백이나 기술적 토론은 언제나 환영이니 댓글로 많은 관심 부탁드립니다.

반응형
Comments