일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |
- Data engineering
- 빅데이터플랫폼
- eks
- pyspark
- 클라우데라
- hadoop
- apache spark
- 하둡에코시스템
- DataEngineering
- 블로그
- AWS SageMaker
- dataengineer
- 개발자
- redis bloom filter
- 빅데이터
- 하둡
- Spark structured streaming
- 데이터엔지니어
- spark
- 추천시스템
- 데이터엔지니어링
- kafka
- 개발자혜성
- mlops
- kubernetes
- Terraform
- Python
- cloudera
- BigData
- recommendation system
- Today
- Total
Hyesung Oh
e-commerce 추천 시스템 고도화 하기 시리즈 [3] Realtime inference 본문
e-commerce 추천 시스템 고도화 하기 시리즈 [3] Realtime inference
혜성 Hyesung 2024. 6. 10. 23:03서론
지난 게시글에서는 end-to-end 추천 파이프라인에서 꼭 필요했던 기능들을 적용하며 개선한 과정을 소개했습니다.
https://surgach.tistory.com/138
이번 포스트에선 AWS SageMaker를 활용하여 실시간 추천 모델 서버를 구축해 본 경험을 정리해보려 합니다. 소위 real-time inference는 당장의 비즈니스 요구 사항은 아닙니다만, 팀의 기술력 제고 및 앞으로를 대비하는 차원에서 PoC 수준으로 진행해 보았습니다 :).
본론
Model Server
모델 서버를 위한 모든 Tool 을 조사하진 못했지만, 대체로 아래 5개 정도를 많이 사용하는 것으로 보입니다.
- TorchServe (documentation)
- TensorFlow Serving (documentation)
- Triton™ Inference Server (documentation)
- Multi Model Server (documentation)
- BentoML (documentation)
위 Tool들에 대한 비교 내용을 다룬 괜찮은 게시글이 있어 첨부합니다.
기본적으로 REST API를 지원하며, gRPC 지원여부, 사용 중인 deep learning framework 지원 및 최적화 여부 등을 고려하여 각자 상황에 맞게 사용하게 될 거 같습니다. 개인적으로는 그중에서 Nvidia의 Triton에 관심이 많은데요, Triton은 모델 경량화 및 하드웨어 최적화를 잘추상화하여 지원합니다.
대용량 데이터를 비교적 여유로운 시간동안 핸들링하는 배치성 Inference는 비교적 풍부한 컴퓨팅 자원을 활용할 수 있지만, Real-time inference는 mobile과 같은 비교적 컴퓨팅과 스토리지 자원이 제한된 환경에서 돌아가는 경우가 많아 `최적화` 및 `경량화`가 핵심이기에 최근 각광받고 있는 게 아닐까 합니다.
팀에서는 PyTorch Framework를 사용하여 모델을 개발하고 있고 그 중에서도 꽤나 사이즈가 큰 모델도 있어 이번 기회에 Triton을 사용해볼까 싶었지만 , 테스트 간소화 및 실시간 추천 서버 구축 가능성 검토가 주목적이었기에 torchserve를 선택했습니다.
Deploy model with TorchServe
https://docs.aws.amazon.com/sagemaker/latest/dg/deploy-models-frameworks-torchserve.html
위 공식문서는 내용이 다소 중구난방이라 참고만 하였고 구현 과정에서 꼭 필요한 내용 위주로 코드 레벨에서 정리해보겠습니다.
모델 서버를 실행하기 위해선 크게 세 가지가 필요합니다.
1. Docker image
2. inference handler
3. model file
1. Docker image
base image는 AWS Deep Learning containers에서 제공하는 pytorch inference image를 사용했습니다.
ARG BASE_IMAGE
FROM $BASE_IMAGE
# Install python dependencies.
...
2. inference handler
프로젝트 구성에서 infernece/handler 디렉토리에는 각 모델별로 구현한 handler function들이 정의되어 있습니다.
모델별로 필요한 handler 파일은 아래 스크립트를 통해 S3에 배포되고
cp ${WORKSPACE_DIR}/${HANDLER_PATH} ${TMP_DIR}/inference.py
Model Deploy 시에 Docker container의 환경변수(SAGEMAKER_PROGRAM)로 등록됩니다. 사용자 entrypoint를 지정하지 않으면 default handler가 사용되며 자세한 사항은 아래 github repo를 참고하시면 이해할 수 있습니다.
https://github.com/aws/sagemaker-pytorch-inference-toolkit/blob/master/src/sagemaker_inference/transformer.py#L200
코드 구현 중, class 계층 관점에서 보면
user request의 진입점인 HandlerService <- 각 요청을 변환 및 inference 하는 Transforemer <- input, model, predict, output에 대한 세부구현을 다루는 Handler 참조 구조를 하고 있습니다.
class HandlerService(DefaultHandlerService):
"""Handler service that is executed by the model server.
Determines specific default inference handlers to use based on the type MXNet model being used.
This class extends ``DefaultHandlerService``, which define the following:
- The ``handle`` method is invoked for all incoming inference requests to the model server.
- The ``initialize`` method is invoked at model server start up.
Based on: https://github.com/awslabs/mxnet-model-server/blob/master/docs/custom_service.md
"""
def __init__(self):
self._initialized = False
transformer = Transformer(default_inference_handler=DefaultPytorchInferenceHandler())
super(HandlerService, self).__init__(transformer=transformer)
def initialize(self, context):
# Adding the 'code' directory path to sys.path to allow importing user modules when multi-model mode is enabled.
if (not self._initialized) and ENABLE_MULTI_MODEL:
code_dir = os.path.join(context.system_properties.get("model_dir"), 'code')
sys.path.append(code_dir)
self._initialized = True
super().initialize(context)
class Transformer(object):
"""Represents the execution workflow for handling inference requests
sent to the model server.
"""
def __init__(self, default_inference_handler=None):
"""Initialize a ``Transformer``.
Args:
default_inference_handler (DefaultInferenceHandler): default implementation of
inference handlers to use in absence of expected serving functions within
the user module. Defaults to ``DefaultInferenceHandler``.
"""
self._default_inference_handler = default_inference_handler or DefaultInferenceHandler()
self._initialized = False
self._environment = None
self._model = None
self._pre_model_fn = None
self._model_warmup_fn = None
self._model_fn = None
self._transform_fn = None
self._input_fn = None
self._predict_fn = None
self._output_fn = None
self._context = None
Transformer의 docstring 에서 말하는 user module이 바로 위에서 저희가 환경변수 SAGEMAKER_PROGRAM으로 주입한 inference.py 파일 경로입니다.
S3에 업로드 했지만 sagemaker model wrapper 구현체에서 이를 docker image로 repacking 하게 되어있습니다. 자세한 내부 구현은 sagemaker sdk 코드를 보면 누구나 쉽게 이해할 수 있습니다.
# inference.py
아래는 테스트시 사용한 handler method 구현 예시입니다. decorator 부분은 테스트 디버깅을 위해 slack으로 stack trace를 받아 보는 용도로 무시하셔도 됩니다.
@watcher.report_error
def model_fn(model_dir):
"""Load the PyTorch model from the `model_dir` directory."""
device = "cuda" if torch.cuda.is_available() else "cpu"
logger.info(f"Using device: {device}")
return model_utils.load_model(
model_path=model_dir,
model_klass=vae_cf.VAE,
generator_klass=item2user.ExclusionGenerator,
device=device,
)
@watcher.report_error
def input_fn(
input_data,
content_type,
):
"""input_fn that can handle JSON formats.
Args:
input_data: the request payload serialized in the content_type format
content_type: the request content_type
Returns: input_data deserialized into torch.FloatTensor or torch.cuda.FloatTensor depending if cuda is available.
"""
input_data = json.loads(input_data) if isinstance(input_data, str) else input_data
logger.info(f"input_data: {input_data}")
assert _INPUT_ITEM_ID in input_data, f"input_data must have {_INPUT_ITEM_ID} key, but got {input_data.keys()}"
indices = meta_utils.convert_id_to_idx(input_data.get(_INPUT_ITEM_ID, []))
refined_input = torch_utils.multi_hot_encoding(
indices,
meta_utils.get_item_size(),
).unsqueeze(0)
return refined_input
@watcher.report_error
def predict_fn(input_object, model: model_base.ModelBase):
"""predict_fn for PyTorch. Calls a model on data deserialized in input_fn."""
output, _, _ = model(input_object)
if model.exclude_inputs_from_predictions:
output[input_object > 0] = -np.inf
topk_score, topk_idx = torch.topk(output, k=_INFERENCE_TOP_K)
topk_score, topk_idx = topk_score.detach().cpu().numpy().tolist(), topk_idx.detach().cpu().numpy().tolist()
# Flatten the batch dimension
topk_score, topk_idx = list(itertools.chain(*topk_score)), list(itertools.chain(*topk_idx))
predict_result = {
"item_id": meta_utils.convert_idx_to_id(topk_idx),
"score": topk_score,
}
return predict_result
@watcher.report_error
def output_fn(predictions, response_content_type):
"""Serializes predictions from predict_fn into JSON format."""
return json.dumps(predictions)
3. Model File
model registry에 등록된 model checkpoint 경로를 사용하면 됩니다. model registry에 대해서는 지난번 게시글을 참고해 주세요 :)
https://surgach.tistory.com/138
위 기반 작업이 끝나면 최종적으로 아래 sagemaker sdk 를 사용한 짧은 스크립트를 실행하여 모델 서버를 실행할 수 있습니다.
class Config(pydantic_settings.BaseSettings):
aws_region: str = pydantic.Field(default="us-east-1", env="AWS_REGION")
output_path: str = pydantic.Field(env="OUTPUT_PATH")
source_dir: str = pydantic.Field(env="SOURCE_DIR")
image_uri: str = pydantic.Field(env="IMAGE_URI")
...
inference_instance_type: str = pydantic.Field(default="ml.c4.2xlarge", env="INFERENCE_INSTANCE_TYPE")
inference_instance_count: int = pydantic.Field(default=1, env="INFERENCE_INSTANCE_COUNT")
pytorch_model_path: str = pydantic.Field(env="PYTORCH_MODEL_PATH")
wait: bool = pydantic.Field(default=True, env="WAIT")
...이하 생략...
class Config:
env_file = ".env.inference"
extra = "allow"
def main():
config = Config()
logger.info(
f"region: {config.aws_region}\n"
f"source_dir: {config.source_dir}\n"
f"image_uri: {config.image_uri}\n"
f"instance_type: {config.inference_instance_type}\n"
f"instance_count: {config.inference_instance_count}\n"
f"sample_inference_data_dir: {config.sample_inference_data_dir}\n"
f"sample_inference_meta_dir: {config.sample_inference_meta_dir}\n"
f"role_arn: {_SAGEMAKER_ROLE_ARN}\n"
f"pytorch_model_path: {config.pytorch_model_path}\n"
)
boto3_session = boto3.Session(region_name=config.aws_region)
sagemaker_session = sagemaker.Session(boto3_session)
model = sagemaker_model.Model(
image_uri=config.image_uri,
model_data=config.pytorch_model_path,
sagemaker_session=sagemaker_session,
env=...
role=_SAGEMAKER_ROLE_ARN,
entry_point=_ENTRY_POINT,
source_dir=f"{config.source_dir}/{_CODE_TAR}",
)
logger.info("Deploying model in real-time mode.")
model.deploy(
initial_instance_count=config.inference_instance_count,
instance_type=config.inference_instance_type,
wait=config.wait,
)
logger.info(
"Model deployed successfully."
"You can now use the endpoint to get inference from the model."
"Here is an example code to get inference from the model:"
)
logger.info(
"\nimport boto3\n"
f"boto3_session = boto3.Session(region_name='{config.aws_region}')\n"
f"sagemaker_runtime = boto3_session.client('sagemaker-runtime')\n"
f"endpoint_name = '{model.endpoint_name}'\n"
f"response = sagemaker_runtime.invoke_endpoint(\n"
f" EndpointName='{model.endpoint_name}',\n"
" ContentType='application/json',\n"
' Body="{"item_id": [your_item_id, ...]}\',\n'
")\n"
)
if __name__ == "__main__":
main()
Real-time inference..!?
마지막으로 모델 추론이 잘되나 테스트 해볼 차례입니다.
참고로 Body의 item_id/는 각 도서별 식별자로 학습 시 모델 Input으로 들어가기 전 indexing에 사용된 값을 마스킹한 것입니다.
import boto3
def main():
sagemaker_runtime = boto3.client("sagemaker-runtime", region_name={{ your region }})
endpoint_name = {{ your endpoint name }}
response = sagemaker_runtime.invoke_endpoint(
EndpointName=endpoint_name,
ContentType='application/json',
Body='{"item_id": [item_id_1, item_id_1, item_id_3, ...]}',
)
print(response['Body'].read().decode('utf-8'))
if __name__ == "__main__":
main()
성공..! 의도한대로 추천 item id list와 score list가 return 되었습니다 :)
앞으로 가야할 길..
이렇게 SageMaker를 사용하여 모델을 배포하고 실시간으로 추천 결과를 받아볼 수 있는 환경을 간단하게 구성해 볼 수 있었습니다만, 실제 운영환경에 도입하기엔 넘어야 할 산이 많습니다.
개인적으로는 모델 추론 최적화 분야에 관심이 많은데요, 앞으로 시간이 날 때마다 사이드 프로젝트를 진행하고 과정에서 공부한 관련 최적화 방법론(Quantization, Layer and Tensor Fusion 등)들을 Deep dive 시리즈로 포스팅해 볼 계획입니다.
설레는 마음을 한편에 간직한 채, 구상 중인 사이드 프로젝트에 대한 간략히 소개로 글을 마치도록 하겠습니다 :)
최종 Goal은 Triton에서 Pytorch로 구현한 Bert4rec 모델의 추론 속도를 최적화해보는 것입니다
그러기 위해선 먼저 Pytorch를 TensorRT라는 중간표현으로 변환해볼 것입니다. TorchScript -> compiled model -> TensorRT 순서지만, 이를 간단하게 구현 가능하도록 TorchScript의 확장버전인 Torch-TensorRT 라이브러리를 Nvidia에서 제공하고 있습니다.
자세한 내용은 공식 가이드 문서.
최종적으로 낮아진 precision과 증가한 추론 속도를 비교해 보며 운영환경에 사용하기 위한 그 사이의 나름의 타협 지점을 고민해 볼 것입니다.
공부해야 할 게 산더미지만 무엇을 해야 할지 알기에 기분이 좋은 거 같습니다 :)
이상으로 글을 마치겠습니다.
'Data Engineering > MLOps' 카테고리의 다른 글
Bloom Filter 를 사용해봅시다 [2] Redis Bloom Filter feat. 추천시스템 (0) | 2024.06.17 |
---|---|
e-commerce 추천 시스템 고도화 하기 시리즈 [2] AWS SageMaker model registry (0) | 2024.06.01 |
e-commerce 추천 시스템 고도화 하기 시리즈 [1] feature store (0) | 2024.06.01 |
Nvidia Container Toolkit, Nvidia device plugin에 대해 알아봅시다. feat. CRI, CDI (0) | 2024.03.30 |