일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
- redis bloom filter
- spark
- 하둡에코시스템
- Data engineering
- kafka
- AWS SageMaker
- hadoop
- dataengineer
- Python
- 빅데이터
- 데이터엔지니어링
- BigData
- apache spark
- cloudera
- DataEngineering
- 빅데이터플랫폼
- 블로그
- 개발자혜성
- pyspark
- kubernetes
- 개발자
- 추천시스템
- 데이터엔지니어
- Spark structured streaming
- 클라우데라
- 하둡
- mlops
- Terraform
- eks
- recommendation system
- Today
- Total
Hyesung Oh
PyAthena를 사용한 AWS Athena cross account access feat. assume role chaining 본문
PyAthena를 사용한 AWS Athena cross account access feat. assume role chaining
혜성 Hyesung 2023. 7. 22. 14:13서론
사내 인프라는 개발 환경별로 별도 AWS 계정으로 운영중이고, 만찬가지로 팀에서 운영중인 데이터 인프라 또한 별도 AWS 계정으로 분리되어있다.
Datalake로 부르는 S3는 운영환경 계정에 존재하며, 이는 빅데이터 특성상 방대한 양의 데이터를 환경별로 관리하는데 드는 비용과 데이터 저장 비용을 고려했을 때 합리적인 선택지였다.
하지만 이로 인해 인프라적인 복잡도가 다소 올라가긴하였다. Batch, Streaming workload는 모두 EKS위에서 동작 중이지만, RDBS와 S3, Athena 등의 데이터 소스는 모두 운영환경에만 존재하기 때문이다. 이 과정에서의 문제 해결경험들을 정리해보려한다.
상황 이해
상황은 아래 그림과 같다.
그림에선 생략했지만 Application은 Data 환경의 EKS Cluster 위에서 동작중이며, AWS Resource에 접근하기 위해 필요한 권한과 연결된 ServiceAccount를 사용하고있다.
AWS IAM with ServiceAccount 개념은 AWS 환경 뉴비들에겐 다소 생소한 내용일 수 있어 부연설명을 남긴다.
1. Production환경에는 Athena에 접근할 수 있는 A라는 Role이 정의되어있다.
2. Data 환경에는 A in production Role을 assume 할 수 있는 권한을 가진 B라는 Role을 정의한다.
3. Application은 B Role과 연결된 ServiceAccount를 사용한다.
관련하여 더 자세한 내용은 아래 포스트 참고
https://surgach.tistory.com/119
이와 같은 상황을 assume role chaining이라 한다.
https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_terms-and-concepts.html
문제 상황
그리고 여기서 발생한 문제는 아래 세 가지이다.
1. Cross Account로 Athena 접근시 NAT Gateway를 통해 나감으로 대용량 테이블 스캔시 네트워크 과금 이슈
2. pyathena.connection.Connection 클래스에서 role_arn + endpoint_url 파라미터 사용시 invalid parameter error 발생
3. assume role chaing의 경우 session의 유효시간이 1시간이 최대
1. 네트워크 과금 이슈 해결
이는 athena endpoint 설정을 통해 간단히 해결할 수 있다. 운영환경과 데이터 환경은 transit gateway를 통한 vpc peering이 되어있는 상황이었기 때문에, 운영환경의 athena에 endpoint 설정해주고 이를 connection에 사용하면 간단히 해결되었다.
def _connect():
return _Connection(
s3_staging_dir=configs.output_location,
work_group=configs.workgroup,
region_name=configs.region,
role_arn=configs.role_arn,
endpoint_url=configs.endpoint_url,
)
데이터 환경의 어플리케이션의 요청은 transitgateway -> eni in private subnet -> athena로 요청이 가는 구조이다 . vpc endpoint에 대한 보다 자세한 내용은 아래 문서를 참고 바란다.
https://docs.aws.amazon.com/athena/latest/ug/interface-vpc-endpoint.html
2. invalid parameter error 해결
기존 endpoint_url을 사용하기 전에는 문제가 없었는데, endpoint_url을 설정하고는 Connection 객체 초기화 단계에 assume_role하는 과정에서 문제가 생겼다. 아래는 Connection class의 __init__ 함수 일부이다.
if role_arn:
creds = self._assume_role(
self.profile_name,
self.region_name,
role_arn,
role_session_name,
duration_seconds,
)
self.profile_name = None
self._kwargs.update(
{
"aws_access_key_id": creds["AccessKeyId"],
"aws_secret_access_key": creds["SecretAccessKey"],
"aws_session_token": creds["SessionToken"],
}
)
_assume_role은 아래와 같다.
def _assume_role(
self, profile_name, region_name, role_arn, role_session_name, duration_seconds
):
# MFA is not supported. If you want to use MFA, create a configuration file.
# https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html#assume-role-provider
session = Session(profile_name=profile_name, **self._session_kwargs)
client = session.client("sts", region_name=region_name, **self._client_kwargs)
response = client.assume_role(
RoleArn=role_arn,
RoleSessionName=role_session_name,
DurationSeconds=duration_seconds,
)
return response["Credentials"]
문제는 세번째줄 boto3 client의 session을 초기화 하는 과정이었다. 세번째 인자로 넘겨주는 _client_kwargs는 아래와 같이 정의되어있는데,
_CLIENT_PASSING_ARGS = [
"aws_access_key_id",
"aws_secret_access_key",
"aws_session_token",
"config",
"api_version",
"use_ssl",
"verify",
"endpoint_url",
]
여기서 새롭게 추가한 endpoint_url 값을 session.client에서 지원하지 않기 때문에 발생한 문제이다.
def client(
self,
service_name,
region_name=None,
api_version=None,
use_ssl=True,
verify=None,
endpoint_url=None,
aws_access_key_id=None,
aws_secret_access_key=None,
aws_session_token=None,
config=None,
):
이 또한 아래와 같이 Connection 클래스를 상속하여 _assume_role 함수를 overriding 하여 간단히 해결할 수 있었다.
class _Connection(pa_conn.Connection):
def _assume_role(self, profile_name, region_name, role_arn, role_session_name, duration_seconds):
# MFA is not supported. If you want to use MFA, create a configuration file.
# https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html#assume-role-provider
session = boto3_session.Session(profile_name=profile_name, **self._session_kwargs)
# boto3_session.Session() doesn't support passing in client_kwargs parameter.
client = session.client("sts", region_name=region_name)
response = client.assume_role(
RoleArn=role_arn,
RoleSessionName=role_session_name,
DurationSeconds=duration_seconds,
)
return response["Credentials"]
3. assume role chaining의 경우 세션 유효시간 최대 1시간 문제 해결
아래와 같이 cursor를 받아와 데이터를 yield하는 Iterator가 있고, 반복문을 돌며 chunk만큼의 데이터를 특정 Sink로 write를 하는 작업이 있다고 할 때, 데이터가 많은 경우 1시간을 넘기는 경우가 많았다.
cursor = athena.cursor()
cursor = cursor.execute(select())
for row in cursor:
sink.write(row)
이를 해결 하기 위해, 반복문을 돌며 1시간이 초과했을 경우 cursor를 초기화해주도록 수정하였다.
def get_fresh_cursor(cursor_: pa_cursor.Cursor = None):
logger.info("get fresh token")
fresh_conn = _connect()
fresh_cursor = fresh_conn.cursor()
if cursor_ is not None:
# pylint:disable=protected-access
fresh_cursor._query_id = cursor_._query_id
fresh_cursor._result_set = cursor_._result_set
fresh_cursor._result_set._connection = fresh_conn
logger.info(f"continue on query {fresh_cursor._query_id}")
return fresh_cursor
athena client는 query를 execution 하고 query_id (식별자) 를 받아온다. 그리고 이 식별자를 다시 요청하여 쿼리 결과를 순차적으로 fetch 해오는데, 매 요청마다 다음에 읽어야할 next cursor 정보를 가지고 있다. 따라서 새롭게 생성한 cursor에 이전 old cursor의 해당 정보들을 그대로 복사해와야한다. 안그러면 cursor를 새롭게 생성하는 의미가 없기 때문이다.
사실, 이렇게 까지 해야하나 싶은 순간도 있었지만 문제 해결과정에서 꽤나 재밌는 경험들을 하여 포스트까지 남기게 되었다.
이상으로 긴글 읽어주셔서 감사합니다.
'Cloud > AWS' 카테고리의 다른 글
Aurora3.0 Mysql 8.0 TempTable engine 동작 이해 (0) | 2024.03.30 |
---|---|
[AWS/cloud] 왕초보도 따라하는 AWS EC2 ubuntu 인스턴스 생성 및 Elastic Beanstalk 이용한 Django 웹 어플리케이션 배포하기 (0) | 2019.12.26 |