MySQLのテーブルをEmbulkを利用してBigQueryに転送する
MySQLのデータをBigQueryに転送する方法はいくつかありますが、Embulkを利用してMySQLから直接データを取得してBigQueryに流し込んでみました。 別途Fluentdで集めているアクセスログとRDBのデータを突き合わせて分析するのが目的です。
Embulkとは
TreasureData製のDataLoaderです。
Embulk は、リアルタイムなログ収集では常識となった fluentd のバッチ版のようなツールで、ファイルやデータベースからデータを吸い出し、別のストレージやデータベースにロードするためのコンパクトなツールです。
とのことです。( 並列データ転送ツール『Embulk』リリース! - Blog by Sadayuki Furuhashi に記載あり)
fluentdのようにプラグインを追加することで様々なデータソースに対応することができます。
Embulkのセットアップ
インストールは↓に記載があります。Javaで開発されているため、Javaが必要です。 GitHub - embulk/embulk: Embulk: Pluggable Bulk Data Loader. http://www.embulk.org
MySQL→Embulk→BigQuery という流れになるので、inがMySQL、outがBigQueryになります。それぞれプラグインが存在しているのでそれを利用します。
それぞれ、
embulk-input-mysql
https://github.com/embulk/embulk-input-jdbc/tree/master/embulk-input-mysql
embulk-out-bugquery
を利用します。
インストールは、
embulk gem install embulk-out-bigquery
のように行えます。
embulk-input-mysql
embulkを実行するサーバからMySQLに接続できるようになっている必要があります。 inputプラグインなので、inセクションに記述します。 下記のように設定します。
in: type: mysql user: "MySQLのユーザー" password: "MySQLのパスワード" database: "データベース名" table: "テーブル名" host: "MySQLのホスト" select: "セレクトするカラム(カンマ区切り)" parser: type: json
parserがjsonになっているのは、BigQueryに転送するときにこちらの方が都合がよいからです。(デフォルトはCSV) CSVの場合、データ内に改行コードが含まれている場合の考慮が必要なのですが、jsonであればエスケープされるため楽になります。BigQueryにCSVをimportするときは行の区切りがCRLFでないといけないようで、データ内にCRLFが存在していた場合、CSVだとうまく取り込むことができませんでした(しっかり検証したわけではないので、設定次第では取り込めるかもしれません) embulk-input-mysql のオプションでwhere句を追加したり、select後にupdate文を実行することもできます。
embulk-output-bigquery
embulkを実行するサーバからBigQueryに接続できるようになっている必要があります。 outputプラグインなので、outセクションに記述します。 下記のように設定します。
out: type: bigquery mode: replace auth_method: json_key json_keyfile: "json_keyのパス" path_prefix: tmp/ file_ext: .jsonl.gz source_format: NEWLINE_DELIMITED_JSON project: "プロジェクトID" dataset: "データセット名" auto_create_table: true table: "テーブル名" schema_file: "スキーマファイルのパス" formatter: {type: jsonl} encoders: - {type: gzip}
modeをreplaceにすると、BigQuery側のテーブルをクリアして再生成されます。毎回全件を転送する想定だったのでreplaceにしてあります。 replace以外にもいくつかあります( https://github.com/embulk/embulk-output-bigquery#mode )が、ログデータでなければ追記していく場合はあまりないと思うので、基本的にはreplaceでよいと思います。 前述の通り、MySQLから取得したデータはjsonにしているので、formatterとsource_formatをJSONに設定します。 また、BigQuery側テーブルのスキーマを定義するファイルを別途用意する必要があります。
[ { "name": "id", "type": "INTEGER" }, .... ]
のようにカラム名と型を記述していきます。ここのnameとinput-mysqlのselect オプションで指定したカラム名は一致している必要があるようです。
ここまでで、 in/out 共に設定が終わっているので、embulkを実行するとMySQLからデータを取得し、BigQueryにデータが転送されますと思います。
設定ファイルの共通化
MySQLのテーブルごとに設定ファイルを記述していくのですが、上記の設定方法では、MySQLへの接続情報、BigQUeryへの接続情報など重複する部分が出てきます。 Embulkの設定ファイルでは実はLiquidというテンプレートエンジン( – Liquid template language ) が利用できるのでそれを利用して共通化します。
注意: 0.8.28, 0.8.29では、includeができなくなっているようです( Embulk 0.8.28 and 0.8.29 doesn't work Liquid {% include %}. · Issue #757 · embulk/embulk · GitHub)。0.8.18を利用しましょう。embulk selfupdate 0.8.18 でダウングレードすることできます。→ 0.8.30 で解消されました
↓にあるように、includeを利用して共通部分を別ファイルに切り出すことが可能です。 qiita.com 僕の場合は、
in: type: mysql user: "ユーザ名" password: "パスワード" database: "データベース名" table: {{ table_name }} host: "ホスト" select: {{ select_columns }} parser: type: json out: type: bigquery mode: replace auth_method: json_key json_keyfile: "keyfileのパス" path_prefix: tmp/ file_ext: .jsonl.gz source_format: NEWLINE_DELIMITED_JSON project: "プロジェクト名" dataset: "データセット名" auto_create_table: true table: {{ bq_table_name }} schema_file: bq_schemas/{{ bq_schema_name }}.json formatter: {type: jsonl} encoders: - {type: gzip}
という共通で利用する設定ファイルを作成し、各設定ファイルで下記のようにincludeして利用するようにしました
{% include 'partials/mysql2bq', table_name: 'MySQLのテーブル名', select_columns: "セレクトするカラム", bq_table_name: 'BigQueryのテーブル名', bq_schema_name: 'スキーマファイルのパス' %}
ステージング環境、本番環境向けの接続設定の切り替えもLiquidを利用して分岐することもできるようなのですが、僕の場合はそこはchefでやることにしました。yamlファイルを生成すればよいだけなのでいろいろやり方はあると思います。 twitterで聞いてみたところyaml_masterもよく使われているみたいです。
皆さんyaml_masterなどのツールをつかってそのようにされているようです。https://t.co/GuS56DRHjt あとはhttps://t.co/DWu3RAOXNT と連携とかですかね。
— hiroyuki sato (@hiroysato) 2017年8月8日
特定のディレクトリの設定ファイルを一括で実行する
embulk の runコマンドにはワイルドカードを指定できなかったので↓のようにしました
ls configs/*.yml.liquid | xargs -I {} embulk run {}
まとめ
プラグインインストールしてちょっと設定するだけで動くのでよい感じでした。 input-mysql, output-bigquery 共にいろいろオプションがあるのですが、数万件程度のテーブルをBigQueryに転送する用途であればこのままで使えています。 まだ分析がきちんと立ち上がっているわけではないので、cronの定期実行&失敗はcronのログで検知で進めてますが、今後はDigDag,Airflowなどのワークフローエンジンの導入を検討しようと思ってます あと、他にはAWSのDataPipelineも検討したのですが、GUIが難解でしかも重すぎるので挫折しました。。