雑多に技術メモと他色々

主に自分用な技術メモが多くなる気がする。他色々が書かれるかどうかは不明。

SqoopのせいでDBに数値型サロゲートキー(代替キー)をつけておいたほうがいいなと思った話 ※とりあえずMySQL限定

Apache SqoopとMySQLを連携させる場合、数値型サロゲートキーがないと面倒なケースの紹介。ナチュラルキー、サロゲートキーどっちがいいのって話ではない。

環境などの前提

MySQL InnoDBエンジンのテーブルから、Sqoopでデータexportをしたい場合の話。
ナチュラルキー、サロゲートキーって何?という部分には触れない。
きちんとした説明を公開してる場所が多いからググった方がいいです。

簡単な経緯

DBに大量蓄積したデータに対してHadoop処理を適用しよう。
せっかくだしデータ抽出にApache Sqoopを利用しようぜ。

こんな意見が出てデータ抽出を検証してみた。
しかし、データ抽出してみると重複が発生するなど思うように動かないところが出てきて、何が問題なのかという点にハマッてしまった。

結論から書くと、抽出対象のテーブルに数値型のサロゲートキー(もしくはそれに準ずるキー)があるかということには注目しておくべき。無かったとしたら、Sqoopを使うにあたって数値型サロゲートキーを追加するという変更も検討した方がいい。

ここを見ておかないと自分(達)のようにハマるかもしれない。

Sqoopの概要

参考:Apache Sqoop - Wikipedia

Apache Sqoop は関係データベースとHadoopの間でデータ転送を行うためのコマンドラインインターフェースアプリケーションである。

RDBに大量蓄積したデータをHadoopで処理させたいとなった場合、SQLなどでデータを抽出した上で、最低でもHDFS、可能ならHBaseやHiveにデータを取り込みたい、ということになるだろう。

Sqoopはこのデータの抽出や変換部分をいい感じで担当してくれるプロジェクト。環境さえ整えば、特定のRDBテーブルからデータ抽出し、望む形へデータ変換して取り込むところまでをコマンドライン一本でやってくれる。(逆方向もできる)

また、下記のような特徴もある。

  • RDBからのデータ抽出時に自動でテーブル定義を特定してくれる。
  • Sqoop処理はHadoop上のMapタスクとして動作して、タスク数を指定することもできる。タスク数に応じて、SELECT範囲の分散も勝手にやってくれる。

つまり、利用者は詳細なテーブル定義を把握していなくてもちゃんとデータ抽出できるし、タスク数の指定だけで自動的にHadoop上で性能分散もできるのだ。


…というのが理想的な触れ込みである。

データ抽出分散の仕組み

ハマったポイントはデータ抽出を分散する箇所に関連するため、その仕組みを説明する。
Sqoopでのデータ抽出処理の分散は、下記のように実施される。

  1. 抽出対象テーブル主キーのMIN, MAX値を取る。
  2. 上記で抽出したMIN - MAXの中間の値を、分散数に応じて計算する。
  3. MIN, MAX, 中間値を利用したSELECT文を構築し、データを抽出する。

例を書くと、単純なデータで分散数を2にしたときならこんなイメージになる。

1. MIN, MAX値を取る
SELECT MIN(id), MAX(id) FROM test_tbl;

MIN(id) MAX(id)
0 100

2. 中間値を計算する
上記のMIN, MAX値だとこんなデータが出来上がるイメージだ。

split[0] = 0
split[1] = 50
split[2] = 100

3. SELECT文を構築
中間値50を利用して、MIN-MAXを全て抽出するためのSQL2つを構築する。
(1) SELECT * FROM test_tbl WHERE (id => 0) AND (id < 50);
(2) SELECT * FROM test_tbl WHERE (id => 50) AND (id <= 100);


こんな仕組みで動いてるとなると、こういうケースはどうやって利用するんだ?という疑問もいくつか出てくる。

例として、下記のようなものはコマンドオプションで解決することができる。

  • 主キー以外を分散キーとして利用する - --split-by [column-name]
  • データ抽出カラムを絞り込む - --columns [col,col,col…]
  • データ抽出対象をWHERE句で絞り込む - --where [where clause]
  • 1テーブルじゃなくてJOINとかもして抽出する - --query [statement]
  • MIN-MAXのとり方をカスタマイズする - --boundary-query [statement]


しかしながら、そう簡単にいかないケースもある。

ケース1:複合キーで分散

例えばPRIMARY KEY(col1,col2) として複合主キーで分散しているケース。
これはどう処理されるか。

残念ながら複合主キー両方を器用に考慮して計算なんてことはしてくれない。デフォルトの処理に任せると、複合主キーの場合は1カラム目のみが分散キーとして利用される。

このケースで問題になる例は、ある程度の分散数でデータ抽出をしたいのに1カラム目のCardinalityが非常に低い場合。結局、処理がどれだけ分散できるかはカラムのCardinalityに依存してしまう。
1カラム目にはデータが2種類しか入らないのだとしたら、どんなに頑張っても2分割が効率化の限界になってしまうわけだ。

2カラム目のCardinalityが高いならそっちで分割すればいいじゃんってなるかもしれないが、それだとINDEXが効かない。フルスキャンで並列処理するくらいなら、分割やめたほうがよくね?ってなりかねない。

一応DBの複合キーはCardinalityの高いものから貼るべし、という基本ノウハウがあり、これに従っていればある程度避けられるケースではある。

ケース2:文字列キーで分散

使ってみた感じ、一番ハマッてしまった箇所。

例えばCHAR(5)の主キーを使っていたとして、ID000ID100を2分割したときの中間値はどうなるのか。
ID050だと思った人は甘い。IDに続く3桁を数値のように扱っているのは、データ利用者がそのようにデータを挿入しているからに過ぎない。
ID009の次はID010ではなく、ASCIIコード表に従い、ID00:となる。こんな考え方をベースに分割ポイントが計算される。

結局ID000ID100の間は何か?というとSqoopの答えはこう。

Split[0] = ID000
Split[1] = ID0耰0
Split[2] = ID100

実行環境によって表示は変わるかもしれないが、とにかくわけわからない分割が実行されている。

なぜこうなるかという理由を追ってみた。
文字列の分割計算処理はorg.apache.sqoop.mapreduce.db.TextSplitterで行われている。gitから落としたソースを読んだ限り、分割計算はこんな流れで実施されている。

  1. MINの文字列とMAXの文字列の先頭から共通している部分をPREFIXとする。
  2. PREFIXに続く文字を8文字まで抽出し、BigDecimalに変換する。
  3. BigDecimalの値をベースにして、数値で分割ポイントを計算する。
  4. 分割ポイントの計算結果を、BigDecimalから文字列に戻す。

文字をBigDecimalに変換ってどう処理しているか気になるが、TextSplitterのjavadocにはこんなことが書かれている。

The algorithm used is as follows: Since there are 2**16 unicode characters, we interpret characters as digits in base 65536.

ソースの処理も含めて読み解き切れていないのだが、文字列を65536を基数とした数値に変換してから計算するってことだろうか。

単純に数値で分割できないためか、これを使うと色々問題が発生することもある。
以降に文字列キーで分散を試みた場合の問題点の例(嫌疑含む)。

文字列キー分散の問題1:DBが大文字小文字の区別をしない場合

これはまだわかりやすいケース。
このことはご丁寧にWARNログでも出してくれる。

Generating splits for a textual index column.
If your database sorts in a case-insensitive order, this may result in a partial import or duplicate records.
You are strongly encouraged to choose an integral split column.

大文字小文字の区別をしないでソートされるDBの場合、データの重複や欠損が発生する場合があるってこと。
例えばMINが0Z, MAXが0zなら中間値は0jになる。これでデータ抽出をすると下記の範囲になる。

(1) seq >= "0Z" AND seq < "0j"
(2) seq >= "0j" AND seq <= "0z"

DBが大文字小文字の区別をしていない場合、上記範囲では"0a" - "0i"が抽出対象から漏れる、といったことが起こり得る。*1

このように、開発側が公式にTextSplitterが正しく適用できないケースを例示しているのだが、実際の所in a case-insensitive orderではないのにデータ重複や欠損が発生してしまうケースがあるようだ。
以降、この問題以外でも動作が怪しいと思われるケースを記載。

文字列キー分散の問題2:3バイト文字, 4バイト文字

公式に情報が得られていないので嫌疑レベルの話。
3バイト文字, 4バイト文字を含むエンコーディングの場合にちゃんと動くのかという点。

これは文字列分割をするときに65536を基数として計算しているところが怪しい。
65536分割って2バイト表現が限界じゃね…?その範囲で扱える文字でしか正常に処理できないのではないかという気がしてしまう。

3バイト文字, 4バイト文字が入ったカラムを分割キーすることは少ないと思うが、たとえ分割キーのカラムASCIIオンリーだったとしてもSqoopがそれを判断するわけじゃない。
テーブルの文字コードが2バイトエンコードの種別にでもなっていない限り、動作は怪しい気がする。

文字列キー分散の問題3:制御文字やエスケープシーケンス

文字列分割の結果だが、どうも制御文字0x1B ESCエスケープシーケンス0x5C \も選択されるようだ。

TextSplitterのテストケースをベースにして下記のように処理させてみると、制御文字なども分割結果のsplitsに含まれることがわかる。

    TextSplitter splitter = new TextSplitter();
    List<String> splits = splitter.split(65536, "00", "10", "PRE_");


こうなると、制御文字をSQLに送った時にちゃんと処理されるかといったことや、偶然エスケープシーケンス含めて\nといったシーケンスが分割文字列に含まれてしまっても大丈夫なのかって話が出てきてしまう。
問題になるケースってかなりパターンがありそうで、漏れ無く対応するのは難しいんじゃないかという印象。処理の穴も残っているのではないかという疑いがある。*2

解決方法

記事タイトルにもあるのだが、処理分割で問題を発生させないためにはBIGINT AUTO_INCREMENTあたりでサロゲートキーを貼っておいて、それを分割キーとして利用するのが単純、かつ分割処理も活用できる解決方法だと思っている。
文字列型を分割キーにした時のTextSplitterはどうにも挙動が謎で、使われる場合の処理妥当性が判断しにくい。

データ重複欠損を発生させないためには下記のような手も使えるが、性能などに制約がかかってしまう。

処理の分割数を1にする
分散できないならSqoopじゃなくていいじゃん…当然だが、性能スケールアウトも捨てることになる。
キー以外で今ある数値型のカラムで代替する
分散の妥当性が微妙だし、そもそもあるかどうかわからん。INDEXが貼ってなければ超低性能。
RANK関数を噛ませて処理する
ググるとこんな意見もあるが他のDBMSの話。MySQLにRANK関数はない。

まとめ

  • SqoopとMySQLを連携させて分散処理したいなら、数値型のキーが使えるかどうか要チェック。
  • 数値型キーがないなら、追加できないか検討した方がいい。
  • 文字列型を分割計算するTextSplitterは厄介。中途半端なコードポイントも普通に選択される。
  • メタ文字の扱いが怪しいので、TextSplitterでの分割はまだ実用レベルじゃない気がする。
  • ぶっちゃけTextSplitterほど多数の文字を考慮しない、AlphaNumSplitterとかが欲しい。(希望)

参考資料

GitHub - apache/sqoop: Mirror of Apache Sqoop
branch-1.4.6
Sqoop User Guide (v1.4.2)

ソースとユーザガイドのバージョン合ってないけど勘弁して下さい。

*1:そもそもDBが大文字小文字の区別をしないなら、MINが"0Z"でMAXが"0z"とはならないはず。とりあえずここでは例の簡略化のため、強引にそうなったものとする。

*2:PreparedStatementでバインド変数にすればある程度解決できるはずだけど、ソース見るとそれすらやってなさそうなのよね…