koukiblog

たぶんweb系の話題

MySQLのテーブルをEmbulkを利用してBigQueryに転送する

MySQLのデータをBigQueryに転送する方法はいくつかありますが、Embulkを利用してMySQLから直接データを取得してBigQueryに流し込んでみました。 別途Fluentdで集めているアクセスログRDBのデータを突き合わせて分析するのが目的です。

Embulkとは

TreasureData製のDataLoaderです。

Embulk は、リアルタイムなログ収集では常識となった fluentd のバッチ版のようなツールで、ファイルやデータベースからデータを吸い出し、別のストレージやデータベースにロードするためのコンパクトなツールです。

とのことです。( 並列データ転送ツール『Embulk』リリース! - Blog by Sadayuki Furuhashi に記載あり)

fluentdのようにプラグインを追加することで様々なデータソースに対応することができます。

Embulkのセットアップ

インストールは↓に記載があります。Javaで開発されているため、Javaが必要です。 GitHub - embulk/embulk: Embulk: Pluggable Bulk Data Loader. http://www.embulk.org

MySQL→Embulk→BigQuery という流れになるので、inがMySQL、outがBigQueryになります。それぞれプラグインが存在しているのでそれを利用します。

それぞれ、

embulk-input-mysql

https://github.com/embulk/embulk-input-jdbc/tree/master/embulk-input-mysql

embulk-out-bugquery

GitHub - embulk/embulk-output-bigquery: Embulk output plugin to load/insert data into Google BigQuery

を利用します。

インストールは、

embulk gem install embulk-out-bigquery

のように行えます。

embulk-input-mysql

embulkを実行するサーバからMySQLに接続できるようになっている必要があります。 inputプラグインなので、inセクションに記述します。 下記のように設定します。

 in:
   type: mysql
   user: "MySQLのユーザー"
   password: "MySQLのパスワード"
   database: "データベース名"
   table: "テーブル名"
   host: "MySQLのホスト"
   select: "セレクトするカラム(カンマ区切り)"
   parser:
    type: json

parserがjsonになっているのは、BigQueryに転送するときにこちらの方が都合がよいからです。(デフォルトはCSV) CSVの場合、データ内に改行コードが含まれている場合の考慮が必要なのですが、jsonであればエスケープされるため楽になります。BigQueryにCSVをimportするときは行の区切りがCRLFでないといけないようで、データ内にCRLFが存在していた場合、CSVだとうまく取り込むことができませんでした(しっかり検証したわけではないので、設定次第では取り込めるかもしれません) embulk-input-mysql のオプションでwhere句を追加したり、select後にupdate文を実行することもできます。

embulk-output-bigquery

embulkを実行するサーバからBigQueryに接続できるようになっている必要があります。 outputプラグインなので、outセクションに記述します。 下記のように設定します。

out:
  type: bigquery
  mode: replace
  auth_method: json_key
  json_keyfile: "json_keyのパス"
  path_prefix: tmp/
  file_ext: .jsonl.gz
  source_format: NEWLINE_DELIMITED_JSON
  project: "プロジェクトID"
  dataset: "データセット名"
  auto_create_table: true
  table: "テーブル名"
  schema_file: "スキーマファイルのパス"
  formatter: {type: jsonl}
  encoders:
  - {type: gzip}

modeをreplaceにすると、BigQuery側のテーブルをクリアして再生成されます。毎回全件を転送する想定だったのでreplaceにしてあります。 replace以外にもいくつかあります( https://github.com/embulk/embulk-output-bigquery#mode )が、ログデータでなければ追記していく場合はあまりないと思うので、基本的にはreplaceでよいと思います。 前述の通り、MySQLから取得したデータはjsonにしているので、formatterとsource_formatをJSONに設定します。 また、BigQuery側テーブルのスキーマを定義するファイルを別途用意する必要があります。

[
  {
    "name": "id",
    "type": "INTEGER"
  },
 ....
]

のようにカラム名と型を記述していきます。ここのnameとinput-mysqlのselect オプションで指定したカラム名は一致している必要があるようです。

ここまでで、 in/out 共に設定が終わっているので、embulkを実行するとMySQLからデータを取得し、BigQueryにデータが転送されますと思います。

設定ファイルの共通化

MySQLのテーブルごとに設定ファイルを記述していくのですが、上記の設定方法では、MySQLへの接続情報、BigQUeryへの接続情報など重複する部分が出てきます。 Embulkの設定ファイルでは実はLiquidというテンプレートエンジン( – Liquid template language ) が利用できるのでそれを利用して共通化します。

注意: 0.8.28, 0.8.29では、includeができなくなっているようです( Embulk 0.8.28 and 0.8.29 doesn't work Liquid {% include %}. · Issue #757 · embulk/embulk · GitHub)。0.8.18を利用しましょう。embulk selfupdate 0.8.18 でダウングレードすることできます。

↓にあるように、includeを利用して共通部分を別ファイルに切り出すことが可能です。 qiita.com 僕の場合は、

in:
  type: mysql
  user: "ユーザ名"
  password: "パスワード"
  database: "データベース名"
  table: {{ table_name }}
  host: "ホスト"
  select: {{ select_columns }}
  parser:
   type: json
out:
  type: bigquery
  mode: replace
  auth_method: json_key
  json_keyfile: "keyfileのパス"
  path_prefix: tmp/
  file_ext: .jsonl.gz
  source_format: NEWLINE_DELIMITED_JSON
  project: "プロジェクト名"
  dataset: "データセット名" 
  auto_create_table: true 
  table: {{ bq_table_name }}
  schema_file: bq_schemas/{{ bq_schema_name }}.json
  formatter: {type: jsonl}
  encoders: 
  - {type: gzip}

という共通で利用する設定ファイルを作成し、各設定ファイルで下記のようにincludeして利用するようにしました

{% include 'partials/mysql2bq',
   table_name: 'MySQLのテーブル名',
   select_columns: "セレクトするカラム",
   bq_table_name: 'BigQueryのテーブル名',
   bq_schema_name: 'スキーマファイルのパス'
  %}

ステージング環境、本番環境向けの接続設定の切り替えもLiquidを利用して分岐することもできるようなのですが、僕の場合はそこはchefでやることにしました。yamlファイルを生成すればよいだけなのでいろいろやり方はあると思います。 twitterで聞いてみたところyaml_masterもよく使われているみたいです。

特定のディレクトリの設定ファイルを一括で実行する

embulk の runコマンドにはワイルドカードを指定できなかったので↓のようにしました

ls configs/*.yml.liquid | xargs -I {} embulk run {}

まとめ

プラグインインストールしてちょっと設定するだけで動くのでよい感じでした。 input-mysql, output-bigquery 共にいろいろオプションがあるのですが、数万件程度のテーブルをBigQueryに転送する用途であればこのままで使えています。 まだ分析がきちんと立ち上がっているわけではないので、cronの定期実行&失敗はcronのログで検知で進めてますが、今後はDigDag,Airflowなどのワークフローエンジンの導入を検討しようと思ってます あと、他にはAWSのDataPipelineも検討したのですが、GUIが難解でしかも重すぎるので挫折しました。。

RailsからRDSを利用するときの設定

RailsからRDSを利用する場合、そのままコネクションプーリングを行うと、RDSのfailover時に利用していないサーバに接続してしまいます。

コネクションプーリングを止めてる事例( http://blog.livedoor.jp/sonots/archives/38797925.html とか ) がいくつか見つかりますが、ここで利用されているgem ( https://github.com/sonots/activerecord-refresh_connection ) は、マルチスレッドに対応していないため、アプリケーションサーバにpumaを利用している場合、このgemを使うことはできません。

そこで、なんとかできないかなーと調べてみたところ、ActiveRecordにreaping_frequencyという設定があるのを発見しました。
rails/connection_pool.rb at 9337f646902955a76dd1975b1201032042ed5004 · rails/rails · GitHub
この設定を行うと、設定した値(秒)を過ぎたコネクションはプールから削除されるようになります。
database.ymlに↓を追記すればokです。

  reaping_frequency: 10

設定してみたところサーバに接続がない状態だと、DBへの接続数が減っていることを確認できました。
RDSがfailoverしてendpointのIPが変わった場合でも、しばらく待てば切り替わるはずです。
ほとんどの場合これでなんとかなるんじゃないでしょうか。

Rails5.1からidカラムがbigintになるのでその対応

Rails5.1からidカラムがbigintになるのでその対応メモ。
↓は対応した人のエントリです。bigintが必要になってから変更するのでは間に合わないので最初から設定しちゃおうみたいな感じですね。
Rails 5.1: Default Primary Keys Are Now BIGINT

このPR(Restore the behaviour of the compatibility layer for integer-like PKs by kamipo · Pull Request #27389 · rails/rails · GitHub) にもあるように互換性には気を使って開発してくれていますが、schema.rbは migrate:reset タスクを実施するなどして再生成する必要があるみたいです。

schema.rbが下記のようになっていたとして、

create_table "users" do |t|
  t.string   "name",       null: false

create_table "addresses" do |t|
  t.integer  "user_id"
  t.string   "name",       null: false

 add_foreign_key "addresses", "users"

5.0以前では問題なくテーブルの作成と外部キーの設定が行えていましたが、5.1ではエラーになります。
それはusers,addressテーブルのidカラムがbigintになりますが、外部キーを設定しようとしているaddressesテーブルのuser_idカラムはint, usersテーブルのidカラムはbigintと差異が出てしまうからです。
migrateファイルが正しく管理されている場合は、 migrate:resetタスクを実行することで、Rails5.1でも以前のDB定義を維持することができるschema.rbを出力することができます。

再生成すると以下のようになります。

create_table "users", id: :integer  do |t|
  t.string   "name",       null: false

create_table "addresses", id: :integer do |t|
  t.integer  "user_id"
  t.string   "name",       null: false

 add_foreign_key "addresses", "users"

id: :integerが各テーブルに付与され、エラーが解決されます。

schema.rbを以前のDB定義を維持できるように変更し、そのあとで、必要に応じて各テーブルのidカラムをbigintにしていくのが良いんじゃないかなーと思います。

pm2でhubotを起動するとき

pm2でhubotを起動する場合、

 pm2 start -f bin/hubot -x --name botname -- -a adapter

のように紹介されている記事が多かったのだけど、bin/hubotがJavascriptとして解釈されてしまいうまくいかない。

 pm2 start -f bin/hubot.sh -x --name botname -- -a adapter

としてみると、無事起動。

拡張子で判断しているみたいだったので、ソースを見てみるとそれっぽい記述を見つけたのでたぶんそうなんだと思う

https://github.com/Unitech/pm2/blob/1c3fcb83b65e430a5734d576610d27cc7a0bf4fd/lib/Common.js#L355

  var noInterpreter     = (!app.exec_interpreter || 'none' == app.exec_interpreter),
      extName           = path.extname(app.pm_exec_path),
      betterInterpreter = extItps[extName];

のextItpsはintpreter.jsonで定義されていて以下のような内容になっている
https://github.com/Unitech/pm2/blob/3f5641a67d13b746b861dd2a491206df2eb50cca/lib/API/interpreter.json

拡張子がshだとbashで解釈される。何もないとnodeになる様子。

sprockets-commonerについて

sprocket-commonerというgemがよい感じだったので調べてみました。

sprocket-commonerで何ができるのかは下記qiitaに詳しくかいてあって、僕もこの記事で知りました。
qiita.com

結局のところ
・ES6構文とmoduleのimport/export
・npmモジュール
の2つが使えればよいと思っていたので、ちょうどよい感じです。

sprocket-commonerがなにをやっているのかをざっと眺めてみました。

sprockets-commoner/commoner.rb at master · Shopify/sprockets-commoner · GitHub

module Sprockets
  module Commoner
    def self.sprockets4?
      @@sprockets4 ||= Gem::Version.new(Sprockets::VERSION) >= Gem::Version.new('4.0.0.beta')
    end
  end

  register_postprocessor 'application/javascript', ::Sprockets::Commoner::Processor
  register_transformer 'application/json', 'application/javascript', ::Sprockets::Commoner::JSONProcessor
  register_bundle_metadata_reducer 'application/javascript', :commoner_enabled, false, :|
  register_bundle_metadata_reducer 'application/javascript', :commoner_required, Set.new, :+
  register_bundle_metadata_reducer 'application/javascript', :commoner_used_helpers, Set.new, :+
  register_bundle_processor 'application/javascript', ::Sprockets::Commoner::Bundle
  register_dependency_resolver 'commoner-environment-variable' do |env, str|
    _, variable = str.split(':', 2)
    ENV[variable]
  end
end

Sprockets::Commner::Processor が何かやっていそうです

sprockets-commoner/processor.rb at master · Shopify/sprockets-commoner · GitHub

Schmooze::Baseというクラスを継承しているのですが、
Schmoozeというgemは何をやっているかというと
schmooze/base.rb at master · Shopify/schmooze · GitHub

      def spawn_process
        process_data = Open3.popen3(
          @_schmooze_env,
          'node',
          '-e',
          @_schmooze_code,
          chdir: @_schmooze_root
        )
        ensure_packages_are_initiated(*process_data)
        ObjectSpace.define_finalizer(self, self.class.send(:finalize, *process_data))
        @_schmooze_stdin, @_schmooze_stdout, @_schmooze_stderr, @_schmooze_process_thread = process_data
      end

となっていて、子プロセスでnodeを起動させてそこでJSを実行しているようです。

Sprocketのcompile時に子プロセスでnode.jsを起動して、JSをトランスコンパイルしています。
ExecJSへの依存はありません。

importの辺りはSprocket::Commonerr::Bundleとbabel-plugin-sprockets-commoner-internalで頑張っているみたい。


まとめ

  • Sprocketのcompile時に子プロセスでnode.jsを起動して、JSをトランスコンパイルしてる

 特別なことをせずにnpmモジュールが使える

  • ExecJSへの依存はない

逆にいうと、ExecJSではないので、rubyracerなどは使えない。
  ファイルずつnode起動していくので、パフォーマンスの懸念はあるみたいです。
github.com

  • coffeescriptのimportなど特別な機能を使わなければロックインされることはない

Rails5.1から、SprocketにES6、npmモジュールへの対応が入るし、Webpackerも出るので、5.1が出るまではsprocket-commoner、5.1がリリースされたらRailsデフォルトに戻るのがよいんじゃないかなーと思ってます。

sudoするとdocker-composeがcommand not found

現象:
docker-composeをインストールしているとき、sudo したときだけコマンドが見つからない。

原因:
visudo で編集できる secure_path に/usr/local/bin が含まれていない。
docker-composeは /usr/local/bin 配下にインストールされる

対応:

Defaults    secure_path = /sbin:/bin:/usr/sbin:/usr/bin

に/usr/local/binを追加

Defaults    secure_path = /sbin:/bin:/usr/sbin:/usr/local/bin:/usr/bin

何回もやって都度忘れてたのでかいとく

ELB作成時にSSL証明書インストールでエラー

コンソールから

Failed to create load balancer: Server Certificate not found for the key: 

ELBの作成とSSL証明書のインストールを同時にはできないらしい。(コマンドからはできるっぽい?)
一度作成したあとにSSLの設定を追加すればok。