import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
public class App {
@SuppressWarnings({ "resource", "unchecked" })
public static void main(String[] args) {
ApplicationContext context = new AnnotationConfigApplicationContext(AppConfig.class);
// 使用 kafkaTemplate 发送消息
KafkaTemplate
kafkaTemplate = context.getBean(KafkaTemplate.class);
// 异步发送
kafkaTemplate.send("topic0", "message x1");
// 注册 callback
ListenableFuture> future = kafkaTemplate.send("topic0", "message x2");
future.addCallback(new ListenableFutureCallback>() {
@Override
public void onSuccess(SendResult result) {
RecordMetadata metadata = result.getRecordMetadata();
System.out.println("message sent to " + metadata.topic() + ", partition " + metadata.partition() + ", offset " + metadata.offset());
@Override
public void onFailure(Throwable ex) {
System.out.println("send message failed with " + ex.getMessage());
// 使用阻塞
ListenableFuture> future2 = kafkaTemplate.send("topic0", "message x3");
try {
SendResult result = future2.get();
RecordMetadata metadata = result.getRecordMetadata();
System.out.println("message sent to " + metadata.topic() + ", partition " + metadata.partition() + ", offset " + metadata.offset());
}catch (Exception e) {
System.out.println("send message failed with " + e.getMessage());
package shangbo.kafka.example7;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
@Configuration
public class AppConfig {
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<String, String>(producerFactory());
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.RETRIES_CONFIG, 3);
props.put(ProducerConfig.ACKS_CONFIG, "1");
return props;
– 更多参见:Kafka 精萃
– 声 明:转载请注明出处
– Last Edited on 2018-06-14
– Written by ShangBo on 2018-06-14
– End
本文主要是使用 Java 语言中 spring-kafka 依赖 对 Kafka 进行使用。创建项目,先创建一个简单的 Maven 项目,删除无用的包、类之后,使用其作为一个父级项目。
源代码:from tkinter import *import win32guiimport win32conimport win32clipboard as wLOG_LINE_NUM = 0class Play():def __init__(self, init_window_name):self.init_window_name = init_window_namedef set_init_...
public void testKafkaTemplate () {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("topic", "{\"key\": \"value\"}");
Complet
public class KafkaSendResultHandler implements ProducerListener {
private static final Logger log = LoggerFactory.getLogger(KafkaSendResultHandler.class);
* kafka发送成功回调
* @param
转自:http://blog.seasedge.cn/archives/15.html
Spring-Kafka史上最强入门教程:https://www.jianshu.com/c/0c9d83802b0c
spring kafka消息同步发送、异步发送、异步回调和分区路由发送同步发送异步发送异步发送失败回调分区路由发送
我们在使用spring kafka发送消息的时候是调用send方法点进send方法发现有很多重载方法,下面我们介绍下使用spring kafka同步/异步发送消息并接收异步回调
同步发送
最简单的同步发送方式如下:
public class KafkaServer {
@A...
Kafka Producer默认是异步发送。在初始化producer实例时,会创建一个sender线程负责批量发送消息;producer将消息暂存在缓冲区,消息根据topic-partition分类缓存;消息达到batch.size或者时间达到了linger.ms,sender线程将该批量的消息发送到topic-partition所在的broker一、异步发送消息KafkaTemplate默认是异...
Kafka生产者异步发送消息的方法,看似异步,实则可能阻塞。由于发送消息前需要获取元数据Metadata,如果一直获取失败(可能原因包括Broker连接失败、Topic未创建等),将导致长时间阻塞。这点与我们的一般理解不符,需要特别注意。