2017년 6월 25일 일요일

Elasticsearch 활용(DNS Event Log 분석 - 2nd)

데이터 정리가 문제지, 정리된 데이터의 분석은 쉽다
- 데이터 분석이 쉬워지는 정규표현식(459페이지)

엘라스틱도 마찬가지. 3006, 5156 이벤트 로그를 분석에 편리한 형태로 정리해주는 Logstash 설정은 다음과 같다.
input { 
 beats { 
  port => 5044 
  } 
 }

filter {
 if [event_id] == 3006 {
  if [event_data][QueryName] in ["localhost","wpad","isatap","lg","LG"] or [event_data][QueryType] == "28" {
   drop { }
  }

  mutate { 
   replace => {
    "type" => "dns_event"
   }
  }

  grok { 
   match => {
    "[event_data][QueryName]" => "([^.]+\.)*?(?<url>[^.]+\.((ac|co|go|ne|nm|or|pe|re)\.)?[^.]+\.?$)"
   }
  }
 }

 if [event_id] == 5156 {
  if [event_data][DestAddress] == "172.20.10.1" and [event_data][DestPort] != "53" or [event_data][SourcePort] == "1900" or [event_data][DestPort] == "1900" or [event_data][SourceAddress] == [event_data][DestAddress] or [event_data][SourceAddress] =~".*:.*"  or [event_data][SourceAddress] == "192.168.56.1" {
   drop { }
  }

  mutate { 
   replace => {
    "type" => "network_event"
   }
  }

  grok { 
   match => {
    "[event_data][Application]" => "(?<proc_path>.+\\).+"
   }
  }

  grok {
   match => {
    "[event_data][Application]" => "(.+\\)?(?<proc_name>.+)" 
   }
  }

  if [event_data][Protocol] == "6" {
   mutate {
    replace => { "[event_data][Protocol]" => "TCP" }
   }
  }

  if [event_data][Protocol] == "17" {
   mutate {
    replace => { "[event_data][Protocol]" => "UDP" }
   }
  }

  if [event_data][Direction] == "%%14593" {
   mutate {
    replace => { "[event_data][Direction]" => "OUT" }
   }
  }

  if [event_data][Direction] == "%%14592" {
   mutate {
    replace => { "[event_data][Direction]" => "IN" }
   }
  }

  if [event_data][SourceAddress] =~ "(\d+\.){3}\d+" {
   geoip {
    source => "[event_data][SourceAddress]"
    target => "src"
   }
  }

  if [event_data][DestAddress] =~ "(\d+\.){3}\d+" {
   geoip {
    source => "[event_data][DestAddress]"
    target => "dest"
   }
  }

  mutate { 
   add_field => {
    "sPort" => "%{[event_data][SourcePort]}"
    "dPort" => "%{[event_data][DestPort]}"
   }
   convert => {
    "sPort" => "float"
    "dPort" => "float"
   }
  }
 }
}

output {
 elasticsearch { 
  hosts => [ "localhost:9200" ] 
  index => "win_event"
 }
  stdout {
   codec => rubydebug 
 }
}

불필요한 데이터를 제거하고, 가독성을 높이는 과정에서 filter 영역이 좀 복잡해졌지만, 대부분 grok이나 mutate 필터를 이용해서 분석을 원하는 특정 데이터 영역을 추출하거나, 읽기 편하게 바꾸는 과정. 마지막 부분에 추가한 geoip 필터는 IP의 지리 정보를 추가해준다.


가장 마지막에 사용된 mutate 필터는 포트 정보의 숫자 크기 비교를 위해 별도의 출발/목적지 포트 필드를 추가하고, 해당 필드로 전송되는 데이터 타입을 숫자형으로 바꿔준다. 엘라스틱이 자신이 잘 모르는 데이터는 문자형으로 자동 저장하기 때문.

목적지포트 1024 미만

추가로 해당 작업이 성공하기 위해서는 엘라스틱이 자동으로 만들어준 인덱스에 대한 수정이 필요하다. 숫자형으로 데이터를 저장할 필드를 미리 만들어놔야 한다는 얘기. 그렇지 않으면 숫자형으로 전송해도 다시 문자로 저장돼버림.

다음은 기존 인덱스에 숫자 타입 필드를 추가하는 과정. 크롬 확장도구인 postman을 이용해서 작업.


필드 추가 후, 해당 인덱스를 조회하면 필드가 잘 추가되었음을 알 수 있다.


결국 엘라스틱 활용에서 데이터를 정리해주는 Logstash의 역할이 가장 중요하다고 할 수 있다. 정리되지 않은 데이터의 분석은 너무나 어렵기 때문.

댓글 없음:

댓글 쓰기

크리에이티브 커먼즈 라이선스