Backend boot camp/Session4
[Spring WebFlux] Reactor
by orioncsy
2022. 12. 9.
Project Reactor
개념
Reactor
- Reactive Streams 표준 사양을 구현한 구현체
- Spring 5 버전부터 지원하는 리액티브 스택에 포함되어 리액티브한 애플리케이션 동작에 있어 핵심 역할 담당하는 라이브러리
Reactor 특징
- Reactor는 Reactive Streams를 구현한 리액티브 라이브러리이다.
- 핵심 특징으로 Non-Blocking 통신을 지원한다.
- Publisher 타입으로 Mono[0|1]와 Flux[N]이라는 두 가지 타입 존재
- Mono[0|1]에서 0과 1은 0건 또는 1건의 데이터를 emit 가능함을 말한다.
- Flux[N]에서는 여러 건 데이터를 emit 가능
- 서비스들 간 통신이 잦은 MSA(MicroService Architecure) 기반 애플리케이션들은 요청 스레드가 차단되는 통신을 사용하기는 무리가 있어 Non-Blocking 통신을 지원하는 Reactor가 적합
- Backpressure
- Publisher로 부터 끊임없이 들어오는 데이터를 emit하는 것에 비해 Subsriber가 처리 속도가 느리면 데이터가 지속적으로 쌓이고 오버플로우가 발생되고 시스템이 다운될 수 있다.
- Backpressure는 속도를 적절하게 제어하는 전략이다.
Reactor 구성 요소
Flux
.just("Hello", "Reactor")
.map(message -> message.toUpperCase())
.publishOn(Schedulers.parallel())
.subscribe(System.out::println,
error -> System.out.println(error.getMessage()),
() -> System.out.println("# onComplete"));
Thread.sleep(100L);
- Flux로 시작하는 것은 Reactor Sequence가 시작되는 지점으로 여러 건 데이터 처리
- just() Operator는 원본 데이터 소스에서 데이터를 emit하는 Publisher 역할
- map() Operator는 Publisher로 받은 데이터를 가공
- publishOn Operator에 Scheduler를 지정하면 Downstream의 스레드가 지정한 유형의 스레드로 변경
- subscribe()에는 3개의 람다식을 가지는데 첫 번째 파라미터는 Publisher가 emit한 데이터를 받아 처리하는 역할, 두 번째는 에러 발생할 경우 처리, 세 번째는 reactor sequence 종료 후처리
- 마지막에 Thread.slepp(100L)은 scheduler를 지정하면 main 스레드 이외에 추가 스레드가 생성되는데 지정한 스레드는 데몬 스레드로 main 스레드 종료 시 동시에 종료된다. main 스레드를 0.1초 동작 지연시키면 그 사이에 지정한 데몬 스레드를 통해 reactor sequence 정상 작동
마블 다이어그램(Marble Diagram)
마블 다이어그램
- Marble은 구슬을 뜻한다.
- 구슬은 하나의 데이터를 의미하고 다이어그램 상 시간의 흐름에 따라 데이터의 변화 흐름을 표현
- Operator를 어떤 것을 사용하느냐에 따라 다양한 데이터 흐름 생성
마블 다이어그램으로 Mono와 Flux 이해하기
- Mono의 마블 다이어그램
- 위, 아래로 두 개의 타임라인이 존재하는데 데이터가 흘러가는 시간의 흐름을 표현
- 위쪽 타임라인에서 sequence가 시작되고 데이터가 emit되는 것을 아래 operator()쪽으로 화살표를 지어 표시(구슬 하나를 표시하여 Mono를 표현)
- 수직 막대바가 위쪽 타임라인에 나오는데 sequence 종료를 표시
- Operator가 두 개 타임라인 사이에 존재하는데 데이터를 가공하는 것 표시
- 아래 타임라인에는 가공 처리된 데이터가 downstream으로 전달될 때의 타임라인 표현
- 만약 emit된 데이터가 처리 과정에서 에러 발생하면 X로 아래 타임라인에 표시
- Flux의 마블 다이어그램
- Mono 다이어그램과 유사하지만 구슬이 여러 개가 존재하여 여러 데이터를 emit
스케줄러(Scheduler)
스케줄러
- 스레드를 관리하는 관리자 역할
- Reactor Sequence 상에 처리되는 동작들을 하나 이상의 스레드에서 동작하도록 별도 스레드 제공
- Reactor는 Non-Blocking 통신을 위한 비동기 프로그래밍을 위해 탄생
- 즉, Scheduler를 사용하면 멀티스레딩을 사용한 비동기 프로그래밍 프로세스를 단순하게 해 준다.
Scheduler 전용 Operator
- subscribeOn()
- 매개 변수로 지정한 스케줄러를 통해 스레드를 구독 직후에 실행되는 operator 체인에 대하여 변경된 스레드로 사용한다.
- doOnSubscribe()는 Operator로써 구족 직 후에 어떤 동작을 수행하고 싶을 때 그 안에 로직 작성
- publishOn()
- publishOn()을 기준으로 DownStream 쪽 스레드가 지정한 스레드로 변경
- doOnNext()는 앞에 위치한 Operator가 실행될 때 트리거된다.
- subscribeOn()은 구독 직후 실행 흐름을 다른 쓰레드로 바꾸는 데 사용
- 원본 데이터를 생성하고 데이터를 emit하는 작업이 구독 직후 실행
- publishOn()은 전달받은 데이터를 가공 처리하는 Operator 앞에 추가해 실행 스레드를 별도 추가하는 역할
- subscribeOn()은 여러 번 추가해도 하나의 스레드만 추가로 생성되고 publishOn()응 Operator 앞에 여러 번 추가하면 여러번 생성된다.
- 이렇게 사용하는 이유는 스레드가 직접 복잡한 계산을 수행하는 경우 응답 처리가 늦어질 수 있어 별도의 스레드를 사용
- 다양한 유형의 스레드가 존재하지만 일반적으로 SubscribeOn()에서는 Schedulers.boundedElastic 를 사용하고 publishOn()에서는 Schedulers.parallel을 주로 사용
Operators
새로운 Sequence를 생성하는 경우
- fromStream()
- stream을 입력받아 emit 하는 operator
- 예시
Flux
.fromStream(Stream.of(100,200,300))
.redue((a+b)->a+b)
.subcribe(System.out::println);
- fromIterable()
- Iterable(List, Map, Set 등 컬렉션)을 입력받아 emit하는 operator
- 예시
Flux
.fromStream(List.of(100,200,300))
.redue((a+b)->a+b)
.subcribe(System.out::println);
- create()
- Signal 이벤트를 발생시키는 Operator
- Publisher가 발생시키는 이벤트(Signal)를 발생시키는데 onNext Signal 이벤트 전송
- create()는 한 번에 여러 건의 데이터를 비동기적으로 emit 가능
- 예시
private static List<Integer> source= Arrays.asList(1, 3, 5, 7, 9);
Flux.create((FluxSink<Integer> sink) -> {
sink.onRequest(n -> {
for (int i = 0; i < source.size(); i++) {
sink.next(source.get(i));
}
sink.complete();
});
sink.onDispose(() -> log.info("# clean up"));
}).subscribe(d -> log.info("# onNext: {}", d));
- create() operator 파라미터는 FluxSink라는 람다 파라미터를 가지는 람다 표현식이다.
- 데이터 소스를 전달하면 내부에서 알아서 데이터를 emit 하는 방식이 아니라 직접 signal을 발생시켜 sequence 진행
- onRequest()는 Subscriber에서 데이터를 요청하면 파라미터인 람다식이 실행
- for문을 순회하여 next() 메서드로 원소를 emit
- onDisponse()는 sequence가 완전히 종료되기 전에 호출되면 후처리 작업 가능
기존 Sequence에서 변환 작업이 필요한 경우
- flatMap()
- 내부로 들어오는 데이터 한 건당 하나의 Sequence가 생성
- 예시
Flux
.range(2, 8)
.flatMap(dan -> Flux
.range(1, 9)
.publishOn(Schedulers.parallel())
.map(num -> dan + " x " + num + " = " + dan * num))
.subscribe(log::info);
Thread.sleep(100L);
- 2단에서 9단까지 출력하도록 range()를 통해 범위 지정
- flatMap() 내부에 range()를 통해 하나의 단을 출력하도록 숫자를 지정
- flatMap() 내부에 Inner Sequence를 처리할 스레드 할당
- 구구단 형식 문자열을 구성해서 출력한다.
- 그러나 결과는 구구단이 섞여서 출력된다. 작업의 처리 순서를 보장하지 않기 때문이다.
- concat()
- 입력으로 전달하는 Publisher의 sequence를 연결해서 차례대로 데이터를 emit
- 예시
Flux
.concat(Flux.just("A", "B", "C", "D", "E"),
Flux.just("F", "G"))
.subscribe(System.out::println);
- zip()
- 여러 개의 Publisher Sequence에서 emit된 데이터를 결합하는 operator
- 데이터가 emit되는 시점이 다르기 때문에 결합되어야 할 데이터(같은 index)가 emit될 때까지 기다린다.
- 예시
Flux<Long> s1= Flux.interval(Duration.ofMillis(100L)).take(4);
Flux<Long> s2 = Flux.interval(Duration.ofMillis(200L)).take(6);
Flux
.zip(s1, s2, (d1, d2) -> d1+ d2)
.subscribe(data -> log.info("# onNext: {}", data));
- Flux.interval()은 지정한 기간 동안 0부터 1씩 증가한 숫자를 emit한다.
- take()를 통해 제한된 데이터 개수만큼 가져온다.
- zip()을 통해 각자 데이터를 가져오는 시점이 달라도 기다렸다가 두 개 데이터를 받아서 처리
Sequence 내부의 동작을 확인하고자 할 경우
- doOnNext()
- 데이터 emit 시 트리거 되어 부수 효과를 추가
- 주로 로깅에 사용
- 예시
List<Integer> list=List.of(1,2,3,4);
Flux
.fromIterable(list)
.doOnNext(num-> validate(num))
.subscribe(data -> log.info("{}", data));
private static void validate(int num){
if(num ==0) throw new RuntimeException("zero is not acceptable");
}
- log()
- publisher에서 발생하는 signal 이벤트를 로그로 출력해주는 역할
- 예시
Flux
.fromStream(Stream.of(2, 3, 4, 5, 6))
.log()
.reduce((a, b) -> a + b)
.log()
.subscribe(System.out::println);
- onSubscribe signal 이벤트가 두 번 발생
- request Signal 이벤트 두 번 발생
- Publisher가 데이터 emit할 때 onNext Signal 5번 발생
- emit을 정상 종료할 때 두 번 onComplete Signal 발생
Error를 처리하는 경우
- error()
- 의도적으로 onError Signal 이벤트 발생
- 예시
Mono.justOrEmpty(func())
.switchIfEmpty(Mono.error(new RuntimeException("Not found coffee")))
.subscribe(
data -> log.info("{}", data),
error -> log.error("# onError: {}", error.getMessage()));
private static Integer func() {
return null;
}
- justOrEmpty()의 경우 null 값을 받아도 error가 발생하지 않는다. just()의 경우에는 null 데이터를 emit하면 에러 발생한다.
- switchIfEmptu()는 upstream에서 전달되는 데이터가 null이면 동작한다.
- error()를 통해 onError Signal을 발생시키고 에러 메시지를 출력하도록 작성한다.
- timeout(), retry()
- timeout()은 주어진 시간 동안 emit되는 데이터가 없으면 onError Signal 발생
- retry()는 sequence 상 에러가 발생할 경우 입력으로 주어진 숫자만큼 재구독하여 sequence 다시 시작
- 예시
public static void main(String[] args) throws InterruptedException {
func()
.collect(Collectors.toSet())
.subscribe(set-> set
.stream()
.forEach(d->
log.info("{}", d)));
Thread.sleep(12000);
}
private static Flux<Integer> func(){
List<Integer> list=List.of(1,2,3,4,5,6);
final int[] count={0};
return Flux
.fromIterable(list)
.delayElements(Duration.ofMillis(500))
.map(num->{
try{
count[0]++;
if(count[0] ==3){
Thread.sleep(2000);
}
} catch (InterruptedException e){
}
return num;
})
.timeout(Duration.ofSeconds(2))
.retry(1)
.doOnNext(num-> log.info("# func > doOnNext: {}", num));
}
- func 함수
- list로 값을 생성하고 count 배열을 선언한다.
- Flux 내에서는 final 변수를 사용해야 하고 변경 가능하게 하려면 배열로 선언해야 한다.
- 결과에서는 Flux를 통해 delayElements로 0.5초간 지연시켜서 데이터를 emit한다.
- 3번째 값이 올 경우 2초를 지연시키는 작업을 map을 통해 작성한다.
- timeout()으로 2초를 지정하고 retry는 1번으로 진행하여 로그를 작성한다.
- main 함수
- collect를 사용해 set으로 변경하는 이유는 도중에 retry되면 기존에 들어가 있던 값이 중복되기 때문에 중복을 허용하지 않는 set으로 변경
- 로그를 작성하면 list에 있는 값들을 출력