스트림이란?
카프카 스트림이란?
- 카프카 스트림은 스트림을 처리하는 라이브러리이다.
- 카프카 스트림은 소비자(consumer)에서 동작한다.
- 예를 들면, ‘블로그 포스트 조회 수’ 같은 이벤트와 같이
실시간으로 갱신되는 데이터를 처리할 수 있다.
카프카 스트림 특징
- 카프카 스트림은 정확히 한 번(exactly once)만 데이터를 저장한다[참고3].
- 하나의 카프카 스트림에도
멀티 스레딩과 리커버리를 지원하기 때문에
쉽게 대용량 처리가 가능하다[참고4]
카프카 스트림 구조
- [참고5: streams_concepts_duality]에 자세히 설명되어 있다.
- 크게 kStream과 kTable로 구성된다.
- kStream
- 각 시간 별 스트림 데이터이다.
- 예를 들어 블로그 포스트 조회 수라면
{“post_id”: 1}
- kTable
- 각 시간 별 스트림 데이터의 변경을 기록한다.
- 예를 들어 변경을 합산(sum)이라고 생각하면 아래와 같이 동작한다.
- time=1
- kStream={“post_id”:1} -> kTable= {“post_id”:1, “summed”: 1}
- time=2
- kStream={“post_id”:1} -> kTable= {“post_id”:1, “summed”: 2}
- time=3
- kStream={“post_id”:2} -> kTable= [{“post_id”:1, “summed”: 2}, {“post_id”:2, “summed”: 1}]
- time=1
- 집계(aggregation)
- kTable에서 ‘변경을 합산(sum)’이라고 설명한 부분이 집계이다.
- kStream이 들어왔을 때 key를 기준으로
원하는 집계 로직을 구축하면
kTable에 반영된다.
- kStream과 kTable의 join
- kStream과 kTable 모두 join이 가능하다.
예시
- 환경
- java 21
- spring boot 3.2.3
- 설치
1 2
implementation 'org.apache.kafka:kafka-streams' implementation 'org.springframework.kafka:spring-kafka'
- 가정
- global/config/KafkaStreamsConfig.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
@Configuration public class KafkaStreamsConfig { @Value("${spring.kafka.producer.bootstrap-servers}") private String bootstrapServers; KafkaStreamsConfiguration kStreamsConfig() { Map<String, Object> props = new HashMap<>(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); return new KafkaStreamsConfiguration(props); } @Bean FactoryBean<StreamsBuilder> countBlogViewsDSLBuilder() { StreamsBuilderFactoryBean streamsBuilder = new StreamsBuilderFactoryBean(); return streamsBuilder; } @Bean KStream<String, String> countBlogViewsKStream(StreamsBuilder countBlogViewsDSLBuilder) { // stream을 불러오는 부분, "count-blog-views" 토픽을 구독한다. KStream<String, String> kStream = countBlogViewsDSLBuilder.stream("count-blog-views"); // 집계를 처리하는 부분 // "countBlogViews"는 Materialized view 이름 kStream .selectKey((key, value) -> { return value.getPostId().replace("\"", ""); }) .groupByKey() .count(Materialized.as("countBlogViews")) // 집계를 완료한 kTable을 "count-blog-views-result" 토픽에 저장한다. .toStream() .to("count-blog-views-result"); return kStream; } }
- blog/BlogController.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
@RequestMapping(value = "/count-blog-views") @RequiredArgsConstructor @RestController public class BlogController { public final KafkaTemplate<String, String> kafkaTemplate; public final FactoryBean<StreamsBuilder> countBlogViewsDSLBuilder; // kStream 생성자 @GetMapping(value = "/produce") public String produceCountBlogViews(@RequestParam(value = "post-id") String postId) { this.kafkaTemplate.send("count-blog-views", postId); return "success"; } // 집계 결과를 출력하는 함수 @GetMapping("/all") public void printAllCountBlogViews() { StreamsBuilderFactoryBean builder = (StreamsBuilderFactoryBean) countBlogViewsDSLBuilder; KafkaStreams kafkaStreams = builder.getKafkaStreams(); // "countBlogViews"는 Materialized view 이름 ReadOnlyKeyValueStore<String, String> view = kafkaStreams .store(StoreQueryParameters.fromNameAndType("countBlogViews", QueryableStoreTypes.keyValueStore())); KeyValueIterator<String, String> address = view.all(); address.forEachRemaining(keyValue -> System.out.println("keyValue.toString()++" + keyValue.toString())); } }