2025년 3월 17일 월요일

ingest pipeline - 6th

로그스태시는 조건문으로 필터의 유기적인 실행 관계를 설정할 수 있다. beat processor도 마찬가지. 다음은 데이터 조건에 따라 달라지는 로그스태시 필터.
filter {
 mutate {
  remove_field => ["@timestamp", "@version", "path", "host"]
 }

 dissect {
  mapping => {"message" => '%{} "%{} %{uri} %{}" %{}'}
 }
 
if [uri] =~ '\?' {
  grok {
   match => {'uri' => '(?<url>[^?]+)\?(?<param>.*)'}
  }
 } else {
  mutate {
   copy => {'uri' => 'url'}
  }
 }
}
{
    "message" => "1.2.3.4 - - [12/Oct/2024:02:42:00 +0900] \"GET /bbs/view.html HTTP/1.1\" 200 37727\r",
        "uri" => "/bbs/view.html",
        "url" => "/bbs/view.html"
}
{
    "message" => "192.168.56.1 - - [12/Oct/2024:02:42:00 +0900] \"GET /bbs/view.php?board_id=kor%5Fmedia&gul_no=1106&idx=17&m=4&upage=25&tpage=&PAGE=4 HTTP/1.1\" 200 37727\r",
      "param" => "board_id=kor%5Fmedia&gul_no=1106&idx=17&m=4&upage=25&tpage=&PAGE=4",
        "uri" => "/bbs/view.php?board_id=kor%5Fmedia&gul_no=1106&idx=17&m=4&upage=25&tpage=&PAGE=4",
        "url" => "/bbs/view.php"
}

역시 데이터 조건에 따라 달라지는 beat processor.
processors:
  - include_fields:
      fields: "message"
  - dissect:
      tokenizer: '%{} "%{} %{uri} %{}" %{}'
      target_prefix: ""
  - if:
      contains.uri: "?"
    then:
      - dissect:
          field: "uri"
          tokenizer: "%{url}?%{param}"
          target_prefix: ""
    else:
      copy_fields:
        fields:
          - from: uri
            to: url
{
  "@timestamp": "2025-03-17T08:13:47.537Z",
  "@metadata": {
    "beat": "filebeat",
    "type": "_doc",
    "version": "8.17.0"
  },
  "message": "1.2.3.4 - - [12/Oct/2024:02:42:00 +0900] \"GET /bbs/view.html HTTP/1.1\" 200 37727",
  "uri": "/bbs/view.html",
  "url": "/bbs/view.html"
}
{
  "@timestamp": "2025-03-17T08:13:47.537Z",
  "@metadata": {
    "beat": "filebeat",
    "type": "_doc",
    "version": "8.17.0"
  },
  "message": "192.168.56.1 - - [12/Oct/2024:02:42:00 +0900] \"GET /bbs/view.php?board_id=kor%5Fmedia&gul_no=1106&idx=17&m=4&upage=25&tpage=&PAGE=4 HTTP/1.1\" 200 37727",
  "uri": "/bbs/view.php?board_id=kor%5Fmedia&gul_no=1106&idx=17&m=4&upage=25&tpage=&PAGE=4",
  "param": "board_id=kor%5Fmedia&gul_no=1106&idx=17&m=4&upage=25&tpage=&PAGE=4",
  "url": "/bbs/view.php"
}

그런데 ingest pipeline은 processor에 종속된 조건문만을 지원한다. 다음은 ?가 없는 데이터를 처리하지 못하는 ingest pipeline. 

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "grok": {
          "field": "uri",
          "patterns": ["(?<url>[^?]+)\\?(?<param>.*)"]
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "uri": "index.php?a=b"
      }
    },
    {
      "_source": {
        "uri": "index.html"
      }
    }
  ]
}
{
  "docs": [
    {
      "doc": {
        "_index": "_index",
        "_version": "-3",
        "_id": "_id",
        "_source": {
          "uri": "index.php?a=b",
          "param": "a=b",
          "url": "index.php"
        },
        "_ingest": {
          "timestamp": "2025-03-17T08:23:55.1105587Z"
        }
      }
    },
    {
      "error": {
        "root_cause": [
          {
            "type": "illegal_argument_exception",
            "reason": "Provided Grok expressions do not match field value: [index.html]"
          }
        ],
        "type": "illegal_argument_exception",
        "reason": "Provided Grok expressions do not match field value: [index.html]"
      }
    }
  ]
}

? 조건을 주면 ?가 있는 데이터만을 처리한다.

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "grok": {
          "field": "uri",
          "patterns": ["(?<url>[^?]+)\\?(?<param>.*)"],
          "if": "ctx.uri.contains('?')"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "uri": "index.php?a=b"
      }
    },
    {
      "_source": {
        "uri": "index.html"
      }
    }
  ]
}

{
  "docs": [
    {
      "doc": {
        "_index": "_index",
        "_version": "-3",
        "_id": "_id",
        "_source": {
          "uri": "index.php?a=b",
          "param": "a=b",
          "url": "index.php"
        },
        "_ingest": {
          "timestamp": "2025-03-17T08:27:14.3998071Z"
        }
      }
    },
    {
      "doc": {
        "_index": "_index",
        "_version": "-3",
        "_id": "_id",
        "_source": {
          "uri": "index.html"
        },
        "_ingest": {
          "timestamp": "2025-03-17T08:27:14.3998071Z"
        }
      }
    }
  ]
}

이때 해법은 정규표현식을 추가하거나,

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "grok": {
          "field": "uri",
          "patterns": [
            "(?<url>[^?]+)\\?(?<param>.*)",
            "(?<url>.*)"
          ]
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "uri": "index.php?a=b"
      }
    },
    {
      "_source": {
        "uri": "index.html"
      }
    }
  ]
}

{
  "docs": [
    {
      "doc": {
        "_index": "_index",
        "_version": "-3",
        "_id": "_id",
        "_source": {
          "uri": "index.php?a=b",
          "param": "a=b",
          "url": "index.php"
        },
        "_ingest": {
          "timestamp": "2025-03-17T08:35:46.6631911Z"
        }
      }
    },
    {
      "doc": {
        "_index": "_index",
        "_version": "-3",
        "_id": "_id",
        "_source": {
          "uri": "index.html",
          "url": "index.html"
        },
        "_ingest": {
          "timestamp": "2025-03-17T08:35:46.6631911Z"
        }
      }
    }
  ]
}

on_failure 옵션으로 예외처리를 하거나,

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "grok": {
          "field": "uri",
          "patterns": ["(?<url>[^?]+)\\?(?<param>.*)"],
          "on_failure": [
            {
              "set": {
                "field": "url",
                "copy_from": "uri"
              }
            }
          ]
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "uri": "index.php?a=b"
      }
    },
    {
      "_source": {
        "uri": "index.html"
      }
    }
  ]
}

{
  "docs": [
    {
      "doc": {
        "_index": "_index",
        "_version": "-3",
        "_id": "_id",
        "_source": {
          "uri": "index.php?a=b",
          "param": "a=b",
          "url": "index.php"
        },
        "_ingest": {
          "timestamp": "2025-03-17T08:38:37.0469351Z"
        }
      }
    },
    {
      "doc": {
        "_index": "_index",
        "_version": "-3",
        "_id": "_id",
        "_source": {
          "uri": "index.html",
          "url": "index.html"
        },
        "_ingest": {
          "timestamp": "2025-03-17T08:38:37.0469351Z"
        }
      }
    }
  ]
}

script processor를 사용하거나.

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "script": {
          "source": """
            if (ctx.uri.contains('?')) {
              def str = /([^?]+)\?(.*)/.matcher(ctx.uri);
              if (str.find()) {
                ctx.url = str.group(1);
                ctx.param = str.group(2);
              }  
            } else {
              ctx.url = ctx.uri
            }
          """
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "uri": "index.php?a=b"
      }
    },
    {
      "_source": {
        "uri": "index.html"
      }
    }
  ]
}

{
  "docs": [
    {
      "doc": {
        "_index": "_index",
        "_version": "-3",
        "_id": "_id",
        "_source": {
          "uri": "index.php?a=b",
          "param": "a=b",
          "url": "index.php"
        },
        "_ingest": {
          "timestamp": "2025-03-17T08:41:08.5446373Z"
        }
      }
    },
    {
      "doc": {
        "_index": "_index",
        "_version": "-3",
        "_id": "_id",
        "_source": {
          "uri": "index.html",
          "url": "index.html"
        },
        "_ingest": {
          "timestamp": "2025-03-17T08:41:08.5446373Z"
        }
      }
    }
  ]
}

크리에이티브 커먼즈 라이선스