快速入门
刚开始我们就一步一步来, 先搞什么spring集成。
先使用原始的java代码来操作一下MQ。
这里给新手兄弟的建议,这种技术性的学习 一定要先动手,从简单的地方动手,一步一步来,不然上来就搞理论或者复杂的应用很难坚持学下去。
在空项目中 创建2个 module 配置为生产者和消费者:
消息生产者
public static void main(String[] args) throws IOException, TimeoutException {
// 设定工厂参数
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("****");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/****");
connectionFactory.setUsername("****");
connectionFactory.setPassword("****");
//建立连接
Connection con = connectionFactory.newConnection();
//创建channel
Channel channel = con.createChannel();
//创建队列
channel.queueDeclare();
}
先把该配置的配置了。 然后创建队列, 这个创建队列方法有很多参数,因为这个方法很关键 所以我们要理解一下 这些参数:
/**
* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
*
* queue 队列名称
* durable 是否持久化, 当mq重启之后 是否还在
* exclusive 是否独占:只能有一个消费者监听这个队列。 当con关闭时,是否删除队列
* autoDelete 当这个队列没有消费者的时候 自动删除
* */
//如果没有这个队列 会自动创建,如果有就不会再创建了
channel.queueDeclare("hello_world",true,false,false,null);
创建好队列之后, 紧接着就是发送消息 publish这个方法 这个参数也很多 每个参数看一下。(MQ就是这样 它想各种定制化适配不同情况 所以刚开始接触 你就会发现 到处是参数)
/**
* basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body)
* exchange 交换机名称(简单模式下可以传null 他会给一个默认交换机)
* routingKey 路由名称
* props 配置信息
* body 真实发送的消息数据
* */
String body = "星际2是最好的游戏";
channel.basicPublish("","hello_world",null,body.getBytes());
最后close释放资源:
//释放资源
channel.close();
connectionFactory.clone();
ok 我们直接跑一下:
ok 你会发现已经有一个队列了。不难对吧 这就是最基本的 java MQ发送到服务器。
这里我们要明白几个要点:
- MQ的解耦性, 有了MQ这个中转站 你的消息发出去放在哪里, 你就不用管了, 你不需要再担心 接收方到底收到没有。
- 创建队列的方法 相同队列他就不会再创建了
- 一定记得这种最后要close释放资源
消息消费者
消费者同理 前面的建工厂 创建队列 都一样,
到了 channel.basicConsume()
参数这个地方 它传进来一个consumer 调用回调
所谓的回调就是 它站在这里等 等到有消息的时候 它再调用这个方法。
我们来调用handleDelivery 利用匿名内部类 重写 打印出来
Consumer consumer = new DefaultConsumer(channel){
/***
* 回调方法 当收到消息后 会自动执行该方法
* consumerTag 标识
* envelope 获取信息 交换机,路由key
* properties 配置的信息
* body 数据
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("1:"+consumerTag);
System.out.println("2.1:"+envelope.getExchange()+" "+"2.2"+envelope.getRoutingKey());
System.out.println("3:"+properties);
System.out.println("4:"+ new String(body));
}
};
/**
* basicConsume(String queue, boolean autoAck, Consumer callback)
* queue 队列名称
* autoAck 是否自动确认 消费者收到之后自动给mq说一声 我收到了 作为确认
* callback 回调对象 它会监听一些方法
* */
channel.basicConsume("hello_world",true,consumer);
注意消费者是不需要关close的,因为它处在一个监听状态:
1:amq.ctag-F7bTRdh6FCpw5Nr-Dks_ng
2.1: 2.2hello_world
3:#contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
4:星际2是最好的游戏
1:amq.ctag-F7bTRdh6FCpw5Nr-Dks_ng
2.1: 2.2hello_world
3:#contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
4:maru是最强的选手
1:amq.ctag-F7bTRdh6FCpw5Nr-Dks_ng
2.1: 2.2hello_world
3:#contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
4:P是最imba的种族
这个地方我们观察一下 首先第一个打出来的是
consumerTag:三条消息的consumerTag是相同
然后打出来的是
exchange 这个是空 因为我们生产者 发送就是空
然后是routingKey 路由名称 和我们生产者的参数相同
然后是一些配置信息
最后是实际的body数据
消息追踪
这里分享一个消息追踪插件: firehouse
在rabbitmq 管理台里面 我们可以看到这样一个 交换机:
是专门用于消息追踪的
开启追踪:
rabbitmqctl trace_on
当我们发送一条消息的时候,这个交换机也会发一条相关的更详细的消息
而且在这里我们可以讲追踪信息打到日志里面:
当然开发的同学 不需要太深究这个 这种监控一般都是运维的活
你主要知道有这么个东西 当你想要追踪消息日志的时候 知道去哪看就好了