koukiblog

たぶんweb系の話題

MySQLのテーブルをEmbulkを利用してBigQueryに転送する

MySQLのデータをBigQueryに転送する方法はいくつかありますが、Embulkを利用してMySQLから直接データを取得してBigQueryに流し込んでみました。 別途Fluentdで集めているアクセスログRDBのデータを突き合わせて分析するのが目的です。

Embulkとは

TreasureData製のDataLoaderです。

Embulk は、リアルタイムなログ収集では常識となった fluentd のバッチ版のようなツールで、ファイルやデータベースからデータを吸い出し、別のストレージやデータベースにロードするためのコンパクトなツールです。

とのことです。( 並列データ転送ツール『Embulk』リリース! - Blog by Sadayuki Furuhashi に記載あり)

fluentdのようにプラグインを追加することで様々なデータソースに対応することができます。

Embulkのセットアップ

インストールは↓に記載があります。Javaで開発されているため、Javaが必要です。 GitHub - embulk/embulk: Embulk: Pluggable Bulk Data Loader. http://www.embulk.org

MySQL→Embulk→BigQuery という流れになるので、inがMySQL、outがBigQueryになります。それぞれプラグインが存在しているのでそれを利用します。

それぞれ、

embulk-input-mysql

https://github.com/embulk/embulk-input-jdbc/tree/master/embulk-input-mysql

embulk-out-bugquery

GitHub - embulk/embulk-output-bigquery: Embulk output plugin to load/insert data into Google BigQuery

を利用します。

インストールは、

embulk gem install embulk-out-bigquery

のように行えます。

embulk-input-mysql

embulkを実行するサーバからMySQLに接続できるようになっている必要があります。 inputプラグインなので、inセクションに記述します。 下記のように設定します。

 in:
   type: mysql
   user: "MySQLのユーザー"
   password: "MySQLのパスワード"
   database: "データベース名"
   table: "テーブル名"
   host: "MySQLのホスト"
   select: "セレクトするカラム(カンマ区切り)"
   parser:
    type: json

parserがjsonになっているのは、BigQueryに転送するときにこちらの方が都合がよいからです。(デフォルトはCSV) CSVの場合、データ内に改行コードが含まれている場合の考慮が必要なのですが、jsonであればエスケープされるため楽になります。BigQueryにCSVをimportするときは行の区切りがCRLFでないといけないようで、データ内にCRLFが存在していた場合、CSVだとうまく取り込むことができませんでした(しっかり検証したわけではないので、設定次第では取り込めるかもしれません) embulk-input-mysql のオプションでwhere句を追加したり、select後にupdate文を実行することもできます。

embulk-output-bigquery

embulkを実行するサーバからBigQueryに接続できるようになっている必要があります。 outputプラグインなので、outセクションに記述します。 下記のように設定します。

out:
  type: bigquery
  mode: replace
  auth_method: json_key
  json_keyfile: "json_keyのパス"
  path_prefix: tmp/
  file_ext: .jsonl.gz
  source_format: NEWLINE_DELIMITED_JSON
  project: "プロジェクトID"
  dataset: "データセット名"
  auto_create_table: true
  table: "テーブル名"
  schema_file: "スキーマファイルのパス"
  formatter: {type: jsonl}
  encoders:
  - {type: gzip}

modeをreplaceにすると、BigQuery側のテーブルをクリアして再生成されます。毎回全件を転送する想定だったのでreplaceにしてあります。 replace以外にもいくつかあります( https://github.com/embulk/embulk-output-bigquery#mode )が、ログデータでなければ追記していく場合はあまりないと思うので、基本的にはreplaceでよいと思います。 前述の通り、MySQLから取得したデータはjsonにしているので、formatterとsource_formatをJSONに設定します。 また、BigQuery側テーブルのスキーマを定義するファイルを別途用意する必要があります。

[
  {
    "name": "id",
    "type": "INTEGER"
  },
 ....
]

のようにカラム名と型を記述していきます。ここのnameとinput-mysqlのselect オプションで指定したカラム名は一致している必要があるようです。

ここまでで、 in/out 共に設定が終わっているので、embulkを実行するとMySQLからデータを取得し、BigQueryにデータが転送されますと思います。

設定ファイルの共通化

MySQLのテーブルごとに設定ファイルを記述していくのですが、上記の設定方法では、MySQLへの接続情報、BigQUeryへの接続情報など重複する部分が出てきます。 Embulkの設定ファイルでは実はLiquidというテンプレートエンジン( – Liquid template language ) が利用できるのでそれを利用して共通化します。

注意: 0.8.28, 0.8.29では、includeができなくなっているようです( Embulk 0.8.28 and 0.8.29 doesn't work Liquid {% include %}. · Issue #757 · embulk/embulk · GitHub)。0.8.18を利用しましょう。embulk selfupdate 0.8.18 でダウングレードすることできます。→ 0.8.30 で解消されました

↓にあるように、includeを利用して共通部分を別ファイルに切り出すことが可能です。 qiita.com 僕の場合は、

in:
  type: mysql
  user: "ユーザ名"
  password: "パスワード"
  database: "データベース名"
  table: {{ table_name }}
  host: "ホスト"
  select: {{ select_columns }}
  parser:
   type: json
out:
  type: bigquery
  mode: replace
  auth_method: json_key
  json_keyfile: "keyfileのパス"
  path_prefix: tmp/
  file_ext: .jsonl.gz
  source_format: NEWLINE_DELIMITED_JSON
  project: "プロジェクト名"
  dataset: "データセット名" 
  auto_create_table: true 
  table: {{ bq_table_name }}
  schema_file: bq_schemas/{{ bq_schema_name }}.json
  formatter: {type: jsonl}
  encoders: 
  - {type: gzip}

という共通で利用する設定ファイルを作成し、各設定ファイルで下記のようにincludeして利用するようにしました

{% include 'partials/mysql2bq',
   table_name: 'MySQLのテーブル名',
   select_columns: "セレクトするカラム",
   bq_table_name: 'BigQueryのテーブル名',
   bq_schema_name: 'スキーマファイルのパス'
  %}

ステージング環境、本番環境向けの接続設定の切り替えもLiquidを利用して分岐することもできるようなのですが、僕の場合はそこはchefでやることにしました。yamlファイルを生成すればよいだけなのでいろいろやり方はあると思います。 twitterで聞いてみたところyaml_masterもよく使われているみたいです。

特定のディレクトリの設定ファイルを一括で実行する

embulk の runコマンドにはワイルドカードを指定できなかったので↓のようにしました

ls configs/*.yml.liquid | xargs -I {} embulk run {}

まとめ

プラグインインストールしてちょっと設定するだけで動くのでよい感じでした。 input-mysql, output-bigquery 共にいろいろオプションがあるのですが、数万件程度のテーブルをBigQueryに転送する用途であればこのままで使えています。 まだ分析がきちんと立ち上がっているわけではないので、cronの定期実行&失敗はcronのログで検知で進めてますが、今後はDigDag,Airflowなどのワークフローエンジンの導入を検討しようと思ってます あと、他にはAWSのDataPipelineも検討したのですが、GUIが難解でしかも重すぎるので挫折しました。。