오늘도 개발자 Backend Developer

[Spring Webflux] 2. Mono and Flux

Mono and Flux

  • org.reactivestreams.Publisher의 구현체
  • 시퀀스를 제공하는 발행자 역할
  • Mono : 0-1 개의 데이터 전달
  • Flux : 0-N개의 데이터 전달

간단하게 사용해보기

Mono

empty()

Mono data = Mono.empty();

just()

Mono data = Mono.just("");

error()

Mono.error(RuntimeException::new);

위 내용에 대한 전체소스
@Test
public void mono() {
    Mono.empty().subscribe(s -> log.info("empty() : {}", s)); //출력안됨

    Mono.just("Test Value")
        .subscribe(s -> log.info("just() : {}", s));

    Mono.error(RuntimeException::new)
        .doOnError(e -> log.error(e.getMessage(), e))
        .subscribe(s -> log.info("error() : {}", s));
}

Flux

  • 한개의 시퀀스가 전달 될 때마다 doOnNext 이벤트 발생
  • 모든 데이터가 전달 완료되면 doOnComplete 이벤트 발생
  • 전달 과정에서 오류가 발생하면 doOnError 이벤트발생
  • Publisher는 subscribe가 되었을 경우에만 데이터를 전달함. subscribe() 호출 필요
    • 매개변수로 넣을 수 있는 Consumer의 경우 doOnNext가 호출 될 때마다 실행되는 함수
    • 소스 : 아래 소스 .subscribe(); 를 제거하게되면, 실행되지 않는다.
@Test
public void test() {
	Flux.just(1, 2, 3)
		.doOnNext(i -> log.info("doOnNext : {} ", i))
		.subscribe();
}
- 결과
xxx.FluxTestCase - doOnNext : 1 
xxx.FluxTestCase - doOnNext : 2 
xxx.FluxTestCase - doOnNext : 3 
empty()

Flux data = Flux.empty();

just()

Flux data = Flux.just("", "");

range()

Flux.range(0, 10);

fromArray(), fromIterable(), fromStream()

Flux.fromArray(new String[]{“value 1”, “value 2”, “value 3”}); Flux.fromIterable(Arrays.asList(“value 1”, “value 2”, “value 3”)); Flux.fromStream(IntStream.range(0, 10).boxed())

위 내용에 대한 전체소스
  • log() 함수를 붙여 로그를 확인 할 수 있다.
@Test
public void flux() throws InterruptedException {
    Flux.empty().subscribe(s -> log.info("empty() : {}", s)); //출력안됨
    Flux.just("value 1", "value 2", "value 3")
        .subscribe(s -> log.info("just() : {}", s));

    Flux.range(0, 10)
        .subscribe(s -> log.info("range() : {}", s));

    Flux.fromArray(new String[]{"value 1", "value 2", "value 3"})
        .subscribe(s -> log.info("fromArray() : {}", s));

    Flux.fromIterable(Arrays.asList("value 1", "value 2", "value 3"))
        .subscribe(s -> log.info("fromIterable() : {}", s));

    Flux.fromStream(IntStream.range(0, 10).boxed())
        .subscribe(s -> log.info("fromStream() : {}", s));

    Flux.interval(Duration.ofMillis(100))
        .map(item -> "tick: " + item)
        .take(10)
        .subscribe(s -> log.info("fromIterable() : {}", s));

    Thread.sleep(150);

    Flux.error(RuntimeException::new)
        .doOnError(e -> log.error(e.getMessage(), e))
        .subscribe(s -> log.info("error() : {}", s));
}