Hyesung Oh

e-commerce 추천 시스템 고도화 하기 시리즈 [2] AWS SageMaker model registry 본문

Data Engineering/MLOps

e-commerce 추천 시스템 고도화 하기 시리즈 [2] AWS SageMaker model registry

혜성 Hyesung 2024. 6. 1. 20:45
반응형

서론

추천 시스템을 고도화하기 마음먹은 과정과 feature store 도입기에 대한 짧은 소개에 이어서, 이번 편에서는 본격적으로 파이프라인에 필요했던 개선사항들을 적용한 내용들을 정리해보려 합니다.

https://surgach.tistory.com/137

 

e-commerce 추천 시스템 고도화 하기 시리즈 [1] feature store

서론실무에서 머신러닝을 활용하는 도메인 중에서, 추천 도메인의 경우 대게 실시간성보다는 배치 파이프라인 만으로 요구사항을 충분히 만족시킬 수 있는 거 같습니다. 현재 팀에서 운영 중

surgach.tistory.com

Managed MLOps Platform에 올라탈 결심

MLOps를 위한 Open Source Tool들이 다양했지만, 최근에는 모델 실험 및 버전 관리를 위한 MlFlow, 모델 배포&서빙 및 모니터링을 위한 BentoML 등 업계 표준으로 어느 정도 자리 잡은 기술들도 하나 둘 보이는 거 같습니다. 하지만 각 기술에는 장단점이 있고, 부족한 부분을 개선한 새로운 tool들이 혜성 같이 등장하면서 늘 그래왔듯 자연스레 유행은 바뀌기 마련입니다.

더욱이 비교적 압도적으로 성숙한 기술이 존재하지 않는 MLOps 역사 그리고 새로운 기술을 도입하기 위한 PoC, 도입 후 추가 기능 등을 위한 새로운 기술로의 migration 등의 비용을 생각했을 때 배보다 배꼽이 크다 판단했습니다. 그리고 최소한의 비용으로 최대한의 비즈니스 임팩트를 만들기 위해 기술 보단 현재 필요한 요구사항에 더욱 집중하기로 했습니다.

1. 모델 학습,검증 배포 단계에서 target metric 가시성 확보
2. 모델 및 버전 관리
3. 학습과 추론 분리
4. 실시간 추천 위한 모델 서버 구축

위 요구사항을 최소한의 비용으로 충족 가능한 툴들을 리서치하였고, Cloud Vendor 중 저희가 사용하는 AWS에서 제공하는 Managed MLOps platform인 SageMaker를 일부 활용하여 원하는 기능을 모두 가져갈 수 있었습니다.

본론

요구사항 1. 모델 학습,검증 배포 단계에서 target metric 가시성 확보

Airflow에서 스케줄링된 Model training job은 학습 성공 시 별도로 metric reporting을 하고 있지 않았습니다. 해당 기능은 SageMaker Experiment과 PyTorch Module에서 제공하는 training, validation step, epoch cycle마다 제공하는 hook을 이용해 간단히 구현할 수 있었습니다. 

필요한 metric은 train, validation 두 단계에 있습니다. 
1. training, validation step 마다 metric을 수집
2. 매 epoch가 끝날 때 마다 metric을 remote storage에 저장

class _MetricLoggingMixin(abc.ABC):
    last_checkpoint_metric = {}

    @abc.abstractmethod
    def _collect_metric_on_train_step(self, metrics: dict[str, Any]) -> None:
        """Collect metrics on train step.
        Must be called in the training_step method.
        """

    @abc.abstractmethod
    def _log_metric_on_train_epoch_end(self) -> None:
        """Log metrics on train epoch end."""

    @abc.abstractmethod
    def _collect_metric_on_validation_step(self, metrics: dict[str, Any]) -> None:
        """Collect metrics on validation step.
        Must be called in the validation_step method.
        """

    @abc.abstractmethod
    def _log_metric_on_validation_epoch_end(self) -> None:
        """Log metrics on validation epoch end."""

_log_metric_* method의 경우 모든 모델에 대해 epoch마다 호출하면 되기 때문에 아래와 같이 on_train_epoch_end, on_validation_epoch_end method를 구현한 Class를 미리 만들어두고 이를 상속받도록 하였습니다.

class RecsysLoggingModelBase(RecsysModelBase, _ModelLoggingMixin):
    def on_train_epoch_end(self):
        self._log_metric_on_train_epoch_end()

    def on_validation_epoch_end(self):
        self._log_metric_on_validation_epoch_end()

아래는 사용 중인 개인화 추천 모델 중 하나인 VAE_CF(Variational Autoencoders for Collaborative Filtering) model 의 train과 validation 단계 구현 예시입니다. training_step에선 loss/train metric을 수집하고 validation_step에선 ndcg metric을 수집하도록 했습니다. 수집된 metric은 각 epoch마다 원격으로 수집되고 다시 dictionary를 초기화해줍니다. 

    def training_step(self, batch_data: dict[str, torch.Tensor], batch_idx: int):  # pylint: disable=arguments-differ
        batch = batch_data[self._inputs_col]
        recon_batch, mu, logvar = self(batch)
        loss = self.loss(recon_batch, batch_data[self._targets_col], mu, logvar)
        self.log("loss/train", loss, on_step=False, on_epoch=True, sync_dist=True)
        self._global_steps += 1
        self._collect_metric_on_train_step({"loss/train": loss})
        return loss

    def validation_step(self, batch_data: dict[str, torch.Tensor], batch_idx: int):  # pylint: disable=arguments-differ
        batch = batch_data[self._inputs_col]
        recon_batch, mu, logvar = self(batch)
        loss = self.loss(recon_batch, batch, mu, logvar)
        self.log("loss/dev", loss, on_step=False, on_epoch=True, prog_bar=True, sync_dist=True)
        self._collect_metric_on_validation_step({"loss/dev": loss})

        metrics_ndcg = {}
        metrics_recall = {}
        for k in [10, 20, 100]:
            ndcg_metric = f"metrics/ndcg@{k}"
            recall_metric = f"metrics/recall@{k}"

            metrics_ndcg[ndcg_metric] = ndcg.normalized_dcg_at_k(batch_data[_TARGETS_COL], recon_batch, k=k)
            metrics_recall[recall_metric] = recall.recall_at_k(batch_data[_TARGETS_COL], recon_batch, k=k)
            self._collect_metric_on_validation_step(
                {f"{ndcg_metric}": metrics_ndcg[ndcg_metric], f"{recall_metric}": metrics_recall[recall_metric]}
            )

또한 학습이 완료되었을 때의 최종 metric은 slack으로 reporting 받도록 구현하였습니다. _MetricLoggingMixin class의 맴버 딕셔너리인 last_checkpoint_metric은 학습이 완료될 때까지 초기화하지 않고 매 validation_step마다 metric을 update 해줍니다.

    def _log_metric_on_validation_epoch_end(self, **kwargs) -> None:
        for key, value in self.validation_step_outputs.items():
            epoch_mean = value / self.validation_steps_per_epoch
            self.run.log_metric(key, epoch_mean, step=self.current_epoch)
            self.last_checkpoint_metric.update({key: epoch_mean})
        self.validation_step_outputs.clear()

그리고 model.fit method를 호출하는 Trainer class에서 last_checkpoint_metric를 참조하여 slack report 함수를 호출하게 됩니다.

def _report_last_checkpoint_metric(func):
    def wrapper(self, *args, **kwargs):
        # pylint: disable=protected-access
        func(self, *args, **kwargs)
        arguments = {
            "model": self._model.get_name(),
            "checkpoint_path": self._config.checkpoints[0].dirpath,
        }
        arguments.update(self._model.hparams)
        if hasattr(self._model, "last_checkpoint_metric"):
            arguments.update(self._model.last_checkpoint_metric)
            self._model.last_checkpoint_metric.clear()
        arguments_str = "\n".join([f"*{k.capitalize()}*: `{v}`" for k, v in arguments.items()])

        message = {
            "blocks": [
                {
                    "type": "section",
                    "text": {
                        "type": "mrkdwn",
                        "text": "*Training Job Done*",
                    },
                },
                {
                    "type": "section",
                    "text": {
                        "type": "mrkdwn",
                        "text": f"{arguments_str}",
                    },
                },
            ],
        }

        slack.send_report(message)

    return wrapper
    
@gin.configurable
class TrainerBase(abc.ABC):
    def __init__(
        ...
    ):
        ...


    @_sagemaker_checkpoint
    @_report_last_checkpoint_metric
    def train(self):
        with sagemaker_utils.init_run() as run:
            self._model.run = run
            run.log_parameters(
                {
                    "model": sagemaker_utils.guard_none(self._model.hparams),
                    "train_config": sagemaker_utils.dataclass_to_dict(self._config),
                },
            )
            _export_train_config(self._model, self._config)

            self._preprocess()

            train_data_loader = td.DataLoader(
                ...
            )
            ...

            trainer = pl.Trainer(
                max_epochs=self._config.max_epoch,
                min_epochs=self._config.min_epoch,
                accelerator=self._config.accelerator,
                devices=self._config.devices if self._config.accelerator != "cpu" else "auto",
                logger=loggers.FastTensorboardLogger(self._config.log_dir, version=self._log_version),
                ...
            )
            self._trainer = trainer

            trainer.fit(self._model, train_data_loader, dev_data_loader)

            self._postprocess()

사실 위 기능은 개인별 실험환경에서만 사용하고 있는데요, sagemaker_utils.init_run method를 잠시 살펴보면

def init_run(region="us-east-1") -> sagemaker_run.Run | Run:
    exp_name = os.environ.get("EXP_NAME")
    if exp_name:
        session = sagemaker_session.Session(boto3.session.Session(region_name=region))
        return sagemaker_run.load_run(sagemaker_session=session)
    return Run()

환경변수로 주입받은 EXP_NAME이 있으면 sagemaker_run의 Run class를 load 하고(launch 시에 주입한 experiment_config를 참조), 아니라면 dummy class Run 객체를 반환합니다. 현재 이렇게 구성한 이유는, production 환경의 모델의 경우 SageMaker에서 제공하는 Tensorboard 기능만으로도 필요한 기능을 다 제공받을 수 있었기 때문입니다.

참고로 SageMaker container에 아래와 같이 /opt/ml/output/ 경로에 tensorboard logging 경로를 지정하게 되면 학습이 종료되고 container가 내려가기전 해당 로그 데이터를 s3에 save 하고 tensorboard에 job을 등록하는 과정을 sagemaker에서 추상화해 두어서 간편하게 이용가능합니다. 

_RESERVED_SAGEMAKER_OUTPUT_LOCAL_PATH = "/opt/ml/output"
_RESERVED_SAGEMAKER_TENSORBOARD_OUTPUT_LOCAL_PATH = f"{_RESERVED_SAGEMAKER_OUTPUT_LOCAL_PATH}/tensorboard"
tensorboard_output_config = debugger.TensorBoardOutputConfig(
    s3_output_path=_SAGEMAKER_MODEL_OUTPUT_S3_PATH,
    container_local_output_path=_RESERVED_SAGEMAKER_TENSORBOARD_OUTPUT_LOCAL_PATH,
)

 with exp_run.Run(
    experiment_name=config.exp_name,
    run_name=datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S-%f"),
    sagemaker_session=sagemaker_session,
    ):
        est = estimator.Estimator(
            image_uri=config.image_uri,
            source_dir=f"{config.source_dir}/{_CODE_TAR}",
            environment=env_vars,
            entry_point=_ENTRY_POINT,
            role=_SAGEMAKER_ROLE_ARN,
            instance_count=config.instance_count,
            instance_type=config.instance_type,
            output_path=_SAGEMAKER_MODEL_OUTPUT_S3_PATH,
            sagemaker_session=sagemaker_session,
            tensorboard_output_config=tensorboard_output_config,
        )

        est.fit(
            wait=config.wait,
        )

SageMaker studio Experiment 에서 실험내역들의 세부 내용 및 export한 metric 들을 확인 할 수 있습니다.

또한 모든 실험내역을 모아서 시각화 하는 기능도 제공하니 fine tuning 시에 매우 유용합니다.

요구사항 2. 모델 버전 관리

설명하기에 앞서 팀에서 Airflow를 사용하고 있는 패턴에 대해서 먼저 간략히 소개하겠습니다. Task 실행에 필요한 의존성을 Job class에, Job의 정보를 불러와 각 workload를 순수하게 실행하는 역할을 Operator class에 위임하여 각 역할을 분리했습니다. 이렇게 한 이유는, Operator class의 경우 Airflow specific 한 component라서, 추후 다른 workflow management tool로의 이관 가능성을 고려하여 최대한 모든 구현을 저희가 직접 정의한 Job class에 추상화하여 이관 비용을 줄이기 위함입니다. 실제 Airflow를 사용하기 전 Luigi를 사용했었는데, 모든 구현이 Luigi에서 제공하는 Class를 상속받아 구현하였다 보니, Airflow로 이관하는 과정에서 바뀐 구조에 맞게 코드 구조를 새로 설계해야 했습니다.

각설하고, 아래는 ModelRegister에 필요한 의존성을 구현한 ModelRegisterJob class의 주요 부분입니다.

@final
class ModelRegisterJob(base.Job):
    def __init__(
        self,
        model_package_group: str,
        model_checkpoint_path: str | None = None,
        model_approval: sagemaker.ApprovalStatus = sagemaker.ApprovalStatus.APPROVED,
        aws_conn_id: str = setting.eks.EKS_ML_CLUSTER_AWS_CONN_ID,
        inference_image_parameter_store_key: str = setting.ML_INFERENCE_IMAGE_TAG_SSM_KEY,
        region: str | None = setting.region.N_VIRGINIA,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self._model_package_group = model_package_group
        self._model_checkpoint_path = model_checkpoint_path
        self._model_approval = model_approval
        self._inference_image_parameter_store_key = inference_image_parameter_store_key
        self._aws_conn_id = aws_conn_id
        self._region = region

    @property
    @override
    def job_id(self):
        return f"register_model_version_{self.model_package_group}"
	
    ...

    @property
    def _release_id(self):
        return self.config_store.get_parameter(
            alias=self._inference_image_parameter_store_key,
            role_arn=self._assume_role_arn,
            region=self._region,
        )

    @property
    def _inference_image_uri(self) -> str:
        """Return ECR URI for the inference image."""

        return f"{setting.registry.INFRA_N_VIRGINIA_REGISTRY}/{setting.data_project.ML_PROJECT_ECR_REPO}:{self._release_id}"

    @property
    def aws_conn_id(self):
        return self._aws_conn_id

    @property
    def model_package_group(self) -> str:
        return self._model_package_group

    @property
    def _best_model_url(self) -> str:
        ckpt = self._model_checkpoint_path or str(self.ctx["ti"].xcom_pull(key=ml_models.MODEL_CHECKPOINT_PATH))
        assert ckpt, "model_checkpoint_path is required for model registering."

        assert not ckpt.endswith(".pth") or not ckpt.endswith(
            "ckpt"
        ), f"model_checkpoint_path should be directory containing {ml_models.BEST_MODEL_CHECKPOINT_FILE}"

        best_model = os.path.join(ckpt, ml_models.BEST_MODEL_CHECKPOINT_FILE)
        return best_model

    @property
    def model_url(self) -> str:
        return self._best_model_url

    @property
    def model_approval(self) -> sagemaker.ApprovalStatus:
        return self._model_approval

    @property
    def inference_image_uri(self) -> str:
        return self._inference_image_uri

    @override
    def run(self, context):
        self.ctx = context

가장 핵심인 _best_model_url만 보면, 아래 정보를 외부로부터 가져오고 있습니다.
1. image_uri: training container에 사용된 docker image uri
2. model_checkpoint_path: training model의 ckpt path.

image_uri는 외부 저장소인 config store에서 불러오기 때문에 connection을 맺습니다. 따라서 DAG parsing interval 마다 connection이 생성되는 것을 방지하기 위해 Operator.execute method가 호출되는 시점에만 실행되도록 합니다.

model_checkpoint_path의 경우 upstream인 training job에서 xcom_push 해준 정보를 불러옵니다.

    @override
    def _xcom_push(self):
        logger.info("Pushing model_checkpoint_path: %s to the next task.", self._base_gin_params.MODEL_CHECKPOINT_PATH)
        self.ctx["ti"].xcom_push(key=ml_models.MODEL_CHECKPOINT_PATH, value=self._base_gin_params.MODEL_CHECKPOINT_PATH)

ModelRegisterJob을 주입받아 모델 등록을 실행하는 데에는 airflow aws provider에서 제공하는 sagemaker operator를 사용했습니다.

from airflow.providers.amazon.aws.operators import sagemaker
...

class RidiSageMakerRegisterModelVersionOperator(sagemaker.SageMakerRegisterModelVersionOperator):
    def __init__(self, job: base_job.ModelRegisterJob, **kwargs):
        super().__init__(
            task_id=job.job_id,
            # will be overwritten in execute()
            image_uri="",
            model_url="",
            package_group_name=job.model_package_group,
            model_approval=job.model_approval,
            aws_conn_id=job.aws_conn_id,
            **kwargs,
        )
        self._job = job

    def execute(self, context):
        self._job.run(context)
        self.image_uri = self._job.inference_image_uri
        self.model_url = self._job.model_url
        super().execute(context)

원격으로 불러와야 하는 image_uri, model_url(=model_checkpoint_path)를 execute에서 갱신해 줍니다.

이렇게 등록된 model은 default Approved 상태로 등록되게 됩니다. default Approved로 설정한 이유는 우선은 신규 학습이 된 모델은 대게 더 좋은 성능을 보여주고 있기 때문입니다. 


요구사항 3. 학습과 추론을 분리

기존에는 아래와 같이 TrainJob과 PredictJob(inference)에 정적으로 동일한 정보를 주입했습니다. 따라서 둘은 같은 checkpoint path를 바라보게 되었는데요, 

TRAINER_JOB = job.EksPodMlPartitionJob(
    job_id=f"train_{_MODEL}",
    config_file=utils.get_model_runner_gin_path(_MODEL),
    input_table=...,
    output_table=...,
    base_params=ml_base_config.BaseGinParam(
        TRAIN=True,
        PREDICT=False,
        MODEL_NAME=_MODEL,
        ...
    ),
    extra_params=configs.MODEL_CONFIG,
    ...
)

PREDICT_JOB = job.EksPodMlPartitionJob(
    job_id=f"predict_{_MODEL}",
    pod_name=utils.get_pod_name(utils.TaskType.PREDICT, utils.StageType.MODEL, _MODEL),
    config_file=utils.get_model_runner_gin_path(_MODEL),
    input_table=...,
    output_table=...,
    base_params=ml_base_config.BaseGinParam(
        TRAIN=False,
        PREDICT=True,
        MODEL_NAME=_MODEL,
        ...
    ),
    extra_params=configs.MODEL_CONFIG.from_eks_spec(configs.MODEL_SPEC),
    ...
)

PREDICT=True일 경우 아래와 같이 model checkpoint path를 주입한 데이터기반으로 합성하는 것이 아니라, model registry에서 가장 최신의 approved model을 불러와 사용하는 방식으로 변경하였습니다.

@property
def _reconciled_ckpt(self):
    if self._base_params.TRAIN:
        ckpt = self._base_params.MODEL_CHECKPOINT_PATH or self.ctx["ti"].xcom_pull(key=ml_models.MODEL_CHECKPOINT_PATH)
        assert ckpt, "model_checkpoint_path should be set from either arguments or xcom fro training."
    elif self._base_params.PREDICT:
        ckpt = self._base_params.MODEL_CHECKPOINT_PATH or job_utils.fetch_model_checkpoint(
            model_package_name=self._base_params.MODEL_PACKAGE_GROUP,
            region=self.region,
            role_arn=self._assume_role_arn,
        )
        assert ckpt, "model_checkpoint_path should be set from either arguments or model registry."
    else:
        raise ValueError("Either is_train or is_predict should be True.")

    if ckpt.endswith(".pth") or ckpt.endswith("ckpt"):
        ckpt = os.path.dirname(ckpt)

    logger.info("checkpoint path: %s", ckpt)
    return ckpt
    
... # utils.py
def fetch_model_checkpoint(model_package_name: str, region: str, role_arn: str) -> str | None:
    sm_client = boto3_conn.get_boto3_conn(region_name=region, role_arn=role_arn).sagemaker

    logger.info("Fetching model versions for model package group: %s", model_package_name)
    response = sm_client.list_model_packages(
        ModelPackageGroupName=model_package_name,
        ModelPackageType="Versioned",
        ModelApprovalStatus="Approved",
    )

    packages = response["ModelPackageSummaryList"]
    if not packages:
        logger.error("No approved model versions found for the specified model package group.")
        return None

    latest_approved = max(packages, key=lambda x: x["CreationTime"])["ModelPackageArn"]

    logger.info("Fetching latest approved checkpoint for model version: %s", latest_approved)
    response = sm_client.describe_model_package(ModelPackageName=latest_approved)

    # model cannot be created without specifying a model data url.
    return response["InferenceSpecification"]["Containers"][0]["ModelDataUrl"]

 

이를 통해 배포된 최신 모델의 status를 approved -> reject로 변경하게 되면 직전 모델을 사용할 수 있게 되었고 이는 현재 수준에서 필요한 롤백 기능으로 충분하다 생각했습니다. 결과적으로 학습과 추론을 분리하여 언제든지 원하는 checkpoint path를 사용할 수 있게 되었습니다.

요구사항 4. 필요시 실시간 추천 모델 서버 구축

이는 다음 게시글에서 다루도록 하겠습니다 :).

 

감사합니다. 

반응형
Comments