文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

【elasticsearch专题】:Logstash从入门到同步MySQL数据

2023-12-23 19:24

关注

1. 引言

  Elasticsearch是在数据处理生态系统中担任一个开源的分布式搜索分析引擎的角色,专为存储检索和分析大量的数据而打造。与此相伴的是Kibana,一个开源数据可视化平台,用于以优雅的方式展示Elasticsearch中的数据,并赋予用户创建仪表盘、图表和报告的能力。然而,实现完整的数据流并不仅止于此。Logstash担任数据处理的角色,使得数据处理的整个过程更加完整。这三大组件就构成了我们平时所说的ELK。下面就开始对Logstash进行详细的介绍。

1.1 什么是Logstash?

Logstash作为一个具备实时流水线功能的开源数据收集引擎,拥有强大的能力。它能够从不同来源收集数据,并将其动态地汇聚,进而根据我们定义的规范进行转换或者输出到我们定义的目标地址。

1.2 Logstash的主要特点

Logstash通过清洗和使数据多样化,Logstash使数据变得适用于各种高级下游分析和可视化用例。此外,Logstash提供广泛的输入、过滤器和输出插件,而且许多本地编解码器进一步简化了数据摄取的过程。无论是数据整理还是提供给下游应用,Logstash都是一个强大且灵活的解决方案。
引用自Logstash官网的图片,更能说明他的角色和功能:
在这里插入图片描述

2. 下载与配置

  本文使用的是7.17最新的版本7.17.12,7.x版本也是最后支持jdk8的版本,后续8.0默认jdk11起.

2.1 下载

登陆到elastic下载地址,选择产品 Logstash 和版本号 7.17.12
在这里插入图片描述
  关于系统选择,如果是行x86架构的linux的选择下载LINUX X86_64,如果是windows的就选择WINDOWS,这两个系统没有什么区别,启动和配置都差不多,建议学习测试可以使用windows版本的。

2.2 文件结构

解压后的文件目录如下:
在这里插入图片描述

2.3 环境配置

  Logstash 推荐在环境变量配置 LS_JAVA_HOME 变量指向jdk目录来使用jdk。配置jdk目录时,我们可以直接使用Logstash 中的jdk目录,省去了另外下载和有可能引起版本不能用的问题。(7.17.12支持的jdk版本只有 jdk8,jdk11和jdk15)

  在7.17.12以及之前的版本中还兼容使用我们配置的JAVA_HOME环境变量,后续会取消对该变量的支持。

3. Logstash三大核心组件

Logstash 管道有两个必需元素input(收集源数据)和output(输出数据),以及一个可选元素filter(格式化数据) 。三个插件在源数据和Elasticsearch中的关系如下:
在这里插入图片描述

3.1 Input

input是用于从不同数据源收集数据的插件或配置部分。input插件允许您定义数据输入的来源,并将数据发送到Logstash进行后续处理。Logstash支持多种类型的输入插件,每种插件适用于不同的数据源类型。常用的有:

详细的插件可以查看Logstash官网

3.2 Filter

filter是用于对输入的数据进行处理、转换和过滤的部分。filter插件允许在数据进入 Logstash 后(经过input处理),再次对其进行各种操作,以满足特定需求,例如数据清洗、解析、标准化等。Logstash 支持多种类型的 “filter” 插件,每种插件适用于不同的数据处理需求。以下是一些常见的 Logstash “filter” 插件:

3.3 Output

output是用于将处理过的数据发送到不同目标的插件或配置部分。output插件允许定义数据输出的目标,并将经过 Logstash 处理后的数据传输到这些目标。Logstash 支持多种类型的输出插件,每种插件适用于不同的数据存储、传输或处理需求。以下是一些常见的 Logstash “output” 插件:

4. 动手实践:Hello World例子

4.1 如何启动Logstash

启动logstash比较简单,只需要执行bin目录下的 logstash(linxu)或者logstash.bat(windows)。

# linux启动命令./bin/logstash# windows启动命令.\bin\logstash.bat

4.2 常用的配置文件详解

在编写示例前,需要先了解一下重要的配置文件

一些常见的配置项包括:
pipeline.batch.size:指定每个批次处理的事件数量。
pipeline.batch.delay:指定每个批次之间的延迟时间。
path.data:指定 Logstash 数据的存储路径。
http.host:指定 HTTP 监听的主机名。
http.port:指定 HTTP 监听的端口号。
pipeline.workers:指定并行处理事件的工作线程数量。
queue.type: 指定队列的存储类型,可选memory(内存)和persisted(持久)

一些常见的配置项包括:
-Xmx:指定 Java 堆内存的最大值。
-Xms:指定 Java 堆内存的初始值。
-XX:+UseConcMarkSweepGC:指定使用 CMS(Concurrent Mark-Sweep)垃圾回收器。
-Djava.io.tmpdir:指定临时文件的存储路径。

4.3 编写并运行"Hello World"示例

在logstash根目录下执行

# windows执行.\bin\logstash.bat -e "input { stdin { } } output { stdout {} }"#linux执行bin/logstash -e 'input { stdin { } } output { stdout {} }'

启动 Logstash 后,请等待,直到看到][main] Pipeline started {"pipeline.id"=>"main"},然后在命令提示符下输入:hello world
在这里插入图片描述

4.4 使用 -f 参数指定配置文件启动

上面的例子我们直接采用参数 -e后跟随管道配置方式启动,我们还可以使用-f参数指定我们的配置文件方式启动。在跟目录下创建hello.conf文件,文件内容:

input { stdin { } } output { stdout { } }

然后执行启动命令

# windows执行.\bin\logstash.bat -f hello.conf#linux执行bin/logstash -f hello.conf

4.5 在pipeline中配置启动

打开config目录下的pipelines.yml文件,输入

- pipeline.id: hello  pipeline.workers: 1  pipeline.batch.size: 1  config.string: "input { stdin { } } output { stdout {} }"

除了直接配置管道处理规则,我们还可以指向刚刚编写的hello.conf文件

- pipeline.id: hello  pipeline.workers: 1  pipeline.batch.size: 1  path.config: "/usr/local/logstash/hello.conf"

保存文件后执行:

# windows执行.\bin\logstash.bat#linux执行bin/logstash

5. 实战:定时滚动同步MySQL数据

5.1 环境与数据准备

5.1.1 数据库准备

需要提前准备好mysql的表结构和测试数据:

# 建表语句CREATE TABLE `test` (  `id` int(11) NOT NULL AUTO_INCREMENT,  `content` varchar(255) DEFAULT NULL,  `status` int(11) DEFAULT NULL,  `update_time` bigint(20) DEFAULT NULL,  PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;# 插入数据insert into test (content,status,update_time) VALUES ("aaa",1,UNIX_TIMESTAMP());insert into test (content,status,update_time) VALUES ("bbb",1,UNIX_TIMESTAMP());insert into test (content,status,update_time) VALUES ("ccc",2,UNIX_TIMESTAMP());insert into test (content,status,update_time) VALUES ("ddd",1,UNIX_TIMESTAMP());

5.1.2 启动elasticsearch和kibana

需要启动好elasticsearchkibana,并且elasticsearch需要开启允许自动创建索引,如果没有开启需要事先创建好索引

5.1.3 导入mysql的jar

在logstash根目录下创建一个mylib的目录,用于存放java连接mysql的jar文件,例如:mysql-connector-java-8.0.27.jar

5.2 编写脚本

5.2.1 根据id滚动同步数据

需求:每分钟执行根据test表的id从小到大并且status等于1保存elasticsearch中,每次执行的数量是2条
在logstash跟目录下创建 mysql-by-id-to-es.conf 文件,文件内容:

input {  jdbc {    jdbc_driver_library => "./mylib/mysql-connector-java-8.0.27.jar"    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"    jdbc_connection_string => "jdbc:mysql://localhost:3306/test"    jdbc_user => "root"    jdbc_password => "123456"    parameters => { "myStatus" => 1 }    schedule => "* * * * *"    statement => "SELECT id,content,status,update_time FROM test WHERE status = :myStatus AND id > :sql_last_value ORDER BY id ASC LIMIT 2"    last_run_metadata_path => "mysql-by-id-to-es.index"    tracking_column => "id"    use_column_value => true    tracking_column_type => "numeric"  }}filter {mutate { add_field => { "from" => "logstash" } }}output {  elasticsearch {        index => "test-by-id-%{+YYYY.MM}"  }  stdout {  }}

保存好文件内容,在logstash根目录下执行-f启动命令

# windows执行.\bin\logstash.bat -f mysql-by-id-to-es.conf# linux执行./bin/logstash.bat -f mysql-by-id-to-es.conf

控制台打印信息:
在这里插入图片描述

到kibana执行查看数据:
1.先执行GET _cat/indices?v 查看是否存储以test-by-id开头的索引
在这里插入图片描述

如果索引存在,执行查看数据的语句GET test-by-id-2023.08/_search{ "query": {"match_all": {}}},可以发现status等于2的记录不会被录入

{        "_index" : "test-by-id-2023.08",        "_type" : "_doc",        "_id" : "FsWU74kBjZ5FwUtCTy7a",        "_score" : 1.0,        "_source" : {          "update_time" : 1691939803,          "@version" : "1",          "content" : "aaa",          "@timestamp" : "2023-08-13T15:47:01.008Z",          "status" : 1,          "id" : 1,          "from" : "logstash"        }      },      {        "_index" : "test-by-id-2023.08",        "_type" : "_doc",        "_id" : "FcWU74kBjZ5FwUtCTy7a",        "_score" : 1.0,        "_source" : {          "update_time" : 1691939803,          "@version" : "1",          "content" : "bbb",          "@timestamp" : "2023-08-13T15:47:01.020Z",          "status" : 1,          "id" : 2,          "from" : "logstash"        }      },      {        "_index" : "test-by-id-2023.08",        "_type" : "_doc",        "_id" : "F8WV74kBjZ5FwUtCNS6W",        "_score" : 1.0,        "_source" : {          "update_time" : 1691939803,          "@version" : "1",          "content" : "ddd",          "@timestamp" : "2023-08-13T15:48:00.413Z",          "status" : 1,          "id" : 4,          "from" : "logstash"        }      }

5.2.2 根据更新时间滚动同步数据

如果希望通过数据的更新时间录入,新建**mysql-by-uptime-to-es.conf文件,文件内容:

input {  jdbc {    jdbc_driver_library => ",/mylib/mysql-connector-java-8.0.27.jar"    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"    jdbc_connection_string => "jdbc:mysql://localhost:3306/test"    jdbc_user => "root"    jdbc_password => "123456"    parameters => { "myStatus" => 1 }    schedule => "* * * * *"    statement => "SELECT id,content,status,update_time FROM test WHERE status = :myStatus AND update_time > :sql_last_value"    last_run_metadata_path => "mysql-by-uptime-to-es.index"  }}filter {mutate { add_field => { "from" => "logstash" } }}output {  elasticsearch {        index => "test-by-uptime-%{+YYYY.MM}"  }  stdout {  }}

保存文件,启动和查看数据请参考前面的5.2.1

5.3 配置参数详解

例子:

  • 5 * 1-3 * 将在 5 月至 <> 月的每一天凌晨 <> 点执行一次。
    0 * * * * 将在每天每小时的第 0 分钟执行。
    0 6 * * * America/Chicago 将在每天上午 6:00 (UTC/GMT -5) 执行。
elasticsearch {  # 配置es地址,有多个使用逗号隔开,不填默认就是 localhost:9200  hosts => ["localhost:9200"]  # 配置索引  index => "test-by-uptime-%{+YYYY.MM}"  # 配置账号和密码,默认不填  user => "elastic"  password => "123456"}

6.总结

  本文通过详细介绍Logstash的核心组件和功能,涵盖如何从下载安装到编写第一个Hello World示例,再到定时同步MySQL数据到Elasticsearch的完整过程。希望此教程能成为您学习和掌握Logstash的重要参考。

来源地址:https://blog.csdn.net/dougsu/article/details/132261486

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-数据库
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯