文章详情

短信预约信息系统项目管理师 报名、考试、查分时间动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

一站式Kafka平台解决方案——KafkaCenter

2019-07-25 04:11

关注

一站式Kafka平台解决方案——KafkaCenter

KafkaCenter是什么

KafkaCenter是一个针对Kafka的一站式,解决方案。用于Kafka集群的维护与管理,生产者和消费者的监控,以及Kafka部分生态组件的使用。

对于Kafka的平台化,一直缺少一个成熟的解决方案,之前比较流行的kafka监控方案,如kafka-manager提供了集群管理与topic管理等等功能。但是对于生产者、消费者的监控,以及Kafka的新生态,如Connect,KSQL还缺少响应的支持。Confluent Control Center功能要完整一些,但却是非开源收费的。

对于Kafka的使用,一直都是一个让人头疼的问题,由于实时系统的强运维特性,我们不得不投入大量的时间用于集群的维护,kafka的运维,比如:

功能模块介绍

系统截图:

安装与入门

安装需要依赖 mysql es email server

组件 是否必须 功能
mysql 必须 配置信息存在mysql
elasticsearch(7.0+) 可选 各种监控信息的存储
email server 可选 Apply, approval, warning e-mail alert

1、初始化

在MySQL中执行sql建表

-- Dumping database structure for kafka_center
CREATE DATABASE IF NOT EXISTS `kafka_center` ;
USE `kafka_center`;


-- Dumping structure for table kafka_center.alert_group
CREATE TABLE IF NOT EXISTS `alert_group` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `cluster_id` int(11) NOT NULL,
  `topic_name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT "",
  `consummer_group` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT "",
  `consummer_api` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT "",
  `threshold` int(11) DEFAULT NULL,
  `dispause` int(11) DEFAULT NULL,
  `mail_to` varchar(1000) COLLATE utf8_bin NOT NULL DEFAULT "",
  `webhook` varchar(1000) COLLATE utf8_bin NOT NULL DEFAULT "",
  `create_date` datetime DEFAULT NULL,
  `owner_id` int(11) DEFAULT NULL,
  `team_id` int(11) DEFAULT NULL,
  `disable_alerta` tinyint(1) DEFAULT 0,
  `enable` tinyint(1) NOT NULL DEFAULT 1,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

-- Data exporting was unselected.


-- Dumping structure for table kafka_center.cluster_info
CREATE TABLE IF NOT EXISTS `cluster_info` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) COLLATE utf8_bin NOT NULL,
  `zk_address` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT "",
  `broker` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT "",
  `create_time` datetime DEFAULT NULL,
  `comments` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT "",
  `enable` int(11) DEFAULT NULL,
  `broker_size` int(4) DEFAULT 0,
  `kafka_version` varchar(10) COLLATE utf8_bin DEFAULT "",
  `location` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT "",
  `graf_addr` varchar(255) COLLATE utf8_bin DEFAULT "",
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

-- Data exporting was unselected.


-- Dumping structure for table kafka_center.ksql_info
CREATE TABLE IF NOT EXISTS `ksql_info` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `cluster_id` int(11) DEFAULT NULL,
  `cluster_name` varchar(255) DEFAULT NULL,
  `ksql_url` varchar(255) DEFAULT NULL,
  `ksql_serverId` varchar(255) DEFAULT NULL,
  `version` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- Data exporting was unselected.


-- Dumping structure for table kafka_center.task_info
CREATE TABLE IF NOT EXISTS `task_info` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `cluster_ids` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT "",
  `location` varchar(20) COLLATE utf8_bin NOT NULL DEFAULT "",
  `topic_name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT "",
  `partition` int(11) DEFAULT NULL,
  `replication` int(11) DEFAULT NULL,
  `message_rate` int(50) DEFAULT NULL,
  `ttl` int(11) DEFAULT NULL,
  `owner_id` int(11) DEFAULT NULL,
  `team_id` int(11) DEFAULT NULL,
  `comments` varchar(1000) COLLATE utf8_bin NOT NULL DEFAULT "",
  `create_time` datetime DEFAULT NULL,
  `approved` int(11) DEFAULT NULL,
  `approved_id` int(11) DEFAULT NULL,
  `approved_time` datetime DEFAULT NULL,
  `approval_opinions` varchar(1000) COLLATE utf8_bin DEFAULT "",
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

-- Data exporting was unselected.


-- Dumping structure for table kafka_center.team_info
CREATE TABLE IF NOT EXISTS `team_info` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT "",
  `own` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT "",
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

-- Data exporting was unselected.


-- Dumping structure for table kafka_center.topic_collection
CREATE TABLE IF NOT EXISTS `topic_collection` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `cluster_id` int(11) NOT NULL,
  `user_id` int(11) NOT NULL,
  `name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT "",
  `type` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT "",
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

-- Data exporting was unselected.


-- Dumping structure for table kafka_center.topic_info
CREATE TABLE IF NOT EXISTS `topic_info` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `cluster_id` int(11) NOT NULL,
  `topic_name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT "",
  `partition` int(11) DEFAULT NULL,
  `replication` int(11) DEFAULT NULL,
  `ttl` bigint(11) DEFAULT NULL,
  `config` varchar(512) COLLATE utf8_bin DEFAULT NULL,
  `owner_id` int(11) DEFAULT NULL,
  `team_id` int(11) DEFAULT NULL,
  `comments` varchar(1000) COLLATE utf8_bin NOT NULL DEFAULT "",
  `create_time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

-- Data exporting was unselected.


-- Dumping structure for table kafka_center.user_info
CREATE TABLE IF NOT EXISTS `user_info` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT "",
  `real_name` varchar(255) COLLATE utf8_bin DEFAULT "",
  `email` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT "",
  `role` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT "100",
  `create_time` datetime DEFAULT NULL,
  `password` varchar(255) COLLATE utf8_bin DEFAULT "",
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

-- Data exporting was unselected.


-- Dumping structure for table kafka_center.user_team
CREATE TABLE IF NOT EXISTS `user_team` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `user_id` int(11) DEFAULT NULL,
  `team_id` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

2、配置

相关配置位于application.properties

可对端口 日志等信息做一些修改

server.port=8080
debug=false
# 设置session timeout为6小时
server.servlet.session.timeout=21600
spring.security.user.name=admin
spring.security.user.password=admin
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/kafka_center?useUnicode=true&characterEncoding=utf-8
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.type=com.zaxxer.hikari.HikariDataSource
spring.datasource.hikari.minimum-idle=5
spring.datasource.hikari.maximum-pool-size=15
spring.datasource.hikari.pool-name=KafkaCenterHikariCP
spring.datasource.hikari.max-lifetime =30000
spring.datasource.hikari.connection-test-query=SELECT 1
management.health.defaults.enabled=false

public.url=http://localhost:8080
connect.url=http://localhost:8000/#/
system.topic.ttl.h=16

monitor.enable=true
monitor.collect.period.minutes=5
monitor.elasticsearch.hosts=localhost:9200
monitor.elasticsearch.index=kafka_center_monitor
#是否启用收集线程指定集群收集
monitor.collector.include.enable=false
#收集线程指定location,必须属于remote.locations之中
monitor.collector.include.location=dev
collect.topic.enable=true
collect.topic.period.minutes=10
# remote的功能是为了提高lag查询和收集,解决跨location网络延迟问题
remote.query.enable=false
remote.hosts=gqc@localhost2:8080
remote.locations=dev,gqc
#发送consumer group的lag发送给alert service
alert.enable=false
alert.dispause=2
alert.service=
alert.threshold=1000
alter.env=other
#是否开启邮件功能,true:启用,false:禁用
mail.enable=false
spring.mail.host=
spring.mail.username=KafkaCenter@xaecbd.com
# oauth2
generic.enabled=false
generic.name=oauth2 Login
generic.auth_url=
generic.token_url=
generic.redirect_utl=
generic.api_url=
generic.client_id=
generic.client_secret=
generic.scopes=

3、运行

推荐使用docker

docker run -d -p 8080:8080 --name KafkaCenter -v ${PWD}/application.properties:/opt/app/kafka-center/config/application.properties xaecbd/kafka-center:2.1.0

不用docker

$ git clone https://github.com/xaecbd/KafkaCenter.git
$ cd KafkaCenter
$ mvn clean package -Dmaven.test.skip=true
$ cd KafkaCenterKafkaCenter-Core	arget
$ java -jar KafkaCenter-Core-2.1.0-SNAPSHOT.jar

4、查看

访问http://localhost:8080 管理员用户与密码默认:admin / admin

功能介绍

Topics

用户可以在此模块完成Topic查看,已经申请新建Topic,同时可以对Topic进行生产消费测试。

Monitor

用户可以在此模块中可以查看Topic的生成以及消费情况,同时可以针对消费延迟情况设置预警信息。

Alerts

此模块用于维护预警信息。用户可以看到自己所有预警信息,管理员可以看到所有人的预警信息。

Kafka Connect

实现用户快速创建自己的Connect Job,并对自己的Connect进行维护。

KSQL

实现用户快速创建自己的KSQL Job,并对自己的Job进行维护。

Approve

此模块主要用于当普通用户申请创建Topic 或者Job时,管理员进行审批操作。

Setting

此模块主要功能为管理员维护User、Team以及kafka cluster信息

Cluster Manager

此模块用于管理员对集群的正常维护操作。

Home

这里是一些基本的统计信息

My Favorite

集群与topic列表

Topic

这里是一些topic的管理功能

Topic List

操作范围:

用户所属Team的所有Topic

申请创建topic

Important: admin不能申请task,普通用户必须先让管理员新建team后,将用户加入指定team后,才可以申请task。

操作范围:

用户所属Team的所有Task

Topic命名规则:

只能包含:数字、大小写字母、下划线、中划线、点;长度大于等于3小于等于100。

不推荐:下划线开头;

可对所有Topic进行消费测试

Monitor

监控模块

生产者监控

消费者监控

消息积压

报警功能

Connect

这里是一些Connect的操作

KSQL

可以进行KQL的查询操作

Approve

这里主要是管理员做一些审核操作

Kafka Manager
Topic管理

Cluster管理

broker管理

group管理

Setting

这些主要是用户的一些设置

KafkaCenter还是一个非常不错的kafka管理工具,可以满足大部分需求。
更多实时数据分析相关博文与科技资讯,欢迎关注 “实时流式计算”

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-数据库
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯