RabbitMQ初步到精通-第十章-RabbitMQ之Spring客户端源码

news2025/1/16 1:31:17

目录

第十章-RabbitMQ之Spring客户端源码

        1. 前言

2. 客户端消费代码

2.1 消费的实现方式

2.2 消费中注解解释

2.3 推测Spring实现过程

3.MQ消费源码分析

3.1 集成SpringBoot 启动过程

3.2 Broker投递消息给客户端过程

3.3 客户端消费过程

4. 总结


第十章-RabbitMQ之Spring客户端源码

1. 前言

经过前面前面的学习,我们已经掌握了rabbitmq的基本用法,高级用法延迟队列、死信队列等,已经研究过了amqp-client的java客户端源码,由于我们在使用的时候,一般还是以SpringBoot为主,那经过Spring封装后的客户端源码是是如何实现的呢?

同学们最好需要有研读过 Spring源码及SpringBoot 源码的经验,会更好衔接一下,不过关系也不大。

由于Spring 体系的庞大,封装的rabbit客户端实现的功能也很多,例 创建连接、生产者推送消息,事务,消费者消费等等内容,那我们这次只抽取rabbitmq消费的部分,进行研读。

集成starter

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

2. 客户端消费代码

2.1 消费的实现方式

如之前我们提到的集成SpringBoot后的使用方式:

@RabbitHandler
@RabbitListener(queues = "SolarWaterHeater")
 @RabbitHandler
 @RabbitListener(queuesToDeclare = @Queue("SolarWaterHeater"))
    @RabbitHandler
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("SolarWaterHeater-RedWine"),
            key = "REDWINE",
            exchange = @Exchange(value = "routing-exchange", type = ExchangeTypes.DIRECT, durable = "false")))

2.2 消费中注解解释

这里面出现了两个注解

第一个:RabbitHandler 看下它的解释:

* Annotation that marks a method to be the target of a Rabbit message
* listener within a class that is annotated with {@link RabbitListener}.

如果一天类上面的注解是RabbitListener,那RabbitHandler标注的方法,即是Rabbit的消息监听者。

@Target({ ElementType.METHOD, ElementType.ANNOTATION_TYPE })
这个注解只能标注到Method

第二个 RabbitListener

1. Annotation that marks a method to be the target of a Rabbit message listener 

标注的方法是一个消息监听者

2. When defined at the class level, a single message listener container is used to
* service all methods annotated with {@code @RabbitHandler}

如果标注到类上,那标注RabbitHandler的方法即是消息监听

链一个:@RabbitListener和@RabbitHandler的使用_sliver1836的博客-CSDN博客

2.3 推测Spring实现过程

所以,我们后续的源码分析即基于此两个注解开展。

在开始看代码之前,我们先想一想,我们之前的使用java amqp客户端开发消费逻辑的过程,

1、创建连接

2、创建Channel

3、声明队列、Exchange、绑定关系

4、监听方法实现 继承DefaultConumer

5、basic.Consume 注册到Broker

6、Broker消息推送,监听方法实现消费

那现在Spring就靠两个注解就帮我们实现了消息的消费,有没有很神奇。顿时感叹程序猿越来越幸福,写代码如此简单了呢?但有利就有弊,Spring帮我们封装的太多,而我们知道的底层却太少了。

闲话少说,到这,大家想一下,如果让你写个注解,就去实现上面6个步骤的内容,你该如何去做呢?

开发自定义注解大家都应该做过,大致的逻辑应该是不是可以,在系统启动的时候,我们就会抓取到标注注解的方法,有此类的方法时,我们认为需要使用mq,我们在后端服务中依次的去执行上面中的6个步骤。这样把注解的方法实现了监听,后续监听消息进行消费。

这里只是一个大概的推测,大家自己自行发挥想像。

3.MQ消费源码分析

从哪入手呢?首先点开 RabbitListener 的源码,然后Download源码。

到这个界面:

我们不再研读RabbitListener这个注解的功能了,大家自己看。

然后紧接着看到 RabbitListenerAnnotationBeanPostProcessor

这个类有什么特点呢?首先是处理RabbitListener 的处理类,然后呢是一个BeanPostProcessor继承了BeanPostProcessor 接口-读过Spring源码的同学,肯定就能得到最有效的信息了,这个类会在系统初始化的时候,执行postProcessAfterInitialization()这个方法。如果没读过Spring源码的话就先跟着节奏走吧。

从这开始了我们的切入。

3.1 集成SpringBoot 启动过程

接着上面的步骤呢,我们往上简单倒一下,

首先 这是一个SpringBoot 项目,通过SpringBoot 的启动类的Main 方法进行启动,然后开始扫描各个组件,初始化各种信息,这个不再细聊。【需要读SpringBoot源码】

其次呢,SpringBoot 只是对Spring 的封装,还是需要回到Spring 的类初始化的过程中去。【需要读Spring源码】

如下呢,即Spring 的核心初始化方法:无论Spring 再怎么升级,这几个核心方法基本不会怎么变化了,这里面我们找到 【registerBeanPostProcessors】,从这里面就会触发到我们上面所说的-

RabbitListenerAnnotationBeanPostProcessor

@Override
	public void refresh() throws BeansException, IllegalStateException {
		synchronized (this.startupShutdownMonitor) {
			// Prepare this context for refreshing.
			prepareRefresh();

			// Tell the subclass to refresh the internal bean factory.
			ConfigurableListableBeanFactory beanFactory = obtainFreshBeanFactory();

			// Prepare the bean factory for use in this context.
			prepareBeanFactory(beanFactory);

			try {
				// Allows post-processing of the bean factory in context subclasses.
				postProcessBeanFactory(beanFactory);

				// Invoke factory processors registered as beans in the context.
				invokeBeanFactoryPostProcessors(beanFactory);

				// Register bean processors that intercept bean creation.
				registerBeanPostProcessors(beanFactory);

				// Initialize message source for this context.
				initMessageSource();

				// Initialize event multicaster for this context.
				initApplicationEventMulticaster();

				// Initialize other special beans in specific context subclasses.
				onRefresh();

				// Check for listener beans and register them.
				registerListeners();

				// Instantiate all remaining (non-lazy-init) singletons.
				finishBeanFactoryInitialization(beanFactory);

				// Last step: publish corresponding event.
				finishRefresh();
			}

			catch (BeansException ex) {
				if (logger.isWarnEnabled()) {
					logger.warn("Exception encountered during context initialization - " +
							"cancelling refresh attempt: " + ex);
				}

				// Destroy already created singletons to avoid dangling resources.
				destroyBeans();

				// Reset 'active' flag.
				cancelRefresh(ex);

				// Propagate exception to caller.
				throw ex;
			}

			finally {
				// Reset common introspection caches in Spring's core, since we
				// might not ever need metadata for singleton beans anymore...
				resetCommonCaches();
			}
		}
	}

随着Spring 的启动,开始触发到了RabbitListenerAnnotationBeanPostProcessor 中的 

postProcessAfterInitialization 方法。

代码:

这就很好解释了,bean 就是我们的消费类,

解析到了 标有注解的方法 @RabbitListener,然后进行处理。processAmqpListener

@Override
	public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
		Class<?> targetClass = AopUtils.getTargetClass(bean);
		final TypeMetadata metadata = this.typeCache.computeIfAbsent(targetClass, this::buildMetadata);
		for (ListenerMethod lm : metadata.listenerMethods) {
			for (RabbitListener rabbitListener : lm.annotations) {
				processAmqpListener(rabbitListener, lm.method, bean, beanName);
			}
		}
		if (metadata.handlerMethods.length > 0) {
			processMultiMethodListeners(metadata.classAnnotations, metadata.handlerMethods, bean, beanName);
		}
		return bean;
	}
protected void processAmqpListener(RabbitListener rabbitListener, Method method, Object bean, String beanName) {
        // 对应的消费方法
		Method methodToUse = checkProxy(method, bean);
        //封装对象
		MethodRabbitListenerEndpoint endpoint = new MethodRabbitListenerEndpoint();
		endpoint.setMethod(methodToUse);
        // 继续处理
		processListener(endpoint, rabbitListener, bean, methodToUse, beanName);
	}

继续:

protected void processListener(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener, Object bean,
			Object adminTarget, String beanName) {
		endpoint.setBean(bean);
		endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
		endpoint.setId(getEndpointId(rabbitListener));
		endpoint.setQueueNames(resolveQueues(rabbitListener));
		endpoint.setConcurrency(resolveExpressionAsStringOrInteger(rabbitListener.concurrency(), "concurrency"));
		endpoint.setBeanFactory(this.beanFactory);
		endpoint.setReturnExceptions(resolveExpressionAsBoolean(rabbitListener.returnExceptions()));
		Object errorHandler = resolveExpression(rabbitListener.errorHandler());
		if (errorHandler instanceof RabbitListenerErrorHandler) {
			endpoint.setErrorHandler((RabbitListenerErrorHandler) errorHandler);
		}
		else if (errorHandler instanceof String) {
			String errorHandlerBeanName = (String) errorHandler;
			if (StringUtils.hasText(errorHandlerBeanName)) {
				endpoint.setErrorHandler(this.beanFactory.getBean(errorHandlerBeanName, RabbitListenerErrorHandler.class));
			}
		}
		else {
			throw new IllegalStateException("error handler mut be a bean name or RabbitListenerErrorHandler, not a "
					+ errorHandler.getClass().toString());
		}
		String group = rabbitListener.group();
		if (StringUtils.hasText(group)) {
			Object resolvedGroup = resolveExpression(group);
			if (resolvedGroup instanceof String) {
				endpoint.setGroup((String) resolvedGroup);
			}
		}
		String autoStartup = rabbitListener.autoStartup();
		if (StringUtils.hasText(autoStartup)) {
			endpoint.setAutoStartup(resolveExpressionAsBoolean(autoStartup));
		}

		endpoint.setExclusive(rabbitListener.exclusive());
		String priority = resolve(rabbitListener.priority());
		if (StringUtils.hasText(priority)) {
			try {
				endpoint.setPriority(Integer.valueOf(priority));
			}
			catch (NumberFormatException ex) {
				throw new BeanInitializationException("Invalid priority value for " +
						rabbitListener + " (must be an integer)", ex);
			}
		}
        // 以上 前面都完成了对 MethodRabbitListenerEndpoint 对象的封装,封装的也都是注解中的属性
    //此方法内部实际没执行 跳过
		resolveAdmin(endpoint, rabbitListener, adminTarget);
    //跳过
		RabbitListenerContainerFactory<?> factory = resolveContainerFactory(rabbitListener, adminTarget, beanName);
    // 属性填充 放入List ,不重要

		this.registrar.registerEndpoint(endpoint, factory);
	}

程序回转:

这里面来到一个

public void afterSingletonsInstantiated() 方法,这是由于实现了接口SmartInitializingSingleton, 后续得到了处理。

这里面会涉及到两个类:

1. RabbitListenerEndpointRegistrar

2. RabbitListenerEndpointRegistry

有没有长得很像,这里面是把 RabbitListenerEndpointRegistry 手工注册到了RabbitListenerEndpointRegistrar 里面,然后进行了一系列初始化,

这里面不再详细展开了,但这个RabbitListenerEndpointRegistry 很重要,后面还会涉及到它

 RabbitListenerEndpointRegistry 实现了一个Lifecycle接口,后续会调用到它的实现start()

将对应的消费Class 做好了封装 ,返回,继续Spring的初始化过程。

来到Spring核心流程 

finishRefresh();
	/**
	 * Finish the refresh of this context, invoking the LifecycleProcessor's
	 * onRefresh() method and publishing the
	 * {@link org.springframework.context.event.ContextRefreshedEvent}.
	 */
	protected void finishRefresh() {
		// Clear context-level resource caches (such as ASM metadata from scanning).
		clearResourceCaches();

		// Initialize lifecycle processor for this context.
		initLifecycleProcessor();

		// Propagate refresh to lifecycle processor first.
		getLifecycleProcessor().onRefresh();

		// Publish the final event.
		publishEvent(new ContextRefreshedEvent(this));

		// Participate in LiveBeansView MBean, if active.
		LiveBeansView.registerApplicationContext(this);
	}

其中第三个方法

getLifecycleProcessor().onRefresh();

这个方法是获取 lifecycle的处理器,进行lifecycle接口实现类的处理,这就呼应到了上面的 RabbitListenerEndpointRegistry ,他实现了lifecycle的接口。

最终一番流转终于到了 这个Registry处理逻辑中:

	@Override
	public void start() {
		for (MessageListenerContainer listenerContainer : getListenerContainers()) {
			startIfNecessary(listenerContainer);
		}
	}
	/**
	 * Start the specified {@link MessageListenerContainer} if it should be started
	 * on startup or when start is called explicitly after startup.
	 * @param listenerContainer the container.
	 * @see MessageListenerContainer#isAutoStartup()
	 */
	private void startIfNecessary(MessageListenerContainer listenerContainer) {
		if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
			listenerContainer.start();
		}
	}
MessageListenerContainer 也是在上面afterSingletonsInstantiated 处理好的,现在要启动这个监听者容器。

来到了 AbstractMessageListenerContainer 中的启动方法:

/**
	 * Start this container.
	 * @see #doStart
	 */
	@Override
	public void start() {
		if (isRunning()) {
			return;
		}
		if (!this.initialized) {
			synchronized (this.lifecycleMonitor) {
				if (!this.initialized) {
					afterPropertiesSet();
				}
			}
		}
		try {
			logger.debug("Starting Rabbit listener container.");
			configureAdminIfNeeded();
			checkMismatchedQueues();
			doStart();
		}
		catch (Exception ex) {
			throw convertRabbitAccessException(ex);
		}
		finally {
			this.lazyLoad = false;
		}
	}
configureAdminIfNeeded() 获取RabbitAdmin 
checkMismatchedQueues() 这个方法就很关键了,运行到此时打开我们的抓包工具,这里面开始创建Connection了。
protected void checkMismatchedQueues() {
		if (this.mismatchedQueuesFatal && this.amqpAdmin != null) {
			try {
				this.amqpAdmin.initialize();
			}
			catch (AmqpConnectException e) {
				logger.info("Broker not available; cannot check queue declarations");
			}
			catch (AmqpIOException e) {
				if (RabbitUtils.isMismatchedQueueArgs(e)) {
					throw new FatalListenerStartupException("Mismatched queues", e);
				}
				else {
					logger.info("Failed to get connection during start(): " + e);
				}
			}
		}
		else {
			try {
                // 创建连接方法
				Connection connection = getConnectionFactory().createConnection(); // NOSONAR
				if (connection != null) {
					connection.close();
				}
			}
			catch (Exception e) {
				logger.info("Broker not available; cannot force queue declarations during start: " + e.getMessage());
			}
		}
	}

有没有很熟悉

Connection connection = getConnectionFactory().createConnection(); 
@Override
	public final Connection createConnection() throws AmqpException {
		if (this.stopped) {
			throw new AmqpApplicationContextClosedException(
					"The ApplicationContext is closed and the ConnectionFactory can no longer create connections.");
		}
		synchronized (this.connectionMonitor) {
			if (this.cacheMode == CacheMode.CHANNEL) {
				if (this.connection.target == null) {
					this.connection.target = super.createBareConnection();
					// invoke the listener *after* this.connection is assigned
					if (!this.checkoutPermits.containsKey(this.connection)) {
						this.checkoutPermits.put(this.connection, new Semaphore(this.channelCacheSize));
					}
					this.connection.closeNotified.set(false);
					getConnectionListener().onCreate(this.connection);
				}
				return this.connection;
			}
			else if (this.cacheMode == CacheMode.CONNECTION) {
				return connectionFromCache();
			}
		}
		return null; // NOSONAR - never reach here - exceptions
	}

运行完此步,如上的代码中,两个重要的点:

1. 此步直接就创建了Connection、

this.connection.target = super.createBareConnection();

看下抓包:

2. 继续这一步也很关键,创建完连接后,会把接下来的 Exchange、Queue、绑定关系根据注解配置中的内容,该创建的都创建一遍。

getConnectionListener().onCreate(this.connection);

直接运行到了

RabbitAdmin.initialize()

看方法头上的注释也很清晰

/**
	 * Declares all the exchanges, queues and bindings in the enclosing application context, if any. It should be safe
	 * (but unnecessary) to call this method more than once.
	 */
	@Override // NOSONAR complexity
	public void initialize() {

		if (this.applicationContext == null) {
			this.logger.debug("no ApplicationContext has been set, cannot auto-declare Exchanges, Queues, and Bindings");
			return;
		}

		this.logger.debug("Initializing declarations");
		Collection<Exchange> contextExchanges = new LinkedList<Exchange>(
				this.applicationContext.getBeansOfType(Exchange.class).values());
		Collection<Queue> contextQueues = new LinkedList<Queue>(
				this.applicationContext.getBeansOfType(Queue.class).values());
		Collection<Binding> contextBindings = new LinkedList<Binding>(
				this.applicationContext.getBeansOfType(Binding.class).values());

		processLegacyCollections(contextExchanges, contextQueues, contextBindings);
		processDeclarables(contextExchanges, contextQueues, contextBindings);

		final Collection<Exchange> exchanges = filterDeclarables(contextExchanges);
		final Collection<Queue> queues = filterDeclarables(contextQueues);
		final Collection<Binding> bindings = filterDeclarables(contextBindings);

		for (Exchange exchange : exchanges) {
			if ((!exchange.isDurable() || exchange.isAutoDelete())  && this.logger.isInfoEnabled()) {
				this.logger.info("Auto-declaring a non-durable or auto-delete Exchange ("
						+ exchange.getName()
						+ ") durable:" + exchange.isDurable() + ", auto-delete:" + exchange.isAutoDelete() + ". "
						+ "It will be deleted by the broker if it shuts down, and can be redeclared by closing and "
						+ "reopening the connection.");
			}
		}

		for (Queue queue : queues) {
			if ((!queue.isDurable() || queue.isAutoDelete() || queue.isExclusive()) && this.logger.isInfoEnabled()) {
				this.logger.info("Auto-declaring a non-durable, auto-delete, or exclusive Queue ("
						+ queue.getName()
						+ ") durable:" + queue.isDurable() + ", auto-delete:" + queue.isAutoDelete() + ", exclusive:"
						+ queue.isExclusive() + ". "
						+ "It will be redeclared if the broker stops and is restarted while the connection factory is "
						+ "alive, but all messages will be lost.");
			}
		}

		if (exchanges.size() == 0 && queues.size() == 0 && bindings.size() == 0) {
			this.logger.debug("Nothing to declare");
			return;
		}
		this.rabbitTemplate.execute(channel -> {
			declareExchanges(channel, exchanges.toArray(new Exchange[exchanges.size()]));
			declareQueues(channel, queues.toArray(new Queue[queues.size()]));
			declareBindings(channel, bindings.toArray(new Binding[bindings.size()]));
			return null;
		});
		this.logger.debug("Declarations finished");

	}

由于我们只创建了Queue,使用默认的Exchange,代码不贴太多了,只贴声明Queue的内容:

DeclareOk declareOk = channel.queueDeclare(queue.getName(), queue.isDurable(),
								queue.isExclusive(), queue.isAutoDelete(), queue.getArguments());

我们看下抓包情况:

 到此呢,Queue也声明好了。下面呢,下面就该basic.Consume 了吧,把消费者注册到Broker中去。

好,我们继续:

继续代码又倒回去,倒到:

/**
	 * Start this container.
	 * @see #doStart
	 */
	@Override
	public void start() {
		if (isRunning()) {
			return;
		}
		if (!this.initialized) {
			synchronized (this.lifecycleMonitor) {
				if (!this.initialized) {
					afterPropertiesSet();
				}
			}
		}
		try {
			logger.debug("Starting Rabbit listener container.");
			configureAdminIfNeeded();
			checkMismatchedQueues();
			doStart();
		}
		catch (Exception ex) {
			throw convertRabbitAccessException(ex);
		}
		finally {
			this.lazyLoad = false;
		}
	}
doStart(); 

一看doxxx,那一定是要干实际的事情的,很重要对吧,

我们进入到 

SimpleMessageListenerContainer

中的实现方法中:

/**
	 * Re-initializes this container's Rabbit message consumers, if not initialized already. Then submits each consumer
	 * to this container's task executor.
	 */
	@Override
	protected void doStart() {
		checkListenerContainerAware();
		super.doStart();
		synchronized (this.consumersMonitor) {
			if (this.consumers != null) {
				throw new IllegalStateException("A stopped container should not have consumers");
			}
			int newConsumers = initializeConsumers();
			if (this.consumers == null) {
				logger.info("Consumers were initialized and then cleared " +
						"(presumably the container was stopped concurrently)");
				return;
			}
			if (newConsumers <= 0) {
				if (logger.isInfoEnabled()) {
					logger.info("Consumers are already running");
				}
				return;
			}
			Set<AsyncMessageProcessingConsumer> processors = new HashSet<AsyncMessageProcessingConsumer>();
			for (BlockingQueueConsumer consumer : this.consumers) {
				AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
				processors.add(processor);
				getTaskExecutor().execute(processor);
				if (getApplicationEventPublisher() != null) {
					getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer));
				}
			}
			waitForConsumersToStart(processors);
		}
	}

前面几步意义不大,走到

int newConsumers = initializeConsumers();
protected int initializeConsumers() {
		int count = 0;
		synchronized (this.consumersMonitor) {
			if (this.consumers == null) {
				this.cancellationLock.reset();
				this.consumers = new HashSet<BlockingQueueConsumer>(this.concurrentConsumers);
				for (int i = 0; i < this.concurrentConsumers; i++) {
					BlockingQueueConsumer consumer = createBlockingQueueConsumer();
					this.consumers.add(consumer);
					count++;
				}
			}
		}
		return count;
	}

重点来咯,

BlockingQueueConsumer consumer = createBlockingQueueConsumer();

这里把BlockingQueueConsumer做了一个初始化,相关的不再展开。

BlockingQueueConsumer -这将是后续我们非常重要的一个类

继续重点内容,回到我们上面代码块中的内容:

for (BlockingQueueConsumer consumer : this.consumers) {
				AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
				processors.add(processor);
				getTaskExecutor().execute(processor);
				if (getApplicationEventPublisher() != null) {
					getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer));
				}
			}

这个for循环很重要了,由于我们是一个消费者,循环一次。

初始化一个

AsyncMessageProcessingConsumer

对象。这个对象点进去,大家看下这是个实现了Runnable接口的线程对象。哦哦,真正的核心哦。使用 SimpleAsyncTaskExecutor   来new的线程,这个执行器可不是线程池哦,来一个线程就会New一个,大家自行研究。

这里面我们可以得到一个结论,就是一个消费者,就会开启一个线程进行监听。

从此开启了新线程,【打断点记得Thread模式】

看线程的实现:

@Override // NOSONAR - complexity - many catch blocks
		public void run() { // NOSONAR - line count
			if (!isActive()) {
				return;
			}

			boolean aborted = false;

			this.consumer.setLocallyTransacted(isChannelLocallyTransacted());

			String routingLookupKey = getRoutingLookupKey();
			if (routingLookupKey != null) {
				SimpleResourceHolder.bind(getRoutingConnectionFactory(), routingLookupKey); // NOSONAR both never null
			}

			if (this.consumer.getQueueCount() < 1) {
				if (logger.isDebugEnabled()) {
					logger.debug("Consumer stopping; no queues for " + this.consumer);
				}
				SimpleMessageListenerContainer.this.cancellationLock.release(this.consumer);
				if (getApplicationEventPublisher() != null) {
					getApplicationEventPublisher().publishEvent(
							new AsyncConsumerStoppedEvent(SimpleMessageListenerContainer.this, this.consumer));
				}
				this.start.countDown();
				return;
			}

			try {
				initialize();
				while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
					mainLoop();
				}
			}

摘出核心点:

1、initialize();

	private void initialize() throws Throwable { // NOSONAR
			try {
				redeclareElementsIfNecessary();
				this.consumer.start();
				this.start.countDown();
			}

初始化内容,

1.  redeclareElementsIfNecessary - 这个是再进行检查进行Exchange 、Queue、Binding的声明与前面声明的方法实现的共用。

2.this.consumer.start();  

public void start() throws AmqpException {
		if (logger.isDebugEnabled()) {
			logger.debug("Starting consumer " + this);
		}

		this.thread = Thread.currentThread();

		try {
			this.resourceHolder = ConnectionFactoryUtils.getTransactionalResourceHolder(this.connectionFactory,
					this.transactional);
			this.channel = this.resourceHolder.getChannel();
			ClosingRecoveryListener.addRecoveryListenerIfNecessary(this.channel); // NOSONAR never null here
		}
		catch (AmqpAuthenticationException e) {
			throw new FatalListenerStartupException("Authentication failure", e);
		}
		this.deliveryTags.clear();
		this.activeObjectCounter.add(this);

		passiveDeclarations();
		setQosAndreateConsumers();
	}
这里面我们看这个方法就行
setQosAndreateConsumers();

Qos是设定消费时每次抓取的数量

并CreadConsumers

private void setQosAndreateConsumers() {
		if (!this.acknowledgeMode.isAutoAck() && !cancelled()) {
			// Set basicQos before calling basicConsume (otherwise if we are not acking the broker
			// will send blocks of 100 messages)
			try {
				this.channel.basicQos(this.prefetchCount);
			}
			catch (IOException e) {
				this.activeObjectCounter.release(this);
				throw new AmqpIOException(e);
			}
		}

		try {
			if (!cancelled()) {
				for (String queueName : this.queues) {
					if (!this.missingQueues.contains(queueName)) {
						consumeFromQueue(queueName);
					}
				}
			}
		}
		catch (IOException e) {
			throw RabbitExceptionTranslator.convertRabbitAccessException(e);
		}
	}

有没有很熟悉:

this.channel.basicQos(this.prefetchCount);

抓包:

继续:

consumeFromQueue(queueName);
private void consumeFromQueue(String queue) throws IOException {
		InternalConsumer consumer = new InternalConsumer(this.channel, queue);
		String consumerTag = this.channel.basicConsume(queue, this.acknowledgeMode.isAutoAck(),
				(this.tagStrategy != null ? this.tagStrategy.createConsumerTag(queue) : ""), this.noLocal,
				this.exclusive, this.consumerArgs,
				consumer);

		if (consumerTag != null) {
			this.consumers.put(queue, consumer);
			if (logger.isDebugEnabled()) {
				logger.debug("Started on queue '" + queue + "' with tag " + consumerTag + ": " + this);
			}
		}
		else {
			logger.error("Null consumer tag received for queue " + queue);
		}
	}

 有没有很熟悉:

String consumerTag = this.channel.basicConsume(queue, this.acknowledgeMode.isAutoAck(),
      (this.tagStrategy != null ? this.tagStrategy.createConsumerTag(queue) : ""), this.noLocal,
      this.exclusive, this.consumerArgs,
      consumer);

那这里有有一个核心的类出现了。InternalConsumer

这里转向 3.2 Broker投递消息给客户端  解释

到这里呢,我们把消费者注册到了Broker中去了,看下抓包情况:

 到这呢,所以Broker也就能给我们投递消息了。

2、mainLoop();

				initialize();
				while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
					mainLoop();
				}

这里也有个mainLoop ,于是想到了,java 的amqp客户端也存在呢mainLoop ,这里的逻辑难道也和他的逻辑契合的?我们转向 3.3 客户端消费过程继续。

3.2 Broker投递消息给客户端过程

上面说到了,已经将消费者注册到了Broker中去了,但一定注意哦,注册到Broker 中的,可不是我们使用注解 RabbitListener 标注的实际消费方法哦,而是新创建了一个内部的消费者:InternalConsumer

我们看下他的一个实现

private final class InternalConsumer extends DefaultConsumer {

		private final String queueName;

		boolean canceled;

		InternalConsumer(Channel channel, String queue) {
			super(channel);
			this.queueName = queue;
		}

		@Override
		public void handleConsumeOk(String consumerTag) {
			super.handleConsumeOk(consumerTag);
			if (logger.isDebugEnabled()) {
				logger.debug("ConsumeOK: " + BlockingQueueConsumer.this);
			}
			if (BlockingQueueConsumer.this.applicationEventPublisher != null) {
				BlockingQueueConsumer.this.applicationEventPublisher
						.publishEvent(new ConsumeOkEvent(this, this.queueName, consumerTag));
			}
		}

		@Override
		public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
			if (logger.isDebugEnabled()) {
				if (RabbitUtils.isNormalShutdown(sig)) {
					logger.debug("Received shutdown signal for consumer tag=" + consumerTag + ": " + sig.getMessage());
				}
				else {
					logger.debug("Received shutdown signal for consumer tag=" + consumerTag, sig);
				}
			}
			BlockingQueueConsumer.this.shutdown = sig;
			// The delivery tags will be invalid if the channel shuts down
			BlockingQueueConsumer.this.deliveryTags.clear();
			BlockingQueueConsumer.this.activeObjectCounter.release(BlockingQueueConsumer.this);
		}

		@Override
		public void handleCancel(String consumerTag) throws IOException {
			if (logger.isWarnEnabled()) {
				logger.warn("Cancel received for " + consumerTag + " ("
						+ this.queueName
						+ "); " + BlockingQueueConsumer.this);
			}
			BlockingQueueConsumer.this.consumers.remove(this.queueName);
			if (!BlockingQueueConsumer.this.consumers.isEmpty()) {
				basicCancel(false);
			}
			else {
				BlockingQueueConsumer.this.cancelled.set(true);
			}
		}

		@Override
		public void handleCancelOk(String consumerTag) {
			if (logger.isDebugEnabled()) {
				logger.debug("Received cancelOk for tag " + consumerTag + " ("
						+ this.queueName
						+ "); " + BlockingQueueConsumer.this);
			}
			this.canceled = true;
		}

		@Override
		public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
				byte[] body) {
			if (logger.isDebugEnabled()) {
				logger.debug("Storing delivery for consumerTag: '"
						+ consumerTag + "' with deliveryTag: '" + envelope.getDeliveryTag() + "' in "
						+ BlockingQueueConsumer.this);
			}
			try {
				if (BlockingQueueConsumer.this.abortStarted > 0) {
					if (!BlockingQueueConsumer.this.queue.offer(
							new Delivery(consumerTag, envelope, properties, body, this.queueName),
							BlockingQueueConsumer.this.shutdownTimeout, TimeUnit.MILLISECONDS)) {

						Channel channelToClose = super.getChannel();
						RabbitUtils.setPhysicalCloseRequired(channelToClose, true);
						// Defensive - should never happen
						BlockingQueueConsumer.this.queue.clear();
						if (!this.canceled) {
							getChannel().basicCancel(consumerTag);
						}
						try {
							channelToClose.close();
						}
						catch (@SuppressWarnings("unused") TimeoutException e) {
							// no-op
						}
					}
				}
				else {
					BlockingQueueConsumer.this.queue
							.put(new Delivery(consumerTag, envelope, properties, body, this.queueName));
				}
			}
			catch (@SuppressWarnings("unused") InterruptedException e) {
				Thread.currentThread().interrupt();
			}
			catch (Exception e) {
				BlockingQueueConsumer.logger.warn("Unexpected exception during delivery", e);
			}
		}

		@Override
		public String toString() {
			return "InternalConsumer{" + "queue='" + this.queueName + '\'' +
					", consumerTag='" + getConsumerTag() + '\'' +
					'}';
		}

	}

哇,内部类,而且继承了 DefaultConsumer ,这和我们前面学习Rabbitmq工作模式的过程中,自己手动开发的代码一样了吧,那我找到 投递方法:

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,

好亲切有木有,所以到这里真相大白咯。Broker将消息投递到了这里,我们看看他接收到消息搞什么动作?

BlockingQueueConsumer.this.queue
							.put(new Delivery(consumerTag, envelope, properties, body, this.queueName));

很明显,和java amqp client 实现一样,他这也用到了Queue,去存储了,

this.queue = new LinkedBlockingQueue<Delivery>(prefetchCount);

也是个阻塞Queue哦,看来spring搞了一通,从客户端那边的queue里拿来,又放了一次queue。

那放进去了,就等着取呗,看谁来取咯。

3.3 客户端消费过程

接续上面的 mainLoop(),既然消息又存到了本地的queue中,那mainLoop 的目的岂不是很明确了,那就是死循环的去取消息消息,然后再转调到我们实际的 加入@RabbitListener 的方法中去呢。究竟是不是呢,验证下:

private void mainLoop() throws Exception { // NOSONAR Exception
			try {
				boolean receivedOk = receiveAndExecute(this.consumer); // At least one message received
				if (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) {
					checkAdjust(receivedOk);
				}
				long idleEventInterval = getIdleEventInterval();
				if (idleEventInterval > 0) {
					if (receivedOk) {
						updateLastReceive();
					}
					else {
						long now = System.currentTimeMillis();
						long lastAlertAt = SimpleMessageListenerContainer.this.lastNoMessageAlert.get();
						long lastReceive = getLastReceive();
						if (now > lastReceive + idleEventInterval
								&& now > lastAlertAt + idleEventInterval
								&& SimpleMessageListenerContainer.this.lastNoMessageAlert
								.compareAndSet(lastAlertAt, now)) {
							publishIdleContainerEvent(now - lastReceive);
						}
					}
				}
			}
			catch (ListenerExecutionFailedException ex) {
				// Continue to process, otherwise re-throw
				if (ex.getCause() instanceof NoSuchMethodException) {
					throw new FatalListenerExecutionException("Invalid listener", ex);
				}
			}
			catch (AmqpRejectAndDontRequeueException rejectEx) {
				/*
				 *  These will normally be wrapped by an LEFE if thrown by the
				 *  listener, but we will also honor it if thrown by an
				 *  error handler.
				 */
			}
		}

看下重点方法:

boolean receivedOk = receiveAndExecute(this.consumer); 
private boolean receiveAndExecute(final BlockingQueueConsumer consumer) throws Exception { // NOSONAR

		PlatformTransactionManager transactionManager = getTransactionManager();
		if (transactionManager != null) {
			try {
				if (this.transactionTemplate == null) {
					this.transactionTemplate =
							new TransactionTemplate(transactionManager, getTransactionAttribute());
				}
				return this.transactionTemplate
						.execute(status -> { // NOSONAR null never returned
							RabbitResourceHolder resourceHolder = ConnectionFactoryUtils.bindResourceToTransaction(
									new RabbitResourceHolder(consumer.getChannel(), false),
									getConnectionFactory(), true);
							// unbound in ResourceHolderSynchronization.beforeCompletion()
							try {
								return doReceiveAndExecute(consumer);
							}
							catch (RuntimeException e1) {
								prepareHolderForRollback(resourceHolder, e1);
								throw e1;
							}
							catch (Exception e2) {
								throw new WrappedTransactionException(e2);
							}
						});
			}
			catch (WrappedTransactionException e) { // NOSONAR exception flow control
				throw (Exception) e.getCause();
			}
		}

		return doReceiveAndExecute(consumer);

	}

抛开事务,我们不关注。

return doReceiveAndExecute(consumer);
private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Exception { //NOSONAR

		Channel channel = consumer.getChannel();

		for (int i = 0; i < this.txSize; i++) {

			logger.trace("Waiting for message from consumer.");
			Message message = consumer.nextMessage(this.receiveTimeout);
			if (message == null) {
				break;
			}
			try {
				executeListener(channel, message);
			}

重点哦:

			Message message = consumer.nextMessage(this.receiveTimeout);

从内部消费者取消息咯

public Message nextMessage(long timeout) throws InterruptedException, ShutdownSignalException {
		if (logger.isTraceEnabled()) {
			logger.trace("Retrieving delivery for " + this);
		}
		checkShutdown();
		if (this.missingQueues.size() > 0) {
			checkMissingQueues();
		}
		Message message = handle(this.queue.poll(timeout, TimeUnit.MILLISECONDS));
		if (message == null && this.cancelled.get()) {
			throw new ConsumerCancelledException();
		}
		return message;
	}

看到poll 我们就放心了,把消息取出来,包装成Message对象。

快调头回来,继续看:

try {
    executeListener(channel, message);
}

这就要真正处理这个消息了

protected void executeListener(Channel channel, Message messageIn) {
		if (!isRunning()) {
			if (logger.isWarnEnabled()) {
				logger.warn("Rejecting received message because the listener container has been stopped: " + messageIn);
			}
			throw new MessageRejectedWhileStoppingException();
		}
		try {
			doExecuteListener(channel, messageIn);
		}
		catch (RuntimeException ex) {
			if (messageIn.getMessageProperties().isFinalRetryForMessageWithNoId()) {
				if (this.statefulRetryFatalWithNullMessageId) {
					throw new FatalListenerExecutionException(
							"Illegal null id in message. Failed to manage retry for message: " + messageIn, ex);
				}
				else {
					throw new ListenerExecutionFailedException("Cannot retry message more than once without an ID",
							new AmqpRejectAndDontRequeueException("Not retryable; rejecting and not requeuing", ex),
							messageIn);
				}
			}
			handleListenerException(ex);
			throw ex;
		}
	}

代码不往下贴了,继续追就可以,最终还是找到了,打标@RabbitListener的那个方法上,得到了执行。真正让业务逻辑执行到了MQ推送过来的消息,

太不容易了,消息从发送-> Exchange->Queue -> java amqp client  ->spring client - >consume 最终得到了消费。

4. 总结

小结一下,我们从注解RabbitHandler RabbitListener 入手,一步步追踪到 与Broker链接的创建,Queue的声明,接着,启动新线程 注册一个内部的消费者到Broker中,Broker有消息的时候会推送到本地的BlockingQueue中去。

使用MainLoop 消费本地Blockinqueue的内容

贴个小图:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/27373.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Linux】线程安全

文章目录1.线程互斥1.1.线程间互斥的相关概念1.2互斥量1.3互斥量接口1.4互斥量实现原理2.可重入VS线程安全3.常见锁概念3.1死锁3.2常见死锁情况3.2.1情况一&#xff1a;忘记释放锁3.2.2情况二&#xff1a;线程重复申请锁3.2.3情况三&#xff1a;双线程多锁申请3.3锁的相关概念4…

m在VBLAST协作MIMO系统分部使用LDPC,Turbo,卷积三种信道编译码进行误码率matlab仿真

目录 1.算法描述 2.仿真效果预览 3.MATLAB部分代码预览 4.完整MATLAB程序 1.算法描述 从上面的结构可知&#xff0c;整个卷积编码的结构可由CRC校验&#xff0c;卷积编码&#xff0c;打孔组成&#xff0c;其中打孔的作用就是讲卷积编码后的码率变为所需要的码率进行发送。 …

一种在行末隐藏有效载荷的新供应链攻击技术研判

近期&#xff0c;Phylum检测到数十个新发布的Pypi软件包执行供应链攻击&#xff0c;在这些软件包中&#xff0c;通过隐藏的__import__将窃取程序投递到开发人员的机器上。攻击者利用代码审核者所使用IDE默认的不换行代码显示设置隐藏自身的行为与载荷&#xff0c;本文将就其中出…

栈简介、手写顺序栈、手写链栈和栈的应用

一. 简介 1. 什么是栈&#xff1f; 栈是一种只能从表的一端存取数据且遵循 "先进后出"&#xff08;"后进先出"&#xff09; 原则的线性存储结构。栈也是用来存储逻辑关系为 "一对一" 数据的线性存储结构。 C#中提供顺序栈&#xff1a;Stack&…

【MySQL基础】如何安装MySQL?如何将MySQL设置成服务?

目录 一、MySQL的安装 1、解压配置 2、步骤安装 &#x1f49f; 创作不易&#xff0c;不妨点赞&#x1f49a;评论❤️收藏&#x1f499;一下 一、MySQL的安装 MySQL的安装有两种方式&#xff1a;解压配置和步骤安装 1、解压配置 需提前从官网直接下载压缩包&#xff0c;进…

【MySQL篇】第二篇——库的操作

目录 创建数据库 创建数据库案例 字符集和校验规则 查看系统默认字符集以及校验规则 查看数据库支持的字符集 查看数据库支持的字符集校验规则 校验规则对数据库的影响 操纵数据库 查看数据库 显示创建语句 修改数据库 数据库删除 备份和恢复 备份 还原 注意事…

常见磁盘调度算法总结

磁盘调度算法&#x1f4d6;1. 最短寻道时间优先&#xff08;SSTF&#xff09;&#x1f4d6;2. 电梯算法&#xff08;SCAN或C-SCAN&#xff09;&#x1f4d6;3. 最短定位时间优先&#xff08;SPTF&#xff09;&#x1f4d6;4. 总结由于IO的高成本&#xff0c;操作系统在决定发送…

C语言 0 —— 计算机硬件架构及信息在计算机中的表示

当前的计算机系统&#xff0c;如Window &#xff0c;Linux&#xff0c;Mac 基本都是基于冯诺依曼的驱动架构设计的。 冯诺依曼架构输入设备先输入公式&#xff0c;给运算器&#xff0c;运算器先算 先算2*5 &#xff0c;临时放在CPU内部寄存器中&#xff0c;寄存器不够用的时候会…

vscode插件开发(四)Webview(1)

上一篇详细讲解了命令&#xff0c;这回我们一起来看一下Webview。vscode的插件其实可以分为两种&#xff0c;一种是webview插件&#xff0c;另一种是非webview插件。 webview插件的自由度很高&#xff0c;可以满足开发者的各种定制化的要求&#xff1b;而非webview插件只能使用…

我悟了!Mysql事务隔离级别其实是这样!

问题描述 ​ 最近几天在忙项目&#xff0c;有个项目是将业务收集到的数据变动&#xff0c;异步同步到一张数据表中。在测试的过程时&#xff0c;收到QA的反馈&#xff0c;说有订单的数据同步时好时坏。我怀着疑惑的表情打开了那段代码&#xff0c;它的逻辑大概是这样的&#x…

Zookeeper实现分布式锁的原理。

之前学习Redis时候&#xff0c;我们利用Redis实现了分布式锁。 黑马点评项目Redis实现分布式锁_兜兜转转m的博客-CSDN博客 为什么提出了分布式锁的概念呢&#xff1f; 因为在单体项目中&#xff0c;锁是基于JVM虚拟机实现的&#xff0c;在分布式情况下&#xff0c;JVM就不唯…

FullGC频繁,线程数持续增长排查

告警 线上应用fullgc频繁&#xff0c;收到告警 GC监控—堆内存不足 查看近12小时的监控&#xff0c;发现Survivor区一直处于 满状态、fullgc非常频繁、但没有内存溢出的现象&#xff0c;很明显是堆内存不足 GC日志分析—暂停时间并不长 因为fullgc相当频繁&#xff0c;抽…

项目管理(知识体系概述)

项目的定义:为创造独特的产品、服务或者成果进行的临时性工作。 项目的特性:1、独特的产品、服务、成果;2、临时性工作。 项目管理的目的(为了解决什么问题): 1、达成业务目标 2、满足相关方期望 3、提供项目的可预测性 4、提高项目的成果性。 5、在适当的时刻交付…

机器人运动学标定:基于考虑约束的指数积的运动学标定方法——只需要测量位置,避免冗余约束

文章目录写在前面为什么要消除归一化和正交化操作&#xff1f;只用位置而不是位姿去做标定的原因基于消除冗余约束步骤的参数辨识模型分析参考文献写在前面 基于指数积的运动学标定方法介绍&#xff1a; 机器人运动学标定&#xff1a;基于指数积的串联机构运动学标定 机器人运…

Vue表单修饰符:v-model.lazy、v-model.number、v-model.trim

表单修饰符有&#xff1a;lazy、number、trim&#xff1b;修饰符加在v-model后面&#xff1b; lazy修饰符&#xff1a; v-model的作用是双向绑定表单&#xff0c;能获取到input输入框的值&#xff0c;而且是实时获取的&#xff0c;就是当你输入框里的值发生改变就会获取到&…

【Shell 脚本速成】02、Shell 变量详解

目录 一、变量介绍 变量存取原理 二、变量定义 2.1 什么时候需要定义变量&#xff1f; 2.2 定义一个变量 定义变量举例&#xff1a; 定义变量演示&#xff1a; 2.3 取消变量 unset 2.4 有类型变量 declare declare 命令参数&#xff1a; 案例演示&#xff1a; 三…

向前迈进!走入GC世界:G1 GC原理深入解析

第零章&#xff1a;名词解释 mutator&#xff1a;应用线程 STW&#xff1a;Stop-The-World&#xff0c;指除了GC线程&#xff0c;其它所有线程全部暂停的一段时间 并发&#xff1a;指代GC线程与mutator在同一时刻执行任务 并行&#xff1a;指代多个GC线程在同一时刻执行任务…

一站式元数据治理平台——Datahub

一站式元数据治理平台——Datahub万字保姆级长文——Linkedin元数据管理平台Datahub离线安装指南 - 独孤风 - 博客园 (cnblogs.com)企业级数据治理工作怎么开展&#xff1f;Datahub这样做 - 独孤风 - 博客园 (cnblogs.com)【DataHub】 现代数据栈的元数据平台–如何与spark集成…

如何设计金融机构多场景关键应用下的存储架构

【摘要】银行、保险等金融机构存在多场景下的关键应用,如何选择适合各场景下的存储,如何设计适合业务的存储架构,显得尤为重要。本文从当前主流存储架构分析入手,提出金融机构业务场景分析与架构选型思路,以Glusterfs为例,分享如何根据业务场景的特点,有针对性的选取适合…

SQL优化

文章目录提升group by的效率分页查询优化覆盖索引子查询起始位置重定义检查 where,order by,group by后面的列尽量使用 varchar 代替 char。&#xff08;SQL 性能优化&#xff09;如果修改 / 更新数据过多&#xff0c;考虑批量进行提升group by的效率 select user_id,user_nam…