이 포스팅은 공부목적으로 작성된 포스팅 입니다. 왜곡된 내용이 포함되어 있을 수 있습니다.
0. 개요
https://github.com/schananas/practical-reactor
위 포스팅은 해당 repo의 c1_introduction를 실습한 내용이다. 각각 chapter를 학습하면서 webflux에 사용되는 reactor를 정리하고자 한다. 학습하면서 키워드를 정리해서 학습 순서가 바뀔 수 있다.
1. block (hello world)
/**
* Every journey starts with Hello World!
* As you may know, Mono represents asynchronous result of 0-1 element.
* Retrieve result from this Mono by blocking indefinitely or until a next signal is received.
*/
public Mono<String> hello_world_service() {
return Mono.just("Hello World!");
}
@Test
public void hello_world() {
Mono<String> serviceResult = hello_world_service();
String result = null; //todo: change this line only
assertEquals("Hello World!", result);
}
프로그래밍을 배우는 만큼 hello world로 시작된다.
serviceResult가 가지고 있는 "hello world"문자열을 result에 할당해야한다.
webflux 환경에서는 코드가 비동기적으로 실행됨을 주의하자
String result = null가 할당되는 지점에서 serviceResult의 상태를 알 수 없다.
따라서 result에 serviceResult하려면 serviceResult가 끝날때 까지 기다려야 한다.(Block)
아래코드를 추가하자
String result = serviceResult.block();
다음 문제이다
/**
* Retrieving result should last for a limited time amount of time, or you might get in trouble.
* Try retrieving result from service by blocking for maximum of 1 second or until a next signal is received.
*/
public Mono<String> unresponsiveService() {
return Mono.never();
}
@Test
public void unresponsive_service() {
Exception exception = assertThrows(IllegalStateException.class, () -> {
Mono<String> serviceResult = unresponsiveService();
String result = null; //todo: change this line only
});
String expectedMessage = "Timeout on blocking read for 1";
String actualMessage = exception.getMessage();
assertTrue(actualMessage.contains(expectedMessage));
}
예외를 발생시켜야한다. unresponsiveService()에서 Mono.never() 이라는 방출하지않는 Mono를 반환한다. 따라서 onComplete signal을 보내지 않기 때문에 subscribe하게 되면 무한 루프에 빠지게 된다.
subscribe에 제한 시간을 포함시켜 예외를 발생시키자.
아래 코드를 추가하자
String result = serviceResult.block(Duration.ofSeconds(1));
이외에도 다양한 block함수들이 존재한다.
- Optional<T> blockOptional(): Mono를 Optional로 반환
- T blockFirst(): Flux의 첫번째 값을 반환
block을 사용하는 것은 당연하게도 권장되는 방식이 아니다.
block을 사용하는 순간 부터 해당 리소스에 대한 접근은 동기적으로 수행되기 때문이다.
2. subscribe
다음 문제를 보자
/***
* If you finished previous task, this one should be a breeze.
*
* Upgrade previously used solution, so that it:
* - adds each emitted item to `companyList`
* - does nothing if error occurs
* - sets `serviceCallCompleted` to `true` once service call is completed.
*
* Don't use doOnNext, doOnError, doOnComplete hooks.
*/
public Flux<String> fortuneTop5() {
return Flux.defer(() -> {
fortuneTop5ServiceIsCalled.set(true);
return Flux.just("Walmart", "Amazon", "Apple", "CVS Health", "UnitedHealth Group");
});
}
@Test
public void leaving_blocking_world_behind() throws InterruptedException {
AtomicReference<Boolean> serviceCallCompleted = new AtomicReference<>(false);
CopyOnWriteArrayList<String> companyList = new CopyOnWriteArrayList<>();
fortuneTop5()
//todo: change this line only
;
Thread.sleep(1000);
assertTrue(serviceCallCompleted.get());
assertEquals(Arrays.asList("Walmart", "Amazon", "Apple", "CVS Health", "UnitedHealth Group"), companyList);
}
Flux<String>으로 받은 문자열에 대해 companyList에 넣어야 한다. 또한 처리가 정상적으로 완료되었다면 serviceCallCompleted를 true로 설정해야한다.
subscribe의 전형적인 형태에 대한 문제이다.
일반적으로 subscribe에 대해서 아래와 같이 이해하고 있다.
public final Disposable subscribe()
그러나 subscribe은 다양한 형태로 오버로딩되서 사용할 수 있는데, 아래와 같이 사용할 수 있다.
public final Disposable subscribe(@Nullable Consumer<? super T> consumer, @Nullable Consumer<? super Throwable> errorConsumer, @Nullable Runnable completeConsumer) {
return this.subscribe(consumer, errorConsumer, completeConsumer, (Context)null);
}
onComplete signal이 발생했을 때 사용할 consumer와 성공시 사용할 consumer, 실패시 예외 처리로 사용할 conumer을 지정할 수 있다. 아래 코드를 추가하자
fortuneTop5().subscribe(companyList::add,e->{},()->serviceCallCompleted.set(true));
2. 결론
이번 chapter를 실습하고 느낀 것을 정리하면
- block을 사용할 경우 동기적인 리소스접근이 시작된다(최대한 지양해야한다)
- subscribe은 단순히 publisher를 구독한다는 것에서 나아가서 종단 처리, 성공 실패 처리를 할 수 있다.
3. 의문점
- subscribe를 실행함에 있어서 subscribe은 onComplete singal을 받는 순간 실행된다. subscribe를 하지 않는 경우에 로직이 실행이 되지 않는 경우가 있는데 이것은 해당 로직이 cold sequence여서 그런것인가
- subscribe에 종단 연산을 넣을 수 있는데 그렇다면 종단 연산을 subscribe에 넣는 것과 중간 연산으로 처리하는 것의 차이가 없을까
- subscribe을 자체적으로 처리하는 메소드를 쉽게 구별할 수 없을까(docs를 뒤져야한다.)
- subscribe와 block의 차이점이 뭘까
'Java > 스프링 부트' 카테고리의 다른 글
[JMeter] JMeter를 통해 테스트 (0) | 2024.06.17 |
---|---|
[Webflux] Publisher, Subscriber (0) | 2024.06.05 |
[Webflux] Reactor 시작하기 (0) | 2024.03.25 |
[Spring boot] @ConfigurationProperties 에서 "Failed to bind properties" (0) | 2024.03.06 |
[Spring boot] @Transactional readOnly에 대한 고찰 -1 (0) | 2024.02.28 |