Java学习者论坛

 找回密码
 立即注册

QQ登录

只需一步,快速开始

手机号码,快捷登录

恭喜Java学习者论坛(https://www.javaxxz.com)已经为数万Java学习者服务超过8年了!积累会员资料超过10000G+
成为本站VIP会员,下载本站10000G+会员资源,购买链接:点击进入购买VIP会员
JAVA高级面试进阶视频教程Java架构师系统进阶VIP课程

分布式高可用全栈开发微服务教程

Go语言视频零基础入门到精通

Java架构师3期(课件+源码)

Java开发全终端实战租房项目视频教程

SpringBoot2.X入门到高级使用教程

大数据培训第六期全套视频教程

深度学习(CNN RNN GAN)算法原理

Java亿级流量电商系统视频教程

互联网架构师视频教程

年薪50万Spark2.0从入门到精通

年薪50万!人工智能学习路线教程

年薪50万!大数据从入门到精通学习路线年薪50万!机器学习入门到精通视频教程
仿小米商城类app和小程序视频教程深度学习数据分析基础到实战最新黑马javaEE2.1就业课程从 0到JVM实战高手教程 MySQL入门到精通教程
查看: 1636|回复: 1

[默认分类] 基于elk 实现nginx日志收集与数据分析。

  [复制链接]
  • TA的每日心情
    开心
    2021-12-13 21:45
  • 签到天数: 15 天

    [LV.4]偶尔看看III

    发表于 2018-5-16 11:04:26 | 显示全部楼层 |阅读模式
    一。背景
          前端web服务器为nginx,采用filebeat + Logstash + Elasticsearch + granfa 进行数据采集与展示,对客户端ip进行地域统计,监控服务器响应时间等。
      
    二。业务整体架构:
          nginx日志落地——》filebear——》logstash——》elasticsearch——》grafna(展示)
    三。先上个效果图,慢慢去一步步实现

    如上只是简单的几个实用的例子,实际上有多维度的数据之后还可以定制很多需要的内容,如终端ip访问数,国家、地区占比,访问前十的省份,请求方法占比,referer统计,user_agent统计,慢响应时间统计,更有世界地图坐标展示等等,只要有数据就能多维度进行展示。这里提供模板搜索位置大家可以查找参考:https://grafana.com/dashboards
    四,准备条件
    需要具备如下条件:
    1.nginx日志落地,需要主要落地格式,以及各个字段对应的含义。
    2.安装filebeat。 filebeat轻量,功能相比较logstash而言比较单一。
    3.安装logstash 作为中继服务器。这里需要说明一下的是,起初设计阶段并没有计划使用filebeat,而是直接使用logstash发往elasticsearch,但是当前端机数量增加之后logstash数量也随之增加,同时发往elasticsearch的数量增大,logstash则会抛出由于elasticsearch 限制导致的错误,大家遇到后搜索相应错误的代码即可。为此只采用logstash作为中继。
    4.elasticsearch 集群。坑点是index templates的创建会影响新手操作 geoip模块。后文会有。
    5.grafana安装,取代传统的kibana,grafana有更友好、美观的展示界面。
      
    五。实现过程
    1.nginx日志落地配置
      nginx日志格式、字段的内容和顺序都是高度可定制化的,将需要收集的字段内容排列好。定义一个log_format
      定义的形势实际上直接决定了logstash配置中对于字段抽取的模式,这里有两种可用,一种是直接在nginx日志中拼接成json的格式,在logstash中用codec => "json"来转换,
      一种是常规的甚至是默认的分隔符的格式,在logstash中需要用到grok来进行匹配,这个会是相对麻烦些。两种方法各有优点。直接拼接成json的操作比较简单,但是在转码过程中
      会遇到诸如 \x 无法解析的情况。 这里我也遇到过,如有必要后续会详谈。采用grok来匹配的方法相对难操作,但是准确性有提升。我们这里采用的是第一种方法,下面logstash部分
      也会给出采用grok的例子。   nginx日志中日志格式定义如下:
      

    1. log_format access_json   "{"timestamp":"$time_iso8601","
    2.                         ""hostname":"$hostname","
    3.                         ""ip":"$remote_addrx","
    4.                         ""request_method":"$request_method","
    5.                         ""domain":"XXXX","
    6.                         ""size":$body_bytes_sent,"
    7.                         ""status": $status,"
    8.                         ""responsetime":$request_time,"
    9.                         ""sum":"1""
    10.                         "}";
    复制代码


      
    2.filebeat配置文件
       关于filebeat更多内容请参考https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-overview.html
       配置文件内容:filebeat.yml  这里应该不会遇到坑。

    1. filebeat.prospectors:
    2. - input_type: log
    3.   paths:
    4.     - /data0/logs/log_json/*.log      #nginx日志路径
    5. output.logstash:
    6.     hosts: ["xxxx.xxxx.xxxx.xxx:12121"]   #logstash 服务器地址
    复制代码


      
    3.logstahs配置文件内容:
    这里是针对json已经拼接号,直接进行json转码的情况:
    需要注意如下:
       1)date模块必须有,否则会造成数据无法回填导致最终的图像出现锯齿状影响稳定性(原因是排列时间并不是日志产生的时间,而是进入logstash的时间)。这里后面的  yyyy-MM-dd"T"HH:mm:ssZZ   需要根据你日志中的日期格式进行匹配匹配规则见:https://www.elastic.co/guide/en/logstash/current/plugins-filters-date.html   
     2)需要说明的是我下面去掉了好多的字段(remove_field),原因是我们数据量大,es服务器有限。 可以根据需要随时调整收集的字段。
     3)  geoip 仅需要指定源ip字段名称即可,fields并不是必须的,我加入的原因还是由于资源有限导致的。

    1. input {
    2.     beats {
    3.     port => 12121
    4.     host => "10.13.0.80"
    5.     codec => "json"
    6.     }
    7. }
    8. filter {
    9.     date {
    10.       match => [ "timestamp", "yyyy-MM-dd"T"HH:mm:ssZZ" ]
    11.       #timezone => "Asia/Shanghai"
    12.       #timezone => "+00:00"
    13.     }
    14.     mutate {
    15.       convert => [ "status","integer" ]
    16.       convert => [ "sum","integer" ]
    17.       convert => [ "size","integer" ]
    18.       remove_field => "message"
    19.       remove_field => "source"
    20.       remove_field => "tags"
    21.       remove_field => "beat"
    22.       remove_field => "offset"
    23.       remove_field => "type"
    24.       remove_field => "@source"
    25.       remove_field => "input_type"
    26.       remove_field => "@version"
    27.       remove_field => "host"
    28.       remove_field => "client"
    29.       #remove_field => "request_method"
    30.       remove_field => "size"
    31.       remove_field => "timestamp"
    32.       #remove_field => "domain"
    33.       #remove_field => "ip"
    34.     }
    35.     geoip {
    36.       source => "ip"
    37.       fields => ["country_name", "city_name", "timezone","region_name","location"]
    38.     }
    39. }
    40. output {
    41.     elasticsearch {
    42.        hosts => ["xxx:19200","xxx:19200","xxx:19200"]
    43.        user => "xxx"
    44.        password => "xxx"
    45.        index => "logstash-suda-alllog-%{+YYYY.MM.dd}"
    46.        flush_size => 10000
    47.        idle_flush_time => 35
    48.     }
    49. }
    复制代码


      下面给出一个采用grok的例子:
           其中match内的内容不用参考我的,需要根据你的字段个数,以及格式来定制。 这里其实是正则表达式。自带了一部分几乎就可以满足所有的需求了无需自己写了,可以参考:https://github.com/elastic/logstash/blob/v1.4.0/patterns/grok-patterns 直接用即可。

    1. input {
    2.   file {
    3.   type => "access"
    4.   path => ["/usr/local/nginx/logs/main/*.log"]
    5.   }
    6. }
    7. filter {
    8.   if [type] == "access" {
    9.   if [message] =~ "^#" {
    10.     drop {}
    11.   }
    12.   grok {
    13.     match => ["message", "\[%{HTTPDATE:log_timestamp}\] %{HOSTNAME:server_name} "%{WORD:request_method} %{NOTSPACE:query_string} HTTP/%{NUMBER:httpversion}" "%{GREEDYDATA:http_user_agent}" %{NUMBER:status} %{IPORHOST:server_addr} "%{IPORHOST:remote_addr}" "%{NOTSPACE:http_referer}" %{NUMBER:body_bytes_sent} %{NUMBER:time_taken} %{GREEDYDATA:clf_body_bytes_sent} %{NOTSPACE:uri} %{NUMBER:m_request_time}"]
    14.   }
    15.   date {
    16.     match => [ "log_timestamp", "dd/MMM/yyyy:mm:ss:SS Z" ]
    17.     timezone => "Etc/UTC"
    18.   }
    19.      
    20.   mutate {
    21.   convert => [ "status","integer" ]
    22.   convert => [ "body_bytes_sent","integer" ]
    23.   convert => [ "m_request_time","float" ]
    24.   }
    复制代码


    在提供一个高级的用法:
    ruby:强大的模块, 可以进行诸如时间转换,单位计算等,多个可以用分号隔开。

    1.   ruby {
    2.   code => "event.set("logdateunix",event.get("@timestamp").to_i);event.set("request_time", event.get("m_request_time") / 1000000 )"  
    3.   }
    4.   mutate {
    5.    add_field => {
    6.         "http_host" => "%{server_name}"
    7.         "request_uri" => "%{uri}"
    8.         }
    复制代码


    在windowns上使用lgostsh需要注意的是:(win上收集iis的日志,我想正常环境是不会用到的,但是我确实用到了。。。。。。)
    path路径一定要采用 linux中的分割符来拼接路径,用win的格式则正则不能实现,大家可以测试下。其他配置则无区别。

    1. input {
    2.   file {
    3.     type => "access"
    4.     path => ["C:/WINDOWS/system32/LogFiles/W3SVC614874788/*.log"]
    5.   }
    6. }
    复制代码


      
    4.elasticsearch配置,集群的安装以及启动,调优这里不多说(说不完),需要注意的一个是,geoip location的格式,我这里采用的是index templates来实现的如下:
    最重要的是 "location" 定义 (否则geoip_location字段格式有问题,无法拼接成坐标),其他可以根据情况自定:

    1. {
    2.   "order": 0,
    3.   "version": 50001,
    4.   "index_patterns": [
    5.     "suda-*"
    6.   ],
    7.   "settings": {
    8.     "index": {
    9.       "number_of_shards": "5",
    10.       "number_of_replicas": "1",
    11.       "refresh_interval": "200s"
    12.     }
    13.   },
    14.   "mappings": {
    15.     "_default_": {
    16.       "dynamic_templates": [
    17.         {
    18.           "message_field": {
    19.             "path_match": "message",
    20.             "mapping": {
    21.               "norms": false,
    22.               "type": "text"
    23.             },
    24.             "match_mapping_type": "string"
    25.           }
    26.         },
    27.         {
    28.           "string_fields": {
    29.             "mapping": {
    30.               "norms": false,
    31.               "type": "text",
    32.               "fields": {
    33.                 "keyword": {
    34.                   "ignore_above": 256,
    35.                   "type": "keyword"
    36.                 }
    37.               }
    38.             },
    39.             "match_mapping_type": "string",
    40.             "match": "*"
    41.           }
    42.         }
    43.       ],
    44.       "properties": {
    45.         "@timestamp": {
    46.           "type": "date"
    47.         },
    48.         "geoip": {
    49.           "dynamic": true,
    50.           "properties": {
    51.             "ip": {
    52.               "type": "ip"
    53.             },
    54.             "latitude": {
    55.               "type": "half_float"
    56.             },
    57. [b] "location": { "type": "geo_point" },[/b]
    58.             "longitude": {
    59.               "type": "half_float"
    60.             }
    61.           }
    62.         },
    63.         "@version": {
    64.           "type": "keyword"
    65.         }
    66.       }
    67.     }
    68.   },
    69.   "aliases": {}
    70. }
    复制代码


      
    5   grafana 安装以及模板创建,这个比较简单,安装完直接写语句即可附一个例子如下:

      
    这里的变量需要自定义:

      
    通过上面应该能够完成一个完整的收集和展示情况,这里实际上提供了一种可行的方法,并么有太大的具体操作。
    希望多多交流
    推荐内容:kibana中文指南(有ibook版本,看着挺方便) 三斗室著
         https://www.elastic.co/cn/   
           https://grafana.com/dashboards
           https://grafana.com/
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|手机版|Java学习者论坛 ( 声明:本站资料整理自互联网,用于Java学习者交流学习使用,对资料版权不负任何法律责任,若有侵权请及时联系客服屏蔽删除 )

    GMT+8, 2024-4-25 03:15 , Processed in 0.491775 second(s), 50 queries .

    Powered by Discuz! X3.4

    © 2001-2017 Comsenz Inc.

    快速回复 返回顶部 返回列表