@Configuration(
proxyBeanMethods = false
@EnableKafka
@ConditionalOnMissingBean(
name = {"org.springframework.kafka.config.internalKafkaListenerAnnotationProcessor"}
static class EnableKafkaConfiguration {
EnableKafkaConfiguration() {
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(KafkaListenerConfigurationSelector.class)
public @interface EnableKafka {
@Order
public class KafkaListenerConfigurationSelector implements DeferredImportSelector {
@Override
public String[] selectImports(AnnotationMetadata importingClassMetadata) {
return new String[] { KafkaBootstrapConfiguration.class.getName() };
public class KafkaBootstrapConfiguration implements ImportBeanDefinitionRegistrar {
@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
if (!registry.containsBeanDefinition(
KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)) {
registry.registerBeanDefinition(KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME,
new RootBeanDefinition(KafkaListenerAnnotationBeanPostProcessor.class));
if (!registry.containsBeanDefinition(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)) {
registry.registerBeanDefinition(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
new RootBeanDefinition(KafkaListenerEndpointRegistry.class));
// Excerpt of a run loop: while isRunning() returns true, the visible body calls pollAndInvoke()
// from inside a try block.
// NOTE(review): this excerpt omits the closing braces and whatever catch/finally clauses follow
// the try, so it cannot be told from here whether wrapUp() runs inside the loop or once after
// it ends - confirm against the complete source before relying on either reading.
@Override
public void run() {
while (isRunning()) {
try {
pollAndInvoke();
wrapUp();
protected void pollAndInvoke() {
if (!this.autoCommit && !this.isRecordAck) {
processCommits();
idleBetweenPollIfNecessary();
if (this.seeks.size() > 0) {
processSeeks();
pauseConsumerIfNecessary();
this.lastPoll = System.currentTimeMillis();
this.polling.set(true);
ConsumerRecords<K, V> records = doPoll();
if (!this.polling.compareAndSet(true, false) && records != null) {
* There is a small race condition where wakeIfNecessary was called between
* exiting the poll and before we reset the boolean.
if (records.count() > 0) {
this.logger.debug(() -> "Discarding polled records, container stopped: " + records.count());
return;
resumeConsumerIfNeccessary();
debugRecords(records);
if (records != null && records.count() > 0) {
if (this.containerProperties.getIdleEventInterval() != null) {
this.lastReceive = System.currentTimeMillis();
invokeListener(records);
else {
checkIdle();