데이터 정리가 문제지, 정리된 데이터의 분석은 쉽다- 데이터 분석이 쉬워지는 정규표현식(459페이지)
엘라스틱도 마찬가지. 3006, 5156 이벤트 로그를 분석에 편리한 형태로 정리해주는 Logstash 설정은 다음과 같다.
input { beats { port => 5044 } }
filter { if [event_id] == 3006 { if [event_data][QueryName] in ["localhost","wpad","isatap","lg","LG"] or [event_data][QueryType] == "28" { drop { } }
replace => { "type" => "dns_event" } }
grok { match => { "[event_data][QueryName]" => "([^.]+\.)*?(?<url>[^.]+\.((ac|co|go|ne|nm|or|pe|re)\.)?[^.]+\.?$)" } } }
if [event_id] == 5156 { if [event_data][DestAddress] == "172.20.10.1" and [event_data][DestPort] != "53" or [event_data][SourcePort] == "1900" or [event_data][DestPort] == "1900" or [event_data][SourceAddress] == [event_data][DestAddress] or [event_data][SourceAddress] =~".*:.*" or [event_data][SourceAddress] == "192.168.56.1" { drop { } }
mutate { replace => { "type" => "network_event" } }
grok { match => { "[event_data][Application]" => "(?<proc_path>.+\\).+" } }
grok { match => { "[event_data][Application]" => "(.+\\)?(?<proc_name>.+)" } }
if [event_data][Protocol] == "6" { mutate { replace => { "[event_data][Protocol]" => "TCP" } } }
if [event_data][Protocol] == "17" { mutate { replace => { "[event_data][Protocol]" => "UDP" } } }
if [event_data][Direction] == "%%14593" { mutate { replace => { "[event_data][Direction]" => "OUT" } } }
if [event_data][Direction] == "%%14592" { mutate { replace => { "[event_data][Direction]" => "IN" } } }
if [event_data][SourceAddress] =~ "(\d+\.){3}\d+" { geoip { source => "[event_data][SourceAddress]" target => "src" } }
if [event_data][DestAddress] =~ "(\d+\.){3}\d+" { geoip { source => "[event_data][DestAddress]" target => "dest" } }
mutate { add_field => { "sPort" => "%{[event_data][SourcePort]}" "dPort" => "%{[event_data][DestPort]}" } convert => { "sPort" => "float" "dPort" => "float" } } }}
output { elasticsearch { hosts => [ "localhost:9200" ] index => "win_event" } stdout { codec => rubydebug }}
가장 마지막에 사용된 mutate 필터는 포트 정보의 숫자 크기 비교를 위해 별도의 출발/목적지 포트 필드를 추가하고, 해당 필드로 전송되는 데이터 타입을 숫자형으로 바꿔준다. 엘라스틱이 자신이 잘 모르는 데이터는 문자형으로 자동 저장하기 때문.
목적지포트 1024 미만 |
추가로 해당 작업이 성공하기 위해서는 엘라스틱이 자동으로 만들어준 인덱스에 대한 수정이 필요하다. 숫자형으로 데이터를 저장할 필드를 미리 만들어놔야 한다는 얘기. 그렇지 않으면 숫자형으로 전송해도 다시 문자로 저장돼버림.
다음은 기존 인덱스에 숫자 타입 필드를 추가하는 과정. 크롬 확장도구인 postman을 이용해서 작업.
필드 추가 후, 해당 인덱스를 조회하면 필드가 잘 추가되었음을 알 수 있다.
결국 엘라스틱 활용에서 데이터를 정리해주는 Logstash의 역할이 가장 중요하다고 할 수 있다. 정리되지 않은 데이터의 분석은 너무나 어렵기 때문.
댓글 없음:
댓글 쓰기