Hyesung Oh

Spark Structured Streaming API의 Kafka Integration 옵션에 대한 이해 본문

Data Engineering/Apache Spark

Spark Structured Streaming API의 Kafka Integration 옵션에 대한 이해

혜성 Hyesung 2022. 5. 31. 08:51
반응형

Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher) 기준 내용이며, 실무에서 Kafka를 컨슈밍하는 Spark 스트리밍 어플리케이션을 운영하며 나름대로 적용하고 실험해본 옵션들에 대한 본인만의 노트인 점 참고바란다.


kafka.group.id (default None)
kafka consumer는 항상 하나의 consumer group에 속하게 되는데 이에 대한 식별자이다. Spark 내부적으로 스트리밍 쿼리마다 default generation 대상이며, 이를 특별히 지정하고자 하는 경우 사용한다(예를 들어 Kafka group-based authorization). 하지만 이를 사용하게 되면 batch, streaming 등의 concurrently 실행되는 쿼리간의 간섭으로 인해 예상치 못한 데이터 읽기 동작 결과를 초래할 수 있으니 유의해서 사용해야하며 sesssion timeout 과 같은 값을 매우 작게 설정하는게 하나의 팁이 될 수 있다.

startingTimestamp (default None)
컨슈머에서 읽기 지정한 토픽의 모든 파티션에서 특정 Timestamp와 매치되는 offset부터 읽어오게 된다. 단, 매칭되는 offset이 없을 시 startingOffsetsByTimestampStrategy 설정 값을 따르게 된다. startingOffsetsByTimestampStrategy default 값은 error이다. default None이며 문서상 next preference는 startingOffsetsByTimestamp이다. (startingOffsetsByTimestamp의 precedence라는 의미와 동일)

startingOffsetsByTimestamp (default None)
startingTimestamp 옵션과 차이점은 토픽의 특정 파티션 단위로 지정할 수 있는 Timetstamp offset 값이다. 마찬가지로 매칭되는 offset이 없을시 startingOffsetsByTimestampStrategy에 따라 동작하게 된다. 바로 다음으로 다룰 startingOffsets 옵션의 precedence이다.

startingOffsets (default 'latest' for streaming, 'earliest' for batch)
TopicPartition 단위로 지정할 수 있으며 latest, earliest는 말그대로를 의미한다. 이는 json 스트링으로 지정할 수도 있고 -2=earliest, -1=latest를 의미한다. 단, latest는 batch query의 경우 not allowed 옵션이다. 
예시: """ {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}} """
중요한 점은, 이 옵션은 query가 새롭게 시작될 때만 적용된다는 점이다. 알다시피 Spark Strutured Streaming은 Topic Offest을 자체적인 Checkpoint로 관리하며 우리의 경우 저장소로 S3를 사용하고 있다.
이는 reconsuming 시에 가장 첫 번째로 최근 offest 조회 대상이다. 따라서, startingOffsets latest는 query가 처음으로 시작 되는 경우에만 적용되며 (checkpoint location empty) 컨슈밍 동안 최초로 발견된 partition (증설된)에 대해선 무조건 earliest를 취한다.

*Checkpoint location은 Recovery from failure 및 Fault tolerance를 위한 중요한 설정이기에 공식 문서를 참고하길 바란다.

위와 같은 맥락으로 endingTimestamp, endingOffsetsByTimestamp, endingOffsets도 설정할 수 있으니 문서를 참고 바란다.

failOnDataLoss (default true)
Checkpoint location에 마지막으로 남은 offset 이후 부터 쿼리를 재개하려 하지만 해당 offest 데이터의 TTL이 지났거나 또는 장애로 인해 Kafka 브로커의 offset이 유실된 경우 failOnDataLoss에 따라 동작하며 default true이므로 실패하게 된다.

이와 비슷한 맥락으로 위에서 startingOffset과 같은 옵션으로 지정해준 offset이 없는 경우 startingOffsetsByTimestampStrategy (default error)에 따라 동작한다고 했다. 하지만 startingOffest, startingOffsetsByTimestampStrategy는 쿼리 최초 실행 시 적용되는 옵션인 것과 반대로 failOnDataLoss는 쿼리 resume의 경우만 적용되는 옵션이다.

이해를 돕기 위해 하나의 예시를 들어보자.
startingOffest="""{"topciA": {"0":23}}""", startingOffsetsByTimestampStrategy=latest, failOnDataLoss=true로 세팅하고 쿼리를 최초 실행했고 만약 topicA의 0번 파티션에 23번(편의상 23번으로 표기하였으나 timestamp를 의미) offset이 없는 경우 결과는 어떻게 될까?
정답은, error 없이 해당 파티션의 latest offset 부터 컨슈밍하게 되며 이후 checkpoint location에 commit된 offset에 따라 컨슈밍을 이어나가게 된다. 

maxOffsetsPerTrigger (default None)
컨슈밍할 offset range에 대한 제한이며 rate limit과 동일하게 이해하면 된다. 이는 pull 방식의 카프카 컨슈머를 안정적으로 운영할 수 있게 해주는 옵션이라 유용하게 사용하고 있다. 하지만 이 값이 producing 되는 데이터에 비해 현저하게 낮을 경우, 컨슈머 lag가 지속적으로 증가하게 되고 offset의 TTL 설정 값에 따라 데이터 유실이 일어날 수 있게 되니 모니터링하며 그에 맞게 튜닝해주는 것이 바람직하다. 설정한 값은 컨슈밍 대상 파티션에 고루 분배되게 된다. 자세한 분배 알고리즘이 궁금하며 Spark Structured Streaming 소스코드를 확인해볼 예정이다.

maxTriggerDelay (default 15m)
maxOffsetsPerTrigger가 설정되었을 때 함께 사용되는 옵션 중에 하나이다. trigger interval이 10분으로 설정했다고 가정하자. 만약 직전 마이크로 배치의 프로세싱 타임이 10분을 초과하여 12분이 걸리면 Delay는 2분이다. default 15m이므로 최대 15분을 기다리고 다음 trigger를 작동하여 마이크로 배치를 시작한다. 당연히 이 과정이 누적되게 되면 Spark 클러스터 노드에 부하가 생길 것이며 장애상황으로 이어질 것이다. 이와 같은 상황을 방지하기 위해 개인적으로는 maxOffsetsPerTrigger를 조금 여유있게 (작게) 잡는 것이 좋다고 생각한다.

minPartitions (default None)
default None이라고 해서 partition을 컨슈밍하지 않는다고 생각하지 않길 바란다. 개인적으로 Spark Structured Streaming으로 Kafka Integration 작업 과정에서 제일 중요하다 생각하며, 가장 첫 번째로 궁금했던 사항이기도 하여 포스팅 도입부에 링크한 원문을 인용하겠다.

 By default, Spark has a 1-1 mapping of topicPartitions to Spark partitions consuming from Kafka. If you set this option to a value greater than your topicPartitions, Spark will divvy up large Kafka partitions to smaller pieces. Please note that this configuration is like a hint: the number of Spark tasks will be approximately minPartitions. It can be less or more depending on rounding errors or Kafka partitions that didn't receive any new data.

 

위에서 설명한 옵션들과는 달리 Spark Structured Streaming에서 지정할 수 없는 옵션들이 있다. 처음 Kafka Integration 작업을 진행하며 다소 혼란을 야기했던 부분이기도 하여 따로 첨부해 보았다. 

Kafka Specific Configurations (출처)

  • group.id: Kafka source will create a unique group id for each query automatically. The user can set the prefix of the automatically generated group.id’s via the optional source option groupIdPrefix, default value is “spark-kafka-source”. You can also set “kafka.group.id” to force Spark to use a special group id, however, please read warnings for this option and use it with caution.
  • auto.offset.reset: Set the source option startingOffsets to specify where to start instead. Structured Streaming manages which offsets are consumed internally, rather than rely on the kafka Consumer to do it. This will ensure that no data is missed when new topics/partitions are dynamically subscribed. Note that startingOffsets only applies when a new streaming query is started, and that resuming will always pick up from where the query left off. Note that when the offsets consumed by a streaming application no longer exist in Kafka (e.g., topics are deleted, offsets are out of range, or offsets are removed after retention period), the offsets will not be reset and the streaming application will see data loss. In extreme cases, for example the throughput of the streaming application cannot catch up the retention speed of Kafka, the input rows of a batch might be gradually reduced until zero when the offset ranges of the batch are completely not in Kafka. Enabling failOnDataLoss option can ask Structured Streaming to fail the query for such cases.
  • key.deserializer: Keys are always deserialized as byte arrays with ByteArrayDeserializer. Use DataFrame operations to explicitly deserialize the keys.
  • value.deserializer: Values are always deserialized as byte arrays with ByteArrayDeserializer. Use DataFrame operations to explicitly deserialize the values.
  • key.serializer: Keys are always serialized with ByteArraySerializer or StringSerializer. Use DataFrame operations to explicitly serialize the keys into either strings or byte arrays.
  • value.serializer: values are always serialized with ByteArraySerializer or StringSerializer. Use DataFrame operations to explicitly serialize the values into either strings or byte arrays.
  • enable.auto.commit: Kafka source doesn’t commit any offset.
  • interceptor.classes: Kafka source always read keys and values as byte arrays. It’s not safe to use ConsumerInterceptor as it may break the query.

나의 경우 auto.offset.reset 를 지정할 수 없다는 부분이 처음에 다소 의아했던 부분이기도 하나 지금은 잘이해가 된다.

Structured Streaming manages which offsets are consumed internally, rather than rely on the kafka Consumer to do it. This will ensure that no data is missed when new topics/partitions are dynamically subscribed

Kafka Consumer에 의존하기 보단 Checkpoint location을 통한 Structured Streaming의 자체적인 offset 관리를 통해 data missing을 strict하게 관리, exactly once를 보장하기 위함으로 이해가 된다.

Note that startingOffsets only applies when a new streaming query is started, and that resuming will always pick up from where the query left off. Note that when the offsets consumed by a streaming application no longer exist in Kafka (e.g., topics are deleted, offsets are out of range, or offsets are removed after retention period), the offsets will not be reset and the streaming application will see data loss

위에서 설명한 failOnDataLoss 값이 default true이므로 컨슈밍 lag를 따라잡지 못할 시 설정된 TTL에 따라 언젠가 streaming application이 실패 후 종료 될 수 있었다. 이는 운영환경에서는 당연히 모니터링 및 관리되어야 할 대상이므로 바람직한 동작이라 생각된다.
그런데 테스트나 스테이지 환경에서 간헐적으로 실행 시 Checkpoint location에 commit된 가장 최근 offset은 당연히 Kafka broker에서 사라진 경우가 대부분이므로 (TTL), failOnDataLoss false로 설정하지 않으면 어플리케이션이 실패하게 된다. 이는 SITE 환경 변수에 따라 failOnDataLoss 값을 다르게 설정하도록 하여 간단히 해결할 수 있었다. 

 

더 많고 디테일한 내용은 아래 링크를 참고
- https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#kafka-specific-configurations

 

Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher) - Spark 3.2.1 Documentation

Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher) Structured Streaming integration for Kafka 0.10 to read data from and write data to Kafka. Linking For Scala/Java applications using SBT/Maven project definitions, link

spark.apache.org

- https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

반응형
Comments