Hello World
Hello World是官网给出的第一个模型,使用的交换机类型是直连direct,也是默认的交换机类型。
在上图的模型中,有以下概念:
- P:生产者,也就是要发送消息的程序
- C:消费者:消息的接受者,会一直等待消息到来。
- Queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。
此模型中只有一个生产者、一个队列、一个消费者。
这种模式适合于消息任务不是很密集,并且处理任务不算太过耗时的场景。消费者消费的速度赶得上生产者生产的速度。
创建生产者
public class MyProducer {
@Test
public void test() throws Exception {
// 队列名称
String queue = "xw_queue";
String message = "Hello World -> ";
// 创建工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setVirtualHost("/");
factory.setHost("xuewei.world");
factory.setUsername("xuewei");
factory.setPassword("123456");
factory.setPort(5672);
// 创建连接和通道
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
for (int i = 0; i < 10; i++) {
// 发布消息
channel.basicPublish("xw_exchange", queue, null, (message + i).getBytes());
}
}
}
创建消费者
public class MyConsumer {
public static void main(String[] args) throws Exception {
// 队列名称
String queue = "xw_queue";
// 创建工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setVirtualHost("/");
factory.setHost("xuewei.world");
factory.setUsername("xuewei");
factory.setPassword("123456");
factory.setPort(5672);
// 创建连接和通道
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(queue, true, false, false, null);
channel.queueBind("", "xw_exchange", queue);
channel.basicConsume(queue, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到消息: " + new String(body));
// TODO 业务处理
}
});
}
}