Hyesung Oh

Deep Dive - Spark JDBC to Other devices feat. Connection Pool 본문

카테고리 없음

Deep Dive - Spark JDBC to Other devices feat. Connection Pool

혜성 Hyesung 2024. 3. 30. 19:42
반응형

개요

Spark에서는 JDBC api를 통해 접근할 수 있는 datasource(dbms)를 지원합니다.
jdbc datasource를 사용하기 위해선 JDBC interface를 구현한 Driver class가 필요합니다.
*현재 사용 중인 mysql-connector-java-8.0.23을 기준으로 작성했습니다.

spark.read.jdbc option으로 driver class path를 아래와 같이 설정해주면 됩니다.
driver class path: com.mysql.cj.jdbc.Driver

# Read from MySQL Table
df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3306/emp") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .option("dbtable", "employee") \
    .option("user", "root") \
    .option("password", "root") \
    .load()

Mysql Connector/J

https://dev.mysql.com/doc/connector-j/en/connector-j-installing.html

공식문서에서 DriverManager를 사용한 connection 획득 방법에 대해서만 설명하고 있습니다만, JDBC 3.0 interface 에선 javax.sql.DataSource 라는 표준 인터페이스를 제공하여 이를 권장하고 있습니다.
mysql connector/j 에도 MysqlDataSource 가 구현되어 있습니다. 기본적으로 DriverManager와 사용방식은 비슷합니다.
아래는 mysql connector/j의 문서에 짧게 소개된 DataSource 사용법입니다.

To create a Statement instance, you call the createStatement() method on the Connection object you have retrieved using one of the DriverManager.getConnection() or DataSource.getConnection() methods described earlier.

DriverManager → DataSource 장점?

설정값과 소스코드 분리

하지만 차이점이 있습니다. DriverManager의 경우 매 connection 획득시 connection property를 인자로 넘겨야하지만, DataSource는 DataSource 객체 선언시 property를 넘겨주고 생성된 객체의 .getConnection 메소드만 호출해주면 됩니다.

// DriverManager
Connection con1 = DriverManager.getConnection(URL, USERNAME, PASSWORD);

// DataSource
DataSource dataSource = new DataSource(URL, USERNAME, PASSWORD);
Connection con1 = dataSource.getConnection();

따라서 DataSource를 사용하는 방법은 설정(URL, ID, PASSWORD)과 사용이 분리되어 있어서 향후 변경에 더 유연하게 대처할 수 있습니다.

Connection Pooling feat. DBCP

과거에는 mysql connector java(현재 mysql connector/j) 에서는 connection pooling을 지원하지 않았고 connection pool 기능을 위해 별도 dbcp(database connection pool) library를 사용해야 했습니다.(commons-dbcp 동작 방식 관련 여기 참고).

이후 JDBC 3.0 interface 에서 소개된 DataSource를 각 library에서 구현체를 지원하기 시작했고 (e.g. HikariCP는 DataSource interface를 구현한 HikariDataSource를 제공), 현재는 mysql connector/j에서도 MysqlConnectionPoolDataSource 를 자체적으로 지원하고 있습니다.

따라서 아래와 같이 Connection Pool 구현체를 변경하고 싶더라도 어플리케이션 코드는 interface에만 의존하고 있기 때문에 변경없이 DataSource 설정 값만 변경해주면 됩니다.

Spark JDBC

다음으로 Spark에서 이를 어떤식으로 활용하여 data를 dbms로 부터 가져오고 있는지 코드 레벨에서 알아볼 차례입니다.

Sparkr JDBC to Other Databases 문서를 보면 아래와 같이 설명하고있습니다.

Spark SQL also includes a data source that can read data from other databases using JDBC. This functionality should be preferred over using JdbcRDD. This is because the results are returned as a DataFrame and they can easily be processed in Spark SQL or joined with other data sources.

JdbcRDD Class를 직접 사용하지말고 이를 wrapping한 high level interface(DataFrameReader)를 사용하라고 권장하고 있습니다.

spark.read.jdbc 를 호출했을 때 어떤 일이 일어나는지 순서대로 나열해보겠습니다.

  1. spark.read.jdbc에서 JDBCRelation 객체를 생성합니다.
  2. JDBCRelationbuildScan 함수를 호출합니다.
  3. buildScan에서 JdbcRDD.buildScan을 호출합니다.
  4. JdbcDialects.createConnectionFactory 를 호출합니다.

여기까지 알게 된 사실은,

  • spark.read.jdbc는 JdbcRDD wrapper이다.
  • JdbcDialects클래스의 createConnectionFactory에서 connection 관련한 일들이 일어나는 거 같다.

정도 되겠습니다.

이제 본격적으로 JdbcDialects.createConnectionFactory에서 무슨일이 일어나는지 알아봅시다.

@Since("3.3.0")
def createConnectionFactory(options: JDBCOptions): Int => Connection = {
  val driverClass: String = options.driverClass
  (partitionId: Int) => {
    DriverRegistry.register(driverClass)
    val driver: Driver = DriverRegistry.get(driverClass)
    val connection =
      ConnectionProvider.create(driver, options.parameters, options.connectionProviderName)
    require(connection != null,
      s"The driver could not open a JDBC connection. Check the URL: ${options.getRedactUrl()}")
    connection
  }
}

*사전지식: 우선 위에서 설정해준 driverClass 는 spark runtime에 DriverRegistry(repository pattern)에 등록되게 됩니다.

createConnectionFactory는 DriverRegistry에 등록된 driver 객체를 불러오고, ConnectionProvider.create method에 넘겨주면 비로서 우리가 원하는 connection을 획득한다는 사실을 알게 되었습니다.

거의 다 왔습니다. 마지막으로 ConnectionProvider.create는 어떤 역할을 하는지 알아봅시다. Spark JDBC 공식문서에선 connectionProvider 을 지원하며, 별도 지정없을 시 built-in connection provider를 사용한다고 설명하고 있습니다.

There is a built-in connection providers for the following databases:

  • DB2
  • MariaDB
  • MS Sql
  • Oracle
  • PostgreSQL

코드가 다소 길지만 핵심은 간단합니다.

1. loadProviders 에서 JdbcConnectionProvider 를 구현한 Provider class를 load하여 providers에 저장한다.

def loadProviders(): Seq[JdbcConnectionProvider] = {
    val loader = ServiceLoader.load(classOf[JdbcConnectionProvider],
      Utils.getContextOrSparkClassLoader)
    val providers = mutable.ArrayBuffer[JdbcConnectionProvider]()

    val iterator = loader.iterator
    while (iterator.hasNext) {
      try {
        val provider = iterator.next
        logDebug(s"Loaded built-in provider: $provider")
        providers += provider
      } catch {
        case t: Throwable =>
          logError("Failed to load built-in provider.")
          logInfo("Loading of the provider failed with the exception:", t)
      }
    }

2. 구현된 built-in Provider중에 해당 driver를 지원할 수 있는 provider를 filter하고, optional로 넘겨준 connectionProviderName가 있으면 그에 맞는 provider 쓰고, 없으면 filteredProviders (driver 지원가능한 provider)가 1개 이상이면 error, 아니면 첫 번째 provider를 사용한다.

val filteredProviders = providers.filter(_.canHandle(driver, options))

val selectedProvider = connectionProviderName match {
    case Some(providerName) =>
      // It is assumed that no two providers will have the same name
      filteredProviders.find(_.name == providerName).getOrElse {
        throw new IllegalArgumentException(
          s"Could not find a JDBC connection provider with name '$providerName' " +
          "that can handle the specified driver and options. " +
          s"Available providers are ${providers.mkString("[", ", ", "]")}")
      }
    case None =>
      if (filteredProviders.size != 1) {
        throw new IllegalArgumentException(
          "JDBC connection initiated but more than one connection provider was found. Use " +
          s"'${JDBCOptions.JDBC_CONNECTION_PROVIDER}' option to select a specific provider. " +
          s"Found active providers ${filteredProviders.mkString("[", ", ", "]")}")
      }
      filteredProviders.head
  }

3. 최종 선택된 provider의 getConnection method를 호출한다.

selectedProvider.getConnection(driver, options)

built-in ConnectionProvider 구현체들은 여기서 확인 할 수 있습니다.

ConnectionProvider들은 JdbcConnectionProvider를 상속하여 getConnection method를 구현하고 있습니다. 그 중 BasicConnectionProvider를 살펴보겠습니다.

private[jdbc] class BasicConnectionProvider extends JdbcConnectionProvider with Logging {
  /**
   * Additional properties for data connection (Data source property takes precedence).
   */
  def getAdditionalProperties(options: JDBCOptions): Properties = new Properties()

  override val name: String = "basic"

  override def canHandle(driver: Driver, options: Map[String, String]): Boolean = {
    val jdbcOptions = new JDBCOptions(options)
    jdbcOptions.keytab == null || jdbcOptions.principal == null
  }

  override def getConnection(driver: Driver, options: Map[String, String]): Connection = {
    val jdbcOptions = new JDBCOptions(options)
    val properties = getAdditionalProperties(jdbcOptions)
    jdbcOptions.asConnectionProperties.asScala.foreach { case(k, v) =>
      properties.put(k, v)
    }
    logDebug(s"JDBC connection initiated with URL: ${jdbcOptions.getRedactUrl()} " +
      s"and properties: $properties")
    driver.connect(jdbcOptions.url, properties)
  }

  override def modifiesSecurityContext(
    driver: Driver,
    options: Map[String, String]
  ): Boolean = {
    // BasicConnectionProvider is the default unsecure connection provider, so just return false
    false
  }
}

아..! 결국 타고타고 들어가서 driver.connect(jdbcOptions.url, properties)를 호출하게 되는 것이고, ConnectionProvider는 mysql connector/j의 DriverManager or DataSource 를 대신하는 역할이란 것 까지 알게 되었습니다.

그리고 한 가지 더 알게된 중요한 사실은, 왜 Spark가 표준 interface인 DataSource를 아직 지원하지 않는지는 모르겠으나, 현재 Connection Pooling을 지원하지 않는다는 사실입니다.

결과적으로 테이블을 predicates 만큼 쪼개서 병렬로 들고 올 때 각 executor의 Task 마다 connection을 생성하며, retry시에 새로운 connection이 새롭게 생성된다는 사실도 유추할 수 있게 되었습니다.

def jdbc(
    url: String,
    table: String,
    predicates: Array[String],
    connectionProperties: Properties): DataFrame = {
  assertNoSpecifiedSchema("jdbc")
  // connectionProperties should override settings in extraOptions.
  val params = extraOptions ++ connectionProperties.asScala
  val options = new JDBCOptions(url, table, params)
  val parts: Array[Partition] = predicates.zipWithIndex.map { case (part, i) =>
    JDBCPartition(part, i) : Partition
  }
  val relation = JDBCRelation(parts, options)(sparkSession)
  sparkSession.baseRelationToDataFrame(relation)
}

사실 조금 더 생각해보면, Spark는 분산환경에서 실행되는 각 executor 끼리 Pool을 공유할 일이 없으며, 각 executor 들도 1회 성 Job ( > Task) 를 실행하기 때문에 Batch Job에서 Pooled connection은 실효성이 없긴 하겠습니다. 

TroubleShooting

단, Spark는 아래 설정 값 (deafult 4)에 따라 실패한 Task를 retry 하는 mechanism을 가지고 있기 때문에 규모가 큰 Dump Job의 경우

spark.task.maxFailures

운영환경 Database의 client connection request 지표가 과도하게 증가할 수 있습니다. 실제 팀에서 무거운 테이블을 꽤 큰 수의 파티션으로 나누어 덤프를 할 시에, 특정 테이블의 경우 wait/io/socket/client_connection 값이 과도하게 증가하는 이슈가 있었습니다.

이는 Spark executor 당 core 수를 줄이고 core 당 memory를 늘리는 방향으로 간단히 튜닝 후 해결 할 수 있었습니다. 

지금까지 실무에서 겪었던 AWS Aurora MySQL Table Dump시 겪었던 client_connection 이상 증가 현상을 디버깅하는 과정에서 Spark JDBC를 Deep dive 해본 소소한 후기였습니다.

틀린 내용있으면 언제든지 지적 부탁드립니다. 감사합니다.

반응형
Comments