2015年4月15日水曜日

Embulkを使ってJSONデータをGoogle BigQueryに投入する

erokism Follow Bulk? paste up in Manchester https://www.flickr.com/photos/10295270@N05/4163403080
Embulkのアウトプット・プラグインであるembulk-output-bigqueryを使って、JSONデータをGoogle BigQueryに投入します。

JSONのサンプルデータを用意。
# emacs /tmp/json_sample.json
{ "first_name":"John", "last_name":"Lennon", "age":20 }
{ "first_name":"Paul", "last_name":"Maccartney", "age":22 }
Google BigQueryのスキーマファイルを用意。
# emacs /tmp/schema.json
[
  {"name":"first_name","mode":"REQUIRED","type":"STRING"},
  {"name":"last_name","mode":"REQUIRED","type":"STRING"},
  {"name":"age","mode":"REQUIRED","type":"INTEGER"}
]
Javaをインストール。
# yum install -y java-1.8.0-openjdk
Embulkをインストール。
# curl --create-dirs -o ~/.embulk/bin/embulk -L "http://dl.embulk.org/embulk-latest.jar"
# chmod +x ~/.embulk/bin/embulk
# echo 'export PATH="$HOME/.embulk/bin:$PATH"' >> ~/.bashrc
# source ~/.bashrc
# embulk --version
embulk 0.6.2
Embulkプラグインをインストール。
# embulk gem install embulk-parser-jsonl
# embulk gem install embulk-output-bigquery
# embulk gem install embulk-formatter-jsonl
Embulkの設定ファイルを用意。
service_account_email、project、path_prefix、datasetにGoogle BigQueryのアカウント情報を記述します。
テーブルは自動生成オプションを有効にして、テーブルの末尾に _%Y%m%d を付けるようにしています。
# emacs config.yml
exec: {}
in:
  type: file
  path_prefix: /tmp/json_
  parser:
    type: jsonl
    root: $.students
    schema:
      - {name: first_name, type: string}
      - {name: last_name, type: string}
      - {name: age, type: long}
out:
  type: bigquery
  service_account_email: your_id@developer.gserviceaccount.com
  project: your-project-000
  p12_keyfile_path: /path/to/p12_keyfile.p12
  dataset: my_dataset
  path_prefix: /var/tmp/sample
  source_format: NEWLINE_DELIMITED_JSON
  file_ext: .json.gz
  delete_from_local_when_job_end: 1
  auto_create_table: 1
  schema_path: /tmp/schema.json
  table: students_%Y%m%d
  formatter:
    type: jsonl
  encoders:
    - {type: gzip}
Embulkインプットのプレヴューを実行。
# embulk preview config.yml
2015-04-15 15:29:36.316 +0900: Embulk v0.6.2
2015-04-15 15:29:37.649 +0900 [INFO] (preview): Listing local files at directory '/tmp' filtering filename by prefix 'json_'
2015-04-15 15:29:37.654 +0900 [INFO] (preview): Loading files [/tmp/json_sample.json]
+-------------------+------------------+----------+
| first_name:string | last_name:string | age:long |
+-------------------+------------------+----------+
|              John |           Lennon |       20 |
|              Paul |       Maccartney |       22 |
+-------------------+------------------+----------+
Embulkを実行。
# embulk run config.yml
2015-04-15 06:35:03.544 +0000: Embulk v0.6.2
2015-04-15 06:35:05.422 +0000 [INFO] (transaction): Listing local files at directory '/tmp' filtering filename by prefix 'json_'
2015-04-15 06:35:05.426 +0000 [INFO] (transaction): Loading files [/tmp/json_sample.json]
2015-04-15 06:35:06.752 +0000 [INFO] (transaction): {done:  0 / 1, running: 0}
2015-04-15 06:35:07.027 +0000 [INFO] (task-0000): Writing file [/var/tmp/sample.000.00.json.gz]
2015-04-15 06:35:07.044 +0000 [INFO] (task-0000): Job preparing... project:your-project-000 dataset:my_dataset table:students_20150415
2015-04-15 06:35:07.049 +0000 [INFO] (task-0000): table:[students_20150415] will be create if not exists
2015-04-15 06:35:07.054 +0000 [INFO] (task-0000): Upload start [/var/tmp/sample.000.00.json.gz]
2015-04-15 06:35:11.120 +0000 [INFO] (task-0000): Upload completed [/var/tmp/sample.000.00.json.gz]
2015-04-15 06:35:11.125 +0000 [INFO] (task-0000): Job executed. job id:[job_kRqcNXo8EXrCneT9h3cCUpR67lQ] file:[/var/tmp/sample.000.00.json.gz]
2015-04-15 06:35:11.487 +0000 [INFO] (task-0000): Checking job status... job id:[job_kRqcNXo8EXrCneT9h3cCUpR67lQ] elapsed_time:362ms status:[PENDING]
2015-04-15 06:35:21.766 +0000 [INFO] (task-0000): Checking job status... job id:[job_kRqcNXo8EXrCneT9h3cCUpR67lQ] elapsed_time:10641ms status:[PENDING]
2015-04-15 06:35:32.112 +0000 [INFO] (task-0000): Checking job status... job id:[job_kRqcNXo8EXrCneT9h3cCUpR67lQ] elapsed_time:20987ms status:[RUNNING]
2015-04-15 06:35:42.351 +0000 [INFO] (task-0000): Job statistics [{"inputFileBytes":"83","inputFiles":"1","outputBytes":"48","outputRows":"2"}]
2015-04-15 06:35:42.352 +0000 [INFO] (task-0000): Job completed successfully. job id:[job_kRqcNXo8EXrCneT9h3cCUpR67lQ] elapsed_time:31227ms status:[SUCCESS]
2015-04-15 06:35:42.352 +0000 [INFO] (task-0000): Delete local file [/var/tmp/sample.000.00.json.gz]
2015-04-15 06:35:42.352 +0000 [INFO] (transaction): {done:  1 / 1, running: 0}
2015-04-15 06:35:42.367 +0000 [INFO] (main): Committed.
2015-04-15 06:35:42.367 +0000 [INFO] (main): Next config diff: {"in":{"last_path":"/tmp/json_sample.json"},"out":{}}
Embulkの処理が完了しました。
確認してみましょう。


JSONデータがGoogle BigQueryに投入されました。

続いて大量データを投入してみます。
環境は、EC2のc4.large(Amazon Linux AMI 2015.03 (HVM), SSD Volume Type)です。
1億行のJSONを用意しました。ファイルサイズは5.7GBです。
実行結果は以下のようになりました。
2015-04-15 07:33:04.426 +0000: Embulk v0.6.2
2015-04-15 07:33:06.352 +0000 [INFO] (transaction): Listing local files at directory '/tmp' filtering filename by prefix 'json_'
2015-04-15 07:33:06.357 +0000 [INFO] (transaction): Loading files [/tmp/json_sample.json]
2015-04-15 07:33:07.632 +0000 [INFO] (transaction): {done:  0 / 1, running: 0}
2015-04-15 07:33:08.036 +0000 [INFO] (task-0000): Writing file [/var/tmp/sample.000.00.json.gz]
2015-04-15 07:57:19.158 +0000 [INFO] (task-0000): Job preparing... project:your-project-000 dataset:my_dataset table:students_20150415
2015-04-15 07:57:19.164 +0000 [INFO] (task-0000): table:[students_20150415] will be create if not exists
2015-04-15 07:57:19.171 +0000 [INFO] (task-0000): Upload start [/var/tmp/sample.000.00.json.gz]
2015-04-15 08:00:38.361 +0000 [INFO] (task-0000): Upload completed [/var/tmp/sample.000.00.json.gz]
2015-04-15 08:00:38.366 +0000 [INFO] (task-0000): Job executed. job id:[job_LoZky6tlYHA0hKEnT279_Ytni5c] file:[/var/tmp/sample.000.00.json.gz]
2015-04-15 08:00:38.620 +0000 [INFO] (task-0000): Checking job status... job id:[job_LoZky6tlYHA0hKEnT279_Ytni5c] elapsed_time:254ms status:[PENDING]
中略
2015-04-15 08:07:07.867 +0000 [INFO] (task-0000): Job completed successfully. job id:[job_LoZky6tlYHA0hKEnT279_Ytni5c] elapsed_time:389501ms status:[SUCCESS]
2015-04-15 08:07:07.867 +0000 [INFO] (task-0000): Delete local file [/var/tmp/sample.000.00.json.gz]
2015-04-15 08:07:07.894 +0000 [INFO] (transaction): {done:  1 / 1, running: 0}
2015-04-15 08:07:07.911 +0000 [INFO] (main): Committed.
2015-04-15 08:07:07.911 +0000 [INFO] (main): Next config diff: {"in":{"last_path":"/tmp/json_sample.json"},"out":{}}

real 34m7.619s
user 24m33.712s
sys 0m38.524s
バッファ用のファイル作成に時間がかかっています。開始から完了まで34分でした。

まとめ
Embulkを使ってJSONデータをGoogle BigQueryに投入するまでの流れを追ってみました。
コマンドひとつでデータを投入できるのが良いです。
Google BigQueryはストリームでのインポートも可能ですが、ストリーム処理にかかる料金が必要だったり、APIが不安定で、データの投入失敗・重複などデメリットがあります。より正確なデータの投入を目指すのならば、Embulkは選択肢のひとつになると思います。
https://cloud.google.com/bigquery/quota-policy
実際に運用するときには、Quota Policyの制限に注意しましょう。