内窥镜消息队列发送消息原理
目的
有一个多线程的Java应用程序,使用消息队列来处理命令
时序图
@startuml
actor User
participant "sendCmdWhiteBalance()" as Controller
participant CommandConsumer
participant MessageQueue
User -> Controller: 调用sendCmdWhiteBalance()
Controller -> MessageQueue: 将白平衡命令加入队列
activate MessageQueue
Controller -> CommandConsumer: 等待命令完成
activate CommandConsumer
CommandConsumer -> MessageQueue: 从队列中获取命令
MessageQueue -> CommandConsumer: 返回命令
CommandConsumer -> CommandConsumer: 执行命令
CommandConsumer -> Controller: 命令执行完成
deactivate CommandConsumer
Controller -> User: 返回成功响应
@enduml
线程安全和性能优化
使用阻塞队列(ArrayBlockingQueue):
使用ArrayBlockingQueue是因为它是一种线程安全的队列数据结构,它在多线程环境中提供了同步。
它的核心原理是基于锁和条件变量,它能够安全地协调不同线程之间的操作,确保数据的正确性。这是在多线程编程中一种重要的工具,它避免了显式的锁管理,降低了错误的可能性,提高了代码的可维护性。
使用blockingQueue.take():
调用blockingQueue.take()的主要目的是防止线程空转(busy-waiting)。
在没有消息进入队列的情况下,如果你使用简单的轮询(如poll()或isEmpty()检查),线程可能会在空循环中浪费大量的CPU时间。而take()方法会使线程在队列为空时进入阻塞状态,直到队列中有数据可以被取出。这降低了线程的 CPU 消耗,有效地减少了开销,使得线程能够更有效地等待新消息。
Java代码
下面是我的代码
@Component
@Slf4j
public class QueueJob {
@PostConstruct
public void QueueJob() {
log.info("消费者开启****************************************************************************************************");
if (System.getProperty("os.name") != null && System.getProperty("os.name").toLowerCase().startsWith("windows")) {
return;
}
log.info("消费者开启");
// 创建消息队列 (MessageQueue) 实例和消费者线程 (CommandConsumer)
CommandConsumer commandConsumer = new CommandConsumer(messageQueue);
commandConsumer.start();
}
}
@Slf4j
public class CommandConsumer extends Thread {
private final MessageQueue messageQueue;
public CommandConsumer(MessageQueue messageQueue) {
log.info("放入参数");
this.messageQueue = messageQueue;
}
@Override
public void run() {
log.info("进入消费者模块中***********************************");
while (true) {
/**
* 初始化完成后在执行命令
* @author lst
* @date 2023/8/8 9:18
*/
if (!initStatus.get()) {
continue;
}
log.info("run****************************************");
CommandQueue commandQueue;
try {
commandQueue = messageQueue.takeFromQueue();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
if (commandQueue != null) {
// 调用 sendMsg 方法发送指令,等待响应成功
try {
log.info("sendMsg***********************************");
sendMsg(commandQueue.getCommand());
log.info("success***********************************");
} catch (Exception e) {
catchSendMsgException(commandQueue.getMethod(), e);
} finally {
handleCountDownLatch(commandQueue.getMethod());
}
}
}
}
@Slf4j
public class MessageQueue {
/**
* 顺序发送
* 有应答才能发送下一条指令
*
* @author lst
* @date 2023/8/7 13:45
* @param null
* @return null
*/
private final BlockingQueue<CommandQueue> blockingQueue = new ArrayBlockingQueue<>(30, true);
public void addToQueue(CommandQueue c) {
log.info("添加队列中" + c.toString());
if (!blockingQueue.offer(c)) {
throw new IllegalStateException("队列已满,无法接受新的指令");
}
}
public CommandQueue takeFromQueue() throws InterruptedException {
return blockingQueue.take();
}
public int getQueueFreeSize() {
return blockingQueue.remainingCapacity();
}
}
基于上面我调用白平衡接口,会先将白平衡命令加入到阻塞队列中,阻塞队列中while循环持续消费
@Operation(summary = "白平衡")
@ConcurrentControl
@GetMapping("sendCmdWhiteBalance")
public CommonResult sendCmdWhiteBalance() throws InterruptedException {
String key = WHITEBALANCE;
messageQueue.addToQueue(new CommandQueue("sendCmdWhiteBalance", key));
cmdWhiteBalanceCountDownLatch.await();
cmdWhiteBalanceCountDownLatch = new CountDownLatch(1);
if (cmdWhiteBalanceCountDownLatchException != null) {
Exception e = cmdWhiteBalanceCountDownLatchException;
cmdWhiteBalanceCountDownLatchException = null;
throw new RuntimeException(e);
}
return CommonResult.success();
}