Posts Spring Kafka 컨슈머 배치 설정
Post
Cancel

Spring Kafka 컨슈머 배치 설정

개요

  • 컨슈머에서 폴링 시 배치 단위로 가져오게 할 수 있다.
  • 이를 통하여 처리 성능을 높일 수 있다.

설정

  • application.yml
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    
    spring:  
      kafka:  
        bootstrap-servers: "kafka:9092"  
        producer:  
          key-serializer: org.apache.kafka.common.serialization.StringSerializer  
          value-serializer: org.springframework.kafka.support.serializer.JsonSerializer  
        consumer:  
          auto-offset-reset: latest  
          enable-auto-commit: true  
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer  
          value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer  
          # | -- 추가 시작 -- |  
          max-poll-records: 20  
          # | -- 추가 끝 -- |  
          properties:  
            spring.json.trusted.packages: "*"  
    
  • KafkaConfig
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    
    package com.example.demo.config  
                
    import org.apache.kafka.common.TopicPartition  
    import org.springframework.context.annotation.Bean  
    import org.springframework.context.annotation.Configuration  
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory  
    import org.springframework.kafka.core.ConsumerFactory  
    import org.springframework.kafka.core.KafkaTemplate  
    import org.springframework.kafka.listener.ContainerProperties  
    import org.springframework.kafka.listener.DeadLetterPublishingRecoverer  
    import org.springframework.kafka.listener.DefaultErrorHandler  
    import org.springframework.util.backoff.FixedBackOff  
                
    @Configuration  
    class KafkaConfig {  
                
        @Bean(name = ["batchKafkaListenerContainerFactory"])  
        fun batchKafkaListenerContainerFactory(  
            consumerFactory: ConsumerFactory<String, String>  
        ): ConcurrentKafkaListenerContainerFactory<String, String> {  
            val factory = ConcurrentKafkaListenerContainerFactory<String, String>()  
            factory.consumerFactory = consumerFactory  
            factory.isBatchListener = true  
                
            return factory  
        }  
    }  
    
  • 프로듀서
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    
    package com.example.demo.kafka.producer  
                
    import com.example.demo.constant.KafkaTopic  
    import org.springframework.kafka.core.KafkaTemplate  
    import org.springframework.stereotype.Service  
                
                
    @Service  
    class UserProducer(  
        private val kafkaTemplate: KafkaTemplate<String, String>  
    ) {  
                
        fun send(message: String) {  
            kafkaTemplate.send(KafkaTopic.USER, message)  
        }  
                
    }  
    
  • 컨슈머
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    
    package com.example.demo.kafka.consumer  
                
    import com.example.demo.constant.KafkaTopic  
    import org.springframework.kafka.annotation.KafkaListener  
    import org.springframework.kafka.support.Acknowledgment  
    import org.springframework.stereotype.Component  
                
    @Component  
    class UserConsumer {  
                
        @KafkaListener(  
            topics = [KafkaTopic.USER],  
            groupId = "user",  
            containerFactory = "batchKafkaListenerContainerFactory"  
        )  
        fun consume(record: List<String>) {  
            Thread.sleep(2000)  
            println("record++" + record)  
        }  
    }  
    
  • 컨트롤러
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    
    package com.example.demo.controller  
                
    import com.example.demo.kafka.producer.UserProducer  
    import io.swagger.v3.oas.annotations.Operation  
    import org.springframework.http.HttpStatus  
    import org.springframework.http.MediaType  
    import org.springframework.http.ResponseEntity  
    import org.springframework.web.bind.annotation.GetMapping  
    import org.springframework.web.bind.annotation.ResponseStatus  
    import org.springframework.web.bind.annotation.RestController  
    import java.io.IOException  
                
    @RestController  
    class UserController(  
        private val userProducer: UserProducer,  
    ) {  
                
        @Operation(  
            summary = "user 메세지 생성 및 처리",  
            description = "user 메세지 생성 및 처리",  
            tags = ["user"]  
        )  
        @ResponseStatus(HttpStatus.OK)  
        @Throws(IOException::class)  
        @GetMapping(value = ["/user"], produces = [MediaType.APPLICATION_JSON_VALUE])  
        fun user(): ResponseEntity<String> {  
                
            userProducer.send("hello world message")  
                
            return ResponseEntity.ok().body(  
                "success"  
            )  
        }  
    }  
                
    
  • 카프카 메세지 생성 및 소비 명령
    1
    
    curl http://localhost:8080/user  
    

결과

  • 배치 단위로 잘 넘어온다.
    1
    2
    3
    4
    5
    6
    7
    8
    
    record++[hello world message]  
    record++[hello world message, hello world message]  
    record++[hello world message, hello world message, hello world message, hello world message, hello world message, hello world message, hello world message]  
    record++[hello world message, hello world message, hello world message, hello world message, hello world message, hello world message, hello world message, hello world message]  
    record++[hello world message, hello world message, hello world message, hello world message, hello world message, hello world message, hello world message, hello world message]  
    record++[hello world message, hello world message, hello world message, hello world message, hello world message, hello world message, hello world message, hello world message, hello world message]  
    record++[hello world message, hello world message, hello world message, hello world message]  
                
    
  • max.poll.records=20은 최대값일 뿐이므로
    적절한 규모로 배치가 넘어오는 것을 확인할 수 있다.
This post is licensed under CC BY 4.0 by the author.

Spring Kafka 컨슈머 수동 커밋 설정

Spring Kafka에서 최신 메시지만 소비하는 Pub/Sub 방식 설정