Kafka/Spring

Kafka Producers - Spring Boot

초코chip 2024. 12. 17. 16:06

들어가며..

  • 기존에는 터미널에서 CLI를 이용해 생산자를 생성했지만, 이제는 스프링 부트를 사용해 터미널 없이 생산자를 생성해보자.
  • 스프링 부트 서비스를 생산자로 설정하여 아래와 같은 역할을 수행하도록 하자:
    • 메시지 전송(publish)
    • 메시지를 바이너리 형식으로 직렬화(serialize)
    • 메시지를 보낼 토픽과 파티션 지정

 

 

동기 vs 비동기

동기식(Synchronous) 메시지 전송

    • 생산자가 브로커에 메시지를 전송할 때, 메시지가 브로커에 성공적으로 저장될 때까지 대기하는 방식으로 다른 작업이 잠시 Blocked됨.
    • 신뢰성이 높아 주로 사용되지만, 성능이 다소 느려질 수 있음
    • 언제 사용?:신뢰성과 전달 보장이 중요한 상황에서 활용 
      • 사용자 로그인 인증, 금융 거래, 데이터 업데이트 등

 

 

 

비동기식(asynchronous) 메시지 전송

  • 생산자가 브로커에 메시지를 전송하면서 브로커의 응답을 기다리지 않고 계속해서 코드 실행을 이어가는 방식(unblocked)
    • 브로커의 응답을 받을 수 있지만 나중에 받게 됨
  • 언제 사용?:데이터 유실이 큰 문제가 되지 않거나 실시간 처리가 필요하지 않은 비핵심 작업
    • 로그 기록, 분석을 위한 이벤트 수집, 알림 전송 등

 

+ 멱등성(idempotent)

 문제 해결

카프카 브로커에 성공적으로 저장이 되었지만, 응답에서 오류가 발생해 생산자가 메시지 재전송을 해서 중복저장이 발생한 문제



카프카 브로커에서 이전에 동일한 메시지가 저장이 되어있으면 새로 저장하지 않고 응답 신호만 전달
 

구현 순서 요약

  1. 의존성 추가
  2. Config 파일 작성

 

1. 의존성 추가

dependencies {
	...
	implementation 'org.springframework.kafka:spring-kafka'
	testImplementation 'org.springframework.kafka:spring-kafka-test'
}

 

 

2. Config 파일 작성

Topic 설정

  1. Topic 이름 지정
  2. 파티션 설정
  3. 복제본 설정: 복제본은 실행한 카프카 브로커 수만큼
  4. 동기화 설정: Topic이 저장 되어야 하는 최소 브로커 개수
    • Topic이 설정한 수 만큼 저장이 안된다면 예외 발생
    • 속도는 느려지지만 안정성은 증가

 

Kafka 프로듀서 설정

  1. 카프카 서버 설정: bootstrap-servers는 카프카 클러스터의 시작점 역할을 하는 브로커(서버)들의 주소를 지정하는 곳
  2. 메시지 키 직렬화 설정 : 문자열로 지정
  3. 메시지 값 직렬화 설정: Json으로 지정
  4. 신뢰성 설정:
    • retries: 최대 10회 재시도
    • delivery.timeout.ms: 전송 최대 대기 시간 120초
    • retry.backoff.ms: 재시도 간의 대기 시간 1000ms
    • acks: 모든 리더와 팔로워가 메시지를 확인해야 성공
    • IDEMPOTENCE: 멱등성 활성화( 메시지 중복 저장 방지 ) 
@Configuration
public class KafkaConfig {

    @Bean
    NewTopic createTopic(){
        return TopicBuilder.name("product-created-events-topic") //토픽 이름
                .partitions(3)	// 파티션 설정
                .replicas(3)	// 복제본 설정
                .configs(Map.of("min.insync.replicas", "2")) //동기화 설정
                .build();
    }
    
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();

        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9094"); // 브로커 주소
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // 키 직렬화
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); // 값 직렬화
        configProps.put(ProducerConfig.ACKS_CONFIG, "all"); // 모든 리더와 팔로워가 확인해야 성공
        configProps.put(ProducerConfig.RETRIES_CONFIG, 10); // 최대 재시도 횟수
        configProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000); // 재시도 간 대기 시간(ms)
        configProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000); // 최대 전송 시간(ms)
        configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 멱등성 활성화 (중복 전송 방지)
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

 

예제 코드

컨트롤러

@RestController
@RequestMapping("/products")
@RequiredArgsConstructor
public class ProductController {
    private final ProductService productService;

    //상품이 생성되면 상품 db에 저장하고 상품 저장 이벤트를 topic으로 발행
    @PostMapping
    public ResponseEntity<String> createProduct(@RequestBody CreateProductRestModel product){

        String productId = productService.createProduct(product);

        return ResponseEntity.status(HttpStatus.CREATED).body(productId);
    }
}

 

서비스

비동기 방식

@Service
@Slf4j
@RequiredArgsConstructor
public class ProductService {

    // Kafka 메시지를 보내는 데 사용되는 KafkaTemplate
    private final KafkaTemplate<String, ProductCreatedEvent> kafkaTemplate;

    public String createProduct(CreateProductRestModel productRestModel) {

        // 고유한 제품 ID 생성
        String productId = UUID.randomUUID().toString();

        // 제품 생성 이벤트 객체 생성
        ProductCreatedEvent productCreatedEvent = new ProductCreatedEvent(
            productId, 
            productRestModel.title(), 
            productRestModel.price(), 
            productRestModel.quantity()
        );

        // Kafka로 메시지 비동기 전송
        CompletableFuture<SendResult<String, ProductCreatedEvent>> future =
                kafkaTemplate.send("product-created-events-topic", productId, productCreatedEvent);

        // 메시지 전송 성공/실패 처리
        future.whenComplete((result, exception) -> {
           if (exception != null) {
                // 전송 실패 시 에러 로그
                log.error("Failed to send message: ", exception.getMessage());
           } else {
                // 전송 성공 시 성공 로그
                log.info("Message sent successfully", result.getRecordMetadata());
           }
        });

        // 생성된 제품 ID 반환
        return productId;
    }
}

 

 

동기 방식

  • 정의: 메시지를 브로커로 보낸 후, 전송이 완료되거나 실패가 확인될 때까지 대기하는 방식
  • 장점: 메시지가 손실되지 않고 카프카에 유지된다는 보장을 가질 수 있음
  • 단점: 속도가 느려짐
@Service
@Slf4j
@RequiredArgsConstructor
public class ProductService {

    // Kafka 메시지를 보내는 데 사용되는 KafkaTemplate
    private final KafkaTemplate<String, Object> kafkaTemplate;

    public String createProduct(CreateProductRestModel productRestModel) throws Exception {

        // 고유한 제품 ID 생성
        String productId = UUID.randomUUID().toString();

        // 제품 생성 이벤트 객체 생성
        ProductCreatedEvent productCreatedEvent = new ProductCreatedEvent(
                productId,
                productRestModel.title(),
                productRestModel.price(),
                productRestModel.quantity()
        );

        // Kafka로 메시지 동기 전송
        SendResult<String, Object> result =
                kafkaTemplate.send("product-created-events-topic", productId, productCreatedEvent).get();

        // (선택) 메시지 저장 후 메타데이터 로깅 처리
        log.info("Partition: " + result.getRecordMetadata().partition());
        log.info("Topic: " + result.getRecordMetadata().topic());
        log.info("Offset: " + result.getRecordMetadata().offset());

        // 생성된 제품 ID 반환
        return productId;
    }
}