RxJSのオペレーターの動きをデモアプリを自作して確認してみた
はじめに
こんにちは、SHIFT の開発部門に所属している Katayama です。
RxJSは例えば、フロントエンドにおいてテキスト入力をして検索を行うようなコンポーネントを開発する際に利用すると、バックエンドへの問い合わせ(APIの実行回数)が最適化できたりし便利である。抽象化して表現すると、RxJSはpub/subモデルであり、非同期的な変更(データ)を継続的に受信でき、そのデータに対し操作を加えたりできたりします。
ただこのRxJS、そのオペレーターは文章を読んでもわかりにくい部分があり、実際に手元で動かして視覚的に確認できると良いなと思っていました。そこで今回、簡易的にデモアプリを作成し、RxJS のオペレーターの動きを確認してみました。
RxJS とは?
こちらは公式や以下のような他の記事に書かれている内容なので本記事では割愛する。
・Angular Observable と RxJS
・RxJS の基礎中の基礎
・RxJS とは
RxJS のオペレーターの動きを見てみる
今回は以下の動画のように、RxJS の Observable の実行で流れるデータを可視化するデモアプリを実装してみた。このデモアプリを動かしながら RxJS のオペレーターについて視覚的にその挙動を見ていく。
以下では実際にデモアプリを動かしながら、RxJS のオペレーターのそれぞれの動きについて見ていきたいと思う。
※ソースコードについては「おまけ」の「デモアプリのソースコードについて」を参照。
of()
"of()"は、その引数で受け取った値(データ)をそのままストリームに流すためのオペレーター。Observable を作成できるので、Creation Operatorsに分類される。
※上記の動画では〇が重なっているので分かりにくいが、3 つのデータがストリームに流れている(ログの部分に 3 つ値が出力されている事からそれは分かる)。
interval()
"interval()"は、指定時間ごとに連番でデータをストリームに流すためのオペレーター。"of()"と同じ Observable を作成する Creation Operators。 動画では 1000 ミリ秒を指定しているので、1 秒ごとに 1000 になるまでカウントアップして数字がストリームに流れていく事になる(全部だと動画が長くなるので動画は途中で中断している)。
merge()
"merge()"は、複数の Observable を 1 本にまとめる(1 本のストリームに流す)ためのオペレーター。これは単に Observable を作成するのではなく、複数の Observable をまとめて新しい Observable を作成するのでJoin Creation Operatorsに分類される。
動きとしては動画の通りで、2 つの Observable が 1 本にまとまってストリームにデータが流れるので、各 Observable からのデータがミックスされる。動画上では〇の色を青色とグレー色で分けて視覚的に分かりやすくしている。
map()
"map()"は、ストリームに流れるデータを整形するためのオペレーター。データの整形をするのでTransformation Operatorsに分類される。
動画では、入力 hoge をストリームに流しているが、map()により「hoge」が「maped!: hoge」に変わっている事が分かると思う。
switchMap()
"switchMap()"は、ストリーム A に流れるデータを元に、別のストリーム B を流すためのオペレーター。これも Transformation Operators の 1 つ。ただし、特徴的な動きとして別のストリーム B が流れ終わる前に、元のストリーム A にデータが流れると、switchMap で作成された別のストリーム B は破棄され、また最初からストリーム B が流れるような動きになる。
実際に動画を見ると分かるが、送信をクリックする事をでストリームにデータが流れるはずが直には流れず、1500 ミリ秒経過後に初めてストリームにデータが流れている。これは switchMap によりストリーム A として流された送信クリック時のデータ(「ほげ」)が、interval()により作成された別のストリーム B に置き換わったためこういう動きになっている。 また、switchMap で新しく作成されたストリーム B が流れ終わる前に再度送信をクリックすると、途中で switchMap で作成されたストリーム B は破棄され、また最初からストリーム B が流れている事もわかる(ログの出力を見てみると、「2: switchMaped: 1 => ほげ」の次は「3: switchMaped: 2 => ほげ」ではなく、「3: switchMaped: 0 => ほげ」になっている)。
filter()
"filter()"は、条件に合致するデータのみストリームに流すためのオペレーター。ストリームに流すデータをフィルターするのでFiltering Operatorsに分類される。
動画は「test」に合致するデータのみストリームに流れるような実装をした場合の動きになっている。
throttleTime()
"throttleTime()"は、指定時間だけ連続するデータを無視する(指定時間だけデータを間引きストリームにデータを流す)ためのオペレーター。これもデータのフィルターと言えばフィルターなので Filtering Operators に分類される。
動画では、送信を連続でクリックしてストリームにデータを流そうとしているが、1000 ミリ秒ごとでしかデータが流れていない事が分かると思う。
debounceTime()
"debounceTime()"は、連続するデータの最後のデータを、指定時間経過後に 1 度だけストリームに流すためのオペレーター。これもデータのフィルターと言えばフィルターなので Filtering Operators に分類される。
動画では、連続して送信をクリックするのをやめると、クリックをやめた後 250 ミリ秒後にストリームにデータが流れている事が分かると思う。
distinctUntilChanged()
"distinctUntilChanged()"は、ストリームに流すデータの値が前回と異なるデータのみストリームに流すためのオペレーター。これもデータのフィルターと言えばフィルターなので Filtering Operators に分類される。
動画では、送信を連続でクリックしても、入力している値が同じであればストリームにデータが流れていない事が分かると思う。
skip()
"skip()"は、指定回数のデータをストリームに流さずにスキップするためのオペレーター。これもデータのフィルターと言えばフィルターなので Filtering Operators に分類される。
動画では送信を 3 回クリックしてもストリームにデータが流れず、4 回目以降のクリックでデータが流れている事が分かると思う。
take()
"take()"は、指定回数のみデータをストリームに流すためのオペレーター。これもデータのフィルターと言えばフィルターなので Filtering Operators に分類される。skip()の逆バージョン。
動画では送信のクリック 3 回までであればストリームにデータが流れているが、4 回目以降だとデータが流れなくなっている事が分かると思う。
まとめとして
今回は Rxjs のオペレーターの動きを視覚的に確認するためにデモアプリを実装して、実際にオペレーターの動きを理解するという事をやってみた。
フロントエンドで入力して検索を行うようなコンポーネントを開発する時に RxJS を使うと、API の実行回数が最適化できたり RxJS は便利だが、概念として分かりにくい部分もあり、導入を渋る場合もあると思う。そんな時には今回のように実際に視覚的に動きを確認できるようにする事で、概念の理解を深める事ができるのではないかと感じた。
※以下の「おまけ」にソースコード全体(ローカルに index.html を作成しそこにコピペし Chrome で開けば実行できるもの)を載せていますので、RxJS のオペレーターの動きを手元で確認する事もできます。
おまけ
デモアプリのソースコードについて
面倒だったので全部 CDN を使ってライブラリを読み込む形をとっている。また、デモアプリという事で少し雑に作っている部分もある。
以下のhtmlをローカルのindex.htmlのようなファイルにコピペし、Chromeで開けば本記事でRxJSのオペレーターの動きを確認していたでもアプリが実行できる(環境によっては、CDNからファイルを読み込めず動かない場合もあり得ます)。
<!DOCTYPE html>
<html lang="ja">
<head>
<meta charset="UTF-8" />
<meta http-equiv="X-UA-Compatible" content="IE=edge" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>Understand RxJS</title>
<link
href="https://cdn.jsdelivr.net/npm/@mdi/font@6.x/css/materialdesignicons.min.css"
rel="stylesheet"
/>
<link
href="https://cdn.jsdelivr.net/npm/vuetify@2.x/dist/vuetify.min.css"
rel="stylesheet"
/>
<link
rel="stylesheet"
href="https://cdn.jsdelivr.net/gh/highlightjs/cdn-release@11.5.0/build/styles/vs2015.min.css"
/>
<style>
html,
body {
height: 100%;
}
.fade-enter {
left: -30px;
}
.slide-enter-active {
animation: slide-in 2s reverse;
}
@keyframes slide-in {
from {
transform: translateX(100%);
}
to {
transform: translateX(-30px);
}
}
.stream-line {
position: relative;
display: block;
min-height: 50px;
}
.stream-line:after {
content: "";
position: absolute;
top: calc(50%);
width: 100%;
height: 1px;
background-color: black;
}
.stream-item {
position: absolute;
list-style: none;
top: calc(50% - 20px);
left: 0;
width: 100%;
}
.log-area {
overflow: auto;
height: 150px;
}
ul {
list-style: none;
}
/* vuetifyのCSS上書き */
.theme--light.v-application code {
background-color: #1e1e1e;
color: #dcdcdc;
}
</style>
</head>
<body>
<v-app id="app">
<v-main>
<v-container>
<h1>RxJSのオペレーターを視覚的に理解する</h1>
<div class="stream-line my-5">
<transition-group name="slide" tag="ul" @after-enter="afterEnter">
<li v-for="(message, i) in messages" :key="i" class="stream-item">
<v-icon
v-if="message.toString().startsWith('a')"
color="primary"
x-large
>mdi-circle
</v-icon>
<v-icon v-else x-large>mdi-circle</v-icon>
<input type="hidden" :value="message" />
</li>
</transition-group>
</div>
<v-card class="mx-auto" max-width="344" outlined>
<v-card-text class="log-area">
<ul>
<li v-for="(log, i) in logs" :key="i">{{ log }}</li>
</ul>
</v-card-text>
<v-card-actions>
<v-spacer></v-spacer>
<v-btn @click="clear"> クリア </v-btn>
</v-card-actions>
</v-card>
<v-select
:value="selected"
@input="select"
:items="items"
item-text="text"
item-value="value"
></v-select>
<v-text-field v-model="message" outlined clearable type="text">
<template #append-outer>
<v-btn color="primary" @click="push" :disabled="disabledBtn">
送信
</v-btn>
</template>
</v-text-field>
<pre v-if="selected === 'of'" key="of">
<code class="javascript">
/** https://rxjs-dev.translate.goog/api/index/function/of */
import { of } from 'rxjs';
// 単に受け取った値(データ)をストリームに流す
const subscription = of(10, 20, 30).subscribe({
next: (next) => console.log("next:", next),
error: (err) => console.log("error:", err),
complete: () => console.log("the end"),
});
/** Outputs */
// next: 10
// next: 20
// next: 30
// the end
</code>
</pre>
<pre v-if="selected === 'interval'" key="interval">
<code class="javascript">
/** https://rxjs.dev/api/index/function/interval */
import { interval } from 'rxjs';
// 指定時間ごとに連番でデータをストリームに流す
const subscription = interval(1000).subscribe(
text => console.log('Next: ', text)
);
/** Outputs */
// Next: 0
// Next: 1
// Next: 2
// ...
</code>
</pre>
<pre v-if="selected === 'merge'" key="merge">
<code class="javascript">
/** https://rxjs.dev/api/index/function/merge */
import { merge, interval } from 'rxjs';
import { take } from 'rxjs/operators';
// 複数のObservableを1本にまとめる(1本のストリームに流す)
const subscription = merge(
interval(1200).pipe(
take(5),
map((index) => `a: ${index + 1} / 5`)
),
interval(500).pipe(
take(10),
map((index) => `b: ${index + 1} / 10`)
)
).subscribe(
text => console.log(text)
);
/** Outputs */
// 1: b: 1 / 10
// 2: b: 2 / 10
// 3: a: 1 / 5
// 4: b: 3 / 10
// ...
</code>
</pre>
<pre v-if="selected === 'map'" key="map">
<code class="javascript">
/** https://rxjs.dev/api/operators/map */
import { of } from 'rxjs';
import { map } from 'rxjs/operators';
// ストリームに流れるデータを整形する
const subscription = of(1, 2, 3).pipe(
map((text) => `maped!: ${textx}`)
).subscribe((text) => console.log(text));
/** Outputs */
// maped!: 1
// maped!: 2
// maped!: 3
</code>
</pre>
<pre v-if="selected === 'switchMap'" key="switchMap">
<code class="javascript">
/** https://rxjs.dev/api/operators/switchMap */
import { Observable } from 'rxjs';
import { switchMap } from 'rxjs/operators';
// ストリームAに流れるデータを元に、別のストリームBを流す
// ※別のストリームBが流れ終わる前に、元のストリームAにデータが流れると、
// ストリームBは破棄され、最初からストリームBが流れる
const subscription = Observable.pipe(
switchMap((text) => {
return interval(1500).pipe(
take(3),
map((index) => `switchMaped: ${index} => ${text}`)
);
})
);
/** Outputs */
// switchMaped: 0 => hoge
// switchMaped: 1 => hoge
// switchMaped: 2 => hoge
</code>
</pre>
<pre v-if="selected === 'filter'" key="filter">
<code class="javascript">
/** https://rxjs.dev/api/operators/filter */
import { Observable } from 'rxjs';
import { filter } from 'rxjs/operators';
// 条件に合致するデータのみストリームに流す
const subscription = Observable.pipe(
filter((text) => text === "test")
).subscribe((text) => console.log(text));
/** Outputs */
// test
</code>
</pre>
<pre v-if="selected === 'throttleTime'" key="throttleTime">
<code class="javascript">
/** https://rxjs.dev/api/operators/throttleTime */
import { Observable } from 'rxjs';
import { throttleTime } from 'rxjs/operators';
// 指定時間だけ連続するデータを無視する(指定時間だけデータを間引きストリームにデータを流す)
const subscription = Observable.pipe(throttleTime(1000)).subscribe(
(text) => console.log(text)
);
/** Outputs */
// test(1000ミリ秒経過ごとに出力される)
</code>
</pre>
<pre v-if="selected === 'debounceTime'" key="debounceTime">
<code class="javascript">
/** https://rxjs.dev/api/operators/debounceTime */
import { Observable } from 'rxjs';
import { debounceTime } from 'rxjs/operators';
// 連続するデータの最後のデータを、指定時間経過後に1度だけストリームに流す
const subscription = Observable.pipe(debounceTime(250)).subscribe(
(text) => console.log(text)
);
/** Outputs */
// test(250ミリ秒経過後1回だけ出力される)
</code>
</pre>
<pre
v-if="selected === 'distinctUntilChanged'"
key="distinctUntilChanged"
>
<code class="javascript">
/** https://rxjs.dev/api/operators/distinctUntilChanged */
import { Observable } from 'rxjs';
import { distinctUntilChanged } from 'rxjs/operators';
// 前回と値が異なるデータのみストリームに流す
const subscription = Observable.pipe(distinctUntilChanged()).subscribe(
(text) => console.log(text)
);
/** Outputs */
// hoge
// foo
</code>
</pre>
<pre v-if="selected === 'skip'" key="skip">
<code class="javascript">
/** https://rxjs.dev/api/operators/skip */
import { Observable } from 'rxjs';
import { skip } from 'rxjs/operators';
// 指定回数のデータをストリームに流さずにスキップする
const subscription = Observable.pipe(skip(3)).subscribe(
(text) => console.log(text)
);
/** Outputs */
// test(4回送信をクリックした後に出力される)
</code>
</pre>
<pre v-if="selected === 'take'" key="take">
<code class="javascript">
/** https://rxjs.dev/api/operators/take */
import { Observable } from 'rxjs';
import { take } from 'rxjs/operators';
// 指定回数のみデータをストリームに流す
const subscription = Observable.pipe(take(3)).subscribe(
(text) => console.log(text)
);
/** Outputs */
// test
// test
// test(これ以降はログが出力されない)
</code>
</pre>
</v-container>
</v-main>
</v-app>
<script src="https://cdn.jsdelivr.net/npm/vue@2.x/dist/vue.js"></script>
<script src="https://cdn.jsdelivr.net/npm/vuetify@2.x/dist/vuetify.js"></script>
<script src="https://unpkg.com/rxjs@^7/dist/bundles/rxjs.umd.min.js"></script>
<script src="https://cdn.jsdelivr.net/gh/highlightjs/cdn-release@11.5.0/build/highlight.min.js"></script>
<script>
const { Subject, of, interval, merge } = rxjs;
const {
take,
map,
switchMap,
filter,
throttleTime,
debounceTime,
distinctUntilChanged,
skip,
} = rxjs.operators;
new Vue({
el: "#app",
vuetify: new Vuetify(),
data: () => ({
selected: "nothing",
items: [
{
text: "nothing(「送信」をクリックする事でストリームにデータが流れる様子をデモできます。一部disabledもあります。)",
value: "nothing",
},
{ text: "[Creation Operators] of()", value: "of" },
{ text: "[Creation Operators] interval()", value: "interval" },
{ text: "[Join Creation Operators] merge()", value: "merge" },
{ text: "[Transformation Operators] map()", value: "map" },
{
text: "[Transformation Operators] switchMap()",
value: "switchMap",
},
{
text: "[Filtering Operators] filter()",
value: "filter",
},
{
text: "[Filtering Operators] throttleTime()",
value: "throttleTime",
},
{
text: "[Filtering Operators] debounceTime()",
value: "debounceTime",
},
{
text: "[Filtering Operators] distinctUntilChanged()",
value: "distinctUntilChanged",
},
{ text: "[Filtering Operators] skip()", value: "skip" },
{ text: "[Filtering Operators] take()", value: "take" },
],
stream: null,
stream$: null,
subscription: null,
counter: 0,
logs: [],
message: null,
messages: [],
}),
computed: {
disabledBtn() {
return (
this.selected === "of" ||
this.selected === "interval" ||
this.selected === "merge"
);
},
},
created() {
this.stream = new Subject();
this.stream$ = this.stream.asObservable();
this.subscription = this.stream$.subscribe((text) =>
this.messages.push(text)
);
},
methods: {
select(value) {
this.selected = value;
setTimeout(() => {
hljs.highlightAll();
}, 50);
this.subscription.unsubscribe();
this.subscription = this[value]().subscribe((text) => {
this.messages.push(text);
});
},
push() {
this.stream.next(this.message || "not input");
},
addLog(value) {
this.logs.push(`${++this.counter}: ${value}`);
},
clear() {
this.logs.splice(0, this.logs.length);
this.messages.splice(0, this.messages.length);
this.counter = 0;
},
afterEnter(el) {
this.addLog(el.querySelector("input").value);
el.style = "display: none;";
},
nothing() {
return this.stream$;
},
// https://rxjs.dev/guide/operators#creation-operators-2
of() {
return of(10, 20, 30);
},
interval() {
return interval(1000);
},
// https://rxjs.dev/guide/operators#join-creation-operators
merge() {
return merge(
interval(1200).pipe(
take(5),
map((index) => `a: ${index + 1} / 5`)
),
interval(500).pipe(
take(10),
map((index) => `b: ${index + 1} / 10`)
)
);
},
// https://rxjs.dev/guide/operators#transformation-operators
map() {
return this.stream$.pipe(map((text) => `maped!: ${text}`));
},
switchMap() {
return this.stream$.pipe(
switchMap((text) => {
return interval(1500).pipe(
take(3),
map((index) => `switchMaped: ${index} => ${text}`)
);
})
);
},
// https://rxjs.dev/guide/operators#filtering-operators
filter() {
return this.stream$.pipe(filter((text) => text === "test"));
},
throttleTime() {
return this.stream$.pipe(throttleTime(1000));
},
debounceTime() {
return this.stream$.pipe(debounceTime(250));
},
distinctUntilChanged() {
return this.stream$.pipe(distinctUntilChanged());
},
skip() {
return this.stream$.pipe(skip(3));
},
take() {
return this.stream$.pipe(take(3));
},
},
});
</script>
</body>
</html>
《この公式ブロガーの記事一覧》
お問合せはお気軽に
https://service.shiftinc.jp/contact/
SHIFTについて(コーポレートサイト)
https://www.shiftinc.jp/
SHIFTのサービスについて(サービスサイト)
https://service.shiftinc.jp/
SHIFTの導入事例
https://service.shiftinc.jp/case/
お役立ち資料はこちら
https://service.shiftinc.jp/resources/
SHIFTの採用情報はこちら
https://recruit.shiftinc.jp/career/