sip = 1.1.1.1 dip = 2.2.2.2 sport = 100 dport = 200 action = permit
sip = 1.1.1.1 dip = 2.2.2.2 sport = 100 dport = 200 attack = 10.10.10.10
sip = 1.1.1.1 dip = 2.2.2.2 attack = 10.10.10.10
이런 로그를 처리하려면 필터 설정이 복잡해지게 마련.
filter {
if "action" in [message] {
dissect { mapping => { "message" => "sip = %{sip} dip = %{dip} sport = %{sport} dport = %{dport} action = %{action}" } }
} else if "port" in [message] and "attack" in [message] {
dissect { mapping => { "message" => "sip = %{sip} dip = %{dip} sport = %{sport} dport = %{dport} attack = %{atack}" } }
} else {
dissect { mapping => { "message" => "sip = %{sip} dip = %{dip} attack = %{attack}" } }
}
}
다음은 실행 결과.
[2018-09-19T21:34:12,425][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9601}
{
"@timestamp" => 2018-09-19T12:34:12.596Z,
"path" => "D:/test.log",
"@version" => "1",
"sip" => "1.1.1.1",
"message" => "sip = 1.1.1.1 dip = 2.2.2.2 sport = 100 dport = 200 attack = 10.10.10.10\r",
"host" => "MHKANG",
"dport" => "200",
"sport" => "100",
"dip" => "2.2.2.2",
"atack" => "10.10.10.10\r"
}
{
"@timestamp" => 2018-09-19T12:34:12.553Z,
"path" => "D:/test.log",
"@version" => "1",
"sip" => "1.1.1.1",
"message" => "sip = 1.1.1.1 dip = 2.2.2.2 sport = 100 dport = 200 action = permit\r",
"host" => "MHKANG",
"dport" => "200",
"sport" => "100",
"action" => "permit\r",
"dip" => "2.2.2.2"
}
{
"attack" => "10.10.10.10\r",
"@timestamp" => 2018-09-19T12:34:12.597Z,
"path" => "D:/test.log",
"@version" => "1",
"sip" => "1.1.1.1",
"message" => "sip = 1.1.1.1 dip = 2.2.2.2 attack = 10.10.10.10\r",
"host" => "MHKANG",
"dip" => "2.2.2.2"
}
원했던 필드 분류는 이뤄졌지만 복잡한 필터 설정이 마음에 들지 않는다. 해당 로그는 배열이 일관되지 않을 뿐, 동일한 'key공백=공백value' 형식이 단순 나열된 구조. 더 간단하게 필드를 분류할 수 있는 방법은 없을까?
kv 필터
이름에서 감이 온다. 해당 필터는 'key구분자value' 구조의 데이터를 분류해준다. 다음은 해당 필터 표현식.
filter {
kv {
source => "message" #message 필드 데이터를 (default)
field_split => " " #'공백'을 구분자로 필드 분리 (default)
value_split => "=" #'='을 구분자로 key와 value 분리 (default)
}
}
실행 결과는 다음과 같다. 이전 필터 설정과 같은 결과.
[2018-09-19T22:14:18,825][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9601}
{
"@version" => "1",
"dip" => "2.2.2.2",
"dport" => "200",
"message" => "sip = 1.1.1.1 dip = 2.2.2.2 sport = 100 dport = 200 action = permit\r",
"@timestamp" => 2018-09-19T13:14:18.879Z,
"sport" => "100",
"path" => "D:/test.log",
"host" => "MHKANG",
"sip" => "1.1.1.1",
"action" => "permit\r"
}
{
"@version" => "1",
"dip" => "2.2.2.2",
"attack" => "10.10.10.10\r",
"message" => "sip = 1.1.1.1 dip = 2.2.2.2 attack = 10.10.10.10\r",
"@timestamp" => 2018-09-19T13:14:18.944Z,
"path" => "D:/test.log",
"host" => "MHKANG",
"sip" => "1.1.1.1"
}
{
"@version" => "1",
"dip" => "2.2.2.2",
"dport" => "200",
"attack" => "10.10.10.10\r",
"message" => "sip = 1.1.1.1 dip = 2.2.2.2 sport = 100 dport = 200 attack = 10.10.10.10\r",
"@timestamp" => 2018-09-19T13:14:18.942Z,
"sport" => "100",
"path" => "D:/test.log",
"host" => "MHKANG",
"sip" => "1.1.1.1"
}
같은 구분자를 사용하는 key-value 구조의 로그라면 kv만한 게 없을 듯. 참고로 해당 사례에서 로그 구조가 'key=value'가 아닌 'key공백=공백value'인데도 value_split 구분자 '='을 기준으로 정확한 필드 분류가 이루어진 이유는 value_split 구분자 주위의 공백은 무시하는 게 기본 설정이기 때문.
관련 글
댓글 없음:
댓글 쓰기