Hyesung Oh

PyAthena를 사용한 AWS Athena cross account access feat. assume role chaining 본문

Cloud/AWS

PyAthena를 사용한 AWS Athena cross account access feat. assume role chaining

혜성 Hyesung 2023. 7. 22. 14:13
반응형

서론

사내 인프라는 개발 환경별로 별도 AWS 계정으로 운영중이고, 만찬가지로 팀에서 운영중인 데이터 인프라 또한 별도 AWS 계정으로 분리되어있다.  
Datalake로 부르는 S3는 운영환경 계정에 존재하며, 이는 빅데이터 특성상 방대한 양의 데이터를 환경별로 관리하는데 드는 비용과 데이터 저장 비용을 고려했을 때 합리적인 선택지였다.
하지만 이로 인해 인프라적인 복잡도가 다소 올라가긴하였다. Batch, Streaming workload는 모두 EKS위에서 동작 중이지만, RDBS와 S3, Athena 등의 데이터 소스는 모두 운영환경에만 존재하기 때문이다. 이 과정에서의 문제 해결경험들을 정리해보려한다.


상황 이해

상황은 아래 그림과 같다.

그림에선 생략했지만 Application은 Data 환경의 EKS Cluster 위에서 동작중이며, AWS Resource에 접근하기 위해 필요한 권한과 연결된 ServiceAccount를 사용하고있다.

AWS IAM with ServiceAccount 개념은 AWS 환경 뉴비들에겐 다소 생소한 내용일 수 있어 부연설명을 남긴다. 
1. Production환경에는 Athena에 접근할 수 있는 A라는 Role이 정의되어있다.
2. Data 환경에는 A in production Role을 assume 할 수 있는 권한을 가진 B라는 Role을 정의한다.
3. Application은 B Role과 연결된 ServiceAccount를 사용한다. 

관련하여 더 자세한 내용은 아래 포스트 참고
https://surgach.tistory.com/119

 

AWS EKS의 RBAC, IRSA 딥다이브

EKS의 권한 제어 관련해서 궁금했던게 많았던 터라 공부하면서 내 나름대로 이해한 내용을 정리해보았다. RBAC (role based access control) K&8에서 정의할 수 있는 리소스 객체들을 이용하여 접근 제어를

surgach.tistory.com

 

이와 같은 상황을 assume role chaining이라 한다.
https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_terms-and-concepts.html

 

Roles terms and concepts - AWS Identity and Access Management

If you are already using a service when it begins supporting service-linked roles, you might receive an email announcing a new role in your account. In this case, the service automatically created the service-linked role in your account. You don't need to

docs.aws.amazon.com


문제 상황

그리고 여기서 발생한 문제는 아래 세 가지이다. 

1. Cross Account로 Athena 접근시 NAT Gateway를 통해 나감으로 대용량 테이블 스캔시 네트워크 과금 이슈
2. pyathena.connection.Connection 클래스에서 role_arn + endpoint_url 파라미터 사용시 invalid parameter error 발생
3. assume role chaing의 경우 session의 유효시간이 1시간이 최대

1. 네트워크 과금 이슈 해결

이는 athena endpoint 설정을 통해 간단히 해결할 수 있다. 운영환경과 데이터 환경은 transit gateway를 통한 vpc peering이 되어있는 상황이었기 때문에, 운영환경의 athena에 endpoint 설정해주고 이를 connection에 사용하면 간단히 해결되었다.

def _connect():
    return _Connection(
        s3_staging_dir=configs.output_location,
        work_group=configs.workgroup,
        region_name=configs.region,
        role_arn=configs.role_arn,
        endpoint_url=configs.endpoint_url,
    )

데이터 환경의 어플리케이션의 요청은 transitgateway -> eni in private subnet -> athena로 요청이 가는 구조이다 . vpc endpoint에 대한 보다 자세한 내용은 아래 문서를 참고 바란다.
https://docs.aws.amazon.com/athena/latest/ug/interface-vpc-endpoint.html

 

Connect to Amazon Athena using an interface VPC endpoint - Amazon Athena

The following example allows requests by organization identities to organization resources and allows requests by AWS service principals. { "Version": "2012-10-17", "Statement": [ { "Sid": "AllowRequestsByOrgsIdentitiesToOrgsResources", "Effect": "Allow",

docs.aws.amazon.com

2. invalid parameter error 해결

기존 endpoint_url을 사용하기 전에는 문제가 없었는데, endpoint_url을 설정하고는 Connection 객체 초기화 단계에 assume_role하는 과정에서 문제가 생겼다. 아래는 Connection class의 __init__ 함수 일부이다.

if role_arn:
    creds = self._assume_role(
        self.profile_name,
        self.region_name,
        role_arn,
        role_session_name,
        duration_seconds,
    )
    self.profile_name = None
    self._kwargs.update(
        {
            "aws_access_key_id": creds["AccessKeyId"],
            "aws_secret_access_key": creds["SecretAccessKey"],
            "aws_session_token": creds["SessionToken"],
        }
    )

_assume_role은 아래와 같다.

    def _assume_role(
        self, profile_name, region_name, role_arn, role_session_name, duration_seconds
    ):
        # MFA is not supported. If you want to use MFA, create a configuration file.
        # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html#assume-role-provider
        session = Session(profile_name=profile_name, **self._session_kwargs)
        client = session.client("sts", region_name=region_name, **self._client_kwargs)
        response = client.assume_role(
            RoleArn=role_arn,
            RoleSessionName=role_session_name,
            DurationSeconds=duration_seconds,
        )
        return response["Credentials"]

문제는 세번째줄 boto3 client의 session을 초기화 하는 과정이었다. 세번째 인자로 넘겨주는 _client_kwargs는 아래와 같이 정의되어있는데, 

_CLIENT_PASSING_ARGS = [
    "aws_access_key_id",
    "aws_secret_access_key",
    "aws_session_token",
    "config",
    "api_version",
    "use_ssl",
    "verify",
    "endpoint_url",
]

여기서 새롭게 추가한 endpoint_url 값을 session.client에서 지원하지 않기 때문에 발생한 문제이다.

def client(
    self,
    service_name,
    region_name=None,
    api_version=None,
    use_ssl=True,
    verify=None,
    endpoint_url=None,
    aws_access_key_id=None,
    aws_secret_access_key=None,
    aws_session_token=None,
    config=None,
):

이 또한 아래와 같이 Connection 클래스를 상속하여  _assume_role 함수를 overriding 하여 간단히 해결할 수 있었다.

class _Connection(pa_conn.Connection):
    def _assume_role(self, profile_name, region_name, role_arn, role_session_name, duration_seconds):
        # MFA is not supported. If you want to use MFA, create a configuration file.
        # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html#assume-role-provider

        session = boto3_session.Session(profile_name=profile_name, **self._session_kwargs)
        # boto3_session.Session() doesn't support passing in client_kwargs parameter.
        client = session.client("sts", region_name=region_name)
        response = client.assume_role(
            RoleArn=role_arn,
            RoleSessionName=role_session_name,
            DurationSeconds=duration_seconds,
        )
        return response["Credentials"]

 

3. assume role chaining의 경우 세션 유효시간 최대 1시간 문제 해결

아래와 같이 cursor를 받아와 데이터를 yield하는 Iterator가 있고, 반복문을 돌며 chunk만큼의 데이터를 특정 Sink로 write를 하는 작업이 있다고 할 때, 데이터가 많은 경우 1시간을 넘기는 경우가 많았다.

cursor = athena.cursor()
cursor = cursor.execute(select())
for row in cursor:
    sink.write(row)

이를 해결 하기 위해, 반복문을 돌며 1시간이 초과했을 경우 cursor를 초기화해주도록 수정하였다.

def get_fresh_cursor(cursor_: pa_cursor.Cursor = None):
    logger.info("get fresh token")

    fresh_conn = _connect()
    fresh_cursor = fresh_conn.cursor()

    if cursor_ is not None:
        # pylint:disable=protected-access
        fresh_cursor._query_id = cursor_._query_id
        fresh_cursor._result_set = cursor_._result_set
        fresh_cursor._result_set._connection = fresh_conn
        logger.info(f"continue on query {fresh_cursor._query_id}")
    return fresh_cursor

athena client는 query를 execution 하고 query_id (식별자) 를 받아온다. 그리고 이 식별자를 다시 요청하여 쿼리 결과를 순차적으로 fetch 해오는데, 매 요청마다 다음에 읽어야할 next cursor 정보를 가지고 있다. 따라서 새롭게 생성한 cursor에 이전 old cursor의 해당 정보들을 그대로 복사해와야한다. 안그러면 cursor를 새롭게 생성하는 의미가 없기 때문이다.

사실, 이렇게 까지 해야하나 싶은 순간도 있었지만 문제 해결과정에서 꽤나 재밌는 경험들을 하여 포스트까지 남기게 되었다. 

이상으로 긴글 읽어주셔서 감사합니다. 

 

반응형
Comments