close
關注我,回復關鍵字「spring」,
免費領取Spring學習資料。

來源:https://blog.csdn.net/yuechuzhixing/article/details/124725713

消息監聽容器1、KafkaMessageListenerContainer

由spring提供用於監聽以及拉取消息,並將這些消息按指定格式轉換後交給由@KafkaListener註解的方法處理,相當於一個消費者;

看看其整體代碼結構:

圖片
可以發現其入口方法為doStart(), 往上追溯到實現了SmartLifecycle接口,很明顯,由spring管理其start和stop操作;
ListenerConsumer, 內部真正拉取消息消費的是這個結構,其 實現了Runable接口,簡言之,它就是一個後台線程輪訓拉取並處理消息
在doStart方法中會創建ListenerConsumer並交給線程池處理
以上步驟就開啟了消息監聽過程
KafkaMessageListenerContainer#doStartprotectedvoiddoStart(){if(isRunning()){return;}if(this.clientIdSuffix==null){//stand-alonecontainercheckTopics();}ContainerPropertiescontainerProperties=getContainerProperties();checkAckMode(containerProperties);......//創建ListenerConsumer消費者並放入到線程池中執行this.listenerConsumer=newListenerConsumer(listener,listenerType);setRunning(true);this.startLatch=newCountDownLatch(1);this.listenerConsumerFuture=containerProperties.getConsumerTaskExecutor().submitListenable(this.listenerConsumer);......}KafkaMessageListenerContainer.ListenerConsumer#runpublicvoidrun(){//NOSONARcomplexity.......ThrowableexitThrowable=null;while(isRunning()){try{//拉去消息並處理消息pollAndInvoke();}catch(@SuppressWarnings(UNUSED)WakeupExceptione){......}......}wrapUp(exitThrowable);}2、ConcurrentMessageListenerContainer

並發消息監聽,相當於創建消費者;其底層邏輯仍然是通過KafkaMessageListenerContainer實現處理;從實現上看就是在KafkaMessageListenerContainer上做了層包裝,有多少的concurrency就創建多個KafkaMessageListenerContainer,也就是concurrency個消費者

圖片ConcurrentMessageListenerContainer#doStartprotectedvoiddoStart(){if(!isRunning()){checkTopics();......setRunning(true);for(inti=0;i<this.concurrency;i++){KafkaMessageListenerContainer<K,V>container=constructContainer(containerProperties,topicPartitions,i);StringbeanName=getBeanName();container.setBeanName((beanName!=null?beanName:"consumer")+"-"+i);......if(isPaused()){container.pause();}//這裡調用KafkaMessageListenerContainer啟動相關監聽方法container.start();this.containers.add(container);}}}@KafkaListener底層監聽原理

上面已經介紹了KafkaMessageListenerContainer的作用是拉取並處理消息,但還缺少關鍵的一步,即 如何將我們的業務邏輯與KafkaMessageListenerContainer的處理邏輯聯繫起來?

那麼這個橋樑就是@KafkaListener註解

KafkaListenerAnnotationBeanPostProcessor, 從後綴BeanPostProcessor就可以知道這是Spring IOC初始化bean相關的操作,當然這裡也是;此類會掃描帶@KafkaListener註解的類或者方法,通過 KafkaListenerContainerFactory工廠創建對應的KafkaMessageListenerContainer,並調用start方法啟動監聽,也就是這樣打通了這條路…
Spring Boot 自動加載kafka相關配置1、KafkaAutoConfiguration
自動生成kafka相關配置,比如當缺少這些bean的時候KafkaTemplate、ProducerListener、ConsumerFactory、ProducerFactory等,默認創建bean實例
2、KafkaAnnotationDrivenConfiguration
主要是針對於spring-kafka提供的註解背後的相關操作,比如 @KafkaListener;
在開啟了@EnableKafka註解後,spring會掃描到此配置並創建缺少的bean實例,比如當配置的工廠beanName不是kafkaListenerContainerFactory的時候,就會默認創建一個beanName為kafkaListenerContainerFactory的實例,這也是為什麼在springboot中不用定義consumer的相關配置也可以通過@KafkaListener正常的處理消息
生產配置1、單條消息處理@Configuration@EnableKafkapublicclassKafkaConfig{@BeanKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer,String>>kafkaListenerContainerFactory(){ConcurrentKafkaListenerContainerFactory<Integer,String>factory=newConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(3);factory.getContainerProperties().setPollTimeout(3000);returnfactory;}@BeanpublicConsumerFactory<Integer,String>consumerFactory(){returnnewDefaultKafkaConsumerFactory<>(consumerConfigs());}@BeanpublicMap<String,Object>consumerConfigs(){Map<String,Object>props=newHashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,embeddedKafka.getBrokersAsString());...returnprops;}}@KafkaListener(id="myListener",topics="myTopic",autoStartup="${listen.auto.start:true}",concurrency="${listen.concurrency:3}")publicvoidlisten(Stringdata){...}2、批量處理@Configuration@EnableKafkapublicclassKafkaConfig{@BeanpublicKafkaListenerContainerFactory<?,?>batchFactory(){ConcurrentKafkaListenerContainerFactory<Integer,String>factory=newConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setBatchListener(true);//<<<<<<<<<<<<<<<<<<<<<<<<<returnfactory;}@BeanpublicConsumerFactory<Integer,String>consumerFactory(){returnnewDefaultKafkaConsumerFactory<>(consumerConfigs());}@BeanpublicMap<String,Object>consumerConfigs(){Map<String,Object>props=newHashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,embeddedKafka.getBrokersAsString());...returnprops;}}@KafkaListener(id="list",topics="myTopic",containerFactory="batchFactory")publicvoidlisten(List<String>list){...}3、同一個消費組支持單條和批量處理

場景:

生產上最初都採用單條消費模式,隨着量的積累,部分topic常常出現消息積壓,最開始通過新增消費者實例和分區來提升消費端的能力;一段時間後又開始出現消息積壓,由此便從代碼層面通過批量消費來提升消費能力。

只對部分topic做批量消費處理

簡單的說就是需要配置批量消費和單條記錄消費(從單條消費逐步向批量消費演進)

假設最開始就是配置的單條消息處理的相關配置,原配置基本不變
然後新配置 批量消息監聽KafkaListenerContainerFactory
@Configuration@EnableKafkapublicclassKafkaConfig{@Bean(name=[batchListenerContainerFactory])publicKafkaListenerContainerFactory<?,?>batchFactory(){ConcurrentKafkaListenerContainerFactory<Integer,String>factory=newConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());//開啟批量處理factory.setBatchListener(true);returnfactory;}@Bean(name=[batchConsumerFactory])publicConsumerFactory<Integer,String>consumerFactory(){returnnewDefaultKafkaConsumerFactory<>(consumerConfigs());}@Bean(name=[batchConsumerConfig])publicMap<String,Object>consumerConfigs(){Map<String,Object>props=newHashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,embeddedKafka.getBrokersAsString());...returnprops;}}

注意點:

如果自定義的ContainerFactory其beanName不是kafkaListenerContainerFactory,spring會通過KafkaAnnotationDrivenConfiguration創建新的bean實例,所以需要注意的是你最終的@KafkaListener會使用到哪個ContainerFactory
單條或在批量處理的ContainerFactory可以共存,默認會使用beanName為kafkaListenerContainerFactory的bean實例,因此你可以為batch container Factory實例指定不同的beanName,並在@KafkaListener使用的時候指定containerFactory即可
總結
spring為了將kafka融入其生態,方便在spring大環境下使用kafka,開發了spring-kafa這一模塊,本質上是為了幫助開發者更好的以spring的方式使用kafka
@KafkaListener就是這麼一個工具,在同一個項目中既可以有單條的消息處理,也可以配置多條的消息處理,稍微改變下配置即可實現,很是方便
當然,@KafkaListener單條或者多條消息處理仍然是spring自行封裝處理,與kafka-client客戶端的拉取機制無關;比如一次性拉取50條消息,對於單條處理來說就是循環50次處理,而多條消息處理則可以一次性處理50條;本質上來說這套邏輯都是spring處理的,並不是說單條消費就是通過kafka-client一次只拉取一條消息
在使用過程中需要注意spring自動的創建的一些bean實例,當然也可以覆蓋其自動創建的實例以滿足特定的需求場景


END


面試被問到了String相關的幾道題,你能答上來嗎?
Spring Cloud Alibaba基礎教程:使用Nacos作為配置中心
ping 命令還能這麼玩?
VS Code 7 月更新:Spring 支持再增強,Lombok 支持重大提升!
45 個 Git 經典操作場景,專治不會合代碼

關注後端面試那些事,回復【2022面經】

獲取最新大廠Java面經


最後重要提示:高質量的技術交流群,限時免費開放,今年抱團最重要。想進群的,關注SpringForAll社區,回復關鍵詞:加群,拉你進群。




點擊「閱讀原文」領取2022大廠面經
↓↓↓
arrow
arrow
    全站熱搜
    創作者介紹
    創作者 鑽石舞台 的頭像
    鑽石舞台

    鑽石舞台

    鑽石舞台 發表在 痞客邦 留言(0) 人氣()