본문 바로가기
개발일지/message queue,broker

Kafka Stream 이란

2024. 12. 5.

Kafka Stream이란?

Kafka Streams은 Apache Kafka에서 제공하는 스트림 처리 라이브러리이다.
이를 통해 실시간 데이터 스트림(실시간으로 끊임없이 들어오는 데이터의 연속적인 흐름)을 쉽게 처리하고 변환하는 애플리케이션을 작성할 수 있다.

Kafka Streams은 Kafka에 저장된 데이터를 처리하거나 데이터를 실시간으로 읽고 쓰는 데 사용되며, 다음과 같은 주요 특징을 갖는다:

  1. 라이브러리 기반: 별도의 클러스터를 필요로 하지 않고, Java 애플리케이션 내부에 포함되어 실행된다.
  2. 분산 처리: Kafka의 파티션 구조를 활용하여 확장 가능한 분산 처리가 가능하다.
  3. 상태 관리: 스트림 처리 중 상태 정보를 저장하기 위해 RocksDB와 같은 내부 스토리지를 활용한다.
  4. 배치 및 실시간 처리 통합: 같은 코드를 사용해 배치 처리와 실시간 처리를 모두 수행할 수 있다.
  5. Kafka와 통합: Kafka의 장점을 기반으로 설계되어 높은 처리량과 신뢰성을 제공한다.

Kafka Streams는 데이터 파이프라인 또는 이벤트 기반 애플리케이션에서 데이터를 변환하거나 필터링하는 데 자주 사용된다.

 

 

Java/spring boot 에서 

Apache Kafka Streams 애플리케이션을 설정하기 위한 Spring Boot Configuration 를 설정한 코드를 알아보자

이 설정은 Kafka Streams를 사용하여 실시간 데이터 스트림 처리를 수행한다.

 

먼저 KafkaConfig 에서 KafkaStreamsConfiguration 을 설정한다.

즉, Kafka Streams 애플리케이션의 동작 환경을 설정합니다.

@Configuration
@EnableKafkaStreams
@EnableKafka
public class KafkaConfig {

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration myKStreamConfig() {
        Map<String, Object> myKStreamConfig = new HashMap<>();
        myKStreamConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-test");
        myKStreamConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "172.31.0.123:9092, 172.31.4.124:9092, 172.31.13.125:9092");
                                        // 스트림은 받고 바로 보내고 하기 때문에 SERIALIZER, DESERIALIZER를 하지 않고 한 번에 선언을 해준다.
        myKStreamConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        myKStreamConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        myKStreamConfig.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
        return new KafkaStreamsConfiguration(myKStreamConfig);
    }
}

 

다음은 StreanService 코드를 작성한다.

@Service
public class StreamService {

    private static final Serde<String> STRING_SERDE = Serdes.String();

    @Autowired
    public void buildPipeline(StreamsBuilder sb) {
        KStream<String, String> myStream = sb.stream("defaultTopic", Consumed.with(STRING_SERDE, STRING_SERDE));
        myStream.print(Printed.toSysOut());
        myStream.filter((key, value)-> value.contains("freeClass")).to("freeClassList");
    }
}

참고로

Consumed.with()는 Kafka Streams에서 KStream을 생성할 때, 데이터를 어떻게 소비할지를 지정한다.

이는 Kafka Streams가 브로커에서 데이터를 가져와 처리하는 방식에 대한 설정이다.

 

그리고 로컬 터미널에서 kafka가 설치된 서버로 접속해서 명령어를 입력한다.

 ~/kafka/bin/kafka-console-producer.sh --broker-list 172.31.0.123:9092, 172.31.4.124:9092, 172.31.13.125:9092 --topic defaultTopic
> dance freeClass

 

그렇다면 흐름은 아래와 같다.

 

1. Producer 단계 ( Kafka Producer에서 메시지 전송 )

명령어 실행 후, 메시지 dance freeClass를 defaultTopic 토픽에 보낸다.

 

2. broker 단계

producer에서 전송된 메시지가 Kafka 브로커(클러스터)에 저장됩니다. 이 데이터는 defaultTopic이라는 이름으로 관리된다..
StreamService는 이 단계와도 직접적인 연관은 없다.

3. Kafka Streams 애플리케이션: 토픽 데이터를 소비

Kafka Streams 애플리케이션에서는 StreamService가 설정한 대로 defaultTopic 토픽에서 메시지를 소비한다.

 

StreamsBuilder를 사용해 KStream 객체(myStream)을 생성하고, defaultTopic에서 메시지를 읽어온다.

dance freeClass 메시지는 "freeClass"를 포함하므로, 필터 조건을 만족한다.

freeClassList 토픽에 메시지 전송한다.

 

 

Kafka Streams는 producer와 consumer 사이에서 데이터 처리 로직을 추가하는 애플리케이션이다.

따라서 일반적인 producer → broker → consumer의 흐름이 다음처럼 확장된다:

producer → broker → (Kafka Streams 애플리케이션) → consumer