消息队列-RockMQ-Demo案例拓展输入输出渠道

news2024/12/26 21:56:38

基于Spirng Cloud Alibaba基础搭建

下面为一个Demo 生产者和消费者是一起的。
父工程pom

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <com.alibaba.cloud.version>2.2.8.RELEASE</com.alibaba.cloud.version>
    <com.cloud.version>Hoxton.SR12</com.cloud.version>
    <com.dubbo.version>2.2.7.RELEASE</com.dubbo.version>
</properties>
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-alibaba-dependencies</artifactId>
            <version>${com.alibaba.cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${com.cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-dubbo</artifactId>
            <version>${com.dubbo.version}</version>
        </dependency>
    </dependencies>
</dependencyManagement>

工程POM

<dependencies>
    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
</dependencies>

配置文件

server:
  port: 9500
spring:
  application:
    name: rocket-demo
  cloud:
    stream:
      bindings:
        input:
          content-type: application/json
          destination: test-topic
          group: test-group
        output:
          content-type: application/json
          destination: test-topic
          group: test-group
      rocketmq:
        binder:
          name-server: ip:9876
          group: rocket-demo
@SpringBootApplication
// 绑定输入输出
@EnableBinding({Source.class, Sink.class})
public class RocketDemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(RocketDemoApplication.class, args);
    }
    // 监听这个输入通道,收到消息直接在这里打印,消费者
    @StreamListener("input")
    public void receiveInput(String receiveMsg) {
        System.out.println("input receive: " + receiveMsg);
    }
}

生产者

public static void main(String[] args) throws Exception {
      DefaultMQProducer producer = new DefaultMQProducer("producer_group");
      producer.setNamesrvAddr("ip:9876");
      producer.start();
      for (int i = 0; i < 3; i++) {
          Message msg = new Message("test-topic", "tagStr", "message from rocketmq producer".getBytes());
          producer.send(msg);
      }

  }
拓展输入输出渠道

新建自定义Sink和Source 继承Sink 和 Source,原来的渠道也会保留。

public interface CustomSink extends Sink {
    /**
     * Input channel name.
     */
    String INPUT2 = "input2";
    /**
     * @return input channel.
     */
    @Input(CustomSink.INPUT2)
    SubscribableChannel input2();
}

public interface CustomSource extends Source {
    String OUTPUT2 = "output2";

    /**
     * @return output channel
     */
    @Output(CustomSource.OUTPUT2)
    MessageChannel output2();
}

启动类

@SpringBootApplication
@EnableBinding({CustomSource.class, CustomSink.class})
public class RocketDemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(RocketDemoApplication.class, args);
    }
    @StreamListener("input")
    public void receiveInput(String receiveMsg) {
        System.out.println("input receive: " + receiveMsg);
    }
    @StreamListener("input2")
    public void receiveInputSecond(String receiveMsg) {
        System.out.println("input2 receive: " + receiveMsg);
    }
}

配置

server:
  port: 9500
spring:
  application:
    name: rocket-demo
  cloud:
    stream:
      bindings:
        input:
          content-type: application/json
          destination: test-topic
          group: test-group
        output:
          content-type: application/json
          destination: test-topic
          group: test-group
        input2:
          content-type: application/json
          destination: test-topic-second
          group: test-group-second
        output2:
          content-type: application/json
          destination: test-topic-second
          group: test-group-second
      rocketmq:
        binder:
          name-server: ip:9876
          group: rocket-demo

生产者

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("producer_group_second");
        producer.setNamesrvAddr("ip:9876");
        producer.start();
        for (int i = 0; i < 3; i++) {
            Message msg = new Message("test-topic-second", "tagStr2", "message from rocketmq producer2".getBytes());
            producer.send(msg);
        }
    }
}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1368608.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

如何使用TrafficWatch根据PCAP文件监控和分析网络流量

关于TrafficWatch TrafficWatch是一款功能强大的网络数据包嗅探和分析工具&#xff0c;该工具能够帮助我们通过PCAP文件监控和分析目标网络中的网络通信流量。除此之外&#xff0c;该工具还可以为广大研究人员提供针对各种网络协议的内部分析功能&#xff0c;并帮助解决各种网…

初始Linux(部署项目)

Linux学什么 作为一名主学Java的小白&#xff0c;主要学一下三个部分&#xff1a; 1.基础命令 使用图形命令是使用Linux的重要基础。并且有很多好处&#xff0c;如可以节省系统、网络资源&#xff0c;能够批量处理工作等等。 2.系统编程 & 网络编程 由于Java跨平台的特…

【unity小技巧】FPS游戏实现相机的偏移震动、武器射击后退和后坐力效果

最终效果 文章目录 最终效果前言相机偏移震动相机震动脚本换弹节点震动 武器射击后退效果武器后坐力效果完结 前言 关于后坐力之前其实已经分享了一个&#xff1a;FPS游戏后坐力制作思路 但是实现起来比较复杂&#xff0c;如果你只是想要简单的实现&#xff0c;可以看看这个&…

网络通信(12)-C#TCP客户端封装帮助类实例

本文使用Socket在C#语言环境下完成TCP客户端封装帮助类的实例。 实例完成的功能: 客户端与服务器连接,实现实时刷新状态。 客户端接收服务器的数据。 客户端发送给服务器的数据。 客户端实时判定状态,断开连接后自动重连。 客户端与服务器端发送心跳包。 在VS中创建C…

Mysql 恢复误删库表数据

一、前提 1、如果你的数据库有备份文件&#xff0c;自己还原即可。 2、如果没有备份文件&#xff0c;那首先检查下你的 binlog 是否开启。如果未开启&#xff0c;那你就不用往下看了。如果开启了&#xff0c;可以往下看看。 1.1 查看位置 可以通过以下的命令查看是否开启了 bi…

机器学习:手撕 AlphaGo(二)

计算机下围棋的问题描述请见上篇&#xff1a;机器学习&#xff1a;手撕 AlphaGo&#xff08;一&#xff09;-CSDN博客 3. MCTS 算法介绍 MCTS&#xff08;Monte Carlo Tree Search&#xff09; 算法的中文名称叫做蒙特卡洛树搜 索。第一次接触这个算法时&#xff0c;便惊叹于它…

使用echarts制作柱状图、折线图,并且下方带表格

实现效果: 调试地址: https://echarts.apache.org/examples/zh/editor.html?cline-simple 源码: option { title: { left: center, top: 0, text: 2022-05月 制造产量 达成情况(单位: 吨) (图1)\n\n集团目标产量: 106,675吨 集团实际产量: 2,636吨, text…

NVIDIA官网如何下载所有历史版本的驱动,包括上古化石版本?

NVIDIA官网如何下载所有历史版本的驱动&#xff0c;包括上古化石版本&#xff1f; 1.软件环境⚙️2.问题描述&#x1f50d;3.解决方法&#x1f421;4.结果预览&#x1f914; 1.软件环境⚙️ Windows10 教育版64位 GeForce GTX 1060 (Notebooks) Chrome 120.0.6099.199&#xff…

Linux限制用户可用硬盘空间

为了防止某个用户占用大量资源导致其他用户无法正常使用&#xff0c;一般会对单个用户可占用资源进行限制。就磁盘限额&#xff0c;XFS文件系统原生支持目录级别的限制。ext文件系统不支持目录限制&#xff0c;曲线方式是限制用户的总占用空间。 本文介绍使用quota程序限制用户…

【银行测试】金融项目测试注意点汇总,一篇带你不再背锅

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 1、数据保护 在测…

vue3 +TS 安装使用router路由模块

一.安装 1.下载安装依赖 npm install vue-routernextnpm install types/vue-router2.router目录创建 在src 目录下 创建 /src/router文件夹 包含两个文件 route.ts import { RouteRecordRaw } from vue-routerconst routes: Array<RouteRecordRaw> [{path: /,name:…

Pytorch种torch.cat与torch.stack的区别

torch.cat 和 torch.stack 是 PyTorch 中用于拼接张量的两个不同的函数&#xff0c;它们的主要区别在于拼接的方式和创建的维度。 torch.cat&#xff1a; 拼接方式&#xff1a; torch.cat 是按照给定的维度&#xff08;dim 参数&#xff09;将多个张量沿着该维度拼接。在拼接的…

快手在线查权重源码,附带查询接口

源码介绍 新增了用户访问 IP 和时间的统计功能。要使用此功能&#xff0c;只需将“bygoukai.sql”数据库导入源码中&#xff0c;然后修改“config.php”文件中的数据库用户名、密码和数据库名即可。如果需要修改水印&#xff0c;可以在第40行进行更改。要修改查询限制&#xf…

Qt连接数据库(内含完整安装包)

遇到问题必须多思考 这里是最全的Qt连接数据库步骤 qt下载地址 链接&#xff1a;https://pan.baidu.com/s/1wdnTfyL9MQlNOCrSmIOxrQ?pwddgqi 提取码&#xff1a;dgqi --来自百度网盘超级会员V1的分享 数据库百度网盘地址 链接&#xff1a;https://pan.baidu.com/s/1orCczey…

C#VS2022 打包成安装包

步骤参考&#xff1a;VisualStudio&#xff08;2022&#xff09;- 打包项目文件为.exe安装包_vs2022打包exe-CSDNja 步骤参考上方链接&#xff0c;不过在Application Folder文件夹中加的是\项目名称\bin\Debug\下的全部文件&#xff0c;其他地方一样。 最终生成的安装包在Deb…

蓝桥杯省赛无忧 竞赛常用库函数 课件5 排序

01 sort简介 02 sort的用法 sort(起始地址&#xff0c;结束地址的下一位,比较函数);默认用小于号#include<bits/stdc.h> using namespace std; int main(){int a[1000];int n;//读取数组大小cin>>n;//读取元素for(int i1;i<n;i)cin>>a[i];//对数组进行排…

记录一次接近24万条数据导入Mysql的过程

由于开发项目的需求&#xff0c;之前有部分数据要写入阿里云的表格存储&#xff0c;过了一年多时间&#xff0c;表A的数据量接近24万条&#xff0c;现在需要将表A的数据转到Mysql中。 利用官方工具导出数据后&#xff0c;发现文件里面有238999条数据&#xff0c;文件大小是460…

C++之​虚函数

虚函数是C中的一个重要概念&#xff0c;它主要用于实现多态。在基类中声明一个虚函数&#xff0c;派生类可以重写这个函数&#xff0c;从而实现不同的功能。当基类指针或引用指向派生类对象时&#xff0c;调用虚函数会根据实际对象类型来调用相应的派生类中的函数实现&#xff…

DVWA-Hight-DOM型XSS漏洞

首先打开hight模块的DVWA,并来到DOM型XSS漏洞处 首先试探 这里普通的js代码被过滤 再利用img试探 同样被过滤 这里后端代码不太可能将所有可能利用黑名单的形式全部写入过滤代码中&#xff0c;所以这里后端的过滤代码大概率是白名单&#xff0c;也就是除了这个下拉列表中的名单…

Excel5:自动化周报的制作

自动化周报的数据引用来源于8月成交数据-纯数值表格&#xff0c;因为8月成交数据表格中部分单元格中有vlookup函数&#xff0c;且存在跨表连接。 对于跨表连接的解释和说明&#xff1f; 首先打开我们之前做好的成交数据。打开后我们可以看到这上面出现了一个安全警告&#xff0…