rabbitmq第二课-RabbitMQ核心编程模型以及消息应用场景详解

news2025/1/17 6:13:14

一、回顾RabbitMQ基础概念

在这里插入图片描述

二、RabbitMQ基础编程模型

使用RabbitMQ提供的原生客户端API进行交互。这是使用RabbitMQ的基础。

1.1、maven依赖

<dependency>
	<groupId>com.rabbitmq</groupId>
	<artifactId>amqp-client</artifactId>
	<version>5.9.0</version>
</dependency>

1.2、基础编程模型

1.首先创建连接,获取Channel

	private static final String HOST_NAME="192.168.56.10";
	private static final int HOST_PORT=5672;
	ConnectionFactory factory = new ConnectionFactory();
			factory.setHost(HOST_NAME);
			factory.setPort(HOST_PORT);
			factory.setUsername("guest");
			factory.setPassword("guest");
			factory.setVirtualHost("/");
			connection = factory.newConnection();

2.声明Exchange-可选

channel.exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete,Map<String, Object> arguments) throws IOException

3、声明queue

channel.queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments);

4、声明Exchange与Queue的绑定关系-可选

channel.queueBind(String queue, String exchange, String routingKey) throws IOException;

总结:说白了,就是声明一个交换机和队列,然后进行绑定,至于Channel和连接connection实际是一种物理概念,连接rabbitmq使用的,
当然Channel也可以声明多个,但是需要注意命名!

  • 在创建channel时,可以在createChannel方法中传入一个分配的int参数channelNumber。这个ChannelNumber就会作为Channel的唯一标识。而RabbitMQ防止ChannelNumber重复的方式是:如果对应的Channel没有创建过,就会创建一个新的Channel。但是如果ChannelNumber已经创建过一个Channel了,这时就会返回一个null。

5、Producer根据应用场景发送消息到queue

channel.basicPublish(String exchange, String routingKey, BasicProperties props,message.getBytes("UTF-8")) ;

6、Consumer消费消息

  • 1、被动消费模式

    Consumer等待rabbitMQ 服务器将message推送过来再消费。一般是启一个一直挂起的线程来等待。

channel.basicConsume(String queue, boolean autoAck, Consumer callback);
  • 另一种是主动消费模式
    Comsumer主动到rabbitMQ服务器上去拉取messge进行消费。
GetResponse response = channel.basicGet(QUEUE_NAME, boolean autoAck)

其中需要注意点的是autoAck。autoAck为true则表示消息被Consumer消费成功后,后续就无法再消费了。而如果autoAck设置为false,就需要在处理过程中手动去调用channel的basicAck方法进行应答。如果
不应答的话,这个消息同样会继续被Consumer重复处理。所以这里要注意,如果消费者一直不对消息进行应答,那么消息就会不断的发起重试,这就会不断的消耗系统资源,最终造成服务宕机。

7、完成以后关闭连接,释放资源

channel.close();
conection.clouse();

三、RabbitMQ常用的消息场景

在这里插入图片描述
在这里插入图片描述

  1. hello world体验
    在这里插入图片描述
    最直接的方式,P端发送一个消息到一个指定的queue,中间不需要任何exchange规则。C端按queue方式进行消费
//生产者
public static void main(String[] args) throws Exception {
		Connection connection = RabbitMQUtil.getConnection();
		Channel channel = connection.createChannel();
		//声明队列会在服务端自动创建。
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		String message = "Hello World!333";

		AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
		builder.deliveryMode(MessageProperties.PERSISTENT_TEXT_PLAIN.getDeliveryMode());
		builder.priority(MessageProperties.PERSISTENT_TEXT_PLAIN.getPriority());
		//携带消息ID
		builder.messageId(""+channel.getNextPublishSeqNo());
		Map<String, Object> headers = new HashMap<>();
		//携带订单号
		headers.put("order", "123");
		builder.headers(headers);

		channel.basicPublish("", QUEUE_NAME, builder.build(), message.getBytes("UTF-8"));
		System.out.println(" [x] Sent '" + message + "'");
		
		channel.close();
		connection.close();
	}
//消费者
public static void main(String[] args) throws Exception {
		Connection connection = RabbitMQUtil.getConnection();
		Channel channel = connection.createChannel();
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
		//Consumer接口还一个实现QueueConsuemr 但是代码注释过期了。
		Consumer myconsumer = new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope,
					BasicProperties properties, byte[] body)
					throws IOException {
				 System.out.println("========================");
				 String routingKey = envelope.getRoutingKey();
				 System.out.println("routingKey >"+routingKey);
				 String contentType = properties.getContentType();
				 System.out.println("contentType >"+contentType);
				 long deliveryTag = envelope.getDeliveryTag();
				 System.out.println("deliveryTag >"+deliveryTag);
				 System.out.println("content:"+new String(body,"UTF-8"));
				System.out.println("messageId:"+properties.getMessageId());
				properties.getHeaders().forEach((key,value)-> System.out.println("key: "+key +"; value: "+value));
				// (process the message components here ...)
				 //消息处理完后,进行答复。答复过的消息,服务器就不会再次转发。
				 //没有答复过的消息,服务器会一直不停转发。
				 channel.basicAck(deliveryTag, false);
			}
		};
		
		channel.basicConsume(QUEUE_NAME, false, myconsumer);
	}
  1. Work queues 工作序列
    在这里插入图片描述

工作任务模式,领导部署一个任务,由下面的一个员工来处理,Producer消息发送给queue,多个Consumer同时往队列上消费消息。

/**
	 * 发布一个task,交由多个Worker去处理。 每个task只要由一个Worker完成就行。
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		Connection connection = RabbitMQUtil.getConnection();
		Channel channel = connection.createChannel();
		channel.queueDeclare(RabbitMQUtil.QUEUE_WORK, true, false, false, null);
		for (int i = 0;i<5;i++){
			String message = "task 1";
			channel.basicPublish("", RabbitMQUtil.QUEUE_WORK,
					MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

		}

		channel.close();
		connection.close();
	}
//消费者1
public static void main(String[] args) throws Exception{
		Connection connection = RabbitMQUtil.getConnection();
		Channel channel = connection.createChannel();
		//这个任务场景一般任务不能因为rabbitmq崩溃而消失,所以把第二个是否持久化设置成true。
		//这样,即使rabbitmq服务重启,任务不会丢失
		channel.queueDeclare(RabbitMQUtil.QUEUE_WORK, true, false, false, null);
		//每个worker同时最多只处理一个消息
		channel.basicQos(1);
		Consumer myconsumer = new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope,
					BasicProperties properties, byte[] body)
					throws IOException {
				 System.out.println("========================");
				 String routingKey = envelope.getRoutingKey();
				 System.out.println("routingKey >"+routingKey);
				 String contentType = properties.getContentType();
				 System.out.println("contentType >"+contentType);
				 long deliveryTag = envelope.getDeliveryTag();
				 System.out.println("deliveryTag >"+deliveryTag);
				 System.out.println("content:"+new String(body,"UTF-8"));
				 // (process the message components here ...)
				 channel.basicAck(deliveryTag, false);
				}
			};
			channel.basicConsume(RabbitMQUtil.QUEUE_WORK, myconsumer);
	}
//消费者2
public static void main(String[] args) throws Exception{
		Connection connection = RabbitMQUtil.getConnection();
		Channel channel = connection.createChannel();
		//这个任务场景一般任务不能因为rabbitmq崩溃而消失,所以把第二个是否持久化设置成true。
		//这样,即使rabbitmq服务重启,任务不会丢失
		channel.queueDeclare(RabbitMQUtil.QUEUE_WORK, true, false, false, null);
		//每个worker同时最多只处理一个消息
		channel.basicQos(1);
		Consumer myconsumer = new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope,
					BasicProperties properties, byte[] body)
					throws IOException {
				 System.out.println("========================");
				 String routingKey = envelope.getRoutingKey();
				 System.out.println("routingKey >"+routingKey);
				 String contentType = properties.getContentType();
				 System.out.println("contentType >"+contentType);
				 long deliveryTag = envelope.getDeliveryTag();
				 System.out.println("deliveryTag >"+deliveryTag);
				 System.out.println("content:"+new String(body,"UTF-8"));
				 // (process the message components here ...)
				 channel.basicAck(deliveryTag, false);
				}
			};
			channel.basicConsume(RabbitMQUtil.QUEUE_WORK, myconsumer);
	}

在这里插入图片描述
在这里插入图片描述
这个模式应该是最常用的模式,也是官网讨论比较详细的一种模式,所以官网上也对这种模式做了重点讲述。

  1. 首先。Consumer端的autoAck字段设置的是false,这表示consumer在接收到消息后不会自动反馈服务器已消费了message,而要改在对message处理完成了之后,再调用channel.basicAck来通知服务器已经消费了该message.这样即使Consumer在执行message过程中出问题了,也不会造成message被忽略,因为没有ack的message会被服务器重新进行投递。
    但是,这其中也要注意一个很常见的BUG,就是如果所有的consumer都忘记调用basicAck()了,就会造成message被不停的分发,也就造成不断的消耗系统资源。这也就是 Poison Message(毒消息)
  2. 其次,官方特意提到的message的持久性。关键的message不能因为服务出现问题而被忽略。还要注意,官方特意提到,所有的queue是不能被多次定义的。如果一个queue在开始时被声明为durable,那在后面再次声明这个queue时,即使声明为 not durable,那这个queue的结果也还是durable的。
  3. 然后,是中间件最为关键的分发方式。这里,RabbitMQ默认是采用的fair dispatch,也叫round-robin模式,就是把消息轮询,在所有consumer中轮流发送。这种方式,没有考虑消息处理的复杂度以及consumer的处理能力。而他们改进后的方案,是consumer可以向服务器声明一个prefetchCount,我把他叫做预处理能力值。channel.basicQos(prefetchCount);表示当前这个consumer可以同时处理几个message。这样服务器在进行消息发送前,会检查这个consumer当前正在处理中的message(message已经发送,但是未收到consumer的basicAck)有几个,如果超过了这个consumer节点的能力值,就不再往这个consumer发布。

3.Publish/Subscribe 订阅 发布 机制

在这里插入图片描述

这个机制是对上面的一种补充。也就是把preducer与Consumer进行进一步的解耦。producer只负责发送消息,至于消息进入哪个queue,由exchange来分配。如上图,就是把producer发送的消息,交由exchange同时发送到两个queue里,然后由不同的Consumer去进行消费。

/**
	 * exchange有四种类型, fanout topic headers direct
	 * fanout类型的exchange会往其上绑定的所有queue转发消息。
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception{
		Connection connection = RabbitMQUtil.getConnection();
		Channel channel = connection.createChannel();
		//发送者只管往exchange里发消息,而不用关心具体发到哪些queue里。
		channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
		String message = "LOG INFO 222";
		channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
		
		channel.close();
		connection.close();
		
		
	}
public static void main(String[] args) throws Exception {
		Connection connection = RabbitMQUtil.getConnection();
		Channel channel = connection.createChannel();
		
	    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
	    String queueName = channel.queueDeclare().getQueue();
		System.out.println(queueName);
	    channel.queueBind(queueName, EXCHANGE_NAME, "");

		Consumer myconsumer = new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope,
					BasicProperties properties, byte[] body)
					throws IOException {
				 System.out.println("========================");
				 String routingKey = envelope.getRoutingKey();
				 System.out.println("routingKey >"+routingKey);
				 String contentType = properties.getContentType();
				 System.out.println("contentType >"+contentType);
				 long deliveryTag = envelope.getDeliveryTag();
				 System.out.println("deliveryTag >"+deliveryTag);
				 System.out.println("content:"+new String(body,"UTF-8"));
				 // (process the message components here ...)
				 //消息处理完后,进行答复。答复过的消息,服务器就不会再次转发。
				 //没有答复过的消息,服务器会一直不停转发。
//				 channel.basicAck(deliveryTag, false);
			}
		};
		
		channel.basicConsume(queueName,true, myconsumer);
		
//		channel.close();
//		connection.close();
	}

关键处就是type为”fanout” 的exchange,这种类型的exchange只负责往所有已绑定的队列上发送消息。

  1. Routing 基于内容的路由
    type为”direct” 的exchange
    在这里插入图片描述
    这种模式一看图就清晰了。 在上一章 exchange 往所有队列发送消息的基础上,增加一个路由配置,指定exchange如何将不同类别的消息分发到不同的queue上。
/**
	 * exchange有四种类型, fanout topic headers direct
	 * direct类型的exchange会根据routingkey,将消息转发到该exchange上绑定了该routingkey的所有queue
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception{
		Connection connection = RabbitMQUtil.getConnection();
		Channel channel = connection.createChannel();
		//发送者只管往exchange里发消息,而不用关心具体发到哪些queue里。
		channel.exchangeDeclare(EXCHANGE_NAME, "direct");
		String message = "LOG INFO 44444";
		channel.basicPublish(EXCHANGE_NAME, "info", null, message.getBytes());
		channel.basicPublish(EXCHANGE_NAME, "debug", null, message.getBytes());
		channel.basicPublish(EXCHANGE_NAME, "warn", null, message.getBytes());

		channel.close();
		connection.close();
		
		
	}
 public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//        String queueName = channel.queueDeclare().getQueue();
        String queueName="direct_queue";
        channel.queueDeclare(queueName,false,false,false,null);

//        channel.queueBind(queueName, EXCHANGE_NAME, "info");
//        channel.queueBind(queueName, EXCHANGE_NAME, "debug");
        channel.queueBind(queueName, EXCHANGE_NAME, "warn");
        Consumer myconsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       BasicProperties properties, byte[] body)
                    throws IOException {
                System.out.println("========================");
                String routingKey = envelope.getRoutingKey();
                System.out.println("routingKey >" + routingKey);
                String contentType = properties.getContentType();
                System.out.println("contentType >" + contentType);
                long deliveryTag = envelope.getDeliveryTag();
                System.out.println("deliveryTag >" + deliveryTag);
                System.out.println("content:" + new String(body, "UTF-8"));
                // (process the message components here ...)
                //消息处理完后,进行答复。答复过的消息,服务器就不会再次转发。
                //没有答复过的消息,服务器会一直不停转发。
//				 channel.basicAck(deliveryTag, false);
            }
        };
        channel.basicConsume(queueName, false, myconsumer);
    }

5:Topics 基于话题的路由
type为"topic" 的exchange
在这里插入图片描述
这个模式也就在上一个模式的基础上,对routingKey进行了模糊匹配
单词之间用,隔开,* 代表一个具体的单词。# 代表0个或多个单词。

/**
	 * exchange有四种类型, fanout topic headers direct
	 * topic类型的exchange在根据routingkey转发消息时,可以对rouytingkey做一定的规则,比如anonymous.info可以被*.info匹配到。
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception{
		Connection connection = RabbitMQUtil.getConnection();
		Channel channel = connection.createChannel();
		//发送者只管往exchange里发消息,而不用关心具体发到哪些queue里。
		channel.exchangeDeclare(EXCHANGE_NAME, "topic");
		String message = "LOG INFO";
		channel.basicPublish(EXCHANGE_NAME, "anonymous.info", null, message.getBytes());
		channel.basicPublish(EXCHANGE_NAME, "tuling.loulan.debug", null, message.getBytes());

		channel.close();
		connection.close();
	}
public static void main(String[] args) throws Exception {
		Connection connection = RabbitMQUtil.getConnection();
		Channel channel = connection.createChannel();
		
	    channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
	    String queueName = channel.queueDeclare().getQueue();
	    //topic的routingkey,*代表一个具体的单词,#代表0个或多个单词。
	    channel.queueBind(queueName, EXCHANGE_NAME, "*.info");
	    channel.queueBind(queueName, EXCHANGE_NAME, "#.debug");
	    
		Consumer myconsumer = new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope,
					BasicProperties properties, byte[] body)
					throws IOException {
				 System.out.println("========================");
				 String routingKey = envelope.getRoutingKey();
				 System.out.println("routingKey >"+routingKey);
				 String contentType = properties.getContentType();
				 System.out.println("contentType >"+contentType);
				 long deliveryTag = envelope.getDeliveryTag();
				 System.out.println("deliveryTag >"+deliveryTag);
				 System.out.println("content:"+new String(body,"UTF-8"));
				 // (process the message components here ...)
				 //消息处理完后,进行答复。答复过的消息,服务器就不会再次转发。
				 //没有答复过的消息,服务器会一直不停转发。
//				 channel.basicAck(deliveryTag, false);
			}
		};
		channel.basicConsume(queueName,true, myconsumer);
	}

6:Publisher Confirms 发送者消息确认
RabbitMQ的消息可靠性是非常高的,但是他以往的机制都是保证消息发送到了MQ之后,可以推送到消费者消费,不会丢失消息。但是发送者发送消息是否成功是没有保证的
Producer.basicPublish方法是没有返回值的,也就是说,一次发送消息是否成功,应用是不知道的,这在业务上就容易造成消息丢失。而这个模块就是通过给发送者提供一些确认机制,来保证这个消息发送的过程是成功的。

发送者确认模式默认是不开启的,所以如果需要开启发送者确认模式,需要手动在channel中进行声明。

channel.confirmSelect();
  1. 发布单条消息
    即发布一条消息就确认一条消息。核心代码:
for (int i = 0; i < MESSAGE_COUNT; i++) {
String body = String.valueOf(i);
channel.basicPublish("", queue, null, body.getBytes());
channel.waitForConfirmsOrDie(5_000);
}

channel.waitForConfirmsOrDie(5_000);这个方法就会在channel端等待RabbitMQ给出一个响应,用来表明这个消息已经正确发送到了RabbitMQ服务端。但是要注意,这个方法会同步阻塞channel,在等待确认期间,channel将不能再继续发送消息,也就是说会明显降低集群的发送速度即吞吐量。

  1. 发送批量消息
    之前单条确认的机制会对系统的吞吐量造成很大的影响,所以稍微中和一点的方式就是发送一批消息后,再一起确认。
int batchSize = 100;
int outstandingMessageCount = 0;
long start = System.nanoTime();
for (int i = 0; i < MESSAGE_COUNT; i++) {
		String body = String.valueOf(i);
		ch.basicPublish("", queue, null, body.getBytes());
		outstandingMessageCount++;
	if (outstandingMessageCount == batchSize) {
		ch.waitForConfirmsOrDie(5_000);
		outstandingMessageCount = 0;
	}
}
if (outstandingMessageCount > 0) {
	ch.waitForConfirmsOrDie(5_000);
}

这种方式可以稍微缓解下发送者确认模式对吞吐量的影响。但是也有个固有的问题就是,当确认出现异常时,发送者只能知道是这一批消息出问题了, 而无法确认具体是哪一条消息出了问题。所以接下来就需要增
加一个机制能够具体对每一条发送出错的消息进行处理

  1. 异步确认消息
    实现的方式也比较简单,Producer在channel中注册监听器来对消息进行确认。核心代码就是一个:
channel.addConfirmListener(ConfirmCallback var1, ConfirmCallback var2)

按说监听只要注册一个就可以了,那为什么这里要注册两个呢?成功一个,失败一个然后关于这个ConfirmCallback,这是个监听器接口,里面只有一个方法: void handle(long sequenceNumber, boolean multiple) throws IOException; 这方法中的两个参数,

  • sequenceNumer:这个是一个唯一的序列号,代表一个唯一的消息。在RabbitMQ中,他的消息体只是一个二进制数组,默认消息是没有序列号的。那么在回调的时候,Producer怎么知道是哪一条消息成功或者失败呢?RabbitMQ提供了一个方法
    int sequenceNumber =channel.getNextPublishSeqNo();
    来生成一个全局递增的序列号,这个序列号将会分配给新发送的那一条消息。然后应用程序需要自己来将这个序列号与消息对应起来。
  • multiple:这个是一个Boolean型的参数。如果是false,就表示这一次只确认了当前一条消息。如果是true,就表示RabbitMQ这一次确认了一批消息,在sequenceNumber之前的所有消息都已经确认完成了。
    这三种确认机制都能够提升Producer发送消息的安全性。通常情况下,第三种异步确认机制的性能是最好的。

7、Headers 头部路由机制
在官网的体验示例中,还有一种路由策略并没有提及,那就是Headers路由。其实官网之所以没有过多介绍,就是因为这种策略在实际中用得比较少,但是在某些比较特殊的业务场景,还是挺好用的。

/**
	 * exchange有四种类型, fanout topic headers direct
	 * headers用得比较少,他是根据头信息来判断转发路由规则。头信息可以理解为一个Map
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception{
		// header模式不需要routingKey来转发,他是根据header里的信息来转发的。比如消费者可以只订阅logLevel=info的消息。
		// 然而,消息发送的API还是需要一个routingKey。 
		// 如果使用header模式来转发消息,routingKey可以用来存放其他的业务消息,客户端接收时依然能接收到这个routingKey消息。
		String routingKey = "ourTestRoutingKey";
		// The map for the headers.
		Map<String, Object> headers = new HashMap<>();
		headers.put("loglevel", "info");
		headers.put("buslevel", "product");
		headers.put("syslevel", "admin");
		
		String message = "LOG INFO asdfasdf";
		
		Connection connection = RabbitMQUtil.getConnection();
		Channel channel = connection.createChannel();
		//发送者只管往exchange里发消息,而不用关心具体发到哪些queue里。
		channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS);
		
		AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); 
		builder.deliveryMode(MessageProperties.PERSISTENT_TEXT_PLAIN.getDeliveryMode());
		builder.priority(MessageProperties.PERSISTENT_TEXT_PLAIN.getPriority());
		builder.headers(headers);

		channel.basicPublish(EXCHANGE_NAME, routingKey, builder.build(), message.getBytes("UTF-8"));

//		channel.txSelect();
//		channel.txCommit();
//		channel.txRollback();

		channel.close();
		connection.close();
	}
public static void main(String[] args) throws Exception {
		
		String routingKey= "ourTestRoutingKey";
		x-match:特定的参数。all表示必须全部匹配才算成功。
		//any表示只要匹配一个就算成功。
		Map<String, Object> headers = new HashMap<String, Object>();
		headers.put("x-match","any");
//		headers.put("loglevel", "info");
		headers.put("buslevel", "product");
//		headers.put("syslevel", "admin");
		
		Connection connection = RabbitMQUtil.getConnection();
		Channel channel = connection.createChannel();
		
	    channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS);
	    String queueName = channel.queueDeclare("ReceiverHeader",true,false,false,null).getQueue();
	    
	    channel.queueBind(queueName, EXCHANGE_NAME,routingKey,headers);
	    
		Consumer myconsumer = new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope,
					BasicProperties properties, byte[] body)
					throws IOException {
				 System.out.println("========================");
				 String routingKey = envelope.getRoutingKey();
				 System.out.println("routingKey >"+routingKey);
				 String contentType = properties.getContentType();
				 System.out.println("contentType >"+contentType);
				 long deliveryTag = envelope.getDeliveryTag();
				 System.out.println("deliveryTag >"+deliveryTag);
				Map<String, Object> headerInfo = properties.getHeaders();
				headerInfo.forEach((key,value)-> System.out.println("header key: "+key+"; value: "+value));
				System.out.println("content:"+new String(body,"UTF-8"));
				 // (process the message components here ...)
				 //消息处理完后,进行答复。答复过的消息,服务器就不会再次转发。
				 //没有答复过的消息,服务器会一直不停转发。
				 channel.basicAck(deliveryTag, false);
			}
		};
		
		String consumerTag = channel.basicConsume(queueName,true, myconsumer);
		System.out.println("consumerTag > "+consumerTag);
	}

四、SpringBoot集成RabbitMQ

SpringBoot官方就集成了RabbitMQ,所以RabbitMQ与SpringBoot的集成是非常简单的。不过,SpringBoot集成RabbitMQ的方式是按照Spring的一套统一的MQ模型创建的,因此SpringBoot集成插件中对于生产者、消息、消费者等重要的对象模型,与RabbitMQ原生的各个组件有对应关系,但是并不完全相同。

  1. 引入依赖
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  1. 配置关键参数
server.port=8080
spring.rabbitmq.host=192.168.56.10
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/

# 单词推送消息数量
spring.rabbitmq.listener.simple.prefetch=1
# 消费者的消费线程数量
spring.rabbitmq.listener.simple.concurrency=5
# 消费者的最大线程数量
spring.rabbitmq.listener.simple.max-concurrency=10

spring.rabbitmq.listener.simple.acknowledge-mode=none
# 2.6以上版本SpringBoot集成Swagger2要修改配置。坑爹
spring.mvc.pathmatch.matching-strategy=ant_path_matcher

3: 声明Exchange,Queue和Binding
所有的exchange, queue, binding的配置,都需要以对象的方式声明。默认情况下,这些业务对象一经声明,应用就会自动到RabbitMQ上常见对应的业务对象。但是也是可以配置成绑定已有业务对象的。

/**
 * 直连模式只需要声明队列,所有消息都通过队列转发。
 * @author roykingw
 */
@Configuration
public class DirectConfig {

	@Bean
	public Queue directQueue() {
		return new Queue(MyConstants.QUEUE_DIRECT);
	}
}


  1. 使用RabbitmqTemplate对象发送消息
@ApiOperation(value="direct发送接口",notes="直接发送到队列。task模式")
	@GetMapping(value="/directSend")
	public Object directSend(String message) throws AmqpException, UnsupportedEncodingException {
		//设置部分请求参数
		MessageProperties messageProperties = new MessageProperties();
		messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
		messageProperties.setPriority(2);
		//设置消息转换器,如json
		rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
		//将对象转换成json再发送。
//		rabbitTemplate.convertandsend("",Object);
		//发消息
		rabbitTemplate.send("directqueue",new Message(message.getBytes("UTF-8"),messageProperties));
		return "message sended : "+message;
	}
  1. 使用@RabbitListener注解声明消费者
    消费者都是通过@RabbitListener注解来声明。在@RabbitMQListener注解中包含了非常多对Queue进行定制的属性,大部分的属性都是有默认值的。例如ackMode默认是null,就表示自动应答。在日常开发过程中,通常都会简化业务模型,让消费者只要绑定队列消费即可。
@RabbitListener(queues=MyConstants.QUEUE_DIRECT)
	public void directReceive2(String message) {
		System.out.println("consumer2 received message : " +message);
	}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/678378.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C语言:使用指针使字符串逆序

题目&#xff1a; 链接&#xff1a;字符逆序__牛客网 来源&#xff1a;牛客网 将一个字符串str的内容颠倒过来&#xff0c;并输出。 输入描述: 输入一个字符串&#xff0c;可以有空格 输出描述: 输出逆序的字符串 示例1 输入 I am a student 输出 tneduts a ma I …

VUE 2X 生命周期 ⑩①

目录 文章有误请指正&#xff0c;如果觉得对你有用&#xff0c;请点三连一波&#xff0c;蟹蟹支持✨ V u e j s Vuejs Vuejs初见生面周期分析生命周期生命周期总结总结 文章有误请指正&#xff0c;如果觉得对你有用&#xff0c;请点三连一波&#xff0c;蟹蟹支持✨ ⡖⠒⠒⠒⠤⢄…

《算法设计与分析》学习笔记

目录 算法基本概念 算法的定义 算法复杂度分析 渐近记号 ①渐近上界记号O ②渐近下界记号Ω ③渐近紧确界记号 Θ ④非渐近紧确上界记号o ⑤非渐近紧确下界记号ω 渐进记号极限定义 分治 分治步骤 递归树 ​编辑代入法 主方法 改变变量 二叉树 堆 建堆 堆排…

【promptulate专栏】使用GPT和XMind快速构建思维导图

本文节选自笔者博客&#xff1a;https://www.blog.zeeland.cn/archives/ao302950h3j &#x1f496; 作者简介&#xff1a;大家好&#xff0c;我是Zeeland&#xff0c;全栈领域优质创作者。&#x1f4dd; CSDN主页&#xff1a;Zeeland&#x1f525;&#x1f4e3; 我的博客&#…

【Java】缓存常见问题及解决方式

文章目录 一、缓存常见问题二、数据不一致2.1、一致性问题2.2、解决方案 三、缓存穿透3.1、问题3.2、解决方案布隆过滤器使用布隆过滤器解决缓存穿透 四、缓存击穿4.1、问题4.2、解决方案 五、缓存雪崩5.1、问题5.2、解决方案 六、大key及热点key6.1、问题6.2、解决方案大key热…

【Leetcode刷题】字符串匹配

本篇文章为LeetCode 字符串匹配模块的刷题笔记&#xff0c;仅供参考。 目录 Leetcode28.找出字符串中第一个匹配项的下标Leetcode214.最短回文串Leetcode459.重复的子字符串Leetcode686.重复叠加字符串匹配Leetcode1023.驼峰式匹配Leetcode1392.最长快乐前缀Leetcode1668.最大重…

【SpringBoot】一、SpringBoot3改变新特性

前言 本文适合具有springboot的基础的同学。 SpringBoot3改变&新特性 一、前置条件二、自动配置包位置变化1、Springboot2.X2、Springboot3.X 三、jakata api迁移1、Springboot2.X2、Springboot3.X3、SpringBoot3使用druid有问题&#xff0c;因为它引用的是旧的包 四 新特…

App Crawler

Google官方出了一款App遍历工具App Crawler。 文档&#xff1a;应用抓取工具 | Android 开发者 | Android Developers App Crawler工具是Android Jetpack的一部分&#xff0c;它可自动的运行你的App&#xff0c;不需要编写或维护任何代码。 通过App Crawler运行App&…

实训四:索引与视图 - 索引(teachingdb数据库)

索引与数据库完整性 第1关&#xff1a;索引任务描述相关知识索引是什么索引的分类索引的创建和删除查询表中索引 编程要求参考代码 第2关&#xff1a;删除索引-练习任务描述相关知识编程要求测试说明参考代码 第1关&#xff1a;索引 任务描述 本关任务&#xff1a;为 student…

【Leetcode60天带刷】day21二叉树——530.二叉搜索树的最小绝对差 ,501.二叉搜索树中的众数 ,236. 二叉树的最近公共祖先

题目&#xff1a; 530. 二叉搜索树的最小绝对差 给你一个二叉搜索树的根节点 root &#xff0c;返回 树中任意两不同节点值之间的最小差值 。 差值是一个正数&#xff0c;其数值等于两值之差的绝对值。 示例 1&#xff1a; 输入&#xff1a;root [4,2,6,1,3] 输出&#xff1…

chatgpt赋能python:Python与电影评分

Python与电影评分 近年来&#xff0c;越来越多的人选择通过网络来观看电影。然而&#xff0c;在选择一部电影时&#xff0c;看到的只是电影名称和海报。这时就需要借助电影评分来给自己做出更明智的选择。Python作为一门流行的编程语言&#xff0c;它的应用程序提供了许多有用…

图形视图体系结构(Graphics View)

Graphics View框架结构的主要特点 Graphics View框架结构的主要特点如下。 &#xff08;1&#xff09;在Graphics View框架结构中&#xff0c;系统可以利用Qt绘图系统的反锯齿、OpenGL工具来改善绘图性能。 &#xff08;2&#xff09;Graphics View支持事件传播体系结构&…

利用Charles进行Mock测试

一、Charles介绍 Charles是一款用Java编写的代理软件&#xff0c;电脑或者手机访问网站首先会访问到Charles代理工具上&#xff0c;由代理工具再把访问数据转发到相应的网站上&#xff0c;所以可以很好的通过设置Charles&#xff0c;对接口的请求和响应进行加工处理。 …

【Linux】Linux权限的概念、Linux权限管理、文件类型和访问权限的设置、粘滞位介绍

文章目录 1.Linux权限的概念2.Linux权限管理2.1文件访问者的分类2.2文件类型的访问权限2.3文件权限值的表示方法2.4文件访问权限的相关设置方法 3.目录的权限4.粘滞位 1.Linux权限的概念 在生活中&#xff0c;一件事情是否允许被一个人做&#xff0c;就是叫做权限&#xff0c;权…

【Leetcode60天带刷】day32回溯算法——122.买卖股票的最佳时机II ,55. 跳跃游戏 ,45.跳跃游戏II

​ 题目&#xff1a; 122. 买卖股票的最佳时机 II 给你一个整数数组 prices &#xff0c;其中 prices[i] 表示某支股票第 i 天的价格。 在每一天&#xff0c;你可以决定是否购买和/或出售股票。你在任何时候 最多 只能持有 一股 股票。你也可以先购买&#xff0c;然后在 同一…

MYSQL数据库应用中的17个关键问题

一、单Master 单Master的情况是普遍存在的&#xff0c;对于很多个人站点、初创公司、小型内部系统&#xff0c;考虑到成本、更新频率、系统重要性等问题&#xff0c;系统只依赖一个单例数据库提供服务&#xff0c;基本上已经满足需求。这种场景下我觉得重点应该关注的话题有上图…

图像预处理 Tricks【1】:Contours

系列文章目录 文章目录 系列文章目录前言1. cv2.findContours()1.1. 方法概述1.2. cv2.findContours()1.2.1. 轮廓检索模式1.2.2. 轮廓逼近方法 2. cv2.drawContours()2.1. 方法概述2.2. cv2.drawContours() 3. cv2.contourArea()3.1. 方法概述3.2. cv2.contourArea()3.3. 存在…

java springboot整合MyBatis联合查询

前面文章 java springboot整合MyBatis做数据库查询操作写了springboot整合MyBatis的方法 并演示了基础查询的语法 根据id查 那么 我们这次来演示联合查询 我们staff 表 内容如下 每条数据 对应的都有一个departmentid 这是 department部门表的外键id department表内容如下 如…

Redis 分布式缓存

分布式缓存 单点 Redis 的问题及解决 数据丢失&#xff1a;实现Redis数据持久化并发能力&#xff1a;搭建主从集群&#xff0c;实现读写分离存储能力&#xff1a;搭建分片集群&#xff0c;利用插槽机制实现动态扩容故障恢复能力&#xff1a;利用哨兵机制&#xff0c;实现健康…

Linux系统编程(进程基础知识讲解)

文章目录 前言一、进程的概念二、进程的生命周期三、进程树四、进程的创建五、一个进程可以执行几个程序&#xff1f;六、子进程中调用execve函数总结 前言 本篇文章来讲解Linux中的进程&#xff0c;进程在Linux中是非常重要的一个知识点&#xff0c;掌握好进程是非常重要的。…