Kafka seems to be a popular way to prevent data loss when Logstash goes down. The thing is, Kafka apparently runs data through some serialization(?) step when sending it.
Filebeat > Logstash
input {
  beats {
    # listen for events shipped by Filebeat
    port => 5044
  }
}

output {
  # print each event to the console for inspection
  stdout {}
}
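My filebeat.yml isn't shown in this post; a minimal sketch of the matching output section, assuming Filebeat and Logstash run on the same machine:

# filebeat.yml (sketch) -- point Filebeat at the Logstash beats input above
output.logstash:
  hosts: ["localhost:5044"]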
[2021-03-08T22:31:17,307][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
{
           "ecs" => {
        "version" => "1.6.0"
    },
    "@timestamp" => 2021-03-08T13:31:40.222Z,
          "tags" => [
        [0] "beats_input_codec_plain_applied"
    ],
          "host" => {
        "name" => "MHKANG"
    },
         "agent" => {
            "hostname" => "MHKANG",
        "ephemeral_id" => "2ab15975-0626-4c63-bd8f-c9ac835d88e8",
             "version" => "7.11.1",
                  "id" => "07b659c3-8d08-4892-8b8c-e78169421f87",
                "type" => "filebeat",
                "name" => "MHKANG"
    },
       "message" => "2018 Jan 9 17:31:21 Sensor sshd[1854]: Accepted password for root from 192.168.56.1 port 1939 ssh2",
           "log" => {
          "file" => {
            "path" => "d:\\edu\\multicam\\elastic\\log\\secure_sample.log"
        },
        "offset" => 0
    },
         "input" => {
        "type" => "log"
    },
      "@version" => "1"
}
Filebeat > Kafka > Logstash
For the Kafka install, I followed the guide here.
Filebeat output configuration
The setup: Filebeat ships to Kafka, and Logstash pulls from it.
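The Filebeat side of this pipeline isn't shown either; a minimal sketch of the output.kafka section, assuming a single local broker and the topic name used below:

# filebeat.yml (sketch) -- publish events to a local Kafka broker
output.kafka:
  hosts: ["localhost:9092"]   # Kafka bootstrap broker
  topic: "test"               # topic the Logstash kafka input will consume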
input {
  kafka {
    bootstrap_servers => "localhost:9092"   # Kafka broker to consume from
    topics => ["test"]                      # topic Filebeat publishes to
  }
}

output {
  stdout {}
}
After wiring it up, the JSON arrives flattened into one long line(..) Is this what "serialized" means? Filebeat's Kafka output writes each event as a single JSON string, and since the Logstash kafka input defaults to the plain codec, that entire string lands in the message field instead of being parsed back into fields.
[2021-03-08T22:34:07,829][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator][main][c4d1cdfa8de21b9b613f9ce0493c5fac195fdd45e7f14851d794f56e8d0edfc8] [Consumer clientId=logstash-0, groupId=logstash] Setting offset for partition test-0 to the committed offset FetchPosition{offset=68, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[MHKANG.mshome.net:9092 (id: 0 rack: null)], epoch=0}}
{
       "message" => "{\"@timestamp\":\"2021-03-08T13:36:08.888Z\",\"@metadata\":{\"beat\":\"filebeat\",\"type\":\"_doc\",\"version\":\"7.11.1\"},\"host\":{\"name\":\"MHKANG\"},\"agent\":{\"name\":\"MHKANG\",\"type\":\"filebeat\",\"version\":\"7.11.1\",\"hostname\":\"MHKANG\",\"ephemeral_id\":\"2cb0286a-93b8-429b-9eaf-567dea8148b0\",\"id\":\"07b659c3-8d08-4892-8b8c-e78169421f87\"},\"log\":{\"offset\":101,\"file\":{\"path\":\"d:\\\\edu\\\\multicam\\\\elastic\\\\log\\\\secure_sample.log\"}},\"message\":\"2018 Jan 9 17:31:21 Sensor sshd[1854]: pam_unix(sshd:session): session opened for user root by (uid=0)\",\"input\":{\"type\":\"log\"},\"ecs\":{\"version\":\"1.6.0\"}}",
    "@timestamp" => 2021-03-08T13:36:09.940Z,
      "@version" => "1"
}
If I had existing filters, they would all need rewriting. But if the data can be put back into its original structure, this could still get a happy ending. I searched for 'filebeat kafka logstash', and the Google oracle answered: add a codec option.
input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["test"]
    codec => json   # parse each Kafka message back into structured fields
  }
}

output {
  stdout {}
}
Happy ending.
[2021-03-08T22:37:15,259][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator][main][099f9c52ec96bb79c9865c493a1b6d4648f35de5c55d61632acd7e63d0e48c0e] [Consumer clientId=logstash-0, groupId=logstash] Setting offset for partition test-0 to the committed offset FetchPosition{offset=74, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[MHKANG.mshome.net:9092 (id: 0 rack: null)], epoch=0}}
{
           "ecs" => {
        "version" => "1.6.0"
    },
    "@timestamp" => 2021-03-08T13:37:38.290Z,
          "host" => {
        "name" => "MHKANG"
    },
         "agent" => {
            "hostname" => "MHKANG",
        "ephemeral_id" => "27a762bb-e756-48d5-a5b3-5f06d4d04d28",
             "version" => "7.11.1",
                  "id" => "07b659c3-8d08-4892-8b8c-e78169421f87",
                "type" => "filebeat",
                "name" => "MHKANG"
    },
           "log" => {
          "file" => {
            "path" => "d:\\edu\\multicam\\elastic\\log\\secure_sample.log"
        },
        "offset" => 101
    },
       "message" => "2018 Jan 9 17:31:21 Sensor sshd[1854]: pam_unix(sshd:session): session opened for user root by (uid=0)",
         "input" => {
        "type" => "log"
    },
      "@version" => "1"
}
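For the record, another route I've seen (didn't need it here) is to keep the plain codec and parse in the filter stage instead; a sketch, assuming the escaped JSON sits in the default message field:

filter {
  json {
    # parse the JSON string in "message" into top-level event fields
    source => "message"
  }
}

Same result; the codec route just keeps the pipeline shorter.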