昨年からダラダラと座学に取り組んできたものが年を超えてようやく形になったのでメモ。
もともとSplunkに変わる大体手段として何かないかなーと探していたところ, ELKでお試しという試みだったけど, 諸々の事情(後でやるやる詐欺)でこんなにも時間がかかった。
全体像
1. CiscoからSyslogを飛ばし, Rsyslogで受け取る。
2. Filebeatで該当ファイルをLogstashへ送る。
3. LogstashでパースしてElasticsearchへ送る。
4. Kibanaで表示。
全体像 |
オールインワン構成とした。
一連の流れ
2. 各種インストール・設定
3. 詳細設定
4. 動作確認
1. インスタンス作成, 事前準備
$ openstack flavor create --ram 12288 --disk 60 --vcpus 4 elastic-flavor
$ openstack server create --flavor elastic-flavor --image CentOS7 --key-name elastic-key elastic
インスタンスができたらrsyslogの設定。(※florting IPの設定は省略)
<Syslog方針>
– Cisco側はfacility local3でサーバへ渡す。
– Local3で受信したログは /var/log/rsyslog/ 配下に”hostname.log”で作成させる。
@Cisco
logging facility local3
vi /etc/rsyslog.conf
$template DynFile,"/var/log/rsyslog/%HOSTNAME%.log
local3.* -?DynFile
*.info;mail.none;authpriv.none;cron.none;local3.none /var/log/messages
続いてJDKのインストール。
# yum install java-1.8.0-openjdk.x86_64 java-1.8.0-openjdk-debug.x86_64 java-1.8.0-openjdk-devel.x86_64
オールインワン構成だけど, IP指定で構成するのとCiscoの名前解決必要なのでhostsファイルを編集する。
/etc/hosts 編集
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
172.16.10.50 elastic
192.168.10.1 local-c841m
2. 各種インストール・設定
オフィシャルのオンラインマニュアルを参考にインストールできる。
Elasticsearch手順
https://www.elastic.co/guide/en/elasticsearch/reference/6.1/rpm.html
Kibana手順
https://www.elastic.co/guide/en/kibana/6.1/rpm.html
Logstash手順
https://www.elastic.co/guide/en/logstash/6.1/installing-logstash.html
Filebeat手順
https://www.elastic.co/guide/en/beats/filebeat/6.1/filebeat-installation.html
2.1 リポジトリ登録
rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
/etc/yum.repos.d/elasticsearch.repo 作成
[elasticsearch-6.x]
name=Elasticsearch repository for 6.x packages
baseurl=https://artifacts.elastic.co/packages/6.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
2.2 インストール
yum install elasticsearch kibana logstash
wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.1.1-x86_64.rpm
yum install ./filebeat-6.1.1-x86_64.rpm
2.3 Elasticsearch設定
とりあえず動作確認までこぎつけるため, 基本的な設定のみを実施。(Cluster.nameやnetwork.hostは気分で変えた)
/etc/elasticsearch/elasticsearch.yml
cluster.name: elastichome
node.name: elastic
path.data: /var/lib/elasticsearch
path.logs: /var/log/elasticsearch
network.host: 0.0.0.0
http.port: 9200
起動設定。
systemctl enable elasticsearch
systemctl start elasticsearch
2.4 Logstash設定
filebeat用の設定ファイルは後にして一通り設定を済ます。
(ログ読み込まないなー等うまくいかない時はここで自動読み込み設定ONにしてlogレベルを適宜変更するとよい。)
path.data: /var/lib/logstash
path.config: /etc/logstash/conf.d/*.conf
config.reload.automatic: true
config.reload.interval: 5s
log.level: info
path.logs: /var/log/logstash
起動設定。
systemctl enable logstash
systemctl start logstash
2.5 Kibana設定
ここもほぼ基本的な設定のみ。
/etc/kibana/kibana.yml
server.port: 5601
server.host: "172.16.10.50"
server.name: "elastic"
elasticsearch.url: "http://172.16.10.50:9200"
elasticsearch.username: "kibana"
elasticsearch.password: "kibanapassword"
logging.dest: /var/log/kibana/kibana.log
起動設定。
systemctl enable kibana
systemctl start kibana
2.6 Filebeat設定
監視対象のログ指定と出力先をlogstashにする設定を入れる。
logstashでのフィルタ条件で利用するtagの設定も入れる。
filebeat.prospectors:
- type: log
paths:
- /var/log/rsyslog/*.log
fields:
log_type : ciscolog
fields_under_root: true
filebeat.config.modules:
path: ${path.config}/modules.d/*.yml
reload.enabled: true
reload.period: 10s
index.number_of_shards: 3
index.codec: best_compression
_source.enabled: false
setup.dashboards.enabled: false
output.logstash:
hosts: ["172.16.10.50:5044"]
index: "filebeat"
username: "logstash_internal"
password: "logstashpassword"
logging.level: debug
起動設定。
systemctl enable filebeat
systemctl start filebeat
3 フィルタリングとかインデックスとか設定
3.1 設定ファイル作成
Filebeat用のlogstash設定ファイルを /etc/logstash/conf.d/beat.conf として作成する。
input {
beats {
port => 5044
}
}
filter {
if [log_type] == "ciscolog" {
grok {
match => {
"message" => "%{SYSLOGTIMESTAMP:timestamp} %{HOSTNAME:log_host} %{NUMBER:unixtimestamp}: %{SYSLOGTIMESTAMP:ciscotimestamp} %{WORD:timezone}: %{GREEDYDATA:facility}: %{DATA:application} %{GREEDYDATA:msg_1} - %{WORD:protocol} %{WORD:msg_2} %{IP:src_ip}:%{NUMBER:src_port} %{IP:dst_ip}:%{NUMBER:dst_port} %{GREEDYDATA:msg_3}"
}
}
geoip {
source => "dst_ip"
}
mutate {
convert => {
"src_port" => "integer"
"dst_port" => "integer"
}
}
}
}
output {
elasticsearch {
hosts => [ "http://172.16.10.50:9200" ]
index => "myfilebeat-%{+YYYY.MM}"
}
}
※補足説明
7行目: if [log_type] == “ciscolog” {
10行目: “message” => “%{SYSLOGTIMESTAMP:timestamp} %{HOSTNAME:log_host} %{NUMBER:unixtimestamp}: %{SYSLOGTIMESTAMP:ciscotimestamp} %{WORD:timezone}: %{GREEDYDATA:facility}: %{DATA:application} %{GREEDYDATA:msg_1} – %{WORD:protocol} %{WORD:msg_2} %{IP:src_ip}:%{NUMBER:src_port} %{IP:dst_ip}:%{NUMBER:dst_port} %{GREEDYDATA:msg_3}”
CiscoルータのゾーンベースのFWログフォーマットを見ながらパーシングしたい箇所を切り出せるように記述する。
grokフィルタが良く分からないときはここ(Grok Debugger)を参考にトライアンドエラーでがんばる。GREEDYDATA(*と同義)を使って少しずつ細分化していくのがおすすめ。
14行目: source => “dst_ip”
geoipフィルタで対象となるIPアドレスを指定する。
今回はログから通信先がどこか地図にマッピングすることを考えているので, ログ内のdst_ip(通信先IPアドレス)を指定した。
16行~21行: mutateの行
ここは実際にパースしたらなぜかtypeがtextになっていたので入れた。気になったのでtypeをコンバートする設定を追加。テンプレートあたりをしっかり理解すればこんなことしなくてもよいのかもしれない。
27行目: index => “myfilebeat-%{+YYYY.MM}”
elasitcsearchへ出力する際にインデックスを追加する。
3.2 インデックス作成
インデックスはKibanaにログインし, 「Management → Index Patterns」でインデックスを作成する。
インデックス作成画面 |
これでいけるか!と思ったけど実際にマップ作成をやってみると, “kibanaのGeo Coordinatesマップで参照されるgeo_pointとなるタイプが無い”となって作成できない。
デフォルトのfilebeatのテンプレートを元に, 新たに作成する。
<手順>
Kibana上のDevToolsに「GET _template/filebeat-*」と入れ, 出力結果を編集してPUTする。
1. 元となるテンプレートを元に新しいテンプレートを作成する。
{
"filebeat": {
"order": 1,
"index_patterns": [
"myfilebeat-*"
],
~中略~
}
}
},
"geoip": {
"properties": {
"continent_name": {
"type": "keyword",
"ignore_above": 1024
},
"country_iso_code": {
"type": "keyword",
"ignore_above": 1024
},
"location": {
"type": "geo_point"
},
"region_name": {
"type": "keyword",
"ignore_above": 1024
},
"city_name": {
"type": "keyword",
"ignore_above": 1024
}
}
}
}
}
},
"aliases": {}
}
}
2. テンプレートをPUTする。
1の内容をDevToolsでPUTする。
PUT _template/filebeat
※ ここに1の内容を入れる
4. 確認
ここまで来ればKibana上で次のように表示されるはず。
インデックスが作成されている |
geoip.locationというフィールドがgeo_pointタイプで作成されている |
Map作成は
- Visualize → 「+」新規作成 → MapsのCoordinate Map
- myfilebeat-* を選択
- Bucketsで「GeoCoodinate」を選択
- Aggregationで「Geohash」を選択
- Fieldで「geoip.location」を選択
で完了。
次のようなMapが表示される。
Map表示。 |
Splunkのほうが大分楽にMap作成はできるけどSPLを覚えるのと比較すると, まぁどっこいどっこいかな。
とりあえずの動作確認ができた。
これからは細かく分析・その他データの可視化を目標として検証すすめる。