読者です 読者をやめる 読者になる 読者になる

Treasure Dataにもれなくログをつっこむ

なんとなくログをTreasureDataに突っ込み始めると、開始時期のデータが不完全な状態になっちゃいます。
というわけで『過去ログのアップも含めて、もれなくデータを突っ込むにはどうしたらいいのか』ということです!


今回はこういう手順でやってみました。

  • 当日のログはFluentdのin_tailをゴニョって
  • 過去ログはtd bulk_importで

当日分のログはfluentdのin_tailで

※この手順は公式なものではないので自己責任で!

Fluentdのin_tailにはpos_fileという設定があります。
ここで指定したポジションファイルには、どのログをどこまで送信したかが記録されます。

<source>
  type tail
  format apache
  path /var/log/httpd/site01-access_log.link
  tag forward.apache.access.site01
  pos_file /var/log/td-agent/site01-access_log.pos
</source>

このファイルの書式はこうなってます。

<ログのフルパス>TAB<送信済みサイズ>TAB<ファイルのinode>

のぞいてみるとこんな感じ(1ログ1ポジションファイルの場合)。数値は桁数指定された16進数です。

$ cat /var/log/httpd/site01-access_log.link
/var/log/httpd/site01-access_log.link	00000000044c1416	007203dd

というわけで送信済みサイズを0に指定したポジションファイルを作ってtd-agentを起動すれば、最初から送信が始まりそうです。

# ruby -e 'f=ARGV[0];s=File::stat(f);puts "%s\t%016x\t%08x" % [f, 0, s.ino]' /var/log/httpd/site01-access_log.link > /var/log/td-agent/site01-access_log.pos
# chown td-agent. /var/log/td-agent/site01-access_log.pos
# chmod 666 /var/log/td-agent/site01-access_log.pos
# cat /var/log/td-agent/site01-access_log.pos
/var/log/httpd/site01-access_log.link	0000000000000000	007203dd
# /etc/init.d/td-agent start

WEBサーバ、集約サーバともに一気に負荷がかかるのでローテート直後に実施するか、こっそりやりましょう。

過去ログはtd bulk_importで

tdコマンドのヘルプではtable:importも紹介されていますが、

$ td table:import example_db table1 --apache access.log
$ td table:import example_db table1 --json -t time test.json

table:importはデータ保証がなく、アップロードにも時間がかかります。
(msgpack+gz形式を使えば転送時間は短縮できるそうです)


大量のログを安全にアップロードするにはbulk_importを使います。
bulk_importでは以下の処理を個別に行います。

  • アップロード
  • 変換
  • tableへの取り込み

詳しくは公式で!


こういうテーブルがあるという前提でいきます。
(Database名とTable名は逆にすべきだな・・・)

$ td tables
+----------+---------------+------+------------+----------+
| Database | Table         | Type | Count      | Schema   |
+----------+---------------+------+------------+----------+
| access   | site01        | log  | 11626969   |          |
| access   | site02        | log  | 8057329    |          |
+----------+---------------+------+------------+----------+
アップロードファイルを作成

bulk_importで扱うのはMsgpack形式+gzip圧縮なファイルです。
Fluentdでやる手もありますが、今回は以下のフィルタを書いて変換しました。

#!/usr/lib64/fluent/ruby/bin/ruby
require 'json'
require 'time'
require 'msgpack'

format = /^(?<host>[^ ]*) [^ ]* [^ ]* \[(?<time>[^\]]*)\] "(?<method>\S+) +(?<path>[^ ]+) +\S*" (?<code>[^ ]*) (?<size>[^ ]*) "(?<referer>[^\"]*)" "(?<agent>[^\"]*)" /
time_format = "%d/%b/%Y:%H:%M:%S %z"
path_filter = Regexp.new "^/static/"

while line = STDIN.gets
  line.chomp!
  
  m = format.match line
  unless m
    STDERR.puts "not match! : #{line}"
    next
  end

  record = {}

  m.names.each {|n| record[n] = m[n] }
  next if path_filter =~ record["path"]

  record["time"] = Time.strptime(record["time"], time_format).to_i

  #
  # いろいろ内部変換処理があったり
  #

  print MessagePack.pack record
end

こんなかんじで使います。

$ zcat access_log.20120617.gz | ./td_convert.rb 2>td_convert.err | gzip -c >access_log.20120617.mpack.gz

※すんごく時間がかかります。だれかもっと効率的なの書いて><
ファイルのサイズが大きいとアップロードに失敗することが多くなります。
32M-64Mがオススメらしいので必要に応じて分割しましょう!

bulk_import手順

bulk_importを作成します
ほどよい単位で作りましょう。あまり大きいと失敗した時に泣けます。
今回は日時単位で作っていきます。

$ td bulk_import:create 20120617 access site01


ログをアップロードします
サーバごとに分割されたログをアップするならこんな感じになります。

$ td bulk_import:upload_part 20120617 web01 web01/site01-access_log.20120617.mpack.gz
$ td bulk_import:upload_part 20120617 web02 web02/site01-access_log.20120617.mpack.gz
$ td bulk_import:upload_part 20120617 web03 web03/site01-access_log.20120617.mpack.gz
...

td bulk_import:showで確認しながらやりましょう。Uploaded Partsが全部揃ったらアップロード完了です。

$ td bulk_import:show 20120617
Name         : 20120617
Database     : access
Table        : site01
Status       : Uploading
Frozen       : false
JobID        :
Valid Records:
Error Records:
Valid Parts  :
Error Parts  :
Uploaded Parts :
  web01
  web02
  web03


全部揃ったらアップロードを凍結して変換処理を走らせます

$ td bulk_import:freeze 20120608
Bulk import session '20120608' is frozen.
$ td bulk_import:perform 20120608


bulk_import:listでReadyになったら変換完了です。
この時、必ず「Valid Records」のレコードの行数が正しいか確認しておきましょう。

$ td bulk_import:list
+----------+---------------+------------+--------+--------+-------------+-------------+---------------+---------------+
| Name     | Table         | Status     | Frozen | JobID  | Valid Parts | Error Parts | Valid Records | Error Records |
+----------+---------------+------------+--------+--------+-------------+-------------+---------------+---------------+
| 20120615 | access.site01 | Committed  | frozen | 419867 | 3           | 0           | 527510        | 0             |
| 20120616 | access.site01 | Committed  | frozen | 421640 | 3           | 0           | 498026        | 0             |
| 20120617 | access.site01 | Ready      | frozen | 421859 | 3           | 0           | 416908        | 0             |
| 20120618 | access.site01 | Performing | frozen | 439196 |             |             |               |               |
| 20120619 | access.site01 | Uploading  |        |        |             |             |               |               |
+----------+---------------+------------+--------+--------+-------------+-------------+---------------+---------------+


commitするtableにデータが入ります

$ td bulk_import:commit 20120617

まとめ

過去ログのアップロードめんどくさいよ(;´Д`)ハァハァ