【分布式计算】java消息队列机制

news2024/12/27 10:48:49

        消息队列是一种在不同组件或应用之间进行数据传递的技术,通常用于处理异步通信。它允许消息的发送者(生产者)和接收者(消费者)之间进行解耦。


概念


        消息队列是一种先进先出(FIFO)的数据结构,它存储待处理的消息直到它们被消费。消息是生产者发送给队列的数据单元,消费者则从队列中读取这些消息进行处理。


原理


1. 生产者:
   - 生产者是创建消息的实体,它负责将消息发送到队列。生产者不需要关心消息的具体处理过程,只需确保消息正确发送到队列。

2. 消息队列:
   - 消息队列充当缓冲区,暂时存储从生产者那里发送过来的消息。队列管理消息的顺序,并确保按照发送的顺序逐一传递给消费者。

3. 消费者:
   - 消费者从消息队列中读取消息,并进行相应的处理。消费者可以是同一应用的其他部分,或者是完全独立的应用。

4. 消息处理:
   - 一旦消息被消费者读取,它可以被确认和删除,或者在处理失败时重新放回队列等待再次处理。


使用场景


异步处理:当应用执行耗时任务时,可以将任务封装成消息发送到队列,由消费者异步处理。
流量控制:在高流量事件如大促销或黑色星期五时,消息队列可以帮助缓冲入站流量,防止系统过载。
解耦服务:在微服务架构中,消息队列可以帮助减少服务之间的直接依赖,通过消息传递来通信,从而提高系统的可维护性和扩展性。


Java消息队列技术

在Java中,消息队列是一种数据结构或服务,用于在不同的应用组件或系统之间异步传递消息。它支持松耦合的架构,允许发送者和接收者独立地进行开发和扩展。消息队列可以帮助缓解高负载、增强系统的可伸缩性,并提供容错机制。下面是一些常见的Java消息队列技术:

1. Apache Kafka:
        Kafka是一个分布式流处理平台,它不仅能够处理消息队列的功能,还能处理复杂的事件流。它特别适合需要高吞吐量和可靠性的大规模数据处理场景。

2. RabbitMQ:
        RabbitMQ是一个开源消息代理,支持多种消息协议。它提供灵活的路由功能,能够保证消息的可靠传输。适合于复杂的消息传递需求和多种不同的通信模式。

3. ActiveMQ:
        Apache ActiveMQ是一个强大的开源消息代理,支持多种JMS(Java Message Service)协议和客户端语言。适用于那些需要JMS标准支持的企业应用。

4. Amazon SQS (Simple Queue Service):
        SQS是一个托管的消息队列服务,提供简单的Web服务API来完全管理队列的消息传输。它能够无限扩展,并且不需要预先安装消息队列基础设施。

5. Google Cloud Pub/Sub:
        Google的Pub/Sub提供了一种全球分布式的消息传递平台,适合处理大量数据的实时交换。


这个流程图展示了使用 ActiveMQ 实现消息队列的基本步骤,包括消息的发送和接收。以下是每个步骤的详细讲解。

 1. 创建 ConnectionFactory

`ConnectionFactory` 是一个接口,用于创建连接到消息中间件(ActiveMQ)的工厂。它是创建连接的起点。

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");


2. 使用 ConnectionFactory 创建 Connection

通过 `ConnectionFactory` 创建一个连接对象 `Connection`。

Connection connection = connectionFactory.createConnection();

3. 启动 Connection

在使用连接之前,必须启动它。

connection.start();

4. 使用 Connection 创建一个或多个 JMS Session

通过 `Connection` 创建会话 `Session`。会话是生产和消费消息的上下文。

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);


5. 使用 Session 创建 Queue 或 Topic

通过会话创建队列(Queue)或主题(Topic)。队列用于点对点消息传递,主题用于发布/订阅消息传递。

Queue queue = session.createQueue("testQueue");
// 或者
Topic topic = session.createTopic("testTopic");

6. 使用 Session 创建 MessageProducer 或 MessageConsumer

根据需要创建消息生产者 `MessageProducer` 或消息消费者 `MessageConsumer`。

创建 MessageProducer

MessageProducer producer = session.createProducer(queue);

创建 MessageConsumer

MessageConsumer consumer = session.createConsumer(queue);

7. 发送消息

使用 `MessageProducer` 发送消息。

TextMessage message = session.createTextMessage("Hello, ActiveMQ!");
producer.send(message);

8. 接收消息

异步接收

设置消息监听器,当有消息到达时自动触发。

consumer.setMessageListener(new MessageListener() {
    @Override
    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            try {
                String text = ((TextMessage) message).getText();
                System.out.println("Received: " + text);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
});

同步接收

使用 `MessageConsumer.receive()` 方法同步接收消息。

Message message = consumer.receive();
if (message instanceof TextMessage) {
    String text = ((TextMessage) message).getText();
    System.out.println("Received: " + text);
}

完整代码示例

生产者代码

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class Producer {
    public static void main(String[] args) throws JMSException {
        // 创建连接工厂
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        // 创建连接
        Connection connection = connectionFactory.createConnection();
        connection.start();

        // 创建会话
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 创建队列
        Queue queue = session.createQueue("testQueue");

        // 创建生产者
        MessageProducer producer = session.createProducer(queue);
        // 创建消息
        TextMessage message = session.createTextMessage("Hello, ActiveMQ!");
        // 发送消息
        producer.send(message);

        // 关闭连接
        connection.close();
    }
}

消费者代码

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class Consumer {
    public static void main(String[] args) throws JMSException {
        // 创建连接工厂
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        // 创建连接
        Connection connection = connectionFactory.createConnection();
        connection.start();

        // 创建会话
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 创建队列
        Queue queue = session.createQueue("testQueue");

        // 创建消费者
        MessageConsumer consumer = session.createConsumer(queue);

        // 同步接收消息
        Message message = consumer.receive();
        if (message instanceof TextMessage) {
            String text = ((TextMessage) message).getText();
            System.out.println("Received: " + text);
        }

        // 异步接收消息
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                if (message instanceof TextMessage) {
                    try {
                        String text = ((TextMessage) message).getText();
                        System.out.println("Received: " + text);
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        });

        // 为了测试异步接收,保持程序运行一段时间
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 关闭连接
        connection.close();
    }
}

具体应用

调用百度云api

使用消息队列实现一个调用百度智能云 API 的校园卡程序有助于提高系统的可扩展性和可靠性。消息队列可以解耦生产者和消费者,并实现异步处理。

实现步骤

1. 配置消息队列:
    - 安装并配置 RabbitMQ 或 ActiveMQ。
    - 配置 Spring Boot 项目以连接到消息队列。

2. 创建生产者(Producer):
    - 接收用户上传的图片。
    - 将图片编码为 Base64 格式,并发送到消息队列。

3. 创建消费者(Consumer):
    - 监听消息队列中的消息。
    - 调用百度智能云 API 进行图片识别。
    - 将识别结果存储以便后续查询。

4. 实现控制器(Controller):
    - 提供上传图片的接口。
    - 提供获取识别结果的接口。

系统架构图

+-------------------+        +--------------------+        +-------------------+
|                   |        |                    |        |                   |
|   User Uploads    |        |   Message Queue    |        |    API Consumer   |
|   (Controller)    | -----> |  (RabbitMQ/ActiveMQ)| -----> | (Baidu API Call)  |
|                   |        |                    |        |                   |
+-------------------+        +--------------------+        +-------------------+

具体实现

  • 用户上传图片:用户通过前端页面上传图片,图片通过 RecognitionController 接收并保存到消息队列。
  • 消息队列:图片以消息的形式存储在消息队列中,保证消息的可靠传递。
  • 消息处理RecognitionListener 监听消息队列,当有新消息到达时,调用百度智能云 API 进行图片识别,并将结果保存到 RecognitionService
  • 获取结果:用户可以通过访问 /api/resultPage 来获取最新的识别结果。

代码部分:

1. 配置消息队列:

   配置 `application.properties` 以连接到消息队列(以 ActiveMQ 为例):

   spring.activemq.broker-url=tcp://localhost:61616
   spring.activemq.user=admin
   spring.activemq.password=admin
   spring.jms.pool.enabled=true

2. 创建生产者(Producer):

  package com.example.mq;

   import org.springframework.beans.factory.annotation.Autowired;
   import org.springframework.jms.core.JmsTemplate;
   import org.springframework.stereotype.Service;

   @Service
   public class RecognitionService {

       @Autowired
       private JmsTemplate jmsTemplate;

       public void sendImageForRecognition(byte[] imageBytes) {
           jmsTemplate.convertAndSend("animal.recognition.queue", imageBytes);
       }

       // 其他方法...
   }

3. 创建消费者(Consumer):
 

  package com.example.mq;

   import org.springframework.beans.factory.annotation.Autowired;
   import org.springframework.jms.annotation.JmsListener;
   import org.springframework.stereotype.Component;

   @Component
   public class RecognitionListener {

       @Autowired
       private RecognitionService recognitionService;

       @JmsListener(destination = "animal.recognition.queue")
       public void processImage(byte[] imageBytes) {
           String result = recognitionService.recognizeImage(imageBytes);
           recognitionService.saveResult(result);
       }
   }

4. 实现控制器(Controller):

 package com.example.mq;

   import org.springframework.beans.factory.annotation.Autowired;
   import org.springframework.stereotype.Controller;
   import org.springframework.ui.Model;
   import org.springframework.web.bind.annotation.*;
   import org.springframework.web.multipart.MultipartFile;
   import org.springframework.web.servlet.view.RedirectView;

   import java.io.IOException;

   @Controller
   @RequestMapping("/api")
   public class RecognitionController {

       @Autowired
       private RecognitionService recognitionService;

       @PostMapping("/recognize")
       public RedirectView recognizeAnimal(@RequestParam("file") MultipartFile file, Model model) throws IOException {
           if (file.isEmpty()) {
               model.addAttribute("message", "File is empty");
               return new RedirectView("/errorPage.html");
           }

           byte[] bytes = file.getBytes();
           recognitionService.saveResult("等待识别结果...");
           recognitionService.sendImageForRecognition(bytes);

           return new RedirectView("/resultPage.html");
       }

       @GetMapping("/resultPage")
       @ResponseBody
       public String getResult() {
           return recognitionService.getResult();
       }
   }
 

消息队列的意义

异步处理:允许用户在上传图片后立即获得响应,而不是等待图片识别结果。
解耦:上传图片的部分和图片识别的部分可以独立开发和扩展。
负载均衡:可以轻松增加更多的消费者以处理高并发请求。
可靠性:消息队列持久化消息,确保即使在系统故障时也不会丢失消息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1829598.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【react小项目】bmi-calculator

bmi-calculator 目录 bmi-calculator初始化项目01大致布局01代码 02完善样式02代码 03输入信息模块03代码 04 使用图表04代码 05详细记录信息渲染05代码 06 让数据变成响应式的06-1输入框的数据处理06-2图表,和记录信息的区域数据处理 07 删除功能,撤销功…

DeepDriving | 经典的目标检测算法:CenterNet

本文来源公众号“DeepDriving”,仅用于学术分享,侵权删,干货满满。 原文链接:经典的目标检测算法:CenterNet 1 前言 CenterNet是2019年发表的一篇文章《Objects as Points》中提出的一个经典的目标检测算法&#xf…

仓储管理系统WMS构架设计B/S和C/S:如何选?

导语 大家好,我是社长,老K。专注分享智能制造和智能仓储物流等内容。 新书《智能物流系统构成与技术实践》 在设计仓库管理系统(WMS)时,架构的选择至关重要,因为它直接影响到系统的可用性、可维护性、灵活性…

17岁中专女生,闯进全球数学竞赛12强

今年阿里的数学竞赛结果出来了,在榜单的前列包含一个 17 岁的中专女生。 在 2018 年时,阿里巴巴达摩院发起了一个国际数学竞赛,基本每年举办一次,参赛不设报名条件,向全球所有数学爱好者开放,竞赛由阿里创…

从FasterTransformer源码解读开始了解大模型(2.1)代码通读02

从FasterTransformer源码解读开始了解大模型(2.0)代码解读02-初始化和forward 写在前面的话 本篇的内容主要是介绍ParallelGpt.cc中的代码内容,首先介绍一些初始化和工具函数,然后会从forward主函数开始介绍一部分。 零、初始化…

【ROS里程计】中部分代码解释

bool OdomNodePub::Odom_Reset(ubt_odom::odomreset::Request& req, ubt_odom::odomreset::Response& res) {if(req.cmd "reset"){OdomResetFlag true;}else{OdomResetFlag false;}res.state "success";return true; } 该函数是一个ROS节点中…

元数据、数据元、数据字典、数据模型及元模型的区别详解

在数据管理和分析领域,有许多相似的概念,如元数据、数据元、数据字典、数据模型和元模型。这些概念的定义和应用往往容易混淆。 数据元 数据元是通过一系列属性描述的数据单元,包括定义、标识、表示以及允许值等。这些属性帮助我们理解和使用…

aop注解快速实现数据脱敏返回

说明: 公司之前数据接口数据管理不严格,很多接口的敏感数据都没有脱敏处理,直接返回给前端了,然后被甲方的第三方安全漏洞扫出来,老板要求紧急处理,常用的话在单个字段上加上脱敏注解会更加的灵活&#xf…

Parallels Desktop 19 for mac破解版安装激活使用指南

Parallels Desktop 19 for Mac 乃是一款适配于 Mac 的虚拟化软件。它能让您在 Mac 计算机上同时运行多个操作系统。您可借此创建虚拟机,并于其中装设不同的操作系统,如 Windows、Linux 或 macOS。使用 Parallels Desktop 19 mac 版时,您可在 …

EarMaster7.5.74官方版安装激活使用教程

EarMaster就是你音乐路上的良师益友。这是一款来自丹麦皇家音乐学院的多媒体音乐教育软件,针对视唱练耳为音乐学生,音乐爱好者以及音乐专业人员都带来了很多的帮助,让你们可以获得音乐家般的耳朵,通过专业视唱练耳培训考试&#x…

52. QT插件开发--插件程序(带ui文件)的创建与编译

1. 说明 一般情况下,针对代码量比较小的QT程序不需要进行插件集成化开发,但是针对大型程序来说,代码结构比较复杂,使用插件开发的方式可以提高代码开发和维护效率,团队之间的分工合作也会更加的明确。所谓插件式开发,实际上就是把程序的一部分功能封装起来,编译成一个单…

Modbus为何要转成ProfiNET

Modbus与ProfiNET代表了工业通讯不同阶段的发展,各自具有优缺点。Modbus简单易用,适合小型系统;ProfiNET高效稳定,适用于大型复杂网络。转换Modbus为ProfiNET可提高系统性能和扩展性。实际场景下,升级生产线控制器为Pr…

Cisco Packet Tracer实验(四)

生成树协议(Spanning Tree Protocol) 交换机在目的地址未知或接收到广播帧时是要进行广播的。如果交换机之间存在回路/环路,那么就会产生广播循环风暴,从而严重影响网络性能。 而交换机中运行的STP协议能避免交换机之间发生广播…

Python(三)---字符串

文章目录 前言1.创建字符串2.字符串的编码3.空字符串和len()函数4.转义字符5.从控制台读取字符串6.字符串的相关操作6.1.通过[]访问元素6.2.字符串切片slice操作6.3.字符串拼接和字符串复制6.4.split()分割和join()合并6.5.常用查找方法6.6.replace() 实现字符串替换6.7.去除首…

基于CPS-SPWM链式STATCOM系统在电压不平衡环境下控制策略的simulink建模与仿真

目录 1.课题概述 2.系统仿真结果 3.核心程序与模型 4.系统原理简介 5.完整工程文件 1.课题概述 基于CPS-SPWM链式STATCOM系统在电压不平衡环境下控制策略的simulink建模与仿真。利用电压外环PI调节器得到有功 电流指令值结合由负载侧电流检测 到 的无功 电流指令值 &#…

GPU的工作原理

location: Beijing 1. why is GPU CPU的存储单元和计算单元的互通过慢直接促进了GPU的发展 先介绍一个概念:FLOPS(Floating Point Operations Per Second,浮点运算每秒)是一个衡量其执行浮点运算的能力,可以作为计算…

Gstreamer学习3----灌数据给管线之appsrc

参考资料 Basic tutorial 8: Short-cutting the pipeline gstreamer向appsrc发送帧画面的代码_gst appsrc可变帧率-CSDN博客 在官网教程Basic tutorial 8: Short-cutting the pipeline 里面,讲了一个例子,push音频数据给管线,视频的例子更…

归纳贪心好题

很有趣的一道归纳贪心题目 class Solution { public:int minimumAddedCoins(vector<int>& coins, int target) {sort(coins.begin(),coins.end());int n coins.size();int s 0,i0;int res 0;while(s<target){if(i<n&&coins[i]<s1)scoins[i];els…

Photoshop中图像美化工具的应用

Photoshop中图像美化工具的应用 Photoshop中的裁剪工具Photoshop中的修饰工具模糊工具锐化工具涂抹工具 Photoshop中的颜色调整工具减淡工具加深工具海绵工具 Photoshop中的修复工具仿制图章工具污点修复画笔工具修复画笔工具修补工具内容感知移动工具红眼工具 Photoshop中的裁…

Ubuntu 的 apt 相关问题

错误:1 http://mirrors.tuna.tsinghua.edu.cn/ubuntu focal InRelease Couldnt create temporary file /tmp/apt.conf.KSeTlI for passing config to apt-key 原因 无法创建配置文件 /tmp/apt.conf.KSeTlI 并传递给 apt-key apt-key 等实际上并不是直接使…