Posts Spring Kafka에서 최신 메시지만 소비하는 Pub/Sub 방식 설정
Post
Cancel

Spring Kafka에서 최신 메시지만 소비하는 Pub/Sub 방식 설정

설명

  • Kafka를 Pub/Sub 방식으로 활용하고 싶을 때,
    특정 토픽의 가장 최신 메시지만 소비하는 구조가 필요하다.
    이 글에서는 Spring Kafka에서
    auto-offset-reset: latest와 seekToEnd() 설정을 조합하여,
    컨슈머가 파티션에 연결되었을 때 항상 최신 메시지만
    소비하도록 설정하는 방법을 소개한다.

auto-offset-reset: latest

  • 컨슈머 그룹이 아직 오프셋을 저장하지 않은 파티션을
    처음 읽을 때 어디서부터 읽을지를 지정한다.
  • latest로 설정하면, 해당 컨슈머 그룹은
    파티션의 가장 마지막 메시지 이후부터 소비를 시작한다.
  • 즉, 기존 메시지는 건너뛰고 새로 들어오는 메시지부터 읽는다.

seekToEnd()

  • 리밸런싱 등으로 컨슈머가 파티션을 할당받을 때마다
    seekToEnd를 실행하여
    이전에 읽던 위치와 상관없이
    현재 파티션의 마지막 메시지 이후부터
    새로 도착하는 메시지만 읽게 된다.

정책

  • Pub/Sub 대상이 되는 토픽을 구독하는 컨슈머 그룹 ID에
    컨테이너 ID가 포함되도록 설정한다.
    (컨슈머 그룹 ID를 동적으로 할당)
  • 이를 통해 새 컨테이너가 생성될 시 자동으로 새 컨슈머 그룹 ID를 갖게 되고
    latest 설정에 의해 최신 메세지만 받게 된다.
  • 컨테이너가 재시작되어 기존 컨슈머 그룹 ID로 Kafka에 재연결되는 경우에도,
    seekToEnd() 덕분에 이전 오프셋이 아닌
    가장 최신 메시지부터 소비를 시작할 수 있다.

application.yml

1
2
3
4
5
6
7
8
9
10
11
12
13
spring:  
kafka:  
  bootstrap-servers: "kafka:9092"  
  producer:  
    key-serializer: org.apache.kafka.common.serialization.StringSerializer  
    value-serializer: org.springframework.kafka.support.serializer.StringSerializer  
  consumer:  
    auto-offset-reset: latest  
    enable-auto-commit: false  
    key-deserializer: org.apache.kafka.common.serialization.StringDeserializer  
    value-deserializer: org.springframework.kafka.support.serializer.StringSerializer  
    properties:  
      spring.json.trusted.packages: "*"  
  • 기본 카프카 설정이지만 Pub/Sub 용 카프카 컨슈머는
    MessageListenerContainer에서
    auto-offset-reset: latest 설정을 직접할 것이다.

SeekToEndReblanaceListener

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package com.example.demo.global.config  
        
import org.apache.kafka.clients.consumer.Consumer  
import org.apache.kafka.common.TopicPartition  
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener  
import org.springframework.stereotype.Component  
        
@Component  
class SeekToEndRebalanceListener: ConsumerAwareRebalanceListener {  
  override fun onPartitionsAssigned(consumer: Consumer<*, *>, partitions: MutableCollection<TopicPartition>) {  
      // 파티션 할당 시, 해당 파티션의 오프셋을 끝으로 이동  
      partitions.forEach { partition ->  
          consumer.seekToEnd(listOf(partition))  
      }  
  }  
}  

KafkaMessageListenerConfig

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package com.example.demo.global.config  
        
import org.apache.kafka.clients.consumer.ConsumerConfig  
import org.apache.kafka.common.serialization.StringDeserializer  
import org.springframework.beans.factory.annotation.Value  
import org.springframework.context.annotation.Configuration  
import org.springframework.kafka.core.DefaultKafkaConsumerFactory  
import org.springframework.kafka.listener.ContainerProperties  
import org.springframework.kafka.listener.KafkaMessageListenerContainer  
import org.springframework.kafka.listener.MessageListener  
        
@Configuration  
class KafkaMessageListenerConfig(  
  private val seekToEndRebalanceListener: SeekToEndRebalanceListener,  
) {  
        
  @Value("\${spring.kafka.bootstrap-servers}")  
  lateinit var bootstrapServers: String  
        
  fun createKafkaMessageListenerContainer(  
      topic: String,  
      groupId: String,  
      messageListener: MessageListener<String, String>  
  ): KafkaMessageListenerContainer<String, String> {  
      val consumerProps = mapOf<String, Any>(  
          ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers,  
          ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,  
          ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,  
          ConsumerConfig.GROUP_ID_CONFIG to groupId,  
          ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest"  
      )  
        
      val consumerFactory = DefaultKafkaConsumerFactory<String, String>(consumerProps)  
        
      // 구독할 토픽을 동적으로 지정  
      val containerProperties = ContainerProperties(topic)  
        
      // 메시지를 수동으로 처리할 Listener  
      containerProperties.messageListener = messageListener  
        
      // 컨슈머 리밸런싱이 일어날 때, 항상 최신 오프셋에서 읽기 시작  
      containerProperties.setConsumerRebalanceListener(seekToEndRebalanceListener)  
        
      return KafkaMessageListenerContainer(consumerFactory, containerProperties)  
  }  
        
}  

프로듀서

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package com.example.demo.kafka.producer  
        
import com.example.demo.constant.KafkaTopic  
import org.springframework.kafka.core.KafkaTemplate  
import org.springframework.stereotype.Service  
        
        
@Service  
class UserProducer(  
  private val kafkaTemplate: KafkaTemplate<String, String>  
) {  
        
  fun send(message: String) {  
      kafkaTemplate.send(KafkaTopic.USER, message)  
  }  
        
}  

상수 토픽

1
2
3
4
5
package com.example.demo.constant  
        
object KafkaTopic {  
  const val USER = "user"  
}  

상수 컨슈머 그룹 ID

1
2
3
4
5
package com.example.demo.constant  
        
object KafkaConsumerGroupId {  
  const val USER = "user"  
}  

AppConfig

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.example.demo.global.config  
        
import jakarta.annotation.PostConstruct  
import org.springframework.context.annotation.Configuration  
import java.io.BufferedReader  
import java.io.InputStreamReader  
import java.util.UUID  
        
@Configuration  
class AppConfig {  
  var containerId = UUID.randomUUID().toString()  
        
  @PostConstruct  
  private fun initContainerId() {  
      try {  
          // 'hostname' 명령어 실행을 위한 ProcessBuilder 사용  
          val processBuilder = ProcessBuilder("hostname")  
          val process = processBuilder.start()  
          val reader = BufferedReader(InputStreamReader(process.inputStream))  
          val containerIdString = reader.readLine()  // hostname 명령어의 결과  
        
          // hostname 값이 비어 있지 않으면 containerId 업데이트  
          if (!containerIdString.isNullOrBlank()) {  
              containerId = containerIdString  
          }  
      } catch (e: Exception) {  
          // 예외 발생 시 아무 일도 일어나지 않음  
          println("Failed to retrieve container ID. Using default value.")  
          e.printStackTrace()  
      }  
  }  
        
}  

컨슈머

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package com.example.demo.kafka.consumer  
        
import com.example.constant.KafkaConsumerGroupId  
import com.example.constant.KafkaTopic  
import com.example.global.config.AppConfig  
import com.example.global.config.KafkaMessageListenerConfig  
import org.springframework.context.annotation.Bean  
import org.springframework.kafka.listener.KafkaMessageListenerContainer  
import org.springframework.kafka.listener.MessageListener  
import org.springframework.stereotype.Service  
        
@Service  
class UserConsumer(  
  private val kafkaMessageListenerConfig: KafkaMessageListenerConfig,  
  private val appConfig: AppConfig,  
) {  
        
  @Bean  
  fun userConsume(): KafkaMessageListenerContainer<String, String> {  
      val container = kafkaMessageListenerConfig.createKafkaMessageListenerContainer(  
          topic = KafkaTopic.USER,  
          groupId = "${KafkaConsumerGroupId.USER}-${appConfig.containerId}",  
          messageListener = messageListener()  
      )  
      container.start()  
        
      return container  
  }  
        
  private fun messageListener(): MessageListener<String, String> {  
      return MessageListener { record ->  
                    
          println("record++" + record)  
      }  
  }  
        
}  
  • 컨슈머 그룹 ID에 containerId를 포함시켜
    새 도커 컨테이너가 참여할 때마다 새로운 컨슈머 그룹으로 컨슈머가 생성된다.
  • @KafkaListener 기반이 아닌 직접 컨테이너를 생성하는 방식이므로, container.start()를 명시적으로 호출해 컨슈머를 시작해야 한다.
  • KafkaMessageListenerContainer를 @Bean으로 등록했기 때문에
    스프링 컨텍스트가 해당 빈의 라이프사이클을 관리하게 된다.
    따라서 스프링이 종료되면 userConsume 객체도 자동으로 종료된다.

참고

Spring.io - Seeking to a Specific Offset

Spring.io - Message Listener Conatiners

This post is licensed under CC BY 4.0 by the author.

Spring Kafka 컨슈머 배치 설정

DB를 활용한 리더선출락 구현