본문 바로가기
Backend boot camp/Session4

[Spring WebFlux] Reactor

by orioncsy 2022. 12. 9.

Project Reactor

개념

Reactor

  • Reactive Streams 표준 사양을 구현한 구현체
  • Spring 5 버전부터 지원하는 리액티브 스택에 포함되어 리액티브한 애플리케이션 동작에 있어 핵심 역할 담당하는 라이브러리

Reactor 특징

  • Reactor는 Reactive Streams를 구현한 리액티브 라이브러리이다.
  • 핵심 특징으로 Non-Blocking 통신을 지원한다.
    • 요청 스레드가 차단이 되지 않는다.
  • Publisher 타입으로 Mono[0|1]와 Flux[N]이라는 두 가지 타입 존재
    • Mono[0|1]에서 0과 1은 0건 또는 1건의 데이터를 emit 가능함을 말한다.
    • Flux[N]에서는 여러 건 데이터를 emit 가능
  • 서비스들 간 통신이 잦은 MSA(MicroService Architecure) 기반 애플리케이션들은 요청 스레드가 차단되는 통신을 사용하기는 무리가 있어 Non-Blocking 통신을 지원하는 Reactor가 적합
  • Backpressure
    • Publisher로 부터 끊임없이 들어오는 데이터를 emit하는 것에 비해 Subsriber가 처리 속도가 느리면 데이터가 지속적으로 쌓이고 오버플로우가 발생되고 시스템이 다운될 수 있다.
    • Backpressure는 속도를 적절하게 제어하는 전략이다.

Reactor 구성 요소

Flux
            .just("Hello", "Reactor")
            .map(message -> message.toUpperCase())
            .publishOn(Schedulers.parallel())
            .subscribe(System.out::println,
                    error -> System.out.println(error.getMessage()),
                    () -> System.out.println("# onComplete"));

        Thread.sleep(100L);
  • Flux로 시작하는 것은 Reactor Sequence가 시작되는 지점으로 여러 건 데이터 처리
  • just() Operator는 원본 데이터 소스에서 데이터를 emit하는 Publisher 역할
  • map() Operator는 Publisher로 받은 데이터를 가공
  • publishOn Operator에 Scheduler를 지정하면 Downstream의 스레드가 지정한 유형의 스레드로 변경
  • subscribe()에는 3개의 람다식을 가지는데 첫 번째 파라미터는 Publisher가 emit한 데이터를 받아 처리하는 역할, 두 번째는 에러 발생할 경우 처리, 세 번째는 reactor sequence 종료 후처리
  • 마지막에 Thread.slepp(100L)은 scheduler를 지정하면 main 스레드 이외에 추가 스레드가 생성되는데 지정한 스레드는 데몬 스레드로 main 스레드 종료 시 동시에 종료된다. main 스레드를 0.1초 동작 지연시키면 그 사이에 지정한 데몬 스레드를 통해 reactor sequence 정상 작동

마블 다이어그램(Marble Diagram)

마블 다이어그램

  • Marble은 구슬을 뜻한다.
  • 구슬은 하나의 데이터를 의미하고 다이어그램 상 시간의 흐름에 따라 데이터의 변화 흐름을 표현
  • Operator를 어떤 것을 사용하느냐에 따라 다양한 데이터 흐름 생성

마블 다이어그램으로 Mono와 Flux 이해하기

  • Mono의 마블 다이어그램
    • 위, 아래로 두 개의 타임라인이 존재하는데 데이터가 흘러가는 시간의 흐름을 표현
    • 위쪽 타임라인에서 sequence가 시작되고 데이터가 emit되는 것을 아래 operator()쪽으로 화살표를 지어 표시(구슬 하나를 표시하여 Mono를 표현)
    • 수직 막대바가 위쪽 타임라인에 나오는데 sequence 종료를 표시
    • Operator가 두 개 타임라인 사이에 존재하는데 데이터를 가공하는 것 표시
    • 아래 타임라인에는 가공 처리된 데이터가 downstream으로 전달될 때의 타임라인 표현
    • 만약 emit된 데이터가 처리 과정에서 에러 발생하면 X로 아래 타임라인에 표시
  • Flux의 마블 다이어그램
    • Mono 다이어그램과 유사하지만 구슬이 여러 개가 존재하여 여러 데이터를 emit

스케줄러(Scheduler)

스케줄러

  • 스레드를 관리하는 관리자 역할
  • Reactor Sequence 상에 처리되는 동작들을 하나 이상의 스레드에서 동작하도록 별도 스레드 제공
  • Reactor는 Non-Blocking 통신을 위한 비동기 프로그래밍을 위해 탄생
  • 즉, Scheduler를 사용하면 멀티스레딩을 사용한 비동기 프로그래밍 프로세스를 단순하게 해 준다.

Scheduler 전용 Operator

  • subscribeOn()
    • 매개 변수로 지정한 스케줄러를 통해 스레드를 구독 직후에 실행되는 operator 체인에 대하여 변경된 스레드로 사용한다.
    • doOnSubscribe()는 Operator로써 구족 직 후에 어떤 동작을 수행하고 싶을 때 그 안에 로직 작성
  • publishOn()
    • publishOn()을 기준으로 DownStream 쪽 스레드가 지정한 스레드로 변경
    • doOnNext()는 앞에 위치한 Operator가 실행될 때 트리거된다.
  • subscribeOn()은 구독 직후 실행 흐름을 다른 쓰레드로 바꾸는 데 사용
    • 원본 데이터를 생성하고 데이터를 emit하는 작업이 구독 직후 실행
  • publishOn()은 전달받은 데이터를 가공 처리하는 Operator 앞에 추가해 실행 스레드를 별도 추가하는 역할
  • subscribeOn()은 여러 번 추가해도 하나의 스레드만 추가로 생성되고 publishOn()응 Operator 앞에 여러 번 추가하면 여러번 생성된다.
  • 이렇게 사용하는 이유는 스레드가 직접 복잡한 계산을 수행하는 경우 응답 처리가 늦어질 수 있어 별도의 스레드를 사용
  • 다양한 유형의 스레드가 존재하지만 일반적으로 SubscribeOn()에서는 Schedulers.boundedElastic 를 사용하고 publishOn()에서는 Schedulers.parallel을 주로 사용

Operators

새로운 Sequence를 생성하는 경우

  • fromStream()
    • stream을 입력받아 emit 하는 operator
    • 예시
    Flux
    	.fromStream(Stream.of(100,200,300))
    	.redue((a+b)->a+b)
    	.subcribe(System.out::println);
    
  • fromIterable()
    • Iterable(List, Map, Set 등 컬렉션)을 입력받아 emit하는 operator
    • 예시
    Flux
    	.fromStream(List.of(100,200,300))
    	.redue((a+b)->a+b)
    	.subcribe(System.out::println);
    
  • create()
    • Signal 이벤트를 발생시키는 Operator
    • Publisher가 발생시키는 이벤트(Signal)를 발생시키는데 onNext Signal 이벤트 전송
    • create()는 한 번에 여러 건의 데이터를 비동기적으로 emit 가능
    • 예시
    private static List<Integer> source= Arrays.asList(1, 3, 5, 7, 9);
    Flux.create((FluxSink<Integer> sink) -> {
                sink.onRequest(n -> {
                    for (int i = 0; i < source.size(); i++) {
                        sink.next(source.get(i));
                    }
                    sink.complete();
                });
                sink.onDispose(() -> log.info("# clean up"));
            }).subscribe(d -> log.info("# onNext: {}", d));
    
    • create() operator 파라미터는 FluxSink라는 람다 파라미터를 가지는 람다 표현식이다.
    • 데이터 소스를 전달하면 내부에서 알아서 데이터를 emit 하는 방식이 아니라 직접 signal을 발생시켜 sequence 진행
    • onRequest()는 Subscriber에서 데이터를 요청하면 파라미터인 람다식이 실행
    • for문을 순회하여 next() 메서드로 원소를 emit
    • onDisponse()는 sequence가 완전히 종료되기 전에 호출되면 후처리 작업 가능

기존 Sequence에서 변환 작업이 필요한 경우

  • flatMap()
    • 내부로 들어오는 데이터 한 건당 하나의 Sequence가 생성
    • 예시
    Flux
                .range(2, 8)
                .flatMap(dan -> Flux
                        .range(1, 9)
                        .publishOn(Schedulers.parallel())
                        .map(num -> dan + " x " + num + " = " + dan * num))
                .subscribe(log::info);
    
            Thread.sleep(100L);
    
    • 2단에서 9단까지 출력하도록 range()를 통해 범위 지정
    • flatMap() 내부에 range()를 통해 하나의 단을 출력하도록 숫자를 지정
    • flatMap() 내부에 Inner Sequence를 처리할 스레드 할당
    • 구구단 형식 문자열을 구성해서 출력한다.
    • 그러나 결과는 구구단이 섞여서 출력된다. 작업의 처리 순서를 보장하지 않기 때문이다.
  • concat()
    • 입력으로 전달하는 Publisher의 sequence를 연결해서 차례대로 데이터를 emit
    • 예시
    Flux
                .concat(Flux.just("A", "B", "C", "D", "E"),
                        Flux.just("F", "G"))
                .subscribe(System.out::println);
    
  • zip()
    • 여러 개의 Publisher Sequence에서 emit된 데이터를 결합하는 operator
    • 데이터가 emit되는 시점이 다르기 때문에 결합되어야 할 데이터(같은 index)가 emit될 때까지 기다린다.
    • 예시
    Flux<Long> s1= Flux.interval(Duration.ofMillis(100L)).take(4);
    Flux<Long> s2 = Flux.interval(Duration.ofMillis(200L)).take(6);
    
    Flux
        .zip(s1, s2, (d1, d2) -> d1+ d2)
        .subscribe(data -> log.info("# onNext: {}", data));
    
    • Flux.interval()은 지정한 기간 동안 0부터 1씩 증가한 숫자를 emit한다.
      • take()를 통해 제한된 데이터 개수만큼 가져온다.
    • zip()을 통해 각자 데이터를 가져오는 시점이 달라도 기다렸다가 두 개 데이터를 받아서 처리

Sequence 내부의 동작을 확인하고자 할 경우

  • doOnNext()
    • 데이터 emit 시 트리거 되어 부수 효과를 추가
    • 주로 로깅에 사용
    • 예시
    List<Integer> list=List.of(1,2,3,4);
    Flux
                    .fromIterable(list)
                    .doOnNext(num-> validate(num))
                    .subscribe(data -> log.info("{}", data));
    
    private static void validate(int num){
    	if(num ==0) throw new RuntimeException("zero is not acceptable");
    }
    
    • 유효성 검증처럼 사용 가능
  • log()
    • publisher에서 발생하는 signal 이벤트를 로그로 출력해주는 역할
    • 예시
    Flux
                .fromStream(Stream.of(2, 3, 4, 5, 6))
                .log()
                .reduce((a, b) -> a + b)
                .log()
                .subscribe(System.out::println);
    
    • onSubscribe signal 이벤트가 두 번 발생
    • request Signal 이벤트 두 번 발생
    • Publisher가 데이터 emit할 때 onNext Signal 5번 발생
    • emit을 정상 종료할 때 두 번 onComplete Signal 발생

Error를 처리하는 경우

  • error()
    • 의도적으로 onError Signal 이벤트 발생
    • 예시
    Mono.justOrEmpty(func())
                    .switchIfEmpty(Mono.error(new RuntimeException("Not found coffee")))
                    .subscribe(
                            data -> log.info("{}", data),
                            error -> log.error("# onError: {}", error.getMessage()));
    private static Integer func() {
            return null;
        }
    
    • justOrEmpty()의 경우 null 값을 받아도 error가 발생하지 않는다. just()의 경우에는 null 데이터를 emit하면 에러 발생한다.
    • switchIfEmptu()는 upstream에서 전달되는 데이터가 null이면 동작한다.
    • error()를 통해 onError Signal을 발생시키고 에러 메시지를 출력하도록 작성한다.
  • timeout(), retry()
    • timeout()은 주어진 시간 동안 emit되는 데이터가 없으면 onError Signal 발생
    • retry()는 sequence 상 에러가 발생할 경우 입력으로 주어진 숫자만큼 재구독하여 sequence 다시 시작
    • 예시
    public static void main(String[] args) throws InterruptedException {
            func()
                    .collect(Collectors.toSet())
                            .subscribe(set-> set
                                    .stream()
                                    .forEach(d->
                                            log.info("{}", d)));
            Thread.sleep(12000);
        }
        private static Flux<Integer> func(){
            List<Integer> list=List.of(1,2,3,4,5,6);
            final int[] count={0};
            return Flux
                    .fromIterable(list)
                    .delayElements(Duration.ofMillis(500))
                    .map(num->{
                        try{
                            count[0]++;
                            if(count[0] ==3){
                                Thread.sleep(2000);
                            }
                        } catch (InterruptedException e){
    
                        }
                        return num;
                    })
                    .timeout(Duration.ofSeconds(2))
                    .retry(1)
                    .doOnNext(num-> log.info("# func > doOnNext: {}", num));
        }
    
    • func 함수
      • list로 값을 생성하고 count 배열을 선언한다.
      • Flux 내에서는 final 변수를 사용해야 하고 변경 가능하게 하려면 배열로 선언해야 한다.
      • 결과에서는 Flux를 통해 delayElements로 0.5초간 지연시켜서 데이터를 emit한다.
      • 3번째 값이 올 경우 2초를 지연시키는 작업을 map을 통해 작성한다.
      • timeout()으로 2초를 지정하고 retry는 1번으로 진행하여 로그를 작성한다.
    • main 함수
      • collect를 사용해 set으로 변경하는 이유는 도중에 retry되면 기존에 들어가 있던 값이 중복되기 때문에 중복을 허용하지 않는 set으로 변경
      • 로그를 작성하면 list에 있는 값들을 출력

'Backend boot camp > Session4' 카테고리의 다른 글

[Cloud] 운영 환경 구성  (1) 2022.12.09
[Spring WebFlux] Spring WebFlux  (0) 2022.12.09
[Spring WebFlux] 리액티브 프로그래밍  (0) 2022.12.09
[Spring Security] OAuth 2  (0) 2022.12.09
[Spring Security] JWT 인증  (0) 2022.12.09