我有一个 Kafka 处理器，其定义如下：
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.slf4j.LoggerFactory
import org.springframework.context.annotation.Bean
import org.springframework.stereotype.Component
import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers
import reactor.kafka.receiver.KafkaReceiver
import reactor.kafka.receiver.ReceiverOptions
import reactor.kafka.receiver.ReceiverRecord
import reactor.kotlin.core.publisher.toMono
import reactor.util.retry.Retry
import java.time.Duration
import java.util.*

/**
 * Consumes string records from the `some-topic` Kafka topic with Reactor Kafka
 * and acknowledges each record once [processRecord] has completed for it.
 *
 * Records are grouped by topic partition; within a group they are handled one
 * at a time via `concatMap`, while different groups are merged with `flatMap`.
 */
@Component
class KafkaProcessor {
    private val logger = LoggerFactory.getLogger(javaClass)

    // Kafka expects a Class instance (or a class-name String) for the deserializer
    // settings, hence `::class.java` rather than a property reference.
    private val consumerProps: Map<String, Any> = hashMapOf(
        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
        ConsumerConfig.GROUP_ID_CONFIG to "groupId",
        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest",
        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092",
    )

    // Acknowledged offsets are committed every second or every 1000 records,
    // with a single commit attempt configured.
    private val receiverOptions = ReceiverOptions.create<String, String>(consumerProps)
        .subscription(Collections.singleton("some-topic"))
        .commitInterval(Duration.ofSeconds(1))
        .commitBatchSize(1000)
        .maxCommitAttempts(1)

    private val kafkaReceiver: KafkaReceiver<String, String> = KafkaReceiver.create(receiverOptions)

    /**
     * Builds the consuming pipeline and subscribes to it.
     *
     * Each record is passed to [processRecord]; on success its offset is
     * acknowledged. A failing record is retried up to 3 times with exponential
     * backoff (1s initial, capped at 3s). When the retries are exhausted the
     * last failure is rethrown, which terminates the whole subscription; that
     * terminal error is logged by the subscriber's error callback.
     *
     * NOTE(review): this is declared as a `@Bean` method that returns nothing,
     * presumably so that Spring invokes it at startup. Newer Spring versions
     * may reject void `@Bean` methods — TODO confirm and consider a dedicated
     * startup hook instead.
     */
    @Bean
    fun processKafkaMessages() {
        kafkaReceiver.receive()
            .groupBy { m -> m.receiverOffset().topicPartition() }
            .flatMap { partitionFlux ->
                // boundedElastic() replaces the deprecated elastic() scheduler.
                partitionFlux.publishOn(Schedulers.boundedElastic())
                    .concatMap { receiverRecord ->
                        processRecord(receiverRecord)
                            // Acknowledge only after processing produced the record.
                            .doOnNext { it.receiverOffset().acknowledge() }
                            .retryWhen(
                                Retry.backoff(3, Duration.ofSeconds(1))
                                    .maxBackoff(Duration.ofSeconds(3))
                                    .doBeforeRetry { rs ->
                                        logger.warn("Retrying: ${rs.totalRetries() + 1}/3 due to ${rs.failure()}")
                                    }
                                    .onRetryExhaustedThrow { _, u ->
                                        logger.error("All ${u.totalRetries() + 1} attempts failed with the last exception: ${u.failure()}")
                                        u.failure()
                                    }
                            )
                    }
            }
            .subscribe(
                { },
                { error -> logger.error("Kafka processing pipeline terminated", error) },
            )
    }

    /**
     * Processes a single record.
     *
     * Currently a pass-through: the record is wrapped in a [Mono] unchanged.
     *
     * @param record the received Kafka record
     * @return a [Mono] emitting the same [record]
     */
    private fun processRecord(record: ReceiverRecord<String, String>): Mono<ReceiverRecord<String, String>> =
        record.toMono()
}