Posts 카프카 커넥트(kafka connect), Custom Post Processor 만들기
Post
Cancel

카프카 커넥트(kafka connect), Custom Post Processor 만들기

개요

Post Processor

  • 설명
    • [참고2 - How to Create a Custom Post Processor]
    • Custom Post Processor는 sink connector의 plugin으로 처리된다.
    • mongoDB Sink Connector PostProcessor.java
      상속 받는 클래스를 생성한 후
      process() 메소드에 원하는 로직을 입력하여 만든다.
      예시에서는 sinkPostProcessor 패키지의 ObjectIdPostProcessor 클래스로 가정한다.
    • 작성한 ObjectIdPostProcessor .java를
      class로 빌드 후 jar 파일로 만든다.
    • 이후 mongoDB sink connector plugin 디렉토리에 jar를 이동시킨다.
    • Sink Connector 생성 명령에서 ObjectIdPostProcessor를 추가한다.
      1
      2
      3
      4
      5
      
      {  
          ...  
          "post.processor.chain": "sinkPostProcessor.ObjectIdPostProcessor",  
          "value.projection.list": "id"  
      }  
      

카프카, 카프카 스트림, 카프카 커넥트 설정

트러블 슈팅1 - java version

  • 설명
    • ObjectIdPostProcessor 클래스는 mongoDB sink connector의
      자바 버전과 동일한 자바 버전에서 컴파일 되어야 한다.
    • 그렇지 않으면 아래와 같은 아래가 발생한다.
      2024-04-04 10:42:51 Caused by: java.lang.UnsupportedClassVersionError: com/myapp/streams/ObjectIdPostProcessor has been compiled by a more recent version of the Java Runtime (class file version 65.0), this version of the Java Runtime only recognizes class file versions up to 55.0
    • 내 작업 환경에서는 jdk 21을 사용 중이었으나
      mongoDB sink connector가 설치된
      confluentinc/cp-kafka-connect:7.6.0 이미지는
      jdk11을 사용 중인 것으로 추정된다[참고3].
    • OpenJdk 11을 설치하고 “트러블 슈팅2”로 넘어간다.
      [참고4]

트러블 슈팅2 - 이클립스 그래들 프로젝트 생성

  • 설명
    • mongoDB Sink Connector PostProcessor.java에는
      의존성 패키지들이 존재한다.
    • ObjectIdPostProcessor 빌드 시
      의존성 패키지들도 함께 처리해주기 위해
      이클립스 그래들 프로젝트를 생성하고
      기본 compiler를 java 11로 변경한다.
    • [참고5][참고6]
  • 이클립스에 그래들 설치
    • (상단 메뉴)Help > Eclipse Marketplace > gradle 검색 후
      Buildship Gradle Integration 3.0 설치
  • 그래들 프로젝트 생성
    • (상단 메뉴)File > New > Project…(Ctrl + N)로 New Project 팝업 생성
      • gradle 버전: 7.4.2를 선택
      • Java home: jdk11 디렉토리 선택
  • 컴파일러 버전 변경
    • (상단 메뉴)Window > Preferences >
      compiler 검색 후 Java Compiler 선택,
      Compiler compliance level 11로 변경

ObjectIdPostProcessor 코드

  • 설명
    • mongoDB sink connector 생성 명령에서
      value.projection.list에 입력된 필드명(쉼표 구분 comma separated)을
      읽어 들인다.
    • 카프카 메세지에서 해당 필드의 값(field value)를
      BsonObjectID 타입으로 변경 후 저장한다.
  • 의존성 패키지
    1
    2
    3
    4
    
    compileOnly 'org.mongodb.kafka:mongo-kafka-connect:1.11.2'  
    compileOnly 'org.apache.kafka:connect-api:3.7.0'  
    compileOnly 'org.mongodb:bson:3.3.0'  
    compileOnly 'org.slf4j:slf4j-api:1.7.25'  
    
  • sinkPostProcessor/ObjectIdPostProcessor.java
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    
    package sinkPostProcessor;  
                
    import org.bson.types.ObjectId;  
                
    import org.apache.kafka.connect.sink.SinkRecord;  
    import org.bson.BsonObjectId;  
                
    import org.slf4j.Logger;  
    import org.slf4j.LoggerFactory;  
                
    import com.mongodb.kafka.connect.sink.MongoSinkTopicConfig;  
    import com.mongodb.kafka.connect.sink.converter.SinkDocument;  
    import com.mongodb.kafka.connect.sink.processor.PostProcessor;  
                
    import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.VALUE_PROJECTION_LIST_CONFIG;  
                
    import java.util.ArrayList;  
    import java.util.Arrays;  
    import java.util.List;  
                
    public class ObjectIdPostProcessor extends PostProcessor {  
    	private static final Logger LOGGER = LoggerFactory.getLogger(PostProcessor.class);  
    	private final List<String> fieldNames;  
                
    	public ObjectIdPostProcessor(final MongoSinkTopicConfig config) {  
    		super(config);  
    		String rawFields = config.getString(VALUE_PROJECTION_LIST_CONFIG);  
    		fieldNames = new ArrayList<String>(Arrays.asList(rawFields.split(",")));  
    	}  
                
    	@Override  
    	public void process(final SinkDocument doc, final SinkRecord orig) {  
    		doc.getValueDoc().ifPresent(vd -> {  
    			for (String fieldName : fieldNames) {  
    				if (vd.containsKey(fieldName)) {  
    					String hash = vd.getString(fieldName).getValue();  
    //					LOGGER.warn("+++" + hash);  
      					  
    					vd.put(fieldName, new BsonObjectId(new ObjectId(hash)));  
    				}  
    			}  
    		});  
    	}  
                
    }  
                
    

트러블 슈팅3 - ObjectIdPostProcessor.java를 jar 파일로 추출

  • ObjectIdPostProcessor.java 우클릭 > Export 선택
  • JAR file 선택
  • 패지키(sinkPostProcessor) 선택 후 ObjectIdPostProcessor.java 선택,
    jar 파일 경로를 mongoDB Sink Connector plugin 경로 하위로 선택
  • mongoDB Sink Connector plugin 경로란?
    • docker-compose.yml 예시
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      
      # Use root/example as user/password credentials  
      version: '3.8'  
                      
      services:  
        kafka:  
          ...  
                      
        mongo:  
          ...  
                      
        kafka-connect:  
            image: confluentinc/cp-kafka-connect:7.6.0  
            ports:  
              - 8083:8083  
            depends_on:  
              - kafka  
              - mongo  
            environment:  
              CONNECT_BOOTSTRAP_SERVERS: broker:19092  
              CONNECT_REST_PORT: 8083  
              CONNECT_GROUP_ID: "quickstart-avro"  
              CONNECT_CONFIG_STORAGE_TOPIC: "quickstart-avro-config"  
              CONNECT_OFFSET_STORAGE_TOPIC: "quickstart-avro-offsets"  
              CONNECT_STATUS_STORAGE_TOPIC: "quickstart-avro-status"  
              CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1  
              CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1  
              CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1  
              CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"  
              CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"  
              CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"  
              CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"  
              CONNECT_REST_ADVERTISED_HOST_NAME: "localhost"  
              CONNECT_LOG4J_ROOT_LOGLEVEL: WARN  
              CONNECT_PLUGIN_PATH: "/usr/share/java,/etc/kafka-connect/jars"  
            volumes:  
              - ./kafka-connect/jars:/etc/kafka-connect/jars  
      
    • CONNECT_PLUGIN_PATH와 연결된 ./kafka-connect/jars가
      mongoDB Sink Connector plugin 경로이다.
    • [참고7]

ObjectIdPostProcessor 사용하기

  • mongoDB Sink Connector 재실행
    • ObjectIdPostProcessor.jar를 인식시키기 위하여
      kafka-connect 컨테이너를 재시작한다.
  • sink connector 생성 명령
    • 설명
      • post.processor.chain에 삽입할 Custom Post Processor를 입력한다.
        ex) “post.processor.chain”: “sinkPostProcessor.ObjectIdPostProcessor”
      • value.projection.list에 대상 필드명을 콤마 구분하여 입력한다.
        ex) “value.projection.list”: “id,target-id”
      • !주의 - PartialValueStrategy 사용불가
        ObjectIdPostProcessor의 대상 필드가 PartialValueStrategy 일 경우
        writeModel을 ReplaceOneBusinessKeyStrategy로 지정하여도
        같은 키를 가진 문서가 갱신되지 않고 추가만 된다.
        키로 사용되지 않는 필드를 추가하거나
        FullKeyStrategy를 사용하여 대응할 수 있다.
        [참고8]
    • 명령
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      
      curl --location 'http://localhost:8083/connectors' \  
      --header 'Content-Type: application/json' \  
      --data-raw '{  
          "name": "mongo-sink-user",  
          "config": {  
              "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",  
              "connection.uri": "mongodb://mongodbuser:mongodbpw@mongo:27017/",  
              "tasks.max": "1",  
              "topics": "user-result",  
              "database": "my-database",  
              "collection": "user",  
              "key.converter": "org.apache.kafka.connect.storage.StringConverter",  
              "value.converter": "org.apache.kafka.connect.json.JsonConverter",  
              "key.converter.schemas.enable": false,  
              "value.converter.schemas.enable": false,  
              "document.id.strategy.overwrite.existing": true,  
              "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.FullKeyStrategy",  
              "writemodel.strategy": "com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy",  
              "post.processor.chain": "sinkPostProcessor.ObjectIdPostProcessor",  
              "value.projection.list": "id,targetId"  
          }  
      }'  
      

추가 - Custom Post Processor 에러 확인

  • mongoDB sink connector 생성 시
    PostProcessor에 에러가 있는 경우 ,
    kafka-connect container 로그에서 확인할 수 있다.
  • mongoDB sink connector는 생성 후
    처리 중에 에러가 발생했을 경우,
    GET connectors/{connector-name}/status 명령으로 확인할 수 있다.
    1
    
    curl --location 'http://localhost:8083/connectors/mongo-sink-user/status'  
    

추가 - ObjectIdPostProcessor.jar 파일

  • ObjectIdPostProcessor.jar 파일 링크에서
    download raw file을 눌러 다운로드 할 수 있다.
  • ObjectIdPostProcessor를 그대로 사용하고 싶은 경우
    위 jar 파일만 받아서 mongoDB sink connector plugin 디렉토리에 복사하여
    사용하면 된다.

참고

This post is licensed under CC BY 4.0 by the author.

카프카 커넥트(kafka connect), json 스키마 사용하기

mongoDB Sink Connector, FullKeyStrategy id-strategy 사용하기