개요
- 토픽마다 다른 json을 받는 카프카 스트림을 생성한다.
- 카프카 스트림은
같은 이름의 StreamsBuilderFactoryBean Bean을
오직 하나만 허용한다. - 따라서 StreamsBuilderFactoryBean에 이름을 부여하여
여러 개 생성할 수 있도록 한다. - 이름이 다른 StreamsBuilderFactoryBean은
@Qualifier를 이용하여 로드한다[참고1][참고2]. - 카프카 스트림 스토어(store)를 직접 조회해야 하는 경우
@Autowired로 StreamBuilder를 의존성 주입하여 사용한다[참고3].
예시
- 환경
- java 21
- spring boot 3.2.3
- 설치
1 2
implementation 'org.apache.kafka:kafka-streams' implementation 'org.springframework.kafka:spring-kafka'
- expt1/Expt1.java
1 2 3 4 5 6
@Getter @Setter public class Expt1 { public String name; public String value; }
- expt1/Expt1Serde.java
1 2 3 4 5 6 7 8 9
import org.apache.kafka.common.serialization.Serdes.WrapperSerde; import org.springframework.kafka.support.serializer.JsonDeserializer; import org.springframework.kafka.support.serializer.JsonSerializer; public class Expt1Serde extends WrapperSerde<Expt1> { public Expt1Serde() { super(new JsonSerializer<>(), new JsonDeserializer<>(Expt1.class)); } }
- expt2/Expt2.java
1 2 3 4 5 6
@Getter @Setter public class Expt2 { public String name; public String value; }
- expt2/Expt2Serde.java
1 2 3 4 5 6
... public class Expt2Serde extends WrapperSerde<Expt2> { public Expt2Serde() { super(new JsonSerializer<>(), new JsonDeserializer<>(Expt2.class)); } }
- global/config/KafkaStreamsConfig.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
@Configuration public class KafkaStreamsConfig { @Value("${spring.kafka.producer.bootstrap-servers}") private String bootstrapServers; KafkaStreamsConfiguration kStreamsConfig(String applicationId, Object valueSerde) { Map<String, Object> props = new HashMap<>(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, valueSerde); return new KafkaStreamsConfiguration(props); } @Bean("expt1DSLBuilder") FactoryBean<StreamsBuilder> expt1DSLBuilder() { StreamsBuilderFactoryBean streamsBuilder = new StreamsBuilderFactoryBean(kStreamsConfig("expt1-id", Expt1.class)); return streamsBuilder; } @Bean("expt1KStream") KStream<String, Expt1> expt1KStream(@Qualifier("expt1DSLBuilder") StreamsBuilder expt1DSLBuilder) { // stream을 불러오는 부분, "expt1" 토픽을 구독한다. KStream<String, Expt1> kStream = expt1DSLBuilder.stream("expt1"); // 집계를 처리하는 부분 // "expt1"는 Materialized view 이름 kStream .selectKey((key, value) -> { return value.getPostId().replace("\"", ""); }) .groupByKey() .count(Materialized.as("expt1")) // 집계를 완료한 kTable을 "expt1-result" 토픽에 저장한다. .toStream() .to("expt1-result"); return kStream; } @Bean("expt2DSLBuilder") FactoryBean<StreamsBuilder> expt2DSLBuilder() { StreamsBuilderFactoryBean streamsBuilder = new StreamsBuilderFactoryBean(kStreamsConfig("expt2-id", Expt2.class)); return streamsBuilder; } @Bean("expt2KStream") KStream<String, Expt2> expt2KStream(@Qualifier("expt2DSLBuilder") StreamsBuilder expt2DSLBuilder) { // stream을 불러오는 부분, "expt2" 토픽을 구독한다. KStream<String, Expt2> kStream = expt2DSLBuilder.stream("expt2"); // 집계를 처리하는 부분 // "expt2"는 Materialized view 이름 kStream .selectKey((key, value) -> { return value.getPostId().replace("\"", ""); }) .groupByKey() .count(Materialized.as("expt2")) // 집계를 완료한 kTable을 "expt2-result" 토픽에 저장한다. .toStream() .to("expt2-result"); return kStream; } }
- expt1/Expt1Controller.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
@RequestMapping(value = "/expt1") @RequiredArgsConstructor @RestController public class Expt1Controller { public final KafkaTemplate<String, Object> kafkaTemplate; @Autowired private @Qualifier("expt1DSLBuilder") FactoryBean<StreamsBuilder> expt1DSLBuilder; @PostMapping(value = "/produce") public void produceExpt1(@RequestBody Expt1 expt1) { this.kafkaTemplate.send("expt1", expt1); } @GetMapping("/all") public void printAllExpt1() { StreamsBuilderFactoryBean builder = (StreamsBuilderFactoryBean) expt1DSLBuilder; KafkaStreams kafkaStreams = builder.getKafkaStreams(); ReadOnlyKeyValueStore<String, Expt1> view = kafkaStreams .store(StoreQueryParameters.fromNameAndType("expt1", QueryableStoreTypes.keyValueStore())); KeyValueIterator<String, Expt1> address = view.all(); address.forEachRemaining(keyValue -> System.out.println("keyValue.toString()++" + keyValue.toString())); } }