https://loy124.tistory.com/413
spring boot + apache kafka 를 활용한 미니 채팅앱 만들어보기(1) - kafka 개요 및 테스트
Apache Kafka kafka는 분산 스트리밍 플랫폼으로서 메시지큐 역할을 할 수 있는 시스템이다.대규모 실시간 데이터 스트림을 안정적으로 처리할수 있는장점이 있다. message queue 는 뭘까? 발신자(Producer)
loy124.tistory.com
이제 위 내용을 기반으로 간단한
test-chat-room 이라는 토픽을 만들어서 해당 데이터를 저장하고 받아오는식으로 진행할 예정이다.
카프카에 데이터 전송하기
클라이언트 → [ChatController] → [ChatService] → [ChatKafkaProducer] → Kafka 토픽(chat-room)
카프카로부터 데이터 조회하기
kafka 토픽(chat-room) -> [ChatKafkaConsumer] -> [ChatService]
spring boot 세팅하기
https://start.spring.io/ 에 접속한 후 위와 같이 세팅 후 spring boot 서버를 생성해준다.
앞으로 만들 서비스의 흐름은 다음과 같다.
클라이언트 → [ChatController] → [ChatService] → [ChatKafkaProducer] → Kafka 토픽(chat-room)
폴더 구조는 다음과 같다.
topic 생성하기
core/config/KafkaTopicConfig.java
package com.example.chat.chat_server.core.config;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;
@Configuration
public class KafkaTopicConfig {
@Bean
public NewTopic chatRoomTopic(){
return TopicBuilder
.name("chat-room")
//메시지가 물리적으로 저장되는 단위
.partitions(1)
//복제본 갯수
.replicas(1)
.build();
}
}
위와 같이 생성하면 kafka admin이 자동으로 앱이 기동될때마다 해당 topic을 생성한다.
topic 으로 데이터 전송 구현하기
Dto 생성하기
chat/dto/ChatMessage.java
package com.example.chat.chat_server.chat.dto;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ChatMessage {
private String roomId; // 채팅방 ID
private String sender; // 보낸 사람
private String message; // 메시지 내용
}
Producer 생성하기
채팅 메시지를 kafka 'chat-room' 토픽으로 전송하는 클래스 ChatKafkaProducer 를 생성해준다.
chat/infrastructure/kafka/ChatKafkaProducer.java
package com.example.chat.chat_server.chat.infrastructure.kafka;
import com.example.chat.chat_server.chat.dto.ChatMessage;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Component
@RequiredArgsConstructor
public class ChatKafkaProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
private final ObjectMapper objectMapper;
// Kafka에 JSON 메시지 전송
public void send(ChatMessage chatMessage) {
try {
String jsonMessage = objectMapper.writeValueAsString(chatMessage); // 직렬화
kafkaTemplate.send("chat-room", chatMessage.getRoomId(), jsonMessage); // 토픽 전송
System.out.println("Kafka 전송 완료 → " + jsonMessage);
} catch (JsonProcessingException e) {
throw new RuntimeException("Kafka 메시지 직렬화 실패", e);
}
}
}
Service 생성하기
chat/service/ChatService.java
package com.example.chat.chat_server.chat.service;
import com.example.chat.chat_server.chat.dto.ChatMessage;
import com.example.chat.chat_server.chat.infrastructure.kafka.ChatKafkaProducer;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
@Service
@RequiredArgsConstructor
public class ChatService {
private final ChatKafkaProducer chatKafkaProducer;
public void sendMessage(ChatMessage chatMessage) {
chatKafkaProducer.send(chatMessage);
}
}
Controller 생성하기
chat/controller/ChatController.java
package com.example.chat.chat_server.chat.controller;
import com.example.chat.chat_server.chat.dto.ChatMessage;
import com.example.chat.chat_server.chat.service.ChatService;
import lombok.RequiredArgsConstructor;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequiredArgsConstructor
@RequestMapping("/api/chat")
public class ChatController {
private final ChatService chatService;
@PostMapping("/send")
public ResponseEntity<String> send(@RequestBody ChatMessage message) {
chatService.sendMessage(message);
return ResponseEntity.ok("메시지 전송 완료");
}
}
위와 같이 작성하게 되면순으로 로직이 완성된다.
클라이언트 → [ChatController] → [ChatService] → [ChatKafkaProducer] → Kafka 토픽(chat-room)
이제 간략하게 테스트를 해볼 시간이다.
먼저 POST 요청이기 때문에 클라이언트 구현 대신 PostMan을 사용해서 테스트 해보겠다.
{
"roomId": 1,
"sender": 1,
"message":"hello-world!"
}
hello-world라는 내용으로 데이터를 보내주었다.
이제 터미널에 들어가서 데이터가 잘 들어왔는지 체크하면 된다.
docker exec -it kafka bash
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic chat-room --from-beginning
위와 같이 데이터가 잘 들어온것을 확인할수 있다.
이제 spring boot에서도 consumer를 사용해서 데이터를 받아오는 부분을 만들어 보면 된다.
Consumer로 데이터 수신하기
package com.example.chat.chat_server.chat.infrastructure.kafka;
import com.example.chat.chat_server.chat.dto.ChatMessage;
import com.example.chat.chat_server.chat.service.ChatService;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
@RequiredArgsConstructor
public class ChatKafkaConsumer {
private final ObjectMapper objectMapper;
private final ChatService chatService;
@KafkaListener(topics = "chat-room", groupId = "chat-group")
public void listen(ConsumerRecord<String, String> record) {
try {
String json = record.value(); // Kafka 메시지 값
ChatMessage message = objectMapper.readValue(json, ChatMessage.class);
System.out.println("Kafka 수신 메시지: " + message);
// 서비스로 위임
chatService.handleReceivedMessage(message);
} catch (Exception e) {
System.err.println("Kafka 메시지 처리 실패: " + e.getMessage());
}
}
}
위와 같이 작성하고 다시 테스트를 진행해 보겠다.
안녕 세상!이라는 message를 보내주고 실행중인 spring boot의 log를 보게되면 kafka에서 정상적으로 수신처리 되는것을 확인할수 있다.
groupId 활용하기
@KafkaListener(topics = "chat-room", groupId = "chat-group") 를 보면 topics와 groupId로 분기가 되어있다.
groupId를 다르게 하게되면 group이 다르기때문에 같은 메시지를 중복해서 받게 된다.
그럼 이 groupId를 왜 다르게 쓰는가?
package com.example.chat.chat_server.chat.infrastructure.kafka;
import com.example.chat.chat_server.chat.dto.ChatMessage;
import com.example.chat.chat_server.chat.service.ChatService;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
@RequiredArgsConstructor
public class ChatKafkaConsumer {
private final ObjectMapper objectMapper;
private final ChatService chatService;
@KafkaListener(topics = "chat-room", groupId = "chat-group")
public void userMessageListener(ConsumerRecord<String, String> record) {
try {
ChatMessage message = objectMapper.readValue(record.value(), ChatMessage.class);
System.out.println("유저용 Kafka 수신 메시지: " + message);
chatService.handleReceivedMessage(message); // WebSocket 전송 등
} catch (Exception e) {
System.err.println("[chat-group] 처리 실패: " + e.getMessage());
}
}
// 로그 저장용 Consumer (같은 토픽, 다른 groupId)
@KafkaListener(topics = "chat-room", groupId = "log-group")
public void logMessageListener(ConsumerRecord<String, String> record) {
try {
ChatMessage message = objectMapper.readValue(record.value(), ChatMessage.class);
System.out.println("🗃[log-group] 채팅 로그 저장용 수신: " + message);
} catch (Exception e) {
System.err.println("[log-group] 처리 실패: " + e.getMessage());
}
}
}
위와 같이 목적에 따라 같은 메시지를 받아도 처리하는 방식을 분기해서 처리할수 있다.
https://github.com/loy124/kafka-chat-server/tree/v1.0.0
GitHub - loy124/kafka-chat-server
Contribute to loy124/kafka-chat-server development by creating an account on GitHub.
github.com
전체 코드는 위 사이트에서 다운로드가 가능하다.
다음은 위 내용들을 기반으로 채팅 서비스를 구현할 예정이다.
'프로젝트 > 미니 프로젝트' 카테고리의 다른 글
spring boot + apache kafka 를 활용한 미니 채팅앱 만들어보기(3) - socket.io를 활용한 채팅 구현하기 (0) | 2025.05.06 |
---|---|
spring boot + apache kafka 를 활용한 미니 채팅앱 만들어보기(1) - kafka 개요 및 테스트 (0) | 2025.05.05 |
원격 로그 수집 시스템 구성하기 (elastic stack) (0) | 2022.11.17 |
데이터 백업 시스템 구성하기 (0) | 2022.10.26 |
Docker + PM2 + nestjs + winston.js를 활용한 무중단 운영 시스템 구축하기 (0) | 2021.08.25 |