Hyesung Oh

ack-emr-containers controller&custom Helm chart를 이용한 Spark Structured Streaming 어플리케이션 배포 feat. ArgoCD 본문

Data Engineering/Apache Spark

ack-emr-containers controller&custom Helm chart를 이용한 Spark Structured Streaming 어플리케이션 배포 feat. ArgoCD

혜성 Hyesung 2023. 8. 13. 06:59
반응형

서론

배치성 Spark Job은 Airflow에서 trigger 하였지만 Streaming application은 이와 달리 클러스터를 상시 점유하고 있어 다음과 같은 요구사항을 가진다.

  1. 애플리케이션 배포, 상태를 선언적으로 관리할 수 있어야 한다.
  2. 하나 이상의 Kakfa Topic을 구독하는 애플리케이션(subscriber) 배포 시에 필요한 resource 정의 (코드)를 재활용할 수 있어야 한다.
  3. 개발자는 master에 merge 하면 코드는 자동으로 배포되어야 한다.

참고로 필자는 1을 만족하기 위해 ack-emrcontainers-controller를, 2를 위해 helm chart를, 3을 위해 argocd를 사용하였지만, 이는 각자 처한 상황에 맞게 다르게 가져갈 수 있다.

또한 아래 기술들에 대한 선수 지식을 필요로 하니 링크를 참고하면 도움이 될 거 같다.

선수지식: emr on eks, aws controller for k8s, helm chart template engine, argocd


ack-emrcontainers-controller 설치

자세한 설치법은 아래 문서를 참고하여 그대로 따라 하면 된다. 기본적인 EKS 개발 환경이 갖추어진 상태에서 진행하는 것을 가정하고 있기에 EKS cluster, RBAC, IRSA 등에 대한 과정은 생략하였다.

https://aws-controllers-k8s.github.io/community/docs/tutorials/emr-on-eks-example/

 

Run Spark jobs using the ACK EMR on EKS controller

ACK service controller for EMR on EKS enables customers to run spark jobs on EKS clusters

aws-controllers-k8s.github.io

순수히 ack-emr-containers만 helm chart로 배포하고자 한다면 필자가 사용한 아래 간단한 스크립트를 사용해도 된다.

# https://aws-controllers-k8s.github.io/community/docs/tutorials/emr-on-eks-example/#create-iam-identity-mapping

WORKSPACE_DIR=$(git rev-parse --show-toplevel)
PROJECT_DIR="$WORKSPACE_DIR/users/hyesung/data-streaming/ack-emrcontainers-controller"
SERVICE=emrcontainers
RELEASE_VERSION=$(curl -sL https://api.github.com/repos/aws-controllers-k8s/${SERVICE}-controller/releases/latest | jq -r '.tag_name | ltrimstr("v")')
ACK_SYSTEM_NAMESPACE=$1

echo "installing ack-$SERVICE-controller"
aws ecr-public get-login-password --region us-east-1 | helm registry login --username AWS --password-stdin public.ecr.aws
helm install -f ${PROJECT_DIR}/values-emrcontainers-controller.yaml --create-namespace -n $ACK_SYSTEM_NAMESPACE ack-$SERVICE-controller \
  oci://public.ecr.aws/aws-controllers-k8s/$SERVICE-chart --version=$RELEASE_VERSION
# ${PROJECT_DIR}/values-emrcontainers-controller.yaml
# https://github.com/aws-controllers-k8s/emrcontainers-controller/blob/main/helm/values.yaml
#
deployment:
  nodeSelector:
    kubernetes.io/arch: amd64
serviceAccount:
  annotations:
    eks.amazonaws.com/role-arn: arn:aws:iam::171943046871:role/eks-ridi-data-serviceaccounts
aws:
  region: ap-northeast-2

CRD 생성

이제 custom resource를 간단히 배포해 보자. 배포엔 kustomization을 사용한다.

VirtualCluster

---
apiVersion: emrcontainers.services.k8s.aws/v1alpha1
kind: VirtualCluster
metadata:
  name: poc
spec:
  name: poc
  containerProvider:
    id: {{ your-eks-cluster-name }}
    type_: EKS
    info:
      eksInfo:
        namespace: {{ your-namespace }}

JobRun

아래는 tutorial 예시이며, Spark Structured Streaming 예시는 아래에서 다루겠다.

# ${PROJECT_DIR}/kustomization/jobruns.yaml

apiVersion: emrcontainers.services.k8s.aws/v1alpha1
kind: JobRun
metadata:
  name: my-ack-jobrun
spec:
  name: my-ack-jobrun
  virtualClusterRef:
    from:
      name: poc
  executionRoleARN: {{ your-service-roll-account-arn }}
  releaseLabel: "emr-6.7.0-latest"
  jobDriver:
    sparkSubmitJobDriver:
      entryPoint: "local:///usr/lib/spark/examples/src/main/python/pi.py"
      entryPointArguments:
      sparkSubmitParameters: "--conf spark.executor.instances=2 --conf spark.executor.memory=1G --conf spark.executor.cores=1 --conf spark.driver.cores=1"
  configurationOverrides: |
    ApplicationConfiguration: null
    MonitoringConfiguration:
      CloudWatchMonitoringConfiguration:
        LogGroupName: "/aws/eks/emr"
        LogStreamNamePrefix: "hyesung"
      PersistentAppUI: "ENABLED"
      S3MonitoringConfiguration:
        LogUri: "s3://hyesung/logs"

driver, executor pod 로그는 S3로 받도록 설정하였다. 여기서 주의할 것은, LogStreamNamePrefix를 설정하지 않으면 nill pointer에 대한 참조 에러가 난다. 이는 다음 코드에서 해당 config에 대한 nill check가 제대로 되어있지 않아 발생한 문제이며 추후 수정될 것으로 기대된다. 

RBAC

아래 권한이 있어야 aws emr-containers service에서 해당 k8s api server를 통해 namespace에 접근할 수 있게 된다. aws emr-containers service는 service-linked role을 사용하며, 해당 role에 대한 aws-config 설정은 eks cluster 설치 시 설정하였음을 가정한다. 참고로 필자의 경우 terraform eks module을 사용하여 배포하였다. 

apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: emr-containers
  namespace: {{ your-namespace }}
rules:
  - apiGroups: [""]
    resources: ["namespaces"]
    verbs: ["get"]
  - apiGroups: [""]
    resources: ["serviceaccounts", "services", "configmaps", "events", "pods", "pods/log"]
    verbs: ["get", "list", "watch", "describe", "create", "edit", "delete", "deletecollection", "annotate", "patch", "label"]
  - apiGroups: [""]
    resources: ["secrets"]
    verbs: ["create", "patch", "delete", "watch"]
  - apiGroups: ["apps"]
    resources: ["statefulsets", "deployments"]
    verbs: ["get", "list", "watch", "describe", "create", "edit", "delete", "annotate", "patch", "label"]
  - apiGroups: ["batch"]
    resources: ["jobs"]
    verbs: ["get", "list", "watch", "describe", "create", "edit", "delete", "annotate", "patch", "label"]
  - apiGroups: ["extensions", "networking.k8s.io"]
    resources: ["ingresses"]
    verbs: ["get", "list", "watch", "describe", "create", "edit", "delete", "annotate", "patch", "label"]
  - apiGroups: ["rbac.authorization.k8s.io"]
    resources: ["roles", "rolebindings"]
    verbs: ["get", "list", "watch", "describe", "create", "edit", "delete", "deletecollection", "annotate", "patch", "label"]
  - apiGroups: [""]
    resources: ["persistentvolumeclaims"]
    verbs: ["get", "list", "watch", "describe", "create", "edit", "delete", "annotate", "patch", "label"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: emr-containers
  namespace: {{ your-namespace }}
subjects:
- kind: User
  name: emr-containers
  apiGroup: rbac.authorization.k8s.io
roleRef:
  kind: Role
  name: emr-containers
  apiGroup: rbac.authorization.k8s.io

kustomization

# kustomization/kustomization.yaml

resources:
  - virtualclusters.yaml
  - jobruns.yaml
  - rbac.yaml

아래 명령어를 통해 배포하고 나면 EMR on EKS console과 k9s console 모두에서 확인할 수 있다.

kubectl apply -f kustomization/kustomization.yaml

 

Helm Chart 를 사용한 production 배포

이제 PoC를 마쳤으니 프로덕션 환경 배포를 위한 helm chart를 작성해 보자.
우선 프로젝트 경로에서 아래 명령어를 실행하여 Chart를 초기화한다.

$ helm create mychart

그러면 왼쪽과 같은 구조의 프로젝트가 생성된다. 그리고 templates 디렉토리 내 모든 파일을 지우고 values.yaml 파일 내용을 초기화해주면 준비 완료.

 

 

 

templates 폴더 아래에 위 kustomization 예시에서 사용한 두 개의 파일을 생성한다. 다만, 이때는 template + values 랜더링을 통해 코드를 재사용하여 복수의 application을 배포할 것이기에 helm chart template을 사용한 차이점이 있다. kustomization에서는 이와 유사한 요구사항을 patch 를 사용하여 해결 할 수 있다. 

완성된 상태의 디렉토리 계층과 내용물

values.yaml

기본적으로 사용할 values.yaml 값이다. 우선 모두 default null로 설정하고, 각 환경별 values-{{site}}.yaml 에서 이를 명시하여 사용하도록 했다.

# Default values for data-streaming.
# This is a YAML-formatted file.
# Declare variables to be passed into your templates.


image:
  repository: 801714584815.dkr.ecr.ap-northeast-2.amazonaws.com/data-etl
  tag: ""

env: ""
emrReleaseLabel: "emr-6.10.0-20230421"  # refer to https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-release-6x.html
emrBucket: ""
s3DeploymentURI: ""
s3LogURI: ""

aws:
  region: ""

virtualClusters:
  - containerProvider:
      id: ""
    name: ""
    namespace: ""
    targetNamespace: ""
  - containerProvider:
      id: ""
    name: ""
    namespace: ""
    targetNamespace: ""

jobRuns:
  - name: ""  # must be less than 64 characters
    appFile: ""
    tableName: ""
    spark:
      driver:
        cores: ""
        memory: ""
        instanceType: ""
        envs: []
        secrets: []
      executor:
        cores: ""
        memory: ""
        numInstances: ""
        instanceType: ""
jobRunExecutionRoleARN: ""

jars: ""  # comma separated list of jars to be added to the classpath of the driver and executor

volume:
  enabled: false

prometheus:
  enabled: false

templates/virtualclusters.yaml

{{- range .Values.virtualClusters }}
---
apiVersion: emrcontainers.services.k8s.aws/v1alpha1
kind: VirtualCluster
metadata:
  name: {{ .name }}
  namespace: {{ default "default" $.Release.Namespace }}
spec:
  name: {{ .name }}
  containerProvider:
    id: {{ .containerProvider.id }}
    type_: EKS
    info:
      eksInfo:
        namespace: {{ .targetNamespace }}
{{- end }}

templates/jobruns.yaml

entrypoint로 사용할 python file은 remote storage인 s3에 두고 사용하고 있다. emr-containers는 job-submitter, drvier-pod, executor-pod 3가지 컴포넌트를 실행한다. 이때 job-submitter는 원격 파일을 로컬로 다운받아 client 모드로 spark-submit을 하는 역할을 한다. 다음 영상을 참고하면 도움이 된다.

{{- $remoteDeploymentURI := .Values.s3DeploymentURI }}
{{- $region := .Values.aws.region }}
{{- range .Values.jobRuns }}
---
apiVersion: emrcontainers.services.k8s.aws/v1alpha1
kind: JobRun
metadata:
  name: {{ .name }}
  namespace: {{ default "default" $.Release.Namespace }}
spec:
  name: {{ .name }}
  virtualClusterRef:
    from:
      name: {{ .virtualClusterName | quote }}
  executionRoleARN: {{ $.Values.jobRunExecutionRoleARN | quote }}
  releaseLabel: {{ $.Values.emrReleaseLabel | quote }}
  jobDriver:
    sparkSubmitJobDriver:
      {{- if .appFile }}
      entryPoint: "{{ $remoteDeploymentURI }}/{{ .appFile }}"
      {{- else }}
      entryPoint: "{{ $remoteDeploymentURI }}/{{ my-default-py-file(지워도 무방) }}"
      {{- end }}
      entryPointArguments:
        - "--database"
        - "stage"
        - "--table-name"
        - {{ .tableName | quote }}
      sparkSubmitParameters: "--py-files {{ $remoteDeploymentURI }}/src.zip --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 --jars https://repo1.maven.org/maven2/io/delta/delta-core_2.12/2.1.1/delta-core_2.12-2.1.1.jar,https://repo1.maven.org/maven2/io/delta/delta-storage/2.1.1/delta-storage-2.1.1.jar,https://repo1.maven.org/maven2/org/apache/spark/spark-avro_2.12/3.3.0/spark-avro_2.12-3.3.0.jar --conf spark.driver.cores={{ .spark.driver.cores | default "1" }} --conf spark.driver.memory={{ .spark.driver.memory | default "1g" }} --conf spark.executor.cores={{ .spark.executor.cores | default "1" }} --conf spark.executor.memory={{ .spark.executor.memory | default "1g" }} --conf spark.executor.instances={{ .spark.executor.numInstances | default "1" }} --conf spark.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory --conf spark.hive.metastore.glue.catalogid=697122891294 --conf spark.hadoop.fs.s3.canned.acl=BucketOwnerFullControl"
  configurationOverrides: |
    ApplicationConfiguration:
      - Classification: spark-defaults
        Properties:
          spark.dynamicAllocation.enabled: "false"
          spark.executor.processTreeMetrics.enabled: "true"
          spark.kubernetes.container.image: "{{ $.Values.image.repository }}:{{ $.Values.image.tag }}"
          spark.kubernetes.driver.annotation.prometheus.io/path: "/metrics/executors/prometheus/"
          spark.kubernetes.driver.annotation.prometheus.io/port: "4040"
          spark.kubernetes.driver.annotation.prometheus.io/scrape: "true"
          spark.kubernetes.driver.node.selector.karpenter.sh/capacity-type: "on-demand"
          spark.kubernetes.driver.node.selector.node.kubernetes.io/instance-type: {{ .spark.driver.instanceType | default "c6g.xlarge" | quote }}
          {{- range $secretKey, $valueFrom := .spark.driver.secrets }}
          spark.kubernetes.driver.secretKeyRef.{{ $secretKey }}: {{ $valueFrom | quote }}
          {{- end }}
          {{- range $envName, $value := .spark.driver.envs }}
          spark.kubernetes.driverEnv.{{ $envName }}: {{ $value | quote }}
          {{- end }}
          spark.kubernetes.driverEnv.SITE: {{ $.Values.env | quote }}
          spark.kubernetes.executor.node.selector.karpenter.sh/capacity-type: "spot"
          spark.kubernetes.executor.node.selector.node.kubernetes.io/instance-type: {{ .spark.executor.instanceType | default "c6g.xlarge" | quote }}
          spark.kubernetes.executor.podNamePrefix: {{ .name | quote }}
          {{- if $.Values.volume.enabled }}
          spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-spill.mount.path: "/var/data/spill"
          spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-spill.mount.readOnly: "false"
          spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-spill.options.claimName: "OnDemand"
          spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-spill.options.sizeLimit: {{ $.Values.volume.size | quote }}
          spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-spill.options.storageClass: {{ $.Values.volume.storageClass | quote }}
          {{- end }}
          spark.kubernetes.node.selector.kubernetes.io/arch: "arm64"
          spark.kubernetes.node.selector.kubernetes.io/os: "linux"
          spark.kubernetes.node.selector.topology.kubernetes.io/zone: {{ $region | quote }}
          {{- if $.Values.prometheus.enabled }}
          spark.metrics.conf.*.sink.prometheusServlet.class: "org.apache.spark.metrics.sink.PrometheusServlet"
          spark.metrics.conf.*.sink.prometheusServlet.path: "/metrics/driver/prometheus/"
          spark.metrics.conf.applications.sink.prometheusServlet.path: "/metrics/applications/prometheus/"
          spark.metrics.conf.master.sink.prometheusServlet.path: "/metrics/master/prometheus/"
          spark.ui.prometheus.enabled: {{ $.Values.prometheus.enabled | quote }}
          {{- end }}
      - Classification: emr-job-submitter
        Properties:
          jobsubmitter.node.selector.karpenter.sh/capacity-type: "on-demand"
          jobsubmitter.node.selector.node.kubernetes.io/instance-type: "c6g.xlarge"
          jobsubmitter.node.selector.topology.kubernetes.io/zone: {{ $region | quote }}
    MonitoringConfiguration:
      CloudWatchMonitoringConfiguration:
        LogGroupName: "/aws/eks/emr"
        LogStreamNamePrefix: "hyesung"
      PersistentAppUI: "ENABLED"
      S3MonitoringConfiguration:
        LogUri: {{ $.Values.s3LogURI | quote }}
{{- end }}

실제 팀에선 common 이라는 name의 chart를 정의하고 이를 subchart로 다운받아 사용하고 있다. 해당 Chart는 name, namespace에 대한 규격화, 공통 annotation 및 label 를 정의하는 목적으로 사용하고 있다. 아래는 간단한 예시이다.

# mychart/Chart.yaml

apiVersion: v2
appVersion: 0.1.0
description: A Helm chart for Kubernetes
name: poc
type: application
version: 0.1.0
dependencies:
  - name: common
    version: 1.x.x
    repository: file://../common
# templates/jobruns.yaml

apiVersion: emrcontainers.services.k8s.aws/v1alpha1
kind: JobRun
metadata:
  name: {{ printf "%s-%s" (include "common.names.deploymentName" $) (.tableName) | trunc 63 | replace "_" "-" | trimSuffix "-" }}
  namespace: {{ include "common.names.namespace" $ | quote }}
  labels: {{- include "common.labels.standard" $ | nindent 4 }}
    {{- if $.Values.commonLabels }}
    {{- include "common.tplvalues.render" ( dict "value" $.Values.commonLabels "context" $ ) | nindent 4 }}
    {{- end }}
  {{- if $.Values.commonAnnotations }}
  annotations: {{- include "common.tplvalues.render" ( dict "value" $.Values.commonAnnotations "context" $ ) | nindent 4 }}
  {{- end }}
  ... 이하 생략 ..

이상으로 ack-emrcontainers-controller를 배포하고 helm chart를 정의하여 eks cluster에 spark structured streaming application을 배포하는 방법에 대해 다뤄보았다. 배포한 애플리케이션은 datadog, prometheus 등을 통해 메트릭을 수집하여 grafana로 모니터링하고 있고, 각 플랫폼별 alert manager 등을 사용하여 slack으로 alert를 받아보고 있다.

+ CD로 사용한 argocd는 각자 다른 방식을 사용할 수 있다 생각하여 생략하였고(e.g. gitops repo에 merge되면 github actions에서 apply를 한다던가..?), 소스코드는 개인 github에 정리하여 올릴 예정입니다.

긴 글 읽어주셔서 감사합니다.

반응형
Comments