一、在java中配置pom
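<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.11</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>2.8.0</version>
</dependency>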
二、生产者方法
(1)、Producer
在 Java 中编写生产者,把控制台输入的内容发送到 Kafka,消费者即可从对应的 topic 中读取。可以先在服务器上启动一个控制台消费者来验证:
[root@kb144 config]# kafka-console-consumer.sh --bootstrap-server 192.168.153.144:9092 --topic kb22
package nj.zb.kb22.Kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.Scanner;

/**
 * Console-driven Kafka producer.
 *
 * <p>Prompts on standard output, reads whitespace-delimited tokens from standard input and sends
 * each token as the value of a key-less record to topic {@code kb22}. The loop ends when standard
 * input is exhausted, at which point the producer is closed so that pending records are flushed.
 */
public class MyProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // Broker address used for the initial connection.
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.153.144:9092");
        // Key serializer.
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Value serializer.
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.ACKS_CONFIG, "1");

        Scanner scanner = new Scanner(System.in);
        // try-with-resources: close() flushes whatever send() has queued but not yet transmitted.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            while (true) {
                System.out.println("请输入kafka的内容");
                // Stop cleanly at end of input instead of failing with NoSuchElementException.
                if (!scanner.hasNext()) {
                    break;
                }
                String msg = scanner.next();
                ProducerRecord<String, String> record = new ProducerRecord<>("kb22", msg);
                producer.send(record);
            }
        }
    }
}
(2)、Producer进行多线程操作
生产者多线程是一种常见的技术实践,可以提高消息生产的并发性和吞吐量。通过将消息生产任务分配给多个线程来并行地发送消息,可以有效地利用系统资源,加快消息的发送速度。
package nj.zb.kb22.Kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Multi-threaded Kafka producer demo.
 *
 * <p>Submits 10 tasks to a cached thread pool. Each task creates its own producer, sends 100
 * records of the form {@code "<thread name> <index>"} to topic {@code kb22}, and closes the
 * producer. The main thread waits for all tasks to finish and then prints {@code game over}.
 */
public class MyProducer2 {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++) { // i counts the submitted tasks
            // A plain Runnable is enough: the pool supplies the thread that runs it.
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    Properties properties = new Properties();
                    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.153.144:9092");
                    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
                    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
                    properties.put(ProducerConfig.ACKS_CONFIG, "0");

                    // send() only queues the record; closing the producer flushes the queue,
                    // so records are not dropped when the task ends and the JVM exits.
                    try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
                        // j is the message index within this task
                        for (int j = 0; j < 100; j++) {
                            String msg = Thread.currentThread().getName() + " " + j;
                            System.out.println(msg);
                            ProducerRecord<String, String> re = new ProducerRecord<>("kb22", msg);
                            producer.send(re);
                        }
                    }
                }
            });
        }

        executorService.shutdown();
        try {
            // Block until every task has finished instead of spinning on isTerminated().
            if (executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS)) {
                System.out.println("game over");
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and cancel the remaining work.
            Thread.currentThread().interrupt();
            executorService.shutdownNow();
        }
    }
}
三、消费者方法
(1)、Consumer
通过java来实现消费者
package nj.zb.kb22.Kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * Single-threaded Kafka consumer.
 *
 * <p>Subscribes to topic {@code kb22} as a member of group {@code GROUP3}, polls in an endless
 * loop and prints topic, partition, offset, key, value and timestamp of every record received.
 */
public class MyConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.153.144:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Whether offsets are committed automatically after records are fetched:
        // "false" = manual commit, "true" = automatic commit.
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "GROUP3");

        // The type arguments are required: with raw types record.key()/value() return Object
        // and the String assignments below do not compile.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
            // Once the consumer exists, subscribe to the topic to consume.
            consumer.subscribe(Collections.singleton("kb22"));
            Duration mills = Duration.ofMillis(100);
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(mills);
                for (ConsumerRecord<String, String> record : records) {
                    String topic = record.topic();
                    int partition = record.partition();
                    long offset = record.offset();
                    String key = record.key();
                    String value = record.value();
                    long timestamp = record.timestamp();
                    System.out.println("topic:" + topic + "\tpartition" + partition + "\toffset" + offset
                            + "\tkey" + key + "\tvalue" + value + "\ttimestamp" + timestamp);
                }
                // No offset commit is issued in this loop, so with auto-commit disabled the
                // group's position is not persisted. Call consumer.commitAsync() here, after
                // processing, to commit manually.
            }
        }
    }
}
(2)、模拟多人访问(多线程启动多个消费者)
package nj.zb.kb22.Kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * Simulates several clients consuming at once.
 *
 * <p>Starts 3 threads. Each thread creates its own consumer in group {@code GROUP3}, subscribes
 * to topic {@code kb22}, polls in an endless loop and prints every record together with the name
 * of the thread that received it.
 */
public class MyConsumerThread {
    // Simulate multiple concurrent consumers.
    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    Properties properties = new Properties();
                    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.153.144:9092");
                    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
                    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
                    // Whether offsets are committed automatically after records are fetched:
                    // "false" = manual commit, "true" = automatic commit.
                    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
                    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
                    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "GROUP3");

                    // One consumer per thread; it is created, used and closed on this thread only.
                    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
                        consumer.subscribe(Collections.singleton("kb22"));
                        while (true) {
                            // poll fetches whatever records are currently available
                            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                            for (ConsumerRecord<String, String> record : records) {
                                String topic = record.topic();
                                int partition = record.partition();
                                long offset = record.offset();
                                String key = record.key();
                                String value = record.value();
                                long timestamp = record.timestamp();
                                String name = Thread.currentThread().getName();
                                System.out.println("name" + name
                                        + "\ttopic:" + topic
                                        + "\tpartition" + partition
                                        + "\toffset" + offset
                                        + "\tkey" + key
                                        + "\tvalue" + value
                                        + "\ttimestamp" + timestamp);
                            }
                        }
                    }
                }
            }).start();
        }
    }
}
来源地址:https://blog.csdn.net/ycz926940/article/details/131562785