일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |
- 데이터엔지니어링
- redis bloom filter
- 클라우데라
- 데이터엔지니어
- 추천시스템
- 하둡에코시스템
- 블로그
- kafka
- mlops
- pyspark
- DataEngineering
- BigData
- Python
- spark
- kubernetes
- cloudera
- 개발자혜성
- Terraform
- 빅데이터플랫폼
- Spark structured streaming
- Data engineering
- AWS SageMaker
- recommendation system
- apache spark
- 개발자
- eks
- 빅데이터
- 하둡
- hadoop
- dataengineer
- Today
- Total
Hyesung Oh
Bloom Filter 를 사용해봅시다 [1] python, pyspark bloom filter 구현 본문
Bloom Filter 를 사용해봅시다 [1] python, pyspark bloom filter 구현
혜성 Hyesung 2024. 6. 17. 20:01TL;DR
개발자가 작성하는 많은 비즈니스로직에는 특정 조건, 집단에 해당하는 item, user만 포함 or 제외하는 형태가 많은 부분을 차지합니다. 특히 데이터 엔지니어의 경우 통계 마트 테이블을 만들일이 많기 때문에 이와 같은 패턴에 익숙할 것입니다.
그중 Spark를 예를 들어,
(1) user별 구매이력, (2) user 정보 두 테이블이 있을 때, user id를 key로 하여 두 테이블을 join 후 특정 조건 (1)에 해당하는 row만 (2)에서 filter 하는 식일 것입니다.
하지만 join시 여러 worker node간에 분산 배치되어 있는 동일 key들을 동일 partition에 위치시키기 위해 필연적으로 동반되는 shuffling 동작은 데이터가 커질 시 병목으로 작용하게 됩니다. Spark는 이러한 일련의 동작들을 Operator 단위로 구성된 phisical plan으로 최적화하며 그 단계에서 `Join Selection Strategy`를 사용합니다. 관련해서 더 자세한 내용은 아래 포스팅을 추천드립니다.
보통 Join Selection Strategy가 사용하는 여러 join strategy 중 대표적인 3가지만 기억해두면 되는데요,
1. Sort Merge Join
2. Shuffle Hash Join
3. Broadcast Hash Join
spark의 default join 은 1. sort merge join으로 동작하게 됩니다. 하지만 Spark SQL에서는 join 하는 한쪽 테이블의 크기가 default 10MB(spark.sql.autoBroadcastJoinThreshold) 이하일 경우 자동으로 3. Broadcast hash join으로 동작하게 되는데요, 이는 Spark 3.2부터 default로 지원되는 AQE를 사용하더라도 동일합니다.
다들 아시다시피 Broadcast Hash Join의 경우 1, 2에 비해 보통 더 빠릅니다. 하지만 이 또한 테이블의 사이즈가 과도하게 클 시에 네트워크 병목 또는 OOM이 발생할 수 있으니 주의해야 합니다.
위의 예시로 들면, 사용자 테이블의 사이즈가 10mb를 초과할 시 broadcast hash join을 사용할 수 없게 되었다고 가정해봅시다. 이 경우 sort merge join으로 동작하겠지만, 만약 사용자 테이블을 10mb 이하로 줄일 수 있다면 어떨까요?? bloom filter를 사용해서 이를 최적화 할 수 있습니다.
Bloom Filter
https://en.wikipedia.org/wiki/Bloom_filter
Bloom Filter는 인-메모리 확률형 자료구조라고 표현하는데요, 거창한 것이 아니라 비트맵을 구성하는 것입니다. 아래는 Python을 이용한 간단한 구현 예시입니다.
*모든 예시 코드는 pyspark-shell 기준 입니다.
Python 구현
먼저 m: 담을 item 수, k: item 추가, 조회시 통과해야 하는 hash function 수를 지정합니다.
import numpy as np
import mmh3
from bitarray import bitarray
# Parameters for Bloom Filter
m = 1000 # Size of the bit array
k = 3 # Number of hash functions
# Initialize the bit array
bit_array = bitarray(m)
bit_array.setall(0)
각 item은 bloom filter에 추가될 때 정해진 수(k)의 hash function을 통과합니다. hash function의 return 값은 길이 m의 bitmap의 index가 되고 각 index 위치의 값을 1로 치환하면 끝입니다.
def hash_item(item, seed):
"""Generate a hash for an item with a given seed."""
return mmh3.hash(item, seed) % m
def add_item_to_bloom_filter(item):
"""Add an item to the Bloom Filter."""
for i in range(k):
bit_index = hash_item(item, i)
bit_array[bit_index] = 1
items = ["item-01", "item-02", "item-03"]
# Add item to Bloom Filter
for item in items:
add_item_to_bloom_filter(item)
반대로 각 item별 membership 조회시에는 모든 hash function의 return index 위치의 값이 1인지 확인하면 됩니다.
def check_item_in_bloom_filter(item):
"""Check if an item is in the Bloom Filter."""
for i in range(k):
bit_index = hash_item(item, i)
if not bit_array[bit_index]:
return False
return True
# Check membership
print(check_item_in_bloom_filter("item-01")) # Expected: True
print(check_item_in_bloom_filter("item-02")) # Expected: True
print(check_item_in_bloom_filter("item-03")) # Expected: True
print(check_item_in_bloom_filter("item-04")) # Expected: False
Bloom Filter with Pyspark
위에서 구현한 함수를 Pyspark에서 활용하려면 UDF로 등록하여 사용하면 됩니다. 아래는 전체 코드 예시입니다.
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType
df = spark.createDataFrame(items, columns)
@udf(BooleanType())
def add_to_bloom_filter_udf(item):
add_item_to_bloom_filter(item)
return True
@udf(BooleanType())
def check_in_bloom_filter_udf(item):
return check_item_in_bloom_filter(item)
# broadcast bitarray
spark.sparkContext.broadcast(bit_array)
# Add item to Bloom Filter
df.withColumn("added", add_to_bloom_filter_udf(df.item)).show()
# Check membership
df.withColumn("in_bloom_filter", check_in_bloom_filter_udf(df.item)).show()
production에서는 내부적으로 C API를 사용하는 Python Interface를 제공하는 라이브러리인 Pybloom-live를 사용할 수 있습니다.
Pybloom-live
아래는 pybloom-live와 pyspark integration 예시 코드입니다.
from pybloom_live import BloomFilter
bloom_filter = BloomFilter(capacity=capacity, error_rate=error_rate)
items = ["item-01", "item-02", "item-03"]
for item in items:
bloom_filter.add(item)
bloom_filter_broadcast = spark.sparkContext.broadcast(bloom_filter)
def check_in_bloom_filter(item):
return item in bloom_filter_broadcast.value
check_in_bloom_filter_udf = udf(check_in_bloom_filter, BooleanType())
items = [("item-01",), ("item-02",), ("item-03",), ("item-04",)]
columns = ["item"]
df = spark.createDataFrame(items, columns)
df = df.withColumn("in_bloom_filter", check_in_bloom_filter_udf(df.item))
df.show()
FYI.
Pybloom의 경우 파일로 부터 읽기, 쓰기가 가능하니 이를 활용하여 파이프라인을 구성할 수도 있습니다.
from pybloom_live import BloomFilter
bf = BloomFilter(capacity=1000, error_rate=0.001)
items = [str(i) for i in range(1000)]
for item in items:
bf.add(item)
_FILE_NAME = "bloom_filter.bf"
with open(_FILE_NAME, "wb") as f:
bf.tofile(f)
with open(_FILE_NAME, "rb") as f:
bf2 = BloomFilter.fromfile(f)
print(bf2.capacity)
print(bf2.error_rate)
print(bf2.num_bits)
print(bf2.num_slices)
여기까지 Bloom Filter를 활용한 Spark Job 성능 개선 방법에 대해서 알아보았습니다.
이상으로 글을 마치겠습니다.
감사합니다.