2021년 3월 8일 월요일

카프카와 로그스태시 연동 시 참고

로그스태시 장애 발생 시 데이터 유실을 방지하기 위해 카프카를 많이 쓰는 듯하다. 그런데 카프카는 데이터를 보낼 때 직렬화(?) 과정을 거친다고.

Filebeat > Logstash
input {
 beats {
  port => 5044
 }
}

output {
 stdout {}
}

다음은 연동 결과. filebeat는 "key" => "value" 구조의 JSON 포맷으로 데이터를 전송한다.
[2021-03-08T22:31:17,307][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
{
           "ecs" => {
        "version" => "1.6.0"
    },
    "@timestamp" => 2021-03-08T13:31:40.222Z,
          "tags" => [
        [0] "beats_input_codec_plain_applied"
    ],
          "host" => {
        "name" => "MHKANG"
    },
         "agent" => {
            "hostname" => "MHKANG",
        "ephemeral_id" => "2ab15975-0626-4c63-bd8f-c9ac835d88e8",
             "version" => "7.11.1",
                  "id" => "07b659c3-8d08-4892-8b8c-e78169421f87",
                "type" => "filebeat",
                "name" => "MHKANG"
    },
       "message" => "2018 Jan  9 17:31:21 Sensor sshd[1854]: Accepted password for root from 192.168.56.1 port 1939 ssh2",
           "log" => {
          "file" => {
            "path" => "d:\\edu\\multicam\\elastic\\log\\secure_sample.log"
        },
        "offset" => 0
    },
         "input" => {
        "type" => "log"
    },
      "@version" => "1"
}

Filebeat > Kafka > Logstash

카프카 설치는 여기를 참고했다.

Filebeat output 설정

filebeat에서 카프카로 보내면 로그스태시가 가져오는 구조.
input {
 kafka {
  bootstrap_servers => "localhost:9092"
  topics => ["test"]
 }
}

output {
 stdout {}
}

연동했더니 JSON 데이터가 일렬횡대로 들어온다(..) 이런 걸 직렬화됐다고 하는 건가?
[2021-03-08T22:34:07,829][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator][main][c4d1cdfa8de21b9b613f9ce0493c5fac195fdd45e7f14851d794f56e8d0edfc8] [Consumer clientId=logstash-0, groupId=logstash] Setting offset for partition test-0 to the committed offset FetchPosition{offset=68, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[MHKANG.mshome.net:9092 (id: 0 rack: null)], epoch=0}}
{
       "message" => "{\"@timestamp\":\"2021-03-08T13:36:08.888Z\",\"@metadata\":{\"beat\":\"filebeat\",\"type\":\"_doc\",\"version\":\"7.11.1\"},\"host\":{\"name\":\"MHKANG\"},\"agent\":{\"name\":\"MHKANG\",\"type\":\"filebeat\",\"version\":\"7.11.1\",\"hostname\":\"MHKANG\",\"ephemeral_id\":\"2cb0286a-93b8-429b-9eaf-567dea8148b0\",\"id\":\"07b659c3-8d08-4892-8b8c-e78169421f87\"},\"log\":{\"offset\":101,\"file\":{\"path\":\"d:\\\\edu\\\\multicam\\\\elastic\\\\log\\\\secure_sample.log\"}},\"message\":\"2018 Jan  9 17:31:21 Sensor sshd[1854]: pam_unix(sshd:session): session opened for user root by (uid=0)\",\"input\":{\"type\":\"log\"},\"ecs\":{\"version\":\"1.6.0\"}}",
    "@timestamp" => 2021-03-08T13:36:09.940Z,
      "@version" => "1"
}

기존 필터가 있다면 다 뜯어고쳐야하는 상황. 하지만 데이터를 원래 구조로 되돌릴 수만 있으면 해피엔딩으로 끝낼 수도 있다. 'filebeat kafka logstash' 키워드로 검색을 해봤다. codec 옵션을 추가하라는 답을 주시는 구글신.
input {
 kafka {
  bootstrap_servers => "localhost:9092"
  topics => ["test"]
  codec => json
 }
}

output {
 stdout {}
}

해피엔딩.
[2021-03-08T22:37:15,259][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator][main][099f9c52ec96bb79c9865c493a1b6d4648f35de5c55d61632acd7e63d0e48c0e] [Consumer clientId=logstash-0, groupId=logstash] Setting offset for partition test-0 to the committed offset FetchPosition{offset=74, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[MHKANG.mshome.net:9092 (id: 0 rack: null)], epoch=0}}
{
           "ecs" => {
        "version" => "1.6.0"
    },
    "@timestamp" => 2021-03-08T13:37:38.290Z,
          "host" => {
        "name" => "MHKANG"
    },
         "agent" => {
            "hostname" => "MHKANG",
        "ephemeral_id" => "27a762bb-e756-48d5-a5b3-5f06d4d04d28",
             "version" => "7.11.1",
                  "id" => "07b659c3-8d08-4892-8b8c-e78169421f87",
                "type" => "filebeat",
                "name" => "MHKANG"
    },
           "log" => {
          "file" => {
            "path" => "d:\\edu\\multicam\\elastic\\log\\secure_sample.log"
        },
        "offset" => 101
    },
       "message" => "2018 Jan  9 17:31:21 Sensor sshd[1854]: pam_unix(sshd:session): session opened for user root by (uid=0)",
         "input" => {
        "type" => "log"
    },
      "@version" => "1"
}

관련 글

댓글 없음:

댓글 쓰기

크리에이티브 커먼즈 라이선스