올빼미공방
ElasticSearch로 웹 서비스 로그 분석 시스템 만들기 [3]- Event log를 분석하기 위한 ELK+ kafka 파이프라인 구축 본문
ElasticSearch로 웹 서비스 로그 분석 시스템 만들기 [3]- Event log를 분석하기 위한 ELK+ kafka 파이프라인 구축
운좋은올빼미 2019. 8. 22. 17:28ElasticSearch로 웹 서비스 로그 분석 시스템 만들기 [3]- Event log를 분석하기 위한 ELK+ kafka 파이프라인 구축
1. 개요
이벤트 로그를 실시간으로 처리하여 ElasticSearch에 적재하기 위하여, kafka를 이용해 파이프라인을 구축하기로 하였습니다.
kafka는 대용량의 실시간 로그 처리에 특화된 메시징 시스템이며, 발행-구독 모델로 동작합니다.
kafka의 broker는 topic을 기준으로 동작하며, producer가 특정 topic의 메시지를 생성하면 broker는 이를 topic별로 쌓아 둔뒤, 해당 topic을 구독하고 있는 consumer가 해당 메시지를 받아가는 방식으로 작동합니다.
이벤트 로그를 분석하기 위하여 구축한 현재의 시스템에서는, 이벤트 로그를 발생시키는 서버가 producer가 되고, 이 로그를 받아 es에 적재하는 logstash가 consumer가 되는 것입니다.
다만 글을 작성하는 현재 시점에서는 이벤트 로그를 발생시키는 서버가 완성되지 않아, nginx access log를 전달하는 filebeat을 producer로 이용하여 filebeat => kafka => logstash => ES 파이프라인을 구축하여 보겠습니다.
2. 구축
2.1 kafka/zookeeper 설치
kafka를 설치하기에 앞서, kafka와 같은 분산 메시지 큐의 정보를 관리해주는 역할을 하는 zookeeper를 설치해야 합니다.
sudo apt-get update && sudo apt-get install zookeeper
이제 kafka를 설치합시다.
wget http://apache.mirror.cdnetworks.com/kafka/2.3.0/kafka_2.12-2.3.0.tgz
tar -xzf kafka_2.12-2.3.0
cd kafka_2.12-2.3.0
2.2 kafka configuration 및 실행
cd config
sudo nano server.properties
를 통해 server.properties file을 수정하여 줍시다.
listeners란을 comment out해주고, port = 9092를 추가해 줍니다.
또한, advertised.host.name = localhost 도 추가해줍니다.
이는 kafka의 leader not available 오류를 해결하기 위함입니다.
이제 zookeeper와 kafka를 시작하여 줍시다.
kafka 홈 디렉토리에서, 아래의 커맨드를 입력하여 zookeeper와 kafka를 시작합시다.
sudo bin/zookeeper-server-start.sh config/zookeeper.properties
[결과]
sudo bin/kafka-server-start.sh config/server.properties
[결과]
2.3 filebeat configuration / 실행
이제 filebeat를 configuration하고 실행하여 kafka producer의 역할을 하도록 합시다.
cd /etc/filebeat
sudo nano filebeat.yml
input과 output을 아래와 같이 수정해줍니다.
hosts란에는 kafka가 설치된 서버의 주소를 작성하여 줍시다.
이제, filebeat를 실행합니다.
sudo filebeat -e
topic란에 작성한 topic명에 해당하는 topic이 작동으로 만들어지며, 이제 이 topic을 소모할 consumer인 logstash를 configure합시다.
2.4 logstash configuration /실행
cd /etc/logstash/conf.d
sudo nano logstash.conf
logstash.conf 파일의 내용을 아래와 같이 수정하여줍니다.
grok필터를 이용하여, nginx log를 parsing해 주었습니다.
input { kafka { bootstrap_servers => "localhost:9092" topics => ["nginx-logs"] } } filter { if [fileset][module] == "nginx" { if [fileset][name] == "access" { grok { match => { "message" => ["%{IPORHOST:[nginx][access][remote_ip]} - %{DATA:[nginx][access][user_name]} \[%{HTTPDATE:[nginx][access][time]}\] \"%{WORD:[nginx][access][method]} %{DATA:[nginx][acce$ remove_field => "message" } mutate { add_field => { "read_timestamp" => "%{@timestamp}" } } date { match => [ "[nginx][access][time]", "dd/MMM/YYYY:H:m:s Z" ] remove_field => "[nginx][access][time]" } useragent { source => "[nginx][access][agent]" target => "[nginx][access][user_agent]" remove_field => "[nginx][access][agent]" } geoip { source => "[nginx][access][remote_ip]" target => "[nginx][access][geoip]" } } else if [fileset][name] == "error" { grok { match => { "message" => ["%{DATA:[nginx][error][time]} \[%{DATA:[nginx][error][level]}\] %{NUMBER:[nginx][error][pid]}#%{NUMBER:[nginx][error][tid]}: (\*%{NUMBER:[nginx][error][connection_id]} $ remove_field => "message" } mutate { rename => { "@timestamp" => "read_timestamp" } } date { match => [ "[nginx][error][time]", "YYYY/MM/dd H:m:s" ] remove_field => "[nginx][error][time]" } } } } output { elasticsearch{ hosts => ["localhost:9200"] index => "nginx-logs" } stdout { codec => rubydebug } }
이제 logstash를 consumer로서 실행시킵니다.
cd /usr/share/logstash
sudo bin/logstash -f /etc/logstash/conf.d/logstash.conf
자, 그러면 이제 nginx가 호스팅하고 있는 웹사이트에 접속하여 Access log를 변경하고 --- 1
이를 통해 filebeat harvester를 작동시켜 producer로 작동하게 하고 --- 2
logstash에 message가 로드되었는지를 확인해봅시다. --- 3
완료되었습니다! nginx가 호스팅하고 있는 사이트에 새로운 접속이 발생할때마다, filebeat => kafka => logstash로 전달되어 logstash에서 output이 발생하는 것(좌측 상단)을 확인할 수 있습니다.
'개발 > ElasticSearch' 카테고리의 다른 글
ElasticSearch로 웹 서비스 로그 분석 시스템 만들기 [2] - ELK + Filebeat module를 이용하여 내 로그를 분석하고 시각화하기 (0) | 2019.07.24 |
---|---|
ElasticSearch로 웹 서비스 로그 분석 시스템 만들기 [1] - Concept, 설치하기 (0) | 2019.07.24 |