티스토리 뷰

Language/JAVA

rxjava란? - Observable, .doOnComplete()

KyeongRok Kim 2018. 3. 11. 00:00

rxjava란?

RxJava는 Reactive java에서 이름을 따왔다.


Reactive programming(리액티브 프로그래밍) 패러다임을 자바에서 구현한 프로그래밍 라이브러리이다.


프로그래밍 패러다임에는 여러가지가 있는데 OOP(객체지향), Function(함수), Imperative(명령형) 등이 있다.


대체로 많은 프로그램들이 명령형 프로그래밍이라고 할 수 있고 여기에는 자바, 파이썬, C, Node.js등도 포함 되어 있다.

특정 언어라고 해서 한가지 프로그래밍 패러다임만 사용하는건 아니지만 특정 목적에 맞게 설계된 언어들이 있다.


여기에서 '패러다임(paradigm)'은 방법론 정도로 보면 된다.


여러가지 언어 중에서 자바는 OOP(Object Oriented Programming)라고 해서 객체지향형 프로그래밍의 대표 언어이고 Functional Programming(펑셔널 프로그래밍)을 대표하는건 파이썬, node.js등이라고 할 수 있다. 자바로 Reactive Programming을 해야할 일이 생겨서 이 라이브러리가 등장 하였다. 그리고 자바가 버젼이 올라가면서 여러가지 요즘 트렌드에 맞게 기술들이 추가 되어서 OOP기반이었던 자바가 Functional, Reactive 등의 프로그래밍 방법론으로도 개발이 가능하게 되었다.


리액티브란 외부에서 자극이 오고 그에 대해 반응 한다는 뜻이다. 


아래는 rxJava로 hello를 출력한 코드이다.


1
2
3
4
5
6
7
8
9
10
11
12
13
import io.reactivex.Observable;
 
public class FirstExample {
    public void emit(){
        Observable.just("hello""rxjava2!!")
            .subscribe(System.out::println);
    }
    public static void main(String[] args) {
        FirstExample firstExample = new FirstExample();
        firstExample.emit();
    }
}
 
cs

코드를 잠깐 살펴보면 Observable와 .just()가 나오고 .subscribe()가 나오고 그 안으로 System.out::println이 들어가는 구조를 볼 수 있다.


Observer라는 것을 사용하는게 RxJava이다.


Function

Function은 쉽게 이야기 해서 제네릭으로 <기존타입, 리턴타입>을 받아서 .apply()를 하면 기존 타입의 연산 결과를 결과 타입으로 반환을 해준다.

소스코드는 아래와 같다.


1
2
3
4
5
6
7
8
9
10
import java.util.function.Function;
 
public class FunctionExample {
    public static void main(String[] args) {
        Function<String, Integer> function = str -> Integer.parseInt(str);
        Integer integer = function.apply("10");
        System.out.println(integer);
    }
}
 
cs


gradle로 빌드한 소스코드는 아래 repository에 올려 놓았다.

https://github.com/Kyeongrok/rxjava_helloworld/



Consumer(컨슈머)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Observable<Integer> source = Observable.create((ObservableEmitter<Integer> emitter) -> {
    emitter.onNext(100);
    emitter.onNext(200);
    emitter.onNext(300);
    emitter.onComplete();
 
});
 
// 람다 + 메소드 레퍼런스
source.subscribe(System.out::println);
 
// 그냥 코드
source.subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        System.out.println("result : " + integer);
    }
});
cs

컨슈머는 값을 받는 익명 void함수다.


위 코드 10번줄 처럼 한줄이면 끝나는 코드를 그냥 쓸려면 13~18 이렇게 길게 써야 한다.




Observable, Single, .from()

Observable

옵저버블, 옵설버블 등으로 읽는다. RxJava뿐만 아니라 리액티브 프로그래밍의 기본적인 모델이다. 일단 여기(Observable)에 넣고 처리한다고 생각 하면 된다.
마치 스프링이 일단은 ApplicationContext에 모든 빈을 넣고 시작하는것과 비슷하다고 보면 된다.
1
2
Observable.just("hello""rxjava2!!")
.subscribe(System.out::println);
cs

실행 결과

hello

rxjava2!! 


.subscribe()에 실행할 메소드를 넣어주면 된다. 엄밀히 말하면 Consumer(컨슈머)라고 파라메터(parameter)가 한개인 void method이다. 


.from()

1
2
Observable<String> source = Observable.just("singleFromObservable");
Single.fromObservable(source).subscribe(System.out::println);
cs

Single

데이터를 1개만 발행할 수 있는 클래스가 Single이다.
Single은 class는 결과가 한개인 것을 엄격하게 통제 해야 할 때 사용한다.

1
2
Observable<String> source2 = Observable.just("singleFromObservable""hello");
Single.fromObservable(source2).subscribe(System.out::println);
cs

위와 같이 .just()안에 2개의 item이 있는데 이걸 Single로 바꿀려고 하면 에러가 난다.

singleFromObservable

io.reactivex.exceptions.OnErrorNotImplementedException: Sequence contains more than one element!

... 10 more 





.doOnComplete()


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import io.reactivex.Flowable;
 
public class DoOnComplete {
 
    public void run() {
        String[] ar = {"a""b"};
        Flowable.fromArray(ar)
                .doOnComplete(() -> System.out.println("completed 1"));
    }
 
    public void run2() {
        String[] ar = {"a""b"};
        Flowable.fromArray(ar)
                .doOnComplete(() -> System.out.println("completed 2"))
                .subscribe();
    }
 
    public void run3() {
        String[] ar = {};
        Flowable.fromArray(ar)
                .doOnComplete(() -> System.out.println("completed 3"))
                .subscribe();
    }
 
    public void run4() {
        Flowable.just("1")
                .doOnComplete(() -> System.out.println("completed 4"))
                .subscribe();
    }
 
    public static void main(String[] args) {
        new DoOnComplete().run();
        new DoOnComplete().run2();
        new DoOnComplete().run3();
        new DoOnComplete().run4();
    }
}
cs

결과
completed 2
completed 3
completed 4

.doOnComplete()는 Flowable, Observable등에서 모든 item을 발행 하고 나면 한번만 실행 된다.

주로 끝났는지 여부를 로그를 찍기 위해 사용한다. 그래서 .doOnComplete(Action)은 파라메터로 Action을 받는다.

.doOnNext등 다른 메소드 들은 주로 Consumer를 받는 것과는 조금 다르다.

Action은 () -> {} 이렇게 아무런 파라메터가 들어오지 않는 인터페이스이다.

 completed 1 이 출력 안된 이유는 .subscribe()를 안해주어서 그렇다.



1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import io.reactivex.Flowable;
import java.util.Arrays;
 
public class DoOnError {
 
    public void run5() {
        Flowable.fromIterable(Arrays.asList(120))
                .doOnComplete(() -> System.out.println("completed 5"))
                .map(item -> {
                    return 10 / item;
                })
                .doOnNext(System.out::println)
                .subscribe();
    }
 
    public static void main(String[] args) {
        new DoOnError().run5();
    }
 
}
cs

결과
io.reactivex.exceptions.OnErrorNotImplementedException: / by zero
10
5

그러면 발행중 에러가 나면 어떻게 될까?

보시다시피 에러나고 10, 5가 찍히고 끝난다.

completed 5는 찍히지 않는다.

그러면 .doOnComplete()로 세가지를 알 수 있다.

끝까지 잘 실행이 되었는지 또는 시작을 하지 않았는지 중간에 에러가 났는지 이렇게 세가지를 알 수 있다.


end.




728x90
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2025/01   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
글 보관함