일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |
- 빅데이터
- kubernetes
- 하둡에코시스템
- 블로그
- 데이터엔지니어링
- 추천시스템
- Python
- pyspark
- spark
- 빅데이터플랫폼
- dataengineer
- mlops
- hadoop
- Data engineering
- recommendation system
- 데이터엔지니어
- kafka
- 개발자혜성
- DataEngineering
- 하둡
- 개발자
- Terraform
- AWS SageMaker
- 클라우데라
- cloudera
- BigData
- Spark structured streaming
- eks
- redis bloom filter
- apache spark
- Today
- Total
Hyesung Oh
Bloom Filter 를 사용해봅시다 [2] Redis Bloom Filter feat. 추천시스템 본문
Bloom Filter 를 사용해봅시다 [2] Redis Bloom Filter feat. 추천시스템
혜성 Hyesung 2024. 6. 17. 21:09TL;DR
지난 포스팅에서는 Bloom Filter 소개와 Pyspark Integration에 대해서 소개했습니다.
https://surgach.tistory.com/140
Bloom Filter는 다양한 활용처가 있겠지만, 그중에서도 추천 시스템에 활용할 수 있습니다. Redis-stack에서는 Bloom Filter를 제공하는데요, 추천 결과를 유저에게 제공할 때, 유저별로 제외해야 할 아이템을 찾는 데 사용할 수 있습니다.
아키텍처 계층 관점에서는 Client가 UI를 renderning 하기 위해 호출하는 API와 추천 결과를 서빙하는 Storage or Inference server 사이에 Redis Bloom Filter를 위치시킬 수 있을 것입니다.
+---------+ +---------+ +---------------+ +-----------------+
| Client | --> | API | --> | Redis Bloom | --> | Storage or |
| | | Server | | Filter | | Inference Server|
+---------+ +---------+ +---------------+ +-----------------+
기존 방식은 유저별로 추천에서 제외해야할 아이템 목록을 RDBMS 테이블로 관리를 하는 것이고, index를 태우면 충분한 속도로 서빙이 가능했습니다.
다만, 사용자별로 제외해야 할 작품을 적용하는 로직은 100%의 정확도를 요구하지 않고 어느 정도의 false positive가 허용된다는 점(예: 소장하지 않았지만 소장했다고 보고 제외한다거나)을 고려했을 때, 보다 자원 효율적인 인-메모리 확률형 자료구조 Redis Bloom Filter를 도입해볼 수 있습니다.
많은 테크 기업에서 Bloom Filter를 활용하고 있고 아래는 대표적인 use case 들입니다.
- Google Bigtable, Apache HBase and Apache Cassandra and PostgreSQL use Bloom filters to reduce the disk lookups for non-existent rows or columns. Avoiding costly disk lookups considerably increases the performance of a database query operation.
- The Google Chrome web browser used to use a Bloom filter to identify malicious URLs. Any URL was first checked against a local Bloom filter, and only if the Bloom filter returned a positive result was a full check of the URL performed (and the user warned, if that too returned a positive result).
- Medium uses bloom filters for recommending post to users by filtering post which have been seen by user.
출처: https://medium.com/@divyanshchowdhary96/introduction-to-bloom-filter-d4235074aece
Redis Bloom Filter
https://redis.io/docs/latest/develop/data-types/probabilistic/bloom-filter/
item 수, 원하는 error_rate(false positive)을 입력하면 저장 공간에 필요한 memory를 계산할 수 있는데요
bits_per_item = -log(error)/ln(2) memory = capacity * bits_per_item memory = capacity * (-log(error)/ln(2))
- 1% error rate requires 10.08 bits per item
- 0.1% error rate requires 14.4 bits per item
- 0.01% error rate requires 20.16 bits per item
error rate 0.01% 기준 redis set에 비해 Item당 20bit가 절약된다고 합니다.
실제 저희 서비스에서 제공하는 모든 유저, 아이템 쌍을 기준으로 계산해 보니 1GB 정도가 소요되었습니다. 특정 사용목적을 가정했을 때에는, RDBMS로 관리되는 유저 <-> 아이템 소장 테이블이 300GB가 넘는 것과 비교하여 매우 공간 효율적인 자료구조임은 분명합니다.
Data Pipeline 구성
RDBMS 테이블의 경우 서비스 트랜잭션에서 실시간으로 싱크 되는 테이블인 반면, bloom filter를 사용하게 되면 직접 해당 테이블의 내역과 동기화해 주는 파이프라인이 필요하게 되며 아래와 같이 구성할 수 있습니다.
+---------------+ +------------+ +-----------------+
| user-book | | Kafka | synchronize | Redis Bloom |
| publisher | ------> | (user-book)| ------> | Filter |
| | | Topic | | |
+---------------+ +------------+ +-----------------+
Step 1: user-book publisher publihshes item ownership details to Kafka's `user-book` topic.
Step 2: Consumer application consumes the messages from the `user-book` topic.
Step 3: Consumer application synchronizes the data with the Redis Bloom Filter.
Spark Structured Streaming with Kafka, Redis Integration
저희의 경우 kafka topic subscriber로 Spark Structured Streaming을 사용하고 있어 이를 구현한 코드 일부를 수정하여 첨부하였습니다.
def main():
task_args = UserBookBFArguments.parse_argument()
logger.info(task_args)
spark = sc.create_spark_stream_session()
source_reader = kafka_sources.SourceReader(kafka_configs.USER_BOOK_CONFIG)
data_transformer = kafka_transformer.StreamTransformer(kafka_configs.USER_BOOK_CONFIG)
sink_writer = kafka_sinks.SinkWriter(kafka_configs.USER_BOOK_CONFIG, task_args.database)
r_client = redis_client.get_bloom_filter_writer()
try:
r_client.bf_create(task_args.bf_key, task_args.capacity, task_args.error_rate)
except redis_exceptions.ResponseError as e:
logger.error(f"Failed to create bloom filter: {e}")
logger.info(f"bloom filter {task_args.bf_key} is already exists. Skip creating bloom filter.")
else:
logger.info(f"Successfully created bloom filter {task_args.bf_key}")
# 1. user-book topic read
user_book_df = source_reader.read(spark, batch=is_batch_mode)
user_book_df = data_transformer.transform(user_book_df)
def custom_batch_callable(batch: sql.DataFrame, _: int | None):
user_books = batch.select("user_book_id").distinct().collect()
user_books = list(map(lambda row: row.user_book_id, user_books))
logger.info(f"Number of user_books: {len(user_books)}")
# 2. RDBMS db transaction 내역과 정확히 일치시키기 위함. 컬럼은 마스킹하였습니다.
user_book_df = rds_client.read_view_from_library(
spark_session=spark,
view=f"""(
SELECT
유저식별자,
아이템식별자,
삭제여부,
취소여부,
등록일자,
서비스타입,
FROM 소장내역테이블 ub
JOIN 소장내역테이블 ub2 ON ub.u_idx = ub2.u_idx AND ub.b_id = ub2.b_id
WHERE ub.PK_ID IN ({','.join(map(str, user_books))})
) AS tmp_alias
""",
)
user_book_df = get_service_type_user_book(user_book_df)
created_user_book_df = get_created_user_book(user_book_df)
# bloom filter value를 user identifier와 item id의 조합으로 만들어줍니다.
created_user_book_df = created_user_book_df.withColumn("value", F.concat_ws(":", "유저 식별자", "아이템 아이디"))
# 3. 소장내역을 bloom filter에 add
for chunk in _make_chunk(created_user_book_df.toLocalIterator(), 100):
r_client.bf_madd(task_args.bf_key, *map(lambda row: row.value, chunk))
sink_writer.write(user_book_df, custom_batch_callable)
if __name__ == "__main__":
main()
흐름을 요약하면 아래와 같습니다.
1. user-book topic을 micro batch 마다 consuming
2. batch내 user-book 세부 내역을 DB transaction과 비교하여 validation
3. 검증된 소장내역만 bloom filter에 add
E2E Test
보통 파이프라인은 외부저장소의 데이터를 읽어서 외부저장소로 데이터를 쓰는 것으로 끝나게 되는데요, 팀에서는 이를 로컬 또는 github actions에서도 테스트할 수 있도록 E2E 기능 테스트를 구현하여 robust 한 파이프라인을 유지하고 있습니다.
테스트 코드를 수정하여 일부 소개드립니다.
_TEST_REDIS_CLUSTER_NAME = "bloom-filter"
_TEST_BF_KEY = "ridi-user-book"
_TEST_DATE = datetime.date(2021, 1, 1)
fixture_rbus_user_book = pytest.fixture(
fixture_function=hook.rbus_fixture_hook(
topic="user-book",
messages=[
kafka_model.Message(key="1", value={"data": {"userBookId": 1, "type": "created", "timestamp": 1}}),
kafka_model.Message(key="2", value={"data": {"userBookId": 2, "type": "created", "timestamp": 1}}),
kafka_model.Message(key="3", value={"data": {"userBookId": 3, "type": "created", "timestamp": 1}}),
kafka_model.Message(key="4", value={"data": {"userBookId": 4, "type": "created", "timestamp": 1}}),
kafka_model.Message(key="5", value={"data": {"userBookId": 5, "type": "updated_status", "timestamp": 1}}),
],
),
name="rbus_user_book",
)
fixture_rds_user_book = pytest.fixture(
fixture_function=hook.store_fixture_hook(
db="{{ 생략 }}",
table="{{ 생략 }}",
data=[
{"pk_id": 1, "u_idx": 1, "b_id": "1"},
{"pk_id": 2, "u_idx": 2, "b_id": "2"},
{"pk_id": 3, "u_idx": 3, "b_id": "3"},
{"pk_id": 4, "u_idx": 4, "b_id": "4", "is_deleted": 1, "cancel": "Y"},
],
),
name="rds_tb_user_book",
)
@pytest.mark.e2e
def test_main(mocker: pytest_mock.MockFixture, spark_delta_session, setup_bloom_filter, rbus_user_book, rds_tb_user_book):
mocker.patch(
"sys.argv",
[
__name__,
"--database",
"test",
"--date",
str(_TEST_DATE),
"--bf-key",
_TEST_BF_KEY,
"--capacity",
"1000",
"--error-rate",
"0.01",
"--service-type",
"ridi",
],
)
bf_builder.main()
r_client = redis_client.get_bloom_filter_reader()
actual = r_client.bf_mexists(
_TEST_BF_KEY,
*[
"1:1",
"2:2",
"3:3",
"4:4",
]
)
expected = [True, True, True, False]
assert actual[:3] == expected[:3]
# we do not assert as false positive rate 0.01.
test_utils.expect(actual[3:], expected[3:])
다음번에 시간이 된다면 e2e test를 구성하는 각 fixture hook function들에 대해서도 소개하도록 하겠습니다.
추천 API
팀에서는 추천에 필요한 함수를 모듈로 포팅하여 서비스 백엔드팀에 제공하고 있는데요, 서버는 Typecsript + express.js로 운영하고 있습니다. 동일한 기술 스택을 사용하시는 경우 node-redis, @redis/bloom을 사용하여 client를 구성하고 bloom filter를 사용해 볼 수 있으니 코드는 생략하도록 하겠습니다.
- https://www.npmjs.com/package/redis
- https://github.com/redis/node-redis/tree/d5355d43275fedf1b2afc4db8a926f72b05f79c5/packages/bloom
이상으로 긴 글을 읽어주셔서 감사드리고 궁금하시거나 틀린 부분은 댓글로 부탁드립니다.
감사합니다 :).
'Data Engineering > MLOps' 카테고리의 다른 글
e-commerce 추천 시스템 고도화 하기 시리즈 [3] Realtime inference (0) | 2024.06.10 |
---|---|
e-commerce 추천 시스템 고도화 하기 시리즈 [2] AWS SageMaker model registry (0) | 2024.06.01 |
e-commerce 추천 시스템 고도화 하기 시리즈 [1] feature store (0) | 2024.06.01 |
Nvidia Container Toolkit, Nvidia device plugin에 대해 알아봅시다. feat. CRI, CDI (0) | 2024.03.30 |