Posts mongoDB Sink Connector, FullKeyStrategy id-strategy 사용하기
Post
Cancel

mongoDB Sink Connector, FullKeyStrategy id-strategy 사용하기

개요

  • PartialValueStrategy 사용할 때 Custom Post Processor를 사용하는 경우,
    키로 사용되는 필드가 Post Processing 대상이 되면
    ReplaceOneBusinessKeyStrategy이 동작하지 않는다.
  • id-strategy와 상관없이 Post Processing을 적용하기 쉽도록
    메세지 키를 이용하여 문서 _id를 만드는 전략인 FullKeyStrategy를 사용한다.

FullKeyStrategy

  • 메세지의 키에서 _id 필드만 추출하여 그 값을 키로 사용하는 전략이다.
  • 메세지 예시
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    
    SinkRecord{  
        kafkaOffset=2,   
        timestampType=CreateTime,   
        originalTopic=user-result,   
        originalKafkaPartition=0,   
        originalKafkaOffset=2  
    }   
                
    ConnectRecord{  
        topic='expt1-result',   
        kafkaPartition=0,   
        key={'_id':"my-key"},   
        keySchema=Schema{STRING},   
        value={  
            value=my-value,   
            name=my-name,  
            count=17  
        },   
        valueSchema=null,   
        timestamp=1712566487462,   
        headers=ConnectHeaders(  
            headers=[  
                ConnectHeader(  
                    key=__TypeId__,   
                    value=com.myapp.api.expt1.Expt1,   
                    schema=Schema{STRING}  
                )  
            ]  
        )  
    }  
    
  • [참고]

카프카, 카프카 스트림, 카프카 커넥트 설정

카프카 스트림 설정

  • 설명
    • selectKey에서 {“_id”: “my-key”}와 같이
      json 문자열을 키로 설정해야한다.
  • global/config/KafkaStreamsConfig.java
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    
    @Configuration    
    public class KafkaStreamsConfig {    
                            
        @Value("${spring.kafka.producer.bootstrap-servers}")    
        private String bootstrapServers;    
                            
        KafkaStreamsConfiguration kStreamsConfig(String applicationId, Object valueSerde) {    
            Map<String, Object> props = new HashMap<>();    
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);    
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);    
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());    
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, valueSerde);    
                            
            return new KafkaStreamsConfiguration(props);    
        }    
                            
        @Bean("expt1DSLBuilder")    
        FactoryBean<StreamsBuilder> expt1DSLBuilder() {    
            StreamsBuilderFactoryBean streamsBuilder = new StreamsBuilderFactoryBean(kStreamsConfig("expt1-id", Expt1.class));    
            return streamsBuilder;    
        }    
                                
        @Bean("expt1KStream")    
        KStream<String, Expt1> expt1KStream(@Qualifier("expt1DSLBuilder") StreamsBuilder expt1DSLBuilder) {    
            // stream을 불러오는 부분, "expt1" 토픽을 구독한다.    
            KStream<String, Expt1> kStream = expt1DSLBuilder.stream("expt1");    
                            
            // 집계를 처리하는 부분    
            // "expt1"는 Materialized view 이름    
            kStream    
                .filter((key, value)-> value.getName() != null || value.getValue() != null)    
                .selectKey((key, value) -> {      
                    return "{'_id':\"" + value.getName() + "|" + value.getValue() + "\"}";    
                })      
                .groupByKey()    
                .aggregate(    
                    new Initializer<Expt1>() {    
                        public Expt1 apply() {    
                            return new Expt1();    
                        }    
                    },    
                    new Aggregator<String, Expt1, Expt1>() {    
                        public Expt1 apply(String key, Expt1 value, Expt1 aggregate) {    
                            aggregate.setName(value.getName());    
                            
                            aggregate.setValue(value.getValue());    
                                                    
                            aggregate.setCount(aggregate.getCount() + 1);    
                                                    
                            return aggregate;    
                        }    
                    },    
                    Materialized.as("expt1")    
                )    
             // 집계를 완료한 kTable을 "expt1-result" 토픽에 저장한다.    
            .toStream()    
            .to("expt1-result");    
                            
            return kStream;    
        }    
    }    
    

카프카 커넥트 설정

  • 설명
    • document.id.strategy를 FullKeyStrategy로 설정한다.
    • document.id.strategy.overwrite.existing를 true로 설정한다.
  • 설정 생성 명령
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    
    curl --location 'http://localhost:8083/connectors' \    
    --header 'Content-Type: application/json' \    
    --data-raw '{    
        "name": "mongo-sink-expt1",    
        "config": {    
            "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",    
            "connection.uri": "mongodb://mongodbuser:mongodbpw@mongo:27017/",    
            "tasks.max": "2",    
            "topics": "expt1-result",    
            "database": "expt",    
            "collection": "expt1",    
            "key.converter": "org.apache.kafka.connect.storage.StringConverter",    
            "value.converter": "org.apache.kafka.connect.json.JsonConverter",    
            "key.converter.schemas.enable": false,    
            "value.converter.schemas.enable": false,    
            "document.id.strategy.overwrite.existing": true,    
            "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.FullKeyStrategy",    
            "writemodel.strategy": "com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy"    
        }    
    }'    
    

참고

This post is licensed under CC BY 4.0 by the author.

카프카 커넥트(kafka connect), Custom Post Processor 만들기

nestjs에서 Google OAuth2.0 인증 구현