2017년 8월 22일 화요일

Elasticsearch 활용(Snort 데이터베이스 연동)

패턴매칭 로그 대시보드를 아무리 잘 꾸며봐야 별 의미는 없지만, 그래도 보기는 좋으니깐. 예쁘게 꾸며진 대시보드를 바라보고 있으면 왠지 일 다 한 것 같은 기분도 들고(..)

보고만 있어도 좋구나^O^

다음은 Snort 데이터베이스를 엘라스틱서치로 연동해주는 Logstash 설정. 참고로 데이터베이스 연동을 안 하는 Snort는 경보와 패킷 페이로드를 따로 저장하는데, 두 개의 로그를 하나로 합쳐서 연동하는 방법은 모르겠다(..)

input {

 #데이터베이스 연동을 위한 jdbc 플러그인
 jdbc {
  jdbc_driver_library => "C:\ELK\mysql-connector-java-5.1.42\mysql-connector-java-5.1.42-bin.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_connection_string => "jdbc:mysql://localhost/snort"
  jdbc_user => "user"
  jdbc_password => "password"

  #로그 중복 전송을 막기 위한 기준 필드(tracking_column)로 timestamp가 아닌 필드 사용
  use_column_value => true 

  #tracking_column으로 로그 일련번호 필드 지정   
  tracking_column => cid

  #데이터 타입 지정
  tracking_column_type => "numeric" 

  #마지막으로 읽은 로그 무시, 같은 로그로 반복 테스트할 때 필수  
  clean_run => true 

  #1분 단위 실행(분, 시, 일, 월, 요일)   
  schedule => "* * * * *" 

  #실행 쿼리문(밑줄 친 부분이 tracking_column 조건문)
  statement => "select a.cid, a.timestamp, b.sig_name, inet_ntoa(c.ip_src), inet_ntoa(c.ip_dst), unhex(d.data_payload) from event a, signature b, iphdr c, data d where date_format(a.timestamp, '%Y') = '2010' and a.signature = b.sig_id and a.sid = c.sid and a.cid = c.cid and a.sid = d.sid and a.cid = d.cid and a.cid > :sql_last_value"
 }
}

filter {

  #출발지 IP에 지리 정보 매핑
 geoip {
  source => "inet_ntoa(c.ip_src)"
 }
}

output {
 elasticsearch {
  hosts => "localhost:9200"

    #지도 위치를 표시해주는 'geoip.location' 필드 타입이 'geo_point'인 기본 매핑을 사용하기 위해 'logstash'로 시작하는 인덱스명 사용. 
    #'logstash'를 붙이지 않으면 'geoip.location' 필드 타입이 숫자로 바뀌면서 지도 표시가 안 되기 때문에 별도로 매핑을 만들어줘야 함.
  index => "logstash-snort_log"
 }
 stdout { codec => rubydebug }
}

실행 쿼리문이 좀 복잡한데, Snort 데이터베이스 정규화가 너무 잘(?) 된 탓. 그래서 테이블 구조를 잘 파악할 필요가 있음.

Snort 주요 테이블 스키마

select a.cid, a.timestamp, b.sig_name, inet_ntoa(c.ip_src), inet_ntoa(c.ip_dst), unhex(d.data_payload)
from event a, signature b, iphdr c, data d
where date_format(a.timestamp, '%Y') = '2010'
#룰 일련번호 필드로 eventsignature 테이블 조인
and a.signature = b.sig_id
#로그/센서 일련버호 필드로 eventiphdr 테이블 조인
and a.sid = c.sid and a.cid = c.cid
#로그/센서 일련버호 필드로 eventdata 테이블 조인
and a.sid = d.sid and a.cid = d.cid

쿼리문 특징을 살펴보면, 'inet_ntoa' 함수는 iphdr 필드의 정수값을 원래 IP 형식으로 변환해주며, 'unhex' 함수는 16진수로 저장된 패킷 페이로드를 문자열로 변환해준다.

데이터 정합성 유지를 위한 테이블 조인 구문만 주의하면 구조 파악이 그리 어렵지는 않을 것이다. 다음은 해당 쿼리문 실행 결과.


댓글 없음:

댓글 쓰기

크리에이티브 커먼즈 라이선스