一套好的日志分析系统可以详细记录系统的运行情况,方便我们定位分析系统性能瓶颈、查找定位系统问题。上一篇说明了日志的多种业务场景以及日志记录的实现方式,那么日志记录下来,相关人员就需要对日志数据进行处理与分析,基于E(ElasticSearch)L(Logstash)K(Kibana)组合的日志分析系统可以说是目前各家公司普遍的首选方案。
- Elasticsearch: 分布式、RESTful 风格的搜索和数据分析引擎,可快速存储、搜索、分析海量的数据。在ELK中用于存储所有日志数据。
- Logstash: 开源的数据采集引擎,具有实时管道传输功能。Logstash 能够将来自单独数据源的数据动态集中到一起,对这些数据加以标准化并传输到您所选的地方。在ELK中用于将采集到的日志数据进行处理、转换然后存储到Elasticsearch。
- Kibana: 免费且开放的用户界面,能够让您对 Elasticsearch 数据进行可视化,并让您在 Elastic Stack 中进行导航。您可以进行各种操作,从跟踪查询负载,到理解请求如何流经您的整个应用,都能轻松完成。在ELK中用于通过界面展示存储在Elasticsearch中的日志数据。
作为微服务集群,必须要考虑当微服务访问量暴增时的高并发场景,此时系统的日志数据同样是爆发式增长,我们需要通过消息队列做流量削峰处理,Logstash官方提供Redis、Kafka、RabbitMQ等输入插件。Redis虽然可以用作消息队列,但其各项功能显示不如单一实现的消息队列,所以通常情况下并不使用它的消息队列功能;Kafka的性能要优于RabbitMQ,通常在日志采集,数据采集时使用较多,所以这里我们采用Kafka实现消息队列功能。
ELK日志分析系统中,数据传输、数据保存、数据展示、流量削峰功能都有了,还少一个组件,就是日志数据的采集,虽然log4j2可以将日志数据发送到Kafka,甚至可以将日志直接输入到Logstash,但是基于系统设计解耦的考虑,业务系统运行不会影响到日志分析系统,同时日志分析系统也不会影响到业务系统,所以,业务只需将日志记录下来,然后由日志分析系统去采集分析即可,Filebeat是ELK日志系统中常用的日志采集器,它是 Elastic Stack 的一部分,因此能够与 Logstash、Elasticsearch 和 Kibana 无缝协作。
- Kafka: 高吞吐量的分布式发布订阅消息队列,主要应用于大数据的实时处理。
- Filebeat: 轻量型日志采集器。在 Kubernetes、Docker 或云端部署中部署 Filebeat,即可获得所有的日志流:信息十分完整,包括日志流的 pod、容器、节点、VM、主机以及自动关联时用到的其他元数据。此外,Beats Autodiscover 功能可检测到新容器,并使用恰当的 Filebeat 模块对这些容器进行自适应监测。
软件下载:
因经常遇到在内网搭建环境的问题,所以这里习惯使用下载软件包的方式进行安装,虽没有使用Yum、Docker等安装方便,但是可以对软件目录、配置信息等有更深的了解,在后续采用Yum、Docker等方式安装时,也能清楚安装了哪些东西,安装配置的文件是怎样的,即使出现问题,也可以快速的定位解决。
Elastic Stack全家桶下载主页:
https://www.elastic.co/cn/downloads/。
我们选择如下版本:
- Elasticsearch8.0.0,下载地址:https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-8.0.0-linux-x86_64.tar.gz。
- Logstash8.0.0,下载地址:https://artifacts.elastic.co/downloads/logstash/logstash-8.0.0-linux-x86_64.tar.gz。
- Kibana8.0.0,下载地址:https://artifacts.elastic.co/downloads/kibana/kibana-8.0.0-linux-x86_64.tar.gz。
- Filebeat8.0.0,下载地址:https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-8.0.0-linux-x86_64.tar.gz。
Kafka下载:
- Kafka3.1.0,下载地址:https://dlcdn.apache.org/kafka/3.1.0/kafka_2.13-3.1.0.tgz。
安装配置:
安装前先准备好三台CentOS7服务器用于集群安装,这是IP地址为:172.16.20.220、172.16.20.221、172.16.20.222,然后将上面下载的软件包上传至三台服务器的/usr/local目录。因服务器资源有限,这里所有的软件都安装在这三台集群服务器上,在实际生产环境中,请根据业务需求设计规划进行安装。
在集群搭建时,如果能够编写shell安装脚本就会很方便,如果不能编写,就需要在每台服务器上执行安装命令,多数ssh客户端提供了多会话同时输入的功能,这里一些通用安装命令可以选择启用该功能。
一、安装Elasticsearch集群
1、Elasticsearch是使用Java语言开发的,所以需要在环境上安装jdk并配置环境变量
下载jdk软件包安装,https://www.oracle.com/java/technologies/downloads/#java8。
新建/usr/local/java目录:
mkdir /usr/local/java
将下载的jdk软件包jdk-8u64-linux-x64.tar.gz上传到/usr/local/java目录,然后解压。
tar -zxvf jdk-8u77-linux-x64.tar.gz
配置环境变量/etc/profile。
vi /etc/profile
在底部添加以下内容:
JAVA_HOME=/usr/local/java/jdk1.8.0_64
PATH=$JAVA_HOME/bin:$PATH
CLASSPATH=$JAVA_HOME/jre/lib/ext:$JAVA_HOME/lib/tools.jar
export PATH JAVA_HOME CLASSPATH
使环境变量生效。
source /etc/profile
另外一种十分快捷的方式,如果不是内网环境,可以直接使用命令行安装,这里安装的是免费版本的openjdk。
yum install java-1.8.0-openjdk* -y
2、安装配置Elasticsearch
进入/usr/local目录,解压Elasticsearch安装包,请确保执行命令前已将环境准备时的Elasticsearch安装包上传至该目录。
tar -zxvf elasticsearch-8.0.0-linux-x86_64.tar.gz
重命名文件夹。
mv elasticsearch-8.0.0 elasticsearch
elasticsearch不能使用root用户运行,这里创建运行elasticsearch的用户组和用户。
# 创建用户组
groupadd elasticsearch
# 创建用户并添加至用户组
useradd elasticsearch -g elasticsearch
# 更改elasticsearch密码,设置一个自己需要的密码,这里设置为和用户名一样:El12345678
passwd elasticsearch
新建elasticsearch数据和日志存放目录,并给elasticsearch用户赋权限。
mkdir -p /data/elasticsearch/data
mkdir -p /data/elasticsearch/log
chown -R elasticsearch:elasticsearch /data/elasticsearch*operation.log
#- c:\programdata\elasticsearch\logs\*
# Exclude lines. A list of regular expressions to match. It drops the lines that are
# matching any regular expression from the list.
#exclude_lines: ['^DBG']
# Include lines. A list of regular expressions to match. It exports the lines that are
# matching any regular expression from the list.
#include_lines: ['^ERR', '^WARN']
# Exclude files. A list of regular expressions to match. Filebeat drops the files that
# are matching any regular expression from the list. By default, no files are dropped.
#prospector.scanner.exclude_files: ['.gz$']
# Optional additional fields. These fields can be freely picked
# to add additional information to the crawled log files for filtering
fields:
topic: operation_log
# level: debug
# review: 1
# filestream is an input for collecting log messages from files.
- type: filestream
# Change to true to enable this input configuration.
enabled: true
# Paths that should be crawled and fetched. Glob based paths.
paths:
- /data/gitegg/log*debug.log
#- c:\programdata\elasticsearch\logs\*
# Exclude lines. A list of regular expressions to match. It drops the lines that are
# matching any regular expression from the list.
#exclude_lines: ['^DBG']
# Include lines. A list of regular expressions to match. It exports the lines that are
# matching any regular expression from the list.
#include_lines: ['^ERR', '^WARN']
# Exclude files. A list of regular expressions to match. Filebeat drops the files that
# are matching any regular expression from the list. By default, no files are dropped.
#prospector.scanner.exclude_files: ['.gz$']
# Optional additional fields. These fields can be freely picked
# to add additional information to the crawled log files for filtering
fields:
topic: debugger_log
# level: debug
# review: 1
# filestream is an input for collecting log messages from files.
- type: filestream
# Change to true to enable this input configuration.
enabled: true
# Paths that should be crawled and fetched. Glob based paths.
paths:
- /usr/local/nginx/logs/access.log
#- c:\programdata\elasticsearch\logs\*
# Exclude lines. A list of regular expressions to match. It drops the lines that are
# matching any regular expression from the list.
#exclude_lines: ['^DBG']
# Include lines. A list of regular expressions to match. It exports the lines that are
# matching any regular expression from the list.
#include_lines: ['^ERR', '^WARN']
# Exclude files. A list of regular expressions to match. Filebeat drops the files that
# are matching any regular expression from the list. By default, no files are dropped.
#prospector.scanner.exclude_files: ['.gz$']
# Optional additional fields. These fields can be freely picked
# to add additional information to the crawled log files for filtering
fields:
topic: nginx_log
# level: debug
# review: 1
# ============================== Filebeat modules ==============================
filebeat.config.modules:
# Glob pattern for configuration loading
path: ${path.config}/modules.d/*.yml
# Set to true to enable config reloading
reload.enabled: false
# Period on which files under path should be checked for changes
#reload.period: 10s
# ======================= Elasticsearch template setting =======================
setup.template.settings:
index.number_of_shards: 3
index.number_of_replicas: 1
#index.codec: best_compression
#_source.enabled: false
# 允许自动生成index模板
setup.template.enabled: true
# # 生成index模板时字段配置文件
setup.template.fields: fields.yml
# # 如果存在模块则覆盖
setup.template.overwrite: true
# # 生成index模板的名称
setup.template.name: "gitegg_log"
# # 生成index模板匹配的index格式
setup.template.pattern: "filebeat-*"
#索引生命周期管理ilm功能默认开启,开启的情况下索引名称只能为filebeat-*, 通过setup.ilm.enabled: false进行关闭;
setup.ilm.pattern: "{now/d}"
setup.ilm.enabled: false
# ================================== General ===================================
# The name of the shipper that publishes the network data. It can be used to group
# all the transactions sent by a single shipper in the web interface.
#name:
# The tags of the shipper are included in their own field with each
# transaction published.
#tags: ["service-X", "web-tier"]
# Optional fields that you can specify to add additional information to the
# output.
#fields:
# env: staging
# ================================= Dashboards =================================
# These settings control loading the sample dashboards to the Kibana index. Loading
# the dashboards is disabled by default and can be enabled either by setting the
# options here or by using the `setup` command.
setup.dashboards.enabled: true
# The URL from where to download the dashboards archive. By default this URL
# has a value which is computed based on the Beat name and version. For released
# versions, this URL points to the dashboard archive on the artifacts.elastic.co
# website.
#setup.dashboards.url:
# =================================== Kibana ===================================
# Starting with Beats version 6.0.0, the dashboards are loaded via the Kibana API.
# This requires a Kibana endpoint configuration.
setup.kibana:
# Kibana Host
# Scheme and port can be left out and will be set to the default (http and 5601)
# In case you specify and additional path, the scheme is required: http://localhost:5601/path
# IPv6 addresses should always be defined as: https://[2001:db8::1]:5601
host: "172.16.20.220:5601"
# Optional protocol and basic auth credentials.
#protocol: "https"
username: "elastic"
password: "123456"
# Optional HTTP path
#path: ""
# Optional Kibana space ID.
#space.id: ""
# Custom HTTP headers to add to each request
#headers:
# X-My-Header: Contents of the header
# Use SSL settings for HTTPS.
#ssl.enabled: true
# =============================== Elastic Cloud ================================
# These settings simplify using Filebeat with the Elastic Cloud (https://cloud.elastic.co/).
# The cloud.id setting overwrites the `output.elasticsearch.hosts` and
# `setup.kibana.host` options.
# You can find the `cloud.id` in the Elastic Cloud web UI.
#cloud.id:
# The cloud.auth setting overwrites the `output.elasticsearch.username` and
# `output.elasticsearch.password` settings. The format is `:`.
#cloud.auth:
# ================================== Outputs ===================================
# Configure what output to use when sending the data collected by the beat.
# ---------------------------- Elasticsearch Output ----------------------------
#output.elasticsearch:
# Array of hosts to connect to.
hosts: ["localhost:9200"]
# Protocol - either `http` (default) or `https`.
#protocol: "https"
# Authentication credentials - either API key or username/password.
#api_key: "id:api_key"
#username: "elastic"
#password: "changeme"
# ------------------------------ Logstash Output -------------------------------
#output.logstash:
# The Logstash hosts
#hosts: ["localhost:5044"]
# Optional SSL. By default is off.
# List of root certificates for HTTPS server verifications
#ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]
# Certificate for SSL client authentication
#ssl.certificate: "/etc/pki/client/cert.pem"
# Client Certificate Key
#ssl.key: "/etc/pki/client/cert.key"
# -------------------------------- Kafka Output --------------------------------
output.kafka:
# Boolean flag to enable or disable the output module.
enabled: true
# The list of Kafka broker addresses from which to fetch the cluster metadata.
# The cluster metadata contain the actual Kafka brokers events are published
# to.
hosts: ["172.16.20.220:9092","172.16.20.221:9092","172.16.20.222:9092"]
# The Kafka topic used for produced events. The setting can be a format string
# using any event field. To set the topic from document type use `%{[type]}`.
topic: '%{[fields.topic]}'
# The Kafka event key setting. Use format string to create a unique event key.
# By default no event key will be generated.
#key: ''
# The Kafka event partitioning strategy. Default hashing strategy is `hash`
# using the `output.kafka.key` setting or randomly distributes events if
# `output.kafka.key` is not configured.
partition.hash:
# If enabled, events will only be published to partitions with reachable
# leaders. Default is false.
reachable_only: true
# Configure alternative event field names used to compute the hash value.
# If empty `output.kafka.key` setting will be used.
# Default value is empty list.
#hash: []
# Authentication details. Password is required if username is set.
#username: ''
#password: ''
# SASL authentication mechanism used. Can be one of PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512.
# Defaults to PLAIN when `username` and `password` are configured.
#sasl.mechanism: ''
# Kafka version Filebeat is assumed to run against. Defaults to the "1.0.0".
#version: '1.0.0'
# Configure JSON encoding
#codec.json:
# Pretty-print JSON event
#pretty: false
# Configure escaping HTML symbols in strings.
#escape_html: false
# Metadata update configuration. Metadata contains leader information
# used to decide which broker to use when publishing.
#metadata:
# Max metadata request retry attempts when cluster is in middle of leader
# election. Defaults to 3 retries.
#retry.max: 3
# Wait time between retries during leader elections. Default is 250ms.
#retry.backoff: 250ms
# Refresh metadata interval. Defaults to every 10 minutes.
#refresh_frequency: 10m
# Strategy for fetching the topics metadata from the broker. Default is false.
#full: false
# The number of concurrent load-balanced Kafka output workers.
#worker: 1
# The number of times to retry publishing an event after a publishing failure.
# After the specified number of retries, events are typically dropped.
# Some Beats, such as Filebeat, ignore the max_retries setting and retry until
# all events are published. Set max_retries to a value less than 0 to retry
# until all events are published. The default is 3.
#max_retries: 3
# The number of seconds to wait before trying to republish to Kafka
# after a network error. After waiting backoff.init seconds, the Beat
# tries to republish. If the attempt fails, the backoff timer is increased
# exponentially up to backoff.max. After a successful publish, the backoff
# timer is reset. The default is 1s.
#backoff.init: 1s
# The maximum number of seconds to wait before attempting to republish to
# Kafka after a network error. The default is 60s.
#backoff.max: 60s
# The maximum number of events to bulk in a single Kafka request. The default
# is 2048.
#bulk_max_size: 2048
# Duration to wait before sending bulk Kafka request. 0 is no delay. The default
# is 0.
#bulk_flush_frequency: 0s
# The number of seconds to wait for responses from the Kafka brokers before
# timing out. The default is 30s.
#timeout: 30s
# The maximum duration a broker will wait for number of required ACKs. The
# default is 10s.
#broker_timeout: 10s
# The number of messages buffered for each Kafka broker. The default is 256.
#channel_buffer_size: 256
# The keep-alive period for an active network connection. If 0s, keep-alives
# are disabled. The default is 0 seconds.
#keep_alive: 0
# Sets the output compression codec. Must be one of none, snappy and gzip. The
# default is gzip.
compression: gzip
# Set the compression level. Currently only gzip provides a compression level
# between 0 and 9. The default value is chosen by the compression algorithm.
#compression_level: 4
# The maximum permitted size of JSON-encoded messages. Bigger messages will be
# dropped. The default value is 1000000 (bytes). This value should be equal to
# or less than the broker's message.max.bytes.
max_message_bytes: 1000000
# The ACK reliability level required from broker. 0=no response, 1=wait for
# local commit, -1=wait for all replicas to commit. The default is 1. Note:
# If set to 0, no ACKs are returned by Kafka. Messages might be lost silently
# on error.
required_acks: 1
# The configurable ClientID used for logging, debugging, and auditing
# purposes. The default is "beats".
#client_id: beats
# Use SSL settings for HTTPS.
#ssl.enabled: true
# Controls the verification of certificates. Valid values are:
# * full, which verifies that the provided certificate is signed by a trusted
# authority (CA) and also verifies that the server's hostname (or IP address)
# matches the names identified within the certificate.
# * strict, which verifies that the provided certificate is signed by a trusted
# authority (CA) and also verifies that the server's hostname (or IP address)
# matches the names identified within the certificate. If the Subject Alternative
# Name is empty, it returns an error.
# * certificate, which verifies that the provided certificate is signed by a
# trusted authority (CA), but does not perform any hostname verification.
# * none, which performs no verification of the server's certificate. This
# mode disables many of the security benefits of SSL/TLS and should only be used
# after very careful consideration. It is primarily intended as a temporary
# diagnostic mechanism when attempting to resolve TLS errors; its use in
# production environments is strongly discouraged.
# The default value is full.
#ssl.verification_mode: full
# List of supported/valid TLS versions. By default all TLS versions from 1.1
# up to 1.3 are enabled.
#ssl.supported_protocols: [TLSv1.1, TLSv1.2, TLSv1.3]
# List of root certificates for HTTPS server verifications
#ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]
# Certificate for SSL client authentication
#ssl.certificate: "/etc/pki/client/cert.pem"
# Client certificate key
#ssl.key: "/etc/pki/client/cert.key"
# Optional passphrase for decrypting the certificate key.
#ssl.key_passphrase: ''
# Configure cipher suites to be used for SSL connections
#ssl.cipher_suites: []
# Configure curve types for ECDHE-based cipher suites
#ssl.curve_types: []
# Configure what types of renegotiation are supported. Valid options are
# never, once, and freely. Default is never.
#ssl.renegotiation: never
# Configure a pin that can be used to do extra validation of the verified certificate chain,
# this allow you to ensure that a specific certificate is used to validate the chain of trust.
#
# The pin is a base64 encoded string of the SHA-256 fingerprint.
#ssl.ca_sha256: ""
# A root CA HEX encoded fingerprint. During the SSL handshake if the
# fingerprint matches the root CA certificate, it will be added to
# the provided list of root CAs (`certificate_authorities`), if the
# list is empty or not defined, the matching certificate will be the
# only one in the list. Then the normal SSL validation happens.
#ssl.ca_trusted_fingerprint: ""
# Enable Kerberos support. Kerberos is automatically enabled if any Kerberos setting is set.
#kerberos.enabled: true
# Authentication type to use with Kerberos. Available options: keytab, password.
#kerberos.auth_type: password
# Path to the keytab file. It is used when auth_type is set to keytab.
#kerberos.keytab: /etc/security/keytabs/kafka.keytab
# Path to the Kerberos configuration.
#kerberos.config_path: /etc/krb5.conf
# The service name. Service principal name is contructed from
# service_name/hostname@realm.
#kerberos.service_name: kafka
# Name of the Kerberos user.
#kerberos.username: elastic
# Password of the Kerberos user. It is used when auth_type is set to password.
#kerberos.password: changeme
# Kerberos realm.
#kerberos.realm: ELASTIC
# Enables Kerberos FAST authentication. This may
# conflict with certain Active Directory configurations.
#kerberos.enable_krb5_fast: false
# ================================= Processors =================================
processors:
- add_host_metadata:
when.not.contains.tags: forwarded
- add_cloud_metadata: ~
- add_docker_metadata: ~
- add_kubernetes_metadata: ~
# ================================== Logging ===================================
# Sets log level. The default log level is info.
# Available log levels are: error, warning, info, debug
#logging.level: debug
# At debug level, you can selectively enable logging only for some components.
# To enable all selectors use ["*"]. Examples of other selectors are "beat",
# "publisher", "service".
#logging.selectors: ["*"]
# ============================= X-Pack Monitoring ==============================
# Filebeat can export internal metrics to a central Elasticsearch monitoring
# cluster. This requires xpack monitoring to be enabled in Elasticsearch. The
# reporting is disabled by default.
# Set to true to enable the monitoring reporter.
#monitoring.enabled: false
# Sets the UUID of the Elasticsearch cluster under which monitoring data for this
# Filebeat instance will appear in the Stack Monitoring UI. If output.elasticsearch
# is enabled, the UUID is derived from the Elasticsearch cluster referenced by output.elasticsearch.
#monitoring.cluster_uuid:
# Uncomment to send the metrics to Elasticsearch. Most settings from the
# Elasticsearch output are accepted here as well.
# Note that the settings should point to your Elasticsearch *monitoring* cluster.
# Any setting that is not set is automatically inherited from the Elasticsearch
# output configuration, so if you have the Elasticsearch output configured such
# that it is pointing to your Elasticsearch monitoring cluster, you can simply
# uncomment the following line.
#monitoring.elasticsearch:
# ============================== Instrumentation ===============================
# Instrumentation support for the filebeat.
#instrumentation:
# Set to true to enable instrumentation of filebeat.
#enabled: false
# Environment in which filebeat is running on (eg: staging, production, etc.)
#environment: ""
# APM Server hosts to report instrumentation results to.
#hosts:
# - http://localhost:8200
# API Key for the APM Server(s).
# If api_key is set then secret_token will be ignored.
#api_key:
# Secret token for the APM Server(s).
#secret_token:
# ================================= Migration ==================================
# This allows to enable 6.7 migration aliases
#migration.6_to_7.enabled: true
执行filebeat启动命令:
./filebeat -e -c filebeat.yml
后台启动命令:
nohup ./filebeat -e -c filebeat.yml >/dev/null 2>&1 &
停止命令:
ps -ef |grep filebeat
kill -9 进程号
六、测试配置是否正确
1、测试filebeat是否能够采集log文件并发送到Kafka
在kafka服务器开启消费者,监听api_log主题和operation_log主题。
./kafka-console-consumer.sh --bootstrap-server 172.16.20.221:9092 --topic api_log
./kafka-console-consumer.sh --bootstrap-server 172.16.20.222:9092 --topic operation_log
手动写入日志文件,按照filebeat配置的采集目录写入。
echo "api log1111" > /data/gitegg/log/gitegg-service-system/api.log
echo "operation log1111" > /data/gitegg/log/gitegg-service-system/operation.log
观察消费者是消费到日志推送内容。
2、测试logstash是消费Kafka的日志主题,并将日志内容存入Elasticsearch
手动写入日志文件:
echo "api log8888888888888888888888" > /data/gitegg/log/gitegg-service-system/api.log
echo "operation loggggggggggggggggggg" > /data/gitegg/log/gitegg-service-system/operation.log
打开Elasticsearch Head界面 http://172.16.20.220:9100/?auth_user=elastic&auth_password=123456 ,查询Elasticsearch是否有数据。
自动新增的两个index,规则是logstash中配置的。
数据浏览页可以看到Elasticsearch中存储的日志数据内容,说明我们的配置已经生效。
七、配置Kibana用于日志统计和展示
依次点击左侧菜单Management -> Kibana -> Data Views -> Create data view , 输入logstash_* ,选择@timestamp,再点击Create data view按钮,完成创建。
点击日志分析查询菜单Analytics -> Discover,选择logstash_* 进行日志查询。