ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 웹 크롤러에 Kafka 도입
    Back-End/Kafka 2022. 3. 30. 15:40
    728x90
    • 수습 기간 프로젝트로 진행한 웹 크롤러에 RabbitMQ에 이어서 Kafka를 넣기로 결정.
    • 원래는 consumer group이 하나라서 메세지가 재활용되지 않고, 적은 데이터에서는 Kafka보다 우위이기 때문에 RabbitMQ를 사용하기로 했지만, Kafka를 그냥 써 보고 싶어서 브랜치를 하나 따서 Kafka를 사용해서 성능을 비교하기로 했다.

     

    간단한 Flow

     

     

    - 카프카 브로커 URL 부트스트랩 : 카프카 프로듀서는 카프카 클러스터에 대한 메타데이터를 가져오기 위해 최소 하나 이상의 브로커에 연결한다. 프로듀서가 연결하길 원하는 첫 번째 브로커가 다운될 경우를 대비하여 보통 한개 이상의 브로커 URL 리스트를 설정한다.

     

    - 데이터 직렬화 : 카프카는 TCP 기반의 데이터 송수신을 위해 이진 프로토콜을 사용한다. 이는 카프카에 데이터를 기록할 때, 프로듀서는 미리 설정한 직렬화 클래스를 이용해 직렬화 한 이후에 전송한다. 즉, 카프카 프로듀서는 네트워크 상의 브로커들에게 데이터를 보내기 전에 모든 메시지 데이터 객체를 바이트 배열로 직렬화한다.

     

    - 토픽 파티션의 결정 :  어떤 파티션으로 데이터가 전송 돼야 하는지 결정하는 일은 카프카 프로듀서의 책임이다. 만약 프로듀서가 데이터를 보내기 위해 API에 파티션을 명시하지 않은 경우 파티셔너에 의해 키의 해시값을 이용해 파티션을 결정하고 데이터를 보낸다. 하지만 키값이 존재하지 않으면 라운드 로빈 방식으로 파티션을 선택하여 데이터를 전송한다.

     

    - 처리 실패/재시도 : 처리 실패 응답이나 재시도 횟수는 프로듀서 애플리케이션에 의해 제어된다.

     

    - 배치처리 : 호율적인 메시지 전송을 위해서 배치는 매우 유용하다. 미리 설정한 바이트 임계치를 넘지 않으면 버퍼에 데이터를 유지하고 임계치를 넘으면 데이터를 토픽으로 전송한다.



     

    소프트웨어 설치 및 설정

     

    • kafka 2.13-2.8 실행
    • zookeeper 실행
    • kafka topic 추가

     

    kafka_2.13-2.8.0 % bin/kafka-topics.sh --create --topic queue-html --bootstrap-server localhost:9092

     

    kafka 실행

     

    bin/kafka-server-start.sh config/server.properties

    Common 

     

    - spring-kafka 디펜던시 추가 (trawler, workers 모두)

     

    - spring template config 추가

     

     

     

    • bootstrap.servers : 카프카 브로커 주소의 목록을 설정한다. 주소는 hostname:port 형식으로 지정되며 하나 이상의 브로커 리스트를 지정한다.
    • key.serializer : 메시지는 key-value 쌍으로 이뤄진 형태로 카프카 브로커에게 전송된다. 브로커는 이 키값이 바이트 배열로 돼있다고 가정한다. 그래서 프로듀서에게 어떠한 직렬화 클래스가 키값을 바이트 배열로 변환할때 사용됬는 지 알려주어야한다. 카프카는 내장된 직렬화 클래스를 제공한다.(ByteArraySerializer, StringSerializer, IntegerSerializer / org.apache.kafka.common.serialization)
    • value.serializer : key.serializer 속성과 유사하지만 이 속성은 프로듀서가 값을 직렬화 하기 위해 사용하는 클래스를 알려준다.

     

     

    - 설정 후, kafkaTemplate.send 해 주면 된다.

     

    - 참고로, 상단의 KafkaJsonTemplate은 객체 데이터를 보내기 위해 명명했는데, 객체 대신 Url String을 보내기로 계획이 수정되어서 kafkaUrlTemplate으로 이름이 변경되었다.

     

     

    Consumer

     

    - kafkaListener에 id(해당 리스너의 고유값, topics를 추가 해 주면 된다.)

     

    - topic의 경우, CLI로 미리 생성을 해 두었지만 Producer쪽에서 topic builder로 producer application 실행 시 생성 할 수도 있다.

        - CLI(command line interface) : 사용자와 컴퓨터가 상호작용 하는 인터페이스. 터미널 등

     

    - Optional Element들은 공식문서에 잘 나와있다.

     

     

    https://docs.spring.io/spring-kafka/api/org/springframework/kafka/annotation/KafkaListener.html

     

    KafkaListener (Spring for Apache Kafka 2.8.4 API)

    The bean name of the KafkaListenerContainerFactory to use to create the message listener container responsible to serve this endpoint. If not specified, the default container factory is used, if any. If a SpEL expression is provided (#{...}), the expressio

    docs.spring.io

     

     


    Referencehttps://coding-start.tistory.com/193

    댓글

Designed by Tistory.