-
Kafka Consumer Retry 구현Back-end 2024. 2. 27. 22:13반응형
개요
만약 MSA 환경에서 Consumer 동작을 하는 Consumer Logic 중 Exception이 발생하는 경우가 있다.
이러한 경우는 두 가지로 나뉜다.
하나는 써드 파티 APP 과의 Network 지연과 같은 일시적인 에러이다.
이건 재시도로 충분하다.
그러나 두 번째는 진짜 에러이다. 존재하지 않는 사용자라던지 상품이 없다던지 이다.
위 2가지 상황을 대처할 수 있는 방안에 대해 알아보자.
개념1: Blocking Retry
일시적인 에러로 인해 Consumer 가 Message 를 가져가서 예외가 발생하는 경우 retry backoff 주기만큼 기다렸다가 재시도 한다.
ConsumerConfiguration 부터 만들어보자.
ACKMode 는 Consumer 예외 시 Broker 로 다시 메세지가 다시 전달되도록 설정한다.
이 때 AckMode.BATCH, AckMode.TIME 으로 설정하면 안된다.
왜냐하면 예를 들어서 여러 메세지를 일괄 처리하는 Consumer라고하면 10개 처리 중 3개 처리에서 예외가 발생한 경우 4~10개가 다시 Consuming 돼서 처리하는 경우가 발생할 수 있다.
그래서 개별로 메세지 커밋을 관리하는게 좋다.
1초마다 5번 Retry 시도를 한다.
@Slf4j @Configuration public class KafkaOrderConsumerConfiguration { @Value("${kafka.server.url}") private String kafkaUrl; @Bean public ConsumerFactory<String, Object> factory() { Map<String, Object> config = new HashMap<>(); config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUrl); config.put(ConsumerConfig.GROUP_ID_CONFIG, "payment"); config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); config.put(JsonDeserializer.TRUSTED_PACKAGES, "*"); return new DefaultKafkaConsumerFactory<>(config); } @Bean public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainer() { ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(factory()); factory.setCommonErrorHandler(errorHandler()); factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD); return factory; } @Bean public DefaultErrorHandler errorHandler() { BackOff fixedBackOff = new FixedBackOff(1000, 5); return new DefaultErrorHandler((consumerRecord, exception) -> { log.error("Error Data: {}", consumerRecord.toString()); }, fixedBackOff); } }
이 때 예외를 구분해서 Retry 할지 안할지도 지정할 수 있다.
@Bean public DefaultErrorHandler errorHandler() { BackOff fixedBackOff = new FixedBackOff(interval, maxAttempts); DefaultErrorHandler errorHandler = new DefaultErrorHandler((consumerRecord, e) -> { // logic to execute when all the retry attemps are exhausted }, fixedBackOff); errorHandler.addRetryableExceptions(SocketTimeoutException.class); errorHandler.addNotRetryableExceptions(NullPointerException.class); return errorHandler; }
이 방법은 메세지 Process 가 실패 시 Consumer 는 Retry 가 끝날 때까지 Block 상태가 된다는 단점이 있다.
실제 Example 로 만약 실패하는 요청을 2번 보내면 순차적으로 아래 로그를 찍으면서 10초가 걸린다.
2024-02-26T21:23:20.731+09:00 INFO 37456 --- [ntainer#0-0-C-1] c.example.paymentservice.OrderConsumer : Data consuming: OrderToPaymentRequest(orderId=3, username=martini1, price=20000) 2024-02-26T21:23:21.769+09:00 INFO 37456 --- [ntainer#0-0-C-1] fkaConsumerFactory$ExtendedKafkaConsumer : [Consumer clientId=consumer-payment-1, groupId=payment] Seeking to offset 28 for partition payment-0 2024-02-26T21:23:21.770+09:00 ERROR 37456 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : Error handler threw an exception org.springframework.kafka.KafkaException: Seek to current after exception at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:208) ~[spring-kafka-3.1.1.jar:3.1.1] at org.springframework.kafka.listener.DefaultErrorHandler.handleRemaining(DefaultErrorHandler.java:168) ~[spring-kafka-3.1.1.jar:3.1.1] at org.
개념2: Non-Blocking Retry
위의 순차적 Consume 을 통한 Retry 방식이 아닌 여러 개의 실패 요청이 와도 비동기적으로 수행한다.
원리는 계속 예외가 발생하여 BackOff, maxAttempt 가 넘어가면 DLQ(Dead Letter Queue) 에 별도로 넣어서 처리하는 방식이다.
autoCreateTopics 를 "True" 로 하는 경우 아래와 같은 Queue 들이 생성된다.
{service명}-retry-{maxAttempt 값} 만큼 Topic 생성된다.
그리고 Retry에 대한 Producer KafkaTemplate을 빈으로 등록해주자.
@Bean public ProducerFactory<String, OrderToPaymentRequest> producerFactory() { Map<String, Object> config = new HashMap<>(); config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUrl); return new DefaultKafkaProducerFactory<>( config, new StringSerializer(), new JsonSerializer<>()); } @Bean public KafkaTemplate<String, OrderToPaymentRequest> retryableTopicKafkaTemplate() { return new KafkaTemplate<>(producerFactory()); }
다음으로는 @RetryableTopic 어노테이션을 사용하여 kafkaTemplate 및 attempts를 설정하자.
@RetryableTopic(attempts = "2", kafkaTemplate = "retryableTopicKafkaTemplate") @KafkaListener(topics = "payment", containerFactory = "kafkaListenerContainer") public void listener(OrderToPaymentRequest request) { log.info("Data consuming: {}", request); if (request == null) { throw new IllegalArgumentException("Request is null"); } Balance balance = balanceRepository.findBalanceByUserName(request.getUsername()) .orElseThrow(EntityNotFoundException::new); balance.balance(request.getPrice()); balanceRepository.save(balance); }
@DltHandler는 @KafkaListener와 동일한 클래스에 위치시켜서 작성한다.
@DltHandler public void handleDltPayment(OrderToPaymentRequest request) { log.info("Error Data : {}", request.toString()); }
위와 같이 작성하면 실패하는 요청을 여러 개 보내도 병렬적으로 실행할 수 있다.
2024-02-27T22:10:30.741+09:00 INFO 29589 --- [ntainer#0-0-C-1] c.example.paymentservice.OrderConsumer : Data consuming: OrderToPaymentRequest(orderId=11, username=martini1, price=20000) 2024-02-27T22:10:30.754+09:00 INFO 29589 --- [0-retry-0-0-C-1] fkaConsumerFactory$ExtendedKafkaConsumer : [Consumer clientId=consumer-payment-3, groupId=payment] Seeking to offset 2 for partition payment-retry-0-0 2024-02-27T22:10:31.740+09:00 INFO 29589 --- [ntainer#0-0-C-1] c.example.paymentservice.OrderConsumer : Data consuming: OrderToPaymentRequest(orderId=12, username=martini1, price=20000) 2024-02-27T22:10:33.735+09:00 INFO 29589 --- [ntainer#0-0-C-1] c.example.paymentservice.OrderConsumer : Data consuming: OrderToPaymentRequest(orderId=13, username=martini1, price=20000) 2024-02-27T22:10:33.753+09:00 INFO 29589 --- [0-retry-0-0-C-1] c.example.paymentservice.OrderConsumer : Data consuming: OrderToPaymentRequest(orderId=11, username=martini1, price=20000) 2024-02-27T22:10:34.263+09:00 INFO 29589 --- [0-retry-0-0-C-1] fkaConsumerFactory$ExtendedKafkaConsumer : [Consumer clientId=consumer-payment-3, groupId=payment] Seeking to offset 3 for partition payment-retry-0-0 2024-02-27T22:10:34.265+09:00 INFO 29589 --- [0-retry-1-0-C-1] fkaConsumerFactory$ExtendedKafkaConsumer : [Consumer clientId=consumer-payment-2, groupId=payment] Seeking to offset 2 for partition payment-retry-1-0 ....
반응형'Back-end' 카테고리의 다른 글
MSA - Outbox Pattern? Saga Pattern? (0) 2024.03.19 TestCode 의 properties override 에 대하여 (1) 2024.03.08 멀티 모듈에서 @ConditionalOnProperty 사용 시 주의사항 (0) 2024.03.07 Micrometer custom metric 생성하기 (0) 2024.01.28