1. Message Queue
- Message: data transferred between applications
- Queue: first in, first out (FIFO)
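The first-in, first-out behavior can be seen with a plain in-memory queue. This is only a minimal sketch using `java.util.ArrayDeque`; the class name `FifoDemo` and the message names are made up for illustration and have nothing to do with RabbitMQ itself.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// Hypothetical helper class, for illustration only.
class FifoDemo {
    // Enqueue every message in order, then drain the queue and
    // return the order in which the messages come back out.
    static List<String> drainInOrder(List<String> messages) {
        Queue<String> queue = new ArrayDeque<>();
        for (String m : messages) {
            queue.offer(m);            // append at the tail
        }
        List<String> out = new ArrayList<>();
        while (!queue.isEmpty()) {
            out.add(queue.poll());     // remove from the head
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(drainInOrder(Arrays.asList("m1", "m2", "m3")));
    }
}
```

Messages leave the queue in the same order they entered it, which is the ordering a single RabbitMQ queue with a single consumer is normally expected to preserve.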
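1.2. Purpose
- Pros: decoupling, fault tolerance, peak shaving (absorbing traffic bursts)
- Cons: lower system availability (the broker is one more component that can fail), higher system complexity, consistency issues
RabbitMQ components: producer, consumer, queue, exchange.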
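Decoupling and peak shaving can be illustrated without a broker at all. The sketch below assumes an in-process `LinkedBlockingQueue` standing in for the message queue; `PeakShavingDemo` and `runBurst` are hypothetical names. The producer pushes a burst of messages without waiting, and the consumer works through them at its own pace.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class; the in-memory queue only stands in for a broker.
class PeakShavingDemo {
    // Publishes burstSize messages as fast as possible and returns
    // how many the slower consumer eventually processed.
    static int runBurst(final int burstSize) {
        final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        final int[] processed = {0};

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < burstSize; i++) {
                    queue.take();        // blocks until a message is available
                    Thread.sleep(1);     // simulate slow downstream work
                    processed[0]++;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        try {
            for (int i = 0; i < burstSize; i++) {
                queue.put(i);            // the producer never waits for the consumer
            }
            consumer.join();             // wait until the backlog is drained
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return processed[0];
    }

    public static void main(String[] args) {
        System.out.println("processed: " + runBurst(20));
    }
}
```

The producer finishes almost immediately while the backlog sits in the queue, which is the same effect a broker provides between services.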
2. Installing and deploying RabbitMQ
---
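apiVersion: v1
kind: Secret
metadata:
  name: rabbitmq-secret
  namespace: rabbitmq
data:
  # base64 of "admin" and "123456" without a trailing newline
  # (e.g. echo -n admin | base64); values ending in "K" / "Cg==" carry an extra "\n"
  username: YWRtaW4=
  password: MTIzNDU2
type: Opaque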
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: rabbitmq
  namespace: rabbitmq
  labels:
    app: rabbitmq
spec:
  replicas: 1
  selector:
    matchLabels:
      app: rabbitmq
  serviceName: rabbitmq-headless
  template:
    metadata:
      labels:
        app: rabbitmq
    spec:
      containers:
        - name: rabbitmq
          image: registry.cn-hangzhou.aliyuncs.com/yuanli123/rabbitmq:3.9.22-management
          ports:
            - name: tcp-5672
              containerPort: 5672
              protocol: TCP
            - name: tcp-15672
              containerPort: 15672
              protocol: TCP
          # Disabled: when the Secret values are base64-encoded with a trailing newline
          # (e.g. "echo admin | base64" without -n), RabbitMQ receives "admin\n"
          # and cannot recognize the user.
          # env:
          #   - name: RABBITMQ_DEFAULT_USER
          #     valueFrom:
          #       secretKeyRef:
          #         name: rabbitmq-secret
          #         key: username
          #   - name: RABBITMQ_DEFAULT_PASS
          #     valueFrom:
          #       secretKeyRef:
          #         name: rabbitmq-secret
          #         key: password
          resources:
            limits:
              cpu: '1'
              memory: '2Gi'
            requests:
              cpu: '200m'
              memory: '500Mi'
      imagePullSecrets:
        - name: regcred
---
apiVersion: v1
kind: Service
metadata:
  name: rabbitmq-headless
  namespace: rabbitmq
  labels:
    app: rabbitmq
spec:
  ports:
    - name: tcp-rabbitmq-5672
      port: 5672
      targetPort: 5672
      nodePort: 32672
  selector:
    app: rabbitmq
  type: NodePort
---
apiVersion: v1
kind: Service
metadata:
  name: rabbitmq-external
  namespace: rabbitmq
  labels:
    app: rabbitmq-external
spec:
  ports:
    - name: http-rabbitmq-external
      protocol: TCP
      port: 15672
      targetPort: 15672
  selector:
    app: rabbitmq
  type: ClusterIP
---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: rabbitmq-ingress
  namespace: rabbitmq
  annotations:
    nginx.ingress.kubernetes.io/rewrite-target: /
spec:
  ingressClassName: nginx
  rules:
    - host: rabbitmq.liyuan.com
      http:
        paths:
          - backend:
              service:
                name: rabbitmq-external
                port:
                  number: 15672
            pathType: Prefix
            path: /
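After applying the YAML above and adding an entry for rabbitmq.liyuan.com to /etc/hosts,
the management UI can be opened at http://rabbitmq.liyuan.com:30001/#/exchanges.
The AMQP port is also exposed at 192.168.31.175:32672 for sending messages.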
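The stray newline mentioned in the StatefulSet comment is the usual symptom of base64-encoding a value together with its line ending (for example `echo admin | base64` instead of `echo -n admin | base64`). The sketch below is one way to check a Secret value before applying it; `SecretNewlineCheck` is a hypothetical helper name and uses only `java.util.Base64`.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper for inspecting Kubernetes Secret values.
class SecretNewlineCheck {
    // Decode a base64 Secret value back to the text the container will see.
    static String decode(String b64) {
        return new String(Base64.getDecoder().decode(b64), StandardCharsets.UTF_8);
    }

    // Encode plain text exactly as given, without appending a newline.
    static String encode(String plain) {
        return Base64.getEncoder().encodeToString(plain.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        // "YWRtaW4K" is "admin" followed by a line feed.
        System.out.println(decode("YWRtaW4K").endsWith("\n")); // true
        System.out.println(encode("admin"));                   // YWRtaW4=
        System.out.println(encode("123456"));                  // MTIzNDU2
    }
}
```

If the decoded value ends with `\n`, re-encoding the trimmed text gives a value that can be used with `secretKeyRef` safely.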
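2.1. Terminology
- Broker: the application that receives and distributes messages
- Virtual Host: a virtual host. One Broker can contain multiple Virtual Hosts, and each Virtual Host has its own set of Exchanges and Queues
- Connection: the TCP connection between a producer/consumer and the Broker
- Channel: the channel over which messages are sent. A channel is a logical connection established inside a Connection. Every AMQP method carries a channel id so that the client and the message Broker can identify the channel, which avoids the cost of opening a separate TCP Connection each time
- Exchange: the first stop for a message arriving at the Broker. Following the rules of its type, it looks up the message's routing key in its binding table and dispatches the message to the matching queues. Common types: direct, topic, fanout (multicast)
- Queue: the queue that stores messages
- Binding: a virtual link between an Exchange and a Queue. A binding can include a routing key. Binding information is stored in the Exchange's lookup table and is the basis for distributing messages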
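How an Exchange uses its binding table can be sketched in a few lines of plain Java. This is a simplified model, not RabbitMQ's implementation: `ExchangeRoutingSketch`, `Binding`, and `route` are hypothetical names, and the topic rules assumed here are the documented AMQP ones (`*` matches exactly one dot-separated word, `#` matches zero or more words).

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, in-memory model of exchange routing (illustration only).
class ExchangeRoutingSketch {
    enum Type { DIRECT, FANOUT, TOPIC }

    // One row of the exchange's lookup table: target queue plus binding key.
    static final class Binding {
        final String queue;
        final String key;
        Binding(String queue, String key) { this.queue = queue; this.key = key; }
    }

    // Returns the names of the queues a message with routingKey is delivered to.
    static List<String> route(Type type, List<Binding> bindings, String routingKey) {
        List<String> targets = new ArrayList<>();
        for (Binding b : bindings) {
            boolean hit;
            switch (type) {
                case DIRECT: hit = b.key.equals(routingKey); break; // exact match
                case FANOUT: hit = true; break;                     // key is ignored
                default:     hit = topicMatches(b.key.split("\\."), 0,
                                                routingKey.split("\\."), 0);
            }
            if (hit && !targets.contains(b.queue)) {
                targets.add(b.queue);   // a queue receives the message at most once
            }
        }
        return targets;
    }

    // Word-by-word match of a topic pattern p against a routing key k.
    static boolean topicMatches(String[] p, int i, String[] k, int j) {
        if (i == p.length) return j == k.length;
        if (p[i].equals("#")) {
            // '#' may consume zero or more of the remaining words
            for (int n = j; n <= k.length; n++) {
                if (topicMatches(p, i + 1, k, n)) return true;
            }
            return false;
        }
        if (j == k.length) return false;
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return topicMatches(p, i + 1, k, j + 1);
        }
        return false;
    }
}
```

With bindings `q1 -> order.created`, `q2 -> order.*`, and `q3 -> #`, a direct exchange delivers `order.created` only to `q1`, a fanout exchange delivers it to all three queues, and a topic exchange delivers `order.created.eu` only to `q3`.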
3. Usage test
3.1. pom.xml
# pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>rqbbitmq-test</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.16.0</version>
        </dependency>
    </dependencies>
</project>
3.2. Producer
// Producer.java
package com.liyuan.rabbitmq;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.31.175");
        connectionFactory.setPort(32672); // NodePort mapped to AMQP port 5672
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");

        // Connection and Channel are AutoCloseable, so both are closed on exit.
        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {
            String exchangeName = "xc_exchange_name";
            String queueName = "xc_queue_name";

            // Durable direct exchange, not auto-deleted, no extra arguments.
            channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true, false, null);
            // Non-durable, non-exclusive queue that is not auto-deleted.
            channel.queueDeclare(queueName, false, false, false, null);
            // Bind the queue to the exchange, using the queue name as the routing key.
            channel.queueBind(queueName, exchangeName, queueName);

            String message = "Hello, my name is liyuan.";
            channel.basicPublish(exchangeName, queueName, null,
                    message.getBytes(StandardCharsets.UTF_8));
        }
    }
}
3.3. Consumer
// Consumer.java
package com.liyuan.rabbitmq;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.31.175");
        connectionFactory.setPort(32672); // NodePort mapped to AMQP port 5672
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");

        // Deliberately left open: basicConsume returns immediately and deliveries
        // arrive asynchronously, so closing the channel or connection here would
        // stop the consumer before it receives anything. Stop the program with Ctrl+C.
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        // Must match the queue declared by the producer; run the producer first.
        String queueName = "xc_queue_name";

        DeliverCallback deliverCallback = (consumerTag, message) ->
                System.out.println("Delivered consuming: " + consumerTag + " "
                        + new String(message.getBody(), StandardCharsets.UTF_8));
        CancelCallback cancelCallback = consumerTag ->
                System.out.println("Canceled: " + consumerTag);

        // autoAck = true: messages are acknowledged as soon as they are delivered.
        channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
    }
}
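Both classes configure the connection with four separate setters. The same settings can be written as a single AMQP URI (the amqp-client `ConnectionFactory` also offers `setUri`). The sketch below only uses `java.net.URI` to show how the URI parts map to host, port, user, password, and virtual host; `AmqpUriSketch` and `parse` are hypothetical names, and the code assumes the URI always contains user info.

```java
import java.net.URI;

// Hypothetical helper showing how an AMQP URI maps to connection settings.
class AmqpUriSketch {
    // Returns { host, port, username, password, virtualHost }.
    static String[] parse(String amqpUri) {
        URI uri = URI.create(amqpUri);
        String[] userInfo = uri.getUserInfo().split(":", 2);
        // getPath() is percent-decoded, so "/%2F" becomes "//".
        String path = uri.getPath();
        // No path at all means the client's default virtual host "/";
        // otherwise the virtual host is the path without its leading slash.
        String vhost = path.isEmpty() ? "/" : path.substring(1);
        return new String[] {
            uri.getHost(),
            String.valueOf(uri.getPort()),
            userInfo[0],
            userInfo[1],
            vhost
        };
    }

    public static void main(String[] args) {
        String[] parts = parse("amqp://guest:guest@192.168.31.175:32672/%2F");
        System.out.println(String.join(" | ", parts));
    }
}
```

Keeping the settings in one URI string makes it easier to move them into configuration instead of hard-coding them in both `Producer` and `Consumer`.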
4. RabbitMQ exchanges
https://www.bilibili.com/video/BV1Am4y1z7Tu/?p=9&spm_id_from=pageDriver&vd_source=19a7becebd650259d15b703d8e74edef