Hyesung Oh

Bloom Filter 를 사용해봅시다 [2] Redis Bloom Filter feat. 추천시스템 본문

Data Engineering/MLOps

Bloom Filter 를 사용해봅시다 [2] Redis Bloom Filter feat. 추천시스템

혜성 Hyesung 2024. 6. 17. 21:09
반응형

TL;DR

지난 포스팅에서는 Bloom Filter 소개와 Pyspark Integration에 대해서 소개했습니다.

https://surgach.tistory.com/140

 

Bloom Filter 를 사용해봅시다 [1] python, pyspark bloom filter 구현

TL;DR개발자가 작성하는 많은 비즈니스로직에는 특정 조건, 집단에 해당하는 item, user만 포함 or 제외하는 형태가 많은 부분을 차지합니다. 특히 데이터 엔지니어의 경우 통계 마트 테이블을 만들

surgach.tistory.com

Bloom Filter는 다양한 활용처가 있겠지만, 그중에서도 추천 시스템에 활용할 수 있습니다. Redis-stack에서는 Bloom Filter를 제공하는데요, 추천 결과를 유저에게 제공할 때, 유저별로 제외해야 할 아이템을 찾는 데 사용할 수 있습니다.

아키텍처 계층 관점에서는 Client가 UI를 renderning 하기 위해 호출하는 API와 추천 결과를 서빙하는 Storage or Inference server 사이에 Redis Bloom Filter를 위치시킬 수 있을 것입니다.

+---------+     +---------+     +---------------+     +-----------------+
|  Client | --> |   API   | --> | Redis Bloom   | --> | Storage or      |
|         |     |  Server |     |   Filter      |     | Inference Server|
+---------+     +---------+     +---------------+     +-----------------+

기존 방식은 유저별로 추천에서 제외해야할 아이템 목록을 RDBMS 테이블로 관리를 하는 것이고, index를 태우면 충분한 속도로 서빙이 가능했습니다.
다만, 사용자별로 제외해야 할 작품을 적용하는 로직은 100%의 정확도를 요구하지 않고 어느 정도의 false positive가 허용된다는 점(예: 소장하지 않았지만 소장했다고 보고 제외한다거나)을 고려했을 때, 보다 자원 효율적인 인-메모리 확률형 자료구조 Redis Bloom Filter를 도입해볼 수 있습니다.

많은 테크 기업에서 Bloom Filter를 활용하고 있고 아래는 대표적인 use case 들입니다.

  • Google BigtableApache HBase and Apache Cassandra and PostgreSQL use Bloom filters to reduce the disk lookups for non-existent rows or columns. Avoiding costly disk lookups considerably increases the performance of a database query operation.
  • The Google Chrome web browser used to use a Bloom filter to identify malicious URLs. Any URL was first checked against a local Bloom filter, and only if the Bloom filter returned a positive result was a full check of the URL performed (and the user warned, if that too returned a positive result).
  • Medium uses bloom filters for recommending post to users by filtering post which have been seen by user.

출처: https://medium.com/@divyanshchowdhary96/introduction-to-bloom-filter-d4235074aece

 

Introduction to Bloom Filter

A Bloom filter is a data structure designed to tell you, rapidly and memory-efficiently, whether an element is present in a set.

medium.com

Redis Bloom Filter

https://redis.io/docs/latest/develop/data-types/probabilistic/bloom-filter/

 

Bloom filter

Bloom filters are a probabilistic data structure that checks for presence of an element in a set

redis.io

item 수, 원하는 error_rate(false positive)을 입력하면 저장 공간에 필요한 memory를 계산할 수 있는데요

bits_per_item = -log(error)/ln(2) memory = capacity * bits_per_item memory = capacity * (-log(error)/ln(2))
  • 1% error rate requires 10.08 bits per item
  • 0.1% error rate requires 14.4 bits per item
  • 0.01% error rate requires 20.16 bits per item

error rate 0.01% 기준 redis set에 비해 Item당 20bit가 절약된다고 합니다.
실제 저희 서비스에서 제공하는 모든 유저, 아이템 쌍을 기준으로 계산해 보니 1GB 정도가 소요되었습니다. 특정 사용목적을 가정했을 때에는, RDBMS로 관리되는 유저 <-> 아이템 소장 테이블이 300GB가 넘는 것과 비교하여 매우 공간 효율적인 자료구조임은 분명합니다. 

Data Pipeline 구성

RDBMS 테이블의 경우 서비스 트랜잭션에서 실시간으로 싱크 되는 테이블인 반면, bloom filter를 사용하게 되면 직접 해당 테이블의 내역과 동기화해 주는 파이프라인이 필요하게 되며 아래와 같이 구성할 수 있습니다.

+---------------+            +------------+              +-----------------+
|  user-book    |            |  Kafka      | synchronize |  Redis Bloom    |
|  publisher    |  ------>   |  (user-book)|  ------>    |  Filter         |
|               |            |  Topic      |             |                 |
+---------------+            +------------+              +-----------------+

Step 1: user-book publisher publihshes item ownership details to Kafka's `user-book` topic.
Step 2: Consumer application consumes the messages from the `user-book` topic.
Step 3: Consumer application synchronizes the data with the Redis Bloom Filter.

Spark Structured Streaming with Kafka, Redis Integration

저희의 경우 kafka topic subscriber로 Spark Structured Streaming을 사용하고 있어 이를 구현한 코드 일부를 수정하여 첨부하였습니다.

def main():
    task_args = UserBookBFArguments.parse_argument()
    logger.info(task_args)

    spark = sc.create_spark_stream_session()

    source_reader = kafka_sources.SourceReader(kafka_configs.USER_BOOK_CONFIG)
    data_transformer = kafka_transformer.StreamTransformer(kafka_configs.USER_BOOK_CONFIG)
    sink_writer = kafka_sinks.SinkWriter(kafka_configs.USER_BOOK_CONFIG, task_args.database)
    r_client = redis_client.get_bloom_filter_writer()

    try:
        r_client.bf_create(task_args.bf_key, task_args.capacity, task_args.error_rate)
    except redis_exceptions.ResponseError as e:
        logger.error(f"Failed to create bloom filter: {e}")
        logger.info(f"bloom filter {task_args.bf_key} is already exists. Skip creating bloom filter.")
    else:
        logger.info(f"Successfully created bloom filter {task_args.bf_key}")
	
    # 1. user-book topic read
    user_book_df = source_reader.read(spark, batch=is_batch_mode)
    user_book_df = data_transformer.transform(user_book_df)

    def custom_batch_callable(batch: sql.DataFrame, _: int | None):
        user_books = batch.select("user_book_id").distinct().collect()
        user_books = list(map(lambda row: row.user_book_id, user_books))
        logger.info(f"Number of user_books: {len(user_books)}")
		
        # 2. RDBMS db transaction 내역과 정확히 일치시키기 위함. 컬럼은 마스킹하였습니다.
        user_book_df = rds_client.read_view_from_library(
            spark_session=spark,
            view=f"""(
                SELECT
                    유저식별자,
                    아이템식별자,
                    삭제여부,
                    취소여부,
                    등록일자,
                    서비스타입,
                FROM 소장내역테이블 ub
                    JOIN 소장내역테이블 ub2 ON ub.u_idx = ub2.u_idx AND ub.b_id = ub2.b_id
                WHERE ub.PK_ID IN ({','.join(map(str, user_books))})
            ) AS tmp_alias
            """,
        )

        user_book_df = get_service_type_user_book(user_book_df)
        created_user_book_df = get_created_user_book(user_book_df)
        # bloom filter value를 user identifier와 item id의 조합으로 만들어줍니다.
        created_user_book_df = created_user_book_df.withColumn("value", F.concat_ws(":", "유저 식별자", "아이템 아이디"))
		
        # 3. 소장내역을 bloom filter에 add
        for chunk in _make_chunk(created_user_book_df.toLocalIterator(), 100):
            r_client.bf_madd(task_args.bf_key, *map(lambda row: row.value, chunk))

    sink_writer.write(user_book_df, custom_batch_callable)


if __name__ == "__main__":
    main()

흐름을 요약하면 아래와 같습니다.

1. user-book topic을 micro batch 마다 consuming
2. batch내 user-book 세부 내역을 DB transaction과 비교하여 validation
3. 검증된 소장내역만 bloom filter에 add

E2E Test

보통 파이프라인은 외부저장소의 데이터를 읽어서 외부저장소로 데이터를 쓰는 것으로 끝나게 되는데요, 팀에서는 이를 로컬 또는 github actions에서도 테스트할 수 있도록 E2E 기능 테스트를 구현하여 robust 한 파이프라인을 유지하고 있습니다.

테스트 코드를 수정하여 일부 소개드립니다.

_TEST_REDIS_CLUSTER_NAME = "bloom-filter"
_TEST_BF_KEY = "ridi-user-book"
_TEST_DATE = datetime.date(2021, 1, 1)

fixture_rbus_user_book = pytest.fixture(
    fixture_function=hook.rbus_fixture_hook(
        topic="user-book",
        messages=[
            kafka_model.Message(key="1", value={"data": {"userBookId": 1, "type": "created", "timestamp": 1}}),
            kafka_model.Message(key="2", value={"data": {"userBookId": 2, "type": "created", "timestamp": 1}}),
            kafka_model.Message(key="3", value={"data": {"userBookId": 3, "type": "created", "timestamp": 1}}),
            kafka_model.Message(key="4", value={"data": {"userBookId": 4, "type": "created", "timestamp": 1}}),
            kafka_model.Message(key="5", value={"data": {"userBookId": 5, "type": "updated_status", "timestamp": 1}}),
        ],
    ),
    name="rbus_user_book",
)

fixture_rds_user_book = pytest.fixture(
    fixture_function=hook.store_fixture_hook(
        db="{{ 생략 }}",
        table="{{ 생략 }}",
        data=[
            {"pk_id": 1, "u_idx": 1, "b_id": "1"},
            {"pk_id": 2, "u_idx": 2, "b_id": "2"},
            {"pk_id": 3, "u_idx": 3, "b_id": "3"},
            {"pk_id": 4, "u_idx": 4, "b_id": "4", "is_deleted": 1, "cancel": "Y"},
        ],
    ),
    name="rds_tb_user_book",
)


@pytest.mark.e2e
def test_main(mocker: pytest_mock.MockFixture, spark_delta_session, setup_bloom_filter, rbus_user_book, rds_tb_user_book):
    mocker.patch(
        "sys.argv",
        [
            __name__,
            "--database",
            "test",
            "--date",
            str(_TEST_DATE),
            "--bf-key",
            _TEST_BF_KEY,
            "--capacity",
            "1000",
            "--error-rate",
            "0.01",
            "--service-type",
            "ridi",
        ],
    )

    bf_builder.main()

    r_client = redis_client.get_bloom_filter_reader()
    actual = r_client.bf_mexists(
        _TEST_BF_KEY,
        *[
            "1:1",
            "2:2",
            "3:3",
            "4:4",
        ]
    )
    expected = [True, True, True, False]

    assert actual[:3] == expected[:3]
    # we do not assert as false positive rate 0.01.
    test_utils.expect(actual[3:], expected[3:])

다음번에 시간이 된다면 e2e test를 구성하는 각 fixture hook function들에 대해서도 소개하도록 하겠습니다.

 

추천 API

팀에서는 추천에 필요한 함수를 모듈로 포팅하여 서비스 백엔드팀에 제공하고 있는데요, 서버는 Typecsript + express.js로 운영하고 있습니다. 동일한 기술 스택을 사용하시는 경우 node-redis, @redis/bloom을 사용하여 client를 구성하고 bloom filter를 사용해 볼 수 있으니 코드는 생략하도록 하겠습니다.

- https://www.npmjs.com/package/redis

 

redis

A modern, high performance Redis client. Latest version: 4.6.14, last published: a month ago. Start using redis in your project by running `npm i redis`. There are 9835 other projects in the npm registry using redis.

www.npmjs.com

- https://github.com/redis/node-redis/tree/d5355d43275fedf1b2afc4db8a926f72b05f79c5/packages/bloom

 

node-redis/packages/bloom at d5355d43275fedf1b2afc4db8a926f72b05f79c5 · redis/node-redis

Redis Node.js client. Contribute to redis/node-redis development by creating an account on GitHub.

github.com

 

이상으로 긴 글을 읽어주셔서 감사드리고 궁금하시거나 틀린 부분은 댓글로 부탁드립니다.

감사합니다 :).

 

반응형
Comments