なんとなくログをTreasureDataに突っ込み始めると、開始時期のデータが不完全な状態になっちゃいます。
というわけで『過去ログのアップも含めて、もれなくデータを突っ込むにはどうしたらいいのか』ということです!
今回はこういう手順でやってみました。
- 当日のログはFluentdのin_tailをゴニョって
- 過去ログはtd bulk_importで
当日分のログはfluentdのin_tailで
※この手順は公式なものではないので自己責任で!
Fluentdのin_tailにはpos_fileという設定があります。
ここで指定したポジションファイルには、どのログをどこまで送信したかが記録されます。
<source> type tail format apache path /var/log/httpd/site01-access_log.link tag forward.apache.access.site01 pos_file /var/log/td-agent/site01-access_log.pos </source>
このファイルの書式はこうなってます。
<ログのフルパス>TAB<送信済みサイズ>TAB<ファイルのinode>
のぞいてみるとこんな感じ(1ログ1ポジションファイルの場合)。数値は桁数指定された16進数です。
$ cat /var/log/httpd/site01-access_log.link /var/log/httpd/site01-access_log.link 00000000044c1416 007203dd
というわけで送信済みサイズを0に指定したポジションファイルを作ってtd-agentを起動すれば、最初から送信が始まりそうです。
# ruby -e 'f=ARGV[0];s=File::stat(f);puts "%s\t%016x\t%08x" % [f, 0, s.ino]' /var/log/httpd/site01-access_log.link > /var/log/td-agent/site01-access_log.pos # chown td-agent. /var/log/td-agent/site01-access_log.pos # chmod 666 /var/log/td-agent/site01-access_log.pos # cat /var/log/td-agent/site01-access_log.pos /var/log/httpd/site01-access_log.link 0000000000000000 007203dd # /etc/init.d/td-agent start
WEBサーバ、集約サーバともに一気に負荷がかかるのでローテート直後に実施するか、こっそりやりましょう。
過去ログはtd bulk_importで
tdコマンドのヘルプではtable:importも紹介されていますが、
$ td table:import example_db table1 --apache access.log $ td table:import example_db table1 --json -t time test.json
table:importはデータ保証がなく、アップロードにも時間がかかります。
(msgpack+gz形式を使えば転送時間は短縮できるそうです)
大量のログを安全にアップロードするにはbulk_importを使います。
bulk_importでは以下の処理を個別に行います。
- アップロード
- 変換
- tableへの取り込み
詳しくは公式で!
こういうテーブルがあるという前提でいきます。
(Database名とTable名は逆にすべきだな・・・)
$ td tables +----------+---------------+------+------------+----------+ | Database | Table | Type | Count | Schema | +----------+---------------+------+------------+----------+ | access | site01 | log | 11626969 | | | access | site02 | log | 8057329 | | +----------+---------------+------+------------+----------+
アップロードファイルを作成
bulk_importで扱うのはMsgpack形式+gzip圧縮なファイルです。
Fluentdでやる手もありますが、今回は以下のフィルタを書いて変換しました。
#!/usr/lib64/fluent/ruby/bin/ruby require 'json' require 'time' require 'msgpack' format = /^(?<host>[^ ]*) [^ ]* [^ ]* \[(?<time>[^\]]*)\] "(?<method>\S+) +(?<path>[^ ]+) +\S*" (?<code>[^ ]*) (?<size>[^ ]*) "(?<referer>[^\"]*)" "(?<agent>[^\"]*)" / time_format = "%d/%b/%Y:%H:%M:%S %z" path_filter = Regexp.new "^/static/" while line = STDIN.gets line.chomp! m = format.match line unless m STDERR.puts "not match! : #{line}" next end record = {} m.names.each {|n| record[n] = m[n] } next if path_filter =~ record["path"] record["time"] = Time.strptime(record["time"], time_format).to_i # # いろいろ内部変換処理があったり # print MessagePack.pack record end
こんなかんじで使います。
$ zcat access_log.20120617.gz | ./td_convert.rb 2>td_convert.err | gzip -c >access_log.20120617.mpack.gz
※すんごく時間がかかります。だれかもっと効率的なの書いて><
ファイルのサイズが大きいとアップロードに失敗することが多くなります。
32M-64Mがオススメらしいので必要に応じて分割しましょう!
bulk_import手順
bulk_importを作成します
ほどよい単位で作りましょう。あまり大きいと失敗した時に泣けます。
今回は日時単位で作っていきます。
$ td bulk_import:create 20120617 access site01
ログをアップロードします
サーバごとに分割されたログをアップするならこんな感じになります。
$ td bulk_import:upload_part 20120617 web01 web01/site01-access_log.20120617.mpack.gz $ td bulk_import:upload_part 20120617 web02 web02/site01-access_log.20120617.mpack.gz $ td bulk_import:upload_part 20120617 web03 web03/site01-access_log.20120617.mpack.gz ...
td bulk_import:showで確認しながらやりましょう。Uploaded Partsが全部揃ったらアップロード完了です。
$ td bulk_import:show 20120617 Name : 20120617 Database : access Table : site01 Status : Uploading Frozen : false JobID : Valid Records: Error Records: Valid Parts : Error Parts : Uploaded Parts : web01 web02 web03
全部揃ったらアップロードを凍結して変換処理を走らせます
$ td bulk_import:freeze 20120608 Bulk import session '20120608' is frozen. $ td bulk_import:perform 20120608
bulk_import:listでReadyになったら変換完了です。
この時、必ず「Valid Records」のレコードの行数が正しいか確認しておきましょう。
$ td bulk_import:list +----------+---------------+------------+--------+--------+-------------+-------------+---------------+---------------+ | Name | Table | Status | Frozen | JobID | Valid Parts | Error Parts | Valid Records | Error Records | +----------+---------------+------------+--------+--------+-------------+-------------+---------------+---------------+ | 20120615 | access.site01 | Committed | frozen | 419867 | 3 | 0 | 527510 | 0 | | 20120616 | access.site01 | Committed | frozen | 421640 | 3 | 0 | 498026 | 0 | | 20120617 | access.site01 | Ready | frozen | 421859 | 3 | 0 | 416908 | 0 | | 20120618 | access.site01 | Performing | frozen | 439196 | | | | | | 20120619 | access.site01 | Uploading | | | | | | | +----------+---------------+------------+--------+--------+-------------+-------------+---------------+---------------+
commitするtableにデータが入ります
$ td bulk_import:commit 20120617
まとめ
過去ログのアップロードめんどくさいよ(;´Д`)ハァハァ