Embulkを使ってMySQLからGoogle BigQueryにデータをロードしてみる

2015-03-19


※注意 この記事はembulk-output-bigquery v0.2.x以前に基づいて書かれています。 v0.3.0で大幅な変更が加わっているためv0.2.xで使っていたconfig.ymlはそのままでは使えません。 主な変更点は

  • JavaプラグインからJRubyベースのプラグインに変わりました。それまでよりpull-requestは送りやすくなったと思うので早く送るん(ry
  • formattersのブロックの指定が不要になりました。(FileOutputプラグインからOutputプラグインに変わったため)
  • ローカルにGZIP圧縮されたファイルを吐くために必要だったencodersのブロックの指定が不要になりました。(理由は同上)。代わりにcompression: GZIPを指定してください。

その他の変更点についてはCHANGELOGを参照してください。

4/1追記 v0.1.2で動的テーブル作成に対応したので一部内容を変更しています

この前リリースしたEmbulkのGoogle BigQuery向けアウトプットプラグインembulk-output-bigqueryを使ってMySQL→BigQueryへデータをバルクロードする動作デモになります。

Embulk自体についてはオープンソースのバルクデータローダー「Embulk」登場。fluentdのバッチ版、トレジャーデータが支援 - Publickeyとか読んでみてください。

  1. embulk-input-randomでダミーデータの作成&embulk-input-mysqlでダミーデータをMySQLへインポート
  2. embulk-output-mysqlとembulk-output-bigqueryでMySQLからBigQueryへデータをロード(CSVフォーマット)

という流れになっています。

gemのインストール

embulk本体のインストールはEmbulkのQuick Start読んで下さい。

embulk gem install embulk-input-random
embulk gem install embulk-output-bigquery
embulk gem install embulk-input-mysql
embulk gem install embulk-output-mysql

MySQLのデータベース、テーブルの作成とデータのロード

embulk-input-randomプラグインを使って10000件ほど作成します。 embulk-output-mysqlも使ってそのままMySQLに流し込んじゃいましょう。 まずはMySQLのデータベース、ユーザ、テーブルの作成から。

mysql> CREATE DATABASE embulk DEFAULT CHARACTER SET utf8;
mysql> GRANT ALL ON embulk.* TO embulk_user@"%" IDENTIFIED BY 'embulk_pass';
mysql> USE embulk;

mysql> CREATE TABLE example (\
    id bigint,\
    number bigint,\
    value1 varchar(60),\
    value2 varchar(60),\
    value3 varchar(60),\
    value4 varchar(60),\
    value5 varchar(60),\
    value6 varchar(60),\
    value7 varchar(60),\
    value8 varchar(60),\
    value9 varchar(60),\
    value10 varchar(60),\
    primary key(id)\
);

設定ファイル

# vi random_to_mysql.yml
in:
  type: random
  # 10000レコード生成、お好きな数字をどうぞ
  rows: 10000
  schema:
    id: primary_key
    number: integer
    value1: string
    value2: string
    value3: string
    value4: string
    value5: string
    value6: string
    value7: string
    value8: string
    value9: string
    value10: string
out: 
    type: mysql
    host: localhost
    database: embulk
    user: embulk_user
    password: embulk_pass
    table: example
    mode: insert

MySQLにデータをロード

embulk runで実行します。

# embulk run /path/to/random_to_mysql.yml
2015-03-19 20:04:11.957 +0900: Embulk v0.5.3
Random generation started.
2015-03-19 20:04:15.068 +0900 [INFO] (transaction): SQL: CREATE TABLE IF NOT EXISTS `example` (`id` BIGINT, `number` BIGINT, `value1` TEXT, `value2` TEXT, `value3` TEXT, `value4` TEXT, `value5` TEXT, `value6` TEXT, `value7` TEXT, `value8` TEXT, `value9` TEXT, `value10` TEXT)
2015-03-19 20:04:15.070 +0900 [INFO] (transaction): > 0.00 seconds
2015-03-19 20:04:15.118 +0900 [INFO] (transaction): {done:  0 / 1, running: 0}
2015-03-19 20:04:15.159 +0900 [INFO] (task-0000): Prepared SQL: INSERT INTO `example` (`id`, `number`, `value1`, `value2`, `value3`, `value4`, `value5`, `value6`, `value7`, `value8`, `value9`, `value10`) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
Random generator input thread 0...
2015-03-19 20:04:18.588 +0900 [INFO] (task-0000): Loading 10,000 rows
2015-03-19 20:04:19.718 +0900 [INFO] (task-0000): > 1.13 seconds (loaded 10,000 rows in total)
2015-03-19 20:04:19.721 +0900 [INFO] (transaction): {done:  1 / 1, running: 0}
Random input finished. Commit reports = [{"rows":10000,"columns":12}]
2015-03-19 20:04:19.777 +0900 [INFO] (main): Committed.
2015-03-19 20:04:19.777 +0900 [INFO] (main): Next config diff: {"in":{},"out":{}}

数秒で完了しました。

BigQueryへのデータロード

それではembulk-input-mysqlとembulk-output-bigqueryを使ってMySQL→BigQueryへのデータロードを行います。

embulk-output-bigqueryはschema.jsonのパスを指定しての動的テーブルに対応しているのでテーブルも実行時に作成してしまいます。なおテーブルが既に存在する場合は既存テーブルに追記されます。

  • プロジェクトID: your-project-id
  • データセット名: demodatasets
  • テーブル名: demotable
  • テーブルのスキーマ(schema.json):
[
  {
    "name": "id",
    "type": "INTEGER"
  },
  {
    "name": "number",
    "type": "INTEGER"
  },
  {
    "name": "value1",
    "type": "STRING"
  },
  {
    "name": "value2",
    "type": "STRING"
  },
  {
    "name": "value3",
    "type": "STRING"
  },
  {
    "name": "value4",
    "type": "STRING"
  },
  {
    "name": "value5",
    "type": "STRING"
  },
  {
    "name": "value6",
    "type": "STRING"
  },
  {
    "name": "value7",
    "type": "STRING"
  },
  {
    "name": "value8",
    "type": "STRING"
  },
  {
    "name": "value9",
    "type": "STRING"
  },
  {
    "name": "value10",
    "type": "STRING"
  }
]

設定ファイル

このデモそのまま動かしている場合は以下の設定で問題ないはずです。 他のフォーマットのデータを試していて[Too many errors encountered. Limit is: 0.]等言われた場合はmax_bad_records: 100等と指定するとひとまずロードはできると思います。 ただしデータは欠損します。

このオプションはbqコマンドラインツールbq loadする際のオプションと同じようにエラーを何レコードまで許容するかのオプションです。 これを超えるエラーが見つかると全てロールバックされます。 デフォルトは0なので1レコードでもエラーになるとジョブ実行に失敗します。

field_delimiterも指定可能です。デフォルトは”,”です。encodingもサポートしていますがBigQueryがサポートしているのはUTF-8(デフォルト)とISO-8859-1のみなので殆どの場合は指定する必要はないはずです。 skip_leading_rowsはサポートしていません。CSVフォーマッタープラグインのheader_line: falseを使って下さい。

# vi mysql_to_bigquery.yml

in:
  type: mysql
  user: embulk_user
  password: embulk_pass
  database: embulk
  table: example
  host: localhost
  select: "*"
out:
  type: bigquery
  service_account_email: your-service-account.gserviceaccount.com
  p12_keyfile_path: /path/to/private.p12
  path_prefix: /var/tmp/sample
  # 生成するファイルはCSV
  source_format: CSV
  # 一応拡張子は.csv.gz
  file_ext: .csv.gz
  # ジョブ実行完了時にローカルファイルを削除する
  delete_from_local_when_job_end: 1
  project: your-project-id
  dataset: demodatasets
  # 実行時にテーブルを作成する
  auto_create_table: 1
  schema_path: /path/to/schema.json
  table: demotable
  formatter:
    type: csv
    #ヘッダ行は不要
    header_line: false
    timezone: Asia/Tokyo
  encoders:
  - {type: gzip}

実行

# embulk run mysql_to_bigquery.yml
2015-04-01 20:28:16.039 +0900: Embulk v0.5.3
2015-04-01 20:28:18.563 +0900 [INFO] (transaction): Fetch size is 10000. Using server-side prepared statement.
2015-04-01 20:28:20.746 +0900 [INFO] (transaction): {done:  0 / 1, running: 0}
2015-04-01 20:28:20.847 +0900 [INFO] (task-0000): Writing file [/var/tmp/sample.000.00.csv.gz]
2015-04-01 20:28:20.877 +0900 [INFO] (task-0000): Fetch size is 10000. Using server-side prepared statement.
2015-04-01 20:28:20.909 +0900 [INFO] (task-0000): SQL: SELECT * FROM `example`
2015-04-01 20:28:20.981 +0900 [INFO] (task-0000): > 0.07 seconds
2015-04-01 20:28:21.649 +0900 [INFO] (task-0000): Fetched 500 rows.
2015-04-01 20:28:21.918 +0900 [INFO] (task-0000): Fetched 1,000 rows.
2015-04-01 20:28:22.283 +0900 [INFO] (task-0000): Fetched 2,000 rows.
2015-04-01 20:28:22.710 +0900 [INFO] (task-0000): Fetched 4,000 rows.
2015-04-01 20:28:22.983 +0900 [INFO] (task-0000): Fetched 8,000 rows.
2015-04-01 20:28:23.102 +0900 [INFO] (task-0000): Job preparing... project:your-project-id dataset:demodatasets table:demotable
2015-04-01 20:28:23.115 +0900 [INFO] (task-0000): table:[demotable] will be create if not exists
# アップロード開始
2015-04-01 20:28:23.127 +0900 [INFO] (task-0000): Upload start [/var/tmp/sample.000.00.csv.gz]
# アップロード正常終了
2015-04-01 20:28:26.047 +0900 [INFO] (task-0000): Upload completed [/var/tmp/sample.000.00.csv.gz]
2015-04-01 20:28:26.055 +0900 [INFO] (task-0000): Job executed. job id:[job_ABCDEFGHIJKLMN] file:[/var/tmp/sample.000.00.csv.gz]
# ジョブ実行ステータスのポーリング。PENDING→RUNNING→SUCCESS
2015-04-01 20:28:26.160 +0900 [INFO] (task-0000): Checking job status... job id:[job_ABCDEFGHIJKLMN] elapsed_time:105ms status:[PENDING]
2015-04-01 20:28:36.270 +0900 [INFO] (task-0000): Checking job status... job id:[job_ABCDEFGHIJKLMN] elapsed_time:10215ms status:[PENDING]
2015-04-01 20:28:46.439 +0900 [INFO] (task-0000): Checking job status... job id:[job_ABCDEFGHIJKLMN] elapsed_time:20384ms status:[RUNNING]
# 正常に完了した場合はStatisticsがINFOで表示される
# 失敗した場合はエラーとなった原因がWARNで表示される
2015-04-01 20:28:56.567 +0900 [INFO] (task-0000): Job statistics [{"inputFileBytes":"3420374","inputFiles":"1","outputBytes":"4660000","outputRows":"10000"}]
2015-04-01 20:28:56.568 +0900 [INFO] (task-0000): Job completed successfully. job id:[job_ABCDEFGHIJKLMN] elapsed_time:30513ms status:[SUCCESS]
# ローカルファイルの削除
2015-04-01 20:28:56.568 +0900 [INFO] (task-0000): Delete local file [/var/tmp/sample.000.00.csv.gz]
2015-04-01 20:28:56.569 +0900 [INFO] (transaction): {done:  1 / 1, running: 0}
2015-04-01 20:28:56.596 +0900 [INFO] (main): Committed.
2015-04-01 20:28:56.597 +0900 [INFO] (main): Next config diff: {"in":{},"out":{}}

1分ほどで実行が完了しました。 普段RDBMS等を使われている方は長いと思われるかもしれませんが、45秒ほどはBigQuery上でのジョブの実行です。 ローカルでの処理は数秒で完了しています。

実行結果の確認

bqコマンドで確認してみます。

正常に動作していればテーブル作成時の1件を含め10001件入っているはずです。

bq query "SELECT COUNT(*) FROM demodatasets.demotable"
Waiting on bqjob_abcdefghijklmn ... (0s) Current status: DONE
+-------+
|  f0_  |
+-------+
| 10001 |
+-------+

未知のデータで試す場合はいきなりBigQueryにロードせずtype:fileとしてローカルファイルに出力しbq loadで正常にロードできるか確認すると問題の切り分けができてハマらないと思います。

最後に

ね、簡単でしょ?


Profile picture of sakama

Written by sakama Data engineer who lives and works in Tokyo for successful data analytics You can follow me on Twitter

© 2022, sakama.dev - Built with Gatsby

Author

Profile picture of sakama

Written by sakama Data engineer who lives and works in Tokyo for successful data analytics You can follow me on Twitter

Socials

github.com/sakama

Recent Posts

Categories

Tags