Hyesung Oh

e-commerce 추천 시스템 고도화 하기 시리즈 [3] Realtime inference 본문

Data Engineering/MLOps

e-commerce 추천 시스템 고도화 하기 시리즈 [3] Realtime inference

혜성 Hyesung 2024. 6. 10. 23:03
반응형

서론

지난 게시글에서는 end-to-end 추천 파이프라인에서 꼭 필요했던 기능들을 적용하며 개선한 과정을 소개했습니다. 
https://surgach.tistory.com/138

 

e-commerce 추천 시스템 고도화 하기 시리즈 [2] AWS SageMaker model registry

서론추천 시스템을 고도화하기 마음먹은 과정과 feature store 도입기에 대한 짧은 소개에 이어서, 이번 편에서는 본격적으로 파이프라인에 필요했던 개선사항들을 적용한 내용들을 정리해보려 합

surgach.tistory.com

이번 포스트에선 AWS SageMaker를 활용하여 실시간 추천 모델 서버를 구축해 본 경험을 정리해보려 합니다. 소위 real-time inference는 당장의 비즈니스 요구 사항은 아닙니다만, 팀의 기술력 제고 및 앞으로를 대비하는 차원에서 PoC 수준으로 진행해 보았습니다 :).

본론

Model Server

모델 서버를 위한 모든 Tool 을 조사하진 못했지만, 대체로 아래 5개 정도를 많이 사용하는 것으로 보입니다.
- TorchServe (documentation)
- TensorFlow Serving (documentation)
- Triton™ Inference Server (documentation)
- Multi Model Server (documentation)
- BentoML (documentation)

위 Tool들에 대한 비교 내용을 다룬 괜찮은 게시글이 있어 첨부합니다.

https://biano-ai.github.io/research/2021/08/16/quantitative-comparison-of-serving-platforms-for-neural-networks.html

 

A Quantitative Comparison of Serving Platforms for Neural Networks

Choosing the suitable method of production serving your neural network model is one of the most critical decisions. We tried to compare the most popular serving platforms performance, stability and usage complexity.

biano-ai.github.io

 

기본적으로 REST API를 지원하며, gRPC 지원여부, 사용 중인 deep learning framework 지원 및 최적화 여부 등을 고려하여 각자 상황에 맞게 사용하게 될 거 같습니다. 개인적으로는 그중에서 Nvidia의 Triton에 관심이 많은데요, Triton은 모델 경량화 및 하드웨어 최적화를 잘추상화하여 지원합니다.

대용량 데이터를 비교적 여유로운 시간동안 핸들링하는 배치성 Inference는 비교적 풍부한 컴퓨팅 자원을 활용할 수 있지만, Real-time inference는 mobile과 같은 비교적 컴퓨팅과 스토리지 자원이 제한된 환경에서 돌아가는 경우가 많아 `최적화` 및 `경량화`가 핵심이기에 최근 각광받고 있는 게 아닐까 합니다.

팀에서는 PyTorch Framework를 사용하여 모델을 개발하고 있고 그 중에서도 꽤나 사이즈가 큰 모델도 있어 이번 기회에 Triton을 사용해볼까 싶었지만 , 테스트 간소화 및 실시간 추천 서버 구축 가능성 검토가 주목적이었기에 torchserve를 선택했습니다. 

Deploy model with TorchServe

https://docs.aws.amazon.com/sagemaker/latest/dg/deploy-models-frameworks-torchserve.html

 

Deploy models with TorchServe - Amazon SageMaker

Deploy models with TorchServe TorchServe is the recommended model server for PyTorch, preinstalled in the AWS PyTorch Deep Learning Container (DLC). This powerful tool offers customers a consistent and user-friendly experience, delivering high performance

docs.aws.amazon.com

위 공식문서는 내용이 다소 중구난방이라 참고만 하였고 구현 과정에서 꼭 필요한 내용 위주로 코드 레벨에서 정리해보겠습니다. 

모델 서버를 실행하기 위해선 크게 세 가지가 필요합니다.

1. Docker image
2. inference handler
3. model file

1. Docker image

base image는 AWS Deep Learning containers에서 제공하는 pytorch inference image를 사용했습니다.

ARG BASE_IMAGE
FROM $BASE_IMAGE

# Install python dependencies.
...

2. inference handler

프로젝트 구성에서 infernece/handler 디렉토리에는 각 모델별로 구현한 handler function들이 정의되어 있습니다.

모델별로 필요한 handler 파일은 아래 스크립트를 통해 S3에 배포되고

cp ${WORKSPACE_DIR}/${HANDLER_PATH} ${TMP_DIR}/inference.py

Model Deploy 시에 Docker container의 환경변수(SAGEMAKER_PROGRAM)로 등록됩니다. 사용자 entrypoint를 지정하지 않으면 default handler가 사용되며 자세한 사항은 아래 github repo를 참고하시면 이해할 수 있습니다.
https://github.com/aws/sagemaker-pytorch-inference-toolkit/blob/master/src/sagemaker_inference/transformer.py#L200

 

sagemaker-pytorch-inference-toolkit/src/sagemaker_inference/transformer.py at master · aws/sagemaker-pytorch-inference-toolkit

Toolkit for allowing inference and serving with PyTorch on SageMaker. Dockerfiles used for building SageMaker Pytorch Containers are at https://github.com/aws/deep-learning-containers. - aws/sagema...

github.com

코드 구현 중, class 계층 관점에서 보면
user request의 진입점인 HandlerService <- 각 요청을 변환 및 inference 하는 Transforemer <- input, model, predict, output에 대한 세부구현을 다루는 Handler 참조 구조를 하고 있습니다.

# HandlerService

class HandlerService(DefaultHandlerService):

    """Handler service that is executed by the model server.

    Determines specific default inference handlers to use based on the type MXNet model being used.

    This class extends ``DefaultHandlerService``, which define the following:
        - The ``handle`` method is invoked for all incoming inference requests to the model server.
        - The ``initialize`` method is invoked at model server start up.

    Based on: https://github.com/awslabs/mxnet-model-server/blob/master/docs/custom_service.md

    """
    def __init__(self):
        self._initialized = False

        transformer = Transformer(default_inference_handler=DefaultPytorchInferenceHandler())
        super(HandlerService, self).__init__(transformer=transformer)

    def initialize(self, context):
        # Adding the 'code' directory path to sys.path to allow importing user modules when multi-model mode is enabled.
        if (not self._initialized) and ENABLE_MULTI_MODEL:
            code_dir = os.path.join(context.system_properties.get("model_dir"), 'code')
            sys.path.append(code_dir)
            self._initialized = True

        super().initialize(context)

# Transformer

 

sagemaker-pytorch-inference-toolkit/src/sagemaker_inference/transformer.py at master · aws/sagemaker-pytorch-inference-toolkit

Toolkit for allowing inference and serving with PyTorch on SageMaker. Dockerfiles used for building SageMaker Pytorch Containers are at https://github.com/aws/deep-learning-containers. - aws/sagema...

github.com

class Transformer(object):
    """Represents the execution workflow for handling inference requests
    sent to the model server.
    """

    def __init__(self, default_inference_handler=None):
        """Initialize a ``Transformer``.

        Args:
            default_inference_handler (DefaultInferenceHandler): default implementation of
                inference handlers to use in absence of expected serving functions within
                the user module. Defaults to ``DefaultInferenceHandler``.

        """
        self._default_inference_handler = default_inference_handler or DefaultInferenceHandler()
        self._initialized = False
        self._environment = None
        self._model = None

        self._pre_model_fn = None
        self._model_warmup_fn = None
        self._model_fn = None
        self._transform_fn = None
        self._input_fn = None
        self._predict_fn = None
        self._output_fn = None
        self._context = None

Transformer의 docstring 에서 말하는 user module이 바로 위에서 저희가 환경변수 SAGEMAKER_PROGRAM으로 주입한 inference.py 파일 경로입니다. 

S3에 업로드 했지만 sagemaker model wrapper 구현체에서 이를 docker image로 repacking 하게 되어있습니다. 자세한 내부 구현은 sagemaker sdk 코드를 보면 누구나 쉽게 이해할 수 있습니다.

# inference.py

아래는 테스트시 사용한 handler method 구현 예시입니다. decorator 부분은 테스트 디버깅을 위해 slack으로 stack trace를 받아 보는 용도로 무시하셔도 됩니다. 

@watcher.report_error
def model_fn(model_dir):
    """Load the PyTorch model from the `model_dir` directory."""

    device = "cuda" if torch.cuda.is_available() else "cpu"
    logger.info(f"Using device: {device}")

    return model_utils.load_model(
        model_path=model_dir,
        model_klass=vae_cf.VAE,
        generator_klass=item2user.ExclusionGenerator,
        device=device,
    )


@watcher.report_error
def input_fn(
    input_data,
    content_type,
):
    """input_fn that can handle JSON formats.

    Args:
        input_data: the request payload serialized in the content_type format
        content_type: the request content_type
    Returns: input_data deserialized into torch.FloatTensor or torch.cuda.FloatTensor depending if cuda is available.
    """

    input_data = json.loads(input_data) if isinstance(input_data, str) else input_data
    logger.info(f"input_data: {input_data}")

    assert _INPUT_ITEM_ID in input_data, f"input_data must have {_INPUT_ITEM_ID} key, but got {input_data.keys()}"

    indices = meta_utils.convert_id_to_idx(input_data.get(_INPUT_ITEM_ID, []))
    refined_input = torch_utils.multi_hot_encoding(
        indices,
        meta_utils.get_item_size(),
    ).unsqueeze(0)

    return refined_input


@watcher.report_error
def predict_fn(input_object, model: model_base.ModelBase):
    """predict_fn for PyTorch. Calls a model on data deserialized in input_fn."""

    output, _, _ = model(input_object)
    if model.exclude_inputs_from_predictions:
        output[input_object > 0] = -np.inf

    topk_score, topk_idx = torch.topk(output, k=_INFERENCE_TOP_K)
    topk_score, topk_idx = topk_score.detach().cpu().numpy().tolist(), topk_idx.detach().cpu().numpy().tolist()
    # Flatten the batch dimension
    topk_score, topk_idx = list(itertools.chain(*topk_score)), list(itertools.chain(*topk_idx))

    predict_result = {
        "item_id": meta_utils.convert_idx_to_id(topk_idx),
        "score": topk_score,
    }
    return predict_result


@watcher.report_error
def output_fn(predictions, response_content_type):
    """Serializes predictions from predict_fn into JSON format."""
    return json.dumps(predictions)

 

3. Model File

model registry에 등록된 model checkpoint 경로를 사용하면 됩니다. model registry에 대해서는 지난번 게시글을 참고해 주세요 :)

https://surgach.tistory.com/138

 

e-commerce 추천 시스템 고도화 하기 시리즈 [2] AWS SageMaker model registry

서론추천 시스템을 고도화하기 마음먹은 과정과 feature store 도입기에 대한 짧은 소개에 이어서, 이번 편에서는 본격적으로 파이프라인에 필요했던 개선사항들을 적용한 내용들을 정리해보려 합

surgach.tistory.com

 

위 기반 작업이 끝나면 최종적으로 아래 sagemaker sdk 를 사용한 짧은 스크립트를 실행하여 모델 서버를 실행할 수 있습니다.

class Config(pydantic_settings.BaseSettings):
    aws_region: str = pydantic.Field(default="us-east-1", env="AWS_REGION")
    output_path: str = pydantic.Field(env="OUTPUT_PATH")
    source_dir: str = pydantic.Field(env="SOURCE_DIR")
    image_uri: str = pydantic.Field(env="IMAGE_URI")
    ...
    inference_instance_type: str = pydantic.Field(default="ml.c4.2xlarge", env="INFERENCE_INSTANCE_TYPE")
    inference_instance_count: int = pydantic.Field(default=1, env="INFERENCE_INSTANCE_COUNT")
    pytorch_model_path: str = pydantic.Field(env="PYTORCH_MODEL_PATH")
    wait: bool = pydantic.Field(default=True, env="WAIT")
    ...이하 생략...

    class Config:
        env_file = ".env.inference"
        extra = "allow"


def main():
    config = Config()

    logger.info(
        f"region: {config.aws_region}\n"
        f"source_dir: {config.source_dir}\n"
        f"image_uri: {config.image_uri}\n"
        f"instance_type: {config.inference_instance_type}\n"
        f"instance_count: {config.inference_instance_count}\n"
        f"sample_inference_data_dir: {config.sample_inference_data_dir}\n"
        f"sample_inference_meta_dir: {config.sample_inference_meta_dir}\n"
        f"role_arn: {_SAGEMAKER_ROLE_ARN}\n"
        f"pytorch_model_path: {config.pytorch_model_path}\n"
    )
    boto3_session = boto3.Session(region_name=config.aws_region)
    sagemaker_session = sagemaker.Session(boto3_session)

    model = sagemaker_model.Model(
        image_uri=config.image_uri,
        model_data=config.pytorch_model_path,
        sagemaker_session=sagemaker_session,
        env=...
        role=_SAGEMAKER_ROLE_ARN,
        entry_point=_ENTRY_POINT,
        source_dir=f"{config.source_dir}/{_CODE_TAR}",
    )

    logger.info("Deploying model in real-time mode.")
    model.deploy(
        initial_instance_count=config.inference_instance_count,
        instance_type=config.inference_instance_type,
        wait=config.wait,
    )

    logger.info(
        "Model deployed successfully."
        "You can now use the endpoint to get inference from the model."
        "Here is an example code to get inference from the model:"
    )
    logger.info(
        "\nimport boto3\n"
        f"boto3_session = boto3.Session(region_name='{config.aws_region}')\n"
        f"sagemaker_runtime = boto3_session.client('sagemaker-runtime')\n"
        f"endpoint_name = '{model.endpoint_name}'\n"
        f"response = sagemaker_runtime.invoke_endpoint(\n"
        f"   EndpointName='{model.endpoint_name}',\n"
        "   ContentType='application/json',\n"
        '   Body="{"item_id": [your_item_id, ...]}\',\n'
        ")\n"
    )


if __name__ == "__main__":
    main()

 

Real-time inference..!?

마지막으로 모델 추론이 잘되나 테스트 해볼 차례입니다.
참고로 Body의 item_id/는 각 도서별 식별자로 학습 시 모델 Input으로 들어가기 전 indexing에 사용된 값을 마스킹한 것입니다.

import boto3


def main():
    sagemaker_runtime = boto3.client("sagemaker-runtime", region_name={{ your region }})
    endpoint_name = {{ your endpoint name }}
    response = sagemaker_runtime.invoke_endpoint(
        EndpointName=endpoint_name,
        ContentType='application/json',
        Body='{"item_id": [item_id_1, item_id_1, item_id_3, ...]}',
    )

    print(response['Body'].read().decode('utf-8'))


if __name__ == "__main__":
    main()

성공..! 의도한대로 추천 item id list와 score list가 return 되었습니다 :) 

 

앞으로 가야할 길..

이렇게 SageMaker를 사용하여 모델을 배포하고 실시간으로 추천 결과를 받아볼 수 있는 환경을 간단하게 구성해 볼 수 있었습니다만, 실제 운영환경에 도입하기엔 넘어야 할 산이 많습니다.

개인적으로는 모델 추론 최적화 분야에 관심이 많은데요, 앞으로 시간이 날 때마다 사이드 프로젝트를 진행하고 과정에서 공부한 관련 최적화 방법론(Quantization, Layer and Tensor Fusion 등)들을 Deep dive 시리즈로 포스팅해 볼 계획입니다. 

설레는 마음을 한편에 간직한 채, 구상 중인 사이드 프로젝트에 대한 간략히 소개로 글을 마치도록 하겠습니다 :) 

최종 Goal은 Triton에서 Pytorch로 구현한 Bert4rec 모델의 추론 속도를 최적화해보는 것입니다
그러기 위해선 먼저 Pytorch를 TensorRT라는 중간표현으로 변환해볼 것입니다. TorchScript -> compiled model -> TensorRT 순서지만, 이를 간단하게 구현 가능하도록 TorchScript의 확장버전인 Torch-TensorRT 라이브러리를 Nvidia에서 제공하고 있습니다. 

자세한 내용은 공식 가이드 문서.

 

Torch-TensorRT를 통해 PyTorch에서 추론 속도 최대 6배 향상하기

코드 한 줄로 추론 속도를 높여주는 NVIDIA TensorRT와 PyTorch의 새로운 통합인 Torch-TensorRT가 매우 기대됩니다. PyTorch는 오늘날 전 세계 수백만 명이 사용하는 최고의 딥 러닝 프레임워크입니다. TensorR

developer.nvidia.com

최종적으로 낮아진 precision과 증가한 추론 속도를 비교해 보며 운영환경에 사용하기 위한 그 사이의 나름의 타협 지점을 고민해 볼 것입니다.

공부해야 할 게 산더미지만 무엇을 해야 할지 알기에 기분이 좋은 거 같습니다 :)

이상으로 글을 마치겠습니다.

반응형
Comments