見出し画像

RxJSのオペレーターの動きをデモアプリを自作して確認してみた

はじめに

こんにちは、SHIFT の開発部門に所属している Katayama です。

RxJSは例えば、フロントエンドにおいてテキスト入力をして検索を行うようなコンポーネントを開発する際に利用すると、バックエンドへの問い合わせ(APIの実行回数)が最適化できたりし便利である。抽象化して表現すると、RxJSはpub/subモデルであり、非同期的な変更(データ)を継続的に受信でき、そのデータに対し操作を加えたりできたりします。

ただこのRxJS、そのオペレーターは文章を読んでもわかりにくい部分があり、実際に手元で動かして視覚的に確認できると良いなと思っていました。そこで今回、簡易的にデモアプリを作成し、RxJS のオペレーターの動きを確認してみました。

RxJS とは?

こちらは公式や以下のような他の記事に書かれている内容なので本記事では割愛する。

Angular Observable と RxJS
RxJS の基礎中の基礎
RxJS とは

RxJS のオペレーターの動きを見てみる

今回は以下の動画のように、RxJS の Observable の実行で流れるデータを可視化するデモアプリを実装してみた。このデモアプリを動かしながら RxJS のオペレーターについて視覚的にその挙動を見ていく。

以下では実際にデモアプリを動かしながら、RxJS のオペレーターのそれぞれの動きについて見ていきたいと思う。

※ソースコードについては「おまけ」の「デモアプリのソースコードについて」を参照。

of()

"of()"は、その引数で受け取った値(データ)をそのままストリームに流すためのオペレーター。Observable を作成できるので、Creation Operatorsに分類される。

※上記の動画では〇が重なっているので分かりにくいが、3 つのデータがストリームに流れている(ログの部分に 3 つ値が出力されている事からそれは分かる)。

interval()

"interval()"は、指定時間ごとに連番でデータをストリームに流すためのオペレーター。"of()"と同じ Observable を作成する Creation Operators。 動画では 1000 ミリ秒を指定しているので、1 秒ごとに 1000 になるまでカウントアップして数字がストリームに流れていく事になる(全部だと動画が長くなるので動画は途中で中断している)。

merge()

"merge()"は、複数の Observable を 1 本にまとめる(1 本のストリームに流す)ためのオペレーター。これは単に Observable を作成するのではなく、複数の Observable をまとめて新しい Observable を作成するのでJoin Creation Operatorsに分類される。

動きとしては動画の通りで、2 つの Observable が 1 本にまとまってストリームにデータが流れるので、各 Observable からのデータがミックスされる。動画上では〇の色を青色とグレー色で分けて視覚的に分かりやすくしている。

map()

"map()"は、ストリームに流れるデータを整形するためのオペレーター。データの整形をするのでTransformation Operatorsに分類される。

動画では、入力 hoge をストリームに流しているが、map()により「hoge」が「maped!: hoge」に変わっている事が分かると思う。

switchMap()

"switchMap()"は、ストリーム A に流れるデータを元に、別のストリーム B を流すためのオペレーター。これも Transformation Operators の 1 つ。ただし、特徴的な動きとして別のストリーム B が流れ終わる前に、元のストリーム A にデータが流れると、switchMap で作成された別のストリーム B は破棄され、また最初からストリーム B が流れるような動きになる。

実際に動画を見ると分かるが、送信をクリックする事をでストリームにデータが流れるはずが直には流れず、1500 ミリ秒経過後に初めてストリームにデータが流れている。これは switchMap によりストリーム A として流された送信クリック時のデータ(「ほげ」)が、interval()により作成された別のストリーム B に置き換わったためこういう動きになっている。 また、switchMap で新しく作成されたストリーム B が流れ終わる前に再度送信をクリックすると、途中で switchMap で作成されたストリーム B は破棄され、また最初からストリーム B が流れている事もわかる(ログの出力を見てみると、「2: switchMaped: 1 => ほげ」の次は「3: switchMaped: 2 => ほげ」ではなく、「3: switchMaped: 0 => ほげ」になっている)。

filter()

"filter()"は、条件に合致するデータのみストリームに流すためのオペレーター。ストリームに流すデータをフィルターするのでFiltering Operatorsに分類される。

動画は「test」に合致するデータのみストリームに流れるような実装をした場合の動きになっている。

throttleTime()

"throttleTime()"は、指定時間だけ連続するデータを無視する(指定時間だけデータを間引きストリームにデータを流す)ためのオペレーター。これもデータのフィルターと言えばフィルターなので Filtering Operators に分類される。

動画では、送信を連続でクリックしてストリームにデータを流そうとしているが、1000 ミリ秒ごとでしかデータが流れていない事が分かると思う。

debounceTime()

"debounceTime()"は、連続するデータの最後のデータを、指定時間経過後に 1 度だけストリームに流すためのオペレーター。これもデータのフィルターと言えばフィルターなので Filtering Operators に分類される。

動画では、連続して送信をクリックするのをやめると、クリックをやめた後 250 ミリ秒後にストリームにデータが流れている事が分かると思う。

distinctUntilChanged()

"distinctUntilChanged()"は、ストリームに流すデータの値が前回と異なるデータのみストリームに流すためのオペレーター。これもデータのフィルターと言えばフィルターなので Filtering Operators に分類される。

動画では、送信を連続でクリックしても、入力している値が同じであればストリームにデータが流れていない事が分かると思う。

skip()

"skip()"は、指定回数のデータをストリームに流さずにスキップするためのオペレーター。これもデータのフィルターと言えばフィルターなので Filtering Operators に分類される。

動画では送信を 3 回クリックしてもストリームにデータが流れず、4 回目以降のクリックでデータが流れている事が分かると思う。

take()

"take()"は、指定回数のみデータをストリームに流すためのオペレーター。これもデータのフィルターと言えばフィルターなので Filtering Operators に分類される。skip()の逆バージョン。

動画では送信のクリック 3 回までであればストリームにデータが流れているが、4 回目以降だとデータが流れなくなっている事が分かると思う。

まとめとして

今回は Rxjs のオペレーターの動きを視覚的に確認するためにデモアプリを実装して、実際にオペレーターの動きを理解するという事をやってみた。

フロントエンドで入力して検索を行うようなコンポーネントを開発する時に RxJS を使うと、API の実行回数が最適化できたり RxJS は便利だが、概念として分かりにくい部分もあり、導入を渋る場合もあると思う。そんな時には今回のように実際に視覚的に動きを確認できるようにする事で、概念の理解を深める事ができるのではないかと感じた。

※以下の「おまけ」にソースコード全体(ローカルに index.html を作成しそこにコピペし Chrome で開けば実行できるもの)を載せていますので、RxJS のオペレーターの動きを手元で確認する事もできます。

おまけ

デモアプリのソースコードについて

面倒だったので全部 CDN を使ってライブラリを読み込む形をとっている。また、デモアプリという事で少し雑に作っている部分もある。

以下のhtmlをローカルのindex.htmlのようなファイルにコピペし、Chromeで開けば本記事でRxJSのオペレーターの動きを確認していたでもアプリが実行できる(環境によっては、CDNからファイルを読み込めず動かない場合もあり得ます)。

<!DOCTYPE html>
<html lang="ja">
  <head>
    <meta charset="UTF-8" />
    <meta http-equiv="X-UA-Compatible" content="IE=edge" />
    <meta name="viewport" content="width=device-width, initial-scale=1.0" />
    <title>Understand RxJS</title>
    <link
      href="https://cdn.jsdelivr.net/npm/@mdi/font@6.x/css/materialdesignicons.min.css"
      rel="stylesheet"
    />
    <link
      href="https://cdn.jsdelivr.net/npm/vuetify@2.x/dist/vuetify.min.css"
      rel="stylesheet"
    />
    <link
      rel="stylesheet"
      href="https://cdn.jsdelivr.net/gh/highlightjs/cdn-release@11.5.0/build/styles/vs2015.min.css"
    />
    <style>
      html,
      body {
        height: 100%;
      }
      .fade-enter {
        left: -30px;
      }
      .slide-enter-active {
        animation: slide-in 2s reverse;
      }
      @keyframes slide-in {
        from {
          transform: translateX(100%);
        }
        to {
          transform: translateX(-30px);
        }
      }
      .stream-line {
        position: relative;
        display: block;
        min-height: 50px;
      }
      .stream-line:after {
        content: "";
        position: absolute;
        top: calc(50%);
        width: 100%;
        height: 1px;
        background-color: black;
      }
      .stream-item {
        position: absolute;
        list-style: none;
        top: calc(50% - 20px);
        left: 0;
        width: 100%;
      }
      .log-area {
        overflow: auto;
        height: 150px;
      }
      ul {
        list-style: none;
      }
      /* vuetifyのCSS上書き */
      .theme--light.v-application code {
        background-color: #1e1e1e;
        color: #dcdcdc;
      }
    </style>
  </head>
  <body>
    <v-app id="app">
      <v-main>
        <v-container>
          <h1>RxJSのオペレーターを視覚的に理解する</h1>

          <div class="stream-line my-5">
            <transition-group name="slide" tag="ul" @after-enter="afterEnter">
              <li v-for="(message, i) in messages" :key="i" class="stream-item">
                <v-icon
                  v-if="message.toString().startsWith('a')"
                  color="primary"
                  x-large
                  >mdi-circle
                </v-icon>
                <v-icon v-else x-large>mdi-circle</v-icon>
                <input type="hidden" :value="message" />
              </li>
            </transition-group>
          </div>

          <v-card class="mx-auto" max-width="344" outlined>
            <v-card-text class="log-area">
              <ul>
                <li v-for="(log, i) in logs" :key="i">{{ log }}</li>
              </ul>
            </v-card-text>
            <v-card-actions>
              <v-spacer></v-spacer>
              <v-btn @click="clear"> クリア </v-btn>
            </v-card-actions>
          </v-card>

          <v-select
            :value="selected"
            @input="select"
            :items="items"
            item-text="text"
            item-value="value"
          ></v-select>
          <v-text-field v-model="message" outlined clearable type="text">
            <template #append-outer>
              <v-btn color="primary" @click="push" :disabled="disabledBtn">
                送信
              </v-btn>
            </template>
          </v-text-field>

          <pre v-if="selected === 'of'" key="of">
            <code class="javascript">
            /** https://rxjs-dev.translate.goog/api/index/function/of */  
            import { of } from 'rxjs';
 
            // 単に受け取った値(データ)をストリームに流す
            const subscription = of(10, 20, 30).subscribe({
              next: (next) => console.log("next:", next),
              error: (err) => console.log("error:", err),
              complete: () => console.log("the end"),
            });

            /** Outputs */
            // next: 10
            // next: 20
            // next: 30
            // the end
            </code>
          </pre>
          <pre v-if="selected === 'interval'" key="interval">
            <code class="javascript">
              /** https://rxjs.dev/api/index/function/interval */  
              import { interval } from 'rxjs';
 
              // 指定時間ごとに連番でデータをストリームに流す
              const subscription = interval(1000).subscribe(
                text => console.log('Next: ', text)
              );
 
              /** Outputs */
              // Next: 0
              // Next: 1
              // Next: 2
              // ...
            </code>
          </pre>
          <pre v-if="selected === 'merge'" key="merge">
            <code class="javascript">
              /** https://rxjs.dev/api/index/function/merge */  
              import { merge, interval } from 'rxjs';
              import { take } from 'rxjs/operators';
   
              // 複数のObservableを1本にまとめる(1本のストリームに流す)
              const subscription = merge(
                interval(1200).pipe(
                  take(5),
                  map((index) => `a: ${index + 1} / 5`)
                ),
                interval(500).pipe(
                  take(10),
                  map((index) => `b: ${index + 1} / 10`)
                )
              ).subscribe(
                text => console.log(text)
              );
   
              /** Outputs */
              // 1: b: 1 / 10
              // 2: b: 2 / 10
              // 3: a: 1 / 5
              // 4: b: 3 / 10
              // ...
            </code>
          </pre>
          <pre v-if="selected === 'map'" key="map">
            <code class="javascript">
              /** https://rxjs.dev/api/operators/map */  
              import { of } from 'rxjs';
              import { map } from 'rxjs/operators';
   
              // ストリームに流れるデータを整形する
              const subscription = of(1, 2, 3).pipe(
                map((text) => `maped!: ${textx}`)
              ).subscribe((text) => console.log(text));
   
              /** Outputs */
              // maped!: 1
              // maped!: 2
              // maped!: 3
            </code>
          </pre>
          <pre v-if="selected === 'switchMap'" key="switchMap">
            <code class="javascript">
              /** https://rxjs.dev/api/operators/switchMap */  
              import { Observable } from 'rxjs';
              import { switchMap } from 'rxjs/operators';
   
              // ストリームAに流れるデータを元に、別のストリームBを流す
              // ※別のストリームBが流れ終わる前に、元のストリームAにデータが流れると、
              //   ストリームBは破棄され、最初からストリームBが流れる
              const subscription = Observable.pipe(
                switchMap((text) => {
                  return interval(1500).pipe(
                    take(3),
                    map((index) => `switchMaped: ${index} => ${text}`)
                  );
                })
              );
   
              /** Outputs */
              // switchMaped: 0 => hoge
              // switchMaped: 1 => hoge
              // switchMaped: 2 => hoge
            </code>
          </pre>
          <pre v-if="selected === 'filter'" key="filter">
            <code class="javascript">
              /** https://rxjs.dev/api/operators/filter */  
              import { Observable } from 'rxjs';
              import { filter } from 'rxjs/operators';
   
              // 条件に合致するデータのみストリームに流す
              const subscription = Observable.pipe(
                filter((text) => text === "test")
              ).subscribe((text) => console.log(text));
   
              /** Outputs */
              // test
            </code>
          </pre>
          <pre v-if="selected === 'throttleTime'" key="throttleTime">
            <code class="javascript">
              /** https://rxjs.dev/api/operators/throttleTime */  
              import { Observable } from 'rxjs';
              import { throttleTime } from 'rxjs/operators';
   
              // 指定時間だけ連続するデータを無視する(指定時間だけデータを間引きストリームにデータを流す)
              const subscription = Observable.pipe(throttleTime(1000)).subscribe(
                (text) => console.log(text)
              );
   
              /** Outputs */
              // test(1000ミリ秒経過ごとに出力される)
            </code>
          </pre>
          <pre v-if="selected === 'debounceTime'" key="debounceTime">
            <code class="javascript">
              /** https://rxjs.dev/api/operators/debounceTime */  
              import { Observable } from 'rxjs';
              import { debounceTime } from 'rxjs/operators';
   
              // 連続するデータの最後のデータを、指定時間経過後に1度だけストリームに流す
              const subscription = Observable.pipe(debounceTime(250)).subscribe(
                (text) => console.log(text)
              );
   
              /** Outputs */
              // test(250ミリ秒経過後1回だけ出力される)
            </code>
          </pre>
          <pre
            v-if="selected === 'distinctUntilChanged'"
            key="distinctUntilChanged"
          >
            <code class="javascript">
              /** https://rxjs.dev/api/operators/distinctUntilChanged */  
              import { Observable } from 'rxjs';
              import { distinctUntilChanged } from 'rxjs/operators';
   
              // 前回と値が異なるデータのみストリームに流す
              const subscription = Observable.pipe(distinctUntilChanged()).subscribe(
                (text) => console.log(text)
              );
   
              /** Outputs */
              // hoge
              // foo
            </code>
          </pre>
          <pre v-if="selected === 'skip'" key="skip">
            <code class="javascript">
              /** https://rxjs.dev/api/operators/skip */  
              import { Observable } from 'rxjs';
              import { skip } from 'rxjs/operators';
   
              // 指定回数のデータをストリームに流さずにスキップする
              const subscription = Observable.pipe(skip(3)).subscribe(
                (text) => console.log(text)
              );
   
              /** Outputs */
              // test(4回送信をクリックした後に出力される)
            </code>
          </pre>
          <pre v-if="selected === 'take'" key="take">
            <code class="javascript">
              /** https://rxjs.dev/api/operators/take */  
              import { Observable } from 'rxjs';
              import { take } from 'rxjs/operators';
   
              // 指定回数のみデータをストリームに流す
              const subscription = Observable.pipe(take(3)).subscribe(
                (text) => console.log(text)
              );
   
              /** Outputs */
              // test
              // test
              // test(これ以降はログが出力されない)
            </code>
          </pre>
        </v-container>
      </v-main>
    </v-app>

    <script src="https://cdn.jsdelivr.net/npm/vue@2.x/dist/vue.js"></script>
    <script src="https://cdn.jsdelivr.net/npm/vuetify@2.x/dist/vuetify.js"></script>
    <script src="https://unpkg.com/rxjs@^7/dist/bundles/rxjs.umd.min.js"></script>
    <script src="https://cdn.jsdelivr.net/gh/highlightjs/cdn-release@11.5.0/build/highlight.min.js"></script>
    <script>
      const { Subject, of, interval, merge } = rxjs;
      const {
        take,
        map,
        switchMap,
        filter,
        throttleTime,
        debounceTime,
        distinctUntilChanged,
        skip,
      } = rxjs.operators;

      new Vue({
        el: "#app",
        vuetify: new Vuetify(),
        data: () => ({
          selected: "nothing",
          items: [
            {
              text: "nothing(「送信」をクリックする事でストリームにデータが流れる様子をデモできます。一部disabledもあります。)",
              value: "nothing",
            },
            { text: "[Creation Operators] of()", value: "of" },
            { text: "[Creation Operators] interval()", value: "interval" },
            { text: "[Join Creation Operators] merge()", value: "merge" },
            { text: "[Transformation Operators] map()", value: "map" },
            {
              text: "[Transformation Operators] switchMap()",
              value: "switchMap",
            },
            {
              text: "[Filtering Operators] filter()",
              value: "filter",
            },
            {
              text: "[Filtering Operators] throttleTime()",
              value: "throttleTime",
            },
            {
              text: "[Filtering Operators] debounceTime()",
              value: "debounceTime",
            },
            {
              text: "[Filtering Operators] distinctUntilChanged()",
              value: "distinctUntilChanged",
            },
            { text: "[Filtering Operators] skip()", value: "skip" },
            { text: "[Filtering Operators] take()", value: "take" },
          ],
          stream: null,
          stream$: null,
          subscription: null,
          counter: 0,
          logs: [],
          message: null,
          messages: [],
        }),
        computed: {
          disabledBtn() {
            return (
              this.selected === "of" ||
              this.selected === "interval" ||
              this.selected === "merge"
            );
          },
        },
        created() {
          this.stream = new Subject();
          this.stream$ = this.stream.asObservable();
          this.subscription = this.stream$.subscribe((text) =>
            this.messages.push(text)
          );
        },
        methods: {
          select(value) {
            this.selected = value;
            setTimeout(() => {
              hljs.highlightAll();
            }, 50);

            this.subscription.unsubscribe();
            this.subscription = this[value]().subscribe((text) => {
              this.messages.push(text);
            });
          },
          push() {
            this.stream.next(this.message || "not input");
          },
          addLog(value) {
            this.logs.push(`${++this.counter}: ${value}`);
          },
          clear() {
            this.logs.splice(0, this.logs.length);
            this.messages.splice(0, this.messages.length);
            this.counter = 0;
          },
          afterEnter(el) {
            this.addLog(el.querySelector("input").value);
            el.style = "display: none;";
          },
          nothing() {
            return this.stream$;
          },
          // https://rxjs.dev/guide/operators#creation-operators-2
          of() {
            return of(10, 20, 30);
          },
          interval() {
            return interval(1000);
          },
          // https://rxjs.dev/guide/operators#join-creation-operators
          merge() {
            return merge(
              interval(1200).pipe(
                take(5),
                map((index) => `a: ${index + 1} / 5`)
              ),
              interval(500).pipe(
                take(10),
                map((index) => `b: ${index + 1} / 10`)
              )
            );
          },
          // https://rxjs.dev/guide/operators#transformation-operators
          map() {
            return this.stream$.pipe(map((text) => `maped!: ${text}`));
          },
          switchMap() {
            return this.stream$.pipe(
              switchMap((text) => {
                return interval(1500).pipe(
                  take(3),
                  map((index) => `switchMaped: ${index} => ${text}`)
                );
              })
            );
          },
          // https://rxjs.dev/guide/operators#filtering-operators
          filter() {
            return this.stream$.pipe(filter((text) => text === "test"));
          },
          throttleTime() {
            return this.stream$.pipe(throttleTime(1000));
          },
          debounceTime() {
            return this.stream$.pipe(debounceTime(250));
          },
          distinctUntilChanged() {
            return this.stream$.pipe(distinctUntilChanged());
          },
          skip() {
            return this.stream$.pipe(skip(3));
          },
          take() {
            return this.stream$.pipe(take(3));
          },
        },
      });
    </script>
  </body>
</html>


《この公式ブロガーの記事一覧》


執筆者プロフィール:Katayama Yuta
SaaS ERPパッケージベンダーにて開発を2年経験。 SHIFTでは、GUIテストの自動化やUnitテストの実装などテスト関係の案件に従事したり、DevOpsの一環でCICD導入支援をする案件にも従事。 昨年に開発部門へ異動し、再び開発エンジニアに。座学で読み物を読むより、色々手を動かして試したり学んだりするのが好きなタイプ。

お問合せはお気軽に
https://service.shiftinc.jp/contact/

SHIFTについて(コーポレートサイト)
https://www.shiftinc.jp/

SHIFTのサービスについて(サービスサイト)
https://service.shiftinc.jp/

SHIFTの導入事例
https://service.shiftinc.jp/case/

お役立ち資料はこちら
https://service.shiftinc.jp/resources/

SHIFTの採用情報はこちら
https://recruit.shiftinc.jp/career/