logstash从入门到放弃

1 安装logstash

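# Install Logstash 7.x from the official Elastic yum repository.
# gpgcheck=1 below means every package is verified against the Elastic
# signing key imported here -- keep both in place.
yum install -y java-1.8.0
rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
# The original used `cat` to *display* the repo file; it has to be created
# first, so write it with a quoted heredoc (no shell expansion inside).
cat > /etc/yum.repos.d/logstash.repo <<'EOF'
[logstash-7.x]
name=Elastic repository for 7.x packages
baseurl=https://artifacts.elastic.co/packages/7.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
EOF

yum -y install logstash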

或者下载
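# Offline install from a downloaded RPM. Check the package signature before
# installing: this needs the Elastic GPG key imported with
# `rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch`, and the
# output must report the pgp signature as OK (not NOKEY / BAD).
rpm --checksig logstash-7.17.3-x86_64.rpm
yum -y localinstall logstash-7.17.3-x86_64.rpm
# Put the binary on PATH for the interactive commands used below.
ln -sv /usr/share/logstash/bin/logstash /usr/local/bin/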
下载地址:
https://www.elastic.co/downloads/past-releases#logstash

启动logstash

2 修改logstash的配置⽂件

(1)编写配置⽂件
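# (1) Write a minimal pipeline: read events from stdin, print them to stdout.
#     Quoted heredoc => the config is written literally, nothing expanded.
cat > conf.d/01-stdin-to-stdout.conf <<'EOF'
input {
 stdin {}
}
output {
 stdout {}
}
EOF
# (2) Syntax-check the pipeline (-t only tests the config and exits).
logstash -tf conf.d/01-stdin-to-stdout.conf
# (3) Start the pipeline (only after the check above reports the config is OK).
logstash -f conf.d/01-stdin-to-stdout.conf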

3 input插件使用

3.1 input插件基于file案例

input {
  file {
    # Glob of files to tail. /tmp is world-writable, so any local user can
    # drop a *.txt here and have it ingested; use a Logstash-owned directory
    # for anything beyond a demo.
    path => ["/tmp/test/*.txt"]
    # Where to start in a file that has no sincedb entry yet; once a
    # position has been recorded this option is ignored.
    start_position => "beginning"
    # Alternative: start_position => "end"
  }
}
output {
  stdout {}
}

3.2 input插件基于tcp案例

input {
  # Two raw TCP listeners. They bind to 0.0.0.0 by default and have neither
  # authentication nor TLS, so any host that can reach the port can inject
  # arbitrary events. Use `host => "127.0.0.1"` for local-only testing, or
  # `ssl_enable => true` with `ssl_cert`/`ssl_key` on a real network.
  tcp { port => 8888 }
  tcp { port => 9999 }
}
output {
  stdout {}
}

3.3 input插件基于http案例

input {
  # Two HTTP listeners (0.0.0.0 by default). Anything that can POST to these
  # ports creates events: no `user`/`password`, no `ssl`. Bind to
  # `host => "127.0.0.1"` or add basic auth and TLS before exposing them.
  http { port => 8888 }
  http { port => 9999 }
}
output {
  stdout {}
}

3.4 input插件基于redis案例

filebeat的配置:(仅供参考)
filebeat.inputs:
  # Raw TCP listener on every interface, no authentication and no TLS:
  # anything that can reach port 9000 can inject log lines. Prefer
  # "127.0.0.1:9000" unless remote senders are really intended.
  - type: tcp
    host: "0.0.0.0:9000"
output.redis:
  # Redis endpoint (cleartext TCP connection)
  hosts: ["10.0.0.101:6379"]
  # logical database number
  db: 5
  # list key the events are pushed to
  key: "oldboyedu-linux80-filebeat"
  # connection timeout in seconds
  timeout: 3
  # AUTH password stored in cleartext in the config file; prefer
  # password: "${REDIS_PASSWORD}" backed by `filebeat keystore add REDIS_PASSWORD`.
  password: "oldboyedu"
 logstash的配置:
input {
  # Consume the events Filebeat pushed into a Redis list.
  redis {
    # Connection: cleartext TCP, default port 6379, database number 5.
    host => "10.0.0.101"
    port => 6379
    db => 5
    # AUTH password in cleartext; prefer password => "${REDIS_PASSWORD}"
    # resolved from the Logstash keystore (bin/logstash-keystore add REDIS_PASSWORD).
    password => "oldboyedu"
    # Key type and key name must match Filebeat's output.redis settings.
    data_type => "list"
    key => "oldboyedu-linux80-filebeat"
  }
}
output {
  stdout {}
}

3.5 input插件基于beats案例

filebeat配置:
filebeat.inputs:
  - type: tcp
    # Listens on all interfaces, unauthenticated; use "127.0.0.1:9000" for local-only.
    host: "0.0.0.0:9000"
output.logstash:
  # Cleartext Beats (lumberjack) connection; add `ssl.certificate_authorities`
  # (and a matching `ssl` block on the Logstash beats input) to encrypt it.
  hosts: ["10.0.0.101:5044"]
logstash配置:
input {
  # Beats-protocol listener on all interfaces. Without `ssl => true` plus
  # `ssl_certificate`/`ssl_key` (and `ssl_verify_mode` for client certs)
  # any host can connect and submit events.
  beats {
    port => 5044
  }
}
output {
  stdout {}
}

4 output插件使用

4.1 output插件基于redis案例

input {
  # Unauthenticated, cleartext TCP listener (see 3.2).
  tcp {
    port => 9999
  }
}
output {
  stdout {}
  # Push every event, serialised as JSON, onto a Redis list.
  redis {
    # Redis endpoint; the port is given as a string here, as in the original.
    host => "10.0.0.101"
    port => "6379"
    db => 10
    # Cleartext password in the config file; prefer "${REDIS_PASSWORD}"
    # from the Logstash keystore.
    password => "oldboyedu"
    # Key type and name the consumer has to read from.
    data_type => "list"
    key => "devopstack-logstash"
  }
}

验证(注意:下面的 `LRANGE devopstack-logstash 1 -1` 是从下标 1 开始取,所以跳过了第一条 "fff";要查看列表中的全部元素应使用 `LRANGE devopstack-logstash 0 -1`)

[root@logstash config-logstah]# nc 127.0.0.1 9999
fff
fff
ffffff
fffffff
^C

127.0.0.1:6379[10]> LRANGE devopstack-logstash 1 -1 
1) "{\"@version\":\"1\",\"port\":46670,\"message\":\"fff\",\"@timestamp\":\"2023-06-28T03:17:43.659Z\",\"host\":\"localhost\"}"
2) "{\"@version\":\"1\",\"port\":46670,\"message\":\"ffffff\",\"@timestamp\":\"2023-06-28T03:17:45.240Z\",\"host\":\"localhost\"}"
3) "{\"@version\":\"1\",\"port\":46670,\"message\":\"fffffff\",\"@timestamp\":\"2023-06-28T03:17:47.087Z\",\"host\":\"localhost\"}"

4.2 output插件基于file案例

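input {
  # Unauthenticated, cleartext TCP listener (see 3.2).
  tcp {
    port => 9999
  }
}
output {
  stdout {}
  file {
    # Destination on local disk. /tmp is world-writable and periodically
    # cleaned; for anything but a demo write under a Logstash-owned directory.
    path => "/tmp/oldboyedu-linux80-logstash.log"
  }
}
# FIX: the original listing ended after `path => ...` and never closed the
# file {} and output {} blocks, so it would not pass `logstash -t`.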

4.3 output插件基于elasticsearch案例

[root@elk101.oldboyedu.com ~]# cat config-logstash/11-many-to-es.conf 
input {
  # Beats listener without `ssl`: unauthenticated, cleartext (see 3.5).
  beats {
    port => 8888
  }
  # Pull from a Redis list (see 3.4). Password is inline cleartext; prefer
  # password => "${REDIS_PASSWORD}" backed by the Logstash keystore.
  redis {
    host => "10.0.0.101"
    port => 6379
    db => 8
    password => "oldboyedu"
    data_type => "list"
    key => "oldboyedu-linux80-filebeat"
  }
}
output {
  stdout {}
  # Write to a three-node cluster, one index per day. No scheme on the hosts
  # means plain http, and no user/password means this only works with
  # cluster security disabled; for real use give https:// hosts, `cacert`,
  # and a least-privilege writer account.
  elasticsearch {
    hosts => ["10.0.0.101:9200","10.0.0.102:9200","10.0.0.103:9200"]
    index => "oldboyedu-linux80-logstash-%{+YYYY.MM.dd}"
  }
}
[root@elk101.oldboyedu.com ~]# logstash -f config-logstash/11-many-to-es.conf

4.4 logstash综合案例

logstash配置 
input {
  # Three sources; each stamps its events with a `type` that the output
  # routes on. None of them is protected: tcp and beats accept events from
  # any host that can reach the port (no TLS, no auth) and the Redis
  # password is inline cleartext.
  tcp {
    type => "devopstack-tcp"
    port => 6666
  }
  beats {
    type => "devopstack-beat"
    port => 7777
  }
  redis {
    type => "devopstack-redis"
    host => "192.168.10.37"
    port => 6379
    db => 8
    password => "devstack.com"
    data_type => "list"
    key => "devopstack-filebeat-redis"
  }
}
output {
  stdout {}
  # Route by `type` into a per-source daily index. Every branch repeats the
  # same cluster and the "elastic" superuser password over plain http://,
  # so the credentials cross the network in cleartext on each request.
  # Use https:// hosts with `cacert`, a least-privilege writer role, and
  # password => "${ES_PASSWORD}" from the Logstash keystore.
  if [type] == "devopstack-tcp" {
    elasticsearch {
      hosts => ["http://192.168.10.31:9200","http://192.168.10.32:9200","http://192.168.10.33:9200"]
      user => "elastic"
      password => "aka2velfFJGz178SExyi"
      index => "devopstack-tcp-%{+YYYY.MM.dd}"
    }
  } else if [type] == "devopstack-beat" {
    elasticsearch {
      hosts => ["http://192.168.10.31:9200","http://192.168.10.32:9200","http://192.168.10.33:9200"]
      user => "elastic"
      password => "aka2velfFJGz178SExyi"
      index => "devopstack-beat-%{+YYYY.MM.dd}"
    }
  } else if [type] == "devopstack-redis" {
    elasticsearch {
      hosts => ["http://192.168.10.31:9200","http://192.168.10.32:9200","http://192.168.10.33:9200"]
      user => "elastic"
      password => "aka2velfFJGz178SExyi"
      index => "devopstack-redis-%{+YYYY.MM.dd}"
    }
  } else {
    # Anything with an unexpected or missing type.
    elasticsearch {
      hosts => ["http://192.168.10.31:9200","http://192.168.10.32:9200","http://192.168.10.33:9200"]
      user => "elastic"
      password => "aka2velfFJGz178SExyi"
      index => "devopstack-other-%{+YYYY.MM.dd}"
    }
  }
}

filebeat配置 

# Filebeat instance 1: raw TCP in, Redis list out.
filebeat.inputs:
  - type: tcp
    # All interfaces, no auth/TLS; restrict to 127.0.0.1 unless remote senders are intended.
    host: "0.0.0.0:8888"
output.redis:
  # No port given -> Redis default 6379; the connection is cleartext.
  hosts: ["192.168.10.37"]
  db: 8
  key: "devopstack-filebeat-redis"
  timeout: 3
  # Cleartext AUTH password; prefer "${REDIS_PASSWORD}" via `filebeat keystore`.
  password: "devstack.com"

# Filebeat instance 2: raw TCP in, Logstash beats input out. A single
# Filebeat can only have one output enabled, so this is a separate config.
filebeat.inputs:
  - type: tcp
    # All interfaces, unauthenticated.
    host: "0.0.0.0:9999"
output.logstash:
  # Cleartext Beats connection to the Logstash beats input on 7777.
  hosts: ["192.168.10.37:7777"]

通过nc命令输入数据测试,结果如下图


5 logstash插件filter

5.1 logstash插件grok

Grok 是 Logstash 最重要的插件。它可以解析任意文本并把它结构化。因此 Grok 是将非结构化的日志数据解析为可查询的结构化数据的好方法。 这个工具非常适合 syslog 日志、apache 和其他 web 服务器日志、mysql 日志,以及那些通常为人(而不是计算机)编写的日志格式。 Grok 使用正则表达式提取日志记录中的数据,这也正是 grok 强大的原因。Grok 使用的正则表达式语法与 Perl 和 Ruby 语言中的正则表达式语法类似。你还可以在 grok 里预定义好命名正则表达式,并在稍后(grok 参数或者其他正则表达式里)引用它。

5.1.1 Grok 语法

语法格式: %{SYNTAX:SEMANTIC} SYNTAX 是文本匹配的正则表达式模式。比如 NUMBER 模式可以匹配到 3.15 之类的数字;IP 模式可以匹配到 192.168.0.1 等 IP 地址。 SEMANTIC 是为匹配的文本提供的标识符。比如,3.15 可以是事件的 duration(持续时间),因此可以简单地将其称为 duration;字符串 192.168.0.1 用来标识发出请求的 client。 因此可以用下面的模式来结构化日志记录: %{NUMBER:duration} %{IP:client} 其实上面的模式还不完善,%{NUMBER:duration} 匹配到的内容为字符串。如果你通过 duration 字段进行查询,比较的方式为字符串间的比较。我们可以在 duration 模式中添加数据类型转换的逻辑,将字符串更改为数值,只需要添加目标数据类型的后缀就可以了。例如 %{NUMBER:duration:float},它将 duration 语义从字符串转换为浮点数。目前支持的转换类型只有 int 和 float。 我们可以通过 Grok Debugger(Kibana 的 Dev Tools 中自带)来学习和测试 grok 模式。

5.1.2 使用grok内置案例

filebeat 
cat 13_filebeat_to_logstash.yml 
# Filebeat: tail the nginx access log and ship each line to Logstash.
filebeat.inputs:
  - type: log
    enabled: true
    paths:
      - /var/log/nginx/access.log

output.logstash:
  # Cleartext Beats connection to the Logstash beats input.
  hosts: ["192.168.10.37:7777"]

logstash 

input {
  beats {
    type => "devopstack-beat"
    port => 7777
  }
}

filter {
  # Parse Apache/nginx common-log lines with the built-in pattern. The
  # pattern is unanchored and these lines come from untrusted clients
  # (URL, user agent ...): for production anchor it ("^%{HTTPD_COMMONLOG}$")
  # and lower `timeout_millis` (default 30000) so a crafted line cannot
  # keep the regex engine busy.
  grok {
    match => { "message" => "%{HTTPD_COMMONLOG}" }
  }
}

output {
  stdout {}
  # Plain http:// plus the "elastic" superuser password: credentials travel
  # in cleartext. Prefer https:// + `cacert`, a dedicated writer role and
  # password => "${ES_PASSWORD}" from the keystore.
  if [type] == "devopstack-beat" {
    elasticsearch {
      hosts => ["http://192.168.10.31:9200","http://192.168.10.32:9200","http://192.168.10.33:9200"]
      user => "elastic"
      password => "aka2velfFJGz178SExyi"
      index => "devopstack-beat-%{+YYYY.MM.dd}"
    }
  }
}

5.1.3 使用grok内置变量2案例

简单的例子

127.0.0.1 GET /index.php 87344 0.061
grok表达式

%{IP:client} %{WORD:method} %{URIPATHPARAM:path} %{NUMBER:bytes} %{NUMBER:duration}
写入配置文件为:

input {
  # Placeholder path; point it at the real log file.
  file {
    path => "/path/to/file"
  }
}
filter {
  grok {
    # Five space-separated tokens: client ip, HTTP method, path, bytes, duration.
    # %{NUMBER} captures a *string*; append :int / :float (e.g.
    # %{NUMBER:bytes:int}) to get typed values instead of "15824".
    match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:path} %{NUMBER:bytes} %{NUMBER:duration}" }
  }
}

output{
    stdout{ }
}
最终匹配如下(注意:下面示例输出里的 request 字段对应上面模式中的 path,二者来自不同版本的模式,字段名以自己配置中的 SEMANTIC 为准):

55.3.244.1 GET /index.html 15824 0.043
{
       "request" => "/index.html",
         "bytes" => "15824",
       "message" => "55.3.244.1 GET /index.html 15824 0.043",
        "client" => "55.3.244.1",
      "@version" => "1",
      "duration" => "0.043",
    "@timestamp" => 2023-07-20T15:15:01.818Z,
          "host" => "logstash",
        "method" => "GET"
}

5.1.4 使用grok自定义变量案例

[root@logstash config-logstah]# cat 08-stdin-grok3-to-stdout.conf 
input {
  stdin {}
}

filter {
  grok {
    # Directory with extra pattern definitions. The relative path resolves
    # against Logstash's working directory, not against this config file.
    patterns_dir => ["./patterns"]
    # Both custom patterns are unanchored, so POSTFIX_QUEUEID ends up
    # matching the 11 hex chars "12345678910" inside "ABCDE12345678910"
    # (A-E are hex digits too; see the result below).
    match => { "message" => "%{POSTFIX_QUEUEID:devopstack_queue_id} ---> %{DEVOPSTACK:DEVOPSTACK_elk}" }
  }
}

output {
  stdout {}
}
[root@logstash config-logstah]# cat patterns
# Custom grok patterns loaded through `patterns_dir`.
# Postfix queue id: 10 or 11 upper-case hex characters. Neither pattern is
# anchored, so when referenced they also match inside a longer token.
POSTFIX_QUEUEID [0-9A-F]{10,11}
# Exactly three digits.
DEVOPSTACK [\d]{3}

启动查看结果

The stdin plugin is now waiting for input:
ABCDE12345678910 ---> 333FGHIJK
{
               "@version" => "1",
    "devopstack_queue_id" => "12345678910",
                "message" => "ABCDE12345678910 ---> 333FGHIJK",
             "@timestamp" => 2023-07-24T14:36:48.561Z,
                   "host" => "logstash",
         "DEVOPSTACK_elk" => "333"
}

5.1.5 grok内置变量参考

参考地址:
https://github.com/logstash-plugins/logstash-patterns-core/tree/main/patterns/legacy

5.2 filter通用字段

[root@logstash config-logstah]# cat 09-beat-grok-tongyong.conf 
input {
  beats {
    port => 7777
  }
}

filter {
  grok {
    match => { "message" => "%{HTTPD_COMMONLOG}" }
    # Everything below is a grok *common option*: it only runs when the
    # match succeeds, so an unparsable line keeps host/agent/... and gets
    # none of the added fields or tags.
    remove_field => [ "host", "@version", "ecs", "tags","agent","input", "log" ]
    add_field => {
      "address" => "https://devopstack.cn"
      # %{clientip} is resolved from the field grok just extracted.
      "devopstack-clientip" => "clientip --- %{clientip}"
      "study" => "k8s,jenkins,python"
    }
    add_tag => [ "devopstack","zookeeper","kafka","elk" ]
    # remove_tag is applied after add_tag, so only devopstack/elk survive.
    remove_tag => [ "zookeeper", "kafka" ]
    # Stable plugin id for monitoring/pipeline stats (otherwise random).
    id => "nginx"
  }
}

output {
  stdout {}
}

结果如下
{
                   "tags" => [
        [0] "devopstack",
        [1] "elk"
    ],
                   "auth" => "-",
                "address" => "https://devopstack.cn",
                  "bytes" => "4833",
              "timestamp" => "24/Jul/2023:23:17:09 +0800",
                  "study" => "k8s,jenkins,python",
                "request" => "/",
             "@timestamp" => 2023-07-24T15:17:17.428Z,
    "devopstack-clientip" => "clientip --- 127.0.0.1",
            "httpversion" => "1.1",
                "message" => "127.0.0.1 - - [24/Jul/2023:23:17:09 +0800] \"GET / HTTP/1.1\" 200 4833 \"-\" \"curl/7.29.0\" \"-\"",
                  "ident" => "-",
               "response" => "200",
                   "verb" => "GET",
               "clientip" => "127.0.0.1"
}

5.3 filter插件之mutate

产生数据python脚本
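#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# @author : oldboyedu-linux80
# Emits fake "DAU" (daily-active-user) records to a log file forever.
# Usage: python generate_log.py /path/to/app.log
# The file is opened in append mode; if it lives in a world-writable
# directory such as /tmp, another local user can pre-create it (or a symlink)
# and the generator will happily append to whatever is there.
import datetime
import random
import logging
import time
import sys

LOG_FORMAT = "%(levelname)s %(asctime)s [com.oldboyedu.%(module)s] - %(message)s "
DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
# Action labels are kept verbatim from the original (including the odd space
# in "使用优 惠券") so the generated log lines stay identical.
actions = ["浏览⻚面", "评论商品", "加入收藏", "加入购物⻋", "提交订单", "使用优 惠券", "领取优惠券","搜索", "查看订单", "付款", "清空购物⻋"]


def format_dau(user_id, action, svip, price):
    """Return one pipe-separated record: DAU|user_id|action|svip|price."""
    return "DAU|{0}|{1}|{2}|{3}".format(user_id, action, svip, price)


def main():
    # FIX: the original indexed sys.argv[1] unconditionally and died with an
    # IndexError when started without the log path.
    if len(sys.argv) < 2:
        sys.stderr.write("usage: {0} LOGFILE\n".format(sys.argv[0]))
        sys.exit(2)
    logging.basicConfig(level=logging.INFO, format=LOG_FORMAT, datefmt=DATE_FORMAT,
                        filename=sys.argv[1], filemode='a')
    while True:
        time.sleep(random.randint(1, 5))
        user_id = random.randint(1, 10000)
        # Rounded to 2 decimal places (not 2 significant digits).
        price = round(random.uniform(15000, 30000), 2)
        action = random.choice(actions)
        svip = random.choice([0, 1])
        logging.info(format_dau(user_id, action, svip, price))


if __name__ == "__main__":
    main()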

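# Run the generator detached from the terminal, discarding its stdout/stderr.
# FIX: the original line had no trailing `&`, so nohup ran it in the
# foreground; `&>` is also bash-only, hence the portable redirection.
nohup python generate_log.py /tmp/app.log >/dev/null 2>&1 &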

INFO 2023-08-07 23:11:59 [com.oldboyedu.generate_log] - DAU|5119|搜索|1|20606.28 
INFO 2023-08-07 23:12:01 [com.oldboyedu.generate_log] - DAU|3936|提交订单|0|15216.44 
INFO 2023-08-07 23:12:04 [com.oldboyedu.generate_log] - DAU|2910|浏览页面|0|22506.79 
INFO 2023-08-07 23:12:09 [com.oldboyedu.generate_log] - DAU|682|清空购物车|1|21900.13 
INFO 2023-08-07 23:12:14 [com.oldboyedu.generate_log] - DAU|117|清空购物车|1|19561.11 
INFO 2023-08-07 23:12:17 [com.oldboyedu.generate_log] - DAU|4250|查看订单|0|25748.39 
INFO 2023-08-07 23:12:18 [com.oldboyedu.generate_log] - DAU|4017|评论商品|0|29291.62

filebeat配置
cat 15_filebeat_to_logstash_mutate.yml 
# Filebeat: tail the generator's log and ship each line to Logstash.
filebeat.inputs:
  - type: log
    enabled: true
    paths:
      # /tmp is world-writable: any local user can append lines that end up in the pipeline.
      - /tmp/app.log

output.logstash:
  # Cleartext Beats connection.
  hosts: ["192.168.10.37:7777"]

logstash配置
[root@logstash config-logstah]# cat 13-beat-grok-mutate.conf 
input {
   beats {
     port => 7777
 }
}

filter {
  # Pipeline for the DAU lines produced by generate_log.py:
  #   INFO <date> [com.oldboyedu.generate_log] - DAU|user_id|action|svip|price
  # The mutates are deliberately separate blocks: inside one mutate the
  # operations run in a fixed order (rename, replace, convert, gsub, ...,
  # strip, split, ..., copy, with add_field/remove_field last), not in the
  # order they are written.
  mutate {
     add_field => {
     "school" => "北京市昌平区沙河镇老男孩IT教育" 
     }
     # This also drops @timestamp, so the event keeps no event time at all.
     remove_field => [ "@timestamp", "agent", "host", "@version", "ecs","tags","input", "log" ]
}

  mutate {
# Split "message" on "|" -- message becomes an array.
    split => {
        "message" => "|"
    }
  }
  mutate {
# Add fields referencing the array elements ([message][0] is the prefix before the first "|").
    add_field => {
         "user_id" => "%{[message][1]}"
         "action" => "%{[message][2]}"
         "svip" => "%{[message][3]}"
         "price" => "%{[message][4]}"
    }
  }
  mutate {
      strip => ["svip"]
}
    mutate {
# Convert the fields to the matching data types ("1"/"0" -> boolean, see result).
      convert => {
       "user_id" => "integer"
       "svip" => "boolean"
       "price" => "float"
      }
    }
    mutate {
# Copy "price" into "oldboyedu-linux80-price".
      copy => { "price" => "oldboyedu-linux80-price" }
    }
    mutate {
# Rename the field.
     rename => { "svip" => "oldboyedu-ssvip" }
   }
    mutate {
# Replace the field content; %{message} renders the array joined with "," (see result below).
      replace => { "message" => "%{message}: My new message" }
    }
    mutate {
# Upper-case the whole message field.
     uppercase => [ "message" ]
    }
  }

output {
   stdout {}
 #  if [type] == "devopstack-beat" {
 #      elasticsearch {
 #         hosts => ["http://192.168.10.31:9200","http://192.168.10.32:9200","http://192.168.10.33:9200"]
 #         user => "elastic"
 #         password => "aka2velfFJGz178SExyi"
 #         index => "devopstack-beat-%{+YYYY.MM.dd}"
 # }
 # }
}

结果
{
            "oldboyedu-ssvip" => true,
                    "message" => "INFO 2023-08-07 23:11:33 [COM.OLDBOYEDU.GENERATE_LOG] - DAU,5369,领取优惠券,1,24315.33 : MY NEW MESSAGE",
    "oldboyedu-linux80-price" => 24315.33,
                     "action" => "领取优惠券",
                     "school" => "北京市昌平区沙河镇老男孩IT教育",
                    "user_id" => 5369,
                      "price" => 24315.33
}

5.3.1 gsub

5.3.2 convert

5.4 filter插件之date

统一es写入时间和日志实际产生时间一致。
          "ident" => "-",
        "address" => "https://devopstack.cn",
           "auth" => "-",
           "verb" => "GET",
          "bytes" => "3650",
     "@timestamp" => 2023-07-25T15:21:32.399Z,
        "request" => "/ttesxt",
       "response" => "404",
          "study" => "k8s,jenkins,python",
      "timestamp" => "24/Jul/2023:23:17:15 +0800",
        "message" => "127.0.0.1 - - [24/Jul/2023:23:17:15 +0800] \"GET /ttesxt HTTP/1.1\" 404 3650 \"-\" \"curl/7.29.0\" \"-\"",
    "httpversion" => "1.1",
       "clientip" => "127.0.0.1"

如上 @timestamp 和timestamp时间是不统一的,我们需要把日志的实际产生时间写入 @timestamp

配置如下
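input {
  beats {
    # FIX: the output below routes on [type] == "devopstack-beat", but the
    # original input set no type, so (unless Filebeat itself adds one) the
    # elasticsearch output never fired. Section 5.1.2 sets it the same way.
    type => "devopstack-beat"
    port => 7777
  }
}

filter {
  grok {
    # Parse common-log lines into clientip, verb, request, response, bytes,
    # timestamp, ... The common options (remove_field/add_field) only run
    # when the match succeeds.
    match => {
      "message" => "%{HTTPD_COMMONLOG}"
    }
    remove_field => [ "host", "@version", "ecs", "tags","agent","input", "log" ]
    add_field => {
      "address" => "https://devopstack.cn"
      "study" => "k8s,jenkins,python"
    }
  }

  date {
    # Replace @timestamp (ingest time) with the time the request was logged.
    # Either match form works; the first keeps the offset from the log line,
    # the second hard-codes +0800 and relies on `timezone` instead.
    match => ["timestamp","dd/MMM/yyyy:HH:mm:ss Z"]
    #match => ["timestamp","dd/MMM/yyyy:HH:mm:ss +0800"]
    timezone => "Asia/Shanghai"
    target => "@timestamp"
    remove_field => ["timestamp"]
  }
}

output {
  stdout {}
  # Plain http:// with the "elastic" superuser password: credentials in
  # cleartext on the wire. Prefer https:// + `cacert`, a writer role and
  # password => "${ES_PASSWORD}" from the keystore.
  if [type] == "devopstack-beat" {
    elasticsearch {
      hosts => ["http://192.168.10.31:9200","http://192.168.10.32:9200","http://192.168.10.33:9200"]
      user => "elastic"
      password => "aka2velfFJGz178SExyi"
      index => "devopstack-beat-%{+YYYY.MM.dd}"
    }
  }
}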

结果如下 
{
        "address" => "https://devopstack.cn",
       "response" => "404",
          "bytes" => "3650",
          "study" => "k8s,jenkins,python",
           "verb" => "GET",
        "request" => "/ttesxt",
    "httpversion" => "1.1",
     "@timestamp" => 2023-07-24T15:17:15.000Z,
        "message" => "127.0.0.1 - - [24/Jul/2023:23:17:15 +0800] \"GET /ttesxt HTTP/1.1\" 404 3650 \"-\" \"curl/7.29.0\" \"-\"",
       "clientip" => "127.0.0.1",
          "ident" => "-",
           "auth" => "-"
}

5.5 filter插件之kv

5.6 filter插件之json

5.7 filter插件dissect

5.8 filter插件之geoip

配置如下
  geoip {
    # Field holding the IP to look up. Private/loopback addresses (like the
    # 127.0.0.1 seen in earlier results) are not in the GeoLite2 database and
    # only produce a _geoip_lookup_failure tag.
    source => "clientip"
    # Keep just these keys of the lookup result (default: all of them).
    fields => ["city_name","country_name","ip"]
    # Sub-document that receives the result (default name: "geoip").
    target => "devopstack-geoip"
  }

结果
{
             "address" => "https://devopstack.cn",
               "bytes" => "4833",
               "study" => "k8s,jenkins,python",
                "verb" => "GET",
            "response" => "200",
          "@timestamp" => 2023-07-24T15:17:11.000Z,
    "devopstack-geoip" => {
        "country_name" => "China",
           "city_name" => "Shanghai",
                  "ip" => "139.226.12.66"
    },

5.9 filter插件之useragent分析客户端的设备类型

配置
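input {
  beats {
    # FIX: the output below routes on [type] == "devopstack-beat"; without
    # this line (as in the original) nothing reached Elasticsearch unless
    # Filebeat itself set a type. Matches section 5.1.2.
    type => "devopstack-beat"
    port => 7777
  }
}

filter {
  # NOTE(review): there is no grok/json stage here, so `timestamp`,
  # `clientip` and `http_user_agent` must already exist on the incoming
  # event (the result shown below implies they did); otherwise
  # date/geoip/useragent have nothing to parse.
  date {
    match => ["timestamp","dd/MMM/yyyy:HH:mm:ss Z"]
    timezone => "Asia/Shanghai"
    target => "@timestamp"
    remove_field => ["timestamp"]
  }

  mutate {
    # Drop Beats metadata and add two constant fields.
    remove_field => [ "host", "@version", "ecs", "tags","agent","input", "log" ]
    add_field => {
      "address" => "https://devopstack.cn"
      "study" => "k8s,jenkins,python"
    }
  }
  geoip {
    source => "clientip"
    fields => ["city_name","country_name","ip"]
    target => "devopstack-geoip"
  }

  useragent {
    # Field holding the raw User-Agent string.
    source => "http_user_agent"
    # Sub-document for the parsed result (name/version, os, device);
    # without `target` the fields land at the top level of the event.
    target => "devopstack-useragent"
  }
}

output {
  stdout {}
  # Plain http:// plus the superuser password: cleartext credentials.
  # Prefer https:// + `cacert`, a writer role and "${ES_PASSWORD}" from the keystore.
  if [type] == "devopstack-beat" {
    elasticsearch {
      hosts => ["http://192.168.10.31:9200","http://192.168.10.32:9200","http://192.168.10.33:9200"]
      user => "elastic"
      password => "aka2velfFJGz178SExyi"
      index => "devopstack-beat-%{+YYYY.MM.dd}"
    }
  }
}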

结果
   "devopstack-useragent" => {
           "os_full" => "Windows 10",
           "version" => "114.0.0.0",
          "os_major" => "10",
            "device" => "Other",
             "minor" => "0",
              "name" => "Chrome",
             "patch" => "0",
           "os_name" => "Windows",
             "major" => "114",
                "os" => "Windows",
        "os_version" => "10"
    },

6 logstash过滤nginx匹配规则案例

数据如下 
[03/Aug/2023:10:15:30 +0000] 192.168.1.100 - - 127.0.0.1 keep-alive example.com "GET /page.html HTTP/1.1" 200 1024 "https://www.google.com" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.150 Safari/537.36" 0.123 0.045

匹配规则
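 grok {
   # Custom nginx access-log format (see the sample line above). Field names
   # are dotted on purpose (req.*, resp.*). Two fixes vs. the original:
   #   * the sample line quotes the referer ("https://www.google.com"), but
   #     the pattern had a bare (?:%{URI}|-) there, so the whole match failed
   #     with _grokparsefailure; the quotes are now part of the pattern.
   #   * the pattern is anchored with ^ so the regex engine does not scan
   #     for a start position on every untrusted line (ReDoS hygiene,
   #     together with timeout_millis).
   match => {
     "message" => '^\[%{HTTPDATE:timestamp}\] %{IPORHOST:req.remote_addr} (?:%{DATA:req.x_forward_for}|-) (?:%{DATA:req.upstream_addr}|-) %{DATA:req.server_addr} %{DATA:req.connection} %{DATA:req.host} \"(?:%{WORD:req.method} %{NOTSPACE:req.url}(?: %{NOTSPACE:req.protocol})?|%{DATA:req.rawrequest})\" %{NUMBER:resp.code} (?:%{NUMBER:req.bytes}|-) \"(?:%{URI:req.referer}|-)\" %{QS:req.user_agent} %{NUMBER:req.request_time} (?:%{NUMBER:req.upstream_response_time}|-)'
   }
   # The raw line is no longer needed once parsed.
   remove_field => ["message"]
   # Abort a single match after 5 s instead of the 30 s default.
   timeout_millis => 5000
 }
 date {
   # Use the request time from the log as the event time.
   match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
   target => "@timestamp"
   timezone => "Asia/Shanghai"
   remove_field => ["timestamp"]
 }
 mutate {
   # nginx writes "-" for unavailable numbers; turn those into 0 so the
   # conversions below cannot fail. gsub runs before convert inside mutate.
   gsub => [
     "resp.code", "-", "0",
     "req.bytes", "-", "0",
     "req.request_time", "-", "0",
     "req.upstream_response_time", "-", "0"
   ]
   convert => {
     "resp.code" => "integer"
     "req.bytes" => "integer"
     "req.request_time" => "float"
     "req.upstream_response_time" => "float"
   }
 }
 # FIX: the original listing never closed this mutate {} block.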

结果

timestamp: 03/Aug/2023:10:15:30 +0000
req.remote_addr: 192.168.1.100
req.x_forward_for: -
req.upstream_addr: -
req.server_addr: 127.0.0.1
req.connection: keep-alive
req.host: example.com
req.method: GET
req.url: /page.html
req.protocol: HTTP/1.1
resp.code: 200
req.bytes: 1024
req.referer: https://www.google.com
req.user_agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.150 Safari/537.36
req.request_time: 0.123
req.upstream_response_time: 0.045

7 logstash 多if分支案例

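input {
  # Each source stamps its events with a `type`; filter and output route on
  # it. None of the listeners use TLS or authentication (see 3.2/3.3/3.5).
  beats {
    type => "devopstack-beats"
    port => 8888
  }
  tcp {
    type => "devopstack-tcp"
    port => 9999
  }
  tcp {
    type => "devopstack-tcp-new"
    port => 7777
  }
  http {
    type => "devopstack-http"
    port => 6666
  }
  file {
    type => "devopstack-file"
    # /tmp is world-writable: any local user can append to this file.
    path => "/tmp/apps.log"
  }
}
filter {
  mutate {
    add_field => {
      "address" => "shanghai-qingpu"
    }
  }
  if [type] in ["devopstack-beats","devopstack-tcp-new","devopstack-http"] {
    # FIX: the original nested `remove_field` inside `add_field => { ... }`,
    # which is not valid mutate config; remove_field is a sibling option.
    mutate {
      remove_field => ["agent","host","@version","ecs","tags","input","log"]
    }
    # NOTE(review): nothing in this pipeline creates `clientip` or
    # `http_user_agent`; unless the sender already supplies them (e.g. JSON
    # logs) geoip/useragent have nothing to parse.
    geoip {
      source => "clientip"
      target => "devopstack-geoip"
    }
    useragent {
      source => "http_user_agent"
      target => "http_user_agentnew"
    }
  } else if [type] == "devopstack-file" {
    mutate {
      # `address` was already set above, so this makes it a two-element array.
      add_field => {
        "address" => "https://devopstack.cn"
        "hobby" => ["LOL","王者荣耀"]
      }
      remove_field => ["host","@version"]
    }
  } else {
    # Plain tcp events carrying DAU|user_id|action|svip|price lines (see 5.3).
    mutate {
      remove_field => ["port","@version","host"]
    }
    mutate {
      # Inside one mutate, split runs before the common add_field/remove_field
      # options, so the %{[message][N]} references resolve before message is
      # dropped.
      split => {
        "message" => "|"
      }
      add_field => {
        "user_id" => "%{[message][1]}"
        "action" => "%{[message][2]}"
        "svip" => "%{[message][3]}"
        "price" => "%{[message][4]}"
      }
      remove_field => ["message"]
    }
    mutate {
      # Moved out of the previous block: strip runs before split/add_field
      # there, when `svip` does not exist yet. Same layout as section 5.3.
      strip => ["svip"]
    }
    mutate {
      convert => {
        "user_id" => "integer"
        "svip" => "boolean"
        "price" => "float"
      }
    }
  }
}

output {
  stdout {}
  # FIX: the original compared against "devopstack-beat" while the beats
  # input sets "devopstack-beats", so beats events always fell through to
  # the else branch and landed in the tcp index.
  # Both branches use plain http:// with the superuser password (cleartext
  # credentials); prefer https:// + `cacert`, a writer role and
  # password => "${ES_PASSWORD}" from the keystore.
  if [type] == "devopstack-beats" {
    elasticsearch {
      hosts => ["http://192.168.10.31:9200","http://192.168.10.32:9200","http://192.168.10.33:9200"]
      user => "elastic"
      password => "aka2velfFJGz178SExyi"
      index => "devopstack-beat-%{+YYYY.MM.dd}"
    }
  } else {
    elasticsearch {
      hosts => ["http://192.168.10.31:9200","http://192.168.10.32:9200","http://192.168.10.33:9200"]
      user => "elastic"
      password => "aka2velfFJGz178SExyi"
      index => "devopstack-tcp-%{+YYYY.MM.dd}"
    }
  }
}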

logstash 匹配丢弃

if "app" in [log][path] {
  # Substring test on the Beats-supplied [log][path]; matching events are
  # discarded here and never reach an output.
  drop { }
}

https://blog.csdn.net/FaithWh/article/details/126982545
https://www.cnblogs.com/sparkdev/p/10606810.html
