ActiveMQ

news2024/11/23 15:36:15

ActiveMQ

安装

下载网址:ActiveMQ

一定要和自己安装的jdk版本匹配,不然会报错

在这里插入图片描述
下载到本地之后解压缩

在这里插入图片描述
在这里插入图片描述
有可能端口号被占用

在这里插入图片描述

解除端口号占用,参考:Windows_端口被占用

打开cmd

查询所有的端口号

netstat -nao

在这里插入图片描述

查询指定端口号

netstat -ano|findstr 5672

查询什么程序在占用

tasklist | findstr "4756"

在这里插入图片描述

打开任务管理器看看这个程序

在这里插入图片描述

erl.exe是什么进程

Erlang 的执行程序,Erlang一种编程语言,多用于并发和分布式系统,现在最广泛使用在消息队列里面。

安装RabbitMQ的好像就是安装的这个,看来是RabbitMQ把这个端口给占用掉了,换个端口号

在这里插入图片描述

将5672改成55672

在这里插入图片描述

网址:http://localhost:8161/admin/

用户名:admin

密码:admin

在这里插入图片描述
成功。

使用

参考:消息队列之 ActiveMQ

在这里插入图片描述

Java访问ActiveMQ实例

在这里插入图片描述

引入依赖
		<dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.15.2</version>
        </dependency>
消息生产者
package mq.activeMQ;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class TopicPublisher {

    /**
     * 默认用户名
     */
    public static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    /**
     * 默认密码
     */
    public static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    /**
     * 默认连接地址
     */
    public static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

    public static void main(String[] args) {
        //创建连接工厂
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
        try {
            //创建连接
            Connection connection = connectionFactory.createConnection();
            //开启连接
            connection.start();
            //创建会话,不需要事务
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //创建 Topic,用作消费者订阅消息
            Topic myTestTopic = session.createTopic("activemq-topic-test1");
            //消息生产者
            MessageProducer producer = session.createProducer(myTestTopic);

            for (int i = 1; i <= 3; i++) {
                TextMessage message = session.createTextMessage("发送消息 " + i);
                producer.send(myTestTopic, message);
            }

            //关闭资源
            session.close();
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
消息消费者
package mq.activeMQ;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class TopicSubscriber {

    /**
     * 默认用户名
     */
    public static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    /**
     * 默认密码
     */
    public static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    /**
     * 默认连接地址
     */
    public static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

    public static void main(String[] args) {
        //创建连接工厂
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
        try {
            //创建连接
            Connection connection = connectionFactory.createConnection();
            //开启连接
            connection.start();
            //创建会话,不需要事务
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //创建 Topic
            Topic myTestTopic = session.createTopic("activemq-topic-test1");

            MessageConsumer messageConsumer = session.createConsumer(myTestTopic);
            messageConsumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    try {
                        System.out.println("消费者1 接收到消息:" + ((TextMessage) message).getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });

            MessageConsumer messageConsumer2 = session.createConsumer(myTestTopic);
            messageConsumer2.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    try {
                        System.out.println("消费者2 接收到消息:" + ((TextMessage) message).getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });

            MessageConsumer messageConsumer3 = session.createConsumer(myTestTopic);
            messageConsumer3.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    try {
                        System.out.println("消费者3 接收到消息:" + ((TextMessage) message).getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });

            //让主线程休眠100秒,使消息消费者对象能继续存活一段时间从而能监听到消息
            Thread.sleep(100 * 1000);
            //关闭资源
            session.close();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}


gitee:JAVA集成AcitveMQ

启动ActiveMQ服务器

在这里插入图片描述

启动

在这里插入图片描述

Spring整合ActiveMQ

引入依赖
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.15.2</version>
</dependency>

<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
    <version>4.3.10.RELEASE</version>
</dependency>

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-pool</artifactId>
    <version>5.15.0</version>
</dependency>

Spring配置文件
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-3.0.xsd">
    <!--这里要改base-package="基础包"-->
    <!--下面还有要改的,看注释-->
    <context:component-scan base-package="com.example.spring_activemq.activeMQ"/>

    <!--连接池-->
    <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
        <property name="connectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL">
                    <value>tcp://localhost:61616</value>
                </property>
            </bean>
        </property>
        <property name="maxConnections" value="100"></property>
    </bean>

    <!--缓存-->
    <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="jmsFactory"/>
        <property name="sessionCacheSize" value="1"/>
    </bean>

    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!--获取连接、会话等对象-->
        <property name="connectionFactory" ref="cachingConnectionFactory"/>
        <!--消息转换器-->
        <property name="messageConverter">
            <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
        </property>
    </bean>

    <!--实际发送和接收消息的目的地-->
    <bean id="testQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg name="name" value="spring-queue"/>
    </bean>

    <bean id="testTopic" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg index="0" value="spring-topic"/>
    </bean>

    <!--这里也要改class=""-->
    <!--队列消息下的监视器-->
    <bean id="queueListener" class="com.example.spring_activemq.activeMQ.QueueListener"/>
    <!--主题模式下的接收器-->
    <bean id="topic1Listener" class="com.example.spring_activemq.activeMQ.Topic1Listener"/>
    <bean id="topic2Listener" class="com.example.spring_activemq.activeMQ.Topic2Listener"/>

    <!--将消息监视器绑定到具体的消息目的地上-->
    <bean id="queueContainer"
          class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="cachingConnectionFactory"/>
        <property name="destination" ref="testQueue"/>
        <property name="messageListener" ref="queueListener"/>
    </bean>

    <bean id="topic1Container"
          class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <!--缓存-->
        <property name="connectionFactory" ref="cachingConnectionFactory"/>
        <!--目的地-->
        <property name="destination" ref="testTopic"/>
        <!--监视器-->
        <property name="messageListener" ref="topic1Listener"/>
    </bean>

    <bean id="topic2Container"
          class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="cachingConnectionFactory"/>
        <property name="destination" ref="testTopic"/>
        <property name="messageListener" ref="topic2Listener"/>
    </bean>

</beans>

消息服务类
package com.example.spring_activemq.activeMQ;

import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import javax.jms.*;

@Service
public class MessageService {

    @Resource(name = "jmsTemplate")
    private JmsTemplate jmsTemplate;

    @Resource(name = "testQueue")
    private Destination testQueue;

    @Resource(name = "testTopic")
    private Destination testTopic;

    //向队列发送消息
    public void sendQueueMessage(String messageContent) {
        jmsTemplate.send(testQueue, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                TextMessage msg = session.createTextMessage();
                // 设置消息内容
                msg.setText(messageContent);
                return msg;
            }
        });

    }

    //向主题发送消息
    public void sendTopicMessage(String messageContent) {
        jmsTemplate.send(testTopic, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                TextMessage msg = session.createTextMessage();
                // 设置消息内容
                msg.setText(messageContent);
                return msg;
            }
        });

    }
}

消息监听器类

队列监听器

package com.example.spring_activemq.activeMQ;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class QueueListener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            try {
                TextMessage txtMsg = (TextMessage) message;
                String messageStr = txtMsg.getText();
                System.out.println("队列监听器接收到文本消息:" + messageStr);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        } else {
            throw new IllegalArgumentException("只支持 TextMessage 类型消息!");
        }
    }
}

订阅消息监听器

package com.example.spring_activemq.activeMQ;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class Topic1Listener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            try {
                TextMessage txtMsg = (TextMessage) message;
                String messageStr = txtMsg.getText();
                System.out.println("主题监听器1 接收到文本消息:" + messageStr);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        } else {
            throw new IllegalArgumentException("只支持 TextMessage 类型消息!");
        }
    }
}

package com.example.spring_activemq.activeMQ;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class Topic2Listener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            try {
                TextMessage txtMsg = (TextMessage) message;
                String messageStr = txtMsg.getText();
                System.out.println("主题监听器2 接收到文本消息:" + messageStr);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        } else {
            throw new IllegalArgumentException("只支持 TextMessage 类型消息!");
        }
    }
}

启动应用
package com.example.spring_activemq.activeMQ;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class StartApplication {
    public static void main(String[] args) {
    	//这里总是显示空指针,找半天也没找出来哪里错了,不找了,等以后学学再找。
        ApplicationContext ctx = new ClassPathXmlApplicationContext("classpath:spring-context.xml");
        MessageService messageService = (MessageService) ctx.getBean("messageService");

        messageService.sendQueueMessage("我的测试消息1");
        messageService.sendTopicMessage("我的测试消息2");
        messageService.sendTopicMessage("我的测试消息3");
    }

}

gitee:Spring集成ActiveMQ

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1148105.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于STM32的示波器信号发生器设计

**单片机设计介绍&#xff0c;基于STM32的示波器信号发生器设计 文章目录 一 概要二、功能设计设计思路 三、 软件设计原理图 五、 程序文档 六、 文章目录 一 概要 基于STM32的示波器信号发生器是一种高性能的电子仪器&#xff0c;用于测试和分析电路中的电信号。在该系统中&a…

ZKP7.1 Polynomial Commitments Based on Error-correcting Codes (Background)

ZKP学习笔记 ZK-Learning MOOC课程笔记 Lecture 7: Polynomial Commitments Based on Error-correcting Codes (Yupeng Zhang) Recall: common paradigm for efficient SNARK A polynomial commitment scheme A polynomial interactive oracle proof (IOP) SNARK for gene…

AD9371 官方例程HDL详解之JESD204B RX侧格式配置及各层主要功能

AD9371 系列快速入口 AD9371ZCU102 移植到 ZCU106 &#xff1a; AD9371 官方例程构建及单音信号收发 采样率和各个时钟之间的关系 &#xff1a; AD9371 官方例程HDL详解之JESD204B TX侧时钟生成 &#xff08;三&#xff09; 参考资料&#xff1a; UltraScale Architecture G…

Jenkins git 克隆代码超时问题解决

目录 一、问题描述二、解决方案方式一&#xff1a;手动配置超时时间方式二&#xff1a;浅克隆&#xff08;推荐&#xff09; 一、问题描述 在使用 Jenkins 首次进行服务部署的时候&#xff0c;如果我们项目的 .git 文件夹太大&#xff0c;可能会导致 git clone 失败。 在 Jen…

当『后设学习』碰上『工程学思维』

只要我成为一个废物&#xff0c;就没人能够利用我&#xff01; 雷猴啊&#xff0c;我是一只临期程序猿。打过几年工&#xff0c;写过几行代码。但今天我不想聊代码&#xff0c;我们聊聊学习这件事。 技术年年更新&#xff0c;尤其是前端框架&#xff0c;很多时候觉得学习速度都…

leetcode-栈与队列

C中stack 是容器么&#xff1f; 栈&#xff0c;队列往往不被归类为容器&#xff0c;而被归类为container adapter(容器适配器)。因为由底层的容器实现&#xff0c;同时使用适配器模式的设计模式&#xff0c;封装了一层。 我们使用的stack是属于哪个版本的STL&#xff1f;SGI ST…

【Java 进阶篇】解决Java Web应用中请求参数中文乱码问题

在Java Web应用开发中&#xff0c;处理请求参数时经常会遇到中文乱码的问题。当浏览器向服务器发送包含中文字符的请求参数时&#xff0c;如果不正确处理&#xff0c;可能会导致乱码问题&#xff0c;使得参数无法正确解析和显示。本文将详细探讨Java Web应用中请求参数中文乱码…

微信小程序 php java nodejs python课堂学生考勤签到系统zng1p

课堂考勤也是学校的核心&#xff0c;是必不可少的一个部分。在学校的整个教学行业中&#xff0c;学生担负着最重要的角色。为满足如今日益复杂的管理需求&#xff0c;各类基于微信小程序也在不断改进。本课题所设计的基于微信小程序的课堂考勤系统&#xff0c;使用微信开发者与…

shell中的运算

目录 1.运算符号 2.运算指令 练习 1.运算符号 运算符号意义加法-减法*乘法/除法%除法后的余数**乘方自加一- -自减一<小于<小于等于>大于>大于等于等于ji ->jji*j*i->jj*i/j/i->jj/i%j%i->jj%i 2.运算指令 (()) //((a12))let //let a12 …

JavaScript_Pig Game保存当前分数

上个文章我们基本上完成了摇色子和切换当前玩家的功能。 现在我们开始写用户选择不再摇骰子的话&#xff0c;我们将用户的当前分数存入到持有分数中&#xff01; ● 首先我们应该利用一个数组去存储两个用户的分数 const scores [0, 0];● 接着我们利用数组来对分数进行累…

SHCTF2023 山河CTF Reverse方向[Week1]全WP 详解

文章目录 [WEEK1]ez_asm[WEEK1]easy_re[WEEK1]seed[WEEK1]signin[WEEK1]easy_math[WEEK1]ez_apk [WEEK1]ez_asm 从上往下读&#xff0c;第一处是xor 1Eh&#xff0c;第二处是sub 0Ah&#xff1b;逆向一下先加0A后异或1E 写个EXP data "nhuo[M7mc7uhc$7midgbTf7$7%#ubf7 …

nginx请求时找路径问题

nginx请求时找路径问题 你是否遇到过这样的情况&#xff1a; 当你安装了nginx的时候&#xff0c;为nginx配置了如下的location&#xff0c;想要去访问路径下面的内容&#xff0c;可是总是出现404&#xff0c;找不到文件&#xff0c;这是什么原因呢&#xff0c;今天我们就来解…

OpenAI : GPT-4 发布更新,整合了画图、插件、代码等能力

本心、输入输出、结果 文章目录 OpenAI : GPT-4 发布更新,整合了画图、插件、代码等能力前言GPT-4 的复合能力更新中的 automatic (自动的)获取天气我们看看讯飞星火的表现放大后内容并不是我们想要的我们看看百度文心一言的表现弘扬爱国精神OpenAI : GPT-4 发布更新,整合…

CSS3中的字体和文本样式

CSS3优化了CSS 2.1的字体和文本属性&#xff0c;同时新增了各种文字特效&#xff0c;使网页文字更具表现力和感染力&#xff0c;丰富了网页设计效果&#xff0c;如自定义字体类型、更多的色彩模式、文本阴影、生态生成内容、各种特殊值、函数等。 1、字体样式 字体样式包括类…

画个哆啦A梦吧

可自定义名字 源代码 #!/usr/bin/python # -*- coding:utf-8 -*-# from turtle import * import turtle as t# 无轨迹跳跃 def my_goto(x, y):t.penup()t.goto(x, y

多个相同地址的I2C设备,如何挂载在同一条总线上

前言 &#xff08;1&#xff09;如果有嵌入式企业需要招聘湖南区域日常实习生&#xff0c;任何区域的暑假Linux驱动实习岗位&#xff0c;可C站直接私聊&#xff0c;或者邮件&#xff1a;zhangyixu02gmail.com&#xff0c;此消息至2025年1月1日前均有效 &#xff08;2&#xff0…

提高抖音小店用户黏性和商品销量的有效策略

抖音小店是抖音平台上的电商模式&#xff0c;用户可以在抖音上购买各类商品。要提高用户黏性和商品销量&#xff0c;四川不若与众帮你整理了需要注意以下几个方面。 首先&#xff0c;提供优质的商品和服务。在抖音小店中&#xff0c;用户会通过观看商品展示视频和用户评价来选…

M1安装OpenPLC Editor

下载OpenPLC Editor for macOS.zip文件后&#xff0c;使用tar -zvxf命令解压&#xff0c;然后将"OpenPLC Editor"拖入到"应用程序"文件夹 右键点击"OpenPLC Editor"&#xff0c;打开这个""文件&#xff0c;替换为以下内容 #!/bin/bash…

No174.精选前端面试题,享受每天的挑战和学习

🤍 前端开发工程师(主业)、技术博主(副业)、已过CET6 🍨 阿珊和她的猫_CSDN个人主页 🕠 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 🍚 蓝桥云课签约作者、已在蓝桥云课上架的前后端实战课程《Vue.js 和 Egg.js 开发企业级健康管理项目》、《带你从入…