들어가며..
- 기존에는 터미널에서 CLI를 이용해 생산자를 생성했지만, 이제는 스프링 부트를 사용해 터미널 없이 생산자를 생성해보자.
- 스프링 부트 서비스를 생산자로 설정하여 아래와 같은 역할을 수행하도록 하자:
- 메시지 전송(publish)
- 메시지를 바이너리 형식으로 직렬화(serialize)
- 메시지를 보낼 토픽과 파티션 지정
동기 vs 비동기
동기식(Synchronous) 메시지 전송
- 생산자가 브로커에 메시지를 전송할 때, 메시지가 브로커에 성공적으로 저장될 때까지 대기하는 방식으로 다른 작업이 잠시 Blocked됨.
- 신뢰성이 높아 주로 사용되지만, 성능이 다소 느려질 수 있음
- 언제 사용?:신뢰성과 전달 보장이 중요한 상황에서 활용
- 사용자 로그인 인증, 금융 거래, 데이터 업데이트 등
비동기식(asynchronous) 메시지 전송
- 생산자가 브로커에 메시지를 전송하면서 브로커의 응답을 기다리지 않고 계속해서 코드 실행을 이어가는 방식(unblocked)
- 브로커의 응답을 받을 수 있지만 나중에 받게 됨
- 언제 사용?:데이터 유실이 큰 문제가 되지 않거나 실시간 처리가 필요하지 않은 비핵심 작업
- 로그 기록, 분석을 위한 이벤트 수집, 알림 전송 등
+ 멱등성(idempotent)
문제 | 해결 |
![]() 카프카 브로커에 성공적으로 저장이 되었지만, 응답에서 오류가 발생해 생산자가 메시지 재전송을 해서 중복저장이 발생한 문제 |
![]() 카프카 브로커에서 이전에 동일한 메시지가 저장이 되어있으면 새로 저장하지 않고 응답 신호만 전달 |
구현 순서 요약
- 의존성 추가
- Config 파일 작성
1. 의존성 추가
dependencies {
...
implementation 'org.springframework.kafka:spring-kafka'
testImplementation 'org.springframework.kafka:spring-kafka-test'
}
2. Config 파일 작성
Topic 설정
- Topic 이름 지정
- 파티션 설정
- 복제본 설정: 복제본은 실행한 카프카 브로커 수만큼
- 동기화 설정: Topic이 저장 되어야 하는 최소 브로커 개수
- Topic이 설정한 수 만큼 저장이 안된다면 예외 발생
- 속도는 느려지지만 안정성은 증가
Kafka 프로듀서 설정
- 카프카 서버 설정: bootstrap-servers는 카프카 클러스터의 시작점 역할을 하는 브로커(서버)들의 주소를 지정하는 곳
- 메시지 키 직렬화 설정 : 문자열로 지정
- 메시지 값 직렬화 설정: Json으로 지정
- 신뢰성 설정:
- retries: 최대 10회 재시도
- delivery.timeout.ms: 전송 최대 대기 시간 120초
- retry.backoff.ms: 재시도 간의 대기 시간 1000ms
- acks: 모든 리더와 팔로워가 메시지를 확인해야 성공
- IDEMPOTENCE: 멱등성 활성화( 메시지 중복 저장 방지 )
@Configuration
public class KafkaConfig {
@Bean
NewTopic createTopic(){
return TopicBuilder.name("product-created-events-topic") //토픽 이름
.partitions(3) // 파티션 설정
.replicas(3) // 복제본 설정
.configs(Map.of("min.insync.replicas", "2")) //동기화 설정
.build();
}
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9094"); // 브로커 주소
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // 키 직렬화
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); // 값 직렬화
configProps.put(ProducerConfig.ACKS_CONFIG, "all"); // 모든 리더와 팔로워가 확인해야 성공
configProps.put(ProducerConfig.RETRIES_CONFIG, 10); // 최대 재시도 횟수
configProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000); // 재시도 간 대기 시간(ms)
configProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000); // 최대 전송 시간(ms)
configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 멱등성 활성화 (중복 전송 방지)
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
예제 코드
컨트롤러
@RestController
@RequestMapping("/products")
@RequiredArgsConstructor
public class ProductController {
private final ProductService productService;
//상품이 생성되면 상품 db에 저장하고 상품 저장 이벤트를 topic으로 발행
@PostMapping
public ResponseEntity<String> createProduct(@RequestBody CreateProductRestModel product){
String productId = productService.createProduct(product);
return ResponseEntity.status(HttpStatus.CREATED).body(productId);
}
}
서비스
비동기 방식
@Service
@Slf4j
@RequiredArgsConstructor
public class ProductService {
// Kafka 메시지를 보내는 데 사용되는 KafkaTemplate
private final KafkaTemplate<String, ProductCreatedEvent> kafkaTemplate;
public String createProduct(CreateProductRestModel productRestModel) {
// 고유한 제품 ID 생성
String productId = UUID.randomUUID().toString();
// 제품 생성 이벤트 객체 생성
ProductCreatedEvent productCreatedEvent = new ProductCreatedEvent(
productId,
productRestModel.title(),
productRestModel.price(),
productRestModel.quantity()
);
// Kafka로 메시지 비동기 전송
CompletableFuture<SendResult<String, ProductCreatedEvent>> future =
kafkaTemplate.send("product-created-events-topic", productId, productCreatedEvent);
// 메시지 전송 성공/실패 처리
future.whenComplete((result, exception) -> {
if (exception != null) {
// 전송 실패 시 에러 로그
log.error("Failed to send message: ", exception.getMessage());
} else {
// 전송 성공 시 성공 로그
log.info("Message sent successfully", result.getRecordMetadata());
}
});
// 생성된 제품 ID 반환
return productId;
}
}
동기 방식
- 정의: 메시지를 브로커로 보낸 후, 전송이 완료되거나 실패가 확인될 때까지 대기하는 방식
- 장점: 메시지가 손실되지 않고 카프카에 유지된다는 보장을 가질 수 있음
- 단점: 속도가 느려짐
@Service
@Slf4j
@RequiredArgsConstructor
public class ProductService {
// Kafka 메시지를 보내는 데 사용되는 KafkaTemplate
private final KafkaTemplate<String, Object> kafkaTemplate;
public String createProduct(CreateProductRestModel productRestModel) throws Exception {
// 고유한 제품 ID 생성
String productId = UUID.randomUUID().toString();
// 제품 생성 이벤트 객체 생성
ProductCreatedEvent productCreatedEvent = new ProductCreatedEvent(
productId,
productRestModel.title(),
productRestModel.price(),
productRestModel.quantity()
);
// Kafka로 메시지 동기 전송
SendResult<String, Object> result =
kafkaTemplate.send("product-created-events-topic", productId, productCreatedEvent).get();
// (선택) 메시지 저장 후 메타데이터 로깅 처리
log.info("Partition: " + result.getRecordMetadata().partition());
log.info("Topic: " + result.getRecordMetadata().topic());
log.info("Offset: " + result.getRecordMetadata().offset());
// 생성된 제품 ID 반환
return productId;
}
}