MySQLのテーブルをEmbulkを利用してBigQueryに転送する
MySQLのデータをBigQueryに転送する方法はいくつかありますが、Embulkを利用してMySQLから直接データを取得してBigQueryに流し込んでみました。 別途Fluentdで集めているアクセスログとRDBのデータを突き合わせて分析するのが目的です。
Embulkとは
TreasureData製のDataLoaderです。
Embulk は、リアルタイムなログ収集では常識となった fluentd のバッチ版のようなツールで、ファイルやデータベースからデータを吸い出し、別のストレージやデータベースにロードするためのコンパクトなツールです。
とのことです。( 並列データ転送ツール『Embulk』リリース! - Blog by Sadayuki Furuhashi に記載あり)
fluentdのようにプラグインを追加することで様々なデータソースに対応することができます。
Embulkのセットアップ
インストールは↓に記載があります。Javaで開発されているため、Javaが必要です。 GitHub - embulk/embulk: Embulk: Pluggable Bulk Data Loader. http://www.embulk.org
MySQL→Embulk→BigQuery という流れになるので、inがMySQL、outがBigQueryになります。それぞれプラグインが存在しているのでそれを利用します。
それぞれ、
embulk-input-mysql
https://github.com/embulk/embulk-input-jdbc/tree/master/embulk-input-mysql
embulk-out-bugquery
を利用します。
インストールは、
embulk gem install embulk-out-bigquery
のように行えます。
embulk-input-mysql
embulkを実行するサーバからMySQLに接続できるようになっている必要があります。 inputプラグインなので、inセクションに記述します。 下記のように設定します。
in: type: mysql user: "MySQLのユーザー" password: "MySQLのパスワード" database: "データベース名" table: "テーブル名" host: "MySQLのホスト" select: "セレクトするカラム(カンマ区切り)" parser: type: json
parserがjsonになっているのは、BigQueryに転送するときにこちらの方が都合がよいからです。(デフォルトはCSV) CSVの場合、データ内に改行コードが含まれている場合の考慮が必要なのですが、jsonであればエスケープされるため楽になります。BigQueryにCSVをimportするときは行の区切りがCRLFでないといけないようで、データ内にCRLFが存在していた場合、CSVだとうまく取り込むことができませんでした(しっかり検証したわけではないので、設定次第では取り込めるかもしれません) embulk-input-mysql のオプションでwhere句を追加したり、select後にupdate文を実行することもできます。
embulk-output-bigquery
embulkを実行するサーバからBigQueryに接続できるようになっている必要があります。 outputプラグインなので、outセクションに記述します。 下記のように設定します。
out: type: bigquery mode: replace auth_method: json_key json_keyfile: "json_keyのパス" path_prefix: tmp/ file_ext: .jsonl.gz source_format: NEWLINE_DELIMITED_JSON project: "プロジェクトID" dataset: "データセット名" auto_create_table: true table: "テーブル名" schema_file: "スキーマファイルのパス" formatter: {type: jsonl} encoders: - {type: gzip}
modeをreplaceにすると、BigQuery側のテーブルをクリアして再生成されます。毎回全件を転送する想定だったのでreplaceにしてあります。 replace以外にもいくつかあります( https://github.com/embulk/embulk-output-bigquery#mode )が、ログデータでなければ追記していく場合はあまりないと思うので、基本的にはreplaceでよいと思います。 前述の通り、MySQLから取得したデータはjsonにしているので、formatterとsource_formatをJSONに設定します。 また、BigQuery側テーブルのスキーマを定義するファイルを別途用意する必要があります。
[ { "name": "id", "type": "INTEGER" }, .... ]
のようにカラム名と型を記述していきます。ここのnameとinput-mysqlのselect オプションで指定したカラム名は一致している必要があるようです。
ここまでで、 in/out 共に設定が終わっているので、embulkを実行するとMySQLからデータを取得し、BigQueryにデータが転送されますと思います。
設定ファイルの共通化
MySQLのテーブルごとに設定ファイルを記述していくのですが、上記の設定方法では、MySQLへの接続情報、BigQUeryへの接続情報など重複する部分が出てきます。 Embulkの設定ファイルでは実はLiquidというテンプレートエンジン( – Liquid template language ) が利用できるのでそれを利用して共通化します。
注意: 0.8.28, 0.8.29では、includeができなくなっているようです( Embulk 0.8.28 and 0.8.29 doesn't work Liquid {% include %}. · Issue #757 · embulk/embulk · GitHub)。0.8.18を利用しましょう。embulk selfupdate 0.8.18 でダウングレードすることできます。→ 0.8.30 で解消されました
↓にあるように、includeを利用して共通部分を別ファイルに切り出すことが可能です。 qiita.com 僕の場合は、
in: type: mysql user: "ユーザ名" password: "パスワード" database: "データベース名" table: {{ table_name }} host: "ホスト" select: {{ select_columns }} parser: type: json out: type: bigquery mode: replace auth_method: json_key json_keyfile: "keyfileのパス" path_prefix: tmp/ file_ext: .jsonl.gz source_format: NEWLINE_DELIMITED_JSON project: "プロジェクト名" dataset: "データセット名" auto_create_table: true table: {{ bq_table_name }} schema_file: bq_schemas/{{ bq_schema_name }}.json formatter: {type: jsonl} encoders: - {type: gzip}
という共通で利用する設定ファイルを作成し、各設定ファイルで下記のようにincludeして利用するようにしました
{% include 'partials/mysql2bq', table_name: 'MySQLのテーブル名', select_columns: "セレクトするカラム", bq_table_name: 'BigQueryのテーブル名', bq_schema_name: 'スキーマファイルのパス' %}
ステージング環境、本番環境向けの接続設定の切り替えもLiquidを利用して分岐することもできるようなのですが、僕の場合はそこはchefでやることにしました。yamlファイルを生成すればよいだけなのでいろいろやり方はあると思います。 twitterで聞いてみたところyaml_masterもよく使われているみたいです。
皆さんyaml_masterなどのツールをつかってそのようにされているようです。https://t.co/GuS56DRHjt あとはhttps://t.co/DWu3RAOXNT と連携とかですかね。
— hiroyuki sato (@hiroysato) 2017年8月8日
特定のディレクトリの設定ファイルを一括で実行する
embulk の runコマンドにはワイルドカードを指定できなかったので↓のようにしました
ls configs/*.yml.liquid | xargs -I {} embulk run {}
まとめ
プラグインインストールしてちょっと設定するだけで動くのでよい感じでした。 input-mysql, output-bigquery 共にいろいろオプションがあるのですが、数万件程度のテーブルをBigQueryに転送する用途であればこのままで使えています。 まだ分析がきちんと立ち上がっているわけではないので、cronの定期実行&失敗はcronのログで検知で進めてますが、今後はDigDag,Airflowなどのワークフローエンジンの導入を検討しようと思ってます あと、他にはAWSのDataPipelineも検討したのですが、GUIが難解でしかも重すぎるので挫折しました。。
RailsからRDSを利用するときの設定
RailsからRDSを利用する場合、そのままコネクションプーリングを行うと、RDSのfailover時に利用していないサーバに接続してしまいます。
コネクションプーリングを止めてる事例( http://blog.livedoor.jp/sonots/archives/38797925.html とか ) がいくつか見つかりますが、ここで利用されているgem ( https://github.com/sonots/activerecord-refresh_connection ) は、マルチスレッドに対応していないため、アプリケーションサーバにpumaを利用している場合、このgemを使うことはできません。
そこで、なんとかできないかなーと調べてみたところ、ActiveRecordにreaping_frequencyという設定があるのを発見しました。
rails/connection_pool.rb at 9337f646902955a76dd1975b1201032042ed5004 · rails/rails · GitHub
この設定を行うと、設定した値(秒)を過ぎたコネクションはプールから削除されるようになります。
database.ymlに↓を追記すればokです。
reaping_frequency: 10
設定してみたところサーバに接続がない状態だと、DBへの接続数が減っていることを確認できました。
RDSがfailoverしてendpointのIPが変わった場合でも、しばらく待てば切り替わるはずです。
ほとんどの場合これでなんとかなるんじゃないでしょうか。
Rails5.1からidカラムがbigintになるのでその対応
Rails5.1からidカラムがbigintになるのでその対応メモ。
↓は対応した人のエントリです。bigintが必要になってから変更するのでは間に合わないので最初から設定しちゃおうみたいな感じですね。
Rails 5.1: Default Primary Keys Are Now BIGINT
このPR(Restore the behaviour of the compatibility layer for integer-like PKs by kamipo · Pull Request #27389 · rails/rails · GitHub) にもあるように互換性には気を使って開発してくれていますが、schema.rbは migrate:reset タスクを実施するなどして再生成する必要があるみたいです。
schema.rbが下記のようになっていたとして、
create_table "users" do |t| t.string "name", null: false create_table "addresses" do |t| t.integer "user_id" t.string "name", null: false add_foreign_key "addresses", "users"
5.0以前では問題なくテーブルの作成と外部キーの設定が行えていましたが、5.1ではエラーになります。
それはusers,addressテーブルのidカラムがbigintになりますが、外部キーを設定しようとしているaddressesテーブルのuser_idカラムはint, usersテーブルのidカラムはbigintと差異が出てしまうからです。
migrateファイルが正しく管理されている場合は、 migrate:resetタスクを実行することで、Rails5.1でも以前のDB定義を維持することができるschema.rbを出力することができます。
再生成すると以下のようになります。
create_table "users", id: :integer do |t| t.string "name", null: false create_table "addresses", id: :integer do |t| t.integer "user_id" t.string "name", null: false add_foreign_key "addresses", "users"
id: :integerが各テーブルに付与され、エラーが解決されます。
schema.rbを以前のDB定義を維持できるように変更し、そのあとで、必要に応じて各テーブルのidカラムをbigintにしていくのが良いんじゃないかなーと思います。
pm2でhubotを起動するとき
pm2でhubotを起動する場合、
pm2 start -f bin/hubot -x --name botname -- -a adapter
のように紹介されている記事が多かったのだけど、bin/hubotがJavascriptとして解釈されてしまいうまくいかない。
pm2 start -f bin/hubot.sh -x --name botname -- -a adapter
としてみると、無事起動。
拡張子で判断しているみたいだったので、ソースを見てみるとそれっぽい記述を見つけたのでたぶんそうなんだと思う
https://github.com/Unitech/pm2/blob/1c3fcb83b65e430a5734d576610d27cc7a0bf4fd/lib/Common.js#L355
var noInterpreter = (!app.exec_interpreter || 'none' == app.exec_interpreter), extName = path.extname(app.pm_exec_path), betterInterpreter = extItps[extName];
のextItpsはintpreter.jsonで定義されていて以下のような内容になっている
https://github.com/Unitech/pm2/blob/3f5641a67d13b746b861dd2a491206df2eb50cca/lib/API/interpreter.json
拡張子がshだとbashで解釈される。何もないとnodeになる様子。
sprockets-commonerについて
sprocket-commonerというgemがよい感じだったので調べてみました。
sprocket-commonerで何ができるのかは下記qiitaに詳しくかいてあって、僕もこの記事で知りました。
qiita.com
結局のところ
・ES6構文とmoduleのimport/export
・npmモジュール
の2つが使えればよいと思っていたので、ちょうどよい感じです。
sprocket-commonerがなにをやっているのかをざっと眺めてみました。
sprockets-commoner/commoner.rb at master · Shopify/sprockets-commoner · GitHub
module Sprockets module Commoner def self.sprockets4? @@sprockets4 ||= Gem::Version.new(Sprockets::VERSION) >= Gem::Version.new('4.0.0.beta') end end register_postprocessor 'application/javascript', ::Sprockets::Commoner::Processor register_transformer 'application/json', 'application/javascript', ::Sprockets::Commoner::JSONProcessor register_bundle_metadata_reducer 'application/javascript', :commoner_enabled, false, :| register_bundle_metadata_reducer 'application/javascript', :commoner_required, Set.new, :+ register_bundle_metadata_reducer 'application/javascript', :commoner_used_helpers, Set.new, :+ register_bundle_processor 'application/javascript', ::Sprockets::Commoner::Bundle register_dependency_resolver 'commoner-environment-variable' do |env, str| _, variable = str.split(':', 2) ENV[variable] end end
Sprockets::Commner::Processor が何かやっていそうです
sprockets-commoner/processor.rb at master · Shopify/sprockets-commoner · GitHub
Schmooze::Baseというクラスを継承しているのですが、
Schmoozeというgemは何をやっているかというと
schmooze/base.rb at master · Shopify/schmooze · GitHub
def spawn_process process_data = Open3.popen3( @_schmooze_env, 'node', '-e', @_schmooze_code, chdir: @_schmooze_root ) ensure_packages_are_initiated(*process_data) ObjectSpace.define_finalizer(self, self.class.send(:finalize, *process_data)) @_schmooze_stdin, @_schmooze_stdout, @_schmooze_stderr, @_schmooze_process_thread = process_data end
となっていて、子プロセスでnodeを起動させてそこでJSを実行しているようです。
Sprocketのcompile時に子プロセスでnode.jsを起動して、JSをトランスコンパイルしています。
ExecJSへの依存はありません。
importの辺りはSprocket::Commonerr::Bundleとbabel-plugin-sprockets-commoner-internalで頑張っているみたい。
まとめ
- Sprocketのcompile時に子プロセスでnode.jsを起動して、JSをトランスコンパイルしてる
特別なことをせずにnpmモジュールが使える
- ExecJSへの依存はない
逆にいうと、ExecJSではないので、rubyracerなどは使えない。
ファイルずつnode起動していくので、パフォーマンスの懸念はあるみたいです。
github.com
- coffeescriptのimportなど特別な機能を使わなければロックインされることはない
Rails5.1から、SprocketにES6、npmモジュールへの対応が入るし、Webpackerも出るので、5.1が出るまではsprocket-commoner、5.1がリリースされたらRailsデフォルトに戻るのがよいんじゃないかなーと思ってます。
sudoするとdocker-composeがcommand not found
現象:
docker-composeをインストールしているとき、sudo したときだけコマンドが見つからない。
原因:
visudo で編集できる secure_path に/usr/local/bin が含まれていない。
docker-composeは /usr/local/bin 配下にインストールされる
対応:
Defaults secure_path = /sbin:/bin:/usr/sbin:/usr/bin
に/usr/local/binを追加
Defaults secure_path = /sbin:/bin:/usr/sbin:/usr/local/bin:/usr/bin
何回もやって都度忘れてたのでかいとく