2018년 9월 20일 목요일

Logstash 필터 kv

같은 구조임에도 배열이 조금씩 다른 다음과 같은 로그가  있다.

sip = 1.1.1.1 dip = 2.2.2.2 sport = 100 dport = 200 action = permit
sip = 1.1.1.1 dip = 2.2.2.2 sport = 100 dport = 200 attack = 10.10.10.10
sip = 1.1.1.1 dip = 2.2.2.2 attack = 10.10.10.10

이런 로그를 처리하려면 필터 설정이 복잡해지게 마련.

filter {
  if "action" in [message] {
    dissect { mapping => { "message" => "sip = %{sip} dip = %{dip} sport = %{sport} dport = %{dport} action = %{action}" } }
  } else if "port" in [message] and "attack" in [message] {
      dissect { mapping => { "message" => "sip = %{sip} dip = %{dip} sport = %{sport} dport = %{dport} attack = %{atack}" } }
  } else {
      dissect { mapping => { "message" => "sip = %{sip} dip = %{dip} attack = %{attack}" } }
  }
}

다음은 실행 결과.

[2018-09-19T21:34:12,425][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9601}
{
    "@timestamp" => 2018-09-19T12:34:12.596Z,
          "path" => "D:/test.log",
      "@version" => "1",
           "sip" => "1.1.1.1",
       "message" => "sip = 1.1.1.1 dip = 2.2.2.2 sport = 100 dport = 200 attack = 10.10.10.10\r",
          "host" => "MHKANG",
         "dport" => "200",
         "sport" => "100",
           "dip" => "2.2.2.2",
         "atack" => "10.10.10.10\r"
}
{
    "@timestamp" => 2018-09-19T12:34:12.553Z,
          "path" => "D:/test.log",
      "@version" => "1",
           "sip" => "1.1.1.1",
       "message" => "sip = 1.1.1.1 dip = 2.2.2.2 sport = 100 dport = 200 action = permit\r",
          "host" => "MHKANG",
         "dport" => "200",
         "sport" => "100",
        "action" => "permit\r",
           "dip" => "2.2.2.2"
}
{
        "attack" => "10.10.10.10\r",
    "@timestamp" => 2018-09-19T12:34:12.597Z,
          "path" => "D:/test.log",
      "@version" => "1",
           "sip" => "1.1.1.1",
       "message" => "sip = 1.1.1.1 dip = 2.2.2.2 attack = 10.10.10.10\r",
          "host" => "MHKANG",
           "dip" => "2.2.2.2"
}

원했던 필드 분류는 이뤄졌지만 복잡한 필터 설정이 마음에 들지 않는다. 해당 로그는 배열이 일관되지 않을 뿐, 동일한 'key공백=공백value' 형식이 단순 나열된 구조. 더 간단하게 필드를 분류할 수 있는 방법은 없을까?

kv 필터

이름에서 감이 오는 필터. 해당 필터는 'key구분자value' 구조의 데이터를 분류해준다. 다음은 해당 필터 표현식.

filter {
  kv {
    source => "message" #message 필드 데이터를
    field_split => " "  #'공백'을 구분자로 필드 분리
    value_split => "="  #'='을 구분자로 key와 value 분리(default)
  }
}

실행 결과는 다음과 같다. 이전 필터 설정과 같은 결과.

[2018-09-19T22:14:18,825][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9601}
{
      "@version" => "1",
           "dip" => "2.2.2.2",
         "dport" => "200",
       "message" => "sip = 1.1.1.1 dip = 2.2.2.2 sport = 100 dport = 200 action = permit\r",
    "@timestamp" => 2018-09-19T13:14:18.879Z,
         "sport" => "100",
          "path" => "D:/test.log",
          "host" => "MHKANG",
           "sip" => "1.1.1.1",
        "action" => "permit\r"
}
{
      "@version" => "1",
           "dip" => "2.2.2.2",
        "attack" => "10.10.10.10\r",
       "message" => "sip = 1.1.1.1 dip = 2.2.2.2 attack = 10.10.10.10\r",
    "@timestamp" => 2018-09-19T13:14:18.944Z,
          "path" => "D:/test.log",
          "host" => "MHKANG",
           "sip" => "1.1.1.1"
}
{
      "@version" => "1",
           "dip" => "2.2.2.2",
         "dport" => "200",
        "attack" => "10.10.10.10\r",
       "message" => "sip = 1.1.1.1 dip = 2.2.2.2 sport = 100 dport = 200 attack = 10.10.10.10\r",
    "@timestamp" => 2018-09-19T13:14:18.942Z,
         "sport" => "100",
          "path" => "D:/test.log",
          "host" => "MHKANG",
           "sip" => "1.1.1.1"
}

같은 구분자를 사용하는 key-value 구조의 로그라면 kv 필터만한 게 없을 듯. 참고로 해당 사례에서 로그 구조가 'key=value'가 아닌 'key공백=공백value'인데도 value_split 구분자 '='을 기준으로 정확한 필드 분류가 이루어진 이유는 value_split 구분자 주위의 공백은 무시하는 게 기본 설정이기 때문.


댓글 없음:

댓글 쓰기

크리에이티브 커먼즈 라이선스