文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

轻量级的日志采集组件 Filebeat 讲解与实战操作

2024-11-30 07:42

关注

以下是Filebeat的主要概述和特点:

工作的流程图如下:

图片

Filebeat的采集原理的主要步骤

  1. 数据源检测:

Filebeat首先配置要监视的数据源,这可以是日志文件、系统日志、Docker容器日志、Windows事件日志等。Filebeat可以通过输入模块配置来定义数据源。

  1. 数据收集:
  1. 数据处理:

  1. 数据传输:

  1. 安全性和可靠性:

  1. 数据目的地:

  1. 实时性和监控:

总的来说,Filebeat采集原理是通过轮询监视数据源,将新数据采集并发送到目标位置,同时确保数据的安全传输和可靠性。它提供了一种高效且灵活的方式来处理各种类型的日志和事件数据,以便进行后续的分析和可视化。

二、Kafka 安装

为了快速部署,这里选择通过docker-compose部署,可以参考我这篇文章:【中间件】通过 docker-compose 快速部署 Kafka 保姆级教程

# 先安装 zookeeper
git clone https://gitee.com/hadoop-bigdata/docker-compose-zookeeper.git
cd docker-compose-zookeeper 
docker-compose -f docker-compose.yaml up -d

# 安装kafka
git clone https://gitee.com/hadoop-bigdata/docker-compose-kafka.git
cd docker-compose-kafka
docker-compose -f docker-compose.yaml up -d

如果仅仅只是为测试也可以部署一个单机kafka官方下载地址:http://kafka.apache.org/downloads

### 1、下载kafka
wget https://downloads.apache.org/kafka/3.4.1/kafka_2.12-3.4.1.tgz --no-check-certificate
### 2、解压
tar -xf kafka_2.12-3.4.1.tgz

### 3、配置环境变量
# ~/.bashrc添加如下内容:
export PATH=$PATH:/opt/docker-compose-kafka/images/kafka_2.12-3.4.1/bin

### 4、配置zookeeper 新版Kafka已内置了ZooKeeper,如果没有其它大数据组件需要使用ZooKeeper的话,直接用内置的会更方便维护。
# vi kafka_2.12-3.4.1/config/zookeeper.properties
#注释掉
#maxClientCnxns=0

#设置连接参数,添加如下配置
#为zk的基本时间单元,毫秒
tickTime=2000
#Leader-Follower初始通信时限 tickTime*10
initLimit=10
#Leader-Follower同步通信时限 tickTime*5
syncLimit=5

#设置broker Id的服务地址
#hadoop-node1对应于前面在hosts里面配置的主机映射,0是broker.id, 2888是数据同步和消息传递端口,3888是选举端口
server.0=local-168-182-110:2888:3888

### 5、配置kafka
# vi kafka_2.12-3.4.1/config/server.properties
#添加以下内容:
broker.id=0
listeners=PLAINTEXT://local-168-182-110:9092
# 上面容器的zookeeper
zookeeper.cnotallow=local-168-182-110:2181
# topic不存在的,kafka就会创建该topic。
#auto.create.topics.enable=true

### 6、启动服务
./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
./bin/kafka-server-start.sh -daemon config/server.properties

### 7、测试验证
#创建topic
kafka-topics.sh --bootstrap-server local-168-182-110:9092 --create --topic topic1 --partitions 8 --replication-factor 1

#列出所有topic
kafka-topics.sh --bootstrap-server local-168-182-110:9092 --list

#列出所有topic的信息
kafka-topics.sh --bootstrap-server local-168-182-110:9092 --describe

#列出指定topic的信息
kafka-topics.sh --bootstrap-server local-168-182-110:9092 --describe --topic topic1

#生产者(消息发送程序)
kafka-console-producer.sh --broker-list local-168-182-110:9092 --topic topic1

#消费者(消息接收程序)
kafka-console-consumer.sh --bootstrap-server local-168-182-110:9092 --topic topic1

三、Filebeat 安装

1)下载 Filebeat

官网地址:https://www.elastic.co/cn/downloads/past-releases#filebeat

curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.6.2-linux-x86_64.tar.gz

tar -xf filebeat-7.6.2-linux-x86_64.tar.gz

2)Filebeat 配置参数讲解

Filebeat的配置文件通常是YAML格式,包含各种配置参数,用于定义数据源、输出目标、数据处理和其他选项。以下是一些常见的Filebeat配置参数及其含义:

  1. filebeat.inputs:指定要监视的数据源。可以配置多个输入,每个输入定义一个数据源。每个输入包括以下参数:

type:数据源的类型,例如日志文件、系统日志、Docker日志等。

paths:要监视的文件路径或者使用通配符指定多个文件。

enabled:是否启用该输入。

示例:

filebeat.inputs:
  - type: log
    paths:
      - /var/log/*.log
  - type: docker
    enabled: true
  1. filebeat.modules:定义要加载的模块,每个模块用于解析特定类型的日志或事件数据。每个模块包括以下参数:

module:模块名称。

enabled:是否启用模块。

var:自定义模块变量。

示例:

filebeat.modules:
  - module: apache
    access:
      enabled: true
    error:
      enabled: true
  1. output.elasticsearch:指定将数据发送到Elasticsearch的配置参数,包括Elasticsearch主机、索引名称等。

hosts:Elasticsearch主机列表。

index:索引名称模板。

username和password:用于身份验证的用户名和密码。

pipeline:用于数据预处理的Ingest节点管道。

示例:

output.elasticsearch:
  hosts: ["localhost:9200"]
  index: "filebeat-%{[agent.version]}-%{+yyyy.MM.dd}"
  username: "your_username"
  password: "your_password"
  1. output.logstash:指定将数据发送到Logstash的配置参数,包括Logstash主机和端口等。

hosts:Logstash主机列表。

index:索引名称模板。

ssl:是否使用SSL/TLS加密传输数据。

示例:

output.logstash:
  hosts: ["localhost:5044"]
  index: "filebeat-%{[agent.version]}-%{+yyyy.MM.dd}"
  ssl.enabled: true
  1. processors:定义对数据的预处理步骤,包括字段分割、重命名、添加字段等。

add_fields:添加字段到事件数据。

decode_json_fields:解码JSON格式的字段。

drop_fields:删除指定字段。

rename:重命名字段。

示例:

processors:
  - add_fields:
      target: "my_field"
      value: "my_value"
  - drop_fields:
      fields: ["field1", "field2"]
  1. filebeat.registry.path:指定Filebeat用于跟踪已经读取的文件和位置信息的注册文件的路径。
  2. filebeat.autodiscover:自动发现数据源,特别是用于容器化环境,配置自动检测新容器的策略。
  3. logging.level:指定Filebeat的日志级别,可选项包括info、debug、warning等。

这些是 Filebeat 的一些常见配置参数,具体的配置取决于您的使用场景和需求。您可以根据需要自定义配置文件,以满足您的数据采集和处理需求。详细的配置文档可以在Filebeat官方文档中找到。

3)filebeat.prospectors 推送kafka完整配置

这里主要用到几个核心字段:filebeat.prospectors、processors、output.kafka

1、filebeat.prospectors

filebeat.prospectors:用于定义要监视的数据源和采集规则。每个 prospector 包含一个或多个输入规则,它们指定要监视的文件或数据源以及如何采集和解析数据。

以下是一个示例 filebeat.prospectors 部分的配置:

filebeat.prospectors:
- type: log
  enabled: true
  paths:
    - /var/log/*.log
  exclude_files:
    - "*.gz"
  multiline.pattern: '^\['
  multiline.negate: false
  multiline.match: after
  tags: ["tag1", "tag2"]
  tail_files: true
  fields:
    app: myapp
    env: production

在上述示例中,我们定义了一个 filebeat.prospectors 包含一个 type: log 的 prospector,下面是各个字段的解释:

这些字段允许您配置Filebeat以满足特定的数据源和采集需求。您可以根据需要定义多个 prospector 来监视不同类型的数据源,每个 prospector 可以包含不同的参数。通过灵活配置 filebeat.prospectors,Filebeat可以适应各种日志和数据采集场景。

2、processors

processors 是Filebeat配置中的一个部分,用于定义在事件传输到输出目标之前对事件数据进行预处理的操作。您可以使用 processors 来修改事件数据、添加字段、删除字段,以及执行其他自定义操作。以下是一些常见的 processors 配置示例和说明:

processors:
  - add_fields:
      fields:
        app: myapp
        env: production
processors:
  - drop_fields:
      fields: ["sensitive_data"]
processors:
  - decode_json_fields:
      fields: ["json_data"]
      target: ""
processors:
  - rename:
      fields:
        - from: old_field
          to: new_field
processors:
  - add_tags:
      tags: ["error"]
    when:
      equals:
        log_level: "error"

processors 部分允许您对事件数据进行复杂的处理和转换,以适应特定的需求。您可以根据需要组合不同的处理器来执行多个操作,以确保事件数据在传输到输出目标之前满足您的要求。

3、output.kafka

output.kafka 是Filebeat配置文件中的一个部分,用于配置将事件数据发送到Kafka消息队列的相关设置。以下是 output.kafka 部分的常见参数及其解释:

output.kafka:
  hosts: ["kafka-broker1:9092", "kafka-broker2:9092"]
  topic: "my-log-topic"
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000

以下是各个参数的详细解释:

以上是常见的 output.kafka 参数,您可以根据您的Kafka集群配置和需求来调整这些参数。确保配置正确的Kafka主题和分区策略以满足您的数据传输需求。同时,要确保Filebeat服务器可以连接到指定的Kafka broker地址。

以下是一个完整的Filebeat配置文件示例,其中包括了 filebeat.prospectors、processors 和 output.kafka 的配置部分,以用于从日志文件采集数据并将其发送到Kafka消息队列:

4)filebeat.inputs 与 filebeat.prospectors区别

Filebeat 从 7.x 版本开始引入了新的配置方式 filebeat.inputs,以提供更灵活的输入配置选项,同时保留了向后兼容性。以下是 filebeat.inputs 和 filebeat.prospectors 之间的主要区别:

filebeat.inputs 是较新版本的配置方式,用于定义输入配置。

允许您以更灵活的方式配置不同类型的输入。您可以在配置文件中定义多个独立的输入块,每个块用于配置不同类型的输入。

每个输入块可以包含多个字段,用于定制不同输入类型的配置,如 type、enabled、paths、multiline 等。

使配置更具可读性,因为每个输入类型都有自己的配置块。

示例:

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/app/*.log

- type: syslog
  enabled: true
  port: 514
  protocol.udp: true

filebeat.prospectors 是旧版配置方式,用于定义输入配置。

所有的输入类型(如日志文件、系统日志、stdin 等)都需要放在同一个部分中。

需要在同一个配置块中定义不同输入类型的路径等细节。

旧版配置方式,不如 filebeat.inputs 配置方式那么灵活和可读性好。

以下是一些常见的 type 值以及它们的含义:

  1. log(常用):用于监视和收集文本日志文件,例如应用程序日志。
- type: log
  paths:
    - /var/log/*.log
  1. stdin:用于从标准输入(stdin)收集数据。
- type: stdin
  1. syslog:用于收集系统日志数据,通常是通过UDP或TCP协议从远程或本地 syslog 服务器接收。
- type: syslog
  port: 514
  protocol.udp: true
  1. filestream:用于收集 Windows 上的文件日志数据。
- type: filestream
  enabled: true
  1. httpjson:用于通过 HTTP 请求从 JSON API 收集数据。
- type: httpjson
  enabled: true
  urls:
    - http://example.com/api/data
  1. tcp 和 udp:用于通过 TCP 或 UDP 协议收集网络数据。
- type: tcp
  enabled: true
  host: "localhost"
  port: 12345
- type: udp
  enabled: true
  host: "localhost"
  port: 12345

总的来说,filebeat.inputs 提供了更灵活的方式来配置不同类型的输入,更容易组织和管理配置。如果您使用的是较新版本的 Filebeat,推荐使用 filebeat.inputs 配置方式。但对于向后兼容性,旧版的 filebeat.prospectors 仍然可以使用。

5)filebeat.yml 配置

filebeat.yml

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/*.log
  multiline.pattern: '^\['
  multiline.negate: false
  multiline.match: after
  tail_files: true
  fields:
    app: myapp
    env: production
    topicname: my-log-topic
- type: log
  enabled: true
  paths:
    - /var/log/messages
  multiline.pattern: '^\['
  multiline.negate: false
  multiline.match: after
  tail_files: true
  fields:
    app: myapp
    env: production
    topicname: my-log-topic
processors:
  - add_fields:
      fields:
        app: myapp
        env: production
  - drop_fields:
      fields: ["sensitive_data"]

output.kafka:
  hosts: ["local-168-182-110:9092"]
  #topic: "my-log-topic"
  # 这里也可以应用上面filebeat.prospectors.fields的值
  topic: '%{[fields][topicname]}'
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000

6)启动 Filebeat 服务

nohup ./filebeat -e -c filebeat.yml >/dev/null 2>&1 &

# -e 将启动信息输出到屏幕上
# filebeat本身运行的日志默认位置${install_path}/logs/filebeat

要修改filebeat的日子路径,可以添加一下内容在filebeat.yml配置文件:

#logging.level :debug 日志级别
path.logs: /var/log/

使用 systemctl 启动 filebeat

# vi /usr/lib/systemd/system/filebeat.service

[Unit]
Descriptinotallow=filebeat server daemon
Documentatinotallow=/opt/filebeat-7.6.2-linux-x86_64/filebeat -help
Wants=network-online.target
After=network-online.target
 
[Service]
User=root
Group=root
Envirnotallow="BEAT_CONFIG_OPTS=-c /opt/filebeat-7.6.2-linux-x86_64/filebeat.yml"
ExecStart=/opt/filebeat-7.6.2-linux-x86_64/filebeat $BEAT_CONFIG_OPTS
Restart=always
 
[Install]
WantedBy=multi-user.target

【温馨提示】记得更换自己的 filebeat 目录。

systemctl 启动 filebeat 服务

#刷新一下配置文件
systemctl daemon-reload

# 启动
systemctl start filebeat
 
# 查看状态
systemctl status filebeat

# 查看进程
ps -ef|grep filebeat

# 查看日志
vi logs/filebeat

7)检测日志是否已经采集到 kafka

# 设置环境变量
export KAFKA_HOME=/opt/docker-compose-kafka/images/kafka_2.12-3.4.1

# 查看topic列表
${KAFKA_HOME}/bin/kafka-topics.sh --bootstrap-server local-168-182-110:9092 --list

# 查看topic列表详情
${KAFKA_HOME}/bin/kafka-topics.sh --bootstrap-server local-168-182-110:9092 --describe

# 指定topic
${KAFKA_HOME}/bin/kafka-topics.sh --bootstrap-server local-168-182-110:9092 --describe --topic my-log-topic

# 查看kafka数据
${KAFKA_HOME}/bin/kafka-console-consumer.sh --topic my-log-topic --bootstrap-server local-168-182-110:9092

#上述命令会连接到指定的Kafka集群并打印my_topic主题上的所有消息。如果要查看特定数量的最新消息,则应将“--from-beginning”添加到命令中。
# 在较高版本的 Kafka 中(例如 Kafka 2.4.x 和更高版本),消费者默认需要明确指定要消费的分区。
#以下是查看特定最新消息数量的示例:
${KAFKA_HOME}/bin/kafka-console-consumer.sh --topic my-log-topic --bootstrap-server local-168-182-110:9092 --from-beginning --max-messages 10 --partition 0

# 查看kafka数据量,在较高版本的 Kafka 中(例如 Kafka 2.4.x 和更高版本),消费者默认需要明确指定要消费的分区。
${KAFKA_HOME}/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list local-168-182-110:9092 --topic my-log-topic --time -1

# 消费数据查看数据,这里指定一个分区
${KAFKA_HOME}/bin/kafka-console-consumer.sh --bootstrap-server local-168-182-110:9092 --topic my-log-topic --partition 0 --offset 100

# 也可以通过消费组消费,可以不指定分区
${KAFKA_HOME}/bin/kafka-console-consumer.sh --topic my-log-topic --bootstrap-server local-168-182-110:9092 --from-beginning --group my-group

这将返回主题 的分区和偏移量信息,您可以根据这些信息计算出数据量。

来源:大数据与云原生技术分享内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯