在 Kafka 中,生产者发送消息后,可以通过以下几种方式判断消息是否发送成功:
- 同步发送方式:生产者调用
send()
方法后,会等待消息的确认返回,如果发送成功,send()
方法会返回一个RecordMetadata
对象,其中包含了消息的元数据信息;如果发送失败,则可能抛出异常。
ProducerRecord<String, String> record = new ProducerRecord<>("topicName", "key", "value");
try {
RecordMetadata metadata = producer.send(record).get();
System.out.println("消息发送成功,offset:" + metadata.offset());
} catch (Exception e) {
System.out.println("消息发送失败:" + e.getMessage());
}
- 异步发送方式:生产者调用
send()
方法后,可以传递一个回调函数,在消息发送完成后,会调用该回调函数,通过回调函数可以获取到发送结果。
ProducerRecord<String, String> record = new ProducerRecord<>("topicName", "key", "value");
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e != null) {
System.out.println("消息发送失败:" + e.getMessage());
} else {
System.out.println("消息发送成功,offset:" + metadata.offset());
}
}
});
无论是同步发送还是异步发送,如果发送失败,可以根据异常信息进行错误处理。