-
Project Reactor와 Spring WebfluxBack-End 2024. 7. 16. 00:05728x90
Project Reactor란?
Project Reactor 사이트에서는 다음과 같이 소개가 되어 있다.
Reactor is a fourth-generation reactive library, based on the Reactive Streams
specification, for building non-blocking applications on the JVM즉, Java와 같은 JVM 기반 언어 어플리케이션의 반응형 스트림 명세를 구현한 리액티브 프로그래밍 라이브러리이다.
좀 더 자세히 여러 가지 특징을 살펴보면
- Project Reactor는 비동기식으로 데이터를 처리하고, 블로킹이 발생하지 않도록 설계되어있다. 이를 통해 높은 처리량과 낮은 대기시간을 유지할 수 있다.
- 데이터 스트림을 손쉽게 생성, 변환, 결합 및 소비할 수 있는 다양한 연산자를 제공한다.
- 생산자와 소비자 간의 속도 불일치 문제를 해결하기 위해 백프레셔(BackPressure) 메커니즘을 지원합니다. 이를 통해 시스템이 과부하 상태에 빠지지 않도록 제어할 수 있다.
- 리액티브 스트림 사양에 따라 Publisher와 Subscriber 모델을 사용하여 데이터 스트림을 구독하고 처리한다.
반응형 스트림(Reactive Streams)이란?
- 비동기 데이터 스트림을 처리하기 위한 표준화된 사양이다.
- 생산자와 소비자 간의 데이터 전달을 효율적으로 처리하기 위해 고안된 프로그래밍 모델이고, 특히 백프레셔(backpressure) 문제를 해결하는 데 중점을 두고 있다.
- 고성능, 비동기 애플리케이션에서 데이터를 신뢰성 있게 처리하기 위해 설계되었다.
반응형 스트림에는 크게 4가지의 요소가 있다.
- Publisher(발행자)
- 데이터 스트림을 생성하고 발행한다. Publisher는 데이터 요소를 Subscriber에게 전달하는 역할을 한다.
- Subscriber (구독자)
- Publisher가 발행한 데이터를 소비한다. 즉, 데이터를 수신하고 처리하는 역할을 한다.
- Subscription (구독)
- Publisher와 Subscriber 간의 계약을 나타내고, 데이터 스트림의 생명 주기를 관리한다. 구독자는 데이터 스트림을 요청하고, Subscription은 데이터의 요청과 취소를 처리한다.
- Processor(프로세서)
- Publisher와 Subscriber의 역할을 모두 수행한다.
- 즉, 데이터를 발행하면서 동시에 구독하여 중간에서 데이터를 처리하고 변환한다.
Project Reactor를 활용한 간단한 예시 코드
import reactor.core.publisher.Flux; public class ReactiveStreamsExample { public static void main(String[] args) { Flux<Integer> flux = Flux.range(1, 10) .map(i -> i * 2) // 각 요소를 두 배로 변환 .filter(i -> i % 4 == 0); // 4의 배수 필터링 flux.subscribe(System.out::println, error -> System.err.println("Error: " + error), () -> System.out.println("Stream completed")); } }
- 1부터 10까지의 정수 스트림을 생성하는 Flux publisher를 선언한다.
- map 메서드를 통해, 내부 값을 2배로 변환한다.
- filter 메서드를 통해, 4의 배수 데이터 스트림만 추출한다.
- subscribe 호출을 통해, Subscriber를 등록하여 해당 데이터 스트림을 소비한다. 소비를 완료한 경우 Stream completed를 출력한다.
Spring WebFlux에서는 내부적으로 어떻게 request를 핸들링할까?
Spring WebFlux에서 Mono나 Flux 같은 리액티브 스트림의 구독과 데이터 수신 과정은 Reactive Streams를 기반으로 동작하며, 해당 과정은 다음과 같다.
1. 구독자(Subscriber)와 발행자(Publisher)
- Mono나 Flux는 발행자(Publisher)의 역할을 한다. 이 Publisher는 데이터를 비동기적으로 생성하고, 구독자(Subscriber)가 그 데이터를 요청하면, 이를 전달한다. Spring WebFlux에서는 HTTP 요청이 들어올 때 Mono나 Flux를 반환하는 컨트롤러 메서드가 발행자가 되고, Spring WebFlux의 런타임이 구독자 역할을 하게 된다.
2. 구독 시작 (Subscription)
- Mono 또는 Flux가 컨트롤러 단에서 return되면, Spring WebFlux가 해당 Publisher를 subscribe한다.
- Publisher.subscribe(Subscriber subscriber) 메서드 호출로 발행자가 구독자를 받고, 구독자의 onSubscribe 메서드를 호출한다.
- 이 시점에서 Subscription 객체가 생성된다. Subscription 객체는 구독자와 발행자 간의 연결을 관리하고, 구독자는 Subscription.request(long n) 메서드를 호출하여 데이터를 요청하게 된다.
- Publisher.subscribe(Subscriber subscriber) 메서드 호출로 발행자가 구독자를 받고, 구독자의 onSubscribe 메서드를 호출한다.
3. 데이터 요청 (Request)
- 구독자가 데이터를 수신하고자 할 때, 구독자는 Subscription.request(n)를 호출하여 데이터 항목 n개를 요청한다. 이 요청의 크기는 백프레셔(Backpressure)의 개념을 사용하고, 구독자는 자신이 처리할 수 있는 만큼의 데이터를 요청하게 된다.
- WebFlux는 논블로킹 방식으로 데이터를 처리하기 때문에, 한 번에 요청하는 데이터 수는 시스템의 상태에 따라 달라질 수 있다.
4. 데이터 생성 및 발행 (Emit Data)
발행자는 구독자의 요청에 따라 데이터를 생성하거나 외부 리소스에서 데이터를 비동기적으로 받아옵니다. 발행자가 데이터를 준비하면, 그 데이터를 구독자에게 전달합니다. 이때, 구독자의 다음 메서드가 호출됩니다:
- Subscriber.onNext(T item): 발행자가 데이터를 생성하여 구독자에게 전달할 때 호출됩니다. 이 메서드는 요청한 데이터가 준비되면 호출되며, 데이터를 하나씩 넘겨줍니다.
Mono는 최대 1개의 데이터만을 발행하기 때문에, 데이터가 준비되면 onNext 메서드가 한 번 호출됩니다.
5. 완료 및 종료 (Completion)
데이터가 모두 발행된 후에는 스트림을 종료하는 단계가 필요한데, 이때 발행자는 다음 중 하나를 호출한다.
- Subscriber.onComplete(): 발행자가 정상적으로 모든 데이터를 발행한 경우 호출되며, 구독이 성공적으로 완료되었음을 의미한다.
- Subscriber.onError(Throwable t): 발행자가 데이터를 발행하는 도중 오류가 발생한 경우 호출됩니다. 이때 구독자는 오류를 처리한다.
여기서 Mono의 경우 단일 값을 처리하기 때문에, 데이터가 수신되면 onNext() 메서드가 호출된 뒤 곧바로 onComplete()가 호출된다. Flux는 여러 데이터를 발행할 수 있으므로, 여러 번 onNext()가 호출된 후 최종적으로 onComplete()가 호출된다.
6. 논블로킹 I/O 처리
Spring WebFlux는 논블로킹 I/O를 사용하기 때문에, 데이터를 발행하거나 수신하는 동안 서버는 다른 작업을 처리할 수 있다. 즉, Mono나 Flux가 데이터를 발행할 때 서버는 블로킹되지 않고, 비동기적으로 데이터를 처리한다. 이러한 처리는 기본적으로 Netty 같은 논블로킹 서버에서 지원되며, Mono나 Flux의 구독자와 발행자 사이의 상호작용도 모두 비동기적으로 이루어진다.
요약하면 다음과 같다.
- 구독 시작: Spring WebFlux 런타임이 Mono 또는 Flux를 구독하며, Subscription 객체가 생성
- 데이터 요청: 구독자가 Subscription.request(n)을 호출하여 데이터를 요청
- 데이터 수신: 발행자가 데이터를 준비하면 onNext() 메서드를 통해 데이터를 구독자에게 전달
- 완료 또는 오류 처리: 데이터 발행이 완료되면 onComplete()가 호출되고, 오류가 발생하면 onError()가 호출
- 비동기 처리: 모든 처리는 논블로킹 방식으로 이루어지며, 서버는 데이터 처리 중에도 다른 작업을 처리