国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 開發 > Java > 正文

淺談spring-boot-rabbitmq動態管理的方法

2024-07-13 10:15:35
字體:
來源:轉載
供稿:網友

使用spring boot + rabbitmq的時候,在開發過程中,可能會想要臨時停用/啟用監聽,或修改監聽消費者數量。如果每次修改都重啟比較浪費時間,所以研究了一下不停機就啟用停用監聽或修改一些配置

一. 關于rabbitmq監聽的配置

  1. 配置屬性類:RabbitProperties,包含rabbitmq的認證、監聽、發送者以及其他的一些配置
  2. 自動配置類:RabbitAutoConfiguration,主要配置rabbitmq的連接工廠和發送者等,不包含監聽的配置
  3. rabbitmq監聽的配置是RabbitAnnotationDrivenConfiguration,是通過RabbitAutoConfiguration引入的
@Configuration@ConditionalOnClass({ RabbitTemplate.class, Channel.class })@EnableConfigurationProperties(RabbitProperties.class)@Import(RabbitAnnotationDrivenConfiguration.class)public class RabbitAutoConfiguration {  ...}

RabbitAnnotationDrivenConfiguration中主要就是監聽工廠的配置、監聽工廠,但是這里也只是創建bean,并沒有真正的初始化

通過配置里的bean類名,分析一下,rabbitmq的監聽肯定是由監聽工廠創建的,所以找到監聽工廠SimpleRabbitListenerContainerFactory

@Bean@ConditionalOnMissingBean(name = "rabbitListenerContainerFactory")public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory( SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {  SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();  configurer.configure(factory, connectionFactory);  return factory;}

既然自動配置里面沒有初始化監聽,那就應該是在其他地方調用的,進入監聽工廠類中,發現有initializeContainer(SimpleMessageListenerContainer instance)方法,猜測初始化肯定與這個方法有關,所以查看有哪些地方調用,于是找到RabbitListenerEndpointRegistry.createListenerContainer(RabbitListenerEndpoint endpoint,RabbitListenerContainerFactory<?> factory)方法中有創建監聽容器和初始化的代碼

/** * Create and start a new {@link MessageListenerContainer} using the specified factory. * @param endpoint the endpoint to create a {@link MessageListenerContainer}. * @param factory the {@link RabbitListenerContainerFactory} to use. * @return the {@link MessageListenerContainer}. */protected MessageListenerContainer createListenerContainer(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory) {  MessageListenerContainer listenerContainer = factory.createListenerContainer(endpoint);  if (listenerContainer instanceof InitializingBean) {   try {      ((InitializingBean) listenerContainer).afterPropertiesSet();   }   catch (Exception ex) {      throw new BeanInitializationException("Failed to initialize message listener container", ex);   }  }  int containerPhase = listenerContainer.getPhase();  if (containerPhase < Integer.MAX_VALUE) { // a custom phase value   if (this.phase < Integer.MAX_VALUE && this.phase != containerPhase) {      throw new IllegalStateException("Encountered phase mismatch between container factory definitions: " +       this.phase + " vs " + containerPhase);   }   this.phase = listenerContainer.getPhase();  }    return listenerContainer;}

繼續找調用這個方法的地方,找到RabbitListenerEndpointRegistrar.afterPropertiesSet()方法之后,發現調用的地方很多了

spring,boot,rabbitmq,動態管理

看看afterPropertiesSet方法,是InitializingBean接口中的,猜測應該是spring容器創建bean之后都會調用的bean初始化的方法,所以查找找到RabbitListenerEndpointRegistrar是在哪里創建的實例。原來是在RabbitListenerAnnotationBeanPostProcessor中的私有屬性,而RabbitListenerAnnotationBeanPostProcessor是在RabbitBootstrapConfiguration這個自動配置里面初始化的,所以這就找到rabbitmq初始化監聽的源頭了

二. 動態管理rabbitmq監聽

回到最初的問題,想要動態的啟用停用mq的監聽,所以先看看初始化配置的類,既然有初始化,那可能會有相關的管理,于是在RabbitListenerEndpointRegistry中找到了start()和stop()方法,里面有對監聽容器進行操作,主要源碼如下

/** * @return the managed {@link MessageListenerContainer} instance(s). */public Collection<MessageListenerContainer> getListenerContainers() {  return Collections.unmodifiableCollection(this.listenerContainers.values());} @Overridepublic void start() {  for (MessageListenerContainer listenerContainer : getListenerContainers()) {   startIfNecessary(listenerContainer);  }}/** * Start the specified {@link MessageListenerContainer} if it should be started * on startup or when start is called explicitly after startup. * @see MessageListenerContainer#isAutoStartup() */private void startIfNecessary(MessageListenerContainer listenerContainer) {  if (this.contextRefreshed || listenerContainer.isAutoStartup()) {   listenerContainer.start();  }}@Overridepublic void stop() {  for (MessageListenerContainer listenerContainer : getListenerContainers()) {   listenerContainer.stop();  }}

寫個controller,注入RabbitListenerEndpointRegistry,使用start()和stop()對監聽進行啟用停用的操作,并且RabbitListenerEndpointRegistry實例還可以獲取監聽容器,對監聽的一些參數也能進行修改,比如消費者數量。代碼如下:

import java.util.Set;import javax.annotation.Resource;import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;import com.itopener.framework.ResultMap;/** * Created by fuwei.deng on 2017年7月24日. */@RestController@RequestMapping("rabbitmq/listener")public class RabbitMQController {  @Resource  private RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;    @RequestMapping("stop")  public ResultMap stop(){   rabbitListenerEndpointRegistry.stop();   return ResultMap.buildSuccess();  }    @RequestMapping("start")  public ResultMap start(){   rabbitListenerEndpointRegistry.start();   return ResultMap.buildSuccess();  }    @RequestMapping("setup")  public ResultMap setup(int consumer, int maxConsumer){   Set<String> containerIds = rabbitListenerEndpointRegistry.getListenerContainerIds();   SimpleMessageListenerContainer container = null;   for(String id : containerIds){   container = (SimpleMessageListenerContainer) rabbitListenerEndpointRegistry.getListenerContainer(id);   if(container != null){    container.setConcurrentConsumers(consumer);    container.setMaxConcurrentConsumers(maxConsumer);   }   }   return ResultMap.buildSuccess();  }}

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持VeVb武林網。


注:相關教程知識閱讀請移步到JAVA教程頻道。
發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 双柏县| 靖安县| 瑞丽市| 阳东县| 桐庐县| 凌海市| 高尔夫| 枣阳市| 沂南县| 洛南县| 阳江市| 顺义区| 金塔县| 汤阴县| 信阳市| 腾冲县| 舟曲县| 富源县| 上犹县| 浦县| 余姚市| 页游| 安图县| 遵化市| 三门峡市| 盘山县| 修武县| 孟津县| 三原县| 吕梁市| 烟台市| 肇庆市| 治多县| 泸溪县| 丹棱县| 沅江市| 辽阳市| 抚远县| 尖扎县| 石河子市| 安国市|