ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Spring-kafka] MSA 환경에서 토픽 이름에 맞게 KafkaMessage 역직렬화하여 수신하기
    Back-End/Kafka 2024. 10. 13. 21:15
    728x90

    개요

     

     

    Spring-kafka에서, @KafkaListener 를 활용하여 Consumer Record를 수신 하는 방법은 다양하다.

     

    특히, 내가 원하는 메세지포멧에 맞게 serialize 및 deserialize 하는 과정을 잘 설정하여

     

    비즈니스 로직 단에서의 내가 원하는 타입의 dto를 바로 사용하는 것은 생산성 향상에 아주 큰 도움이 된다.

     

    그래서 공식문서와 레퍼런스들을 참고해 가면서 찾아낸 몇 가지 방법들을 소개해보려 한다.

     

     


     

    1.  KafkaListenerContainerFactory 설정값에서, 내가 원하는 클래스의 deserializer를 직접 설정하기

     

     

    가장 간단한 방법이다.

     

     

    @Configuration
    class KafkaConsumerConfigV1 {
        @Bean
        fun kafkaConsumerConfigV1() : ConcurrentKafkaListenerContainerFactory<String, Any> {
            val factory  = ConcurrentKafkaListenerContainerFactory<String, Any>()
            factory.consumerFactory = consumerFactory()
            factory.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL_IMMEDIATE
            return factory
        }
    
        fun consumerFactory() : ConsumerFactory<String, Any> {
            return DefaultKafkaConsumerFactory(consumerConfig())
        }
    
    
        fun consumerConfig() : Map<String, Any> {
            val props = HashMap<String, Any>()
            props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9092"  // 로컬 실행 시 broker 기본 ip
            props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
            props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ErrorHandlingDeserializer::class.java
            props[ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS] = JsonDeserializer::class.java
            props[ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS] = StringDeserializer::class.java
            props[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = 5
            return props
        }
    }

     

     

     

    JsonDeserializer를 VALUE DESERIALIZER로 설정 해 줌으로써 해결한다.

     

    하지만 해당 방법은 MSA 환경에서는 정상 동작하지 않을 수 있다는 문제가 있고, 아마 다음과 같은 오류가 뜰 것이다.

     

    Caused by: org.springframework.messaging.converter.MessageConversionException:
    failed to resolve class name. Class not found [um.uamserver.domain.dto.AdsbData]

     

    카프카의 JsonDeserializer는 역직렬화 시 메세지의 헤더에 들어있는 패키지 타입을 보고 역직렬화를 수행한다.

     

    예를 들어, Producer측에서 직렬화를 수행할 때 com.example.msaorder.dto.CreateOrderReq 클래스로 직렬화를 했다면, 수신 측도 역직렬화 시 com.example.msaorder.dto.CreateOrderReq를 활용하여 역직렬화를 시도한다.

     

    하지만 저 패키지 경로는 어디까지나 송신 측의 경로일뿐, 일반적으로는 수신 측과 일치하지 않기 때문에 클래스를 찾을 수 없다는 오류가 뜨는 것이다.

     

     

    이 부분에 대한 해결 방안은 다음 글을 참조바란다 - https://yeon-kr.tistory.com/181

     

     


     

    2. 패키지 구조를 활용하여 역직렬화 클래스 매핑하기

     

     

    ConsumerFactory 설정 시, JsonDeserializer.TYPE_MAPPINGS 에 값을 넣어주는 방법이다.

     

    역직렬화 시 헤더값의 패키지 정보와 해당 값을 어떤 클래스로 역직렬화 할 지를 코드로 명시하면, 런타임 시점에 이를 동적으로 결정하는 방법이다.

     

    @Configuration
    class KafkaConsumerConfigV2 {
    	@Bean
        fun kafkaListenerBatchContainerFactory(
            errorHandler: DefaultErrorHandler
        ) : ConcurrentKafkaListenerContainerFactory<String, Any> {
            val factory  = ConcurrentKafkaListenerContainerFactory<String, Any>()
            factory.setCommonErrorHandler(errorHandler)
            factory.consumerFactory = DefaultKafkaConsumerFactory(consumerConfig())
            return factory
        }
    
        @Bean
        fun consumerConfig() : Map<String, Any> {
            val props = HashMap<String, Any>()
            props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9092"
            props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
            props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ErrorHandlingDeserializer::class.java
            props[ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS] = JsonDeserializer::class.java
            props[ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS] = StringDeserializer::class.java
            props[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = 5
            props[JsonDeserializer.TYPE_MAPPINGS] =
                "com.example.msaorder.dto.ProvisioningItemReq:com.msa.msaitem.dto.ProvisioningItemReq," +
                "com.example.msaorder.dto.CancelProvisioningReq:com.msa.msaitem.dto.CancelProvisioningItemReq"
            props[JsonDeserializer.TRUSTED_PACKAGES] = "*"
            return props
        }
        
        @Bean
        fun errorHandler(kafkaTemplate: KafkaTemplate<Any, Any>): DefaultErrorHandler {
            val backOff = ExponentialBackOffWithMaxRetries(3)
            backOff.initialInterval = 1000
            backOff.multiplier = 2.0
            backOff.maxInterval = 4000
    
    
            val recoverer = DeadLetterPublishingRecoverer(kafkaTemplate) { record, _ ->
                TopicPartition("Global-topic.DLT", record.partition())
            }
            return DefaultErrorHandler(recoverer, backOff)
        }
    }

     

     

    다음과 같이, TYPE_MAPPINGS에 헤더의 패키지 정보와 매핑할 dto클래스의 매핑 경로를 적어주면 된다.

     

    만약 처리에 실패한 메세지가 존재하면, errorHandler를 통해 DeadLetterQueue로 보내진다.

     

    장점

    • 다른 서비스 도메인 담당자와 협업 진행 시, 패키지 경로와 json 포맷만 협의한다면 간단하고 좋은 방법이다.

     

    단점

    • 프로젝트간의 의존도가 높아져서, 리팩토링을 진행하거나 할 때 매번 해당하는 api 명세를 모두 수정해 주어야 하는 번거로움이 있다. 따라서, 프로젝트 초기에는 많이 번거로울 수 있는 방법이다.

     

     


     

     

     

    3. 토픽 이름을 기반으로  역직렬화 클래스 매핑하기(DelegatingByTopicDeserializer Class  사용)

     

     

    spring kafka 2.8버전 이후 등장한 DelegatingByTopicDeserializer를 활용하여, 토픽 이름 별로 deserializer를 지정해주는 방법이다.

     

     

    @Configuration
    class KafkaConsumerConfig {
        @Bean
        fun kafkaListenerBatchContainerFactory(
            errorHandler: DefaultErrorHandler
        ) : ConcurrentKafkaListenerContainerFactory<String, Any> {
            val factory  = ConcurrentKafkaListenerContainerFactory<String, Any>()
            factory.setCommonErrorHandler(errorHandler)
            factory.consumerFactory = consumerFactory()
            return factory
        }
    
        fun consumerFactory() : ConsumerFactory<String, Any> {
            return DefaultKafkaConsumerFactory(consumerConfig(), StringDeserializer(), DelegatingByTopicDeserializer(
                mapOf(
                    Pattern.compile("order-service.create.order") to JsonDeserializer(ProvisioningItemReq::class.java, false),
                    Pattern.compile("order-service.create.order.DLT") to JsonDeserializer(ProvisioningItemReq::class.java, false)
                ),
                JsonDeserializer(Any::class.java)
            ))
        }
    
        @Bean
        fun consumerConfig() : Map<String, Any> {
            val props = HashMap<String, Any>()
            props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9092"
            props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
            props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ErrorHandlingDeserializer::class.java
            props[ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS] = DelegatingByTopicDeserializer::class.java
            props[ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS] = StringDeserializer::class.java
            props[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = 5
            props[JsonDeserializer.TRUSTED_PACKAGES] = "*"
            return props
        }
    
        @Bean
        fun errorHandler(kafkaTemplate: KafkaTemplate<Any, Any>): DefaultErrorHandler {
            val backOff = ExponentialBackOffWithMaxRetries(3)
            backOff.initialInterval = 1000
            backOff.multiplier = 2.0
            backOff.maxInterval = 4000
    
    
            val recoverer = DeadLetterPublishingRecoverer(kafkaTemplate) { record, _ ->
                TopicPartition("Global-topic.DLT", record.partition())
            }
            return DefaultErrorHandler(recoverer, backOff)
        }
    }

     

     

    DefaultKafkaConsumerFactory 객체를 생성할 때, 세 번째 파라미터로 Deserializer의 구현체인 DelegatingByTopicDeserializer를 넘긴다.

     

    그리고 DelegatingByTopicDeserializer 의 파라미터로 Map을 넘기는데, 해당 맵은

    정규표현식을 포함한 Pattern 객체를 key로, JsonDeserializer를 value로 갖는다.

     

    이를 통해 단일 consumerFactory에서 여러 JsonDeserializer를 사용할 수 있게 된다.

     

     

    장점

    • 타 서비스 담당자와 토픽 이름만 협의하면, 쉽게 명세를 맞출 수 있다.
    • 2번 방법에서 나온 단점인 프로젝트간의 의존도를 높이는 문제점을 해결할 수 있다.

     

     

     


     

     

     

    Ref

     

    https://docs.spring.io/spring-kafka/reference/kafka/receiving-messages.html

     

    Receiving Messages :: Spring Kafka

    You can receive messages by configuring a MessageListenerContainer and providing a message listener or by using the @KafkaListener annotation.

    docs.spring.io

     

     

    'Back-End > Kafka' 카테고리의 다른 글

    웹 크롤러에 Kafka 도입(2)  (0) 2022.04.01
    웹 크롤러에 Kafka 도입  (0) 2022.03.30
    kafka란?  (0) 2021.10.21

    댓글

Designed by Tistory.