關注我
,回復關鍵字「spring」,來源:https://blog.csdn.net/yuechuzhixing/article/details/124725713
消息監聽容器1、KafkaMessageListenerContainer由spring提供用於監聽以及拉取消息,並將這些消息按指定格式轉換後交給由@KafkaListener註解的方法處理,相當於一個消費者;
看看其整體代碼結構:
圖片可以發現其入口方法為doStart(), 往上追溯到實現了SmartLifecycle接口,很明顯,由spring管理其start和stop操作;ListenerConsumer, 內部真正拉取消息消費的是這個結構,其 實現了Runable接口,簡言之,它就是一個後台線程輪訓拉取並處理消息在doStart方法中會創建ListenerConsumer並交給線程池處理KafkaMessageListenerContainer#doStartprotectedvoiddoStart(){if(isRunning()){return;}if(this.clientIdSuffix==null){//stand-alonecontainercheckTopics();}ContainerPropertiescontainerProperties=getContainerProperties();checkAckMode(containerProperties);......//創建ListenerConsumer消費者並放入到線程池中執行this.listenerConsumer=newListenerConsumer(listener,listenerType);setRunning(true);this.startLatch=newCountDownLatch(1);this.listenerConsumerFuture=containerProperties.getConsumerTaskExecutor().submitListenable(this.listenerConsumer);......}KafkaMessageListenerContainer.ListenerConsumer#runpublicvoidrun(){//NOSONARcomplexity.......ThrowableexitThrowable=null;while(isRunning()){try{//拉去消息並處理消息pollAndInvoke();}catch(@SuppressWarnings(UNUSED)WakeupExceptione){......}......}wrapUp(exitThrowable);}2、ConcurrentMessageListenerContainer並發消息監聽,相當於創建消費者;其底層邏輯仍然是通過KafkaMessageListenerContainer實現處理;從實現上看就是在KafkaMessageListenerContainer上做了層包裝,有多少的concurrency就創建多個KafkaMessageListenerContainer,也就是concurrency個消費者
圖片ConcurrentMessageListenerContainer#doStartprotectedvoiddoStart(){if(!isRunning()){checkTopics();......setRunning(true);for(inti=0;i<this.concurrency;i++){KafkaMessageListenerContainer<K,V>container=constructContainer(containerProperties,topicPartitions,i);StringbeanName=getBeanName();container.setBeanName((beanName!=null?beanName:"consumer")+"-"+i);......if(isPaused()){container.pause();}//這裡調用KafkaMessageListenerContainer啟動相關監聽方法container.start();this.containers.add(container);}}}@KafkaListener底層監聽原理上面已經介紹了KafkaMessageListenerContainer的作用是拉取並處理消息,但還缺少關鍵的一步,即 如何將我們的業務邏輯與KafkaMessageListenerContainer的處理邏輯聯繫起來?
那麼這個橋樑就是@KafkaListener註解
KafkaListenerAnnotationBeanPostProcessor, 從後綴BeanPostProcessor就可以知道這是Spring IOC初始化bean相關的操作,當然這裡也是;此類會掃描帶@KafkaListener註解的類或者方法,通過 KafkaListenerContainerFactory工廠創建對應的KafkaMessageListenerContainer,並調用start方法啟動監聽,也就是這樣打通了這條路…Spring Boot 自動加載kafka相關配置1、KafkaAutoConfiguration自動生成kafka相關配置,比如當缺少這些bean的時候KafkaTemplate、ProducerListener、ConsumerFactory、ProducerFactory等,默認創建bean實例2、KafkaAnnotationDrivenConfiguration主要是針對於spring-kafka提供的註解背後的相關操作,比如 @KafkaListener;在開啟了@EnableKafka註解後,spring會掃描到此配置並創建缺少的bean實例,比如當配置的工廠beanName不是kafkaListenerContainerFactory的時候,就會默認創建一個beanName為kafkaListenerContainerFactory的實例,這也是為什麼在springboot中不用定義consumer的相關配置也可以通過@KafkaListener正常的處理消息生產配置1、單條消息處理@Configuration@EnableKafkapublicclassKafkaConfig{@BeanKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer,String>>kafkaListenerContainerFactory(){ConcurrentKafkaListenerContainerFactory<Integer,String>factory=newConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(3);factory.getContainerProperties().setPollTimeout(3000);returnfactory;}@BeanpublicConsumerFactory<Integer,String>consumerFactory(){returnnewDefaultKafkaConsumerFactory<>(consumerConfigs());}@BeanpublicMap<String,Object>consumerConfigs(){Map<String,Object>props=newHashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,embeddedKafka.getBrokersAsString());...returnprops;}}@KafkaListener(id="myListener",topics="myTopic",autoStartup="${listen.auto.start:true}",concurrency="${listen.concurrency:3}")publicvoidlisten(Stringdata){...}2、批量處理@Configuration@EnableKafkapublicclassKafkaConfig{@BeanpublicKafkaListenerContainerFactory<?,?>batchFactory(){ConcurrentKafkaListenerContainerFactory<Integer,String>factory=newConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setBatchListener(true);//<<<<<<<<<<<<<<<<<<<<<<<<<returnfactory;}@BeanpublicConsumerFactory<Integer,String>consumerFactory(){returnnewDefaultKafkaConsumerFactory<>(consumerConfigs());}@BeanpublicMap<String,Object>consumerConfigs(){Map<String,Object>props=newHashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,embeddedKafka.getBrokersAsString());...returnprops;}}@KafkaListener(id="list",topics="myTopic",containerFactory="batchFactory")publicvoidlisten(List<String>list){...}3、同一個消費組支持單條和批量處理場景:
生產上最初都採用單條消費模式,隨着量的積累,部分topic常常出現消息積壓,最開始通過新增消費者實例和分區來提升消費端的能力;一段時間後又開始出現消息積壓,由此便從代碼層面通過批量消費來提升消費能力。
只對部分topic做批量消費處理簡單的說就是需要配置批量消費和單條記錄消費(從單條消費逐步向批量消費演進)
假設最開始就是配置的單條消息處理的相關配置,原配置基本不變然後新配置 批量消息監聽KafkaListenerContainerFactory@Configuration@EnableKafkapublicclassKafkaConfig{@Bean(name=[batchListenerContainerFactory])publicKafkaListenerContainerFactory<?,?>batchFactory(){ConcurrentKafkaListenerContainerFactory<Integer,String>factory=newConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());//開啟批量處理factory.setBatchListener(true);returnfactory;}@Bean(name=[batchConsumerFactory])publicConsumerFactory<Integer,String>consumerFactory(){returnnewDefaultKafkaConsumerFactory<>(consumerConfigs());}@Bean(name=[batchConsumerConfig])publicMap<String,Object>consumerConfigs(){Map<String,Object>props=newHashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,embeddedKafka.getBrokersAsString());...returnprops;}}注意點:
如果自定義的ContainerFactory其beanName不是kafkaListenerContainerFactory,spring會通過KafkaAnnotationDrivenConfiguration創建新的bean實例,所以需要注意的是你最終的@KafkaListener會使用到哪個ContainerFactory單條或在批量處理的ContainerFactory可以共存,默認會使用beanName為kafkaListenerContainerFactory的bean實例,因此你可以為batch container Factory實例指定不同的beanName,並在@KafkaListener使用的時候指定containerFactory即可總結spring為了將kafka融入其生態,方便在spring大環境下使用kafka,開發了spring-kafa這一模塊,本質上是為了幫助開發者更好的以spring的方式使用kafka@KafkaListener就是這麼一個工具,在同一個項目中既可以有單條的消息處理,也可以配置多條的消息處理,稍微改變下配置即可實現,很是方便當然,@KafkaListener單條或者多條消息處理仍然是spring自行封裝處理,與kafka-client客戶端的拉取機制無關;比如一次性拉取50條消息,對於單條處理來說就是循環50次處理,而多條消息處理則可以一次性處理50條;本質上來說這套邏輯都是spring處理的,並不是說單條消費就是通過kafka-client一次只拉取一條消息在使用過程中需要注意spring自動的創建的一些bean實例,當然也可以覆蓋其自動創建的實例以滿足特定的需求場景
面試被問到了String相關的幾道題,你能答上來嗎?Spring Cloud Alibaba基礎教程:使用Nacos作為配置中心VS Code 7 月更新:Spring 支持再增強,Lombok 支持重大提升!
關注後端面試那些事,回復【2022面經】
獲取最新大廠Java面經
最後重要提示:高質量的技術交流群,限時免費開放,今年抱團最重要。想進群的,關注SpringForAll社區
,回復關鍵詞:加群,拉你進群。