일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
- 추천시스템
- cloudera
- dataengineer
- 개발자
- 클라우데라
- redis bloom filter
- AWS SageMaker
- hadoop
- Data engineering
- kafka
- 데이터엔지니어링
- 빅데이터
- spark
- eks
- BigData
- 빅데이터플랫폼
- mlops
- 하둡
- recommendation system
- DataEngineering
- Python
- 블로그
- pyspark
- 하둡에코시스템
- apache spark
- kubernetes
- Spark structured streaming
- 개발자혜성
- Terraform
- 데이터엔지니어
- Today
- Total
Hyesung Oh
Pyspark 도입 후 고도화하기/ 1. 프로젝트 구조 본문
Scala Spark에서 Pyspark로의 이전을 진행하며 겪었던 시행착오와 고민들을 공유하고자 합니다.
프로젝트 구조
spark 아래에 core, tasks, util 폴더를 두었습니다.
tasks: spark-submit 호출시 PY_FILE로 넘겨주는 python file이며 spark context를 초기화하며 pyspark dataframe api를 이용하여 비즈니스로직을 구현합니다.
core: 모든 Spark application에서 데이터 로드, 처리, 적재시 공통적으로 사용되는 논리적인 개념을 추상화한 모듈을 정의했습니다. 아래에서 자세히 다루겠습니다.
util: tasks 또는 core에서 공통적으로 사용되는 패턴 또는 utility 기능들을 정의했습니다.
1. tasks
- app
- model
- table
app
DI -> 데이터 로드 -> 변환 -> 적재 흐름을 정의합니다.
model
도메인마다의 비즈니스로직을 Pyspark DataFrame API를 이용해 정의합니다. 각 로직은 함수단위로 분리하여 가독성을 최대한 높이고자 하였습니다. 가독성에 대한 고민은 추후 포스트에서 자세히 다루도록 하겠습니다.
table
각 도메인이 적재되는 Hive Table에 대한 명세를 정의합니다. table_name, partition, column 등의 정보를 주로 정의합니다. 데이터 베이스는 개발자가 직접 정의하지 않고, Runtime에 injection 되도록 구현했습니다.
2. core
보통 Spark에서 ELT 또는 ETL을 할 때에 신경써야할 부분은 크게 3가지 입니다.
- date
- database
- table
이를 context라는 개념으로 추상화 했습니다. 쉽게 말해, runtime에 injection되는 dependency들을 핸들링하는 부분입니다. tasks는 비즈니스로직만을 담당하도록 함으로써 유연성과 확장성을 고려하였습니다.
Context
코드레벨에서 context에 대해 살펴보도록 하겠습니다.
@dataclass(frozen=True)
class HiveContext:
database: str date:
date = None
extra_partition_params: Dict[str, Union[str, int]] = field(default_factory=dict)
Spark에서 적재하는 데이터는 Hive External Table로 관리하고 있으며 metastore는 EMR에서 AWS Glue Catalog를 사용하고 있습니다. 테이블은 주로 date 컬럼을 기준으로 partitioning 하고 있으며 그 외 추가 파티셔닝이 필요한 경우 extra_partition_params에 정의하고 있습니다.
if __name__ == "__main__":
today_date = normal_date_formatter.parse(sys.argv[1])
database = sys.argv[2]
hive_context = HiveContext(database, today_date)
위는 Run time(Drvier node에서 SparkContext 초기화)에 외부로부터 date, database 정보를 주입받는 부분입니다.
저희팀에서는 Airflow(정확히는 AWS Managed Airflow 서비스인 MWAA)를 이용하여 Spark Application간의 의존성 및 스케줄을 관리하고 있습니다. 해당 부분에 대해서는 추후 포스트에서 자세히 다루도록 하겠습니다.
HiveTable
class HiveTable(ABC):
def __call__(self, ctx: HiveContext = None):
if not ctx:
if self._ctx:
return self
raise ValueError("HiveContext is not provided.")
self._ctx = ctx
return self
def __init__(self, table: str, num_partitions: int, coalesce: bool = False):
self.table = table
self.num_partitions = num_partitions
self.coalesce = coalesce
self._ctx: Optional[HiveContext] = None
@property
def full_table_name(self):
return f"{self._ctx.database}.{self.table}"
def drop(self):
spark.sql(f"DROP TABLE {self.full_table_name}") @abstractmethod
def save(self, df: DataFrame):
pass
HiveTable에서 HiveContext를 Runtime에 초기화하여 최종 처리된 데이터를 저장하는 로직을 담당하고 있습니다. 그 외 메소드는 partitioned table, unpartitioned table 이냐에 따라 자식 클래스에서 상속하여 별도 구현해주고 있습니다.
3. Util
공통적으로 자주 사용되는 디자인패턴 및 기능들을 정의하였습니다. 아래는 몇가지 예시입니다.
1. 싱글톤 패턴
class MetaSingleton(type):
_instance = {}
def __call__(cls, *args, **kwargs):
if cls not in cls._instance:
cls._instance[cls] = super(MetaSingleton, cls).__call__(*args, **kwargs)
return cls._instance[cls]
2. DataFrame rename
def rename_df(df: DataFrame, column_names: List[str]) -> DataFrame:
return df.toDF(*column_names)
3. Cast column type
def cast_columns(df: DataFrame, **col_name_and_type: DataType) -> DataFrame:
for col_name, cast_to in col_name_and_type.items():
df = df.withColumn(col_name, F.col(col_name).cast(cast_to))
return df
피드백이나 기술적 토론은 언제나 환영이니 댓글로 많은 관심 부탁드립니다.
'Data Engineering > Apache Spark' 카테고리의 다른 글
Long running Spark Job Problem: NodeManager is unhealthy (0) | 2022.04.27 |
---|---|
Pyspark 도입 후 고도화하기/ 4. Optimization feat. spark-default.conf (0) | 2021.11.02 |
Pyspark 도입 후 고도화하기/ 3. 가독성 높이기 feat. transform spark3.0 (0) | 2021.11.01 |
Pyspark 도입 후 고도화하기/ 2. Pyspark 작동 원리 feat. Py4J (2) | 2021.11.01 |
spark-shell 실행 메커니즘 이해하기 (0) | 2021.01.22 |