일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |
- 빅데이터플랫폼
- Data engineering
- Spark structured streaming
- AWS SageMaker
- Python
- recommendation system
- 빅데이터
- DataEngineering
- 하둡에코시스템
- redis bloom filter
- BigData
- 개발자
- dataengineer
- 개발자혜성
- cloudera
- 하둡
- pyspark
- apache spark
- 데이터엔지니어링
- mlops
- 데이터엔지니어
- 클라우데라
- hadoop
- Terraform
- spark
- 블로그
- kafka
- 추천시스템
- eks
- kubernetes
- Today
- Total
Hyesung Oh
ack-emr-containers controller&custom Helm chart를 이용한 Spark Structured Streaming 어플리케이션 배포 feat. ArgoCD 본문
ack-emr-containers controller&custom Helm chart를 이용한 Spark Structured Streaming 어플리케이션 배포 feat. ArgoCD
혜성 Hyesung 2023. 8. 13. 06:59서론
배치성 Spark Job은 Airflow에서 trigger 하였지만 Streaming application은 이와 달리 클러스터를 상시 점유하고 있어 다음과 같은 요구사항을 가진다.
- 애플리케이션 배포, 상태를 선언적으로 관리할 수 있어야 한다.
- 하나 이상의 Kakfa Topic을 구독하는 애플리케이션(subscriber) 배포 시에 필요한 resource 정의 (코드)를 재활용할 수 있어야 한다.
- 개발자는 master에 merge 하면 코드는 자동으로 배포되어야 한다.
참고로 필자는 1을 만족하기 위해 ack-emrcontainers-controller를, 2를 위해 helm chart를, 3을 위해 argocd를 사용하였지만, 이는 각자 처한 상황에 맞게 다르게 가져갈 수 있다.
또한 아래 기술들에 대한 선수 지식을 필요로 하니 링크를 참고하면 도움이 될 거 같다.
선수지식: emr on eks, aws controller for k8s, helm chart template engine, argocd
ack-emrcontainers-controller 설치
자세한 설치법은 아래 문서를 참고하여 그대로 따라 하면 된다. 기본적인 EKS 개발 환경이 갖추어진 상태에서 진행하는 것을 가정하고 있기에 EKS cluster, RBAC, IRSA 등에 대한 과정은 생략하였다.
https://aws-controllers-k8s.github.io/community/docs/tutorials/emr-on-eks-example/
순수히 ack-emr-containers만 helm chart로 배포하고자 한다면 필자가 사용한 아래 간단한 스크립트를 사용해도 된다.
# https://aws-controllers-k8s.github.io/community/docs/tutorials/emr-on-eks-example/#create-iam-identity-mapping
WORKSPACE_DIR=$(git rev-parse --show-toplevel)
PROJECT_DIR="$WORKSPACE_DIR/users/hyesung/data-streaming/ack-emrcontainers-controller"
SERVICE=emrcontainers
RELEASE_VERSION=$(curl -sL https://api.github.com/repos/aws-controllers-k8s/${SERVICE}-controller/releases/latest | jq -r '.tag_name | ltrimstr("v")')
ACK_SYSTEM_NAMESPACE=$1
echo "installing ack-$SERVICE-controller"
aws ecr-public get-login-password --region us-east-1 | helm registry login --username AWS --password-stdin public.ecr.aws
helm install -f ${PROJECT_DIR}/values-emrcontainers-controller.yaml --create-namespace -n $ACK_SYSTEM_NAMESPACE ack-$SERVICE-controller \
oci://public.ecr.aws/aws-controllers-k8s/$SERVICE-chart --version=$RELEASE_VERSION
# ${PROJECT_DIR}/values-emrcontainers-controller.yaml
# https://github.com/aws-controllers-k8s/emrcontainers-controller/blob/main/helm/values.yaml
#
deployment:
nodeSelector:
kubernetes.io/arch: amd64
serviceAccount:
annotations:
eks.amazonaws.com/role-arn: arn:aws:iam::171943046871:role/eks-ridi-data-serviceaccounts
aws:
region: ap-northeast-2
CRD 생성
이제 custom resource를 간단히 배포해 보자. 배포엔 kustomization을 사용한다.
VirtualCluster
---
apiVersion: emrcontainers.services.k8s.aws/v1alpha1
kind: VirtualCluster
metadata:
name: poc
spec:
name: poc
containerProvider:
id: {{ your-eks-cluster-name }}
type_: EKS
info:
eksInfo:
namespace: {{ your-namespace }}
JobRun
아래는 tutorial 예시이며, Spark Structured Streaming 예시는 아래에서 다루겠다.
# ${PROJECT_DIR}/kustomization/jobruns.yaml
apiVersion: emrcontainers.services.k8s.aws/v1alpha1
kind: JobRun
metadata:
name: my-ack-jobrun
spec:
name: my-ack-jobrun
virtualClusterRef:
from:
name: poc
executionRoleARN: {{ your-service-roll-account-arn }}
releaseLabel: "emr-6.7.0-latest"
jobDriver:
sparkSubmitJobDriver:
entryPoint: "local:///usr/lib/spark/examples/src/main/python/pi.py"
entryPointArguments:
sparkSubmitParameters: "--conf spark.executor.instances=2 --conf spark.executor.memory=1G --conf spark.executor.cores=1 --conf spark.driver.cores=1"
configurationOverrides: |
ApplicationConfiguration: null
MonitoringConfiguration:
CloudWatchMonitoringConfiguration:
LogGroupName: "/aws/eks/emr"
LogStreamNamePrefix: "hyesung"
PersistentAppUI: "ENABLED"
S3MonitoringConfiguration:
LogUri: "s3://hyesung/logs"
driver, executor pod 로그는 S3로 받도록 설정하였다. 여기서 주의할 것은, LogStreamNamePrefix를 설정하지 않으면 nill pointer에 대한 참조 에러가 난다. 이는 다음 코드에서 해당 config에 대한 nill check가 제대로 되어있지 않아 발생한 문제이며 추후 수정될 것으로 기대된다.
RBAC
아래 권한이 있어야 aws emr-containers service에서 해당 k8s api server를 통해 namespace에 접근할 수 있게 된다. aws emr-containers service는 service-linked role을 사용하며, 해당 role에 대한 aws-config 설정은 eks cluster 설치 시 설정하였음을 가정한다. 참고로 필자의 경우 terraform eks module을 사용하여 배포하였다.
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: emr-containers
namespace: {{ your-namespace }}
rules:
- apiGroups: [""]
resources: ["namespaces"]
verbs: ["get"]
- apiGroups: [""]
resources: ["serviceaccounts", "services", "configmaps", "events", "pods", "pods/log"]
verbs: ["get", "list", "watch", "describe", "create", "edit", "delete", "deletecollection", "annotate", "patch", "label"]
- apiGroups: [""]
resources: ["secrets"]
verbs: ["create", "patch", "delete", "watch"]
- apiGroups: ["apps"]
resources: ["statefulsets", "deployments"]
verbs: ["get", "list", "watch", "describe", "create", "edit", "delete", "annotate", "patch", "label"]
- apiGroups: ["batch"]
resources: ["jobs"]
verbs: ["get", "list", "watch", "describe", "create", "edit", "delete", "annotate", "patch", "label"]
- apiGroups: ["extensions", "networking.k8s.io"]
resources: ["ingresses"]
verbs: ["get", "list", "watch", "describe", "create", "edit", "delete", "annotate", "patch", "label"]
- apiGroups: ["rbac.authorization.k8s.io"]
resources: ["roles", "rolebindings"]
verbs: ["get", "list", "watch", "describe", "create", "edit", "delete", "deletecollection", "annotate", "patch", "label"]
- apiGroups: [""]
resources: ["persistentvolumeclaims"]
verbs: ["get", "list", "watch", "describe", "create", "edit", "delete", "annotate", "patch", "label"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: emr-containers
namespace: {{ your-namespace }}
subjects:
- kind: User
name: emr-containers
apiGroup: rbac.authorization.k8s.io
roleRef:
kind: Role
name: emr-containers
apiGroup: rbac.authorization.k8s.io
kustomization
# kustomization/kustomization.yaml
resources:
- virtualclusters.yaml
- jobruns.yaml
- rbac.yaml
아래 명령어를 통해 배포하고 나면 EMR on EKS console과 k9s console 모두에서 확인할 수 있다.
kubectl apply -f kustomization/kustomization.yaml
Helm Chart 를 사용한 production 배포
이제 PoC를 마쳤으니 프로덕션 환경 배포를 위한 helm chart를 작성해 보자.
우선 프로젝트 경로에서 아래 명령어를 실행하여 Chart를 초기화한다.
$ helm create mychart
그러면 왼쪽과 같은 구조의 프로젝트가 생성된다. 그리고 templates 디렉토리 내 모든 파일을 지우고 values.yaml 파일 내용을 초기화해주면 준비 완료.
templates 폴더 아래에 위 kustomization 예시에서 사용한 두 개의 파일을 생성한다. 다만, 이때는 template + values 랜더링을 통해 코드를 재사용하여 복수의 application을 배포할 것이기에 helm chart template을 사용한 차이점이 있다. kustomization에서는 이와 유사한 요구사항을 patch 를 사용하여 해결 할 수 있다.
values.yaml
기본적으로 사용할 values.yaml 값이다. 우선 모두 default null로 설정하고, 각 환경별 values-{{site}}.yaml 에서 이를 명시하여 사용하도록 했다.
# Default values for data-streaming.
# This is a YAML-formatted file.
# Declare variables to be passed into your templates.
image:
repository: 801714584815.dkr.ecr.ap-northeast-2.amazonaws.com/data-etl
tag: ""
env: ""
emrReleaseLabel: "emr-6.10.0-20230421" # refer to https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-release-6x.html
emrBucket: ""
s3DeploymentURI: ""
s3LogURI: ""
aws:
region: ""
virtualClusters:
- containerProvider:
id: ""
name: ""
namespace: ""
targetNamespace: ""
- containerProvider:
id: ""
name: ""
namespace: ""
targetNamespace: ""
jobRuns:
- name: "" # must be less than 64 characters
appFile: ""
tableName: ""
spark:
driver:
cores: ""
memory: ""
instanceType: ""
envs: []
secrets: []
executor:
cores: ""
memory: ""
numInstances: ""
instanceType: ""
jobRunExecutionRoleARN: ""
jars: "" # comma separated list of jars to be added to the classpath of the driver and executor
volume:
enabled: false
prometheus:
enabled: false
templates/virtualclusters.yaml
{{- range .Values.virtualClusters }}
---
apiVersion: emrcontainers.services.k8s.aws/v1alpha1
kind: VirtualCluster
metadata:
name: {{ .name }}
namespace: {{ default "default" $.Release.Namespace }}
spec:
name: {{ .name }}
containerProvider:
id: {{ .containerProvider.id }}
type_: EKS
info:
eksInfo:
namespace: {{ .targetNamespace }}
{{- end }}
templates/jobruns.yaml
entrypoint로 사용할 python file은 remote storage인 s3에 두고 사용하고 있다. emr-containers는 job-submitter, drvier-pod, executor-pod 3가지 컴포넌트를 실행한다. 이때 job-submitter는 원격 파일을 로컬로 다운받아 client 모드로 spark-submit을 하는 역할을 한다. 다음 영상을 참고하면 도움이 된다.
{{- $remoteDeploymentURI := .Values.s3DeploymentURI }}
{{- $region := .Values.aws.region }}
{{- range .Values.jobRuns }}
---
apiVersion: emrcontainers.services.k8s.aws/v1alpha1
kind: JobRun
metadata:
name: {{ .name }}
namespace: {{ default "default" $.Release.Namespace }}
spec:
name: {{ .name }}
virtualClusterRef:
from:
name: {{ .virtualClusterName | quote }}
executionRoleARN: {{ $.Values.jobRunExecutionRoleARN | quote }}
releaseLabel: {{ $.Values.emrReleaseLabel | quote }}
jobDriver:
sparkSubmitJobDriver:
{{- if .appFile }}
entryPoint: "{{ $remoteDeploymentURI }}/{{ .appFile }}"
{{- else }}
entryPoint: "{{ $remoteDeploymentURI }}/{{ my-default-py-file(지워도 무방) }}"
{{- end }}
entryPointArguments:
- "--database"
- "stage"
- "--table-name"
- {{ .tableName | quote }}
sparkSubmitParameters: "--py-files {{ $remoteDeploymentURI }}/src.zip --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 --jars https://repo1.maven.org/maven2/io/delta/delta-core_2.12/2.1.1/delta-core_2.12-2.1.1.jar,https://repo1.maven.org/maven2/io/delta/delta-storage/2.1.1/delta-storage-2.1.1.jar,https://repo1.maven.org/maven2/org/apache/spark/spark-avro_2.12/3.3.0/spark-avro_2.12-3.3.0.jar --conf spark.driver.cores={{ .spark.driver.cores | default "1" }} --conf spark.driver.memory={{ .spark.driver.memory | default "1g" }} --conf spark.executor.cores={{ .spark.executor.cores | default "1" }} --conf spark.executor.memory={{ .spark.executor.memory | default "1g" }} --conf spark.executor.instances={{ .spark.executor.numInstances | default "1" }} --conf spark.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory --conf spark.hive.metastore.glue.catalogid=697122891294 --conf spark.hadoop.fs.s3.canned.acl=BucketOwnerFullControl"
configurationOverrides: |
ApplicationConfiguration:
- Classification: spark-defaults
Properties:
spark.dynamicAllocation.enabled: "false"
spark.executor.processTreeMetrics.enabled: "true"
spark.kubernetes.container.image: "{{ $.Values.image.repository }}:{{ $.Values.image.tag }}"
spark.kubernetes.driver.annotation.prometheus.io/path: "/metrics/executors/prometheus/"
spark.kubernetes.driver.annotation.prometheus.io/port: "4040"
spark.kubernetes.driver.annotation.prometheus.io/scrape: "true"
spark.kubernetes.driver.node.selector.karpenter.sh/capacity-type: "on-demand"
spark.kubernetes.driver.node.selector.node.kubernetes.io/instance-type: {{ .spark.driver.instanceType | default "c6g.xlarge" | quote }}
{{- range $secretKey, $valueFrom := .spark.driver.secrets }}
spark.kubernetes.driver.secretKeyRef.{{ $secretKey }}: {{ $valueFrom | quote }}
{{- end }}
{{- range $envName, $value := .spark.driver.envs }}
spark.kubernetes.driverEnv.{{ $envName }}: {{ $value | quote }}
{{- end }}
spark.kubernetes.driverEnv.SITE: {{ $.Values.env | quote }}
spark.kubernetes.executor.node.selector.karpenter.sh/capacity-type: "spot"
spark.kubernetes.executor.node.selector.node.kubernetes.io/instance-type: {{ .spark.executor.instanceType | default "c6g.xlarge" | quote }}
spark.kubernetes.executor.podNamePrefix: {{ .name | quote }}
{{- if $.Values.volume.enabled }}
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-spill.mount.path: "/var/data/spill"
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-spill.mount.readOnly: "false"
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-spill.options.claimName: "OnDemand"
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-spill.options.sizeLimit: {{ $.Values.volume.size | quote }}
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-spill.options.storageClass: {{ $.Values.volume.storageClass | quote }}
{{- end }}
spark.kubernetes.node.selector.kubernetes.io/arch: "arm64"
spark.kubernetes.node.selector.kubernetes.io/os: "linux"
spark.kubernetes.node.selector.topology.kubernetes.io/zone: {{ $region | quote }}
{{- if $.Values.prometheus.enabled }}
spark.metrics.conf.*.sink.prometheusServlet.class: "org.apache.spark.metrics.sink.PrometheusServlet"
spark.metrics.conf.*.sink.prometheusServlet.path: "/metrics/driver/prometheus/"
spark.metrics.conf.applications.sink.prometheusServlet.path: "/metrics/applications/prometheus/"
spark.metrics.conf.master.sink.prometheusServlet.path: "/metrics/master/prometheus/"
spark.ui.prometheus.enabled: {{ $.Values.prometheus.enabled | quote }}
{{- end }}
- Classification: emr-job-submitter
Properties:
jobsubmitter.node.selector.karpenter.sh/capacity-type: "on-demand"
jobsubmitter.node.selector.node.kubernetes.io/instance-type: "c6g.xlarge"
jobsubmitter.node.selector.topology.kubernetes.io/zone: {{ $region | quote }}
MonitoringConfiguration:
CloudWatchMonitoringConfiguration:
LogGroupName: "/aws/eks/emr"
LogStreamNamePrefix: "hyesung"
PersistentAppUI: "ENABLED"
S3MonitoringConfiguration:
LogUri: {{ $.Values.s3LogURI | quote }}
{{- end }}
실제 팀에선 common 이라는 name의 chart를 정의하고 이를 subchart로 다운받아 사용하고 있다. 해당 Chart는 name, namespace에 대한 규격화, 공통 annotation 및 label 를 정의하는 목적으로 사용하고 있다. 아래는 간단한 예시이다.
# mychart/Chart.yaml
apiVersion: v2
appVersion: 0.1.0
description: A Helm chart for Kubernetes
name: poc
type: application
version: 0.1.0
dependencies:
- name: common
version: 1.x.x
repository: file://../common
# templates/jobruns.yaml
apiVersion: emrcontainers.services.k8s.aws/v1alpha1
kind: JobRun
metadata:
name: {{ printf "%s-%s" (include "common.names.deploymentName" $) (.tableName) | trunc 63 | replace "_" "-" | trimSuffix "-" }}
namespace: {{ include "common.names.namespace" $ | quote }}
labels: {{- include "common.labels.standard" $ | nindent 4 }}
{{- if $.Values.commonLabels }}
{{- include "common.tplvalues.render" ( dict "value" $.Values.commonLabels "context" $ ) | nindent 4 }}
{{- end }}
{{- if $.Values.commonAnnotations }}
annotations: {{- include "common.tplvalues.render" ( dict "value" $.Values.commonAnnotations "context" $ ) | nindent 4 }}
{{- end }}
... 이하 생략 ..
이상으로 ack-emrcontainers-controller를 배포하고 helm chart를 정의하여 eks cluster에 spark structured streaming application을 배포하는 방법에 대해 다뤄보았다. 배포한 애플리케이션은 datadog, prometheus 등을 통해 메트릭을 수집하여 grafana로 모니터링하고 있고, 각 플랫폼별 alert manager 등을 사용하여 slack으로 alert를 받아보고 있다.
+ CD로 사용한 argocd는 각자 다른 방식을 사용할 수 있다 생각하여 생략하였고(e.g. gitops repo에 merge되면 github actions에서 apply를 한다던가..?), 소스코드는 개인 github에 정리하여 올릴 예정입니다.
긴 글 읽어주셔서 감사합니다.
'Data Engineering > Apache Spark' 카테고리의 다른 글
Bloom Filter 를 사용해봅시다 [1] python, pyspark bloom filter 구현 (0) | 2024.06.17 |
---|---|
Spark Structured Streaming API의 Kafka Integration 옵션에 대한 이해 (0) | 2022.05.31 |
AWS EMR: EMRFS의 핵심 기능 들 feat. consistent view, S3-optimized committer (0) | 2022.04.28 |
Long running Spark Job Problem: NodeManager is unhealthy (0) | 2022.04.27 |
Pyspark 도입 후 고도화하기/ 4. Optimization feat. spark-default.conf (0) | 2021.11.02 |