Posts 카프카 커넥트(kafka connect)로 mongoDB에 카프카 스트림 쓰기
Post
Cancel

카프카 커넥트(kafka connect)로 mongoDB에 카프카 스트림 쓰기

개요

  • 카프카 스트림의 취합된 결과를
    외부 스토리지에 저장하고 싶을 때가 있다.
  • 이때 결과가 담긴 토픽 데이터를
    카프카 커넥트(kafka connect)를 이용하여 다른 스토리지에 저장한다.
  • 카프카, mongodb, 카프카 커넥트를 도커 컴포즈로 구축한다.

카프카 커넥트

  • 카프카 커넥트는
    카프카 스트림 데이터를 다른 스토리지로 옮기거나(Sink Connector)
    다른 스토리지 데이터를 카프카로 옮길 수 있다(Source Connector).
  • 예시에서는 카프카 스트림 데이터를 mongoDB로 옮기는
    Sink Connector를 다룬다.

mongoDB Sink Connector driver

  • Sink Connector driver는 스토리지마다 별도로 존재한다.
    mongoDB는 confluent에서 Sink Connector를 제공한다.
  • mongoDB Sink Connector driver 다운로드
  • [압축 결과 폴더]/lib/mongo-kafka-connect-x.x.x-confluent.jar 가
    드라이버 파일이다.
  • 이 파일을 ./kafka/jar로 옮긴다.

카프카 커넥트 도커 이미지(docker image)

카프카 설정

  • 단일 노드를 사용한다고 가정한다.
  • 아파치 카프카 단일 노드 도커 허브 예시
  • 카프카 설정 파일 생성
    • kafka/config/file-inputs/server.properties
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      
      # Licensed to the Apache Software Foundation (ASF) under one or more  
      # contributor license agreements.  See the NOTICE file distributed with  
      # this work for additional information regarding copyright ownership.  
      # The ASF licenses this file to You under the Apache License, Version 2.0  
      # (the "License"); you may not use this file except in compliance with  
      # the License.  You may obtain a copy of the License at  
      #  
      #    http://www.apache.org/licenses/LICENSE-2.0  
      #  
      # Unless required by applicable law or agreed to in writing, software  
      # distributed under the License is distributed on an "AS IS" BASIS,  
      # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  
      # See the License for the specific language governing permissions and  
      # limitations under the License.  
                      
      advertised.listeners=PLAINTEXT_HOST://localhost:9092,SSL://localhost:9093,PLAINTEXT://broker:19092  
      controller.listener.names=CONTROLLER  
      group.initial.rebalance.delay.ms=0  
      inter.broker.listener.name=PLAINTEXT  
      listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,CONTROLLER:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT  
      log.dirs=/tmp/kraft-combined-logs  
      offsets.topic.replication.factor=1  
      process.roles=broker  
      ssl.client.auth=required  
      ssl.key.password=abcdefgh  
      ssl.keystore.location=/etc/kafka/secrets/kafka01.keystore.jks  
      ssl.keystore.password=abcdefgh  
      ssl.truststore.location=/etc/kafka/secrets/kafka.truststore.jks  
      ssl.truststore.password=abcdefgh  
      transaction.state.log.min.isr=1  
      transaction.state.log.replication.factor=1  
      
    • kafka/config/secrets/kafka_keystore_creds
      1
      
      abcdefgh  
      
    • kafka/config/secrets/kafka_ssl_key_creds
      1
      
      abcdefgh  
      
    • kafka/config/secrets/kafka_truststore_creds
      1
      
      abcdefgh  
      
    • kafka/config/secrets/kafka.truststore.jks
    • kafka/config/secrets/kafka01.keystore.jks

docker-compose.yml 파일 작성

  • yml 파일
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    
    # Use root/example as user/password credentials  
    version: '3.8'  
                
    services:  
      kafka:  
        image: apache/kafka:3.7.0  
        restart: unless-stopped  
        hostname: broker  
        container_name: broker  
        ports:  
          - 9092:9092  
          - 9093:9093  
        volumes:  
          - ./kafka/config/secrets:/etc/kafka/secrets  
          - ./kafka/config/file-input:/mnt/shared/config  
        environment:  
          # Environment variables used by kafka scripts will be needed in case of File input.  
          CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw'  
          # Set properties not provided in the file input  
          KAFKA_NODE_ID: 1  
          KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093'  
          KAFKA_LISTENERS: 'PLAINTEXT_HOST://:9092,SSL://:9093,CONTROLLER://:29093,PLAINTEXT://:19092'  
          # Override an existing property  
          KAFKA_PROCESS_ROLES: 'broker,controller'  
                
                
      mongo:  
        image: mongo:jammy  
        restart: always  
        ports:  
          - 27017:27017  
        volumes:  
          - ./mongodb:/data/db  
        environment:  
          MONGO_INITDB_ROOT_USERNAME: mongodbuser  
          MONGO_INITDB_ROOT_PASSWORD: mongodbpw  
                
      kafka-connect:  
          image: confluentinc/cp-kafka-connect:7.6.0  
          ports:  
            - 8083:8083  
          depends_on:  
            - kafka  
            - mongo  
          environment:  
            CONNECT_BOOTSTRAP_SERVERS: broker:19092  
            CONNECT_REST_PORT: 8083  
            CONNECT_GROUP_ID: "quickstart-avro"  
            CONNECT_CONFIG_STORAGE_TOPIC: "quickstart-avro-config"  
            CONNECT_OFFSET_STORAGE_TOPIC: "quickstart-avro-offsets"  
            CONNECT_STATUS_STORAGE_TOPIC: "quickstart-avro-status"  
            CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1  
            CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1  
            CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1  
            CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"  
            CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"  
            CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"  
            CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"  
            CONNECT_REST_ADVERTISED_HOST_NAME: "localhost"  
            CONNECT_LOG4J_ROOT_LOGLEVEL: WARN  
            CONNECT_PLUGIN_PATH: "/usr/share/java,/etc/kafka-connect/jars"  
          volumes:  
            - ./kafka-connect/jars:/etc/kafka-connect/jars  
                
                
    
  • 주의사항
    • kafka-connect.environment.CONNECT_BOOTSTRAP_SERVERS는
      카프카 KAFKA_LISTENERS의 PLAINTEXT 포트로 설정해야 한다.
    • kafka-connect.environment.CONNECT_PLUGIN_PATH는
      mongodb Sink Connector driver 파일 위치이다.

카프카 스트림 설정

  • 개요
  • 환경
    • java 21
    • spring boot 3.2.3
  • 설치
    1
    2
    
    implementation 'org.apache.kafka:kafka-streams'  
    implementation 'org.springframework.kafka:spring-kafka'  
    
  • expt1/Expt1.java
    1
    2
    3
    4
    5
    6
    7
    
      @Getter  
      @Setter  
      public class Expt1 {  
       public String name;  
       public String value;  
       public int count;  
      }  
    
  • expt1/Expt1Serde.java
    1
    2
    3
    4
    5
    6
    7
    8
    9
    
    import org.apache.kafka.common.serialization.Serdes.WrapperSerde;  
    import org.springframework.kafka.support.serializer.JsonDeserializer;  
    import org.springframework.kafka.support.serializer.JsonSerializer;  
                
    public class Expt1Serde extends WrapperSerde<Expt1> {  
        public Expt1Serde() {  
            super(new JsonSerializer<>(), new JsonDeserializer<>(Expt1.class));  
        }  
    }  
    
  • global/config/KafkaStreamsConfig.java
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    
    @Configuration  
    public class KafkaStreamsConfig {  
                
        @Value("${spring.kafka.producer.bootstrap-servers}")  
        private String bootstrapServers;  
                
        KafkaStreamsConfiguration kStreamsConfig(String applicationId, Object valueSerde) {  
            Map<String, Object> props = new HashMap<>();  
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);  
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);  
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());  
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, valueSerde);  
                
            return new KafkaStreamsConfiguration(props);  
        }  
                
        @Bean("expt1DSLBuilder")  
        FactoryBean<StreamsBuilder> expt1DSLBuilder() {  
            StreamsBuilderFactoryBean streamsBuilder = new StreamsBuilderFactoryBean(kStreamsConfig("expt1-id", Expt1.class));  
            return streamsBuilder;  
        }  
                    
        @Bean("expt1KStream")  
        KStream<String, Expt1> expt1KStream(@Qualifier("expt1DSLBuilder") StreamsBuilder expt1DSLBuilder) {  
            // stream을 불러오는 부분, "expt1" 토픽을 구독한다.  
            KStream<String, Expt1> kStream = expt1DSLBuilder.stream("expt1");  
                
            // 집계를 처리하는 부분  
            // "expt1"는 Materialized view 이름  
            kStream  
                .filter((key, value)-> value.getName() != null || value.getValue() != null)  
                .selectKey((key, value) -> {    
                    String myKey = value.getName() + "|" + value.getValue();  
                    return myKey.replace("\"", "");  
                })    
                .groupByKey()  
                .aggregate(  
                    new Initializer<Expt1>() {  
                        public Expt1 apply() {  
                            return new Expt1();  
                        }  
                    },  
                    new Aggregator<String, Expt1, Expt1>() {  
                        public Expt1 apply(String key, Expt1 value, Expt1 aggregate) {  
                            aggregate.setName(value.getName());  
                
                            aggregate.setValue(value.getValue());  
                                        
                            aggregate.setCount(aggregate.getCount() + 1);  
                                        
                            return aggregate;  
                        }  
                    },  
                    Materialized.as("expt1")  
                )  
             // 집계를 완료한 kTable을 "expt1-result" 토픽에 저장한다.  
            .toStream()  
            .to("expt1-result");  
                
            return kStream;  
        }  
    }  
    
    • kStream 집계 부분 설명
      • .filter
        • 최초 key, value에서 key는 null이다.
        • value에서 name과 value가 모두 존재하는지 확인한다.
        • 하나라도 존재하지 않으면 집계에서 제외한다.
      • .selectKey
        • key를 할당한다.
        • value에서 “${name}${value}” 문자열을 key로 할당한다.
      • .groupByKey
        • key를 기준으로 groupBy 한다.
      • .aggregate
        • groupBy 연산을 어떻게 할지 정의한다[참고3].

카프카 커넥트 설정

  • 개요
    • Rest API로 카프카 커넥트를 제어할 수 있다.
  • 동작 확인
    • 설명
      • 정상적으로 컨테이너가 실행 중이면 응답이 오고
        실행에 실패하면 timeout 에러가 난다.
    • 명령
      1
      
      curl --location 'http://localhost:8083/connectors'  
      
  • 설정 생성
    • 설명
      • name, value 필드가 같은 행은 갱신되도록 설정한다.
        [참고4 - Sink Connector Configuration Properties]
      • 주요 설정 설명
        • “topics”: “expt1-result”
          • 카프카 커넥트가 mongodb로 데이터를 가져올 토픽 명이다.
        • “value.converter”: “org.apache.kafka.connect.json.JsonConverter”
          • 카프카 스트림에서 Json 형태로 저장하고 있으므로
            JsonConverter를 사용한다.
        • “document.id.strategy.overwrite.existing”: true
          • 같은 키를 가진 행이 있으면 덮어쓰기 한다.
        • “document.id.strategy”: “com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy”
          • 여러 필드를 묶어서 키로 인식한다.
        • “document.id.strategy.partial.value.projection.list”: “name,value”
          “document.id.strategy.partial.value.projection.type”: “AllowList”,
          • name과 value 필드를 키로 인식한다.
        • “writemodel.strategy”: “com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy”
          • 쓰기 전략을 같은 키가 행이 있으면 교체
            없으면 삽입하도록 설정한다.
    • 명령
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      
      curl --location 'http://localhost:8083/connectors' \  
      --header 'Content-Type: application/json' \  
      --data-raw '{  
          "name": "mongo-sink-expt1",  
          "config": {  
              "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",  
              "connection.uri": "mongodb://mongodbuser:mongodbpw@mongo:27017/",  
              "tasks.max": "2",  
              "topics": "expt1-result",  
              "database": "expt",  
              "collection": "expt1",  
              "key.converter": "org.apache.kafka.connect.storage.StringConverter",  
              "value.converter": "org.apache.kafka.connect.json.JsonConverter",  
              "key.converter.schemas.enable": false,  
              "value.converter.schemas.enable": false,  
              "document.id.strategy.overwrite.existing": true,  
              "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy",  
              "document.id.strategy.partial.value.projection.list": "name,value",  
              "document.id.strategy.partial.value.projection.type": "AllowList",  
              "writemodel.strategy": "com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy"  
          }  
      }'  
      
  • 설정 조회
    • 설명
      • 정상적으로 카프카 스트림을 mongodb로 쓰고 있는지 확인할 수 있다.
      • 에러가 발생할 경우 tasks 별 에러 로그를 확인할 수 있다.
    • 명령
      1
      
      curl --location 'http://localhost:8083/connectors/mongo-sink-expt1/status'  
      
  • 설정 삭제
    • 명령
      1
      
      curl --location --request DELETE 'http://localhost:8083/connectors/mongo-sink-expt1'  
      
  • 성공했을 경우
    • 카프카 스트림이 갱신되면
      mongodb의 expt database의 expt1 collection에
      문서가 작성된다.
    • name과 value가 같은 행이 존재하면 갱신한다.
    • mongoDB에서는 키로 사용하고 있는
      name과 value를 unique 키로 생성해서
      사용하는 것을 권장한다.

참고

This post is licensed under CC BY 4.0 by the author.