본문 바로가기

프로젝트/미니 프로젝트

spring boot + apache kafka 를 활용한 미니 채팅앱 만들어보기(2) - spring boot 에서 producer, consumer 구현하기

https://loy124.tistory.com/413

 

spring boot + apache kafka 를 활용한 미니 채팅앱 만들어보기(1) - kafka 개요 및 테스트

Apache Kafka kafka는 분산 스트리밍 플랫폼으로서 메시지큐 역할을 할 수 있는 시스템이다.대규모 실시간 데이터 스트림을 안정적으로 처리할수 있는장점이 있다. message queue 는 뭘까? 발신자(Producer)

loy124.tistory.com

 

이제 위 내용을 기반으로 간단한

 

test-chat-room 이라는 토픽을 만들어서 해당 데이터를 저장하고 받아오는식으로 진행할 예정이다.

카프카에 데이터 전송하기 

클라이언트 → [ChatController] → [ChatService] → [ChatKafkaProducer] → Kafka 토픽(chat-room)


카프카로부터 데이터 조회하기 

kafka 토픽(chat-room) -> [ChatKafkaConsumer] -> [ChatService]

 

spring boot 세팅하기

 

 


https://start.spring.io/ 에 접속한 후 위와 같이 세팅 후  spring boot 서버를 생성해준다. 

 

 

앞으로 만들 서비스의 흐름은 다음과 같다. 

 

클라이언트 → [ChatController] → [ChatService] → [ChatKafkaProducer] → Kafka 토픽(chat-room)

 

 

폴더 구조는 다음과 같다.

 

topic 생성하기

 

 

core/config/KafkaTopicConfig.java

package com.example.chat.chat_server.core.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaTopicConfig {


    @Bean
    public NewTopic chatRoomTopic(){
        return TopicBuilder
                .name("chat-room")
                //메시지가 물리적으로 저장되는 단위
                .partitions(1)
                //복제본 갯수 
                .replicas(1)
                .build();
    }
}

 

위와 같이 생성하면 kafka admin이 자동으로 앱이 기동될때마다 해당 topic을 생성한다.

 

 

 

topic 으로 데이터 전송 구현하기

 

Dto 생성하기

chat/dto/ChatMessage.java

package com.example.chat.chat_server.chat.dto;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class ChatMessage {
    private String roomId;    // 채팅방 ID
    private String sender;    // 보낸 사람
    private String message;   // 메시지 내용
}

 

 

Producer 생성하기

 

채팅 메시지를 kafka 'chat-room' 토픽으로 전송하는 클래스 ChatKafkaProducer 를 생성해준다. 

 

chat/infrastructure/kafka/ChatKafkaProducer.java

package com.example.chat.chat_server.chat.infrastructure.kafka;

import com.example.chat.chat_server.chat.dto.ChatMessage;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
public class ChatKafkaProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;
    private final ObjectMapper objectMapper;

    // Kafka에 JSON 메시지 전송
    public void send(ChatMessage chatMessage) {
        try {
            String jsonMessage = objectMapper.writeValueAsString(chatMessage);  // 직렬화
            kafkaTemplate.send("chat-room", chatMessage.getRoomId(), jsonMessage);  // 토픽 전송
            System.out.println("Kafka 전송 완료 → " + jsonMessage);
        } catch (JsonProcessingException e) {
            throw new RuntimeException("Kafka 메시지 직렬화 실패", e);
        }
    }
}

 

 

Service 생성하기

chat/service/ChatService.java

package com.example.chat.chat_server.chat.service;

import com.example.chat.chat_server.chat.dto.ChatMessage;
import com.example.chat.chat_server.chat.infrastructure.kafka.ChatKafkaProducer;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;

@Service
@RequiredArgsConstructor
public class ChatService {

    private final ChatKafkaProducer chatKafkaProducer;

    public void sendMessage(ChatMessage chatMessage) {
        chatKafkaProducer.send(chatMessage);
    }
}


 

 

Controller 생성하기

 

chat/controller/ChatController.java

package com.example.chat.chat_server.chat.controller;

import com.example.chat.chat_server.chat.dto.ChatMessage;
import com.example.chat.chat_server.chat.service.ChatService;
import lombok.RequiredArgsConstructor;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequiredArgsConstructor
@RequestMapping("/api/chat")
public class ChatController {

    private final ChatService chatService;

    @PostMapping("/send")
    public ResponseEntity<String> send(@RequestBody ChatMessage message) {
        chatService.sendMessage(message);
        return ResponseEntity.ok("메시지 전송 완료");
    }
}

 

 

위와 같이 작성하게 되면순으로 로직이 완성된다. 

 

클라이언트 → [ChatController] → [ChatService] → [ChatKafkaProducer] → Kafka 토픽(chat-room)

 

이제 간략하게 테스트를 해볼 시간이다. 

 

먼저 POST 요청이기 때문에 클라이언트 구현 대신 PostMan을 사용해서 테스트 해보겠다. 

 

{
    "roomId": 1,
    "sender": 1, 
    "message":"hello-world!"   
}

 

hello-world라는 내용으로 데이터를 보내주었다. 

 

 

이제 터미널에 들어가서 데이터가 잘 들어왔는지 체크하면 된다. 

 docker exec -it kafka bash
 kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic chat-room --from-beginning

 

위와 같이 데이터가 잘 들어온것을 확인할수 있다. 

 

이제 spring boot에서도 consumer를 사용해서 데이터를 받아오는 부분을 만들어 보면 된다. 

 

 

 

Consumer로 데이터 수신하기

 

package com.example.chat.chat_server.chat.infrastructure.kafka;

import com.example.chat.chat_server.chat.dto.ChatMessage;
import com.example.chat.chat_server.chat.service.ChatService;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
public class ChatKafkaConsumer {

    private final ObjectMapper objectMapper;
    private final ChatService chatService;

    @KafkaListener(topics = "chat-room", groupId = "chat-group")
    public void listen(ConsumerRecord<String, String> record) {
        try {
            String json = record.value(); // Kafka 메시지 값
            ChatMessage message = objectMapper.readValue(json, ChatMessage.class);

            System.out.println("Kafka 수신 메시지: " + message);

            // 서비스로 위임
            chatService.handleReceivedMessage(message);

        } catch (Exception e) {
            System.err.println("Kafka 메시지 처리 실패: " + e.getMessage());
        }
    }
}

 

위와 같이 작성하고 다시 테스트를 진행해 보겠다. 

 

 

안녕 세상!이라는 message를 보내주고 실행중인 spring boot의 log를 보게되면 kafka에서 정상적으로 수신처리 되는것을 확인할수 있다. 

 

 

 

 

groupId 활용하기

 

@KafkaListener(topics = "chat-room", groupId = "chat-group") 를 보면 topics와 groupId로 분기가 되어있다.

 

groupId를 다르게 하게되면 group이 다르기때문에 같은 메시지를 중복해서 받게 된다. 

그럼 이 groupId를 왜 다르게 쓰는가?

 

package com.example.chat.chat_server.chat.infrastructure.kafka;

import com.example.chat.chat_server.chat.dto.ChatMessage;
import com.example.chat.chat_server.chat.service.ChatService;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
public class ChatKafkaConsumer {

    private final ObjectMapper objectMapper;
    private final ChatService chatService;

    @KafkaListener(topics = "chat-room", groupId = "chat-group")
    public void userMessageListener(ConsumerRecord<String, String> record) {
        try {
            ChatMessage message = objectMapper.readValue(record.value(), ChatMessage.class);
            System.out.println("유저용 Kafka 수신 메시지: " + message);
            chatService.handleReceivedMessage(message);  // WebSocket 전송 등
        } catch (Exception e) {
            System.err.println("[chat-group] 처리 실패: " + e.getMessage());
        }
    }

    // 로그 저장용 Consumer (같은 토픽, 다른 groupId)
    @KafkaListener(topics = "chat-room", groupId = "log-group")
    public void logMessageListener(ConsumerRecord<String, String> record) {
        try {
            ChatMessage message = objectMapper.readValue(record.value(), ChatMessage.class);
            System.out.println("🗃[log-group] 채팅 로그 저장용 수신: " + message);

        } catch (Exception e) {
            System.err.println("[log-group] 처리 실패: " + e.getMessage());
        }
    }
}

 

위와 같이 목적에 따라 같은 메시지를 받아도 처리하는 방식을 분기해서 처리할수 있다. 

 

 

log와 유저용이 분리되어 저장된것을 확인할 수 있다.

 

 

https://github.com/loy124/kafka-chat-server/tree/v1.0.0

 

GitHub - loy124/kafka-chat-server

Contribute to loy124/kafka-chat-server development by creating an account on GitHub.

github.com

 

 전체 코드는 위 사이트에서 다운로드가 가능하다.

 

 

다음은 위 내용들을 기반으로 채팅 서비스를 구현할 예정이다.

 

반응형