ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Kafka Consumer Retry 구현
    Back-end 2024. 2. 27. 22:13
    반응형

     

    개요

    만약 MSA 환경에서 Consumer 동작을 하는 Consumer Logic 중 Exception이 발생하는 경우가 있다.

    이러한 경우는 두 가지로 나뉜다.

    하나는 써드 파티 APP 과의 Network 지연과 같은 일시적인 에러이다.

    이건 재시도로 충분하다.

    그러나 두 번째는 진짜 에러이다. 존재하지 않는 사용자라던지 상품이 없다던지 이다.

    위 2가지 상황을 대처할 수 있는 방안에 대해 알아보자.

     

    개념1: Blocking Retry

    일시적인 에러로 인해 Consumer 가 Message 를 가져가서 예외가 발생하는 경우 retry backoff 주기만큼 기다렸다가 재시도 한다.

    ConsumerConfiguration 부터 만들어보자.

    ACKMode 는 Consumer 예외 시 Broker 로 다시 메세지가 다시 전달되도록 설정한다.

    이 때 AckMode.BATCH, AckMode.TIME 으로 설정하면 안된다.

    왜냐하면 예를 들어서 여러 메세지를 일괄 처리하는 Consumer라고하면 10개 처리 중 3개 처리에서 예외가 발생한 경우 4~10개가 다시 Consuming 돼서 처리하는 경우가 발생할 수 있다. 

    그래서 개별로 메세지 커밋을 관리하는게 좋다.

    1초마다 5번 Retry 시도를 한다.

    @Slf4j
    @Configuration
    public class KafkaOrderConsumerConfiguration {
    
        @Value("${kafka.server.url}")
        private String kafkaUrl;
    
        @Bean
        public ConsumerFactory<String, Object> factory() {
            Map<String, Object> config = new HashMap<>();
            config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUrl);
            config.put(ConsumerConfig.GROUP_ID_CONFIG, "payment");
            config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
            config.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
    
            return new DefaultKafkaConsumerFactory<>(config);
        }
    
        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainer() {
            ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(factory());
            factory.setCommonErrorHandler(errorHandler());
            factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
            return factory;
        }
    
        @Bean
        public DefaultErrorHandler errorHandler() {
            BackOff fixedBackOff = new FixedBackOff(1000, 5);
            return new DefaultErrorHandler((consumerRecord, exception) -> {
                log.error("Error Data: {}", consumerRecord.toString());
            }, fixedBackOff);
        }
    }

     

    이 때 예외를 구분해서 Retry 할지 안할지도 지정할 수 있다.

    @Bean
    public DefaultErrorHandler errorHandler() {
        BackOff fixedBackOff = new FixedBackOff(interval, maxAttempts);
        DefaultErrorHandler errorHandler = new DefaultErrorHandler((consumerRecord, e) -> {
            // logic to execute when all the retry attemps are exhausted
        }, fixedBackOff);
        errorHandler.addRetryableExceptions(SocketTimeoutException.class);
        errorHandler.addNotRetryableExceptions(NullPointerException.class);
        return errorHandler;
    }

     

    이 방법은 메세지 Process 가 실패 시 Consumer 는 Retry 가 끝날 때까지 Block 상태가 된다는 단점이 있다.

     

    실제 Example 로 만약 실패하는 요청을 2번 보내면 순차적으로 아래 로그를 찍으면서 10초가 걸린다.

    2024-02-26T21:23:20.731+09:00  INFO 37456 --- [ntainer#0-0-C-1] c.example.paymentservice.OrderConsumer   : Data consuming: OrderToPaymentRequest(orderId=3, username=martini1, price=20000)
    2024-02-26T21:23:21.769+09:00  INFO 37456 --- [ntainer#0-0-C-1] fkaConsumerFactory$ExtendedKafkaConsumer : [Consumer clientId=consumer-payment-1, groupId=payment] Seeking to offset 28 for partition payment-0
    2024-02-26T21:23:21.770+09:00 ERROR 37456 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Error handler threw an exception
    
    org.springframework.kafka.KafkaException: Seek to current after exception
    	at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:208) ~[spring-kafka-3.1.1.jar:3.1.1]
    	at org.springframework.kafka.listener.DefaultErrorHandler.handleRemaining(DefaultErrorHandler.java:168) ~[spring-kafka-3.1.1.jar:3.1.1]
    	at org.

     

    개념2: Non-Blocking Retry

    위의 순차적 Consume 을 통한 Retry 방식이 아닌 여러 개의 실패 요청이 와도 비동기적으로 수행한다.

    원리는 계속 예외가 발생하여 BackOff, maxAttempt 가 넘어가면 DLQ(Dead Letter Queue) 에 별도로 넣어서 처리하는 방식이다.

    autoCreateTopics 를 "True" 로 하는 경우 아래와 같은 Queue 들이 생성된다.

    {service명}-retry-{maxAttempt 값} 만큼 Topic 생성된다.

     

    그리고 Retry에 대한 Producer KafkaTemplate을 빈으로 등록해주자.

    @Bean
    public ProducerFactory<String, OrderToPaymentRequest> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUrl);
        return new DefaultKafkaProducerFactory<>(
                config, new StringSerializer(), new JsonSerializer<>());
    }
    
    @Bean
    public KafkaTemplate<String, OrderToPaymentRequest> retryableTopicKafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

     

    다음으로는 @RetryableTopic 어노테이션을 사용하여 kafkaTemplate 및 attempts를 설정하자.

    @RetryableTopic(attempts = "2", kafkaTemplate = "retryableTopicKafkaTemplate")
    @KafkaListener(topics = "payment", containerFactory = "kafkaListenerContainer")
    public void listener(OrderToPaymentRequest request) {
        log.info("Data consuming: {}", request);
        if (request == null) {
            throw new IllegalArgumentException("Request is null");
        }
    
        Balance balance = balanceRepository.findBalanceByUserName(request.getUsername())
                .orElseThrow(EntityNotFoundException::new);
    
        balance.balance(request.getPrice());
        balanceRepository.save(balance);
    }

     

    @DltHandler는 @KafkaListener와 동일한 클래스에 위치시켜서 작성한다.

    @DltHandler
    public void handleDltPayment(OrderToPaymentRequest request) {
        log.info("Error Data : {}", request.toString());
    }

     

    위와 같이 작성하면 실패하는 요청을 여러 개 보내도 병렬적으로 실행할 수 있다.

    2024-02-27T22:10:30.741+09:00  INFO 29589 --- [ntainer#0-0-C-1] c.example.paymentservice.OrderConsumer   : Data consuming: OrderToPaymentRequest(orderId=11, username=martini1, price=20000)
    2024-02-27T22:10:30.754+09:00  INFO 29589 --- [0-retry-0-0-C-1] fkaConsumerFactory$ExtendedKafkaConsumer : [Consumer clientId=consumer-payment-3, groupId=payment] Seeking to offset 2 for partition payment-retry-0-0
    2024-02-27T22:10:31.740+09:00  INFO 29589 --- [ntainer#0-0-C-1] c.example.paymentservice.OrderConsumer   : Data consuming: OrderToPaymentRequest(orderId=12, username=martini1, price=20000)
    2024-02-27T22:10:33.735+09:00  INFO 29589 --- [ntainer#0-0-C-1] c.example.paymentservice.OrderConsumer   : Data consuming: OrderToPaymentRequest(orderId=13, username=martini1, price=20000)
    2024-02-27T22:10:33.753+09:00  INFO 29589 --- [0-retry-0-0-C-1] c.example.paymentservice.OrderConsumer   : Data consuming: OrderToPaymentRequest(orderId=11, username=martini1, price=20000)
    2024-02-27T22:10:34.263+09:00  INFO 29589 --- [0-retry-0-0-C-1] fkaConsumerFactory$ExtendedKafkaConsumer : [Consumer clientId=consumer-payment-3, groupId=payment] Seeking to offset 3 for partition payment-retry-0-0
    2024-02-27T22:10:34.265+09:00  INFO 29589 --- [0-retry-1-0-C-1] fkaConsumerFactory$ExtendedKafkaConsumer : [Consumer clientId=consumer-payment-2, groupId=payment] Seeking to offset 2 for partition payment-retry-1-0
    ....

     

    반응형

    댓글

Designed by Tistory.