Back-End/Kafka

[Spring-kafka] MSA 환경에서 토픽 이름에 맞게 KafkaMessage 역직렬화하여 수신하기

쩡류 2024. 10. 13. 21:15
728x90

개요

 

 

Spring-kafka에서, @KafkaListener 를 활용하여 Consumer Record를 수신 하는 방법은 다양하다.

 

특히, 내가 원하는 메세지포멧에 맞게 serialize 및 deserialize 하는 과정을 잘 설정하여

 

비즈니스 로직 단에서의 내가 원하는 타입의 dto를 바로 사용하는 것은 생산성 향상에 아주 큰 도움이 된다.

 

그래서 공식문서와 레퍼런스들을 참고해 가면서 찾아낸 몇 가지 방법들을 소개해보려 한다.

 

 


 

1.  KafkaListenerContainerFactory 설정값에서, 내가 원하는 클래스의 deserializer를 직접 설정하기

 

 

가장 간단한 방법이다.

 

 

@Configuration
class KafkaConsumerConfigV1 {
    @Bean
    fun kafkaConsumerConfigV1() : ConcurrentKafkaListenerContainerFactory<String, Any> {
        val factory  = ConcurrentKafkaListenerContainerFactory<String, Any>()
        factory.consumerFactory = consumerFactory()
        factory.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL_IMMEDIATE
        return factory
    }

    fun consumerFactory() : ConsumerFactory<String, Any> {
        return DefaultKafkaConsumerFactory(consumerConfig())
    }


    fun consumerConfig() : Map<String, Any> {
        val props = HashMap<String, Any>()
        props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9092"  // 로컬 실행 시 broker 기본 ip
        props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
        props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ErrorHandlingDeserializer::class.java
        props[ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS] = JsonDeserializer::class.java
        props[ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS] = StringDeserializer::class.java
        props[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = 5
        return props
    }
}

 

 

 

JsonDeserializer를 VALUE DESERIALIZER로 설정 해 줌으로써 해결한다.

 

하지만 해당 방법은 MSA 환경에서는 정상 동작하지 않을 수 있다는 문제가 있고, 아마 다음과 같은 오류가 뜰 것이다.

 

Caused by: org.springframework.messaging.converter.MessageConversionException:
failed to resolve class name. Class not found [um.uamserver.domain.dto.AdsbData]

 

카프카의 JsonDeserializer는 역직렬화 시 메세지의 헤더에 들어있는 패키지 타입을 보고 역직렬화를 수행한다.

 

예를 들어, Producer측에서 직렬화를 수행할 때 com.example.msaorder.dto.CreateOrderReq 클래스로 직렬화를 했다면, 수신 측도 역직렬화 시 com.example.msaorder.dto.CreateOrderReq를 활용하여 역직렬화를 시도한다.

 

하지만 저 패키지 경로는 어디까지나 송신 측의 경로일뿐, 일반적으로는 수신 측과 일치하지 않기 때문에 클래스를 찾을 수 없다는 오류가 뜨는 것이다.

 

 

이 부분에 대한 해결 방안은 다음 글을 참조바란다 - https://yeon-kr.tistory.com/181

 

 


 

2. 패키지 구조를 활용하여 역직렬화 클래스 매핑하기

 

 

ConsumerFactory 설정 시, JsonDeserializer.TYPE_MAPPINGS 에 값을 넣어주는 방법이다.

 

역직렬화 시 헤더값의 패키지 정보와 해당 값을 어떤 클래스로 역직렬화 할 지를 코드로 명시하면, 런타임 시점에 이를 동적으로 결정하는 방법이다.

 

@Configuration
class KafkaConsumerConfigV2 {
	@Bean
    fun kafkaListenerBatchContainerFactory(
        errorHandler: DefaultErrorHandler
    ) : ConcurrentKafkaListenerContainerFactory<String, Any> {
        val factory  = ConcurrentKafkaListenerContainerFactory<String, Any>()
        factory.setCommonErrorHandler(errorHandler)
        factory.consumerFactory = DefaultKafkaConsumerFactory(consumerConfig())
        return factory
    }

    @Bean
    fun consumerConfig() : Map<String, Any> {
        val props = HashMap<String, Any>()
        props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9092"
        props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
        props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ErrorHandlingDeserializer::class.java
        props[ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS] = JsonDeserializer::class.java
        props[ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS] = StringDeserializer::class.java
        props[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = 5
        props[JsonDeserializer.TYPE_MAPPINGS] =
            "com.example.msaorder.dto.ProvisioningItemReq:com.msa.msaitem.dto.ProvisioningItemReq," +
            "com.example.msaorder.dto.CancelProvisioningReq:com.msa.msaitem.dto.CancelProvisioningItemReq"
        props[JsonDeserializer.TRUSTED_PACKAGES] = "*"
        return props
    }
    
    @Bean
    fun errorHandler(kafkaTemplate: KafkaTemplate<Any, Any>): DefaultErrorHandler {
        val backOff = ExponentialBackOffWithMaxRetries(3)
        backOff.initialInterval = 1000
        backOff.multiplier = 2.0
        backOff.maxInterval = 4000


        val recoverer = DeadLetterPublishingRecoverer(kafkaTemplate) { record, _ ->
            TopicPartition("Global-topic.DLT", record.partition())
        }
        return DefaultErrorHandler(recoverer, backOff)
    }
}

 

 

다음과 같이, TYPE_MAPPINGS에 헤더의 패키지 정보와 매핑할 dto클래스의 매핑 경로를 적어주면 된다.

 

만약 처리에 실패한 메세지가 존재하면, errorHandler를 통해 DeadLetterQueue로 보내진다.

 

장점

  • 다른 서비스 도메인 담당자와 협업 진행 시, 패키지 경로와 json 포맷만 협의한다면 간단하고 좋은 방법이다.

 

단점

  • 프로젝트간의 의존도가 높아져서, 리팩토링을 진행하거나 할 때 매번 해당하는 api 명세를 모두 수정해 주어야 하는 번거로움이 있다. 따라서, 프로젝트 초기에는 많이 번거로울 수 있는 방법이다.

 

 


 

 

 

3. 토픽 이름을 기반으로  역직렬화 클래스 매핑하기(DelegatingByTopicDeserializer Class  사용)

 

 

spring kafka 2.8버전 이후 등장한 DelegatingByTopicDeserializer를 활용하여, 토픽 이름 별로 deserializer를 지정해주는 방법이다.

 

 

@Configuration
class KafkaConsumerConfig {
    @Bean
    fun kafkaListenerBatchContainerFactory(
        errorHandler: DefaultErrorHandler
    ) : ConcurrentKafkaListenerContainerFactory<String, Any> {
        val factory  = ConcurrentKafkaListenerContainerFactory<String, Any>()
        factory.setCommonErrorHandler(errorHandler)
        factory.consumerFactory = consumerFactory()
        return factory
    }

    fun consumerFactory() : ConsumerFactory<String, Any> {
        return DefaultKafkaConsumerFactory(consumerConfig(), StringDeserializer(), DelegatingByTopicDeserializer(
            mapOf(
                Pattern.compile("order-service.create.order") to JsonDeserializer(ProvisioningItemReq::class.java, false),
                Pattern.compile("order-service.create.order.DLT") to JsonDeserializer(ProvisioningItemReq::class.java, false)
            ),
            JsonDeserializer(Any::class.java)
        ))
    }

    @Bean
    fun consumerConfig() : Map<String, Any> {
        val props = HashMap<String, Any>()
        props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9092"
        props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
        props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ErrorHandlingDeserializer::class.java
        props[ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS] = DelegatingByTopicDeserializer::class.java
        props[ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS] = StringDeserializer::class.java
        props[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = 5
        props[JsonDeserializer.TRUSTED_PACKAGES] = "*"
        return props
    }

    @Bean
    fun errorHandler(kafkaTemplate: KafkaTemplate<Any, Any>): DefaultErrorHandler {
        val backOff = ExponentialBackOffWithMaxRetries(3)
        backOff.initialInterval = 1000
        backOff.multiplier = 2.0
        backOff.maxInterval = 4000


        val recoverer = DeadLetterPublishingRecoverer(kafkaTemplate) { record, _ ->
            TopicPartition("Global-topic.DLT", record.partition())
        }
        return DefaultErrorHandler(recoverer, backOff)
    }
}

 

 

DefaultKafkaConsumerFactory 객체를 생성할 때, 세 번째 파라미터로 Deserializer의 구현체인 DelegatingByTopicDeserializer를 넘긴다.

 

그리고 DelegatingByTopicDeserializer 의 파라미터로 Map을 넘기는데, 해당 맵은

정규표현식을 포함한 Pattern 객체를 key로, JsonDeserializer를 value로 갖는다.

 

이를 통해 단일 consumerFactory에서 여러 JsonDeserializer를 사용할 수 있게 된다.

 

 

장점

  • 타 서비스 담당자와 토픽 이름만 협의하면, 쉽게 명세를 맞출 수 있다.
  • 2번 방법에서 나온 단점인 프로젝트간의 의존도를 높이는 문제점을 해결할 수 있다.

 

 

 


 

 

 

Ref

 

https://docs.spring.io/spring-kafka/reference/kafka/receiving-messages.html

 

Receiving Messages :: Spring Kafka

You can receive messages by configuring a MessageListenerContainer and providing a message listener or by using the @KafkaListener annotation.

docs.spring.io