개요
- PartialValueStrategy 사용할 때 Custom Post Processor를 사용하는 경우,
키로 사용되는 필드가 Post Processing 대상이 되면
ReplaceOneBusinessKeyStrategy이 동작하지 않는다. - id-strategy와 상관없이 Post Processing을 적용하기 쉽도록
메세지 키를 이용하여 문서 _id를 만드는 전략인 FullKeyStrategy를 사용한다.
FullKeyStrategy
- 메세지의 키에서 _id 필드만 추출하여 그 값을 키로 사용하는 전략이다.
- 메세지 예시
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
SinkRecord{ kafkaOffset=2, timestampType=CreateTime, originalTopic=user-result, originalKafkaPartition=0, originalKafkaOffset=2 } ConnectRecord{ topic='expt1-result', kafkaPartition=0, key={'_id':"my-key"}, keySchema=Schema{STRING}, value={ value=my-value, name=my-name, count=17 }, valueSchema=null, timestamp=1712566487462, headers=ConnectHeaders( headers=[ ConnectHeader( key=__TypeId__, value=com.myapp.api.expt1.Expt1, schema=Schema{STRING} ) ] ) }
- [참고]
카프카, 카프카 스트림, 카프카 커넥트 설정
- 기본적으로 아래 링크와 동일한 설정이며
앞으로의 내용은 변경된 부분을 주로 서술한다. - 카프카 커넥트(kafka connect)로 mongoDB에 카프카 스트림 쓰기
카프카 스트림 설정
- 설명
- selectKey에서 {“_id”: “my-key”}와 같이
json 문자열을 키로 설정해야한다.
- selectKey에서 {“_id”: “my-key”}와 같이
- global/config/KafkaStreamsConfig.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
@Configuration public class KafkaStreamsConfig { @Value("${spring.kafka.producer.bootstrap-servers}") private String bootstrapServers; KafkaStreamsConfiguration kStreamsConfig(String applicationId, Object valueSerde) { Map<String, Object> props = new HashMap<>(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, valueSerde); return new KafkaStreamsConfiguration(props); } @Bean("expt1DSLBuilder") FactoryBean<StreamsBuilder> expt1DSLBuilder() { StreamsBuilderFactoryBean streamsBuilder = new StreamsBuilderFactoryBean(kStreamsConfig("expt1-id", Expt1.class)); return streamsBuilder; } @Bean("expt1KStream") KStream<String, Expt1> expt1KStream(@Qualifier("expt1DSLBuilder") StreamsBuilder expt1DSLBuilder) { // stream을 불러오는 부분, "expt1" 토픽을 구독한다. KStream<String, Expt1> kStream = expt1DSLBuilder.stream("expt1"); // 집계를 처리하는 부분 // "expt1"는 Materialized view 이름 kStream .filter((key, value)-> value.getName() != null || value.getValue() != null) .selectKey((key, value) -> { return "{'_id':\"" + value.getName() + "|" + value.getValue() + "\"}"; }) .groupByKey() .aggregate( new Initializer<Expt1>() { public Expt1 apply() { return new Expt1(); } }, new Aggregator<String, Expt1, Expt1>() { public Expt1 apply(String key, Expt1 value, Expt1 aggregate) { aggregate.setName(value.getName()); aggregate.setValue(value.getValue()); aggregate.setCount(aggregate.getCount() + 1); return aggregate; } }, Materialized.as("expt1") ) // 집계를 완료한 kTable을 "expt1-result" 토픽에 저장한다. .toStream() .to("expt1-result"); return kStream; } }
카프카 커넥트 설정
- 설명
- document.id.strategy를 FullKeyStrategy로 설정한다.
- document.id.strategy.overwrite.existing를 true로 설정한다.
- 설정 생성 명령
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
curl --location 'http://localhost:8083/connectors' \ --header 'Content-Type: application/json' \ --data-raw '{ "name": "mongo-sink-expt1", "config": { "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector", "connection.uri": "mongodb://mongodbuser:mongodbpw@mongo:27017/", "tasks.max": "2", "topics": "expt1-result", "database": "expt", "collection": "expt1", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "key.converter.schemas.enable": false, "value.converter.schemas.enable": false, "document.id.strategy.overwrite.existing": true, "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.FullKeyStrategy", "writemodel.strategy": "com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy" } }'