이 포스팅은 공부목적으로 작성된 포스팅 입니다. 왜곡된 내용이 포함되어 있을 수 있습니다.
0. 개요
webflux, reactor를 처음 사용하게 되면 Publisher(Mono, Flux)와 Subscriber에 대해 학습하게 된다. 아주 쉽게(?) 이야기하면 Publisher가 데이터를 발행하면 Subscriber가 데이터를 구독하게 되는데, 이러한 이해만으로 webflux를 사용하는데에는 문제가 있다.
실제로 글쓴이는 아무런 이해도 없이 webflux를 구현하면서 다음과 같은 트러블 슈팅을 경험하게 된다.
- subscribe하지 않으면 실행조차 되지 않는 건가?
- 그러면 모든 로직에 대해서 subscribe를 하면 되나?
- 그런데 subcribe 하지 않아도 되는 경우도 있었는데?
Publisher와 Subscriber에 대해서 학습하면서 위와 같은 의문점을 해결해보자
1. Publisher
Publisher는 데이터를 생성하는 리액티브 스트림즈 컴포넌트 인터페이스이다. 여기서 중요한것이 데이터를 생성할 뿐만 아니라 통지하는 역활까지 한다.
Publisher 인터페이스를 보자
/**
* Flux.class
*/
public abstract class Flux<T> implements CorePublisher<T> {
...
/**
* CorePublisher.class
*/
public interface CorePublisher<T> extends Publisher<T> {
void subscribe(CoreSubscriber<? super T> var1);
}
/**
* Publisher.class
*/
public interface Publisher<T> {
void subscribe(Subscriber<? super T> var1);
}
익히 알려진 Publisher와 달리 실제로는 Mono.class, Flux.class 는 CorePublisher 인터페이스를 구현하고 있었고 CorePublisher에서 Publisher를 상속하고 있었다.
CorePublisher 에는 subscribe 메소드 하나만 구현하면 되는데 Subscribe가 데이터를 구독함에 있어 CorePublisher의 subscribe에서 파라미터인 CoreSubscriber를 등록하는 형태로 구독이 이루어지게 된다.
2. Subscribe
이제 Subscriber를 보자
/**
* CoreSubscriber.class
*/
public interface CoreSubscriber<T> extends Subscriber<T> {
default Context currentContext() {
return Context.empty();
}
void onSubscribe(Subscription var1);
}
}
/**
* Subscriber.class
*/
public interface Subscriber<T> {
void onSubscribe(Subscription var1);
void onNext(T var1);
void onError(Throwable var1);
void onComplete();
}
/**
* Subscription.class
*/
public interface Subscription {
void request(long var1);
void cancel();
}
Subscriber 인터페이스 또한 CoreSubscriber가 상속하는 형태로 사용되고 있었다. Subscriber에서는 4개의 함수를 구현해야한다.
- onSubscribe(Subscription varl): 구독시 할 처리
- onNext(T t): Publisher가 발행한 데이터를 처리
- onError(Throwable t): Publisher가 에러를 발행하였을 경우 에러 처리
- onComplete(): Publisher가 데이터 발행 통보를 완료했을때 처리
CoreSubscriber에 보면 currentContext를 확인할 수 있는데, dependent component에게 Subscribing하는 동안 downstream operator을 요청하거나 Subscriber을 요청한다고 한다.(reactor에서 메소드 체이닝을 지원하는 부분이라고 이해했다.)
3. Publish -> Subscribe
publish에서 subscribe까지의 일련의 과정을 정리하면
- Publisher가 Subscriber 구현체를 subscribe 메서드의 파라미터 형태로 전달한다.
- Publisher 에서 onSubscribe 메서드를 호출하여 CorePublisher의 Subscription의 구현체를 전달한다.
- Subscription에 있는 요청 데이터 개수를 onNext singal를 통해 Subscriber에게 전달한다
- 더 이상 발행할 데이터가 없는 경우, onComplete singal를 통해 Subscirber에게 종료 singal을 보낸다.
100% 이해한것 같지 않지만 어느정도 납득했다.
좀 더 쉽게 이야기하면
- Publisher는 데이터를 발행한다.
- Subscriber는 데이터를 구독한다.
- Publisher의 Subscribe() 메소드를 호출하여 구독한다.
- 이때, Subscribe() 메소드안의 Subscription에 데이터 요청 정보를 가져와서 Publisher에게 전달한다.
4. 마블 다이어 그램(Operator)
publish와 subscribe 이외에도 하나 생각 해볼 것이 있는데 Operator이다.
여기 가장 많이 사용하는 Operator인 Flux.map() 메소드를 가져왔다.
위에 배웠던 내용을 생각해보면
onNext singal을 받으면 map() 메소드가 실행되고 처리가 모두 끝났다면 OnComplete signal을 통해 종료된다.
map() 메소드 또한 return type이 Flux이라는 것을 생각해보면 결국 map()메소드 하나의 publisher인 것이다.
4. hot sequence, cold sequence
그러면 subscribe하는 경우에 hot sequence로 진행될까 cold sequence일까?
일반적으로 cold sequence의 방식을 사용한다. hot sequence를 사용하는 방법으로 cache()와 같은 메소드를 사용하여 hot sequence로 바뀔 수 있다.(cache()와 같이 특정 시점을 기억한다)
cold sequence 환경에서 구독할때 마다 처음부터 실행하기 때문에 구독하지 않으면 데이터 처리가 안되는 것도 cold sequence와 관련있는 내용이다.
5. 정리
- subscribe하지 않으면 실행되지 않는 다고 생각하자(operator도 publisher 이니까)
- 해당 코드 블록에서 데이터에 대한 처리가 필요하다면 subscribe해야 한다.(subscribe을 최대한 한번에 진행하는 것이 비동기 프로그래밍에 더 알맞다고 생각한다)
- 일부 스트림 메소드 내부에서 subscribe 과정이 발생한다.(block 같은 경우)
'Java > 스프링 부트' 카테고리의 다른 글
[JMeter] JMeter를 통해 테스트 (0) | 2024.06.17 |
---|---|
[Webflux] Mono, Flux.Block() (0) | 2024.06.05 |
[Webflux] Reactor 시작하기 (0) | 2024.03.25 |
[Spring boot] @ConfigurationProperties 에서 "Failed to bind properties" (0) | 2024.03.06 |
[Spring boot] @Transactional readOnly에 대한 고찰 -1 (0) | 2024.02.28 |