Posts 카프카 스트림이란?(kafka stream)
Post
Cancel

카프카 스트림이란?(kafka stream)

스트림이란?

  • 한 방향으로 흐르는 연속된 데이터 흐름[참고1[참고2]

카프카 스트림이란?

  • 카프카 스트림은 스트림을 처리하는 라이브러리이다.
  • 카프카 스트림은 소비자(consumer)에서 동작한다.
  • 예를 들면, ‘블로그 포스트 조회 수’ 같은 이벤트와 같이
    실시간으로 갱신되는 데이터를 처리할 수 있다.

카프카 스트림 특징

  • 카프카 스트림은 정확히 한 번(exactly once)만 데이터를 저장한다[참고3].
  • 하나의 카프카 스트림에도
    멀티 스레딩과 리커버리를 지원하기 때문에
    쉽게 대용량 처리가 가능하다[참고4]

카프카 스트림 구조

  • [참고5: streams_concepts_duality]에 자세히 설명되어 있다.
  • 크게 kStream과 kTable로 구성된다.
  • kStream
    • 각 시간 별 스트림 데이터이다.
    • 예를 들어 블로그 포스트 조회 수라면
      {“post_id”: 1}
  • kTable
    • 각 시간 별 스트림 데이터의 변경을 기록한다.
    • 예를 들어 변경을 합산(sum)이라고 생각하면 아래와 같이 동작한다.
      • time=1
        • kStream={“post_id”:1} -> kTable= {“post_id”:1, “summed”: 1}
      • time=2
        • kStream={“post_id”:1} -> kTable= {“post_id”:1, “summed”: 2}
      • time=3
        • kStream={“post_id”:2} -> kTable= [{“post_id”:1, “summed”: 2}, {“post_id”:2, “summed”: 1}]
  • 집계(aggregation)
    • kTable에서 ‘변경을 합산(sum)’이라고 설명한 부분이 집계이다.
    • kStream이 들어왔을 때 key를 기준으로
      원하는 집계 로직을 구축하면
      kTable에 반영된다.
  • kStream과 kTable의 join
    • kStream과 kTable 모두 join이 가능하다.

예시

  • 환경
    • java 21
    • spring boot 3.2.3
  • 설치
    1
    2
    
    implementation 'org.apache.kafka:kafka-streams'  
    implementation 'org.springframework.kafka:spring-kafka'  
    
  • 가정
    • “블로그 포스트 조회 수”를 기록하는 카프카 스트림을 구성한다.
    • 생산자에서 전송하는 메세지는 문자열이라고 가정한다.
    • 편의를 위하여 문자열은 “블로그 포스트 ID”라고 가정한다.
    • [참고6][참고7]
  • global/config/KafkaStreamsConfig.java
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    
    @Configuration  
    public class KafkaStreamsConfig {  
                
        @Value("${spring.kafka.producer.bootstrap-servers}")  
        private String bootstrapServers;  
                
        KafkaStreamsConfiguration kStreamsConfig() {  
            Map<String, Object> props = new HashMap<>();  
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");  
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);  
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());  
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());  
                
            return new KafkaStreamsConfiguration(props);  
        }  
                
        @Bean  
        FactoryBean<StreamsBuilder> countBlogViewsDSLBuilder() {  
            StreamsBuilderFactoryBean streamsBuilder = new StreamsBuilderFactoryBean();  
            return streamsBuilder;  
        }  
                    
        @Bean  
        KStream<String, String> countBlogViewsKStream(StreamsBuilder countBlogViewsDSLBuilder) {  
            // stream을 불러오는 부분, "count-blog-views" 토픽을 구독한다.  
            KStream<String, String> kStream = countBlogViewsDSLBuilder.stream("count-blog-views");  
                
            // 집계를 처리하는 부분  
            // "countBlogViews"는 Materialized view 이름  
            kStream  
                .selectKey((key, value) -> {  
                    return value.getPostId().replace("\"", "");  
                })  
                .groupByKey()  
                .count(Materialized.as("countBlogViews"))  
             // 집계를 완료한 kTable을 "count-blog-views-result" 토픽에 저장한다.  
            .toStream()  
            .to("count-blog-views-result");  
                
            return kStream;  
        }  
    }  
    
  • blog/BlogController.java
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    
    @RequestMapping(value = "/count-blog-views")  
    @RequiredArgsConstructor  
    @RestController  
    public class BlogController {  
        public final KafkaTemplate<String, String> kafkaTemplate;
        public final FactoryBean<StreamsBuilder> countBlogViewsDSLBuilder;  
                
        // kStream 생성자  
        @GetMapping(value = "/produce")  
        public String produceCountBlogViews(@RequestParam(value = "post-id") String postId) {  
            this.kafkaTemplate.send("count-blog-views", postId);  
                
            return "success";  
        }  
                
        // 집계 결과를 출력하는 함수  
        @GetMapping("/all")  
        public void printAllCountBlogViews() {  
            StreamsBuilderFactoryBean builder = (StreamsBuilderFactoryBean) countBlogViewsDSLBuilder;  
                
            KafkaStreams kafkaStreams = builder.getKafkaStreams();  
                
            // "countBlogViews"는 Materialized view 이름  
            ReadOnlyKeyValueStore<String, String> view = kafkaStreams  
                .store(StoreQueryParameters.fromNameAndType("countBlogViews", QueryableStoreTypes.keyValueStore()));  
                
            KeyValueIterator<String, String> address = view.all();  
            address.forEachRemaining(keyValue -> System.out.println("keyValue.toString()++" + keyValue.toString()));  
        }  
    }  
    

참고

This post is licensed under CC BY 4.0 by the author.

MultipartFile.getInputStream() java.nio.file.NoSuchFileException in async

카프카 스트림, JsonSerializer 사용하기