NetflowをElastiflowで取り込む

Elasticsearchで取り込んだデータをKibanaでインデックス化まではいけたのだけれど, ダッシュボードにNetflowがないのでフォーラムに問い合わせしてみたら, 「ElastiFlowをおすすめする」と言われたのでそちらでやってみた。

手順はここにある。
https://github.com/robcowart/elastiflow

しかし必要リソースが多い・・・。

flows/sec (v)CPUs Memory Disk (30-days) ES JVM Heap LS JVM Heap
250 4 24 GB 305 GB 8 GB 4 GB
1000 8 32 GB 1.22 TB 12 GB 4 GB
2500 12 64 GB 3.05 TB 24 GB 6 GB

手順

  1. 確認
  2. Javaのヒープサイズ確認
  3. Logstash Pluginインストール
  4. Git Hubから関連ファイル取得・配置
  5. 設定ファイル編集
  6. プロセス再起動
  7. Kibanaでインデックス作成・ダッシュボードjsonを読み込む

アップデート

もろもろアップデートしておく。なお, Elasticsearchのバージョンは6.2.4。

yum update -y

Javaのヒープサイズ変更

It is recommended that Logstash be given at least 2GB of JVM heap. If all options, incl. DNS lookups (requires version 3.0.10 or later of the DNS filter), are enabled increase this to 4GB. 

とあったので初期値1G, MAX値を4Gへ変更。

vi /etc/logstash/jvm.options
# Xms represents the initial size of total heap space
# Xmx represents the maximum size of total heap space

#-Xms256m
#-Xmx1g
-Xms1g
-Xmx4g

Logstash Pluginインストール

# ./logstash-plugin install logstash-codec-sflow
Validating logstash-codec-sflow
Installing logstash-codec-sflow
Installation successful
# ./logstash-plugin update logstash-codec-netflow
Updating logstash-codec-netflow
Updated logstash-codec-netflow 3.13.2 to 3.14.0
# ./logstash-plugin update logstash-input-udp
Updating logstash-input-udp
Updated logstash-input-udp 3.3.2 to 3.3.3
# ./logstash-plugin update logstash-filter-dns
Updating logstash-filter-dns
Updated logstash-filter-dns 3.0.9 to 3.0.10

Git Hubから関連ファイル取得・配置・編集

% git clone https://github.com/robcowart/elastiflow.git
% ls -l elastiflow
total 32
drwxrwxr-x. 2 centos centos 75 May 24 20:02 kibana
-rw-rw-r--. 1 centos centos 1026 May 24 20:02 LICENSE.md
drwxrwxr-x. 3 centos centos 23 May 24 20:02 logstash
drwxrwxr-x. 2 centos centos 54 May 24 22:49 logstash.service.d
drwxrwxr-x. 2 centos centos 26 May 24 20:02 profile.d
-rw-rw-r--. 1 centos centos 28091 May 24 20:02 README.md

設定ファイル配置

# cp -r ./elastiflow/logstash/elastiflow/ /etc/logstash/

設定ファイル編集。
netflow以外使わないので, それ以外のファイルはdisableにした。

10_input_ipfix_ipv4.logstash.conf.disabled
10_input_ipfix_ipv6.logstash.conf.disabled
10_input_netflow_ipv4.logstash.conf
10_input_netflow_ipv6.logstash.conf.disabled
10_input_sflow_ipv4.logstash.conf.disabled
10_input_sflow_ipv6.logstash.conf.disabled
20_filter_10_begin.logstash.conf
20_filter_20_netflow.logstash.conf
20_filter_30_ipfix.logstash.conf.disabled
20_filter_40_sflow.logstash.conf.disabled
20_filter_90_post_process.logstash.conf
30_output.logstash.conf

インプットファイル編集

# vi 10_input_netflow_ipv4.logstash.conf

変更点。

host => "${ELASTIFLOW_NETFLOW_IPV4_HOST:172.16.10.50}"
port => "${ELASTIFLOW_NETFLOW_IPV4_PORT:9995}"

アウトプットファイル編集

# vi 30_output.logstash.conf

変更点。

hosts => [ "${ELASTIFLOW_ES_HOST:172.16.10.50:9200}" ]
user => "${ELASTIFLOW_ES_USER:elastic}"
password => "${ELASTIFLOW_ES_PASSWD:elastic}"

起動スクリプト配置

# cp -r ./elastiflow/logstash.service.d/ /etc/systemd/system/

起動スクリプト編集

# vi /etc/systemd/system/logstash.service.d/elastiflow.conf

変えたところは以下。

Environment="ELASTIFLOW_NAMESERVER=1.1.1.1"
Environment="ELASTIFLOW_ES_HOST=172.16.10.50"
Environment="ELASTIFLOW_ES_PASSWD=changeme"
Environment="ELASTIFLOW_NETFLOW_IPV4_HOST=172.16.10.50"
Environment="ELASTIFLOW_NETFLOW_IPV4_PORT=9995"

アプリケーションID登録。

# vi /etc/logstash/elastiflow/dictionaries/app_id.srctype.yml

Cisco841を登録。

"192.168.1.2": "c841m"

pipeline.ymlに以下追加。合わせてnetflowの行はコメントアウト。

# For ElastiFlow
- pipeline.id: elastiflow
path.config: "/etc/logstash/elastiflow/conf.d/*.conf"

プロセス再起動

#systemctl restart logstash
#systemctl daemon-reload

Kibanaにjson取り込み

「Management -> Save Objects -> Import」でGitから取得したelastiflow.dashboards.jsonをインポート。

できた!

ただ, うちの仮想マシンのスペック不足で結構な頻度でエラーが出る。
この辺は今の状況ではどうしようもないなー。

【メモ】elasticsearchアップグレードしたらエラーで起動しない件

Just a note。

こんなメッセージが出て起動しなかった。

java.lang.IllegalArgumentException: plugin [ingest-geoip] is incompatible with version [6.2.4]; was designed for version [6.1.2]
at org.elasticsearch.plugins.PluginInfo.readFromProperties(PluginInfo.java:237) ~[elasticsearch-6.2.4.jar:6.2.4]
at org.elasticsearch.plugins.PluginInfo.readFromProperties(PluginInfo.java:184) ~[elasticsearch-6.2.4.jar:6.2.4]
at org.elasticsearch.bootstrap.Spawner.spawnNativePluginControllers(Spawner.java:75) ~[elasticsearch-6.2.4.jar:6.2.4]
at org.elasticsearch.bootstrap.Bootstrap.setup(Bootstrap.java:167) ~[elasticsearch-6.2.4.jar:6.2.4]
at org.elasticsearch.bootstrap.Bootstrap.init(Bootstrap.java:323) [elasticsearch-6.2.4.jar:6.2.4]
at org.elasticsearch.bootstrap.Elasticsearch.init(Elasticsearch.java:121) [elasticsearch-6.2.4.jar:6.2.4]
at org.elasticsearch.bootstrap.Elasticsearch.execute(Elasticsearch.java:112) [elasticsearch-6.2.4.jar:6.2.4]
at org.elasticsearch.cli.EnvironmentAwareCommand.execute(EnvironmentAwareCommand.java:86) [elasticsearch-6.2.4.jar:6.2.4]
at org.elasticsearch.cli.Command.mainWithoutErrorHandling(Command.java:124) [elasticsearch-cli-6.2.4.jar:6.2.4]
at org.elasticsearch.cli.Command.main(Command.java:90) [elasticsearch-cli-6.2.4.jar:6.2.4]
at org.elasticsearch.bootstrap.Elasticsearch.main(Elasticsearch.java:92) [elasticsearch-6.2.4.jar:6.2.4]
at org.elasticsearch.bootstrap.Elasticsearch.main(Elasticsearch.java:85) [elasticsearch-6.2.4.jar:6.2.4]

一度6.1.3へダウングレードして, ingest-geoipを削除。 その後またアップグレードで復旧。

Logstashのマルチパイプライン設定

Filebeatsからの入力とローカルのファイルを読み込んでの処理を分けるためにマルチパイプラインの設定をする。

blog-pipeline-log.jpg
公式サイト(Introducing Multiple Pipelines in Logstash)より

https://www.elastic.co/blog/logstash-multiple-pipelines
https://www.elastic.co/guide/en/logstash/master/multiple-pipelines.html

以前作成したFilebeatsの設定ファイルを転用して, CSVファイル用の設定ファイルを作成して, それぞれ異なるパイプラインで処理する設定を行う。
CSVファイルで使うのは長年エクセルで管理してグラフ化していた光熱費のデータを使う(笑)。

手順
1. /etc/logstash/pipelines.yml 作成
2. filebeatsのConfig(既存転用)とCSV用のConfigを作成
3. index Pattern作成
4. グラフ作成

1. pipelines.yml 作成

pipelines.ymlには各パイプラインの設定を記述する。ここに記述されなかった設定はlogstash.ymlを参照してそれに従うとのこと。
・pipelines.yml には個別設定
・logstash.yml には共通設定
という住み分けらしい。(多分)

vi /etc/logstash/pipelines.yml

# For beats
- pipeline.id: filebeat
pipeline.workers: 2
pipeline.batch.size: 125
pipeline.batch.delay: 5
config.reload.automatic: true
config.reload.interval: 5s
path.config: "/etc/logstash/pipeconf.d/beats.conf"

# For csv
- pipeline.id: csvfile
pipeline.workers: 1
pipeline.batch.size: 125
pipeline.batch.delay: 5
config.reload.automatic: true
config.reload.interval: 5s
path.config: "/etc/logstash/pipeconf.d/csv.conf"

これに伴って /etc/logstash/logstash.yml を以下のように変更した。
vi /etc/logstash/logstash.yml

path.data: /var/lib/logstash
log.level: info
path.logs: /var/log/logstash

最低限のものに絞ったので設定値はpipelines.ymlへ持っていった。

2. csv用設定ファイル作成

csvファイルは次のような要素になっている。
「YYYYM,電気代,水道代,ガス代」
実際のデータはこんな感じ。
「201712,9308,11765,10018」

logstashはlogstashユーザで起動するので, データの配置場所は参照できるところへ配置する。(私はユーザのホームにおいてread権限あるから大丈夫だろうと思っていたらハマった。)
各パイプライン用の設定ファイルは /etc/logstash に pipeconf.d というディレクトリを新たに作成してそこに配置した。
vi /etc/logstash/pipeconf.d/csv.conf

input {
file {
path => "/tmp/logstash/kounetsu.csv"
start_position => "beginning"
}
}

filter {
csv {
columns => ["date", "elec", "water", "gus"]
skip_empty_columns => true
convert => {
"elec" => "integer"
"water" => "integer"
"gus" => "integer"
}
}
date {
match => [ "date", "yyyyM"]
}
}

output {
elasticsearch {
hosts => [ "http://172.16.10.50:9200" ]
index => "kounetsu"
}
}

3. index pattern作成

elasticsearchへ”kounetsu”というインデックスで渡しているので kibana でこれを指定してインデックスパターンを作成する。

4. グラフ作成

ここはお好みでとなりますが。。。

完成。


できるまで結構ハマっていたのでこれをポチった。 無駄にはならないよね・・・。

CiscoのAPPFWのログをfilebeat→logstash→elasticsearchからのkibanaでMap表示させる

昨年からダラダラと座学に取り組んできたものが年を超えてようやく形になったのでメモ。
もともとSplunkに変わる大体手段として何かないかなーと探していたところ, ELKでお試しという試みだったけど, 諸々の事情(後でやるやる詐欺)でこんなにも時間がかかった。

全体像

1. CiscoからSyslogを飛ばし, Rsyslogで受け取る。
2. Filebeatで該当ファイルをLogstashへ送る。
3. LogstashでパースしてElasticsearchへ送る。
4. Kibanaで表示。

全体像

オールインワン構成とした。

FilebeatからElasticsearchへ直接という方法もあるようだけど, インデックス作成周りでよくわからないことに陥りそうだから, Webでよく見るfluentd+logstash構成に習った。
なお, 有償のX-packは使わない。

一連の流れ

1. インスタンス作成, 事前準備
2. 各種インストール・設定
3. 詳細設定
4. 動作確認

1. インスタンス作成, 事前準備

はじめにELKを立てるインスタンスを準備する。
インデックス作成にJavaを使うということで, このJavaがリソースを結構持っていくのでリソースは多めに割り当てた。CentOS7でvcpu: 4 mem: 12GB disk: 60GB。
$ openstack flavor create --ram 12288 --disk 60 --vcpus 4 elastic-flavor
$ openstack server create --flavor elastic-flavor --image CentOS7 --key-name elastic-key elastic

インスタンスができたらrsyslogの設定。(※florting IPの設定は省略)
<Syslog方針>
– Cisco側はfacility local3でサーバへ渡す。
– Local3で受信したログは /var/log/rsyslog/ 配下に”hostname.log”で作成させる。

@Cisco

logging facility local3

vi /etc/rsyslog.conf

$template DynFile,"/var/log/rsyslog/%HOSTNAME%.log
local3.* -?DynFile

*.info;mail.none;authpriv.none;cron.none;local3.none /var/log/messages

続いてJDKのインストール。

# yum install java-1.8.0-openjdk.x86_64 java-1.8.0-openjdk-debug.x86_64 java-1.8.0-openjdk-devel.x86_64

オールインワン構成だけど, IP指定で構成するのとCiscoの名前解決必要なのでhostsファイルを編集する。
/etc/hosts 編集

127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6

172.16.10.50 elastic
192.168.10.1 local-c841m

2. 各種インストール・設定

オフィシャルのオンラインマニュアルを参考にインストールできる。
Elasticsearch手順
https://www.elastic.co/guide/en/elasticsearch/reference/6.1/rpm.html
Kibana手順
https://www.elastic.co/guide/en/kibana/6.1/rpm.html
Logstash手順
https://www.elastic.co/guide/en/logstash/6.1/installing-logstash.html
Filebeat手順
https://www.elastic.co/guide/en/beats/filebeat/6.1/filebeat-installation.html

2.1 リポジトリ登録

rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch

/etc/yum.repos.d/elasticsearch.repo 作成

[elasticsearch-6.x]
name=Elasticsearch repository for 6.x packages
baseurl=https://artifacts.elastic.co/packages/6.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md


2.2 インストール

yum install elasticsearch kibana logstash
wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.1.1-x86_64.rpm
yum install ./filebeat-6.1.1-x86_64.rpm

2.3 Elasticsearch設定
とりあえず動作確認までこぎつけるため, 基本的な設定のみを実施。(Cluster.nameやnetwork.hostは気分で変えた)
/etc/elasticsearch/elasticsearch.yml

cluster.name: elastichome
node.name: elastic
path.data: /var/lib/elasticsearch
path.logs: /var/log/elasticsearch
network.host: 0.0.0.0
http.port: 9200

起動設定。

systemctl enable elasticsearch
systemctl start elasticsearch

2.4 Logstash設定
filebeat用の設定ファイルは後にして一通り設定を済ます。
(ログ読み込まないなー等うまくいかない時はここで自動読み込み設定ONにしてlogレベルを適宜変更するとよい。)

path.data: /var/lib/logstash
path.config: /etc/logstash/conf.d/*.conf
config.reload.automatic: true
config.reload.interval: 5s
log.level: info
path.logs: /var/log/logstash

起動設定。

systemctl enable logstash
systemctl start logstash

2.5 Kibana設定
ここもほぼ基本的な設定のみ。
/etc/kibana/kibana.yml

server.port: 5601
server.host: "172.16.10.50"
server.name: "elastic"
elasticsearch.url: "http://172.16.10.50:9200"
elasticsearch.username: "kibana"
elasticsearch.password: "kibanapassword"
logging.dest: /var/log/kibana/kibana.log

起動設定。

systemctl enable kibana
systemctl start kibana

2.6 Filebeat設定
監視対象のログ指定と出力先をlogstashにする設定を入れる。
logstashでのフィルタ条件で利用するtagの設定も入れる。

filebeat.prospectors:
- type: log
paths:
- /var/log/rsyslog/*.log
fields:
log_type : ciscolog
fields_under_root: true
filebeat.config.modules:
path: ${path.config}/modules.d/*.yml
reload.enabled: true
reload.period: 10s
index.number_of_shards: 3
index.codec: best_compression
_source.enabled: false
setup.dashboards.enabled: false
output.logstash:
hosts: ["172.16.10.50:5044"]
index: "filebeat"
username: "logstash_internal"
password: "logstashpassword"
logging.level: debug

起動設定。

systemctl enable filebeat
systemctl start filebeat

3 フィルタリングとかインデックスとか設定

3.1 設定ファイル作成
Filebeat用のlogstash設定ファイルを /etc/logstash/conf.d/beat.conf として作成する。

input {
beats {
port => 5044
}
}
filter {
if [log_type] == "ciscolog" {
grok {
match => {
"message" => "%{SYSLOGTIMESTAMP:timestamp} %{HOSTNAME:log_host} %{NUMBER:unixtimestamp}: %{SYSLOGTIMESTAMP:ciscotimestamp} %{WORD:timezone}: %{GREEDYDATA:facility}: %{DATA:application} %{GREEDYDATA:msg_1} - %{WORD:protocol} %{WORD:msg_2} %{IP:src_ip}:%{NUMBER:src_port} %{IP:dst_ip}:%{NUMBER:dst_port} %{GREEDYDATA:msg_3}"
}
}
geoip {
source => "dst_ip"
}
mutate {
convert => {
"src_port" => "integer"
"dst_port" => "integer"
}
}
}
}
output {
elasticsearch {
hosts => [ "http://172.16.10.50:9200" ]
index => "myfilebeat-%{+YYYY.MM}"
}
}

※補足説明
7行目:   if [log_type] == “ciscolog” {

filebeatの設定で /var/log/rsyslog/*.log に対してciscologというlog_typeというフィールドを付与したので, ここで条件マッチングさせている。
他にもログを捕捉する場合はこのあたりで適宜フールドを追加して処理を変える目的。
ここでは省略しているけど, audit.log等も対象としていたのでこの設定入れた。

10行目:  “message” => “%{SYSLOGTIMESTAMP:timestamp} %{HOSTNAME:log_host} %{NUMBER:unixtimestamp}: %{SYSLOGTIMESTAMP:ciscotimestamp} %{WORD:timezone}: %{GREEDYDATA:facility}: %{DATA:application} %{GREEDYDATA:msg_1} – %{WORD:protocol} %{WORD:msg_2} %{IP:src_ip}:%{NUMBER:src_port} %{IP:dst_ip}:%{NUMBER:dst_port} %{GREEDYDATA:msg_3}”
CiscoルータのゾーンベースのFWログフォーマットを見ながらパーシングしたい箇所を切り出せるように記述する。
grokフィルタが良く分からないときはここ(Grok Debugger)を参考にトライアンドエラーでがんばる。GREEDYDATA(*と同義)を使って少しずつ細分化していくのがおすすめ。
14行目:  source => “dst_ip”
geoipフィルタで対象となるIPアドレスを指定する。
今回はログから通信先がどこか地図にマッピングすることを考えているので, ログ内のdst_ip(通信先IPアドレス)を指定した。
16行~21行: mutateの行
ここは実際にパースしたらなぜかtypeがtextになっていたので入れた。気になったのでtypeをコンバートする設定を追加。テンプレートあたりをしっかり理解すればこんなことしなくてもよいのかもしれない。
27行目:  index => “myfilebeat-%{+YYYY.MM}”
elasitcsearchへ出力する際にインデックスを追加する。

3.2 インデックス作成
インデックスはKibanaにログインし, 「Management → Index Patterns」でインデックスを作成する。

インデックス作成画面

これでいけるか!と思ったけど実際にマップ作成をやってみると, “kibanaのGeo Coordinatesマップで参照されるgeo_pointとなるタイプが無い”となって作成できない。

デフォルトのfilebeatのテンプレートを元に, 新たに作成する。
<手順>
Kibana上のDevToolsに「GET _template/filebeat-*」と入れ, 出力結果を編集してPUTする。
1. 元となるテンプレートを元に新しいテンプレートを作成する。

{
"filebeat": {
"order": 1,
"index_patterns": [
"myfilebeat-*"
],

~中略~
}
}
},
"geoip": {
"properties": {
"continent_name": {
"type": "keyword",
"ignore_above": 1024
},
"country_iso_code": {
"type": "keyword",
"ignore_above": 1024
},
"location": {
"type": "geo_point"
},
"region_name": {
"type": "keyword",
"ignore_above": 1024
},
"city_name": {
"type": "keyword",
"ignore_above": 1024
}
}
}

}
}
},
"aliases": {}
}
}

2. テンプレートをPUTする。
1の内容をDevToolsでPUTする。

PUT _template/filebeat
※ ここに1の内容を入れる

4. 確認

ここまで来ればKibana上で次のように表示されるはず。

インデックスが作成されている
geoip.locationというフィールドがgeo_pointタイプで作成されている

Map作成は

  1. Visualize → 「+」新規作成 → MapsのCoordinate Map
  2. myfilebeat-* を選択
  3. Bucketsで「GeoCoodinate」を選択
  4. Aggregationで「Geohash」を選択
  5. Fieldで「geoip.location」を選択

で完了。

次のようなMapが表示される。

Map表示。

Splunkのほうが大分楽にMap作成はできるけどSPLを覚えるのと比較すると, まぁどっこいどっこいかな。

とりあえずの動作確認ができた。
これからは細かく分析・その他データの可視化を目標として検証すすめる。