본문 바로가기
개발일지/message queue,broker

Kafka와 SSE를 활용한 비동기 처리

2025. 1. 7.

이번 게시물에는 알람을 발생 시킬때 일반적으로

kafka와 sse 를 사용한 방법이 적절한 방법인지 알아보자

Kafka(카프카)란?

Kafka는 Apache Software Foundation에서 개발한 오픈 소스 메시지 브로커 프로젝트로, 대용량의 실시간 처리를 가능하게 하는 분산 스트리밍 플랫폼이다. 즉, 분산 스트리밍 플랫폼이란 대량의 데이터를 실시간으로 수집, 처리, 전달하는 시스템을 의미한다. 카프카는 분산 메시징 시스템으로, 대용량 데이터의 비동기 처리에 최적화된 도구이다. 카프카를 사용하면 데이터 생산자(Producer)와 소비자(Consumer) 간의 메시지 교환을 효율적으로 처리할 수 있다. 이를 통해 실시간 데이터 스트리밍이 가능해지며, 시스템 간의 결합도를 낮추고 확장성을 높일 수 있다.

 

Kafka의 데이터 저장 방식

Kafka에서는 토픽(Topic) 단위로 데이터를 저장한다. 토픽은 여러 개의 파티션(Partition)으로 분할되고, 각 파티션은 서로 다른 브로커(Broker) 서버에 저장될 수 있다. 📌 즉, 한 개의 토픽이 여러 서버에 나뉘어 저장되는 구조이다. 이 덕분에 데이터를 병렬로 처리할 수 있고, 성능이 향상된다.

참고로 토픽(Topic)은 논리적인 개념이라서 직접 브로커에 저장되지 않고, 파티션(Partition)이 물리적으로 저장되는 것이다. 즉, 토픽은 개념적인 그룹일 뿐이고, 실제로 브로커에 저장되는 것은 개별 파티션이다.

kafka 구조

위 이미지는 Kafka의 기본 구조를 보여준다. Producer(생산자), Broker(중개자), 그리고 Consumer(소비자)로 나뉘어 데이터가 어떻게 처리되는지 나타낸다.

  • Producer
    • Producer는 데이터를 생성하고 Kafka 브로커로 전송하는 주체이다.
    • Producer가 생성하는 데이터는 Event(이벤트)로 표현되며, 이 이벤트에는 고유한 Key(키)가 할당된다.
    • 이 키는 데이터를 어느 Partition에 저장할지 결정하는 기준이 된다.
  • Broker
    • Broker는 데이터를 관리하고 저장하는 Kafka의 중심 역할을 한다.
    • Producer가 보낸 이벤트는 Broker 내의 Topic(토픽)에 저장되며, 하나의 Topic은 여러 개의 Partition으로 나뉜다.
    • 파티션(Partition)
      • 파티션은 토픽의 데이터를 물리적으로 나누어 저장하는 단위이다. ( 토픽은 논리적인 개념이라서 직접 브로커에 저장되지 않고, 파티션(Partition)이 물리적으로 저장되는 것이다.)
      • 각 Partition은 데이터를 순서대로 기록하고 저장하며, 메시지를 관리한다. Partition을 사용하면 데이터가 여러 서버에 분산 저장되어 확장성을 크게 높일 수 있다.
      • 파티션은 토픽의 데이터를 물리적으로 나누는 단위이다. 각 파티션은 독립적인 로그로 저장되며, 여러 브로커에 분산되어 데이터를 처리한다. 파티션을 사용함으로써 병렬 처리와 높은 확장성이 가능하며, 각 파티션 내에서는 메시지 순서가 보장된다.
    • Topic 0부터 Topic N까지 다양한 토픽이 존재할 수 있으며, 각각의 토픽이 여러 파티션으로 나누어진다.
  • Consumer
    • Consumer는 Broker에 저장된 데이터를 가져가 처리하는 주체이다.
    • 여러 Consumer가 Consumer Group을 이루며, 이 그룹 내에서 데이터를 효율적으로 처리한다.
    • 한 Consumer Group에 속한 Consumer들은 서로 다른 파티션에서 데이터를 읽어가며, 동시에 여러 데이터를 병렬로 처리할 수 있다.
    • 이미지에서 볼 수 있듯이, 각각의 파티션으로부터 데이터를 읽어오는 Consumer가 여러 개 있을 수 있다.
    • 예를 들어, Consumer 1과 Consumer 2는 각각 다른 파티션에서 데이터를 읽어온다.
  • Cluster
    • 여러개의 브로커로 구성된 분산 시스템으로 대량의 데이터를 안정적으로 처리하고 확장성을 높이기 위한 구조

카프카는 Producer와 Consumer 사이에 Broker를 두어 데이터를 주고받는 시스템이다. 데이터를 파티셔닝 하여 분산 처리함으로써 대용량 데이터의 비동기 처리가 가능하다. Producer는 데이터를 보내고, Broker는 데이터를 저장 및 관리하며, Consumer는 해당 데이터를 받아서 처리하는 구조이다. 이러한 구조 덕분에 Kafka는 대규모 데이터를 빠르게 처리하고, 시스템의 확장성과 유연성을 보장할 수 있다.

 

 

실습

로컬에서 Kafka를 사용하기 위해서는 먼저 Kafka를 설치해야 한다. Mac에서는 brew를 이용해 설치할 수 있다

brew install kafka

 

Kafka를 실행하기 전에는 zookeeper를 먼저 실행해야 한다.

 brew services start zookeeper

 

그 후, Kafka를 실행한다.

brew services start kafka

Kafka 토픽 생성

이번 프로젝트에서는 alarm이라는 토픽을 생성하여 알림 이벤트를 관리한다. 아래 명령어로 alarm 토픽을 생성할 수 있다.

kafka-topics --create --topic alarm --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1Copy

-topic alarm:

생성할 토픽의 이름을 지정한다. 여기서는 alarm이라는 이름의 토픽을 생성한다.

--bootstrap-server localhost:9092:

Kafka 브로커의 주소를 지정한다.

--partitions 3:

생성할 토픽에 대해 파티션 수를 설정한다.

--replication-factor 1:

파티션의 복제본 수를 설정한다. 1은 복제본이 하나만 있다는 뜻으로, 이 경우 각 파티션은 하나의 브로커에만 저장되고 복제본은 없다.

 

Kafka 프로듀서 생성

Kafka에서 이벤트를 생성하는 프로듀서를 생성하기 위해 콘솔 프로듀서를 실행한다.

kafka-console-producer --topic alarm--bootstrap-server localhost:9092

Gradle 의존성 추가

Kafka와 Spring Kafka를 연동하기 위해 build.gradle 파일에 아래와 같은 의존성을 추가한다.

implementation 'org.springframework.kafka:spring-kafka'

 

 

Kafka를 사용한 비동기 알림 처리 흐름

💡 기존 방식 (동기 처리)

예를 들어, 사용자 A, B, C에게 알림을 보낼 때, 동기 방식이라면 서버가 각 사용자에게 직접 알림을 보내고 응답을 받을 때까지 기다려야 한다.

  • 알림 1 → 전송 완료 ✅ → 알림 2 → 전송 완료 ✅ → 알림 3 → 전송 완료 ✅
  • 만약 한 명에게 알림을 보내는 데 오래 걸리면, 다음 알림도 늦어짐

💡 Kafka를 사용한 비동기 방식

Kafka를 이용하면 알림을 즉시 전송하는 것이 아니라 Kafka에 넣어두고, 따로 실행되는 소비자(Consumer)가 이를 가져가서 비동기적으로 처리한다.

  • 알림 1 → Kafka에 저장
  • 알림 2 → Kafka에 저장
  • 알림 3 → Kafka에 저장
  • 소비자가 가져가면서 개별적으로 처리 → 사용자에게 알림 발송

 

PostService 클래스

PostService 클래스는 게시물의 생성, 수정, 삭제와 같은 핵심 기능을 제공하며, 이번 수정에서는 알림 기능을 비동기적으로 처리하기 위해 Kafka를 활용한 AlarmProducer로 전환하였다.

   private final AlarmProducer alarmProducer;
    
    @Transactional
    public void like(Integer postId, String userName) {
        ...
        
        // like save
        likeEntityRepository.save(LikeEntity.of(userEntity, postEntity));
        alarmProducer.send(new AlarmEvent(postEntity.getUser().getId(), AlarmType.NEW_LIKE_ON_POST, new AlarmArgs(userEntity.getId(), postEntity.getId())));
    }
	
    ...
    
    @Transactional
    public void comment(Integer postId, String userName, String comment) {
	     ...

        // comment save
        commentEntityRepository.save(CommentEntity.of(userEntity, postEntity, comment));
        AlarmEntity alarmEntity = alarmEntityRepository.save(AlarmEntity.of(postEntity.getUser(), AlarmType.NEW_COMMENT_ON_POST, new AlarmArgs(userEntity.getId(), postEntity.getId())));

        alarmProducer.send(new AlarmEvent(postEntity.getUser().getId(), AlarmType.NEW_COMMENT_ON_POST, new AlarmArgs(userEntity.getId(), postEntity.getId())));
    }


AlarmProducer 클래스

AlarmProducer 클래스는 Kafka를 사용하여 알림 이벤트를 비동기적으로 처리하기 위해 설계된 프로듀서 클래스이다. 이 클래스의 주요 역할은 알림(AlarmEvent)을 Kafka 브로커로 전송하는 것이다.

@Slf4j
@Component
@RequiredArgsConstructor
public class AlarmProducer {

    private final KafkaTemplate<Integer, AlarmEvent> kafkaTemplate;

    @Value("${spring.kafka.topic.alarm}")
    private String topic;

    public void send(AlarmEvent event) {
        kafkaTemplate.send(topic, event.getReceiveUserId(), event);
        log.info("Send to Kafka finished");
    }
}


AlarmConsumer 클래스

AlarmConsumer 클래스는 Kafka로부터 수신된 알림 이벤트를 처리하는 역할을 하는 컨슈머 클래스이다. 이 클래스는 Kafka에서 전송된 AlarmEvent를 수신하고, 해당 알림을 처리하는 로직을 포함한다

@Slf4j
@Component
@RequiredArgsConstructor
public class AlarmConsumer {

    private final AlarmService alarmService;

    @KafkaListener(topics = "${spring.kafka.topic.alarm}")
    public void consumeAlarm(AlarmEvent event, Acknowledgment ack){
        log.info("Consume the event {}", event);
        alarmService.send(event.getAlarmType(), event.getArgs(), event.getReceiveUserId());
        ack.acknowledge();
    }
}


AlarmService 클래스

AlarmService 클래스에서는 알림 데이터를 저장하고, Kafka로 이벤트를 전송한다.

    private final AlarmEntityRepository alarmEntityRepository;
    private final UserEntityRepository userEntityRepository;

    public void send(AlarmType type, AlarmArgs arg, Integer receiverUserId) {
        UserEntity user = userEntityRepository.findById(receiverUserId).orElseThrow(
                () -> new SnsApplicationException(ErrorCode.USER_NOT_FOUND));
        // alarm save
        AlarmEntity alarmEntity = alarmEntityRepository.save(AlarmEntity.of(user, type, arg) );

        // 인스턴스를 가져온다.
        emitterRepository.get(receiverUserId).ifPresentOrElse(sseEmitter -> {
            try {
                // ALARM_NAME = "alarm" 이란 이름의 알람이 발생되면 클라이언트에서 "alarm"이란 이름으로 발생된 이벤트가 발생한다.
                sseEmitter.send(SseEmitter.event().id(alarmEntity.getId().toString()).name(ALARM_NAME).data("new alarm"));
            } catch (IOException e){
                emitterRepository.delete(receiverUserId);
                throw new SnsApplicationException(ErrorCode.ALARM_CONNECT_ERROR);
            }
        }, () -> log.info("No emitter founded"));
    }

 


Kafka를 사용하면 뭐가 달라지나?

1️⃣ 동시 알림 처리가 가능하다 (비동기 방식)

  • 동기 방식에서는 한 개의 알림을 보낸 후 다음 알림을 보낼 수 있지만,
  • Kafka를 사용하면 여러 개의 알림을 동시에 전송 가능
  • 예: 사용자 1, 2, 3에게 동시에 알림이 가도록 처리 가능

2️⃣ 알림이 실패해도 손실되지 않는다 (내결함성)

  • Kafka가 중간에서 버퍼 역할을 하기 때문에,
  • 알림 전송 중 서버가 장애가 나도 Kafka에 저장된 알림은 유지
  • → 이후 장애 복구 후 다시 알림을 보낼 수 있음

3️⃣ 서버 부하를 줄일 수 있다 (부하 분산)

  • 동기 방식이라면 알림을 보낼 때마다 서버가 바빠지지만,
  • Kafka는 한 번에 많은 알림을 받아 저장하고, 소비자가 알아서 가져가도록 할 수 있음
  • → 서버가 알림을 바로 보내느라 바빠지지 않음

 

결론

  • Kafka를 이용하면 알림을 동기적으로 즉시 보내는 것이 아니라, 저장 후 비동기적으로 처리
  • 여러 개의 알림이 발생해도 Kafka가 큐처럼 저장하고, Consumer가 병렬로 처리 가능
  • 알림 전송이 실패해도 Kafka가 유지하므로 다시 전송 가능
  • 서버 부하를 줄이고, 대량의 알림을 빠르게 처리할 수 있음
  • 즉, Kafka를 사용하면 서버가 부담 없이 대량의 알림을 안정적으로 전송할 수 있다!

 

Kafka 없이 SseEmitter만 사용하면?

SseEmitter는 서버가 클라이언트(브라우저 등)와 지속적으로 연결을 유지하면서 비동기적으로 데이터를 푸시할 수 있는 방식이다.

즉, 서버에서 발생한 이벤트를 바로 클라이언트에 전달하는 역할을 한다.

 

비동기 처리 가능 여부

✅ SseEmitter는 내부적으로 별도의 쓰레드에서 데이터를 전송하므로 비동기적으로 알림을 전송하는 것이 가능하다.

❌ 그러나 알림이 여러 개 발생했을 때 처리 방식이 Kafka와 다름 → Kafka처럼 이벤트를 저장하고 순차적으로 처리하는 큐 역할이 없음

 

Kafka만 사용하면?

알림 데이터를 비동기적으로 처리하고 안정적으로 저장할 수 있음

클라이언트가 알림을 즉시 받을 방법이 없음 (Polling 필요)

네트워크 부하 증가 가능성 (Polling이 빈번하게 일어나면 비효율적)

✅ Kafka만 사용해도 Polling이나 WebSocket을 활용하면 클라이언트로 알림을 전송할 수 있음

❌ 그러나 Kafka 자체에는 푸시 기능이 없어서, Polling/WebSocket/SSE 등의 추가 기술이 필요

 

Kafka + SSE (최적의 조합)

Kafka만 사용하면 클라이언트가 직접 요청해야 하기 때문에,

Kafka(비동기 알림 처리) + SSE(클라이언트에게 즉시 푸시) 를 같이 사용하는 것이 일반적이다.

'개발일지 > message queue,broker' 카테고리의 다른 글

Kafka Stream 이란  (0) 2024.12.05
Kafka 인스턴스 생성 과정  (0) 2024.11.25
Redis설치 및 Configuration(2)  (0) 2024.08.28
Redis설치 및 Configuration(1)  (0) 2024.08.21
Redis 구조 및 성능 비교  (0) 2024.08.16