일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |
- Spark structured streaming
- 데이터엔지니어
- kubernetes
- 데이터엔지니어링
- DataEngineering
- apache spark
- recommendation system
- AWS SageMaker
- cloudera
- 하둡에코시스템
- 하둡
- BigData
- Terraform
- Python
- 개발자
- 빅데이터
- dataengineer
- hadoop
- 클라우데라
- Data engineering
- pyspark
- mlops
- 블로그
- redis bloom filter
- 추천시스템
- spark
- eks
- 빅데이터플랫폼
- 개발자혜성
- kafka
- Today
- Total
Hyesung Oh
AWS EMR: EMRFS의 핵심 기능 들 feat. consistent view, S3-optimized committer 본문
AWS EMR: EMRFS의 핵심 기능 들 feat. consistent view, S3-optimized committer
혜성 Hyesung 2022. 4. 28. 22:07EMRFS
EMR의 S3 파일 읽기 쓰기와 관련된 프로토콜 집합이며, Amazon S3로 직접 일반 파일을 읽고 쓰는 데 사용하는 HDFS 구현체이다. 그리고 여기엔 다양한 기능들이 포함되는데, 그 중에서도 대표적으로 실무에서 가장 많이 이슈를 겪었던 consistent view와 s3-optimized commiter에 대해 다뤄보려 한다.
EMRFS consistent view
EMR에서는 file consistency를 강화하기 위해 자체적인 consistent view 메커니즘을 지원한다. EMR이 S3 또는 file system에 파일에 대한 메타정보를 DynamoDB에 관리한다. EMR 단에서 file을 create, delete를 하게 되면, 정상적으로 DynamoDB 메타정보와 sync가 된다. 하지만, EMR이 아닌 외부에서 파일을 삭제 시, 파일의 메타 정보(상태)가 EMR이 바라보고 있는 (DynamoDB 메타정보) consistent view 정보와 불일치 하게 되어 EMR에서 해당 file을 read 시, 에러가 나게 된다.
이 문제는 EMRFS의 consistent view를 비활성화 함으로서 문제를 해결할 수 있다.
"Classification": "emrfs-site",
"Properties": {
"fs.s3.consistent.retryPeriodSeconds": "10",
"fs.s3.consistent": "true",
"fs.s3.consistent.retryCount": "5",
"fs.s3.consistent.metadata.tableName": "EmrFSMetadata",
"fs.s3.enableServerSideEncryption": "true"
},
"Configurations": []
위 Properties 내용에서 fs.s3.consistent 관련 내용을 모두 지우면 된다.
consistency가 걱정되는가? 이제는 Amazone S3 Strong Consistency가 지원되어 걱정없다. 자세한 내용은 해당 포스팅 스코프에 벗어나므로 여기를 참고
EMRFS S3-optimized committer
EMRFS S3-optimized committer는 s3 파일 쓰기에 최적화된 OutputCommitter 구현체이다.
5.19 이상 버전부터 default true로 사용가능하며 SparkSQL, DataFrames, Datasets 고수준 API에 모두 사용된다.
EMR 6.4 버전 전 까지는 Parquet만 지원이 되었지만, 현재는 모든 포맷의 파일 (ORC, and text-based formats (including CSV and JSON) 등이 지원된다.
하지만, EMRFS S3-optimized committer가 사용되지 못하는 케이스들이 있다.
아래 두 가지의 경우, EMRFS S3-optimized committer 가 비활성화 되고 Spark builtin OutputCommiter가 적용된다.
Custom partition location
val table = "dataset"
val location = "s3://bucket/table"
spark.sql(s"""
CREATE TABLE $table (id bigint, dt date)
USING PARQUET PARTITIONED BY (dt)
LOCATION '$location'
""")
// Add a partition using a custom locationval customPartitionLocation = "s3://bucket/custom"
spark.sql(s"""
ALTER TABLE $table ADD PARTITION (dt='2019-01-28')
LOCATION '$customPartitionLocation'
""")
// Add another partition using default location
spark.sql(s"ALTER TABLE $table ADD PARTITION (dt='2019-01-29')")
def asDate(text: String) = lit(text).cast("date")
spark.range(0, 10)
.withColumn("dt",
when($"id" > 4, asDate("2019-01-28")).otherwise(asDate("2019-01-29")))
.write.insertInto(table)
위의 코드는, 아래와 같은 S3 objects를 생성한다.
custom/part-00001-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet
custom_$folder$
table/_SUCCESS
table/dt=2019-01-29/part-00000-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet
table/dt=2019-01-29_$folder$
table_$folder$
Dynamic partition overwrite mode
default인 option(partitionOverwriteMode, static) 대신 dynamic을 사용하는 경우, dynamic partitioning을 사용하게 되어, EMRFS S3-optimized committer가 아닌 Spark 자체 OutputCommitter를 사용하게 된다.
dataset.write.mode("overwrite")
.option("partitionOverwriteMode", "dynamic")
.partitionBy("dt")
.parquet("s3://EXAMPLE-DOC-BUCKET/output")
그에 반해 아래와 같이 사용할 경우 EMR에서 default enabled대로 S3에 parquet 쓰기시 성능 이점을 얻을 수 있다.
1. default인 spark.option(partitionOverwriteMode, static)을 그대로 사용
2. Hive Table에 write 시, custom partition location이 아닌 default partition locaction에 쓰기
추가로 도움이 될까 싶어, 아래는 기존 Commiter에 비해 어떤 점이 좋은지에 대한 내용을 발췌하였습니다. 참고
When writing to partitions at custom locations, Spark uses a commit algorithm similar to the previous example, which is outlined below. As with the earlier example, the algorithm results in sequential renames, which may negatively impact performance. steps:
- When writing output to a partition at a custom location, tasks write to a file under Spark's staging directory, which is created under the final output location. The name of the file includes a random UUID to protect against file collisions. The task attempt keeps track of each file along with the final desired output path.
- When a task completes successfully, it provides the driver with the files and their final desired output paths.
- After all tasks complete, the job commit phase sequentially renames all files that were written for partitions at custom locations to their final output paths.
- The staging directory is deleted before the job commit phase completes.
핵심은, 3번에 spark-stage directory에 썼던 임시파일이 S3에 commit 될 때 sequentially rename 되는 과정이 병목이라는 것이다.
예상치 못한 부작용
The EMRFS S3-optimized committer consumes a small amount of memory for each file written by a task attempt until the task gets committed or aborted. In most jobs, the amount of memory consumed is negligible. For jobs that have long-running tasks that write a large number of files, the memory that the committer consumes may be noticeable and require adjustments to the memory allocated for Spark executors. You can tune executor memory using thespark.executor.memory property. As a guideline, a single task writing 100,000 files would typically require an additional 100MB of memory. For more information, see Application properties in the Apache Spark Configuration documentation.
Kafka -> Spark Streaming -> S3 파이프라인 테스트 과정에서, S3에 실시간으로 적재된 로그 데이터가 Athena(Athena와 연결된 BI툴을 의미)에서 조회가 되지 않았다. 이유는, EMRFS s3-optimized commiter의 경우 일정 size동안 메모리에 잡아뒀다가 임계치를 초과하면 한번에 commit 하기 때문에 실시간 S3 write의 경우 Athena에서 조회가 되지 않았던 것이다.
이런 니즈가 있는 경우, EMRFS S3-optimized committer를 비활성화 하면 해결할 수 있다.
spark.sql.parquet.fs.optimized.committer.optimization-enabled=false
이후 실시간으로 S3 데이터가 조회되는 것을 확인할 수 있었다.
참고자료
Understanding how EMRFS consistent view tracks objects in Amazon S3