ストリームでの処理は本当にパフォーマンス(速度面・メモリー面)が優れているのか?検証してみた
はじめに
こんにちは、SHIFTの開発部門に所属している Katayama です。
一般的に、大量データ処理(CSVファイルを読み込みそれを処理するなど)ではストリームが利用されることが多いと思う。そのストリームを利用するメリットとしてはWhy streamsに書かれているような、メモリ効率・時間効率の2つが良く挙げられる。
今回は速度面・メモリ効率面でストリームが本当にいいのか?をNode.jsで検証してみた(データ同期Batchを実装しようとして検証していた都合上、以下の検証ではMySQLからデータを読み取り…という場面での検証になっている)。
※以下の実装ではNode.jsでimport・export(ES6の構文)を使えるようにwebpack × Babelの設定をやってみたにあるような設定で、トランスパイルを実行する事を前提にしている。
なお、Babelによるトランスパイルをせずとも、Determining module systemに書かれているようなpackage.jsonの設定変更でも対応可能である(余談だが、最近ではいくつかのライブラリでPure ESM packageという考え方で、commonJSでは動かないライブラリも出てきており、今後Babelでのトランスパイルをするという流れは変わってくるかもしれない)。
※今回の検証環境は、VirtualBox上のCentOS7.9で、VirtualBoxの設定は以下。
事前準備
まずは大量のデータを用意する。
データ用意の方法は単純にBULK INSERTで2000行をinsertするSQLを500回実行する、というのを3回実行して300万行のデータを作成した。
const mysql = require('mysql2/promise');
const cliProgress = require('cli-progress');
const sql = `INSERT INTO \`texts\` VALUES (NULL,'この文章はダミーです。文字の大きさ、量、字間、行間等を確認するために入れています。この文章はダミーです。文字の大きさ、量、字間、行間等を確認するために入れています。この文章はダミーです。文字の大きさ、量、字間、行間等を確認するために入れています。この文章はダミーです。文字の大きさ、量、字間、行間等を確認するために入れています。この文章はダミーです。文字の大きさ、量、字間、行間等を確認するために入れています。この文章はダミーです。文字の大きさ、量、字間、行間等を確認するために入れています。この文章はダミーで'), \
...
(NULL,'この文章はダミーです。文字の大きさ、量、字間、行間等を確認するために入れています。この文章はダミーです。文字の大きさ、量、字間、行間等を確認するために入れています。この文章はダミーです。文字の大きさ、量、字間、行間等を確認するために入れています。この文章はダミーです。文字の大きさ、量、字間、行間等を確認するために入れています。この文章はダミーです。文字の大きさ、量、字間、行間等を確認するために入れています。この文章はダミーです。文字の大きさ、量、字間、行間等を確認するために入れています。この文章はダミーで');`
const bar = new cliProgress.SingleBar({
format: `CLI Progress |{bar}| {percentage}% || {value}/{total} counts`,
barCompleteChar: '\u2588',
barIncompleteChar: '\u2591',
hideCursor: true
});
const main = async () => {
const connection = await mysql.createConnection(config.get('mysql'));
bar.start(500, 0);
// 2000行×500回=100万行
await Promise.all(
[...Array(500)].map(async () => {
await connection.query(sql);
bar.increment();
})
);
bar.stop();
connection.destroy();
};
(async () => {
await main();
})();
※他にも大量データを用意する方法としてはデータの用意で取り上げた方法などもあるだろう。
ストリームの処理のパフォーマンス
実装
今回はmysql2というライブラリを利用する。
実装方法としてはコメント①やコメント②に書かれている事が参考になる。
import mysql from 'mysql2/promise';
import cliProgress from 'cli-progress';
// 省略
const bar = new cliProgress.SingleBar({
format: `CLI Progress |{bar}| {percentage}% || {value}/{total} counts`,
barCompleteChar: '\u2588',
barIncompleteChar: '\u2591',
hideCursor: true
});
// 省略
const main = async () => {
const hrstart = process.hrtime();
const connection = await mysql.createConnection(config.get('mysql'));
bar.start(3000000, 0);
const readerStream = connection.connection
.query('SELECT * FROM `texts`;')
.stream();
const writerStrem = new stream.Writable({
objectMode: true,
write(data, encoding, callback) {
// await connection.query(...) <- DB insert/update
bar.increment();
callback();
}
});
stream.pipeline(readerStream, writerStrem, (err) => {
if (err) console.error(err);
bar.stop();
const hrend = process.hrtime(hrstart);
console.info(`Execution time : ${hrend} sec`);
connection.destroy();
});
};
(async () => {
await main();
})();
上記の実装について少し補足をする。
stream.pipeline(readerStream, writerStrem, (err) => {...}
この実装は公式に書かれているNode.jsでストリームを実装する際に利用できる関数で、source・destinationと公式の説明にある通り、データの流入口→データの流出口をつなげるもの。
source・destinationのストリームは自分で好きなものを実装でき、Implementing a writable streamやImplementing a readable streamが参考になる。今回はnode-mysql2のストリームを流入口にし、(実際の処理は省略しているが)データの書き込みを行うwriterを流出口のストリームにしている。
※ちなみに、issueのコメントに以下のように書かれている通り、node-mysql2ではpromiseでラッピングしたconnectionのconnectionプロパティにアクセスすると、promiseではないconnectionにアクセスする事ができる。
stream()メソッド自体はpromiseでラッピングされたものには存在しないようなので、今回promiseではないconnectionにアクセスしている。
実行時間とメモリ使用率
上記のようなBatchを実行すると、以下のようなパフォーマンスになる(表の数値は3回実行した平均値であり、動画はその中の1回を映したもの)。
※上記のメモリの使用量の観察は以下のようなwatch コマンドとfreeコマンドを利用している。
study@localhost:~/workspace/learn-mysql (main *)
$ watch --interval 1 --no-title free
ストリームを用いない処理
実装
何も気にしないといったものの、300万行あるテーブルから全行をSELECTしようとすると、以下のようにOut of memoryで強制終了してしまうので、100万行ずつに区切って処理する事にする(以下のPIDはNode.jsのprocess.pidで確認できるもの)。
study@localhost:~/workspace/learn-mysql (main *)
$ node dist/index.js
32581
強制終了
処理の実装としては以下。
// 省略
const main = async () => {
// 省略
try {
bar.start(3000000, 0);
await Promise.all(
[...Array(3)].map(async (_, i) => {
const [rows] = await connection.query(
'SELECT * FROM `texts` LIMIT ? OFFSET ? ;',
[1000000, i * 1000000]
);
await Promise.all(
// eslint-disable-next-line no-unused-vars
rows.map(async (row) => {
// await connection.query(...) <- DB insert/update
bar.increment();
})
);
})
);
bar.stop();
const hrend = process.hrtime(hrstart);
console.info(`Execution time : ${hrend} sec`);
} catch (err) {
console.error(err);
} finally {
connection.destroy();
}
};
(async () => {
await main();
})();
実行時間とメモリ使用率
上記のようなBatchを実行すると、以下のようなパフォーマンスになる(表の数値は3回実行した平均値であり、動画はその中の1回を映したもの)。
まとめとして
上記の比較を表にまとめると以下のようになり、ストリームを用いない処理の方の実装でもPromise.allを利用した実装なため、並列に処理自体は行われるので速度面でのパフォーマンスが悪い実装ではないが、ストリームの処理の方がそうでない処理に比べて速度面・メモリ面で有利である事が分かった(ストリームを用いない処理のPromise.allを利用した実装についてはmap()で await をした場合のパフォーマンスを参照)。
この事から大量データの処理で、どこまで処理されたのか?というポインタのようなものが分かるような処理にしなくていい場合には、ストリームを利用すべきだろう(データ同期の緊急対応Batchの実装では、ストリームではなく、少し速度面で不利だとしても正確に処理できる(シグナル処理に対応し、意図した所まで処理が進んでBatchが止まる)ようにするために、for文でawaitするというESLintでは「forループ内でawaitしたらESLintにtoo heavyweightって言われたから本当なのか試してみた ESLintのno-await-in-loopルールの意味」にあるように怒られる処理の方が良かったりするだろう(場面場面の目的に合わせてどの処理方式を利用するか?検討する必要があるだろう))。
《この公式ブロガーの記事一覧》
お問合せはお気軽に
https://service.shiftinc.jp/contact/
SHIFTについて(コーポレートサイト)
https://www.shiftinc.jp/
SHIFTのサービスについて(サービスサイト)
https://service.shiftinc.jp/
SHIFTの導入事例
https://service.shiftinc.jp/case/
お役立ち資料はこちら
https://service.shiftinc.jp/resources/
SHIFTの採用情報はこちら
https://recruit.shiftinc.jp/career/