RxJava์ ๋ํ์ฌ ์์ธํ ์์๋ณด์
์์ฑ์ : ๋ฐํ์
Present Time : 2018โ09-07-FRI
[TOC]
์๋ฐ ์ธ์ด์ ๋ฆฌ์กํฐ๋ธ ํ๋ก๊ทธ๋๋ฐ์ ๋๋ต ๋ ๊ฐ์ง ๊ด๊ณ๊ฐ ์๋ค๊ณ ์ ๋ฆฌ ํ ์ ์์.
-
๊ธฐ์กด pull ๋ฐฉ์์ ํ๋ก๊ทธ๋๋ฐ ๊ฐ๋ ์ push ๋ฐฉ์์ ํ๋ก๊ทธ๋๋ฐ ๊ฐ๋ ์ผ๋ก ๋ฐ๊พผ๋ค.
- ์๋ฅผ ๋ค์ด, ์ ๊ตญ ๋งค์ฅ์ ๋งค์ถ์ก ์ ๋ณด๋ฅผ ์ค์๊ฐ์ผ๋ก ์ง๊ณํ๋ค๊ณ ํ ๋, ๊ธฐ์กด์๋ ๊ฐ ๋งค์ฅ์ ๋ณํ ์ํฉ์ ๋ฐ์ดํฐ๋ฒ ์ด์ค์์ ๊ฐ์ ธ(pull)์์ผํ๋ค. ํ์ง๋ง, ๋ฆฌ์กํฐ๋ธ ํ๋ก๊ทธ๋๋ฐ์๋ ๋ฐ์ดํฐ์ ๋ณํ๊ฐ ๋ฐ์ํ์ ๋ ๋ณ๊ฒฝ์ด ๋ฐ์ํ ๊ณณ์์ ์๋ก์ด ๋ฐ์ดํฐ๋ฅผ ๋ณด๋ด(push ๋ฐฉ์)์ค๋ค.
-
ํจ์ํ ํ๋ก๊ทธ๋๋ฐ์ ์ง์์ ๋ฐ๋๋ค.
- ์ฐ๋ฆฌ๊ฐ ์๋ ์ฝ๋ฐฑ์ด๋ ์ต์๋ฒ ํจํด์ ๋์ด RxJava๊ธฐ๋ฐ์ ๋ฆฌ์กํฐ๋ธ ํ๋ก๊ทธ๋๋ฐ์ด ๋๋ ค๋ฉด ํจ์ํ ํ๋ก๊ทธ๋๋ฐ์ด ํ์
- ์ฝ๋ฐฑ์ด๋ ์ต์ํผ ํจํด์ ์ต์๋ฒ๊ฐ 1๊ฐ์ด๊ฑฐ๋ ๋จ์ผ ์ค๋ ๋ ํ๊ฒฝ์์๋ ๋ฌธ์ ๊ฐ ์์ง๋ง ๋ฉํฐ ์ค๋ ๋ ํ๊ฒฝ์์๋ ๋ง์ ์ฃผ์๊ฐ ํ์ํ๋ค. (๋ํ์ ์ : ๋ฐ๋๋ฝ, ๋๊ธฐํ ๋ฌธ์ )
- ํจ์ํ ํ๋ก๊ทธ๋จ์ ๋ถ์ ํจ๊ณผ๊ฐ ์๋ค. (๋ถ์ํจ๊ณผ: ๊ฐ์ ์์์ ์ฌ๋ฌ ์ค๋ ๋๊ฐ ๊ฒฝ์์กฐ๊ฑด์ ๋น ์ง๊ฒ ๋์์ ๋ ์์ธกํ ์ ์๋ ์๋ชป๋ ๊ฒฐ๊ณผ๊ฐ ๋์ค๋ ํ์.)
- ํ ๋ ๊ฐ์ ์ค๋ ๋๊ฐ ์์ ๋๋ ์ ์ ๋์ํ๋ค๊ฐ ์์ญ ์๋ฐฑ ๊ฐ์ ์ค๋ ๋๊ฐ ๋์์ ๋จ์ผ ์์์ ์ ๊ทผํ๋ฉด ๊ณ์ฐ ๊ฒฐ๊ณผ๊ฐ ๊ผฌ์ด๊ณ ๋๋ฒ๊น ํ๋ค๊ฐ ๋งค์ฐ ์ด๋ ต๋ค.
- ํจ์ํ ํ๋ก๊ทธ๋๋ฐ์ ๋ถ์ ํจ๊ณผ๊ฐ ์๋ ์์ ํจ์๋ฅผ ์งํฅํ๋ค โโ> ์์ฅฌ? . ๋ฐ๋ผ์, ๋ฉํฐ์ค๋ ๋ ํ๊ฒฝ์์๋ ์์ ํ๋ค.
-
ํ๋ง๋๋ก, โํจ์ํ ํ๋ก๊ทธ๋๋ฐ ๋๊ตฌ๋ฅผ ํ์ฉํ ๋น๋๊ธฐ ํ๋ก๊ทธ๋๋ฐโ
๋ทํ๋ฆญ์ค์์ RxJava๋ฅผ ๋ง๋ค๊ฒ ๋ ํต์ฌ์ ์ธ ์ด์ 3๊ฐ์ง
-
๋์์ฑ์ ์ ๊ทน์ ์ผ๋ก ๋์ด์์ ํ์๊ฐ ์๋ค. (Embrance Concurrency)
์๋ฐ๊ฐ ๋์์ฑ ์ฒ๋ฆฌ๋ฅผ ํ๋ ๋ฐ ๋ฒ๊ฑฐ๋ก์์ด ์๋ค. ์ด๋ฅผ ํด๊ฒฐํ๋ ค๊ณ ๋ทํ๋ฆญ์ค๋ ํด๋ผ์ด์ธํธ์ ์์ฒญ์ ์ฒ๋ฆฌํ๋ ์๋น์ค ๊ณ์ธต์์ ๋์์ฑ์ ์ ๊ทน์ ์ผ๋ก ๋์ด์์๋ค. ํด๋ผ์ด์ธํธ์ ์์ฒญ์ ์ฒ๋ฆฌํ ๋ ๋ค์์ ๋น๋๊ธฐ ์คํ ํ๋ฆ(์ค๋ ๋ ๋ฑ)์ ์์ฑํ๊ณ ๊ทธ๊ฒ์ ๊ฒฐ๊ณผ๋ฅผ ์ทจํฉํ์ฌ ์ต์ข ๋ฆฌํดํ๋ ๋ฐฉ์์ผ๋ก ๋ด๋ถ ๋ก์ง์ ๋ณ๊ฒฝํ๋ค.
โ
-
์๋ฐ Future๋ฅผ ์กฐํฉํ๊ธฐ ์ด๋ ต๋ค๋ ์ ์ ํด๊ฒฐํด์ผ ํ๋ค. (Java Futures are Expensive to Compose)
2013๋ ๋น์ ์๋ฐ 8์์ ์ ๊ณตํ๋ CompletableFuture ๊ฐ์ ํด๋์ค๊ฐ ์ ๊ณต๋์ง ์์๊ธฐ ๋๋ฌธ์ด๋ค. ๊ทธ๋์ ๋น๋๊ธฐ ํ๋ฆ์ ์กฐํฉํ ๋ฐฉ๋ฒ์ด ๊ฑฐ์ ์์๋ค. RxJava์์๋ ์ด๋ฅผ ํด๊ฒฐํ๋ ค๊ณ ๋น๋๊ธฐ ํ๋ฆ์ ์กฐํฉํ ์ ์๋ ๋ฐฉ๋ฒ์ ์ ๊ณตํ๋ค. RxJava์์๋ ์กฐํจํ๋ ์คํ ๋จ์๋ฅผ ๋ฆฌ์กํฐ๋ธ ์ฐ์ฐ์๋ผ๊ณ ํ๋ค.
โ
-
์ฝ๋ฐฑ ๋ฐฉ์์ ๋ฌธ์ ์ ์ ๊ฐ์ ํด์ผ ํ๋ค. (Callbacks Have Their Own Problems)
์ฝ๋ฐฑ์ด ์ฝ๋ฐฑ์ ๋ถ๋ฅด๋ ์ฝ๋ฐฑ ์ง์ฅ ์ํฉ์ด ์ฝ๋์ ๊ฐ๋ ์ฑ์ ๋จ์ด๋จ๋ฆฌ๊ณ ๋ฌธ์ ๋ฐ์ ์ ๋๋ฒ๊น ์ ์ด๋ ต๊ฒ ๋ง๋ ๋ค.
๋น๋๊ธฐ ๋ฐฉ์์ผ๋ก ๋์ํ๋ ๊ฐ์ฅ ๋ํ์ ์ธ ํ๋ก๊ทธ๋๋ฐ ํจํด์ ์ฝ๋ฐฑ์ด๋ค. ๊ทธ๋์ RxJava๋ ์ฝ๋ฐฑ์ ์ฌ์ฉํ์ง ์๋ ๋ฐฉํฅ์ผ๋ก ์ค๊ณํ์ฌ ์ด๋ฅผ ํด๊ฒฐํ๋ค.
๋ฆฌ์กํฐ๋ธ ํ๋ก๊ทธ๋๋ฐ์ ๋น๋๊ธฐ ์ฐ์ฐ์ ํํฐ๋ง, ๋ณํ, ์กฐํฉํ์ฌ ์ ์ธ๊ฐ์ง ํต์ฌ์ด์ ๋ฅผ ํด๊ฒฐํ ์ ์๋ค.
Observable.just("Hello", "RxJava2")
.subscribe(System.out::println);
-
Observable ํด๋์ค : ๋ฆฌ์กํฐ๋ธ ํ๋ก๊ทธ๋๋ฐ์ด Observable์์ ์์
-
just() ํจ์ : ๊ฐ์ฅ ๋จ์ํ Observable ์ ์ธ ๋ฐฉ์. Ex) Observable.just(โHello!โ, โRxJava2!!โ).subscribe(System.out::println);
-
subscribe() ํจ์ : Observable์ ๊ตฌ๋ . Observable์ subscribe() ํจ์๋ฅผ ํธ์ถํด์ผ ๋น๋ก์ ๋ณํํ ๋ฐ์ดํฐ๋ฅผ ๊ตฌ๋ ์์๊ฒ ๋ฐํํ๋ค.
- just()ํจ์๋ง ํธ์ถํ๋ฉด ๋ฐ์ดํฐ๋ฅผ ๋ฐํํ์ง ์์.
-
๋ฐ๋์ ๋ฐ์ดํฐ๋ฅผ ์์ ํ ๊ตฌ๋ ์๊ฐ subscribe()๋ฅผ ํธ์ถํด์ผ Observable์์ ๋ฐ์ดํฐ๊ฐ ๋ฐํ๋๋ค.
-
System.out::println
-
data -> System.out.println(data)
-
RxJava๋ ํจ์ํ ํ๋ก๊ทธ๋๋ฐ
- ํจ์ํ ํ๋ก๊ทธ๋๋ฐ์ ๋ถ์ํจ๊ณผ๊ฐ ์๋ ์์ ํจ์๋ฅผ ์งํฅํ๋ฏ๋ก ์ค๋ ๋์ ์์ ํ๋ค.
-
์๋ฐ๋ ํจ์ํ ์ธ์ด๊ฐ ์๋๋ฏ๋ก RxJava ๋ผ์ด๋ธ๋ฌ๋ฆฌ๋ ์์ ํจ์๋ก ์์ฑ๋ ๋ฆฌ์กํฐ๋ธ ์ฐ์ฐ์๋ฅผ ์ ๊ณต.
- ์ด ๋ฆฌ์กํฐ๋ธ ์ฐ์ฐ์ ๋๋ถ์ RxJava๋ ๋ฆฌ์กํฐ๋ธ ํ๋ก๊ทธ๋๋ฐ์ด ๋๋ ๊ฒ
- ๋ฆฌ์กํฐ๋ธ ์ฐ์ฐ์๋ฅผ ํ์ฉํ๋ฉด ๋ชฉ์ ์ ๋ฌ์ฑํ ์ ์๋ ๋๊ตฌ์ธ โํจ์ํ ํ๋ก๊ทธ๋๋ฐโ ๋ฐฉ์์ผ๋ก โ์ค๋ ๋์ ์์ ํ ๋น๋๊ธฐ ํ๋ก๊ทธ๋จโ์ ์์ฑํ ์ ์์
-
RxJava๊ฐ ์ด๋ ค์ด ์ด์
-
ํจ์ํ ์ฐ์ฐ์๋ฅผ ์ด๋ป๊ฒ ํธ์ถํด์ผ ํ๋์ง ๋ชจ๋ฅด๊ธฐ ๋๋ฌธ
-
โRxJava๋ ์๋๋ก์ด๋ ๊ฐ๋ฐ์ ์ฌ์ด์์ ๊ฐ์ฅ ํ์ ๋ค. ์ ์ผํ ๋จ์ ์ ์ฒ์์ ๋ฐฐ์ฐ๊ธฐ ์ด๋ ต๋ค๋ ๊ฒ์ด๋ค.โ
-
-
๊ถ์ฅํ๋ ํ์ต์์
-
Observable ํด๋์ค ๋ช ํํ๊ฒ ์ดํด (ํนํ ๋จ๊ฑฐ์ด(Hot) Observable๊ณผ ์ฐจ๊ฐ์ด(Cold) Observable์ ๊ฐ๋ ์ ๊ผญ ์ดํดํด์ผ ํจ)
-
๊ฐ๋จํ ์์ ๋ก map(), filter(), reduce, flatMap() ํจ์์ ์ฌ์ฉ๋ฒ์ ์ตํ๋๋ค.
-
์์ฑ ์ฐ์ฐ์, ๊ฒฐํฉ ์ฐ์ฐ์, ๋ณํ ์ฐ์ฐ์ ๋ฑ ์นดํ ๊ณ ๋ฆฌ๋ณ ์ฃผ์ ํจ์๋ฅผ ๊ณต๋ถ
-
์ค์ผ์ค๋ฌ์ ์๋ฏธ๋ฅผ ๋ฐฐ์ฐ๊ณ subscribeOn()๊ณผ observeOn() ํจ์์ ์ฐจ์ด๋ฅผ ์์๋ก๋๋ค.
โ5. ๊ทธ ๋ฐ์ ๋๋ฒ๊น , ํ๋ฆ ์ ์ด ํจ์๋ฅผ ์ตํ๋๋ค.
2์ฅ(Observable) => 3์ฅ(map-filter-reduce) => 4์ฅ(์ฃผ์ ์ฐ์ฐ์) => 5์ฅ(RxJava ๋๋ฌธ์ ๋ฌ๋ผ์ง๋ ๋น๋๊ธฐ ํ๋ก๊ทธ๋๋ฐ, ์ค์ผ์ค๋ฌ) => 6์ฅ(RxAndroid) => 7์ฅ(๋๋ฒ๊น , ์์ธ์ฒ๋ฆฌ)
- ์๋ฐ 8์ ์ฌ์ฉํ์ง ์์๋ ๋จ. ์๋ฐ 8์์ ์ ๊ณตํ๋ Consumer, Predicate, Function๊ณผ ๊ฐ์ ํจ์ํ ์ธํฐํ์ด์ค๋ฅผ ์์ฒด ๊ตฌํํ์ผ๋ฏ๋ก ์๋ฐ 6 ์ด์์ด๋ฉด ๋์.
- But, 8์ ๋๋ค ํํ์๊ณผ ํจ์ ๋ ํผ๋ฐ์ค๋ฅผ ํ์ฉํ๋ ์ฝ๋๋ ๊ฐ๋ ์ฑ์ ์ข๊ฒํจ. (๋๋ค์ & ํจ์ ๋ ํผ๋ฐ์ค ๊ณต๋ถ)
- ๋ง๋ธ ๋ค์ด์ด๊ทธ๋จ์ RxJava๋ฅผ ์ดํดํ๋ ํต์ฌ ๋๊ตฌ
- RxJava๋ ๋ง๋ธ ๋ค์ด์ด๊ทธ๋จ์ผ๋ก ๋ฐฐ์ด๋ค๊ณ ํด๋ ๊ณผ์ธ์ด ์๋๋ค.
-
RxJava๋ Observable์์ ์์ํด Observable๋ก ๋๋๋ค๊ณ ํด๋ ๊ณผ์ธ์ด ์๋ ์ ๋๋ก ์ค์ํ ๊ฐ๋ .
-
RxJava 1.x ์์๋ Observable, Single ํด๋์ค
-
2.x ์์๋ Observable, Maybe, Flowable ํด๋์ค (์ํฉ์ ๋ง๊ฒ ์ธ๋ถํํด ๊ตฌ๋ถํด ์ฌ์ฉ)
โ
-
-
Observable์ ์ต์๋ฒ(observer) ํจํด์ ๊ตฌํ
- ์ต์๋ฒ ํจํด์ ๊ฐ์ฒด์ ์ํ ๋ณํ๋ฅผ ๊ด์ฐฐํ๋ ๊ด์ฐฐ์(์ต์๋ฒ) ๋ชฉ๋ก์ ๊ฐ์ฒด์ ๋ฑ๋ก
-
๊ทธ๋ฆฌ๊ณ ์ํ ๋ณํ๊ฐ ์์ ๋๋ง๋ค ๋ฉ์๋๋ฅผ ํธ์ถํ์ฌ ๊ฐ์ฒด๊ฐ ์ง์ ๋ชฉ๋ก์ ๊ฐ ์ต์๋ฒ์๊ฒ ๋ณํ๋ฅผ ์๋ ค์ค๋ค.
- ๋ผ์ด๋ธ์ฌ์ดํด์ ์กด์ฌํ์ง ์์ผ๋ฉฐ, ๋ณดํต ๋จ์ผ ํจ์๋ฅผ ํตํด ๋ณํ๋ง ์๋ฆฐ๋ค.
-
Observable์ ๋ฌด์จ ๋ป?
- ์ง๊ด์ ์ผ๋ก => ๊ด์ฐฐ์(Observer)๊ฐ ๊ด์ฐฐํ๋ ๋์
-
์กฐ๊ธ ๋ถ์กฑํด! => Observed๋ผ๋ ๋จ์ด๊ฐ ๊ด์ฐฐ์ ํตํด์ ์ป์ ๊ฒฐ๊ณผ๋ฅผ ์๋ฏธํ๋ค๋ฉด, Observable์ ํ์ฌ๋ ๊ด์ฐฐ๋์ง ์์์ง๋ง ์ด๋ก ์ ํตํด์ ์์ผ๋ก ๊ด์ฐฐํ ๊ฐ๋ฅ์ฑ์ ์๋ฏธํ๋ค.
-
์ต์๋ฒ ํจํด์ ๋ํ์ ์ธ ์ ) ๋ฒํผ์ ๋๋ฅด๋ฉด ๋ฒํผ์ ๋ฏธ๋ฆฌ ๋ฑ๋กํด ๋ onClick() ๋ฉ์๋๋ฅผ ํธ์ถํด ์ํ๋ ์ฒ๋ฆฌ๋ฅผ ํ๋ ๊ฒ
-
RxJava์ Observable์ ์ธ๊ฐ์ง์ ์๋ฆผ์ ๊ตฌ๋ ์์๊ฒ ์ ๋ฌ
- onNext : Observable์ด ๋ฐ์ดํฐ์ ๋ฐํ์ ์๋ฆฐ๋ค. ๊ธฐ์กด์ ์ต์๋ฒ ํจํด๊ณผ ๊ฐ์.
-
onComplete : ๋ชจ๋ ๋ฐ์ดํฐ์ ๋ฐํ์ ์๋ฃํ์์ ์๋ฆฐ๋ค. onComplete์ด๋ฒคํธ๋ ๋จ ํ๋ฒ๋ง ๋ฐ์ํ๋ฉฐ, ๋ฐ์ํ ํ์๋ ๋ ์ด์ onNext ์ด๋ฒคํธ๊ฐ ๋ฐ์ํด์ ์ ๋๋ค.
- onError : ์๋ฌ ๋ฐ์์ ์๋ฆผ. onError ์ด๋ฒคํธ๊ฐ ๋ฐ์ํ๋ฉด ์ดํ์ onNext ๋ฐ onComplete ์ด๋ฒคํธ๊ฐ ๋ฐ์ํ์ง ์๋๋ค. ์ฆ, Observable์ ์คํ์ ์ข ๋ฃ.
-
Observable ํด๋์ค์ ๋ง์ ์์ ํจ์ ์กด์ฌ (Observable์ ์์ฑํ๋ ํฉํ ๋ฆฌ ํจ์, ์ค๊ฐ ๊ฒฐ๊ณผ๋ฅผ ์ฒ๋ฆฌํ๋ ํจ์, ๋๋ฒ๊ทธ ๋ฐ ์์ธ ํจ์๊ฐ ๋ชจ๋ ํฌํจ)
-
Observable์ ์์ฑํ ๋๋ ์ง์ ์ธ์คํด์ค๋ฅผ ๋ง๋ค์ง ์๊ณ ์ ์ ํฉํ ๋ฆฌ ํจ์๋ฅผ ํธ์ถ.
- 1.x ๊ธฐ๋ณธ ํฉํ ๋ฆฌ ํจ์ : create(), just(), from()
-
2.x ์ถ๊ฐ ํฉํ ๋ฆฌ ํจ์ : fromArray(), fromIterable(), fromCallable(), fromFuture(), fromPublisher()
- ๊ธฐํ ํฉํ ๋ฆฌ ํจ์ : interval(), range(), timer(), defer()
Observable์ ๊ฐ์ฒด๋ฅผ ์์ฑํ๋ ํฉํ ๋ฆฌ ํจ์.
Observable์ ์์ฑํ ๋๋ ์ธ์คํด์ค๋ฅผ ์ง์ ๋ง๋ค์ง ์๊ณ ์ ์ ํฉํ ๋ฆฌ ํจ์๋ฅผ ํธ์ถํ๋ค.
- ์ธ์๋ก ๋ฃ์ ๋ฐ์ดํฐ๋ฅผ ์ฐจ๋ก๋ก ๋ฐํํ๊ณ Observable์ ์์ฑ
- 1~10๊ฐ์ ์ธ์๋ฅผ ๋ฃ์ ์ ์์. (๋จ, ํ์ ์ ๋ชจ๋ ๊ฐ์์ผํจ)
-
๋ด๊ฐ ๋์์ํค๊ธฐ ์ํ๋ ๊ฒ์ ์ฌ์ ์ ์ ์ํด๋ ๋ค์ ์ค์ ๊ทธ๊ฒ์ด ์คํ๋๋ ์์ ์ ์กฐ์ => subscribe()
-
just() ๋ฑ์ ํฉํ ๋ฆฌ ํจ์๋ก ๋ฐ์ดํฐ ํ๋ฆ์ ์ ์ํ ํ, subscribe()๋ฅผ ํธ์ถํด์ผ ์ค์ ๋ก ๋ฐ์ดํฐ๋ฅผ ๋ฐํํจ.
-
์ฃผ์ ์ํ(4๊ฐ์ง)
- ํน์ง : ๋ชจ๋ Disposable ์ธํฐํ์ด์ค์ ๊ฐ์ฒด๋ฅผ ๋ฆฌํดํจ.
์ํ ํน์ง Disposable subscribe() ์ธ์์์, onNext์ onComplete ์ด๋ฒคํธ๋ฅผ ๋ฌด์ํ๊ณ onError ์ด๋ฒคํธ๊ฐ ๋ฐ์ํ์ ๋๋ง exception throwํจ. ==> ํ ์คํธํ๊ฑฐ๋ ๋๋ฒ๊น ํ ๋ ํ์ฉ Disposable subscribe(Consumer<? super T> onNext) ์ธ์๊ฐ 1๊ฐ ์๋ ์ค๋ฒ๋ก๋ฉ์ onNext ์ด๋ฒคํธ๋ฅผ ์ฒ๋ฆฌ / ์ด๋๋ onError ๋ฐ์ํ๋ฉด exception์ ๋์ง Disposable subscribe (Consumer<? super T> onNext, Consumer<? super java.lang.Throwable> onError) onNext์ onError ์ด๋ฒคํธ ์ฒ๋ฆฌ Disposable subscibe(Consumer<? super T> onNext, Consumer<? super java.lang.Throwable> onError, Action OnComplete) onNext, onError, onComplete ์ด๋ฒคํธ ๋ชจ๋ ์ฒ๋ฆฌ โ
-
void dispose() / boolean isDisposed() => 2๊ฐ ํจ์๋ง ์กด์ฌ
-
dispose()
-
Observable์๊ฒ ๋ ์ด์ ๋ฐ์ดํฐ๋ฅผ ๋ฐํํ์ง ์๋๋ก ๊ตฌ๋ ์ ํด์งํ๋ ํจ์
-
Observable์ด onComplete ์๋ฆผ์ ๋ณด๋์ ๋, ์๋์ผ๋ก dispose()๋ฅผ ํธ์ถํด
Observable๊ณผ ๊ตฌ๋ ์์ ๊ด๊ณ๋ฅผ ๋๋๋ค. => ๋ฐ๋ก dispose() ํธ์ถ X
-
-
isDisposed()
- Observable์ด ๋ฐ์ดํฐ๋ฅผ ๋ฐํํ์ง ์๋์ง (๊ตฌ๋ ์ ํด์งํ๋์ง) ํ์ธํ๋ ํจ์
- ๊ตฌ๋ ์ ํด์งํ์ผ๋ฉด -> true
- onNext, onComplete, onError ๊ฐ์ ์๋ฆผ์ ๊ฐ๋ฐ์๊ฐ ์ง์ ํธ์ถํด์ผํจ.
- ๋ฐ๋ฉด์) just() ํจ์๋ ๋ฐ์ดํฐ๋ฅผ ์ธ์๋ก ๋ฃ์ผ๋ฉด ์๋์ผ๋ก ์๋ฆผ ์ด๋ฒคํธ๊ฐ ๋ฐ์
- (RxJava์ javadoc์ ๋ฐ๋ฅด๋ฉด) create()๋ RxJava์ ์ต์ํ ์ฌ์ฉ์๋ง ํ์ฉํ๋๋ก ๊ถ๊ณ
- ๋ค๋ฅธ ํฉํ ๋ฆฌ ํจ์๋ฅผ ํ์ฉํ๋ฉด ๊ฐ์ ํจ๊ณผ๋ฅผ ๋ผ ์ ์์
- ๋จ์ผ ๋ฐ์ดํฐ๊ฐ ์๋ ๋ fromXXX() ๊ณ์ด ํจ์ ์ฌ์ฉ
- just(), create() ๋ ๋จ์ผ ๋ฐ์ดํฐ๋ฅผ ๋ค๋ค์.
- RxJava2์์ from() ํจ์ ์ธ๋ถํ ๋จ.
- RxJava 1.x์์๋ from(), fromCallable() ๋ง ์ฌ์ฉ
- from() ์ ๋ฐฐ์ด, ๋ฐ๋ณต์, ๋น๋๊ธฐ ๊ณ์ฐ ๋ฑ์ ๋ชจ๋ ์ฌ์ฉํ๋ค ๋ณด๋ ๋ชจํธํจ์ด ์์ด ์ธ๋ถํํจ.
- ๋ฐฐ์ด์ด ๋ค์ด์๋ ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌํ ๋ ์ฌ์ฉ
- Observable.fromArray(arr)
- RxJava์์ int ๋ฐฐ์ด์ ์ธ์์ํค๋ ค๋ฉด Integer[] ๋ก ๋ณํํด์ผ ํจ.
- Observable์ ๋ง๋๋ ๋ค๋ฅธ ๋ฐฉ๋ฒ์ Iterable ์ธํฐํ์ด์ค๋ฅผ ๊ตฌํํ ํด๋์ค์์ Observable ๊ฐ์ฒด๋ฅผ ์์ฑํ๋ ๊ฒ.
- Iterable< E > ์ธํฐํ์ด์ค ๊ตฌํํ๋ ๋ํ์ ์ธ ํด๋์ค
- ArrayList(List), ArrayBlockingQueue(BlockingQueue), HashSet(Set), LinkedList, Stack, TreeSet, Vector ๋ฑ..
List<String> foods = new ArrayList<>();
foods.add("pizza");
foods.add("chicken");
foods.add("hamburger");
Observable<String> source = Observable.fromIterable(foods);
source.subscribe(System.out::println);
/* ์คํ๊ฒฐ๊ณผ
pizza
chicken
hamburger
*/
-
Observable์ ํน์ํ ํํ (1.x ๋ฒ์ ๋ถํฐ ์กด์ฌ)
-
Observable ํด๋์ค๋ ๋ฐ์ดํฐ๋ฅผ ๋ฌดํํ๊ฒ ๋ฐํํ ์ ์์ง๋ง, Single ํด๋์ค๋ ์ค์ง 1๊ฐ์ ๋ฐ์ดํฐ๋ง ๋ฐํํ๋๋ก ํ์
-
- ๋ณดํต ๊ฒฐ๊ณผ๊ณผ ์ ์ผํ ์๋ฒ API๋ฅผ ํธ์ถํ ๋ ์ ์ฉํ๊ฒ ์ฌ์ฉ
-
๋ฐ์ดํฐ ํ๋๊ฐ ๋ฐํ๊ณผ ๋์์ ์ข ๋ฃ
-
- ๋ผ์ดํ์ฌ์ดํด ๊ด์ ์์ onNext() / onComplete() ํจ์๊ฐ onSuccess() ๋ก ํตํฉ
- onSuccess(T value) / onError() ํจ์๋ก ๊ตฌ์ฑ
Single ๊ฐ์ฒด๋ฅผ ์์ฑํ ๋, ๊ฐ์ฅ ๊ฐ๋จํ ๋ฐฉ๋ฒ์ just() ํจ์๋ฅผ ์ฌ์ฉํ๋ ๊ฒ์ด๋ค.
Single.just(โHello Singleโ);
- Single์ Observable์ ํน์ํ ํํ์ด๋ฏ๋ก Observable์์ ๋ณํํ ์ ์๋ค.
- Observable ํด๋์ค์์ Single ํด๋์ค ์ฌ์ฉ
just() ์ ์ฌ๋ฌ๊ฐ์ ๊ฐ์ ๋ฃ์ผ๋ฉด ์ด๋ป๊ฒ ๋ ๊น~?
Observable.just("one","error").single("default item");
- ์๋ฌ ๋ฐ์!!
- ๋๋ฒ์งธ ๊ฐ์ ๋ฐํํ๋ฉด์ onNext ์ด๋ฒคํธ๊ฐ ๋ฐ์ํ ๋ ์๋ฌ๊ฐ ๋ฐ์๋จ
- (IllegalArgumentException: Sequence contains more than one element!)
- RxJava2์ ์ฒ์ ๋์
๋ Observable์ ๋ ๋ค๋ฅธ ํน์ํํ
- RxJava1.X ์๋ ์์์.
- Single ํด๋์ค์ ๋ง์ฐฌ๊ฐ์ง๋ก ์ต๋ ๋ฐ์ดํฐ ํ๋๋ฅผ ๊ฐ์ง ์ ์์ง๋ง ๋ฐ์ดํฐ ๋ฐํ ์์ด ๋ฐ๋ก ๋ฐ์ดํฐ ๋ฐ์์ ์๋ฃํ ์ ์๋ค.
- Single : 1๊ฐ ์๋ฃ / Maybe : 0 or 1 ์๋ฃ
- ์์ฑ๋ฐฉ๋ฒ
- Maybe ํด๋์ค๋ฅผ ํตํด ์์ฑํ ์ ์์ง๋ง,
- ๋ณดํต Observable์ ํน์ ์ฐ์ฐ์๋ฅผ ํตํด ์์ฑํ ๋๊ฐ ๋ง์
- elementAt(), firstElement(), flatMapMaybe(), lastElement(), reduce(), singleElement() ํจ์ ๋ฑ
- Observable 2๊ฐ์ง๋ก ๋๋จ ==> ๋จ๊ฑฐ์ด Observable / ์ฐจ๊ฐ์ด Observable
- ์ฐจ๊ฐ์ด -> ๋จ๊ฑฐ์ด ๋ณํ ๊ฐ๋ฅ (Subject ๊ฐ์ฒด๋ฅผ ๋ง๋ค๊ฑฐ๋ ConnectableObservable ํด๋์ค ํ์ฉ)
- Observable์ ์ ์ธํ๊ณ just() ํจ์๋ฅผ ํธ์ถํด๋ ์ต์๋ฒ๊ฐ subscribe() ํจ์๋ฅผ ํธ์ถํ์ฌ ๊ตฌ๋
ํ์ง ์์ผ๋ฉด ๋ฐ์ดํฐ๋ฅผ ๋ฐํํ์ง ์๋๋ค. ==> ๊ฒ์ผ๋ฅธ ์ ๊ทผ๋ฒ
- ์์) ์น ์์ฒญ, ๋ฐ์ดํฐ๋ฒ ์ด์ค ์ฟผ๋ฆฌ์ ์ฝ๊ธฐ ๋ฑ / ๋ด๊ฐ ์ํ๋ URL์ด๋ ๋ฐ์ดํฐ๋ฅผ ์ง์ ํ๋ฉด ๊ทธ๋๋ถํฐ ์๋ฒ๋ ๋ฐ์ดํฐ๋ฒ ์ด์ค์ ์์ฒญ์ ๋ณด๋ด๊ณ ๊ฒฐ๊ณผ๋ฅผ ๋ฐ์์ด
- ๊ตฌ๋ ์๊ฐ ๊ตฌ๋ ํ๋ฉด ์ค๋น๋ ๋ฐ์ดํฐ๋ฅผ ์ฒ์๋ถํฐ ๋ฐํ.
- ์ง๊ธ๊น์ง ์ฐ๋ฆฌ๊ฐ ๋ค๋ฃฌ๊ฑฐ, ์์ผ๋ก๋ ๋ณ๋์ ์ธ๊ธ์ด ์์ผ๋ฉด ์ฐจ๊ฐ์ด Observable
- ๊ตฌ๋ ์๊ฐ ์กด์ฌ ์ฌ๋ถ์ ๊ด๊ณ์์ด ๋ฐ์ดํฐ๋ฅผ ๋ฐํํจ. => ์ฌ๋ฌ ๊ตฌ๋ ์๋ฅผ ๊ณ ๋ คํ ์ ์๋ค.
- ๋จ, ๊ตฌ๋ ์๋ก์๋ Observable์์ ๋ฐํํ๋ ๋ฐ์ดํฐ๋ฅผ ๋ชจ๋ ์์ ํ ๊ฒ์ผ๋ก ๋ณด์ฅํ ์ ์๋ค.
- ๊ตฌ๋
ํ ์์ ๋ถํฐ Observable์์ ๋ฐํํ ๊ฐ์ ๋ฐ๋๋ค.
- ์์) ๋ง์ฐ์ค ์ด๋ฒคํธ, ํค๋ณด๋ ์ด๋ฒคํธ, ์์คํ ์ด๋ฒคํธ, ์ผ์ ๋ฐ์ดํฐ, ์ฃผ์ ๊ฐ๊ฒฉ ๋ฑ
- ์ฃผ์ํ ์ ) ๋ฐฐ์์ ๊ณ ๋ คํด์ผ ํจ.
- ๋ฐฐ์์ Observable์์ ๋ฐ์ดํฐ๋ฅผ ๋ฐํํ๋ ์๋์ ๊ตฌ๋ ์๊ฐ ์ฒ๋ฆฌํ๋ ์๋์ ์ฐจ์ด๊ฐ ํด ๋ ๋ฐ์
- Flowable์ด๋ผ๋ ํนํ ํด๋์ค์์ ๋ฐฐ์์ ์ฒ๋ฆฌ
Subject ํด๋์ค๋ ์ฐจ๊ฐ์ด Observable์ ๋จ๊ฑฐ์ด Observable๋ก ๋ณํํด์ค๋ค.
- Observable์ ์์ฑ๊ณผ ๊ตฌ๋
์์ ์์ฑ์ด ๋ชจ๋ ์๋ค.
- Observable ์ฒ๋ผ) ๋ฐ์ดํฐ ๋ฐํ ๊ฐ๋ฅ
- ๊ตฌ๋ ์ ์ฒ๋ผ ) ๋ฐํ๋ ๋ฐ์ดํฐ๋ฅผ ๋ฐ๋ก ์ฒ๋ฆฌ ๊ฐ๋ฅ
์ฃผ์ Subject ํด๋์ค์๋ AsyncSubject / BehaviorSubject / PublishSubject / ReplaySubject ๋ฑ์ด ์๋ค.
-
๋ค์ํ ์ฐ์ฐ์ ํจ์๊ฐ ์กด์ฌ
- ํ์ง๋ง, ๋ชจ๋ ์์์ผ ๋ฆฌ์กํฐ๋ธ ํ๋ก๊ทธ๋๋ฐ์ ํ ์ ์๋๊ฑด ์๋๋ค
- ํ์ ์ฐ์ฐ์์ ๊ฐ๋ ์ ์๋ฉด ๋๋จธ์ง๋ ํ์ ์ฐ์ฐ์์์ ํ์๋ ์ฐ์ฌ์์ด๋ฏ๋ก ์ดํดํ ์ ์๊ธฐ ๋๋ฌธ
- ํ์ง๋ง, ๋ชจ๋ ์์์ผ ๋ฆฌ์กํฐ๋ธ ํ๋ก๊ทธ๋๋ฐ์ ํ ์ ์๋๊ฑด ์๋๋ค
-
์ธ์ด ํน์ฑ๊ณผ ํฌ๊ฒ ์ฐ๊ด์ด ์๋ค.
- ReactiveX๋ ์๋ฐ ๋ฟ๋ง ์๋๋ผ ์๋ฐ์คํฌ๋ฆฝํธ, ๋ท๋ท, ์ค์นผ๋ผ, ํด๋ก์ , ์ค์ํํธ์ ๋ฆฌ์กํฐ๋ธ ์ฐ์ฐ์ ๋ชฉ๋ก ํจ๊ป ์ ๊ณต
- RxJava์ ์ต์ํด์ง๋ฉด ๋ค๋ฅธ ํ๋ก๊ทธ๋๋ฐ ์ธ์ด์์๋ ๋ฆฌ์กํฐ๋ธ ํ๋ก๊ทธ๋๋ฐ์ ์ฝ๊ฒ ์ตํ ์ ์์.
์ฐ์ฐ์ ์ข ๋ฅ | ๋ฌด์ | Ex |
---|---|---|
์์ฑ(Creating) ์ฐ์ฐ์ | Observable, Single ํด๋์ค ๋ฑ์ผ๋ก ๋ฐ์ดํฐ์ ํ๋ฆ์ ๋ง๋ค์ด ๋ด๋ ํจ์ | create(), just(), fromArray(), interval(), range(), timer() ๋ฑ |
๋ณํ(Transforming) ์ฐ์ฐ์ | ์ ๋ ฅ์ ๋ฐ์์ ์ํ๋ ์ถ๋ ฅ์ ๋ด๋ ์ ํต์ ์ธ ์๋ฏธ์ ํจ์ | map(), flatmap() ๋ฑ |
ํํฐ(Filter) ์ฐ์ฐ์ | ์ ๋ ฅ ๋ฐ์ดํฐ ์ค ์ํ๋ ๋ฐ์ดํฐ๋ง ๊ฑธ๋ฌ๋ | filter(), first(), take() ๋ฑ |
ํฉ์ฑ(Combining) ์ฐ์ฐ์ | ์ฌ๋ฌ Observable ์กฐํฉ | |
์ค๋ฅ์ฒ๋ฆฌ(Error Handling) ์ฐ์ฐ์ | onErrorReturn(), onErrorResumeNext(), retry() ๋ฑ | |
์ ํธ๋ฆฌํฐ(Utility) ์ฐ์ฐ์ | subscribeOn(), observeOn() | |
์กฐ๊ฑด(Conditional) ์ฐ์ฐ์ | Observable์ ํ๋ฆ์ ์ ์ดํ๋ ์ญํ | |
์ํ๊ณผ ์งํฉํ ์ฐ์ฐ์ | ์ํ ํจ์์ ์ฐ๊ด์๋ ์ฐ์ฐ์ | |
๋ฐฐ์(Back pressure) ์ฐ์ฐ์ | ๋ฐฐ์ ์ด์์ ๋์ํ๋ ์ฐ์ฐ์ |
- 2017๋ 5์ ๋ฑ๋ก๋ ์ฐ์ฐ์ ๊ฐ์: 400๊ฐ ์ด์
- ์
๋ ฅ๊ฐ์ ์ด๋ค ํจ์์ ๋ฃ์ด์ ์ํ๋ ๊ฐ์ผ๋ก ๋ณํํ๋ ํจ์.
- String์ String์ผ๋ก ๋ณํํ ์๋ ์๊ณ , String์ Integer๋ ๋ค๋ฅธ ๊ฐ์ฒด๋ก๋ ๋ณํํ ์ ์์
- .map(ball -> ball + โ*โ);
- map ์ํ : public final Observable map(Function<? super T, ? extends R> maper)
- ์ฆ map ํจ์ ์ธ์ โ Function<? super T, ?extends R> mapperโ ์ โball->ball+โ*โ โ ๋ค์ด๊ฐ๊ฑฐ
- Function ์ธํฐํ์ด์ค๋ฅผ ์ด์ฉํด ๋ถ๋ฆฌ
- Function<String, String> getStar = ball -> ball + โ*โ ;
- .map(getStar) // ์๋๋ .map(ball -> ball + โ*โ); ์ด๊ฑฐ ์์
- map ์ํ : public final Observable map(Function<? super T, ? extends R> maper)
- map() ํต์ฌ
- ๋ด๊ฐ ์ํ๋ ๊ฐ์ โ์ด๋ค ํจ์โ ์ ๋ฃ๋ ๊ฒ
- ์ด๋ค ํจ์ : Function ์ธํฐํ์ด์ค ๊ฐ์ฒด / ๋๋ค ํํ์
- ์ํ๋ ํจ์๋ฅผ ์ ์ํ ์ ์๋๋๊ฐ ๊ด๊ฑด
- ๋ด๊ฐ ์ํ๋ ๊ฐ์ โ์ด๋ค ํจ์โ ์ ๋ฃ๋ ๊ฒ
- map()์ ์ข ๋ ๋ฐ์ ์ํจ ํจ์
- ๊ฒฐ๊ณผ๊ฐ Observable๋ก ๋์ด
- flatMap() => ์ผ๋๋ค or ์ผ๋์ผ Observable ํจ์
- map() => ์ผ๋์ผ ํจ์
- RxJava์์ ์ฌ๋ฌ ๊ฐ์ ๋ฐ์ดํฐ๋ฅผ ๋ฐํํ๋ ๋ฐฉ๋ฒ์ Observable ๋ฐ์ ์์ (๋ฐฐ์(back pressure)์ ๊ณ ๋ คํ๋ฉด Observable ๋์ ์ Flowable)
-
Observable์์ ์ํ๋ ๋ฐ์ดํฐ๋ง ๊ฑธ๋ฌ๋ด๋ ์ญํ
-
- ํ์์๋ ๋ฐ์ดํฐ๋ ์ ๊ฑฐํ๊ณ ์ค์ง ๊ด์ฌ ์๋ ๋ฐ์ดํฐ๋ง filter() ํจ์๋ฅผ ํต๊ณผํ๊ฒ ๋จ.
-
Predicate๋ฅผ ์ธ์๋ก ๋ฃ์ (Predicate - ์ง์ ํ๋ณ์ด๋ผ๋ ๋ป, boolean ๊ฐ์ ๋ฆฌํดํ๋ ํน์ํ ํจ์ํ ์ธํฐํ์ด์ค)
-
- Map()์ Function ๊ฐ์ฒด๋ฅผ ์ธ์๋ก ๋ฃ์
- ๋๋ค๋ฅผ ์ฌ์ฉํ๋ฉด Function์ธ์ง Predicate์ธ์ง ์ ๊ฒฝ์ฐ์ง ์๊ณ ๋์ผํ๊ฒ ์ฝ๋ฉ ํ ์ ์๋ ์ฅ์ (๊ตฌ๋ถ์ ์ปดํ์ผ๋ฌ๊ฐ~)
-
Observable source = Observable.fromArray(data).filter(number -> number % 2 ==0 );
-
filter()์ ๋น์ทํ ํจ์๋ค
-
- first(default) : Observable์ ์ฒซ ๋ฒ์งธ ๊ฐ์ ํํฐ. ๊ฐ์ด ์์ด ์๋ฃ๋๋ฉด ๊ธฐ๋ณธ๊ฐ ๋ฆฌํด.
- last(default) : ๋ง์ง๋ง ๊ฐ
- take(N) : ์ต์ด N ๊ฐ ๊ฐ๋ง ๊ฐ์ ธ์ด.
- takeLast(N) : ๋ง์ง๋ง N ๊ฐ ๊ฐ๋ง ํํฐํจ.
- skip(N) : ์ต์ด N ๊ฐ์ ๊ฑด๋๋.
- skipLast(N) : ๋ง์ง๋ง N๊ฐ ๊ฐ์ ๊ฑด๋๋.
- ๊ฐ์ฅ ์ ์ฉํ ํจ์๋ take()
-
์ํฉ์ ๋ฐ๋ผ ๋ฐํ๋ ๋ฐ์ดํฐ๋ฅผ ์ทจํฉํ์ฌ ์ด๋ค ๊ฒฐ๊ณผ๋ฅผ ๋ง๋ค์ด๋ผ ๋
-
- ๋ฐํํ ๋ฐ์ดํฐ๋ฅผ ๋ชจ๋ ์ฌ์ฉํ์ฌ ์ด๋ค ์ต์ข ๊ฒฐ๊ณผ ๋ฐ์ดํฐ๋ฅผ ํฉ์ฑํ ๋
-
Maybe source = Observable.fromArray(balls).reduce((ball1,ball2) -> ball2 + โ(โ + ball1 + โ)โ );
-
-
Observable ์ด ์๋๋ผ Maybe
-
- Why? : reduce()ํจ์๋ฅผ ํธ์ถํ๋ฉด ์ธ์๋ก ๋๊ธด ๋๋ค ํํ์์ ์ํด ๊ฒฐ๊ณผ ์์ด ์๋ฃ๋ ์๋ ์์.
- ๋ฐ๋ผ์ Observable์ด ์๋๋ผ Maybe ๊ฐ์ฒด๋ก ๋ฆฌํด
-
-
reduce(BiFunction<T,T,T> reducer) => ์ํ
-
- BiFunction ์ธํฐํ์ด์ค๋ฅผ ์ธ์๋ก ํ์ฉ (Function X)
-
- BiFunction์ด๋? ์ ๋ ฅ ์ธ์๋ก 2๊ฐ์ ๊ฐ์ ๋ฐ๋ ํจ์ํ ์ธํฐํ์ด์ค
- ๋๋ค์์ ์ธ์์ ๊ฐ์๊ฐ 2๊ฐ ์ด์์ผ ๋๋, ๊ดํธ๋ก ์ธ์์ ๋ชฉ๋ก์ ๋ช ์์ ์ผ๋ก ํํํด์ค์ผ ํจ. Ex) (ball1,ball2) -> ball2 + โ(โ + ball1 + โ)โ
- BiFunction<String, String, String> : ์ธ์1, ์ธ์2, ๋ฆฌํด ํ์ ๋ชจ๋ String
์ฐ์ฐ์ ์ข ๋ฅ๊ฐ ๋ง์ ์นดํ ๊ณ ๋ฆฌ ๋ณ๋ก ๋๋ (ReactiveX ํํ์ด์ง ๊ธฐ์ค)
์์ฑ ์ฐ์ฐ์ | just(), fromXXX(), create(), interval(), range(), timer(), intervalRange(), defer(), repeat() |
๋ณํ ์ฐ์ฐ์ | map(), flatMap(), concatMap(), switchMap), groupBy(), scan(), buffer(), window() |
ํํฐ ์ฐ์ฐ์ | filter(), take(), skip(), distinct() |
๊ฒฐํฉ ์ฐ์ฐ์ | zip(), combineLatest(), Merge(), concat() |
์กฐ๊ฑด ์ฐ์ฐ์ | amb(), takeUntil(), skipUntil(), all() |
์๋ฌ ์ฒ๋ฆฌ ์ฐ์ฐ์ | onErrorReturn(), onErrorResumeNext(), retry(), retryUntil() |
๊ธฐํ ์ฐ์ฐ์ | subscribe(), subscribeOn(), observeOn(), reduce(), count() |
์์ฑ ์ฐ์ฐ์์ ์ญํ ์ ๋ฐ์ดํฐ์ ํ๋ฆ์ ๋ง๋๋ ๊ฒ์ด๋ค.
๊ฐ๋จํ๊ฒ Observable (Observable, Single, Maybe ๊ฐ์ฒด ๋ฑ)์ ๋ง๋๋ ๊ฒ
-
์ผ์ ์๊ฐ ๊ฐ๊ฒฉ์ผ๋ก ๋ฐ์ดํฐ ํ๋ฆ ์์ฑ
-
์ฃผ์ด์ง ์๊ฐ ๊ฐ๊ฒฉ์ผ๋ก 0๋ถํฐ 1์ฉ ์ฆ๊ฐํ๋ Long ๊ฐ์ฒด(๊ธฐ๋ณธํX, ๋ํผํด๋์คO) ๋ฐํ
-
ํจ์ ์ํ ๋๊ฐ์ง
ํจ์ ์ํ ์ค๋ช ๋น๊ณ interval(long period, TimeUnit unit) ์ผ์ ์๊ฐ(period) ์ฌ์๋ค๊ฐ ๋ฐ์ดํฐ ๋ฐํ interval(long initialDelay, long period, TimeUnit unit) ๋์์ ๊ฐ๊ณ ์ต์ด ์ง์ฐ ์๊ฐ(initialDelay)์ ์กฐ์ ๋ณดํต ์ด๊ธฐ ์ง์ฐ์๊ฐ ์์ด(initialDelay๋ฅผ 0์ผ๋ก) ๋ฐ๋ก ๋ฐ์ดํฐ๋ฅผ ๋ฐํํ๊ธฐ ์ํด ์ฌ์ฉ
@SchedulerSupport(SchedulerSupport.COMPUTATION)
- interval() ํจ์์ ๋์์ด ๊ณ์ฐ ์ค์ผ์ค๋ฌ์์ ์คํ๋๋ค๋ ์๋ฏธ
- ํ์ฌ ์ค๋ ๋๊ฐ ์๋๋ผ ๊ณ์ฐ์ ์ํ ๋ณ๋์ ์ค๋ ๋(RxJava์์ ์ค์ผ์ค๋ฌ)์์ ๋์
- interval()๊ณผ ์ ์ฌํ์ง๋ง ํ๋ฒ๋ง ์คํ๋๋ ํจ์
- ์ผ์ ์๊ฐ์ด ์ง๋ ํ, ํ ๊ฐ์ ๋ฐ์ดํฐ๋ฅผ ๋ฐํํ๊ณ onComplete() ์ด๋ฒคํธ ๋ฐํ
- ์ฃผ์ด์ง ๊ฐ (n) ๋ถํฐ m๊ฐ์ Integer ๊ฐ์ฒด๋ฅผ ๋ฐํ
- interval() / timer() ๋ Long ๊ฐ์ฒด๋ฅผ ๋ฐํํ์ง๋ง range()๋ Integer ๊ฐ์ฒด๋ฅผ ๋ฐํํ๋ ๊ฒ์ด ๋ค๋ฆ
- ์ค์ผ์ค๋ฌ์์ ์คํ๋์ง ์๊ณ ํ์ฌ ์ค๋ ๋์์ ์คํ
- (interval(), timer()์ ๋ค๋ฅด๊ฒ)
-
interval() + range() ๋ฅผ ํผํฉํด๋์ ํจ์
- interval() ์ฒ๋ผ ์ผ์ ์๊ฐ ๊ฐ๊ฒฉ์ผ๋ก ๊ฐ์ ์ถ๋ ฅํ์ง๋ง
- range() ์ฒ๋ผ ์์ ์ซ์(n)๋ก ๋ถํฐ m๊ฐ๋งํผ์ ๊ฐ๋ง ์์ฑํ๊ณ onComplete์ด๋ฒคํธ๊ฐ ๋ฐ์
- ์ฆ, interval() ์ฒ๋ผ ๋ฌดํํ ๋ฐ์ดํฐ ํ๋ฆ์ ๋ฐํํ์ง ์์
-
๋ฆฌํด ํ์ ์ interval()ํจ์์ ๋์ผํ๊ฒ Long ํ์
โ
public static Observable<Long> intervalRange
(long start, long count, long initialDelay, long period, TimeUnit unit) { }
์ธ์์ ๊ฐ์๊ฐ ๋๋ฌด ๋ง์์ ์ง๊ด์ ์ด์ง ์๋ค. ์ฐจ๋ผ๋ฆฌ ๋ค๋ฅธ ํจ์๋ค์ ์กฐํฉํด intervalRange() ๋ฅผ ๋ง๋ค ์ ์๋ค.
Observable<Long> souce = Observable.interval(100L, TimeUnit.MILLISECONDS)
.map (val -> val+1)
.take (5);
source.subscribe(Log::i);
CommonUtils.sleep(1000);
- timer() ์ ๋น์ทํ์ง๋ง ๋ฐ์ดํฐ ํ๋ฆ์์ฑ์ ๊ตฌ๋ ์๊ฐ subscribe() ํจ์๋ฅผ ํธ์ถํ ๋๊น์ง ๋ฏธ๋ฃฐ ์ ์์. ์ด๋ ์๋ก์ด Observable์ด ์์ฑ๋จ
- ๋จ์ํ ๋ฐ๋ณต ์คํ์ ํจ.
- ํด๋น ์๋ฒ๊ฐ ์ ์ด์์๋์ง ํ์ธ(ping, heart beat..) ํ๋ ์ฝ๋์ ์ ์ฉ
๋ณํ ์ฐ์ฐ์๋ ๋ง๋ค์ด์ง ๋ฐ์ดํฐ ํ๋ฆ์ ์ํ๋ ๋๋ก ๋ณํํ ์ ์๋ค.
๊ธฐ๋ณธ์ด ๋๋ ํจ์(map(), flatMap())์ ๋น๊ตํ์ฌ ์ด๋ป๊ฒ ๋ค๋ฅธ์ง ๊ทธ ์ฐจ์ด์ ์ ๊ธฐ์ตํ๋๊ฒ ์ข์. ์ค๋ฌด์ ๋์์ด๋จ
- flatMap() ๊ณผ ๋น์ท
- flatMap() ์ ๋จผ์ ๋ค์ด์จ ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌํ๋ ์ค์ ์๋ก์ด ๋ฐ์ดํฐ๊ฐ ๋ค์ด์ค๋ฉด ๋์ค์ ๋ค์ด์จ ๋ฐ์ดํฐ์ ์ฒ๋ฆฌ๊ฒฐ๊ณผ๊ฐ ๋จผ์ ์ถ๋ ฅ ๋ ์๋ ์๋ค. ==> ์ธํฐ๋ฆฌ๋น(๋ผ์ด๋ค๊ธฐ) ๋ผ๊ณ ํจ.
- But, concatMap()์ ๋จผ์ ๋ค์ด์จ ๋ฐ์ดํฐ ์์๋๋ก ์ฒ๋ฆฌํด์ ๊ฒฐ๊ณผ๋ฅผ ๋
- ๊ณ์ฐ ์๋๋ flatMap()์ด ํจ์ฌ ๋ฐ๋ฅด๋ค
- ์ด์ ) ์ธํฐ๋ฆฌ๋น์ ํ์ฉํ๊ธฐ ๋๋ฌธ
- concatMap() ํจ์์ ์์๋ฅผ ๋ณด์ฅํด์ฃผ๋ ค๋ฉด ์ถ๊ฐ ์๊ฐ์ด ํ์ํจ.
-
concatMap() ํจ์๊ฐ ์ธํฐ๋ฆฌ๋น์ด ๋ฐ์ํ ์ ์๋ ์ํฉ์์ ๋์์ ์์๋ฅผ ๋ณด์ฅํด์ค๋ค๋ฉด,
switchMap() ํจ์๋ ์์๋ฅผ ๋ณด์ฅํ๊ธฐ ์ํด ๊ธฐ์กด์ ์งํ ์ค์ด๋ ์์ ์ ๋ฐ๋ก ์ค๋จ
-
์ฌ๋ฌ ๊ฐ์ ๊ฐ์ด ๋ฐํ๋์์๋, ๋ง์ง๋ง์ ๋ค์ด์จ ๊ฐ๋ง ์ฒ๋ฆฌํ๊ณ ์ถ์ ๋ ์ฌ์ฉ
- ์ค๊ฐ์ ๋๊ธฐ๋๋ผ๋ ๋ง์ง๋ง ๋ฐ์ดํฐ์ ์ฒ๋ฆฌ๋ ๋ณด์ฅํ๊ธฐ ๋๋ฌธ.
- ์ด๋ค ๊ธฐ์ค(keySelector ์ธ์)์ผ๋ก ๋จ์ผ Observable์ ์ฌ๋ฌ ๊ฐ๋ก ์ด๋ฃจ์ด์ง Observable ๊ทธ๋ฃน(GroupedObservable)์ผ๋ก ๋ง๋ฌ.
- ์ด๋ค ๊ธฐ์ค์ผ๋ก Observable ๊ฐ๊ฐ์ ์ฌ๋ฌ ๊ฐ Observable์ ๊ทธ๋ฃน์ผ๋ก ๊ตฌ๋ถํ๋ค๊ณ ์๊ฐํ๋ฉด ๋จ
- map() ํจ์ - 1๊ฐ์ ๋ฐ์ดํฐ๋ฅผ ๋ค๋ฅธ ๊ฐ์ด๋ ๋ค๋ฅธ ํ์ ์ผ๋ก ๋ณํ
- flatMap() ํจ์ - 1๊ฐ์ ๊ฐ์ ๋ฐ์์ ์ฌ๋ฌ ๊ฐ์ ๋ฐ์ดํฐ(Observable)๋ก ํ์ฅํด์ค.
- groupBy() - ๊ฐ๋ค์ ๋ฐ์์ ์ด๋ค ๊ธฐ์ค์ ๋ง๋ ์๋ก์ด Observable ๋ค์ ์์ฑ
- ์คํํ ๋๋ง๋ค ์ ๋ ฅ๊ฐ์ ๋ง๋ ์ค๊ฐ ๊ฒฐ๊ณผ ๋ฐ ์ต์ข ๊ฒฐ๊ณผ๋ฅผ ๊ตฌ๋ ์์๊ฒ ๋ฐํ
- reduce() ํจ์์ ๋น์ท
- reduce() ๋ Observable์์ ๋ชจ๋ ๋ฐ์ดํฐ๊ฐ ์ ๋ ฅ๋ ํ ๊ทธ๊ฒ์ ์ข ํฉํ์ฌ ๋ง์ง๋ง 1๊ฐ์ ๋ฐ์ดํฐ๋ง์ ๊ตฌ๋ ์์๊ฒ ๋ฐํํจ.
- ์ฆ, ๋ค๋ฅธ ์ ์ ๋ง์ง๋ง 1๊ฐ or ์ค๊ฐ๊ฒฐ๊ณผ๊น์ง ๊ณ์ ๋ฐํํ๋
๊ฒฐํฉ ์ฐ์ฐ์๋ ์ฌ๋ฌ๊ฐ์ Observable์ ์กฐํฉํ์ฌ ํ์ฉํ๋ ์ฐ์ฐ์์ด๋ค.
๋ค์์ Observable์ ํ๋๋ก ํฉํ๋ ๋ฐฉ๋ฒ์ ์ ๊ณตํ๋ค.
- ๊ฐ๊ฐ์ Observable์ ๋ชจ๋ ํ์ฉํด 2๊ฐ ํน์ ๊ทธ ์ด์์ Observable์ ๊ฒฐํฉ
- ์) A,B ๋๊ฐ์ Observable์ ๊ฒฐํฉํ๋ค๋ฉด 2๊ฐ์ Observable์์ ๋ชจ๋ ๋ฐ์ดํฐ๋ฅผ ๋ฐํํด์ผ ๊ฒฐํฉ ๊ฐ๋ฅ. ๊ทธ์ ๊น์ง๋ ๋ฐํ์ ๊ธฐ๋ค๋ฆผ
- 2๊ฐ ์ด์์ Observable์ ๊ธฐ๋ฐ์ผ๋ก Observable ๊ฐ๊ฐ์ ๊ฐ์ด ๋ณ๊ฒฝ๋์์ ๋ ๊ฐฑ์ ํด์ฃผ๋ ํจ์.
- ๋ Observable ๋ชจ๋ ๊ฐ์ ๋ฐํํ๋ฉด ๊ทธ๋๋ ๊ฒฐ๊ด๊ฐ์ด ๋์ด
- ๊ทธ ๋ค์๋ถํฐ๋ ๋ ์ค์ ์ด๋ค ๊ฒ์ด ๊ฐฑ์ ๋๋์ง ์ต์ ๊ฒฐ๊ด๊ฐ์ ๋ณด์ฌ์ค(์ด ๋ถ๋ถ์ด zip()ํจ์์ ๋ค๋ฆ)
- ๊ฐ์ฅ ๋จ์ํ ๊ฒฐํฉํจ์ (zip, combineLatest์ ๋น๊ตํ๋ฉด)
- ์ ๋ ฅ Observable์ ์์์ ๋ชจ๋ Observable์ด ๋ฐ์ดํฐ๋ฅผ ๋ฐํํ๋์ง ๋ฑ์ ๊ด์ฌํ์ง ์๊ณ ์ด๋ ๊ฒ์ด๋ ์ ์คํธ๋ฆผ์์ ๋จผ์ ์ ๋ ฅ๋๋ ๋ฐ์ดํฐ๋ฅผ ๊ทธ๋๋ก ๋ฐํํจ.
- 2๊ฐ ์ด์์ Observable์ ์ด์ด ๋ถ์ฌ์ฃผ๋ ํจ์
- ์ฒซ๋ฒ์งธ Observable์ onComplete ์ด๋ฒคํธ๊ฐ ๋ฐ์ํด์ผ ๋ ๋ฒ์งธ Observable์ ๊ตฌ๋
ํจ. (์ค๋ ๋๋ฅผ ํ์ฉํ ์ผ๋ฐ์ ์ธ ์ฝ๋๋ก ์ด์ ๊ฐ์ ๋ด์ฉ์ ๊ตฌํํ๋ ค๋ฉด ๋ง๋ง์น ์์ ๊ฒ)
- ์ฒซ๋ฒ์งธ Observabl์ onComplete ์ด๋ฒคํธ๊ฐ ๋ฐ์ํ์ง ์์ผ๋ฉด, ๋๋ฒ ์งธ Observable์ ์์ํ ๋๊ธฐ
- ์ด๋ ์ ์ฌ์ ์ธ ๋ฉ๋ชจ๋ฆฌ ๋์(memory leak)์ ์ํ์ ๋ดํฌํจ.
- ๋ฐ๋ผ์, ์ ๋ ฅ Observable์ด ๋ฐ๋์ ์๋ฃ(onComplete ์ด๋ฒคํธ)๋ ์ ์๊ฒ ํด์ผํจ.
- ์ฒซ๋ฒ์งธ Observabl์ onComplete ์ด๋ฒคํธ๊ฐ ๋ฐ์ํ์ง ์์ผ๋ฉด, ๋๋ฒ ์งธ Observable์ ์์ํ ๋๊ธฐ
- ๊ฒฐํฉํ ์ ์๋ Observable์ ์ต๋ 4๊ฐ
- concat() ํจ์๋ฅผ ํ์ฉํ ๋๋ onComplete ์ด๋ฒคํธ์ ๋ฐ์ ์ฌ๋ถ ํ์ธ์ด ์ค์.
์กฐ๊ฑด ์ฐ์ฐ์๋ Observable์ ํ๋ฆ์ ์ ์ดํ๋ ์ญํ ์ ํ๋ค.
ํํฐ ์ฐ์ฐ์๋ ๋ฐํ๋ ๊ฐ์ ์ฑํํ๋๋ ๊ธฐ๊ฐํ๋๋ ์ฌ๋ถ์ ์ด์ ์ ๋ง์ถ๋ค๋ฉด, ์กฐ๊ฑด ์ฐ์ฐ์๋ ์ง๊ธ๊น์ง์ ํ๋ฆ์ ์ด๋ป๊ฒ ์ ์ดํ ๊ฒ์ธ์ง์ ์ด์ ์ ๋ง์ถ๋ค.
- ๋ ์ค ์ด๋ ๊ฒ์ด๋ ๋จผ์ ๋์ค๋ Observable์ ์ฑํํจ.
- ambiguous(๋ชจํธํ) ๋ผ๋ ์์ด ๋จ์ด์ ์ค์๋ง
- ์ฌ๋ฌ ๊ฐ์ Observable ์ค์์ 1๊ฐ์ Observable์ ์ ํํ๋๋ฐ,
- ์ ํ ๊ธฐ์ค์ ๊ฐ์ฅ ๋จผ์ ๋ฐ์ดํฐ๋ฅผ ๋ฐํํ๋ Observable.
- ์ดํ ๋๋จธ์ง Observable์์ ๋ฐํํ๋ ๋ฐ์ดํฐ๋ ๋ชจ๋ ๋ฌด์ํ๋ค.
- take() ํจ์์ ์กฐ๊ฑด์ ์ค์
- ์ธ์๋ก ๋ฐ์ Observable์์ ์ด๋ค ๊ฐ์ ๋ฐํํ๋ฉด, ํ์ฌ Observable์ ๋ฐ์ดํฐ ๋ฐํ์ ์ค๋จํ๊ณ ์ฆ์ ์๋ฃ(onComplete ์ด๋ฒคํธ ๋ฐ์)
- ์ฆ, take() ์ฒ๋ผ ์ผ์ ๊ฐ์๋ง ๊ฐ์ ๋ฐํํ๋, ์๋ฃ ๊ธฐ์ค์ ๋ค๋ฅธ Observable์์ ๊ฐ์ ๋ฐํํ๋์ง๋ก ํ๋จํ๋ ๊ฒ.
takeUntil (ObservableSource<U> other)
- ์ธ์๋ก ๊ฐ์ ๋ฐํํ ์ ์๋ other Observable์ด ํ์ํ๋ค.
- takeUntil() ๊ณผ ์ ๋ฐ๋ ํจ์
- other Observable ์์ ๋ฐ์ดํฐ๋ฅผ ๋ฐํํ ๋๊น์ง ๊ฐ์ ๊ฑด๋ ๋.
- other Observable ์์ ๊ฐ์ ๋ฐํํ๋ ์๊ฐ๋ถํฐ ์๋ Observable์์ ๊ฐ์ ์ ์์ ์ผ๋ก ๋ฐํํ๊ธฐ ์์
- ์ฃผ์ด์ง ์กฐ๊ฑด์ 100% ๋ง์ ๋๋ง true ๊ฐ์ ๋ฐํ
- ์กฐ๊ฑด์ ๋ง์ง ์์ผ๋ฉด ๋ฐ๋ก false ๊ฐ์ ๋ฐํ
Single<Boolean> all (Predicate<? super T> predicate)
-
predicate ์ธ์๋ filter() ํจ์์ ์ธ์์ ๋์ผ
-
์ฃผ์ด์ง ๋๋ค๊ฐ true์ธ์ง false ์ธ์ง ํ์ ํด์ค.
- RxJava์๋ ์ฌ๋ฌ๊ฐ์ง ํ์ฅ ๋ชจ๋์ด ์กด์ฌํ๋ค. (RxAndroid, RxNetty, RxApacheHttp๋ฑ)
- RxJava1 => ์ํํจ์ ๋ชจ์ RxJavaMath ๊ฐ ์์.
- RxJava2 => RxJavaMath ์ง์์๋จ. โ> ๋ค๋ฅธ ๋ผ์ด๋ธ๋ฌ๋ฆฌ ์ฌ์ฉํด์ผ ํจ.
- RxJava2์ ํต์ฌ ์ปค๋ฏธํฐ์ธ ๋ฐ์ด๋น ์นด๋ฅด๋ น์ด ๋ง๋ RxJava2Extensions ๋ผ์ด๋ธ๋ฌ๋ฆฌ ์ฌ์ฉ
- Single< Long> count()
- Observable์์ ๋ฐํํ ๋ฐ์ดํฐ์ ๊ฐ์๋ฅผ ๋ฐํ
- ๊ฒฐ๊ณผ๊ฐ 1๊ฐ ๊ฐ์ด๋ฏ๋ก Single< Long>์ ๋ฐํ
Integer[] data = {1,2,3,4,7};
Single<Long> source = Observable.fromArray(data)
.count();
source.subscribe(count -> Log.i("count >> " + count));
// [์ถ๋ ฅ] count >> 5
- Flowable max(Publisher source)
- Flowable min(Publisher source)
Integer[] data = {1,2,3,4,7};
Floawable.fromArray(data)
.to(MathFlowable::max)
.subscribe(max -> Log.i("max >> " + max));
// [์ถ๋ ฅ] max >> 7
- sum()๊ณผ average()๋ ๊ฐ๊ฐ ์๋์ฒ๋ผ sumInt(), averageDouble() ์ฒ๋ผ ์ธ์ ํ์ ์ด ํจ์ ์ด๋ฆ์ ๊ทธ๋๋ก ๋ฐ์๋์ด ์์. (์ํ๋ ํ์ ๊ณจ๋ผ ์ฌ์ฉ)
- Flowable sumInt(Publisher source)
- Flowable averageDouble(Publisher<? extends Number> source)
Integer[] data = {1,2,3,4,7};
Flowable<Integer> flowable = Flowable.fromArray(data)
.to(MathFlowable::sumInt);
flowable.subscribe(sum -> Lob.i("sum >> " + sum));
// [์ถ๋ ฅ] sum >> 17
- ๋จ์ํ๊ฒ ์ธ์๋ก ์ ๋ฌ๋ฐ๋ time๊ณผ ์๊ฐ๋จ์(TimeUnit)๋งํผ ์
๋ ฅ๋ฐ์ Observable์ ๋ฐ์ดํฐ ๋ฐํ์ ์ง์ฐ์์ผ์ฃผ๋ ์ญํ
- Observable delay (long delay, TimeUnit unit)
- Interval()๊ณผ ๋ง์ฐฌ๊ฐ์ง๋ก ๊ณ์ฐ ์ค์ผ์ค๋ฌ์์ ์คํ.
- ์ด๋ค ๊ฐ์ ๋ฐํํ์ ๋ ์ด์ ๊ฐ์ ๋ฐํํ ์ดํ ์ผ๋ง๋ ์๊ฐ์ด ํ๋ ๋์ง ์๋ ค์ค๋ค.
์ค์ผ์ค๋ฌ๋ RxJava์ ํต์ฌ ์์๋ก Observable ๋งํผ ์ค์ํ๋ค.
-
์ค์ผ์ค๋ฌ๋ ์ค๋ ๋๋ฅผ ์ง์ ํ ์ ์๊ฒ ํด์ค.
-
์ง๊ธ๊น์ง ์๋ฐ๋ก ๋น๋๊ธฐ ํ๋ก๊ทธ๋๋ฐ์ ํ ๋ ์๋ฐ ์ค๋ ๋๋ฅผ ๋ง๋ค๋ฉด์ ๊ฒฝ์ ์กฐ๊ฑด์ด๋ synchronized ํค์๋๋ฅผ ์๊ฐํ๋ค๋ฉด ์ค์ผ์ค๋ฌ์ ์ฝ๋๋ฅผ ๋ณด๊ณ ๋๋ ๊ฒ!! (๊ฐ๊ฒฐํ ์ฝ๋๋ก ๋ค์ ํ์!!)
-
subscribeOn() ํจ์์ onserveOn() ํจ์๋ฅผ ๋ชจ๋ ์ง์ ํ๋ฉด, [Observable์์ ๋ฐ์ดํฐ ํ๋ฆ์ด ๋ฐํํ๋ ์ค๋ ๋]์ [์ฒ๋ฆฌ๋ ๊ฒฐ๊ณผ๋ฅผ ๊ตฌ๋ ์์๊ฒ ๋ฐํํ๋ ์ค๋ ๋]๋ฅผ ๋ถ๋ฆฌํ ์ ์๋ค.
-
subscribeOn() ํจ์๋ง ํธ์ถํ๋ฉด Observable์ ๋ชจ๋ ํ๋ฆ์ด ๋์ผํ ์ค๋ ๋์์ ์คํ๋จ
-
์ค์ผ์ค๋ฌ๋ฅผ ๋ณ๋๋ก ์ง์ ํ์ง ์์ผ๋ฉด ํ์ฌ(main) ์ค๋ ๋์์ ๋์์ ์คํ
- RxJava๋ ๋ค์ํ ์ค์ผ์ค๋ฌ๋ฅผ ์ ๊ณต.
- ์ํฉ์ ๋ง๊ฒ ์ด๋ค ์ค์ผ์ค๋ฌ๋ฅผ ์ฌ์ฉํด์ผ ํ๋์ง
- RxJava ๋ฉ์ง ์ : ํน์ ์ค์ผ์ค๋ฌ๋ฅผ ์ฌ์ฉํ๋ค๊ฐ ๋ค๋ฅธ ์ค์ผ์ค๋ฌ๋ก ๋ณ๊ฒฝํ๊ธฐ ์ฌ์
- ๋ง์น map() ํจ์๋ฅผ ํ๋ฒ ๋ ํธ์ถํ๋ ๊ฒ์ฒ๋ผ ์๋กญ๊ฒ ์ค์ผ์ค๋ฌ๋ฅผ ์ถ๊ฐํ๊ฑฐ๋ ๊ธฐ์กด์ ์ค์ผ์ค๋ฌ๋ฅผ ๋ค๋ฅธ ๊ฒ์ผ๋ก ๊ต์ฒดํ ์ ์๋ค.
- ์ด๋ฆ์ฒ๋ผ ์๋ก์ด ์ค๋ ๋ ์์ฑ
- ์๋ก์ด ์ค๋ ๋๋ฅผ ๋ง๋ค์ด ์ด๋ค ๋์์ ์คํํ๊ณ ์ถ์ ๋
Schedulers.newThread()
๋ฅผ ์ธ์๋ก ๋ฃ์ด์ฃผ๋ฉด ๋จ.- ๊ทธ๋ผ ๋ด ์ค๋ ๋ ์ค์ผ์ค๋ฌ๋ ์์ฒญ์ ๋ฐ์ ๋๋ง๋ค ์๋ก์ด ์ค๋ ๋ ์์ฑ
- ์๋ก์ด ์ค๋ ๋๋ฅผ ์์ฑํ์ฌ ๋ด๊ฐ ์ํ๋ ๋์์ ์ฒ๋ฆฌํ๋ ๋ฐฉ๋ฒ
- ํ์ง๋ง, ์ ๊ทน์ ์ผ๋ก ์ถ์ฒํ๋ ๋ฐฉ๋ฒ์ ์๋.
- RxJava์๋ ์ด๊ฑฐ๋ณด๋ค ํ์ฉ๋๊ฐ ๋์ ๊ณ์ฐ ์ค์ผ์ค๋ฌ์ IO ์ค์ผ์ค๋ฌ์ ๊ฐ์ ๋ค๋ฅธ ์ค์ผ์ค๋ฌ๋ฅผ ์ ๊ณตํ๊ธฐ ๋๋ฌธ.
- ํ์ง๋ง, ์ ๊ทน์ ์ผ๋ก ์ถ์ฒํ๋ ๋ฐฉ๋ฒ์ ์๋.
- 1.๊ณ์ฐ(Computation) ์ค์ผ์ค๋ฌ / 2. IO ์ค์ผ์ค๋ฌ / 3. ํธ๋จํ๋ฆฐ ์ค์ผ์ค๋ฌ
- ๋ด ์ค๋ ๋ ์ค์ผ์ค๋ฌ๋ ๋ค๋ฅธ ์ค์ผ์ค๋ฌ๋ ํน์ํ ์ํฉ์์ ์ ์ฉํ๊ธธ ๊ถ์ฅ
-
CPU์ ๋์ํ๋ ๊ณ์ฐ์ฉ ์ค์ผ์ค๋ฌ
-
'๊ณ์ฐ' ์์ ์ ํ ๋๋ ๋๊ธฐ ์๊ฐ ์์ด ๋น ๋ฅด๊ฒ ๊ฒฐ๊ณผ๋ฅผ ๋์ถํ๋ ๊ฒ์ด ์ค์.
- ๊ณ์ฐ ์์ ์ด ์ด๋ ต๊ฒ ๋๊ปด์ง๋ค๋ฉด => ์ ์ถ๋ ฅ(I/O) ์์ ์ ํ์ง ์๋ ์ค์ผ์ค๋ฌ๋ผ๊ณ ์๊ฐํ๋ฉด ๋จ.
-
๋ด๋ถ์ ์ผ๋ก ์ค๋ ๋ ํ์ ์์ฑ
.subscribeOn (Schedulers.computatioin())
- ๊ณ์ฐ ์ค์ผ์ค๋ฌ์๋ ๋ค๋ฅด๊ฒ ๋คํธ์ํฌ์์ ์์ฒญ์ ์ฒ๋ฆฌํ๊ฑฐ๋ ๊ฐ์ข
์
,์ถ๋ ฅ ์์
์ ์คํํ๊ธฐ ์ํ ์ค์ผ์ค๋ฌ
- ๊ณ์ฐ ์ค์ผ์ค๋ฌ์ ๋ค๋ฅธ ์ : ๊ธฐ๋ณธ์ผ๋ก ์์ฑ๋๋ ์ค๋ ๋ ๊ฐ์๊ฐ ๋ค๋ฅด๋ค๋ ๊ฒ
- ์ฆ, ๊ณ์ฐ ์ค์ผ์ค๋ฌ๋ CPU ๊ฐ์๋งํผ ์ค๋ ๋๋ฅผ ์์ฑํ์ง๋ง,
- IO ์ค์ผ์ค๋ฌ๋ ํ์ํ ๋๋ง๋ค ์ค๋ ๋๋ฅผ ๊ณ์ ์์ฑํจ.
- ๊ณ์ฐ ์ค์ผ์ค๋ฌ => ์ผ๋ฐ์ ์ธ ๊ณ์ฐ ์์
- IO ์ค์ผ์ค๋ฌ => ๋คํธ์ํฌ์์ ์์ฒญ, ํ์ผ ์ ์ถ๋ ฅ, DB ์ฟผ๋ฆฌ ๋ฑ
- ๊ณ์ฐ ์ค์ผ์ค๋ฌ์ ๋ค๋ฅธ ์ : ๊ธฐ๋ณธ์ผ๋ก ์์ฑ๋๋ ์ค๋ ๋ ๊ฐ์๊ฐ ๋ค๋ฅด๋ค๋ ๊ฒ
.subscribeOn (Schedulers.io())
String root ="C:\\";
File[] files = new File(root).listFiles();
Observable<String> source = Observable.fromArray(files)
.filter(f -> !f.isDirectory)
.map(f -> f.getAbsolutePath())
.subscribeOn(Schedulers.io());
source.subscribe(Log::i);
- source๋ณ์๋ File[] ๋ฐฐ์ด์ ์๋ ํ์ผ์ ์ ๋ ๊ฒฝ๋ก๋ฅผ ๋ฐํํ๋ Observable => IO ์ค์ผ์ค๋ฌ์์ ์คํ๋จ
- ์๋ก์ด ์ค๋ ๋๋ฅผ ์์ฑํ์ง ์๊ณ ํ์ฌ ์ค๋ ๋์ ๋ฌดํํ ํฌ๊ธฐ์ ๋๊ธฐ ํ๋ ฌ(Queue)๋ฅผ ์์ฑํ๋ ์ค์ผ์ค๋ฌ
- ์๋ก์ด ์ค๋ ๋ ์์ฑํ์ง ์๋ ๊ฒ๊ณผ ๋๊ธฐ ํ๋ ฌ์ ์๋์ผ๋ก ๋ง๋ค์ด ์ค๋ค๋ ๊ฒ์ด ๋ด ์ค๋ ๋, ๊ณ์ฐ, IO ์ค์ผ์ค๋ฌ์ ๋ค๋ฅด๋ค.
- ์์ฑ๋ฐฉ๋ฒ :
.subscribeOn (Schedulers.trampoline())
- RxJava ๋ด๋ถ์์ ๋จ์ผ ์ค๋ ๋๋ฅผ ๋ณ๋๋ก ์์ฑํ์ฌ ๊ตฌ๋ (subscribe) ์์ ์ ์ฒ๋ฆฌ.
- But, ๋ฆฌ์กํฐ๋ธ ํ๋ก๊ทธ๋๋ฐ์ด ๋น๋๊ธฐ ํ๋ก๊ทธ๋๋ฐ์ ์งํฅํ๊ธฐ ๋๋ฌธ์ ์ฑ๊ธ ์ค๋ ๋ ์ค์ผ์ค๋ฌ๋ฅผ ํ์ฉํ ํ๋ฅ ์ ๋ฎ๋ค.
- ์์ฑ๋ฐฉ๋ฒ :
.subscribeOn (Schedulers.single())
- ์๋ฐ์์๋ java.util.current ํจํค์ง์์ ์ ๊ณตํ๋ ์คํ์(Executor)๋ฅผ ๋ณํํ์ฌ ์ค์ผ์ค๋ฌ๋ฅผ ์์ฑํ ์ ์์.
- ํ์ง๋ง Executor ํด๋์ค์ ์ค์ผ์ค๋ฌ์ ๋์ ๋ฐฉ์์ด ๋ค๋ฅด๋ฏ๋ก ์ถ์ฒ๋ฐฉ๋ฒ์ ์๋
- ๊ธฐ์กด์ ์ฌ์ฉํ๋ Executor ํด๋์ค๋ฅผ ์ฌ์ฌ์ฉํ ๋๋ฌธ ํ์ ์ ์ผ๋ก ํ์ฉํ๋ค.
- ์์ฑ๋ฐฉ๋ฒ :
.subscribeOn (Schedulers.from(executor))
RxJava ์ค์ผ์ค๋ฌ์ ํต์ฌ์ ๊ฒฐ๊ตญ ์ ๊ณต๋๋ ์ค์ผ์ค๋ฌ์ ์ข ๋ฅ๋ฅผ ์ ํํ ํ subscribeOn() ๊ณผ observeOn() ํจ์๋ฅผ ํธ์ถํ๋ ๊ฒ์ด๋ค.
subscribeOn() : Observable์์ ๊ตฌ๋ ์๊ฐ subscribe() ํจ์๋ฅผ ํธ์ถํ์ ๋, ๋ฐ์ดํฐ ํ๋ฆ์ ๋ฐํํ๋ ์ค๋ ๋๋ฅผ ์ง์
- ์ฒ์ ์ง์ ํ ์ค๋ ๋๋ฅผ ๊ณ ์ ์ํค๋ฏ๋ก ๋ค์ subscribeOn() ํจ์๋ฅผ ํธ์ถํด๋ ๋ฌด์. (ํ์ง๋ง, observeOn() ํจ์๋ ๋ค๋ฆ)
observeOn() : ์ฒ๋ฆฌ๋ ๊ฒฐ๊ณผ๋ฅผ ๊ตฌ๋ ์์๊ฒ ์ ๋ฌํ๋ ์ค๋ ๋ ์ง์ .
- ์ฌ๋ฌ๋ฒ ํธ์ถํ ์ ์์ผ๋ฉฐ, ํธ์ถ๋๋ฉด ๊ทธ ๋ค์๋ถํฐ ๋์ํ๋ ์ค๋ ๋๋ฅผ ๋ฐ๊ฟ ์ ์๋ค.
- ์ ํต์ ์ธ ์ค๋ ๋ ํ๋ก๊ทธ๋๋ฐ์์๋ ์ผ์ผ์ด ์ค๋ ๋๋ฅผ ๋ง๋ค์ด์ผ ํ๊ณ ์ค๋ ๋๊ฐ ๋์ด๋ ๋๋ง๋ค ๋๊ธฐํํ๋ ๊ฒ์ด ๋งค์ฐ ๋ถ๋ด์ค๋ฝ๊ธฐ ๋๋ฌธ์ ์ด๋ฌํ ๋ก์ง์ ๊ตฌํํ๋ ๊ฒ์ด ๋งค์ฐ ํ๋ฌ.
- ํ์ง๋ง, observeOn() ํจ์๋ ์ค๋ ๋ ๋ณ๊ฒฝ์ด ์ฌ์ฐ๋ฏ๋ก ํ์ฉํ ์ ์๋ ๋ฒ์๊ฐ ๋์
์ด๋ฒ ๋ฐํ๋ฅผ ์ค๋นํ๋ฉด์ RxJava์ ๋ง์ ๊ฒ์ ์๊ฒ ๋์๋ค. ์์ญ๊ฐ์ ์ฐ์ฐ์์, Observable, Single, Maybe์ ์ค์ผ์ค๋ฌ ๋ฑ ๋ง์ ๊ฐ๋ ๋ค์ ๊ณต๋ถํ์๋ค. RxJava์ ์ ๋ฐ์ ์ธ ์ด๋ก ๋ด์ฉ์ ๊ณต๋ถํ ๊ฒ ๊ฐ์ ์ด์ ๋ ์ค์ ๋ก ์ฝ๋๋ฅผ ์ง๋ณด๋ฉด์ ํ์คํ ์ดํดํ ๊ฒ์ด๋ค. ์ง์ ์ฝ๋์ Rx๋ฅผ ์ ์ฉํด ๊ธด์ฝ๋๊ฐ ์งง์์ง๋ ๋งค์ง~๊ณผ ๋น๋๊ธฐ ํ๋ก๊ทธ๋๋ฐ, ํนํ Rerofit์ ์ด์ฉํ ์๋ฒํต์ ์ Rx๋ฅผ ์ ์ฉ ํด๋ณด๊ณ ์ถ๋ค.
- RxJava ํ๋ก๊ทธ๋๋ฐ (์ ๋ํ, ๋ฐ์ ์ค ์ง์)