Node.jsでシグナル処理の実装をして可視化してみた
はじめに
こんにちは、SHIFTの開発部門に所属している Katayama です。
ストリームでの処理は本当にパフォーマンス(速度面・メモリー面)が優れているのか?検証してみたではストリームを利用したバッチを実装し、そのパフォーマンスを通常の実装との比較で検証してみた。
その記事のまとめでは以下のように書いていた通り、一部のバッチ処理ではストリームを利用すべきでない場合があると触れていた。
バッチ処理では少なくとも処理中に外部から何らかのシグナルが送られてきたときの処理(シグナル処理)を実装する必要がある。
理由としては、バッチ処理はビジネス処理上必要な処理を実行したり、データ不整合の修正に利用されるため、手動実行や何かのトラブルを想定してシグナル処理を実装しておかないと、データ不整合が発生したり、データ修正バッチなのにさらにデータ不整合が出てしまうという事になってしまうため。その意味で最低限、バッチ処理の実装時にはシグナル処理を実装する事が必要になる。
今回はシグナルを受信した時の実際の動きをNode.jsで確認してみたいと思う。
また、緊急対応用のデータ同期バッチなど特にクリティカルな要件(どこまで処理したか?ログに出す、一定データまでは処理が完了してからバッチが終了する、など)のあるバッチ処理で、シグナル処理を実装しようとした時の一番簡単そうな実装例を取り上げている(Windows は扱わず、今回は Linux のみとしている)。
シグナル受信時の動きを可視化してみる
実装について
/* eslint-disable no-unused-vars */
/* eslint-disable no-await-in-loop */
/* eslint-disable no-promise-executor-return */
const { createLogger, format, transports } = require("winston");
const logger = createLogger({
format: format.simple(),
transports: [new transports.File({ filename: "signal_example.log" })],
});
const sleep = (ms) => new Promise((resolve, reject) => setTimeout(resolve, ms));
const main = async () => {
for (let i = 0; i < 100; i += 1) {
logger.info(i);
await sleep(500);
}
};
(async () => {
await main();
})();
console.log(process.pid);
const exit = (signal, value) => {
logger.info(`signal ${signal} received`);
console.log("now shut donw ... please wait");
setTimeout(() => {
logger.info(`process exit by ${signal}`);
process.exit(value);
}, 1000);
};
process.on("SIGHUP", () => {
exit("SIGHUP", 1);
});
process.on("SIGINT", () => {
exit("SIGINT", 2);
});
process.on("SIGTERM", () => {
exit("SIGTERM", 15);
});
上記のように、Node.js ではビルトインのモジュールであるProcessを利用して process.on('SIGINT', () => {...}); のようにリスナーを登録する事ができ、それによりシグナル処理を実装する事ができる(Signal eventsを参照)。
具体的にはシグナルを受信した際、何もシグナル処理を実装していない場合はすぐにプロセスが終了して処理が中断するが、シグナル処理を実装をしている場合には、プロセスが終了する前に任意の処理を挟み、それが完了してからプロセスを終了させる事ができる(上記の実装では、バッチ処理のイメージでシグナル受信後1000ミリ秒待ってからプロセスを終了する、といい実装をしている)。
各シグナルを受信した時の挙動を確認してみると以下の動画の通り(それぞれ意図した通り、シグナルを受信後すぐにプロセスが中断するのではなく、1000 ミリ秒待ってから終了している事が分かる)。
SIGHUP(ターミナルを閉じる)
info: 0
info: 1
info: 2
info: 3
info: 4
info: signal SIGHUP received
info: now shut donw ... please wait
info: 5
info: 6
info: process exit by SIGHUP
※ターミナルを閉じると console.log は表示されないので、今回はファイルに logging できるwinstonというライブラリを利用している。
SIGINT(Ctrl キー + C キー)
info: 0
info: 1
info: 2
info: signal SIGINT received
info: now shut donw ... please wait
info: 3
info: 4
info: process exit by SIGINT
SIGTERM(kill コマンドのデフォルト実行時)
info: 0
info: 1
info: 2
info: 3
info: 4
info: 5
info: signal SIGTERM received
info: now shut donw ... please wait
info: 6
info: 7
info: process exit by SIGTERM
※kill コマンドについては強制終了("kill -KILL {process id}" or "kill -9 {process id}")の場合、リスナー登録はできず即時でプロセスが停止してしまうので注意(強制終了のシグナルは SIGKILL であり、SIGTERM とは別物)。
処理が完了した位置をログに出力したり、一定データまで処理してからバッチが止まるようにする
上記でシグナルを受信して直にバッチ処理を終了させるのではなく、安全に終了させる事はできるようになったので、上記の実装を少し拡張して、以下のような要件も満たせる実装をやってみようと思う。
・中断時に、処理が完了した位置をログに出力させ、どこまで処理が完了しているか?を把握できるようにする
・一定のデータ分で処理を区切り、その処理が完了してからバッチが終了するようにする
実装としては以下のようになるだろう。
import { program, InvalidArgumentError } from "commander";
// 省略
// eslint-disable-next-line no-unused-vars
const validateParseInt = (value, _) => {
const parsed = parseInt(value, 10);
if (Number.isNaN(parsed)) throw new InvalidArgumentError("Not a Number");
if (parsed < 0 || parsed > 1000)
throw new InvalidArgumentError(
"must be greater than 0 and less than or equal 1000"
);
return parsed;
};
program
.requiredOption(
"-bs, --block-size <size>",
"size of the block to be processed",
validateParseInt
)
.parse(process.argv);
const { blockSize } = program.opts();
// 省略
const main = async () => {
const hrstart = process.hrtime();
const connection = await mysql.createConnection(config.get("mysql"));
try {
const [count] = await connection.query(
"SELECT MAX(id) as max FROM `texts`;"
);
const maxId = count.shift().max;
bar.start(Math.floor(maxId / blockSize), 0);
for (let i = 0; i < maxId; i += blockSize) {
const startId = i + 1;
const currentId = i + blockSize;
// eslint-disable-next-line no-await-in-loop
const [rows] = await connection.query(
"SELECT * FROM `texts` LIMIT ? OFFSET ? ;",
[blockSize, startId]
);
// eslint-disable-next-line no-restricted-syntax, no-await-in-loop, no-unused-vars
for await (const row of rows) {
// await connection.query(...) <- DB insert/update
bar.increment();
}
if (receivedSignal) {
console.log(`Processed id is "${currentId}"`);
break;
}
}
bar.stop();
const hrend = process.hrtime(hrstart);
console.info(`Execution time : ${hrend} sec`);
} catch (err) {
console.error(err);
} finally {
connection.destroy();
}
};
(async () => {
await main();
})();
const exit = (signal) => {
bar.stop();
receivedSignal = signal;
console.log(`signal ${signal} received`);
console.log("now shut donw ... please wait");
};
process.on("SIGINT", () => {
exit("SIGINT");
});
実装はなかなか簡易的なものだが、上記でやりたい事は実現できている。以下で少し上記の実装について補足する。
for (let i = 0; i ・中断時に、処理が完了した位置をログに出力させ、どこまで処理が完了しているか?を把握できるようにする
という要件を満たすために敢えて for 文内で await を行っている。本来はfor ループ内で await したら ESLint に too heavyweight って言われたから本当なのか試してみた ESLint の no-await-in-loop ルールの意味にある通り、ESLint で(パフォーマンス上、不利な実装なため)エラーになるが、今回の要件としてどこまで処理されたか?が分かる、一定のデータ分で区切って処理が必ず完了するようにする、というのがあるので、並列処理よりは直列処理の方が要件を安全に満たせるので、このような実装をしている。
if (receivedSignal) {...}
ここの実装が一番のポイントになるが、SIGINT などのシグナルを受け取った場合、receivedSignal に値が入るため、バッチ処理の途中で中断して終了になるようにしている。このシグナル受信チェックを for 文の 1 つ目(データをブロック単位に区切り処理する部分)の終わりに実装する事で、以下の要件を満たせるようにしている。
※上記の実装では SIGINT のみだが、本来は他のシグナル(SIGTERM など)にも対応する必要があるが今回は省略している
まとめとして
今回はバッチ処理の際に考慮すべきシグナル処理について触れ、シグナル受信時の動きを可視化したり、実際にミッションクリティカルな要件のバッチ処理でのシグナル処理の実装をやってみた。
シグナル処理をきちんと実装することで、意図した所まで処理を完了させて、そのあと安全に処理を終了させたり、中断したところからバッチ処理を再開するといったことができるようになるので、バッチ処理においては最低限、シグナル処理を実装する事は重要だと感じた。
《この公式ブロガーの記事一覧》
お問合せはお気軽に
https://service.shiftinc.jp/contact/
SHIFTについて(コーポレートサイト)
https://www.shiftinc.jp/
SHIFTのサービスについて(サービスサイト)
https://service.shiftinc.jp/
SHIFTの導入事例
https://service.shiftinc.jp/case/
お役立ち資料はこちら
https://service.shiftinc.jp/resources/
SHIFTの採用情報はこちら
https://recruit.shiftinc.jp/career/