Posts 카프카 커넥트(kafka connect)로 mongoDB에 카프카 스트림 쓰기
Post
Cancel

카프카 커넥트(kafka connect)로 mongoDB에 카프카 스트림 쓰기

개요

  • 카프카 스트림의 취합된 결과를
    외부 스토리지에 저장하고 싶을 때가 있다.
  • 이때 결과가 담긴 토픽 데이터를
    카프카 커넥트(kafka connect)를 이용하여 다른 스토리지에 저장한다.
  • 카프카, mongodb, 카프카 커넥트를 도커 컴포즈로 구축한다.

카프카 커넥트

  • 카프카 커넥트는
    카프카 스트림 데이터를 다른 스토리지로 옮기거나(Sink Connector)
    다른 스토리지 데이터를 카프카로 옮길 수 있다(Source Connector).
  • 예시에서는 카프카 스트림 데이터를 mongoDB로 옮기는
    Sink Connector를 다룬다.

mongoDB Sink Connector driver

  • Sink Connector driver는 스토리지마다 별도로 존재한다.
    mongoDB는 confluent에서 Sink Connector를 제공한다.
  • mongoDB Sink Connector driver 다운로드
  • [압축 결과 폴더]/lib/mongo-kafka-connect-x.x.x-confluent.jar 가
    드라이버 파일이다.
  • 이 파일을 ./kafka/jar로 옮긴다.

카프카 커넥트 도커 이미지(docker image)

카프카 설정

  • 단일 노드를 사용한다고 가정한다.
  • 아파치 카프카 단일 노드 도커 허브 예시
  • 카프카 설정 파일 생성
    • kafka/config/file-inputs/server.properties
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      
      # Licensed to the Apache Software Foundation (ASF) under one or more  
      # contributor license agreements.  See the NOTICE file distributed with  
      # this work for additional information regarding copyright ownership.  
      # The ASF licenses this file to You under the Apache License, Version 2.0  
      # (the "License"); you may not use this file except in compliance with  
      # the License.  You may obtain a copy of the License at  
      #  
      #    http://www.apache.org/licenses/LICENSE-2.0  
      #  
      # Unless required by applicable law or agreed to in writing, software  
      # distributed under the License is distributed on an "AS IS" BASIS,  
      # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
      # See the License for the specific language governing permissions and  
      # limitations under the License.  
                      
      advertised.listeners=PLAINTEXT_HOST://localhost:9092,SSL://localhost:9093,PLAINTEXT://broker:19092  
      controller.listener.names=CONTROLLER  
      group.initial.rebalance.delay.ms=0  
      inter.broker.listener.name=PLAINTEXT  
      listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,CONTROLLER:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT  
      log.dirs=/tmp/kraft-combined-logs  
      offsets.topic.replication.factor=1  
      process.roles=broker  
      ssl.client.auth=required  
      ssl.key.password=abcdefgh  
      ssl.keystore.location=/etc/kafka/secrets/kafka01.keystore.jks  
      ssl.keystore.password=abcdefgh  
      ssl.truststore.location=/etc/kafka/secrets/kafka.truststore.jks  
      ssl.truststore.password=abcdefgh  
      transaction.state.log.min.isr=1  
      transaction.state.log.replication.factor=1  
      
    • kafka/config/secrets/kafka_keystore_creds
      1
      
      abcdefgh  
      
    • kafka/config/secrets/kafka_ssl_key_creds
      1
      
      abcdefgh  
      
    • kafka/config/secrets/kafka_truststore_creds
      1
      
      abcdefgh  
      
    • kafka/config/secrets/kafka.truststore.jks
    • kafka/config/secrets/kafka01.keystore.jks

docker-compose.yml 파일 작성

  • yml 파일
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    
    # Use root/example as user/password credentials  
    version: '3.8'  
                
    services:  
      kafka:  
        image: apache/kafka:3.7.0  
        restart: unless-stopped  
        hostname: broker  
        container_name: broker  
        ports:  
          - 9092:9092  
          - 9093:9093  
        volumes:  
          - ./kafka/config/secrets:/etc/kafka/secrets  
          - ./kafka/config/file-input:/mnt/shared/config  
        environment:  
          # Environment variables used by kafka scripts will be needed in case of File input.  
          CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw'  
          # Set properties not provided in the file input  
          KAFKA_NODE_ID: 1  
          KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093'  
          KAFKA_LISTENERS: 'PLAINTEXT_HOST://:9092,SSL://:9093,CONTROLLER://:29093,PLAINTEXT://:19092'  
          # Override an existing property  
          KAFKA_PROCESS_ROLES: 'broker,controller'  
                
                
      mongo:  
        image: mongo:jammy  
        restart: always  
        ports:  
          - 27017:27017  
        volumes:  
          - ./mongodb:/data/db  
        environment:  
          MONGO_INITDB_ROOT_USERNAME: mongodbuser  
          MONGO_INITDB_ROOT_PASSWORD: mongodbpw  
                
      kafka-connect:  
          image: confluentinc/cp-kafka-connect:7.6.0  
          ports:  
            - 8083:8083  
          depends_on:  
            - kafka  
            - mongo  
          environment:  
            CONNECT_BOOTSTRAP_SERVERS: broker:19092  
            CONNECT_REST_PORT: 8083  
            CONNECT_GROUP_ID: "quickstart-avro"  
            CONNECT_CONFIG_STORAGE_TOPIC: "quickstart-avro-config"  
            CONNECT_OFFSET_STORAGE_TOPIC: "quickstart-avro-offsets"  
            CONNECT_STATUS_STORAGE_TOPIC: "quickstart-avro-status"  
            CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1  
            CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1  
            CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1  
            CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"  
            CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"  
            CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"  
            CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"  
            CONNECT_REST_ADVERTISED_HOST_NAME: "localhost"  
            CONNECT_LOG4J_ROOT_LOGLEVEL: WARN  
            CONNECT_PLUGIN_PATH: "/usr/share/java,/etc/kafka-connect/jars"  
          volumes:  
            - ./kafka-connect/jars:/etc/kafka-connect/jars  
                
                
    
  • 주의사항
    • kafka-connect.environment.CONNECT_BOOTSTRAP_SERVERS는
      카프카 KAFKA_LISTENERS의 PLAINTEXT 포트로 설정해야 한다.
    • kafka-connect.environment.CONNECT_PLUGIN_PATH는
      mongodb Sink Connector driver 파일 위치이다.

카프카 스트림 설정

  • 개요
  • 환경
    • java 21
    • spring boot 3.2.3
  • 설치
    1
    2
    
    implementation 'org.apache.kafka:kafka-streams'  
    implementation 'org.springframework.kafka:spring-kafka'  
    
  • expt1/Expt1.java
    1
    2
    3
    4
    5
    6
    7
    
      @Getter  
      @Setter  
      public class Expt1 {  
       public String name;  
       public String value;  
       public int count;  
      }  
    
  • expt1/Expt1Serde.java
    1
    2
    3
    4
    5
    6
    7
    8
    9
    
    import org.apache.kafka.common.serialization.Serdes.WrapperSerde;  
    import org.springframework.kafka.support.serializer.JsonDeserializer;  
    import org.springframework.kafka.support.serializer.JsonSerializer;  
                
    public class Expt1Serde extends WrapperSerde<Expt1> {  
        public Expt1Serde() {  
            super(new JsonSerializer<>(), new JsonDeserializer<>(Expt1.class));  
        }  
    }  
    
  • global/config/KafkaStreamsConfig.java
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    
    @Configuration  
    public class KafkaStreamsConfig {  
                
        @Value("${spring.kafka.producer.bootstrap-servers}")  
        private String bootstrapServers;  
                
        KafkaStreamsConfiguration kStreamsConfig(String applicationId, Object valueSerde) {  
            Map<String, Object> props = new HashMap<>();  
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);  
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);  
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());  
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, valueSerde);  
                
            return new KafkaStreamsConfiguration(props);  
        }  
                
        @Bean("expt1DSLBuilder")  
        FactoryBean<StreamsBuilder> expt1DSLBuilder() {  
            StreamsBuilderFactoryBean streamsBuilder = new StreamsBuilderFactoryBean(kStreamsConfig("expt1-id", Expt1.class));  
            return streamsBuilder;  
        }  
                    
        @Bean("expt1KStream")  
        KStream<String, Expt1> expt1KStream(@Qualifier("expt1DSLBuilder") StreamsBuilder expt1DSLBuilder) {  
            // stream을 불러오는 부분, "expt1" 토픽을 구독한다.  
            KStream<String, Expt1> kStream = expt1DSLBuilder.stream("expt1");  
                
            // 집계를 처리하는 부분  
            // "expt1"는 Materialized view 이름  
            kStream  
                .filter((key, value)-> value.getName() != null || value.getValue() != null)  
                .selectKey((key, value) -> {    
                    String myKey = value.getName() + "|" + value.getValue();  
                    return myKey.replace("\"", "");  
                })    
                .groupByKey()  
                .aggregate(  
                    new Initializer<Expt1>() {  
                        public Expt1 apply() {  
                            return new Expt1();  
                        }  
                    },  
                    new Aggregator<String, Expt1, Expt1>() {  
                        public Expt1 apply(String key, Expt1 value, Expt1 aggregate) {  
                            aggregate.setName(value.getName());  
                
                            aggregate.setValue(value.getValue());  
                                        
                            aggregate.setCount(aggregate.getCount() + 1);  
                                        
                            return aggregate;  
                        }  
                    },  
                    Materialized.as("expt1")  
                )  
             // 집계를 완료한 kTable을 "expt1-result" 토픽에 저장한다.  
            .toStream()  
            .to("expt1-result");  
                
            return kStream;  
        }  
    }  
    
    • kStream 집계 부분 설명
      • .filter
        • 최초 key, value에서 key는 null이다.
        • value에서 name과 value가 모두 존재하는지 확인한다.
        • 하나라도 존재하지 않으면 집계에서 제외한다.
      • .selectKey
        • key를 할당한다.
        • value에서 “${name}${value}” 문자열을 key로 할당한다.
      • .groupByKey
        • key를 기준으로 groupBy 한다.
      • .aggregate
        • groupBy 연산을 어떻게 할지 정의한다[참고3].

카프카 커넥트 설정

  • 개요
    • Rest API로 카프카 커넥트를 제어할 수 있다.
  • 동작 확인
    • 설명
      • 정상적으로 컨테이너가 실행 중이면 응답이 오고
        실행에 실패하면 timeout 에러가 난다.
    • 명령
      1
      
      curl --location 'http://localhost:8083/connectors'  
      
  • 설정 생성
    • 설명
      • name, value 필드가 같은 행은 갱신되도록 설정한다.
        [참고4 - Sink Connector Configuration Properties]
      • 주요 설정 설명
        • “topics”: “expt1-result”
          • 카프카 커넥트가 mongodb로 데이터를 가져올 토픽 명이다.
        • “value.converter”: “org.apache.kafka.connect.json.JsonConverter”
          • 카프카 스트림에서 Json 형태로 저장하고 있으므로
            JsonConverter를 사용한다.
        • “document.id.strategy.overwrite.existing”: true
          • 같은 키를 가진 행이 있으면 덮어쓰기 한다.
        • “document.id.strategy”: “com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy”
          • 여러 필드를 묶어서 키로 인식한다.
        • “document.id.strategy.partial.value.projection.list”: “name,value”
          “document.id.strategy.partial.value.projection.type”: “AllowList”,
          • name과 value 필드를 키로 인식한다.
        • “writemodel.strategy”: “com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy”
          • 쓰기 전략을 같은 키가 행이 있으면 교체
            없으면 삽입하도록 설정한다.
    • 명령
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      
      curl --location 'http://localhost:8083/connectors' \  
      --header 'Content-Type: application/json' \  
      --data-raw '{  
          "name": "mongo-sink-expt1",  
          "config": {  
              "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",  
              "connection.uri": "mongodb://mongodbuser:mongodbpw@mongo:27017/",  
              "tasks.max": "2",  
              "topics": "expt1-result",  
              "database": "expt",  
              "collection": "expt1",  
              "key.converter": "org.apache.kafka.connect.storage.StringConverter",  
              "value.converter": "org.apache.kafka.connect.json.JsonConverter",  
              "key.converter.schemas.enable": false,  
              "value.converter.schemas.enable": false,  
              "document.id.strategy.overwrite.existing": true,  
              "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy",  
              "document.id.strategy.partial.value.projection.list": "name,value",  
              "document.id.strategy.partial.value.projection.type": "AllowList",  
              "writemodel.strategy": "com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy"  
          }  
      }'  
      
  • 설정 조회
    • 설명
      • 정상적으로 카프카 스트림을 mongodb로 쓰고 있는지 확인할 수 있다.
      • 에러가 발생할 경우 tasks 별 에러 로그를 확인할 수 있다.
    • 명령
      1
      
      curl --location 'http://localhost:8083/connectors/mongo-sink-expt1/status'  
      
  • 설정 삭제
    • 명령
      1
      
      curl --location --request DELETE 'http://localhost:8083/connectors/mongo-sink-expt1'  
      
  • 성공했을 경우
    • 카프카 스트림이 갱신되면
      mongodb의 expt database의 expt1 collection에
      문서가 작성된다.
    • name과 value가 같은 행이 존재하면 갱신한다.
    • mongoDB에서는 키로 사용하고 있는
      name과 value를 unique 키로 생성해서
      사용하는 것을 권장한다.

참고

This post is licensed under CC BY 4.0 by the author.

여러 개의 카프카 스트림 사용하기

카프카 커넥트(kafka connect), json 스키마 사용하기