Back-end

Kafka Consumer Retry 구현

HOONY_612 2024. 2. 27. 22:13
반응형

 

개요

만약 MSA 환경에서 Consumer 동작을 하는 Consumer Logic 중 Exception이 발생하는 경우가 있다.

이러한 경우는 두 가지로 나뉜다.

하나는 써드 파티 APP 과의 Network 지연과 같은 일시적인 에러이다.

이건 재시도로 충분하다.

그러나 두 번째는 진짜 에러이다. 존재하지 않는 사용자라던지 상품이 없다던지 이다.

위 2가지 상황을 대처할 수 있는 방안에 대해 알아보자.

 

개념1: Blocking Retry

일시적인 에러로 인해 Consumer 가 Message 를 가져가서 예외가 발생하는 경우 retry backoff 주기만큼 기다렸다가 재시도 한다.

ConsumerConfiguration 부터 만들어보자.

ACKMode 는 Consumer 예외 시 Broker 로 다시 메세지가 다시 전달되도록 설정한다.

이 때 AckMode.BATCH, AckMode.TIME 으로 설정하면 안된다.

왜냐하면 예를 들어서 여러 메세지를 일괄 처리하는 Consumer라고하면 10개 처리 중 3개 처리에서 예외가 발생한 경우 4~10개가 다시 Consuming 돼서 처리하는 경우가 발생할 수 있다. 

그래서 개별로 메세지 커밋을 관리하는게 좋다.

1초마다 5번 Retry 시도를 한다.

@Slf4j
@Configuration
public class KafkaOrderConsumerConfiguration {

    @Value("${kafka.server.url}")
    private String kafkaUrl;

    @Bean
    public ConsumerFactory<String, Object> factory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUrl);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "payment");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        config.put(JsonDeserializer.TRUSTED_PACKAGES, "*");

        return new DefaultKafkaConsumerFactory<>(config);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainer() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(factory());
        factory.setCommonErrorHandler(errorHandler());
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
        return factory;
    }

    @Bean
    public DefaultErrorHandler errorHandler() {
        BackOff fixedBackOff = new FixedBackOff(1000, 5);
        return new DefaultErrorHandler((consumerRecord, exception) -> {
            log.error("Error Data: {}", consumerRecord.toString());
        }, fixedBackOff);
    }
}

 

이 때 예외를 구분해서 Retry 할지 안할지도 지정할 수 있다.

@Bean
public DefaultErrorHandler errorHandler() {
    BackOff fixedBackOff = new FixedBackOff(interval, maxAttempts);
    DefaultErrorHandler errorHandler = new DefaultErrorHandler((consumerRecord, e) -> {
        // logic to execute when all the retry attemps are exhausted
    }, fixedBackOff);
    errorHandler.addRetryableExceptions(SocketTimeoutException.class);
    errorHandler.addNotRetryableExceptions(NullPointerException.class);
    return errorHandler;
}

 

이 방법은 메세지 Process 가 실패 시 Consumer 는 Retry 가 끝날 때까지 Block 상태가 된다는 단점이 있다.

 

실제 Example 로 만약 실패하는 요청을 2번 보내면 순차적으로 아래 로그를 찍으면서 10초가 걸린다.

2024-02-26T21:23:20.731+09:00  INFO 37456 --- [ntainer#0-0-C-1] c.example.paymentservice.OrderConsumer   : Data consuming: OrderToPaymentRequest(orderId=3, username=martini1, price=20000)
2024-02-26T21:23:21.769+09:00  INFO 37456 --- [ntainer#0-0-C-1] fkaConsumerFactory$ExtendedKafkaConsumer : [Consumer clientId=consumer-payment-1, groupId=payment] Seeking to offset 28 for partition payment-0
2024-02-26T21:23:21.770+09:00 ERROR 37456 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Error handler threw an exception

org.springframework.kafka.KafkaException: Seek to current after exception
	at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:208) ~[spring-kafka-3.1.1.jar:3.1.1]
	at org.springframework.kafka.listener.DefaultErrorHandler.handleRemaining(DefaultErrorHandler.java:168) ~[spring-kafka-3.1.1.jar:3.1.1]
	at org.

 

개념2: Non-Blocking Retry

위의 순차적 Consume 을 통한 Retry 방식이 아닌 여러 개의 실패 요청이 와도 비동기적으로 수행한다.

원리는 계속 예외가 발생하여 BackOff, maxAttempt 가 넘어가면 DLQ(Dead Letter Queue) 에 별도로 넣어서 처리하는 방식이다.

autoCreateTopics 를 "True" 로 하는 경우 아래와 같은 Queue 들이 생성된다.

{service명}-retry-{maxAttempt 값} 만큼 Topic 생성된다.

 

그리고 Retry에 대한 Producer KafkaTemplate을 빈으로 등록해주자.

@Bean
public ProducerFactory<String, OrderToPaymentRequest> producerFactory() {
    Map<String, Object> config = new HashMap<>();
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUrl);
    return new DefaultKafkaProducerFactory<>(
            config, new StringSerializer(), new JsonSerializer<>());
}

@Bean
public KafkaTemplate<String, OrderToPaymentRequest> retryableTopicKafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

 

다음으로는 @RetryableTopic 어노테이션을 사용하여 kafkaTemplate 및 attempts를 설정하자.

@RetryableTopic(attempts = "2", kafkaTemplate = "retryableTopicKafkaTemplate")
@KafkaListener(topics = "payment", containerFactory = "kafkaListenerContainer")
public void listener(OrderToPaymentRequest request) {
    log.info("Data consuming: {}", request);
    if (request == null) {
        throw new IllegalArgumentException("Request is null");
    }

    Balance balance = balanceRepository.findBalanceByUserName(request.getUsername())
            .orElseThrow(EntityNotFoundException::new);

    balance.balance(request.getPrice());
    balanceRepository.save(balance);
}

 

@DltHandler는 @KafkaListener와 동일한 클래스에 위치시켜서 작성한다.

@DltHandler
public void handleDltPayment(OrderToPaymentRequest request) {
    log.info("Error Data : {}", request.toString());
}

 

위와 같이 작성하면 실패하는 요청을 여러 개 보내도 병렬적으로 실행할 수 있다.

2024-02-27T22:10:30.741+09:00  INFO 29589 --- [ntainer#0-0-C-1] c.example.paymentservice.OrderConsumer   : Data consuming: OrderToPaymentRequest(orderId=11, username=martini1, price=20000)
2024-02-27T22:10:30.754+09:00  INFO 29589 --- [0-retry-0-0-C-1] fkaConsumerFactory$ExtendedKafkaConsumer : [Consumer clientId=consumer-payment-3, groupId=payment] Seeking to offset 2 for partition payment-retry-0-0
2024-02-27T22:10:31.740+09:00  INFO 29589 --- [ntainer#0-0-C-1] c.example.paymentservice.OrderConsumer   : Data consuming: OrderToPaymentRequest(orderId=12, username=martini1, price=20000)
2024-02-27T22:10:33.735+09:00  INFO 29589 --- [ntainer#0-0-C-1] c.example.paymentservice.OrderConsumer   : Data consuming: OrderToPaymentRequest(orderId=13, username=martini1, price=20000)
2024-02-27T22:10:33.753+09:00  INFO 29589 --- [0-retry-0-0-C-1] c.example.paymentservice.OrderConsumer   : Data consuming: OrderToPaymentRequest(orderId=11, username=martini1, price=20000)
2024-02-27T22:10:34.263+09:00  INFO 29589 --- [0-retry-0-0-C-1] fkaConsumerFactory$ExtendedKafkaConsumer : [Consumer clientId=consumer-payment-3, groupId=payment] Seeking to offset 3 for partition payment-retry-0-0
2024-02-27T22:10:34.265+09:00  INFO 29589 --- [0-retry-1-0-C-1] fkaConsumerFactory$ExtendedKafkaConsumer : [Consumer clientId=consumer-payment-2, groupId=payment] Seeking to offset 2 for partition payment-retry-1-0
....

 

반응형