개요
- 생산자(procuder)와 카프카 스트림(kafka streams)이
서로 Json으로 메세지를 주고 받도록 한다[참고1]. - 커스텀 Serde를 선언하고
카프카 스트림 설정의 value_serde에 등록한다.
Serde란?
- Serde란 Serializer<>와 Deserializer<> 담긴 Wrapper 이다[참고2].
- Serializer는 생산자가 메시지를 Serialization 할 때 사용한다.
- Deserializer는 소비자가 메세지를 Deserialization 할 때 사용한다.
예시
- 환경
- java 21
- spring boot 3.2.3
- 설치
1 2
implementation 'org.apache.kafka:kafka-streams' implementation 'org.springframework.kafka:spring-kafka'
- 가정
- “블로그 포스트 조회 수”를 기록하는 카프카 스트림을 구성한다.
- 생산자의 메세지에는 postId와 userId가 포함된다.
- blog/CountBlogViewMessage.java
1 2 3 4 5 6
@Getter @Setter public class CountBlogViewMessage { public String postId; public String userId; }
- blog/CountBlogViewsSerde.java
1 2 3 4 5 6 7 8 9
import org.apache.kafka.common.serialization.Serdes.WrapperSerde; import org.springframework.kafka.support.serializer.JsonDeserializer; import org.springframework.kafka.support.serializer.JsonSerializer; public class CountBlogViewsSerde extends WrapperSerde<CountBlogViewMessage > { public CountBlogViewsSerde() { super(new JsonSerializer<>(), new JsonDeserializer<>(CountBlogViewMessage .class)); } }
- global/config/KafkaStreamsConfig.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
@Configuration public class KafkaStreamsConfig { @Value("${spring.kafka.producer.bootstrap-servers}") private String bootstrapServers; KafkaStreamsConfiguration kStreamsConfig(String applicationId, Object valueSerde) { Map<String, Object> props = new HashMap<>(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, valueSerde); return new KafkaStreamsConfiguration(props); } @Bean FactoryBean<StreamsBuilder> countBlogViewsDSLBuilder() { StreamsBuilderFactoryBean streamsBuilder = new StreamsBuilderFactoryBean(kStreamsConfig("count-blog-views-id", CountBlogViewsSerde.class)); return streamsBuilder; } @Bean KStream<String, CountBlogViewMessage> countBlogViewsKStream(StreamsBuilder countBlogViewsDSLBuilder) { // stream을 불러오는 부분, "count-blog-views" 토픽을 구독한다. KStream<String, CountBlogViewMessage> kStream = countBlogViewsDSLBuilder.stream("count-blog-views"); // 집계를 처리하는 부분 // "countBlogViews"는 Materialized view 이름 kStream .filter((key, value)-> value.getPostId() != null || value.getUserId() != null) .selectKey((key, value) -> { String myKey = value.getPostId() + "|" + value.getUserId(); return myKey.replace("\"", ""); }) .groupByKey() .count(Materialized.as("countBlogViews")) // 집계를 완료한 kTable을 "count-blog-views-result" 토픽에 저장한다. .toStream() .to("count-blog-views-result"); return kStream; } }
- blog/BlogController.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
@RequestMapping(value = "/count-blog-views") @RequiredArgsConstructor @RestController public class BlogController { public final KafkaTemplate<String, Object> kafkaTemplate; public final FactoryBean<StreamsBuilder> countBlogViewsDSLBuilder; // kStream 생성자 @GetMapping(value = "/produce") public String produceCountBlogViews(@RequestBody CountBlogViewMessage countBlogViewMessage) { this.kafkaTemplate.send("count-blog-views", countBlogViewMessage); return "success"; } // 집계 결과를 출력하는 함수 @GetMapping("/all") public void printAllCountBlogViews() { StreamsBuilderFactoryBean builder = (StreamsBuilderFactoryBean) countBlogViewsDSLBuilder; KafkaStreams kafkaStreams = builder.getKafkaStreams(); // "countBlogViews"는 Materialized view 이름 ReadOnlyKeyValueStore<String, CountBlogViewMessage> view = kafkaStreams .store(StoreQueryParameters.fromNameAndType("countBlogViews", QueryableStoreTypes.keyValueStore())); KeyValueIterator<String, CountBlogViewMessage> address = view.all(); address.forEachRemaining(keyValue -> System.out.println("keyValue.toString()++" + keyValue.toString())); } }