文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

Kafka之kafka-topics.sh如何使用

2023-07-05 10:23

关注

本文小编为大家详细介绍“Kafka之kafka-topics.sh如何使用”,内容详细,步骤清晰,细节处理妥当,希望这篇“Kafka之kafka-topics.sh如何使用”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入,一起来学习新知识吧。

一、kafka的基本操作

1.1、创建topic

sh kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

参数说明:

成功则显示:

Created topic "test".

1.2、查看topic

sh kafka-topics.sh --list --zookeeper localhost:2181

显示:

test

1.3、查看topic属性

sh kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

显示:

Topic:test    PartitionCount:1    ReplicationFactor:1    Configs:    Topic: test    Partition: 0    Leader: 0    Replicas: 0    Isr: 0

1.4、发送消息

sh kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test

发送端输入:

>hello>where are you>let's go

1.5、消费消息

sh kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test--from-beginning

消费端显示:

hellowhere are youlet's go^CProcessed a total of 3 messages

二、kafka-topics.sh 使用方式

创建、修改、删除以及查看等功能。

2.1、查看帮助

/bin目录下的每一个脚本工具,都有着众多的参数选项,不可能所有命令都记得住,这些脚本都可以使用 --help 参数来打印列出其所需的参数信息。

$ sh kafka-topics.sh --helpCommand must include exactly one action: --list, --describe, --create, --alter or --deleteOption                                   Description                            ------                                   -----------                            --alter                                  Alter the number of partitions,                                                   replica assignment, and/or                                                      configuration for the topic.         --config <String: name=value>            A topic configuration override for the                                            topic being created or altered.The                                              following is a list of valid                                                    configurations:                                                                   cleanup.policy                                                                     compression.type                                                                   delete.retention.ms                                                                file.delete.delay.ms                                                               flush.messages                                                                     flush.ms                                                                           follower.replication.throttled.                                                  replicas                                                                          index.interval.bytes                                                               leader.replication.throttled.replicas                                              max.message.bytes                                                                  message.downconversion.enable                                                      message.format.version                                                             message.timestamp.difference.max.ms                                                message.timestamp.type                                                             min.cleanable.dirty.ratio                                                          min.compaction.lag.ms                                                              min.insync.replicas                                                                preallocate                                                                        retention.bytes                                                                    retention.ms                                                                       segment.bytes                                                                      segment.index.bytes                                                                segment.jitter.ms                                                                  segment.ms                                                                         unclean.leader.election.enable                                                 See the Kafka documentation for full                                              details on the topic configs.        --create                                 Create a new topic.                    --delete                                 Delete a topic                         --delete-config <String: name>           A topic configuration override to be                                              removed for an existing topic (see                                              the list of configurations under the                                            --config option).                    --describe                               List details for the given topics.     --disable-rack-aware                     Disable rack aware replica assignment  --force                                  Suppress console prompts               --help                                   Print usage information.               --if-exists                              if set when altering or deleting                                                  topics, the action will only execute                                            if the topic exists                  --if-not-exists                          if set when creating topics, the                                                  action will only execute if the                                                 topic does not already exist         --list                                   List all available topics.             --partitions <Integer: # of partitions>  The number of partitions for the topic                                            being created or altered (WARNING:                                              If partitions are increased for a                                               topic that has a key, the partition                                             logic or ordering of the messages                                               will be affected                     --replica-assignment <String:            A list of manual partition-to-broker     broker_id_for_part1_replica1 :           assignments for the topic being        broker_id_for_part1_replica2 ,           created or altered.                    broker_id_for_part2_replica1 :                                                  broker_id_for_part2_replica2 , ...>                                           --replication-factor <Integer:           The replication factor for each          replication factor>                      partition in the topic being created.--topic <String: topic>                  The topic to be create, alter or                                                  describe. Can also accept a regular                                             expression except for --create option--topics-with-overrides                  if set when describing topics, only                                               show topics that have overridden                                                configs                              --unavailable-partitions                 if set when describing topics, only                                               show partitions whose leader is not                                             available                            --under-replicated-partitions            if set when describing topics, only                                               show under replicated partitions     --zookeeper <String: hosts>              REQUIRED: The connection string for                                               the zookeeper connection in the form                                            host:port. Multiple hosts can be                                                given to allow fail-over.

2.2、副本数量规则

副本数量不能大于broker的数量。

kafka 创建主题的时候其副本数量不能大于broker的数量,否则创建主题 topic 失败。

sh kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 1 --topic test1

报错:

Error while executing topic command : Replication factor: 2 larger than available brokers: 1.
[2022-11-24 14:08:18,745] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 2 larger than available brokers: 1.
 (kafka.admin.TopicCommand$)

注意:副本数量和分区数量的区别。

2.3、创建主题

创建主题时候,有3个参数是必填的:

同时还需使用 --create 参数表明本次操作是想要创建一个主题操作。

sh kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test1

返回显示:

Created topic "test1".

另外在创建主题的时候,还可以附加以下两个选项:&ndash;if-not-exists 和 --if-exists . 第一个参数表明仅当该主题不存在时候,创建; 第二个参数表明当修改或删除这个主题时候,仅在该主题存在的时候去执行操作。

2.4、查看broker上所有的主题

&ndash;list。

sh kafka-topics.sh --list --zookeeper localhost:2181

结果显示:

__consumer_offsets
test
test1

2.5、查看指定主题 topic 的详细信息

&ndash;describe。

sh kafka-topics.sh --describe --zookeeper localhost:2181 --topic test1

结果显示:

Topic:test1    PartitionCount:1    ReplicationFactor:1    Configs:
    Topic: test1    Partition: 0    Leader: 0    Replicas: 0    Isr: 0

2.6、修改主题信息之增加主题分区数量

&ndash;alter。

sh kafka-topics.sh --zookeeper localhost:2181 --topic test1 --alter --partitions 2

结果显示:

WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!

查看主题信息:

sh kafka-topics.sh --describe --zookeeper localhost:2181 --topic test1

可以看到已经成功的将主题的分区数量从1修改为了2。

Topic:test1    PartitionCount:2    ReplicationFactor:1    Configs:    Topic: test1    Partition: 0    Leader: 0    Replicas: 0    Isr: 0    Topic: test1    Partition: 1    Leader: 0    Replicas: 0    Isr: 0

当去修改一个不存在的topic信息时(比如修改主题 test2,当前这主题是不存在的)。

sh kafka-topics.sh --zookeeper localhost:2181 --topic test2 --alter --partitions 2

会报错:

Error while executing topic command : Topic test2 does not exist on ZK path localhost:2181
[2022-11-24 14:21:33,564] ERROR java.lang.IllegalArgumentException: Topic test2 does not exist on ZK path localhost:2181
    at kafka.admin.TopicCommand$.alterTopic(TopicCommand.scala:123)
    at kafka.admin.TopicCommand$.main(TopicCommand.scala:65)
    at kafka.admin.TopicCommand.main(TopicCommand.scala)
 (kafka.admin.TopicCommand$)

注意:不要使用 --alter 去尝试减少分区的数量,如果非要减少分区的数量,只能删除整个主题 topic, 然后重新创建。

2.7、删除主题

&ndash;delete。

sh kafka-topics.sh --zookeeper localhost:2181 --delete --topic test1

日志信息提示,主题 test1已经被标记删除状态,但是若delete.topic.enable 没有设置为 true , 则将不会有任何作用。

Topic test1 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

可以测试一些:

# 一个终端启动生产者:sh kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test1# 另一个终端启动消费者:sh kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test1--from-beginning

发现此时还是可以发送消息和接收消息。如果要支持能够删除主题的操作,则需要在 /bin 的同级目录 /config目录下的文件server.properties中,修改配置delete.topic.enable=true(如果置为false,则kafka broker 是不允许删除主题的)。

然后就重启kafka:

# 停止:sh kafka-server-stop.sh -daemon ../config/server.properties# 启动:sh kafka-server-start.sh -daemon ../config/server.properties

再次删除就可以了。

sh kafka-topics.sh --zookeeper localhost:2181 --delete --topic test1

读到这里,这篇“Kafka之kafka-topics.sh如何使用”文章已经介绍完毕,想要掌握这篇文章的知识点还需要大家自己动手实践使用过才能领会,如果想了解更多相关内容的文章,欢迎关注编程网行业资讯频道。

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     807人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     351人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     314人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     433人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     221人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯