개요
- Spring Kafka의 @KafkaListener 어노테이션에서 concurrency 속성을 설정하면,
리스너 컨테이너가 해당 수만큼의 스레드를 생성하여
동일한 리스너 로직을 병렬로 실행하며 메시지를 처리한다.
가정
- 도커로 카프카 컨테이너 한 개를 띄운다.
- 스프링으로 카프카와 소통한다.
- 스프링 카프카 의존성 및 스프링 의존성 설치는 이미 된 상태이다.
설정
- application.yml
1 2 3 4 5 6 7 8 9 10 11 12 13
spring: kafka: bootstrap-servers: "kafka:9092" producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.springframework.kafka.support.serializer.JsonSerializer consumer: auto-offset-reset: latest enable-auto-commit: true key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer properties: spring.json.trusted.packages: "*"
- 토픽 생성
- partition 3개짜리 user 토픽을 생성한다.
1 2 3 4
kafka-topics.sh --create \ --bootstrap-server kafka:9092 \ --topic user \ --partitions 3
- partition 3개짜리 user 토픽을 생성한다.
- 컨슈머
1 2 3 4 5 6 7 8 9 10 11 12 13 14
@Component class UserConsumer { @KafkaListener( topics = [KafkaTopic.USER], groupId = "user", concurrency = "3", // 리스너를 실행할 스레드 수 ) fun consume(record: ConsumerRecord<String, String>, acknowledgment: Acknowledgment) { Thread.sleep(2000) println("record++" + record) acknowledgment.acknowledge() } }
- 프로듀서
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
package com.example.demo.kafka.producer import com.example.demo.constant.KafkaTopic import org.springframework.kafka.core.KafkaTemplate import org.springframework.stereotype.Service @Service class UserProducer( private val kafkaTemplate: KafkaTemplate<String, String> ) { fun send(message: String) { kafkaTemplate.send(KafkaTopic.USER, message) } }
- 컨트롤러
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
package com.example.demo.controller import com.example.demo.kafka.producer.UserProducer import io.swagger.v3.oas.annotations.Operation import org.springframework.http.HttpStatus import org.springframework.http.MediaType import org.springframework.http.ResponseEntity import org.springframework.web.bind.annotation.GetMapping import org.springframework.web.bind.annotation.ResponseStatus import org.springframework.web.bind.annotation.RestController import java.io.IOException @RestController class UserController( private val userProducer: UserProducer, ) { @Operation( summary = "user 메세지 생성 및 처리", description = "user 메세지 생성 및 처리", tags = ["user"] ) @ResponseStatus(HttpStatus.OK) @Throws(IOException::class) @GetMapping(value = ["/user"], produces = [MediaType.APPLICATION_JSON_VALUE]) fun user(): ResponseEntity<String> { userProducer.send("hello world message") return ResponseEntity.ok().body( "success" ) } }
실험
- 카프카 메세지 생성 및 소비 명령
1
curl http://localhost:8080/user
- 카프카 파티션에 할당된 스레드 확인
- 명령
1 2 3 4
kafka-consumer-groups.sh \ --bootstrap-server kafka:9092 \ --describe \ --group user
- 응답
1 2 3 4
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID user user 0 8 8 0 consumer-user-10-8ae274db-3b0f-4cd2-bb7a-9dc0c5c5cfc9 /172.18.0.1 consumer-user-10 user user 2 4 4 0 consumer-user-9-c4bda59a-ac70-41f4-8925-01497cb6202c /172.18.0.1 consumer-user-9 user user 1 0 0 0 consumer-user-8-4e721ce9-b567-4f97-81c6-3f1877cbb76f /172.18.0.1 consumer-user-8
- 명령
- 결과
- 3개의 파티션 모두 다른 CUNSUMER-ID가 할당된 것을 볼 수 있다.
의문1 - 하나의 파티션만 CURRENT-OFFSET이 증가하는 이유?
- 균일한 속도로 카프카 메세지를 생성하다보면
하나의 파티션만 CURRENT-OFFSET이 증가하는 것을 볼 수 있다. - 이는 기본 파티션 분배 정책이 스티키(sticky) 때문이다.
- 프로듀서에서 메시지 키를 입력하지 않은 경우,
스티키 파티셔너는 새로운 메시지가 이전 메시지를
소비했던 컨슈머로 전달되도록 같은 파티션에 넣는다. - 이를 통해 동일한 파티션에 메시지를 모아 더 큰 배치를 생성하여
전송 횟수와 네트워크 오버헤드를 감소시켜
결과적으로 지연 시간을 단축시킨다. - 실제로 실험해보면 라운드로빈 설정보다
스티키 설정이 전체 처리 시간이 더 짧게 나타난다.
의문2 -스티키 설정 시 나머지 파티션은 사용되지 않는가?
- 아니다. 스티키도 일정 조건이 되면 다른 파티션으로 메세지를 전송한다.
- 프로듀서의 메시지 버퍼가 꽉 차거나 linger.ms 시간(배치 전송 지연시간)이
만료되면 현재 배치를 전송하고 새로운 파티션을 선택한다. - 따라서 트래픽이 많아 프로듀서에 메시지가 많이 전달되면
메시지 버퍼가 더 빠르게 차게 되고,
이 경우 다른 파티션으로 전달하는 일이 잦아져
결국 메시지가 파티션에 고루 분배된다.
의문3 - 그럼 특정 파티션으로 메세지를 보내고 싶으면 어떻게 하는가?
- 프로듀서에서 메세지 키를 입력하면 된다.
- 동일한 메세지 키를 가진 메세지는 해당 파티션으로 전달된다.
- 반대로 매번 새로운 파티션으로 보내고 싶다면
메세지 키를 랜덤으로 설정하면 된다(라운드로빈).1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
// 프로듀서 @Service class UserProducer( private val kafkaTemplate: KafkaTemplate<String, String> ) { fun send(message: String) { // 랜덤 키 생성 val key = UUID.randomUUID().toString() // 랜덤 키 추가 kafkaTemplate.send(KafkaTopic.USER, key, message) } }
의문4 - 파티션 수가 3, 컨슈머 수가 1이라면?
- 3개의 파티션을 1개의 컨슈머가 한 번씩 파티션을 순회하며 처리한다.
1 2 3 4
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID user user 0 8 8 0 consumer-user-8-540afe8a-734e-439c-a7d4-e8fe84088068 /172.18.0.1 consumer-user-8 user user 2 4 4 0 consumer-user-8-540afe8a-734e-439c-a7d4-e8fe84088068 /172.18.0.1 consumer-user-8 user user 1 0 0 0 consumer-user-8-540afe8a-734e-439c-a7d4-e8fe84088068 /172.18.0.1 consumer-user-8
- 새로운 메세지가 생성되지 않는 상태에서
3개의 파티션에 골고루 메세지가 쌓여있는 상황이라면
결과적으로 1개의 컨슈머가 순회하면서 모두 처리한다.