Fluentd→Bigquery タグごとに別テーブルにinsert
Dec 28, 2019 11:12 · 634 words · 2 minute read
こんな設定でできる
<source>
@type forward
</source>
<match bigquery.**>
@type bigquery_insert
<buffer tag>
flush_interval 10
</buffer>
<inject>
time_key time
utc true
</inject>
auth_method json_key
json_key /home/jibun/fluentd/xxxxxx-123456-35640a504591.json
project myprj-135512
dataset ${tag[1]}
table ${tag[2]}
</match>
プレースホルダー
この部分の設定で、タグごとに別テーブルへとinsertしている
dataset ${tag[1]}
table ${tag[2]}
fluentdは現在のバージョンでは ${tag[1]}
というようなプレースホルダーを設定に使うことができる
たとえば bigquery.mydataset.temperature
タグを渡すと、
mydatasetというデータセットのtemperatureテーブルにデータを入れてくれる
time_key
<inject>
time_key time
time_key time
という設定をいれておくと、
時間を指定せずにメッセージを送ってもtimeカラムに送信日時が入ってくれる
例
こんなコマンドを何回か叩いたとして
# timeカラムをメッセージに含めていない
echo "{\"value\": $RANDOM}" | fluent-cat bigquery.mydataset.temperature
これでちゃんと時間が入ってくれる
+---------------------+-------+
| time | value |
+---------------------+-------+
| 2019-12-28 02:11:26 | 25425 |
| 2019-12-28 02:11:28 | 13507 |
| 2019-12-28 02:11:33 | 31525 |
+---------------------+-------+
buffer tag
この設定がないと
<buffer tag>
こんなエラーが出る
Parameter ‘bigquery_insert: project=xxxxxx-123456/dataset=${tag[1]}/table=${tag[2]}/fetch_schema_table=/template_suffix=’ has tag placeholders, but chunk key ‘tag’ is not configured
ちゃんとプレースホルダーのドキュメントに書いてあった
When the chunk keys are specified, these values can be extracted in configuration parameter values.
タグプレースホルダーを使うときはchunk-key(bufferの設定)のところでtagを指定しないとダメ
その他
@type bigquery_load
を使う場合、 サービスアカウントに bigquery.jobs.create
権限が必要 (BigQuery編集者だとその権限はついていなかった)
また、bigquery_load
ではデータが少量すぎると以下のようなエラーが出ていた
failed to poll load job error_class=Fluent::BigQuery::UnRetryableError error=“notFound: Not found: Job penguin-171212:job_H555BgtYLRUQpxq-bnSBH3uAbBfn”
データはbigqueryに入っていたので、おそらく初回のポーリングまでに処理が終わってしまうような量だと発生するエラーかも